sbt-idp/cope2n-api/fwd_api/celery_worker/internal_task.py

292 lines
13 KiB
Python
Executable File

import time
import uuid
import os
import base64
import traceback
from multiprocessing.pool import ThreadPool
from fwd_api.models import SubscriptionRequest, UserProfile
from fwd_api.celery_worker.worker import app
from fwd_api.celery_worker.task_warpper import VerboseTask
from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from fwd_api.models import SubscriptionRequestFile, FeedbackRequest, Report
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util
from ..utils.accuracy import validate_feedback_file
from fwd_api.constant.common import FileCategory
from fwd_api.middleware.local_storage import get_current_trace_id
import csv
import json
import copy
from fwd_api.utils.accuracy import predict_result_to_ready
from celery.utils.log import get_task_logger
from fwd import settings
# Logger obtained from Celery's get_task_logger; the tasks in this module use
# it for their info/error messages.
logger = get_task_logger(__name__)
# Shared S3/MinIO client configured from the project settings. The tasks below
# check `s3_client.s3_client is not None` before attempting any upload.
s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)
def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list:
    """Register a PDF as a request file and convert it into image URLs.

    A ``SubscriptionRequestFile`` row is saved for the original PDF, then the
    file content is handed to ``ProcessUtil.pdf_to_images_urls``.

    Returns:
        Whatever ``ProcessUtil.pdf_to_images_urls`` returns on success, or
        ``None`` when any step raises. On failure the traceback is printed,
        ``request`` is marked with status 400 and an explanatory
        ``predict_result``, and the request is saved.
    """
    try:
        # Record for the original (un-split) file.
        origin_record: SubscriptionRequestFile = SubscriptionRequestFile(
            file_path=file_path,
            request=request,
            file_name=file_name,
            code=f'FIL{uuid.uuid4().hex}',
            doc_type=doc_type,
            index_in_request=index_in_request,
        )
        origin_record.save()
        # Derived per-page images.
        pdf_content = FileUtils.get_file(file_path)
        page_urls = ProcessUtil.pdf_to_images_urls(pdf_content, request, user)
    except Exception as error:
        traceback.print_exc()
        request.status = 400
        request.predict_result = {
            "status": 400,
            "content": "",
            "message": f"Unable to extract pdf files {error}",
        }
        request.save()
        return None
    return page_urls
def process_image_file(file_name: str, file_path, request, user) -> list:
    """Register an image as a request file and describe it as a one-item URL list.

    Saves a ``SubscriptionRequestFile`` row with a freshly generated ``FIL…``
    code and returns a list holding a single dict with the keys ``file_url``,
    ``page_number`` (always 0) and ``request_file_id`` (the generated code).
    """
    image_record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=file_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
    )
    image_record.save()

    image_url = FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name)
    entry = {
        'file_url': image_url,
        'page_number': 0,
        'request_file_id': image_record.code,
    }
    return [entry]
