233 lines
8.6 KiB
Python
Executable File
233 lines
8.6 KiB
Python
Executable File
import traceback
|
|
import time
|
|
import uuid
|
|
import logging
|
|
|
|
from copy import deepcopy
|
|
|
|
from fwd_api.celery_worker.worker import app
|
|
from fwd_api.models import SubscriptionRequest
|
|
from fwd_api.exception.exceptions import InvalidException
|
|
from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile
|
|
from fwd_api.constant.common import ProcessType
|
|
from fwd_api.utils.redis import RedisUtils
|
|
from fwd_api.utils import process as ProcessUtil
|
|
|
|
# Process-wide Redis helper; used below to cache per-page OCR results
# (set_cache/get_size/get_all_cache) while a multi-page request completes.
redis_client = RedisUtils()
|
|
|
|
def aggregate_result(results):
    """Merge per-page OCR results into one combined document payload.

    Args:
        results: mapping of page index (stringified int) -> single-page OCR
            result dict; each page carries ``metadata.doc_type`` of
            "imei", "invoice" or "all".

    Returns:
        A single result dict shaped like a one-page result, with page
        counters summed and the per-doc-type fields merged in page order.

    Raises:
        InvalidException: when a page reports an unknown doc_type.
    """
    # Re-order pages by their numeric index and record every page's doc_type.
    pages_in_order = [None] * len(results)
    doc_types = []
    for key, page in results.items():
        page_type = page.get("metadata", {}).get("doc_type", "all")
        doc_types.append(page_type)
        pages_in_order[int(key)] = (page_type, page)

    # Seed the merged payload from a deep copy of the first page, then zero
    # out every field that gets re-accumulated below.
    merged = deepcopy(list(results.values()))[0]
    merged.pop("metadata", None)
    merged["content"]["total_pages"] = 0
    merged["content"]["ocr_num_pages"] = 0
    merged["content"]["document"][0]["end_page"] = 0
    # Field slot 3 holds one IMEI per imei-page; slot 2 accumulates invoice rows.
    merged["content"]["document"][0]["content"][3]["value"] = [None] * doc_types.count("imei")
    merged["content"]["document"][0]["content"][2]["value"] = []

    imei_slot = 0
    for page_type, page in pages_in_order:
        merged["content"]["total_pages"] += 1
        merged["content"]["ocr_num_pages"] += 1
        merged["content"]["document"][0]["end_page"] += 1
        if page_type == "imei":
            merged["content"]["document"][0]["content"][3]["value"][imei_slot] = page["content"]["document"][0]["content"][3]["value"][0]
            imei_slot += 1
        elif page_type == "invoice":
            merged["content"]["document"][0]["content"][0]["value"] = page["content"]["document"][0]["content"][0]["value"]
            merged["content"]["document"][0]["content"][1]["value"] = page["content"]["document"][0]["content"][1]["value"]
            merged["content"]["document"][0]["content"][2]["value"] += page["content"]["document"][0]["content"][2]["value"]
        elif page_type == "all":
            merged.update(page)
        else:
            raise InvalidException(f"doc_type: {page_type}")

    return merged
|
|
|
|
def print_id(rq_id):
    """Log receipt of a request id to stdout."""
    print(f" [x] Received {rq_id}")
|
|
|
|
|
|
def to_status(result):
    """Map an OCR result payload to the internal request status code.

    Args:
        result: raw result dict; may carry an HTTP-like ``status`` key.

    Returns:
        4 (failed) when ``status`` is present and not a 2xx success code,
        otherwise 3 (done).
    """
    # Removed leftover debug print('X').
    if 'status' in result and result['status'] not in [200, 201, 202]:
        return 4
    return 3
|
|
|
|
|
|
def update_user(rq: SubscriptionRequest):
    """Update the subscription's token counter for a finished request.

    Only a request whose status is 3 (done, see ``to_status``) adjusts
    ``current_token``; any other status leaves the subscription untouched.
    """
    subscription = rq.subscription
    if rq.status == 3:
        subscription.current_token += ProcessUtil.token_value(int(rq.process_type))
        subscription.save()
