sbt-idp/cope2n-api/fwd_api/utils/FileUtils.py

237 lines
10 KiB
Python
Raw Normal View History

2023-11-30 11:19:06 +00:00
import io
import os
import traceback
2023-12-12 05:54:34 +00:00
import pathlib
2023-12-08 12:49:00 +00:00
import json
2023-11-30 11:19:06 +00:00
from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile
from fwd import settings
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
2023-12-11 13:15:48 +00:00
ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException
2023-11-30 11:19:06 +00:00
from fwd_api.models import SubscriptionRequest, OcrTemplate
from fwd_api.utils import ProcessUtil
from fwd_api.utils.CryptoUtils import image_authenticator
2023-12-12 05:54:34 +00:00
from fwd_api.utils.image import resize
2023-11-30 11:19:06 +00:00
from ..celery_worker.client_connector import c_connector
2023-12-11 13:15:48 +00:00
import imagesize
2023-11-30 11:19:06 +00:00
def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    """Validate a batch of uploaded files against count, type and size limits.

    Args:
        files: Sized iterable of uploaded files; every item must be a
            ``TemporaryUploadedFile``.
        max_file_num: Maximum number of files accepted in one request.
        min_file_num: Minimum number of files required.
        file_field: Field name reported in the count-related errors.

    Raises:
        RequiredFieldException: Fewer than ``min_file_num`` files were given.
        LimitReachedException: Too many files, a single file larger than
            ``settings.MAX_UPLOAD_SIZE_OF_A_FILE``, or a cumulative size larger
            than ``settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST``.
        InvalidException: An item is not a ``TemporaryUploadedFile``.
        FileFormatInvalidException: A file name has no extension or its
            extension is not in ``allowed_file_extensions``.
    """
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")

        # A name without any dot has no extension at all; otherwise the text
        # after the last dot must be one of the allowed extensions.
        has_allowed_extension = "." in f.name and f.name.split(".")[-1].lower() in allowed_file_extensions
        if not has_allowed_extension:
            raise FileFormatInvalidException(excArgs=allowed_file_extensions)

        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))

        total_file_size += f.size
        if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
            # Report the per-request limit that was actually exceeded
            # (previously the per-file limit was shown here by mistake).
            raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))
def get_file(file_path: str):
    """Open ``file_path`` for binary reading and return the open file object.

    The caller is responsible for closing the returned handle.

    Raises:
        GeneralException: If the file cannot be opened for any reason; the
            underlying error is printed before being replaced.
    """
    try:
        handle = open(file_path, 'rb')
    except Exception as err:
        print(err)
        raise GeneralException("System")
    return handle
def get_template_folder_path(tem: OcrTemplate):
    """Return the media directory path for an OCR template.

    The path has the form
    ``<MEDIA_ROOT>/users/<user id>/subscriptions/<subscription id>/templates/<template id>``.
    """
    subscription = tem.subscription
    path_parts = (
        settings.MEDIA_ROOT,
        'users', str(subscription.user.id),
        "subscriptions", str(subscription.id),
        "templates", str(tem.id),
    )
    return os.path.join(*path_parts)
def get_folder_path(rq: SubscriptionRequest):
    """Return the media directory path for a subscription request.

    The path has the form
    ``<MEDIA_ROOT>/users/<user id>/subscriptions/<subscription id>/requests/<process folder>/<request id>``,
    where the process folder name comes from
    ``ProcessUtil.map_process_type_to_folder_name``. The request's process
    type is also logged at info level through a Celery task logger.
    """
    # Imported lazily inside the function, as in the original implementation.
    from celery.utils.log import get_task_logger

    get_task_logger(__name__).info(f"[DEBUG]: rq.process_type: {rq.process_type}")

    process_folder = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    subscription = rq.subscription
    return os.path.join(
        settings.MEDIA_ROOT,
        'users', str(subscription.user.id),
        "subscriptions", str(subscription.id),
        'requests', process_folder, str(rq.request_id),
    )
def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    """Write raw bytes to ``file_name`` inside the request's media folder.

    Args:
        file_name: Name of the file to create inside the request folder.
        rq: Request whose folder (see ``get_folder_path``) receives the file.
        file_bytes: Bytes-like content to write.

    Returns:
        The full path of the written file.
    """
    folder_path = get_folder_path(rq)
    # exist_ok avoids the check-then-create race when two workers handle the
    # same request folder at the same time.
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as w:
        w.write(file_bytes)
    return file_path
