sbt-idp/cope2n-api/fwd_api/celery_worker/internal_task.py

import base64
import os
import time
import traceback
import uuid
from multiprocessing.pool import ThreadPool

from celery.utils.log import get_task_logger

from fwd import settings
from fwd_api.celery_worker.worker import app
from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile, UserProfile

from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util

logger = get_task_logger(__name__)

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)
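
# NOTE: s3_client.s3_client is None when no S3 endpoint is configured; the
# upload tasks below check this and skip the upload instead of failing.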


def process_pdf_file(file_name: str, file_path: str, request, user) -> list:
    try:
        # Origin file
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
            file_path=file_path,
            request=request,
            file_name=file_name,
            code=f'FIL{uuid.uuid4().hex}')
        new_request_file.save()
        # Sub-files
        return ProcessUtil.pdf_to_images_urls(FileUtils.get_file(file_path), request, user)
    except Exception as e:
        traceback.print_exc()
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract pdf files {e}"}
        request.save()
        return None


def process_image_file(file_name: str, file_path, request, user) -> list:
    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=file_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}')
    new_request_file.save()
    return [{
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': new_request_file.code
    }]


@app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """
    files: [{
        "file_name": "",
        "file_path": "",  # local path to file
        "file_type": ""
    }, ...]
    """
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
    user = UserProfile.objects.filter(id=user_id).first()
    new_request.pages = len(files)
    new_request.pages_left = len(files)

    def process_and_save_file(data):
        idx, file = data
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
            if _b_urls is None:
                new_request.status = 400
                new_request.save()
                raise FileContentInvalidException
            for j in range(len(_b_urls)):
                _b_urls[j]["doc_type"] = file["file_type"]
                _b_urls[j]["page_number"] = idx
            return idx, _b_urls[0]
        elif extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
            this_url["page_number"] = idx
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            return idx, this_url

    # Run file processing in a pool of 2 threads. TODO: convert to a Celery worker when possible
    start_time = time.time()
    b_urls = [None] * len(files)
    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
    with ThreadPool(processes=2) as pool:
        for idx, url in pool.map(process_and_save_file, files_with_idx):
            b_urls[idx] = url
    new_request.preprocessing_time = time.time() - start_time

    # TODO: send to queue with different request_ids
    doc_type_string = ""
    to_queue = []
    for i, b_url in enumerate(b_urls):
        fractorized_request_id = rq_id + f"_sub_{i}"
        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
        doc_type_string += "{},".format(b_url["doc_type"])
    doc_type_string = doc_type_string[:-1]
    new_request.doc_type = doc_type_string
    new_request.ai_inference_start_time = time.time()
    new_request.save()

    # Send each sub-request to the next queue
    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
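

# A minimal sketch of how this task could be enqueued from elsewhere in the
# project. The call site and argument values are assumptions, not taken from
# this file; only the task name 'do_pdf' and the signature above are:
#
#     from fwd_api.celery_worker.worker import app
#     app.send_task("do_pdf", args=(rq_id, sub_id, p_type, user_id, files))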


@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key, request_id):
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(local_file_path, s3_key)
            # filter() returns a QuerySet; fetch the instance before mutating it
            sub_request = SubscriptionRequest.objects.filter(request_id=request_id).first()
            sub_request.S3_uploaded = True
            sub_request.save()
        except Exception as e:
            logger.error(f"Failed to upload {local_file_path} to S3: {e}")
            return
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
    if s3_client.s3_client is not None:
        # byte_obj arrives base64-encoded; decode it back to raw bytes before upload
        obj = base64.b64decode(byte_obj)
        s3_client.update_object(s3_key, obj)
    else:
        logger.info("S3 is not available, skipping...")