# sbt-idp/cope2n-api/fwd_api/utils/file.py
# Last modified: 2024-02-15 15:23:42 +07:00
# 538 lines, 23 KiB, Python
import io
import os
import traceback
import pathlib
import json
from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.utils import timezone
from datetime import datetime
from fwd import settings
from ..utils import s3 as S3Util
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException, RequiredColumnException
from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest, SubscriptionRequestFile, Report, ReportFile
from fwd_api.utils import process as ProcessUtil
from fwd_api.utils.crypto import image_authenticator
from fwd_api.utils.image import resize
from ..celery_worker.client_connector import c_connector
import imagesize
import csv
from openpyxl import load_workbook
from openpyxl.styles import Font, Border, Side, PatternFill, NamedStyle, numbers
# Module-level MinIO/S3 client shared by the helpers in this file
# (e.g. download_from_S3); credentials come from Django settings.
s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)
def convert_date_string(date_string):
    """Reformat a timestamp string like '2024-02-15 15:23:42.000000 +0700' to 'YYYYMMDD'."""
    parsed = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S.%f %z")
    return parsed.strftime("%Y%m%d")
def validate_report_list(request):
    """Validate query parameters for the report-list endpoint.

    Returns a dict with keys: start_date / end_date (parsed datetimes or
    None), report_id, page_size, page_number.

    Raises:
        InvalidException: when a date string does not match '%Y-%m-%dT%H:%M:%S%z'.
        RequiredFieldException: when neither report_id nor a date range is given.
    """
    # BUG FIX: GET.get(...) without a default returns None for absent params,
    # and len(None) raised TypeError below; default to "" so the checks work.
    start_date_str = request.GET.get('start_date', "")
    end_date_str = request.GET.get('end_date', "")
    page_number = int(request.GET.get('page', 0))
    page_size = int(request.GET.get('page_size', 10))
    report_id = request.GET.get('report_id', None)

    validated_data = {}
    validated_data["start_date"] = None
    validated_data["end_date"] = None
    if len(start_date_str) > 0 and len(end_date_str) > 0:
        try:
            validated_data["start_date"] = timezone.datetime.strptime(start_date_str, '%Y-%m-%dT%H:%M:%S%z')
            validated_data["end_date"] = timezone.datetime.strptime(end_date_str, '%Y-%m-%dT%H:%M:%S%z')
        except ValueError:
            raise InvalidException(excArgs="Date format")
    validated_data["report_id"] = report_id
    validated_data["page_size"] = page_size
    validated_data["page_number"] = page_number
    # Must be filterable by either a specific report or a date range.
    if validated_data["report_id"] is None and validated_data["start_date"] is None:
        raise RequiredFieldException(excArgs="report_id, start_date, end_date")
    return validated_data
def validate_feedback_file(csv_file_path):
    """Check that a feedback CSV at *csv_file_path* contains all required columns.

    Raises RequiredColumnException listing the missing column names.
    """
    required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date', 'retailer', 'Sold to party', 'timetakenmilli']
    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        # BUG FIX: fieldnames is None for an empty file, which made
        # `column not in reader.fieldnames` raise TypeError; treat an empty
        # file as having no columns so all required ones are reported missing.
        fieldnames = reader.fieldnames or []
    missing_columns = [column for column in required_columns if column not in fieldnames]
    if missing_columns:
        raise RequiredColumnException(excArgs=str(missing_columns))
def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    """Validate a batch of uploaded document files.

    Checks count bounds, upload type, extension whitelist, per-file size and
    total request size; raises the matching fwd_api exception on violation.
    """
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))
    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        # The name must contain a "." and the suffix must be whitelisted.
        is_allowed_extension = f.name.split(".")[-1].lower() in allowed_file_extensions
        if not is_allowed_extension or "." not in f.name:
            raise FileFormatInvalidException(excArgs=list(allowed_file_extensions))
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size
    # BUG FIX: the reported limit previously used the per-file constant
    # (MAX_UPLOAD_SIZE_OF_A_FILE) while the comparison used the per-request
    # one; report the per-request limit the check actually enforces.
    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))
