sbt-idp/cope2n-api/fwd_api/api/accuracy_view.py
2024-01-29 17:43:10 +07:00

272 lines
12 KiB
Python

from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from django.core.paginator import Paginator
from django.http import JsonResponse
from datetime import datetime
from django.utils import timezone
from django.db.models import Q
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes
# from drf_spectacular.types import OpenApiString
from ..models import SubscriptionRequest
from ..exception.exceptions import RequiredFieldException
import json
class AccuracyViewSet(viewsets.ViewSet):
lookup_field = "username"
@extend_schema(
parameters=[
OpenApiParameter(
name='start_date',
location=OpenApiParameter.QUERY,
description='Start date (YYYY-mm-DDTHH:MM:SS)',
type=OpenApiTypes.DATE,
default='2023-01-02T00:00:00',
),
OpenApiParameter(
name='end_date',
location=OpenApiParameter.QUERY,
description='End date (YYYY-mm-DDTHH:MM:SS)',
type=OpenApiTypes.DATE,
default='2024-01-10T00:00:00',
),
OpenApiParameter(
name='include_test',
location=OpenApiParameter.QUERY,
description='Whether to include test record or not',
type=OpenApiTypes.BOOL,
),
OpenApiParameter(
name='is_reviewed',
location=OpenApiParameter.QUERY,
description='Which records to be query',
type=OpenApiTypes.STR,
enum=['reviewed', 'not reviewed', 'all'],
),
OpenApiParameter(
name='request_id',
location=OpenApiParameter.QUERY,
description='Specific request id',
type=OpenApiTypes.STR,
),
OpenApiParameter(
name='redemption_id',
location=OpenApiParameter.QUERY,
description='Specific redemption id',
type=OpenApiTypes.STR,
),
OpenApiParameter(
name='quality',
location=OpenApiParameter.QUERY,
description='One or more of [bad, good, all]',
type=OpenApiTypes.STR,
enum=['bad', 'good', 'all'],
),
OpenApiParameter(
name='page',
location=OpenApiParameter.QUERY,
description='Page number',
type=OpenApiTypes.INT,
required=False
),
OpenApiParameter(
name='page_size',
location=OpenApiParameter.QUERY,
description='Number of items per page',
type=OpenApiTypes.INT,
required=False
),
],
responses=None, tags=['Accuracy']
)
@action(detail=False, url_path="request_list", methods=["GET"])
def get_subscription_requests(self, request):
if request.method == 'GET':
start_date_str = request.GET.get('start_date')
end_date_str = request.GET.get('end_date')
page_number = int(request.GET.get('page', 1))
page_size = int(request.GET.get('page_size', 10))
request_id = request.GET.get('request_id', None)
redemption_id = request.GET.get('redemption_id', None)
is_reviewed = request.GET.get('is_reviewed', None)
include_test = request.GET.get('include_test', False)
quality = request.GET.get('quality', None)
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%dT%H:%M:%S')
end_date = datetime.strptime(end_date_str, '%Y-%m-%dT%H:%M:%S')
except ValueError:
return JsonResponse({'error': 'Invalid date format. Please use YYYY-MM-DD.'}, status=400)
base_query = Q(created_at__range=(start_date, end_date))
if request_id:
base_query &= Q(request_id=request_id)
if redemption_id:
base_query &= Q(redemption_id=redemption_id)
base_query &= Q(is_test_request=False)
if isinstance(include_test, str):
include_test = True if include_test=="true" else False
if include_test:
# base_query = ~base_query
base_query.children = base_query.children[:-1]
elif isinstance(include_test, bool):
if include_test:
base_query = ~base_query
if isinstance(is_reviewed, str):
if is_reviewed == "reviewed":
base_query &= Q(is_reviewed=True)
elif is_reviewed == "not reviewed":
base_query &= Q(is_reviewed=False)
elif is_reviewed == "all":
pass
if isinstance(quality, str):
if quality == "good":
base_query &= Q(is_bad_image_quality=False)
elif quality == "bad":
base_query &= Q(is_bad_image_quality=True)
elif quality == "all":
pass
subscription_requests = SubscriptionRequest.objects.filter(base_query).order_by('created_at')
paginator = Paginator(subscription_requests, page_size)
page = paginator.get_page(page_number)
data = []
for request in page:
imeis = []
purchase_date = []
retailer = ""
try:
if request.reviewed_result is not None:
imeis = request.reviewed_result.get("imei_number", [])
purchase_date = request.reviewed_result.get("purchase_date", [])
retailer = request.reviewed_result.get("retailername", "")
elif request.feedback_result is not None :
imeis = request.feedback_result.get("imei_number", [])
purchase_date = request.feedback_result.get("purchase_date", [])
retailer = request.feedback_result.get("retailername", "")
elif request.predict_result is not None:
if request.predict_result.get("status", 404) == 200:
imeis = request.predict_result.get("content", {}).get("document", [])[0].get("content", [])[3].get("value", [])
purchase_date = request.predict_result.get("content", {}).get("document", [])[0].get("content", [])[2].get("value", [])
retailer = request.predict_result.get("content", {}).get("document", [])[0].get("content", [])[0].get("value", [])
except Exception as e:
print(f"[ERROR]: {e}")
print(f"[ERROR]: {request}")
data.append({
'RequestID': request.request_id,
'RedemptionID': request.redemption_id,
'IMEIs': imeis,
'Purchase Date': purchase_date,
'Retailer': retailer,
'Client Request Time (ms)': request.client_request_time,
'Server Processing Time (ms)': request.preprocessing_time + request.ai_inference_time,
'Is Reviewed': request.is_reviewed,
# 'Is Bad Quality': request.is_bad_image_quality,
'created_at': request.created_at.isoformat()
})
response = {
'subscription_requests': data,
'page': {
'number': page.number,
'total_pages': page.paginator.num_pages,
'count': page.paginator.count,
}
}
return JsonResponse(response)
return JsonResponse({'error': 'Invalid request method.'}, status=405)
class RequestViewSet(viewsets.ViewSet):
lookup_field = "username"
@extend_schema(request = {
'multipart/form-data': {
'type': 'object',
'properties': {
'reviewed_result': {
'type': 'string',
},
}
},
}, responses=None, tags=['Request']
)
@action(detail=False, url_path=r"request/(?P<request_id>[\w\-]+)", methods=["GET", "POST"])
def get_subscription_request(self, request, request_id=None):
if request.method == 'GET':
base_query = Q(request_id=request_id)
subscription_request = SubscriptionRequest.objects.filter(base_query).first()
data = []
imeis = []
purchase_date = []
retailer = ""
try:
if subscription_request.reviewed_result is not None:
imeis = subscription_request.reviewed_result.get("imei_number", [])
purchase_date = subscription_request.reviewed_result.get("purchase_date", [])
retailer = subscription_request.reviewed_result.get("retailername", "")
elif subscription_request.feedback_result is not None :
imeis = subscription_request.feedback_result.get("imei_number", [])
purchase_date = subscription_request.feedback_result.get("purchase_date", [])
retailer = subscription_request.feedback_result.get("retailername", "")
elif subscription_request.predict_result is not None:
if subscription_request.predict_result.get("status", 404) == 200:
imeis = subscription_request.predict_result.get("content", {}).get("document", [])[0].get("content", [])[3].get("value", [])
purchase_date = subscription_request.predict_result.get("content", {}).get("document", [])[0].get("content", [])[2].get("value", [])
retailer = subscription_request.predict_result.get("content", {}).get("document", [])[0].get("content", [])[0].get("value", [])
except Exception as e:
print(f"[ERROR]: {e}")
print(f"[ERROR]: {subscription_request}")
data.append({
'RequestID': subscription_request.request_id,
'RedemptionID': subscription_request.redemption_id,
'IMEIs': imeis,
'Purchase Date': purchase_date,
'Retailer': retailer,
'Reviewed result': subscription_request.reviewed_result,
'Feedback result': subscription_request.feedback_result,
'Client Request Time (ms)': subscription_request.client_request_time,
'Server Processing Time (ms)': subscription_request.preprocessing_time + subscription_request.ai_inference_time,
'Is Reviewed': subscription_request.is_reviewed,
# 'Is Bad Quality': subscription_request.is_bad_image_quality,
'created_at': subscription_request.created_at.isoformat(),
'updated_at': subscription_request.updated_at.isoformat()
})
response = {
'subscription_requests': data
}
return JsonResponse(response)
elif request.method == 'POST':
data = request.data
base_query = Q(request_id=request_id)
subscription_request = SubscriptionRequest.objects.filter(base_query).first()
reviewed_result = json.loads(data["reviewed_result"][1:-1])
for field in ['retailername', 'sold_to_party', 'purchase_date', 'imei_number']:
if not field in reviewed_result.keys():
raise RequiredFieldException(excArgs=f'reviewed_result.{field}')
subscription_request.reviewed_result = reviewed_result
subscription_request.reviewed_result['request_id'] = request_id
subscription_request.is_reviewed = True
subscription_request.save()
return JsonResponse({'message': 'success.'}, status=200)
else:
return JsonResponse({'error': 'Invalid request method.'}, status=405)