|
|
|
|
@app.task(name='process_sap_invoice_result')
def process_invoice_sap_result(rq_id, result):
    """Persist a SAP-invoice OCR result onto its SubscriptionRequest.

    Args:
        rq_id: request_id of the SubscriptionRequest to update.
        result: raw OCR payload; a non-2xx ``status`` key marks it failed.

    Returns:
        None on success/not-found, "FailInvoice" on unexpected errors.
    """
    try:
        rq: SubscriptionRequest = \
            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.INVOICE.value)[0]
        status = to_status(result)

        rq.predict_result = result
        rq.status = status
        rq.save()

        update_user(rq)
    except IndexError as e:
        # filter(...)[0] raised: no request matches this request_id.
        print(e)
        # Fixed: original passed a logging-style "%d" placeholder to print().
        print(f"NotFound request by requestId: {rq_id}")
    except Exception as e:
        print(e)
        print(f"Fail Invoice: {rq_id}")
        traceback.print_exc()
        return "FailInvoice"
|
|
|
|
|
|
@app.task(name='process_fi_invoice_result')
def process_invoice_fi_result(rq_id, result):
    """Persist an FI-invoice OCR result onto its SubscriptionRequest.

    Args:
        rq_id: request_id of the SubscriptionRequest to update.
        result: raw OCR payload; a non-2xx ``status`` key marks it failed.

    Returns:
        None on success/not-found, "FailInvoice" on unexpected errors.
    """
    try:
        rq: SubscriptionRequest = \
            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.FI_INVOICE.value)[0]
        status = to_status(result)

        rq.predict_result = result
        rq.status = status
        rq.save()

        update_user(rq)
    except IndexError as e:
        # filter(...)[0] raised: no request matches this request_id.
        print(e)
        # Fixed: original passed a logging-style "%d" placeholder to print().
        print(f"NotFound request by requestId: {rq_id}")
    except Exception as e:
        print(e)
        print(f"Fail Invoice: {rq_id}")
        traceback.print_exc()
        return "FailInvoice"
|
|
|
|
@app.task(name='process_manulife_invoice_result')
def process_invoice_manulife_result(rq_id, result):
    """Persist a Manulife-invoice OCR result onto its SubscriptionRequest.

    Args:
        rq_id: request_id of the SubscriptionRequest to update.
        result: raw OCR payload; a non-2xx ``status`` key marks it failed.

    Returns:
        None on success/not-found, "FailInvoice" on unexpected errors.
    """
    try:
        rq: SubscriptionRequest = \
            SubscriptionRequest.objects.filter(request_id=rq_id, process_type=ProcessType.MANULIFE_INVOICE.value)[0]
        status = to_status(result)

        rq.predict_result = result
        rq.status = status
        rq.save()

        update_user(rq)
    except IndexError as e:
        # filter(...)[0] raised: no request matches this request_id.
        print(e)
        # Fixed: original passed a logging-style "%d" placeholder to print().
        print(f"NotFound request by requestId: {rq_id}")
    except Exception as e:
        print(e)
        print(f"Fail Invoice: {rq_id}")
        traceback.print_exc()
        return "FailInvoice"
|
|
|
|
# Lazily-initialised marker identifying this worker process in log lines.
random_processor_name = None