def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    """Copy an uploaded file, chunk by chunk, into the request's media folder.

    Args:
        file_name: Name of the file to create inside the request folder.
        rq: Request whose folder (see ``get_folder_path``) receives the file.
        file: Uploaded file; its ``chunks()`` are written in order.

    Returns:
        The full path of the written file.
    """
    folder_path = get_folder_path(rq)
    # exist_ok avoids the check-then-create race between concurrent writers.
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # The context manager closes the handle even if a chunk read/write fails.
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
    return file_path
2023-12-08 12:49:00 +00:00
def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    """Serialize ``data`` as JSON into ``file_name`` inside the request's media folder.

    Args:
        file_name: Name of the JSON file to create inside the request folder.
        rq: Request whose folder (see ``get_folder_path``) receives the file.
        data: JSON-serializable mapping to dump.

    Returns:
        The full path of the written file.
    """
    folder_path = get_folder_path(rq)
    # exist_ok avoids the check-then-create race between concurrent writers.
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # Explicit encoding keeps the output independent of the host locale.
    with open(file_path, "w", encoding="utf-8") as json_file:
        json.dump(data, json_file)
    return file_path
2023-11-30 11:19:06 +00:00
def delete_file_with_path(file_path: str) -> bool:
    """Remove the file at ``file_path``.

    Returns:
        True when the file was removed, False when removal failed for any
        reason (the error is printed, never raised).
    """
    try:
        os.remove(file_path)
    except Exception as err:
        print(err)
        return False
    return True
def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    """Save an uploaded template file into the template's media folder.

    Args:
        file_name: Name of the file to create inside the template folder.
        rq: Template whose folder (see ``get_template_folder_path``) receives the file.
        file: Uploaded file to store.
        quality: Quality setting forwarded to ``save_file_with_path``.

    Returns:
        The full path of the saved file.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged from
            ``save_file_with_path``, matching ``resize_and_save_file``.
        ServiceUnavailableException: Any other failure; the underlying error
            is printed before being replaced.
    """
    try:
        folder_path = get_template_folder_path(rq)
        # exist_ok avoids the check-then-create race between concurrent writers.
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException:
        # Keep the specific validation error instead of masking it as a
        # generic service failure.
        raise
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
2023-12-12 05:54:34 +00:00
def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
    """Save an upload into ``folder_path``, dispatching on the file extension.

    Files whose extension is ``pdf`` (case-insensitive) go through
    ``save_pdf``; everything else goes through ``save_img`` with ``quality``.

    Returns:
        The full path of the saved file.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged.
        ServiceUnavailableException: Any other failure; the underlying error
            is printed before being replaced.
    """
    try:
        destination = os.path.join(folder_path, file_name)
        is_pdf = file_name.rsplit(".", 1)[-1].lower() == "pdf"
        if is_pdf:
            save_pdf(destination, file)
        else:
            save_img(destination, file, quality)
    except InvalidDecompressedSizeException:
        raise
    except Exception as err:
        print(err)
        raise ServiceUnavailableException()
    return destination
2023-11-30 11:19:06 +00:00
def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality):
    """Save an upload into the request's media folder via ``save_file_with_path``.

    The request folder (see ``get_folder_path``) is created first, including
    missing parents.

    Returns:
        The full path of the saved file.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged.
        ServiceUnavailableException: Any other failure; the underlying error
            is printed before being replaced.
    """
    try:
        target_dir = get_folder_path(rq)
        pathlib.Path(target_dir).mkdir(parents=True, exist_ok=True)
        saved_path = save_file_with_path(file_name, file, quality, target_dir)
    except InvalidDecompressedSizeException:
        raise
    except Exception as err:
        print(f"[ERROR]: {err}")
        raise ServiceUnavailableException()
    return saved_path
