# sbt-idp/cope2n-api/fwd_api/celery_worker/client_connector.py
# 2024-02-05 12:56:51 +07:00

# 115 lines
# 4.7 KiB
# Python
# Executable File

from celery import Celery
from fwd import settings
from fwd_api.exception.exceptions import GeneralException
class CeleryConnector:
    """Publish Celery tasks by name, each to the queue configured for it.

    Every public helper below is a thin wrapper that forwards ``args`` to
    :meth:`send_task` under a fixed task name. :meth:`send_task` looks the
    name up in ``task_routes`` and publishes through the class-level ``app``.

    Note that ``app`` is created when the class body is executed (i.e. when
    this module is imported), using ``settings.BROKER_URL``.
    """

    # Task name -> routing options. ``send_task`` reads only the 'queue'
    # entry and rejects any task name that is not listed here.
    task_routes = {
        # tasks whose name ends in "_result"
        'process_id_result': {'queue': 'id_card_rs'},
        'process_driver_license_result': {'queue': 'driver_license_rs'},
        'process_invoice_result': {'queue': 'invoice_rs'},
        'process_ocr_with_box_result': {'queue': 'ocr_with_box_rs'},
        'process_template_matching_result': {'queue': 'process_template_matching_rs'},
        'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
        'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
        'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
        'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},
        # processing tasks
        'process_id': {'queue': 'id_card'},
        'process_driver_license': {'queue': 'driver_license'},
        'process_invoice': {'queue': 'invoice'},
        'process_ocr_with_box': {'queue': 'ocr_with_box'},
        'process_template_matching': {'queue': 'template_matching'},
        'process_alignment': {'queue': 'alignment'},
        'process_sap_invoice': {'queue': 'invoice_sap'},
        'process_fi_invoice': {'queue': 'invoice_fi'},
        'process_manulife_invoice': {'queue': 'invoice_manulife'},
        'process_sbt_invoice': {'queue': 'invoice_sbt'},
        # file handling tasks
        'do_pdf': {'queue': 'do_pdf'},
        'upload_file_to_s3': {'queue': 'upload_file_to_s3'},
        'upload_feedback_to_s3': {'queue': 'upload_feedback_to_s3'},
        'upload_obj_to_s3': {'queue': 'upload_obj_to_s3'},
        'upload_report_to_s3': {'queue': 'upload_report_to_s3'},
        'remove_local_file': {'queue': 'remove_local_file'},
        # feedback / report tasks
        'csv_feedback': {'queue': 'csv_feedback'},
        'make_a_report': {'queue': 'report'},
        'make_a_report_2': {'queue': 'report_2'},
    }

    # Celery application used purely as a publisher.
    # NOTE(review): 'confirm_publish': False presumably turns off broker
    # publish confirmations -- confirm against the transport documentation.
    app = Celery(
        'postman',
        broker=settings.BROKER_URL,
        broker_transport_options={'confirm_publish': False},
    )

    def make_a_report(self, args):
        """Publish the 'make_a_report' task with ``args``."""
        return self.send_task('make_a_report', args)

    def make_a_report_2(self, args):
        """Publish the 'make_a_report_2' task with ``args``."""
        return self.send_task('make_a_report_2', args)

    def csv_feedback(self, args):
        """Publish the 'csv_feedback' task with ``args``."""
        return self.send_task('csv_feedback', args)

    def do_pdf(self, args):
        """Publish the 'do_pdf' task with ``args``."""
        return self.send_task('do_pdf', args)

    def upload_feedback_to_s3(self, args):
        """Publish the 'upload_feedback_to_s3' task with ``args``."""
        return self.send_task('upload_feedback_to_s3', args)

    def upload_file_to_s3(self, args):
        """Publish the 'upload_file_to_s3' task with ``args``."""
        return self.send_task('upload_file_to_s3', args)

    def upload_report_to_s3(self, args):
        """Publish the 'upload_report_to_s3' task with ``args``."""
        return self.send_task('upload_report_to_s3', args)

    def upload_obj_to_s3(self, args):
        """Publish the 'upload_obj_to_s3' task with ``args``."""
        return self.send_task('upload_obj_to_s3', args)

    def remove_local_file(self, args):
        """Publish the 'remove_local_file' task, delayed by 280 seconds."""
        # NOTE(review): per the Celery docs a numeric ``countdown`` and a
        # numeric ``expires`` are both measured from publish time, so with the
        # default 300 s expiry a worker has only ~20 s after the ETA to start
        # this task before it expires -- confirm that window is intended.
        return self.send_task('remove_local_file', args, countdown=280)

    def process_fi(self, args):
        """Publish the 'process_fi_invoice' task with ``args``."""
        return self.send_task('process_fi_invoice', args)

    def process_fi_result(self, args):
        """Publish the 'process_fi_invoice_result' task with ``args``."""
        return self.send_task('process_fi_invoice_result', args)

    def process_id_result(self, args):
        """Publish the 'process_id_result' task with ``args``."""
        return self.send_task('process_id_result', args)

    def process_driver_license_result(self, args):
        """Publish the 'process_driver_license_result' task with ``args``."""
        return self.send_task('process_driver_license_result', args)

    def process_invoice_result(self, args):
        """Publish the 'process_invoice_result' task with ``args``."""
        return self.send_task('process_invoice_result', args)

    def process_ocr_with_box_result(self, args):
        """Publish the 'process_ocr_with_box_result' task with ``args``."""
        return self.send_task('process_ocr_with_box_result', args)

    def process_id(self, args):
        """Publish the 'process_id' task with ``args``."""
        return self.send_task('process_id', args)

    def process_driver_license(self, args):
        """Publish the 'process_driver_license' task with ``args``."""
        return self.send_task('process_driver_license', args)

    def process_invoice(self, args):
        """Publish the 'process_invoice' task with ``args``."""
        return self.send_task('process_invoice', args)

    def process_ocr_with_box(self, args):
        """Publish the 'process_ocr_with_box' task with ``args``."""
        return self.send_task('process_ocr_with_box', args)

    def process_template_matching(self, args):
        """Publish the 'process_template_matching' task with ``args``."""
        return self.send_task('process_template_matching', args)

    def process_invoice_sap(self, args):
        """Publish the 'process_sap_invoice' task with ``args``."""
        return self.send_task('process_sap_invoice', args)

    def process_invoice_manulife(self, args):
        """Publish the 'process_manulife_invoice' task with ``args``."""
        return self.send_task('process_manulife_invoice', args)

    def process_invoice_sbt(self, args):
        """Publish the 'process_sbt_invoice' task with ``args``."""
        return self.send_task('process_sbt_invoice', args)

    def send_task(self, name=None, args=None, countdown=None, expires=300):
        """Publish task ``name`` to the queue configured in ``task_routes``.

        Args:
            name: Task name; must be a key of ``task_routes`` whose entry
                contains a 'queue'.
            args: Positional arguments forwarded to the task.
            countdown: Optional delay forwarded to Celery unchanged.
            expires: Expiry forwarded to Celery unchanged. Defaults to 300,
                the value that was previously hard-coded.

        Returns:
            Whatever ``self.app.send_task`` returns.

        Raises:
            GeneralException: ``GeneralException("System")`` when ``name`` has
                no usable route.
        """
        route = self.task_routes.get(name)
        if not route or 'queue' not in route:
            raise GeneralException("System")
        return self.app.send_task(
            name,
            args,
            queue=route['queue'],
            expires=expires,
            countdown=countdown,
        )
# Module-level CeleryConnector instance, created when this module is imported.
c_connector = CeleryConnector()