import time
import uuid
import os
import base64
import traceback
from multiprocessing.pool import ThreadPool

from fwd_api.models import SubscriptionRequest, UserProfile
from fwd_api.celery_worker.worker import app
from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from fwd_api.models import SubscriptionRequestFile
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util
from fwd_api.constant.common import ProcessType
from celery.utils.log import get_task_logger

from fwd import settings

logger = get_task_logger(__name__)

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)


def process_pdf_file(file_name: str, file_path: str, request, user) -> list:
    try:
        # Original file record
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
            file_path=file_path,
            request=request,
            file_name=file_name,
            code=f'FIL{uuid.uuid4().hex}')
        new_request_file.save()
        # Sub-files: one URL per rendered page
        return ProcessUtil.pdf_to_images_urls(FileUtils.get_file(file_path), request, user)
    except Exception as e:
        traceback.print_exc()
        request.status = 400
        request.predict_result = {
            "status": 400,
            "content": "",
            "message": f"Unable to extract pdf files {e}"}
        request.save()
        return None


def process_image_file(file_name: str, file_path, request, user) -> list:
    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=file_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}')
    new_request_file.save()
    return [{
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': new_request_file.code
    }]


@app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """
    files: [{
        "idx": int,
        "file_name": "",
        "file_path": "",  # local path to file
        "file_type": ""
    }, ...]
    """
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first()
    user = UserProfile.objects.filter(id=user_id).first()
    new_request.pages = len(files)
    new_request.pages_left = len(files)

    def process_and_save_file(data):
        idx, file = data
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
            if _b_urls is None:
                new_request.status = 400
                new_request.save()
                raise FileContentInvalidException
            for j in range(len(_b_urls)):
                _b_urls[j]["doc_type"] = file["file_type"]
                _b_urls[j]["page_number"] = idx
            return idx, _b_urls[0]
        elif extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
            this_url["page_number"] = idx
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            return idx, this_url
        # NOTE: unsupported extensions fall through and return None

    # Run file processing in a pool of 2 threads.
    # TODO: Convert to Celery worker when possible
    start_time = time.time()
    b_urls = [None] * len(files)
    pool = ThreadPool(processes=2)
    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
    for idx, url in pool.map(process_and_save_file, files_with_idx):
        b_urls[idx] = url
    preprocessing_time = time.time() - start_time

    # TODO: send to queue with different request_ids
    to_queue = []
    ai_inference_start_time = time.time()
    for i, b_url in enumerate(b_urls):
        file_meta = {}
        fractorized_request_id = rq_id + f"_sub_{i}"
        file_meta["doc_type"] = b_url["doc_type"]
        file_meta["ai_inference_start_time"] = ai_inference_start_time
        file_meta["ai_inference_profile"] = {}
        file_meta["index_in_request"] = i
        file_meta["preprocessing_time"] = preprocessing_time
        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type, file_meta))

    # Send each sub-request to the next queue
    for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)


@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key, request_id):
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(local_file_path, s3_key)
            sub_request = SubscriptionRequest.objects.filter(request_id=request_id)[0]
            sub_request.S3_uploaded = True
            sub_request.save()
        except Exception as e:
            logger.error(f"Unable to upload file to S3: {e}")
            return
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='remove_local_file')
def remove_local_file(local_file_path, request_id):
    logger.info(f"Removing local file: {local_file_path} ...")
    try:
        os.remove(local_file_path)
    except Exception as e:
        logger.info(f"Unable to remove local file: {e}")


@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
    if s3_client.s3_client is not None:
        # byte_obj is expected to be a base64-encoded string
        obj = base64.b64decode(byte_obj)
        s3_client.update_object(s3_key, obj)
    else:
        logger.info("S3 is not available, skipping...")
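

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the production flow).
# Shows how a caller might enqueue the `do_pdf` task with one entry per
# uploaded file in the shape documented in the `process_pdf` docstring.
# The request id, subscription id, user id, and `ProcessType` member used
# below are hypothetical examples, not values taken from this module.
#
#   process_pdf.apply_async(args=[
#       "REQ_abc123",                    # rq_id
#       1,                               # sub_id
#       ProcessType.SBT_INVOICE.value,   # p_type (assumed enum member)
#       42,                              # user_id
#       [{
#           "idx": 0,
#           "file_name": "invoice.pdf",
#           "file_path": "/tmp/REQ_abc123/invoice.pdf",
#           "file_type": "invoice",
#       }],
#   ])
# ---------------------------------------------------------------------------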