From cfc4eaabdf0a991c5d4687cafe98c72751ea0fc7 Mon Sep 17 00:00:00 2001 From: PhanThanhTrung Date: Tue, 29 Oct 2024 18:18:07 +0700 Subject: [PATCH] update --- cope2n-api/fwd/__init__.py | 3 ++ .../celery_worker/worker.py => fwd/celery.py} | 17 ++------- .../fwd_api/celery_worker/client_connector.py | 6 +-- .../fwd_api/celery_worker/internal_task.py | 5 +-- .../celery_worker/process_report_tasks.py | 38 ++++++++++--------- .../celery_worker/process_result_tasks.py | 5 +-- 6 files changed, 34 insertions(+), 40 deletions(-) rename cope2n-api/{fwd_api/celery_worker/worker.py => fwd/celery.py} (92%) diff --git a/cope2n-api/fwd/__init__.py b/cope2n-api/fwd/__init__.py index e69de29..9e0d95f 100755 --- a/cope2n-api/fwd/__init__.py +++ b/cope2n-api/fwd/__init__.py @@ -0,0 +1,3 @@ +from .celery import app as celery_app + +__all__ = ('celery_app',) \ No newline at end of file diff --git a/cope2n-api/fwd_api/celery_worker/worker.py b/cope2n-api/fwd/celery.py similarity index 92% rename from cope2n-api/fwd_api/celery_worker/worker.py rename to cope2n-api/fwd/celery.py index 5ee7b5f..89ebab5 100755 --- a/cope2n-api/fwd_api/celery_worker/worker.py +++ b/cope2n-api/fwd/celery.py @@ -18,7 +18,7 @@ from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from fwd import settings +from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings") django.setup() @@ -31,7 +31,7 @@ tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318") service_name = "sbt_celery_backend" @worker_process_init.connect(weak=False) -def init_celery_tracing(*args, **kwargs): +def init_celery_tracing(*args, **kwargs): CeleryInstrumentor().instrument() span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces") processor = BatchSpanProcessor(span_exporter=span_exporter) @@ -42,8 +42,7 @@ def init_celery_tracing(*args, **kwargs): trace_provider.add_span_processor(span_processor=processor) trace.set_tracer_provider(tracer_provider=trace_provider) - reader = PeriodicExportingMetricReader( - OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics")) + reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics")) meter_provider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(meter_provider=meter_provider) @@ -104,12 +103,4 @@ app.conf.update({ 'make_a_report': {'queue': "report"}, 'make_a_report_2': {'queue': "report_2"}, } -}) - -if __name__ == "__main__": - argv = [ - 'worker', - '--loglevel=INFO', - '--pool=solo' # Window opts - ] - app.worker_main(argv) +}) \ No newline at end of file diff --git a/cope2n-api/fwd_api/celery_worker/client_connector.py b/cope2n-api/fwd_api/celery_worker/client_connector.py index 9280921..68ad51d 100755 --- a/cope2n-api/fwd_api/celery_worker/client_connector.py +++ b/cope2n-api/fwd_api/celery_worker/client_connector.py @@ -1,10 +1,10 @@ from celery import Celery - -from fwd import settings +from celery.utils.log import get_task_logger +from django.conf import settings from fwd_api.exception.exceptions import GeneralException from fwd_api.middleware.local_storage import get_current_trace_id from kombu.utils.uuid import uuid -from celery.utils.log import get_task_logger + logger = get_task_logger(__name__) diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index 7ab109f..5485c02 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -9,16 +9,15 @@ import uuid from multiprocessing.pool import ThreadPool from celery.utils.log import get_task_logger -from opentelemetry import trace - +from fwd import celery_app as app from fwd import settings from fwd_api.celery_worker.task_warpper import VerboseTask -from fwd_api.celery_worker.worker import app from fwd_api.constant.common import FileCategory from fwd_api.middleware.local_storage import get_current_trace_id from fwd_api.models import (FeedbackRequest, Report, SubscriptionRequest, SubscriptionRequestFile, UserProfile) from fwd_api.utils.accuracy import predict_result_to_ready +from opentelemetry import trace from ..constant.common import FolderFileType, image_extensions from ..exception.exceptions import FileContentInvalidException diff --git a/cope2n-api/fwd_api/celery_worker/process_report_tasks.py b/cope2n-api/fwd_api/celery_worker/process_report_tasks.py index ff8a510..14b0e82 100755 --- a/cope2n-api/fwd_api/celery_worker/process_report_tasks.py +++ b/cope2n-api/fwd_api/celery_worker/process_report_tasks.py @@ -1,26 +1,28 @@ -import traceback - -from fwd_api.models import SubscriptionRequest, Report, ReportFile -from fwd_api.celery_worker.worker import app -from fwd_api.celery_worker.task_warpper import VerboseTask -from ..utils import s3 as S3Util -from ..utils.accuracy import (update_temp_accuracy, IterAvg, - count_transactions, extract_report_detail_list, calculate_a_request, - ReportAccumulateByRequest, create_billing_data) -from ..utils.file import dict2xlsx, save_workbook_file, save_report_to_S3, save_images_to_csv_briefly -from ..utils import time_stuff -from ..utils.redis import RedisUtils -from ..utils.cache import set_cache, get_cache -from django.utils import timezone -from django.db.models import Q -from itertools import chain -import json import copy +import json import os +import traceback +from itertools import chain -from opentelemetry import trace from celery.utils.log import get_task_logger +from django.db.models import Q +from django.utils import timezone +from fwd import celery_app as app from fwd import settings +from fwd_api.celery_worker.task_warpper import VerboseTask +from fwd_api.models import Report, ReportFile, SubscriptionRequest +from opentelemetry import trace + +from ..utils import s3 as S3Util +from ..utils import time_stuff +from ..utils.accuracy import (IterAvg, ReportAccumulateByRequest, + calculate_a_request, count_transactions, + create_billing_data, extract_report_detail_list, + update_temp_accuracy) +from ..utils.cache import get_cache, set_cache +from ..utils.file import (dict2xlsx, save_images_to_csv_briefly, + save_report_to_S3, save_workbook_file) +from ..utils.redis import RedisUtils redis_client = RedisUtils() diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py index ad8d16d..ee25a89 100755 --- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py +++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py @@ -5,15 +5,14 @@ import uuid from copy import deepcopy from celery.utils.log import get_task_logger -from opentelemetry import trace - +from fwd import celery_app as app from fwd_api.celery_worker.task_warpper import VerboseTask -from fwd_api.celery_worker.worker import app from fwd_api.constant.common import ProcessType from fwd_api.exception.exceptions import InvalidException from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile from fwd_api.utils import process as ProcessUtil from fwd_api.utils.redis import RedisUtils +from opentelemetry import trace logger = get_task_logger(__name__) tracer = trace.get_tracer("sbt_celery_backend")