Add AI processing time

This commit is contained in:
Viet Anh Nguyen 2023-12-22 14:03:56 +07:00
parent ac5dd81185
commit 5c9a51ccde
9 changed files with 128 additions and 16 deletions

View File

@ -1,4 +1,6 @@
from celery_worker.worker_fi import app from celery_worker.worker_fi import app
from celery_worker.client_connector_fi import CeleryConnector
from common.process_pdf import compile_output_sbt
@app.task(name="process_fi_invoice") @app.task(name="process_fi_invoice")
def process_invoice(rq_id, list_url): def process_invoice(rq_id, list_url):
@ -57,8 +59,6 @@ def process_manulife_invoice(rq_id, list_url):
@app.task(name="process_sbt_invoice") @app.task(name="process_sbt_invoice")
def process_sbt_invoice(rq_id, list_url): def process_sbt_invoice(rq_id, list_url):
from celery_worker.client_connector_fi import CeleryConnector
from common.process_pdf import compile_output_sbt
# TODO: simply returning 200 and 404 doesn't make any sense # TODO: simply returning 200 and 404 doesn't make any sense
c_connector = CeleryConnector() c_connector = CeleryConnector()
try: try:

View File

@ -1,2 +1,2 @@
#!/bin/bash #!/bin/bash
bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo" bash -c "celery -A celery_worker.worker_fi worker --loglevel=INFO --pool=solo -c 1"

View File

@ -110,6 +110,7 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
doc_type_string += "{},".format(b_url["doc_type"]) doc_type_string += "{},".format(b_url["doc_type"])
doc_type_string = doc_type_string[:-1] doc_type_string = doc_type_string[:-1]
new_request.doc_type = doc_type_string new_request.doc_type = doc_type_string
new_request.ai_inference_start_time = time.time()
new_request.save() new_request.save()
# Send to next queue # Send to next queue

View File

@ -1,4 +1,6 @@
import traceback import traceback
import time
import uuid
from copy import deepcopy from copy import deepcopy
@ -129,9 +131,15 @@ def process_invoice_manulife_result(rq_id, result):
print("Fail Invoice %d", rq_id) print("Fail Invoice %d", rq_id)
traceback.print_exc() traceback.print_exc()
return "FailInvoice" return "FailInvoice"
random_processor_name = None
@app.task(name='process_sbt_invoice_result') @app.task(name='process_sbt_invoice_result')
def process_invoice_sbt_result(rq_id, result): def process_invoice_sbt_result(rq_id, result):
global random_processor_name
if random_processor_name is None:
random_processor_name = uuid.uuid4()
print(rq_id, random_processor_name)
print_id(f"[DEBUG]: Received SBT request with id {rq_id}") print_id(f"[DEBUG]: Received SBT request with id {rq_id}")
try: try:
page_index = int(rq_id.split("_sub_")[1]) page_index = int(rq_id.split("_sub_")[1])
@ -157,13 +165,19 @@ def process_invoice_sbt_result(rq_id, result):
redis_client.remove_cache(rq_id) redis_client.remove_cache(rq_id)
rq.save() rq.save()
rq.ai_inference_time = time.time() - rq.ai_inference_start_time
rq.save()
update_user(rq) update_user(rq)
except IndexError as e: except IndexError as e:
print(e) print(e)
print("NotFound request by requestId, %d", rq_id) print("NotFound request by requestId, %d", rq_id)
rq.ai_inference_time = 0
rq.save()
except Exception as e: except Exception as e:
print(e) print(e)
print("Fail Invoice %d", rq_id) print("Fail Invoice %d", rq_id)
traceback.print_exc() traceback.print_exc()
rq.ai_inference_time = 0
rq.save()
return "FailInvoice" return "FailInvoice"

View File

@ -0,0 +1,18 @@
# Generated by Django 4.1.3 on 2023-12-22 03:08
from django.db import migrations, models
class Migration(migrations.Migration):
    """Add `ai_inference_start_time` to SubscriptionRequest.

    Introduces a nullable DateTimeField used to record when AI inference
    begins for a request, so total inference duration can later be computed.
    NOTE(review): the very next migration (0161) drops this column and
    re-creates it as a FloatField (epoch seconds) — the DateTimeField type
    chosen here was immediately superseded.
    """

    # Must apply after the migration that introduced ai_inference_time.
    dependencies = [
        ('fwd_api', '0159_subscriptionrequest_ai_inference_time_and_more'),
    ]

    operations = [
        migrations.AddField(
            model_name='subscriptionrequest',
            name='ai_inference_start_time',
            # Nullable so existing rows need no backfill on deploy.
            field=models.DateTimeField(null=True),
        ),
    ]

View File

@ -0,0 +1,27 @@
# Generated by Django 4.1.3 on 2023-12-22 03:28
from django.db import migrations, models
class Migration(migrations.Migration):
    """Change `ai_inference_start_time` from DateTimeField to FloatField.

    The previous migration (0160) created the column as a DateTimeField, but
    the worker code stores `time.time()` (epoch seconds, a float), so the
    column is recreated as a FloatField. Also normalizes the default of
    `ai_inference_time` to 0.
    NOTE(review): Remove+Add (rather than AlterField) discards any values
    already stored in ai_inference_start_time — acceptable only because the
    column was introduced in the immediately preceding migration.
    """

    dependencies = [
        ('fwd_api', '0160_subscriptionrequest_ai_inference_start_time'),
    ]

    operations = [
        # Drop the DateTimeField version of the column...
        migrations.RemoveField(
            model_name='subscriptionrequest',
            name='ai_inference_start_time',
        ),
        # ...and recreate it as a float (epoch seconds), defaulting to 0
        # (meaning "not yet started/recorded").
        migrations.AddField(
            model_name='subscriptionrequest',
            name='ai_inference_start_time',
            field=models.FloatField(default=0),
        ),
        # Align ai_inference_time's default (was -1 per the model diff) to 0.
        migrations.AlterField(
            model_name='subscriptionrequest',
            name='ai_inference_time',
            field=models.FloatField(default=0),
        ),
    ]

