import time from django.db import transaction from rest_framework import status, viewsets from rest_framework.decorators import action from rest_framework.response import Response from fwd import settings from ..annotation.api import throw_on_failure from ..exception.exceptions import ServiceUnavailableException, \ LimitReachedException from ..models import OcrTemplateBox from ..models.OcrTemplate import OcrTemplate from ..request.CreateTemplateRequest import CreateTemplateRequest from ..constant.common import EntityStatus, TEMPLATE_ID from ..exception.exceptions import InvalidException from ..request.UpdateTemplateRequest import UpdateTemplateRequest from ..response.TemplateResponse import TemplateResponse from ..utils import FileUtils, ProcessUtil from ..utils.ProcessUtil import UserData from drf_spectacular.utils import extend_schema class CtelTemplateViewSet(viewsets.ViewSet): lookup_field = "username" base_url = settings.BASE_URL @extend_schema(request=None, responses=None, tags=['templates'], methods=['GET']) @extend_schema(request={ 'multipart/form-data': { 'type': 'object', 'properties': { 'files': { 'type': 'string', 'format': 'binary', }, 'template_id': { 'type': 'string', 'description': 'Required if updateTemplate' }, 'template_name': { 'type': 'string', }, 'data_box_names': { 'type': 'string', }, 'data_boxs': { 'type': 'string', }, 'anchor_boxs': { 'type': 'string', }, }, 'required': {'template_name', 'data_box_names', 'data_boxs', 'anchor_boxs', 'file'} } }, responses=None, tags=['templates'], methods=['POST']) @action(detail=False, methods=["POST", "GET"]) @throw_on_failure(InvalidException(excArgs='data')) def templates(self, request): user_data = ProcessUtil.get_user(request) if request.method == "POST": return self.upsert_template(request, user_data) else: return self.get_template(request, user_data) @extend_schema(request=None, responses=None, tags=['templates']) @action(detail=False, methods=["DELETE"], url_path=r"templates/(?P\d+)") @throw_on_failure(InvalidException(excArgs='data')) def delete_template(self, request, template_id=None): user_data: UserData = ProcessUtil.get_user(request) if not template_id: raise InvalidException(excArgs=TEMPLATE_ID) template = OcrTemplate.objects.filter(id=template_id, subscription=user_data.current_sub) if len(template) != 1: raise InvalidException(excArgs=TEMPLATE_ID) template = template[0] old_path = template.file_path deleted, _rows_count = template.delete() if not deleted: raise ServiceUnavailableException() # Delete file FileUtils.delete_file_with_path(old_path) return Response(status=status.HTTP_200_OK, data={ 'template_id': template_id }) def get_template(self, request, u_data: UserData): tem_id = request.query_params.get('template_id', None) if tem_id: tem_id: int = int(tem_id) template = OcrTemplate.objects.filter(id=tem_id, subscription=u_data.current_sub) if len(template) != 1: raise InvalidException(excArgs=TEMPLATE_ID) rs = TemplateResponse(data=template) rs.is_valid() return Response(status=status.HTTP_200_OK, data=rs.validated_data) else: template = OcrTemplate.objects.filter(subscription=u_data.current_sub) rs = TemplateResponse(template, many=True) return Response(status=status.HTTP_200_OK, data=rs.data) def upsert_template(self, request, user_data: UserData): if 'template_id' in request.data: return self.update_template(request, user_data) else: return self.insert_template(request, user_data) def insert_template(self, request, user_data: UserData): file_list = request.data.getlist('file') FileUtils.validate_list_file(file_list) sub = user_data.current_sub # Check request serializer = CreateTemplateRequest(data=request.data) serializer.is_valid() if not serializer.is_valid(): print(serializer.errors) raise InvalidException(excArgs=list(serializer.errors.keys())) data = serializer.validated_data # Check total templates = OcrTemplate.objects.filter(subscription=sub) if len(templates) >= settings.MAX_NUMBER_OF_TEMPLATE: raise LimitReachedException(excArgs=('Number of template', str(settings.MAX_NUMBER_OF_TEMPLATE), '')) # Create template: OcrTemplate = OcrTemplate(status=EntityStatus.ACTIVE.value, name=data['template_name'], subscription=sub) template.save() file_obj = file_list[0] file_name = str(round(time.time() * 1000)) + '_tem_' + file_obj.name file_path = FileUtils.save_template_file(file_name, template, file_obj, 100) template.file_path = file_path template.file_name = file_name template.save() ProcessUtil.save_template_boxs(data, template) return Response(status=status.HTTP_200_OK, data={ "id": template.id, }) def update_template(self, request, user_data: UserData): # Validate data = request.data file_list = data.getlist('file') file_obj = file_list[0] FileUtils.validate_list_file(file_list) serializer = UpdateTemplateRequest(data=data) serializer.is_valid() if not serializer.is_valid(): print(serializer.errors) raise InvalidException(excArgs=list(serializer.errors.keys())) data = serializer.validated_data template = OcrTemplate.objects.filter(id=data['template_id'], subscription=user_data.current_sub) if len(template) != 1: raise InvalidException(excArgs=TEMPLATE_ID) template = template[0] old_path = template.file_path template.name = data['template_name'] file_path = FileUtils.save_template_file(template.file_name, template, file_obj, 100) template.file_path = file_path template.save() # Delete Old OcrTemplateBox.objects.filter(template=template).delete() # FileUtils.delete_file_with_path(old_path) ProcessUtil.save_template_boxs(data, template) return Response(status=status.HTTP_200_OK, data={ 'template_id': template.id, })