@app.task(base=VerboseTask, name="csv_feedback")
def process_csv_feedback(csv_file_path, feedback_id):
    """Apply a feedback CSV to stored requests and persist a per-request error log.

    For every CSV row the matching ``SubscriptionRequest`` (looked up by the
    ``requestId`` column) receives a ``feedback_result`` built from the columns
    ``retailer``, ``Sold to party``, ``invoice_no``, ``Purchase Date``,
    ``imeiNumber`` and ``imeiNumber2``; ``timetakenmilli`` is stored as
    ``client_request_time`` and a non-empty ``redemptionNumber`` as
    ``redemption_id``. Each origin file of the request then gets its own
    ``processing_time``, ``predict_result`` and ``feedback_result``.

    Problems are collected in a ``{request_id: message}`` dict which is written
    next to the CSV as ``<name>_error.json``, uploaded to S3 when the client is
    available (the local copy is removed after a successful upload), and stored
    on the ``FeedbackRequest`` identified by ``feedback_id``.

    Args:
        csv_file_path: Local path of the feedback CSV.
        feedback_id: ``feedback_id`` of the ``FeedbackRequest`` to update.
    """
    status = {}
    # newline='' is what the csv module expects so that newlines embedded in
    # quoted fields are parsed correctly.
    with open(csv_file_path, 'r', newline='') as file:
        reader = csv.DictReader(file)
        for row in reader:
            request_id = row.get('requestId')
            sub_rqs = SubscriptionRequest.objects.filter(request_id=request_id)
            if len(sub_rqs) != 1:
                status[request_id] = f"Found {len(sub_rqs)} records of request id {request_id}"
                continue
            sub_rq = sub_rqs[0]
            images = SubscriptionRequestFile.objects.filter(request=sub_rq)

            # A missing or malformed duration must not abort the whole batch:
            # report it for this request and keep the stored value untouched.
            raw_time = row.get('timetakenmilli')
            try:
                server_time = float(raw_time)
            except (TypeError, ValueError):
                server_time = None
                status[request_id] = f"Invalid timetakenmilli value {raw_time!r} for request id {request_id}"
                logger.error(f"{request_id} - invalid timetakenmilli value: {raw_time!r}")

            # Update the user-provided result on the request.
            sub_rq.feedback_result = {
                'request_id': request_id,
                'retailername': row.get('retailer'),
                'sold_to_party': row.get('Sold to party'),
                'invoice_no': row.get('invoice_no'),
                'purchase_date': row.get('Purchase Date'),
                'imei_number': [row.get('imeiNumber'), row.get('imeiNumber2')],
            }
            if server_time is not None:
                sub_rq.client_request_time = server_time
            # Only overwrite the redemption id when the CSV provides one; the
            # truthiness test also covers a missing column (None).
            redemption_id = row.get('redemptionNumber')
            if redemption_id:
                sub_rq.redemption_id = redemption_id
            sub_rq.save()

            # Per-document-type processing times, indexed by index_in_request.
            time_cost = {"imei": [], "invoice": [], "all": []}
            if sub_rq.ai_inference_profile is None:
                time_cost["imei"] = [-1] * len(images)
                time_cost["invoice"] = [-1]
                time_cost["all"] = [-1]
            else:
                for k, v in sub_rq.ai_inference_profile.items():
                    # NOTE(review): presumably "inference" is [start, [end, ...]]
                    # and "postprocess" is [start, end] - confirm against the
                    # producer of ai_inference_profile.
                    elapsed = v["inference"][1][0] - v["inference"][0] + (v["postprocess"][1] - v["postprocess"][0])
                    # setdefault tolerates profile keys with an unexpected prefix.
                    time_cost.setdefault(k.split("_")[0], []).append(elapsed)

            for image in images:
                if image.file_category != FileCategory.Origin.value:
                    # skip break files, which are not responsible for storing data
                    continue
                _predict_result = copy.deepcopy(predict_result_to_ready(sub_rq.predict_result))
                _feedback_result = copy.deepcopy(sub_rq.feedback_result)

                doc_costs = time_cost.get(image.doc_type)
                if doc_costs is None:
                    # Unknown document type: fall back to 0. (The previous
                    # default list was one element too short, so this fallback
                    # always raised IndexError instead of yielding 0.)
                    image.processing_time = 0
                else:
                    try:
                        image.processing_time = doc_costs[image.index_in_request]
                    except (IndexError, TypeError) as e:
                        logger.error(f"image.doc_type: {image.doc_type} - image.index_in_request: {image.index_in_request} - time_cost: {time_cost} - {e}")

                if not validate_feedback_file(_feedback_result, _predict_result):
                    # Reported, but the file is still updated below.
                    status[request_id] = "Missalign imei number between feedback and predict"

                if image.doc_type == "invoice":
                    # Invoice files carry no IMEI information.
                    _predict_result["imei_number"] = []
                    if _feedback_result:
                        _feedback_result["imei_number"] = []
                else:
                    # Other files keep only the IMEI at their own index.
                    try:
                        _predict_result = {"retailername": None, "sold_to_party": None, "invoice_no": None, "purchase_date": [], "imei_number": [_predict_result["imei_number"][image.index_in_request]]}
                        _feedback_result = {"retailername": None, "sold_to_party": None, "invoice_no": None, "purchase_date": None, "imei_number": [_feedback_result["imei_number"][image.index_in_request]]} if _feedback_result and len(_feedback_result["imei_number"]) > image.index_in_request else None
                    except Exception as e:
                        # Deliberate best effort: keep whatever was computed so far.
                        logger.error(f"{request_id} - {e}")
                image.predict_result = _predict_result
                image.feedback_result = _feedback_result
                image.save()

    # Update the log in the database (the record may be missing).
    feedback_rq = FeedbackRequest.objects.filter(feedback_id=feedback_id).first()
    if feedback_rq is None:
        logger.error(f"Feedback request {feedback_id} not found, error status is not stored in database")
    else:
        feedback_rq.error_status = status

    # Save the log locally. splitext only touches the real extension, so the
    # error file can never collide with the input CSV or rewrite a directory
    # name that happens to contain ".csv".
    directory_name = os.path.dirname(csv_file_path)
    file_path = os.path.splitext(csv_file_path)[0] + "_error.json"
    with open(file_path, "w") as outfile:
        json.dump(status, outfile)

    # Save to S3 and drop the local copy once the upload succeeded.
    s3_key = os.path.join("feedback", os.path.basename(directory_name), os.path.basename(file_path))
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(file_path, s3_key)
            os.remove(file_path)
        except Exception as e:
            logger.error(f"Unable to set S3: {e}")
    if feedback_rq is not None:
        feedback_rq.save()
