sbt-idp/cope2n-api/fwd_api/celery_worker/internal_task.py

import time
import uuid
import os
import base64
import traceback
from fwd_api.models import SubscriptionRequest, UserProfile
from fwd_api.celery_worker.worker import app
from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from fwd_api.models import SubscriptionRequestFile
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util
from celery.utils.log import get_task_logger
from fwd import settings

logger = get_task_logger(__name__)

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)
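# NOTE (assumption): MinioS3Client is expected to leave .s3_client as None when S3
# is not configured or reachable; the upload tasks below check for that and skip work.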


def process_pdf_file(file_name: str, file_path: str, request, user) -> list:
    try:
        # Original file: persist a SubscriptionRequestFile record for the upload
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                            request=request,
                                                                            file_name=file_name,
                                                                            code=f'FIL{uuid.uuid4().hex}')
        new_request_file.save()
        # Sub-files: split the PDF into per-page images and return their URLs
        return ProcessUtil.pdf_to_images_urls(FileUtils.get_file(file_path), request, user)
    except Exception as e:
        traceback.print_exc()
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract pdf files {e}"}
        request.save()
        return None


def process_image_file(file_name: str, file_path, request, user) -> list:
    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                        request=request,
                                                                        file_name=file_name,
                                                                        code=f'FIL{uuid.uuid4().hex}')
    new_request_file.save()
    return [{
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': new_request_file.code
    }]


@app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """
    files: [{
        "file_name": "",
        "file_path": "",  # local path to file
        "file_type": ""
    },]
    """
    start = time.time()
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
    user = UserProfile.objects.filter(id=user_id).first()
    b_urls = []
    new_request.pages = len(files)
    new_request.pages_left = len(files)
    for file in files:
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
            if _b_urls is None:
                new_request.status = 400
                new_request.save()
                raise FileContentInvalidException
            for j in range(len(_b_urls)):
                _b_urls[j]["doc_type"] = file["file_type"]
                _b_urls[j]["page_number"] = j + len(b_urls)
            # b_urls += _b_urls  # TODO: the client may request all pages of a file; for now only the first page is kept
            b_urls.append(_b_urls[0])
        elif extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
            this_url["page_number"] = len(b_urls)
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            b_urls.append(this_url)

    start_process = time.time()
    logger.info(f"BE processing time: {start_process - start}")

    # TODO: send to queue with different request_ids
    doc_type_string = ""
    for i, b_url in enumerate(b_urls):
        fractorized_request_id = rq_id + f"_sub_{i}"
        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
        doc_type_string += "{},".format(b_url["doc_type"])
    doc_type_string = doc_type_string[:-1]
    new_request.doc_type = doc_type_string
    new_request.save()


@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key):
    if s3_client.s3_client is not None:
        res = s3_client.upload_file(local_file_path, s3_key)
        # Delete the local copy only after the upload is confirmed successful
        if res is not None and res["ResponseMetadata"]["HTTPStatusCode"] == 200:
            os.remove(local_file_path)
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
    if s3_client.s3_client is not None:
        # byte_obj arrives base64-encoded; decode before uploading
        obj = base64.b64decode(byte_obj)
        s3_client.update_object(s3_key, obj)
    else:
        logger.info("S3 is not available, skipping...")