# sbt-idp/cope2n-api/fwd_api/utils/file.py

import io
import os
import traceback
import pathlib
import json
from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.utils import timezone
from datetime import datetime
from fwd import settings
from ..utils import s3 as S3Util
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
    ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException, RequiredColumnException
from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest, SubscriptionRequestFile, Report, ReportFile
from fwd_api.utils import process as ProcessUtil
from fwd_api.utils.crypto import image_authenticator
from fwd_api.utils.image import resize
from ..celery_worker.client_connector import c_connector
import imagesize
import csv
from openpyxl import load_workbook
from openpyxl.styles import Font, Border, Side, PatternFill, NamedStyle, numbers, Alignment
import logging
logger = logging.getLogger(__name__)

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)

def convert_date_string(date_string):
    # Parse the input date string
    date_format = "%Y-%m-%d %H:%M:%S.%f %z"
    parsed_date = datetime.strptime(date_string, date_format)
    # Format the date as "YYYYMMDD"
    formatted_date = parsed_date.strftime("%Y%m%d")
    return formatted_date

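# Illustrative usage (the sample value is an assumption shaped by the
# "%Y-%m-%d %H:%M:%S.%f %z" format above):
#   convert_date_string("2024-06-26 14:58:24.123456 +0700")  # -> "20240626"
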
def validate_report_list(request):
    # Default to empty strings so missing parameters do not crash len() below
    start_date_str = request.GET.get('start_date', '')
    end_date_str = request.GET.get('end_date', '')
    page_number = int(request.GET.get('page', 0))
    page_size = int(request.GET.get('page_size', 10))
    report_id = request.GET.get('report_id', None)

    validated_data = {}
    validated_data["start_date"] = None
    validated_data["end_date"] = None
    if len(start_date_str) > 0 and len(end_date_str) > 0:
        try:
            validated_data["start_date"] = timezone.datetime.strptime(start_date_str, '%Y-%m-%dT%H:%M:%S%z')
            validated_data["end_date"] = timezone.datetime.strptime(end_date_str, '%Y-%m-%dT%H:%M:%S%z')
        except ValueError:
            raise InvalidException(excArgs="Date format")
    validated_data["report_id"] = report_id
    validated_data["page_size"] = page_size
    validated_data["page_number"] = page_number
    if validated_data["report_id"] is None and validated_data["start_date"] is None:
        raise RequiredFieldException(excArgs="report_id, start_date, end_date")
    return validated_data

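# Illustrative query string accepted by validate_report_list (values are
# assumptions; dates must match '%Y-%m-%dT%H:%M:%S%z', and '+' in a timezone
# offset is typically URL-encoded as %2B):
#   ?start_date=2024-01-01T00:00:00%2B0700&end_date=2024-02-01T00:00:00%2B0700&page=0&page_size=10
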
def validate_feedback_file(csv_file_path):
    required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date', 'retailer', 'Sold to party', 'timetakenmilli']
    missing_columns = []

    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        # Check if all required columns are present
        for column in required_columns:
            if column not in reader.fieldnames:
                missing_columns.append(column)

    if missing_columns:
        raise RequiredColumnException(excArgs=str(missing_columns))

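# A minimal header row that would pass validate_feedback_file (column order is
# an assumption; only the presence of each column is checked):
#   redemptionNumber,requestId,imeiNumber,imeiNumber2,Purchase Date,retailer,Sold to party,timetakenmilli
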
def validate_review(review, num_imei):
    for field in settings.FIELD:
        if field not in review.keys():
            raise RequiredFieldException(excArgs=f'reviewed_result.{field}')
    if not isinstance(review["imei_number"], list) or len(review["imei_number"]) != num_imei:
        raise InvalidException(excArgs='imei_number')

def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        is_extension_allowed = f.name.split(".")[-1].lower() in allowed_file_extensions
        if not is_extension_allowed or "." not in f.name:
            raise FileFormatInvalidException(excArgs=list(allowed_file_extensions))
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size

    # Enforce the total-size limit across all files in the request
    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))

def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"):
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        is_extension_allowed = f.name.split(".")[-1].lower() in ["csv"]
        if not is_extension_allowed or "." not in f.name:
            raise FileFormatInvalidException(excArgs=[".csv"])
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size

    # Enforce the total-size limit across all files in the request
    if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
        raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))

def get_file(file_path: str):
    try:
        return open(file_path, 'rb')
    except Exception as e:
        logger.error(e)
        raise GeneralException("System")

def get_template_folder_path(tem: OcrTemplate):
    tem_id = str(tem.id)
    sub_id = str(tem.subscription.id)
    user_id = str(tem.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, "templates", tem_id)

def get_folder_path(rq: SubscriptionRequest):
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)

    request_id = str(rq.request_id)
    logger.debug(f"rq.process_type: {rq.process_type}")
    p_type = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    sub_id = str(rq.subscription.id)
    user_id = str(rq.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, 'requests', p_type, request_id)

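# Resulting layout on disk, read directly off the join above:
#   <MEDIA_ROOT>/users/<user_id>/subscriptions/<sub_id>/requests/<process_type>/<request_id>
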
def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    folder_path = get_folder_path(rq)
    # Create the directory if it does not exist yet
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as w:
        w.write(file_bytes)
    return file_path

def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    folder_path = get_folder_path(rq)
    # Create the directory if it does not exist yet
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
    return file_path

def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    folder_path = get_folder_path(rq)
    # Create the directory if it does not exist yet
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, "w") as json_file:
        json.dump(data, json_file)
    return file_path

def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict):
    user_id = str(rq.subscription.user.id)
    feedback_id = str(rq.id)
    folder_path = os.path.join(settings.MEDIA_ROOT, 'users', user_id, "feedbacks", feedback_id)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, file_name)
    with uploaded_file.open() as file:
        # Read the contents of the uploaded file
        file_contents = file.read().decode('utf-8')
    with open(file_path, 'w', newline='') as csvfile:
        csvfile.write(file_contents)
    return file_path

def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""):
    report_id = str(rp.report_id)
    if not prefix:
        folder_path = os.path.join(settings.MEDIA_ROOT, "report", report_id)
    else:
        folder_path = os.path.join(settings.MEDIA_ROOT, "report", prefix)
    os.makedirs(folder_path, exist_ok=True)

    file_path = os.path.join(folder_path, file_name)
    workbook.save(file_path)
    return file_path

def delete_file_with_path(file_path: str) -> bool:
    try:
        os.remove(file_path)
        return True
    except Exception as e:
        logger.error(e)
        return False

def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    try:
        folder_path = get_template_folder_path(rq)
        # Create the directory if it does not exist yet
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except Exception as e:
        logger.error(e)
        raise ServiceUnavailableException()

def save_images_to_csv_briefly(id, image_filenames):
    columns = ["request_id", "file_name", "predict_result", "feedback_result", "reviewed_result", "feedback_accuracy", "reviewed_accuracy"]

    # Get the SubscriptionRequestFile list
    images = SubscriptionRequestFile.objects.filter(file_name__in=image_filenames)

    # Create a CSV writer object
    folder_path = os.path.join(settings.MEDIA_ROOT, "report", id)
    file_path = os.path.join(folder_path, "bad_images.csv")
    os.makedirs(folder_path, exist_ok=True)

    with open(file_path, "w", newline="") as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=columns)
        csv_writer.writeheader()
        # Write data to the CSV file; the accuracy columns are left empty for now
        for subscription_request_file in images:
            row = {
                "request_id": subscription_request_file.request.request_id,
                "file_name": subscription_request_file.file_name,
                "predict_result": subscription_request_file.predict_result,
                "feedback_result": subscription_request_file.feedback_result,
                "reviewed_result": subscription_request_file.reviewed_result,
                # "feedback_accuracy": subscription_request_file.feedback_accuracy,
                # "reviewed_accuracy": subscription_request_file.reviewed_accuracy,
            }
            csv_writer.writerow(row)

    # Upload the CSV to S3
    save_report_to_S3(id, file_path)

def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality: int):
    try:
        folder_path = get_folder_path(rq)
        pathlib.Path(folder_path).mkdir(exist_ok=True, parents=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        logger.error(f"{e}")
        raise ServiceUnavailableException()

def save_to_S3(file_name, rq, local_file_path):
    try:
        file_path = get_folder_path(rq)
        request_id = rq.request_id
        assert len(file_path.split("/")) >= 2, "file_path must have at least process type and request id"
        # Key layout: <process_type>/<request_id>/<file_name>
        s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key, request_id))
        c_connector.remove_local_file((local_file_path, request_id))
        return s3_key
    except Exception as e:
        logger.error(f"{e}")
        raise ServiceUnavailableException()

def save_feedback_to_S3(file_name, id, local_file_path):
    try:
        assert len(local_file_path.split("/")) >= 3, "file_path must have at least feedback_folder and feedback_id"
        # Key layout: feedback/<feedback_id>/<file_name>
        s3_key = os.path.join("feedback", local_file_path.split("/")[-2], file_name)
        c_connector.upload_feedback_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        logger.error(f"{e}")
        raise ServiceUnavailableException()

def save_report_to_S3(id, local_file_path, delay=0):
    try:
        # Key layout: report/<report_id>/<file_name>
        s3_key = os.path.join("report", local_file_path.split("/")[-2], local_file_path.split("/")[-1])
        c_connector.upload_report_to_s3((local_file_path, s3_key, id, delay))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        logger.error(f"{e}")
        raise ServiceUnavailableException()

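# Illustrative S3 keys produced by the three helpers above (file names and ids
# are assumptions; the key shapes follow the joins in each function):
#   save_to_S3          -> "<process_type>/<request_id>/image.jpg"
#   save_feedback_to_S3 -> "feedback/<feedback_id>/feedback.csv"
#   save_report_to_S3   -> "report/<report_id>/report.xlsx"
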
def download_from_S3(s3_key, local_file_path):
    s3_client.download_file(s3_key, local_file_path)

def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        # Compare case-insensitively so e.g. "Pdf" is also treated as a PDF
        if extension.lower() == "pdf":
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException as e:
        raise e
    except Exception as e:
        logger.error(e)
        raise ServiceUnavailableException()
    return file_path

def save_pdf(file_path: str, file: TemporaryUploadedFile):
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)

def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    # First pass: check the decompressed dimensions without decoding the image
    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
    width, height = imagesize.get(input_file)
    if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
        raise InvalidDecompressedSizeException(excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))

    # Second pass: decode the image for processing
    with open(file.temporary_file_path(), "rb") as fs:
        input_file = io.BytesIO(fs.read())
    image = Image.open(input_file)

    # Read the orientation from the EXIF metadata; Windows Photos keeps the original
    for orientation in ExifTags.TAGS.keys():
        if ExifTags.TAGS[orientation] == 'Orientation':
            break
    try:
        e = image._getexif()  # returns None if no EXIF data
        if e:
            exif = dict(e.items())
            if orientation in exif:
                orientation = exif[orientation]
                if orientation == 3:
                    image = image.transpose(Image.ROTATE_180)
                elif orientation == 6:
                    image = image.transpose(Image.ROTATE_270)
                elif orientation == 8:
                    image = image.transpose(Image.ROTATE_90)
    except Exception as ex:
        logger.error(ex)
        logger.error("Rotation Error")
        traceback.print_exc()

    image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
    image = image.convert('RGB')
    image.save(file_path, optimize=True, quality=quality)

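# The EXIF handling above follows the common Pillow recipe: orientation tag
# values 3, 6, and 8 call for 180°, 270°, and 90° counter-clockwise rotations
# respectively. Note that Image._getexif() is a private Pillow API; newer
# Pillow versions offer PIL.ImageOps.exif_transpose(image) as a public
# alternative that applies the same corrections.
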
def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    token = image_authenticator.generate_img_token()
    if not file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
            folder=folder, uid=uid, base_url=settings.BASE_URL, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
        folder=folder, uid=uid, file_name=file_name, base_url=settings.BASE_URL, token=token)

def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    token = image_authenticator.generate_img_token(user_id)
    if not file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
            folder=folder, uid=data_id, base_url=settings.BASE_URL, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
        folder=folder, uid=data_id, file_name=file_name, base_url=settings.BASE_URL, token=token)

def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
    return f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'

def build_S3_url(s3_key, exp_time):
    return s3_client.create_url_with_expiration(s3_key, exp_time)

def get_value(_dict, keys):
    # Walk a nested dict following a dot-separated key path; return "-" for
    # missing keys or None values, and stringify lists for spreadsheet cells
    keys = keys.split('.')
    value = _dict
    try:
        for key in keys:
            if key not in value.keys():
                return "-"
            else:
                value = value.get(key, {})
    except Exception as e:
        logger.error(f"{e}")
        logger.error(f"value: {value}")
        logger.error(f"keys: {keys}")

    if value is None:
        return "-"
    elif isinstance(value, list):
        value = str(value)
    return value

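# Illustrative calls (the sample data is an assumption):
#   get_value({"usage": {"imei": 3}}, "usage.imei")   # -> 3
#   get_value({"usage": {"imei": 3}}, "usage.total")  # -> "-"
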
def dict2xlsx(input: list, _type='report'):
    if _type == 'report':
        wb = dump_excel_report(input=input)
    elif _type == 'report_detail':
        wb = dump_excel_report_detail(input=input)
    elif _type == 'billing_report':
        wb = dump_excel_billing_report(input=input)
    else:
        # Fail explicitly instead of raising UnboundLocalError on `wb` below
        raise InvalidException(excArgs="_type")
    return wb

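# A plausible flow pairing dict2xlsx with save_workbook_file from this module
# (variable names are assumptions; rows must match the mappings below):
#   wb = dict2xlsx(rows, _type='report')
#   save_workbook_file("report.xlsx", report, wb)
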
def dump_excel_report(input: list):
    red = "FF0000"
    black = "000000"
    green = "E2EFDA"
    yellow = "FFF2CC"
    gray = "D0CECE"
    font_black = Font(name="Calibri", size=11, color=black)
    font_black_bold = Font(name="Calibri", size=11, color=black, bold=True)
    font_red = Font(name="Calibri", size=11, color=red)
    thin = Side(border_style="thin", color=black)
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    fill_green = PatternFill(start_color=green, end_color=green, fill_type="solid")
    fill_yellow = PatternFill(start_color=yellow, end_color=yellow, fill_type="solid")
    fill_gray = PatternFill(start_color=gray, end_color=gray, fill_type="solid")

    wb = load_workbook(filename='report.xlsx')
    ws = wb['Sheet1']
    mapping = {
        'A': 'subs',
        'B': 'extraction_date',
        'C': 'usage.total_images',
        'D': 'usage.imei',
        'E': 'usage.invoice',
        'F': 'total_images',
        'G': 'num_imei',
        'H': 'num_invoice',
        'I': 'images_quality.successful',
        'J': 'images_quality.successful_percent',
        'K': 'images_quality.bad',
        'L': 'images_quality.bad_percent',
        'M': 'average_accuracy_rate.imei_number',
        'N': 'average_accuracy_rate.purchase_date',
        'O': 'average_accuracy_rate.retailername',
        'P': 'average_accuracy_rate.invoice_no',
        'Q': 'file_average_processing_time.imei',
        'R': 'file_average_processing_time.invoice',
        'S': 'review_progress'
    }

    start_index = 5
    for subtotal in input:
        for key in mapping.keys():
            value = get_value(subtotal, mapping[key])
            ws[key + str(start_index)] = value
            if key in ['C', 'D', 'E'] and value == 0:
                ws[key + str(start_index)] = "-"
            ws[key + str(start_index)].border = border
            ws[key + str(start_index)].font = font_black
            if 'accuracy' in mapping[key] or 'time' in mapping[key] or 'percent' in mapping[key] or 'speed' in mapping[key] or mapping[key] in ["review_progress"]:
                ws[key + str(start_index)].number_format = '0.0'
            # Bold and fill the subtotal rows (marked with subs == '+')
            if subtotal['subs'] == '+':
                ws[key + str(start_index)].font = font_black_bold
                if key in ['A', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R']:
                    ws[key + str(start_index)].fill = fill_gray
                elif key == 'B':
                    ws[key + str(start_index)].fill = fill_green
                elif key in ['C', 'D', 'E', 'F', 'G', 'H']:
                    ws[key + str(start_index)].fill = fill_yellow
            # Highlight out-of-threshold values in red
            if 'average_accuracy_rate' in mapping[key] and isinstance(value, (int, float)) and value < 98:
                ws[key + str(start_index)].font = font_red
            elif 'file_average_processing_time' in mapping[key] and isinstance(value, (int, float)) and value > 2.0:
                ws[key + str(start_index)].font = font_red
            elif 'bad_percent' in mapping[key] and isinstance(value, (int, float)) and value > 10:
                ws[key + str(start_index)].font = font_red
            if key in ['C', 'D', 'E', 'F', 'G', 'H', 'I', 'K']:
                ws[key + str(start_index)].number_format = '#,##0'
        start_index += 1

    return wb

def dump_excel_report_detail(input: list):
    red = "FF0000"
    black = "000000"
    font_black = Font(name="Calibri", size=11, color=black)
    font_red = Font(name="Calibri", size=11, color=red)
    thin = Side(border_style="thin", color=black)
    border = Border(left=thin, right=thin, top=thin, bottom=thin)

    wb = load_workbook(filename='report_detail.xlsx')
    ws = wb['Sheet1']
    mapping = {
        'A': 'subs',
        'B': 'request_id',
        'C': 'ocr_extraction_date',
        'D': 'redemption_number',
        'E': 'image_type',
        'F': 'imei_user_submitted',
        'G': 'imei_ocr_retrieved',
        'H': 'imei_revised',
        'I': 'imei1_accuracy',
        'J': 'invoice_purchase_date_consumer',
        'K': 'invoice_purchase_date_ocr',
        'L': 'invoice_purchase_date_revised',
        'M': 'invoice_purchase_date_accuracy',
        'N': 'invoice_retailer_consumer',
        'O': 'invoice_retailer_ocr',
        'P': 'invoice_retailer_revised',
        'Q': 'invoice_retailer_accuracy',
        'R': 'invoice_number_user',
        'S': 'invoice_number_ocr',
        'T': 'invoice_number_revised',
        'U': 'invoice_number_accuracy',
        'V': 'ocr_image_accuracy',
        'W': 'ocr_image_speed_(seconds)',
        'X': 'is_reviewed',
        'Y': 'bad_image_reasons',
        'Z': 'countermeasures',
        'AA': 'imei_revised_accuracy',
        'AB': 'invoice_number_revised_accuracy',
        'AC': 'purchase_date_revised_accuracy',
        'AD': 'retailer_revised_accuracy',
    }

    start_index = 4
    for subtotal in input:
        for key in mapping.keys():
            value = get_value(subtotal, mapping[key])
            ws[key + str(start_index)] = value
            if key in ['C', 'D', 'E'] and value == 0:
                ws[key + str(start_index)] = "-"
            ws[key + str(start_index)].border = border
            ws[key + str(start_index)].font = font_black
            if 'accuracy' in mapping[key] or 'time' in mapping[key] or 'percent' in mapping[key] or 'speed' in mapping[key] or mapping[key] in ["review_progress"]:
                ws[key + str(start_index)].number_format = '0.0'
            # Highlight out-of-threshold values in red
            if 'accuracy' in mapping[key] and isinstance(value, (int, float)) and value < 75:
                ws[key + str(start_index)].font = font_red
            elif 'speed' in mapping[key] and isinstance(value, (int, float)) and value > 2.0:
                ws[key + str(start_index)].font = font_red
        start_index += 1

    return wb

def dump_excel_billing_report(input: list):
    black = "000000"
    font_black = Font(name="Calibri", size=11, color=black)
    thin = Side(border_style="thin", color=black)
    border = Border(left=thin, right=thin, top=thin, bottom=thin)
    align_right = Alignment(horizontal='right')

    wb = load_workbook(filename='billing_report.xlsx')
    ws = wb['Sheet1']
    mapping = {
        'B': 'request_month',
        'C': 'subsidiary',
        'D': 'image_type',
        'E': 'redemption_number',
        'F': 'request_id',
        'G': 'request_date',
        'H': 'request_time_(utc)'
    }

    start_index = 4
    for subtotal in input:
        for key in mapping.keys():
            value = get_value(subtotal, mapping[key])
            value = "-" if value == "" else value
            ws[key + str(start_index)] = value
            ws[key + str(start_index)].border = border
            ws[key + str(start_index)].font = font_black
            if key in ['G', 'H']:
                ws[key + str(start_index)].alignment = align_right
        start_index += 1

    return wb