468 lines
19 KiB
Python
468 lines
19 KiB
Python
import os
|
|
import uuid
|
|
import random
|
|
import string
|
|
import tempfile
|
|
import fitz
|
|
import PyPDF2
|
|
from django.core.files.uploadedfile import TemporaryUploadedFile
|
|
from django.db import transaction
|
|
from rest_framework import status
|
|
|
|
from fwd_api.utils.image import get_first_page_pdf
|
|
from fwd import settings
|
|
from fwd_api.utils.image import resize
|
|
from fwd_api.constant.common import LIST_BOX_MESSAGE, pattern, NAME_MESSAGE, allowed_p_type, TEMPLATE_ID, \
|
|
FolderFileType, FileCategory
|
|
from fwd_api.exception.exceptions import NumberOfBoxLimitReachedException, \
|
|
ServiceUnavailableException, DuplicateEntityException, LimitReachedException, BadGatewayException
|
|
from fwd_api.utils import date as DateUtil
|
|
from fwd_api.utils import file as FileUtils
|
|
from fwd_api.utils.subsidiary import map_subsidiary_long_to_short, map_subsidiary_short_to_long
|
|
from ..constant.common import ProcessType, TEMPLATE_BOX_TYPE, EntityStatus
|
|
from ..exception.exceptions import InvalidException, NotFoundException, \
|
|
PermissionDeniedException, RequiredFieldException, InvalidException, InvalidDecompressedSizeException
|
|
from ..models import UserProfile, OcrTemplate, OcrTemplateBox, \
|
|
Subscription, SubscriptionRequestFile, SubscriptionRequest
|
|
from ..celery_worker.client_connector import c_connector
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class UserData:
    """User profile and subscription resolved from an incoming request.

    Construction succeeds only when exactly one user and exactly one
    subscription match the identifiers carried by the request, the
    subscription belongs to that user, and both are usable.
    """

    user: UserProfile = None
    current_sub: Subscription = None

    def __init__(self, request):
        """Look up and validate the user and subscription named by ``request``.

        Raises:
            NotFoundException: no subscription or no user matches.
            DuplicateEntityException: more than one subscription or user matches.
            PermissionDeniedException: the subscription's user is not the
                resolved user.
            InvalidException: the subscription or user is not ACTIVE, or the
                subscription's ``expired_at`` is in the past.
        """
        claims = validate_user_request_and_get(request)
        # NOTE(review): the validator checks for 'internal_id' while this
        # lookup reads 'id' -- confirm both keys are always present.
        user_query = UserProfile.objects.filter(sync_id=claims['id'])
        sub_query = Subscription.objects.filter(id=claims['subscription_id'])

        sub_count = len(sub_query)
        user_count = len(user_query)

        # Missing entities are reported before duplicated ones.
        if sub_count == 0:
            raise NotFoundException(excArgs='subscription')
        if user_count == 0:
            raise NotFoundException(excArgs='user')
        if sub_count > 1:
            raise DuplicateEntityException(excArgs='subscription')
        if user_count > 1:
            raise DuplicateEntityException(excArgs='user')

        account, subscription = user_query[0], sub_query[0]
        active = EntityStatus.ACTIVE.value

        if account.id != subscription.user.id:
            raise PermissionDeniedException()
        if subscription.status != active:
            raise InvalidException(excArgs='Subscription status')
        if subscription.expired_at < DateUtil.get_date_time_now():
            raise InvalidException(excArgs='Subscription')
        if account.status != active:
            raise InvalidException(excArgs='User status')

        self.user = account
        self.current_sub = subscription
|
|
|
|
|
|
def get_user(request) -> UserData:
    """Build the validated ``UserData`` for ``request``.

    Any exception raised by ``UserData`` construction propagates unchanged.
    """
    resolved = UserData(request)
    return resolved
|
|
|
|
|
|
def validate_user_request_and_get(request):
    """Return ``request.user_data`` after checking the keys it must carry.

    Raises:
        NotFoundException: ``request`` has no ``user_data`` attribute or the
            data lacks 'internal_id' (reported as 'user'), or it lacks
            'subscription_id' (reported as 'subscription').
    """
    if not hasattr(request, 'user_data'):
        raise NotFoundException(excArgs='user')

    data = request.user_data
    # (required key, entity name reported when the key is absent)
    required = (
        ('internal_id', 'user'),
        ('subscription_id', 'subscription'),
    )
    for key, entity in required:
        if key not in data:
            raise NotFoundException(excArgs=entity)
    return data
