#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Periodic CSV importer for users and bonus transactions.

Every minute the ``csv`` directory located next to this script is scanned for
``*.csv`` files.  Files whose name contains ``users`` are imported as user
records, files whose name contains ``transaction`` are imported as bonus
transactions.  Each processed file gets a timestamped ``.log`` file next to it
and is then renamed to ``<name>.<timestamp>.old`` so it is not picked up again.
"""
__author__ = 'szhdanoff@gmail.com'
__version__ = '1.0.6'

import os
import time
import csv
import phonenumbers

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler import version as apscheduler_version
from email_validator import validate_email, EmailNotValidError
from dotenv import load_dotenv
# local imports
from dinect_api import get_user
from dinect_api import new_user_by_card
from dinect_api import bonuses_update

load_dotenv()

# Environment values that switch a flag OFF.  Any other non-empty value keeps
# the flag ON, exactly as the previous ``bool(os.getenv(...))`` did.
_FALSE_STRINGS = frozenset({'', '0', 'false', 'no', 'off', 'none'})


def _env_flag(name, default=False):
    """Read the boolean environment variable *name*.

    ``bool()`` of an environment string is True for every non-empty value, so
    ``DRY_RUN=False`` or ``DRY_RUN=0`` used to ENABLE the flag.  Here the
    usual "off" spellings are recognised (case-insensitively); an unset
    variable yields *default*.
    """
    raw_value = os.getenv(name)
    if raw_value is None:
        return default
    return raw_value.strip().lower() not in _FALSE_STRINGS


COUNTRY = os.getenv('COUNTRY', 'RU')
DRY_RUN = _env_flag('DRY_RUN', False)

csv_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'csv')
print('Script version:', __version__, '| CSV path:"', csv_path, '"| COUNTRY:', COUNTRY, '| DRY_RUN:', DRY_RUN)


def validate_phone(raw_phone):
    """
    Validates a given phone number.

    Args:
        raw_phone (str): The phone number to be validated. Numbers without an
            international prefix are parsed using the COUNTRY region.

    Returns:
        str: The validated phone number in E164 format if valid, otherwise an empty string.
    """
    raw_phone = raw_phone.strip()
    try:
        parsed = phonenumbers.parse(raw_phone, COUNTRY)
    except phonenumbers.NumberParseException:
        return ''
    if not phonenumbers.is_valid_number(parsed):
        return ''
    return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)


def _find_csv_files(root):
    """Return the paths of all files ending in ``.csv`` under *root* (recursive)."""
    found = []
    for dir_path, _dir_names, file_names in os.walk(root):
        for name in file_names:
            if os.path.splitext(name)[1] == '.csv':
                found.append(os.path.join(dir_path, name))
    return found


def _import_users(csv_file, log_file, source_path):
    """Import one "users" CSV: look every row up via the API, create missing users.

    Expected columns: nickname, full_name, card, phone, email, gender.
    The first row is treated as a header and skipped.  Rows with a different
    number of columns are logged and skipped.
    """
    print(f'Processing "users" file: {source_path}')
    csv_reader = csv.reader(csv_file, delimiter=',')
    for line_count, row in enumerate(csv_reader, start=1):
        if line_count == 1:
            continue  # header row

        print(f'Processing line {line_count}: {row}')
        try:
            nickname, full_name, card, phone, email, gender = row
        except ValueError as e:
            ret = f'Unexpected error in line: [{line_count}] {repr(e)}'
            print(ret)
            log_file.write(f'{ret}\n')
            # Without this the code below would run with unbound names (first
            # data row) or with the previous row's values.
            continue

        print(nickname, full_name, card, phone, email, gender)
        log_file.write(
            f'String: [{line_count}]- {nickname}, {full_name}, {card}, {phone}, {email}, {gender}\n')

        # strip whitespaces
        # NOTE(review): full_name is parsed but never sent to the API; the
        # nickname is passed as ``full_name`` below - confirm this is intended.
        nickname = nickname.strip()
        card = card.strip()

        # validate phone (keep the raw value so the error message can show it)
        raw_phone = phone
        phone = validate_phone(raw_phone)
        if phone == '':
            print(f'error in line: [{line_count}]- Invalid phone number: {raw_phone}')
            log_file.write(f'error in line: [{line_count}]- Invalid phone number: {raw_phone}\n')

        # validate email; fall back to a synthetic address built from the card
        email = email.strip()
        try:
            email = validate_email(email, check_deliverability=False).normalized
        except EmailNotValidError:
            print(f'error in line: [{line_count}]- Invalid email: {email}')
            log_file.write(f'error in line: [{line_count}]- Invalid email: {email}\n')
            email = f'{card}@user.dinect.com'

        # validate / set gender: 'F' -> 2, 'M' or anything else -> 1
        gender = 2 if gender.strip() == 'F' else 1

        # Find user via the API: by card first ...
        user_found_by_card, _, user_card, _, data = get_user(card)
        print(f'user_found_by_card {card}', user_found_by_card)

        # ... then by phone, but only when there is a valid phone to search for.
        phone = phone.replace('+', '')
        user_found_by_phone = False
        if not user_found_by_card and phone:
            user_found_by_phone, _, user_card, _, data = get_user(phone, get_type='phone')
            print(f'user_found_by_phone {phone}', user_found_by_phone)

        if user_found_by_card or user_found_by_phone:
            log_file.write(f'Line: [{line_count}]- User found with: {data}\n')
            print('User found with ', user_card, data)
            continue

        # create new user if not found; the validated gender code is passed on
        # (it used to be computed and then replaced by a hard-coded 1).
        user_created, data = new_user_by_card(
            full_name=nickname,
            phone=phone,
            gender=gender,
            external_card=card,
            email=email,
            use_existing=False
        )
        if user_created:
            print('User created with', data['ID'], data['DIN'])
            log_file.write(f'Line: [{line_count}]- User created with: {data}\n')
        else:
            print(f'error creating user in line: [{line_count}]- Invalid user data: {data}\n')
            log_file.write(
                f'error creating user in line: [{line_count}]- Invalid user data: {data}\n')


def _process_transaction_row(row, line_count, file_name, log_file):
    """Resolve the user of one transaction row and post the bonus update.

    Raises:
        ValueError: if *row* does not have exactly ten columns (or if one of
            the API helpers raises it); the caller logs it and moves on.
    """
    (user_id, card, phone, summ_total, summ_discount, sum_with_discount,
     bonus_amount, _transaction_date, _transaction_time, doc_id) = row

    din_id = None

    # Look the user up by card ...
    user_found_by_card = False
    if card.strip() != '':
        user_found_by_card, din_id, _, _, _ = get_user(card.strip(), get_type='auto')
        if not user_found_by_card:
            print('----------------------------------------------------------------')
            print(f'error in line: [{line_count}]- Invalid card: {card}')
            log_file.write(f'error in line: [{line_count}]- Invalid card: {card}\n')
        else:
            print('User found with', user_id, card)

    # ... and fall back to the phone only when the card did not match, so a
    # failed phone lookup can no longer overwrite the id found by card.
    user_found_by_phone = False
    if not user_found_by_card:
        normalized_phone = validate_phone(phone)
        if normalized_phone != '':
            # Same form the users import sends to the API: E164 without '+'.
            phone = normalized_phone.replace('+', '')
            user_found_by_phone, din_id, _, _, _ = get_user(phone, get_type='phone')
            if not user_found_by_phone:
                print(f'error in line: [{line_count}]- Invalid phone: {phone}')
                log_file.write(f'error in line: [{line_count}]- Invalid phone: {phone}\n')
            else:
                print('User found with', din_id, phone)

    if not (user_found_by_card or user_found_by_phone):
        print(f'error in line: [{line_count}]- Invalid user: {user_id}')
        log_file.write(f'error in line: [{line_count}]- Invalid user: {user_id}\n')
        return

    # A missing document id is replaced by one derived from file, line and card.
    doc_id = doc_id.strip()
    if doc_id == '':
        doc_id = f'{file_name[0:20]}-{line_count}-{card}'

    result, data = bonuses_update(
        user_id=din_id,
        summ_total=summ_total,
        bonus_amount=bonus_amount,
        doc_id=doc_id,
        sum_with_discount=sum_with_discount,
        sum_discount=summ_discount,
        dry_run=DRY_RUN
    )
    if not result:
        print(f'ERROR in line: [{line_count}]- bonuses_update: {data}')
        log_file.write(f'error in line: [{line_count}]- bonuses_update: {data}\n')
    else:
        print(f'RESULT=OK, Bonuses updated, user_id={din_id}')


def _import_transactions(csv_file, log_file, source_path, file_name):
    """Import one "transaction" CSV, posting a bonus update for every row.

    Expected columns: user_id, card, phone, summ_total, summ_discount,
    sum_with_discount, bonus_amount, transaction_date, transaction_time,
    doc_id.  The first row is treated as a header and skipped.
    """
    print(f'Processing "transaction" file: {source_path}')
    csv_reader = csv.reader(csv_file, delimiter=',')
    for line_count, row in enumerate(csv_reader, start=1):
        if line_count == 1:
            continue  # header row
        print(f'Processing line {line_count}: {row}')
        try:
            _process_transaction_row(row, line_count, file_name, log_file)
        except ValueError as e:
            ret = f'error in line: [{line_count}] {repr(e)}'
            log_file.write(f'{ret}\n')


def run_import():
    """
    Imports users and transactions data from CSV files, validates the data,
    creates new users if necessary, and updates user bonuses.

    For every ``.csv`` file found under ``csv_path`` a timestamped log file is
    written next to it; once processed, the CSV is renamed to
    ``<name>.<timestamp>.old``.

    Parameters:
        None

    Returns:
        None
    """
    files = _find_csv_files(csv_path)
    print('Scan .csv files: ', files)

    for f in files:
        log_file_name = f'{f}-{time.strftime("%Y%m%d-%H%M%S", time.localtime())}.log'
        print('Log file: ', log_file_name)
        file_name = os.path.basename(f)

        # newline='' is what the csv module requires so that quoted fields
        # containing line breaks are parsed correctly.
        with open(log_file_name, "w", encoding="utf-8") as log_file, \
                open(f, "r", encoding="utf-8", newline='') as csv_file:
            print('Importing', file_name)

            # =====================================================
            # USERS file
            # =====================================================
            if 'users' in file_name:
                start_time = time.time()
                _import_users(csv_file, log_file, f)
                end_time = time.time()
                print(f'Elapsed time of USERS file processing : {end_time - start_time} seconds')

            # =====================================================
            # TRANSACTIONS files
            # =====================================================
            if 'transaction' in file_name:
                start_time = time.time()
                _import_transactions(csv_file, log_file, f, file_name)
                end_time = time.time()
                print(f'Elapsed time of TRANSACTIONS file processing : {end_time - start_time} seconds')

        # Both handles are closed by the ``with`` block before the rename.
        os.rename(f, f + '.' + time.strftime("%Y%m%d-%H%M%S") + '.old')


scheduler = BackgroundScheduler()
scheduler.start()

try:
    # max_instances=1 keeps a slow import from overlapping with the next run.
    job = scheduler.add_job(
        run_import,
        'interval',
        minutes=1,
        max_instances=1,
        coalesce=True,
        misfire_grace_time=60
    )
    print('#######################################################################')
    print('# Import running every 1 minute. Place *.csv files in "csv" directory #')
    print('#######################################################################')
    # NOTE(review): the hint is chosen by a lexicographic comparison of the
    # APScheduler version string, not by the platform - confirm whether an
    # os.name check was intended.
    print('Press Ctrl+{0} to exit'.format('Break' if apscheduler_version >= '3.0.0' else 'C'))

    # This is here to simulate application activity (which keeps the main thread alive).
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    # Not strictly necessary if daemonic mode is enabled but should be done if possible
    scheduler.shutdown()