sbt-idp/cope2n-api/fwd_api/utils/process.py
2024-06-26 14:58:24 +07:00

468 lines
19 KiB
Python

import os
import uuid
import random
import string
import tempfile
import fitz
import PyPDF2
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.db import transaction
from rest_framework import status
from fwd_api.utils.image import get_first_page_pdf
from fwd import settings
from fwd_api.utils.image import resize
from fwd_api.constant.common import LIST_BOX_MESSAGE, pattern, NAME_MESSAGE, allowed_p_type, TEMPLATE_ID, \
FolderFileType, FileCategory
from fwd_api.exception.exceptions import NumberOfBoxLimitReachedException, \
ServiceUnavailableException, DuplicateEntityException, LimitReachedException, BadGatewayException
from fwd_api.utils import date as DateUtil
from fwd_api.utils import file as FileUtils
from fwd_api.utils.subsidiary import map_subsidiary_long_to_short, map_subsidiary_short_to_long
from ..constant.common import ProcessType, TEMPLATE_BOX_TYPE, EntityStatus
from ..exception.exceptions import InvalidException, NotFoundException, \
PermissionDeniedException, RequiredFieldException, InvalidException, InvalidDecompressedSizeException
from ..models import UserProfile, OcrTemplate, OcrTemplateBox, \
Subscription, SubscriptionRequestFile, SubscriptionRequest
from ..celery_worker.client_connector import c_connector
import logging
logger = logging.getLogger(__name__)
class UserData:
    """Resolve and validate the user and subscription attached to a request.

    On successful construction ``user`` holds the matching ``UserProfile``
    and ``current_sub`` the matching ``Subscription``.
    """

    # Both attributes are assigned by __init__ once every check has passed.
    user: UserProfile = None
    current_sub: Subscription = None

    def __init__(self, request):
        """Look up the user/subscription named in ``request.user_data``.

        Raises:
            NotFoundException: no subscription or no user matches.
            DuplicateEntityException: more than one subscription or user matches.
            PermissionDeniedException: the subscription belongs to another user.
            InvalidException: the subscription or user is not active, or the
                subscription has expired.
        """
        payload = validate_user_request_and_get(request)
        # NOTE(review): validate_user_request_and_get checks for 'internal_id'
        # while this lookup reads 'id' - confirm both keys are always present.
        user_matches = UserProfile.objects.filter(sync_id=payload['id'])
        sub_matches = Subscription.objects.filter(id=payload['subscription_id'])
        sub_count = len(sub_matches)
        user_count = len(user_matches)

        # Checked in this exact order so the first failing rule decides the error.
        cardinality_checks = (
            (sub_count == 0, NotFoundException, 'subscription'),
            (user_count == 0, NotFoundException, 'user'),
            (sub_count > 1, DuplicateEntityException, 'subscription'),
            (user_count > 1, DuplicateEntityException, 'user'),
        )
        for failed, exc_class, entity in cardinality_checks:
            if failed:
                raise exc_class(excArgs=entity)

        profile = user_matches[0]
        subscription = sub_matches[0]

        if profile.id != subscription.user.id:
            raise PermissionDeniedException()

        active = EntityStatus.ACTIVE.value
        if subscription.status != active:
            raise InvalidException(excArgs='Subscription status')
        if subscription.expired_at < DateUtil.get_date_time_now():
            raise InvalidException(excArgs='Subscription')
        if profile.status != active:
            raise InvalidException(excArgs='User status')

        self.user = profile
        self.current_sub = subscription
def get_user(request) -> UserData:
    """Build the validated ``UserData`` wrapper for ``request``.

    Any exception raised by ``UserData.__init__`` propagates to the caller.
    """
    resolved = UserData(request)
    return resolved
def validate_user_request_and_get(request):
    """Return ``request.user_data`` after checking the keys it must contain.

    Raises:
        NotFoundException: ``request`` has no ``user_data`` attribute or the
            ``internal_id`` key is missing (reported as 'user'), or the
            ``subscription_id`` key is missing (reported as 'subscription').
    """
    if not hasattr(request, 'user_data'):
        raise NotFoundException(excArgs='user')

    payload = request.user_data
    # (required key, entity name reported when the key is absent)
    required_keys = (
        ('internal_id', 'user'),
        ('subscription_id', 'subscription'),
    )
    for key, entity in required_keys:
        if key not in payload:
            raise NotFoundException(excArgs=entity)
    return payload