|
|
|
|
|
|
def validate_ocr_request_and_get(request, subscription):
    """Validate an OCR request payload and return the extracted values.

    The returned dict holds 'type' (int process type), 'template' (only for
    template matching), 'file' (first uploaded file) and 'is_test_request'.

    Raises:
        InvalidException: 'processType' is missing, None, non-numeric or not
            in ``allowed_p_type``; or the template id is missing or does not
            match exactly one template of ``subscription``.
        LimitReachedException: current tokens plus the cost of this request
            reach or exceed the subscription's token limit.
    """
    form = request.data

    type_is_valid = (
        "processType" in form
        and form['processType'] is not None
        and form['processType'].isnumeric()
        and int(form['processType']) in allowed_p_type
    )
    if not type_is_valid:
        raise InvalidException(excArgs='processType')

    p_type: int = int(form['processType'])
    validated_data = {'type': p_type}

    projected_tokens = subscription.current_token + token_value(p_type)
    if projected_tokens >= subscription.limit_token:
        raise LimitReachedException(excArgs=('Number of tokens', str(subscription.limit_token), 'times'))

    if p_type == ProcessType.TEMPLATE_MATCHING.value:
        if "templateId" not in form:
            raise InvalidException(excArgs=TEMPLATE_ID)
        matches = OcrTemplate.objects.filter(id=form['templateId'], subscription=subscription)
        if len(matches) != 1:
            raise InvalidException(excArgs=TEMPLATE_ID)
        # The filtered queryset itself (not its single element) is stored.
        validated_data['template'] = matches

    uploads = form.getlist('file')
    FileUtils.validate_list_file(uploads)
    validated_data['file'] = uploads[0]

    validated_data['is_test_request'] = string_to_boolean(form.get('is_test_request', "false"))

    return validated_data
|
|
|
|
def sbt_validate_ocr_request_and_get(request, subscription):
    """Validate an SBT invoice request payload and return the extracted values.

    The process type is fixed to 12 regardless of the payload. The returned
    dict holds 'type', optionally 'template', 'imei_file', 'invoice_file',
    'redemption_ID', 'is_test_request' and 'subsidiary'.

    Raises:
        LimitReachedException: current tokens plus the cost of this request
            reach or exceed the subscription's token limit.
        InvalidException: a template id is required but missing/unmatched, or
            a non-empty 'subsidiary' is not among the accepted names.
    """
    form = request.data

    p_type = 12
    validated_data = {'type': p_type}  # hard fix to be of type SBT Invoice

    projected_tokens = subscription.current_token + token_value(p_type)
    if projected_tokens >= subscription.limit_token:
        raise LimitReachedException(excArgs=('Number of tokens', str(subscription.limit_token), 'times'))

    if p_type == ProcessType.TEMPLATE_MATCHING.value:
        if "templateId" not in form:
            raise InvalidException(excArgs=TEMPLATE_ID)
        matches = OcrTemplate.objects.filter(id=form['templateId'], subscription=subscription)
        if len(matches) != 1:
            raise InvalidException(excArgs=TEMPLATE_ID)
        validated_data['template'] = matches

    imei_files = form.getlist('imei_files')
    invoice_file = form.getlist('invoice_file')
    redemption_ID = form.get('redemption_ID', None)

    # Both file lists may be empty (min_file_num=0); at most one invoice file.
    FileUtils.validate_list_file(
        imei_files,
        max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST,
        min_file_num=0,
        file_field="imei_file",
    )
    FileUtils.validate_list_file(invoice_file, max_file_num=1, min_file_num=0, file_field="invoice_file")

    validated_data['imei_file'] = imei_files
    validated_data['invoice_file'] = invoice_file
    validated_data['redemption_ID'] = redemption_ID
    validated_data['is_test_request'] = string_to_boolean(form.get('is_test_request', "false"))

    subsidiary = form.get("subsidiary", None)
    valid_subs = list(settings.SUBS.keys())[:-2]  # remove "ALL" and "SEAO"
    # TODO: subsidiary will be a required field in the future
    if not subsidiary:
        validated_data['subsidiary'] = None
    elif subsidiary not in valid_subs:
        raise InvalidException(excArgs="subsidiary")
    else:
        validated_data['subsidiary'] = map_subsidiary_long_to_short(subsidiary)

    return validated_data
