From 5c9a51ccdea99efece82227c942eb002189d9be2 Mon Sep 17 00:00:00 2001 From: Viet Anh Nguyen Date: Fri, 22 Dec 2023 14:03:56 +0700 Subject: [PATCH] Add AI processing time --- .../celery_worker/mock_process_tasks_fi.py | 4 +- cope2n-ai-fi/run.sh | 2 +- .../fwd_api/celery_worker/internal_task.py | 1 + .../celery_worker/process_result_tasks.py | 14 ++++++ ...criptionrequest_ai_inference_start_time.py | 18 ++++++++ ...equest_ai_inference_start_time_and_more.py | 27 ++++++++++++ .../fwd_api/models/SubscriptionRequest.py | 4 +- docker-compose.yml | 31 ++++++++++++- speedtest_sync.py | 43 ++++++++++++++----- 9 files changed, 128 insertions(+), 16 deletions(-) create mode 100644 cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py create mode 100644 cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py diff --git a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py index 00bec4a..28b86a4 100755 --- a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py +++ b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py @@ -1,4 +1,6 @@ from celery_worker.worker_fi import app +from celery_worker.client_connector_fi import CeleryConnector +from common.process_pdf import compile_output_sbt @app.task(name="process_fi_invoice") def process_invoice(rq_id, list_url): @@ -57,8 +59,6 @@ def process_manulife_invoice(rq_id, list_url): @app.task(name="process_sbt_invoice") def process_sbt_invoice(rq_id, list_url): - from celery_worker.client_connector_fi import CeleryConnector - from common.process_pdf import compile_output_sbt # TODO: simply returning 200 and 404 doesn't make any sense c_connector = CeleryConnector() try: diff --git a/cope2n-ai-fi/run.sh b/cope2n-ai-fi/run.sh index 167a73f..576b38e 100755 --- a/cope2n-ai-fi/run.sh +++ b/cope2n-ai-fi/run.sh @@ -1,2 +1,2 @@ #!/bin/bash -bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo" \ No newline at end of file +bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo -c 1" \ No newline at end of file diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index 4a6781d..2dd96bb 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -110,6 +110,7 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files): doc_type_string += "{},".format(b_url["doc_type"]) doc_type_string = doc_type_string[:-1] new_request.doc_type = doc_type_string + new_request.ai_inference_start_time = time.time() new_request.save() # Send to next queue diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py index 5dc3b3f..4121dc3 100755 --- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py +++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py @@ -1,4 +1,6 @@ import traceback +import time +import uuid from copy import deepcopy @@ -129,9 +131,15 @@ def process_invoice_manulife_result(rq_id, result): print("Fail Invoice %d", rq_id) traceback.print_exc() return "FailInvoice" + +random_processor_name = None @app.task(name='process_sbt_invoice_result') def process_invoice_sbt_result(rq_id, result): + global random_processor_name + if random_processor_name is None: + random_processor_name = uuid.uuid4() + print(rq_id, random_processor_name) print_id(f"[DEBUG]: Received SBT request with id {rq_id}") try: page_index = int(rq_id.split("_sub_")[1]) @@ -157,13 +165,19 @@ def process_invoice_sbt_result(rq_id, result): redis_client.remove_cache(rq_id) rq.save() + rq.ai_inference_time = time.time() - rq.ai_inference_start_time + rq.save() update_user(rq) except IndexError as e: print(e) print("NotFound request by requestId, %d", rq_id) + rq.ai_inference_time = 0 + rq.save() except Exception as e: print(e) print("Fail Invoice %d", rq_id) traceback.print_exc() + rq.ai_inference_time = 0 + rq.save() return "FailInvoice" diff --git a/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py b/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py new file mode 100644 index 0000000..601b184 --- /dev/null +++ b/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1.3 on 2023-12-22 03:08 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('fwd_api', '0159_subscriptionrequest_ai_inference_time_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='subscriptionrequest', + name='ai_inference_start_time', + field=models.DateTimeField(null=True), + ), + ] diff --git a/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py b/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py new file mode 100644 index 0000000..d219f0a --- /dev/null +++ b/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py @@ -0,0 +1,27 @@ +# Generated by Django 4.1.3 on 2023-12-22 03:28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('fwd_api', '0160_subscriptionrequest_ai_inference_start_time'), + ] + + operations = [ + migrations.RemoveField( + model_name='subscriptionrequest', + name='ai_inference_start_time', + ), + migrations.AddField( + model_name='subscriptionrequest', + name='ai_inference_start_time', + field=models.FloatField(default=0), + ), + migrations.AlterField( + model_name='subscriptionrequest', + name='ai_inference_time', + field=models.FloatField(default=0), + ), + ] diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py index c18ebf4..e8a4456 100755 --- a/cope2n-api/fwd_api/models/SubscriptionRequest.py +++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py @@ -1,7 +1,6 @@ from django.db import models from django.utils import timezone -from fwd_api.models import UserProfile from fwd_api.models.Subscription import Subscription @@ -22,7 +21,8 @@ class SubscriptionRequest(models.Model): is_test_request = models.BooleanField(default=False) preprocessing_time = models.FloatField(default=-1) - ai_inference_time = models.FloatField(default=-1) + ai_inference_start_time = models.FloatField(default=0) + ai_inference_time = models.FloatField(default=0) cpu_percent = models.FloatField(default=-1) gpu_percent = models.FloatField(default=-1) used_memory = models.FloatField(default=-1) diff --git a/docker-compose.yml b/docker-compose.yml index 18b59b6..f5bbd8a 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -33,7 +33,36 @@ services: command: bash run.sh deploy: mode: replicated - replicas: 2 + replicas: 1 + + cope2n-fi-sbt-2: + build: + context: cope2n-ai-fi + shm_size: 10gb + dockerfile: Dockerfile + shm_size: 10gb + restart: always + networks: + - ctel-sbt + privileged: true + environment: + - CELERY_BROKER=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@rabbitmq-sbt:5672 + - CUDA_VISIBLE_DEVICES=0 + volumes: + - ./cope2n-ai-fi:/workspace/cope2n-ai-fi # for dev container only + working_dir: /workspace/cope2n-ai-fi + # deploy: + # resources: + # reservations: + # devices: + # - driver: nvidia + # count: 1 + # capabilities: [gpu] + # command: bash -c "tail -f > /dev/null" + command: bash run.sh + deploy: + mode: replicated + replicas: 1 # Back-end services be-ctel-sbt: diff --git a/speedtest_sync.py b/speedtest_sync.py index 1b66550..0faeb3e 100644 --- a/speedtest_sync.py +++ b/speedtest_sync.py @@ -5,6 +5,8 @@ import multiprocessing import tqdm import random import traceback +from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor +import requests parser = argparse.ArgumentParser() @@ -36,17 +38,30 @@ except: def process_file(data): files, token = data - num_files = len(files) - files.append( - ('processType', (None, 12)), + files = ( + # 'invoice_file': ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb"), 'application/octet-stream'), + ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')), + ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')), + ('processType', '12'), ) + num_files = len(files) - 1 # ================================================================= # UPLOAD THE FILE start_time = time.time() + end_of_upload_time = 0 + def my_callback(monitor): + nonlocal end_of_upload_time + if monitor.bytes_read == monitor.len: + end_of_upload_time = time.time() + m = MultipartEncoderMonitor.from_fields( + fields=files, + callback=my_callback + ) try: response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={ 'Authorization': token, - }, files=files, timeout=300) + 'Content-Type': m.content_type + }, data=m, timeout=300) except requests.exceptions.Timeout: print("Timeout occurred while uploading") return { @@ -68,7 +83,7 @@ def process_file(data): "num_files": 0, } end_time = time.time() - upload_time = end_time - start_time + upload_time = end_of_upload_time - start_time # ================================================================= try: @@ -88,7 +103,7 @@ def process_file(data): "success": True, "status": 200, "upload_time": upload_time, - "process_time": upload_time, + "process_time": time.time() - start_time - upload_time, "num_files": num_files, } @@ -101,8 +116,8 @@ invoice_files = [ ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())), ] imei_files = [ - ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())), - # ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())), + ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read(), 'application/octet-stream')), + ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei1.jpg", "rb").read(), 'application/octet-stream')), # ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())), # ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())), # ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())), @@ -112,8 +127,15 @@ def get_imei_files(): files = imei_files[:num_files] # print("Num of imei files:", len(files)) return files +# def get_files(): +# return imei_files + # return invoice_files + get_imei_files() def get_files(): - return invoice_files + imei_files # get_imei_files() + return { + 'invoice_file': ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read()), + 'imei_files': ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read(), 'application/octet-stream'), + 'processType': 12, + } def gen_input(num_input): for _ in range(num_input): yield (get_files(), token) @@ -141,7 +163,8 @@ if len(uploading_time) == 0: print("No valid uploading time") print("Check the results!") processing_time = [x["process_time"] for x in results if x["success"]] -print("Uploading + Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time))) +print("Uploading time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(uploading_time) / len(uploading_time), min(uploading_time), max(uploading_time))) +print("Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time))) print("--------------------------------------") print("TIME BY IMAGE") uploading_time = [x["upload_time"] for x in results if x["success"]]