diff --git a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
index 28b86a4..ef16d45 100755
--- a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
+++ b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
@@ -58,17 +58,18 @@ def process_manulife_invoice(rq_id, list_url):
     return {"rq_id": rq_id}
 
 @app.task(name="process_sbt_invoice")
-def process_sbt_invoice(rq_id, list_url):
+def process_sbt_invoice(rq_id, list_url, metadata):
     # TODO: simply returning 200 and 404 doesn't make any sense
     c_connector = CeleryConnector()
     try:
         result = compile_output_sbt(list_url)
+        metadata['ai_inference_profile'] = result.pop("inference_profile")
         hoadon = {"status": 200, "content": result, "message": "Success"}
         print(hoadon)
-        c_connector.process_sbt_invoice_result((rq_id, hoadon))
+        c_connector.process_sbt_invoice_result((rq_id, hoadon, metadata))
         return {"rq_id": rq_id}
     except Exception as e:
         print(e)
         hoadon = {"status": 404, "content": {}}
-        c_connector.process_sbt_invoice_result((rq_id, hoadon))
+        c_connector.process_sbt_invoice_result((rq_id, hoadon, metadata))
         return {"rq_id": rq_id}
\ No newline at end of file
diff --git a/cope2n-ai-fi/common/process_pdf.py b/cope2n-ai-fi/common/process_pdf.py
index 4ad48b1..151d3ea 100755
--- a/cope2n-ai-fi/common/process_pdf.py
+++ b/cope2n-ai-fi/common/process_pdf.py
@@ -1,5 +1,6 @@
 import os
 import json
+import time
 
 from common import json2xml
 from common.json2xml import convert_key_names, replace_xml_values
@@ -213,6 +214,8 @@ def compile_output_sbt(list_url):
         dict: output compiled
     """
 
+    inference_profile = {}
+
     results = {
         "model":{
             "name":"Invoice",
@@ -225,16 +228,23 @@ def compile_output_sbt(list_url):
 
     outputs = []
 
+    start = time.time()
+    pages_predict_time = []
     for page in list_url:
         output_model = predict_sbt(page['page_number'], page['file_url'])
+        pages_predict_time.append(time.time())
         if "doc_type" in page:
             output_model['doc_type'] = page['doc_type']
         outputs.append(output_model)
+    start_postprocess = time.time()
     documents = merge_sbt_output(outputs)
+    inference_profile["postprocess"] = [start_postprocess, time.time()]
+    inference_profile["inference"] = [start, pages_predict_time]
     results = {
         "total_pages": len(list_url),
         "ocr_num_pages": len(list_url),
-        "document": documents
+        "document": documents,
+        "inference_profile": inference_profile
    }
 
     return results
diff --git a/cope2n-ai-fi/modules/sdsvkvu b/cope2n-ai-fi/modules/sdsvkvu
index a471c10..4caed0d 160000
--- a/cope2n-ai-fi/modules/sdsvkvu
+++ b/cope2n-ai-fi/modules/sdsvkvu
@@ -1 +1 @@
-Subproject commit a471c1018c17cc917d1723776bae81f829450f95
+Subproject commit 4caed0d5ee08d1114727effd19bf32beab5263dc
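Note that `compile_output_sbt` now emits raw `time.time()` stamps rather than durations: `inference` holds the loop start plus one stamp per page, and `postprocess` holds the start/end of `merge_sbt_output`. A minimal sketch of how a consumer could turn that profile into durations (the helper below is hypothetical, not part of this change):

```python
# Hypothetical consumer of the "inference_profile" dict built above.
# Shape, as emitted by compile_output_sbt:
#   {"inference": [loop_start, [stamp_after_page_1, ...]],
#    "postprocess": [merge_start, merge_end]}
def profile_durations(inference_profile):
    start, page_stamps = inference_profile["inference"]
    # Per-page latency: each stamp minus the previous stamp (or the loop start).
    per_page = [t - prev for prev, t in zip([start] + page_stamps[:-1], page_stamps)]
    pp_start, pp_end = inference_profile["postprocess"]
    return {
        "per_page_inference": per_page,
        "total_inference": (page_stamps[-1] - start) if page_stamps else 0.0,
        "postprocess": pp_end - pp_start,
    }
```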
file_name = f"temp_{rq_id}.{file_extension}" is_test_request = validated_data.get("is_test_request", False) @@ -134,7 +135,6 @@ class CtelViewSet(viewsets.ViewSet): validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub) provider_code = 'SAP' - rq_id = provider_code + uuid.uuid4().hex imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file'] invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file'] @@ -145,6 +145,7 @@ class CtelViewSet(viewsets.ViewSet): } total_page = len(files.keys()) + rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex p_type = validated_data['type'] new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page, pages_left=total_page, @@ -205,7 +206,6 @@ class CtelViewSet(viewsets.ViewSet): validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub) provider_code = 'SAP' - rq_id = provider_code + uuid.uuid4().hex imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file'] invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file'] @@ -214,7 +214,7 @@ class CtelViewSet(viewsets.ViewSet): "imei": imei_file_objs, "invoice": invoice_file_objs } - + rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex count = 0 doc_files_with_type = [] for doc_type, doc_files in files.items(): diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index eeefc5c..aed978c 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -64,12 +64,13 @@ def process_image_file(file_name: str, file_path, request, user) -> list: def process_pdf(rq_id, sub_id, p_type, user_id, files): """ files: [{ + "idx": int "file_name": "", "file_path": "", # local path to file "file_type": "" },] """ - new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0] + new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first() user = UserProfile.objects.filter(id=user_id).first() new_request.pages = len(files) new_request.pages_left = len(files) @@ -101,42 +102,24 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files): files_with_idx = [(idx, file) for idx, file in enumerate(files)] for idx, url in pool.map(process_and_save_file, files_with_idx): b_urls[idx] = url - new_request.preprocessing_time = time.time() - start_time + preprocessing_time = time.time() - start_time # TODO: send to queue with different request_ids - doc_type_string = "" to_queue = [] + ai_inference_start_time = time.time() for i, b_url in enumerate(b_urls): + file_meta = {} fractorized_request_id = rq_id + f"_sub_{i}" - to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type)) - doc_type_string += "{},".format(b_url["doc_type"]) - doc_type_string = doc_type_string[:-1] - new_request.doc_type = doc_type_string - new_request.ai_inference_start_time = time.time() - new_request.save() - - trials = 0 - while True: - rq: SubscriptionRequest = \ - SubscriptionRequest.objects.filter(request_id=rq_id).first() - if rq.ai_inference_start_time != 0: - break - time.sleep(0.1) - trials += 1 - if trials > 5: - rq.preprocessing_time = time.time() - start_time - rq.doc_type = doc_type_string - rq.ai_inference_start_time = time.time() - rq.save() - if trials > 10: - rq.status = 404 - rq.save() - return - + file_meta["doc_type"] = b_url["doc_type"] + file_meta["ai_inference_start_time"] = 
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index b86d2f5..5c2cc7c 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -15,23 +15,23 @@ from fwd_api.utils import process as ProcessUtil
 
 redis_client = RedisUtils()
 
-def aggregate_result(results, doc_types):
-    doc_types = doc_types.split(',')
+def aggregate_result(results):
+    sorted_results = [None] * len(results)
+    doc_types = []
+    for index, result in results.items():
+        index = int(index)
+        doc_type = result["metadata"]["doc_type"]
+        doc_types.append(doc_type)
+        sorted_results[index] = ((doc_type, result))
 
     des_result = deepcopy(list(results.values()))[0]
+    des_result.pop("metadata")
     des_result["content"]["total_pages"] = 0
     des_result["content"]["ocr_num_pages"] = 0
     des_result["content"]["document"][0]["end_page"] = 0
     des_result["content"]["document"][0]["content"][3]["value"] = [None for _ in range(doc_types.count("imei"))]
     des_result["content"]["document"][0]["content"][2]["value"] = []
 
-    sorted_results = [None] * len(doc_types)
-    for index, result in results.items():
-        index = int(index)
-        doc_type = doc_types[index]
-        sorted_results[index] = ((doc_type, result))
-
-
     imei_count = 0
     for doc_type, result in sorted_results:
         des_result["content"]["total_pages"] += 1
@@ -136,7 +136,7 @@ def process_invoice_manulife_result(rq_id, result):
 random_processor_name = None
 
 @app.task(name='process_sbt_invoice_result')
-def process_invoice_sbt_result(rq_id, result):
+def process_invoice_sbt_result(rq_id, result, metadata):
     global random_processor_name
     if random_processor_name is None:
         random_processor_name = uuid.uuid4()
@@ -146,21 +146,8 @@ def process_invoice_sbt_result(rq_id, result):
         page_index = int(rq_id.split("_sub_")[1])
         rq_id = rq_id.split("_sub_")[0]
         rq: SubscriptionRequest = SubscriptionRequest.objects.filter(request_id=rq_id).first()
-        for i in range(10):
-            if rq.ai_inference_start_time == 0:
-                logging.warn(f"ai_inference_start_time = 0, looks like database is lagging, attemp {i} in 0.2 second ...")
-                rq.refresh_from_db()
-                time.sleep(0.2)
-                if i == 9: # return an error
-                    logging.warn("Unable to retrieve rq, exiting")
-                    rq.status = 404 # stop waiting
-                    rq.predict_result = result
-                    rq.save()
-                    update_user(rq)
-                    return "FailInvoice"
-            else:
-                break
+        result["metadata"] = metadata
         # status = to_status(result)
         status = result.get("status", 200)
         redis_client.set_cache(rq_id, page_index, result)
@@ -169,17 +156,26 @@ def process_invoice_sbt_result(rq_id, result):
         if done:
             rq.status = 200 # stop waiting
             results = redis_client.get_all_cache(rq_id)
-            rq.predict_result = aggregate_result(results, rq.doc_type)
+            rq.predict_result = aggregate_result(results)
             # print(f"[DEBUG]: rq.predict_result: {rq.predict_result}")
+            ai_inference_profile = {}
+            doc_type_string = ""
+            for idx, result in results.items():
+                ai_inference_profile["{doc_type}_{idx}".format(doc_type=result["metadata"]["doc_type"], idx=result["metadata"]["index_in_request"])] = result["metadata"]["ai_inference_profile"]
+                doc_type_string += "{},".format(result["metadata"]["doc_type"])
+            doc_type_string = doc_type_string[:-1]
+            rq.ai_inference_profile = ai_inference_profile
+            rq.doc_type = doc_type_string
+            rq.ai_inference_start_time = result["metadata"]["ai_inference_start_time"] # advancing the last result
+            rq.preprocessing_time = result["metadata"]["preprocessing_time"] # advancing the last result
+            rq.ai_inference_time = time.time() - rq.ai_inference_start_time
             rq.save()
-
         else:
             rq.status = 404 # stop waiting
             rq.predict_result = result
             rq.save()
-            rq.ai_inference_time = time.time() - rq.ai_inference_start_time
-            rq.save()
 
         update_user(rq)
     except IndexError as e:
         print(e)
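With the polling loop gone, every per-page figure reaches `process_invoice_sbt_result` through `metadata` and is folded into the request only once all pages are cached. The `ai_inference_profile` JSON assembled above is keyed by `{doc_type}_{index_in_request}`; an illustrative example of what ends up stored (all timestamps invented):

```python
# Illustrative contents of rq.ai_inference_profile for a two-page request;
# each value is the raw profile emitted by compile_output_sbt.
ai_inference_profile = {
    "imei_0": {
        "inference": [1703667730.1, [1703667731.4]],   # [loop_start, per-page stamps]
        "postprocess": [1703667731.4, 1703667731.5],   # [merge_start, merge_end]
    },
    "invoice_1": {
        "inference": [1703667730.2, [1703667732.0]],
        "postprocess": [1703667732.0, 1703667732.2],
    },
}
# The matching rq.doc_type string would be "imei,invoice".
```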
doc_type_string = "" + for idx, result in results.items(): + ai_inference_profile["{doc_type}_{idx}".format(doc_type=result["metadata"]["doc_type"], idx=result["metadata"]["index_in_request"])] = result["metadata"]["ai_inference_profile"] + doc_type_string += "{},".format(result["metadata"]["doc_type"]) + doc_type_string = doc_type_string[:-1] + rq.ai_inference_profile = ai_inference_profile + rq.doc_type = doc_type_string + rq.ai_inference_start_time = result["metadata"]["ai_inference_start_time"] # advancing the last result + rq.preprocessing_time = result["metadata"]["preprocessing_time"] # advancing the last result + rq.ai_inference_time = time.time() - rq.ai_inference_start_time rq.save() - + else: rq.status = 404 # stop waiting rq.predict_result = result rq.save() - rq.ai_inference_time = time.time() - rq.ai_inference_start_time - rq.save() update_user(rq) except IndexError as e: print(e) diff --git a/cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py b/cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py new file mode 100644 index 0000000..7039658 --- /dev/null +++ b/cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1.3 on 2023-12-27 09:02 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('fwd_api', '0162_merge_20231225_1439'), + ] + + operations = [ + migrations.AddField( + model_name='subscriptionrequest', + name='ai_inference_profile', + field=models.JSONField(null=True), + ), + ] diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py index 0cbda44..a6c90a8 100755 --- a/cope2n-api/fwd_api/models/SubscriptionRequest.py +++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py @@ -21,6 +21,7 @@ class SubscriptionRequest(models.Model): is_test_request = models.BooleanField(default=False) S3_uploaded = models.BooleanField(default=False) + ai_inference_profile=models.JSONField(null=True) preprocessing_time = models.FloatField(default=-1) ai_inference_start_time = models.FloatField(default=0) ai_inference_time = models.FloatField(default=0) diff --git a/cope2n-api/fwd_api/utils/process.py b/cope2n-api/fwd_api/utils/process.py index a7a19a6..20f9bb7 100644 --- a/cope2n-api/fwd_api/utils/process.py +++ b/cope2n-api/fwd_api/utils/process.py @@ -307,7 +307,7 @@ def token_value(token_type): return 1 # Basic OCR -def send_to_queue2(rq_id, sub_id, file_url, user_id, typez): +def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata): try: if typez == ProcessType.ID_CARD.value: c_connector.process_id( @@ -319,7 +319,7 @@ def send_to_queue2(rq_id, sub_id, file_url, user_id, typez): elif typez == ProcessType.MANULIFE_INVOICE.value: c_connector.process_invoice_manulife((rq_id, file_url)) elif typez == ProcessType.SBT_INVOICE.value: - c_connector.process_invoice_sbt((rq_id, file_url)) + c_connector.process_invoice_sbt((rq_id, file_url, metadata)) except Exception as e: print(e) raise BadGatewayException() diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 45b7e4f..5638fee 100755 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -82,6 +82,7 @@ services: db-sbt: condition: service_started command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input && + python manage.py makemigrations && python manage.py migrate && python manage.py compilemessages && gunicorn fwd.asgi:application -k 
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 45b7e4f..5638fee 100755
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -82,6 +82,7 @@ services:
       db-sbt:
         condition: service_started
     command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input &&
+                    python manage.py makemigrations &&
                     python manage.py migrate &&
                     python manage.py compilemessages &&
                     gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000"  # pre-makemigrations on prod