|
|
|
|
def string_to_boolean(value):
    """Interpret a request flag as a boolean.

    Strings are compared case-insensitively against 'true', 'yes', '1' and
    'on'; any other string is False. Booleans are returned unchanged, so a
    flag that already arrived as a real ``True`` is not lost. Every other
    type (including None) yields False rather than an implicit None.
    """
    true_strings = ('true', 'yes', '1', 'on')

    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.lower() in true_strings
    return False
|
|
|
|
def sbt_validate_feedback(request):
    """Extract the feedback fields from ``request.data`` and require them all.

    'request_id', 'retailername' and 'sold_to_party' are read as single
    values; 'purchase_date' and 'imei_number' are read as lists.

    Raises:
        RequiredFieldException: the first field (in the order above) that is
            empty or missing.
    """
    form = request.data

    # Read everything up front, then validate in declaration order.
    single_values = {
        name: form.get(name, None)
        for name in ("request_id", "retailername", "sold_to_party")
    }
    list_values = {
        name: form.getlist(name, [])
        for name in ("purchase_date", "imei_number")
    }

    for name, value in single_values.items():
        if not value:
            raise RequiredFieldException(excArgs=name)
    for name, values in list_values.items():
        if len(values) == 0:
            raise RequiredFieldException(excArgs=name)

    return {**single_values, **list_values}
|
|
|
|
def count_pages_in_pdf(pdf_file):
    """Return the number of pages in an uploaded PDF.

    The upload is copied chunk by chunk into a temporary file, which is then
    parsed with PyPDF2 in non-strict mode.

    Args:
        pdf_file: object exposing ``chunks()`` that yields bytes.

    Returns:
        The page count reported by PyPDF2.
    """
    fd, temp_filename = tempfile.mkstemp()
    try:
        # The context manager closes the handle even when copying or parsing
        # fails; previously an exception leaked both the handle and the file.
        with os.fdopen(fd, 'wb+') as tmp:
            for chunk in pdf_file.chunks():
                tmp.write(chunk)
            return PyPDF2.PdfFileReader(tmp, strict=False).numPages
    finally:
        os.remove(temp_filename)
|
|
|
|
|
|
def count_pages_in_pdf_list(list_file):
    """Return the combined page count of every PDF in ``list_file``.

    An empty iterable yields 0.
    """
    return sum(count_pages_in_pdf(pdf) for pdf in list_file)
|
|
|
|
|
|
def map_process_type_to_folder_name(p_type):
    """Translate a process type value into its storage folder name.

    Raises:
        InvalidException: ``p_type`` matches none of the known process types.
    """
    # Ordered so the first equal entry wins, like an if/elif chain.
    folder_by_type = (
        (ProcessType.ID_CARD, 'id_card'),
        (ProcessType.DRIVER_LICENSE, 'driver_license'),
        (ProcessType.INVOICE, 'invoice'),
        (ProcessType.OCR_WITH_BOX, 'basic_ocr'),
        (ProcessType.TEMPLATE_MATCHING, 'template_matching'),
        (ProcessType.AP_INVOICE, 'ap_invoice'),
        (ProcessType.FI_INVOICE, 'fi_invoice'),
        (ProcessType.MANULIFE_INVOICE, 'manulife_invoice'),
        (ProcessType.SBT_INVOICE, 'sbt_invoice'),
    )
    for member, folder in folder_by_type:
        if p_type == member.value:
            return folder
    raise InvalidException(excArgs='processType')
|
|
|
|
|
|
def get_random_string(length):
    """Return a random string of ``length`` lowercase ASCII letters.

    NOTE(review): this draws from ``random``; if the result is ever used as a
    secret or token, ``secrets`` would be the appropriate source -- confirm
    against callers.
    """
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for _ in range(length))
    # logging formats ``msg % args``; the previous print-style call passed
    # extra arguments without placeholders, which fails when DEBUG is enabled.
    logger.debug("Random string of length %s is: %s", length, result_str)
    return result_str
