2023-11-30 11:19:06 +00:00
|
|
|
from celery import Celery
|
|
|
|
|
|
|
|
from fwd import settings
|
|
|
|
from fwd_api.exception.exceptions import GeneralException
|
2024-07-05 13:14:47 +00:00
|
|
|
from fwd_api.middleware.local_storage import get_current_trace_id
|
2024-06-26 07:58:24 +00:00
|
|
|
from kombu.utils.uuid import uuid
|
|
|
|
from celery.utils.log import get_task_logger
|
|
|
|
# Module-level logger obtained from Celery's task-logger factory.
logger = get_task_logger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
def is_it_an_index(id):
    """Return True when ``id`` has the shape of an index string.

    A value is accepted when it is a ``str``, contains no ``/`` and holds
    between one and five underscores (inclusive). Every rejection is
    reported through the module logger at INFO level before returning
    False.
    """
    if not isinstance(id, str):
        logger.info("NOT A STRING")
        return False

    if "/" in id:
        logger.info("/ in ID")
        return False

    # Count once; the same number is used for the check and the log line.
    _c = id.count("_")
    if not 1 <= _c <= 5:
        logger.info(f"_ HAS {_c}")
        return False

    return True
|
2023-11-30 11:19:06 +00:00
|
|
|
|
|
|
|
class CeleryConnector:
    """Thin client that publishes named Celery tasks to their queues.

    ``task_routes`` maps each known task name to the queue it is sent to.
    Every public method is a one-line wrapper that forwards its ``args``
    to :meth:`send_task` under a fixed task name.
    """

    # Task name -> routing options. Only the 'queue' key is read here.
    task_routes = {
        # save result
        'process_id_result': {'queue': 'id_card_rs'},
        'process_driver_license_result': {'queue': "driver_license_rs"},
        'process_invoice_result': {'queue': "invoice_rs"},
        'process_ocr_with_box_result': {'queue': "ocr_with_box_rs"},
        'process_template_matching_result': {'queue': "process_template_matching_rs"},
        'process_sap_invoice': {'queue': "invoice_sap"},
        'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
        'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},

        # process task
        'process_id': {'queue': 'id_card'},
        'process_driver_license': {'queue': "driver_license"},
        'process_invoice': {'queue': "invoice"},
        'process_ocr_with_box': {'queue': "ocr_with_box"},
        'process_template_matching': {'queue': 'template_matching'},
        "process_alignment": {"queue": "alignment"},
        'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
        'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
        'process_fi_invoice': {'queue': "invoice_fi"},
        'process_manulife_invoice': {'queue': "invoice_manulife"},
        'process_sbt_invoice': {'queue': "invoice_sbt"},
        'do_pdf': {'queue': "do_pdf"},
        'upload_file_to_s3': {'queue': "upload_file_to_s3"},
        'upload_feedback_to_s3': {'queue': "upload_feedback_to_s3"},
        'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
        'upload_report_to_s3': {'queue': "upload_report_to_s3"},
        'remove_local_file': {'queue': "remove_local_file"},
        'csv_feedback': {'queue': "csv_feedback"},
        'make_a_report': {'queue': "report"},
        'make_a_report_2': {'queue': "report_2"},
    }

    # Celery application used purely as a producer; created at class
    # definition time from the project's broker settings.
    app = Celery(
        'postman',
        broker=settings.BROKER_URL,
        broker_transport_options={'confirm_publish': False},
    )

    def make_a_report(self, args):
        """Send the 'make_a_report' task with ``args``."""
        return self.send_task('make_a_report', args)

    def make_a_report_2(self, args):
        """Send the 'make_a_report_2' task with ``args``."""
        return self.send_task('make_a_report_2', args)

    def csv_feedback(self, args):
        """Send the 'csv_feedback' task with ``args``."""
        return self.send_task('csv_feedback', args)

    def do_pdf(self, args):
        """Send the 'do_pdf' task with ``args``."""
        return self.send_task('do_pdf', args)

    def upload_feedback_to_s3(self, args):
        """Send the 'upload_feedback_to_s3' task with ``args``."""
        return self.send_task('upload_feedback_to_s3', args)

    def upload_file_to_s3(self, args):
        """Send the 'upload_file_to_s3' task with ``args``."""
        return self.send_task('upload_file_to_s3', args)

    def upload_report_to_s3(self, args):
        """Send the 'upload_report_to_s3' task with ``args``."""
        return self.send_task('upload_report_to_s3', args)

    def upload_obj_to_s3(self, args):
        """Send the 'upload_obj_to_s3' task with ``args``."""
        return self.send_task('upload_obj_to_s3', args)

    def remove_local_file(self, args):
        """Send the 'remove_local_file' task, delayed by a 280 s countdown."""
        # Earliest execution of this task is 280 seconds from now.
        return self.send_task('remove_local_file', args, countdown=280)

    def process_fi(self, args):
        """Send the 'process_fi_invoice' task with ``args``."""
        return self.send_task('process_fi_invoice', args)

    def process_fi_result(self, args):
        """Send the 'process_fi_invoice_result' task with ``args``."""
        return self.send_task('process_fi_invoice_result', args)

    def process_id_result(self, args):
        """Send the 'process_id_result' task with ``args``."""
        return self.send_task('process_id_result', args)

    def process_driver_license_result(self, args):
        """Send the 'process_driver_license_result' task with ``args``."""
        return self.send_task('process_driver_license_result', args)

    def process_invoice_result(self, args):
        """Send the 'process_invoice_result' task with ``args``."""
        return self.send_task('process_invoice_result', args)

    def process_ocr_with_box_result(self, args):
        """Send the 'process_ocr_with_box_result' task with ``args``."""
        return self.send_task('process_ocr_with_box_result', args)

    def process_id(self, args):
        """Send the 'process_id' task with ``args``."""
        return self.send_task('process_id', args)

    def process_driver_license(self, args):
        """Send the 'process_driver_license' task with ``args``."""
        return self.send_task('process_driver_license', args)

    def process_invoice(self, args):
        """Send the 'process_invoice' task with ``args``."""
        return self.send_task('process_invoice', args)

    def process_ocr_with_box(self, args):
        """Send the 'process_ocr_with_box' task with ``args``."""
        return self.send_task('process_ocr_with_box', args)

    def process_template_matching(self, args):
        """Send the 'process_template_matching' task with ``args``."""
        return self.send_task('process_template_matching', args)

    def process_invoice_sap(self, args):
        """Send the 'process_sap_invoice' task with ``args``."""
        return self.send_task('process_sap_invoice', args)

    def process_invoice_manulife(self, args):
        """Send the 'process_manulife_invoice' task with ``args``."""
        return self.send_task('process_manulife_invoice', args)

    def process_invoice_sbt(self, args):
        """Send the 'process_sbt_invoice' task with ``args``."""
        return self.send_task('process_sbt_invoice', args)

    def send_task(self, name=None, args=None, countdown=None):
        """Publish task ``name`` to its routed queue and return the result.

        The current trace id is appended as the last positional argument
        of the outgoing task (the receiving side is expected to strip it
        before running).

        Args:
            name: Task name; must be a key of ``task_routes`` whose entry
                has a 'queue'.
            args: Positional arguments for the task (tuple or list), or
                None for no arguments. The caller's object is never
                modified.
            countdown: Optional delay forwarded to Celery unchanged.

        Returns:
            Whatever ``self.app.send_task`` returns.

        Raises:
            GeneralException: If ``name`` has no usable route.
        """
        if name not in self.task_routes or 'queue' not in self.task_routes[name]:
            raise GeneralException("System")

        # Use a readable id derived from the first argument only when args
        # is a non-empty tuple whose head looks like an index; otherwise
        # fall back to a plain uuid. The emptiness check avoids an
        # IndexError on ``()``.
        if isinstance(args, tuple) and args and is_it_an_index(args[0]):
            task_id = args[0] + "_" + uuid()[:4]
        else:
            task_id = uuid()

        trace_id = get_current_trace_id()
        # Build a new tuple instead of ``+=``: that handles args=None and
        # does not mutate a list passed in by the caller.
        args = (*(args or ()), trace_id)  # add trace_id to args then remove before start

        logger.info(f"SEND task name: {name} - {task_id} | args: {args} | countdown: {countdown}")
        return self.app.send_task(
            name,
            args,
            queue=self.task_routes[name]['queue'],
            expires=300,
            countdown=countdown,
            task_id=task_id,
        )
|
2023-11-30 11:19:06 +00:00
|
|
|
|
2024-07-05 06:32:00 +00:00
|
|
|
# Module-level connector instance, created when this module is imported.
c_connector = CeleryConnector()
|