Merge pull request #18 from dx-tan/feature/monitoring

Add preprocessing time to response
Authored by Đỗ Xuân Tân, 2023-12-25 10:18:51 +07:00; merge committed via GitHub Enterprise.
commit 8e442a56f6
17 changed files with 283 additions and 79 deletions


@@ -5,6 +5,7 @@ import random
 import numpy as np
 from pathlib import Path
 import uuid
+from copy import deepcopy
 import sys, os
 cur_dir = str(Path(__file__).parents[2])
 sys.path.append(cur_dir)


@@ -1,4 +1,6 @@
 from celery_worker.worker_fi import app
+from celery_worker.client_connector_fi import CeleryConnector
+from common.process_pdf import compile_output_sbt

 @app.task(name="process_fi_invoice")
 def process_invoice(rq_id, list_url):

@@ -57,8 +59,6 @@ def process_manulife_invoice(rq_id, list_url):
 @app.task(name="process_sbt_invoice")
 def process_sbt_invoice(rq_id, list_url):
-    from celery_worker.client_connector_fi import CeleryConnector
-    from common.process_pdf import compile_output_sbt
     # TODO: simply returning 200 and 404 doesn't make any sense
     c_connector = CeleryConnector()
     try:


@@ -6,7 +6,7 @@ ocr_engine = {
         "device": device
     },
     "recognizer": {
-        "version": "/workspace/cope2n-ai-fi/weights/models/ocr_engine/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
+        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
         "device": device
     },
     "deskew": {


@@ -1,2 +1,2 @@
 #!/bin/bash
-bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo"
+bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo -c 1"


@@ -11,6 +11,7 @@ from rest_framework.response import Response
 from typing import List
 from rest_framework.renderers import JSONRenderer
 from rest_framework_xml.renderers import XMLRenderer
+from multiprocessing.pool import ThreadPool

 from fwd import settings
 from ..celery_worker.client_connector import c_connector

@@ -31,6 +32,9 @@ class CtelViewSet(viewsets.ViewSet):
         'multipart/form-data': {
             'type': 'object',
             'properties': {
+                'is_test_request': {
+                    'type': 'boolean',
+                },
                 'file': {
                     'type': 'string',
                     'format': 'binary'

@@ -43,7 +47,6 @@ class CtelViewSet(viewsets.ViewSet):
             }
     }, responses=None, tags=['OCR'])
     @action(detail=False, url_path="image/process", methods=["POST"])
-    # @transaction.atomic
     def process(self, request):
         s_time = time.time()
         user_info = ProcessUtil.get_user(request)

@@ -59,15 +62,19 @@
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
         file_name = f"temp_{rq_id}.{file_extension}"
+        is_test_request = validated_data.get("is_test_request", False)
         total_page = 1
-        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
-                                                               pages_left=total_page,
-                                                               doc_type="all",
-                                                               process_type=p_type, status=1, request_id=rq_id,
-                                                               provider_code=provider_code,
-                                                               subscription=sub)
+        new_request: SubscriptionRequest = SubscriptionRequest(
+            pages=total_page,
+            pages_left=total_page,
+            doc_type="all",
+            process_type=p_type, status=1, request_id=rq_id,
+            provider_code=provider_code,
+            subscription=sub,
+            is_test_request=is_test_request
+        )
         new_request.save()
         from ..celery_worker.client_connector import c_connector
         file_obj.seek(0)

@@ -82,7 +89,6 @@
         if file_extension in pdf_extensions:
             c_connector.do_pdf((rq_id, sub.id, p_type, user.id, files))
-            # b_url = ProcessUtil.process_pdf_file(file_name, file_obj, new_request, user)
         elif file_extension in image_extensions:
             b_url = ProcessUtil.process_image_file(file_name, file_obj, new_request, user)
             j_time = time.time()

@@ -216,26 +222,39 @@
                                                                provider_code=provider_code,
                                                                subscription=sub)
         new_request.save()
         count = 0
-        compact_files = []
+        doc_files_with_type = []
         for doc_type, doc_files in files.items():
             for i, doc_file in enumerate(doc_files):
                 _ext = doc_file.name.split(".")[-1]
                 if _ext not in allowed_file_extensions:
                     return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE, data={"request_id": rq_id, "message": f"File {_ext} is now allowed"})
-                _name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
-                doc_file.seek(0)
-                file_path = FileUtils.resize_and_save_file(_name, new_request, doc_file, 100)
-                FileUtils.save_to_S3(_name, new_request, file_path)
+                tmp_file_name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
+                doc_files_with_type.append((
+                    count, doc_type, doc_file, tmp_file_name
+                ))
                 count += 1
-                this_file = {
-                    "file_name": _name,
-                    "file_path": file_path,
-                    "file_type": doc_type
-                }
-                compact_files.append(this_file)
-        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
+
+        # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+        compact_files = [None] * len(doc_files_with_type)
+        pool = ThreadPool(processes=2)
+        def process_file(data):
+            idx, doc_type, doc_file, tmp_file_name = data
+            doc_file.seek(0)
+            file_path = FileUtils.resize_and_save_file(tmp_file_name, new_request, doc_file, 100)
+            FileUtils.save_to_S3(tmp_file_name, new_request, file_path)
+            return {
+                "idx": idx,
+                "file_name": tmp_file_name,
+                "file_path": file_path,
+                "file_type": doc_type
+            }
+        for result in pool.map(process_file, doc_files_with_type):
+            compact_files[result["idx"]] = result
+
+        # Send to AI queue
+        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))

         time_limit = 120
         start_time = time.time()
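A minimal, runnable sketch of the thread-pool pattern introduced above (all names here are illustrative, not taken from the PR): each work item carries its index so the result list can be rebuilt in the original upload order after the parallel resize/S3 step.

import time
from multiprocessing.pool import ThreadPool

def _save_one(item):
    idx, name = item
    # the real view would call resize_and_save_file / save_to_S3 here
    time.sleep(0.1)
    return {"idx": idx, "file_name": name}

items = list(enumerate(["a.jpg", "b.jpg", "c.jpg"]))
ordered = [None] * len(items)
with ThreadPool(processes=2) as pool:
    for r in pool.map(_save_one, items):
        ordered[r["idx"]] = r
print([r["file_name"] for r in ordered])  # ['a.jpg', 'b.jpg', 'c.jpg']

Note that pool.map already returns results in input order, so the explicit idx bookkeeping is defensive rather than strictly required; it would matter if the code later switched to imap_unordered.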


@@ -13,7 +13,6 @@ else:
 router.register("ctel", CtelViewSet, basename="CtelAPI")
 router.register("ctel", CtelUserViewSet, basename="CtelUserAPI")
-# router.register("ctel", CtelTemplateViewSet, basename="CtelTemplateAPI")

 app_name = "api"
 urlpatterns = router.urls


@@ -3,6 +3,7 @@ import uuid
 import os
 import base64
 import traceback
+from multiprocessing.pool import ThreadPool

 from fwd_api.models import SubscriptionRequest, UserProfile
 from fwd_api.celery_worker.worker import app

@@ -66,14 +67,13 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
            "file_type": ""
        },]
    """
-    start = time.time()
     new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
     user = UserProfile.objects.filter(id=user_id).first()
-    b_urls = []
     new_request.pages = len(files)
     new_request.pages_left = len(files)
-    for i, file in enumerate(files):
+    def process_and_save_file(data):
+        idx, file = data
         extension = file["file_name"].split(".")[-1].lower()
         if extension == "pdf":
             _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)

@@ -83,28 +83,40 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
                 raise FileContentInvalidException
             for j in range(len(_b_urls)):
                 _b_urls[j]["doc_type"] = file["file_type"]
-                _b_urls[j]["page_number"] = j + len(b_urls)
-            # b_urls += _b_urls # TODO: Client may request all images in a file, for now, extract the first page only
-            b_urls.append(_b_urls[0])
+                _b_urls[j]["page_number"] = idx
+            return idx, _b_urls[0]
         elif extension in image_extensions:
             this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
-            this_url["page_number"] = len(b_urls)
+            this_url["page_number"] = idx
             if file["file_type"]:
                 this_url["doc_type"] = file["file_type"]
-            b_urls.append(this_url)
+            return idx, this_url
+
+    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+    start_time = time.time()
+    b_urls = [None] * len(files)
+    pool = ThreadPool(processes=2)
+    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
+    for idx, url in pool.map(process_and_save_file, files_with_idx):
+        b_urls[idx] = url
+    new_request.preprocessing_time = time.time() - start_time

-    start_process = time.time()
-    logger.info(f"BE proccessing time: {start_process - start}")
     # TODO: send to queue with different request_ids
     doc_type_string = ""
+    to_queue = []
     for i, b_url in enumerate(b_urls):
         fractorized_request_id = rq_id + f"_sub_{i}"
-        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
         doc_type_string += "{},".format(b_url["doc_type"])
     doc_type_string = doc_type_string[:-1]
     new_request.doc_type = doc_type_string
+    new_request.ai_inference_start_time = time.time()
     new_request.save()
+
+    # Send to next queue
+    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)

 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key):
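The monitoring fields written in this task follow a plain wall-clock pattern; a hedged sketch with a stand-in object (only the field names come from the model, everything else is illustrative):

import time

class _Request:  # stand-in for SubscriptionRequest, illustration only
    preprocessing_time = -1.0
    ai_inference_start_time = 0.0
    ai_inference_time = 0.0

rq = _Request()

t0 = time.time()
# ... resize images, split PDF pages, upload to S3 ...
rq.preprocessing_time = time.time() - t0        # seconds spent in the BE worker

rq.ai_inference_start_time = time.time()        # Unix timestamp, stored as a float
# ... enqueue to the AI workers; later, in the result task:
rq.ai_inference_time = time.time() - rq.ai_inference_start_time

Deferring send_to_queue2 until after new_request.save() presumably ensures the row, including ai_inference_start_time, is persisted before any result callback can read it back.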


@@ -1,4 +1,6 @@
 import traceback
+import time
+import uuid
 from copy import deepcopy

@@ -129,9 +131,15 @@ def process_invoice_manulife_result(rq_id, result):
         print("Fail Invoice %d", rq_id)
         traceback.print_exc()
         return "FailInvoice"

+random_processor_name = None

 @app.task(name='process_sbt_invoice_result')
 def process_invoice_sbt_result(rq_id, result):
+    global random_processor_name
+    if random_processor_name is None:
+        random_processor_name = uuid.uuid4()
+    print(rq_id, random_processor_name)
     print_id(f"[DEBUG]: Received SBT request with id {rq_id}")
     try:
         page_index = int(rq_id.split("_sub_")[1])

@@ -157,13 +165,19 @@
         redis_client.remove_cache(rq_id)
         rq.save()
+        rq.ai_inference_time = time.time() - rq.ai_inference_start_time
+        rq.save()
         update_user(rq)
     except IndexError as e:
         print(e)
         print("NotFound request by requestId, %d", rq_id)
+        rq.ai_inference_time = 0
+        rq.save()
     except Exception as e:
         print(e)
         print("Fail Invoice %d", rq_id)
         traceback.print_exc()
+        rq.ai_inference_time = 0
+        rq.save()
         return "FailInvoice"
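The random_processor_name global above is a lightweight way to tell which worker process handled a given request; a small sketch of the same idea (the task body and names here are hypothetical):

import uuid

_processor_id = None  # one UUID per worker process, created lazily

def handle_result(rq_id):
    global _processor_id
    if _processor_id is None:
        _processor_id = uuid.uuid4()
    # every log line from this process now carries the same id
    print(f"[{_processor_id}] handling {rq_id}")

Because Celery's prefork workers each keep their own copy of module-level state, the UUID effectively identifies the process that consumed the task.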


@@ -0,0 +1,18 @@
# Generated by Django 4.1.3 on 2023-12-21 04:32

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('fwd_api', '0157_alter_subscriptionrequest_created_at'),
    ]

    operations = [
        migrations.AddField(
            model_name='subscriptionrequest',
            name='is_test_request',
            field=models.BooleanField(default=False),
        ),
    ]


@@ -0,0 +1,58 @@
# Generated by Django 4.1.3 on 2023-12-21 06:46

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('fwd_api', '0158_subscriptionrequest_is_test_request'),
    ]

    operations = [
        migrations.AddField(
            model_name='subscriptionrequest',
            name='ai_inference_time',
            field=models.FloatField(default=-1),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='cpu_percent',
            field=models.FloatField(default=-1),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='gpu_percent',
            field=models.FloatField(default=-1),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='gpu_stats',
            field=models.CharField(max_length=100, null=True),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='is_bad_image_quality',
            field=models.BooleanField(default=False),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='is_reviewed',
            field=models.BooleanField(default=False),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='preprocessing_time',
            field=models.FloatField(default=-1),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='total_memory',
            field=models.FloatField(default=-1),
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='used_memory',
            field=models.FloatField(default=-1),
        ),
    ]


@@ -0,0 +1,18 @@
# Generated by Django 4.1.3 on 2023-12-22 03:08

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('fwd_api', '0159_subscriptionrequest_ai_inference_time_and_more'),
    ]

    operations = [
        migrations.AddField(
            model_name='subscriptionrequest',
            name='ai_inference_start_time',
            field=models.DateTimeField(null=True),
        ),
    ]


@@ -0,0 +1,27 @@
# Generated by Django 4.1.3 on 2023-12-22 03:28

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('fwd_api', '0160_subscriptionrequest_ai_inference_start_time'),
    ]

    operations = [
        migrations.RemoveField(
            model_name='subscriptionrequest',
            name='ai_inference_start_time',
        ),
        migrations.AddField(
            model_name='subscriptionrequest',
            name='ai_inference_start_time',
            field=models.FloatField(default=0),
        ),
        migrations.AlterField(
            model_name='subscriptionrequest',
            name='ai_inference_time',
            field=models.FloatField(default=0),
        ),
    ]
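This last migration swaps ai_inference_start_time from a DateTimeField to a FloatField, presumably so the workers can store time.time() directly and subtract it without datetime or timezone handling. A short comparison of the two approaches (illustrative only, not from the PR):

import time
from datetime import datetime, timezone

# FloatField style (what the PR settles on): plain Unix timestamps
start = time.time()
elapsed = time.time() - start                      # float seconds

# DateTimeField style: aware datetimes, subtraction yields a timedelta
start_dt = datetime.now(timezone.utc)
elapsed_seconds = (datetime.now(timezone.utc) - start_dt).total_seconds()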


@@ -1,7 +1,6 @@
 from django.db import models
 from django.utils import timezone
-from fwd_api.models import UserProfile
 from fwd_api.models.Subscription import Subscription

@@ -18,4 +17,16 @@ class SubscriptionRequest(models.Model):
     status = models.IntegerField() # 1: Processing(Pending) 2: PredictCompleted 3: ReturnCompleted
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
     updated_at = models.DateTimeField(auto_now=True)
+    is_test_request = models.BooleanField(default=False)
+    preprocessing_time = models.FloatField(default=-1)
+    ai_inference_start_time = models.FloatField(default=0)
+    ai_inference_time = models.FloatField(default=0)
+    cpu_percent = models.FloatField(default=-1)
+    gpu_percent = models.FloatField(default=-1)
+    used_memory = models.FloatField(default=-1)
+    total_memory = models.FloatField(default=-1)
+    gpu_stats = models.CharField(max_length=100, null=True)
+    is_reviewed = models.BooleanField(default=False)
+    is_bad_image_quality = models.BooleanField(default=False)
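The diff adds the cpu_percent, gpu_percent, memory and gpu_stats columns but does not show how they get filled. One plausible way, stated purely as an assumption since it appears nowhere in this PR, is to sample host metrics with psutil when a request finishes:

import psutil  # assumption: psutil is not referenced anywhere in this PR

def snapshot_host_metrics(rq):
    """Fill the monitoring columns on a SubscriptionRequest-like object (sketch)."""
    mem = psutil.virtual_memory()
    rq.cpu_percent = psutil.cpu_percent(interval=None)   # CPU % since the previous call
    rq.used_memory = mem.used / (1024 ** 3)               # GB; the unit is an assumption
    rq.total_memory = mem.total / (1024 ** 3)
    # gpu_percent / gpu_stats would need a GPU library (e.g. pynvml); left unset here
    return rq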


@@ -5,7 +5,7 @@ from datetime import timedelta
 from ..models import SubscriptionRequest

 def get_latest_requests(limit=50):
-    requests = SubscriptionRequest.objects.order_by("-created_at")[:limit]
+    requests = SubscriptionRequest.objects.filter(is_test_request=False).order_by("-created_at")[:limit]
     requests_dict = []
     for request in requests:
         requests_dict.append({

@@ -14,13 +14,23 @@ def get_latest_requests(limit=50):
             "doc_type": request.doc_type,
             # "predict_result": request.predict_result,
             "created_at": request.created_at,
+            "updated_at": request.updated_at,
+            "preprocessing_time": request.preprocessing_time,
+            "ai_inference_time": request.ai_inference_time,
+            "cpu_percent": request.cpu_percent,
+            "gpu_percent": request.gpu_percent,
+            "used_memory": request.used_memory,
+            "total_memory": request.total_memory,
+            "gpu_stats": request.gpu_stats,
+            "is_reviewed": request.is_reviewed,
+            "is_bad_image_quality": request.is_bad_image_quality,
         })
     return requests_dict

 def count_requests_by_date(days_limit=5):
     today = timezone.now().date()
     start_date = today - timedelta(days=days_limit)
-    requests_by_date = SubscriptionRequest.objects.filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
+    requests_by_date = SubscriptionRequest.objects.filter(is_test_request=False).filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
     count_dict = []
     for rbd in requests_by_date:
         count_dict.append({
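With the new columns exposed by get_latest_requests, aggregate dashboard views become straightforward. A hedged sketch of a helper that is not in this PR (name and thresholds are hypothetical), averaging the timings while skipping the -1 "never measured" defaults:

from django.db.models import Avg
from ..models import SubscriptionRequest

def get_average_timings():
    qs = SubscriptionRequest.objects.filter(
        is_test_request=False,
        preprocessing_time__gte=0,   # -1 means the field was never measured
        ai_inference_time__gt=0,
    )
    return qs.aggregate(
        avg_preprocessing=Avg("preprocessing_time"),
        avg_inference=Avg("ai_inference_time"),
    )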


@@ -104,6 +104,8 @@ def validate_ocr_request_and_get(request, subscription):
     FileUtils.validate_list_file(list_file)
     validated_data['file'] = list_file[0]
+    validated_data['is_test_request'] = request.data.get('is_test_request', False)

     return validated_data

 def sbt_validate_ocr_request_and_get(request, subscription):


@@ -21,6 +21,7 @@ services:
     deploy:
       mode: replicated
       replicas: 2

   # Back-end services
   be-ctel-sbt:
     environment:

@@ -57,7 +58,9 @@ services:
       - ctel-sbt
     volumes:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
-      - BE_static:/app/static
+      - ./data/static:/app/static
+      - ./cope2n-api:/app
     working_dir: /app
     depends_on:
       db-sbt:

@@ -143,7 +146,7 @@ services:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
     working_dir: /app
-    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"
+    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 8"

   # Back-end persistent
   db-sbt:

@@ -179,7 +182,7 @@ services:
     image: public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:{{tag}}
     privileged: true
     ports:
-      - 80:80
+      - ${SIDP_SERVICE_PORT:-9881}:80
     depends_on:
       be-ctel-sbt:
         condition: service_started

@@ -189,7 +192,7 @@ services:
       - VITE_PROXY=http://be-ctel-sbt:${BASE_PORT}
       - VITE_API_BASE_URL=http://fe-sbt:80
     volumes:
-      - BE_static:/backend-static
+      - ./data/static:/backend-static
     networks:
       - ctel-sbt


@@ -1,10 +1,12 @@
 import requests
 import time
-import random
 import argparse
 import multiprocessing
 import tqdm
+import random
 import traceback
+from requests_toolbelt import MultipartEncoderMonitor
+import requests

 parser = argparse.ArgumentParser()

@@ -30,23 +32,48 @@
 except:
     print("Failed to login")
     print(response.content)

 # After the login, store the token in the memory (RAM) or DB
 # Re-login to issue a new token after 6 days.
 # =================================================================

 def process_file(data):
-    files, token = data
+    _, token = data
+    files = []
+    if random.random() < 0.2:
+        files = [
+            ('invoice_file', ("invoice.jpg", open("test_samples/sbt/invoice.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
+        ]
+    elif random.random() < 0.6:
+        files = [
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+        ]
+    else:
+        files = [
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
+        ]
     num_files = len(files)
-    files.append(
-        ('processType', (None, 12)),
-    )
+    files.append(('processType', '12'))

     # =================================================================
     # UPLOAD THE FILE
     start_time = time.time()
+    end_of_upload_time = 0
+    def my_callback(monitor):
+        nonlocal end_of_upload_time
+        if monitor.bytes_read == monitor.len:
+            end_of_upload_time = time.time()
+    m = MultipartEncoderMonitor.from_fields(
+        fields=files,
+        callback=my_callback
+    )
     try:
         response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={
             'Authorization': token,
-        }, files=files, timeout=300)
+            'Content-Type': m.content_type
+        }, data=m, timeout=300)
     except requests.exceptions.Timeout:
         print("Timeout occurred while uploading")
         return {

@@ -68,13 +95,20 @@ def process_file(data):
             "num_files": 0,
         }
     end_time = time.time()
-    upload_time = end_time - start_time
+    upload_time = end_of_upload_time - start_time
     # =================================================================

     try:
         data = response.json()
+        if len(data["files"]) != num_files:
+            return {
+                "success": False,
+                "status": "missing_file",
+                "upload_time": 0,
+                "process_time": 0,
+                "num_files": 0,
+            }
         data.pop("files", None)
-        print(data)
     except:
         print(response.content)
         return {

@@ -88,36 +122,14 @@ def process_file(data):
         "success": True,
         "status": 200,
         "upload_time": upload_time,
-        "process_time": upload_time,
+        "process_time": time.time() - start_time - upload_time,
         "num_files": num_files,
     }

-invoice_files = [
-    ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
-]
-# invoice_files = [
-#     ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
-# ]
-imei_files = [
-    ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
-]
-def get_imei_files():
-    num_files = random.randint(1, len(imei_files) + 1)
-    print("Num imeis", num_files)
-    files = imei_files[:num_files]
-    # print("Num of imei files:", len(files))
-    return files
-def get_files():
-    return invoice_files + get_imei_files()

 def gen_input(num_input):
     for _ in range(num_input):
-        yield (get_files(), token)
+        yield (None, token)

 pool = multiprocessing.Pool(processes=args.num_workers)
 results = []
 for result in tqdm.tqdm(pool.imap_unordered(process_file, gen_input(num_input=args.num_requests)), total=args.num_requests):

@@ -126,7 +138,6 @@
 print("## TEST REPORT #################################")
 print("Number of requests: {}".format(args.num_requests))
 print("Number of concurrent requests: {}".format(args.num_workers))
-print("Number of files: 1 invoice, 1-5 imei files (random)")
 print("--------------------------------------")
 print("SUCCESS RATE")
 counter = {}

@@ -142,7 +153,8 @@ if len(uploading_time) == 0:
     print("No valid uploading time")
     print("Check the results!")
 processing_time = [x["process_time"] for x in results if x["success"]]
-print("Uploading + Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
+print("Uploading time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(uploading_time) / len(uploading_time), min(uploading_time), max(uploading_time)))
+print("Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
 print("--------------------------------------")
 print("TIME BY IMAGE")
 uploading_time = [x["upload_time"] for x in results if x["success"]]
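The test-script change separates upload time from processing time with requests_toolbelt's MultipartEncoderMonitor, whose callback fires as the encoder body is read. A standalone sketch of the same measurement; the host URL is a placeholder and the sample file path is only reused from the script:

import time
import requests
from requests_toolbelt import MultipartEncoderMonitor

fields = [
    ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
]

start = time.time()
upload_done_at = start

def on_progress(monitor):
    global upload_done_at
    if monitor.bytes_read == monitor.len:   # last chunk handed to requests
        upload_done_at = time.time()

m = MultipartEncoderMonitor.from_fields(fields=fields, callback=on_progress)
resp = requests.post("http://localhost:9881/api/ctel/images/process_sync/",
                     headers={"Content-Type": m.content_type},
                     data=m, timeout=300)

upload_time = upload_done_at - start
process_time = time.time() - start - upload_time
print(resp.status_code, upload_time, process_time)

Note that bytes_read reaching len only means requests has consumed the request body, so the upload/processing split is an approximation rather than a server-side measurement.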