Feature: feedback csv API

This commit is contained in:
dx-tan 2024-01-09 19:41:17 +07:00
parent ac65475356
commit cecee15cd6
13 changed files with 256 additions and 11 deletions

View File

@ -20,7 +20,7 @@ from ..annotation.api import throw_on_failure
from ..constant.common import ProcessType, REQUEST_ID, FOLDER_TYPE, EntityStatus, pdf_extensions, allowed_file_extensions, image_extensions, standard_ocr_list from ..constant.common import ProcessType, REQUEST_ID, FOLDER_TYPE, EntityStatus, pdf_extensions, allowed_file_extensions, image_extensions, standard_ocr_list
from ..exception.exceptions import RequiredFieldException, InvalidException, NotFoundException, \ from ..exception.exceptions import RequiredFieldException, InvalidException, NotFoundException, \
PermissionDeniedException, LockedEntityException, FileContentInvalidException, ServiceTimeoutException PermissionDeniedException, LockedEntityException, FileContentInvalidException, ServiceTimeoutException
from ..models import SubscriptionRequest, SubscriptionRequestFile, OcrTemplate from ..models import SubscriptionRequest, SubscriptionRequestFile, OcrTemplate, FeedbackRequest
from ..response.ReportSerializer import ReportSerializer from ..response.ReportSerializer import ReportSerializer
from ..utils import file as FileUtils from ..utils import file as FileUtils
from ..utils import process as ProcessUtil from ..utils import process as ProcessUtil
@ -348,6 +348,57 @@ class CtelViewSet(viewsets.ViewSet):
return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id}) return JsonResponse(status=status.HTTP_200_OK, data={"request_id": rq_id})
@extend_schema(request={
    'multipart/form-data': {
        'type': 'object',
        'properties': {
            'files': {
                'type': 'array',
                'items': {
                    'type': 'string',
                    'format': 'binary'
                }
            },
        },
        'required': ['files']
    }
}, responses=None, tags=['OCR'])
@action(detail=False, url_path="images/feedback_file", methods=["POST"])
def feedback_file(self, request):
    """Accept feedback CSV uploads, store them locally and on S3, and queue
    background processing.

    Request: multipart/form-data with a ``files`` list (validated to be CSV).
    Returns: JSON ``{"feedback_id": <batch id>}`` with HTTP 200.
    Raises: validation exceptions from FileUtils / ProcessUtil on bad input.
    """
    files = request.data.getlist('files')
    FileUtils.validate_csv_feedback(files)
    user_info = ProcessUtil.get_user(request)
    sub = user_info.current_sub
    # One id shared by every file in this upload batch.
    feedback_id = "FB_" + datetime.now().strftime("%Y%m%d%H%M%S") + "_" + uuid.uuid4().hex
    # Bug fix: the original concatenated stored file names with no separator,
    # so multiple names ran together; join both lists with commas consistently.
    origin_name = ",".join(f.name for f in files)
    file_names = ",".join(f"{feedback_id}_{i}.csv" for i in range(len(files)))
    new_request: FeedbackRequest = FeedbackRequest(feedback_id=feedback_id,
                                                   origin_name=origin_name,
                                                   file_name=file_names,
                                                   subscription=sub)
    new_request.save()
    for i, file in enumerate(files):
        file_name = f"{feedback_id}_{i}.csv"
        # Save to local disk first so it can be validated before queueing.
        file_path = FileUtils.save_feedback_file(file_name, new_request, file)
        FileUtils.validate_feedback_file(file_path)
        # Upload to S3 (asynchronous via Celery).
        S3_path = FileUtils.save_feedback_to_S3(file_name, feedback_id, file_path)
        # Process the CSV file in the background.
        ProcessUtil.process_feedback(feedback_id, file_path)
    return JsonResponse(status=status.HTTP_200_OK, data={"feedback_id": feedback_id})
@extend_schema(request=None, responses=None, tags=['Data']) @extend_schema(request=None, responses=None, tags=['Data'])
@extend_schema(request=None, responses=None, tags=['templates'], methods=['GET']) @extend_schema(request=None, responses=None, tags=['templates'], methods=['GET'])
@action(detail=False, url_path=r"media/(?P<folder_type>\w+)/(?P<uq_id>\w+)", methods=["GET"]) @action(detail=False, url_path=r"media/(?P<folder_type>\w+)/(?P<uq_id>\w+)", methods=["GET"])

View File

@ -30,8 +30,10 @@ class CeleryConnector:
'process_sbt_invoice': {'queue': "invoice_sbt"}, 'process_sbt_invoice': {'queue': "invoice_sbt"},
'do_pdf': {'queue': "do_pdf"}, 'do_pdf': {'queue': "do_pdf"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"}, 'upload_file_to_s3': {'queue': "upload_file_to_s3"},
'upload_feedback_to_s3': {'queue': "upload_feedback_to_s3"},
'upload_obj_to_s3': {'queue': "upload_obj_to_s3"}, 'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
'remove_local_file': {'queue': "remove_local_file"}, 'remove_local_file': {'queue': "remove_local_file"},
'csv_feedback': {'queue': "csv_feedback"},
} }
app = Celery( app = Celery(
@ -39,10 +41,14 @@ class CeleryConnector:
broker=settings.BROKER_URL, broker=settings.BROKER_URL,
broker_transport_options={'confirm_publish': False}, broker_transport_options={'confirm_publish': False},
) )
def csv_feedback(self, args):
    # Dispatch the feedback-CSV processing job; routed to the "csv_feedback"
    # queue via task_routes. `args` is the positional-args tuple for the task.
    return self.send_task('csv_feedback', args)
def do_pdf(self, args): def do_pdf(self, args):
return self.send_task('do_pdf', args) return self.send_task('do_pdf', args)
def upload_file_to_s3(self, args): def upload_file_to_s3(self, args):
return self.send_task('upload_file_to_s3', args) return self.send_task('upload_file_to_s3', args)
def upload_feedback_to_s3(self, args):
    # Dispatch the feedback-file S3 upload; routed to the
    # "upload_feedback_to_s3" queue. `args` is the task's args tuple.
    return self.send_task('upload_feedback_to_s3', args)
def upload_obj_to_s3(self, args): def upload_obj_to_s3(self, args):
return self.send_task('upload_obj_to_s3', args) return self.send_task('upload_obj_to_s3', args)
def remove_local_file(self, args): def remove_local_file(self, args):

View File

@ -9,12 +9,13 @@ from fwd_api.models import SubscriptionRequest, UserProfile
from fwd_api.celery_worker.worker import app from fwd_api.celery_worker.worker import app
from ..constant.common import FolderFileType, image_extensions from ..constant.common import FolderFileType, image_extensions
from ..exception.exceptions import FileContentInvalidException from ..exception.exceptions import FileContentInvalidException
from fwd_api.models import SubscriptionRequestFile from fwd_api.models import SubscriptionRequestFile, FeedbackRequest
from ..utils import file as FileUtils from ..utils import file as FileUtils
from ..utils import process as ProcessUtil from ..utils import process as ProcessUtil
from ..utils import s3 as S3Util from ..utils import s3 as S3Util
from fwd_api.constant.common import ProcessType from fwd_api.constant.common import ProcessType
import csv
import json
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from fwd import settings from fwd import settings
@ -59,6 +60,61 @@ def process_image_file(file_name: str, file_path, request, user) -> list:
'request_file_id': new_request_file.code 'request_file_id': new_request_file.code
}] }]
@app.task(name="csv_feedback")
def process_csv_feedback(csv_file_path, feedback_id):
    """Apply a feedback CSV to the matching SubscriptionRequest rows.

    Each CSV row is matched to a SubscriptionRequest by ``requestId``; its
    feedback fields are stored on the request. Per-row errors are collected
    into a dict, saved on the FeedbackRequest, dumped to a local
    ``*_error.json`` file and uploaded to S3 when available.
    """
    errors = {}  # request_id -> error description
    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            request_id = row.get('requestId')
            sub_rqs = SubscriptionRequest.objects.filter(request_id=request_id)
            if len(sub_rqs) != 1:
                errors[request_id] = f"Found {len(sub_rqs)} records of request id {request_id}"
                continue
            sub_rq = sub_rqs[0]
            # Collect the user-provided feedback values (may be None/empty).
            fb = {
                'request_id': request_id,
                'retailername': row.get('retailer'),
                'sold_to_party': row.get('Sold to party'),
                'purchase_date': row.get('Purchase Date'),
                'imei_number': [row.get('imeiNumber'), row.get('imeiNumber2')],
            }
            sub_rq.feedback_result = fb
            # Bug fix: float(None) / float('') raised and aborted the whole
            # batch; record the problem for this row and keep going instead.
            try:
                sub_rq.client_request_time = float(row.get('timetakenmilli'))
            except (TypeError, ValueError):
                errors[request_id] = f"Invalid timetakenmilli for request id {request_id}"
            # Bug fix: len(None) raised when the column was absent.
            redemption_id = row.get('redemptionNumber')
            if redemption_id:
                sub_rq.redemption_id = redemption_id
            sub_rq.save()
    # Persist the error log on the FeedbackRequest record.
    feedback_rq = FeedbackRequest.objects.filter(feedback_id=feedback_id).first()
    if feedback_rq is None:
        # Bug fix: .first() may return None; previously this crashed with
        # AttributeError before the error log was written anywhere.
        logger.error(f"FeedbackRequest {feedback_id} not found; errors: {errors}")
        return
    feedback_rq.error_status = errors
    # Save the error log next to the CSV, then push it to S3.
    directory_name = os.path.dirname(csv_file_path)
    file_path = csv_file_path.replace(".csv", "_error.json")
    with open(file_path, "w") as outfile:
        json.dump(errors, outfile)
    s3_key = os.path.join("feedback", directory_name.split("/")[-1], file_path.split("/")[-1])
    if s3_client.s3_client is not None:
        try:
            # Delete the local copy only after a successful upload.
            s3_client.upload_file(file_path, s3_key)
            os.remove(file_path)
        except Exception as e:
            logger.error(f"Unable to set S3: {e}")
            print(f"Unable to set S3: {e}")
    feedback_rq.save()
@app.task(name='do_pdf') @app.task(name='do_pdf')
def process_pdf(rq_id, sub_id, p_type, user_id, files): def process_pdf(rq_id, sub_id, p_type, user_id, files):
@ -136,6 +192,21 @@ def upload_file_to_s3(local_file_path, s3_key, request_id):
else: else:
logger.info(f"S3 is not available, skipping,...") logger.info(f"S3 is not available, skipping,...")
@app.task(name='upload_feedback_to_s3')
def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
    """Upload a stored feedback file to S3 and flag the FeedbackRequest row.

    No-op (with an info log) when no S3 client is configured.
    """
    if s3_client.s3_client is None:
        logger.info(f"S3 is not available, skipping,...")
        return
    try:
        s3_client.upload_file(local_file_path, s3_key)
        # Bug fix: .filter(...)[0] raised IndexError when the row was missing,
        # which was then mis-reported as an S3 failure; guard explicitly.
        feed_request = FeedbackRequest.objects.filter(feedback_id=feedback_id).first()
        if feed_request is None:
            logger.error(f"FeedbackRequest {feedback_id} not found; cannot flag S3 upload")
            return
        feed_request.S3_uploaded = True
        feed_request.save()
    except Exception as e:
        logger.error(f"Unable to set S3: {e}")
        print(f"Unable to set S3: {e}")
@app.task(name='remove_local_file') @app.task(name='remove_local_file')
def remove_local_file(local_file_path, request_id): def remove_local_file(local_file_path, request_id):
print(f"[INFO] Removing local file: {local_file_path}, ...") print(f"[INFO] Removing local file: {local_file_path}, ...")

View File

@ -38,7 +38,7 @@ app.conf.update({
Queue('upload_file_to_s3'), Queue('upload_file_to_s3'),
Queue('upload_obj_to_s3'), Queue('upload_obj_to_s3'),
Queue('remove_local_file'), Queue('remove_local_file'),
Queue('csv_feedback'),
], ],
'task_routes': { 'task_routes': {
@ -52,9 +52,10 @@ app.conf.update({
'process_sbt_invoice': {'queue': "invoice_sbt"}, 'process_sbt_invoice': {'queue': "invoice_sbt"},
'do_pdf': {'queue': "do_pdf"}, 'do_pdf': {'queue': "do_pdf"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"}, 'upload_file_to_s3': {'queue': "upload_file_to_s3"},
'upload_obj_to_s3': {'queue': "upload_obj_to_s3"}, 'upload_feedback_to_s3': {'queue': "upload_feedback_to_s3"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"}, 'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
'remove_local_file': {'queue': "remove_local_file"}, 'remove_local_file': {'queue': "remove_local_file"},
'csv_feedback': {'queue': "csv_feedback"},
} }
}) })

View File

@ -67,6 +67,11 @@ class RequiredFieldException(GeneralException):
default_detail = 'Field required' default_detail = 'Field required'
detail_with_arg = '{} param is required' detail_with_arg = '{} param is required'
class RequiredColumnException(GeneralException):
    """Raised when an uploaded feedback CSV lacks required columns (HTTP 400)."""
    status_code = status.HTTP_400_BAD_REQUEST
    default_code = 4003
    # Bug fix: corrected the misspelling "Collumns" in user-facing messages.
    default_detail = 'Columns required'
    detail_with_arg = '{} columns are required'
class DuplicateEntityException(GeneralException): class DuplicateEntityException(GeneralException):
status_code = status.HTTP_400_BAD_REQUEST status_code = status.HTTP_400_BAD_REQUEST

View File

@ -0,0 +1,29 @@
# Generated by Django 4.1.3 on 2024-01-09 10:08
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
    # Auto-generated migration: introduces the FeedbackRequest table used to
    # track uploaded feedback CSV batches. Do not edit after it has been applied.

    dependencies = [
        ('fwd_api', '0164_subscriptionrequest_client_request_time_and_more'),
    ]

    operations = [
        migrations.CreateModel(
            name='FeedbackRequest',
            fields=[
                ('id', models.AutoField(primary_key=True, serialize=False)),
                ('feedback_id', models.CharField(max_length=200)),
                ('file_name', models.CharField(max_length=200)),
                ('origin_name', models.CharField(max_length=200)),
                ('error_status', models.JSONField(null=True)),
                ('created_at', models.DateTimeField(db_index=True, default=django.utils.timezone.now)),
                ('updated_at', models.DateTimeField(auto_now=True)),
                ('S3_uploaded', models.BooleanField(default=False)),
                ('subscription', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='fwd_api.subscription')),
            ],
        ),
    ]

View File

@ -0,0 +1,14 @@
from django.db import models
from django.utils import timezone
from fwd_api.models.Subscription import Subscription
class FeedbackRequest(models.Model):
    """One uploaded feedback CSV batch, tied to a subscription."""
    id = models.AutoField(primary_key=True)
    # Unique batch id generated at upload time (e.g. "FB_<timestamp>_<hex>").
    feedback_id = models.CharField(max_length=200)
    # Comma-separated list of stored file names for this batch.
    file_name = models.CharField(max_length=200)
    # Comma-separated list of the uploaders' original file names.
    origin_name = models.CharField(max_length=200)
    # Per-request error log produced while processing the CSV (null until processed).
    error_status = models.JSONField(null=True)
    created_at = models.DateTimeField(default=timezone.now, db_index=True)
    updated_at = models.DateTimeField(auto_now=True)
    subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
    # True once the file has been successfully uploaded to S3.
    S3_uploaded = models.BooleanField(default=False)

View File

@ -5,3 +5,5 @@ from .OcrTemplate import OcrTemplate
from .OcrTemplateBox import OcrTemplateBox from .OcrTemplateBox import OcrTemplateBox
from .PricingPlan import PricingPlan from .PricingPlan import PricingPlan
from .Subscription import Subscription from .Subscription import Subscription
from .FeedbackRequest import FeedbackRequest

View File

