import time
import uuid
import os
import base64
import traceback

from multiprocessing.pool import ThreadPool

from celery.utils.log import get_task_logger

from fwd import settings
from fwd_api.celery_worker.worker import app
from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile, UserProfile

from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util

logger = get_task_logger(__name__)

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)


def process_pdf_file(file_name: str, file_path: str, request, user) -> list:
    """Save the original PDF as a SubscriptionRequestFile and split it into
    per-page image URLs. Returns None if extraction fails."""
    try:
        # Origin file
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                            request=request,
                                                                            file_name=file_name,
                                                                            code=f'FIL{uuid.uuid4().hex}')
        new_request_file.save()
        # Sub-files: one image URL per PDF page
        return ProcessUtil.pdf_to_images_urls(FileUtils.get_file(file_path), request, user)
    except Exception as e:
        traceback.print_exc()
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract pdf files {e}"}
        request.save()
        return None


def process_image_file(file_name: str, file_path, request, user) -> list:
    """Save a single image as a SubscriptionRequestFile and return its URL descriptor."""
    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                        request=request,
                                                                        file_name=file_name,
                                                                        code=f'FIL{uuid.uuid4().hex}')
    new_request_file.save()
    return [{
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': new_request_file.code
    }]


@app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """
    files: [{
        "file_name": "",
        "file_path": "",  # local path to file
        "file_type": ""
    },]
    """
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
    user = UserProfile.objects.filter(id=user_id).first()
    new_request.pages = len(files)
    new_request.pages_left = len(files)

    def process_and_save_file(data):
        idx, file = data
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
            if _b_urls is None:
                new_request.status = 400
                new_request.save()
                raise FileContentInvalidException
            for j in range(len(_b_urls)):
                _b_urls[j]["doc_type"] = file["file_type"]
                _b_urls[j]["page_number"] = idx
            return idx, _b_urls[0]
        elif extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
            this_url["page_number"] = idx
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            return idx, this_url
        else:
            # Unsupported extension: fail the request instead of returning None implicitly
            new_request.status = 400
            new_request.save()
            raise FileContentInvalidException

    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
    start_time = time.time()
    b_urls = [None] * len(files)
    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
    with ThreadPool(processes=2) as pool:
        for idx, url in pool.map(process_and_save_file, files_with_idx):
            b_urls[idx] = url
    new_request.preprocessing_time = time.time() - start_time

    # TODO: send to queue with different request_ids
    doc_type_string = ""
    to_queue = []
    for i, b_url in enumerate(b_urls):
        fractorized_request_id = rq_id + f"_sub_{i}"
        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
        doc_type_string += "{},".format(b_url["doc_type"])
    doc_type_string = doc_type_string[:-1]
    new_request.doc_type = doc_type_string
    new_request.ai_inference_start_time = time.time()
    new_request.save()

    # Send each sub-request to the next queue
    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)


@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key, request_id):
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(local_file_path, s3_key)
            # .first() returns a model instance; calling .save() on the bare QuerySet would fail
            sub_request = SubscriptionRequest.objects.filter(request_id=request_id).first()
            sub_request.S3_uploaded = True
            sub_request.save()
        except Exception as e:
            logger.error(f"Failed to upload {local_file_path} to S3: {e}")
            return
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
    if s3_client.s3_client is not None:
        # byte_obj is expected to be a base64-encoded string; decode it back to raw bytes before upload
        obj = base64.b64decode(byte_obj)
        s3_client.update_object(s3_key, obj)
    else:
        logger.info("S3 is not available, skipping...")
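

# Illustrative dispatch sketch (assumed caller code, not part of this module):
# the payload is base64-encoded text, matching the b64decode performed above.
#
#   payload = base64.b64encode(raw_bytes).decode("utf-8")
#   upload_obj_to_s3.apply_async(args=[payload, s3_key])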