|
|
|
|
|
|
def is_int(text) -> bool:
    """Report whether ``int(text)`` succeeds.

    Returns False instead of raising for values ``int`` cannot convert at
    all, such as None, a list, or a float infinity.
    """
    try:
        int(text)
    except (ValueError, TypeError, OverflowError):
        # TypeError: unsupported type (None, list, ...);
        # OverflowError: float infinity.
        return False
    return True
|
|
|
|
|
|
def validate_box(list_box, max_number_of_box, max_number_of_item_in_a_box, number_of_box=None):
    """Check the size of a box list and of each box in it.

    Raises:
        NumberOfBoxLimitReachedException: more boxes than ``max_number_of_box``.
        InvalidException: ``number_of_box`` is truthy and the box count
            differs from it, or any box does not contain exactly
            ``max_number_of_item_in_a_box`` items.
    """
    box_count = len(list_box)

    if box_count > max_number_of_box:
        raise NumberOfBoxLimitReachedException(excArgs=LIST_BOX_MESSAGE)

    # A falsy ``number_of_box`` (None or 0) disables the exact-count check.
    if number_of_box and box_count != number_of_box:
        raise InvalidException(excArgs=LIST_BOX_MESSAGE)

    if any(len(box) != max_number_of_item_in_a_box for box in list_box):
        raise InvalidException(excArgs="box coordinates")
|
|
|
|
|
|
def to_box_list(str_list):
    """Parse ``"a,b;c,d"`` into ``[["a", "b"], ["c", "d"]]``.

    Boxes are separated by ';' and items within a box by ','.

    Raises:
        InvalidException: the input is empty or contains an empty box.
    """
    if not str_list:
        raise InvalidException(excArgs=LIST_BOX_MESSAGE)

    segments = str_list.split(";")
    if not all(segments):
        raise InvalidException(excArgs=LIST_BOX_MESSAGE)

    return [segment.split(",") for segment in segments]
|
|
|
|
|
|
def validate_json_response_and_return(res):
    """Return the decoded JSON body of a successful response.

    Raises:
        ServiceUnavailableException: the HTTP status is not 200, or the body
            contains a 'status' entry different from 200.
    """
    if res.status_code != status.HTTP_200_OK:
        raise ServiceUnavailableException()

    payload = res.json()
    body_reports_failure = 'status' in payload and payload['status'] != 200
    if body_reports_failure:
        raise ServiceUnavailableException()
    return payload
|
|
|
|
|
|
def is_duplicate_in_list(str_list):
    """Return True as soon as an item of ``str_list`` repeats, else False."""
    seen = set()
    for item in str_list:
        if item in seen:
            return True
        seen.add(item)
    return False
|
|
|
|
|
|
def validate_duplicate(list_box):
    """Reject a list that contains the same entry more than once.

    Raises:
        DuplicateEntityException: ``list_box`` has a repeated entry.
    """
    has_repeat = is_duplicate_in_list(list_box)
    if has_repeat:
        raise DuplicateEntityException(excArgs="box_label")
|
|
|
|
|
|
def validate_vn_and_space(txt: str):
    """Require the upper-cased ``txt`` to fully match the shared ``pattern``.

    Raises:
        InvalidException: the text does not match.
    """
    matched = pattern.fullmatch(txt.upper())
    if not matched:
        raise InvalidException(excArgs=NAME_MESSAGE)
|
|
|
|
|
|
def save_template_boxs(data, template):
    """Persist the data and anchor boxes of ``template`` in one bulk insert.

    ``data['data_boxs']`` entries supply 'name' and 'coordinates';
    ``data['anchor_boxs']`` entries are sequences of strings that are joined
    with ',' to form the stored coordinates.
    """
    data_boxes = [
        OcrTemplateBox(
            name=entry['name'],
            template=template,
            coordinates=entry['coordinates'],
            type=TEMPLATE_BOX_TYPE.DATA.value,
        )
        for entry in data['data_boxs']
    ]
    anchor_boxes = [
        OcrTemplateBox(
            template=template,
            coordinates=','.join(coords),
            type=TEMPLATE_BOX_TYPE.ANCHOR.value,
        )
        for coords in data['anchor_boxs']
    ]
    # Data boxes first, then anchors, in a single statement.
    OcrTemplateBox.objects.bulk_create(data_boxes + anchor_boxes)
