diff --git a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
index 795f78c..7039c2b 100755
--- a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
+++ b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py
@@ -5,6 +5,7 @@ import random
 import numpy as np
 from pathlib import Path
 import uuid
+from copy import deepcopy
 import sys, os
 cur_dir = str(Path(__file__).parents[2])
 sys.path.append(cur_dir)
diff --git a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
index 00bec4a..28b86a4 100755
--- a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
+++ b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py
@@ -1,4 +1,6 @@
 from celery_worker.worker_fi import app
+from celery_worker.client_connector_fi import CeleryConnector
+from common.process_pdf import compile_output_sbt
 
 @app.task(name="process_fi_invoice")
 def process_invoice(rq_id, list_url):
@@ -57,8 +59,6 @@ def process_manulife_invoice(rq_id, list_url):
 
 @app.task(name="process_sbt_invoice")
 def process_sbt_invoice(rq_id, list_url):
-    from celery_worker.client_connector_fi import CeleryConnector
-    from common.process_pdf import compile_output_sbt
     # TODO: simply returning 200 and 404 doesn't make any sense
     c_connector = CeleryConnector()
     try:
diff --git a/cope2n-ai-fi/configs/sdsap_sbt/configs.py b/cope2n-ai-fi/configs/sdsap_sbt/configs.py
index e2d43b2..8d40ba0 100755
--- a/cope2n-ai-fi/configs/sdsap_sbt/configs.py
+++ b/cope2n-ai-fi/configs/sdsap_sbt/configs.py
@@ -6,7 +6,7 @@ ocr_engine = {
         "device": device
     },
     "recognizer": {
-        "version": "/workspace/cope2n-ai-fi/weights/models/ocr_engine/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
+        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
         "device": device
     },
     "deskew": {
diff --git a/cope2n-ai-fi/run.sh b/cope2n-ai-fi/run.sh
index 167a73f..576b38e 100755
--- a/cope2n-ai-fi/run.sh
+++ b/cope2n-ai-fi/run.sh
@@ -1,2 +1,2 @@
 #!/bin/bash
-bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo"
\ No newline at end of file
+bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo -c 1"
\ No newline at end of file
diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index 27f0847..e262cba 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -11,6 +11,7 @@ from rest_framework.response import Response
 from typing import List
 from rest_framework.renderers import JSONRenderer
 from rest_framework_xml.renderers import XMLRenderer
+from multiprocessing.pool import ThreadPool
 
 from fwd import settings
 from ..celery_worker.client_connector import c_connector
@@ -31,6 +32,9 @@ class CtelViewSet(viewsets.ViewSet):
         'multipart/form-data': {
             'type': 'object',
             'properties': {
+                'is_test_request': {
+                    'type': 'boolean',
+                },
                 'file': {
                     'type': 'string',
                     'format': 'binary'
@@ -43,7 +47,6 @@
             }
         }
     }, responses=None, tags=['OCR'])
     @action(detail=False, url_path="image/process", methods=["POST"])
-    # @transaction.atomic
     def process(self, request):
         s_time = time.time()
         user_info = ProcessUtil.get_user(request)
@@ -59,15 +62,19 @@
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
         file_name = f"temp_{rq_id}.{file_extension}"
+        is_test_request = validated_data.get("is_test_request", False)
 
         total_page = 1
 
-        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
-                                                               pages_left=total_page,
-                                                               doc_type="all",
-                                                               process_type=p_type, status=1, request_id=rq_id,
-                                                               provider_code=provider_code,
-                                                               subscription=sub)
+        new_request: SubscriptionRequest = SubscriptionRequest(
+            pages=total_page,
+            pages_left=total_page,
+            doc_type="all",
+            process_type=p_type, status=1, request_id=rq_id,
+            provider_code=provider_code,
+            subscription=sub,
+            is_test_request=is_test_request
+        )
         new_request.save()
         from ..celery_worker.client_connector import c_connector
         file_obj.seek(0)
@@ -82,7 +89,6 @@
 
         if file_extension in pdf_extensions:
             c_connector.do_pdf((rq_id, sub.id, p_type, user.id, files))
-            # b_url = ProcessUtil.process_pdf_file(file_name, file_obj, new_request, user)
         elif file_extension in image_extensions:
             b_url = ProcessUtil.process_image_file(file_name, file_obj, new_request, user)
             j_time = time.time()
@@ -216,26 +222,39 @@
                                                                provider_code=provider_code,
                                                                subscription=sub)
         new_request.save()
+
         count = 0
-        compact_files = []
+        doc_files_with_type = []
         for doc_type, doc_files in files.items():
-            for i, doc_file in enumerate(doc_files): 
+            for i, doc_file in enumerate(doc_files):
                 _ext = doc_file.name.split(".")[-1]
                 if _ext not in allowed_file_extensions:
                     return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE, data={"request_id": rq_id, "message": f"File {_ext} is now allowed"})
-                _name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
-                doc_file.seek(0)
-                file_path = FileUtils.resize_and_save_file(_name, new_request, doc_file, 100)
-                FileUtils.save_to_S3(_name, new_request, file_path)
+                tmp_file_name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
+                doc_files_with_type.append((
+                    count, doc_type, doc_file, tmp_file_name
+                ))
                 count += 1
-                this_file = {
-                    "file_name": _name,
-                    "file_path": file_path,
-                    "file_type": doc_type
-                }
-                compact_files.append(this_file)
-        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
+
+        # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+        compact_files = [None] * len(doc_files_with_type)
+        pool = ThreadPool(processes=2)
+        def process_file(data):
+            idx, doc_type, doc_file, tmp_file_name = data
+            doc_file.seek(0)
+            file_path = FileUtils.resize_and_save_file(tmp_file_name, new_request, doc_file, 100)
+            FileUtils.save_to_S3(tmp_file_name, new_request, file_path)
+            return {
+                "idx": idx,
+                "file_name": tmp_file_name,
+                "file_path": file_path,
+                "file_type": doc_type
+            }
+        for result in pool.map(process_file, doc_files_with_type):
+            compact_files[result["idx"]] = result
+
+        # Send to AI queue
+        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
 
         time_limit = 120
         start_time = time.time()
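Note on the new upload path in ctel_view.py: resizing and the S3 push now run concurrently in a two-thread ThreadPool, and results are slotted back by index so the list handed to do_pdf keeps the order the files arrived in. Stripped of the Django/FileUtils specifics, the pattern is roughly the following (a self-contained sketch; the file names and the save_one helper are placeholders, not code from this PR):

```python
from multiprocessing.pool import ThreadPool

def save_one(item):
    # Placeholder for the real per-file work: resize, write to disk, push to S3.
    idx, name = item
    return idx, {"file_name": name, "file_type": "imei"}

items = list(enumerate(["imei1.jpg", "imei2.jpg", "invoice.jpg"]))
compact_files = [None] * len(items)
with ThreadPool(processes=2) as pool:
    for idx, meta in pool.map(save_one, items):
        compact_files[idx] = meta  # map() already preserves input order; the index keeps it explicit

print(compact_files)
```

Because pool.map blocks until every item finishes, the request thread still waits for all uploads before queuing the job; only the per-file latency is overlapped.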
diff --git a/cope2n-api/fwd_api/api_router.py b/cope2n-api/fwd_api/api_router.py
index 6a8ac73..6743957 100755
--- a/cope2n-api/fwd_api/api_router.py
+++ b/cope2n-api/fwd_api/api_router.py
@@ -13,7 +13,6 @@ else:
 router.register("ctel", CtelViewSet, basename="CtelAPI")
 router.register("ctel", CtelUserViewSet, basename="CtelUserAPI")
-# router.register("ctel", CtelTemplateViewSet, basename="CtelTemplateAPI")
 
 app_name = "api"
 
 urlpatterns = router.urls
diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py
index e3615f5..2dd96bb 100755
--- a/cope2n-api/fwd_api/celery_worker/internal_task.py
+++ b/cope2n-api/fwd_api/celery_worker/internal_task.py
@@ -3,6 +3,7 @@ import uuid
 import os
 import base64
 import traceback
+from multiprocessing.pool import ThreadPool
 
 from fwd_api.models import SubscriptionRequest, UserProfile
 from fwd_api.celery_worker.worker import app
@@ -66,14 +67,13 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
         "file_type": ""
     },]
     """
-    start = time.time()
     new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
     user = UserProfile.objects.filter(id=user_id).first()
-    b_urls = []
     new_request.pages = len(files)
     new_request.pages_left = len(files)
-    for i, file in enumerate(files):
+    def process_and_save_file(data):
+        idx, file = data
         extension = file["file_name"].split(".")[-1].lower()
         if extension == "pdf":
             _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
@@ -83,28 +83,40 @@
                 raise FileContentInvalidException
             for j in range(len(_b_urls)):
                 _b_urls[j]["doc_type"] = file["file_type"]
-                _b_urls[j]["page_number"] = j + len(b_urls)
-            # b_urls += _b_urls # TODO: Client may request all images in a file, for now, extract the first page only
-            b_urls.append(_b_urls[0])
+                _b_urls[j]["page_number"] = idx
+            return idx, _b_urls[0]
         elif extension in image_extensions:
             this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
-            this_url["page_number"] = len(b_urls)
+            this_url["page_number"] = idx
             if file["file_type"]:
                 this_url["doc_type"] = file["file_type"]
-            b_urls.append(this_url)
+            return idx, this_url
+
+    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+    start_time = time.time()
+    b_urls = [None] * len(files)
+    pool = ThreadPool(processes=2)
+    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
+    for idx, url in pool.map(process_and_save_file, files_with_idx):
+        b_urls[idx] = url
+    new_request.preprocessing_time = time.time() - start_time
 
-    start_process = time.time()
-    logger.info(f"BE proccessing time: {start_process - start}")
     # TODO: send to queue with different request_ids
     doc_type_string = ""
+    to_queue = []
     for i, b_url in enumerate(b_urls):
         fractorized_request_id = rq_id + f"_sub_{i}"
-        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
         doc_type_string += "{},".format(b_url["doc_type"])
     doc_type_string = doc_type_string[:-1]
     new_request.doc_type = doc_type_string
+    new_request.ai_inference_start_time = time.time()
     new_request.save()
 
+    # Send to next queue
+    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
+
 
 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key):
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index 5dc3b3f..4121dc3 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -1,4 +1,6 @@
 import traceback
+import time
+import uuid
 
 from copy import deepcopy
 
@@ -129,9 +131,15 @@ def process_invoice_manulife_result(rq_id, result):
         print("Fail Invoice %d", rq_id)
         traceback.print_exc()
         return "FailInvoice"
+
+random_processor_name = None
 
 @app.task(name='process_sbt_invoice_result')
 def process_invoice_sbt_result(rq_id, result):
+    global random_processor_name
+    if random_processor_name is None:
+        random_processor_name = uuid.uuid4()
+    print(rq_id, random_processor_name)
     print_id(f"[DEBUG]: Received SBT request with id {rq_id}")
     try:
         page_index = int(rq_id.split("_sub_")[1])
@@ -157,13 +165,19 @@ def process_invoice_sbt_result(rq_id, result):
             redis_client.remove_cache(rq_id)
             rq.save()
 
+        rq.ai_inference_time = time.time() - rq.ai_inference_start_time
+        rq.save()
         update_user(rq)
     except IndexError as e:
         print(e)
         print("NotFound request by requestId, %d", rq_id)
+        rq.ai_inference_time = 0
+        rq.save()
     except Exception as e:
         print(e)
         print("Fail Invoice %d", rq_id)
         traceback.print_exc()
+        rq.ai_inference_time = 0
+        rq.save()
         return "FailInvoice"
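Taken together, the two worker changes above form a simple cross-process timer: process_pdf stamps ai_inference_start_time (an epoch float, hence migration 0161 below) right before fanning the request out to the AI queue, and process_invoice_sbt_result derives ai_inference_time when the result comes back. In isolation the handshake looks like this (a minimal sketch with the Django model swapped for a plain object):

```python
import time

class Request:
    ai_inference_start_time = 0.0
    ai_inference_time = 0.0

rq = Request()

# Producer side (internal_task.process_pdf): stamp just before publishing to the queue.
rq.ai_inference_start_time = time.time()

# ... AI workers process the request asynchronously ...

# Consumer side (process_invoice_sbt_result): elapsed wall time once the result lands.
rq.ai_inference_time = time.time() - rq.ai_inference_start_time
```

Storing the start as a plain float (rather than a DateTimeField) is what makes the bare subtraction against time.time() valid.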
diff --git a/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
new file mode 100644
index 0000000..c7ab242
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_is_test_request.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-21 04:32
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0157_alter_subscriptionrequest_created_at'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_test_request',
+            field=models.BooleanField(default=False),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py b/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py
new file mode 100644
index 0000000..edacf3f
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0159_subscriptionrequest_ai_inference_time_and_more.py
@@ -0,0 +1,58 @@
+# Generated by Django 4.1.3 on 2023-12-21 06:46
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0158_subscriptionrequest_is_test_request'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='cpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_stats',
+            field=models.CharField(max_length=100, null=True),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_bad_image_quality',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_reviewed',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='preprocessing_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='total_memory',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='used_memory',
+            field=models.FloatField(default=-1),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py b/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py
new file mode 100644
index 0000000..601b184
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0160_subscriptionrequest_ai_inference_start_time.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-22 03:08
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0159_subscriptionrequest_ai_inference_time_and_more'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_start_time',
+            field=models.DateTimeField(null=True),
+        ),
+    ]
diff --git a/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py b/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py
new file mode 100644
index 0000000..d219f0a
--- /dev/null
+++ b/cope2n-api/fwd_api/migrations/0161_alter_subscriptionrequest_ai_inference_start_time_and_more.py
@@ -0,0 +1,27 @@
+# Generated by Django 4.1.3 on 2023-12-22 03:28
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0160_subscriptionrequest_ai_inference_start_time'),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name='subscriptionrequest',
+            name='ai_inference_start_time',
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_start_time',
+            field=models.FloatField(default=0),
+        ),
+        migrations.AlterField(
+            model_name='subscriptionrequest',
+            name='ai_inference_time',
+            field=models.FloatField(default=0),
+        ),
+    ]
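Migration 0159 adds resource columns (cpu_percent, gpu_percent, used_memory, total_memory, gpu_stats), but the code that populates them is not part of this diff. A hedged sketch of how the CPU/RAM half could be collected with psutil — the helper name and the GiB unit are assumptions, not something this PR ships:

```python
import psutil

def collect_host_stats():
    """Hypothetical helper: snapshot CPU and RAM usage for a SubscriptionRequest."""
    vm = psutil.virtual_memory()
    return {
        "cpu_percent": psutil.cpu_percent(interval=None),  # percent since the previous call
        "used_memory": vm.used / 2**30,                    # GiB (unit is an assumption)
        "total_memory": vm.total / 2**30,
    }
```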
diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py
index ea6d44c..e8a4456 100755
--- a/cope2n-api/fwd_api/models/SubscriptionRequest.py
+++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py
@@ -1,7 +1,6 @@
 from django.db import models
 from django.utils import timezone
 
-from fwd_api.models import UserProfile
 from fwd_api.models.Subscription import Subscription
 
 
@@ -18,4 +17,16 @@ class SubscriptionRequest(models.Model):
     status = models.IntegerField() # 1: Processing(Pending) 2: PredictCompleted 3: ReturnCompleted
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
-    updated_at = models.DateTimeField(auto_now=True)
\ No newline at end of file
+    updated_at = models.DateTimeField(auto_now=True)
+    is_test_request = models.BooleanField(default=False)
+
+    preprocessing_time = models.FloatField(default=-1)
+    ai_inference_start_time = models.FloatField(default=0)
+    ai_inference_time = models.FloatField(default=0)
+    cpu_percent = models.FloatField(default=-1)
+    gpu_percent = models.FloatField(default=-1)
+    used_memory = models.FloatField(default=-1)
+    total_memory = models.FloatField(default=-1)
+    gpu_stats = models.CharField(max_length=100, null=True)
+    is_reviewed = models.BooleanField(default=False)
+    is_bad_image_quality = models.BooleanField(default=False)
diff --git a/cope2n-api/fwd_api/utils/health.py b/cope2n-api/fwd_api/utils/health.py
index 9a69ca0..65a7cf4 100644
--- a/cope2n-api/fwd_api/utils/health.py
+++ b/cope2n-api/fwd_api/utils/health.py
@@ -5,7 +5,7 @@ from datetime import timedelta
 
 from ..models import SubscriptionRequest
 
 def get_latest_requests(limit=50):
-    requests = SubscriptionRequest.objects.order_by("-created_at")[:limit]
+    requests = SubscriptionRequest.objects.filter(is_test_request=False).order_by("-created_at")[:limit]
     requests_dict = []
     for request in requests:
         requests_dict.append({
@@ -14,13 +14,23 @@
             "doc_type": request.doc_type,
             # "predict_result": request.predict_result,
             "created_at": request.created_at,
+            "updated_at": request.updated_at,
+            "preprocessing_time": request.preprocessing_time,
+            "ai_inference_time": request.ai_inference_time,
+            "cpu_percent": request.cpu_percent,
+            "gpu_percent": request.gpu_percent,
+            "used_memory": request.used_memory,
+            "total_memory": request.total_memory,
+            "gpu_stats": request.gpu_stats,
+            "is_reviewed": request.is_reviewed,
+            "is_bad_image_quality": request.is_bad_image_quality,
         })
     return requests_dict
 
 def count_requests_by_date(days_limit=5):
     today = timezone.now().date()
     start_date = today - timedelta(days=days_limit)
-    requests_by_date = SubscriptionRequest.objects.filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
+    requests_by_date = SubscriptionRequest.objects.filter(is_test_request=False).filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
     count_dict = []
     for rbd in requests_by_date:
         count_dict.append({
diff --git a/cope2n-api/fwd_api/utils/process.py b/cope2n-api/fwd_api/utils/process.py
index 7a833c9..a7a19a6 100644
--- a/cope2n-api/fwd_api/utils/process.py
+++ b/cope2n-api/fwd_api/utils/process.py
@@ -104,6 +104,8 @@ def validate_ocr_request_and_get(request, subscription):
     FileUtils.validate_list_file(list_file)
     validated_data['file'] = list_file[0]
 
+    validated_data['is_test_request'] = request.data.get('is_test_request', False)
+
     return validated_data
 
 def sbt_validate_ocr_request_and_get(request, subscription):
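One detail worth flagging in process.py: with a multipart form, request.data.get('is_test_request', False) returns the raw string the client sent (e.g. "true" or "false") rather than a Python bool, so what ends up in the BooleanField depends on how that string is coerced downstream. If explicit parsing is preferred, a small helper along these lines could be applied first (a sketch, not part of this diff):

```python
def parse_bool(value, default=False):
    """Coerce form-data values like 'true', '1', 'yes' into a real bool."""
    if isinstance(value, bool):
        return value
    if value is None:
        return default
    return str(value).strip().lower() in ("1", "true", "yes", "on")

# e.g. validated_data['is_test_request'] = parse_bool(request.data.get('is_test_request'))
```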
diff --git a/docker-compose.yml b/docker-compose.yml
index 963434c..b677be1 100755
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -21,6 +21,7 @@ services:
     deploy:
       mode: replicated
       replicas: 2
+
   # Back-end services
   be-ctel-sbt:
     environment:
@@ -57,7 +58,9 @@
       - ctel-sbt
     volumes:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
-      - BE_static:/app/static
+      - ./data/static:/app/static
+      - ./cope2n-api:/app
+    working_dir: /app
 
    depends_on:
      db-sbt:
@@ -143,7 +146,7 @@
      - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
    working_dir: /app
 
-    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"
+    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 8"
 
  # Back-end persistent
  db-sbt:
@@ -179,7 +182,7 @@
    image: public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:{{tag}}
    privileged: true
    ports:
-      - 80:80
+      - ${SIDP_SERVICE_PORT:-9881}:80
    depends_on:
      be-ctel-sbt:
        condition: service_started
@@ -189,7 +192,7 @@
      - VITE_PROXY=http://be-ctel-sbt:${BASE_PORT}
      - VITE_API_BASE_URL=http://fe-sbt:80
    volumes:
-      - BE_static:/backend-static
+      - ./data/static:/backend-static
    networks:
      - ctel-sbt
 
diff --git a/speedtest_sync.py b/speedtest_sync.py
index 449f451..38cbffa 100644
--- a/speedtest_sync.py
+++ b/speedtest_sync.py
@@ -1,10 +1,12 @@
 import requests
 import time
+import random
 import argparse
 import multiprocessing
 import tqdm
-import random
 import traceback
+from requests_toolbelt import MultipartEncoderMonitor
+import requests
 
 
 parser = argparse.ArgumentParser()
@@ -30,23 +32,48 @@ try:
 except:
     print("Failed to login")
     print(response.content)
+
 # After the login, store the token in the memory (RAM) or DB
 # Re-login to issue a new token after 6 days.
 # =================================================================
 
 def process_file(data):
-    files, token = data
+    _, token = data
+    files = []
+    if random.random() < 0.2:
+        files = [
+            ('invoice_file', ("invoice.jpg", open("test_samples/sbt/invoice.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
+        ]
+    elif random.random() < 0.6:
+        files = [
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+        ]
+    else:
+        files = [
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
+        ]
     num_files = len(files)
-    files.append(
-        ('processType', (None, 12)),
-    )
+    files.append(('processType', '12'))
     # =================================================================
     # UPLOAD THE FILE
     start_time = time.time()
+    end_of_upload_time = 0
+    def my_callback(monitor):
+        nonlocal end_of_upload_time
+        if monitor.bytes_read == monitor.len:
+            end_of_upload_time = time.time()
+    m = MultipartEncoderMonitor.from_fields(
+        fields=files,
+        callback=my_callback
+    )
    try:
        response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={
            'Authorization': token,
-        }, files=files, timeout=300)
+            'Content-Type': m.content_type
+        }, data=m, timeout=300)
    except requests.exceptions.Timeout:
        print("Timeout occurred while uploading")
        return {
@@ -68,13 +95,20 @@ def process_file(data):
             "num_files": 0,
         }
 
     end_time = time.time()
-    upload_time = end_time - start_time
+    upload_time = end_of_upload_time - start_time
     # =================================================================
     try:
         data = response.json()
+        if len(data["files"]) != num_files:
+            return {
+                "success": False,
+                "status": "missing_file",
+                "upload_time": 0,
+                "process_time": 0,
+                "num_files": 0,
+            }
         data.pop("files", None)
-        print(data)
     except:
         print(response.content)
         return {
@@ -88,36 +122,14 @@
         "success": True,
         "status": 200,
         "upload_time": upload_time,
-        "process_time": upload_time,
+        "process_time": time.time() - start_time - upload_time,
         "num_files": num_files,
     }
-
-invoice_files = [
-    ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
-]
-# invoice_files = [
-#     ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
-# ]
-imei_files = [
-    ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
-]
-def get_imei_files():
-    num_files = random.randint(1, len(imei_files) + 1)
-    print("Num imeis", num_files)
-    files = imei_files[:num_files]
-    # print("Num of imei files:", len(files))
-    return files
-def get_files():
-    return invoice_files + get_imei_files()
 
 def gen_input(num_input):
     for _ in range(num_input):
-        yield (get_files(), token)
+        yield (None, token)
 pool = multiprocessing.Pool(processes=args.num_workers)
 results = []
 for result in tqdm.tqdm(pool.imap_unordered(process_file, gen_input(num_input=args.num_requests)), total=args.num_requests):
     results.append(result)
 
 print("## TEST REPORT #################################")
 print("Number of requests: {}".format(args.num_requests))
 print("Number of concurrent requests: {}".format(args.num_workers))
-print("Number of files: 1 invoice, 1-5 imei files (random)")
 print("--------------------------------------")
 print("SUCCESS RATE")
 counter = {}
@@ -142,7 +153,8 @@
 if len(uploading_time) == 0:
     print("No valid uploading time")
     print("Check the results!")
 processing_time = [x["process_time"] for x in results if x["success"]]
-print("Uploading + Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
+print("Uploading time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(uploading_time) / len(uploading_time), min(uploading_time), max(uploading_time)))
+print("Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
 print("--------------------------------------")
 print("TIME BY IMAGE")
 uploading_time = [x["upload_time"] for x in results if x["success"]]
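For reference, the reworked speedtest client above separates upload time from processing time by streaming the multipart body through requests_toolbelt's MultipartEncoderMonitor: the callback fires as bytes are read off the encoder, and once bytes_read reaches len the body has been fully handed to the socket. A condensed, standalone version of the same idea (the URL, port and sample file are placeholders; add the Authorization header from the login step for a real run):

```python
import time
import requests
from requests_toolbelt import MultipartEncoderMonitor

upload_done_at = None

def on_progress(monitor):
    global upload_done_at
    if monitor.bytes_read == monitor.len:   # multipart body fully read -> upload finished
        upload_done_at = time.time()

m = MultipartEncoderMonitor.from_fields(
    fields=[('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream'))],
    callback=on_progress,
)

start = time.time()
resp = requests.post(
    "http://localhost:9881/api/ctel/images/process_sync/",  # port assumes the SIDP_SERVICE_PORT default above
    headers={"Content-Type": m.content_type},
    data=m,
    timeout=300,
)
print(f"upload: {upload_done_at - start:.3f}s, server processing: {time.time() - upload_done_at:.3f}s")
```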