@app.task(name='process_sbt_invoice_result')
def process_invoice_sbt_result(rq_id, result, metadata):
    """Store one page's SBT OCR result; when every page of the request has
    arrived, aggregate all cached pages into the final predict_result.

    Args:
        rq_id: composite id of the form "<request_id>_sub_<page_index>".
        result: single-page OCR payload; ``status`` (default 200) marks success.
        metadata: per-page metadata (doc_type, timings, ...) attached to result.

    Returns:
        None normally, "FailInvoice" on unexpected errors.
    """
    global random_processor_name
    if random_processor_name is None:
        random_processor_name = uuid.uuid4()
    print(rq_id, random_processor_name)
    print_id(f"[DEBUG]: Received SBT request with id {rq_id}")
    rq = None  # lets the except blocks know whether the request was loaded
    try:
        page_index = int(rq_id.split("_sub_")[1])
        rq_id = rq_id.split("_sub_")[0]
        rq: SubscriptionRequest = SubscriptionRequest.objects.filter(request_id=rq_id).first()

        result["metadata"] = metadata
        status = result.get("status", 200)
        redis_client.set_cache(rq_id, page_index, result)
        done = rq.pages == redis_client.get_size(rq_id)

        if status == 200:
            if done:
                rq.status = 200  # stop waiting
                results = redis_client.get_all_cache(rq_id)
                rq.predict_result = aggregate_result(results)

                ai_inference_profile = {}
                doc_types = []
                # NOTE: the loop variable deliberately shadows the ``result``
                # argument; after the loop it holds the LAST page, whose
                # timing metadata is used below (as in the original code).
                for result in results.values():
                    meta = result.get("metadata", {})
                    profile_key = "{doc_type}_{idx}".format(
                        doc_type=meta.get("doc_type", "all"),
                        idx=meta.get("index_in_request", 0))
                    ai_inference_profile[profile_key] = meta.get("ai_inference_profile", {})
                    doc_types.append(meta.get("doc_type", "all"))
                rq.ai_inference_profile = ai_inference_profile
                rq.doc_type = ",".join(doc_types)
                rq.ai_inference_start_time = result.get("metadata", {}).get("ai_inference_start_time", -1)  # from the last page
                rq.preprocessing_time = result.get("metadata", {}).get("preprocessing_time", 0)  # from the last page
                rq.ai_inference_time = time.time() - rq.ai_inference_start_time
                rq.save()
        else:
            rq.status = 404  # stop waiting
            rq.predict_result = result
            rq.save()
        _update_subscription_rq_file(request_id=rq_id)
        update_user(rq)
    except IndexError as e:
        # Either the "_sub_" split failed or no request matched.
        print(e)
        # Fixed: original passed a logging-style "%d" placeholder to print().
        print(f"NotFound request by requestId: {rq_id}")
        # Guard: rq may still be None/unbound when the split itself raised.
        if rq is not None:
            rq.ai_inference_time = 0
            rq.save()
            _update_subscription_rq_file(request_id=rq_id)
    except Exception as e:
        print(e)
        print(f"Fail Invoice: {rq_id}")
        traceback.print_exc()
        if rq is not None:
            rq.ai_inference_time = 0
            rq.save()
            _update_subscription_rq_file(request_id=rq_id)
        return "FailInvoice"
|
|
|
|
|
|
def _update_subscription_rq_file(request_id):
    """Propagate the request-level predicted fields onto every uploaded file.

    No-op when the request cannot be found or has no ``predict_result`` yet
    (pages still pending).
    """
    sub_request = SubscriptionRequest.objects.filter(request_id=request_id).first()
    if sub_request is None:  # Guard added: .first() returns None when missing.
        return
    result = sub_request.predict_result
    if result is None:
        return

    # The parsed fields are identical for every file of the request, so
    # extract them once instead of re-parsing inside the per-file loop
    # (hoisted loop-invariant work; behavior unchanged).
    retailer_name = None
    sold_to_party = None
    purchase_date = []
    imei_number = []
    for elem in __get_actual_predict_result(result=result):
        label = elem["label"]
        if label == "retailername":
            retailer_name = elem['value']
        elif label == "sold_to_party":
            sold_to_party = elem['value']
        elif label == "purchase_date":
            purchase_date = elem['value']
        else:
            # Any remaining label is treated as the IMEI field, as before.
            imei_number = elem['value']

    for image in SubscriptionRequestFile.objects.filter(request=sub_request):
        # Fresh dict per file so later per-file edits cannot leak to siblings.
        image.predict_result = {
            "retailername": retailer_name,
            "sold_to_party": sold_to_party,
            "purchase_date": purchase_date,
            "imei_number": imei_number,
        }
        image.save()
|
|
|
|
def __get_actual_predict_result(result: dict):
    """Pull the list of predicted fields out of a raw OCR result payload.

    Navigates ``result['content']['document'][0]['content']`` and returns
    an empty list when the document list is absent or empty.
    """
    documents = result.get('content', {}).get('document', [])
    if not documents:
        return []
    return documents[0].get('content', [])