def validate_ocr_request_and_get(request, subscription):
    """Validate an OCR request and return the values needed to process it.

    Args:
        request: Request whose ``data`` carries ``processType``, an optional
            ``templateId``, the uploaded ``file`` list and an optional
            ``is_test_request`` flag.
        subscription: Subscription whose ``current_token`` and ``limit_token``
            are compared against the token cost of the process type.

    Returns:
        dict with ``type`` (int process type), ``file`` (first uploaded file),
        ``is_test_request`` (result of ``string_to_boolean``) and, for
        template matching only, ``template`` (the queryset that matched
        exactly one ``OcrTemplate``).

    Raises:
        InvalidException: ``processType`` is missing, not a decimal number or
            not in ``allowed_p_type``; or the template id is missing or does
            not match exactly one template of this subscription.
        LimitReachedException: the request would reach or exceed
            ``subscription.limit_token``.
        Whatever ``FileUtils.validate_list_file`` raises for the file list.
    """
    payload = request.data
    validated_data = {}

    raw_type = payload['processType'] if "processType" in payload else None
    # Non-form payloads may deliver the type as an int rather than a string.
    if isinstance(raw_type, int) and not isinstance(raw_type, bool):
        raw_type = str(raw_type)
    # isdecimal() only accepts characters int() can parse; isnumeric() also
    # accepts e.g. fractions/superscripts, which made int() raise ValueError.
    if not isinstance(raw_type, str) or not raw_type.isdecimal() \
            or int(raw_type) not in allowed_p_type:
        raise InvalidException(excArgs='processType')
    p_type: int = int(raw_type)
    validated_data['type'] = p_type

    # Rejected as soon as the new total would reach the limit (>=, not >).
    if subscription.current_token + token_value(p_type) >= subscription.limit_token:
        raise LimitReachedException(excArgs=('Number of tokens', str(subscription.limit_token), 'times'))

    if p_type == ProcessType.TEMPLATE_MATCHING.value:
        if "templateId" not in payload:
            raise InvalidException(excArgs=TEMPLATE_ID)
        temp_id = payload['templateId']
        temp = OcrTemplate.objects.filter(id=temp_id, subscription=subscription)
        if len(temp) != 1:
            raise InvalidException(excArgs=TEMPLATE_ID)
        # The queryset itself (not its single element) is handed back.
        validated_data['template'] = temp

    list_file = payload.getlist('file')
    FileUtils.validate_list_file(list_file)
    validated_data['file'] = list_file[0]
    validated_data['is_test_request'] = string_to_boolean(payload.get('is_test_request', "false"))
    return validated_data
def sbt_validate_ocr_request_and_get(request, subscription):
    """Validate an SBT OCR request and return the values needed to process it.

    The process type is not read from the request; it is pinned to 12
    (described in the original code as the SBT Invoice type).

    Returns:
        dict with ``type``, ``imei_file``, ``invoice_file``, ``redemption_ID``,
        ``is_test_request`` and ``subsidiary`` (``None`` when not supplied,
        otherwise the value mapped by ``map_subsidiary_long_to_short``), plus
        ``template`` when the pinned type equals the template-matching type.

    Raises:
        LimitReachedException: the request would reach or exceed
            ``subscription.limit_token``.
        InvalidException: bad template id, or a subsidiary that is not one of
            the accepted keys of ``settings.SUBS``.
        Whatever ``FileUtils.validate_list_file`` raises for the file lists.
    """
    payload = request.data
    p_type = 12  # hard fix to be of type SBT Invoice
    result = {'type': p_type}

    projected_tokens = subscription.current_token + token_value(p_type)
    if projected_tokens >= subscription.limit_token:
        raise LimitReachedException(excArgs=('Number of tokens', str(subscription.limit_token), 'times'))

    if p_type == ProcessType.TEMPLATE_MATCHING.value:
        if "templateId" not in payload:
            raise InvalidException(excArgs=TEMPLATE_ID)
        matches = OcrTemplate.objects.filter(id=payload['templateId'], subscription=subscription)
        if len(matches) != 1:
            raise InvalidException(excArgs=TEMPLATE_ID)
        result['template'] = matches

    imei_files = payload.getlist('imei_files')
    invoice_file = payload.getlist('invoice_file')
    redemption_id = payload.get('redemption_ID', None)

    # Both lists may be empty (min_file_num=0); at most one invoice is allowed.
    FileUtils.validate_list_file(imei_files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=0, file_field="imei_file")
    FileUtils.validate_list_file(invoice_file, max_file_num=1, min_file_num=0, file_field="invoice_file")

    result['imei_file'] = imei_files
    result['invoice_file'] = invoice_file
    result['redemption_ID'] = redemption_id
    result['is_test_request'] = string_to_boolean(payload.get('is_test_request', "false"))

    subsidiary = payload.get("subsidiary", None)
    # The last two keys of settings.SUBS are dropped ("ALL" and "SEAO"
    # according to the original comment); this relies on the key order.
    accepted_subsidiaries = list(settings.SUBS.keys())[:-2]
    # TODO: subsidiary will be a required field in the future
    if subsidiary:
        if subsidiary not in accepted_subsidiaries:
            raise InvalidException(excArgs="subsidiary")
        result['subsidiary'] = map_subsidiary_long_to_short(subsidiary)
    else:
        result['subsidiary'] = None
    return result
