diff --git a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
index 795f78c..7039c2b 100755
--- a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
+++ b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
@@ -5,6 +5,7 @@ import random
 import numpy as np
 from pathlib import Path
 import uuid
+from copy import deepcopy
 import sys, os
 cur_dir = str(Path(__file__).parents[2])
 sys.path.append(cur_dir)
diff --git a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
index 00bec4a..28b86a4 100755
--- a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
+++ b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
@@ -1,4 +1,6 @@
 from celery_worker.worker_fi import app
+from celery_worker.client_connector_fi import CeleryConnector
+from common.process_pdf import compile_output_sbt
 
 @app.task(name="process_fi_invoice")
 def process_invoice(rq_id, list_url):
@@ -57,8 +59,6 @@ def process_manulife_invoice(rq_id, list_url):
 
 @app.task(name="process_sbt_invoice")
 def process_sbt_invoice(rq_id, list_url):
-    from celery_worker.client_connector_fi import CeleryConnector
-    from common.process_pdf import compile_output_sbt
     # TODO: simply returning 200 and 404 doesn't make any sense
     c_connector = CeleryConnector()
     try:
diff --git a/cope2n-ai-fi/configs/sdsap_sbt/configs.py b/cope2n-ai-fi/configs/sdsap_sbt/configs.py
index e2d43b2..8d40ba0 100755
--- a/cope2n-ai-fi/configs/sdsap_sbt/configs.py
+++ b/cope2n-ai-fi/configs/sdsap_sbt/configs.py
@@ -6,7 +6,7 @@ ocr_engine = {
         "device": device
     },
     "recognizer": {
-        "version": "/workspace/cope2n-ai-fi/weights/models/ocr_engine/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
+        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
         "device": device
     },
     "deskew": {
diff --git a/cope2n-ai-fi/run.sh b/cope2n-ai-fi/run.sh
index 167a73f..576b38e 100755
--- a/cope2n-ai-fi/run.sh
+++ b/cope2n-ai-fi/run.sh
@@ -1,2 +1,2 @@
 #!/bin/bash
-bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo"
\ No newline at end of file
+bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo -c 1"
\ No newline at end of file
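Two related worker changes above: the mock SBT task now imports `CeleryConnector` and `compile_output_sbt` once at module load instead of on every invocation, and `run.sh` adds `-c 1` alongside `--pool=solo` (with the solo pool the worker already executes one task at a time in-process, so the flag mostly states the intent explicitly). A minimal sketch of the same worker options pinned in Celery config rather than on the command line, assuming the worker module is free to set its own settings (the repo uses CLI flags instead):

```python
from celery import Celery

app = Celery("worker_fi")
# Equivalent of `--pool=solo -c 1` from run.sh, expressed as settings:
app.conf.worker_pool = "solo"     # run tasks inline in the worker process
app.conf.worker_concurrency = 1   # redundant with solo, but explicit
```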
diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index 4de39e3..e2664e0 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -11,6 +11,7 @@ from rest_framework.response import Response
 from typing import List
 from rest_framework.renderers import JSONRenderer
 from rest_framework_xml.renderers import XMLRenderer
+from multiprocessing.pool import ThreadPool
 
 from fwd import settings
 from ..celery_worker.client_connector import c_connector
@@ -31,6 +32,9 @@ class CtelViewSet(viewsets.ViewSet):
             'multipart/form-data': {
                 'type': 'object',
                 'properties': {
+                    'is_test_request': {
+                        'type': 'boolean',
+                    },
                     'file': {
                         'type': 'string',
                         'format': 'binary'
@@ -43,7 +47,6 @@ class CtelViewSet(viewsets.ViewSet):
         }
     }, responses=None, tags=['OCR'])
     @action(detail=False, url_path="image/process", methods=["POST"])
-    # @transaction.atomic
     def process(self, request):
         s_time = time.time()
         user_info = ProcessUtil.get_user(request)
@@ -59,15 +62,19 @@ class CtelViewSet(viewsets.ViewSet):
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
         file_name = f"temp_{rq_id}.{file_extension}"
+        is_test_request = validated_data.get("is_test_request", False)
 
         total_page = 1
 
-        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
-                                                               pages_left=total_page,
-                                                               doc_type="all",
-                                                               process_type=p_type, status=1, request_id=rq_id,
-                                                               provider_code=provider_code,
-                                                               subscription=sub)
+        new_request: SubscriptionRequest = SubscriptionRequest(
+            pages=total_page,
+            pages_left=total_page,
+            doc_type="all",
+            process_type=p_type, status=1, request_id=rq_id,
+            provider_code=provider_code,
+            subscription=sub,
+            is_test_request=is_test_request
+        )
         new_request.save()
         from ..celery_worker.client_connector import c_connector
         file_obj.seek(0)
@@ -82,7 +89,6 @@ class CtelViewSet(viewsets.ViewSet):
 
         if file_extension in pdf_extensions:
             c_connector.do_pdf((rq_id, sub.id, p_type, user.id, files))
-            # b_url = ProcessUtil.process_pdf_file(file_name, file_obj, new_request, user)
         elif file_extension in image_extensions:
             b_url = ProcessUtil.process_image_file(file_name, file_obj, new_request, user)
         j_time = time.time()
@@ -216,24 +222,38 @@ class CtelViewSet(viewsets.ViewSet):
                                                                provider_code=provider_code,
                                                                subscription=sub)
         new_request.save()
+
         count = 0
-        compact_files = []
+        doc_files_with_type = []
         for doc_type, doc_files in files.items():
-            for i, doc_file in enumerate(doc_files):
+            for i, doc_file in enumerate(doc_files):
                 _ext = doc_file.name.split(".")[-1]
                 if _ext not in allowed_file_extensions:
                     return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE, data={"request_id": rq_id, "message": f"File {_ext} is not allowed"})
-                _name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
-                doc_file.seek(0)
-                file_path = FileUtils.resize_and_save_file(_name, new_request, doc_file, 100)
-                FileUtils.save_to_S3(_name, new_request, file_path)
+                tmp_file_name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
+                doc_files_with_type.append((
+                    count, doc_type, doc_file, tmp_file_name
+                ))
                 count += 1
-                this_file = {
-                    "file_name": _name,
-                    "file_path": file_path,
-                    "file_type": doc_type
-                }
-                compact_files.append(this_file)
+
+        # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+        compact_files = [None] * len(doc_files_with_type)
+        pool = ThreadPool(processes=2)
+        def process_file(data):
+            idx, doc_type, doc_file, tmp_file_name = data
+            doc_file.seek(0)
+            file_path = FileUtils.resize_and_save_file(tmp_file_name, new_request, doc_file, 100)
+            FileUtils.save_to_S3(tmp_file_name, new_request, file_path)
+            return {
+                "idx": idx,
+                "file_name": tmp_file_name,
+                "file_path": file_path,
+                "file_type": doc_type
+            }
+        for result in pool.map(process_file, doc_files_with_type):
+            compact_files[result["idx"]] = result
+
+        # Send to AI queue
         c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
 
         time_limit = 120
diff --git a/cope2n-api/fwd_api/api_router.py b/cope2n-api/fwd_api/api_router.py
index 6a8ac73..6743957 100755
--- a/cope2n-api/fwd_api/api_router.py
+++ b/cope2n-api/fwd_api/api_router.py
@@ -13,7 +13,6 @@ else:
 
 router.register("ctel", CtelViewSet, basename="CtelAPI")
 router.register("ctel", CtelUserViewSet, basename="CtelUserAPI")
-# router.register("ctel", CtelTemplateViewSet, basename="CtelTemplateAPI")
 
 app_name = "api"
 urlpatterns = router.urls
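The multi-file upload path above stages `(index, doc_type, file, name)` tuples, runs the resize-and-S3 work on a two-thread `ThreadPool`, and slots each result back into place by its `idx`. A self-contained sketch of that pattern with illustrative names (note that `ThreadPool.map` already returns results in input order, so the explicit index bookkeeping is defensive rather than required):

```python
from multiprocessing.pool import ThreadPool

def parallel_process(items, worker, threads=2):
    """Run `worker` over `items` in a thread pool, preserving input order.

    Each item is tagged with its index so results can be slotted back into
    place even if they are consumed out of order.
    """
    results = [None] * len(items)
    with ThreadPool(processes=threads) as pool:
        for idx, value in pool.map(worker, list(enumerate(items))):
            results[idx] = value
    return results

# Usage: the worker receives (idx, item) and must return (idx, result).
squares = parallel_process([1, 2, 3], lambda pair: (pair[0], pair[1] ** 2))
assert squares == [1, 4, 9]
```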
diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py
index f9c54cf..7ae29ac 100755
--- a/cope2n-api/fwd_api/celery_worker/internal_task.py
+++ b/cope2n-api/fwd_api/celery_worker/internal_task.py
@@ -3,6 +3,7 @@ import uuid
 import os
 import base64
 import traceback
+from multiprocessing.pool import ThreadPool
 
 from fwd_api.models import SubscriptionRequest, UserProfile
 from fwd_api.celery_worker.worker import app
@@ -66,14 +67,13 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
             "file_type": ""
         },]
     """
-    start = time.time()
     new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
     user = UserProfile.objects.filter(id=user_id).first()
-    b_urls = []
     new_request.pages = len(files)
     new_request.pages_left = len(files)
 
-    for i, file in enumerate(files):
+    def process_and_save_file(data):
+        idx, file = data
         extension = file["file_name"].split(".")[-1].lower()
         if extension == "pdf":
             _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
@@ -83,28 +83,40 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
                 raise FileContentInvalidException
             for j in range(len(_b_urls)):
                 _b_urls[j]["doc_type"] = file["file_type"]
-                _b_urls[j]["page_number"] = j + len(b_urls)
-            # b_urls += _b_urls
             # TODO: Client may request all images in a file, for now, extract the first page only
-            b_urls.append(_b_urls[0])
+                _b_urls[j]["page_number"] = idx
+            return idx, _b_urls[0]
         elif extension in image_extensions:
             this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
-            this_url["page_number"] = len(b_urls)
+            this_url["page_number"] = idx
             if file["file_type"]:
                 this_url["doc_type"] = file["file_type"]
-            b_urls.append(this_url)
+            return idx, this_url
+
+    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+    start_time = time.time()
+    b_urls = [None] * len(files)
+    pool = ThreadPool(processes=2)
+    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
+    for idx, url in pool.map(process_and_save_file, files_with_idx):
+        b_urls[idx] = url
+    new_request.preprocessing_time = time.time() - start_time
 
-    start_process = time.time()
-    logger.info(f"BE proccessing time: {start_process - start}")
     # TODO: send to queue with different request_ids
     doc_type_string = ""
+    to_queue = []
     for i, b_url in enumerate(b_urls):
         fractorized_request_id = rq_id + f"_sub_{i}"
-        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
         doc_type_string += "{},".format(b_url["doc_type"])
     doc_type_string = doc_type_string[:-1]
     new_request.doc_type = doc_type_string
+    new_request.ai_inference_start_time = time.time()
     new_request.save()
 
+    # Send to next queue
+    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
+
 
 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key, request_id):
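Beyond the thread pool, `process_pdf` now records `preprocessing_time`, stamps `ai_inference_start_time`, and defers every `send_to_queue2` call until after `new_request.save()`, so a fast downstream result can never arrive before the row it needs to update exists. A minimal sketch of that persist-first, dispatch-second ordering, with hypothetical helper names (`dispatch_pages`, `send` are not the repo's):

```python
import time

def dispatch_pages(request_record, pages, send):
    """Stage messages, commit the DB row, then dispatch (hypothetical names)."""
    to_queue = [(f"{request_record.request_id}_sub_{i}", page)
                for i, page in enumerate(pages)]
    request_record.ai_inference_start_time = time.time()
    request_record.save()          # commit the row first...
    for sub_rq_id, page in to_queue:
        send(sub_rq_id, page)      # ...so consumers can never race the save()
```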
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index 5dc3b3f..4121dc3 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -1,4 +1,6 @@
 import traceback
+import time
+import uuid
 
 from copy import deepcopy
 
@@ -129,9 +131,15 @@ def process_invoice_manulife_result(rq_id, result):
         print("Fail Invoice %d", rq_id)
         traceback.print_exc()
         return "FailInvoice"
+
+random_processor_name = None
 
 @app.task(name='process_sbt_invoice_result')
 def process_invoice_sbt_result(rq_id, result):
+    global random_processor_name
+    if random_processor_name is None:
+        random_processor_name = uuid.uuid4()
+    print(rq_id, random_processor_name)
     print_id(f"[DEBUG]: Received SBT request with id {rq_id}")
     try:
         page_index = int(rq_id.split("_sub_")[1])
@@ -157,13 +165,19 @@ def process_invoice_sbt_result(rq_id, result):
 
             redis_client.remove_cache(rq_id)
             rq.save()
 
+        rq.ai_inference_time = time.time() - rq.ai_inference_start_time
+        rq.save()
         update_user(rq)
     except IndexError as e:
         print(e)
         print("NotFound request by requestId, %d", rq_id)
+        rq.ai_inference_time = 0
+        rq.save()
     except Exception as e:
         print(e)
         print("Fail Invoice %d", rq_id)
         traceback.print_exc()
+        rq.ai_inference_time = 0
+        rq.save()
         return "FailInvoice"
diff --git a/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
new file mode 100644
index 0000000..c7ab242
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-21 04:32
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0157_alter_subscriptionrequest_created_at'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_test_request',
+            field=models.BooleanField(default=False),
+        ),
+    ]
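One caveat in the error handling above: `rq` is bound inside the `try` block, so if the exception fires before the request lookup succeeds (for example the `IndexError` from splitting `rq_id`), the `rq.ai_inference_time = 0` write in the handler would itself raise `NameError`. A defensive sketch of the same flow (illustrative names, not the repo's code):

```python
def handle_result(rq_id, fetch_request):
    rq = None
    try:
        page_index = int(rq_id.split("_sub_")[1])  # may raise before rq is bound
        rq = fetch_request(rq_id)                  # hypothetical lookup helper
        ...
    except Exception:
        if rq is not None:   # only write back if the row was actually found
            rq.ai_inference_time = 0
            rq.save()
        return "FailInvoice"
```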
b/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py
@@ -0,0 +1,58 @@
+# Generated by Django 4.1.3 on 2023-12-21 06:46
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0158_subscriptionrequest_is_test_request'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='cpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_stats',
+            field=models.CharField(max_length=100, null=True),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_bad_image_quality',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_reviewed',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='preprocessing_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='total_memory',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='used_memory',
+            field=models.FloatField(default=-1),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py b/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py
new file mode 100644
index 0000000..601b184
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-22 03:08
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0159_subscriptionrequest_ai_inference_time_and_more'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_start_time',
+            field=models.DateTimeField(null=True),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py b/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py
new file mode 100644
index 0000000..d219f0a
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py
@@ -0,0 +1,27 @@
+# Generated by Django 4.1.3 on 2023-12-22 03:28
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0160_subscriptionrequest_ai_inference_start_time'),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name='subscriptionrequest',
+            name='ai_inference_start_time',
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_start_time',
+            field=models.FloatField(default=0),
+        ),
+        migrations.AlterField(
+            model_name='subscriptionrequest',
+            name='ai_inference_time',
+            field=models.FloatField(default=0),
+        ),
+    ]
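Migration 0160 first added `ai_inference_start_time` as a `DateTimeField`, and 0161 immediately drops and re-creates it as a `FloatField`: the instrumentation stores raw `time.time()` epoch seconds, so elapsed time is a plain subtraction. A sketch of why the float representation is the simpler fit here (an illustration of the reasoning, not repo code):

```python
import time

# Epoch seconds (floats) make duration a one-line subtraction, even when the
# start and end are recorded by different worker processes:
start = time.time()                # stored in the FloatField
...                                # AI inference happens elsewhere
elapsed = time.time() - start      # what lands in ai_inference_time

# A DateTimeField would force the same code to round-trip through
# timezone-aware datetimes (e.g. django.utils.timezone.now()) before subtracting.
```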
models
 from django.utils import timezone
 
-from fwd_api.models import UserProfile
 from fwd_api.models.Subscription import Subscription
 
 
@@ -19,4 +18,16 @@ class SubscriptionRequest(models.Model):
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
     updated_at = models.DateTimeField(auto_now=True)
-    S3_uploaded = models.BooleanField(default=False)
\ No newline at end of file
+    S3_uploaded = models.BooleanField(default=False)
+    is_test_request = models.BooleanField(default=False)
+
+    preprocessing_time = models.FloatField(default=-1)
+    ai_inference_start_time = models.FloatField(default=0)
+    ai_inference_time = models.FloatField(default=0)
+    cpu_percent = models.FloatField(default=-1)
+    gpu_percent = models.FloatField(default=-1)
+    used_memory = models.FloatField(default=-1)
+    total_memory = models.FloatField(default=-1)
+    gpu_stats = models.CharField(max_length=100, null=True)
+    is_reviewed = models.BooleanField(default=False)
+    is_bad_image_quality = models.BooleanField(default=False)
diff --git a/cope2n-api/fwd_api/utils/health.py b/cope2n-api/fwd_api/utils/health.py
index 9a69ca0..65a7cf4 100644
--- a/cope2n-api/fwd_api/utils/health.py
+++ b/cope2n-api/fwd_api/utils/health.py
@@ -5,7 +5,7 @@ from datetime import timedelta
 from ..models import SubscriptionRequest
 
 def get_latest_requests(limit=50):
-    requests = SubscriptionRequest.objects.order_by("-created_at")[:limit]
+    requests = SubscriptionRequest.objects.filter(is_test_request=False).order_by("-created_at")[:limit]
     requests_dict = []
     for request in requests:
         requests_dict.append({
@@ -14,13 +14,23 @@ def get_latest_requests(limit=50):
             "doc_type": request.doc_type,
             # "predict_result": request.predict_result,
             "created_at": request.created_at,
+            "updated_at": request.updated_at,
+            "preprocessing_time": request.preprocessing_time,
+            "ai_inference_time": request.ai_inference_time,
+            "cpu_percent": request.cpu_percent,
+            "gpu_percent": request.gpu_percent,
+            "used_memory": request.used_memory,
+            "total_memory": request.total_memory,
+            "gpu_stats": request.gpu_stats,
+            "is_reviewed": request.is_reviewed,
+            "is_bad_image_quality": request.is_bad_image_quality,
         })
     return requests_dict
 
 def count_requests_by_date(days_limit=5):
     today = timezone.now().date()
     start_date = today - timedelta(days=days_limit)
-    requests_by_date = SubscriptionRequest.objects.filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
+    requests_by_date = SubscriptionRequest.objects.filter(is_test_request=False).filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
     count_dict = []
     for rbd in requests_by_date:
         count_dict.append({
diff --git a/cope2n-api/fwd_api/utils/process.py b/cope2n-api/fwd_api/utils/process.py
index 7a833c9..a7a19a6 100644
--- a/cope2n-api/fwd_api/utils/process.py
+++ b/cope2n-api/fwd_api/utils/process.py
@@ -104,6 +104,8 @@ def validate_ocr_request_and_get(request, subscription):
     FileUtils.validate_list_file(list_file)
     validated_data['file'] = list_file[0]
 
+    validated_data['is_test_request'] = request.data.get('is_test_request', False)
+
     return validated_data
 
 def sbt_validate_ocr_request_and_get(request, subscription):
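A caveat with the `is_test_request` plumbing above: in a `multipart/form-data` POST, DRF's `request.data.get()` typically yields the raw string `'true'` or `'false'` rather than a bool, and any non-empty string is truthy, so `'false'` would be treated as test traffic. A hedged coercion sketch (an assumption about the incoming payload, not what the repo currently does):

```python
def parse_bool(value, default=False):
    """Coerce multipart form values like 'true'/'false'/'1' to a real bool."""
    if isinstance(value, bool):
        return value
    if value is None:
        return default
    return str(value).strip().lower() in ("1", "true", "yes", "on")

# e.g. validated_data['is_test_request'] = parse_bool(request.data.get('is_test_request'))
```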
services:
     deploy:
       mode: replicated
       replicas: 2
+
   # Back-end services
   be-ctel-sbt:
     environment:
@@ -57,7 +58,9 @@ services:
       - ctel-sbt
     volumes:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
-      - BE_static:/app/static
+      - ./data/static:/app/static
+      - ./cope2n-api:/app
+    working_dir: /app
 
     depends_on:
       db-sbt:
@@ -141,7 +144,7 @@ services:
     volumes:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
     working_dir: /app
-    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"
+    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 8"
 
   # Back-end persistent
   db-sbt:
@@ -177,7 +180,7 @@ services:
     image: public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:{{tag}}
     privileged: true
     ports:
-      - 80:80
+      - ${SIDP_SERVICE_PORT:-9881}:80
     depends_on:
       be-ctel-sbt:
         condition: service_started
@@ -187,7 +190,7 @@ services:
       - VITE_PROXY=http://be-ctel-sbt:${BASE_PORT}
       - VITE_API_BASE_URL=http://fe-sbt:80
     volumes:
-      - BE_static:/backend-static
+      - ./data/static:/backend-static
     networks:
       - ctel-sbt
diff --git a/speedtest_sync.py b/speedtest_sync.py
index a24b839..1f5d84b 100644
--- a/speedtest_sync.py
+++ b/speedtest_sync.py
@@ -109,7 +109,6 @@ def process_file(data):
                 "num_files": 0,
             }
             data.pop("files", None)
-            print(data)
     except:
         print(response.content)
         return {
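With the front end now published on `${SIDP_SERVICE_PORT:-9881}` instead of port 80, and the upload endpoint accepting `is_test_request`, a smoke test can be tagged so it stays out of the health dashboards. A hypothetical client call; the host, port default, auth header, and form fields here are assumptions pieced together from the viewset's `url_path` and schema, not a documented API contract:

```python
import requests

# Hypothetical smoke-test request against the exposed service port.
resp = requests.post(
    "http://localhost:9881/api/ctel/image/process/",
    headers={"Authorization": "<token>"},                # auth scheme assumed
    data={"type": "invoice", "is_test_request": True},   # excluded from health stats
    files={"file": open("sample_invoice.pdf", "rb")},
    timeout=120,
)
print(resp.status_code, resp.json())
```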