import io
import os
import traceback
import pathlib
import json
import csv
from datetime import datetime

import imagesize
from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.utils import timezone
from openpyxl import load_workbook
from openpyxl.styles import Font, Border, Side, PatternFill, NamedStyle

from fwd import settings
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
    ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, \
    InvalidDecompressedSizeException, RequiredColumnException
from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest, SubscriptionRequestFile, Report, ReportFile
from fwd_api.utils import process as ProcessUtil
from fwd_api.utils.crypto import image_authenticator
from fwd_api.utils.image import resize
from ..utils import s3 as S3Util
from ..celery_worker.client_connector import c_connector

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)


def convert_date_string(date_string):
    # Parse the input date string, e.g. "2024-01-31 10:30:00.123456 +0700"
    date_format = "%Y-%m-%d %H:%M:%S.%f %z"
    parsed_date = datetime.strptime(date_string, date_format)
    # Format the date as "YYYYMMDD"
    return parsed_date.strftime("%Y%m%d")


def validate_report_list(request):
    # Default to empty strings so absent query params do not raise TypeError below
    start_date_str = request.GET.get('start_date', '')
    end_date_str = request.GET.get('end_date', '')
    page_number = int(request.GET.get('page', 0))
    page_size = int(request.GET.get('page_size', 10))
    report_id = request.GET.get('report_id', None)

    validated_data = {
        "start_date": None,
        "end_date": None,
        "report_id": report_id,
        "page_size": page_size,
        "page_number": page_number,
    }
    if len(start_date_str) > 0 and len(end_date_str) > 0:
        try:
            validated_data["start_date"] = timezone.datetime.strptime(start_date_str, '%Y-%m-%dT%H:%M:%S%z')
            validated_data["end_date"] = timezone.datetime.strptime(end_date_str, '%Y-%m-%dT%H:%M:%S%z')
        except ValueError:
            raise InvalidException(excArgs="Date format")
    # Either a report_id or a date range is required to filter reports
    if validated_data["report_id"] is None and validated_data["start_date"] is None:
        raise RequiredFieldException(excArgs="report_id, start_date, end_date")
    return validated_data


def validate_feedback_file(csv_file_path):
    required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date',
                        'retailer', 'Sold to party', 'timetakenmilli']
    missing_columns = []

    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        # Check that all required columns are present in the header row
        for column in required_columns:
            if column not in reader.fieldnames:
                missing_columns.append(column)

    if missing_columns:
        raise RequiredColumnException(excArgs=str(missing_columns))


def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            # print(f'[DEBUG]: {f.name}')
            raise InvalidException(excArgs="files")
        extension = f.name.split(".")[-1].lower()
        if "." not in f.name or extension not in allowed_file_extensions:
            raise FileFormatInvalidException(excArgs=list(allowed_file_extensions))
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size
    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))
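
# Example (illustrative sketch, not part of this module): a view would
# typically run the validators above before persisting anything. The
# `request` object and field names here are hypothetical.
#
#   files = request.FILES.getlist('files')
#   validate_list_file(files)                  # raises on count/size/format violations
#   validated = validate_report_list(request)  # raises unless report_id or a date range is given
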
def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"):
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            # print(f'[DEBUG]: {f.name}')
            raise InvalidException(excArgs="files")
        extension = f.name.split(".")[-1].lower()
        if "." not in f.name or extension != "csv":
            raise FileFormatInvalidException(excArgs=[".csv"])
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size
    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))


def get_file(file_path: str):
    try:
        return open(file_path, 'rb')
    except Exception as e:
        print(e)
        raise GeneralException("System")


def get_template_folder_path(tem: OcrTemplate):
    tem_id = str(tem.id)
    sub_id = str(tem.subscription.id)
    user_id = str(tem.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, "templates", tem_id)


def get_folder_path(rq: SubscriptionRequest):
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)

    request_id = str(rq.request_id)
    logger.info(f"[DEBUG]: rq.process_type: {rq.process_type}")
    p_type = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    sub_id = str(rq.subscription.id)
    user_id = str(rq.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, 'requests', p_type, request_id)


def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    folder_path = get_folder_path(rq)
    # Create the directory if it does not exist yet
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as w:
        w.write(file_bytes)
    return file_path


def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
    return file_path


def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, "w") as json_file:
        json.dump(data, json_file)
    return file_path


def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict):
    user_id = str(rq.subscription.user.id)
    feedback_id = str(rq.id)

    folder_path = os.path.join(settings.MEDIA_ROOT, 'users', user_id, "feedbacks", feedback_id)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, file_name)
    with uploaded_file.open() as file:
        # Decode the uploaded CSV and rewrite it to local storage
        file_contents = file.read().decode('utf-8')
        with open(file_path, 'w', newline='') as csvfile:
            csvfile.write(file_contents)
    return file_path
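
# Local storage layout produced by the helpers above (paths relative to
# settings.MEDIA_ROOT; the ID placeholders are illustrative):
#
#   users/<user_id>/subscriptions/<sub_id>/templates/<template_id>/            (OCR templates)
#   users/<user_id>/subscriptions/<sub_id>/requests/<process_type>/<request_id>/  (request files)
#   users/<user_id>/feedbacks/<feedback_id>/                                   (feedback CSVs)
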
def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""):
    report_id = str(rp.report_id)
    if not prefix:
        folder_path = os.path.join(settings.MEDIA_ROOT, "report", report_id)
    else:
        folder_path = os.path.join(settings.MEDIA_ROOT, "report", prefix)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, file_name)
    workbook.save(file_path)
    return file_path


def delete_file_with_path(file_path: str) -> bool:
    try:
        os.remove(file_path)
        return True
    except Exception as e:
        print(e)
        return False


def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    try:
        folder_path = get_template_folder_path(rq)
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()


def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path: str):
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        if extension.lower() == "pdf":
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
    return file_path


def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality: int):
    try:
        folder_path = get_folder_path(rq)
        pathlib.Path(folder_path).mkdir(exist_ok=True, parents=True)
        # request_file: SubscriptionRequestFile = SubscriptionRequestFile(
        #     file_name=file_name,
        #     file_path=os.path.join(folder_path, file_name),
        #     doc_type=doc_type,
        #     origin_name=file.name,
        #     request=rq,
        #     index_in_request=index_in_request
        # )
        # request_file.save()
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()


def save_to_S3(file_name, rq, local_file_path):
    try:
        file_path = get_folder_path(rq)
        request_id = rq.request_id
        assert len(file_path.split("/")) >= 2, "file_path must have at least process type and request id"
        # Key layout: <process_type>/<request_id>/<file_name>
        s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key, request_id))
        c_connector.remove_local_file((local_file_path, request_id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()


def save_feedback_to_S3(file_name, id, local_file_path):
    try:
        # print(f"[DEBUG]: Uploading feedback to S3 with local path {local_file_path}, id: {id}, file_name: {file_name}")
        assert len(local_file_path.split("/")) >= 3, "file_path must have at least feedback_folder and feedback_id"
        # s3_key = os.path.join(local_file_path.split("/")[-3], local_file_path.split("/")[-2], file_name)
        # Key layout: feedback/<feedback_id>/<file_name>
        s3_key = os.path.join("feedback", local_file_path.split("/")[-2], file_name)
        # print(f"[DEBUG]: Uploading feedback to S3 with s3_key {s3_key}")
        c_connector.upload_feedback_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
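
# Example flow (sketch): after an upload is saved locally, the Celery-backed
# connector mirrors it to S3 and removes the local copy. The calls mirror the
# functions above; the file name and `uploaded` object are illustrative.
#
#   local_path = resize_and_save_file("image_0.jpg", rq, uploaded, quality=90)
#   s3_key = save_to_S3("image_0.jpg", rq, local_path)
#   # -> s3_key like "<process_type>/<request_id>/image_0.jpg"
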
def save_report_to_S3(id, local_file_path):
    try:
        # Key layout: report/<report_id_or_prefix>/<file_name>
        s3_key = os.path.join("report", local_file_path.split("/")[-2], local_file_path.split("/")[-1])
        c_connector.upload_report_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()


def download_from_S3(s3_key, local_file_path):
    s3_client.download_file(s3_key, local_file_path)


def save_pdf(file_path: str, file: TemporaryUploadedFile):
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)


def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    # Check decompressed dimensions from the header before fully decoding the image
    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
    width, height = imagesize.get(input_file)
    if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
        raise InvalidDecompressedSizeException(excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))

    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
    image = Image.open(input_file)

    # Read the orientation from EXIF metadata; some writers (e.g. Windows Photos)
    # keep the original pixels and only set this tag.
    for orientation in ExifTags.TAGS.keys():
        if ExifTags.TAGS[orientation] == 'Orientation':
            break
    try:
        e = image._getexif()  # returns None if there is no EXIF data
        if e:
            exif = dict(e.items())
            if orientation in exif:
                orientation = exif[orientation]
                if orientation == 3:
                    image = image.transpose(Image.ROTATE_180)
                elif orientation == 6:
                    image = image.transpose(Image.ROTATE_270)
                elif orientation == 8:
                    image = image.transpose(Image.ROTATE_90)
    except Exception as ex:
        print(ex)
        print("Rotation Error")
        traceback.print_exc()

    image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
    image = image.convert('RGB')
    image.save(file_path, optimize=True, quality=quality)


def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    token = image_authenticator.generate_img_token()
    if not file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
            folder=folder, uid=uid, base_url=settings.BASE_URL, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
        folder=folder, uid=uid, file_name=file_name, base_url=settings.BASE_URL, token=token)


def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    token = image_authenticator.generate_img_token(user_id)
    if not file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
            folder=folder, uid=data_id, base_url=settings.BASE_URL, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
        folder=folder, uid=data_id, file_name=file_name, base_url=settings.BASE_URL, token=token)


def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
    return f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'
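
# Generated URL shapes (illustrative placeholders; the token comes from
# image_authenticator and BASE_URL from settings):
#
#   build_media_url("templates", "<uid>")      -> {BASE_URL}/api/ctel/media/templates/<uid>/?token=...
#   build_media_url_v2("<media_id>", u, s, y)  -> {BASE_URL}/api/ctel/v2/media/request/<media_id>/?token=...
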
def get_value(_dict, keys):
    keys = keys.split('.')
    value = _dict
    try:
        # Walk the dotted key path; return "-" as soon as a key is missing
        for key in keys:
            if key not in value:
                return "-"
            value = value.get(key, {})
    except Exception as e:
        print(f"[ERROR]: {e}")
        print(f"[ERROR]: value: {value}")
        print(f"[ERROR]: keys: {keys}")

    if not value:
        return "-"
    elif isinstance(value, list):
        value = str(value)
    return value


def dict2xlsx(input: list, _type='report'):
    red = "FF0000"
    black = "000000"
    green = "E2EFDA"
    yellow = "FFF2CC"
    gray = "D0CECE"
    font_black = Font(name="Calibri", size=11, color=black)
    font_black_bold = Font(name="Calibri", size=11, color=black, bold=True)
    font_red = Font(name="Calibri", size=11, color=red)
    thin = Side(border_style="thin", color=black)
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    fill_green = PatternFill(start_color=green, end_color=green, fill_type="solid")
    fill_yellow = PatternFill(start_color=yellow, end_color=yellow, fill_type="solid")
    fill_gray = PatternFill(start_color=gray, end_color=gray, fill_type="solid")
    normal_cell = NamedStyle(name="normal_cell", font=font_black, border=border)
    normal_cell_red = NamedStyle(name="normal_cell_red", font=font_red, border=border)

    if _type == 'report':
        wb = load_workbook(filename='report.xlsx')
        ws = wb['Sheet1']
        mapping = {
            'A': 'subs',
            'B': 'extraction_date',
            'C': 'num_imei',
            'D': 'num_invoice',
            'E': 'total_images',
            'F': 'images_quality.successful',
            'G': 'images_quality.successful_percent',
            'H': 'images_quality.bad',
            'I': 'images_quality.bad_percent',
            'J': 'average_accuracy_rate.imei',
            'K': 'average_accuracy_rate.purchase_date',
            'L': 'average_accuracy_rate.retailer_name',
            'M': 'average_processing_time.imei',
            'N': 'average_processing_time.invoice',
            'O': 'usage.imei',
            'P': 'usage.invoice',
        }
        start_index = 5
    elif _type == 'report_detail':
        wb = load_workbook(filename='report_detail.xlsx')
        ws = wb['Sheet1']
        mapping = {
            'A': 'request_id',
            'B': 'redemption_number',
            'C': 'image_type',
            'D': 'imei_user_submitted',
            'E': 'imei_ocr_retrieved',
            'F': 'imei1_accuracy',
            'G': 'invoice_purchase_date_consumer',
            'H': 'invoice_purchase_date_ocr',
            'I': 'invoice_purchase_date_accuracy',
            'J': 'invoice_retailer_consumer',
            'K': 'invoice_retailer_ocr',
            'L': 'invoice_retailer_accuracy',
            'M': 'ocr_image_accuracy',
            'N': 'ocr_image_speed',
            'O': 'is_reviewed',
            'P': 'bad_image_reasons',
            'Q': 'countermeasures',
            'R': 'imei_revised_accuracy',
            'S': 'purchase_date_revised_accuracy',
            'T': 'retailer_revised_accuracy',
        }
        start_index = 4

    for subtotal in input:
        for key_index, key in enumerate(mapping.keys()):
            value = get_value(subtotal, mapping[key])
            ws[key + str(start_index)] = value
            ws[key + str(start_index)].border = border
            if _type == 'report':
                # Subtotal rows (marked by subs == '+') get bold text and colored fills
                if subtotal['subs'] == '+':
                    ws[key + str(start_index)].font = font_black_bold
                    if key_index == 0 or (key_index >= 9 and key_index <= 15):
                        ws[key + str(start_index)].fill = fill_gray
                    elif key_index == 1:
                        ws[key + str(start_index)].fill = fill_green
                    elif key_index >= 4 and key_index <= 8:
                        ws[key + str(start_index)].fill = fill_yellow
                else:
                    # Flag values outside the acceptance thresholds in red
                    if 'average_accuracy_rate' in mapping[key] and type(value) in [int, float] and value < 95:
                        ws[key + str(start_index)].style = normal_cell_red
                    elif 'average_processing_time' in mapping[key] and type(value) in [int, float] and value > 2.0:
                        ws[key + str(start_index)].style = normal_cell_red
                    elif 'bad_percent' in mapping[key] and type(value) in [int, float] and value > 10:
                        ws[key + str(start_index)].style = normal_cell_red
                    else:
                        ws[key + str(start_index)].style = normal_cell
            elif _type == 'report_detail':
                if 'accuracy' in mapping[key] and type(value) in [int, float] and value < 75:
                    ws[key + str(start_index)].style = normal_cell_red
                elif 'speed' in mapping[key] and type(value) in [int, float] and value > 2.0:
                    ws[key + str(start_index)].style = normal_cell_red
                else:
                    ws[key + str(start_index)].style = normal_cell
        start_index += 1
    return wb
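
# Example usage (sketch): build a monthly report workbook. Assumes the
# `report.xlsx` template sits in the working directory and each row dict
# carries the dotted keys listed in `mapping`; the values and the `report`
# model instance are illustrative.
#
#   rows = [{"subs": "+", "extraction_date": "20240131",
#            "images_quality": {"successful": 90, "successful_percent": 90.0,
#                               "bad": 10, "bad_percent": 10.0}}]
#   wb = dict2xlsx(rows, _type='report')
#   save_workbook_file("report_202401.xlsx", report, wb)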