def string_to_boolean(value):
    """Interpret a loosely-typed flag value as a boolean.

    Args:
        value: Usually a string taken from request data; may be any object.

    Returns:
        ``True`` for the strings 'true', 'yes', '1' or 'on' (case-insensitive)
        and for the boolean ``True``; ``False`` for everything else,
        including non-string values such as ``None``.
    """
    # A real boolean (e.g. from a JSON body) is passed through unchanged.
    if isinstance(value, bool):
        return value
    if not isinstance(value, str):
        return False
    true_strings = ('true', 'yes', '1', 'on')
    return value.lower() in true_strings
def sbt_validate_feedback(request):
    """Extract the mandatory feedback fields from ``request.data``.

    Returns:
        dict with ``request_id``, ``retailername``, ``sold_to_party``
        (single values) and ``purchase_date``, ``imei_number`` (lists).

    Raises:
        RequiredFieldException: naming the first field that is missing/empty,
            checked in the order listed above.
    """
    payload = request.data

    # Single-valued fields: any falsy value counts as missing.
    scalar_fields = {
        'request_id': payload.get('request_id', None),
        'retailername': payload.get("retailername", None),
        'sold_to_party': payload.get("sold_to_party", None),
    }
    # Multi-valued fields: an empty list counts as missing.
    list_fields = {
        'purchase_date': payload.getlist("purchase_date", []),
        'imei_number': payload.getlist("imei_number", []),
    }

    for field_name, field_value in scalar_fields.items():
        if not field_value:
            raise RequiredFieldException(excArgs=field_name)
    for field_name, field_values in list_fields.items():
        if len(field_values) == 0:
            raise RequiredFieldException(excArgs=field_name)

    return {**scalar_fields, **list_fields}
def count_pages_in_pdf(pdf_file):
    """Return the number of pages in an uploaded PDF.

    Args:
        pdf_file: Object exposing ``chunks()`` that yields the PDF's bytes.

    Returns:
        The page count reported by ``PyPDF2.PdfFileReader``.

    The bytes are spooled to an anonymous temporary file which is closed and
    removed automatically, even when writing or parsing raises.
    """
    with tempfile.TemporaryFile(mode='wb+') as buffer:
        for chunk in pdf_file.chunks():
            buffer.write(chunk)
        # strict=False, as in the original call.
        reader = PyPDF2.PdfFileReader(buffer, strict=False)
        return reader.numPages
def count_pages_in_pdf_list(list_file):
    """Return the combined page count of every PDF in ``list_file``.

    An empty iterable yields 0.
    """
    return sum(count_pages_in_pdf(pdf) for pdf in list_file)
