"""Celery application setup with OpenTelemetry trace and metric export."""

import environ
from celery import Celery
from celery.signals import worker_process_init
from kombu import Exchange, Queue
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
    OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
    OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

env = environ.Env(
    DEBUG=(bool, False)
)

# Read DEBUG as a real boolean. Reading it with ``env.str`` would turn values
# such as "False" or "0" into non-empty (truthy) strings.
debug = env.bool("DEBUG", False)

# Base URL of the OTLP/HTTP collector; the signal-specific paths
# (/v1/traces, /v1/metrics) are appended in init_celery_tracing().
tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
service_name = "sbt_celery_ai"


@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Install OpenTelemetry tracing and metrics when a worker process starts.

    Registered as a receiver of Celery's ``worker_process_init`` signal.
    The positional and keyword arguments supplied by the signal dispatcher
    are accepted and ignored.

    The handler instruments Celery, then registers a global tracer provider
    and a global meter provider. Both share one resource carrying the
    service name, and both export over OTLP/HTTP to ``tracer_endpoint``.
    """
    CeleryInstrumentor().instrument()

    # One resource shared by traces and metrics so both report the same
    # service name.
    resource = Resource(attributes={SERVICE_NAME: service_name})

    # Traces: batch spans and ship them to <endpoint>/v1/traces.
    span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
    processor = BatchSpanProcessor(span_exporter=span_exporter)
    trace_provider = TracerProvider(resource=resource)
    trace_provider.add_span_processor(span_processor=processor)
    trace.set_tracer_provider(tracer_provider=trace_provider)

    # Metrics: periodically export to <endpoint>/v1/metrics.
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics")
    )
    meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
    metrics.set_meter_provider(meter_provider=meter_provider)


app: Celery = Celery(
    "postman",
    broker=env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"),
    include=[
        "celery_worker.mock_process_tasks_fi",
    ],
    broker_transport_options={'confirm_publish': False},
)

# NOTE(review): the two names below are plain module-level variables. They are
# not passed to ``app.conf`` anywhere in the visible part of this file, so as
# written they do not appear to change Celery's configuration. Confirm whether
# they were meant to be part of the ``app.conf.update`` call below.
task_exchange = Exchange("default", type="direct")
task_create_missing_queues = False

app.conf.update(
    {
        "result_expires": 3600,
        "task_queues": [
            Queue("invoice_fi"),
            Queue("invoice_sap"),
            Queue("invoice_manulife"),
            Queue("invoice_sbt"),
        ],
        # NOTE(review): the ``*_rs`` queues targeted by the ``*_result`` routes
        # are not declared in ``task_queues`` above; presumably they are
        # consumed by another service. Verify before restricting queue
        # auto-creation.
        "task_routes": {
            'process_fi_invoice': {'queue': "invoice_fi"},
            'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
            'process_sap_invoice': {'queue': "invoice_sap"},
            'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
            'process_manulife_invoice': {'queue': 'invoice_manulife'},
            'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
            'process_sbt_invoice': {'queue': 'invoice_sbt'},
            'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},
        },
    }
)