Files
bonus-import-tools/app.py
2025-01-21 09:09:26 +03:00

270 lines
13 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# __author__ = 'szhdanoff@gmail.com'
__version__ = '1.0.6'
import os
import time
import csv
import phonenumbers
#
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler import version as apscheduler_version
from email_validator import validate_email, EmailNotValidError
from dotenv import load_dotenv
# local imports
from dinect_api import get_user
from dinect_api import new_user_by_card
from dinect_api import bonuses_update
# Load variables from a local .env file (if any) into the process environment
load_dotenv()
# Default region passed to the phone number parser
COUNTRY = os.getenv('COUNTRY', 'RU')
# os.getenv returns a string whenever the variable is set, so bool() would turn
# "False" or "0" into True. Only explicit "off" spellings disable dry-run; any
# other non-empty value keeps enabling it, as before.
DRY_RUN = os.getenv('DRY_RUN', '').strip().lower() not in ('', '0', 'false', 'no', 'off', 'none')
# Directory scanned for incoming CSV files: "<directory of this script>/csv"
csv_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'csv')
print('Script version:', __version__, '| CSV path:"', csv_path, '"| COUNTRY:', COUNTRY, '| DRY_RUN:', DRY_RUN)
def validate_phone(raw_phone):
    """
    Normalize a phone number to E164 format.
    The number is parsed with the module-level COUNTRY as the default region.
    Args:
        raw_phone (str): The phone number as read from the input file.
    Returns:
        str: The number in E164 format, or an empty string when it cannot be
        parsed or is not a valid number.
    """
    candidate = raw_phone.strip()
    try:
        parsed = phonenumbers.parse(candidate, COUNTRY)
        # Parseable but not valid is reported the same way as unparseable
        if not phonenumbers.is_valid_number(parsed):
            return ''
        return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
    except phonenumbers.NumberParseException:
        return ''
def run_import():
    """
    Runs the import process of given CSV files.
    Walks the 'csv' directory and its subdirectories and processes every file
    with the '.csv' extension. A timestamped log file is written next to each
    source file. Files whose name contains 'users' are imported as users and
    files whose name contains 'transaction' as bonus transactions. Every
    scanned file is finally renamed with a timestamped '.old' suffix, so it no
    longer matches the '.csv' filter on the next run.
    """
    files = []
    for root, _dirs, names in os.walk(csv_path):
        for name in names:
            if os.path.splitext(name)[1] == '.csv':
                files.append(os.path.join(root, name))
    print('Scan .csv files: ', files)
    for path in files:
        log_file_name = f'{path}-{time.strftime("%Y%m%d-%H%M%S", time.localtime())}.log'
        print('Log file: ', log_file_name)
        file_name = os.path.basename(path)
        with open(log_file_name, "w", encoding="utf-8") as log_file:
            # newline='' lets the csv module handle line endings and quoted newlines itself
            with open(path, "r", encoding="utf-8", newline='') as csv_file:
                print('Importing', file_name)
                # =====================================================
                # USERS file
                # =====================================================
                if 'users' in file_name:
                    start_time = time.time()
                    print(f'Processing "users" file: {path}')
                    _import_users(csv.reader(csv_file, delimiter=','), log_file)
                    end_time = time.time()
                    print(f'Elapsed time of USERS file processing : {end_time - start_time} seconds')
                # =====================================================
                # TRANSACTIONS files
                # =====================================================
                if 'transaction' in file_name:
                    start_time = time.time()
                    print(f'Processing "transaction" file: {path}')
                    _import_transactions(csv.reader(csv_file, delimiter=','), log_file, file_name)
                    end_time = time.time()
                    print(f'Elapsed time of TRANSACTIONS file processing : {end_time - start_time} seconds')
        # Both handles are closed by the 'with' blocks above; rename only afterwards,
        # because renaming a file that is still open fails on some platforms.
        os.rename(path, path + '.' + time.strftime("%Y%m%d-%H%M%S") + '.old')


