306 lines
12 KiB
Python
Executable File
306 lines
12 KiB
Python
Executable File
import base64
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from django.db import transaction
|
|
from django.http import JsonResponse
|
|
from drf_spectacular.types import OpenApiTypes
|
|
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample
|
|
from rest_framework import status, viewsets
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
|
|
from ..annotation.api import throw_on_failure
|
|
from ..constant.common import USER_MESSAGE, EntityStatus, PLAN_MESSAGE, PlanCode
|
|
from ..exception.exceptions import InvalidException, NotFoundException, LockedEntityException, TrialOneException, NotAuthenticatedException
|
|
from ..models import UserProfile, PricingPlan, Subscription
|
|
from ..request.UpsertUserRequest import UpsertUserRequest
|
|
from ..response.SubscriptionResponse import SubscriptionResponse
|
|
from ..utils.health import get_health_report
|
|
from ..utils import date as DateUtil
|
|
from ..utils.crypto import sds_authenticator, admin_sds_authenticator, SdsAuthentication
|
|
import datetime
|
|
from ..request.LoginRequest import LoginRequest
|
|
from ..utils.date import default_zone
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
from fwd import settings
|
|
|
|
class CtelUserViewSet(viewsets.ViewSet):
    """Login, user/subscription management and system-status endpoints.

    Routed actions: ``login``, ``health_check`` and ``usage``.
    ``upsert_user`` and ``get_user`` carry no ``@action`` decorator of their
    own; the ``user_view`` action that dispatched to them is kept below as
    commented-out code, so they are only reachable when called directly.
    """

    lookup_field = "username"

    @staticmethod
    def _secret_matches(supplied, expected):
        """Return True when *supplied* equals *expected*, in constant time.

        Accepts and rejects the same values as a plain ``==`` between two
        strings, but does not leak how many leading characters matched.
        Anything that is not a ``str`` on either side is treated as a
        mismatch.
        """
        import hmac  # function-scope import keeps the module import block untouched

        if not isinstance(supplied, str) or not isinstance(expected, str):
            return False
        # compare_digest only accepts ASCII-only str, so compare the bytes.
        return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'username': {
                    'type': 'string',
                },
                'password': {
                    'type': 'string',
                },
            },
            # JSON Schema defines ``required`` as an array; a Python set is
            # unordered and cannot be serialised to JSON.
            'required': ['username', 'password'],
        }
    }, responses=None, tags=['Login'])
    @action(detail=False, url_path="login", methods=["POST"])
    def login(self, request):
        """Authenticate one of the two configured accounts and issue a token.

        The request body is validated with ``LoginRequest``. Only the
        usernames ``settings.ADMIN_USER_NAME`` and
        ``settings.STANDARD_USER_NAME`` are accepted, each with its matching
        password from ``settings``. On first login a ``UserProfile`` (keyed by
        ``sync_id`` = username) and a subscription to the ``FI_PLAN`` pricing
        plan are created; later logins reuse them.

        Returns:
            Response: HTTP 200 with ``user_id``, ``user_name`` and ``token``.

        Raises:
            NotAuthenticatedException: unknown username or wrong password.
            InvalidException: more than one profile shares the username.
            TrialOneException: more than one subscription for the user, or
                more than one ``FI_PLAN`` pricing plan, exists.
            Whatever ``LoginRequest.is_valid(raise_exception=True)`` raises
            for a malformed body.
        """
        serializer = LoginRequest(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        token_limit = 999999
        if data['username'] == settings.ADMIN_USER_NAME:
            expected_password = settings.ADMIN_PASSWORD
        elif data['username'] == settings.STANDARD_USER_NAME:
            expected_password = settings.STANDARD_PASSWORD
            token_limit = 1000
        else:
            raise NotAuthenticatedException()

        if not self._secret_matches(data['password'], expected_password):
            raise NotAuthenticatedException()

        users = UserProfile.objects.filter(sync_id=data['username'])
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if users:
            user = users[0]
        else:
            user = UserProfile(sync_id=data['username'], status=EntityStatus.ACTIVE.value)
            user.save()

        subs = Subscription.objects.filter(user=user)
        if len(subs) > 1:
            raise TrialOneException(excArgs=PLAN_MESSAGE)
        if subs:
            sub = subs[0]
        else:
            p_code = "FI_PLAN"
            plans = PricingPlan.objects.filter(code=p_code)
            if len(plans) > 1:
                raise TrialOneException(excArgs=PLAN_MESSAGE)
            if plans:
                plan = plans[0]
            else:
                # NOTE(review): token_limit only takes effect here, when the
                # shared FI_PLAN row is first created. Whichever account logs
                # in first fixes the limit for both - confirm this is intended.
                plan = PricingPlan(code=p_code, duration=365, token_limitations=token_limit)
                plan.save()

            start_plan_at = DateUtil.get_date_time_now()
            # The expiry keeps whatever tzinfo ``start_plan_at`` carries.
            # NOTE(review): an earlier ``c_time.replace(tzinfo=...)`` call here
            # discarded its result and never had any effect; if the expiry must
            # be pinned to ``default_zone``, assign the result and confirm the
            # USE_TZ / database backend combination accepts aware datetimes.
            c_time = start_plan_at + datetime.timedelta(days=plan.duration)

            sub = Subscription(limit_token=plan.token_limitations, pricing_plan=plan, expired_at=c_time,
                               user=user, start_at=start_plan_at, status=EntityStatus.ACTIVE.value)
            sub.save()

        return Response(status=status.HTTP_200_OK, data={
            'user_id': user.id,
            'user_name': data['username'],
            'token': sds_authenticator.generate_token(user_id=data['username'], internal_id=user.id,
                                                      status=EntityStatus.ACTIVE.value, sub_id=sub.id)
        })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @extend_schema(request=None, responses=None, tags=['users'])
    # @action(detail=False, url_path="users/info", methods=["GET"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def get_be_user(self, request):
    #     user_data = ProcessUtil.get_user(request)
    #
    #     return Response(status=status.HTTP_200_OK, data={
    #         'userId': user_data.user.sync_id,
    #         'message': 'User is valid'
    #     })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @extend_schema(request=None, responses=None, tags=['users'], methods=['GET'], parameters=[
    #     OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    #     OpenApiParameter("subscription_id", OpenApiTypes.STR, OpenApiParameter.QUERY, description="Subscription id"),
    #
    # ])
    # @extend_schema(request=UpsertUserRequest, responses=None, tags=['users'], methods=['POST'], parameters=[
    #     OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    # ], examples=[
    #     OpenApiExample(
    #         'Example',
    #         summary='Request Sample',
    #         description='Status : 0 ( active ) / 1 (inactive). Default is 0. <br>'
    #                     'Datetime Format: <b>dd/mm/YYYY HH:MM:SS</b> <br>'
    #                     'Plan code ( TRIAL / BASIC / ADVANCED ) ( TRIAL apply only one-time per user account )',
    #         value={
    #             'plan_code': 'A03',
    #             'plan_start_at': '01/04/2023 00:00:00',
    #             'status': 1,
    #             'email': 'abc@mamil.vn',
    #             'name': 'Pham Van A'
    #         },
    #         request_only=True,
    #         response_only=False
    #     ),
    # ])
    # @action(detail=False, url_path="users", methods=["POST", "GET"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def user_view(self, request):
    #     if request.method == "POST":
    #         return self.upsert_user(request)
    #     else:
    #         return self.get_user(request)

    def upsert_user(self, request):
        """Create or update the caller's profile and optionally add a subscription.

        The caller is identified by ``request.user_data['userId']``. The body
        is validated with ``UpsertUserRequest``; any of ``name``, ``email``
        and ``status`` present in the validated data are copied onto the
        profile. When both ``plan_code`` and ``plan_start_at`` are present a
        new subscription to that plan is created, expiring ``plan.duration``
        days after the start.

        Returns:
            Response: HTTP 200 with ``userId``, ``status`` and
            ``subscriptionId`` (``-1`` when no subscription was created).

        Raises:
            NotFoundException: ``request`` has no ``user_data`` attribute.
            InvalidException: the body is invalid, several profiles share the
                user id, or the plan code does not match exactly one plan.
            TrialOneException: a TRIAL subscription is requested but the user
                already holds one.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs=USER_MESSAGE)
        user_data = request.user_data
        sub_id = -1

        # Check request
        serializer = UpsertUserRequest(data=request.data)
        if not serializer.is_valid():
            logger.error(serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if users:
            user = users[0]
            user_updated = False
        else:
            # A brand-new profile must be saved even if no field is supplied.
            user = UserProfile(sync_id=user_data['userId'])
            user_updated = True

        # Request key -> model attribute; only keys present in the payload
        # are written.
        for request_key, model_field in (('name', 'full_name'), ('email', 'email'), ('status', 'status')):
            if request_key in data:
                setattr(user, model_field, data[request_key])
                user_updated = True

        if user_updated:
            user.save()

        if 'plan_code' in data and "plan_start_at" in data:
            plan_code = data['plan_code']
            # create sub
            plans = PricingPlan.objects.filter(code=plan_code)
            if len(plans) != 1:
                raise InvalidException(excArgs=PLAN_MESSAGE)
            plan = plans[0]

            # TRIAL may be used once per account, so any existing TRIAL
            # subscription blocks a new one.
            if plan_code == PlanCode.TRIAL.value:
                if Subscription.objects.filter(user=user, pricing_plan=plan).exists():
                    raise TrialOneException(excArgs=PLAN_MESSAGE)

            start_plan_at = DateUtil.to_date(data['plan_start_at'], DateUtil.FORMAT.DD_MM_YYYY_HHMMSS.value)
            # The expiry keeps whatever tzinfo ``start_plan_at`` carries.
            # NOTE(review): an earlier ``c_time.replace(tzinfo=...)`` call here
            # discarded its result and never had any effect; if the expiry must
            # be pinned to ``default_zone``, assign the result and confirm the
            # USE_TZ / database backend combination accepts aware datetimes.
            c_time = start_plan_at + datetime.timedelta(days=plan.duration)

            sub = Subscription(limit_token=plan.token_limitations, pricing_plan=plan, expired_at=c_time,
                               user=user, start_at=start_plan_at, status=EntityStatus.ACTIVE.value)
            sub.save()
            sub_id = sub.id

        response_dict = {
            'userId': user_data['userId'],
            'status': user.status,
            'subscriptionId': sub_id,
        }

        return Response(status=status.HTTP_200_OK, data=response_dict)

    def get_user(self, request):
        """Return the caller's profile with all, or one, of their subscriptions.

        The caller is identified by ``request.user_data['userId']``. Without a
        ``subscription_id`` query parameter (or with ``-1``) every
        subscription of the user is listed. With a ``subscription_id`` only
        that subscription is returned, together with a ``url`` and ``token``
        generated for it; the admin authenticator is used when the
        subscription's ``plan_code`` is ``SDS_EXTRA_PREMIUM_PACK``.

        Returns:
            Response: HTTP 200 with ``userId``, ``status``, ``subscriptions``
            and, for a single subscription, ``url`` and ``token``.

        Raises:
            NotFoundException: ``request`` has no ``user_data`` attribute, or
                the requested subscription does not belong to the user.
            InvalidException: the user id does not match exactly one profile.
            LockedEntityException: the profile is not active.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs='user')
        user_data = request.user_data
        sub_id = request.query_params.get('subscription_id', None)

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) != 1:
            raise InvalidException(excArgs="userId")
        user = users[0]
        if user.status != EntityStatus.ACTIVE.value:
            raise LockedEntityException(excArgs="user")

        # Query parameters arrive as strings, so the "-1" sentinel has to be
        # matched in its textual form as well.
        if sub_id in (None, -1, "-1"):
            subs = Subscription.objects.filter(user=user)
            sub_res = SubscriptionResponse(data=subs, many=True)
            sub_res.is_valid()

            return Response(status=status.HTTP_200_OK, data={
                'userId': user.sync_id,
                'status': user.status,
                'subscriptions': sub_res.data
            })

        subs = Subscription.objects.filter(user=user, id=sub_id)
        sub_res = SubscriptionResponse(data=subs, many=True)
        sub_res.is_valid()
        try:
            sub = sub_res.data[0]
        except IndexError:
            # No subscription with this id belongs to the user.
            raise NotFoundException(excArgs="subscription") from None

        # Gen long token for admin
        gen_x: SdsAuthentication = (
            admin_sds_authenticator if sub['plan_code'] == 'SDS_EXTRA_PREMIUM_PACK' else sds_authenticator
        )

        return Response(status=status.HTTP_200_OK, data={
            'userId': user.sync_id,
            'status': user.status,
            'subscriptions': sub,
            'url': gen_x.generate_url(user.sync_id, user.id, user.status, sub['id']),
            'token': gen_x.generate_token(user.sync_id, user.id, user.status, sub['id']),
        })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @action(detail=False, url_path="users/gen-token", methods=["POST"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def gen_cmc_token(self, request):
    #     data = request.data
    #
    #     if "expiredAt" not in request.data:
    #         raise InvalidException(excArgs='expiredAt')
    #     uid = ProcessUtil.get_random_string(10) if "userId" not in data or data['userId'] is None else data['userId']
    #     m_text = {
    #         'userId': uid,
    #         "expiredAt": data['expiredAt']
    #     }
    #     import os
    #     from ..utils.crypto import ctel_cryptor
    #     import json
    #     iv = os.urandom(16).hex()
    #     e_text = ctel_cryptor.encrypt_ctel(json.dumps(m_text), iv)
    #     e_data = {
    #         'content': e_text,
    #         'iv': iv
    #     }
    #     return Response(status=status.HTTP_200_OK, data={
    #         'token': base64.b64encode(json.dumps(e_data).encode())
    #     })

    @extend_schema(responses=None, tags=['System'])
    @action(detail=False, url_path="healthcheck", methods=["GET"], authentication_classes=[], permission_classes=[])
    def health_check(self, request):
        """Liveness probe: always answers HTTP 200 with ``{"status": "OK"}``.

        Authentication and permission classes are disabled for this action.
        """
        response = {
            "status": "OK",
        }
        return JsonResponse(status=status.HTTP_200_OK, data=response)

    @extend_schema(responses=None, tags=['System'])
    @action(detail=False, url_path="system_usage", methods=["GET"])
    def usage(self, request):
        """Return the report produced by ``get_health_report()``.

        Answers HTTP 200 with ``status`` set to ``"OK"`` and the report under
        ``data``.
        """
        data = get_health_report()
        response = {
            "status": "OK",
            "data": data,
        }
        return JsonResponse(status=status.HTTP_200_OK, data=response)
|
|
|