import base64
import csv
import json
import os
import time
import traceback
import uuid
from multiprocessing.pool import ThreadPool

from celery.utils.log import get_task_logger

from fwd import settings
from fwd_api.celery_worker.worker import app
from fwd_api.constant.common import ProcessType
from fwd_api.models import (FeedbackRequest, Report, SubscriptionRequest,
                            SubscriptionRequestFile, UserProfile)

from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util

logger = get_task_logger(__name__)

s3_client = S3Util.MinioS3Client(
    endpoint=settings.S3_ENDPOINT,
    access_key=settings.S3_ACCESS_KEY,
    secret_key=settings.S3_SECRET_KEY,
    bucket_name=settings.S3_BUCKET_NAME
)
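# NOTE: MinioS3Client is assumed to expose an `s3_client` attribute that is
# None when S3 credentials are missing or the connection could not be made;
# every task below checks it before attempting an upload.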


def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list:
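    """Save the original PDF as a SubscriptionRequestFile, then split it into
    per-page image URLs.

    Returns the list of page-image URL dicts, or None on failure (in which
    case the request is marked as failed with status 400).
    """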
    try:
        # Origin file
        code = f'FIL{uuid.uuid4().hex}'
        new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                            request=request,
                                                                            file_name=file_name,
                                                                            code=code,
                                                                            doc_type=doc_type,
                                                                            index_in_request=index_in_request)
        new_request_file.save()
        # Sub-files: one image URL per page of the PDF
        return ProcessUtil.pdf_to_images_urls(FileUtils.get_file(file_path), request, user)
    except Exception as e:
        traceback.print_exc()
        request.status = 400
        request.predict_result = {"status": 400, "content": "", "message": f"Unable to extract PDF file: {e}"}
        request.save()
        return None


def process_image_file(file_name: str, file_path, request, user) -> list:
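    """Save a single uploaded image as a SubscriptionRequestFile and return
    its one-element list of page URL dicts (page_number is always 0 for
    images).
    """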
    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                        request=request,
                                                                        file_name=file_name,
                                                                        code=f'FIL{uuid.uuid4().hex}')
    new_request_file.save()
    return [{
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': new_request_file.code
    }]


@app.task(name="csv_feedback")
def process_csv_feedback(csv_file_path, feedback_id):
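    """Ingest a feedback CSV: for each row, attach the user-provided ground
    truth to the matching SubscriptionRequest, then persist any per-row errors
    both locally (as a *_error.json next to the CSV) and to S3.

    Expected columns: requestId, redemptionNumber, imeiNumber, imeiNumber2,
    Purchase Date, retailer, Sold to party, timetakenmilli.
    """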
    status = {}
    # Stream the CSV row by row
    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            # Look up the subscription request this row refers to
            request_id = row.get('requestId')
            sub_rqs = SubscriptionRequest.objects.filter(request_id=request_id)
            if len(sub_rqs) != 1:
                status[request_id] = f"Found {len(sub_rqs)} records of request id {request_id}"
                continue
            sub_rq = sub_rqs[0]
            # Update the user-provided result (with validation)
            fb = {}
            redemption_id = row.get('redemptionNumber')
            imei1 = row.get('imeiNumber')
            imei2 = row.get('imeiNumber2')
            purchase_date = row.get('Purchase Date')
            retailer = row.get('retailer')
            sold_to_party = row.get('Sold to party')
            # 'timetakenmilli' may be absent or empty; default to 0 rather than crash
            server_time = float(row.get('timetakenmilli') or 0)
            fb['request_id'] = request_id
            fb['retailername'] = retailer
            fb['sold_to_party'] = sold_to_party
            fb['purchase_date'] = purchase_date
            fb['imei_number'] = [imei1, imei2]
            sub_rq.feedback_result = fb
            sub_rq.client_request_time = server_time
            # Update redemption_id only when the row provides one
            if redemption_id:
                sub_rq.redemption_id = redemption_id
            sub_rq.save()

    # Record per-row errors on the FeedbackRequest
    feedback_rq = FeedbackRequest.objects.filter(feedback_id=feedback_id).first()
    feedback_rq.error_status = status
    # Save the error log locally
    directory_name = os.path.dirname(csv_file_path)
    file_path = csv_file_path.replace(".csv", "_error.json")
    with open(file_path, "w") as outfile:
        json.dump(status, outfile)
    # Upload the error log to S3 and, if that succeeds, delete the local copy
    s3_key = os.path.join("feedback", directory_name.split("/")[-1], file_path.split("/")[-1])
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(file_path, s3_key)
            os.remove(file_path)
        except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
    feedback_rq.save()


@app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files):
    """Preprocess every file of a request (PDFs are split into page images),
    then fan the pages out to the AI inference queue.

    files: [{
        "index_in_request": int,
        "idx": int,
        "file_name": "",
        "file_path": "",  # local path to file
        "file_type": ""
    }, ...]
    """
    new_request = SubscriptionRequest.objects.filter(request_id=rq_id).first()
    user = UserProfile.objects.filter(id=user_id).first()
    new_request.pages = len(files)
    new_request.pages_left = len(files)

    def process_and_save_file(data):
        idx, file = data
        extension = file["file_name"].split(".")[-1].lower()
        if extension == "pdf":
            _b_urls = process_pdf_file(file["file_name"], file["file_path"], new_request, user, file["file_type"], file["index_in_request"])
            if _b_urls is None:
                new_request.status = 400
                new_request.save()
                raise FileContentInvalidException
            for j in range(len(_b_urls)):
                _b_urls[j]["doc_type"] = file["file_type"]
                _b_urls[j]["page_number"] = idx
            return idx, _b_urls[0]  # NOTE: only the first page's URL is forwarded downstream
        elif extension in image_extensions:
            this_url = ProcessUtil.process_image_local_file(file["file_name"], file["file_path"], new_request, user, file["file_type"], file["index_in_request"])[0]
            this_url["page_number"] = idx
            if file["file_type"]:
                this_url["doc_type"] = file["file_type"]
            return idx, this_url
        else:
            # An unsupported extension would otherwise return None implicitly
            # and crash the tuple unpacking below, so fail explicitly instead
            raise FileContentInvalidException

    # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
    start_time = time.time()
    b_urls = [None] * len(files)
    pool = ThreadPool(processes=2)
    files_with_idx = [(idx, file) for idx, file in enumerate(files)]
    for idx, url in pool.map(process_and_save_file, files_with_idx):
        b_urls[idx] = url
    preprocessing_time = time.time() - start_time

    # TODO: send to queue with different request_ids
    to_queue = []
    ai_inference_start_time = time.time()
    for i, b_url in enumerate(b_urls):
        file_meta = {}
        fractorized_request_id = rq_id + f"_sub_{i}"
        file_meta["doc_type"] = b_url["doc_type"]
        file_meta["ai_inference_start_time"] = ai_inference_start_time
        file_meta["ai_inference_profile"] = {}
        file_meta["index_in_request"] = i
        file_meta["preprocessing_time"] = preprocessing_time
        to_queue.append((fractorized_request_id, sub_id, [b_url], user_id, p_type, file_meta))

    # Send to next queue
    for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
        ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)


@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key, request_id):
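    """Upload a request artifact to S3 and mark the SubscriptionRequest as
    uploaded; a no-op (with a log line) when S3 is not configured.
    """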
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(local_file_path, s3_key)
            sub_request = SubscriptionRequest.objects.filter(request_id=request_id)[0]
            sub_request.S3_uploaded = True
            sub_request.save()
        except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
            return
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='upload_feedback_to_s3')
def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
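    """Upload a feedback CSV to S3 and mark the FeedbackRequest as uploaded;
    a no-op (with a log line) when S3 is not configured.
    """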
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(local_file_path, s3_key)
            feed_request = FeedbackRequest.objects.filter(feedback_id=feedback_id)[0]
            feed_request.S3_uploaded = True
            feed_request.save()
        except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
            return
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='upload_report_to_s3')
def upload_report_to_s3(local_file_path, s3_key, report_id):
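    """Upload a generated report to S3, recording both the upload flag and the
    S3 key on the Report; a no-op (with a log line) when S3 is not configured.
    """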
    if s3_client.s3_client is not None:
        try:
            s3_client.upload_file(local_file_path, s3_key)
            report = Report.objects.filter(report_id=report_id)[0]
            report.S3_uploaded = True
            report.S3_file_name = s3_key
            report.save()
        except Exception as e:
            logger.error(f"Unable to upload to S3: {e}")
            return
    else:
        logger.info("S3 is not available, skipping...")


@app.task(name='remove_local_file')
def remove_local_file(local_file_path, request_id):
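    """Best-effort deletion of a local file once it is no longer needed."""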
    logger.info(f"Removing local file: {local_file_path}")
    try:
        os.remove(local_file_path)
    except Exception as e:
        logger.info(f"Unable to remove local file: {e}")


@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
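    """Decode a base64-encoded payload and write it to S3 under s3_key;
    a no-op (with a log line) when S3 is not configured.
    """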
    if s3_client.s3_client is not None:
        obj = base64.b64decode(byte_obj)
        s3_client.update_object(s3_key, obj)
    else:
        logger.info("S3 is not available, skipping...")