sbt-idp/cope2n-api/fwd_api/utils/FileUtils.py

241 lines
10 KiB
Python
Executable File

import io
import os
import traceback
import base64
import json
from PIL import Image, ExifTags
from django.core.files.uploadedfile import TemporaryUploadedFile
from fwd import settings
from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException
from fwd_api.models import SubscriptionRequest, OcrTemplate
from fwd_api.utils import ProcessUtil
from fwd_api.utils.CryptoUtils import image_authenticator
from ..celery_worker.client_connector import c_connector
import imagesize
def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
    """Validate the count, type, extension and size of a batch of uploads.

    Args:
        files: Sized iterable of uploaded file objects; every item must be a
            ``TemporaryUploadedFile``.
        max_file_num: Largest number of files accepted in one request.
        min_file_num: Smallest number of files accepted in one request.
        file_field: Field name reported in the count-related errors.

    Raises:
        RequiredFieldException: Fewer than ``min_file_num`` files were given.
        LimitReachedException: Too many files, a single file larger than
            ``settings.MAX_UPLOAD_SIZE_OF_A_FILE``, or a running total larger
            than ``settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST``.
        InvalidException: An item is not a ``TemporaryUploadedFile``.
        FileFormatInvalidException: A file name has no dot, or its
            lower-cased extension is not in ``allowed_file_extensions``.
    """
    file_count = len(files)
    if file_count < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if file_count > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))

    total_file_size = 0
    for f in files:
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")

        # A name without any dot has no extension, even if the whole name
        # happens to equal an allowed extension.
        has_allowed_extension = f.name.split(".")[-1].lower() in allowed_file_extensions
        if "." not in f.name or not has_allowed_extension:
            raise FileFormatInvalidException(excArgs=allowed_file_extensions)

        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))

        total_file_size += f.size
        if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
            # Report the per-request limit that was actually exceeded, not
            # the per-file limit.
            raise LimitReachedException(
                excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))
def get_file(file_path: str):
    """Open *file_path* for binary reading and return the open file object.

    The caller owns the returned handle and is responsible for closing it.

    Raises:
        GeneralException: The file could not be opened; the underlying error
            is printed before this is raised.
    """
    try:
        handle = open(file_path, 'rb')
    except Exception as err:
        print(err)
        raise GeneralException("System")
    else:
        return handle
def get_template_folder_path(tem: OcrTemplate):
    """Return the media folder path for an OCR template.

    The path is built as
    ``<MEDIA_ROOT>/users/<user id>/subscriptions/<subscription id>/templates/<template id>``.
    """
    template_part = str(tem.id)
    subscription_part = str(tem.subscription.id)
    user_part = str(tem.subscription.user.id)
    segments = (
        settings.MEDIA_ROOT,
        'users', user_part,
        "subscriptions", subscription_part,
        "templates", template_part,
    )
    return os.path.join(*segments)
def get_folder_path(rq: SubscriptionRequest):
    """Return the media folder path for a subscription request.

    The path is built as
    ``<MEDIA_ROOT>/users/<user id>/subscriptions/<subscription id>/requests/<process-type folder>/<request id>``,
    where the process-type folder name comes from
    ``ProcessUtil.map_process_type_to_folder_name``. The request's process
    type is also written to the Celery task logger.
    """
    from celery.utils.log import get_task_logger

    task_logger = get_task_logger(__name__)
    request_part = str(rq.request_id)
    task_logger.info(f"[DEBUG]: rq.process_type: {rq.process_type}")
    type_part = ProcessUtil.map_process_type_to_folder_name(int(rq.process_type))
    subscription_part = str(rq.subscription.id)
    user_part = str(rq.subscription.user.id)
    segments = (
        settings.MEDIA_ROOT,
        'users', user_part,
        "subscriptions", subscription_part,
        'requests', type_part,
        request_part,
    )
    return os.path.join(*segments)
def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
    """Write raw bytes to *file_name* inside the request's storage folder.

    Args:
        file_name: Name of the file to create inside the request folder.
        rq: Request whose folder (from ``get_folder_path``) receives the file.
        file_bytes: Bytes-like payload, written as-is.

    Returns:
        The full path of the written file.
    """
    folder_path = get_folder_path(rq)
    # exist_ok=True removes the check-then-create race: a separate
    # os.path.exists() test can pass in two callers at once, after which the
    # slower os.makedirs() raises FileExistsError.
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, 'wb+') as w:
        w.write(file_bytes)
    return file_path
def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
    """Copy an uploaded file, chunk by chunk, into the request's storage folder.

    Args:
        file_name: Name of the file to create inside the request folder.
        rq: Request whose folder (from ``get_folder_path``) receives the file.
        file: Uploaded file; its content is read through ``file.chunks()``.

    Returns:
        The full path of the written file.
    """
    folder_path = get_folder_path(rq)
    # exist_ok=True removes the check-then-create race between a separate
    # os.path.exists() test and os.makedirs().
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    # The context manager closes the destination even when reading a chunk
    # or writing it raises part-way through.
    with open(file_path, 'wb+') as destination:
        for chunk in file.chunks():
            destination.write(chunk)
    return file_path
