From 0f955cb03904a5d3d78d464e70e27641505b3cfd Mon Sep 17 00:00:00 2001 From: PhanThanhTrung Date: Thu, 31 Oct 2024 13:06:44 +0700 Subject: [PATCH] update --- cope2n-api/fwd/celery.py | 1 + cope2n-api/fwd/opentelemetry_config.py | 9 +- cope2n-api/fwd/settings.py | 6 +- cope2n-api/fwd_api/api/ctel_view.py | 132 ++++++++++-------- .../fwd_api/celery_worker/internal_task.py | 4 +- .../celery_worker/process_report_tasks.py | 5 +- cope2n-api/fwd_api/utils/file.py | 76 +++++++--- cope2n-api/fwd_api/utils/process.py | 33 ++++- 8 files changed, 176 insertions(+), 90 deletions(-) diff --git a/cope2n-api/fwd/celery.py b/cope2n-api/fwd/celery.py index 89ebab5..581f1ae 100755 --- a/cope2n-api/fwd/celery.py +++ b/cope2n-api/fwd/celery.py @@ -12,6 +12,7 @@ from opentelemetry.exporter.otlp.proto.http.metric_exporter import \ from opentelemetry.exporter.otlp.proto.http.trace_exporter import \ OTLPSpanExporter from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.instrumentation.django import DjangoInstrumentor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import SERVICE_NAME, Resource diff --git a/cope2n-api/fwd/opentelemetry_config.py b/cope2n-api/fwd/opentelemetry_config.py index 70e50e5..bf4df82 100644 --- a/cope2n-api/fwd/opentelemetry_config.py +++ b/cope2n-api/fwd/opentelemetry_config.py @@ -4,6 +4,7 @@ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \ from opentelemetry.exporter.otlp.proto.http.metric_exporter import \ OTLPMetricExporter from opentelemetry.instrumentation.django import DjangoInstrumentor +from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.sdk.resources import SERVICE_NAME, Resource @@ -12,10 +13,13 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor def setup_tracing(tracer_endpoint, service_name): + # Instrument Django + DjangoInstrumentor().instrument() + CeleryInstrumentor().instrument() # Set up a tracer provider span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces") processor = BatchSpanProcessor(span_exporter=span_exporter) - + attributes = {SERVICE_NAME: service_name} resource = Resource(attributes=attributes) trace_provider = TracerProvider(resource=resource) @@ -27,5 +31,4 @@ def setup_tracing(tracer_endpoint, service_name): meter_provider = MeterProvider(resource=resource, metric_readers=[reader]) metrics.set_meter_provider(meter_provider=meter_provider) - # Instrument Django - DjangoInstrumentor().instrument() \ No newline at end of file + \ No newline at end of file diff --git a/cope2n-api/fwd/settings.py b/cope2n-api/fwd/settings.py index fe23d17..4848bcb 100755 --- a/cope2n-api/fwd/settings.py +++ b/cope2n-api/fwd/settings.py @@ -23,9 +23,9 @@ env = environ.Env( DEBUG=(bool, False) ) -tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318") -service_name = "sbt_celery_backend" -setup_tracing(tracer_endpoint=tracer_endpoint, service_name=service_name) +TRACER_ENDPOINT = env.str("tracer_endpoint", "http://jaeger_collector:4317") +SERVICE_NAME = "sbt_django_backend" +setup_tracing(tracer_endpoint=TRACER_ENDPOINT, service_name=SERVICE_NAME) DEBUG = False environ.Env.read_env(os.path.join(BASE_DIR, '.env')) diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py index c76fad3..cb9a0ad 100755 --- a/cope2n-api/fwd_api/api/ctel_view.py +++ b/cope2n-api/fwd_api/api/ctel_view.py @@ -1,32 +1,42 @@ +import logging import time import uuid -from wsgiref.util import FileWrapper from datetime import datetime +from multiprocessing.pool import ThreadPool +from typing import List +from wsgiref.util import FileWrapper from django.core.files.uploadedfile import TemporaryUploadedFile from django.http import HttpResponse, JsonResponse from drf_spectacular.utils import extend_schema +from fwd import settings +from opentelemetry import trace from rest_framework import status, viewsets from rest_framework.decorators import action -from rest_framework.response import Response -from typing import List from rest_framework.renderers import JSONRenderer +from rest_framework.response import Response from rest_framework_xml.renderers import XMLRenderer -from multiprocessing.pool import ThreadPool -from fwd import settings -from ..celery_worker.client_connector import c_connector from ..annotation.api import throw_on_failure -from ..constant.common import ProcessType, REQUEST_ID, FOLDER_TYPE, EntityStatus, pdf_extensions, allowed_file_extensions, image_extensions, standard_ocr_list -from ..exception.exceptions import RequiredFieldException, InvalidException, NotFoundException, \ - PermissionDeniedException, LockedEntityException, FileContentInvalidException, ServiceTimeoutException -from ..models import SubscriptionRequest, SubscriptionRequestFile, OcrTemplate, FeedbackRequest +from ..celery_worker.client_connector import c_connector +from ..constant.common import (FOLDER_TYPE, REQUEST_ID, EntityStatus, + ProcessType, allowed_file_extensions, + image_extensions, pdf_extensions, + standard_ocr_list) +from ..exception.exceptions import (FileContentInvalidException, + InvalidException, LockedEntityException, + NotFoundException, + PermissionDeniedException, + RequiredFieldException, + ServiceTimeoutException) +from ..models import (FeedbackRequest, OcrTemplate, SubscriptionRequest, + SubscriptionRequestFile) from ..response.ReportSerializer import ReportSerializer from ..utils import file as FileUtils from ..utils import process as ProcessUtil -import logging logger = logging.getLogger(__name__) +tracer = trace.get_tracer("sbt_django_backend") class CtelViewSet(viewsets.ViewSet): lookup_field = "username" @@ -183,6 +193,7 @@ class CtelViewSet(viewsets.ViewSet): return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id}) + @tracer.start_as_current_span("process_sync") @extend_schema(request={ 'multipart/form-data': { 'type': 'object', @@ -246,19 +257,22 @@ class CtelViewSet(viewsets.ViewSet): total_page = len(doc_files_with_type) p_type = validated_data['type'] - new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page, - pages_left=total_page, - process_type=p_type, status=1, request_id=rq_id, - provider_code=provider_code, - subscription=sub, - redemption_id=validated_data["redemption_ID"], - subsidiary=validated_data["subsidiary"], - is_test_request=is_test_request) - new_request.save() + with tracer.start_as_current_span("create_and_save_record_in_db"): + new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page, + pages_left=total_page, + process_type=p_type, status=1, request_id=rq_id, + provider_code=provider_code, + subscription=sub, + redemption_id=validated_data["redemption_ID"], + subsidiary=validated_data["subsidiary"], + is_test_request=is_test_request) + new_request.save() # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible compact_files = [None] * len(doc_files_with_type) pool = ThreadPool(processes=2) + + @tracer.start_as_current_span("process_sync.process_file") def process_file(data): idx, doc_type, doc_file, tmp_file_name = data doc_file.seek(0) @@ -272,50 +286,54 @@ class CtelViewSet(viewsets.ViewSet): "file_path": file_path, "file_type": doc_type } - for result in pool.map(process_file, doc_files_with_type): - compact_files[result["idx"]] = result + + with tracer.start_as_current_span("process_file_with_multi_thread"): + for result in pool.map(process_file, doc_files_with_type): + compact_files[result["idx"]] = result # Send to AI queue c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files)) - time_limit = 120 - start_time = time.time() - while True: - current_time = time.time() - waiting_time = current_time - start_time - if waiting_time > time_limit: - break - time.sleep(0.1) - report_filter = SubscriptionRequest.objects.filter(request_id=rq_id) - if report_filter.count() != 1: - raise InvalidException(excArgs='requestId') + + with tracer.start_as_current_span("backend_waiting_for_result"): + time_limit = 120 + start_time = time.time() + while True: + current_time = time.time() + waiting_time = current_time - start_time + if waiting_time > time_limit: + break + time.sleep(0.1) + report_filter = SubscriptionRequest.objects.filter(request_id=rq_id) + if report_filter.count() != 1: + raise InvalidException(excArgs='requestId') - if user_info.current_sub.id != report_filter[0].subscription.id: - raise InvalidException(excArgs="user") - if int(report_filter[0].process_type) == ProcessType.FI_INVOICE.value: - data = report_filter[0].predict_result - xml_as_string = "" - - if data and 'content' in data and 'combine_results' in data['content'] and 'xml' in data['content']['combine_results']: - xml_as_string = data['content']['combine_results']['xml'] - xml_as_string = xml_as_string.replace("\n", "").replace("\\", "") - return HttpResponse(xml_as_string,content_type="text/xml") + if user_info.current_sub.id != report_filter[0].subscription.id: + raise InvalidException(excArgs="user") + if int(report_filter[0].process_type) == ProcessType.FI_INVOICE.value: + data = report_filter[0].predict_result + xml_as_string = "" + + if data and 'content' in data and 'combine_results' in data['content'] and 'xml' in data['content']['combine_results']: + xml_as_string = data['content']['combine_results']['xml'] + xml_as_string = xml_as_string.replace("\n", "").replace("\\", "") + return HttpResponse(xml_as_string,content_type="text/xml") - serializer: ReportSerializer = ReportSerializer(data=report_filter, many=True) - serializer.is_valid() - if report_filter[0].status == 400: - raise FileContentInvalidException() - if report_filter[0].status == 100: # continue, only return when result is fullfilled - continue - if len(serializer.data) == 0: - continue - if serializer.data[0].get("data", None) is None: - continue - if serializer.data[0]["data"].get("status", 200) != 200: - continue + serializer: ReportSerializer = ReportSerializer(data=report_filter, many=True) + serializer.is_valid() + if report_filter[0].status == 400: + raise FileContentInvalidException() + if report_filter[0].status == 100: # continue, only return when result is fullfilled + continue + if len(serializer.data) == 0: + continue + if serializer.data[0].get("data", None) is None: + continue + if serializer.data[0]["data"].get("status", 200) != 200: + continue - serializer.data[0]["request_id"] = rq_id - return Response(status=status.HTTP_200_OK, data=serializer.data[0]) + serializer.data[0]["request_id"] = rq_id + return Response(status=status.HTTP_200_OK, data=serializer.data[0]) raise ServiceTimeoutException(excArgs=f"{rq_id}") @extend_schema(request={ diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index 5485c02..eed9903 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -234,7 +234,7 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files): for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue: ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata) -@app.task(base=VerboseTask, name='upload_file_to_s3', track_started=True) +@app.task(base=VerboseTask, name='upload_file_to_s3', track_started=False) def upload_file_to_s3(local_file_path, s3_key, request_id): if s3_client.s3_client is not None: try: @@ -278,7 +278,7 @@ def upload_report_to_s3(local_file_path, s3_key, report_id, delay): else: logger.info(f"S3 is not available, skipping,...") -@app.task(base=VerboseTask, name='remove_local_file', track_started=True) +@app.task(base=VerboseTask, name='remove_local_file', track_started=False) def remove_local_file(local_file_path, request_id): logger.info(f"Removing local file: {local_file_path}, ...") try: diff --git a/cope2n-api/fwd_api/celery_worker/process_report_tasks.py b/cope2n-api/fwd_api/celery_worker/process_report_tasks.py index 14b0e82..8fbc2d9 100755 --- a/cope2n-api/fwd_api/celery_worker/process_report_tasks.py +++ b/cope2n-api/fwd_api/celery_worker/process_report_tasks.py @@ -10,16 +10,15 @@ from django.utils import timezone from fwd import celery_app as app from fwd import settings from fwd_api.celery_worker.task_warpper import VerboseTask -from fwd_api.models import Report, ReportFile, SubscriptionRequest +from fwd_api.models import Report, SubscriptionRequest from opentelemetry import trace from ..utils import s3 as S3Util -from ..utils import time_stuff from ..utils.accuracy import (IterAvg, ReportAccumulateByRequest, calculate_a_request, count_transactions, create_billing_data, extract_report_detail_list, update_temp_accuracy) -from ..utils.cache import get_cache, set_cache +from ..utils.cache import set_cache from ..utils.file import (dict2xlsx, save_images_to_csv_briefly, save_report_to_S3, save_workbook_file) from ..utils.redis import RedisUtils diff --git a/cope2n-api/fwd_api/utils/file.py b/cope2n-api/fwd_api/utils/file.py index 44cdc34..c465861 100755 --- a/cope2n-api/fwd_api/utils/file.py +++ b/cope2n-api/fwd_api/utils/file.py @@ -1,31 +1,40 @@ +import csv import io -import os -import traceback -import pathlib import json - -from PIL import Image, ExifTags -from django.core.files.uploadedfile import TemporaryUploadedFile -from django.utils import timezone +import logging +import os +import pathlib +import traceback from datetime import datetime +import imagesize +from django.core.files.uploadedfile import TemporaryUploadedFile +from django.utils import timezone from fwd import settings -from ..utils import s3 as S3Util from fwd_api.constant.common import allowed_file_extensions -from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \ - ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException, RequiredColumnException -from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest, SubscriptionRequestFile, Report, ReportFile +from fwd_api.exception.exceptions import (FileFormatInvalidException, + GeneralException, + InvalidDecompressedSizeException, + InvalidException, + LimitReachedException, + RequiredColumnException, + RequiredFieldException, + ServiceUnavailableException) +from fwd_api.models import (FeedbackRequest, OcrTemplate, Report, + SubscriptionRequest, SubscriptionRequestFile) from fwd_api.utils import process as ProcessUtil from fwd_api.utils.crypto import image_authenticator from fwd_api.utils.image import resize -from ..celery_worker.client_connector import c_connector -import imagesize -import csv from openpyxl import load_workbook -from openpyxl.styles import Font, Border, Side, PatternFill, NamedStyle, numbers, Alignment -import logging +from openpyxl.styles import Alignment, Border, Font, PatternFill, Side +from opentelemetry import trace +from PIL import ExifTags, Image + +from ..celery_worker.client_connector import c_connector +from ..utils import s3 as S3Util logger = logging.getLogger(__name__) +tracer = trace.get_tracer("sbt_django_backend") s3_client = S3Util.MinioS3Client( endpoint=settings.S3_ENDPOINT, @@ -34,6 +43,7 @@ s3_client = S3Util.MinioS3Client( bucket_name=settings.S3_BUCKET_NAME ) +@tracer.start_as_current_span("convert_date_string") def convert_date_string(date_string): # Parse the input date string date_format = "%Y-%m-%d %H:%M:%S.%f %z" @@ -44,6 +54,7 @@ def convert_date_string(date_string): return formatted_date +@tracer.start_as_current_span("validate_report_list") def validate_report_list(request): start_date_str = request.GET.get('start_date') end_date_str = request.GET.get('end_date') @@ -68,6 +79,7 @@ def validate_report_list(request): raise RequiredFieldException(excArgs="report_id, start_date, end_date") return validated_data +@tracer.start_as_current_span("validate_feedback_file") def validate_feedback_file(csv_file_path): required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date', 'retailer', 'Sold to party', 'timetakenmilli'] missing_columns = [] @@ -83,6 +95,7 @@ def validate_feedback_file(csv_file_path): if missing_columns: raise RequiredColumnException(excArgs=str(missing_columns)) +@tracer.start_as_current_span("validate_review") def validate_review(review, num_imei): for field in settings.FIELD: if not field in review.keys(): @@ -90,6 +103,7 @@ def validate_review(review, num_imei): if not isinstance(review["imei_number"], list) or len(review["imei_number"]) != num_imei: raise InvalidException(excArgs=f'imei_number') +@tracer.start_as_current_span("validate_list_file") def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"): total_file_size = 0 if len(files) < min_file_num: @@ -109,6 +123,7 @@ def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUES if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST: raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB')) +@tracer.start_as_current_span("validate_csv_feedback") def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"): total_file_size = 0 if len(files) < min_file_num: @@ -128,6 +143,7 @@ def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST: raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB')) +# @tracer.start_as_current_span("get_file") def get_file(file_path: str): try: return open(file_path, 'rb') @@ -136,6 +152,7 @@ def get_file(file_path: str): raise GeneralException("System") +@tracer.start_as_current_span("get_template_folder_path") def get_template_folder_path(tem: OcrTemplate): tem_id = str(tem.id) sub_id = str(tem.subscription.id) @@ -156,6 +173,7 @@ def get_folder_path(rq: SubscriptionRequest): return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, 'requests', p_type, request_id) +@tracer.start_as_current_span("save_byte_file") def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes): folder_path = get_folder_path(rq) is_exist = os.path.exists(folder_path) @@ -168,6 +186,7 @@ def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes): return file_path +@tracer.start_as_current_span("save_file") def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile): folder_path = get_folder_path(rq) is_exist = os.path.exists(folder_path) @@ -183,6 +202,7 @@ def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFi return file_path +@tracer.start_as_current_span("save_json_file") def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict): folder_path = get_folder_path(rq) is_exist = os.path.exists(folder_path) @@ -194,6 +214,7 @@ def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict): json.dump(data, json_file) return file_path +@tracer.start_as_current_span("save_feedback_file") def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict): user_id = str(rq.subscription.user.id) feedback_id = str(rq.id) @@ -209,6 +230,7 @@ def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict) csvfile.write(file_contents) return file_path +@tracer.start_as_current_span("save_workbook_file") def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""): report_id = str(rp.report_id) @@ -222,6 +244,7 @@ def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""): workbook.save(file_path) return file_path +@tracer.start_as_current_span("delete_file_with_path") def delete_file_with_path(file_path: str) -> bool: try: os.remove(file_path) @@ -231,6 +254,7 @@ def delete_file_with_path(file_path: str) -> bool: return False +@tracer.start_as_current_span("save_template_file") def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality): try: folder_path = get_template_folder_path(rq) @@ -258,6 +282,7 @@ def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, fo raise ServiceUnavailableException() return file_path +@tracer.start_as_current_span("save_images_to_csv_briefly") def save_images_to_csv_briefly(id, image_filenames): # columns = ["request_id", "file_name", "predict_result", "feedback_result", "reviewed_result", "feedback_accuracy", "reviewed_accuracy"] columns = ["request_id", "file_name", "predict_result", "feedback_result", "reviewed_result", "feedback_accuracy", "reviewed_accuracy"] @@ -290,6 +315,7 @@ def save_images_to_csv_briefly(id, image_filenames): # save to S3 save_report_to_S3(id, file_path) +@tracer.start_as_current_span("resize_and_save_file") def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality: int): try: folder_path = get_folder_path(rq) @@ -310,6 +336,7 @@ def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: Temporar logger.error(f"{e}") raise ServiceUnavailableException() +@tracer.start_as_current_span("save_to_S3") def save_to_S3(file_name, rq, local_file_path): try: file_path = get_folder_path(rq) @@ -323,6 +350,7 @@ def save_to_S3(file_name, rq, local_file_path): logger.error(f"{e}") raise ServiceUnavailableException() +@tracer.start_as_current_span("save_feedback_to_S3") def save_feedback_to_S3(file_name, id, local_file_path): try: assert len(local_file_path.split("/")) >= 3, "file_path must have at least feedback_folder and feedback_id" @@ -335,6 +363,7 @@ def save_feedback_to_S3(file_name, id, local_file_path): logger.error(f"{e}") raise ServiceUnavailableException() +@tracer.start_as_current_span("save_report_to_S3") def save_report_to_S3(id, local_file_path, delay=0): try: s3_key = os.path.join("report", local_file_path.split("/")[-2], local_file_path.split("/")[-1]) @@ -345,9 +374,11 @@ def save_report_to_S3(id, local_file_path, delay=0): logger.error(f"{e}") raise ServiceUnavailableException() +@tracer.start_as_current_span("download_from_S3") def download_from_S3(s3_key, local_file_path): s3_client.download_file(s3_key, local_file_path) +@tracer.start_as_current_span("save_file_with_path") def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path): try: file_path = os.path.join(folder_path, file_name) @@ -364,13 +395,14 @@ def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, fo raise ServiceUnavailableException() return file_path +@tracer.start_as_current_span("save_pdf") def save_pdf(file_path: str, file: TemporaryUploadedFile): f = open(file_path, 'wb+') for chunk in file.chunks(): f.write(chunk) f.close() - +@tracer.start_as_current_span("save_img") def save_img(file_path: str, file: TemporaryUploadedFile, quality): with open(file.temporary_file_path(), "rb") as fs: input_file = io.BytesIO(fs.read()) @@ -407,6 +439,7 @@ def save_img(file_path: str, file: TemporaryUploadedFile, quality): image = image.convert('RGB') image.save(file_path, optimize=True, quality=quality) +@tracer.start_as_current_span("build_media_url") def build_media_url(folder: str, uid: str, file_name: str = None) -> str: token = image_authenticator.generate_img_token() if not file_name: @@ -420,6 +453,7 @@ def build_media_url(folder: str, uid: str, file_name: str = None) -> str: token=token) +# @tracer.start_as_current_span("build_url") def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str: token = image_authenticator.generate_img_token(user_id) if not file_name: @@ -431,13 +465,17 @@ def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> file_name=file_name, base_url=settings.BASE_URL, token=token) + +@tracer.start_as_current_span("build_media_url_v2") def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str: token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id) return f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}' +@tracer.start_as_current_span("build_S3_url") def build_S3_url(s3_key, exp_time): return s3_client.create_url_with_expiration(s3_key, exp_time) +@tracer.start_as_current_span("get_value") def get_value(_dict, keys): keys = keys.split('.') value = _dict @@ -459,6 +497,7 @@ def get_value(_dict, keys): return value +@tracer.start_as_current_span("dict2xlsx") def dict2xlsx(input: json, _type='report'): if _type == 'report': wb = dump_excel_report(input=input) @@ -468,6 +507,7 @@ def dict2xlsx(input: json, _type='report'): wb = dump_excel_billing_report(input=input) return wb +@tracer.start_as_current_span("dump_excel_report") def dump_excel_report(input: json): red = "FF0000" black = "000000" @@ -537,6 +577,7 @@ def dump_excel_report(input: json): start_index += 1 return wb +@tracer.start_as_current_span("dump_excel_report_detail") def dump_excel_report_detail(input: json): red = "FF0000" black = "000000" @@ -597,6 +638,7 @@ def dump_excel_report_detail(input: json): start_index += 1 return wb +@tracer.start_as_current_span("dump_excel_billing_report") def dump_excel_billing_report(input: json): black = "000000" font_black = Font(name="Calibri", size=11, color=black) diff --git a/cope2n-api/fwd_api/utils/process.py b/cope2n-api/fwd_api/utils/process.py index 483d573..3502907 100644 --- a/cope2n-api/fwd_api/utils/process.py +++ b/cope2n-api/fwd_api/utils/process.py @@ -26,13 +26,13 @@ from ..models import UserProfile, OcrTemplate, OcrTemplateBox, \ Subscription, SubscriptionRequestFile, SubscriptionRequest from ..celery_worker.client_connector import c_connector import logging - +from opentelemetry import trace logger = logging.getLogger(__name__) +tracer = trace.get_tracer("sbt_django_backend") class UserData: user: UserProfile = None current_sub: Subscription = None - def __init__(self, request): user_data = validate_user_request_and_get(request) users = UserProfile.objects.filter(sync_id=user_data['id']) @@ -64,11 +64,11 @@ class UserData: self.user = user self.current_sub = sub - +@tracer.start_as_current_span("get_user") def get_user(request) -> UserData: return UserData(request) - +@tracer.start_as_current_span("validate_user_request_and_get") def validate_user_request_and_get(request): if not hasattr(request, 'user_data'): raise NotFoundException(excArgs='user') @@ -79,7 +79,7 @@ def validate_user_request_and_get(request): raise NotFoundException(excArgs='subscription') return data - +@tracer.start_as_current_span("validate_ocr_request_and_get") def validate_ocr_request_and_get(request, subscription): validated_data = {} if "processType" not in request.data or request.data['processType'] is None \ @@ -109,6 +109,7 @@ def validate_ocr_request_and_get(request, subscription): return validated_data +@tracer.start_as_current_span("sbt_validate_ocr_request_and_get") def sbt_validate_ocr_request_and_get(request, subscription): validated_data = {} # if "processType" not in request.data or request.data['processType'] is None \ @@ -154,6 +155,7 @@ def sbt_validate_ocr_request_and_get(request, subscription): return validated_data +@tracer.start_as_current_span("string_to_boolean") def string_to_boolean(value): true_strings = ['true', 'yes', '1', 'on'] false_strings = ['false', 'no', '0', 'off'] @@ -165,6 +167,7 @@ def string_to_boolean(value): else: return False +@tracer.start_as_current_span("sbt_validate_feedback") def sbt_validate_feedback(request): validated_data = {} @@ -194,6 +197,7 @@ def sbt_validate_feedback(request): return validated_data +@tracer.start_as_current_span("count_pages_in_pdf") def count_pages_in_pdf(pdf_file): count = 0 fh, temp_filename = tempfile.mkstemp() # make a tmp file @@ -207,6 +211,7 @@ def count_pages_in_pdf(pdf_file): return count +@tracer.start_as_current_span("count_pages_in_pdf_list") def count_pages_in_pdf_list(list_file): total_page = 0 @@ -216,6 +221,7 @@ def count_pages_in_pdf_list(list_file): return total_page +# @tracer.start_as_current_span("map_process_type_to_folder_name") def map_process_type_to_folder_name(p_type): if p_type == ProcessType.ID_CARD.value: return 'id_card' @@ -239,6 +245,7 @@ def map_process_type_to_folder_name(p_type): raise InvalidException(excArgs='processType') +@tracer.start_as_current_span("get_random_string") def get_random_string(length): # choose from all lowercase letter letters = string.ascii_lowercase @@ -247,6 +254,7 @@ def get_random_string(length): return result_str +@tracer.start_as_current_span("is_int") def is_int(text) -> bool: try: # converting to integer @@ -256,6 +264,7 @@ def is_int(text) -> bool: return False +@tracer.start_as_current_span("validate_box") def validate_box(list_box, max_number_of_box, max_number_of_item_in_a_box, number_of_box=None): if len(list_box) > max_number_of_box: raise NumberOfBoxLimitReachedException(excArgs=LIST_BOX_MESSAGE) @@ -268,6 +277,7 @@ def validate_box(list_box, max_number_of_box, max_number_of_item_in_a_box, numbe raise InvalidException(excArgs="box coordinates") +@tracer.start_as_current_span("to_box_list") def to_box_list(str_list): ls = [] if not str_list: @@ -280,6 +290,7 @@ def to_box_list(str_list): return ls +@tracer.start_as_current_span("validate_json_response_and_return") def validate_json_response_and_return(res): if res.status_code != status.HTTP_200_OK: raise ServiceUnavailableException() @@ -290,6 +301,7 @@ def validate_json_response_and_return(res): return res_data +@tracer.start_as_current_span("is_duplicate_in_list") def is_duplicate_in_list(str_list): unique_set: set = set({}) for label in str_list: @@ -300,16 +312,19 @@ def is_duplicate_in_list(str_list): return False +@tracer.start_as_current_span("validate_duplicate") def validate_duplicate(list_box): if is_duplicate_in_list(list_box): raise DuplicateEntityException(excArgs="box_label") +@tracer.start_as_current_span("validate_vn_and_space") def validate_vn_and_space(txt: str): if not pattern.fullmatch(txt.upper()): raise InvalidException(excArgs=NAME_MESSAGE) +@tracer.start_as_current_span("save_template_boxs") def save_template_boxs(data, template): saving_list = [] for d_box in data['data_boxs']: @@ -329,6 +344,7 @@ def token_value(token_type): return 5 return 1 # Basic OCR +@tracer.start_as_current_span("send_to_queue2") def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}): try: if typez == ProcessType.ID_CARD.value: @@ -346,6 +362,7 @@ def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}): logger.error(e) raise BadGatewayException() +@tracer.start_as_current_span("build_template_matching_data") def build_template_matching_data(template): temp_dict = { @@ -372,6 +389,7 @@ def build_template_matching_data(template): return temp_dict +@tracer.start_as_current_span("send_template_queue") def send_template_queue(rq_id, file_url, template: OcrTemplate, uid): try: @@ -383,9 +401,11 @@ def send_template_queue(rq_id, file_url, template: OcrTemplate, uid): logger.error(e) raise BadGatewayException() +@tracer.start_as_current_span("process_feedback") def process_feedback(feedback_id, local_file_path): c_connector.csv_feedback((local_file_path, feedback_id)) +@tracer.start_as_current_span("process_pdf_file") def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list: doc: fitz.Document = fitz.open(stream=file_obj.file.read()) if doc.page_count > settings.MAX_PAGES_OF_PDF_FILE: @@ -406,6 +426,7 @@ def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: S # Sub-file return pdf_to_images_urls(doc, request, user) +@tracer.start_as_current_span("process_image_file") def process_image_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list: if file_obj.size > settings.SIZE_TO_COMPRESS: quality = 95 @@ -425,6 +446,7 @@ def process_image_file(file_name: str, file_obj: TemporaryUploadedFile, request: 'request_file_id': new_request_file.code }] +# @tracer.start_as_current_span("process_image_local_file") def process_image_local_file(file_name: str, file_path: str, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list: new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path, request=request, @@ -439,6 +461,7 @@ def process_image_local_file(file_name: str, file_path: str, request: Subscripti 'request_file_id': new_request_file.code }] +@tracer.start_as_current_span("pdf_to_images_urls") def pdf_to_images_urls(doc_path, request: SubscriptionRequest, user, dpi: int = 300) -> list: pdf_extracted = [] saving_path = FileUtils.get_folder_path(request)