# myapp/management/commands/mycustomcommand.py
import copy
import csv
import traceback

from django.core.management.base import BaseCommand
from tqdm import tqdm

from fwd_api.constant.common import FileCategory
from fwd_api.models import SubscriptionRequestFile, SubscriptionRequest
from fwd_api.utils.accuracy import predict_result_to_ready

# Zero-based positions of the CSV columns consumed by the command.
# Column 0 is additionally read as the request id (see Command.handle).
PREDICT_INDEX = 3
FEEDBACK_INDEX = 2
REVIEWED_INDEX = 4
REASON_INDEX = 6
COUNTER_INDEX = 9

# A row must be at least this long for every indexed column above to exist.
_MIN_COLUMNS = 1 + max(
    PREDICT_INDEX, FEEDBACK_INDEX, REVIEWED_INDEX, REASON_INDEX, COUNTER_INDEX
)

# Feedback fields compared against the CSV's user-feedback value, in order.
_REVIEW_FIELDS = ('retailername', 'purchase_date', 'imei_number')


def _blank_review(request_id):
    """Return the empty reviewed-result template used when none is stored."""
    return {
        "request_id": request_id,
        "imei_number": [],
        "retailername": "",
        "purchase_date": "",
        "sold_to_party": "",
    }


class Command(BaseCommand):
    """Apply reviewed values from a CSV file to requests and their images."""

    help = 'Refactor database for image level'

    def add_arguments(self, parser):
        """Register the single positional argument: the path of the CSV file."""
        parser.add_argument('test', type=str, help='Value for the argument')

    def process_request(self, total, failed, request, predict_result, user_feedback, reviewed_result, reason, counter):
        """Write one CSV row's reviewed value onto a request and its images.

        Requests whose ``request_id`` (the part before the first ``.``) has
        fewer than two ``_``-separated pieces are ignored and not counted.

        For every origin-category image of the request, each feedback field
        is compared with ``user_feedback`` (for ``imei_number`` the first
        list element is compared as well). On a match the reviewed value is
        merged into the request-level review and a per-image review is
        stored on the image. The request is then saved as reviewed.

        Args:
            total: One-element list; ``total[0]`` is incremented for every
                request that is not ignored.
            failed: One-element list; ``failed[0]`` is incremented at most
                once per request, when processing raised or nothing matched.
            request: The ``SubscriptionRequest`` to update.
            predict_result: CSV value, used only in the diagnostic output.
            user_feedback: CSV value matched against image feedback.
            reviewed_result: CSV value written as the reviewed result.
            reason: Stored on matching images when truthy.
            counter: Stored as ``counter_measures`` on matching images when
                truthy.
        """
        if len(request.request_id.split(".")[0].split("_")) < 2:
            return
        total[0] += 1

        request_review = copy.deepcopy(request.reviewed_result)
        if not request_review:
            request_review = _blank_review(request.request_id)

        images = SubscriptionRequestFile.objects.filter(
            request=request, file_category=FileCategory.Origin.value
        )
        is_match = False
        request_failed = False
        # Remembered for the diagnostic below; stays None when the request
        # has no images, where the loop variable would otherwise be unbound.
        last_feedback = None
        try:
            for image in images:
                last_feedback = image.feedback_result

                # Kept inside the loop on purpose: a request without images
                # never raises here and is still saved as reviewed below.
                if not request.predict_result:
                    raise KeyError(f"Key predict_result not found in {request.request_id}")
                if request.predict_result.get("status", 200) != 200:
                    raise AttributeError(f"Failed request: {request.request_id}")

                for field in _REVIEW_FIELDS:
                    feedback_value = image.feedback_result[field]
                    if feedback_value is None:
                        continue
                    first_imei_matches = (
                        field == 'imei_number'
                        and len(feedback_value) > 0
                        and feedback_value[0] == user_feedback
                    )
                    if not (first_imei_matches or feedback_value == user_feedback):
                        continue
                    is_match = True

                    # Request level: IMEIs accumulate, other fields overwrite.
                    if field == 'imei_number':
                        if reviewed_result not in request_review["imei_number"]:
                            request_review["imei_number"].append(reviewed_result)
                    elif reviewed_result != request_review[field]:
                        request_review[field] = reviewed_result

                    # Image level: start from the request's stored review.
                    image_review = copy.deepcopy(request.reviewed_result)
                    if not image_review:
                        # NOTE(review): image.request_id is presumably the
                        # foreign-key value, not the request_id string used
                        # for the request-level template - confirm.
                        image_review = _blank_review(image.request_id)

                    if image.doc_type == "invoice" and field in ['retailername', 'purchase_date']:
                        image_review[field] = reviewed_result
                        image_review["imei_number"] = []
                    elif image.doc_type == "imei" and field == "imei_number":
                        image_review = {
                            "retailername": None,
                            "sold_to_party": None,
                            "purchase_date": None,
                            "imei_number": [reviewed_result],
                        }
                    image.reviewed_result = image_review

                    # NOTE(review): the original layout was lost; reason,
                    # counter measures and the save are assumed to belong to
                    # the matched-field branch - confirm.
                    if reason:
                        image.reason = reason
                    if counter:
                        image.counter_measures = counter
                    image.save()

            request.reviewed_result = request_review
            request.reviewed_result["request_id"] = request.request_id
            request.is_reviewed = True
            request.save()
        except Exception as e:
            # Top-level boundary for one CSV row: report it and move on so a
            # single bad request does not abort the whole import.
            self.stdout.write(self.style.ERROR(f"Request: {request.request_id} failed with {e}"))
            print(traceback.format_exc())
            request_failed = True

        if not is_match:
            print("FAIL =====>", last_feedback, predict_result, user_feedback, reviewed_result)
            request_failed = True

        # Count each request once, even if it both raised and did not match.
        if request_failed:
            failed[0] += 1

    def handle(self, *args, **options):
        """Read the CSV at ``options['test']`` and process every data row.

        The first row is treated as a header and skipped. Column 0 of each
        remaining row is looked up as a ``SubscriptionRequest.request_id``;
        rows without a matching request, or with too few columns, are
        reported and skipped. A failed/total summary is written at the end.
        """
        test = options['test']
        total = [0]
        failed = [0]
        # newline='' lets the csv module handle embedded newlines itself.
        with open(test, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # skip the header row
            for row in reader:
                if not row:
                    # csv.reader yields [] for blank lines.
                    continue
                request = SubscriptionRequest.objects.filter(request_id=row[0]).first()
                if not request:
                    print("Not found ====>", row)
                    continue
                if len(row) < _MIN_COLUMNS:
                    print("Too few columns ====>", row)
                    continue
                self.process_request(
                    total,
                    failed,
                    request,
                    row[PREDICT_INDEX],
                    row[FEEDBACK_INDEX],
                    row[REVIEWED_INDEX],
                    row[REASON_INDEX],
                    row[COUNTER_INDEX],
                )

        self.stdout.write(self.style.SUCCESS(f"Failed/Total: {failed[0]}/{total[0]}"))
        self.stdout.write(self.style.SUCCESS('Sample Django management command executed successfully!'))