2023-12-05 05:59:06 +00:00
def save_to_S3(file_name, rq, local_file_path):
    """Queue a local file for upload to S3 and return the S3 key used.

    The key is ``<process folder>/<request id>/<file_name>``, taken from the
    last two components of the request folder (see ``get_folder_path``).

    Args:
        file_name: Final component of the S3 key.
        rq: Request whose folder path supplies the key prefix.
        local_file_path: Path of the local file, passed to
            ``c_connector.upload_file_to_s3`` together with the key.

    Returns:
        The S3 key.

    Raises:
        ServiceUnavailableException: Any failure; the underlying error is
            printed before being replaced.
    """
    # S3 keys always use "/" regardless of the host OS path separator.
    import posixpath

    try:
        folder_path = get_folder_path(rq)
        # Normalize separators so the split works wherever os.path.join ran.
        parts = folder_path.replace(os.sep, "/").split("/")
        # Explicit check instead of ``assert``, which is stripped under -O.
        if len(parts) < 2:
            raise ValueError("file_path must have at least process type and request id")
        s3_key = posixpath.join(parts[-2], parts[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_pdf(file_path: str, file: TemporaryUploadedFile):
    """Copy an uploaded file to ``file_path`` chunk by chunk, without modification.

    Args:
        file_path: Destination path; created or truncated.
        file: Uploaded file; its ``chunks()`` are written in order.
    """
    # The context manager closes the handle even if a chunk read/write fails.
    with open(file_path, 'wb+') as f:
        for chunk in file.chunks():
            f.write(chunk)
def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    """Validate, re-orient, resize and save an uploaded image as RGB.

    Steps:
        1. Probe the pixel dimensions with ``imagesize.get`` and reject the
           image if either side exceeds ``settings.MAX_PIXEL_IN_A_FILE``.
        2. Open the image with PIL and apply the rotation indicated by the
           EXIF ``Orientation`` value (3, 6 or 8), best effort.
        3. Resize to fit ``settings.TARGET_MAX_IMAGE_SIZE``, convert to RGB
           and save to ``file_path`` with ``optimize=True``.

    Args:
        file_path: Destination path of the processed image.
        file: Uploaded file; read from ``file.temporary_file_path()``.
        quality: Forwarded to ``Image.save`` as ``quality``.

    Raises:
        InvalidDecompressedSizeException: Width or height is above
            ``settings.MAX_PIXEL_IN_A_FILE``.
    """
    # Read the upload from disk once; the size probe and PIL each get their
    # own stream over the same bytes.
    with open(file.temporary_file_path(), "rb") as fs:
        raw = fs.read()

    width, height = imagesize.get(io.BytesIO(raw))
    if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
        raise InvalidDecompressedSizeException(excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))

    image = Image.open(io.BytesIO(raw))

    # Honour the EXIF Orientation flag (original note: Windows Photo keeps the
    # original pixel orientation). Look the tag id up by name; None means no
    # rotation is attempted.
    orientation_tag = next((tag for tag, name in ExifTags.TAGS.items() if name == 'Orientation'), None)
    try:
        exif_data = image._getexif()  # returns None if no EXIF data
        if exif_data:
            orientation = dict(exif_data.items()).get(orientation_tag)
            if orientation == 3:
                image = image.transpose(Image.ROTATE_180)
            elif orientation == 6:
                image = image.transpose(Image.ROTATE_270)
            elif orientation == 8:
                image = image.transpose(Image.ROTATE_90)
    except Exception as ex:
        # Rotation is best effort: report the problem and keep the image as-is.
        print(ex)
        print("Rotation Error")
        traceback.print_exc()

    image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
    image = image.convert('RGB')
    image.save(file_path, optimize=True, quality=quality)
2023-11-30 11:19:06 +00:00
def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    """Build a tokenized media URL under ``settings.BASE_URL``.

    A token is obtained from ``image_authenticator.generate_img_token()``.
    When ``file_name`` is given (truthy) it is added as the ``file_name``
    query parameter ahead of the token.
    """
    token = image_authenticator.generate_img_token()
    if file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
            base_url=settings.BASE_URL, folder=folder, uid=uid, file_name=file_name, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
        base_url=settings.BASE_URL, folder=folder, uid=uid, token=token)
def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    """Build a tokenized media URL for ``data_id`` under ``settings.BASE_URL``.

    The token is obtained from
    ``image_authenticator.generate_img_token(user_id)``. When ``file_name`` is
    given (truthy) it is added as the ``file_name`` query parameter ahead of
    the token.
    """
    token = image_authenticator.generate_img_token(user_id)
    if file_name:
        return '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'.format(
            base_url=settings.BASE_URL, folder=folder, uid=data_id, file_name=file_name, token=token)
    return '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'.format(
        base_url=settings.BASE_URL, folder=folder, uid=data_id, token=token)
def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    """Build the v2 tokenized media URL for ``media_id`` under ``settings.BASE_URL``.

    The token is obtained from
    ``image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)``.
    """
    token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
    url = f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'
    return url