def map_process_type_to_folder_name(p_type):
    """Return the storage folder name used for a process type value.

    Raises:
        InvalidException: ``p_type`` matches none of the known process types.
    """
    # Ordered (process type, folder name) pairs; compared with == so any
    # value type accepted by the original if/elif chain still works.
    folder_by_type = (
        (ProcessType.ID_CARD, 'id_card'),
        (ProcessType.DRIVER_LICENSE, 'driver_license'),
        (ProcessType.INVOICE, 'invoice'),
        (ProcessType.OCR_WITH_BOX, 'basic_ocr'),
        (ProcessType.TEMPLATE_MATCHING, 'template_matching'),
        (ProcessType.AP_INVOICE, 'ap_invoice'),
        (ProcessType.FI_INVOICE, 'fi_invoice'),
        (ProcessType.MANULIFE_INVOICE, 'manulife_invoice'),
        (ProcessType.SBT_INVOICE, 'sbt_invoice'),
    )
    for member, folder_name in folder_by_type:
        if p_type == member.value:
            return folder_name
    raise InvalidException(excArgs='processType')
def get_random_string(length):
    """Return a random string of ``length`` lowercase ASCII letters.

    Uses the ``random`` module, so the result is not suitable for secrets.
    """
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for _ in range(length))
    # logging takes a %-style format string plus lazy arguments; passing the
    # pieces print-style (no placeholders) fails when the record is formatted.
    logger.debug("Random string of length %s is: %s", length, result_str)
    return result_str
def is_int(text) -> bool:
    """Return True when ``int(text)`` succeeds, False otherwise.

    Values that ``int()`` cannot accept at all (``None``, lists, ...) and
    non-finite floats are reported as False instead of raising.
    """
    try:
        int(text)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type; ValueError: unparsable text / NaN;
        # OverflowError: float infinity.
        return False
    return True
def validate_box(list_box, max_number_of_box, max_number_of_item_in_a_box, number_of_box=None):
    """Check the number of boxes and the number of items in each box.

    Raises:
        NumberOfBoxLimitReachedException: more than ``max_number_of_box`` boxes.
        InvalidException: ``number_of_box`` is truthy and differs from the
            actual count, or some box does not hold exactly
            ``max_number_of_item_in_a_box`` items.
    """
    box_count = len(list_box)
    if box_count > max_number_of_box:
        raise NumberOfBoxLimitReachedException(excArgs=LIST_BOX_MESSAGE)
    # A falsy number_of_box (None or 0) disables the exact-count check.
    if number_of_box and box_count != number_of_box:
        raise InvalidException(excArgs=LIST_BOX_MESSAGE)
    if any(len(box) != max_number_of_item_in_a_box for box in list_box):
        raise InvalidException(excArgs="box coordinates")
def to_box_list(str_list):
    """Parse ``"a,b;c,d"`` into ``[["a", "b"], ["c", "d"]]``.

    Boxes are separated by ';' and the items of a box by ','.

    Raises:
        InvalidException: the input is empty or contains an empty box
            (e.g. a leading, trailing or doubled ';').
    """
    if not str_list:
        raise InvalidException(excArgs=LIST_BOX_MESSAGE)
    segments = str_list.split(";")
    if not all(segments):
        raise InvalidException(excArgs=LIST_BOX_MESSAGE)
    return [segment.split(",") for segment in segments]
def validate_json_response_and_return(res):
    """Return the decoded JSON body of ``res`` when it signals success.

    Raises:
        ServiceUnavailableException: the HTTP status is not 200, or the body
            contains a ``status`` entry different from 200.
    """
    if res.status_code != status.HTTP_200_OK:
        raise ServiceUnavailableException()

    body = res.json()
    # A body without a 'status' entry is accepted as-is.
    reports_failure = 'status' in body and body['status'] != 200
    if reports_failure:
        raise ServiceUnavailableException()
    return body
def is_duplicate_in_list(str_list):
    """Return True as soon as an item of ``str_list`` repeats, else False."""
    seen = set()
    for item in str_list:
        if item in seen:
            return True
        seen.add(item)
    return False
def validate_duplicate(list_box):
    """Raise DuplicateEntityException when ``list_box`` has a repeated entry."""
    has_repeat = is_duplicate_in_list(list_box)
    if not has_repeat:
        return
    raise DuplicateEntityException(excArgs="box_label")
def validate_vn_and_space(txt: str):
    """Raise InvalidException unless the upper-cased text fully matches ``pattern``.

    ``pattern`` is imported from the project constants; judging by this
    function's name it presumably allows Vietnamese letters and spaces.
    """
    normalized = txt.upper()
    matched = pattern.fullmatch(normalized)
    if matched:
        return
    raise InvalidException(excArgs=NAME_MESSAGE)
