import io
import os
import traceback
import pathlib
import json

from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile

from fwd import settings
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
    ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException
from fwd_api.models import SubscriptionRequest, OcrTemplate
from fwd_api.utils import ProcessUtil
from fwd_api.utils.CryptoUtils import image_authenticator
from fwd_api.utils.image import resize
from ..celery_worker.client_connector import c_connector
import imagesize


def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    """Validate the count, type, extension and size of a list of uploaded files.

    Args:
        files: Sequence of uploaded files; each must be a ``TemporaryUploadedFile``.
        max_file_num: Maximum number of files allowed in one request.
        min_file_num: Minimum number of files required.
        file_field: Field name used in the count-related error messages.

    Raises:
        RequiredFieldException: Fewer than ``min_file_num`` files were given.
        LimitReachedException: Too many files, a single file exceeds
            ``settings.MAX_UPLOAD_SIZE_OF_A_FILE``, or the running total exceeds
            ``settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST``.
        InvalidException: An item is not a ``TemporaryUploadedFile``.
        FileFormatInvalidException: A file name has no extension or one that is
            not in ``allowed_file_extensions``.
    """
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    total_file_size = 0
    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")

        # A name without any "." has no extension at all, even if the whole
        # name happens to equal an allowed extension (e.g. a file named "pdf").
        _, dot, suffix = f.name.rpartition(".")
        if not dot or suffix.lower() not in allowed_file_extensions:
            raise FileFormatInvalidException(excArgs=allowed_file_extensions)

        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(
                excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))

        total_file_size += f.size
        if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
            # Report the per-request limit that was actually exceeded (the
            # message previously quoted the per-file limit).
            raise LimitReachedException(
                excArgs=('Total size of all files',
                         str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))


def get_file(file_path: str):
    """Open ``file_path`` for binary reading and return the file object.

    The caller is responsible for closing the returned handle.

    Raises:
        GeneralException: The file could not be opened.
    """
    try:
        return open(file_path, 'rb')
    except Exception as e:
        print(e)
        raise GeneralException("System")


def get_template_folder_path(tem: OcrTemplate):
    """Return ``MEDIA_ROOT/users/<user>/subscriptions/<sub>/templates/<template>`` for ``tem``."""
    tem_id = str(tem.id)
    sub_id = str(tem.subscription.id)
    user_id = str(tem.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, "templates", tem_id)


def get_folder_path(rq: SubscriptionRequest):
    """Return ``MEDIA_ROOT/users/<user>/subscriptions/<sub>/requests/<process folder>/<request_id>`` for ``rq``.

    The process folder name comes from
    ``ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))``.
    """
    # Imported here, as in the original code, so celery is only needed when
    # this function is actually called.
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)

    request_id = str(rq.request_id)
    logger.info(f"[DEBUG]: rq.process_type: {rq.process_type}")
    p_type = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    sub_id = str(rq.subscription.id)
    user_id = str(rq.subscription.user.id)
    return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, 'requests', p_type,
                        request_id)


def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    """Write ``file_bytes`` to ``file_name`` inside the request folder and return the file path."""
    folder_path = get_folder_path(rq)
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as w:
        w.write(file_bytes)
    return file_path


def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    """Copy the uploaded ``file`` chunk by chunk into the request folder and return the file path."""
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # The context manager closes the handle even if writing a chunk fails.
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
    return file_path


def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    """Serialize ``data`` as JSON to ``file_name`` inside the request folder and return the file path."""
    folder_path = get_folder_path(rq)
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, "w") as json_file:
        json.dump(data, json_file)
    return file_path


def delete_file_with_path(file_path: str) -> bool:
    """Remove ``file_path``; return ``True`` on success, ``False`` (after printing the error) on failure."""
    try:
        os.remove(file_path)
        return True
    except Exception as e:
        print(e)
        return False


def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    """Save an uploaded template file into the template's folder and return the file path.

    Raises:
        ServiceUnavailableException: Any failure while creating the folder or
            saving the file.
    """
    # NOTE(review): unlike resize_and_save_file, this also turns
    # InvalidDecompressedSizeException into ServiceUnavailableException.
    # Kept as-is for caller compatibility -- confirm whether that is intended.
    try:
        folder_path = get_template_folder_path(rq)
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()


