From ac5dd81185127ac67627997448eeaa63d3a2ebd2 Mon Sep 17 00:00:00 2001
From: Viet Anh Nguyen
Date: Thu, 21 Dec 2023 17:31:55 +0700
Subject: [PATCH] Add preprocessing time to response

---
 cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py  |  1 +
 cope2n-ai-fi/configs/sdsap_sbt/configs.py     |  2 +-
 cope2n-api/fwd_api/api/ctel_view.py           | 61 ++++++++++++-------
 cope2n-api/fwd_api/api_router.py              |  1 -
 .../fwd_api/celery_worker/internal_task.py    | 33 ++++++----
 ...158_subscriptionrequest_is_test_request.py | 18 ++++++
 ...ptionrequest_ai_inference_time_and_more.py | 58 ++++++++++++++++++
 .../fwd_api/models/SubscriptionRequest.py     | 13 +++-
 cope2n-api/fwd_api/utils/health.py            | 14 ++++-
 cope2n-api/fwd_api/utils/process.py           |  2 +
 docker-compose.yml                            | 11 ++--
 speedtest_sync.py                             | 19 +++---
 12 files changed, 181 insertions(+), 52 deletions(-)
 create mode 100644 cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
 create mode 100644 cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py

diff --git a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
index 795f78c..7039c2b 100755
--- a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
+++ b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
@@ -5,6 +5,7 @@ import random
 import numpy as np
 from pathlib import Path
 import uuid
+from copy import deepcopy
 import sys, os
 cur_dir = str(Path(__file__).parents[2])
 sys.path.append(cur_dir)
diff --git a/cope2n-ai-fi/configs/sdsap_sbt/configs.py b/cope2n-ai-fi/configs/sdsap_sbt/configs.py
index 90deedd..25d4d57 100755
--- a/cope2n-ai-fi/configs/sdsap_sbt/configs.py
+++ b/cope2n-ai-fi/configs/sdsap_sbt/configs.py
@@ -6,7 +6,7 @@ ocr_engine = {
         "device": device
     },
     "recognizer": {
-        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231210_sdsrv.pth",
+        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
         "device": device
     },
     "deskew": {
diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index 27f0847..e262cba 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -11,6 +11,7 @@ from rest_framework.response import Response
 from typing import List
 from rest_framework.renderers import JSONRenderer
 from rest_framework_xml.renderers import XMLRenderer
+from multiprocessing.pool import ThreadPool
 
 from fwd import settings
 from ..celery_worker.client_connector import c_connector
@@ -31,6 +32,9 @@ class CtelViewSet(viewsets.ViewSet):
             'multipart/form-data': {
                 'type': 'object',
                 'properties': {
+                    'is_test_request': {
+                        'type': 'boolean',
+                    },
                     'file': {
                         'type': 'string',
                         'format': 'binary'
@@ -43,7 +47,6 @@
         }
     }, responses=None, tags=['OCR'])
     @action(detail=False, url_path="image/process", methods=["POST"])
-    # @transaction.atomic
     def process(self, request):
         s_time = time.time()
         user_info = ProcessUtil.get_user(request)
@@ -59,15 +62,19 @@ class CtelViewSet(viewsets.ViewSet):
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
         file_name = f"temp_{rq_id}.{file_extension}"
+        is_test_request = validated_data.get("is_test_request", False)
 
         total_page = 1
 
-        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
-                                                               pages_left=total_page,
-                                                               doc_type="all",
-                                                               process_type=p_type, status=1, request_id=rq_id,
-                                                               provider_code=provider_code,
-                                                               subscription=sub)
+        new_request: SubscriptionRequest = SubscriptionRequest(
+            pages=total_page,
+            pages_left=total_page,
doc_type="all", + process_type=p_type, status=1, request_id=rq_id, + provider_code=provider_code, + subscription=sub, + is_test_request=is_test_request + ) new_request.save() from ..celery_worker.client_connector import c_connector file_obj.seek(0) @@ -82,7 +89,6 @@ class CtelViewSet(viewsets.ViewSet): if file_extension in pdf_extensions: c_connector.do_pdf((rq_id, sub.id, p_type, user.id, files)) - # b_url = ProcessUtil.process_pdf_file(file_name, file_obj, new_request, user) elif file_extension in image_extensions: b_url = ProcessUtil.process_image_file(file_name, file_obj, new_request, user) j_time = time.time() @@ -216,26 +222,39 @@ class CtelViewSet(viewsets.ViewSet): provider_code=provider_code, subscription=sub) new_request.save() + count = 0 - compact_files = [] + doc_files_with_type = [] for doc_type, doc_files in files.items(): - for i, doc_file in enumerate(doc_files): + for i, doc_file in enumerate(doc_files): _ext = doc_file.name.split(".")[-1] if _ext not in allowed_file_extensions: return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE, data={"request_id": rq_id, "message": f"File {_ext} is now allowed"}) - _name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}" - doc_file.seek(0) - file_path = FileUtils.resize_and_save_file(_name, new_request, doc_file, 100) - FileUtils.save_to_S3(_name, new_request, file_path) + tmp_file_name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}" + doc_files_with_type.append(( + count, doc_type, doc_file, tmp_file_name + )) count += 1 - this_file = { - "file_name": _name, - "file_path": file_path, - "file_type": doc_type - } - compact_files.append(this_file) - c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files)) + # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible + compact_files = [None] * len(doc_files_with_type) + pool = ThreadPool(processes=2) + def process_file(data): + idx, doc_type, doc_file, tmp_file_name = data + doc_file.seek(0) + file_path = FileUtils.resize_and_save_file(tmp_file_name, new_request, doc_file, 100) + FileUtils.save_to_S3(tmp_file_name, new_request, file_path) + return { + "idx": idx, + "file_name": tmp_file_name, + "file_path": file_path, + "file_type": doc_type + } + for result in pool.map(process_file, doc_files_with_type): + compact_files[result["idx"]] = result + + # Send to AI queue + c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files)) time_limit = 120 start_time = time.time() diff --git a/cope2n-api/fwd_api/api_router.py b/cope2n-api/fwd_api/api_router.py index 6a8ac73..6743957 100755 --- a/cope2n-api/fwd_api/api_router.py +++ b/cope2n-api/fwd_api/api_router.py @@ -13,7 +13,6 @@ else: router.register("ctel", CtelViewSet, basename="CtelAPI") router.register("ctel", CtelUserViewSet, basename="CtelUserAPI") -# router.register("ctel", CtelTemplateViewSet, basename="CtelTemplateAPI") app_name = "api" urlpatterns = router.urls diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index e3615f5..4a6781d 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -3,6 +3,7 @@ import uuid import os import base64 import traceback +from multiprocessing.pool import ThreadPool from fwd_api.models import SubscriptionRequest, UserProfile from fwd_api.celery_worker.worker import app @@ -66,14 +67,13 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files): "file_type": "" },] """ - start = time.time() new_request = 
     new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
     user = UserProfile.objects.filter(id=user_id).first()
