129 lines
6.6 KiB
Python
129 lines
6.6 KiB
Python
|
# myapp/management/commands/mycustomcommand.py
|
||
|
from io import StringIO
|
||
|
from django.core.management.base import BaseCommand
|
||
|
from tqdm import tqdm
|
||
|
from fwd_api.models import SubscriptionRequestFile, SubscriptionRequest
|
||
|
from fwd_api.utils.accuracy import predict_result_to_ready
|
||
|
import traceback
|
||
|
import copy
|
||
|
import csv
|
||
|
from fwd_api.constant.common import FileCategory
|
||
|
import re
|
||
|
|
||
|
PREDICT_INDEX = 4
|
||
|
FEEDBACK_INDEX = 3
|
||
|
REVIEWED_INDEX = 5
|
||
|
REASON_INDEX = 6
|
||
|
COUNTER_INDEX = 9
|
||
|
|
||
|
def detect_date_format(date_string):
|
||
|
pattern = r'^\d{2}/\d{2}/\d{4}$'
|
||
|
match = re.match(pattern, date_string)
|
||
|
if match:
|
||
|
return True
|
||
|
else:
|
||
|
return False
|
||
|
|
||
|
class Command(BaseCommand):
|
||
|
help = 'Refactor database for image level'
|
||
|
|
||
|
def add_arguments(self, parser):
|
||
|
# Add your command-line arguments here
|
||
|
parser.add_argument('test', type=str, help='Value for the argument')
|
||
|
|
||
|
def process_request(self, request_id):
|
||
|
request = SubscriptionRequest.objects.filter(request_id=request_id).first()
|
||
|
images = SubscriptionRequestFile.objects.filter(request=request, file_category=FileCategory.Origin.value).order_by('index_in_request')
|
||
|
request_review = {"imei_number": [], "retailername": None, "purchase_date": None, "sold_to_party": None}
|
||
|
for image in images:
|
||
|
if image.doc_type == "imei":
|
||
|
request_review["imei_number"] += image.reviewed_result.get("imei_number", [])
|
||
|
elif image.doc_type == "invoice":
|
||
|
request_review["retailername"] = image.reviewed_result.get("retailername", None)
|
||
|
request_review["purchase_date"] = image.reviewed_result.get("purchase_date", None)
|
||
|
request_review["sold_to_party"] = image.reviewed_result.get("sold_to_party", None)
|
||
|
|
||
|
request.reviewed_result = request_review
|
||
|
request.save()
|
||
|
|
||
|
def process_requestfile(self, request_list, traversed_requestfiles, request_filename, predict_result, user_feedback, reviewed_result, reason=None, counter=None):
|
||
|
image = SubscriptionRequestFile.objects.filter(file_name=request_filename).first()
|
||
|
try:
|
||
|
if image.doc_type == "imei":
|
||
|
if request_filename not in traversed_requestfiles:
|
||
|
image.reviewed_result = {"imei_number": [reviewed_result], "retailername": None, "purchase_date": None, "sold_to_party": None}
|
||
|
else:
|
||
|
image.reviewed_result["imei_number"].append(reviewed_result)
|
||
|
if request_filename not in traversed_requestfiles:
|
||
|
image.feedback_result = {"imei_number": [user_feedback], "retailername": None, "purchase_date": None, "sold_to_party": None}
|
||
|
else:
|
||
|
image.feedback_result["imei_number"].append(user_feedback)
|
||
|
if request_filename not in traversed_requestfiles:
|
||
|
image.predict_result = {"imei_number": [predict_result], "retailername": None, "purchase_date": [], "sold_to_party": None}
|
||
|
else:
|
||
|
image.predict_result["imei_number"].append(predict_result)
|
||
|
|
||
|
elif image.doc_type == "invoice":
|
||
|
if detect_date_format(reviewed_result):
|
||
|
if not image.reviewed_result:
|
||
|
image.reviewed_result = {"imei_number": [], "retailername": None, "purchase_date": reviewed_result, "sold_to_party": None}
|
||
|
else:
|
||
|
image.reviewed_result["purchase_date"] = reviewed_result
|
||
|
if not image.feedback_result:
|
||
|
image.feedback_result = {"imei_number": [], "retailername": None, "purchase_date": user_feedback, "sold_to_party": None}
|
||
|
else:
|
||
|
image.feedback_result["purchase_date"] = user_feedback
|
||
|
if not image.predict_result:
|
||
|
image.predict_result = {"imei_number": [], "retailername": None, "purchase_date": [predict_result], "sold_to_party": None}
|
||
|
else:
|
||
|
image.predict_result["purchase_date"] = [predict_result]
|
||
|
else:
|
||
|
if not image.reviewed_result:
|
||
|
image.reviewed_result = {"imei_number": [], "retailername": reviewed_result, "purchase_date": None, "sold_to_party": None}
|
||
|
else:
|
||
|
image.reviewed_result["retailername"] = reviewed_result
|
||
|
if not image.feedback_result:
|
||
|
image.feedback_result = {"imei_number": [], "retailername": user_feedback, "purchase_date": None, "sold_to_party": None}
|
||
|
else:
|
||
|
image.feedback_result["retailername"] = user_feedback
|
||
|
if not image.predict_result:
|
||
|
image.predict_result = {"imei_number": [], "retailername": predict_result, "purchase_date": [], "sold_to_party": None}
|
||
|
else:
|
||
|
image.predict_result["retailername"] = predict_result
|
||
|
|
||
|
if reason:
|
||
|
image.reason = reason
|
||
|
if counter:
|
||
|
image.counter_measures = counter
|
||
|
image.save()
|
||
|
request_list.append(image.request.request_id)
|
||
|
traversed_requestfiles.append(request_filename)
|
||
|
if request_filename == "temp_imei_SAP_20240201130151_7e7fa87017af40c1bd079b7da6950193_0.pdf":
|
||
|
print(f"[INFO]: {image.reviewed_result}")
|
||
|
print(f"[INFO]: {image.predict_result}")
|
||
|
print(f"[INFO]: {image.feedback_result}")
|
||
|
|
||
|
except Exception as e:
|
||
|
self.stdout.write(self.style.ERROR(f"Request File: {request_filename} failed with {e}"))
|
||
|
print(traceback.format_exc())
|
||
|
|
||
|
def handle(self, *args, **options):
|
||
|
test = options['test']
|
||
|
request_list = []
|
||
|
traversed_requestfiles = []
|
||
|
#open csv file
|
||
|
with open(test, 'r') as csvfile:
|
||
|
reader = csv.reader(csvfile)
|
||
|
index = 0
|
||
|
for row in reader:
|
||
|
if index != 0:
|
||
|
# request_list, traversed_requestfiles, request_filename, predict_result, user_feedback, reviewed_result, reason=None, counter=None
|
||
|
self.process_requestfile(request_list, traversed_requestfiles, row[2], row[PREDICT_INDEX], row[FEEDBACK_INDEX], row[REVIEWED_INDEX])
|
||
|
index += 1
|
||
|
|
||
|
self.stdout.write(self.style.SUCCESS(f"Reverting {len(list(set(request_list)))} from images"))
|
||
|
for request in request_list:
|
||
|
self.process_request(request)
|
||
|
self.stdout.write(self.style.SUCCESS('Sample Django management command executed successfully!'))
|
||
|
|