def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"):
    """Validate uploaded feedback CSV files (count, type, .csv extension, size).

    Raises the matching fwd_api exception on the first violation.
    """
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))
    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        # Only ".csv" (any case) is accepted, and a "." must be present.
        is_csv = f.name.split(".")[-1].lower() in ["csv"]
        if not is_csv or "." not in f.name:
            raise FileFormatInvalidException(excArgs=[".csv"])
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size
    # BUG FIX: report the per-request total limit the comparison enforces,
    # not the per-file constant.
    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))
def get_file(file_path: str):
    """Open *file_path* for binary reading; wrap any failure in GeneralException."""
    try:
        handle = open(file_path, 'rb')
    except Exception as err:
        print(err)
        raise GeneralException("System")
    return handle
def get_template_folder_path(tem: OcrTemplate):
    """Return the on-disk folder that holds the assets of an OCR template."""
    subscription = tem.subscription
    return os.path.join(
        settings.MEDIA_ROOT,
        'users', str(subscription.user.id),
        "subscriptions", str(subscription.id),
        "templates", str(tem.id),
    )
def get_folder_path(rq: SubscriptionRequest):
    """Resolve the local media folder that holds all files of a subscription request."""
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)
    logger.info(f"[DEBUG]: rq.process_type: {rq.process_type}")
    # The process type selects a per-type sub-folder under the subscription.
    process_folder = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    return os.path.join(
        settings.MEDIA_ROOT,
        'users', str(rq.subscription.user.id),
        "subscriptions", str(rq.subscription.id),
        'requests', process_folder, str(rq.request_id),
    )
def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    """Write raw bytes into the request's media folder; return the full path."""
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as out:
        out.write(file_bytes)
    return file_path
def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    """Stream an uploaded file into the request's media folder.

    Creates the folder if needed and returns the full saved path.
    """
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # BUG FIX: the handle was opened/closed manually and leaked if a chunk
    # write raised; the context manager guarantees it is closed.
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
    return file_path
def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    """Serialize *data* as JSON inside the request's media folder; return the path."""
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    target = os.path.join(folder_path, file_name)
    with open(target, "w") as handle:
        json.dump(data, handle)
    return target
def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict):
    """Persist an uploaded feedback CSV under the user's feedback folder; return the path."""
    folder_path = os.path.join(
        settings.MEDIA_ROOT,
        'users', str(rq.subscription.user.id),
        "feedbacks", str(rq.id),
    )
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # Decode the upload once, then write it back out as text.
    with uploaded_file.open() as src:
        contents = src.read().decode('utf-8')
    with open(file_path, 'w', newline='') as dst:
        dst.write(contents)
    return file_path
def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""):
    """Save an openpyxl workbook under MEDIA_ROOT/report/<report_id or prefix>/.

    When *prefix* is non-empty it replaces the report id as the sub-folder.
    Returns the full saved path.
    """
    report_id = str(rp.report_id)
    subdir = prefix if prefix else report_id
    folder_path = os.path.join(settings.MEDIA_ROOT, "report", subdir)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    workbook.save(file_path)
    return file_path
def delete_file_with_path(file_path: str) -> bool:
    """Remove *file_path* from disk; return True on success, False on any error."""
    try:
        os.remove(file_path)
    except Exception as err:
        print(err)
        return False
    return True
def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    """Store a template upload (pdf or image) in the template's folder.

    Returns the saved path; wraps any failure in ServiceUnavailableException.
    """
    try:
        folder_path = get_template_folder_path(rq)
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except Exception as err:
        print(err)
        raise ServiceUnavailableException()
# NOTE(review): this definition is shadowed by a second `save_file_with_path`
# later in this module; at import time the later one wins, so this copy is
# dead code. Consider removing one of the two (their pdf checks also differ:
# this one is case-insensitive, the later one only matches 'pdf'/'PDF').
def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path: str):
    """Save an uploaded file (pdf or image) into *folder_path*; return the path.

    Re-raises InvalidDecompressedSizeException; wraps any other failure in
    ServiceUnavailableException.
    """
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        if extension.lower() == "pdf":
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
    return file_path
