Merge pull request #22 from dx-tan/fix/database_inconsistent

Fix database inconsistency: pass per-file metadata (doc_type, preprocessing and inference timings) through the Celery queue instead of writing it to SubscriptionRequest up front and polling the database for it.
Đỗ Xuân Tân 2023-12-28 13:01:54 +07:00 committed by GitHub Enterprise
commit 0fafb8f565
11 changed files with 82 additions and 69 deletions

View File

@@ -58,17 +58,18 @@ def process_manulife_invoice(rq_id, list_url):
         return {"rq_id": rq_id}

 @app.task(name="process_sbt_invoice")
-def process_sbt_invoice(rq_id, list_url):
+def process_sbt_invoice(rq_id, list_url, metadata):
     # TODO: simply returning 200 and 404 doesn't make any sense
     c_connector = CeleryConnector()
     try:
         result = compile_output_sbt(list_url)
+        metadata['ai_inference_profile'] = result.pop("inference_profile")
         hoadon = {"status": 200, "content": result, "message": "Success"}
         print(hoadon)
-        c_connector.process_sbt_invoice_result((rq_id, hoadon))
+        c_connector.process_sbt_invoice_result((rq_id, hoadon, metadata))
         return {"rq_id": rq_id}
     except Exception as e:
         print(e)
         hoadon = {"status": 404, "content": {}}
-        c_connector.process_sbt_invoice_result((rq_id, hoadon))
+        c_connector.process_sbt_invoice_result((rq_id, hoadon, metadata))
         return {"rq_id": rq_id}
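Note: a sketch of the metadata dict this task now receives from process_pdf (keys taken from this PR's diff; the concrete values below are purely illustrative):

# Illustrative only: shape of the per-file metadata passed alongside the result.
metadata = {
    "doc_type": "imei",                        # set per file in process_pdf
    "ai_inference_start_time": 1703743314.0,   # time.time() captured before queueing
    "ai_inference_profile": {},                # filled here from result.pop("inference_profile")
    "index_in_request": 0,                     # position of this file within the parent request
    "preprocessing_time": 1.2,                 # seconds spent preparing the uploaded files
}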

View File

@@ -1,5 +1,6 @@
 import os
 import json
+import time

 from common import json2xml
 from common.json2xml import convert_key_names, replace_xml_values
@@ -213,6 +214,8 @@ def compile_output_sbt(list_url):
         dict: output compiled
     """
+    inference_profile = {}
+
     results = {
         "model":{
             "name":"Invoice",
@@ -225,16 +228,23 @@ def compile_output_sbt(list_url):
     outputs = []
+    start = time.time()
+    pages_predict_time = []
     for page in list_url:
         output_model = predict_sbt(page['page_number'], page['file_url'])
+        pages_predict_time.append(time.time())
         if "doc_type" in page:
             output_model['doc_type'] = page['doc_type']
         outputs.append(output_model)
+    start_postprocess = time.time()
     documents = merge_sbt_output(outputs)
+    inference_profile["postprocess"] = [start_postprocess, time.time()]
+    inference_profile["inference"] = [start, pages_predict_time]
     results = {
         "total_pages": len(list_url),
         "ocr_num_pages": len(list_url),
-        "document": documents
+        "document": documents,
+        "inference_profile": inference_profile
     }
     return results
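Note: the inference_profile built above has roughly this shape (the epoch timestamps below are illustrative values of time.time()):

# Illustrative profile for a two-page request:
inference_profile = {
    # [loop start, list of per-page completion timestamps]
    "inference": [1703743310.0, [1703743311.4, 1703743312.9]],
    # [postprocess start, postprocess end]
    "postprocess": [1703743312.9, 1703743313.1],
}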

@@ -1 +1 @@
-Subproject commit a471c1018c17cc917d1723776bae81f829450f95
+Subproject commit 4caed0d5ee08d1114727effd19bf32beab5263dc

View File

@@ -1,6 +1,7 @@
 import time
 import uuid
 from wsgiref.util import FileWrapper
+from datetime import datetime

 from django.core.files.uploadedfile import TemporaryUploadedFile
 from django.http import HttpResponse, JsonResponse
@@ -56,11 +57,11 @@ class CtelViewSet(viewsets.ViewSet):
         validated_data = ProcessUtil.validate_ocr_request_and_get(request, sub)
         provider_code = 'SAP'
-        rq_id = provider_code + uuid.uuid4().hex

         file_obj: TemporaryUploadedFile = validated_data['file']
         file_extension = file_obj.name.split(".")[-1].lower()
         p_type = validated_data['type']
+        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
         file_name = f"temp_{rq_id}.{file_extension}"
         is_test_request = validated_data.get("is_test_request", False)
@@ -134,7 +135,6 @@ class CtelViewSet(viewsets.ViewSet):
         validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub)
         provider_code = 'SAP'
-        rq_id = provider_code + uuid.uuid4().hex

         imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file']
         invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file']
@@ -145,6 +145,7 @@ class CtelViewSet(viewsets.ViewSet):
         }
         total_page = len(files.keys())
+        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
         p_type = validated_data['type']
         new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
                                                                pages_left=total_page,
@@ -205,7 +206,6 @@ class CtelViewSet(viewsets.ViewSet):
         validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub)
         provider_code = 'SAP'
-        rq_id = provider_code + uuid.uuid4().hex

         imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file']
         invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file']
@@ -214,6 +214,7 @@ class CtelViewSet(viewsets.ViewSet):
             "imei": imei_file_objs,
             "invoice": invoice_file_objs
         }
+        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
         count = 0
         doc_files_with_type = []
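Note: a quick illustration of the new request-id scheme introduced above, runnable on its own (the hex suffix in the comment is made up):

from datetime import datetime
import uuid

provider_code = 'SAP'
rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
# e.g. "SAP_20231228130154_9f2c4b7e8a6d4e0f9b1c2d3e4f5a6b7c"
# The id is now built only after validation and carries a sortable timestamp component.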

View File

@@ -64,12 +64,13 @@ def process_image_file(file_name: str, file_path, request, user) -> list:
 def process_pdf(rq_id, sub_id, p_type, user_id, files):
     """
     files: [{
+        "idx": int
         "file_name": "",
         "file_path": "", # local path to file
         "file_type": ""
     },]
     """
-    new_request = SubscriptionRequest.objects.filter(request_id=rq_id)[0]
+    new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first()
     user = UserProfile.objects.filter(id=user_id).first()
     new_request.pages = len(files)
     new_request.pages_left = len(files)
@@ -101,42 +102,24 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
         files_with_idx = [(idx, file) for idx, file in enumerate(files)]
         for idx, url in pool.map(process_and_save_file, files_with_idx):
             b_urls[idx] = url
-    new_request.preprocessing_time = time.time() - start_time
+    preprocessing_time = time.time() - start_time

     # TODO: send to queue with different request_ids
-    doc_type_string = ""
     to_queue = []
+    ai_inference_start_time = time.time()
     for i, b_url in enumerate(b_urls):
+        file_meta = {}
         fractorized_request_id = rq_id + f"_sub_{i}"
-        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type))
-        doc_type_string += "{},".format(b_url["doc_type"])
-    doc_type_string = doc_type_string[:-1]
-    new_request.doc_type = doc_type_string
-    new_request.ai_inference_start_time = time.time()
-    new_request.save()
-
-    trials = 0
-    while True:
-        rq: SubscriptionRequest = \
-            SubscriptionRequest.objects.filter(request_id=rq_id).first()
-        if rq.ai_inference_start_time != 0:
-            break
-        time.sleep(0.1)
-        trials += 1
-        if trials > 5:
-            rq.preprocessing_time = time.time() - start_time
-            rq.doc_type = doc_type_string
-            rq.ai_inference_start_time = time.time()
-            rq.save()
-        if trials > 10:
-            rq.status = 404
-            rq.save()
-            return
+        file_meta["doc_type"] = b_url["doc_type"]
+        file_meta["ai_inference_start_time"] = ai_inference_start_time
+        file_meta["ai_inference_profile"] = {}
+        file_meta["index_in_request"] = i
+        file_meta["preprocessing_time"] = preprocessing_time
+        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type, file_meta))

     # Send to next queue
-    for sub_rq_id, sub_id, urls, user_id, p_type in to_queue:
-        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type)
+    for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
+        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)

 @app.task(name='upload_file_to_s3')
 def upload_file_to_s3(local_file_path, s3_key, request_id):
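Note: the net effect of this hunk is that per-file doc_type and timing data now travel with the queue message instead of being written to SubscriptionRequest and polled back. A sketch of one queued entry, using the names from the diff:

# Illustrative 6-tuple appended to to_queue for the i-th file:
(
    rq_id + f"_sub_{i}",   # fractorized request id, e.g. "<rq_id>_sub_0"
    sub_id,
    [b_url],               # URL of this file after preprocessing/upload
    user_id,
    p_type,
    file_meta,             # doc_type, ai_inference_start_time, ai_inference_profile,
                           # index_in_request, preprocessing_time
)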

View File

@@ -15,23 +15,23 @@ from fwd_api.utils import process as ProcessUtil

 redis_client = RedisUtils()

-def aggregate_result(results, doc_types):
-    doc_types = doc_types.split(',')
+def aggregate_result(results):
+    sorted_results = [None] * len(results)
+    doc_types = []
+    for index, result in results.items():
+        index = int(index)
+        doc_type = result["metadata"]["doc_type"]
+        doc_types.append(doc_type)
+        sorted_results[index] = ((doc_type, result))

     des_result = deepcopy(list(results.values()))[0]
+    des_result.pop("metadata")
     des_result["content"]["total_pages"] = 0
     des_result["content"]["ocr_num_pages"] = 0
     des_result["content"]["document"][0]["end_page"] = 0
     des_result["content"]["document"][0]["content"][3]["value"] = [None for _ in range(doc_types.count("imei"))]
     des_result["content"]["document"][0]["content"][2]["value"] = []

-    sorted_results = [None] * len(doc_types)
-    for index, result in results.items():
-        index = int(index)
-        doc_type = doc_types[index]
-        sorted_results[index] = ((doc_type, result))
-
     imei_count = 0
     for doc_type, result in sorted_results:
         des_result["content"]["total_pages"] += 1
@@ -136,7 +136,7 @@ def process_invoice_manulife_result(rq_id, result):
 random_processor_name = None

 @app.task(name='process_sbt_invoice_result')
-def process_invoice_sbt_result(rq_id, result):
+def process_invoice_sbt_result(rq_id, result, metadata):
     global random_processor_name
     if random_processor_name is None:
         random_processor_name = uuid.uuid4()
@@ -146,21 +146,9 @@ def process_invoice_sbt_result(rq_id, result):
         page_index = int(rq_id.split("_sub_")[1])
         rq_id = rq_id.split("_sub_")[0]
         rq: SubscriptionRequest = SubscriptionRequest.objects.filter(request_id=rq_id).first()
-        for i in range(10):
-            if rq.ai_inference_start_time == 0:
-                logging.warn(f"ai_inference_start_time = 0, looks like database is lagging, attemp {i} in 0.2 second ...")
-                rq.refresh_from_db()
-                time.sleep(0.2)
-                if i == 9: # return an error
-                    logging.warn("Unable to retrieve rq, exiting")
-                    rq.status = 404 # stop waiting
-                    rq.predict_result = result
-                    rq.save()
-                    update_user(rq)
-                    return "FailInvoice"
-            else:
-                break
+        result["metadata"] = metadata
         # status = to_status(result)
         status = result.get("status", 200)
         redis_client.set_cache(rq_id, page_index, result)
@@ -169,8 +157,20 @@ def process_invoice_sbt_result(rq_id, result):
         if done:
             rq.status = 200 # stop waiting
             results = redis_client.get_all_cache(rq_id)
-            rq.predict_result = aggregate_result(results, rq.doc_type)
+            rq.predict_result = aggregate_result(results)
             # print(f"[DEBUG]: rq.predict_result: {rq.predict_result}")
+            ai_inference_profile = {}
+            doc_type_string = ""
+            for idx, result in results.items():
+                ai_inference_profile["{doc_type}_{idx}".format(doc_type=result["metadata"]["doc_type"], idx=result["metadata"]["index_in_request"])] = result["metadata"]["ai_inference_profile"]
+                doc_type_string += "{},".format(result["metadata"]["doc_type"])
+            doc_type_string = doc_type_string[:-1]
+            rq.ai_inference_profile = ai_inference_profile
+            rq.doc_type = doc_type_string
+            rq.ai_inference_start_time = result["metadata"]["ai_inference_start_time"] # advancing the last result
+            rq.preprocessing_time = result["metadata"]["preprocessing_time"] # advancing the last result
+            rq.ai_inference_time = time.time() - rq.ai_inference_start_time
             rq.save()
         else:
@@ -178,8 +178,6 @@ def process_invoice_sbt_result(rq_id, result):
             rq.predict_result = result
             rq.save()

-        rq.ai_inference_time = time.time() - rq.ai_inference_start_time
-        rq.save()
         update_user(rq)
     except IndexError as e:
         print(e)
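Note: once all pages are cached, the per-file profiles collected above are folded into the new SubscriptionRequest.ai_inference_profile JSON field, keyed by "{doc_type}_{index_in_request}". A rough sketch of the stored value for a two-file request (timestamps illustrative):

# Illustrative contents after one imei page and one invoice page:
rq.ai_inference_profile = {
    "imei_0":    {"inference": [1703743310.0, [1703743311.4]], "postprocess": [1703743311.4, 1703743311.6]},
    "invoice_1": {"inference": [1703743310.0, [1703743312.9]], "postprocess": [1703743312.9, 1703743313.1]},
}
rq.doc_type = "imei,invoice"  # rebuilt from the cached metadata instead of a pre-written DB column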

View File

@@ -0,0 +1,18 @@
+# Generated by Django 4.1.3 on 2023-12-27 09:02
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('fwd_api', '0162_merge_20231225_1439'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subscriptionrequest',
+            name='ai_inference_profile',
+            field=models.JSONField(null=True),
+        ),
+    ]

View File

@@ -21,6 +21,7 @@ class SubscriptionRequest(models.Model):
     is_test_request = models.BooleanField(default=False)
     S3_uploaded = models.BooleanField(default=False)
+    ai_inference_profile=models.JSONField(null=True)
     preprocessing_time = models.FloatField(default=-1)
     ai_inference_start_time = models.FloatField(default=0)
     ai_inference_time = models.FloatField(default=0)

View File

@@ -307,7 +307,7 @@ def token_value(token_type):
     return 1 # Basic OCR

-def send_to_queue2(rq_id, sub_id, file_url, user_id, typez):
+def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata):
     try:
         if typez == ProcessType.ID_CARD.value:
             c_connector.process_id(
@@ -319,7 +319,7 @@ def send_to_queue2(rq_id, sub_id, file_url, user_id, typez):
         elif typez == ProcessType.MANULIFE_INVOICE.value:
             c_connector.process_invoice_manulife((rq_id, file_url))
         elif typez == ProcessType.SBT_INVOICE.value:
-            c_connector.process_invoice_sbt((rq_id, file_url))
+            c_connector.process_invoice_sbt((rq_id, file_url, metadata))
     except Exception as e:
         print(e)
         raise BadGatewayException()

View File

@@ -82,6 +82,7 @@ services:
       db-sbt:
         condition: service_started
     command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input &&
+                    python manage.py makemigrations &&
                     python manage.py migrate &&
                     python manage.py compilemessages &&
                     gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod

View File

@@ -145,7 +145,7 @@ services:
   db-sbt:
     restart: always
     mem_reservation: 500m
-    image: postgres:14.7-alpine
+    image: postgres:15.4-alpine
     volumes:
       - ./data/postgres_data:/var/lib/postgresql/data
     networks: