# sbt-idp/cope2n-api/fwd_api/api/ctel_user_view.py
# 2024-06-26 14:58:24 +07:00
#
# 306 lines
# 12 KiB
# Python
# Executable File

import base64
from zoneinfo import ZoneInfo
from django.db import transaction
from django.http import JsonResponse
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response
from ..annotation.api import throw_on_failure
from ..constant.common import USER_MESSAGE, EntityStatus, PLAN_MESSAGE, PlanCode
from ..exception.exceptions import InvalidException, NotFoundException, LockedEntityException, TrialOneException, NotAuthenticatedException
from ..models import UserProfile, PricingPlan, Subscription
from ..request.UpsertUserRequest import UpsertUserRequest
from ..response.SubscriptionResponse import SubscriptionResponse
from ..utils.health import get_health_report
from ..utils import date as DateUtil
from ..utils.crypto import sds_authenticator, admin_sds_authenticator, SdsAuthentication
import datetime
from ..request.LoginRequest import LoginRequest
from ..utils.date import default_zone
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
from fwd import settings
class CtelUserViewSet(viewsets.ViewSet):
    """Login, user/subscription management and system status endpoints.

    ``login``, ``health_check`` and ``usage`` are exposed through ``@action``.
    ``upsert_user`` and ``get_user`` are plain methods: the ``user_view``
    action that dispatched to them is kept below as commented-out code.
    """

    lookup_field = "username"

    @staticmethod
    def _expiry_for(start_at, duration_days):
        """Return ``start_at`` plus ``duration_days`` days.

        ``datetime.replace`` returns a new object, so its result must be kept.
        The zone is only attached when the value is naive; an aware value
        already identifies an instant and is left untouched.
        """
        expires_at = start_at + datetime.timedelta(days=duration_days)
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=ZoneInfo(default_zone))
        return expires_at

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'username': {
                    'type': 'string',
                },
                'password': {
                    'type': 'string',
                },
            },
            # OpenAPI defines `required` as an array of property names.
            'required': ['username', 'password'],
        }
    }, responses=None, tags=['Login'])
    @action(detail=False, url_path="login", methods=["POST"])
    def login(self, request):
        """Check the posted credentials and return a user id and token.

        Only the two accounts configured in settings are accepted. The user
        profile, the ``FI_PLAN`` pricing plan and the subscription are
        created on first login when they do not exist yet.

        Raises:
            NotAuthenticatedException: unknown username or wrong password.
            InvalidException: more than one profile matches the username.
            TrialOneException: more than one subscription or plan matches.
        """
        serializer = LoginRequest(data=request.data)
        # Raises on invalid input; there is nothing worth logging on success.
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        username = data['username']

        token_limit = 999999
        if username == settings.ADMIN_USER_NAME:
            if data['password'] != settings.ADMIN_PASSWORD:
                raise NotAuthenticatedException()
        elif username == settings.STANDARD_USER_NAME:
            if data['password'] != settings.STANDARD_PASSWORD:
                raise NotAuthenticatedException()
            token_limit = 1000
        else:
            raise NotAuthenticatedException()

        users = UserProfile.objects.filter(sync_id=username)
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if len(users) == 0:
            user = UserProfile(sync_id=username, status=EntityStatus.ACTIVE.value)
            user.save()
        else:
            # NOTE(review): user.status is not checked here and the token below
            # is always issued with ACTIVE status - confirm this is intended.
            user = users[0]

        subs = Subscription.objects.filter(user=user)
        if len(subs) > 1:
            raise TrialOneException(excArgs=PLAN_MESSAGE)
        if len(subs) == 0:
            p_code = "FI_PLAN"
            plans = PricingPlan.objects.filter(code=p_code)
            if len(plans) > 1:
                raise TrialOneException(excArgs=PLAN_MESSAGE)
            if len(plans) == 0:
                # NOTE(review): both accounts share this plan code, so the
                # stored limit is the one of whichever account creates the
                # plan first - confirm this is intended.
                plan = PricingPlan(code=p_code, duration=365, token_limitations=token_limit)
                plan.save()
            else:
                plan: PricingPlan = plans[0]
            start_plan_at = DateUtil.get_date_time_now()
            sub: Subscription = Subscription(
                limit_token=plan.token_limitations,
                pricing_plan=plan,
                expired_at=self._expiry_for(start_plan_at, plan.duration),
                user=user,
                start_at=start_plan_at,
                status=EntityStatus.ACTIVE.value,
            )
            sub.save()
        else:
            sub = subs[0]

        return Response(status=status.HTTP_200_OK, data={
            'user_id': user.id,
            'user_name': username,
            'token': sds_authenticator.generate_token(
                user_id=username,
                internal_id=user.id,
                status=EntityStatus.ACTIVE.value,
                sub_id=sub.id,
            ),
        })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @extend_schema(request=None, responses=None, tags=['users'])
    # @action(detail=False, url_path="users/info", methods=["GET"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def get_be_user(self, request):
    #     user_data = ProcessUtil.get_user(request)
    #     return Response(status=status.HTTP_200_OK, data={
    #         'userId': user_data.user.sync_id,
    #         'message': 'User is valid'
    #     })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @extend_schema(request=None, responses=None, tags=['users'], methods=['GET'], parameters=[
    #     OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    #     OpenApiParameter("subscription_id", OpenApiTypes.STR, OpenApiParameter.QUERY, description="Subscription id"),
    # ])
    # @extend_schema(request=UpsertUserRequest, responses=None, tags=['users'], methods=['POST'], parameters=[
    #     OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    # ], examples=[
    #     OpenApiExample(
    #         'Example',
    #         summary='Request Sample',
    #         description='Status : 0 ( active ) / 1 (inactive). Default is 0. <br>'
    #                     'Datetime Format: <b>dd/mm/YYYY HH:MM:SS</b> <br>'
    #                     'Plan code ( TRIAL / BASIC / ADVANCED ) ( TRIAL apply only one-time per user account )',
    #         value={
    #             'plan_code': 'A03',
    #             'plan_start_at': '01/04/2023 00:00:00',
    #             'status': 1,
    #             'email': 'abc@mamil.vn',
    #             'name': 'Pham Van A'
    #         },
    #         request_only=True,
    #         response_only=False
    #     ),
    # ])
    # @action(detail=False, url_path="users", methods=["POST", "GET"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def user_view(self, request):
    #     if request.method == "POST":
    #         return self.upsert_user(request)
    #     else:
    #         return self.get_user(request)

    def upsert_user(self, request):
        """Create or update the caller's profile and optionally add a subscription.

        The caller is identified by ``request.user_data['userId']``. A
        subscription is created only when both ``plan_code`` and
        ``plan_start_at`` are present in the validated payload.

        Returns:
            Response with ``userId``, ``status`` and ``subscriptionId``
            (``-1`` when no subscription was created).

        Raises:
            NotFoundException: the request carries no ``user_data``.
            InvalidException: invalid payload, duplicate profiles, or the
                plan code does not match exactly one plan.
            TrialOneException: the user already has a TRIAL subscription.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs=USER_MESSAGE)
        user_data = request.user_data
        sub_id = -1

        # Check request
        serializer = UpsertUserRequest(data=request.data)
        if not serializer.is_valid():
            logger.error(serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if len(users) == 0:
            user = UserProfile(sync_id=user_data['userId'])
            user_updated = True
        else:
            user = users[0]
            user_updated = False

        if 'name' in data:
            user.full_name = data['name']
            user_updated = True
        if 'email' in data:
            user.email = data['email']
            user_updated = True
        if 'status' in data:
            user.status = data['status']
            user_updated = True
        if user_updated:
            user.save()

        if 'plan_code' in data and "plan_start_at" in data:
            plan_code = data['plan_code']
            # create sub
            plans = PricingPlan.objects.filter(code=plan_code)
            if len(plans) != 1:
                raise InvalidException(excArgs=PLAN_MESSAGE)
            plan: PricingPlan = plans[0]
            if plan_code == PlanCode.TRIAL.value:
                # TRIAL is one-time per account: any existing trial blocks a new one.
                if Subscription.objects.filter(user=user, pricing_plan=plan).exists():
                    raise TrialOneException(excArgs=PLAN_MESSAGE)
            start_plan_at = DateUtil.to_date(data['plan_start_at'], DateUtil.FORMAT.DD_MM_YYYY_HHMMSS.value)
            sub: Subscription = Subscription(
                limit_token=plan.token_limitations,
                pricing_plan=plan,
                expired_at=self._expiry_for(start_plan_at, plan.duration),
                user=user,
                start_at=start_plan_at,
                status=EntityStatus.ACTIVE.value,
            )
            sub.save()
            sub_id = sub.id

        return Response(status=status.HTTP_200_OK, data={
            'userId': user_data['userId'],
            'status': user.status,
            'subscriptionId': sub_id,
        })

    def get_user(self, request):
        """Return the caller's profile with all subscriptions, or one with a token.

        Without a ``subscription_id`` query parameter (or with ``-1``) the
        response lists every subscription of the user. Otherwise it returns
        that subscription together with a generated ``url`` and ``token``.

        Raises:
            NotFoundException: the request carries no ``user_data``, or the
                requested subscription does not exist for this user.
            InvalidException: the user id does not match exactly one profile.
            LockedEntityException: the profile is not active.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs='user')
        user_data = request.user_data
        sub_id = request.query_params.get('subscription_id', None)

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) != 1:
            raise InvalidException(excArgs="userId")
        user: UserProfile = users.first()
        if user.status != EntityStatus.ACTIVE.value:
            raise LockedEntityException(excArgs="user")

        # Query parameters arrive as strings, so the sentinel is compared as text.
        if sub_id is None or str(sub_id) == "-1":
            subs = Subscription.objects.filter(user=user)
            # NOTE(review): the queryset is passed as `data=` and is_valid() is
            # called before reading `.data`; kept as-is - confirm against
            # SubscriptionResponse before switching to instance serialization.
            sub_res = SubscriptionResponse(data=subs, many=True)
            sub_res.is_valid()
            return Response(status=status.HTTP_200_OK, data={
                'userId': user.sync_id,
                'status': user.status,
                'subscriptions': sub_res.data
            })

        subs = Subscription.objects.filter(user=user, id=sub_id)
        sub_res = SubscriptionResponse(data=subs, many=True)
        sub_res.is_valid()
        sub_rows = sub_res.data
        if not sub_rows:
            # An unknown id, or one owned by another user, is "not found"
            # rather than an IndexError surfacing as a server error.
            raise NotFoundException(excArgs="subscription")
        sub = sub_rows[0]

        # Gen long token for admin
        gen_x: SdsAuthentication = (
            admin_sds_authenticator if sub['plan_code'] == 'SDS_EXTRA_PREMIUM_PACK' else sds_authenticator
        )
        return Response(status=status.HTTP_200_OK, data={
            'userId': user.sync_id,
            'status': user.status,
            'subscriptions': sub,
            'url': gen_x.generate_url(user.sync_id, user.id, user.status, sub['id']),
            'token': gen_x.generate_token(user.sync_id, user.id, user.status, sub['id']),
        })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @action(detail=False, url_path="users/gen-token", methods=["POST"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def gen_cmc_token(self, request):
    #     data = request.data
    #     if "expiredAt" not in request.data:
    #         raise InvalidException(excArgs='expiredAt')
    #     uid = ProcessUtil.get_random_string(10) if "userId" not in data or data['userId'] is None else data['userId']
    #     m_text = {
    #         'userId': uid,
    #         "expiredAt": data['expiredAt']
    #     }
    #     import os
    #     from ..utils.crypto import ctel_cryptor
    #     import json
    #     iv = os.urandom(16).hex()
    #     e_text = ctel_cryptor.encrypt_ctel(json.dumps(m_text), iv)
    #     e_data = {
    #         'content': e_text,
    #         'iv': iv
    #     }
    #     return Response(status=status.HTTP_200_OK, data={
    #         'token': base64.b64encode(json.dumps(e_data).encode())
    #     })

    @extend_schema(responses=None, tags=['System'])
    @action(detail=False, url_path="healthcheck", methods=["GET"], authentication_classes=[], permission_classes=[])
    def health_check(self, request):
        """Return a constant ``{"status": "OK"}`` payload without authentication."""
        response = {
            "status": "OK",
        }
        return JsonResponse(status=status.HTTP_200_OK, data=response)

    @extend_schema(responses=None, tags=['System'])
    @action(detail=False, url_path="system_usage", methods=["GET"])
    def usage(self, request):
        """Return ``{"status": "OK"}`` plus the result of ``get_health_report()``."""
        data = get_health_report()
        response = {
            "status": "OK",
            "data": data,
        }
        return JsonResponse(status=status.HTTP_200_OK, data=response)