Compare commits
10 commits: 3a64bbd955...3dd2440afa

SHA1:
3dd2440afa
0f955cb039
cfc4eaabdf
57a0adf8de
2e183360b0
5bcbc257de
17a00c3595
90f959223c
a5bd63df91
bb48329fee
@@ -36,4 +36,3 @@ ENV PYTHONPATH="."
 ENV TZ="Asia/Ho_Chi_Minh"
 
 CMD [ "sh", "run.sh"]
-# ENTRYPOINT [ "sleep", "infinity" ]
@@ -23,7 +23,7 @@ from utils.logging.logging import LOGGER_CONFIG
 logging.config.dictConfig(LOGGER_CONFIG)
 # Get the logger
 logger = logging.getLogger(__name__)
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_ai")
 
 logger.info("OCR engine configfs: \n", ocr_cfg)
 logger.info("KVU configfs: \n", kvu_cfg)
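Note on the change above: in opentelemetry-python, trace.get_tracer() requires the instrumenting module/library name as its first argument, so the zero-argument calls being replaced throughout these commits raise TypeError as soon as the module imports. A minimal sketch of the corrected pattern, reusing the scope name these commits pick:

from opentelemetry import trace

# the name passed here is recorded as the instrumentation scope on every span
tracer = trace.get_tracer("sbt_celery_ai")
with tracer.start_as_current_span("demo") as span:
    span.set_attribute("example", True)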
@@ -14,7 +14,7 @@ from .task_warpper import VerboseTask
 logging.config.dictConfig(LOGGER_CONFIG)
 # Get the logger
 logger = logging.getLogger(__name__)
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_ai")
 
 @app.task(base=VerboseTask,name="process_sbt_invoice", track_started=True)
 def process_sbt_invoice(rq_id, list_url, metadata):
@@ -11,7 +11,7 @@ from utils.logging.logging import LOGGER_CONFIG
 
 logging.config.dictConfig(LOGGER_CONFIG)
 logger = logging.getLogger(__name__)
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_ai")
 
 os.environ['PYTHONPATH'] = '/home/thucpd/thucpd/cope2n-ai/cope2n-ai/'
 
@@ -4,7 +4,7 @@ import json
 from tqdm import tqdm
 from opentelemetry import trace
 
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_ai")
 
 @tracer.start_as_current_span("longestCommonSubsequence")
 def longestCommonSubsequence(text1: str, text2: str) -> int:
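The decorator usage above works because Tracer.start_as_current_span() doubles as a decorator in opentelemetry-python: each call to the wrapped function runs inside a fresh span of the given name, and uncaught exceptions are recorded on the span before propagating. A short sketch of the same pattern — the body shown is the textbook LCS dynamic program, standing in for the project's implementation, which this view truncates:

from opentelemetry import trace

tracer = trace.get_tracer("sbt_celery_ai")

@tracer.start_as_current_span("longestCommonSubsequence")
def longestCommonSubsequence(text1: str, text2: str) -> int:
    # classic O(len(text1) * len(text2)) DP; each call is one traced span
    m, n = len(text1), len(text2)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m):
        for j in range(n):
            if text1[i] == text2[j]:
                dp[i + 1][j + 1] = dp[i][j] + 1
            else:
                dp[i + 1][j + 1] = max(dp[i][j + 1], dp[i + 1][j])
    return dp[m][n]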
@@ -0,0 +1,3 @@
+from .celery import app as celery_app
+
+__all__ = ('celery_app',)
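This three-line module matches the layout from Celery's "First steps with Django" guide: the app instance lives in the project package's celery.py and is re-exported from __init__.py so that both celery -A fwd on the command line and from fwd import celery_app in code resolve the same instance. Judging by the imports changed elsewhere in this diff (from fwd import celery_app as app), the package being edited here is fwd.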
@@ -1,3 +1,4 @@
+import environ
 import copy
 import os
 
@@ -11,13 +12,14 @@ from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
     OTLPSpanExporter
 from opentelemetry.instrumentation.celery import CeleryInstrumentor
+from opentelemetry.instrumentation.django import DjangoInstrumentor
 from opentelemetry.sdk.metrics import MeterProvider
 from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
 from opentelemetry.sdk.resources import SERVICE_NAME, Resource
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor
 
-from fwd import settings
+from django.conf import settings
 
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings")
 django.setup()
@@ -41,8 +43,7 @@ def init_celery_tracing(*args, **kwargs):
     trace_provider.add_span_processor(span_processor=processor)
     trace.set_tracer_provider(tracer_provider=trace_provider)
 
-    reader = PeriodicExportingMetricReader(
-        OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
+    reader = PeriodicExportingMetricReader(OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
     meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
     metrics.set_meter_provider(meter_provider=meter_provider)
 
@@ -104,11 +105,3 @@ app.conf.update({
         'make_a_report_2': {'queue': "report_2"},
     }
 })
 
-if __name__ == "__main__":
-    argv = [
-        'worker',
-        '--loglevel=INFO',
-        '--pool=solo'  # Window opts
-    ]
-    app.worker_main(argv)
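With the removal above, this module can no longer be launched directly as a solo-pool worker (the '--pool=solo' entry was a Windows accommodation); starting workers becomes the job of the container commands, which the docker-compose hunks at the end of this diff switch to celery -A fwd worker.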
cope2n-api/fwd/opentelemetry_config.py (new file, 34 lines)
@@ -0,0 +1,34 @@
+from opentelemetry import metrics, trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import \
+    OTLPSpanExporter
+from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
+    OTLPMetricExporter
+from opentelemetry.instrumentation.django import DjangoInstrumentor
+from opentelemetry.instrumentation.celery import CeleryInstrumentor
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+
+def setup_tracing(tracer_endpoint, service_name):
+    # Instrument Django
+    DjangoInstrumentor().instrument()
+    CeleryInstrumentor().instrument()
+    # Set up a tracer provider
+    span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
+    processor = BatchSpanProcessor(span_exporter=span_exporter)
+
+    attributes = {SERVICE_NAME: service_name}
+    resource = Resource(attributes=attributes)
+    trace_provider = TracerProvider(resource=resource)
+    trace_provider.add_span_processor(span_processor=processor)
+    trace.set_tracer_provider(tracer_provider=trace_provider)
+
+    reader = PeriodicExportingMetricReader(
+        OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
+    meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
+    metrics.set_meter_provider(meter_provider=meter_provider)
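One thing worth flagging in this new file, hedged since only the diff is visible: OTLPSpanExporter is imported from the OTLP/gRPC package (...otlp.proto.grpc.trace_exporter) yet is handed a path-style endpoint ending in /v1/traces, and the default endpoint configured in settings.py below is port 4317. In opentelemetry-python the /v1/traces and /v1/metrics suffixes belong to the OTLP/HTTP exporters (conventionally port 4318); the gRPC exporter expects a bare scheme://host:port. If gRPC is intended the endpoint would normally be just the host and port; if HTTP is intended the import would come from ...otlp.proto.http.trace_exporter, mirroring the metric exporter two lines above it.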
@@ -14,7 +14,8 @@ from pathlib import Path
 import environ
 from django.urls import reverse_lazy
 from fwd_api.middleware.logging_request_response_middleware import TraceIDLogFilter
+from .opentelemetry_config import setup_tracing
 # Build paths inside the project like this: BASE_DIR / 'subdir'.
 
 BASE_DIR = Path(__file__).resolve().parent.parent
@@ -22,6 +22,11 @@ BASE_DIR = Path(__file__).resolve().parent.parent
 env = environ.Env(
     DEBUG=(bool, False)
 )
 
+TRACER_ENDPOINT = env.str("tracer_endpoint", "http://jaeger_collector:4317")
+SERVICE_NAME = "sbt_django_backend"
+setup_tracing(tracer_endpoint=TRACER_ENDPOINT, service_name=SERVICE_NAME)
+
 DEBUG = False
 environ.Env.read_env(os.path.join(BASE_DIR, '.env'))
 ALLOWED_HOSTS = env.list("ALLOWED_HOSTS", default=['*'] + ['107.120.{}.{}'.format(i, j) for i in range(256) for j in range(256)])
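Two consequences of wiring setup_tracing into settings.py this way: the collector address comes from the process environment (django-environ's env.str("tracer_endpoint", ...)), so it can be overridden per deployment without a code change, and because the call executes at settings-import time, every process that loads fwd.settings — gunicorn workers and Celery workers alike — gets instrumented. Note also that environ.Env.read_env() runs after setup_tracing in this hunk, so a tracer_endpoint set only in the .env file would arrive too late to affect tracing. A hypothetical override, values invented for illustration:

import os
os.environ["tracer_endpoint"] = "http://otel-collector:4318"  # hypothetical collector address
# any process importing fwd.settings after this point exports there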
@@ -1,32 +1,42 @@
+import logging
 import time
 import uuid
-from wsgiref.util import FileWrapper
 from datetime import datetime
+from multiprocessing.pool import ThreadPool
+from typing import List
+from wsgiref.util import FileWrapper
 
 from django.core.files.uploadedfile import TemporaryUploadedFile
 from django.http import HttpResponse, JsonResponse
 from drf_spectacular.utils import extend_schema
+from fwd import settings
+from opentelemetry import trace
 from rest_framework import status, viewsets
 from rest_framework.decorators import action
-from rest_framework.response import Response
-from typing import List
 from rest_framework.renderers import JSONRenderer
+from rest_framework.response import Response
 from rest_framework_xml.renderers import XMLRenderer
-from multiprocessing.pool import ThreadPool
 
-from fwd import settings
-from ..celery_worker.client_connector import c_connector
 from ..annotation.api import throw_on_failure
-from ..constant.common import ProcessType, REQUEST_ID, FOLDER_TYPE, EntityStatus, pdf_extensions, allowed_file_extensions, image_extensions, standard_ocr_list
-from ..exception.exceptions import RequiredFieldException, InvalidException, NotFoundException, \
-    PermissionDeniedException, LockedEntityException, FileContentInvalidException, ServiceTimeoutException
-from ..models import SubscriptionRequest, SubscriptionRequestFile, OcrTemplate, FeedbackRequest
+from ..celery_worker.client_connector import c_connector
+from ..constant.common import (FOLDER_TYPE, REQUEST_ID, EntityStatus,
+                               ProcessType, allowed_file_extensions,
+                               image_extensions, pdf_extensions,
+                               standard_ocr_list)
+from ..exception.exceptions import (FileContentInvalidException,
+                                    InvalidException, LockedEntityException,
+                                    NotFoundException,
+                                    PermissionDeniedException,
+                                    RequiredFieldException,
+                                    ServiceTimeoutException)
+from ..models import (FeedbackRequest, OcrTemplate, SubscriptionRequest,
+                      SubscriptionRequestFile)
 from ..response.ReportSerializer import ReportSerializer
 from ..utils import file as FileUtils
 from ..utils import process as ProcessUtil
-import logging
 
 logger = logging.getLogger(__name__)
+tracer = trace.get_tracer("sbt_django_backend")
 
 class CtelViewSet(viewsets.ViewSet):
     lookup_field = "username"
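Most of the churn in this hunk is mechanical: the import blocks are regrouped stdlib → third-party → local and alphabetized with aligned parenthesized continuations, which is what isort produces in its default style (the same pattern repeats in the backend hunks below). The functional additions here are just the opentelemetry import and the module-level tracer named sbt_django_backend.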
@@ -183,6 +193,7 @@ class CtelViewSet(viewsets.ViewSet):
 
         return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id})
 
+    @tracer.start_as_current_span("process_sync")
     @extend_schema(request={
         'multipart/form-data': {
             'type': 'object',
@@ -246,6 +257,7 @@ class CtelViewSet(viewsets.ViewSet):
 
         total_page = len(doc_files_with_type)
         p_type = validated_data['type']
+        with tracer.start_as_current_span("create_and_save_record_in_db"):
             new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
                                                                    pages_left=total_page,
                                                                    process_type=p_type, status=1, request_id=rq_id,
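A reading note for this hunk and the two after it: each adds a with tracer.start_as_current_span(...): line while showing the statements beneath it as unchanged context, which only parses if those statements were simultaneously re-indented into the new block; the most likely explanation is that the view hides whitespace-only changes.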
@@ -259,6 +271,8 @@ class CtelViewSet(viewsets.ViewSet):
         # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
         compact_files = [None] * len(doc_files_with_type)
         pool = ThreadPool(processes=2)
 
+        @tracer.start_as_current_span("process_sync.process_file")
         def process_file(data):
             idx, doc_type, doc_file, tmp_file_name = data
             doc_file.seek(0)
@@ -272,12 +286,16 @@ class CtelViewSet(viewsets.ViewSet):
                 "file_path": file_path,
                 "file_type": doc_type
             }
 
+        with tracer.start_as_current_span("process_file_with_multi_thread"):
             for result in pool.map(process_file, doc_files_with_type):
                 compact_files[result["idx"]] = result
 
         # Send to AI queue
         c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
 
 
+        with tracer.start_as_current_span("backend_waiting_for_result"):
             time_limit = 120
             start_time = time.time()
             while True:
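One hedged caveat about the spans introduced around the thread pool: OpenTelemetry's Python context rides on contextvars, which worker threads do not inherit, so spans started inside process_file (including the process_sync.process_file decorator above) may come out as roots rather than children of the request span. A sketch of explicit propagation using only opentelemetry-api calls, with process_file and doc_files_with_type taken from this diff:

from opentelemetry import context as otel_context

parent_ctx = otel_context.get_current()  # captured in the request thread

def process_file_with_parent_ctx(data):
    # re-attach the request's context inside the pool worker thread
    token = otel_context.attach(parent_ctx)
    try:
        return process_file(data)
    finally:
        otel_context.detach(token)

# pool.map(process_file_with_parent_ctx, doc_files_with_type) then parents the spans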
@@ -1,10 +1,10 @@
 from celery import Celery
+from celery.utils.log import get_task_logger
-from fwd import settings
+from django.conf import settings
 from fwd_api.exception.exceptions import GeneralException
 from fwd_api.middleware.local_storage import get_current_trace_id
 from kombu.utils.uuid import uuid
-from celery.utils.log import get_task_logger
 
 logger = get_task_logger(__name__)
 
@@ -9,16 +9,15 @@ import uuid
 from multiprocessing.pool import ThreadPool
 
 from celery.utils.log import get_task_logger
-from opentelemetry import trace
+from fwd import celery_app as app
 
 from fwd import settings
 from fwd_api.celery_worker.task_warpper import VerboseTask
-from fwd_api.celery_worker.worker import app
 from fwd_api.constant.common import FileCategory
 from fwd_api.middleware.local_storage import get_current_trace_id
 from fwd_api.models import (FeedbackRequest, Report, SubscriptionRequest,
                             SubscriptionRequestFile, UserProfile)
 from fwd_api.utils.accuracy import predict_result_to_ready
+from opentelemetry import trace
 
 from ..constant.common import FolderFileType, image_extensions
 from ..exception.exceptions import FileContentInvalidException
@@ -28,7 +27,7 @@ from ..utils import s3 as S3Util
 from ..utils.accuracy import validate_feedback_file
 
 logger = get_task_logger(__name__)
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_backend")
 
 s3_client = S3Util.MinioS3Client(
     endpoint=settings.S3_ENDPOINT,
@@ -235,7 +234,7 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
     for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
         ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)
 
-@app.task(base=VerboseTask, name='upload_file_to_s3', track_started=True)
+@app.task(base=VerboseTask, name='upload_file_to_s3', track_started=False)
 def upload_file_to_s3(local_file_path, s3_key, request_id):
     if s3_client.s3_client is not None:
         try:
@@ -279,7 +278,7 @@ def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
     else:
         logger.info(f"S3 is not available, skipping,...")
 
-@app.task(base=VerboseTask, name='remove_local_file', track_started=True)
+@app.task(base=VerboseTask, name='remove_local_file', track_started=False)
 def remove_local_file(local_file_path, request_id):
     logger.info(f"Removing local file: {local_file_path}, ...")
     try:
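Turning track_started off for upload_file_to_s3 and remove_local_file restores Celery's default behaviour: when the flag is on, the worker writes an extra STARTED state to the result backend for every invocation, which is useful for long-running tasks a client polls but is pure overhead for fire-and-forget housekeeping like these.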
@@ -1,31 +1,32 @@
-import traceback
-
-from fwd_api.models import SubscriptionRequest, Report, ReportFile
-from fwd_api.celery_worker.worker import app
-from fwd_api.celery_worker.task_warpper import VerboseTask
-from ..utils import s3 as S3Util
-from ..utils.accuracy import (update_temp_accuracy, IterAvg,
-    count_transactions, extract_report_detail_list, calculate_a_request,
-    ReportAccumulateByRequest, create_billing_data)
-from ..utils.file import dict2xlsx, save_workbook_file, save_report_to_S3, save_images_to_csv_briefly
-from ..utils import time_stuff
-from ..utils.redis import RedisUtils
-from ..utils.cache import set_cache, get_cache
-from django.utils import timezone
-from django.db.models import Q
-from itertools import chain
-import json
 import copy
+import json
 import os
+import traceback
+from itertools import chain
 
-from opentelemetry import trace
 from celery.utils.log import get_task_logger
+from django.db.models import Q
+from django.utils import timezone
+from fwd import celery_app as app
 from fwd import settings
+from fwd_api.celery_worker.task_warpper import VerboseTask
+from fwd_api.models import Report, SubscriptionRequest
+from opentelemetry import trace
 
+from ..utils import s3 as S3Util
+from ..utils.accuracy import (IterAvg, ReportAccumulateByRequest,
+                              calculate_a_request, count_transactions,
+                              create_billing_data, extract_report_detail_list,
+                              update_temp_accuracy)
+from ..utils.cache import set_cache
+from ..utils.file import (dict2xlsx, save_images_to_csv_briefly,
+                          save_report_to_S3, save_workbook_file)
+from ..utils.redis import RedisUtils
 
 redis_client = RedisUtils()
 
 logger = get_task_logger(__name__)
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_backend")
 
 
 s3_client = S3Util.MinioS3Client(
@@ -5,18 +5,17 @@ import uuid
 from copy import deepcopy
 
 from celery.utils.log import get_task_logger
-from opentelemetry import trace
+from fwd import celery_app as app
 
 from fwd_api.celery_worker.task_warpper import VerboseTask
-from fwd_api.celery_worker.worker import app
 from fwd_api.constant.common import ProcessType
 from fwd_api.exception.exceptions import InvalidException
 from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile
 from fwd_api.utils import process as ProcessUtil
 from fwd_api.utils.redis import RedisUtils
+from opentelemetry import trace
 
 logger = get_task_logger(__name__)
-tracer = trace.get_tracer()
+tracer = trace.get_tracer("sbt_celery_backend")
 
 redis_client = RedisUtils()
 
@@ -1,31 +1,40 @@
+import csv
 import io
-import os
-import traceback
-import pathlib
 import json
+import logging
-from PIL import Image, ExifTags
-from django.core.files.uploadedfile import TemporaryUploadedFile
-from django.utils import timezone
+import os
+import pathlib
+import traceback
 from datetime import datetime
 
+import imagesize
+from django.core.files.uploadedfile import TemporaryUploadedFile
+from django.utils import timezone
 from fwd import settings
-from ..utils import s3 as S3Util
 from fwd_api.constant.common import allowed_file_extensions
-from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
-    ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException, RequiredColumnException
-from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest, SubscriptionRequestFile, Report, ReportFile
+from fwd_api.exception.exceptions import (FileFormatInvalidException,
+                                          GeneralException,
+                                          InvalidDecompressedSizeException,
+                                          InvalidException,
+                                          LimitReachedException,
+                                          RequiredColumnException,
+                                          RequiredFieldException,
+                                          ServiceUnavailableException)
+from fwd_api.models import (FeedbackRequest, OcrTemplate, Report,
+                            SubscriptionRequest, SubscriptionRequestFile)
 from fwd_api.utils import process as ProcessUtil
 from fwd_api.utils.crypto import image_authenticator
 from fwd_api.utils.image import resize
-from ..celery_worker.client_connector import c_connector
-import imagesize
-import csv
 from openpyxl import load_workbook
-from openpyxl.styles import Font, Border, Side, PatternFill, NamedStyle, numbers, Alignment
-import logging
+from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
+from opentelemetry import trace
+from PIL import ExifTags, Image
 
+from ..celery_worker.client_connector import c_connector
+from ..utils import s3 as S3Util
 
 logger = logging.getLogger(__name__)
+tracer = trace.get_tracer("sbt_django_backend")
 
 s3_client = S3Util.MinioS3Client(
     endpoint=settings.S3_ENDPOINT,
@@ -34,6 +43,7 @@ s3_client = S3Util.MinioS3Client(
     bucket_name=settings.S3_BUCKET_NAME
 )
 
+@tracer.start_as_current_span("convert_date_string")
 def convert_date_string(date_string):
     # Parse the input date string
     date_format = "%Y-%m-%d %H:%M:%S.%f %z"
@@ -44,6 +54,7 @@ def convert_date_string(date_string):
 
     return formatted_date
 
+@tracer.start_as_current_span("validate_report_list")
 def validate_report_list(request):
     start_date_str = request.GET.get('start_date')
     end_date_str = request.GET.get('end_date')
@@ -68,6 +79,7 @@ def validate_report_list(request):
         raise RequiredFieldException(excArgs="report_id, start_date, end_date")
     return validated_data
 
+@tracer.start_as_current_span("validate_feedback_file")
 def validate_feedback_file(csv_file_path):
     required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date', 'retailer', 'Sold to party', 'timetakenmilli']
     missing_columns = []
@@ -83,6 +95,7 @@ def validate_feedback_file(csv_file_path):
     if missing_columns:
         raise RequiredColumnException(excArgs=str(missing_columns))
 
+@tracer.start_as_current_span("validate_review")
 def validate_review(review, num_imei):
     for field in settings.FIELD:
         if not field in review.keys():
@@ -90,6 +103,7 @@ def validate_review(review, num_imei):
     if not isinstance(review["imei_number"], list) or len(review["imei_number"]) != num_imei:
         raise InvalidException(excArgs=f'imei_number')
 
+@tracer.start_as_current_span("validate_list_file")
 def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
     total_file_size = 0
     if len(files) < min_file_num:
@@ -109,6 +123,7 @@ def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUES
     if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
         raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
 
+@tracer.start_as_current_span("validate_csv_feedback")
 def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"):
     total_file_size = 0
     if len(files) < min_file_num:
@@ -128,6 +143,7 @@ def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv
     if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
         raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
 
+# @tracer.start_as_current_span("get_file")
 def get_file(file_path: str):
     try:
         return open(file_path, 'rb')
@@ -136,6 +152,7 @@ def get_file(file_path: str):
         raise GeneralException("System")
 
 
+@tracer.start_as_current_span("get_template_folder_path")
 def get_template_folder_path(tem: OcrTemplate):
     tem_id = str(tem.id)
     sub_id = str(tem.subscription.id)
@@ -156,6 +173,7 @@ def get_folder_path(rq: SubscriptionRequest):
     return os.path.join(settings.MEDIA_ROOT, 'users', user_id, "subscriptions", sub_id, 'requests', p_type, request_id)
 
 
+@tracer.start_as_current_span("save_byte_file")
 def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
     folder_path = get_folder_path(rq)
     is_exist = os.path.exists(folder_path)
@@ -168,6 +186,7 @@ def save_byte_file(file_name: str, rq: SubscriptionRequest, file_bytes):
 
     return file_path
 
+@tracer.start_as_current_span("save_file")
 def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
     folder_path = get_folder_path(rq)
     is_exist = os.path.exists(folder_path)
@@ -183,6 +202,7 @@ def save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFi
 
     return file_path
 
+@tracer.start_as_current_span("save_json_file")
 def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
     folder_path = get_folder_path(rq)
     is_exist = os.path.exists(folder_path)
@@ -194,6 +214,7 @@ def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
         json.dump(data, json_file)
     return file_path
 
+@tracer.start_as_current_span("save_feedback_file")
 def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict):
     user_id = str(rq.subscription.user.id)
     feedback_id = str(rq.id)
@@ -209,6 +230,7 @@ def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict)
         csvfile.write(file_contents)
     return file_path
 
+@tracer.start_as_current_span("save_workbook_file")
 def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""):
     report_id = str(rp.report_id)
 
@@ -222,6 +244,7 @@ def save_workbook_file(file_name: str, rp: Report, workbook, prefix=""):
     workbook.save(file_path)
     return file_path
 
+@tracer.start_as_current_span("delete_file_with_path")
 def delete_file_with_path(file_path: str) -> bool:
     try:
         os.remove(file_path)
@@ -231,6 +254,7 @@ def delete_file_with_path(file_path: str) -> bool:
         return False
 
 
+@tracer.start_as_current_span("save_template_file")
 def save_template_file(file_name: str, rq: OcrTemplate, file: TemporaryUploadedFile, quality):
     try:
         folder_path = get_template_folder_path(rq)
@@ -258,6 +282,7 @@ def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, fo
         raise ServiceUnavailableException()
     return file_path
 
+@tracer.start_as_current_span("save_images_to_csv_briefly")
 def save_images_to_csv_briefly(id, image_filenames):
     # columns = ["request_id", "file_name", "predict_result", "feedback_result", "reviewed_result", "feedback_accuracy", "reviewed_accuracy"]
     columns = ["request_id", "file_name", "predict_result", "feedback_result", "reviewed_result", "feedback_accuracy", "reviewed_accuracy"]
@@ -290,6 +315,7 @@ def save_images_to_csv_briefly(id, image_filenames):
     # save to S3
     save_report_to_S3(id, file_path)
 
+@tracer.start_as_current_span("resize_and_save_file")
 def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile, quality: int):
     try:
         folder_path = get_folder_path(rq)
@@ -310,6 +336,7 @@ def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: Temporar
         logger.error(f"{e}")
         raise ServiceUnavailableException()
 
+@tracer.start_as_current_span("save_to_S3")
 def save_to_S3(file_name, rq, local_file_path):
     try:
         file_path = get_folder_path(rq)
@@ -323,6 +350,7 @@ def save_to_S3(file_name, rq, local_file_path):
         logger.error(f"{e}")
         raise ServiceUnavailableException()
 
+@tracer.start_as_current_span("save_feedback_to_S3")
 def save_feedback_to_S3(file_name, id, local_file_path):
     try:
         assert len(local_file_path.split("/")) >= 3, "file_path must have at least feedback_folder and feedback_id"
@@ -335,6 +363,7 @@ def save_feedback_to_S3(file_name, id, local_file_path):
         logger.error(f"{e}")
         raise ServiceUnavailableException()
 
+@tracer.start_as_current_span("save_report_to_S3")
 def save_report_to_S3(id, local_file_path, delay=0):
     try:
         s3_key = os.path.join("report", local_file_path.split("/")[-2], local_file_path.split("/")[-1])
@@ -345,9 +374,11 @@ def save_report_to_S3(id, local_file_path, delay=0):
         logger.error(f"{e}")
         raise ServiceUnavailableException()
 
+@tracer.start_as_current_span("download_from_S3")
 def download_from_S3(s3_key, local_file_path):
     s3_client.download_file(s3_key, local_file_path)
 
+@tracer.start_as_current_span("save_file_with_path")
 def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
     try:
         file_path = os.path.join(folder_path, file_name)
@@ -364,13 +395,14 @@ def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, fo
         raise ServiceUnavailableException()
     return file_path
 
+@tracer.start_as_current_span("save_pdf")
 def save_pdf(file_path: str, file: TemporaryUploadedFile):
     f = open(file_path, 'wb+')
     for chunk in file.chunks():
         f.write(chunk)
     f.close()
 
+@tracer.start_as_current_span("save_img")
 def save_img(file_path: str, file: TemporaryUploadedFile, quality):
     with open(file.temporary_file_path(), "rb") as fs:
         input_file = io.BytesIO(fs.read())
@@ -407,6 +439,7 @@ def save_img(file_path: str, file: TemporaryUploadedFile, quality):
     image = image.convert('RGB')
     image.save(file_path, optimize=True, quality=quality)
 
+@tracer.start_as_current_span("build_media_url")
 def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
     token = image_authenticator.generate_img_token()
     if not file_name:
@@ -420,6 +453,7 @@ def build_media_url(folder: str, uid: str, file_name: str = None) -> str:
                           token=token)
 
 
+# @tracer.start_as_current_span("build_url")
 def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) -> str:
     token = image_authenticator.generate_img_token(user_id)
     if not file_name:
@@ -431,13 +465,17 @@ def build_url(folder: str, data_id: str, user_id: int, file_name: str = None) ->
                           file_name=file_name,
                           base_url=settings.BASE_URL,
                           token=token)
 
+@tracer.start_as_current_span("build_media_url_v2")
 def build_media_url_v2(media_id: str, user_id: int, sub_id: int, u_sync_id: str) -> str:
     token = image_authenticator.generate_img_token_v2(user_id, sub_id, u_sync_id)
     return f'{settings.BASE_URL}/api/ctel/v2/media/request/{media_id}/?token={token}'
 
+@tracer.start_as_current_span("build_S3_url")
 def build_S3_url(s3_key, exp_time):
     return s3_client.create_url_with_expiration(s3_key, exp_time)
 
+@tracer.start_as_current_span("get_value")
 def get_value(_dict, keys):
     keys = keys.split('.')
     value = _dict
@@ -459,6 +497,7 @@ def get_value(_dict, keys):
     return value
 
 
+@tracer.start_as_current_span("dict2xlsx")
 def dict2xlsx(input: json, _type='report'):
     if _type == 'report':
         wb = dump_excel_report(input=input)
@@ -468,6 +507,7 @@ def dict2xlsx(input: json, _type='report'):
         wb = dump_excel_billing_report(input=input)
     return wb
 
+@tracer.start_as_current_span("dump_excel_report")
 def dump_excel_report(input: json):
     red = "FF0000"
     black = "000000"
@@ -537,6 +577,7 @@ def dump_excel_report(input: json):
         start_index += 1
     return wb
 
+@tracer.start_as_current_span("dump_excel_report_detail")
 def dump_excel_report_detail(input: json):
     red = "FF0000"
     black = "000000"
@@ -597,6 +638,7 @@ def dump_excel_report_detail(input: json):
         start_index += 1
     return wb
 
+@tracer.start_as_current_span("dump_excel_billing_report")
 def dump_excel_billing_report(input: json):
     black = "000000"
     font_black = Font(name="Calibri", size=11, color=black)
@@ -26,13 +26,13 @@ from ..models import UserProfile, OcrTemplate, OcrTemplateBox, \
     Subscription, SubscriptionRequestFile, SubscriptionRequest
 from ..celery_worker.client_connector import c_connector
 import logging
+from opentelemetry import trace
 logger = logging.getLogger(__name__)
+tracer = trace.get_tracer("sbt_django_backend")
 
 class UserData:
     user: UserProfile = None
     current_sub: Subscription = None
 
     def __init__(self, request):
         user_data = validate_user_request_and_get(request)
         users = UserProfile.objects.filter(sync_id=user_data['id'])
@@ -64,11 +64,11 @@ class UserData:
         self.user = user
         self.current_sub = sub
 
+@tracer.start_as_current_span("get_user")
 def get_user(request) -> UserData:
     return UserData(request)
 
+@tracer.start_as_current_span("validate_user_request_and_get")
 def validate_user_request_and_get(request):
     if not hasattr(request, 'user_data'):
         raise NotFoundException(excArgs='user')
@@ -79,7 +79,7 @@ def validate_user_request_and_get(request):
         raise NotFoundException(excArgs='subscription')
     return data
 
+@tracer.start_as_current_span("validate_ocr_request_and_get")
 def validate_ocr_request_and_get(request, subscription):
     validated_data = {}
     if "processType" not in request.data or request.data['processType'] is None \
@@ -109,6 +109,7 @@ def validate_ocr_request_and_get(request, subscription):
 
     return validated_data
 
+@tracer.start_as_current_span("sbt_validate_ocr_request_and_get")
 def sbt_validate_ocr_request_and_get(request, subscription):
     validated_data = {}
     # if "processType" not in request.data or request.data['processType'] is None \
@@ -154,6 +155,7 @@ def sbt_validate_ocr_request_and_get(request, subscription):
 
     return validated_data
 
+@tracer.start_as_current_span("string_to_boolean")
 def string_to_boolean(value):
     true_strings = ['true', 'yes', '1', 'on']
     false_strings = ['false', 'no', '0', 'off']
@@ -165,6 +167,7 @@ def string_to_boolean(value):
     else:
         return False
 
+@tracer.start_as_current_span("sbt_validate_feedback")
 def sbt_validate_feedback(request):
     validated_data = {}
 
@@ -194,6 +197,7 @@ def sbt_validate_feedback(request):
 
     return validated_data
 
+@tracer.start_as_current_span("count_pages_in_pdf")
 def count_pages_in_pdf(pdf_file):
     count = 0
     fh, temp_filename = tempfile.mkstemp()  # make a tmp file
@@ -207,6 +211,7 @@ def count_pages_in_pdf(pdf_file):
     return count
 
 
+@tracer.start_as_current_span("count_pages_in_pdf_list")
 def count_pages_in_pdf_list(list_file):
     total_page = 0
 
@@ -216,6 +221,7 @@ def count_pages_in_pdf_list(list_file):
     return total_page
 
 
+# @tracer.start_as_current_span("map_process_type_to_folder_name")
 def map_process_type_to_folder_name(p_type):
     if p_type == ProcessType.ID_CARD.value:
         return 'id_card'
@@ -239,6 +245,7 @@ def map_process_type_to_folder_name(p_type):
     raise InvalidException(excArgs='processType')
 
 
+@tracer.start_as_current_span("get_random_string")
 def get_random_string(length):
     # choose from all lowercase letter
     letters = string.ascii_lowercase
@@ -247,6 +254,7 @@ def get_random_string(length):
     return result_str
 
 
+@tracer.start_as_current_span("is_int")
 def is_int(text) -> bool:
     try:
         # converting to integer
@@ -256,6 +264,7 @@ def is_int(text) -> bool:
     return False
 
 
+@tracer.start_as_current_span("validate_box")
 def validate_box(list_box, max_number_of_box, max_number_of_item_in_a_box, number_of_box=None):
     if len(list_box) > max_number_of_box:
         raise NumberOfBoxLimitReachedException(excArgs=LIST_BOX_MESSAGE)
@@ -268,6 +277,7 @@ def validate_box(list_box, max_number_of_box, max_number_of_item_in_a_box, numbe
         raise InvalidException(excArgs="box coordinates")
 
 
+@tracer.start_as_current_span("to_box_list")
 def to_box_list(str_list):
     ls = []
     if not str_list:
@@ -280,6 +290,7 @@ def to_box_list(str_list):
     return ls
 
 
+@tracer.start_as_current_span("validate_json_response_and_return")
 def validate_json_response_and_return(res):
     if res.status_code != status.HTTP_200_OK:
         raise ServiceUnavailableException()
@@ -290,6 +301,7 @@ def validate_json_response_and_return(res):
     return res_data
 
 
+@tracer.start_as_current_span("is_duplicate_in_list")
 def is_duplicate_in_list(str_list):
     unique_set: set = set({})
     for label in str_list:
@@ -300,16 +312,19 @@ def is_duplicate_in_list(str_list):
     return False
 
 
+@tracer.start_as_current_span("validate_duplicate")
 def validate_duplicate(list_box):
     if is_duplicate_in_list(list_box):
         raise DuplicateEntityException(excArgs="box_label")
 
 
+@tracer.start_as_current_span("validate_vn_and_space")
 def validate_vn_and_space(txt: str):
     if not pattern.fullmatch(txt.upper()):
         raise InvalidException(excArgs=NAME_MESSAGE)
 
 
+@tracer.start_as_current_span("save_template_boxs")
 def save_template_boxs(data, template):
     saving_list = []
     for d_box in data['data_boxs']:
@@ -329,6 +344,7 @@ def token_value(token_type):
         return 5
     return 1  # Basic OCR
 
+@tracer.start_as_current_span("send_to_queue2")
 def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}):
     try:
         if typez == ProcessType.ID_CARD.value:
@@ -346,6 +362,7 @@ def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}):
         logger.error(e)
         raise BadGatewayException()
 
+@tracer.start_as_current_span("build_template_matching_data")
 def build_template_matching_data(template):
     temp_dict = {
 
@@ -372,6 +389,7 @@ def build_template_matching_data(template):
     return temp_dict
 
 
+@tracer.start_as_current_span("send_template_queue")
 def send_template_queue(rq_id, file_url, template: OcrTemplate, uid):
     try:
 
@@ -383,9 +401,11 @@ def send_template_queue(rq_id, file_url, template: OcrTemplate, uid):
         logger.error(e)
         raise BadGatewayException()
 
+@tracer.start_as_current_span("process_feedback")
 def process_feedback(feedback_id, local_file_path):
     c_connector.csv_feedback((local_file_path, feedback_id))
 
+@tracer.start_as_current_span("process_pdf_file")
 def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
     doc: fitz.Document = fitz.open(stream=file_obj.file.read())
     if doc.page_count > settings.MAX_PAGES_OF_PDF_FILE:
@@ -406,6 +426,7 @@ def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: S
     # Sub-file
     return pdf_to_images_urls(doc, request, user)
 
+@tracer.start_as_current_span("process_image_file")
 def process_image_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
     if file_obj.size > settings.SIZE_TO_COMPRESS:
         quality = 95
@@ -425,6 +446,7 @@ def process_image_file(file_name: str, file_obj: TemporaryUploadedFile, request:
         'request_file_id': new_request_file.code
     }]
 
+# @tracer.start_as_current_span("process_image_local_file")
 def process_image_local_file(file_name: str, file_path: str, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
     new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                         request=request,
@@ -439,6 +461,7 @@ def process_image_local_file(file_name: str, file_path: str, request: Subscripti
         'request_file_id': new_request_file.code
     }]
 
+@tracer.start_as_current_span("pdf_to_images_urls")
 def pdf_to_images_urls(doc_path, request: SubscriptionRequest, user, dpi: int = 300) -> list:
     pdf_extracted = []
     saving_path = FileUtils.get_folder_path(request)
@@ -30,14 +30,10 @@ services:
         reservations:
           devices:
             - driver: nvidia
-              count: 1
+              count: all
              capabilities: [gpu]
-    # command: bash -c "tail -f > /dev/null"
     command: bash run.sh
-  # deploy:
-  #   mode: replicated
-  #   replicas: 1
-# Back-end services
   be-ctel-sbt:
     build:
       context: cope2n-api
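count: all is valid compose device-reservation syntax (count accepts an integer or the literal all) and requests every NVIDIA GPU visible on the host rather than exactly one.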
@@ -93,12 +89,11 @@ services:
     depends_on:
       db-sbt:
         condition: service_started
-    command: sh -c "sudo chmod -R 777 /app; sleep 5; python manage.py collectstatic --no-input &&
+    command: sh -c "python manage.py collectstatic --no-input &&
       python manage.py makemigrations &&
       python manage.py migrate &&
       python manage.py compilemessages &&
      gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod
-    # command: "sleep infinity"
 
   minio:
     image: minio/minio
@@ -186,7 +181,7 @@ services:
       - ./cope2n-api:/app
 
     working_dir: /app
-    command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"
+    command: sh -c "celery -A fwd worker -l INFO -c 5"
     # command: bash -c "tail -f > /dev/null"
 
   # Back-end persistent
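The shortened worker invocation is the payoff of the package __init__ added near the top of this diff: celery -A fwd now resolves the app through the re-exported celery_app, so nothing needs to reference fwd_api.celery_worker.worker any more, matching the backend modules that now do from fwd import celery_app as app.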