"""Celery application bootstrap.

Importing this module:

* points ``DJANGO_SETTINGS_MODULE`` at ``fwd.settings`` (unless already set)
  and calls ``django.setup()``;
* registers an OpenTelemetry tracing/metrics initializer on Celery's
  ``worker_process_init`` signal;
* creates the Celery application ``app`` and loads ``CELERY``-namespaced
  options from the Django settings;
* registers a ``setup_logging`` handler that applies ``settings.LOGGING``
  with the ``file`` handler redirected to a ``*_celery.log`` file;
* declares the task queues and task routes.
"""
import copy
import os

import django
import environ
from celery import Celery
from celery.signals import setup_logging, worker_process_init  # noqa
from kombu import Exchange, Queue
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
    OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
    OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.instrumentation.django import DjangoInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from django.conf import settings

# Django must be configured before the Celery app reads ``settings`` below.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings")
django.setup()

env = environ.Env(
    DEBUG=(bool, False)
)

# Base URL of the OTLP/HTTP collector; the signal paths are appended below.
tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
service_name = "sbt_celery_backend"


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Install OpenTelemetry tracing and metrics for this worker process.

    Connected to Celery's ``worker_process_init`` signal. Instruments Celery,
    then registers a global tracer provider and meter provider that export
    over OTLP/HTTP to ``tracer_endpoint`` under the ``service_name`` resource.

    Args:
        *args: Ignored; accepted for signal-handler compatibility.
        **kwargs: Ignored; accepted for signal-handler compatibility.
    """
    CeleryInstrumentor().instrument()

    # Tolerate a configured endpoint that ends with "/" so the exporter URLs
    # never contain a double slash ("http://host:4318//v1/traces").
    base_url = tracer_endpoint.rstrip("/")
    resource = Resource(attributes={SERVICE_NAME: service_name})

    span_exporter = OTLPSpanExporter(endpoint=f"{base_url}/v1/traces")
    processor = BatchSpanProcessor(span_exporter=span_exporter)
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(span_processor=processor)
    trace.set_tracer_provider(tracer_provider=trace_provider)

    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=f"{base_url}/v1/metrics")
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(meter_provider=meter_provider)


app: Celery = Celery(
    'postman',
    broker=settings.BROKER_URL,
    include=[
        'fwd_api.celery_worker.process_result_tasks',
        'fwd_api.celery_worker.internal_task',
        'fwd_api.celery_worker.process_report_tasks',
    ],
    broker_transport_options={'confirm_publish': False},
)

app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()


def _celery_log_filename(filename):
    """Return *filename* with ``.log`` in its final component made ``_celery.log``.

    Only the basename is rewritten, so a directory whose name happens to
    contain ``.log`` (e.g. ``/srv/app.logs/fwd.log``) is left untouched.
    A filename without ``.log`` in its basename is returned unchanged.
    """
    basename = os.path.basename(filename)
    # Slice instead of os.path.dirname so the original separators are kept
    # byte-for-byte.
    directory = filename[:len(filename) - len(basename)]
    return directory + basename.replace(".log", "_celery.log")


@setup_logging.connect
def config_loggers(*args, **kwargs):
    """Apply the Django ``LOGGING`` config with a Celery-specific log file.

    Connected to Celery's ``setup_logging`` signal. Works on a deep copy of
    ``settings.LOGGING`` so the Django settings object is never mutated, and
    redirects the ``file`` handler (when it defines a ``filename``) to a
    ``*_celery.log`` file.

    Args:
        *args: Ignored; accepted for signal-handler compatibility.
        **kwargs: Ignored; accepted for signal-handler compatibility.
    """
    from logging.config import dictConfig  # noqa
    from django.conf import settings  # noqa

    log_config = copy.deepcopy(settings.LOGGING)
    # When "handlers"/"file" exist this is the dict stored inside log_config,
    # so the assignment below updates the config in place; otherwise it is a
    # throwaway empty dict and nothing is changed.
    file_handler = log_config.get("handlers", {}).get("file", {})
    filename = file_handler.get("filename", None)
    if filename:
        file_handler["filename"] = _celery_log_filename(filename)
    dictConfig(log_config)


app.conf.update({
    'task_queues': [
        Queue('invoice_sap_rs'),
        Queue('invoice_fi_rs'),
        Queue('invoice_manulife_rs'),
        Queue('invoice_sbt_rs'),
        Queue('do_pdf'),
        Queue('upload_file_to_s3'),
        Queue('upload_feedback_to_s3'),
        Queue('upload_obj_to_s3'),
        Queue('upload_report_to_s3'),
        Queue('remove_local_file'),
        Queue('csv_feedback'),
        Queue('report'),
        Queue('report_2'),
    ],
    # NOTE(review): the non-"_rs" invoice queues routed to below
    # ("invoice_sap", "invoice_fi", "invoice_manulife", "invoice_sbt") are not
    # declared in task_queues above; presumably they are consumed by another
    # service -- confirm.
    'task_routes': {
        'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
        'process_sap_invoice': {'queue': "invoice_sap"},
        'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
        'process_fi_invoice': {'queue': "invoice_fi"},
        'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
        'process_manulife_invoice': {'queue': "invoice_manulife"},
        'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},
        'process_sbt_invoice': {'queue': "invoice_sbt"},
        'do_pdf': {'queue': "do_pdf"},
        'upload_file_to_s3': {'queue': "upload_file_to_s3"},
        'upload_feedback_to_s3': {'queue': "upload_feedback_to_s3"},
        'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
        'upload_report_to_s3': {'queue': "upload_report_to_s3"},
        'remove_local_file': {'queue': "remove_local_file"},
        'csv_feedback': {'queue': "csv_feedback"},
        'make_a_report': {'queue': "report"},
        'make_a_report_2': {'queue': "report_2"},
    }
})