@app.task(base=VerboseTask, name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """Pre-process the files of a request and queue one job per file.

    Each file is registered and converted by a small thread pool (PDFs through
    ``process_pdf_file``, images through
    ``ProcessUtil.process_image_local_file``); afterwards one message per file
    is sent with ``ProcessUtil.send_to_queue2`` under the request id
    ``<rq_id>_sub_<i>``.

    files: [{
        "index_in_request": int,
        "idx": int,
        "file_name": "",
        "file_path": "", # local path to file
        "file_type": ""
    },]

    Raises:
        FileContentInvalidException: when a PDF cannot be converted (or yields
            no page) or a file has an unsupported extension. The request is
            saved with status 400 before raising.
    """
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first()
    user = UserProfile.objects.filter(id=user_id).first()
    # NOTE(review): these counters are only persisted by the failure paths in
    # this task - confirm the request is saved elsewhere on success.
    new_request.pages = len(files)
    new_request.pages_left = len(files)

    def reject_request():
        # Mark the request as invalid, persist it and abort the task.
        new_request.status = 400
        new_request.save()
        raise FileContentInvalidException

    def process_and_save_file(data):
        # Returns (position, url-dict) for one input file.
        idx, file = data
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user, file["file_type"], file["index_in_request"])
            # None means the conversion failed; an empty list would otherwise
            # crash on the [0] lookup below.
            if not _b_urls:
                reject_request()
            for page in _b_urls:
                page["doc_type"] = file["file_type"]
                page["page_number"] = idx
                page["index_to_image_type"] = file["index_in_request"]
            # Only the first page is forwarded.
            return idx, _b_urls[0]
        if extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user, file["file_type"], file["index_in_request"])[0]
            this_url["page_number"] = idx
            this_url["index_to_image_type"] = file["index_in_request"]
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            return idx, this_url
        # Unsupported extension: fail explicitly instead of returning None,
        # which used to surface as an opaque unpacking TypeError.
        reject_request()

    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
    start_time = time.time()
    b_urls = [None] * len(files)
    # The context manager releases the worker threads even when a file fails.
    with ThreadPool(processes=2) as pool:
        for idx, url in pool.map(process_and_save_file, list(enumerate(files))):
            b_urls[idx] = url
    preprocessing_time = time.time() - start_time

    # TODO: send to queue with different request_ids
    to_queue = []
    ai_inference_start_time = time.time()
    for i, b_url in enumerate(b_urls):
        file_meta = {
            "doc_type": b_url["doc_type"],
            "ai_inference_start_time": ai_inference_start_time,
            "ai_inference_profile": {},
            "index_in_request": i,
            "preprocessing_time": preprocessing_time,
            "index_to_image_type": b_url["index_to_image_type"],
            "subsidiary": new_request.subsidiary,
            "request_id": rq_id,
            "trace_id": get_current_trace_id(),
        }
        to_queue.append((rq_id + f"_sub_{i}", sub_id, [b_url], user_id, p_type, file_meta))

    # Send to next queue only after every message has been built successfully.
    for queued_args in to_queue:
        ProcessUtil.send_to_queue2(*queued_args)