View File

@ -1,7 +1,6 @@
from django.db import models from django.db import models
from django.utils import timezone from django.utils import timezone
from fwd_api.models import UserProfile
from fwd_api.models.Subscription import Subscription from fwd_api.models.Subscription import Subscription
@ -22,7 +21,8 @@ class SubscriptionRequest(models.Model):
is_test_request = models.BooleanField(default=False) is_test_request = models.BooleanField(default=False)
preprocessing_time = models.FloatField(default=-1) preprocessing_time = models.FloatField(default=-1)
ai_inference_time = models.FloatField(default=-1) ai_inference_start_time = models.FloatField(default=0)
ai_inference_time = models.FloatField(default=0)
cpu_percent = models.FloatField(default=-1) cpu_percent = models.FloatField(default=-1)
gpu_percent = models.FloatField(default=-1) gpu_percent = models.FloatField(default=-1)
used_memory = models.FloatField(default=-1) used_memory = models.FloatField(default=-1)

View File

@ -33,7 +33,36 @@ services:
command: bash run.sh command: bash run.sh
deploy: deploy:
mode: replicated mode: replicated
replicas: 2 replicas: 1
cope2n-fi-sbt-2:
build:
context: cope2n-ai-fi
shm_size: 10gb
dockerfile: Dockerfile
shm_size: 10gb
restart: always
networks:
- ctel-sbt
privileged: true
environment:
- CELERY_BROKER=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@rabbitmq-sbt:5672
- CUDA_VISIBLE_DEVICES=0
volumes:
- ./cope2n-ai-fi:/workspace/cope2n-ai-fi # for dev container only
working_dir: /workspace/cope2n-ai-fi
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: 1
# capabilities: [gpu]
# command: bash -c "tail -f > /dev/null"
command: bash run.sh
deploy:
mode: replicated
replicas: 1
# Back-end services # Back-end services
be-ctel-sbt: be-ctel-sbt:

View File

@ -5,6 +5,8 @@ import multiprocessing
import tqdm import tqdm
import random import random
import traceback import traceback
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import requests
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -36,17 +38,30 @@ except:
def process_file(data): def process_file(data):
files, token = data files, token = data
num_files = len(files) files = (
files.append( # 'invoice_file': ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb"), 'application/octet-stream'),
('processType', (None, 12)), ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
('processType', '12'),
) )
num_files = len(files) - 1
# ================================================================= # =================================================================
# UPLOAD THE FILE # UPLOAD THE FILE
start_time = time.time() start_time = time.time()
end_of_upload_time = 0
def my_callback(monitor):
nonlocal end_of_upload_time
if monitor.bytes_read == monitor.len:
end_of_upload_time = time.time()
m = MultipartEncoderMonitor.from_fields(
fields=files,
callback=my_callback
)
try: try:
response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={ response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={
'Authorization': token, 'Authorization': token,
}, files=files, timeout=300) 'Content-Type': m.content_type
}, data=m, timeout=300)
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
print("Timeout occurred while uploading") print("Timeout occurred while uploading")
return { return {
@ -68,7 +83,7 @@ def process_file(data):
"num_files": 0, "num_files": 0,
} }
end_time = time.time() end_time = time.time()
upload_time = end_time - start_time upload_time = end_of_upload_time - start_time
# ================================================================= # =================================================================
try: try:
@ -88,7 +103,7 @@ def process_file(data):
"success": True, "success": True,
"status": 200, "status": 200,
"upload_time": upload_time, "upload_time": upload_time,
"process_time": upload_time, "process_time": time.time() - start_time - upload_time,
"num_files": num_files, "num_files": num_files,
} }
@ -101,8 +116,8 @@ invoice_files = [
('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())), ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
] ]
imei_files = [ imei_files = [
('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())), ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read(), 'application/octet-stream')),
# ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())), ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei1.jpg", "rb").read(), 'application/octet-stream')),
# ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())), # ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
# ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())), # ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
# ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())), # ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
@ -112,8 +127,15 @@ def get_imei_files():
files = imei_files[:num_files] files = imei_files[:num_files]
# print("Num of imei files:", len(files)) # print("Num of imei files:", len(files))
return files return files
# def get_files():
# return imei_files
# return invoice_files + get_imei_files()
def get_files(): def get_files():
return invoice_files + imei_files # get_imei_files() return {
'invoice_file': ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read()),
'imei_files': ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read(), 'application/octet-stream'),
'processType': 12,
}
def gen_input(num_input): def gen_input(num_input):
for _ in range(num_input): for _ in range(num_input):
yield (get_files(), token) yield (get_files(), token)
@ -141,7 +163,8 @@ if len(uploading_time) == 0:
print("No valid uploading time") print("No valid uploading time")
print("Check the results!") print("Check the results!")
processing_time = [x["process_time"] for x in results if x["success"]] processing_time = [x["process_time"] for x in results if x["success"]]
print("Uploading + Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time))) print("Uploading time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(uploading_time) / len(uploading_time), min(uploading_time), max(uploading_time)))
print("Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
print("--------------------------------------") print("--------------------------------------")
print("TIME BY IMAGE") print("TIME BY IMAGE")
uploading_time = [x["upload_time"] for x in results if x["success"]] uploading_time = [x["upload_time"] for x in results if x["success"]]