106 lines
4.1 KiB
Python
Executable File
106 lines
4.1 KiB
Python
Executable File
import environ
|
|
import copy
|
|
import os
|
|
|
|
import django
|
|
from celery import Celery
|
|
from celery.signals import setup_logging, worker_process_init # noqa
|
|
from kombu import Exchange, Queue
|
|
from opentelemetry import metrics, trace
|
|
from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
|
|
OTLPMetricExporter
|
|
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
|
|
OTLPSpanExporter
|
|
from opentelemetry.instrumentation.celery import CeleryInstrumentor
|
|
from opentelemetry.sdk.metrics import MeterProvider
|
|
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
|
|
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
|
|
from opentelemetry.sdk.trace import TracerProvider
|
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
|
|
|
from django.conf import settings
|
|
|
|
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings")
|
|
django.setup()
|
|
|
|
env = environ.Env(
|
|
DEBUG=(bool, False)
|
|
)
|
|
|
|
tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
|
|
service_name = "sbt_celery_backend"
|
|
|
|
@worker_process_init.connect(weak=False)
|
|
def init_celery_tracing(*args, **kwargs):
|
|
CeleryInstrumentor().instrument()
|
|
span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
|
|
processor = BatchSpanProcessor(span_exporter=span_exporter)
|
|
|
|
attributes = {SERVICE_NAME: service_name}
|
|
resource = Resource(attributes=attributes)
|
|
trace_provider = TracerProvider(resource=resource)
|
|
trace_provider.add_span_processor(span_processor=processor)
|
|
trace.set_tracer_provider(tracer_provider=trace_provider)
|
|
|
|
reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
|
|
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
|
|
metrics.set_meter_provider(meter_provider=meter_provider)
|
|
|
|
app: Celery = Celery(
|
|
'postman',
|
|
broker=settings.BROKER_URL,
|
|
include=['fwd_api.celery_worker.process_result_tasks', 'fwd_api.celery_worker.internal_task', 'fwd_api.celery_worker.process_report_tasks'],
|
|
broker_transport_options={'confirm_publish': False},
|
|
)
|
|
|
|
app.config_from_object("django.conf:settings", namespace="CELERY")
|
|
app.autodiscover_tasks()
|
|
|
|
@setup_logging.connect
|
|
def config_loggers(*args, **kwargs):
|
|
from logging.config import dictConfig # noqa
|
|
|
|
from django.conf import settings # noqa
|
|
|
|
log_config = copy.deepcopy(settings.LOGGING)
|
|
if log_config.get("handlers", {}).get("file", {}).get("filename", None):
|
|
log_config["handlers"]["file"]["filename"] = log_config["handlers"]["file"]["filename"].replace(".log", "_celery.log")
|
|
dictConfig(log_config)
|
|
|
|
app.conf.update({
|
|
'task_queues':
|
|
[
|
|
Queue('invoice_sap_rs'),
|
|
Queue('invoice_fi_rs'),
|
|
Queue('invoice_manulife_rs'),
|
|
Queue('invoice_sbt_rs'),
|
|
Queue('do_pdf'),
|
|
Queue('upload_file_to_s3'),
|
|
Queue('upload_feedback_to_s3'),
|
|
Queue('upload_obj_to_s3'),
|
|
Queue('upload_report_to_s3'),
|
|
Queue('remove_local_file'),
|
|
Queue('csv_feedback'),
|
|
Queue('report'),
|
|
Queue('report_2'),
|
|
],
|
|
'task_routes': {
|
|
'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
|
|
'process_sap_invoice': {'queue': "invoice_sap"},
|
|
'process_fi_invoice_result': {'queue': 'invoice_fi_rs'},
|
|
'process_fi_invoice': {'queue': "invoice_fi"},
|
|
'process_manulife_invoice_result': {'queue': 'invoice_manulife_rs'},
|
|
'process_manulife_invoice': {'queue': "invoice_manulife"},
|
|
'process_sbt_invoice_result': {'queue': 'invoice_sbt_rs'},
|
|
'process_sbt_invoice': {'queue': "invoice_sbt"},
|
|
'do_pdf': {'queue': "do_pdf"},
|
|
'upload_file_to_s3': {'queue': "upload_file_to_s3"},
|
|
'upload_feedback_to_s3': {'queue': "upload_feedback_to_s3"},
|
|
'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
|
|
'upload_report_to_s3': {'queue': "upload_report_to_s3"},
|
|
'remove_local_file': {'queue': "remove_local_file"},
|
|
'csv_feedback': {'queue': "csv_feedback"},
|
|
'make_a_report': {'queue': "report"},
|
|
'make_a_report_2': {'queue': "report_2"},
|
|
}
|
|
}) |