sbt-idp/cope2n-api/fwd_api/management/commands/migrate-csv-revert.py

129 lines
6.6 KiB
Python
Raw Permalink Normal View History

2024-02-28 11:45:52 +00:00
# myapp/management/commands/mycustomcommand.py
from io import StringIO
from django.core.management.base import BaseCommand
from tqdm import tqdm
from fwd_api.models import SubscriptionRequestFile, SubscriptionRequest
from fwd_api.utils.accuracy import predict_result_to_ready
import traceback
import copy
import csv
from fwd_api.constant.common import FileCategory
import re
PREDICT_INDEX = 4
FEEDBACK_INDEX = 3
REVIEWED_INDEX = 5
REASON_INDEX = 6
COUNTER_INDEX = 9
def detect_date_format(date_string):
pattern = r'^\d{2}/\d{2}/\d{4}$'
match = re.match(pattern, date_string)
if match:
return True
else:
return False
class Command(BaseCommand):
help = 'Refactor database for image level'
def add_arguments(self, parser):
# Add your command-line arguments here
parser.add_argument('test', type=str, help='Value for the argument')
def process_request(self, request_id):
request = SubscriptionRequest.objects.filter(request_id=request_id).first()
images = SubscriptionRequestFile.objects.filter(request=request, file_category=FileCategory.Origin.value).order_by('index_in_request')
request_review = {"imei_number": [], "retailername": None, "purchase_date": None, "sold_to_party": None}
for image in images:
if image.doc_type == "imei":
request_review["imei_number"] += image.reviewed_result.get("imei_number", [])
elif image.doc_type == "invoice":
request_review["retailername"] = image.reviewed_result.get("retailername", None)
request_review["purchase_date"] = image.reviewed_result.get("purchase_date", None)
request_review["sold_to_party"] = image.reviewed_result.get("sold_to_party", None)
request.reviewed_result = request_review
request.save()
def process_requestfile(self, request_list, traversed_requestfiles, request_filename, predict_result, user_feedback, reviewed_result, reason=None, counter=None):
image = SubscriptionRequestFile.objects.filter(file_name=request_filename).first()
try:
if image.doc_type == "imei":
if request_filename not in traversed_requestfiles:
image.reviewed_result = {"imei_number": [reviewed_result], "retailername": None, "purchase_date": None, "sold_to_party": None}
else:
image.reviewed_result["imei_number"].append(reviewed_result)
if request_filename not in traversed_requestfiles:
image.feedback_result = {"imei_number": [user_feedback], "retailername": None, "purchase_date": None, "sold_to_party": None}
else:
image.feedback_result["imei_number"].append(user_feedback)
if request_filename not in traversed_requestfiles:
image.predict_result = {"imei_number": [predict_result], "retailername": None, "purchase_date": [], "sold_to_party": None}
else:
image.predict_result["imei_number"].append(predict_result)
elif image.doc_type == "invoice":
if detect_date_format(reviewed_result):
if not image.reviewed_result:
image.reviewed_result = {"imei_number": [], "retailername": None, "purchase_date": reviewed_result, "sold_to_party": None}
else:
image.reviewed_result["purchase_date"] = reviewed_result
if not image.feedback_result:
image.feedback_result = {"imei_number": [], "retailername": None, "purchase_date": user_feedback, "sold_to_party": None}
else:
image.feedback_result["purchase_date"] = user_feedback
if not image.predict_result:
image.predict_result = {"imei_number": [], "retailername": None, "purchase_date": [predict_result], "sold_to_party": None}
else:
image.predict_result["purchase_date"] = [predict_result]
else:
if not image.reviewed_result:
image.reviewed_result = {"imei_number": [], "retailername": reviewed_result, "purchase_date": None, "sold_to_party": None}
else:
image.reviewed_result["retailername"] = reviewed_result
if not image.feedback_result:
image.feedback_result = {"imei_number": [], "retailername": user_feedback, "purchase_date": None, "sold_to_party": None}
else:
image.feedback_result["retailername"] = user_feedback
if not image.predict_result:
image.predict_result = {"imei_number": [], "retailername": predict_result, "purchase_date": [], "sold_to_party": None}
else:
image.predict_result["retailername"] = predict_result
if reason:
image.reason = reason
if counter:
image.counter_measures = counter
image.save()
request_list.append(image.request.request_id)
traversed_requestfiles.append(request_filename)
if request_filename == "temp_imei_SAP_20240201130151_7e7fa87017af40c1bd079b7da6950193_0.pdf":
print(f"[INFO]: {image.reviewed_result}")
print(f"[INFO]: {image.predict_result}")
print(f"[INFO]: {image.feedback_result}")
except Exception as e:
self.stdout.write(self.style.ERROR(f"Request File: {request_filename} failed with {e}"))
print(traceback.format_exc())
def handle(self, *args, **options):
test = options['test']
request_list = []
traversed_requestfiles = []
#open csv file
with open(test, 'r') as csvfile:
reader = csv.reader(csvfile)
index = 0
for row in reader:
if index != 0:
# request_list, traversed_requestfiles, request_filename, predict_result, user_feedback, reviewed_result, reason=None, counter=None
self.process_requestfile(request_list, traversed_requestfiles, row[2], row[PREDICT_INDEX], row[FEEDBACK_INDEX], row[REVIEWED_INDEX])
index += 1
self.stdout.write(self.style.SUCCESS(f"Reverting {len(list(set(request_list)))} from images"))
for request in request_list:
self.process_request(request)
self.stdout.write(self.style.SUCCESS('Sample Django management command executed successfully!'))