-    b_urls = []
     new_request.pages = len(files)
     new_request.pages_left = len(files)
-    for i, file in enumerate(files):
+    def process_and_save_file(data):
+        idx, file = data
         extension = file["file_name"].split(".")[-1].lower()
         if extension == "pdf":
             _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
@@ -83,28 +83,39 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
                 raise FileContentInvalidException
             for j in range(len(_b_urls)):
                 _b_urls[j]["doc_type"] = file["file_type"]
-                _b_urls[j]["page_number"] = j + len(b_urls)
-            # b_urls += _b_urls # TODO: Client may request all images in a file, for now, extract the first page only
-            b_urls.append(_b_urls[0])
+                _b_urls[j]["page_number"] = idx
+            return idx, _b_urls[0]
         elif extension in image_extensions:
             this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
-            this_url["page_number"] = len(b_urls)
+            this_url["page_number"] = idx
             if file["file_type"]:
                 this_url["doc_type"] = file["file_type"]
-            b_urls.append(this_url)
+            return idx, this_url
+
+    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+    start_time = time.time()
+    b_urls = [None] * len(files)
+    pool = ThreadPool(processes=2)
+    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
+    for idx, url in pool.map(process_and_save_file, files_with_idx):
+        b_urls[idx] = url
+    new_request.preprocessing_time = time.time() - start_time
 
-    start_process = time.time()
-    logger.info(f"BE proccessing time: {start_process - start}")
     # TODO: send to queue with different request_ids
     doc_type_string = ""
+    to_queue = []
     for i, b_url in enumerate(b_urls):
         fractorized_request_id = rq_id + f"_sub_{i}"
-        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
         doc_type_string += "{},".format(b_url["doc_type"])
     doc_type_string = doc_type_string[:-1]
     new_request.doc_type = doc_type_string
     new_request.save()
 
+    # Send to next queue
+    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
+
 
 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key):
diff --git a/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
new file mode 100644
index 0000000..c7ab242
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-21 04:32
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0157_alter_subscriptionrequest_created_at'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_test_request',
+            field=models.BooleanField(default=False),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py b/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py
new file mode 100644
index 0000000..edacf3f
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py
@@ -0,0 +1,58 @@
+# Generated by Django 4.1.3 on 2023-12-21 06:46
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0158_subscriptionrequest_is_test_request'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='cpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_stats',
+            field=models.CharField(max_length=100, null=True),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_bad_image_quality',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_reviewed',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='preprocessing_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='total_memory',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='used_memory',
+            field=models.FloatField(default=-1),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py
index ea6d44c..c18ebf4 100755
--- a/cope2n-api/fwd_api/models/SubscriptionRequest.py
+++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py
@@ -18,4 +18,15 @@ class SubscriptionRequest(models.Model):
     status = models.IntegerField() # 1: Processing(Pending) 2: PredictCompleted 3: ReturnCompleted
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
-    updated_at = models.DateTimeField(auto_now=True)
\ No newline at end of file
+    updated_at = models.DateTimeField(auto_now=True)
+    is_test_request = models.BooleanField(default=False)
+
+    preprocessing_time = models.FloatField(default=-1)
+    ai_inference_time = models.FloatField(default=-1)
+    cpu_percent = models.FloatField(default=-1)
+    gpu_percent = models.FloatField(default=-1)
+    used_memory = models.FloatField(default=-1)
+    total_memory = models.FloatField(default=-1)
+    gpu_stats = models.CharField(max_length=100, null=True)
+    is_reviewed = models.BooleanField(default=False)
+    is_bad_image_quality = models.BooleanField(default=False)
diff --git a/cope2n-api/fwd_api/utils/health.py b/cope2n-api/fwd_api/utils/health.py
index 9a69ca0..65a7cf4 100644
--- a/cope2n-api/fwd_api/utils/health.py
+++ b/cope2n-api/fwd_api/utils/health.py
@@ -5,7 +5,7 @@ from datetime import timedelta
 from ..models import SubscriptionRequest
 
 def get_latest_requests(limit=50):