def save_template_boxs(data, template):
    """Create the data and anchor boxes of ``template`` in one bulk insert.

    Args:
        data: Mapping with ``data_boxs`` (items holding ``name`` and
            ``coordinates``) and ``anchor_boxs`` (sequences of coordinate
            strings, stored joined by ',').
        template: Template the new boxes belong to.
    """
    data_boxes = [
        OcrTemplateBox(name=entry['name'], template=template, coordinates=entry['coordinates'],
                       type=TEMPLATE_BOX_TYPE.DATA.value)
        for entry in data['data_boxs']
    ]
    anchor_boxes = [
        OcrTemplateBox(template=template, coordinates=','.join(coords), type=TEMPLATE_BOX_TYPE.ANCHOR.value)
        for coords in data['anchor_boxs']
    ]
    # Data boxes first, then anchors - same insertion order as before.
    OcrTemplateBox.objects.bulk_create(data_boxes + anchor_boxes)
def token_value(token_type):
    """Return the token cost of a process type value.

    ID card and driver license cost 3, template matching and invoice cost 5,
    every other value costs 1.
    """
    three_token_types = (ProcessType.ID_CARD.value, ProcessType.DRIVER_LICENSE.value)
    if token_type in three_token_types:
        return 3
    five_token_types = (ProcessType.TEMPLATE_MATCHING.value, ProcessType.INVOICE.value)
    if token_type in five_token_types:
        return 5
    return 1  # Basic OCR
def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata=None):
    """Dispatch a request to the worker task that handles its process type.

    Args:
        rq_id: Request identifier forwarded to the worker.
        sub_id: Subscription identifier (only forwarded for ID cards).
        file_url: Location of the file(s) to process.
        user_id: User identifier (only forwarded for ID cards).
        typez: Process type value that selects the worker task.
        metadata: Extra data forwarded to the SBT invoice task; defaults to a
            fresh empty dict on every call.

    Raises:
        BadGatewayException: any error raised while dispatching.

    Process types without a branch below are silently ignored.
    """
    # A literal {} default would be one dict shared by every call and handed
    # to the connector, so create a new one per call instead.
    if metadata is None:
        metadata = {}
    try:
        if typez == ProcessType.ID_CARD.value:
            c_connector.process_id(
                (rq_id, sub_id, map_process_type_to_folder_name(typez), file_url, user_id))
        elif typez == ProcessType.INVOICE.value:
            c_connector.process_invoice_sap((rq_id, file_url))
        elif typez == ProcessType.FI_INVOICE.value:
            c_connector.process_fi((rq_id, file_url))
        elif typez == ProcessType.MANULIFE_INVOICE.value:
            c_connector.process_invoice_manulife((rq_id, file_url))
        elif typez == ProcessType.SBT_INVOICE.value:
            c_connector.process_invoice_sbt((rq_id, file_url, metadata))
    except Exception as e:
        logger.error(e)
        raise BadGatewayException()
def build_template_matching_data(template):
    """Assemble the template description sent to the template-matching worker.

    Returns:
        dict with ``anchors`` (list of coordinate lists), ``fields`` (list of
        ``{"box": coordinates, "label": name}``), ``image_path`` and
        ``template_name``.
    """
    anchor_rows = OcrTemplateBox.objects.filter(template=template, type=TEMPLATE_BOX_TYPE.ANCHOR.value)
    anchors = [row.coordinates.split(",") for row in anchor_rows]

    data_rows = OcrTemplateBox.objects.filter(template=template, type=TEMPLATE_BOX_TYPE.DATA.value)
    fields = [
        {"box": row.coordinates.split(","), "label": row.name}
        for row in data_rows
    ]

    return {
        'anchors': anchors,
        'fields': fields,
        # Drops the first 11 characters: len of /app/media/
        'image_path': template.file_path[11:],
        'template_name': template.name,
    }
def send_template_queue(rq_id, file_url, template: OcrTemplate, uid):
    """Queue a template-matching job for ``file_url`` against ``template``.

    Raises:
        BadGatewayException: any error raised while building or sending the job.
    """
    try:
        description = build_template_matching_data(template)
        folder = map_process_type_to_folder_name(ProcessType.TEMPLATE_MATCHING.value)
        job = (rq_id, template.subscription.id, folder, file_url, description, uid)
        c_connector.process_template_matching(job)
    except Exception as err:
        logger.error(err)
        raise BadGatewayException()
