sbt-idp/cope2n-api/fwd_api/celery_worker/process_result_tasks.py

197 lines
6.6 KiB
Python
Raw Normal View History

2023-12-13 09:01:31 +00:00
import traceback
2023-12-22 07:03:56 +00:00
import time
import uuid
2023-12-26 11:44:03 +00:00
import logging
2023-12-13 09:01:31 +00:00
2023-12-13 11:43:10 +00:00
from copy import deepcopy
2023-11-30 11:19:06 +00:00
from fwd_api.celery_worker.worker import app
from fwd_api.models import SubscriptionRequest
2023-12-08 12:49:00 +00:00
from fwd_api.exception.exceptions import InvalidException
2023-12-13 09:01:31 +00:00
from fwd_api.models import SubscriptionRequest
from fwd_api.constant.common import ProcessType
2023-12-15 07:12:23 +00:00
from fwd_api.utils.redis import RedisUtils
2023-12-15 05:43:19 +00:00
from fwd_api.utils import process as ProcessUtil
2023-12-13 11:43:10 +00:00
# Shared Redis client: caches per-page results (set_cache/get_size/get_all_cache)
# until every page of a request has arrived.
redis_client = RedisUtils()
2023-12-08 12:49:00 +00:00
2023-12-15 05:43:19 +00:00
def aggregate_result(results, doc_types):
    """Merge per-page OCR results into a single response payload.

    Args:
        results: mapping of page index (str or int) -> per-page result dict,
            each shaped like ``{"content": {"total_pages", "ocr_num_pages",
            "document": [{"end_page", "content": [...]}]}}``.
        doc_types: comma-separated doc type per page, e.g. ``"imei,invoice"``.

    Returns:
        A single result dict: page counters summed, one imei value slot per
        "imei" page (filled in page order), invoice fields taken from the
        invoice page(s).

    Raises:
        InvalidException: on a doc type other than imei/invoice/all.
    """
    doc_types = doc_types.split(',')

    # Use the first page's payload as the template. Deep-copy only that one
    # value — the original copied every result just to keep the first.
    des_result = deepcopy(next(iter(results.values())))
    des_result["content"]["total_pages"] = 0
    des_result["content"]["ocr_num_pages"] = 0
    des_result["content"]["document"][0]["end_page"] = 0
    # Pre-size one imei slot per "imei" page so values land in page order.
    des_result["content"]["document"][0]["content"][3]["value"] = [None] * doc_types.count("imei")
    des_result["content"]["document"][0]["content"][2]["value"] = []

    # Re-order the (index -> result) mapping by numeric page index.
    sorted_results = [None] * len(doc_types)
    for index, result in results.items():
        index = int(index)
        sorted_results[index] = (doc_types[index], result)

    imei_count = 0
    for doc_type, result in sorted_results:
        des_result["content"]["total_pages"] += 1
        des_result["content"]["ocr_num_pages"] += 1
        des_result["content"]["document"][0]["end_page"] += 1
        src_content = result["content"]["document"][0]["content"]
        des_content = des_result["content"]["document"][0]["content"]
        if doc_type == "imei":
            # Each imei page contributes its first imei value.
            des_content[3]["value"][imei_count] = src_content[3]["value"][0]
            imei_count += 1
        elif doc_type == "invoice":
            des_content[0]["value"] = src_content[0]["value"]
            des_content[1]["value"] = src_content[1]["value"]
            des_content[2]["value"] += src_content[2]["value"]
        elif doc_type == "all":
            des_result.update(result)
        else:
            raise InvalidException(f"doc_type: {doc_type}")
    return des_result
2023-11-30 11:19:06 +00:00
def print_id(rq_id):
    """Trace helper: print that a request id was received by this worker."""
    print(f" [x] Received {rq_id}")
def to_status(result):
    """Map a prediction payload to an internal request status code.

    Args:
        result: prediction result dict; may carry an HTTP-like "status" key.

    Returns:
        4 (failure) when "status" is present and not a 2xx success code,
        otherwise 3 (success).
    """
    # Removed leftover debug print('X') that polluted stdout on every call.
    if 'status' in result and result['status'] not in [200, 201, 202]:
        return 4
    return 3
def update_user(rq: SubscriptionRequest):
    """Charge the request's token cost to its subscription on success (status 3)."""
    subscription = rq.subscription
    if rq.status == 3:
        subscription.current_token += ProcessUtil.token_value(int(rq.process_type))
        subscription.save()
@app.task(name='process_sap_invoice_result')
def process_invoice_sap_result(rq_id, result):
    """Persist an SAP invoice prediction result on its SubscriptionRequest.

    Args:
        rq_id: request id of the SubscriptionRequest being completed.
        result: prediction payload; a non-2xx "status" key marks failure.

    Returns:
        "FailInvoice" on unexpected errors, otherwise None.
    """
    try:
        rq: SubscriptionRequest = \
            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.INVOICE.value)[0]
        status = to_status(result)
        rq.predict_result = result
        rq.status = status
        rq.save()
        update_user(rq)
    except IndexError as e:
        print(e)
        # Fixed: original passed a "%d" format string and rq_id as two print
        # args, printing the literal "%d" instead of the id.
        print(f"NotFound request by requestId, {rq_id}")
    except Exception as e:
        print(e)
        print(f"Fail Invoice {rq_id}")
        traceback.print_exc()
        return "FailInvoice"