-    requests = SubscriptionRequest.objects.order_by("-created_at")[:limit]
+    requests = SubscriptionRequest.objects.filter(is_test_request=False).order_by("-created_at")[:limit]
     requests_dict = []
     for request in requests:
         requests_dict.append({
@@ -14,13 +14,23 @@ def get_latest_requests(limit=50):
             "doc_type": request.doc_type,
             # "predict_result": request.predict_result,
             "created_at": request.created_at,
+            "updated_at": request.updated_at,
+            "preprocessing_time": request.preprocessing_time,
+            "ai_inference_time": request.ai_inference_time,
+            "cpu_percent": request.cpu_percent,
+            "gpu_percent": request.gpu_percent,
+            "used_memory": request.used_memory,
+            "total_memory": request.total_memory,
+            "gpu_stats": request.gpu_stats,
+            "is_reviewed": request.is_reviewed,
+            "is_bad_image_quality": request.is_bad_image_quality,
         })
     return requests_dict
 
 def count_requests_by_date(days_limit=5):
     today = timezone.now().date()
     start_date = today - timedelta(days=days_limit)
-    requests_by_date = SubscriptionRequest.objects.filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
+    requests_by_date = SubscriptionRequest.objects.filter(is_test_request=False).filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
     count_dict = []
     for rbd in requests_by_date:
         count_dict.append({
diff --git a/cope2n-api/fwd_api/utils/process.py b/cope2n-api/fwd_api/utils/process.py
index 7a833c9..a7a19a6 100644
--- a/cope2n-api/fwd_api/utils/process.py
+++ b/cope2n-api/fwd_api/utils/process.py
@@ -104,6 +104,8 @@ def validate_ocr_request_and_get(request, subscription):
     FileUtils.validate_list_file(list_file)
     validated_data['file'] = list_file[0]
 
+    validated_data['is_test_request'] = request.data.get('is_test_request', False)
+
     return validated_data
 
 def sbt_validate_ocr_request_and_get(request, subscription):
diff --git a/docker-compose.yml b/docker-compose.yml
index 037c53f..18b59b6 100755
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -33,7 +33,8 @@ services:
     command: bash run.sh
     deploy:
       mode: replicated
-      replicas: 3
+      replicas: 2
+
   # Back-end services
   be-ctel-sbt:
     build:
@@ -71,7 +72,7 @@ services:
       - ctel-sbt
     volumes:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
-      - ./data/BE_static:/app/static
+      - ./data/static:/app/static
      - ./cope2n-api:/app
     working_dir: /app
     depends_on:
@@ -160,7 +161,7 @@
       - ./cope2n-api:/app
     working_dir: /app
 
-    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"
+    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 8"
 
   # Back-end persistent
   db-sbt:
@@ -197,7 +198,7 @@ services:
     shm_size: 10gb
     privileged: true
     ports:
-      - 80:80
+      - ${SIDP_SERVICE_PORT:-9881}:80
     depends_on:
       be-ctel-sbt:
         condition: service_started
@@ -207,7 +208,7 @@ services:
       - VITE_PROXY=http://be-ctel-sbt:${BASE_PORT}
       - VITE_API_BASE_URL=http://fe-sbt:80
     volumes:
-      - ./data/BE_static:/backend-static
+      - ./data/static:/backend-static
     networks:
       - ctel-sbt
 
diff --git a/speedtest_sync.py b/speedtest_sync.py
index 449f451..1b66550 100644
--- a/speedtest_sync.py
+++ b/speedtest_sync.py
@@ -94,27 +94,26 @@ def process_file(data):
 
 
 
-invoice_files = [
-    ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
-]
 # invoice_files = [
-#     ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
+#     ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
 # ]
+invoice_files = [
+    ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
+]
 imei_files = [
     ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
open("test_samples/sbt/imei5.jpg", "rb").read())), + # ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())), + # ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())), + # ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())), + # ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())), ] def get_imei_files(): num_files = random.randint(1, len(imei_files) + 1) - print("Num imeis", num_files) files = imei_files[:num_files] # print("Num of imei files:", len(files)) return files def get_files(): - return invoice_files + get_imei_files() + return invoice_files + imei_files # get_imei_files() def gen_input(num_input): for _ in range(num_input): yield (get_files(), token)