@ -10,13 +10,29 @@ from django.core.files.uploadedfile import TemporaryUploadedFile
from fwd import settings from fwd import settings
from fwd_api.constant.common import allowed_file_extensions from fwd_api.constant.common import allowed_file_extensions
from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \ from fwd_api.exception.exceptions import GeneralException, RequiredFieldException, InvalidException, \
ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException ServiceUnavailableException, FileFormatInvalidException, LimitReachedException, InvalidDecompressedSizeException, RequiredColumnException
from fwd_api.models import SubscriptionRequest, OcrTemplate from fwd_api.models import SubscriptionRequest, OcrTemplate, FeedbackRequest
from fwd_api.utils import process as ProcessUtil from fwd_api.utils import process as ProcessUtil
from fwd_api.utils.crypto import image_authenticator from fwd_api.utils.crypto import image_authenticator
from fwd_api.utils.image import resize from fwd_api.utils.image import resize
from ..celery_worker.client_connector import c_connector from ..celery_worker.client_connector import c_connector
import imagesize import imagesize
import csv
def validate_feedback_file(csv_file_path):
    """Ensure the saved feedback CSV contains every required column.

    Args:
        csv_file_path: path to a CSV file already saved on local disk.

    Raises:
        RequiredColumnException: listing the missing column names.
    """
    required_columns = ['redemptionNumber', 'requestId', 'imeiNumber', 'imeiNumber2', 'Purchase Date', 'retailer', 'Sold to party', 'timetakenmilli']
    with open(csv_file_path, 'r') as file:
        reader = csv.DictReader(file)
        # Bug fix: fieldnames is None for an empty file, so `column not in
        # None` raised TypeError; treat that as "all columns missing".
        header = reader.fieldnames or []
    missing_columns = [column for column in required_columns if column not in header]
    if missing_columns:
        raise RequiredColumnException(excArgs=str(missing_columns))
def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"): def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUEST, min_file_num=1, file_field="files"):
total_file_size = 0 total_file_size = 0
@ -39,6 +55,26 @@ def validate_list_file(files, max_file_num=settings.MAX_UPLOAD_FILES_IN_A_REQUES
raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB')) raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
def validate_csv_feedback(files, max_file_num=1, min_file_num=1, file_field="csv files"):
    """Validate uploaded feedback CSV files (count, type, extension, size).

    Raises RequiredFieldException / LimitReachedException / InvalidException /
    FileFormatInvalidException on the first violation found.
    """
    total_file_size = 0
    if len(files) < min_file_num:
        raise RequiredFieldException(excArgs=file_field)
    if len(files) > max_file_num:
        raise LimitReachedException(excArgs=(f'Number of {file_field}', str(max_file_num), ''))
    for f in files:
        # Uploads must arrive as temporary files (not in-memory blobs).
        if not isinstance(f, TemporaryUploadedFile):
            raise InvalidException(excArgs="files")
        # Require an explicit ".csv" extension, case-insensitively.
        if "." not in f.name or f.name.split(".")[-1].lower() != "csv":
            raise FileFormatInvalidException(excArgs=[".csv"])
        if f.size > settings.MAX_UPLOAD_SIZE_OF_A_FILE:
            raise LimitReachedException(excArgs=('A file', str(settings.MAX_UPLOAD_SIZE_OF_A_FILE / 1024 / 1024), 'MB'))
        total_file_size += f.size
        if total_file_size > settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST:
            # Bug fix: the message previously reported the per-file limit
            # constant instead of the per-request limit being enforced.
            raise LimitReachedException(excArgs=('Total size of all files', str(settings.MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST / 1024 / 1024), 'MB'))
def get_file(file_path: str): def get_file(file_path: str):
try: try:
return open(file_path, 'rb') return open(file_path, 'rb')
@ -105,6 +141,21 @@ def save_json_file(file_name: str, rq: SubscriptionRequest, data: dict):
json.dump(data, json_file) json.dump(data, json_file)
return file_path return file_path
def save_feedback_file(file_name: str, rq: FeedbackRequest, uploaded_file: dict):
    """Persist an uploaded feedback CSV under the owning user's media tree.

    Returns the path of the written file.
    """
    user_id = str(rq.subscription.user.id)
    feedback_id = str(rq.id)
    # NOTE(review): feedback_id appears twice in the path
    # ("feedbacks/<id>/requests/<id>") — confirm this layout is intended.
    folder_path = os.path.join(settings.MEDIA_ROOT, 'users', user_id, "feedbacks", feedback_id, 'requests', feedback_id)
    os.makedirs(folder_path, exist_ok = True)
    file_path = os.path.join(folder_path, file_name)
    with uploaded_file.open() as file:
        # Read the whole upload into memory as UTF-8 text (feedback CSVs are small).
        file_contents = file.read().decode('utf-8')
        # newline='' disables newline translation so CSV line endings are kept as-is.
        with open(file_path, 'w', newline='') as csvfile:
            csvfile.write(file_contents)
    return file_path
def delete_file_with_path(file_path: str) -> bool: def delete_file_with_path(file_path: str) -> bool:
try: try:
os.remove(file_path) os.remove(file_path)
@ -166,6 +217,17 @@ def save_to_S3(file_name, rq, local_file_path):
print(f"[ERROR]: {e}") print(f"[ERROR]: {e}")
raise ServiceUnavailableException() raise ServiceUnavailableException()
def save_feedback_to_S3(file_name, id, local_file_path):
    """Queue the feedback file for S3 upload and local cleanup; return its S3 key.

    Any failure is logged and surfaced as ServiceUnavailableException.
    """
    try:
        parts = local_file_path.split("/")
        # Bug fix: input validation used `assert`, which is stripped under
        # `python -O`; validate explicitly instead.
        if len(parts) < 2:
            raise ValueError("file_path must have at least feedback_folder and feedback_id")
        # NOTE(review): the key repeats the path's last segment and file_name
        # ("<folder>/<name>/<name>") — confirm the intended S3 layout.
        s3_key = os.path.join(parts[-2], parts[-1], file_name)
        c_connector.upload_feedback_to_s3((local_file_path, s3_key, id))
        c_connector.remove_local_file((local_file_path, id))
        return s3_key
    except Exception as e:
        print(f"[ERROR]: {e}")
        raise ServiceUnavailableException()
def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path): def save_file_with_path(file_name: str, file: TemporaryUploadedFile, quality, folder_path):
try: try:
file_path = os.path.join(folder_path, file_name) file_path = os.path.join(folder_path, file_name)