def process_feedback(feedback_id, local_file_path):
    """Hand a feedback CSV to the worker as ``(local_file_path, feedback_id)``."""
    job = (local_file_path, feedback_id)
    c_connector.csv_feedback(job)
def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
    """Store an uploaded PDF and return the image entries derived from it.

    Records the page count on ``request``, saves the original file together
    with a ``SubscriptionRequestFile`` row, then delegates to
    ``pdf_to_images_urls`` for the image entries.

    Raises:
        LimitReachedException: the PDF has more than
            ``settings.MAX_PAGES_OF_PDF_FILE`` pages.
    """
    pdf_bytes = file_obj.file.read()
    doc: fitz.Document = fitz.open(stream=pdf_bytes)

    page_limit = settings.MAX_PAGES_OF_PDF_FILE
    if doc.page_count > page_limit:
        raise LimitReachedException(excArgs=('Number of pages', str(page_limit), 'pages'))

    request.pages = doc.page_count
    request.save()

    # Origin file: rewind first because the stream was consumed above.
    file_obj.seek(0)
    stored_path = FileUtils.resize_and_save_file(file_name, request, file_obj, 100)
    origin_record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=stored_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
        doc_type=doc_type,
        index_in_request=index_in_request,
    )
    origin_record.save()

    # Sub-file: the opened document object is passed as `doc_path`.
    return pdf_to_images_urls(doc, request, user)
def process_image_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
    """Store an uploaded image and return its single-entry URL list.

    Files larger than ``settings.SIZE_TO_COMPRESS`` are saved with quality
    95, smaller ones with quality 100.
    """
    quality = 95 if file_obj.size > settings.SIZE_TO_COMPRESS else 100
    stored_path = FileUtils.resize_and_save_file(file_name, request, file_obj, quality)

    record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=stored_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
        doc_type=doc_type,
        index_in_request=index_in_request,
    )
    record.save()

    entry = {
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': record.code,
    }
    return [entry]
def process_image_local_file(file_name: str, file_path: str, request: SubscriptionRequest, user, doc_type: str, index_in_request: int) -> list:
    """Register an image already stored at ``file_path`` and return its URL entry.

    Unlike ``process_image_file`` nothing is resized or written here; only
    the ``SubscriptionRequestFile`` row is created.
    """
    record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=file_path,
        request=request,
        file_name=file_name,
        code=f'FIL{uuid.uuid4().hex}',
        doc_type=doc_type,
        index_in_request=index_in_request,
    )
    record.save()

    entry = {
        'file_url': FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, file_name),
        'page_number': 0,
        'request_file_id': record.code,
    }
    return [entry]
def pdf_to_images_urls(doc_path, request: SubscriptionRequest, user, dpi: int = 300) -> list:
    """Save the PDF's first page as a JPEG and return its single URL entry.

    Args:
        doc_path: Object with a ``name`` attribute, forwarded to
            ``get_first_page_pdf``.
        request: Request the generated image belongs to.
        user: Owner whose ``id`` is used to build the file URL.
        dpi: NOTE(review): accepted but not used - the literal 300 is passed
            to ``get_first_page_pdf``; confirm whether this is intended.

    Returns:
        One-element list holding ``file_url``, ``page_number`` (0) and
        ``request_file_id`` of the stored page image.
    """
    page_file_name = f'{os.path.basename(doc_path.name)}_page_0.jpg'
    target_path = os.path.join(FileUtils.get_folder_path(request), page_file_name)

    page_image = get_first_page_pdf(doc_path, 300)
    max_width, max_height = settings.TARGET_MAX_IMAGE_SIZE[0], settings.TARGET_MAX_IMAGE_SIZE[1]
    page_image = resize(page_image, max_w=max_width, max_h=max_height)
    page_image.save(target_path)
    logger.debug(f"Saving {target_path}")

    page_record: SubscriptionRequestFile = SubscriptionRequestFile(
        file_path=target_path,
        request=request,
        file_name=page_file_name,
        file_category=FileCategory.BREAK.value,
        code=f'FIL{uuid.uuid4().hex}',
    )
    page_record.save()

    page_url = FileUtils.build_url(FolderFileType.REQUESTS.value, request.request_id, user.id, page_file_name)
    return [{
        'file_url': page_url,
        'page_number': 0,
        'request_file_id': page_record.code,
    }]