import logging
import time
import uuid
from datetime import datetime
from multiprocessing.pool import ThreadPool
from typing import List
from wsgiref.util import FileWrapper

from django.core.files.uploadedfile import TemporaryUploadedFile
from django.http import HttpResponse, JsonResponse
from drf_spectacular.utils import extend_schema
from fwd import settings
from opentelemetry import trace
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework_xml.renderers import XMLRenderer

from ..annotation.api import throw_on_failure
from ..celery_worker.client_connector import c_connector
from ..constant.common import (FOLDER_TYPE, REQUEST_ID, EntityStatus,
                               ProcessType, allowed_file_extensions,
                               image_extensions, pdf_extensions,
                               standard_ocr_list)
from ..exception.exceptions import (FileContentInvalidException,
                                    InvalidException, LockedEntityException,
                                    NotFoundException,
                                    PermissionDeniedException,
                                    RequiredFieldException,
                                    ServiceTimeoutException)
from ..models import (FeedbackRequest, OcrTemplate, SubscriptionRequest,
                      SubscriptionRequestFile)
from ..response.ReportSerializer import ReportSerializer
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil

logger = logging.getLogger(__name__)
tracer = trace.get_tracer("sbt_django_backend")


class CtelViewSet(viewsets.ViewSet):
    lookup_field = "username"
    size_to_compress = settings.SIZE_TO_COMPRESS

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'is_test_request': {
                    'type': 'boolean',
                },
                'file': {
                    'type': 'string',
                    'format': 'binary'
                },
                'processType': {
                    'type': 'string'
                },
            },
            'required': {'file', 'processType'}
        }
    }, responses=None, tags=['OCR'])
    @action(detail=False, url_path="image/process", methods=["POST"])
    def process(self, request):
        s_time = time.time()
        user_info = ProcessUtil.get_user(request)
        user = user_info.user
        sub = user_info.current_sub
        validated_data = ProcessUtil.validate_ocr_request_and_get(request, sub)

        provider_code = 'SAP'
        file_obj: TemporaryUploadedFile = validated_data['file']
        file_extension = file_obj.name.split(".")[-1].lower()
        p_type = validated_data['type']
        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
        file_name = f"temp_{rq_id}.{file_extension}"
        is_test_request = validated_data.get("is_test_request", False)

        total_page = 1
        new_request: SubscriptionRequest = SubscriptionRequest(
            pages=total_page,
            pages_left=total_page,
            process_type=p_type,
            status=1,
            request_id=rq_id,
            provider_code=provider_code,
            subscription=sub,
            is_test_request=is_test_request
        )
        new_request.save()

        file_obj.seek(0)
        file_path = FileUtils.resize_and_save_file(file_name, new_request, file_obj, 100)
        S3_path = FileUtils.save_to_S3(file_name, new_request, file_path)

        files = [{
            "file_name": file_name,
            "file_path": file_path,  # local path to file
            "file_type": ""
        }]
        if file_extension in pdf_extensions:
            c_connector.do_pdf((rq_id, sub.id, p_type, user.id, files))
        elif file_extension in image_extensions:
            b_url = ProcessUtil.process_image_file(file_name, file_obj, new_request, user, "all", 0)
            j_time = time.time()
            logger.info(f"Duration of Pre-processing: {j_time - s_time}s")
            logger.info(f"b_url: {b_url}")
            if p_type in standard_ocr_list:
                ProcessUtil.send_to_queue2(rq_id + "_sub_0", sub.id, b_url, user.id, p_type)
            if p_type == ProcessType.TEMPLATE_MATCHING.value:
                ProcessUtil.send_template_queue(rq_id, b_url, validated_data['template'], user.id)
        else:
            return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE,
                                data={"request_id": rq_id, "message": f"File {file_extension} is not allowed"})

        return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id})

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'imei_files': {
                    'type': 'array',
                    'items': {
                        'type': 'string',
                        'format': 'binary'
                    }
                },
                'invoice_file': {
                    'type': 'string',
                    'format': 'binary'
                },
                'redemption_ID': {
                    'type': 'string'
                },
                'is_test_request': {
                    'type': 'boolean',
                },
            },
            'required': {'imei_files'}
        }
    }, responses=None, tags=['OCR'])
    @action(detail=False, url_path="images/process", methods=["POST"])
    def processes(self, request):
        user_info = ProcessUtil.get_user(request)
        user = user_info.user
        sub = user_info.current_sub
        validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub)

        provider_code = 'SAP'
        imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file']
        invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file']
        files = {
            "imei": imei_file_objs,
            "invoice": invoice_file_objs
        }
        total_page = len(files.keys())
        is_test_request = validated_data.get("is_test_request", False)
        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
        p_type = validated_data['type']
        new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
                                                               pages_left=total_page,
                                                               process_type=p_type,
                                                               status=1,
                                                               request_id=rq_id,
                                                               provider_code=provider_code,
                                                               subscription=sub,
                                                               redemption_id=validated_data["redemption_ID"],
                                                               is_test_request=is_test_request)
        new_request.save()

        count = 0
        compact_files = []
        for doc_type, doc_files in files.items():
            for i, doc_file in enumerate(doc_files):
                _ext = doc_file.name.split(".")[-1]
                if _ext not in allowed_file_extensions:
                    return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE,
                                        data={"request_id": rq_id, "message": f"File {_ext} is not allowed"})
                _name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
                doc_file.seek(0)
                file_path = FileUtils.resize_and_save_file(_name, new_request, doc_file, 100)
                FileUtils.save_to_S3(_name, new_request, file_path)
                count += 1
                this_file = {
                    "index_in_request": i,
                    "file_name": _name,
                    "file_path": file_path,
                    "file_type": doc_type
                }
                compact_files.append(this_file)
        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
        return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id})
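
    # Note: processes_sync (below) accepts the same multipart payload as processes()
    # but answers synchronously: files are resized and uploaded to S3 in a 2-thread
    # pool, the job is handed to the AI workers via c_connector.do_pdf, and the view
    # then polls the SubscriptionRequest row every 0.1 s for up to 120 s, returning
    # the serialized result (or XML for FI_INVOICE requests) as soon as it appears,
    # and raising ServiceTimeoutException once the time budget is exhausted.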
    @tracer.start_as_current_span("process_sync")
    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'imei_files': {
                    'type': 'array',
                    'items': {
                        'type': 'string',
                        'format': 'binary'
                    }
                },
                'invoice_file': {
                    'type': 'string',
                    'format': 'binary'
                },
                'redemption_ID': {
                    'type': 'string'
                },
                'subsidiary': {
                    'type': 'string'
                },
                'is_test_request': {
                    'type': 'boolean',
                },
            },
            # 'required': {'imei_files'}
        }
    }, responses=None, tags=['OCR'])
    @action(detail=False, url_path="images/process_sync", methods=["POST"])
    def processes_sync(self, request):
        user_info = ProcessUtil.get_user(request)
        user = user_info.user
        sub = user_info.current_sub
        validated_data = ProcessUtil.sbt_validate_ocr_request_and_get(request, sub)

        provider_code = 'SAP'
        imei_file_objs: List[TemporaryUploadedFile] = validated_data['imei_file']
        invoice_file_objs: List[TemporaryUploadedFile] = validated_data['invoice_file']
        files = {
            "imei": imei_file_objs,
            "invoice": invoice_file_objs
        }
        rq_id = provider_code + "_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex

        logger.info(f"Creating a request: {rq_id}")
        is_test_request = validated_data.get("is_test_request", False)

        count = 0
        doc_files_with_type = []
        for doc_type, doc_files in files.items():
            for i, doc_file in enumerate(doc_files):
                _ext = doc_file.name.split(".")[-1]
                if _ext not in allowed_file_extensions:
                    return JsonResponse(status=status.HTTP_406_NOT_ACCEPTABLE,
                                        data={"request_id": rq_id, "message": f"File {_ext} is not allowed"})
                tmp_file_name = f"temp_{doc_type}_{rq_id}_{i}.{_ext}"
                doc_files_with_type.append((
                    count, doc_type, doc_file, tmp_file_name
                ))
                count += 1

        total_page = len(doc_files_with_type)
        p_type = validated_data['type']

        with tracer.start_as_current_span("create_and_save_record_in_db"):
            new_request: SubscriptionRequest = SubscriptionRequest(pages=total_page,
                                                                   pages_left=total_page,
                                                                   process_type=p_type,
                                                                   status=1,
                                                                   request_id=rq_id,
                                                                   provider_code=provider_code,
                                                                   subscription=sub,
                                                                   redemption_id=validated_data["redemption_ID"],
                                                                   subsidiary=validated_data["subsidiary"],
                                                                   is_test_request=is_test_request)
            new_request.save()

        # Run file processing in a pool of 2 threads. TODO: Convert to Celery worker when possible
        compact_files = [None] * len(doc_files_with_type)
        pool = ThreadPool(processes=2)

        @tracer.start_as_current_span("process_sync.process_file")
        def process_file(data):
            idx, doc_type, doc_file, tmp_file_name = data
            doc_file.seek(0)
            index_in_request = int(tmp_file_name.split(".")[0].split("_")[-1])
            file_path = FileUtils.resize_and_save_file(tmp_file_name, new_request, doc_file, 100)
            FileUtils.save_to_S3(tmp_file_name, new_request, file_path)
            return {
                "index_in_request": index_in_request,
                "idx": idx,
                "file_name": tmp_file_name,
                "file_path": file_path,
                "file_type": doc_type
            }

        with tracer.start_as_current_span("process_file_with_multi_thread"):
            for result in pool.map(process_file, doc_files_with_type):
                compact_files[result["idx"]] = result

        # Send to AI queue
        c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))

        with tracer.start_as_current_span("backend_waiting_for_result"):
            time_limit = 120
            start_time = time.time()
            while True:
                current_time = time.time()
                waiting_time = current_time - start_time
                if waiting_time > time_limit:
                    break
                time.sleep(0.1)

                report_filter = SubscriptionRequest.objects.filter(request_id=rq_id)
                if report_filter.count() != 1:
                    raise InvalidException(excArgs='requestId')
                if user_info.current_sub.id != report_filter[0].subscription.id:
                    raise InvalidException(excArgs="user")

                if int(report_filter[0].process_type) == ProcessType.FI_INVOICE.value:
                    data = report_filter[0].predict_result
                    xml_as_string = ""
                    if data and 'content' in data and 'combine_results' in data['content'] and 'xml' in data['content']['combine_results']:
                        xml_as_string = data['content']['combine_results']['xml']
                    xml_as_string = xml_as_string.replace("\n", "").replace("\\", "")
                    return HttpResponse(xml_as_string, content_type="text/xml")

                serializer: ReportSerializer = ReportSerializer(data=report_filter, many=True)
                serializer.is_valid()
                if report_filter[0].status == 400:
                    raise FileContentInvalidException()
                if report_filter[0].status == 100:  # continue, only return when result is fulfilled
                    continue
                if len(serializer.data) == 0:
                    continue
                if serializer.data[0].get("data", None) is None:
                    continue
                if serializer.data[0]["data"].get("status", 200) != 200:
                    continue

                serializer.data[0]["request_id"] = rq_id
                return Response(status=status.HTTP_200_OK, data=serializer.data[0])

        raise ServiceTimeoutException(excArgs=f"{rq_id}")
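
    # The feedback endpoint below stores the validated payload on the matching
    # SubscriptionRequest row, writes it to a local feedback_<request_id>.json file,
    # and uploads that file to S3. A hypothetical set of submitted fields, with
    # values made up purely for illustration, might be validated into:
    #   {
    #       "request_id": "SAP_20240101000000_abc123",
    #       "retailername": "Some Retailer",
    #       "invoice_no": "INV-0001",
    #       "sold_to_party": "1234567890",
    #       "purchase_date": ["2024-01-01"],
    #       "imei_number": ["123456789012345"]
    #   }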
    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'request_id': {
                    'type': 'string',
                },
                'retailername': {
                    'type': 'string',
                },
                'invoice_no': {
                    'type': 'string',
                },
                'sold_to_party': {
                    'type': 'string',
                },
                'purchase_date': {
                    'type': 'array',
                    'items': {
                        'type': 'string',
                    }
                },
                'imei_number': {
                    'type': 'array',
                    'items': {
                        'type': 'string',
                    }
                },
            },
            'required': ['request_id', 'retailername', 'invoice_no', 'sold_to_party', 'purchase_date', 'imei_number']
        }
    }, responses=None, tags=['OCR'])
    @action(detail=False, url_path="images/feedback", methods=["POST"])
    def feedback(self, request):
        validated_data = ProcessUtil.sbt_validate_feedback(request)
        rq_id = validated_data['request_id']

        subscription_request = SubscriptionRequest.objects.filter(request_id=rq_id)
        if len(subscription_request) == 0:
            raise InvalidException(excArgs=f"{rq_id}")
        subscription_request = subscription_request[0]

        # Save to database
        subscription_request.feedback_result = validated_data
        subscription_request.save()

        file_name = f"feedback_{rq_id}.json"
        # Save to local
        file_path = FileUtils.save_json_file(file_name, subscription_request, validated_data)
        # Upload to S3
        S3_path = FileUtils.save_to_S3(file_name, subscription_request, file_path)
        return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id})

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'files': {
                    'type': 'array',
                    'items': {
                        'type': 'string',
                        'format': 'binary'
                    }
                },
            },
            'required': ['files']
        }
    }, responses=None, tags=['OCR'])
    @action(detail=False, url_path="images/feedback_file", methods=["POST"])
    def feedback_file(self, request):
        files = request.data.getlist('files')
        FileUtils.validate_csv_feedback(files)
        user_info = ProcessUtil.get_user(request)
        user = user_info.user
        sub = user_info.current_sub

        feedback_id = "FB_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
        origin_name = ""
        file_names = ""
        for i, file in enumerate(files):
            origin_name += file.name + ","
            file_names += f"{feedback_id}_{i}.csv"
        origin_name = origin_name[:-1]

        new_request: FeedbackRequest = FeedbackRequest(feedback_id=feedback_id,
                                                       origin_name=origin_name,
                                                       file_name=file_names,
                                                       subscription=sub)
        new_request.save()

        for i, file in enumerate(files):
            file_name = f"{feedback_id}_{i}.csv"
            # Save to local
            file_path = FileUtils.save_feedback_file(file_name, new_request, file)
            # Upload to S3
            S3_path = FileUtils.save_feedback_to_S3(file_name, feedback_id, file_path)
            # validate
            FileUtils.validate_feedback_file(file_path)
            # Process csv file in the background
            ProcessUtil.process_feedback(feedback_id, file_path)

        return JsonResponse(status=status.HTTP_200_OK, data={"feedback_id": feedback_id})

    @extend_schema(request=None, responses=None, tags=['Data'])
    @extend_schema(request=None, responses=None, tags=['templates'], methods=['GET'])
    @action(detail=False, url_path=r"media/(?P<folder_type>\w+)/(?P<uq_id>\w+)", methods=["GET"])
    def get_file_v2(self, request, uq_id=None, folder_type=None):
        user_data = request.user_data
        content_type = "image/png"
        file_name: str = request.query_params.get('file_name', None)
        if folder_type is None:
            raise RequiredFieldException(excArgs=FOLDER_TYPE)
        if uq_id is None:
            raise RequiredFieldException(excArgs=REQUEST_ID)

        if folder_type == 'templates':
            temps: list = OcrTemplate.objects.filter(id=uq_id)
            if len(temps) != 1:
                raise NotFoundException(excArgs='file')
            temp: OcrTemplate = temps[0]
            user = temp.subscription.user
            content_type = 'application/pdf' if temp.file_name.split(".")[-1] in pdf_extensions else content_type
            if user.id != user_data['internal_id'] or user.status != EntityStatus.ACTIVE.value:
                raise PermissionDeniedException()

            logger.info(temp.file_path)
            return HttpResponse(FileWrapper(FileUtils.get_file(temp.file_path)), status=status.HTTP_200_OK,
                                headers={'Content-Disposition': 'filename={fn}'.format(fn=temp.file_name)},
                                content_type=content_type)

        elif folder_type == 'requests':
            if file_name is None:
                raise RequiredFieldException(excArgs='file_name')
            try:
                rqs = SubscriptionRequest.objects.filter(request_id=uq_id)
                if len(rqs) != 1:
                    raise NotFoundException(excArgs='file')
                rq = rqs[0]
                user = rq.subscription.user
                content_type = 'application/pdf' if file_name.split(".")[-1] in pdf_extensions else content_type
                if user.id != user_data['internal_id'] or user.status != EntityStatus.ACTIVE.value:
                    raise PermissionDeniedException()
                file_data = SubscriptionRequestFile.objects.filter(request=rq, file_name=file_name)[0]
            except IndexError:
                raise NotFoundException(excArgs='file')

            return HttpResponse(FileWrapper(FileUtils.get_file(file_data.file_path)), status=status.HTTP_200_OK,
                                headers={'Content-Disposition': 'filename={fn}'.format(fn=file_data.file_name)},
                                content_type=content_type)
        else:
            raise InvalidException(excArgs='type')

    @extend_schema(request=None, responses=None, tags=['Data'])
    @action(detail=False, url_path=r"v2/media/request/(?P<media_id>\w+)", methods=["GET"])
    def get_file_v3(self, request, media_id=None):
        user_info = ProcessUtil.get_user(request)
        sub = user_info.current_sub
        content_type = "image/png"
        if media_id is None:
            raise RequiredFieldException(excArgs=REQUEST_ID)
        try:
            media_list = SubscriptionRequestFile.objects.filter(code=media_id)
            if len(media_list) != 1:
                raise LockedEntityException(excArgs='media')
            media_data: SubscriptionRequestFile = media_list[0]
            if media_data.request.subscription.id != sub.id:
                raise PermissionDeniedException()
            file_name = media_data.file_name
            content_type = 'application/pdf' if file_name.split(".")[-1] in pdf_extensions else content_type
        except IndexError:
            raise NotFoundException(excArgs='file')

        return HttpResponse(FileWrapper(FileUtils.get_file(media_data.file_path)), status=status.HTTP_200_OK,
                            headers={'Content-Disposition': 'filename={fn}'.format(fn=file_name)},
                            content_type=content_type)
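
    # get_result (below) is the polling counterpart of processes_sync: given a
    # request_id owned by the caller's subscription, it returns the stored result
    # as XML for FI_INVOICE requests and as serialized JSON otherwise; while the
    # request is still processing (status 100) it responds with the record but a
    # null "data" field, and a stored status of 400 raises FileContentInvalidException.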
    @extend_schema(request=None, responses=None, tags=['Data'])
    @throw_on_failure(InvalidException(excArgs='data'))
    @action(detail=False, url_path=r"result/(?P<request_id>\w+)", methods=["GET"],
            renderer_classes=[JSONRenderer, XMLRenderer])
    def get_result(self, request, request_id=None):
        user_info = ProcessUtil.get_user(request)
        if request_id is None:
            raise RequiredFieldException(excArgs='requestId')

        report_filter = SubscriptionRequest.objects.filter(request_id=request_id)
        if len(report_filter) != 1:
            raise InvalidException(excArgs='requestId')
        if user_info.current_sub.id != report_filter[0].subscription.id:
            raise InvalidException(excArgs="user")

        if int(report_filter[0].process_type) == ProcessType.FI_INVOICE.value:
            data = report_filter[0].predict_result
            xml_as_string = ""
            if data and 'content' in data and 'combine_results' in data['content'] and 'xml' in data['content']['combine_results']:
                xml_as_string = data['content']['combine_results']['xml']
            xml_as_string = xml_as_string.replace("\n", "").replace("\\", "")
            # return Response(status=status.HTTP_200_OK, data=xml_as_string, content_type="application/xml; charset=utf-8")
            # return HttpResponse(xml_as_string, content_type="text/xml")
            return HttpResponse(xml_as_string, content_type="text/xml")

        serializer: ReportSerializer = ReportSerializer(data=report_filter, many=True)
        serializer.is_valid()
        if report_filter[0].status == 400:
            raise FileContentInvalidException()
        if report_filter[0].status == 100:  # still processing: return the record without result data
            empty_data = serializer.data[0]
            empty_data["data"] = None
            return Response(status=status.HTTP_200_OK, data=empty_data)

        data = serializer.data[0]
        data["status"] = "3"
        return Response(status=status.HTTP_200_OK, data=serializer.data[0])

    @throw_on_failure(InvalidException(excArgs='data'))
    @action(detail=False, url_path=r"rsa/(?P<request_id>\w+)", methods=["GET"])
    def get_result2(self, request, request_id=None):
        user_info = ProcessUtil.get_user(request)
        if request_id is None:
            raise RequiredFieldException(excArgs='requestId')

        report_filter = SubscriptionRequest.objects.filter(request_id=request_id)
        if len(report_filter) != 1:
            raise InvalidException(excArgs='requestId')
        if user_info.current_sub.id != report_filter[0].subscription.id:
            raise InvalidException(excArgs="user")

        if int(report_filter[0].process_type) == ProcessType.FI_INVOICE.value:
            data = report_filter[0].predict_result
            xml_as_string = ""
            if data and 'content' in data and 'combine_results' in data['content'] and 'xml' in data['content']['combine_results']:
                xml_as_string = data['content']['combine_results']['xml']
            xml_as_string = xml_as_string.replace("\n", "").replace("\\", "")
            # return Response(status=status.HTTP_200_OK, data=xml_as_string, content_type="application/xml; charset=utf-8")
            return HttpResponse(xml_as_string, content_type="text/xml")

        serializer: ReportSerializer = ReportSerializer(data=report_filter, many=True)
        serializer.is_valid()
        return Response(status=status.HTTP_200_OK, data=serializer.data[0])
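
# ---------------------------------------------------------------------------
# Illustrative only: a minimal sketch of how a client might call the
# synchronous endpoint above. The base URL, route prefix and auth header are
# hypothetical (they depend on the project's router and authentication setup);
# only the form field names match the multipart schema declared on
# processes_sync. Kept as a comment so importing this module is unaffected.
#
#   import requests
#
#   with open("imei_photo.jpg", "rb") as imei_f, open("invoice.pdf", "rb") as inv_f:
#       resp = requests.post(
#           "http://localhost:9000/api/ctel/images/process_sync/",  # hypothetical URL
#           headers={"Authorization": "<token>"},                   # hypothetical auth
#           files=[
#               ("imei_files", ("imei_photo.jpg", imei_f, "image/jpeg")),
#               ("invoice_file", ("invoice.pdf", inv_f, "application/pdf")),
#           ],
#           data={"redemption_ID": "RD-0001", "is_test_request": "true"},
#       )
#   print(resp.status_code, resp.text)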