import logging
import time

from django.db import transaction
from drf_spectacular.utils import extend_schema
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from fwd import settings

from ..annotation.api import throw_on_failure
from ..exception.exceptions import (
    InvalidException,
    LimitReachedException,
    ServiceUnavailableException,
)
from ..models import OcrTemplateBox
from ..models.OcrTemplate import OcrTemplate
from ..request.CreateTemplateRequest import CreateTemplateRequest
from ..constant.common import EntityStatus, TEMPLATE_ID
from ..request.UpdateTemplateRequest import UpdateTemplateRequest
from ..response.TemplateResponse import TemplateResponse
from ..utils import file as FileUtils
from ..utils import process as ProcessUtil
from ..utils.process import UserData

logger = logging.getLogger(__name__)


class CtelTemplateViewSet(viewsets.ViewSet):
    """Endpoints for listing, creating, updating and deleting OCR templates.

    Every query is filtered by the ``current_sub`` of the user resolved from
    the request, so a caller only ever sees or modifies templates attached to
    that subscription.
    """

    lookup_field = "username"
    base_url = settings.BASE_URL

    @extend_schema(request=None, responses=None, tags=['templates'], methods=['GET'])
    @extend_schema(
        request={
            'multipart/form-data': {
                'type': 'object',
                'properties': {
                    # Named 'file' because that is the multipart field the
                    # insert/update handlers read via getlist('file').
                    'file': {
                        'type': 'string',
                        'format': 'binary',
                    },
                    'template_id': {
                        'type': 'string',
                        'description': 'Required if updateTemplate',
                    },
                    'template_name': {
                        'type': 'string',
                    },
                    'data_box_names': {
                        'type': 'string',
                    },
                    'data_boxs': {
                        'type': 'string',
                    },
                    'anchor_boxs': {
                        'type': 'string',
                    },
                },
                # OpenAPI expects an array here; a list also keeps the
                # generated schema order deterministic.
                'required': [
                    'template_name',
                    'data_box_names',
                    'data_boxs',
                    'anchor_boxs',
                    'file',
                ],
            }
        },
        responses=None,
        tags=['templates'],
        methods=['POST'],
    )
    @action(detail=False, methods=["POST", "GET"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def templates(self, request):
        """Dispatch ``POST`` to create/update and ``GET`` to retrieval."""
        user_data = ProcessUtil.get_user(request)
        if request.method == "POST":
            return self.upsert_template(request, user_data)
        return self.get_template(request, user_data)

    @extend_schema(request=None, responses=None, tags=['templates'])
    @action(detail=False, methods=["DELETE"],
            url_path=r"templates/(?P<template_id>\d+)")
    @throw_on_failure(InvalidException(excArgs='data'))
    def delete_template(self, request, template_id=None):
        """Delete one template of the caller's subscription and its file.

        Args:
            request: The incoming DRF request.
            template_id: Id captured from the URL path.

        Raises:
            InvalidException: If ``template_id`` is missing or does not match
                exactly one template of the current subscription.
            ServiceUnavailableException: If the delete reports no deleted row.
        """
        user_data: UserData = ProcessUtil.get_user(request)
        if not template_id:
            raise InvalidException(excArgs=TEMPLATE_ID)

        template = self._get_owned_template(template_id, user_data.current_sub)
        # Remember the path first: the file is removed only after the row is gone.
        old_path = template.file_path
        deleted, _rows_count = template.delete()
        if not deleted:
            raise ServiceUnavailableException()

        FileUtils.delete_file_with_path(old_path)
        return Response(status=status.HTTP_200_OK, data={
            'template_id': template_id
        })

    def get_template(self, request, u_data: UserData):
        """Return one template (``?template_id=``) or all of the subscription's.

        Raises:
            InvalidException: If ``template_id`` is not an integer or does not
                match exactly one template of the current subscription.
        """
        raw_id = request.query_params.get('template_id', None)
        if not raw_id:
            templates = OcrTemplate.objects.filter(subscription=u_data.current_sub)
            rs = TemplateResponse(templates, many=True)
            return Response(status=status.HTTP_200_OK, data=rs.data)

        try:
            tem_id = int(raw_id)
        except ValueError:
            # Report a malformed id the same way as an unknown id.
            raise InvalidException(excArgs=TEMPLATE_ID) from None

        template = OcrTemplate.objects.filter(id=tem_id, subscription=u_data.current_sub)
        if len(template) != 1:
            raise InvalidException(excArgs=TEMPLATE_ID)
        # NOTE(review): the queryset is passed as ``data=`` and the response is
        # built from ``validated_data``, unlike the list branch which serializes
        # instances via ``.data``. Left as-is because the response shape is a
        # client contract - confirm whether ``TemplateResponse(template[0]).data``
        # was intended.
        rs = TemplateResponse(data=template)
        rs.is_valid()
        return Response(status=status.HTTP_200_OK, data=rs.validated_data)

    def upsert_template(self, request, user_data: UserData):
        """Update when the payload carries ``template_id``, otherwise create."""
        if 'template_id' in request.data:
            return self.update_template(request, user_data)
        return self.insert_template(request, user_data)

    def insert_template(self, request, user_data: UserData):
        """Create a template, store its uploaded file and save its boxes.

        Raises:
            InvalidException: If the payload fails ``CreateTemplateRequest``
                validation (``excArgs`` lists the offending field names).
            LimitReachedException: If the subscription already holds
                ``settings.MAX_NUMBER_OF_TEMPLATE`` templates.
        """
        file_list = request.data.getlist('file')
        FileUtils.validate_list_file(file_list)
        sub = user_data.current_sub

        serializer = CreateTemplateRequest(data=request.data)
        if not serializer.is_valid():
            logger.warning("Invalid create-template request: %s", serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        # COUNT(*) in the database instead of loading every row just to count.
        existing = OcrTemplate.objects.filter(subscription=sub).count()
        if existing >= settings.MAX_NUMBER_OF_TEMPLATE:
            raise LimitReachedException(
                excArgs=('Number of template', str(settings.MAX_NUMBER_OF_TEMPLATE), ''))

        file_obj = file_list[0]
        # A millisecond timestamp prefix keeps stored file names distinct.
        file_name = str(round(time.time() * 1000)) + '_tem_' + file_obj.name

        # One transaction: a failure while storing the file or the boxes must
        # not leave a half-initialised template row behind.
        with transaction.atomic():
            template: OcrTemplate = OcrTemplate(status=EntityStatus.ACTIVE.value,
                                                name=data['template_name'],
                                                subscription=sub)
            # Saved before the file is stored; save_template_file receives the
            # template, presumably to derive the path from it - TODO confirm.
            template.save()
            file_path = FileUtils.save_template_file(file_name, template, file_obj, 100)
            template.file_path = file_path
            template.file_name = file_name
            template.save()
            ProcessUtil.save_template_boxs(data, template)

        return Response(status=status.HTTP_200_OK, data={
            "id": template.id,
        })

    def update_template(self, request, user_data: UserData):
        """Replace the name, file and boxes of an existing template.

        Raises:
            InvalidException: If the payload fails ``UpdateTemplateRequest``
                validation, or ``template_id`` does not match exactly one
                template of the current subscription.
        """
        file_list = request.data.getlist('file')
        # Validate before indexing so a bad upload is reported by the validator
        # (assuming it rejects an empty list) rather than as an IndexError.
        FileUtils.validate_list_file(file_list)
        file_obj = file_list[0]

        serializer = UpdateTemplateRequest(data=request.data)
        if not serializer.is_valid():
            logger.warning("Invalid update-template request: %s", serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        template = self._get_owned_template(data['template_id'], user_data.current_sub)

        # One transaction: the old boxes are deleted before the new ones are
        # written, so a failure in between must roll the delete back.
        with transaction.atomic():
            template.name = data['template_name']
            # The existing file_name is reused for the new upload; the previous
            # file is deliberately not deleted here.
            template.file_path = FileUtils.save_template_file(
                template.file_name, template, file_obj, 100)
            template.save()

            OcrTemplateBox.objects.filter(template=template).delete()
            ProcessUtil.save_template_boxs(data, template)

        return Response(status=status.HTTP_200_OK, data={
            'template_id': template.id,
        })

    @staticmethod
    def _get_owned_template(template_id, subscription) -> OcrTemplate:
        """Return the single template with this id in ``subscription``.

        Raises:
            InvalidException: If the lookup does not match exactly one row.
        """
        # Two rows are enough to tell "exactly one" from "none" or "several".
        matches = list(
            OcrTemplate.objects.filter(id=template_id, subscription=subscription)[:2])
        if len(matches) != 1:
            raise InvalidException(excArgs=TEMPLATE_ID)
        return matches[0]