update opentelemetry for sbt backend

This commit is contained in:
PhanThanhTrung 2024-10-29 12:31:49 +07:00
parent 43cf82653c
commit 8899b9755a
6 changed files with 108 additions and 47 deletions

View File

@ -84,7 +84,7 @@ class CtelViewSet(viewsets.ViewSet):
file_path = FileUtils.resize_and_save_file(file_name, new_request, file_obj, 100) file_path = FileUtils.resize_and_save_file(file_name, new_request, file_obj, 100)
S3_path = FileUtils.save_to_S3(file_name, new_request, file_path) S3_path = FileUtils.save_to_S3(file_name, new_request, file_path)
files: [{ files =[{
"file_name": file_name, "file_name": file_name,
"file_path": file_path, # local path to file "file_path": file_path, # local path to file
"file_type": "" "file_type": ""

View File

@ -1,31 +1,34 @@
import time
import uuid
import os
import base64 import base64
import copy
import csv
import json
import os
import time
import traceback import traceback
import uuid
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from fwd_api.models import SubscriptionRequest, UserProfile from celery.utils.log import get_task_logger
from fwd_api.celery_worker.worker import app from opentelemetry import trace
from fwd import settings
from fwd_api.celery_worker.task_warpper import VerboseTask from fwd_api.celery_worker.task_warpper import VerboseTask
from fwd_api.celery_worker.worker import app
from fwd_api.constant.common import FileCategory
from fwd_api.middleware.local_storage import get_current_trace_id
from fwd_api.models import (FeedbackRequest, Report, SubscriptionRequest,
SubscriptionRequestFile, UserProfile)
from fwd_api.utils.accuracy import predict_result_to_ready
from ..constant.common import FolderFileType, image_extensions from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException from ..exception.exceptions import FileContentInvalidException
from fwd_api.models import SubscriptionRequestFile, FeedbackRequest, Report
from ..utils import file as FileUtils from ..utils import file as FileUtils
from ..utils import process as ProcessUtil from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util from ..utils import s3 as S3Util
from ..utils.accuracy import validate_feedback_file from ..utils.accuracy import validate_feedback_file
from fwd_api.constant.common import FileCategory
from fwd_api.middleware.local_storage import get_current_trace_id
import csv
import json
import copy
from fwd_api.utils.accuracy import predict_result_to_ready
from celery.utils.log import get_task_logger
from fwd import settings
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
tracer = trace.get_tracer()
s3_client = S3Util.MinioS3Client( s3_client = S3Util.MinioS3Client(
endpoint=settings.S3_ENDPOINT, endpoint=settings.S3_ENDPOINT,
@ -34,6 +37,7 @@ s3_client = S3Util.MinioS3Client(
bucket_name=settings.S3_BUCKET_NAME bucket_name=settings.S3_BUCKET_NAME
) )
@tracer.start_as_current_span("process_pdf_file")
def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list: def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list:
try: try:
# Origin file # Origin file
@ -54,7 +58,7 @@ def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: st
request.save() request.save()
return None return None
@tracer.start_as_current_span("process_image_file")
def process_image_file(file_name: str, file_path, request, user) -> list: def process_image_file(file_name: str, file_path, request, user) -> list:
new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path, new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
request=request, request=request,
@ -67,7 +71,7 @@ def process_image_file(file_name: str, file_path, request, user) -> list:
'request_file_id': new_request_file.code 'request_file_id': new_request_file.code
}] }]
@app.task(base=VerboseTask, name="csv_feedback") @app.task(base=VerboseTask, name="csv_feedback", track_started=True)
def process_csv_feedback(csv_file_path, feedback_id): def process_csv_feedback(csv_file_path, feedback_id):
# load file to RAM # load file to RAM
status = {} status = {}
@ -163,7 +167,7 @@ def process_csv_feedback(csv_file_path, feedback_id):
logger.error(f"Unable to set S3: {e}") logger.error(f"Unable to set S3: {e}")
feedback_rq.save() feedback_rq.save()
@app.task(base=VerboseTask, name='do_pdf') @app.task(base=VerboseTask, name='do_pdf', track_started=True)
def process_pdf(rq_id, sub_id, p_type, user_id, files): def process_pdf(rq_id, sub_id, p_type, user_id, files):
""" """
files: [{ files: [{
@ -231,7 +235,7 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue: for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata) ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)
@app.task(base=VerboseTask, name='upload_file_to_s3') @app.task(base=VerboseTask, name='upload_file_to_s3', track_started=True)
def upload_file_to_s3(local_file_path, s3_key, request_id): def upload_file_to_s3(local_file_path, s3_key, request_id):
if s3_client.s3_client is not None: if s3_client.s3_client is not None:
try: try:
@ -245,7 +249,7 @@ def upload_file_to_s3(local_file_path, s3_key, request_id):
else: else:
logger.info(f"S3 is not available, skipping,...") logger.info(f"S3 is not available, skipping,...")
@app.task(base=VerboseTask, name='upload_feedback_to_s3') @app.task(base=VerboseTask, name='upload_feedback_to_s3', track_started=True)
def upload_feedback_to_s3(local_file_path, s3_key, feedback_id): def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
if s3_client.s3_client is not None: if s3_client.s3_client is not None:
try: try:
@ -259,7 +263,7 @@ def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
else: else:
logger.info(f"S3 is not available, skipping,...") logger.info(f"S3 is not available, skipping,...")
@app.task(base=VerboseTask, name='upload_report_to_s3') @app.task(base=VerboseTask, name='upload_report_to_s3', track_started=True)
def upload_report_to_s3(local_file_path, s3_key, report_id, delay): def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
if s3_client.s3_client is not None: if s3_client.s3_client is not None:
try: try:
@ -275,7 +279,7 @@ def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
else: else:
logger.info(f"S3 is not available, skipping,...") logger.info(f"S3 is not available, skipping,...")
@app.task(base=VerboseTask, name='remove_local_file') @app.task(base=VerboseTask, name='remove_local_file', track_started=True)
def remove_local_file(local_file_path, request_id): def remove_local_file(local_file_path, request_id):
logger.info(f"Removing local file: {local_file_path}, ...") logger.info(f"Removing local file: {local_file_path}, ...")
try: try:
@ -283,7 +287,7 @@ def remove_local_file(local_file_path, request_id):
except Exception as e: except Exception as e:
logger.info(f"Unable to remove local file: {e}") logger.info(f"Unable to remove local file: {e}")
@app.task(base=VerboseTask, name='upload_obj_to_s3') @app.task(base=VerboseTask, name='upload_obj_to_s3', track_started=True)
def upload_obj_to_s3(byte_obj, s3_key): def upload_obj_to_s3(byte_obj, s3_key):
if s3_client.s3_client is not None: if s3_client.s3_client is not None:
obj = base64.b64decode(byte_obj) obj = base64.b64decode(byte_obj)

View File

@ -18,12 +18,15 @@ import json
import copy import copy
import os import os
from opentelemetry import trace
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from fwd import settings from fwd import settings
redis_client = RedisUtils() redis_client = RedisUtils()
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
tracer = trace.get_tracer()
s3_client = S3Util.MinioS3Client( s3_client = S3Util.MinioS3Client(
endpoint=settings.S3_ENDPOINT, endpoint=settings.S3_ENDPOINT,
@ -32,13 +35,14 @@ s3_client = S3Util.MinioS3Client(
bucket_name=settings.S3_BUCKET_NAME bucket_name=settings.S3_BUCKET_NAME
) )
@tracer.start_as_current_span("mean_list")
def mean_list(l): def mean_list(l):
l = [x for x in l if x is not None] l = [x for x in l if x is not None]
if len(l) == 0: if len(l) == 0:
return 0 return 0
return sum(l)/len(l) return sum(l)/len(l)
@app.task(base=VerboseTask, name='make_a_report_2') @app.task(base=VerboseTask, name='make_a_report_2', track_started=True)
def make_a_report_2(report_id, query_set): def make_a_report_2(report_id, query_set):
report_type = query_set.pop("report_type", "accuracy") report_type = query_set.pop("report_type", "accuracy")
if report_type == "accuracy": if report_type == "accuracy":
@ -48,7 +52,7 @@ def make_a_report_2(report_id, query_set):
else: else:
raise TypeError("Invalid report type") raise TypeError("Invalid report type")
@tracer.start_as_current_span("create_accuracy_report")
def create_accuracy_report(report_id, **kwargs): def create_accuracy_report(report_id, **kwargs):
try: try:
start_date = timezone.datetime.strptime(kwargs["start_date_str"], '%Y-%m-%dT%H:%M:%S%z') start_date = timezone.datetime.strptime(kwargs["start_date_str"], '%Y-%m-%dT%H:%M:%S%z')
@ -245,7 +249,7 @@ def create_accuracy_report(report_id, **kwargs):
traceback.print_exc() traceback.print_exc()
return 400 return 400
@tracer.start_as_current_span("create_billing_report")
def create_billing_report(report_id, **kwargs): def create_billing_report(report_id, **kwargs):
try: try:
start_date = timezone.datetime.strptime( start_date = timezone.datetime.strptime(

View File

@ -1,24 +1,26 @@
import traceback
import time
import uuid
import logging import logging
import time
import traceback
import uuid
from copy import deepcopy from copy import deepcopy
from celery.utils.log import get_task_logger
from opentelemetry import trace
from fwd_api.celery_worker.task_warpper import VerboseTask
from fwd_api.celery_worker.worker import app from fwd_api.celery_worker.worker import app
from fwd_api.models import SubscriptionRequest from fwd_api.constant.common import ProcessType
from fwd_api.exception.exceptions import InvalidException from fwd_api.exception.exceptions import InvalidException
from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile
from fwd_api.constant.common import ProcessType
from fwd_api.utils.redis import RedisUtils
from fwd_api.utils import process as ProcessUtil from fwd_api.utils import process as ProcessUtil
from fwd_api.celery_worker.task_warpper import VerboseTask from fwd_api.utils.redis import RedisUtils
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__) logger = get_task_logger(__name__)
tracer = trace.get_tracer()
redis_client = RedisUtils() redis_client = RedisUtils()
@tracer.start_as_current_span("aggregate_result")
def aggregate_result(results): def aggregate_result(results):
sorted_results = [None] * len(results) sorted_results = [None] * len(results)
doc_types = [] doc_types = []
@ -58,16 +60,17 @@ def aggregate_result(results):
return des_result return des_result
@tracer.start_as_current_span("print_id")
def print_id(rq_id): def print_id(rq_id):
logger.info(" [x] Received {rq}".format(rq=rq_id)) logger.info(" [x] Received {rq}".format(rq=rq_id))
@tracer.start_as_current_span("to_status")
def to_status(result): def to_status(result):
if 'status' in result and result['status'] not in [200, 201, 202]: if 'status' in result and result['status'] not in [200, 201, 202]:
return 4 return 4
return 3 return 3
@tracer.start_as_current_span("update_user")
def update_user(rq: SubscriptionRequest): def update_user(rq: SubscriptionRequest):
sub = rq.subscription sub = rq.subscription
predict_status = rq.status predict_status = rq.status
@ -75,7 +78,7 @@ def update_user(rq: SubscriptionRequest):
sub.current_token += ProcessUtil.token_value(int(rq.process_type)) sub.current_token += ProcessUtil.token_value(int(rq.process_type))
sub.save() sub.save()
@app.task(base=VerboseTask, name='process_sap_invoice_result') @app.task(base=VerboseTask, name='process_sap_invoice_result', track_started=True)
def process_invoice_sap_result(rq_id, result): def process_invoice_sap_result(rq_id, result):
try: try:
rq: SubscriptionRequest = \ rq: SubscriptionRequest = \
@ -97,7 +100,7 @@ def process_invoice_sap_result(rq_id, result):
return "FailInvoice" return "FailInvoice"
@app.task(base=VerboseTask, name='process_fi_invoice_result') @app.task(base=VerboseTask, name='process_fi_invoice_result', track_started=True)
def process_invoice_fi_result(rq_id, result): def process_invoice_fi_result(rq_id, result):
try: try:
rq: SubscriptionRequest = \ rq: SubscriptionRequest = \
@ -118,7 +121,7 @@ def process_invoice_fi_result(rq_id, result):
traceback.print_exc() traceback.print_exc()
return "FailInvoice" return "FailInvoice"
@app.task(base=VerboseTask, name='process_manulife_invoice_result') @app.task(base=VerboseTask, name='process_manulife_invoice_result', track_started=True)
def process_invoice_manulife_result(rq_id, result): def process_invoice_manulife_result(rq_id, result):
try: try:
rq: SubscriptionRequest = \ rq: SubscriptionRequest = \
@ -141,7 +144,7 @@ def process_invoice_manulife_result(rq_id, result):
random_processor_name = None random_processor_name = None
@app.task(base=VerboseTask, name='process_sbt_invoice_result') @app.task(base=VerboseTask, name='process_sbt_invoice_result', track_started=True)
def process_invoice_sbt_result(rq_id, result, metadata): def process_invoice_sbt_result(rq_id, result, metadata):
global random_processor_name global random_processor_name
if random_processor_name is None: if random_processor_name is None:
@ -194,7 +197,7 @@ def process_invoice_sbt_result(rq_id, result, metadata):
rq.save() rq.save()
return "FailInvoice" return "FailInvoice"
@tracer.start_as_current_span("_update_subscription_rq_file")
def _update_subscription_rq_file(request_id, index_in_request, doc_type, result): def _update_subscription_rq_file(request_id, index_in_request, doc_type, result):
image = SubscriptionRequestFile.objects.filter(request=request_id, index_in_request=index_in_request, doc_type=doc_type).first() image = SubscriptionRequestFile.objects.filter(request=request_id, index_in_request=index_in_request, doc_type=doc_type).first()
retailer_name = None retailer_name = None
@ -234,6 +237,7 @@ def _update_subscription_rq_file(request_id, index_in_request, doc_type, result)
image.predict_result = _predict_result image.predict_result = _predict_result
image.save() image.save()
@tracer.start_as_current_span("__get_actual_predict_result")
def __get_actual_predict_result(result: dict): def __get_actual_predict_result(result: dict):
predicted_res = result.get('content', {}).get('document', []) predicted_res = result.get('content', {}).get('document', [])
if len(predicted_res)==0: if len(predicted_res)==0:

View File

@ -1,16 +1,51 @@
import copy
import os import os
import django import django
from celery import Celery from celery import Celery
from celery.signals import setup_logging # noqa from celery.signals import setup_logging, worker_process_init # noqa
from kombu import Queue from kombu import Exchange, Queue
import copy from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from fwd import settings from fwd import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings")
django.setup() django.setup()
env = environ.Env(
DEBUG=(bool, False)
)
tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
service_name = "sbt_celery_backend"
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
processor = BatchSpanProcessor(span_exporter=span_exporter)
attributes = {SERVICE_NAME: service_name}
resource = Resource(attributes=attributes)
trace_provider = TracerProvider(resource=resource)
trace_provider.add_span_processor(span_processor=processor)
trace.set_tracer_provider(tracer_provider=trace_provider)
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
metrics.set_meter_provider(meter_provider=meter_provider)
app: Celery = Celery( app: Celery = Celery(
'postman', 'postman',
broker=settings.BROKER_URL, broker=settings.BROKER_URL,
@ -24,6 +59,7 @@ app.autodiscover_tasks()
@setup_logging.connect @setup_logging.connect
def config_loggers(*args, **kwargs): def config_loggers(*args, **kwargs):
from logging.config import dictConfig # noqa from logging.config import dictConfig # noqa
from django.conf import settings # noqa from django.conf import settings # noqa
log_config = copy.deepcopy(settings.LOGGING) log_config = copy.deepcopy(settings.LOGGING)

View File

@ -60,3 +60,16 @@ openpyxl==3.1.2
# torch==1.13.1+cu116 # torch==1.13.1+cu116
# torchvision==0.14.1+cu116 # torchvision==0.14.1+cu116
# --extra-index-url https://download.pytorch.org/whl/cu116 # --extra-index-url https://download.pytorch.org/whl/cu116
opentelemetry-api==1.27.0
opentelemetry-distro==0.48b0
opentelemetry-exporter-otlp==1.27.0
opentelemetry-exporter-otlp-proto-common==1.27.0
opentelemetry-exporter-otlp-proto-http==1.27.0
opentelemetry-instrumentation==0.48b0
opentelemetry-instrumentation-celery==0.48b0
opentelemetry-instrumentation-django==0.48b0
opentelemetry-proto==1.27.0
opentelemetry-sdk==1.27.0
opentelemetry-semantic-conventions==0.48b0
opentelemetry-util-http==0.48b0