def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
    """Serialize *data* as JSON into *file_name* in the request's storage folder.

    Args:
        file_name: Name of the file to create inside the request folder.
        rq: Request whose folder (from ``get_folder_path``) receives the file.
        data: Object passed to ``json.dump``.

    Returns:
        The full path of the written file.
    """
    folder_path = get_folder_path(rq)
    # exist_ok=True removes the check-then-create race between a separate
    # os.path.exists() test and os.makedirs().
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, file_name)
    with open(file_path, "w") as json_file:
        json.dump(data, json_file)
    return file_path
def delete_file_with_path(file_path: str) -> bool:
    """Remove the file at *file_path* and report whether that succeeded.

    Any exception raised by ``os.remove`` is printed and reported as
    ``False`` instead of propagating.
    """
    try:
        os.remove(file_path)
    except Exception as err:
        print(err)
        return False
    else:
        return True
def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
    """Save an uploaded template file into the template's storage folder.

    Args:
        file_name: Name of the file to create inside the template folder.
        rq: Template whose folder (from ``get_template_folder_path``)
            receives the file.
        file: Uploaded file to store.
        quality: Forwarded to ``save_file_with_path``.

    Returns:
        The full path of the saved file.

    Raises:
        ServiceUnavailableException: Any failure while preparing the folder
            or saving; the underlying error is printed first.
    """
    try:
        folder_path = get_template_folder_path(rq)
        # exist_ok=True removes the check-then-create race; with a separate
        # exists() test a concurrent creation would surface here as a
        # spurious ServiceUnavailableException.
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except Exception as e:
        # NOTE(review): unlike resize_and_save_file, this also converts an
        # InvalidDecompressedSizeException coming from save_file_with_path
        # into ServiceUnavailableException -- confirm that is intended.
        print(e)
        raise ServiceUnavailableException()
def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality):
    """Save an uploaded file into the request's storage folder.

    Args:
        file_name: Name of the file to create inside the request folder.
        rq: Request whose folder (from ``get_folder_path``) receives the file.
        file: Uploaded file to store.
        quality: Forwarded to ``save_file_with_path``.

    Returns:
        The full path of the saved file.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged from
            ``save_file_with_path``.
        ServiceUnavailableException: Any other failure; the underlying error
            is printed first.
    """
    try:
        folder_path = get_folder_path(rq)
        # exist_ok=True removes the check-then-create race; with a separate
        # exists() test a concurrent creation would surface here as a
        # spurious ServiceUnavailableException.
        os.makedirs(folder_path, exist_ok=True)
        return save_file_with_path(file_name, file, quality, folder_path)
    except InvalidDecompressedSizeException:
        # Let the size-limit error reach the caller as-is.
        raise
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_to_S3(file_name, rq, local_file_path):
    """Hand a local file to ``c_connector.upload_file_to_s3`` and return its key.

    The key is the last two components of the request's folder path (from
    ``get_folder_path``, split on ``"/"``) joined with *file_name*.

    Args:
        file_name: Final component of the S3 key.
        rq: Request whose folder path supplies the key prefix.
        local_file_path: Path of the local file passed to the uploader.

    Returns:
        The S3 key that was passed to the uploader.

    Raises:
        ServiceUnavailableException: Any failure while building the key or
            calling the uploader; the underlying error is printed first.
    """
    try:
        path_parts = get_folder_path(rq).split("/")
        # Explicit check instead of `assert`, which is stripped when Python
        # runs with -O; the message and resulting exception are unchanged
        # because the handler below converts it either way.
        if len(path_parts) < 2:
            raise ValueError("file_path must have at least process type and request id")
        s3_key = os.path.join(path_parts[-2], path_parts[-1], file_name)
        c_connector.upload_file_to_s3((local_file_path, s3_key))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
    """Save an upload under *folder_path*, as a PDF copy or a processed image.

    Files whose extension is ``pdf`` (in any letter case) are stored through
    ``save_pdf``; everything else goes through ``save_img``.

    Args:
        file_name: Destination file name; its extension selects the handler.
        file: Uploaded file to store.
        quality: Forwarded to ``save_img`` for non-PDF files.
        folder_path: Existing folder that receives the file.

    Returns:
        The full path of the saved file.

    Raises:
        InvalidDecompressedSizeException: Propagated unchanged from the
            image handler.
        ServiceUnavailableException: Any other failure; the underlying error
            is printed first.
    """
    try:
        file_path = os.path.join(folder_path, file_name)
        extension = file_name.split(".")[-1]
        # Compare case-insensitively so mixed-case names such as "scan.Pdf"
        # are not sent down the image path.
        if extension.lower() == 'pdf':
            save_pdf(file_path, file)
        else:
            save_img(file_path, file, quality)
    except InvalidDecompressedSizeException:
        raise
    except Exception as e:
        print(e)
        raise ServiceUnavailableException()
    return file_path