@app.task(name='process_fi_invoice_result')
def process_invoice_fi_result(rq_id, result):
    """Persist an FI invoice prediction result on its SubscriptionRequest.

    Args:
        rq_id: request id of the SubscriptionRequest being completed.
        result: prediction payload; a non-2xx "status" key marks failure.

    Returns:
        "FailInvoice" on unexpected errors, otherwise None.
    """
    try:
        rq: SubscriptionRequest = \
            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.FI_INVOICE.value)[0]
        status = to_status(result)
        rq.predict_result = result
        rq.status = status
        rq.save()
        update_user(rq)
    except IndexError as e:
        print(e)
        # Fixed: original passed a "%d" format string and rq_id as two print
        # args, printing the literal "%d" instead of the id.
        print(f"NotFound request by requestId, {rq_id}")
    except Exception as e:
        print(e)
        print(f"Fail Invoice {rq_id}")
        traceback.print_exc()
        return "FailInvoice"
@app.task(name='process_manulife_invoice_result')
def process_invoice_manulife_result(rq_id, result):
    """Persist a Manulife invoice prediction result on its SubscriptionRequest.

    Args:
        rq_id: request id of the SubscriptionRequest being completed.
        result: prediction payload; a non-2xx "status" key marks failure.

    Returns:
        "FailInvoice" on unexpected errors, otherwise None.
    """
    try:
        rq: SubscriptionRequest = \
            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.MANULIFE_INVOICE.value)[0]
        status = to_status(result)
        rq.predict_result = result
        rq.status = status
        rq.save()
        update_user(rq)
    except IndexError as e:
        print(e)
        # Fixed: original passed a "%d" format string and rq_id as two print
        # args, printing the literal "%d" instead of the id.
        print(f"NotFound request by requestId, {rq_id}")
    except Exception as e:
        print(e)
        print(f"Fail Invoice {rq_id}")
        traceback.print_exc()
        return "FailInvoice"
2023-12-22 07:03:56 +00:00
# Lazily-assigned UUID identifying this worker process in debug output;
# set on the first process_invoice_sbt_result call.
random_processor_name = None
2023-11-30 11:19:06 +00:00
@app.task(name='process_sbt_invoice_result')
def process_invoice_sbt_result(rq_id, result):
    """Persist one page's SBT prediction result; finalize when all pages arrive.

    ``rq_id`` arrives as ``"<request_id>_sub_<page_index>"``. Each page result
    is cached in Redis under the base request id; once the cache size equals
    ``rq.pages`` the per-page results are aggregated into ``rq.predict_result``.

    Args:
        rq_id: composite id carrying the request id and the page index.
        result: per-page prediction payload; "status" key (default 200).

    Returns:
        "FailInvoice" on failure paths, otherwise None.
    """
    global random_processor_name
    if random_processor_name is None:
        random_processor_name = uuid.uuid4()
    print(rq_id, random_processor_name)
    print_id(f"[DEBUG]: Received SBT request with id {rq_id}")
    rq = None  # fixed: except blocks below read rq, which was unbound when the lookup itself failed
    try:
        page_index = int(rq_id.split("_sub_")[1])
        rq_id = rq_id.split("_sub_")[0]
        rq: SubscriptionRequest = SubscriptionRequest.objects.filter(request_id=rq_id).first()
        # The DB row can lag behind this task: poll briefly until the
        # inference start time has been written, then give up with 404.
        for i in range(10):
            if rq.ai_inference_start_time == 0:
                # fixed: logging.warn is a deprecated alias of logging.warning
                logging.warning(f"ai_inference_start_time = 0, looks like database is lagging, attemp {i} in 0.2 second ...")
                rq.refresh_from_db()
                time.sleep(0.2)
                if i == 9:  # out of retries: mark the request failed
                    logging.warning("Unable to retrieve rq, exiting")
                    rq.status = 404  # stop waiting
                    rq.predict_result = result
                    rq.save()
                    update_user(rq)
                    return "FailInvoice"
            else:
                break

        status = result.get("status", 200)
        redis_client.set_cache(rq_id, page_index, result)
        done = rq.pages == redis_client.get_size(rq_id)

        if status == 200:
            if done:
                rq.status = 200  # stop waiting
                results = redis_client.get_all_cache(rq_id)
                rq.predict_result = aggregate_result(results, rq.doc_type)
                rq.save()
        else:
            rq.status = 404  # stop waiting
            rq.predict_result = result
            rq.save()

        rq.ai_inference_time = time.time() - rq.ai_inference_start_time
        rq.save()
        update_user(rq)
    except IndexError as e:
        print(e)
        # fixed: original printed the literal "%d" (logging-style args to print)
        print(f"NotFound request by requestId, {rq_id}")
        if rq is not None:
            rq.ai_inference_time = 0
            rq.save()
    except Exception as e:
        print(e)
        print(f"Fail Invoice {rq_id}")
        traceback.print_exc()
        if rq is not None:
            rq.ai_inference_time = 0
            rq.save()
        return "FailInvoice"