Add preprocessing time to response

Viet Anh Nguyen 2023-12-21 17:31:55 +07:00
parent 1f3bb27373
commit ac5dd81185
12 changed files with 181 additions and 52 deletions

View File

@@ -5,6 +5,7 @@ import random
 import numpy as np
 from pathlib import Path
 import uuid
+from copy import deepcopy
 import sys, os
 cur_dir = str(Path(__file__).parents[2])
 sys.path.append(cur_dir)

View File

@@ -6,7 +6,7 @@ ocr_engine = {
         "device": device
     },
     "recognizer": {
-        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231210_sdsrv.pth",
+        "version": "/workspace/cope2n-ai-fi/weights/models/sdsvtr/hub/sbt_20231218_e116_sdstr.pth",
         "device": device
     },
     "deskew": {

View File

@@ -11,6 +11,7 @@ from rest_framework.response import Response
 from typing import List
 from rest_framework.renderers import JSONRenderer
 from rest_framework_xml.renderers import XMLRenderer
+from multiprocessing.pool import ThreadPool
 
 from fwd import settings
 from ..celery_worker.client_connector import c_connector
@@ -31,6 +32,9 @@ class CtelViewSet(viewsets.ViewSet):
         'multipart/form-data': {
             'type': 'object',
             'properties': {
+                'is_test_request': {
+                    'type': 'boolean',
+                },
                 'file': {
                     'type': 'string',
                     'format': 'binary'
@@ -43,7 +47,6 @@ class CtelViewSet(viewsets.ViewSet):
             }
     }, responses=None, tags=['OCR'])
     @action(detail=False, url_path="image/process", methods=["POST"])
-    # @transaction.atomic
     def process(self, request):
         s_time = time.time()
         user_info = ProcessUtil.get_user(request)
@@ -59,15 +62,19 @@ class CtelViewSet(viewsets.ViewSet):
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
         file_name = f"temp_{rq_id}.{file_extension}"
+        is_test_request = validated_data.get("is_test_request", False)
 
         total_page = 1
-        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
-                                                               pages_left=total_page,
-                                                               doc_type="all",
-                                                               process_type=p_type, status=1, request_id=rq_id,
-                                                               provider_code=provider_code,
-                                                               subscription=sub)
+        new_request: SubscriptionRequest = SubscriptionRequest(
+            pages=total_page,
+            pages_left=total_page,
+            doc_type="all",
+            process_type=p_type, status=1, request_id=rq_id,
+            provider_code=provider_code,
+            subscription=sub,
+            is_test_request=is_test_request
+        )
         new_request.save()
 
         from ..celery_worker.client_connector import c_connector
         file_obj.seek(0)
@@ -82,7 +89,6 @@ class CtelViewSet(viewsets.ViewSet):
         if file_extension in pdf_extensions:
             c_connector.do_pdf((rq_id, sub.id, p_type, user.id, files))
-            # b_url = ProcessUtil.process_pdf_file(file_name, file_obj, new_request, user)
         elif file_extension in image_extensions:
             b_url = ProcessUtil.process_image_file(file_name, file_obj, new_request, user)
         j_time = time.time()
@@ -216,26 +222,39 @@ class CtelViewSet(viewsets.ViewSet):
                                                                provider_code=provider_code,
                                                                subscription=sub)
         new_request.save()
 
         count = 0
-        compact_files = []
+        doc_files_with_type = []
         for doc_type, doc_files in files.items():
             for i, doc_file in enumerate(doc_files):
                 _ext = doc_file.name.split(".")[-1]
                 if _ext not in allowed_file_extensions:
                     return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE, data={"request_id": rq_id, "message": f"File {_ext} is now allowed"})
-                _name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
-                doc_file.seek(0)
-                file_path = FileUtils.resize_and_save_file(_name, new_request, doc_file, 100)
-                FileUtils.save_to_S3(_name, new_request, file_path)
+                tmp_file_name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
+                doc_files_with_type.append((
+                    count, doc_type, doc_file, tmp_file_name
+                ))
                 count += 1
-                this_file = {
-                    "file_name": _name,
-                    "file_path": file_path,
-                    "file_type": doc_type
-                }
-                compact_files.append(this_file)
-        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
+
+        # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+        compact_files = [None] * len(doc_files_with_type)
+        pool = ThreadPool(processes=2)
+        def process_file(data):
+            idx, doc_type, doc_file, tmp_file_name = data
+            doc_file.seek(0)
+            file_path = FileUtils.resize_and_save_file(tmp_file_name, new_request, doc_file, 100)
+            FileUtils.save_to_S3(tmp_file_name, new_request, file_path)
+            return {
+                "idx": idx,
+                "file_name": tmp_file_name,
+                "file_path": file_path,
+                "file_type": doc_type
+            }
+        for result in pool.map(process_file, doc_files_with_type):
+            compact_files[result["idx"]] = result
+
+        # Send to AI queue
+        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
 
         time_limit = 120
         start_time = time.time()

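Note: the handler above fans per-file work (resize, local save, S3 upload) out to a two-thread pool and keeps results in submission order by threading an index through each task. A minimal sketch of the same order-preserving pattern, with illustrative names that are not from this codebase:

    from multiprocessing.pool import ThreadPool

    def upload(task):
        idx, name = task
        # ... resize, save locally, push to S3 ...
        return {"idx": idx, "file_name": name}

    tasks = [(i, f"file_{i}.jpg") for i in range(5)]
    results = [None] * len(tasks)
    pool = ThreadPool(processes=2)
    for r in pool.map(upload, tasks):
        results[r["idx"]] = r

Strictly speaking, pool.map already returns results in input order, so the explicit index is redundant here; it becomes load-bearing if this later moves to imap_unordered or, per the TODO, a Celery fan-out.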
View File

@@ -13,7 +13,6 @@ else:
 
 router.register("ctel", CtelViewSet, basename="CtelAPI")
 router.register("ctel", CtelUserViewSet, basename="CtelUserAPI")
-# router.register("ctel", CtelTemplateViewSet, basename="CtelTemplateAPI")
 
 app_name = "api"
 urlpatterns = router.urls

View File

@@ -3,6 +3,7 @@ import uuid
 import os
 import base64
 import traceback
+from multiprocessing.pool import ThreadPool
 
 from fwd_api.models import SubscriptionRequest, UserProfile
 from fwd_api.celery_worker.worker import app
@@ -66,14 +67,13 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
             "file_type": ""
         },]
     """
-    start = time.time()
     new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
     user = UserProfile.objects.filter(id=user_id).first()
-    b_urls = []
     new_request.pages = len(files)
     new_request.pages_left = len(files)
-    for i, file in enumerate(files):
+
+    def process_and_save_file(data):
+        idx, file = data
         extension = file["file_name"].split(".")[-1].lower()
         if extension == "pdf":
             _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user)
@@ -83,28 +83,39 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
             raise FileContentInvalidException
         for j in range(len(_b_urls)):
             _b_urls[j]["doc_type"] = file["file_type"]
-            _b_urls[j]["page_number"] = j + len(b_urls)
-        # b_urls += _b_urls # TODO: Client may request all images in a file, for now, extract the first page only
-        b_urls.append(_b_urls[0])
+            _b_urls[j]["page_number"] = idx
+        return idx, _b_urls[0]
     elif extension in image_extensions:
         this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user)[0]
-        this_url["page_number"] = len(b_urls)
+        this_url["page_number"] = idx
         if file["file_type"]:
             this_url["doc_type"] = file["file_type"]
-        b_urls.append(this_url)
+        return idx, this_url
 
-    start_process = time.time()
-    logger.info(f"BE proccessing time: {start_process - start}")
+    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
+    start_time = time.time()
+    b_urls = [None] * len(files)
+    pool = ThreadPool(processes=2)
+    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
+    for idx, url in pool.map(process_and_save_file, files_with_idx):
+        b_urls[idx] = url
+    new_request.preprocessing_time = time.time() - start_time
 
     # TODO: send to queue with different request_ids
     doc_type_string = ""
+    to_queue = []
     for i, b_url in enumerate(b_urls):
         fractorized_request_id = rq_id + f"_sub_{i}"
-        ProcessUtil.send_to_queue2(fractorized_request_id, sub_id, [b_url], user_id, p_type)
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
         doc_type_string += "{},".format(b_url["doc_type"])
     doc_type_string = doc_type_string[:-1]
     new_request.doc_type = doc_type_string
     new_request.save()
+
+    # Send to next queue
+    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
 
 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key):

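Two details of the worker refactor above are worth noting: timing is now persisted (new_request.preprocessing_time) instead of only logged, and queue dispatch is deferred: sub-requests are first collected into to_queue and only sent after new_request.save(), so a downstream consumer can never pick up a sub-request whose parent row is still incomplete. A reduced sketch of that ordering, with send standing in for ProcessUtil.send_to_queue2 (an assumption for illustration):

    def finalize_and_dispatch(new_request, b_urls, rq_id, sub_id, user_id, p_type, send):
        # Persist doc types (and the timing fields set earlier) first...
        to_queue = []
        for i, b_url in enumerate(b_urls):
            to_queue.append((rq_id + f"_sub_{i}", sub_id, [b_url], user_id, p_type))
        new_request.doc_type = ",".join(b["doc_type"] for b in b_urls)
        new_request.save()
        # ...then notify the queue, so consumers always read a complete row
        for sub_rq_id, s_id, urls, u_id, p in to_queue:
            send(sub_rq_id, s_id, urls, u_id, p)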
View File

@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-21 04:32
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0157_alter_subscriptionrequest_created_at'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_test_request',
+            field=models.BooleanField(default=False),
+        ),
+    ]

View File

@@ -0,0 +1,58 @@
+# Generated by Django 4.1.3 on 2023-12-21 06:46
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0158_subscriptionrequest_is_test_request'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='cpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_percent',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='gpu_stats',
+            field=models.CharField(max_length=100, null=True),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_bad_image_quality',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='is_reviewed',
+            field=models.BooleanField(default=False),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='preprocessing_time',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='total_memory',
+            field=models.FloatField(default=-1),
+        ),
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='used_memory',
+            field=models.FloatField(default=-1),
+        ),
+    ]

View File

@@ -19,3 +19,14 @@ class SubscriptionRequest(models.Model):
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
     updated_at = models.DateTimeField(auto_now=True)
+    is_test_request = models.BooleanField(default=False)
+    preprocessing_time = models.FloatField(default=-1)
+    ai_inference_time = models.FloatField(default=-1)
+    cpu_percent = models.FloatField(default=-1)
+    gpu_percent = models.FloatField(default=-1)
+    used_memory = models.FloatField(default=-1)
+    total_memory = models.FloatField(default=-1)
+    gpu_stats = models.CharField(max_length=100, null=True)
+    is_reviewed = models.BooleanField(default=False)
+    is_bad_image_quality = models.BooleanField(default=False)

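Because the new metric fields default to -1 as a "not measured" sentinel, consumers should filter on non-negative values before aggregating. A sketch, assuming the standard Django ORM and the fields above:

    from django.db.models import Avg
    from fwd_api.models import SubscriptionRequest

    # Average preprocessing time across real, measured requests;
    # -1 marks "not measured", and test requests are excluded
    avg_prep = (
        SubscriptionRequest.objects
        .filter(is_test_request=False, preprocessing_time__gte=0)
        .aggregate(avg=Avg("preprocessing_time"))["avg"]
    )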
View File

@@ -5,7 +5,7 @@ from datetime import timedelta
 from ..models import SubscriptionRequest
 
 def get_latest_requests(limit=50):
-    requests = SubscriptionRequest.objects.order_by("-created_at")[:limit]
+    requests = SubscriptionRequest.objects.filter(is_test_request=False).order_by("-created_at")[:limit]
     requests_dict = []
     for request in requests:
         requests_dict.append({
@@ -14,13 +14,23 @@ def get_latest_requests(limit=50):
             "doc_type": request.doc_type,
             # "predict_result": request.predict_result,
             "created_at": request.created_at,
+            "updated_at": request.updated_at,
+            "preprocessing_time": request.preprocessing_time,
+            "ai_inference_time": request.ai_inference_time,
+            "cpu_percent": request.cpu_percent,
+            "gpu_percent": request.gpu_percent,
+            "used_memory": request.used_memory,
+            "total_memory": request.total_memory,
+            "gpu_stats": request.gpu_stats,
+            "is_reviewed": request.is_reviewed,
+            "is_bad_image_quality": request.is_bad_image_quality,
         })
     return requests_dict
 
 def count_requests_by_date(days_limit=5):
     today = timezone.now().date()
     start_date = today - timedelta(days=days_limit)
-    requests_by_date = SubscriptionRequest.objects.filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
+    requests_by_date = SubscriptionRequest.objects.filter(is_test_request=False).filter(created_at__gte=start_date).values('created_at__date').annotate(count=Count('id')).values('created_at__date', 'count').order_by('created_at__date')
     count_dict = []
     for rbd in requests_by_date:
         count_dict.append({

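For reference, the grouped query in count_requests_by_date returns one row per calendar day; a standalone sketch of the same queryset and its row shape:

    from datetime import timedelta
    from django.db.models import Count
    from django.utils import timezone
    from fwd_api.models import SubscriptionRequest

    start_date = timezone.now().date() - timedelta(days=5)
    requests_by_date = (
        SubscriptionRequest.objects
        .filter(is_test_request=False, created_at__gte=start_date)
        .values("created_at__date")
        .annotate(count=Count("id"))
        .order_by("created_at__date")
    )
    # Each row is a dict such as {"created_at__date": datetime.date(2023, 12, 21), "count": 42}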
View File

@@ -104,6 +104,8 @@ def validate_ocr_request_and_get(request, subscription):
         FileUtils.validate_list_file(list_file)
         validated_data['file'] = list_file[0]
 
+        validated_data['is_test_request'] = request.data.get('is_test_request', False)
+
     return validated_data
 
 def sbt_validate_ocr_request_and_get(request, subscription):

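Since validate_ocr_request_and_get reads is_test_request straight from request.data, clients opt in per call through the multipart form. A hedged client-side sketch: the image/process path and field names come from this diff, while the host, URL prefix, port (the compose default SIDP_SERVICE_PORT of 9881), token, and "type" value are assumptions:

    import requests

    resp = requests.post(
        "http://localhost:9881/api/ctel/image/process/",  # hypothetical prefix/port
        headers={"Authorization": "<token>"},  # placeholder token
        data={"type": "all", "is_test_request": True},  # "type" value is illustrative
        files={"file": ("invoice.jpg", open("invoice.jpg", "rb"), "image/jpeg")},
    )
    print(resp.json())

Note that multipart form values arrive as strings, so the server-side .get('is_test_request', False) sees "True" (truthy) rather than a boolean.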
View File

@@ -33,7 +33,8 @@ services:
     command: bash run.sh
     deploy:
       mode: replicated
-      replicas: 3
+      replicas: 2
 
   # Back-end services
   be-ctel-sbt:
     build:

@@ -71,7 +72,7 @@ services:
       - ctel-sbt
     volumes:
       - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
-      - ./data/BE_static:/app/static
+      - ./data/static:/app/static
       - ./cope2n-api:/app
     working_dir: /app
     depends_on:

@@ -160,7 +161,7 @@ services:
       - ./cope2n-api:/app
     working_dir: /app
-    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"
+    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 8"
 
   # Back-end persistent
   db-sbt:

@@ -197,7 +198,7 @@ services:
     shm_size: 10gb
     privileged: true
     ports:
-      - 80:80
+      - ${SIDP_SERVICE_PORT:-9881}:80
     depends_on:
       be-ctel-sbt:
         condition: service_started

@@ -207,7 +208,7 @@ services:
       - VITE_PROXY=http://be-ctel-sbt:${BASE_PORT}
       - VITE_API_BASE_URL=http://fe-sbt:80
     volumes:
-      - ./data/BE_static:/backend-static
+      - ./data/static:/backend-static
     networks:
       - ctel-sbt

View File

@@ -94,27 +94,26 @@ def process_file(data):
-invoice_files = [
-    ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
-]
 # invoice_files = [
-#     ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
+#     ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
 # ]
+invoice_files = [
+    ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
+]
 imei_files = [
     ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
+    # ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
+    # ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
+    # ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
+    # ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
 ]
 
 def get_imei_files():
     num_files = random.randint(1, len(imei_files) + 1)
-    print("Num imeis", num_files)
     files = imei_files[:num_files]
     # print("Num of imei files:", len(files))
     return files
 
 def get_files():
-    return invoice_files + get_imei_files()
+    return invoice_files + imei_files # get_imei_files()
 
 def gen_input(num_input):
     for _ in range(num_input):
         yield (get_files(), token)
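
For completeness, a sketch of how gen_input might drive a concurrent smoke test; it assumes process_file(data), named in the hunk header above, performs one upload for a (files, token) tuple, and the pool size is illustrative:

    from multiprocessing.pool import ThreadPool

    def run_load_test(num_input, concurrency=4):
        pool = ThreadPool(processes=concurrency)
        results = pool.map(process_file, gen_input(num_input))
        print(f"Completed {len(results)} requests")
        return results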