# sbt-idp/cope2n-api/fwd_api/utils/file.py

import io
import os
import traceback
import pathlib
import json
import csv
from datetime import datetime

import imagesize
from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.utils import timezone
from openpyxl import load_workbook
from openpyxl.styles import Font, Border, Side, PatternFill, NamedStyle, numbers

from fwd import settings
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
    ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException, RequiredColumnException
from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest, SubscriptionRequestFile, Report, ReportFile
from fwd_api.utils import process as ProcessUtil
from fwd_api.utils.crypto import image_authenticator
from fwd_api.utils.image import resize
from ..celery_worker.client_connector import c_connector
from ..utils import s3 as S3Util

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)

def convert_date_string(date_string):
    # Parse the input date string
    date_format = "%Y-%m-%d %H:%M:%S.%f %z"
    parsed_date = datetime.strptime(date_string, date_format)
    # Format the date as "YYYYMMDD"
    formatted_date = parsed_date.strftime("%Y%m%d")
    return formatted_date
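
# Example: convert_date_string("2024-01-31 03:00:18.123456 +0000") -> "20240131"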

def validate_report_list(request):
    # Default to '' so a missing query parameter does not crash the len() checks below
    start_date_str = request.GET.get('start_date', '')
    end_date_str = request.GET.get('end_date', '')
    page_number = int(request.GET.get('page', 0))
    page_size = int(request.GET.get('page_size', 10))
    report_id = request.GET.get('report_id', None)

    validated_data = {}
    validated_data["start_date"] = None
    validated_data["end_date"] = None

    if len(start_date_str) > 0 and len(end_date_str) > 0:
        try:
            validated_data["start_date"] = timezone.datetime.strptime(start_date_str, '%Y-%m-%dT%H:%M:%S%z')
            validated_data["end_date"] = timezone.datetime.strptime(end_date_str, '%Y-%m-%dT%H:%M:%S%z')
        except ValueError:
            raise InvalidException(excArgs="Date format")

    validated_data["report_id"] = report_id
    validated_data["page_size"] = page_size
    validated_data["page_number"] = page_number
    # Either a report_id or a full date range must be provided
    if validated_data["report_id"] is None and validated_data["start_date"] is None:
        raise RequiredFieldException(excArgs="report_id, start_date, end_date")
    return validated_data
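
# Example (illustrative query string): ?start_date=2024-01-01T00:00:00+0000&end_date=2024-01-31T00:00:00+0000
# returns page_number=0, page_size=10, report_id=None, and both dates parsed as aware datetimes.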

def validate_feedback_file(csv_file_path):
    required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date', 'retailer', 'Sold to party', 'timetakenmilli']
    missing_columns = []

    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        # Check that all required columns are present in the header row
        for column in required_columns:
            if column not in reader.fieldnames:
                missing_columns.append(column)

    if missing_columns:
        raise RequiredColumnException(excArgs=str(missing_columns))
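
# Example (illustrative): a feedback CSV is accepted when its header row contains at least
#   redemptionNumber,requestId,imeiNumber,imeiNumber2,Purchase Date,retailer,Sold to party,timetakenmilli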

def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        # True if the file extension is on the allow-list
        is_allowed_extension = f.name.split(".")[-1].lower() in allowed_file_extensions
        if not is_allowed_extension or "." not in f.name:
            raise FileFormatInvalidException(excArgs=list(allowed_file_extensions))
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size

    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))

def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"):
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        is_csv = f.name.split(".")[-1].lower() in ["csv"]
        if not is_csv or "." not in f.name:
            raise FileFormatInvalidException(excArgs=[".csv"])
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size

    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))

def get_file(file_path: str):
    try:
        return open(file_path, 'rb')
    except Exception as e:
        print(e)
        raise GeneralException("System")

def get_template_folder_path(tem: OcrTemplate):
    tem_id = str(tem.id)
    sub_id = str(tem.subscription.id)
    user_id = str(tem.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, "templates", tem_id)

def get_folder_path(rq: SubscriptionRequest):
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)

    request_id = str(rq.request_id)
    logger.info(f"[DEBUG]: rq.process_type: {rq.process_type}")
    p_type = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    sub_id = str(rq.subscription.id)
    user_id = str(rq.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, 'requests', p_type, request_id)
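
# Resulting layout: MEDIA_ROOT/users/<user_id>/subscriptions/<sub_id>/requests/<process_type>/<request_id>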

def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    folder_path = get_folder_path(rq)
    # Create the directory if it does not exist yet
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as w:
        w.write(file_bytes)
    return file_path

def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # Stream the uploaded file to disk chunk by chunk
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
    return file_path

def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, "w") as json_file:
        json.dump(data, json_file)
    return file_path

def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict):
    user_id = str(rq.subscription.user.id)
    feedback_id = str(rq.id)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'users', user_id, "feedbacks", feedback_id)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, file_name)
    # Copy the uploaded CSV verbatim into the feedback folder
    with uploaded_file.open() as file:
        file_contents = file.read().decode('utf-8')
        with open(file_path, 'w', newline='') as csvfile:
            csvfile.write(file_contents)
    return file_path

def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""):
    report_id = str(rp.report_id)
    # Workbooks are grouped under their report id unless an explicit prefix is given
    if not prefix:
        folder_path = os.path.join(settings.MEDIA_ROOT, "report", report_id)
    else:
        folder_path = os.path.join(settings.MEDIA_ROOT, "report", prefix)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, file_name)
    workbook.save(file_path)
    return file_path
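
# Example: with the default prefix="" the workbook lands in MEDIA_ROOT/report/<report_id>/<file_name>;
# with prefix="daily" (illustrative) it lands in MEDIA_ROOT/report/daily/<file_name>.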

def delete_file_with_path(file_path: str) -> bool:
    try:
        os.remove(file_path)
        return True
    except Exception as e:
        print(e)
        return False

def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    try:
        folder_path = get_template_folder_path(rq)
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()

def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality: int):
    try:
        folder_path = get_folder_path(rq)
        pathlib.Path(folder_path).mkdir(exist_ok=True, parents=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()

def save_to_S3(file_name, rq, local_file_path):
    try:
        file_path = get_folder_path(rq)
        request_id = rq.request_id
        assert len(file_path.split("/")) >= 2, "file_path must have at least process type and request id"
        # Key is <process_type>/<request_id>/<file_name>
        s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key, request_id))
        c_connector.remove_local_file((local_file_path, request_id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()

def save_feedback_to_S3(file_name, id, local_file_path):
    try:
        assert len(local_file_path.split("/")) >= 3, "file_path must have at least feedback_folder and feedback_id"
        # Key is feedback/<feedback_id>/<file_name>
        s3_key = os.path.join("feedback", local_file_path.split("/")[-2], file_name)
        c_connector.upload_feedback_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()

def save_report_to_S3(id, local_file_path):
    try:
        # Key is report/<report_folder>/<file_name>
        s3_key = os.path.join("report", local_file_path.split("/")[-2], local_file_path.split("/")[-1])
        c_connector.upload_report_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()

def download_from_S3(s3_key, local_file_path):
    s3_client.download_file(s3_key, local_file_path)
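
# Note: the save_*_to_S3 helpers above hand the upload (and the local-file cleanup) to the
# celery client connector (c_connector) and return immediately; only download_from_S3 talks
# to S3 synchronously through the module-level s3_client.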

def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        # PDFs are stored verbatim; everything else is treated as an image and re-encoded
        if extension.lower() == "pdf":
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
    return file_path

def save_pdf(file_path: str, file: TemporaryUploadedFile):
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)

def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    # Reject images whose decompressed dimensions exceed the configured limit
    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
        width, height = imagesize.get(input_file)
        if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
            raise InvalidDecompressedSizeException(excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))

    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
        image = Image.open(input_file)

        # Find the EXIF tag id for 'Orientation' (e.g. Windows Photo keeps the original orientation in metadata)
        for orientation in ExifTags.TAGS.keys():
            if ExifTags.TAGS[orientation] == 'Orientation':
                break
        try:
            e = image._getexif()  # returns None if there is no EXIF data
            if e:
                exif = dict(e.items())
                if orientation in exif:
                    orientation = exif[orientation]
                    if orientation == 3:
                        image = image.transpose(Image.ROTATE_180)
                    elif orientation == 6:
                        image = image.transpose(Image.ROTATE_270)
                    elif orientation == 8:
                        image = image.transpose(Image.ROTATE_90)
        except Exception as ex:
            print(ex)
            print("Rotation Error")
            traceback.print_exc()

        image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
        image = image.convert('RGB')
        image.save(file_path, optimize=True, quality=quality)
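
# EXIF orientation values handled above: 3 = upside-down (undone with ROTATE_180),
# 6 = rotated 90° clockwise (undone with ROTATE_270), 8 = rotated 90° counter-clockwise
# (undone with ROTATE_90).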

def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    token = image_authenticator.generate_img_token()
    if not file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
            folder=folder, uid=uid, base_url=settings.BASE_URL, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
        folder=folder, uid=uid, file_name=file_name, base_url=settings.BASE_URL, token=token)

def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    token = image_authenticator.generate_img_token(user_id)
    if not file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
            folder=folder, uid=data_id, base_url=settings.BASE_URL, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
        folder=folder, uid=data_id, file_name=file_name, base_url=settings.BASE_URL, token=token)

def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
    return f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'

def get_value(_dict, keys):
    # Walk a nested dict with a dotted key path, e.g. "images_quality.successful"
    keys = keys.split('.')
    value = _dict
    try:
        for key in keys:
            if key not in value.keys():
                return "-"
            else:
                value = value.get(key, {})
    except Exception as e:
        print(f"[ERROR]: {e}")
        print(f"[ERROR]: value: {value}")
        print(f"[ERROR]: keys: {keys}")

    if not value:
        return "-"
    elif isinstance(value, list):
        value = str(value)
    return value
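
# Example: get_value({"usage": {"imei": 3}}, "usage.imei") -> 3,
# while get_value({"usage": {"imei": 3}}, "usage.invoice") -> "-".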

def dict2xlsx(input: list, _type='report'):
    # Shared cell styles for both report layouts
    red = "FF0000"
    black = "000000"
    green = "E2EFDA"
    yellow = "FFF2CC"
    gray = "D0CECE"
    font_black = Font(name="Calibri", size=11, color=black)
    font_black_bold = Font(name="Calibri", size=11, color=black, bold=True)
    font_red = Font(name="Calibri", size=11, color=red)
    thin = Side(border_style="thin", color=black)
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    fill_green = PatternFill(start_color=green, end_color=green, fill_type="solid")
    fill_yellow = PatternFill(start_color=yellow, end_color=yellow, fill_type="solid")
    fill_gray = PatternFill(start_color=gray, end_color=gray, fill_type="solid")
    normal_cell = NamedStyle(name="normal_cell", font=font_black, border=border)
    normal_cell_red = NamedStyle(name="normal_cell_red", font=font_red, border=border)

    if _type == 'report':
        wb = load_workbook(filename='report.xlsx')
        ws = wb['Sheet1']
        mapping = {
            'A': 'subs',
            'B': 'extraction_date',
            'C': 'num_imei',
            'D': 'num_invoice',
            'E': 'total_images',
            'F': 'images_quality.successful',
            'G': 'images_quality.successful_percent',
            'H': 'images_quality.bad',
            'I': 'images_quality.bad_percent',
            'J': 'average_accuracy_rate.imei',
            'K': 'average_accuracy_rate.purchase_date',
            'L': 'average_accuracy_rate.retailer_name',
            'M': 'average_processing_time.imei',
            'N': 'average_processing_time.invoice',
            'O': 'usage.imei',
            'P': 'usage.invoice',
        }
        start_index = 5
    elif _type == 'report_detail':
        wb = load_workbook(filename='report_detail.xlsx')
        ws = wb['Sheet1']
        mapping = {
            'A': 'request_id',
            'B': 'redemption_number',
            'C': 'image_type',
            'D': 'imei_user_submitted',
            'E': 'imei_ocr_retrieved',
            'F': 'imei1_accuracy',
            'G': 'invoice_purchase_date_consumer',
            'H': 'invoice_purchase_date_ocr',
            'I': 'invoice_purchase_date_accuracy',
            'J': 'invoice_retailer_consumer',
            'K': 'invoice_retailer_ocr',
            'L': 'invoice_retailer_accuracy',
            'M': 'ocr_image_accuracy',
            'N': 'ocr_image_speed',
            'O': 'is_reviewed',
            'P': 'bad_image_reasons',
            'Q': 'countermeasures',
            'R': 'imei_revised_accuracy',
            'S': 'purchase_date_revised_accuracy',
            'T': 'retailer_revised_accuracy',
        }
        start_index = 4
    else:
        # Unknown report type: fail fast instead of hitting a NameError below
        raise InvalidException(excArgs="_type")
    for subtotal in input:
        for key_index, key in enumerate(mapping.keys()):
            value = get_value(subtotal, mapping[key])
            cell = key + str(start_index)
            ws[cell] = value
            ws[cell].border = border

            if _type == 'report':
                # Rows flagged with '+' in 'subs' are subtotal rows: bold font and fills
                if subtotal['subs'] == '+':
                    ws[cell].font = font_black_bold
                    if key_index in [6, 8, 9, 10, 11, 12, 13]:
                        ws[cell].number_format = numbers.FORMAT_NUMBER_00
                    if key_index == 0 or (key_index >= 9 and key_index <= 15):
                        ws[cell].fill = fill_gray
                    elif key_index == 1:
                        ws[cell].fill = fill_green
                    elif key_index >= 4 and key_index <= 8:
                        ws[cell].fill = fill_yellow
                else:
                    # Data rows: mark out-of-threshold metrics in red
                    if 'average_accuracy_rate' in mapping[key] and type(value) in [int, float]:
                        if value < 95:
                            ws[cell].style = normal_cell_red
                        ws[cell].number_format = numbers.FORMAT_NUMBER_00
                    elif 'average_processing_time' in mapping[key] and type(value) in [int, float]:
                        if value > 2.0:
                            ws[cell].style = normal_cell_red
                        ws[cell].number_format = numbers.FORMAT_NUMBER_00
                    elif 'bad_percent' in mapping[key] and type(value) in [int, float]:
                        if value > 10:
                            ws[cell].style = normal_cell_red
                        ws[cell].number_format = numbers.FORMAT_NUMBER_00
                    elif 'percent' in mapping[key] and type(value) in [int, float]:
                        ws[cell].number_format = numbers.FORMAT_NUMBER_00
                    else:
                        ws[cell].style = normal_cell
            elif _type == 'report_detail':
                if 'accuracy' in mapping[key] and type(value) in [int, float] and value < 75:
                    ws[cell].style = normal_cell_red
                elif 'speed' in mapping[key] and type(value) in [int, float] and value > 2.0:
                    ws[cell].style = normal_cell_red
                else:
                    ws[cell].style = normal_cell

        start_index += 1

    return wb
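
# Minimal usage sketch (illustrative; assumes the report.xlsx template with a 'Sheet1'
# worksheet is available in the working directory, as load_workbook above requires):
#   rows = [{"subs": "SUB1", "extraction_date": "20240131", "usage": {"imei": 3}}]
#   wb = dict2xlsx(rows, _type='report')
#   file_path = save_workbook_file("report_20240131.xlsx", report, wb)  # 'report' is a Report instance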