from celery import Celery
from fwd import settings
from fwd_api.exception.exceptions import GeneralException
from fwd_api.middleware.local_storage import get_current_trace_id
from kombu.utils.uuid import uuid
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


def is_it_an_index(id) -> bool:  # noqa: A002 - parameter name kept for callers
    """Return True when *id* looks like an index-style identifier.

    The value is accepted only if it is a ``str``, contains no ``/`` and
    contains between 1 and 5 underscores (inclusive). Every rejection is
    logged at INFO level with the reason.
    """
    if not isinstance(id, str):
        logger.info("NOT A STRING")
        return False
    if "/" in id:
        logger.info("/ in ID")
        return False
    # Count once instead of re-scanning the string for every comparison.
    _c = id.count("_")
    if _c > 5 or _c < 1:
        logger.info(f"_ HAS {_c}")
        return False
    return True


class CeleryConnector:
    """Thin dispatcher that publishes named tasks to their Celery queues.

    Each public method forwards its ``args`` to :meth:`send_task` under a
    fixed task name; :meth:`send_task` resolves the queue from
    ``task_routes`` and publishes through the class-level ``app``.
    """

    # Task name -> routing options. Only the 'queue' key is read by send_task.
    task_routes = {
        # result-saving tasks (queues suffixed with "_rs")
        'process_id_result': {'queue': 'id_card_rs'},
        'process_driver_license_result': {'queue': "driver_license_rs"},
        'process_invoice_result': {'queue': "invoice_rs"},
        'process_ocr_with_box_result': {'queue': "ocr_with_box_rs"},
        'process_template_matching_result': {'queue': "process_template_matching_rs"},
        'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
        'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},
        'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
        'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
        # processing tasks
        'process_id': {'queue': 'id_card'},
        'process_driver_license': {'queue': "driver_license"},
        'process_invoice': {'queue': "invoice"},
        'process_ocr_with_box': {'queue': "ocr_with_box"},
        'process_template_matching': {'queue': 'template_matching'},
        "process_alignment": {"queue": "alignment"},
        'process_sap_invoice': {'queue': "invoice_sap"},
        'process_fi_invoice': {'queue': "invoice_fi"},
        'process_manulife_invoice': {'queue': "invoice_manulife"},
        'process_sbt_invoice': {'queue': "invoice_sbt"},
        # file handling, feedback and reporting tasks
        'do_pdf': {'queue': "do_pdf"},
        'upload_file_to_s3': {'queue': "upload_file_to_s3"},
        'upload_feedback_to_s3': {'queue': "upload_feedback_to_s3"},
        'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
        'upload_report_to_s3': {'queue': "upload_report_to_s3"},
        'remove_local_file': {'queue': "remove_local_file"},
        'csv_feedback': {'queue': "csv_feedback"},
        'make_a_report': {'queue': "report"},
        'make_a_report_2': {'queue': "report_2"},
    }

    # Shared Celery client used only for publishing; the broker URL comes
    # from the project settings.
    app = Celery(
        'postman',
        broker=settings.BROKER_URL,
        broker_transport_options={'confirm_publish': False},
    )

    def make_a_report(self, args):
        return self.send_task('make_a_report', args)

    def make_a_report_2(self, args):
        return self.send_task('make_a_report_2', args)

    def csv_feedback(self, args):
        return self.send_task('csv_feedback', args)

    def do_pdf(self, args):
        return self.send_task('do_pdf', args)

    def upload_feedback_to_s3(self, args):
        return self.send_task('upload_feedback_to_s3', args)

    def upload_file_to_s3(self, args):
        return self.send_task('upload_file_to_s3', args)

    def upload_report_to_s3(self, args):
        return self.send_task('upload_report_to_s3', args)

    def upload_obj_to_s3(self, args):
        return self.send_task('upload_obj_to_s3', args)

    def remove_local_file(self, args):
        # nearest execution of this task in 280 seconds
        return self.send_task('remove_local_file', args, countdown=280)

    def process_fi(self, args):
        return self.send_task('process_fi_invoice', args)

    def process_fi_result(self, args):
        return self.send_task('process_fi_invoice_result', args)

    def process_id_result(self, args):
        return self.send_task('process_id_result', args)

    def process_driver_license_result(self, args):
        return self.send_task('process_driver_license_result', args)

    def process_invoice_result(self, args):
        return self.send_task('process_invoice_result', args)

    def process_ocr_with_box_result(self, args):
        return self.send_task('process_ocr_with_box_result', args)

    def process_id(self, args):
        return self.send_task('process_id', args)

    def process_driver_license(self, args):
        return self.send_task('process_driver_license', args)

    def process_invoice(self, args):
        return self.send_task('process_invoice', args)

    def process_ocr_with_box(self, args):
        return self.send_task('process_ocr_with_box', args)

    def process_template_matching(self, args):
        return self.send_task('process_template_matching', args)

    def process_invoice_sap(self, args):
        return self.send_task('process_sap_invoice', args)

    def process_invoice_manulife(self, args):
        return self.send_task('process_manulife_invoice', args)

    def process_invoice_sbt(self, args):
        return self.send_task('process_sbt_invoice', args)

    def send_task(self, name=None, args=None, countdown=None):
        """Publish task *name* to its configured queue and return the result handle.

        Args:
            name: Task name; must be a key of ``task_routes`` whose entry
                has a ``'queue'`` key.
            args: Positional task arguments (tuple or list). ``None`` is
                treated as "no arguments". The caller's object is never
                modified.
            countdown: Optional delay forwarded to Celery's ``send_task``.

        Returns:
            Whatever ``self.app.send_task`` returns.

        Raises:
            GeneralException: If *name* has no usable route.
        """
        route = self.task_routes.get(name)
        if route is None or 'queue' not in route:
            raise GeneralException("System")

        # The declared default is None; normalise it so the trace id can
        # still be appended instead of failing with a TypeError.
        if args is None:
            args = ()

        # A non-empty tuple whose first element looks like an index gets a
        # readable task id derived from it; everything else gets a plain
        # uuid. The emptiness check avoids an IndexError on ``()``.
        if isinstance(args, tuple) and args and is_it_an_index(args[0]):
            task_id = args[0] + "_" + uuid()[:4]
        else:
            task_id = uuid()

        trace_id = get_current_trace_id()
        # Append the trace id as the last positional argument (per the
        # original note it is removed again before the task starts —
        # that happens outside this file). Build a new sequence rather than
        # using ``+=`` so a caller-supplied list is not extended in place.
        if isinstance(args, list):
            args = [*args, trace_id]
        else:
            args = (*args, trace_id)

        logger.info(f"SEND task name: {name} - {task_id} | args: {args} | countdown: {countdown}")
        return self.app.send_task(
            name,
            args,
            queue=route['queue'],
            expires=300,
            countdown=countdown,
            task_id=task_id,
        )


c_connector = CeleryConnector()