def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality: int):
    """Save an uploaded file into the request folder (images are re-encoded
    at *quality* and resized inside save_file_with_path / save_img).

    Returns the saved path. Re-raises InvalidDecompressedSizeException;
    wraps any other failure in ServiceUnavailableException.
    """
    # Removed a stale commented-out SubscriptionRequestFile creation block
    # that had drifted from the live code path.
    try:
        folder_path = get_folder_path(rq)
        pathlib.Path(folder_path).mkdir(exist_ok=True, parents=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_to_S3(file_name, rq, local_file_path):
    """Queue async S3 upload of a request file and removal of the local copy.

    The S3 key is '<process_type>/<request_id>/<file_name>' derived from the
    request's media folder path. Returns the key; wraps failures in
    ServiceUnavailableException.
    """
    try:
        parts = get_folder_path(rq).split("/")
        request_id = rq.request_id
        assert len(parts) >= 2, "file_path must have at least process type and request id"
        s3_key = os.path.join(parts[-2], parts[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key, request_id))
        c_connector.remove_local_file((local_file_path, request_id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_feedback_to_S3(file_name, id, local_file_path):
    """Queue async S3 upload of a feedback CSV under 'feedback/<feedback_id>/'.

    Also queues removal of the local copy. Returns the S3 key; wraps
    failures in ServiceUnavailableException.
    """
    try:
        parts = local_file_path.split("/")
        assert len(parts) >= 3, "file_path must have at least feedback_folder and feedback_id"
        # parts[-2] is the feedback id folder of the local path.
        s3_key = os.path.join("feedback", parts[-2], file_name)
        c_connector.upload_feedback_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_report_to_S3(id, local_file_path):
    """Queue async S3 upload of a report file under 'report/<folder>/<name>'.

    Also queues removal of the local copy. Returns the S3 key; wraps
    failures in ServiceUnavailableException.
    """
    try:
        parts = local_file_path.split("/")
        s3_key = os.path.join("report", parts[-2], parts[-1])
        c_connector.upload_report_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def download_from_S3(s3_key, local_file_path):
    # Synchronous download via the module-level MinIO client (no Celery hop).
    s3_client.download_file(s3_key, local_file_path)
def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
    """Save an uploaded file (pdf or image) into *folder_path*; return the path.

    Re-raises InvalidDecompressedSizeException; wraps any other failure in
    ServiceUnavailableException.

    NOTE(review): this shadows an earlier definition of the same name in this
    module; the earlier copy is dead code.
    """
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        # BUG FIX: previously only 'pdf'/'PDF' matched, so mixed-case names
        # like 'x.Pdf' fell through to the image branch and failed; compare
        # case-insensitively (matching the earlier shadowed definition).
        if extension.lower() == "pdf":
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
    return file_path
def save_pdf(file_path: str, file: TemporaryUploadedFile):
    """Stream an uploaded PDF to *file_path* chunk by chunk.

    BUG FIX: the handle was opened/closed manually and leaked if a write
    raised; the context manager guarantees closure.
    """
    with open(file_path, 'wb+') as out:
        for chunk in file.chunks():
            out.write(chunk)
def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    """Validate, re-orient, resize and re-encode an uploaded image to *file_path*.

    Raises InvalidDecompressedSizeException when either dimension exceeds
    settings.MAX_PIXEL_IN_A_FILE. The dimension probe uses imagesize on the
    raw bytes, so oversized images are rejected before full decoding
    (guards against decompression bombs).
    """
    # First pass: probe dimensions without decoding the pixels.
    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
    width, height = imagesize.get(input_file)
    if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
        raise InvalidDecompressedSizeException(excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))
    # Second pass: actually decode the image.
    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
    image = Image.open(input_file)
    # read orient from metadata. WindowsPhoto keep the origin
    # Find the EXIF tag id whose name is 'Orientation'.
    for orientation in ExifTags.TAGS.keys():
        if ExifTags.TAGS[orientation] == 'Orientation':
            break
    try:
        e = image._getexif()  # returns None if no EXIF data
        if e:
            exif = dict(e.items())
            if orientation in exif:
                # NOTE: `orientation` is reused here — it changes from the
                # tag id to the tag's value before the comparisons below.
                orientation = exif[orientation]
                if orientation == 3:
                    image = image.transpose(Image.ROTATE_180)
                elif orientation == 6:
                    image = image.transpose(Image.ROTATE_270)
                elif orientation == 8:
                    image = image.transpose(Image.ROTATE_90)
    except Exception as ex:
        # Best-effort: a broken EXIF block must not fail the upload.
        print(ex)
        print("Rotation Error")
        traceback.print_exc()
    image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
    # JPEG re-encode requires RGB (drops alpha / palette modes).
    image = image.convert('RGB')
    image.save(file_path, optimize=True, quality=quality)
def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    """Build a tokenized media URL; the file_name query param is added when given."""
    token = image_authenticator.generate_img_token()
    base_url = settings.BASE_URL
    if file_name:
        return f'{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'
    return f'{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'
def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    """Build a user-scoped tokenized media URL; file_name param is added when given."""
    token = image_authenticator.generate_img_token(user_id)
    base_url = settings.BASE_URL
    if file_name:
        return f'{base_url}/api/ctel/media/{folder}/{data_id}/?file_name={file_name}&token={token}'
    return f'{base_url}/api/ctel/media/{folder}/{data_id}/?token={token}'
def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    """Build a tokenized v2 media URL for a request media item."""
    token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
    return '{}/api/ctel/v2/media/request/{}/?token={}'.format(settings.BASE_URL, media_id, token)
def get_value(_dict, keys):
    """Walk a dot-separated key path through nested dicts.

    Returns "-" when the path is missing or the resolved value is falsy;
    lists are rendered with str(). Lookup errors (e.g. descending into a
    non-dict) are printed and the value resolved so far is used.
    """
    parts = keys.split('.')
    current = _dict
    try:
        for part in parts:
            if part in current.keys():
                current = current.get(part, {})
            else:
                return "-"
    except Exception as e:
        print(f"[ERROR]: {e}")
        print(f"[ERROR]: value: {current}")
        print(f"[ERROR]: keys: {parts}")
    if not current:
        return "-"
    if isinstance(current, list):
        return str(current)
    return current
def dict2xlsx(input: json, _type='report'):
    """Render a list of row dicts into a styled openpyxl workbook.

    Loads a template workbook ('report.xlsx' or 'report_detail.xlsx' from the
    working directory), writes one spreadsheet row per entry of *input* using
    a column-letter -> dot-path mapping (values resolved via get_value), and
    applies conditional fonts/fills/number formats. Returns the workbook.

    _type: 'report' (summary rows, data starts at row 5) or
           'report_detail' (per-request rows, data starts at row 4).
    """
    # Style palette shared by both report types.
    red = "FF0000"
    black = "000000"
    green = "E2EFDA"
    yellow = "FFF2CC"
    gray = "D0CECE"
    font_black = Font(name="Calibri", size=11, color=black)
    font_black_bold = Font(name="Calibri", size=11, color=black, bold=True)
    font_red = Font(name="Calibri", size=11, color=red)
    thin = Side(border_style="thin", color=black)
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    fill_green = PatternFill(start_color=green, end_color=green, fill_type="solid")
    fill_yellow = PatternFill(start_color=yellow, end_color=yellow, fill_type="solid")
    fill_gray = PatternFill(start_color=gray, end_color=gray, fill_type="solid")
    normal_cell = NamedStyle(name="normal_cell", font=font_black, border=border)
    normal_cell_red = NamedStyle(name="normal_cell_red", font=font_red, border=border)
    if _type == 'report':
        wb = load_workbook(filename='report.xlsx')
        ws = wb['Sheet1']
        # Column letter -> dot path into each row dict (see get_value).
        mapping = {
            'A': 'subs',
            'B': 'extraction_date',
            'C': 'usage.total_images',
            'D': 'usage.imei',
            'E': 'usage.invoice',
            'F': 'total_images',
            'G': 'num_imei',
            'H': 'num_invoice',
            'I': 'images_quality.successful',
            'J': 'images_quality.successful_percent',
            'K': 'images_quality.bad',
            'L': 'images_quality.bad_percent',
            'M': 'average_accuracy_rate.imei',
            'N': 'average_accuracy_rate.purchase_date',
            'O': 'average_accuracy_rate.retailer_name',
            'P': 'average_processing_time.imei',
            'Q': 'average_processing_time.invoice',
        }
        start_index = 5  # first data row in the report template
    elif _type == 'report_detail':
        wb = load_workbook(filename='report_detail.xlsx')
        ws = wb['Sheet1']
        mapping = {
            'A': 'request_id',
            'B': 'redemption_number',
            'C': 'image_type',
            'D': 'imei_user_submitted',
            'E': "imei_ocr_retrieved",
            'F': "imei1_accuracy",
            'G': "invoice_purchase_date_consumer",
            'H': "invoice_purchase_date_ocr",
            'I': "invoice_purchase_date_accuracy",
            'J': "invoice_retailer_consumer",
            'K': "invoice_retailer_ocr",
            'L': "invoice_retailer_accuracy",
            'M': "ocr_image_accuracy",
            'N': "ocr_image_speed",
            'O': "is_reviewed",
            'P': "bad_image_reasons",
            'Q': "countermeasures",
            'R': 'imei_revised_accuracy',
            'S': 'purchase_date_revised_accuracy',
            'T': 'retailer_revised_accuracy',
        }
        start_index = 4  # first data row in the detail template
    for subtotal in input:
        for key_index, key in enumerate(mapping.keys()):
            value = get_value(subtotal, mapping[key])
            ws[key + str(start_index)] = value
            ws[key + str(start_index)].border = border
            if _type == 'report':
                # Rows whose 'subs' is '+' are subtotal rows: bold + fills.
                if subtotal['subs'] == '+':
                    ws[key + str(start_index)].font = font_black_bold
                    if key_index in [6, 8, 9, 10, 11, 12, 13]:
                        ws[key + str(start_index)].number_format = '0.0'
                    if key_index == 0 or (key_index >= 9 and key_index <= 15):
                        ws[key + str(start_index)].fill = fill_gray
                    elif key_index == 1:
                        ws[key + str(start_index)].fill = fill_green
                    elif key_index >= 4 and key_index <= 8:
                        ws[key + str(start_index)].fill = fill_yellow
                else:
                    # Ordinary rows: red-flag metrics that breach thresholds.
                    if 'average_accuracy_rate' in mapping[key] and type(value) in [int, float]:
                        if value < 98:
                            ws[key + str(start_index)].style = normal_cell_red
                        ws[key + str(start_index)].number_format = '0.0'
                    elif 'average_processing_time' in mapping[key] and type(value) in [int, float]:
                        if value > 2.0:
                            ws[key + str(start_index)].style = normal_cell_red
                        ws[key + str(start_index)].number_format = '0.0'
                    elif 'bad_percent' in mapping[key] and type(value) in [int, float]:
                        if value > 10:
                            ws[key + str(start_index)].style = normal_cell_red
                        ws[key + str(start_index)].number_format = '0.0'
                    elif 'percent' in mapping[key] and type(value) in [int, float]:
                        ws[key + str(start_index)].number_format = '0.0'
                    else:
                        ws[key + str(start_index)].style = normal_cell
            elif _type == 'report_detail':
                # Red-flag low accuracy (<75) and slow OCR (>2.0s).
                if 'accuracy' in mapping[key] and type(value) in [int, float] and value < 75:
                    ws[key + str(start_index)].number_format = '0.0'
                    ws[key + str(start_index)].style = normal_cell_red
                elif 'speed' in mapping[key] and type(value) in [int, float] and value > 2.0:
                    ws[key + str(start_index)].number_format = '0.0'
                    ws[key + str(start_index)].style = normal_cell_red
                else:
                    ws[key + str(start_index)].style = normal_cell
        start_index += 1
    return wb