#!/usr/bin/python # -*- coding: utf-8 -*- # __author__ = 'szhdanoff@gmail.com' __version__ = '1.0.6' import os import time import csv import phonenumbers # from apscheduler.schedulers.background import BackgroundScheduler from apscheduler import version as apscheduler_version from email_validator import validate_email, EmailNotValidError from dotenv import load_dotenv # local imports from dinect_api import get_user from dinect_api import new_user_by_card from dinect_api import bonuses_update load_dotenv() COUNTRY = os.getenv('COUNTRY', 'RU') DRY_RUN = bool(os.getenv('DRY_RUN', False)) csv_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'csv') print('Script version:', __version__, '| CSV path:"', csv_path, '"| COUNTRY:', COUNTRY, '| DRY_RUN:', DRY_RUN) def validate_phone(raw_phone): """ Validates a given phone number. Args: raw_phone (str): The phone number to be validated. Returns: str: The validated phone number in E164 format if valid, otherwise an empty string. """ raw_phone = raw_phone.strip() try: raw_phone = phonenumbers.parse(raw_phone, COUNTRY) if phonenumbers.is_valid_number(raw_phone): return phonenumbers.format_number(raw_phone, phonenumbers.PhoneNumberFormat.E164) else: return '' except phonenumbers.NumberParseException: return '' def run_import(): """ Runs the import process of given CSV files. This function is responsible for running the import process of given CSV files located in the 'csv' directory. It will import all CSV files it finds in the directory and its subdirectories. """ files = [] # r=root, d=directories, f = files for r, d, f in os.walk(csv_path): for file in f: filename, file_extension = os.path.splitext(file) if file_extension == '.csv': files.append(os.path.join(r, file)) print('Scan .csv files: ', files) for f in files: log_file_name = f'{f}-{time.strftime("%Y%m%d-%H%M%S", time.localtime())}.log' print('Log file: ', log_file_name) with open(log_file_name, "w", encoding="utf-8") as log_file: with open(f, "r", encoding="utf-8") as csv_file: file_name = os.path.basename(f) print('Importing', file_name) # ===================================================== # USERS file # ===================================================== start_time = time.time() if 'users' in file_name: print(f'Processing "users" file: {f}') csv_reader = csv.reader(csv_file, delimiter=',') line_count = 0 for row in csv_reader: if line_count == 0: line_count += 1 else: line_count += 1 try: print(f'Processing line {line_count}: {row}') nickname, full_name, card, phone, email, gender = row print(nickname, full_name, card, phone, email, gender) log_file.write( f'String: [{line_count}]- {nickname}, {full_name}, {card}, {phone}, {email}, {gender}\n') # strip whitespaces nickname = nickname.strip() full_name = full_name.strip() card = card.strip() # validate phone phone = validate_phone(phone) if phone == '': print(f'error in line: [{line_count}]- Invalid phone number: {phone}') log_file.write(f'error in line: [{line_count}]- Invalid phone number: {phone}\n') # validate email email = email.strip() try: email_info = validate_email(email, check_deliverability=False) email = email_info.normalized except EmailNotValidError as e: print(f'error in line: [{line_count}]- Invalid email: {email}') log_file.write(f'error in line: [{line_count}]- Invalid email: {email}\n') email = None # set email if not found if email is None: email = f'{card}@user.dinect.com' # validate / set gender gender = gender.strip() if gender not in ['M', 'F']: gender = 1 else: if gender == 'M': gender = 1 else: gender = 2 except ValueError as e: ret = f'Unexpected error in line: [{line_count}] {repr(e)}' log_file.write(f'{ret}\n') # Find user via the API # by card user_found_by_card, user_id, user_card, purchases_url, data = get_user(card) print(f'user_found_by_card {card}', user_found_by_card) # by phone phone = phone.replace('+', '') if not user_found_by_card: user_found_by_phone, user_id, user_card, purchases_url, data = get_user(phone, get_type='phone') print(f'user_found_by_phone {phone}', user_found_by_phone) user_found = user_found_by_card or user_found_by_phone # create new user if not found if not user_found: # user_created, data = new_user( # full_name=nickname, phone=phone, gender=1, foreign_card=None, email=email, # ) user_created, data = new_user_by_card( full_name=nickname, phone=phone, gender=1, external_card=card, email=email, use_existing=False ) if user_created: print('User created with', data['ID'], data['DIN']) log_file.write(f'Line: [{line_count}]- User created with: {data}\n') else: print(f'error creating user in line: [{line_count}]- Invalid user data: {data}\n') log_file.write( f'error creating user in line: [{line_count}]- Invalid user data: {data}\n') else: log_file.write(f'Line: [{line_count}]- User found with: {data}\n') print('User found with ', user_card, data) # print('User found with', data['DIN'], user_card) end_time = time.time() print(f'Elapsed time of USERS file processing : {end_time - start_time} seconds') # ===================================================== # TRANSACTIONS files # ===================================================== start_time = time.time() if 'transaction' in file_name: print(f'Processing "transaction" file: {f}') csv_reader = csv.reader(csv_file, delimiter=',') line_count = 0 for row in csv_reader: if line_count == 0: line_count += 1 else: try: line_count += 1 print(f'Processing line {line_count}: {row}') user_id, card, phone, summ_total, summ_discount, sum_with_discount, bonus_amount, transaction_date, transaction_time, doc_id = row if card.strip() != '': user_found_by_card, din_id, _, _, _ = get_user(card, get_type='auto') if not user_found_by_card: print('----------------------------------------------------------------') print(f'error in line: [{line_count}]- Invalid card: {card}') log_file.write(f'error in line: [{line_count}]- Invalid card: {card}\n') else: print('User found with', user_id, card) else: user_found_by_card = False if validate_phone(phone) != '': phone = phone.replace('+', '') user_found_by_phone, din_id, _, _, _ = get_user(phone, get_type='phone') if not user_found_by_phone: print(f'error in line: [{line_count}]- Invalid phone: {phone}') log_file.write(f'error in line: [{line_count}]- Invalid phone: {phone}\n') else: print('User found with', din_id, phone) else: user_found_by_phone = False user_found = user_found_by_card or user_found_by_phone if user_found: if doc_id.strip() == '': doc_id = f'{file_name[0:20]}-{line_count}-{card}' else: doc_id = doc_id.strip() result, data = bonuses_update( user_id=din_id, summ_total=summ_total, bonus_amount=bonus_amount, doc_id=doc_id, sum_with_discount=sum_with_discount, sum_discount=summ_discount, dry_run=DRY_RUN ) if not result: print(f'ERROR in line: [{line_count}]- bonuses_update: {data}') log_file.write(f'error in line: [{line_count}]- bonuses_update: {data}\n') else: print(f'RESULT=OK, Bonuses updated, user_id={din_id}') else: print(f'error in line: [{line_count}]- Invalid user: {user_id}') log_file.write(f'error in line: [{line_count}]- Invalid user: {user_id}\n') except ValueError as e: ret = f'error in line: [{line_count}] {repr(e)}' log_file.write(f'{ret}\n') end_time = time.time() print(f'Elapsed time of TRANSACTIONS file processing : {end_time - start_time} seconds') csv_file.close() log_file.close() os.rename(f, f + '.' + time.strftime("%Y%m%d-%H%M%S") + '.old') scheduler = BackgroundScheduler() scheduler.start() try: job = scheduler.add_job( run_import, 'interval', minutes=1, max_instances=1, coalesce=True, misfire_grace_time=60 ) print('#######################################################################') print('# Import running every 1 minute. Place *.csv files in "csv" directory #') print('#######################################################################') print('Press Ctrl+{0} to exit'.format('Break' if apscheduler_version >= '3.0.0' else 'C')) # This is here to simulate application activity (which keeps the main thread alive). while True: time.sleep(10) except (KeyboardInterrupt, SystemExit): # Not strictly necessary if daemonic mode is enabled but should be done if possible scheduler.shutdown()