|
|
|
|
|
|
def token_value(token_type):
    """Return the token cost of a process type.

    ID card and driver license cost 3, template matching and invoice cost 5,
    and every other type costs 1 (basic OCR).
    """
    if token_type in (ProcessType.ID_CARD.value, ProcessType.DRIVER_LICENSE.value):
        return 3
    if token_type in (ProcessType.TEMPLATE_MATCHING.value, ProcessType.INVOICE.value):
        return 5
    return 1  # Basic OCR
|
|
|
|
def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata=None):
    """Dispatch a request to the worker queue that handles its process type.

    Args:
        rq_id: request identifier forwarded to the worker.
        sub_id: subscription identifier (only forwarded for ID cards).
        file_url: location of the file to process.
        user_id: user identifier (only forwarded for ID cards).
        typez: process type value selecting the queue.
        metadata: extra payload forwarded for SBT invoices; defaults to a
            fresh empty dict per call.

    Raises:
        BadGatewayException: the connector call failed for any reason.

    Process types without a branch below are silently ignored.
    """
    # A ``{}`` default would be one dict shared by every call.
    if metadata is None:
        metadata = {}

    try:
        if typez == ProcessType.ID_CARD.value:
            c_connector.process_id(
                (rq_id, sub_id, map_process_type_to_folder_name(typez), file_url, user_id))
        elif typez == ProcessType.INVOICE.value:
            c_connector.process_invoice_sap((rq_id, file_url))
        elif typez == ProcessType.FI_INVOICE.value:
            c_connector.process_fi((rq_id, file_url))
        elif typez == ProcessType.MANULIFE_INVOICE.value:
            c_connector.process_invoice_manulife((rq_id, file_url))
        elif typez == ProcessType.SBT_INVOICE.value:
            c_connector.process_invoice_sbt((rq_id, file_url, metadata))
    except Exception as e:
        # Keep the traceback in the log and chain the cause for debugging.
        logger.exception(e)
        raise BadGatewayException() from e
|
|
|
|
def build_template_matching_data(template):
    """Assemble the payload describing ``template`` for template matching.

    Returns a dict with 'anchors' (list of coordinate lists), 'fields' (list
    of ``{"box": coordinates, "label": name}``), 'image_path' and
    'template_name'.
    """
    anchor_boxes = OcrTemplateBox.objects.filter(template=template, type=TEMPLATE_BOX_TYPE.ANCHOR.value)
    anchors = [anchor.coordinates.split(",") for anchor in anchor_boxes]

    data_boxes = OcrTemplateBox.objects.filter(template=template, type=TEMPLATE_BOX_TYPE.DATA.value)
    fields = [
        {"box": field.coordinates.split(","), "label": field.name}
        for field in data_boxes
    ]

    return {
        'anchors': anchors,
        'fields': fields,
        'image_path': template.file_path[11:],  # len of /app/media/
        'template_name': template.name,
    }
|
|
|
|
|
|
def send_template_queue(rq_id, file_url, template: OcrTemplate, uid):
    """Queue a template-matching job for ``template``.

    Builds the template payload and forwards it together with the request id,
    the template's subscription id, the template-matching folder name, the
    file URL and ``uid``.

    Raises:
        BadGatewayException: building the payload or the connector call
            failed for any reason.
    """
    try:
        template_data = build_template_matching_data(template)
        folder_name = map_process_type_to_folder_name(ProcessType.TEMPLATE_MATCHING.value)
        c_connector.process_template_matching(
            (rq_id, template.subscription.id, folder_name, file_url, template_data, uid))
    except Exception as e:
        # Keep the traceback in the log and chain the cause for debugging.
        logger.exception(e)
        raise BadGatewayException() from e
|
|
|
|
def process_feedback(feedback_id, local_file_path):
    """Hand a feedback CSV to the worker connector.

    The connector receives the pair ``(local_file_path, feedback_id)``.
    """
    job = (local_file_path, feedback_id)
    c_connector.csv_feedback(job)