def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
    """Save ``file`` as ``folder_path/file_name`` and return the resulting path.

    Files whose extension is ``pdf`` (case-insensitive) are copied verbatim;
    everything else is treated as an image and goes through ``save_img``.

    This module used to define this function twice; the later definition
    shadowed the earlier one and only recognised ``pdf``/``PDF``. The two are
    merged here into a single case-insensitive implementation.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged from ``save_img``.
        ServiceUnavailableException: Any other failure while saving.
    """
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        if extension.lower() == "pdf":
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException:
        raise
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
    return file_path


def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality):
    """Save an uploaded file into the request folder (creating it if needed) and return the file path.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged from the save step.
        ServiceUnavailableException: Any other failure.
    """
    try:
        folder_path = get_folder_path(rq)
        pathlib.Path(folder_path).mkdir(exist_ok=True, parents=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException:
        raise
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()


def save_to_S3(file_name, rq, local_file_path):
    """Upload ``local_file_path`` through ``c_connector`` and return the S3 key.

    The key is ``<second-to-last folder>/<last folder>/<file_name>``, taken from
    the request folder path split on "/".

    Raises:
        ServiceUnavailableException: The key could not be built or the upload
            call failed.
    """
    try:
        file_path = get_folder_path(rq)
        path_parts = file_path.split("/")
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if len(path_parts) < 2:
            raise ValueError("file_path must have at least process type and request id")
        s3_key = os.path.join(path_parts[-2], path_parts[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()


def save_pdf(file_path: str, file: TemporaryUploadedFile):
    """Copy the uploaded ``file`` chunk by chunk to ``file_path``."""
    # The context manager closes the handle even if writing a chunk fails.
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)


def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    """Validate, orient, resize and save an uploaded image as RGB at ``file_path``.

    Steps: reject images whose width or height exceeds
    ``settings.MAX_PIXEL_IN_A_FILE``, apply the EXIF orientation (best effort),
    resize to fit ``settings.TARGET_MAX_IMAGE_SIZE``, convert to RGB and save
    with the given ``quality``.

    Raises:
        InvalidDecompressedSizeException: The image dimensions exceed the limit.
    """
    # Read the temporary file once; each consumer gets its own buffer so that
    # neither depends on how the other leaves the stream (position/closed).
    with open(file.temporary_file_path(), "rb") as fs:
        raw_bytes = fs.read()

    width, height = imagesize.get(io.BytesIO(raw_bytes))
    if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
        raise InvalidDecompressedSizeException(
            excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))

    image = Image.open(io.BytesIO(raw_bytes))

    # Read the orientation from the EXIF metadata: some tools (the original
    # note mentions Windows Photos) keep the original pixels and only record
    # the rotation in metadata.
    orientation_tag = next(
        (tag for tag, tag_name in ExifTags.TAGS.items() if tag_name == 'Orientation'), None)
    rotations = {3: Image.ROTATE_180, 6: Image.ROTATE_270, 8: Image.ROTATE_90}
    try:
        raw_exif = image._getexif()  # returns None if no EXIF data
        if raw_exif:
            exif = dict(raw_exif.items())
            if orientation_tag in exif:
                rotation = rotations.get(exif[orientation_tag])
                if rotation is not None:
                    image = image.transpose(rotation)
    except Exception as ex:
        # Orientation handling is best effort: log and continue unrotated.
        print(ex)
        print("Rotation Error")
        traceback.print_exc()

    image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
    image = image.convert('RGB')
    image.save(file_path, optimize=True, quality=quality)


def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    """Build a tokenised media URL for ``folder``/``uid``, optionally naming a file.

    The token comes from ``image_authenticator.generate_img_token()``.
    """
    token = image_authenticator.generate_img_token()
    base = f'{settings.BASE_URL}/api/ctel/media/{folder}/{uid}/'
    if not file_name:
        return f'{base}?token={token}'
    return f'{base}?file_name={file_name}&token={token}'


def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    """Build a tokenised media URL for ``folder``/``data_id``, optionally naming a file.

    The token comes from ``image_authenticator.generate_img_token(user_id)``.
    """
    token = image_authenticator.generate_img_token(user_id)
    base = f'{settings.BASE_URL}/api/ctel/media/{folder}/{data_id}/'
    if not file_name:
        return f'{base}?token={token}'
    return f'{base}?file_name={file_name}&token={token}'


def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    """Build the v2 media URL for ``media_id``.

    The token comes from
    ``image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)``.
    """
    token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
    return f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'