def save_pdf(file_path: str, file: TemporaryUploadedFile):
    """Copy an uploaded file to *file_path* unchanged, chunk by chunk.

    Args:
        file_path: Destination path; created or truncated.
        file: Uploaded file; its content is read through ``file.chunks()``.
    """
    # The context manager closes the destination even when reading a chunk
    # or writing it raises part-way through.
    with open(file_path, 'wb+') as destination:
        for chunk in file.chunks():
            destination.write(chunk)
def save_img(file_path: str, file: TemporaryUploadedFile, quality):
    """Check an uploaded image's dimensions, apply its EXIF rotation and save it.

    The image is rejected when its width or height exceeds
    ``settings.MAX_PIXEL_IN_A_FILE``. Otherwise it is rotated according to
    its EXIF ``Orientation`` value (3, 6 or 8), converted to RGB and written
    to *file_path* with ``optimize=True`` and the given *quality*.

    Args:
        file_path: Destination path handed to PIL's ``save``.
        file: Uploaded file; read from ``file.temporary_file_path()``.
        quality: Passed to PIL's ``save`` as ``quality``.

    Raises:
        InvalidDecompressedSizeException: Width or height is above the
            configured maximum.
    """
    # Read the upload from disk once; the size probe and PIL each get their
    # own in-memory stream over the same bytes.
    with open(file.temporary_file_path(), "rb") as fs:
        raw_bytes = fs.read()

    width, height = imagesize.get(io.BytesIO(raw_bytes))
    if width > settings.MAX_PIXEL_IN_A_FILE or height > settings.MAX_PIXEL_IN_A_FILE:
        raise InvalidDecompressedSizeException(excArgs=(str(width), str(height), str(settings.MAX_PIXEL_IN_A_FILE)))

    image = Image.open(io.BytesIO(raw_bytes))

    # Numeric EXIF tag id registered under the name 'Orientation'.
    orientation_tag = next(
        (tag for tag, tag_name in ExifTags.TAGS.items() if tag_name == 'Orientation'),
        None,
    )
    # Rotation is best-effort: any failure is logged and the image is saved
    # without rotation. (Original note: Windows Photo keeps the origin.)
    try:
        exif_data = image._getexif()  # returns None if no EXIF data
        if exif_data:
            exif = dict(exif_data.items())
            if orientation_tag in exif:
                orientation = exif[orientation_tag]
                if orientation == 3:
                    image = image.transpose(Image.ROTATE_180)
                elif orientation == 6:
                    image = image.transpose(Image.ROTATE_270)
                elif orientation == 8:
                    image = image.transpose(Image.ROTATE_90)
    except Exception as ex:
        print(ex)
        print("Rotation Error")
        traceback.print_exc()

    image.convert('RGB').save(file_path, optimize=True, quality=quality)
def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
    """Build a tokenized media URL for *folder*/*uid*.

    A fresh token from ``image_authenticator.generate_img_token()`` is
    appended as the ``token`` query parameter. When *file_name* is truthy it
    is added as the ``file_name`` query parameter as well.
    """
    token = image_authenticator.generate_img_token()
    if file_name:
        template = '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'
        return template.format(
            folder=folder,
            uid=uid,
            file_name=file_name,
            base_url=settings.BASE_URL,
            token=token,
        )
    template = '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'
    return template.format(
        folder=folder,
        uid=uid,
        base_url=settings.BASE_URL,
        token=token,
    )
def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
    """Build a tokenized media URL for *folder*/*data_id* on behalf of a user.

    The token comes from ``image_authenticator.generate_img_token(user_id)``
    and is appended as the ``token`` query parameter. When *file_name* is
    truthy it is added as the ``file_name`` query parameter as well.
    """
    token = image_authenticator.generate_img_token(user_id)
    if file_name:
        template = '{base_url}/api/ctel/media/{folder}/{uid}/?file_name={file_name}&token={token}'
        return template.format(
            folder=folder,
            uid=data_id,
            file_name=file_name,
            base_url=settings.BASE_URL,
            token=token,
        )
    template = '{base_url}/api/ctel/media/{folder}/{uid}/?token={token}'
    return template.format(
        folder=folder,
        uid=data_id,
        base_url=settings.BASE_URL,
        token=token,
    )
def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
    """Build the v2 tokenized media URL for *media_id*.

    The token comes from ``image_authenticator.generate_img_token_v2`` called
    with the user id, subscription id and sync id, and is appended as the
    ``token`` query parameter.
    """
    token = image_authenticator.generate_img_token_v2(
        user_id,
        sub_id,
        u_sync_id,
    )
    media_url = f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'
    return media_url