|
|
|
|
def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
    """Store an uploaded PDF and return the image entries extracted from it.

    Records the page count on ``request``, saves the original file plus a
    ``SubscriptionRequestFile`` row for it, then delegates to
    ``pdf_to_images_urls`` for the rendered page entries.

    Raises:
        LimitReachedException: the PDF has more pages than
            ``settings.MAX_PAGES_OF_PDF_FILE``.
    """
    pdf_bytes = file_obj.file.read()
    doc: fitz.Document = fitz.open(stream=pdf_bytes)

    page_limit = settings.MAX_PAGES_OF_PDF_FILE
    if doc.page_count > page_limit:
        raise LimitReachedException(excArgs=('Number of pages', str(page_limit), 'pages'))

    request.pages = doc.page_count
    request.save()

    # Origin file: rewind because the stream was consumed when opening the doc.
    file_obj.seek(0)
    stored_path = FileUtils.resize_and_save_file(file_name, request, file_obj, 100)
    origin_record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=stored_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
        doc_type=doc_type,
        index_in_request=index_in_request,
    )
    origin_record.save()

    # Sub-file
    return pdf_to_images_urls(doc, request, user)
|
|
|
|
def process_image_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
    """Store an uploaded image and return its single-entry URL list.

    Files larger than ``settings.SIZE_TO_COMPRESS`` are saved at quality 95,
    all others at quality 100. A ``SubscriptionRequestFile`` row is created
    for the stored file.
    """
    quality = 95 if file_obj.size > settings.SIZE_TO_COMPRESS else 100
    stored_path = FileUtils.resize_and_save_file(file_name, request, file_obj, quality)

    record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=stored_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
        doc_type=doc_type,
        index_in_request=index_in_request,
    )
    record.save()

    entry = {
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': record.code,
    }
    return [entry]
|
|
|
|
def process_image_local_file(file_name: str, file_path: str, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
    """Register an image already present at ``file_path`` and return its URL entry.

    No file is written here; only a ``SubscriptionRequestFile`` row is
    created and the corresponding single-entry list is returned.
    """
    record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=file_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
        doc_type=doc_type,
        index_in_request=index_in_request,
    )
    record.save()

    entry = {
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': record.code,
    }
    return [entry]
|
|
|
|
def pdf_to_images_urls(doc_path, request: SubscriptionRequest, user, dpi: int = 300) -> list:
    """Render the first page of a PDF to a JPEG and return its URL entry.

    Only page 0 is rendered. The image is resized to fit
    ``settings.TARGET_MAX_IMAGE_SIZE``, saved in the request's folder, and
    recorded as a ``SubscriptionRequestFile`` of category BREAK.

    Args:
        doc_path: the PDF to render; its ``name`` attribute seeds the output
            file name. NOTE(review): ``process_pdf_file`` passes an opened
            ``fitz.Document`` built from a byte stream here -- confirm what
            ``name`` holds in that case, since it determines whether output
            file names are unique within a request.
        request: the subscription request the image belongs to.
        user: owner whose ``id`` is used to build the public URL.
        dpi: rendering resolution handed to ``get_first_page_pdf``
            (presumably dots per inch -- the previously hard-coded value was
            300, matching this default).

    Returns:
        A one-element list with 'file_url', 'page_number' (always 0) and
        'request_file_id'.
    """
    folder = FileUtils.get_folder_path(request)
    break_file_name = f'{os.path.basename(doc_path.name)}_page_0.jpg'
    saving_path = os.path.join(folder, break_file_name)

    # Honour the caller's dpi instead of a literal 300.
    image = get_first_page_pdf(doc_path, dpi)
    image = resize(image, max_w=settings.TARGET_MAX_IMAGE_SIZE[0], max_h=settings.TARGET_MAX_IMAGE_SIZE[1])
    image.save(saving_path)
    logger.debug("Saving %s", saving_path)

    new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=saving_path,
        request=request,
        file_name=break_file_name,
        file_category=FileCategory.BREAK.value,
        code=f'FIL{uuid.uuid4().hex}',
    )
    new_request_file.save()

    file_url = FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, break_file_name)
    return [
        {
            'file_url': file_url,
            'page_number': 0,
            'request_file_id': new_request_file.code,
        }
    ]
|