From e2d576b83a8a0f4de61e50bb1befb435717ae823 Mon Sep 17 00:00:00 2001
From: dx-tan
Date: Tue, 26 Dec 2023 18:44:03 +0700
Subject: [PATCH 1/5] Fix: false aggregate results

---
 cope2n-api/fwd/settings.py                    |  3 ++-
 .../celery_worker/process_result_tasks.py     | 18 ++++++++++++++++--
 .../fwd_api/models/SubscriptionRequest.py     |  1 -
 cope2n-api/fwd_api/utils/file.py              |  2 +-
 cope2n-api/fwd_api/utils/redis.py             |  2 ++
 cope2n-api/locale/vi_VN/LC_MESSAGES/django.mo | Bin 4094 -> 4244 bytes
 cope2n-api/locale/vi_VN/LC_MESSAGES/django.po |  6 ++++++
 cope2n-fe/nginx.conf                          |  2 +-
 8 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/cope2n-api/fwd/settings.py b/cope2n-api/fwd/settings.py
index 514ff37..4b6a13f 100755
--- a/cope2n-api/fwd/settings.py
+++ b/cope2n-api/fwd/settings.py
@@ -111,6 +111,7 @@ DATABASES = {
         'PASSWORD': env.str("DB_PASSWORD", None),
         'HOST': env.str("DB_HOST", None),
         'PORT': env.str("DB_PORT", None),
+        'CONN_MAX_AGE': None,
     }
 }
 
@@ -207,7 +208,7 @@ BROKER_URL = env.str("BROKER_URL", default="amqp://test:test@107.120.70.226:5672
 CELERY_TASK_TRACK_STARTED = True
 CELERY_TASK_TIME_LIMIT = 30 * 60
 
-MAX_UPLOAD_SIZE_OF_A_FILE = 100 * 1024 * 1024 # 100 MB
+MAX_UPLOAD_SIZE_OF_A_FILE = 5 * 1024 * 1024 # 5 MB
 MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST = 100 * 1024 * 1024 # 100 MB
 MAX_UPLOAD_FILES_IN_A_REQUEST = 5
 MAX_PIXEL_IN_A_FILE = 5000
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index 4121dc3..5fe7466 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -1,6 +1,7 @@
 import traceback
 import time
 import uuid
+import logging
 
 from copy import deepcopy
 
@@ -146,6 +147,21 @@ def process_invoice_sbt_result(rq_id, result):
         rq_id = rq_id.split("_sub_")[0]
         rq: SubscriptionRequest = \
             SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.SBT_INVOICE.value)[0]
+        for i in range(10):
+            if rq.ai_inference_start_time == 0:
+                logging.warn(f"ai_inference_start_time = 0, looks like database is lagging, attemp {i} in 0.2 second ...")
+                rq.refresh_from_db()
+                time.sleep(0.2)
+                if i == 9: # return an error
+                    logging.warn("Unable to retrieve rq, exiting")
+                    rq.status = 404 # stop waiting
+                    rq.predict_result = result
+                    rq.save()
+                    update_user(rq)
+                    return "FailInvoice"
+            else:
+                break
+
         # status = to_status(result)
         status = result.get("status", 200)
         redis_client.set_cache(rq_id, page_index, result)
@@ -156,13 +172,11 @@ def process_invoice_sbt_result(rq_id, result):
             results = redis_client.get_all_cache(rq_id)
             rq.predict_result = aggregate_result(results, rq.doc_type)
             # print(f"[DEBUG]: rq.predict_result: {rq.predict_result}")
-            redis_client.remove_cache(rq_id)
             rq.save()
 
         else:
             rq.status = 404 # stop waiting
             rq.predict_result = result
-            redis_client.remove_cache(rq_id)
             rq.save()
 
         rq.ai_inference_time = time.time() - rq.ai_inference_start_time
diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py
index 780df18..0cbda44 100755
--- a/cope2n-api/fwd_api/models/SubscriptionRequest.py
+++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py
@@ -18,7 +18,6 @@ class SubscriptionRequest(models.Model):
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
     updated_at = models.DateTimeField(auto_now=True)
-    S3_uploaded = models.BooleanField(default=False)
     is_test_request = models.BooleanField(default=False)
     S3_uploaded = models.BooleanField(default=False)
 
diff --git a/cope2n-api/fwd_api/utils/file.py b/cope2n-api/fwd_api/utils/file.py
index 4a94b39..f3af27e 100644
--- a/cope2n-api/fwd_api/utils/file.py
+++ b/cope2n-api/fwd_api/utils/file.py
@@ -31,7 +31,7 @@ def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUES
             raise InvalidException(excArgs="files")
         extension = f.name.split(".")[-1].lower() in allowed_file_extensions
         if not extension or "." not in f.name:
-            raise FileFormatInvalidException(excArgs=allowed_file_extensions)
+            raise FileFormatInvalidException(excArgs=list(allowed_file_extensions))
         if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
             raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
         total_file_size += f.size
diff --git a/cope2n-api/fwd_api/utils/redis.py b/cope2n-api/fwd_api/utils/redis.py
index da4c7ec..7a9b54e 100644
--- a/cope2n-api/fwd_api/utils/redis.py
+++ b/cope2n-api/fwd_api/utils/redis.py
@@ -1,5 +1,6 @@
 import redis
 import json
+from datetime import datetime, timedelta
 
 from django.conf import settings
 
@@ -14,6 +15,7 @@ class RedisUtils:
             image_index: int
         """
         self.redis_client.hset(request_id, image_index, json.dumps(data))
+        self.redis_client.expire(request_id, 3600)
 
     def get_all_cache(self, request_id):
         resutlt = {}
diff --git a/cope2n-api/locale/vi_VN/LC_MESSAGES/django.mo b/cope2n-api/locale/vi_VN/LC_MESSAGES/django.mo
index 46499c0a63c864ad4d763617a051f95c97b2b292..71bafc73dea62caf96254432277b2e0aa36a6a86 100755
GIT binary patch
delta 1505
[base85 binary delta data omitted]

delta 1354
[base85 binary delta data truncated in this copy; the django.po and cope2n-fe/nginx.conf hunks listed in the diffstat and the mail header of PATCH 2/5 are not recoverable]

Date: Tue, 26 Dec 2023 18:44:25 +0700
Subject: [PATCH 2/5] Remove: key

---
 deploy_images.py | 10 ++++++----
 deploy_images.sh | 24 ++++++++++++------------
 2 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/deploy_images.py b/deploy_images.py
index 2de1774..242016e 100644
--- a/deploy_images.py
+++ b/deploy_images.py
@@ -5,11 +5,13 @@ import string
 import os
 import boto3
 from datetime import datetime
+from dotenv import load_dotenv
+load_dotenv(".env_prod")
 
 BASH_FILE = './deploy_images.sh'
 S3_ENDPOINT = ""
-S3_ACCESS_KEY = "secret"
-S3_SECRET_KEY = "secret"
+S3_ACCESS_KEY = os.getenv('S3_ACCESS_KEY')
+S3_SECRET_KEY = os.getenv('S3_SECRET_KEY')
 S3_BUCKET = "ocr-deployment-config"
 
 class MinioS3Client:
@@ -77,8 +79,8 @@ def deploy():
     # Define the variable
     tag = str(random_hash()[:8])
     now = datetime.now()
-    # tag = tag + "_" + str(now.strftime("%d%m%y%H%M%S"))
-    tag = "4cae5134_261223123256"
+    tag = tag + "_" + str(now.strftime("%d%m%y%H%M%S"))
+    # tag = "4cae5134_261223123256"
     print(tag)
 
     # Execute the Bash script with the variable as a command-line argument
diff --git a/deploy_images.sh b/deploy_images.sh
index b0b8195..07a1c02 100755
--- a/deploy_images.sh
+++ b/deploy_images.sh
@@ -5,20 +5,20 @@ tag=$1
 
 echo "[INFO] Tag received from Python: $tag"
 
-# echo "[INFO] Pushing AI image with tag: $tag..."
-# docker compose -f docker-compose-dev.yml build cope2n-fi-sbt
-# docker tag sidp/cope2n-ai-fi-sbt:latest public.ecr.aws/v4n9y6r8/sidp/cope2n-ai-fi-sbt:${tag}
-# docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-ai-fi-sbt:${tag}
+echo "[INFO] Pushing AI image with tag: $tag..."
+docker compose -f docker-compose-dev.yml build cope2n-fi-sbt
+docker tag sidp/cope2n-ai-fi-sbt:latest public.ecr.aws/v4n9y6r8/sidp/cope2n-ai-fi-sbt:${tag}
+docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-ai-fi-sbt:${tag}
 
-# echo "[INFO] Pushing BE image with tag: $tag..."
-# docker compose -f docker-compose-dev.yml build be-ctel-sbt
-# docker tag sidp/cope2n-be-fi-sbt:latest public.ecr.aws/v4n9y6r8/sidp/cope2n-be-fi-sbt:${tag}
-# docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-be-fi-sbt:${tag}
+echo "[INFO] Pushing BE image with tag: $tag..."
+docker compose -f docker-compose-dev.yml build be-ctel-sbt
+docker tag sidp/cope2n-be-fi-sbt:latest public.ecr.aws/v4n9y6r8/sidp/cope2n-be-fi-sbt:${tag}
+docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-be-fi-sbt:${tag}
 
-# echo "[INFO] Pushing FE image with tag: $tag..."
-# docker compose -f docker-compose-dev.yml build fe-sbt
-# docker tag sidp/cope2n-fe-fi-sbt:latest public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:${tag}
-# docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:${tag}
+echo "[INFO] Pushing FE image with tag: $tag..."
+docker compose -f docker-compose-dev.yml build fe-sbt
+docker tag sidp/cope2n-fe-fi-sbt:latest public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:${tag}
+docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:${tag}
 
 cp ./docker-compose-prod.yml ./docker-compose_${tag}.yml
 sed -i "s/{{tag}}/$tag/g" ./docker-compose_${tag}.yml

From cd739e00352a11eb17fdde63beb30e32d9f3757b Mon Sep 17 00:00:00 2001
From: dx-tan
Date: Tue, 26 Dec 2023 21:05:35 +0700
Subject: [PATCH 3/5] Workaround to fix missing db fields

---
 cope2n-api/fwd_api/api/ctel_view.py           | 17 ++++++++--------
 .../fwd_api/celery_worker/internal_task.py    | 20 +++++++++++++++++++
 .../celery_worker/process_result_tasks.py     |  3 +--
 cope2n-api/fwd_api/utils/redis.py             |  2 +-
 4 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index e2664e0..c10af80 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -214,14 +214,6 @@ class CtelViewSet(viewsets.ViewSet):
             "imei": imei_file_objs,
             "invoice": invoice_file_objs
         }
-        total_page = len(files.keys())
-        p_type = validated_data['type']
-        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
-                                                               pages_left=total_page,
-                                                               process_type=p_type, status=1, request_id=rq_id,
-                                                               provider_code=provider_code,
-                                                               subscription=sub)
-        new_request.save()
 
         count = 0
         doc_files_with_type = []
@@ -236,6 +228,15 @@ class CtelViewSet(viewsets.ViewSet):
             ))
             count += 1
 
+        total_page = len(doc_files_with_type)
+        p_type = validated_data['type']
+        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
+                                                               pages_left=total_page,
+                                                               process_type=p_type, status=1, request_id=rq_id,
+                                                               provider_code=provider_code,
+                                                               subscription=sub)
+        new_request.save()
+
         # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
         compact_files = [None] * len(doc_files_with_type)
         pool = ThreadPool(processes=2)
diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py
index d1376d5..eeefc5c 100755
--- a/cope2n-api/fwd_api/celery_worker/internal_task.py
+++ b/cope2n-api/fwd_api/celery_worker/internal_task.py
@@ -13,6 +13,8 @@ from fwd_api.models import SubscriptionRequestFile
 from ..utils import file as FileUtils
 from ..utils import process as ProcessUtil
 from ..utils import s3 as S3Util
+from fwd_api.constant.common import ProcessType
+
 from celery.utils.log import get_task_logger
 
 from fwd import settings
@@ -113,6 +115,24 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
     new_request.ai_inference_start_time = time.time()
     new_request.save()
 
+    trials = 0
+    while True:
+        rq: SubscriptionRequest = \
+            SubscriptionRequest.objects.filter(request_id=rq_id).first()
+        if rq.ai_inference_start_time != 0:
+            break
+        time.sleep(0.1)
+        trials += 1
+        if trials > 5:
+            rq.preprocessing_time = time.time() - start_time
+            rq.doc_type = doc_type_string
+            rq.ai_inference_start_time = time.time()
+            rq.save()
+        if trials > 10:
+            rq.status = 404
+            rq.save()
+            return
+
     # Send to next queue
     for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
         ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index 5fe7466..b86d2f5 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -145,8 +145,7 @@ def process_invoice_sbt_result(rq_id, result):
     try:
         page_index = int(rq_id.split("_sub_")[1])
         rq_id = rq_id.split("_sub_")[0]
-        rq: SubscriptionRequest = \
-            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.SBT_INVOICE.value)[0]
+        rq: SubscriptionRequest = SubscriptionRequest.objects.filter(request_id=rq_id).first()
         for i in range(10):
             if rq.ai_inference_start_time == 0:
                 logging.warn(f"ai_inference_start_time = 0, looks like database is lagging, attemp {i} in 0.2 second ...")
diff --git a/cope2n-api/fwd_api/utils/redis.py b/cope2n-api/fwd_api/utils/redis.py
index 7a9b54e..ff65035 100644
--- a/cope2n-api/fwd_api/utils/redis.py
+++ b/cope2n-api/fwd_api/utils/redis.py
@@ -24,7 +24,7 @@ class RedisUtils:
         return resutlt
 
     def get_size(self, request_id):
-        return self.redis_client.hlen(request_id)
+        return self.redis_client.hlen(request_id)
 
     def remove_cache(self, request_id):
         self.redis_client.delete(request_id)

From 4e9c2b7c7daeb03d88cfc9250e499485bcdd66b6 Mon Sep 17 00:00:00 2001
From: dx-tan
Date: Wed, 27 Dec 2023 13:57:33 +0700
Subject: [PATCH 4/5] Update: local database version

---
 docker-compose-prod.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docker-compose-prod.yml b/docker-compose-prod.yml
index 35106c7..dc5c1b8 100644
--- a/docker-compose-prod.yml
+++ b/docker-compose-prod.yml
@@ -145,7 +145,7 @@ services:
   db-sbt:
     restart: always
     mem_reservation: 500m
-    image: postgres:14.7-alpine
+    image: postgres:15.4-alpine
     volumes:
       - ./data/postgres_data:/var/lib/postgresql/data
     networks:

From dc1dfcb4d1a8ed7d9183ad709f753623952ca3c6 Mon Sep 17 00:00:00 2001
From: dx-tan
Date: Wed, 27 Dec 2023 18:19:17 +0700
Subject: [PATCH 5/5] Workaround: Skip second save() to database

---
 .../celery_worker/mock_process_tasks_fi.py    |  7 +--
 cope2n-ai-fi/common/process_pdf.py            | 12 ++++-
 cope2n-ai-fi/modules/sdsvkvu                  |  2 +-
 cope2n-api/fwd_api/api/ctel_view.py           |  8 +--
 .../fwd_api/celery_worker/internal_task.py    | 45 +++++-----------
 .../celery_worker/process_result_tasks.py     | 52 +++++++++----------
 ...ubscriptionrequest_ai_inference_profile.py | 18 +++++++
 .../fwd_api/models/SubscriptionRequest.py     |  1 +
 cope2n-api/fwd_api/utils/process.py           |  4 +-
 docker-compose-dev.yml                        |  1 +
 10 files changed, 80 insertions(+), 70 deletions(-)
 create mode 100644 cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py

diff --git a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
index 28b86a4..ef16d45 100755
--- a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
+++ b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
@@ -58,17 +58,18 @@ def process_manulife_invoice(rq_id, list_url):
     return {"rq_id": rq_id}
 
 @app.task(name="process_sbt_invoice")
-def process_sbt_invoice(rq_id, list_url):
+def process_sbt_invoice(rq_id, list_url, metadata):
     # TODO: simply returning 200 and 404 doesn't make any sense
     c_connector = CeleryConnector()
     try:
         result = compile_output_sbt(list_url)
+        metadata['ai_inference_profile'] = result.pop("inference_profile")
         hoadon = {"status": 200, "content": result, "message": "Success"}
         print(hoadon)
-        c_connector.process_sbt_invoice_result((rq_id, hoadon))
+        c_connector.process_sbt_invoice_result((rq_id, hoadon, metadata))
         return {"rq_id": rq_id}
     except Exception as e:
         print(e)
         hoadon = {"status": 404, "content": {}}
-        c_connector.process_sbt_invoice_result((rq_id, hoadon))
+        c_connector.process_sbt_invoice_result((rq_id, hoadon, metadata))
         return {"rq_id": rq_id}
\ No newline at end of file
diff --git a/cope2n-ai-fi/common/process_pdf.py b/cope2n-ai-fi/common/process_pdf.py
index 4ad48b1..151d3ea 100755
--- a/cope2n-ai-fi/common/process_pdf.py
+++ b/cope2n-ai-fi/common/process_pdf.py
@@ -1,5 +1,6 @@
 import os
 import json
+import time
 
 from common import json2xml
 from common.json2xml import convert_key_names, replace_xml_values
@@ -213,6 +214,8 @@ def compile_output_sbt(list_url):
         dict: output compiled
     """
 
+    inference_profile = {}
+
     results = {
         "model":{
             "name":"Invoice",
@@ -225,16 +228,23 @@ def compile_output_sbt(list_url):
 
     outputs = []
 
+    start = time.time()
+    pages_predict_time = []
     for page in list_url:
         output_model = predict_sbt(page['page_number'], page['file_url'])
+        pages_predict_time.append(time.time())
         if "doc_type" in page:
            output_model['doc_type'] = page['doc_type']
         outputs.append(output_model)
+    start_postprocess = time.time()
     documents = merge_sbt_output(outputs)
+    inference_profile["postprocess"] = [start_postprocess, time.time()]
+    inference_profile["inference"] = [start, pages_predict_time]
     results = {
         "total_pages": len(list_url),
         "ocr_num_pages": len(list_url),
-        "document": documents
+        "document": documents,
+        "inference_profile": inference_profile
     }
 
     return results
diff --git a/cope2n-ai-fi/modules/sdsvkvu b/cope2n-ai-fi/modules/sdsvkvu
index a471c10..4caed0d 160000
--- a/cope2n-ai-fi/modules/sdsvkvu
+++ b/cope2n-ai-fi/modules/sdsvkvu
@@ -1 +1 @@
-Subproject commit a471c1018c17cc917d1723776bae81f829450f95
+Subproject commit 4caed0d5ee08d1114727effd19bf32beab5263dc
diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index c10af80..de752c0 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -1,6 +1,7 @@
 import time
 import uuid
 from wsgiref.util import FileWrapper
+from datetime import datetime
 
 from django.core.files.uploadedfile import TemporaryUploadedFile
 from django.http import HttpResponse, JsonResponse
@@ -56,11 +57,11 @@ class CtelViewSet(viewsets.ViewSet):
         validated_data = ProcessUtil.validate_ocr_request_and_get(request, sub)
 
         provider_code = 'SAP'
-        rq_id = provider_code + uuid.uuid4().hex
 
         file_obj: TemporaryUploadedFile = validated_data['file']
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
+        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
         file_name = f"temp_{rq_id}.{file_extension}"
         is_test_request = validated_data.get("is_test_request", False)
 
@@ -134,7 +135,6 @@ class CtelViewSet(viewsets.ViewSet):
         validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub)
 
         provider_code = 'SAP'
-        rq_id = provider_code + uuid.uuid4().hex
 
         imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file']
         invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file']
@@ -145,6 +145,7 @@ class CtelViewSet(viewsets.ViewSet):
         }
 
         total_page = len(files.keys())
+        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
         p_type = validated_data['type']
         new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
                                                                pages_left=total_page,
@@ -205,7 +206,6 @@ class CtelViewSet(viewsets.ViewSet):
         validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub)
 
         provider_code = 'SAP'
-        rq_id = provider_code + uuid.uuid4().hex
 
         imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file']
         invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file']
@@ -214,7 +214,7 @@ class CtelViewSet(viewsets.ViewSet):
             "imei": imei_file_objs,
             "invoice": invoice_file_objs
         }
-
+        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
         count = 0
         doc_files_with_type = []
         for doc_type, doc_files in files.items():
diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py
index eeefc5c..aed978c 100755
--- a/cope2n-api/fwd_api/celery_worker/internal_task.py
+++ b/cope2n-api/fwd_api/celery_worker/internal_task.py
@@ -64,12 +64,13 @@ def process_image_file(file_name: str, file_path, request, user) -> list:
 def process_pdf(rq_id, sub_id, p_type, user_id, files):
     """
     files: [{
+        "idx": int
         "file_name": "",
         "file_path": "", # local path to file
         "file_type": ""
     },]
     """
-    new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
+    new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first()
     user = UserProfile.objects.filter(id=user_id).first()
     new_request.pages = len(files)
     new_request.pages_left = len(files)
@@ -101,42 +102,24 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
     files_with_idx = [(idx, file) for idx, file in enumerate(files)]
     for idx, url in pool.map(process_and_save_file, files_with_idx):
         b_urls[idx] = url
-    new_request.preprocessing_time = time.time() - start_time
+    preprocessing_time = time.time() - start_time
 
     # TODO: send to queue with different request_ids
-    doc_type_string = ""
     to_queue = []
+    ai_inference_start_time = time.time()
     for i, b_url in enumerate(b_urls):
+        file_meta = {}
         fractorized_request_id = rq_id + f"_sub_{i}"
-        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
-        doc_type_string += "{},".format(b_url["doc_type"])
-    doc_type_string = doc_type_string[:-1]
-    new_request.doc_type = doc_type_string
-    new_request.ai_inference_start_time = time.time()
-    new_request.save()
-
-    trials = 0
-    while True:
-        rq: SubscriptionRequest = \
-            SubscriptionRequest.objects.filter(request_id=rq_id).first()
-        if rq.ai_inference_start_time != 0:
-            break
-        time.sleep(0.1)
-        trials += 1
-        if trials > 5:
-            rq.preprocessing_time = time.time() - start_time
-            rq.doc_type = doc_type_string
-            rq.ai_inference_start_time = time.time()
-            rq.save()
-        if trials > 10:
-            rq.status = 404
-            rq.save()
-            return
-
+        file_meta["doc_type"] = b_url["doc_type"]
+        file_meta["ai_inference_start_time"] = ai_inference_start_time
+        file_meta["ai_inference_profile"] = {}
+        file_meta["index_in_request"] = i
+        file_meta["preprocessing_time"] = preprocessing_time
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type, file_meta))
+
     # Send to next queue
-    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
-        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
-
+    for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)
 
 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key, request_id):
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index b86d2f5..5c2cc7c 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -15,23 +15,23 @@ from fwd_api.utils import process as ProcessUtil
 
 redis_client = RedisUtils()
 
-def aggregate_result(results, doc_types):
-    doc_types = doc_types.split(',')
+def aggregate_result(results):
+    sorted_results = [None] * len(results)
+    doc_types = []
+    for index, result in results.items():
+        index = int(index)
+        doc_type = result["metadata"]["doc_type"]
+        doc_types.append(doc_type)
+        sorted_results[index] = ((doc_type, result))
 
     des_result = deepcopy(list(results.values()))[0]
+    des_result.pop("metadata")
     des_result["content"]["total_pages"] = 0
     des_result["content"]["ocr_num_pages"] = 0
     des_result["content"]["document"][0]["end_page"] = 0
     des_result["content"]["document"][0]["content"][3]["value"] = [None for _ in range(doc_types.count("imei"))]
     des_result["content"]["document"][0]["content"][2]["value"] = []
 
-    sorted_results = [None] * len(doc_types)
-    for index, result in results.items():
-        index = int(index)
-        doc_type = doc_types[index]
-        sorted_results[index] = ((doc_type, result))
-
     imei_count = 0
     for doc_type, result in sorted_results:
         des_result["content"]["total_pages"] += 1
@@ -136,7 +136,7 @@ def process_invoice_manulife_result(rq_id, result):
 random_processor_name = None
 
 @app.task(name='process_sbt_invoice_result')
-def process_invoice_sbt_result(rq_id, result):
+def process_invoice_sbt_result(rq_id, result, metadata):
     global random_processor_name
     if random_processor_name is None:
         random_processor_name = uuid.uuid4()
@@ -146,21 +146,8 @@ def process_invoice_sbt_result(rq_id, result, metadata):
         page_index = int(rq_id.split("_sub_")[1])
         rq_id = rq_id.split("_sub_")[0]
         rq: SubscriptionRequest = SubscriptionRequest.objects.filter(request_id=rq_id).first()
-        for i in range(10):
-            if rq.ai_inference_start_time == 0:
-                logging.warn(f"ai_inference_start_time = 0, looks like database is lagging, attemp {i} in 0.2 second ...")
-                rq.refresh_from_db()
-                time.sleep(0.2)
-                if i == 9: # return an error
-                    logging.warn("Unable to retrieve rq, exiting")
-                    rq.status = 404 # stop waiting
-                    rq.predict_result = result
-                    rq.save()
-                    update_user(rq)
-                    return "FailInvoice"
-            else:
-                break
+        result["metadata"] = metadata
 
         # status = to_status(result)
         status = result.get("status", 200)
         redis_client.set_cache(rq_id, page_index, result)
@@ -169,17 +156,26 @@ def process_invoice_sbt_result(rq_id, result, metadata):
         if done:
             rq.status = 200 # stop waiting
             results = redis_client.get_all_cache(rq_id)
-            rq.predict_result = aggregate_result(results, rq.doc_type)
+            rq.predict_result = aggregate_result(results)
             # print(f"[DEBUG]: rq.predict_result: {rq.predict_result}")
+            ai_inference_profile = {}
+            doc_type_string = ""
+            for idx, result in results.items():
+                ai_inference_profile["{doc_type}_{idx}".format(doc_type=result["metadata"]["doc_type"], idx=result["metadata"]["index_in_request"])] = result["metadata"]["ai_inference_profile"]
+                doc_type_string += "{},".format(result["metadata"]["doc_type"])
+            doc_type_string = doc_type_string[:-1]
+            rq.ai_inference_profile = ai_inference_profile
+            rq.doc_type = doc_type_string
+            rq.ai_inference_start_time = result["metadata"]["ai_inference_start_time"] # advancing the last result
+            rq.preprocessing_time = result["metadata"]["preprocessing_time"] # advancing the last result
+            rq.ai_inference_time = time.time() - rq.ai_inference_start_time
             rq.save()
-
+
         else:
             rq.status = 404 # stop waiting
             rq.predict_result = result
             rq.save()
 
-        rq.ai_inference_time = time.time() - rq.ai_inference_start_time
-        rq.save()
         update_user(rq)
     except IndexError as e:
         print(e)
diff --git a/cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py b/cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py
new file mode 100644
index 0000000..7039658
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0163_subscriptionrequest_ai_inference_profile.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-27 09:02
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0162_merge_20231225_1439'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_profile',
+            field=models.JSONField(null=True),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py
index 0cbda44..a6c90a8 100755
--- a/cope2n-api/fwd_api/models/SubscriptionRequest.py
+++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py
@@ -21,6 +21,7 @@ class SubscriptionRequest(models.Model):
     is_test_request = models.BooleanField(default=False)
     S3_uploaded = models.BooleanField(default=False)
 
+    ai_inference_profile=models.JSONField(null=True)
     preprocessing_time = models.FloatField(default=-1)
     ai_inference_start_time = models.FloatField(default=0)
     ai_inference_time = models.FloatField(default=0)
diff --git a/cope2n-api/fwd_api/utils/process.py b/cope2n-api/fwd_api/utils/process.py
index a7a19a6..20f9bb7 100644
--- a/cope2n-api/fwd_api/utils/process.py
+++ b/cope2n-api/fwd_api/utils/process.py
@@ -307,7 +307,7 @@ def token_value(token_type):
     return 1 # Basic OCR
 
 
-def send_to_queue2(rq_id, sub_id, file_url, user_id, typez):
+def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata):
     try:
         if typez == ProcessType.ID_CARD.value:
             c_connector.process_id(
@@ -319,7 +319,7 @@ def send_to_queue2(rq_id, sub_id, file_url, user_id, typez):
         elif typez == ProcessType.MANULIFE_INVOICE.value:
             c_connector.process_invoice_manulife((rq_id, file_url))
         elif typez == ProcessType.SBT_INVOICE.value:
-            c_connector.process_invoice_sbt((rq_id, file_url))
+            c_connector.process_invoice_sbt((rq_id, file_url, metadata))
     except Exception as e:
         print(e)
         raise BadGatewayException()
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 45b7e4f..5638fee 100755
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -82,6 +82,7 @@ services:
       db-sbt:
         condition: service_started
     command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input &&
+                    python manage.py makemigrations &&
                     python manage.py migrate &&
                     python manage.py compilemessages &&
                     gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod
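
Reviewer note, not part of any patch above: a minimal sketch of the per-file metadata contract that PATCH 5/5 threads from process_pdf() through send_to_queue2() and the AI worker into process_invoice_sbt_result(). The field names are taken from the diff; the example values are purely illustrative.

    # Shape of the file_meta dict built per page in process_pdf(); the worker
    # passes it back and the result task attaches it as result["metadata"].
    file_meta = {
        "doc_type": "imei",                       # "imei" or "invoice", copied from b_url["doc_type"]
        "ai_inference_start_time": 1703675957.0,  # one time.time() captured before queueing
        "ai_inference_profile": {},               # filled by the worker from result.pop("inference_profile")
        "index_in_request": 0,                    # page position, used to key rq.ai_inference_profile
        "preprocessing_time": 1.3,                # seconds spent before dispatch
    }

With this in place, process_invoice_sbt_result() rebuilds rq.doc_type, rq.ai_inference_profile and the timing fields from the cached metadata instead of re-reading values that may not have been persisted yet, which is what the polling workarounds in the earlier patches were compensating for.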