import base64
import os
import time
import uuid

import fitz

from celery.utils.log import get_task_logger

from fwd import settings
from fwd_api.celery_worker.worker import app
from fwd_api.models import SubscriptionRequest, UserProfile

from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from ..utils import FileUtils, ProcessUtil, S3_process

logger = get_task_logger(__name__)

# Module-level S3 client shared by the upload tasks below.
# MinioS3Client exposes .s3_client == None when the backend is unreachable.
s3_client = S3_process.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME,
)


def process_pdf_file(file_name: str, file_path: str, request, user) -> list:
    """Open a stored PDF and convert its pages to image URLs.

    Registers the original upload as a SubscriptionRequestFile, then delegates
    page rendering to ProcessUtil.pdf_to_images_urls.

    Returns:
        The list of page-image URL dicts, or None on failure (in which case
        the request is marked status=400 and saved).
    """
    from fwd_api.models import SubscriptionRequestFile

    try:
        doc: fitz.Document = fitz.open(
            stream=FileUtils.get_file(file_path).read(), filetype="pdf"
        )
        # Record the originally uploaded file.
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
            file_path=file_path,
            request=request,
            file_name=file_name,
            code=f'FIL{uuid.uuid4().hex}',
        )
        new_request_file.save()
        # Render sub-files (one image per page).
        return ProcessUtil.pdf_to_images_urls(doc, request, user)
    except Exception as e:
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract pdf files {e}"}
        request.save()
        return None


def process_pdf_byte(file_name: str, file_path: str, request, user, file_obj) -> list:
    """Like process_pdf_file, but reads the PDF from an in-memory byte object.

    FIX: fitz.open now sits inside the try block, matching process_pdf_file,
    so a corrupt byte stream marks the request failed instead of raising an
    unhandled exception out of the task.

    Returns:
        The list of page-image URL dicts, or None on failure (request is
        marked status=400 and saved).
    """
    from fwd_api.models import SubscriptionRequestFile

    try:
        doc: fitz.Document = fitz.open(stream=file_obj, filetype="pdf")
        # Record the originally uploaded file.
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
            file_path=file_path,
            request=request,
            file_name=file_name,
            code=f'FIL{uuid.uuid4().hex}',
        )
        new_request_file.save()
        # Render sub-files (one image per page).
        return ProcessUtil.pdf_to_images_urls(doc, request, user)
    except Exception as e:
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract pdf files {e}"}
        request.save()
        return None


def process_image_file(file_name: str, file_path, request, user) -> list:
    """Register a single uploaded image and return its URL descriptor.

    Returns a one-element list so the shape matches the PDF helpers above.
    """
    from fwd_api.models import SubscriptionRequestFile

    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=file_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
    )
    new_request_file.save()
    return [{
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': new_request_file.code,
    }]


@app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """Extract uploaded files into page images and enqueue them for OCR.

    files: [{
        "file_name": "",
        "file_path": "",  # local path to file
        "file_type": ""
    },]

    Raises:
        FileContentInvalidException: when a PDF cannot be extracted.
    """
    start = time.time()
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
    user = UserProfile.objects.filter(id=user_id).first()
    b_urls = []
    new_request.pages = len(files)
    new_request.pages_left = len(files)
    for file in files:
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
            if _b_urls is None:
                new_request.status = 400
                new_request.save()
                raise FileContentInvalidException
            for j, page_url in enumerate(_b_urls):
                page_url["doc_type"] = file["file_type"]
                page_url["page_number"] = j + len(b_urls)
            # TODO: Client may request all images in a file; for now,
            # extract the first page only.
            b_urls.append(_b_urls[0])
        elif extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
            this_url["page_number"] = len(b_urls)
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            b_urls.append(this_url)

    start_process = time.time()
    logger.info("BE processing time: %s", start_process - start)

    # TODO: send to queue with different request_ids
    doc_types = []
    for i, b_url in enumerate(b_urls):
        fractorized_request_id = rq_id + f"_sub_{i}"
        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
        # FIX: image entries may lack "doc_type" (set only when file_type is
        # truthy above); .get avoids the KeyError the old code raised here.
        doc_types.append(b_url.get("doc_type", ""))
    new_request.doc_type = ",".join(doc_types)
    new_request.save()


@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key):
    """Upload a local file to S3 and delete the local copy on success."""
    if s3_client.s3_client is None:
        logger.info(f"S3 is not available, skipping,...")
        return
    res = s3_client.upload_file(local_file_path, s3_key)
    # Only remove the local file once S3 confirms the upload succeeded.
    if res is not None and res["ResponseMetadata"]["HTTPStatusCode"] == 200:
        os.remove(local_file_path)


@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
    """Decode a base64 payload and store it as an S3 object."""
    if s3_client.s3_client is None:
        logger.info(f"S3 is not available, skipping,...")
        return
    obj = base64.b64decode(byte_obj)
    s3_client.update_object(s3_key, obj)