From 6840f16ee7f299b1dfec4f17a2504f5cec54679e Mon Sep 17 00:00:00 2001 From: Viet Anh Nguyen Date: Wed, 13 Dec 2023 18:43:10 +0700 Subject: [PATCH] Fix result accumulation --- .../celery_worker/client_connector.py | 2 +- .../celery_worker/client_connector_fi.py | 1 + cope2n-ai-fi/celery_worker/worker.py | 1 + cope2n-ai-fi/celery_worker/worker_fi.py | 1 + cope2n-ai-fi/requirements.txt | 3 +- cope2n-api/fwd/settings.py | 3 + cope2n-api/fwd_api/api/ctel_view.py | 9 --- .../fwd_api/celery_worker/client_connector.py | 1 + .../celery_worker/process_result_tasks.py | 75 +++++++++++-------- cope2n-api/fwd_api/celery_worker/worker.py | 1 + cope2n-api/fwd_api/exception/exceptions.py | 2 +- cope2n-api/fwd_api/utils/RedisUtils.py | 42 +++++++++++ cope2n-api/requirements.txt | 3 +- docker-compose.yml | 14 +++- speedtest_sync.py | 11 ++- 15 files changed, 118 insertions(+), 51 deletions(-) create mode 100644 cope2n-api/fwd_api/utils/RedisUtils.py diff --git a/cope2n-ai-fi/celery_worker/client_connector.py b/cope2n-ai-fi/celery_worker/client_connector.py index 49191e7..c9f7082 100755 --- a/cope2n-ai-fi/celery_worker/client_connector.py +++ b/cope2n-ai-fi/celery_worker/client_connector.py @@ -22,7 +22,7 @@ class CeleryConnector: app = Celery( "postman", broker=env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"), - # backend="rpc://", + broker_transport_options={'confirm_publish': True}, ) def process_id_result(self, args): diff --git a/cope2n-ai-fi/celery_worker/client_connector_fi.py b/cope2n-ai-fi/celery_worker/client_connector_fi.py index cd5f3ba..43b205d 100755 --- a/cope2n-ai-fi/celery_worker/client_connector_fi.py +++ b/cope2n-ai-fi/celery_worker/client_connector_fi.py @@ -19,6 +19,7 @@ class CeleryConnector: app = Celery( "postman", broker= env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"), + broker_transport_options={'confirm_publish': True}, ) # mock task for FI diff --git a/cope2n-ai-fi/celery_worker/worker.py b/cope2n-ai-fi/celery_worker/worker.py index 68b9d89..a8025b8 100755 --- a/cope2n-ai-fi/celery_worker/worker.py +++ b/cope2n-ai-fi/celery_worker/worker.py @@ -12,6 +12,7 @@ app: Celery = Celery( include=[ "celery_worker.mock_process_tasks", ], + broker_transport_options={'confirm_publish': True}, ) task_exchange = Exchange("default", type="direct") task_create_missing_queues = False diff --git a/cope2n-ai-fi/celery_worker/worker_fi.py b/cope2n-ai-fi/celery_worker/worker_fi.py index 54c49ea..7e9b906 100755 --- a/cope2n-ai-fi/celery_worker/worker_fi.py +++ b/cope2n-ai-fi/celery_worker/worker_fi.py @@ -11,6 +11,7 @@ app: Celery = Celery( include=[ "celery_worker.mock_process_tasks_fi", ], + broker_transport_options={'confirm_publish': True}, ) task_exchange = Exchange("default", type="direct") task_create_missing_queues = False diff --git a/cope2n-ai-fi/requirements.txt b/cope2n-ai-fi/requirements.txt index 82cbf93..1100c93 100755 --- a/cope2n-ai-fi/requirements.txt +++ b/cope2n-ai-fi/requirements.txt @@ -10,4 +10,5 @@ pymupdf easydict imagesize==1.4.1 -pdf2image==1.16.3 \ No newline at end of file +pdf2image==1.16.3 +redis==5.0.1 \ No newline at end of file diff --git a/cope2n-api/fwd/settings.py b/cope2n-api/fwd/settings.py index d63c3b0..15f8d18 100755 --- a/cope2n-api/fwd/settings.py +++ b/cope2n-api/fwd/settings.py @@ -44,6 +44,9 @@ S3_ENDPOINT = env.str("S3_ENDPOINT", "") S3_ACCESS_KEY = env.str("S3_ACCESS_KEY", "TannedCung") S3_SECRET_KEY = env.str("S3_SECRET_KEY", "TannedCung") S3_BUCKET_NAME = env.str("S3_BUCKET_NAME", "ocr-data") +REDIS_HOST = env.str("REDIS_HOST", "result-cache") +REDIS_PORT = env.int("REDIS_PORT", 6379) + INSTALLED_APPS = [ "django.contrib.auth", diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py index 821501c..6e8526f 100755 --- a/cope2n-api/fwd_api/api/ctel_view.py +++ b/cope2n-api/fwd_api/api/ctel_view.py @@ -171,9 +171,7 @@ class CtelViewSet(viewsets.ViewSet): while True: current_time = time.time() waiting_time = current_time - start_time - print("Waiting for: ", waiting_time) if waiting_time > time_limit: - print("Timeout!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") break time.sleep(0.2) report_filter = SubscriptionRequest.objects.filter(request_id=rq_id) @@ -196,19 +194,12 @@ class CtelViewSet(viewsets.ViewSet): if report_filter[0].status == 400: raise FileContentInvalidException() if report_filter[0].status == 100: # continue, only return when result is fullfilled - print(serializer.data) - print("Status Code: 100") continue - if len(serializer.data) == 0: - print("No data found") continue if serializer.data[0].get("data", None) is None: - print(serializer.data[0]) - print("No data[0] found") continue if serializer.data[0]["data"].get("status", 200) != 200: - print("No data status found") continue return Response(status=status.HTTP_200_OK, data=serializer.data[0]) diff --git a/cope2n-api/fwd_api/celery_worker/client_connector.py b/cope2n-api/fwd_api/celery_worker/client_connector.py index 8382376..a95a9ca 100755 --- a/cope2n-api/fwd_api/celery_worker/client_connector.py +++ b/cope2n-api/fwd_api/celery_worker/client_connector.py @@ -37,6 +37,7 @@ class CeleryConnector: app = Celery( 'postman', broker=settings.BROKER_URL, + broker_transport_options={'confirm_publish': True}, ) def do_pdf(self, args): return self.send_task('do_pdf', args) diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py index 191f112..a8dfa40 100755 --- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py +++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py @@ -1,30 +1,44 @@ import traceback +from copy import deepcopy + from fwd_api.celery_worker.worker import app from fwd_api.models import SubscriptionRequest from fwd_api.exception.exceptions import InvalidException from fwd_api.models import SubscriptionRequest from fwd_api.constant.common import ProcessType +from fwd_api.utils.RedisUtils import RedisUtils +redis_client = RedisUtils() -def aggregate_result(src_result, des_result, doc_type): - if src_result["status"] != 200: - return src_result - if not des_result: - return src_result - des_result["content"]["total_pages"] += 1 - des_result["content"]["ocr_num_pages"] += 1 - des_result["content"]["document"][0]["end_page"] += 1 - if doc_type == "imei": - des_result["content"]["document"][0]["content"][3]["value"] += src_result["content"]["document"][0]["content"][3]["value"] - elif doc_type == "invoice": - des_result["content"]["document"][0]["content"][0]["value"] = src_result["content"]["document"][0]["content"][0]["value"] - des_result["content"]["document"][0]["content"][1]["value"] = src_result["content"]["document"][0]["content"][1]["value"] - des_result["content"]["document"][0]["content"][2]["value"] += src_result["content"]["document"][0]["content"][2]["value"] - elif doc_type == "all": - des_result.update(src_result) - else: - raise InvalidException(f"doc_type: {doc_type}") +def aggregate_result(resutls, doc_types): + doc_types = doc_types.split(',') + + des_result = deepcopy(list(resutls.values()))[0] + des_result["content"]["total_pages"] = 0 + des_result["content"]["ocr_num_pages"] = 0 + des_result["content"]["document"][0]["end_page"] = 0 + des_result["content"]["document"][0]["content"][3]["value"] = [None for _ in range(doc_types.count("imei"))] + des_result["content"]["document"][0]["content"][2]["value"] = [] + + print(f"[INFO]: resutls: {resutls}") + for index, resutl in resutls.items(): + index = int(index) + doc_type = doc_types[index] + + des_result["content"]["total_pages"] += 1 + des_result["content"]["ocr_num_pages"] += 1 + des_result["content"]["document"][0]["end_page"] += 1 + if doc_type == "imei": + des_result["content"]["document"][0]["content"][3]["value"][index] = resutl["content"]["document"][0]["content"][3]["value"][0] + elif doc_type == "invoice": + des_result["content"]["document"][0]["content"][0]["value"] = resutl["content"]["document"][0]["content"][0]["value"] + des_result["content"]["document"][0]["content"][1]["value"] = resutl["content"]["document"][0]["content"][1]["value"] + des_result["content"]["document"][0]["content"][2]["value"] += resutl["content"]["document"][0]["content"][2]["value"] + elif doc_type == "all": + des_result.update(resutl) + else: + raise InvalidException(f"doc_type: {doc_type}") return des_result @@ -114,7 +128,6 @@ def process_invoice_manulife_result(rq_id, result): @app.task(name='process_sbt_invoice_result') def process_invoice_sbt_result(rq_id, result): print_id(f"[DEBUG]: Received SBT request with id {rq_id}") - print_id(f"[DEBUG]: result: {result}") try: page_index = int(rq_id.split("_sub_")[1]) rq_id = rq_id.split("_sub_")[0] @@ -122,23 +135,23 @@ def process_invoice_sbt_result(rq_id, result): SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.SBT_INVOICE.value)[0] # status = to_status(result) status = result.get("status", 200) - - rq.pages_left = rq.pages_left - 1 - done = rq.pages_left <= 0 - # aggregate results from multiple pages - rq.predict_result = aggregate_result(result, rq.predict_result, rq.doc_type.split(",")[page_index]) - - print_id(f"[DEBUG]: status: {status}") - + redis_client.set_cache(rq_id, page_index, result) + done = rq.pages == redis_client.get_size(rq_id) if status == 200: - if not done: - rq.status = 100 # continue - else: + if done: rq.status = 200 # stop waiting + results = redis_client.get_all_cache(rq_id) + rq.predict_result = aggregate_result(results, rq.doc_type) + print(f"[DEBUG]: rq.predict_result: {rq.predict_result}") + redis_client.remove_cache(rq_id) + rq.save() + else: rq.status = 404 # stop waiting + rq.predict_result = result + redis_client.remove_cache(rq_id) + rq.save() - rq.save() update_user(rq) except IndexError as e: print(e) diff --git a/cope2n-api/fwd_api/celery_worker/worker.py b/cope2n-api/fwd_api/celery_worker/worker.py index 3412bce..5867612 100755 --- a/cope2n-api/fwd_api/celery_worker/worker.py +++ b/cope2n-api/fwd_api/celery_worker/worker.py @@ -13,6 +13,7 @@ app: Celery = Celery( 'postman', broker=settings.BROKER_URL, include=['fwd_api.celery_worker.process_result_tasks', 'fwd_api.celery_worker.internal_task'], + broker_transport_options={'confirm_publish': True}, ) app.conf.update({ diff --git a/cope2n-api/fwd_api/exception/exceptions.py b/cope2n-api/fwd_api/exception/exceptions.py index 5e0d687..3292a5e 100755 --- a/cope2n-api/fwd_api/exception/exceptions.py +++ b/cope2n-api/fwd_api/exception/exceptions.py @@ -1,5 +1,5 @@ from rest_framework import status -from rest_framework.exceptions import APIException, ValidationError +from rest_framework.exceptions import APIException from fwd import settings diff --git a/cope2n-api/fwd_api/utils/RedisUtils.py b/cope2n-api/fwd_api/utils/RedisUtils.py new file mode 100644 index 0000000..da4c7ec --- /dev/null +++ b/cope2n-api/fwd_api/utils/RedisUtils.py @@ -0,0 +1,42 @@ +import redis +import json + +from django.conf import settings + +class RedisUtils: + def __init__(self, host=settings.REDIS_HOST, port=settings.REDIS_PORT): + self.redis_client = redis.Redis(host=host, port=port, decode_responses=True) + + def set_cache(self, request_id, image_index, data): + """ + request_id: str + data: dict + image_index: int + """ + self.redis_client.hset(request_id, image_index, json.dumps(data)) + + def get_all_cache(self, request_id): + resutlt = {} + for key, value in self.redis_client.hgetall(request_id).items(): + resutlt[key] = json.loads(value) + return resutlt + + def get_size(self, request_id): + return self.redis_client.hlen(request_id) + + def remove_cache(self, request_id): + self.redis_client.delete(request_id) + +if __name__ == '__main__': + _host = "127.0.0.1" + _port = 6379 + Yujii_A = RedisUtils(_host, _port) + Yujii_A.set_cache("SAP123", 1, {"status": 1}) + Yujii_A.set_cache("SAP123", 2, {"status": 2}) + Yujii_A.set_cache("SAP123", 3, {"status": 3}) + print("[INFO]: data for request_id: {}".format(Yujii_A.get_all_cache("SAP123"))) + print("[INFO]: len for request_id: {}".format(Yujii_A.get_size("SAP123"))) + + Yujii_A.remove_cache("SAP123") + print("[INFO]: data for request_id: {}".format(Yujii_A.get_all_cache("SAP123"))) + print("[INFO]: len for request_id: {}".format(Yujii_A.get_size("SAP123"))) diff --git a/cope2n-api/requirements.txt b/cope2n-api/requirements.txt index 7e8f4dd..327bea3 100755 --- a/cope2n-api/requirements.txt +++ b/cope2n-api/requirements.txt @@ -48,4 +48,5 @@ PyMuPDF==1.21.1 djangorestframework-xml==2.0.0 boto3==1.29.7 imagesize==1.4.1 -pdf2image==1.16.3 \ No newline at end of file +pdf2image==1.16.3 +redis==5.0.1 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index dfd1e84..08d1670 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -102,6 +102,15 @@ services: - ctel-sbt command: server --address :9884 --console-address :9885 /data + result-cache: + image: redis:6.2-alpine + restart: always + command: redis-server --save 20 1 --loglevel warning + volumes: + - ./data/redis:/data + networks: + - ctel-sbt + be-celery-sbt: # build: # context: cope2n-api @@ -134,6 +143,9 @@ services: - S3_SECRET_KEY=${S3_SECRET_KEY} - S3_BUCKET_NAME=${S3_BUCKET_NAME} - BASE_URL=http://be-ctel-sbt:${BASE_PORT} + - REDIS_HOST=result-cache + - REDIS_PORT=6379 + networks: - ctel-sbt @@ -148,7 +160,7 @@ services: - ./cope2n-api:/app working_dir: /app - command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO --pool=solo" + command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 3" # Back-end persistent db-sbt: diff --git a/speedtest_sync.py b/speedtest_sync.py index b63cb61..7c7bc7d 100644 --- a/speedtest_sync.py +++ b/speedtest_sync.py @@ -94,12 +94,12 @@ def process_file(data): -invoice_files = [ - ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())), -] # invoice_files = [ -# ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())), +# ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())), # ] +invoice_files = [ + ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())), +] imei_files = [ ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/invoice.jpg", "rb").read())), ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())), @@ -108,8 +108,7 @@ imei_files = [ ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())), ] def get_imei_files(): - # num_files = random.randint(1, len(imei_files) + 1) - num_files = 1 + num_files = random.randint(1, len(imei_files) + 1) print("Num imeis", num_files) files = imei_files[:num_files] # print("Num of imei files:", len(files))