@app.task(base=VerboseTask, name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key, request_id):
    """Upload a local file to S3 and flag the owning request as uploaded.

    Does nothing but log when the S3 client is unavailable. Any failure during
    the upload or the database update is logged and swallowed.
    """
    if s3_client.s3_client is None:
        logger.info("S3 is not available, skipping,...")
        return

    try:
        s3_client.upload_file(local_file_path, s3_key)
        matching_requests = SubscriptionRequest.objects.filter(request_id=request_id)
        uploaded_request = matching_requests[0]
        uploaded_request.S3_uploaded = True
        uploaded_request.save()
    except Exception as error:
        logger.error(f"Unable to set S3: {error}")
@app.task(base=VerboseTask, name='upload_feedback_to_s3')
def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
    """Upload a feedback file to S3 and flag its FeedbackRequest as uploaded.

    Does nothing but log when the S3 client is unavailable. Any failure during
    the upload or the database update is logged and swallowed.
    """
    if s3_client.s3_client is None:
        logger.info("S3 is not available, skipping,...")
        return

    try:
        s3_client.upload_file(local_file_path, s3_key)
        matching_feedbacks = FeedbackRequest.objects.filter(feedback_id=feedback_id)
        uploaded_feedback = matching_feedbacks[0]
        uploaded_feedback.S3_uploaded = True
        uploaded_feedback.save()
    except Exception as error:
        logger.error(f"Unable to set S3: {error}")
@app.task(base=VerboseTask, name='upload_report_to_s3')
def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
    """Upload a report file to S3 after waiting ``delay`` seconds.

    When ``report_id`` is truthy the matching ``Report`` is flagged as
    uploaded. Does nothing but log when the S3 client is unavailable; any
    failure while waiting, uploading or updating is logged and swallowed.
    """
    if s3_client.s3_client is None:
        logger.info("S3 is not available, skipping,...")
        return

    try:
        time.sleep(delay)
        s3_client.upload_file(local_file_path, s3_key)
        if not report_id:
            return
        uploaded_report = Report.objects.filter(report_id=report_id)[0]
        uploaded_report.S3_uploaded = True
        uploaded_report.save()
    except Exception as error:
        logger.error(f"Unable to set S3: {error}")
@app.task(base=VerboseTask, name='remove_local_file')
def remove_local_file(local_file_path, request_id):
    """Delete a local file, logging (not raising) when the removal fails.

    ``request_id`` is accepted but not used by this task.
    """
    announcement = f"Removing local file: {local_file_path}, ..."
    logger.info(announcement)
    try:
        os.remove(local_file_path)
    except Exception as error:
        logger.info(f"Unable to remove local file: {error}")
@app.task(base=VerboseTask, name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
    """Decode a base64 payload and store it in S3 under ``s3_key``.

    Does nothing but log when the S3 client is unavailable.
    """
    if s3_client.s3_client is None:
        logger.info("S3 is not available, skipping,...")
        return

    decoded_payload = base64.b64decode(byte_obj)
    s3_client.update_object(s3_key, decoded_payload)