sbt-idp/cope2n-ai-fi/celery_worker/worker_fi.py
2024-10-29 11:07:30 +07:00

76 lines
2.8 KiB
Python
Executable File

import environ
from celery import Celery
from celery.signals import worker_process_init
from kombu import Exchange, Queue
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
env = environ.Env(
DEBUG=(bool, False)
)
debug = env.str("DEBUG", False)
tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
service_name = "sbt_celery_ai"
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
processor = BatchSpanProcessor(span_exporter=span_exporter)
attributes = {SERVICE_NAME: service_name}
resource = Resource(attributes=attributes)
trace_provider = TracerProvider(resource=resource)
trace_provider.add_span_processor(span_processor=processor)
trace.set_tracer_provider(tracer_provider=trace_provider)
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
metrics.set_meter_provider(meter_provider=meter_provider)
app: Celery = Celery(
"postman",
broker= env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"),
include=[
"celery_worker.mock_process_tasks_fi",
],
broker_transport_options={'confirm_publish': False},
)
task_exchange = Exchange("default", type="direct")
task_create_missing_queues = False
app.conf.update(
{
"result_expires": 3600,
"task_queues": [
Queue("invoice_fi"),
Queue("invoice_sap"),
Queue("invoice_manulife"),
Queue("invoice_sbt"),
],
"task_routes": {
'process_fi_invoice': {'queue': "invoice_fi"},
'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
'process_sap_invoice': {'queue': "invoice_sap"},
'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
'process_manulife_invoice': {'queue': 'invoice_manulife'},
'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
'process_sbt_invoice': {'queue': 'invoice_sbt'},
'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},
},
}
)