def _import_users(csv_reader, log_file):
    """
    Imports user rows: finds each user by card, then by phone, and creates it if missing.
    Args:
        csv_reader: Iterable of rows; the first row is treated as the header.
        log_file: Open text file that receives one log line per event.
    """
    for line_count, row in enumerate(csv_reader, start=1):
        if line_count == 1:
            # header line
            continue
        try:
            print(f'Processing line {line_count}: {row}')
            nickname, full_name, card, phone, email, gender = row
            print(nickname, full_name, card, phone, email, gender)
            log_file.write(
                f'String: [{line_count}]- {nickname}, {full_name}, {card}, {phone}, {email}, {gender}\n')
            # strip whitespaces
            nickname = nickname.strip()
            card = card.strip()
            # validate phone; keep the raw value so the error message shows what was rejected
            raw_phone = phone
            phone = validate_phone(raw_phone)
            if phone == '':
                print(f'error in line: [{line_count}]- Invalid phone number: {raw_phone}')
                log_file.write(f'error in line: [{line_count}]- Invalid phone number: {raw_phone}\n')
            # validate email; fall back to a card-based address when it is invalid
            email = email.strip()
            try:
                email = validate_email(email, check_deliverability=False).normalized
            except EmailNotValidError:
                print(f'error in line: [{line_count}]- Invalid email: {email}')
                log_file.write(f'error in line: [{line_count}]- Invalid email: {email}\n')
                email = f'{card}@user.dinect.com'
            # validate / set gender: 'F' -> 2, 'M' and anything else -> 1
            gender = 2 if gender.strip() == 'F' else 1
        except ValueError as e:
            ret = f'Unexpected error in line: [{line_count}] {repr(e)}'
            log_file.write(f'{ret}\n')
            # A malformed (or blank) row leaves nothing valid to look up or create;
            # continuing would reuse the values of the previous row.
            continue
        # Find user via the API
        # by card
        user_found, _, user_card, _, data = get_user(card)
        print(f'user_found_by_card {card}', user_found)
        # by phone (only when there is a valid number to search for)
        phone = phone.replace('+', '')
        if not user_found and phone:
            user_found, _, user_card, _, data = get_user(phone, get_type='phone')
            print(f'user_found_by_phone {phone}', user_found)
        if user_found:
            log_file.write(f'Line: [{line_count}]- User found with: {data}\n')
            print('User found with ', user_card, data)
            continue
        # create new user if not found
        # NOTE(review): gender used to be hard-coded to 1 here, discarding the value
        # parsed above - confirm the API accepts 2 for 'F'.
        # NOTE(review): nickname (not the full_name column) is sent as full_name - confirm.
        user_created, data = new_user_by_card(
            full_name=nickname, phone=phone, gender=gender, external_card=card, email=email,
            use_existing=False
        )
        if user_created:
            print('User created with', data['ID'], data['DIN'])
            log_file.write(f'Line: [{line_count}]- User created with: {data}\n')
        else:
            print(f'error creating user in line: [{line_count}]- Invalid user data: {data}\n')
            log_file.write(
                f'error creating user in line: [{line_count}]- Invalid user data: {data}\n')


def _import_transactions(csv_reader, log_file, file_name):
    """
    Imports transaction rows: finds the user by card or phone and updates bonuses.
    Args:
        csv_reader: Iterable of rows; the first row is treated as the header.
        log_file: Open text file that receives one log line per event.
        file_name (str): Base name of the source file, used to build a
            document id when the row does not provide one.
    """
    for line_count, row in enumerate(csv_reader, start=1):
        if line_count == 1:
            # header line
            continue
        try:
            print(f'Processing line {line_count}: {row}')
            (user_id, card, phone, summ_total, summ_discount, sum_with_discount,
             bonus_amount, _transaction_date, _transaction_time, doc_id) = row
            card = card.strip()
            user_found = False
            din_id = None
            if card != '':
                user_found, din_id, _, _, _ = get_user(card, get_type='auto')
                if not user_found:
                    print('----------------------------------------------------------------')
                    print(f'error in line: [{line_count}]- Invalid card: {card}')
                    log_file.write(f'error in line: [{line_count}]- Invalid card: {card}\n')
                else:
                    print('User found with', user_id, card)
            # Phone is only a fallback: running it after a successful card lookup
            # could overwrite din_id with the result of a failed search.
            if not user_found:
                normalized_phone = validate_phone(phone)
                if normalized_phone != '':
                    # Search with the same normalized form that the users import stores
                    phone = normalized_phone.replace('+', '')
                    user_found, din_id, _, _, _ = get_user(phone, get_type='phone')
                    if not user_found:
                        print(f'error in line: [{line_count}]- Invalid phone: {phone}')
                        log_file.write(f'error in line: [{line_count}]- Invalid phone: {phone}\n')
                    else:
                        print('User found with', din_id, phone)
            if not user_found:
                print(f'error in line: [{line_count}]- Invalid user: {user_id}')
                log_file.write(f'error in line: [{line_count}]- Invalid user: {user_id}\n')
                continue
            # Build a document id from file name, line and card when the row has none
            doc_id = doc_id.strip() or f'{file_name[0:20]}-{line_count}-{card}'
            result, data = bonuses_update(
                user_id=din_id,
                summ_total=summ_total,
                bonus_amount=bonus_amount,
                doc_id=doc_id,
                sum_with_discount=sum_with_discount,
                sum_discount=summ_discount,
                dry_run=DRY_RUN
            )
            if not result:
                print(f'ERROR in line: [{line_count}]- bonuses_update: {data}')
                log_file.write(f'error in line: [{line_count}]- bonuses_update: {data}\n')
            else:
                print(f'RESULT=OK, Bonuses updated, user_id={din_id}')
        except ValueError as e:
            ret = f'error in line: [{line_count}] {repr(e)}'
            log_file.write(f'{ret}\n')
# Run the import periodically on a background thread while the main thread idles
scheduler = BackgroundScheduler()
scheduler.start()
try:
    # max_instances=1 prevents overlapping runs; coalesce collapses missed runs into one
    job = scheduler.add_job(
        run_import, 'interval', minutes=1, max_instances=1, coalesce=True, misfire_grace_time=60
    )
    print('#######################################################################')
    print('# Import running every 1 minute. Place *.csv files in "csv" directory #')
    print('#######################################################################')
    # The interrupt key depends on the operating system, not on the APScheduler
    # release (the previous check compared version strings lexicographically).
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
    # This is here to simulate application activity (which keeps the main thread alive).
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    # Not strictly necessary if daemonic mode is enabled but should be done if possible
    scheduler.shutdown()