# sbt-idp/cope2n-api/fwd_api/celery_worker/internal_task.py
import base64
import copy
import csv
import json
import os
import time
import traceback
import uuid
from multiprocessing.pool import ThreadPool
from celery.utils.log import get_task_logger
from fwd import celery_app as app
from fwd import settings
from fwd_api.celery_worker.task_warpper import VerboseTask
from fwd_api.constant.common import FileCategory
from fwd_api.middleware.local_storage import get_current_trace_id
from fwd_api.models import (FeedbackRequest, Report, SubscriptionRequest,
SubscriptionRequestFile, UserProfile)
from fwd_api.utils.accuracy import predict_result_to_ready
from opentelemetry import trace
from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util
from ..utils.accuracy import validate_feedback_file
logger = get_task_logger(__name__)
tracer = trace.get_tracer("sbt_celery_backend")
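# Shared S3/MinIO client used by the upload tasks below; uploads are skipped when the
# underlying client (s3_client.s3_client) is unavailable.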
s3_client = S3Util.MinioS3Client(
endpoint=settings.S3_ENDPOINT,
access_key=settings.S3_ACCESS_KEY,
secret_key=settings.S3_SECRET_KEY,
bucket_name=settings.S3_BUCKET_NAME
)
@tracer.start_as_current_span("process_pdf_file")
def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list:
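    """Save the original PDF as a SubscriptionRequestFile and split it into per-page image URLs.

    Returns the list of page-URL dicts produced by ProcessUtil.pdf_to_images_urls, or None if
    extraction fails, in which case the request is marked as failed (status 400).
    """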
try:
# Origin file
code = f'FIL{uuid.uuid4().hex}'
new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
request=request,
file_name=file_name,
code=code,
doc_type=doc_type,
index_in_request=index_in_request)
new_request_file.save()
# Sub-file
return ProcessUtil.pdf_to_images_urls(FileUtils.get_file(file_path), request, user)
    except Exception as e:
        logger.error(traceback.format_exc())
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract PDF pages: {e}"}
        request.save()
        return None
@tracer.start_as_current_span("process_image_file")
def process_image_file(file_name: str, file_path, request, user) -> list:
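    """Save a single image as a SubscriptionRequestFile and return its one-element page-URL list."""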
new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
request=request,
file_name=file_name,
code=f'FIL{uuid.uuid4().hex}')
new_request_file.save()
return [{
'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
'page_number': 0,
'request_file_id': new_request_file.code
}]
@app.task(base=VerboseTask, name="csv_feedback", track_started=True)
def process_csv_feedback(csv_file_path, feedback_id):
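    """Ingest a feedback CSV: update each referenced SubscriptionRequest and its files with the
    feedback values, collect per-request errors, write the error log next to the CSV, upload it
    to S3 when available, and store it on the FeedbackRequest record."""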
    # collect per-request error messages while processing the CSV
status = {}
with open(csv_file_path, 'r') as file:
reader = csv.DictReader(file)
        # process each feedback row against its SubscriptionRequest
for row in reader:
            # get the matching subscription request
request_id = row.get('requestId')
sub_rqs = SubscriptionRequest.objects.filter(request_id=request_id)
if len(sub_rqs) != 1:
status[request_id] = f"Found {len(sub_rqs)} records of request id {request_id}"
continue
else:
sub_rq = sub_rqs[0]
images = SubscriptionRequestFile.objects.filter(request=sub_rq)
fb = {}
# update user result (with validate)
redemption_id = row.get('redemptionNumber')
imei1 = row.get('imeiNumber')
imei2 = row.get('imeiNumber2')
purchase_date = row.get('Purchase Date')
retailer = row.get('retailer')
invoice_no = row.get('invoice_no')
sold_to_party = row.get('Sold to party')
server_time = float(row.get('timetakenmilli'))
fb['request_id'] = request_id
fb['retailername'] = retailer
fb['sold_to_party'] = sold_to_party
fb["invoice_no"] = invoice_no
fb['purchase_date'] = purchase_date
fb['imei_number'] = [imei1, imei2]
sub_rq.feedback_result = fb
sub_rq.client_request_time = server_time
                # update redemption_id only when the CSV provides one
                if redemption_id:
sub_rq.redemption_id = redemption_id
sub_rq.save()
# Update files
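                # Processing time per doc type, indexed by each file's position in the request
                # (-1 for every entry when no AI inference profile is available)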
time_cost = {"imei": [], "invoice": [], "all": []}
imei_count = 0
if sub_rq.ai_inference_profile is None:
time_cost["imei"] = [-1 for _ in range(len(images))]
time_cost["invoice"] = [-1]
time_cost["all"] = [-1]
else:
for k, v in sub_rq.ai_inference_profile.items():
time_cost[k.split("_")[0]].append(v["inference"][1][0] - v["inference"][0] + (v["postprocess"][1]-v["postprocess"][0]))
for i, image in enumerate(images):
if image.file_category != FileCategory.Origin.value:
                        # skip derived (non-Origin) page files; they do not store request-level data
continue
_predict_result = copy.deepcopy(predict_result_to_ready(sub_rq.predict_result))
_feedback_result = copy.deepcopy(sub_rq.feedback_result)
try:
                        image.processing_time = time_cost.get(image.doc_type, [0] * (image.index_in_request + 1))[image.index_in_request]
except Exception as e:
logger.error(f"image.doc_type: {image.doc_type} - image.index_in_request: {image.index_in_request} - time_cost: {time_cost} - {e}")
if not validate_feedback_file(_feedback_result, _predict_result):
                        status[request_id] = "Misaligned imei numbers between feedback and predict"
# continue
if image.doc_type == "invoice":
_predict_result["imei_number"] = []
if _feedback_result:
_feedback_result["imei_number"] = []
else:
try:
_predict_result = {"retailername": None, "sold_to_party": None, "invoice_no": None, "purchase_date": [], "imei_number": [_predict_result["imei_number"][image.index_in_request]]}
_feedback_result = {"retailername": None, "sold_to_party": None, "invoice_no": None, "purchase_date": None, "imei_number": [_feedback_result["imei_number"][image.index_in_request]]} if _feedback_result and len(_feedback_result["imei_number"]) > image.index_in_request else None
except Exception as e:
logger.error(f"{request_id} - {e}")
image.predict_result = _predict_result
image.feedback_result = _feedback_result
# image.reviewed_result = _reviewed_result
image.save()
# update log into database
feedback_rq = FeedbackRequest.objects.filter(feedback_id=feedback_id).first()
feedback_rq.error_status = status
# save log to local
directory_name = os.path.dirname(csv_file_path)
file_path = csv_file_path.replace(".csv", "_error.json")
with open(file_path, "w") as outfile:
json.dump(status, outfile)
# save to s3
s3_key = os.path.join("feedback", directory_name.split("/")[-1], file_path.split("/")[-1])
if s3_client.s3_client is not None:
try:
            # upload succeeded; remove the local copy
s3_client.upload_file(file_path, s3_key)
os.remove(file_path)
except Exception as e:
logger.error(f"Unable to set S3: {e}")
feedback_rq.save()
@app.task(base=VerboseTask, name='do_pdf', track_started=True)
def process_pdf(rq_id, sub_id, p_type, user_id, files):
"""
files: [{
"index_in_request": int,
"idx": int,
"file_name": "",
"file_path": "", # local path to file
"file_type": ""
},]
"""
new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first()
user = UserProfile.objects.filter(id=user_id).first()
new_request.pages = len(files)
new_request.pages_left = len(files)
def process_and_save_file(data):
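        # Persist each incoming file and build its page URL(s): PDFs are split into
        # per-page image URLs, plain image files map to a single URL entry.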
idx, file = data
extension = file["file_name"].split(".")[-1].lower()
if extension == "pdf":
_b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user, file["file_type"], file["index_in_request"])
if _b_urls is None:
new_request.status = 400
new_request.save()
raise FileContentInvalidException
for j in range(len(_b_urls)):
_b_urls[j]["doc_type"] = file["file_type"]
_b_urls[j]["page_number"] = idx
_b_urls[j]["index_to_image_type"] = file["index_in_request"]
return idx, _b_urls[0]
elif extension in image_extensions:
this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user, file["file_type"], file["index_in_request"])[0]
this_url["page_number"] = idx
this_url["index_to_image_type"] = file["index_in_request"]
if file["file_type"]:
this_url["doc_type"] = file["file_type"]
return idx, this_url
# Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
start_time = time.time()
b_urls = [None] * len(files)
pool = ThreadPool(processes=2)
files_with_idx = [(idx, file) for idx, file in enumerate(files)]
for idx, url in pool.map(process_and_save_file, files_with_idx):
b_urls[idx] = url
preprocessing_time = time.time() - start_time
# TODO: send to queue with different request_ids
to_queue = []
ai_inference_start_time = time.time()
for i, b_url in enumerate(b_urls):
file_meta = {}
fractorized_request_id = rq_id + f"_sub_{i}"
file_meta["doc_type"] = b_url["doc_type"]
file_meta["ai_inference_start_time"] = ai_inference_start_time
file_meta["ai_inference_profile"] = {}
file_meta["index_in_request"] = i
file_meta["preprocessing_time"] = preprocessing_time
file_meta["index_to_image_type"] = b_url["index_to_image_type"]
file_meta["subsidiary"] = new_request.subsidiary
file_meta["request_id"] = rq_id
file_meta["trace_id"] = get_current_trace_id()
to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type, file_meta))
# Send to next queue
for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)
@app.task(base=VerboseTask, name='upload_file_to_s3', track_started=False)
def upload_file_to_s3(local_file_path, s3_key, request_id):
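    """Upload a local file to S3 and flag the matching SubscriptionRequest as uploaded."""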
if s3_client.s3_client is not None:
try:
s3_client.upload_file(local_file_path, s3_key)
sub_request = SubscriptionRequest.objects.filter(request_id=request_id)[0]
sub_request.S3_uploaded = True
sub_request.save()
except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
return
else:
        logger.info("S3 is not available, skipping ...")
@app.task(base=VerboseTask, name='upload_feedback_to_s3', track_started=True)
def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
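    """Upload a feedback file to S3 and flag the matching FeedbackRequest as uploaded."""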
if s3_client.s3_client is not None:
try:
s3_client.upload_file(local_file_path, s3_key)
feed_request = FeedbackRequest.objects.filter(feedback_id=feedback_id)[0]
feed_request.S3_uploaded = True
feed_request.save()
except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
return
else:
        logger.info("S3 is not available, skipping ...")
@app.task(base=VerboseTask, name='upload_report_to_s3', track_started=True)
def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
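    """Upload a report file to S3 after sleeping `delay` seconds and, if report_id is given, flag the Report as uploaded."""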
if s3_client.s3_client is not None:
try:
time.sleep(delay)
s3_client.upload_file(local_file_path, s3_key)
if report_id:
report = Report.objects.filter(report_id=report_id)[0]
report.S3_uploaded = True
report.save()
except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
return
else:
        logger.info("S3 is not available, skipping ...")
@app.task(base=VerboseTask, name='remove_local_file', track_started=False)
def remove_local_file(local_file_path, request_id):
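    """Delete a file from local storage; failures are only logged."""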
logger.info(f"Removing local file: {local_file_path}, ...")
try:
os.remove(local_file_path)
except Exception as e:
        logger.warning(f"Unable to remove local file: {e}")
@app.task(base=VerboseTask, name='upload_obj_to_s3', track_started=True)
def upload_obj_to_s3(byte_obj, s3_key):
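    """Decode a base64-encoded payload and write it to S3 under s3_key."""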
if s3_client.s3_client is not None:
        obj = base64.b64decode(byte_obj)
        s3_client.update_object(s3_key, obj)
else:
        logger.info("S3 is not available, skipping ...")