from celery import Celery import environ env = environ.Env( DEBUG=(bool, False) ) class CeleryConnector: task_routes = { 'process_fi_invoice_result': {'queue': 'invoice_fi_rs'}, 'process_sap_invoice_result': {'queue': 'invoice_sap_rs'}, 'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'}, 'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'}, # mock task 'process_fi_invoice': {'queue': "invoice_fi"}, 'process_sap_invoice': {'queue': "invoice_sap"}, 'process_manulife_invoice': {'queue': "invoice_manulife"}, 'process_sbt_invoice': {'queue': "invoice_sbt"}, } app = Celery( "postman", broker= env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"), broker_transport_options={'confirm_publish': False}, ) # mock task for FI def process_fi_invoice_result(self, args): return self.send_task("process_fi_invoice_result", args) def process_fi_invoice(self, args): return self.send_task("process_fi_invoice", args) # mock task for SAP def process_sap_invoice_result(self, args): return self.send_task("process_sap_invoice_result", args) def process_sap_invoice(self, args): return self.send_task("process_sap_invoice", args) # mock task for manulife def process_manulife_invoice_result(self, args): return self.send_task("process_manulife_invoice_result", args) def process_manulife_invoice(self, args): return self.send_task("process_manulife_invoice", args) # mock task for manulife def process_sbt_invoice_result(self, args): return self.send_task("process_sbt_invoice_result", args) def process_sbt_invoice(self, args): return self.send_task("process_sbt_invoice", args) def send_task(self, name=None, args=None): if name not in self.task_routes or "queue" not in self.task_routes[name]: return self.app.send_task(name, args) return self.app.send_task(name, args, queue=self.task_routes[name]["queue"])