View File

@ -306,7 +306,6 @@ def token_value(token_type):
return 5 return 5
return 1 # Basic OCR return 1 # Basic OCR
def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}): def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}):
try: try:
if typez == ProcessType.ID_CARD.value: if typez == ProcessType.ID_CARD.value:
@ -324,7 +323,6 @@ def send_to_queue2(rq_id, sub_id, file_url, user_id, typez, metadata={}):
print(e) print(e)
raise BadGatewayException() raise BadGatewayException()
def build_template_matching_data(template): def build_template_matching_data(template):
temp_dict = { temp_dict = {
@ -362,6 +360,8 @@ def send_template_queue(rq_id, file_url, template: OcrTemplate, uid):
print(e) print(e)
raise BadGatewayException() raise BadGatewayException()
def process_feedback(feedback_id, local_file_path):
    # Dispatch background processing of a saved feedback CSV via Celery
    # (handled by the "csv_feedback" task).
    c_connector.csv_feedback((local_file_path, feedback_id))
def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user) -> list: def process_pdf_file(file_name: str, file_obj: TemporaryUploadedFile, request: SubscriptionRequest, user) -> list:
doc: fitz.Document = fitz.open(stream=file_obj.file.read()) doc: fitz.Document = fitz.open(stream=file_obj.file.read())

View File

@ -1,4 +1,5 @@
Apache License Apache License
Version 2.0, January 2004 Version 2.0, January 2004
http://www.apache.org/licenses/ http://www.apache.org/licenses/

View File

@ -2,7 +2,7 @@
set -e set -e
tag=$1 tag=$1
is_prod=${$2:-False} # is_prod=${$2:-False}
echo "[INFO] Tag received from Python: $tag" echo "[INFO] Tag received from Python: $tag"

View File

@ -98,6 +98,9 @@ services:
- MINIO_SECRET_KEY=${S3_SECRET_KEY} - MINIO_SECRET_KEY=${S3_SECRET_KEY}
volumes: volumes:
- ./data/minio_data:/data - ./data/minio_data:/data
ports:
- 9884:9884
- 9885:9885
networks: networks:
- ctel-sbt - ctel-sbt
restart: always restart: always