sbt-idp/cope2n-api/fwd_api/api/ctel_user_view.py
Viet Anh Nguyen d2ea7f2e66 Clean up code
2023-12-12 12:54:34 +07:00

300 lines
12 KiB
Python
Executable File

import base64
from zoneinfo import ZoneInfo
from django.db import transaction
from django.http import JsonResponse
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.decorators import authentication_classes, permission_classes
from ..annotation.api import throw_on_failure
from ..constant.common import USER_MESSAGE, EntityStatus, PLAN_MESSAGE, PlanCode
from ..exception.exceptions import InvalidException, NotFoundException, LockedEntityException, TrialOneException, \
LimitReachedException, NotAuthenticatedException
from ..models import UserProfile, PricingPlan, Subscription
from ..request.UpsertUserRequest import UpsertUserRequest
from ..response.SubscriptionResponse import SubscriptionResponse
from ..utils import ProcessUtil, DateUtil
from ..utils.CryptoUtils import sds_authenticator, admin_sds_authenticator, SdsAuthentication
import datetime
from ..request.LoginRequest import LoginRequest
from ..request.HealcheckSerializer import HealthCheckSerializer
from ..utils.DateUtil import default_zone
from fwd import settings
class CtelUserViewSet(viewsets.ViewSet):
    """User-facing endpoints: login, user/subscription upsert and lookup,
    test-token generation and a health check."""

    lookup_field = "username"

    @staticmethod
    def _expiry_for(start_at, duration_days):
        """Return ``start_at + duration_days`` as the subscription expiry.

        If the result is naive it is tagged with ``default_zone``. An
        already-aware value is returned unchanged so its instant is not
        shifted. ``datetime.replace`` returns a new object, so the result must
        be assigned (the previous code discarded it).
        """
        expires_at = start_at + datetime.timedelta(days=duration_days)
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=ZoneInfo(default_zone))
        return expires_at

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'username': {
                    'type': 'string',
                },
                'password': {
                    'type': 'string',
                },
            },
            # OpenAPI expects an array here; a set is unordered and not JSON-serialisable.
            'required': ['username', 'password'],
        }
    }, responses=None, tags=['ocr'])
    @action(detail=False, url_path="login", methods=["POST"])
    def login(self, request):
        """Authenticate against the configured FI credentials and return a token.

        On success the FI user profile and its subscription are looked up and
        created on first use (plan code ``FI_PLAN``), then a token is generated
        for that user/subscription pair.

        Raises:
            NotAuthenticatedException: username or password does not match
                ``settings.FI_USER_NAME`` / ``settings.FI_PASSWORD``.
            InvalidException: more than one profile exists for the FI user.
            TrialOneException: more than one subscription or more than one
                ``FI_PLAN`` pricing plan exists.
        """
        serializer = LoginRequest(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        if data['username'] != settings.FI_USER_NAME or data['password'] != settings.FI_PASSWORD:
            raise NotAuthenticatedException()

        users = UserProfile.objects.filter(sync_id=settings.FI_USER_NAME)
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if len(users) == 0:
            user = UserProfile(sync_id=settings.FI_USER_NAME, status=EntityStatus.ACTIVE.value)
            user.save()
        else:
            user = users[0]

        subs = Subscription.objects.filter(user=user)
        if len(subs) > 1:
            raise TrialOneException(excArgs=PLAN_MESSAGE)
        if len(subs) == 0:
            p_code = "FI_PLAN"
            plans = PricingPlan.objects.filter(code=p_code)
            if len(plans) > 1:
                raise TrialOneException(excArgs=PLAN_MESSAGE)
            if len(plans) == 0:
                plan = PricingPlan(code=p_code, duration=365, token_limitations=999999)
                plan.save()
            else:
                plan: PricingPlan = plans[0]
            start_plan_at = DateUtil.get_date_time_now()
            sub: Subscription = Subscription(
                limit_token=plan.token_limitations,
                pricing_plan=plan,
                expired_at=self._expiry_for(start_plan_at, plan.duration),
                user=user,
                start_at=start_plan_at,
                status=EntityStatus.ACTIVE.value,
            )
            sub.save()
        else:
            sub = subs[0]

        return Response(status=status.HTTP_200_OK, data={
            'user_id': 'SBT',
            'user_name': settings.FI_USER_NAME,
            'token': sds_authenticator.generate_token(
                user_id=settings.FI_USER_NAME,
                internal_id=user.id,
                status=EntityStatus.ACTIVE.value,
                sub_id=sub.id,
            ),
        })

    @extend_schema(request=None, responses=None, tags=['users'])
    @action(detail=False, url_path="users/info", methods=["GET"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def get_be_user(self, request):
        """Return the ``sync_id`` of the user resolved by ``ProcessUtil.get_user``."""
        user_data = ProcessUtil.get_user(request)
        return Response(status=status.HTTP_200_OK, data={
            'userId': user_data.user.sync_id,
            'message': 'User is valid'
        })

    @extend_schema(request=None, responses=None, tags=['users'], methods=['GET'], parameters=[
        OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
        OpenApiParameter("subscription_id", OpenApiTypes.STR, OpenApiParameter.QUERY, description="Subscription id"),
    ])
    @extend_schema(request=UpsertUserRequest, responses=None, tags=['users'], methods=['POST'], parameters=[
        OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    ], examples=[
        OpenApiExample(
            'Example',
            summary='Request Sample',
            description='Status : 0 ( active ) / 1 (inactive). Default is 0. <br>'
                        'Datetime Format: <b>dd/mm/YYYY HH:MM:SS</b> <br>'
                        'Plan code ( TRIAL / BASIC / ADVANCED ) ( TRIAL apply only one-time per user account )',
            value={
                'plan_code': 'A03',
                'plan_start_at': '01/04/2023 00:00:00',
                'status': 1,
                'email': 'abc@mamil.vn',
                'name': 'Pham Van A'
            },
            request_only=True,
            response_only=False
        ),
    ])
    @action(detail=False, url_path="users", methods=["POST", "GET"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def user_view(self, request):
        """Dispatch ``POST`` to :meth:`upsert_user` and ``GET`` to :meth:`get_user`."""
        if request.method == "POST":
            return self.upsert_user(request)
        return self.get_user(request)

    @transaction.atomic
    def upsert_user(self, request):
        """Create or update the caller's profile and optionally add a subscription.

        The user is identified by ``request.user_data['userId']``. ``name``,
        ``email`` and ``status`` are applied when present in the validated
        payload. A subscription is created only when both ``plan_code`` and
        ``plan_start_at`` are supplied.

        Returns:
            Response with ``userId``, ``status`` and ``subscriptionId``
            (``-1`` when no subscription was created).

        Raises:
            NotFoundException: the request carries no ``user_data``.
            InvalidException: payload is invalid, the user is ambiguous, or
                the plan code does not match exactly one pricing plan.
            TrialOneException: the user already has a TRIAL subscription.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs=USER_MESSAGE)
        user_data = request.user_data
        user_updated: bool = False
        sub_id = -1

        # Check request
        serializer = UpsertUserRequest(data=request.data)
        if not serializer.is_valid():
            print(serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if len(users) == 0:
            user = UserProfile(sync_id=user_data['userId'])
            user_updated = True
        else:
            user = users[0]

        if 'name' in data:
            user.full_name = data['name']
            user_updated = True
        if 'email' in data:
            user.email = data['email']
            user_updated = True
        if 'status' in data:
            user.status = data['status']
            user_updated = True
        if user_updated:
            user.save()

        if 'plan_code' in data and "plan_start_at" in data:
            plan_code = data['plan_code']
            # create sub
            plans = PricingPlan.objects.filter(code=plan_code)
            if len(plans) != 1:
                raise InvalidException(excArgs=PLAN_MESSAGE)
            plan: PricingPlan = plans[0]
            if plan_code == PlanCode.TRIAL.value:
                # TRIAL may be used only once per account, so any existing
                # trial subscription blocks a new one (was `> 1`, which
                # allowed a second trial).
                if Subscription.objects.filter(user=user, pricing_plan=plan).exists():
                    raise TrialOneException(excArgs=PLAN_MESSAGE)
            start_plan_at = DateUtil.to_date(data['plan_start_at'], DateUtil.FORMAT.DD_MM_YYYY_HHMMSS.value)
            sub: Subscription = Subscription(
                limit_token=plan.token_limitations,
                pricing_plan=plan,
                expired_at=self._expiry_for(start_plan_at, plan.duration),
                user=user,
                start_at=start_plan_at,
                status=EntityStatus.ACTIVE.value,
            )
            sub.save()
            sub_id = sub.id

        response_dict = {
            'userId': user_data['userId'],
            'status': user.status,
            'subscriptionId': sub_id,
        }
        return Response(status=status.HTTP_200_OK, data=response_dict)

    def get_user(self, request):
        """Return the caller's profile with either all subscriptions or one.

        Without a ``subscription_id`` query parameter (or with ``-1``) every
        subscription of the user is listed. With a specific id, that
        subscription is returned together with a generated ``url`` and
        ``token``; the admin authenticator is used when the plan code is
        ``SDS_EXTRA_PREMIUM_PACK``.

        Raises:
            NotFoundException: the request carries no ``user_data``.
            InvalidException: the user id does not match exactly one profile,
                or the subscription id does not belong to the user.
            LockedEntityException: the user is not active.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs='user')
        user_data = request.user_data
        sub_id = request.query_params.get('subscription_id', None)

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) != 1:
            raise InvalidException(excArgs="userId")
        user: UserProfile = users[0]
        if user.status != EntityStatus.ACTIVE.value:
            raise LockedEntityException(excArgs="user")

        # Query parameters arrive as strings, so compare the textual form of
        # the "no subscription" sentinel rather than the integer -1.
        if sub_id is None or str(sub_id) == '-1':
            subs = Subscription.objects.filter(user=user)
            sub_res = SubscriptionResponse(data=subs, many=True)
            sub_res.is_valid()
            return Response(status=status.HTTP_200_OK, data={
                'userId': user.sync_id,
                'status': user.status,
                'subscriptions': sub_res.data
            })

        subs = Subscription.objects.filter(user=user, id=sub_id)
        sub_res = SubscriptionResponse(data=subs, many=True)
        sub_res.is_valid()
        serialized = sub_res.data
        if not serialized:
            # Previously an IndexError: the id does not match any of this
            # user's subscriptions.
            raise InvalidException(excArgs='subscription_id')
        sub = serialized[0]

        # Gen long token for admin
        gen_x: SdsAuthentication = (
            admin_sds_authenticator if sub['plan_code'] == 'SDS_EXTRA_PREMIUM_PACK' else sds_authenticator
        )
        return Response(status=status.HTTP_200_OK, data={
            'userId': user.sync_id,
            'status': user.status,
            'subscriptions': sub,
            'url': gen_x.generate_url(user.sync_id, user.id, user.status, sub['id']),
            'token': gen_x.generate_token(user.sync_id, user.id, user.status, sub['id']),
        })

    @action(detail=False, url_path="users/gen-token", methods=["POST"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def gen_cmc_token(self, request):
        """Simulate the token that CMC passes to SDS, for use in the ``user`` header.

        Request body fields:
            userId: optional string (e.g. ``"C2H5OH"``); a random 10-character
                string is used when absent or null.
            expiredAt: required string (e.g. ``"21/12/2023 00:00:00"``).

        Returns:
            Response whose ``token`` is the base64 text of a JSON object with
            the encrypted ``content`` and the hex ``iv`` used to encrypt it.

        Raises:
            InvalidException: ``expiredAt`` is missing from the body.
        """
        import json
        import os
        from ..utils.CryptoUtils import ctel_cryptor

        data = request.data
        if "expiredAt" not in data:
            raise InvalidException(excArgs='expiredAt')

        uid = data.get('userId')
        if uid is None:
            uid = ProcessUtil.get_random_string(10)
        m_text = {
            'userId': uid,
            "expiredAt": data['expiredAt']
        }

        iv = os.urandom(16).hex()
        e_text = ctel_cryptor.encrypt_ctel(json.dumps(m_text), iv)
        e_data = {
            'content': e_text,
            'iv': iv
        }
        # b64encode returns bytes; decode so the payload is a plain string
        # instead of relying on the renderer to coerce it.
        return Response(status=status.HTTP_200_OK, data={
            'token': base64.b64encode(json.dumps(e_data).encode()).decode('ascii')
        })

    @action(detail=False, url_path="healthcheck", methods=["GET"], authentication_classes=[], permission_classes=[])
    def health_check(self, request):
        """Return a static ``OK`` status payload; no authentication or permissions apply."""
        # Perform any checks to determine the health status of the application
        # TODO: check database connectivity, S3 service availability, etc.
        serializer = HealthCheckSerializer(data={
            'status': "OK",
            'message': 'Application is running smoothly.'
        })
        serializer.is_valid(raise_exception=True)
        return JsonResponse(status=status.HTTP_200_OK, data=serializer.data)