import base64
import datetime
import hmac
import logging
from zoneinfo import ZoneInfo

from django.db import transaction
from django.http import JsonResponse
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from fwd import settings

from ..annotation.api import throw_on_failure
from ..constant.common import USER_MESSAGE, EntityStatus, PLAN_MESSAGE, PlanCode
from ..exception.exceptions import (
    InvalidException,
    NotFoundException,
    LockedEntityException,
    TrialOneException,
    NotAuthenticatedException,
)
from ..models import UserProfile, PricingPlan, Subscription
from ..request.UpsertUserRequest import UpsertUserRequest
from ..response.SubscriptionResponse import SubscriptionResponse
from ..utils.health import get_health_report
from ..utils import date as DateUtil
from ..utils.crypto import sds_authenticator, admin_sds_authenticator, SdsAuthentication
from ..request.LoginRequest import LoginRequest
from ..utils.date import default_zone

logger = logging.getLogger(__name__)

# Plan that ``login`` attaches to a user who has no subscription yet.
_LOGIN_PLAN_CODE = "FI_PLAN"
_LOGIN_PLAN_DURATION_DAYS = 365
# Token limits used when ``login`` has to create the plan row itself.
_ADMIN_TOKEN_LIMIT = 999999
_STANDARD_TOKEN_LIMIT = 1000


def _secrets_equal(supplied, expected) -> bool:
    """Return True when *supplied* equals *expected*.

    String pairs are compared with ``hmac.compare_digest`` so the comparison
    time does not depend on how many leading characters match. Any other
    pair of values falls back to plain ``==``.
    """
    if isinstance(supplied, str) and isinstance(expected, str):
        return hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8"))
    return supplied == expected


def _plan_expiry(start_at, duration_days):
    """Return ``start_at + duration_days`` days, tagged with the default zone.

    ``datetime.replace`` returns a new object; the previous code discarded
    that result, so the zone was never applied. The zone is attached only
    when the value is naive, so an already-aware instant is never shifted.
    """
    expiry = start_at + datetime.timedelta(days=duration_days)
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=ZoneInfo(default_zone))
    return expiry


class CtelUserViewSet(viewsets.ViewSet):
    """Login, user/subscription management and system status endpoints."""

    lookup_field = "username"

    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'username': {
                    'type': 'string',
                },
                'password': {
                    'type': 'string',
                },
            },
            # OpenAPI expects an array here; a set is unordered and not
            # JSON-serializable.
            'required': ['username', 'password']
        }
    }, responses=None, tags=['Login'])
    @action(detail=False, url_path="login", methods=["POST"])
    def login(self, request):
        """Authenticate one of the two configured accounts and issue a token.

        The username must equal ``settings.ADMIN_USER_NAME`` or
        ``settings.STANDARD_USER_NAME`` and the password must match the
        corresponding setting. On success the matching ``UserProfile`` and
        ``Subscription`` rows are created if they do not exist yet.

        Returns:
            200 response with ``user_id``, ``user_name`` and ``token``.

        Raises:
            NotAuthenticatedException: unknown username or wrong password.
            InvalidException: more than one profile has this ``sync_id``.
            TrialOneException: more than one subscription exists for the
                user, or more than one plan has the login plan code.
        """
        serializer = LoginRequest(data=request.data)
        # Raises DRF's validation error on bad input; nothing to log on success.
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data
        username = data['username']

        if username == settings.ADMIN_USER_NAME:
            expected_password = settings.ADMIN_PASSWORD
            token_limit = _ADMIN_TOKEN_LIMIT
        elif username == settings.STANDARD_USER_NAME:
            expected_password = settings.STANDARD_PASSWORD
            token_limit = _STANDARD_TOKEN_LIMIT
        else:
            raise NotAuthenticatedException()
        if not _secrets_equal(data['password'], expected_password):
            raise NotAuthenticatedException()

        users = list(UserProfile.objects.filter(sync_id=username))
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if users:
            user = users[0]
        else:
            user = UserProfile(sync_id=username, status=EntityStatus.ACTIVE.value)
            user.save()

        subs = list(Subscription.objects.filter(user=user))
        if len(subs) > 1:
            raise TrialOneException(excArgs=PLAN_MESSAGE)
        if subs:
            sub = subs[0]
        else:
            plans = list(PricingPlan.objects.filter(code=_LOGIN_PLAN_CODE))
            if len(plans) > 1:
                raise TrialOneException(excArgs=PLAN_MESSAGE)
            if plans:
                plan: PricingPlan = plans[0]
            else:
                # NOTE(review): token_limit is only used when the plan row is
                # first created; afterwards every login reuses the stored
                # plan's limit regardless of which account logs in - confirm
                # this is intended.
                plan = PricingPlan(code=_LOGIN_PLAN_CODE,
                                   duration=_LOGIN_PLAN_DURATION_DAYS,
                                   token_limitations=token_limit)
                plan.save()
            start_plan_at = DateUtil.get_date_time_now()
            sub: Subscription = Subscription(
                limit_token=plan.token_limitations,
                pricing_plan=plan,
                expired_at=_plan_expiry(start_plan_at, plan.duration),
                user=user,
                start_at=start_plan_at,
                status=EntityStatus.ACTIVE.value,
            )
            sub.save()

        return Response(status=status.HTTP_200_OK, data={
            'user_id': user.id,
            'user_name': username,
            'token': sds_authenticator.generate_token(user_id=username,
                                                      internal_id=user.id,
                                                      status=EntityStatus.ACTIVE.value,
                                                      sub_id=sub.id)
        })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @extend_schema(request=None, responses=None, tags=['users'])
    # @action(detail=False, url_path="users/info", methods=["GET"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def get_be_user(self, request):
    #     user_data = ProcessUtil.get_user(request)
    #     return Response(status=status.HTTP_200_OK, data={
    #         'userId': user_data.user.sync_id,
    #         'message': 'User is valid'
    #     })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @extend_schema(request=None, responses=None, tags=['users'], methods=['GET'], parameters=[
    #     OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    #     OpenApiParameter("subscription_id", OpenApiTypes.STR, OpenApiParameter.QUERY, description="Subscription id"),
    # ])
    # @extend_schema(request=UpsertUserRequest, responses=None, tags=['users'], methods=['POST'], parameters=[
    #     OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    # ], examples=[
    #     OpenApiExample(
    #         'Example',
    #         summary='Request Sample',
    #         description='Status : 0 ( active ) / 1 (inactive). Default is 0.\n'
    #                     'Datetime Format: dd/mm/YYYY HH:MM:SS\n'
    #                     'Plan code ( TRIAL / BASIC / ADVANCED ) ( TRIAL apply only one-time per user account )',
    #         value={
    #             'plan_code': 'A03',
    #             'plan_start_at': '01/04/2023 00:00:00',
    #             'status': 1,
    #             'email': 'abc@mamil.vn',
    #             'name': 'Pham Van A'
    #         },
    #         request_only=True,
    #         response_only=False
    #     ),
    # ])
    # @action(detail=False, url_path="users", methods=["POST", "GET"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def user_view(self, request):
    #     if request.method == "POST":
    #         return self.upsert_user(request)
    #     else:
    #         return self.get_user(request)

    def upsert_user(self, request):
        """Create or update the caller's profile and optionally add a subscription.

        ``request.user_data['userId']`` identifies the profile. ``name``,
        ``email`` and ``status`` from the validated body are copied onto it
        when present. When both ``plan_code`` and ``plan_start_at`` are
        present, a new subscription to that plan is created.

        Returns:
            200 response with ``userId``, ``status`` and ``subscriptionId``
            (``-1`` when no subscription was created).

        Raises:
            NotFoundException: the request carries no ``user_data``.
            InvalidException: the body is invalid, the user id is ambiguous,
                or the plan code does not match exactly one plan.
            TrialOneException: the user already has a TRIAL subscription.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs=USER_MESSAGE)
        user_data = request.user_data
        sub_id = -1

        # Check request
        serializer = UpsertUserRequest(data=request.data)
        if not serializer.is_valid():
            logger.error(serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        users = list(UserProfile.objects.filter(sync_id=user_data['userId']))
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if users:
            user = users[0]
            user_updated = False
        else:
            user = UserProfile(sync_id=user_data['userId'])
            user_updated = True  # a new profile must always be saved

        if 'name' in data:
            user.full_name = data['name']
            user_updated = True
        if 'email' in data:
            user.email = data['email']
            user_updated = True
        if 'status' in data:
            user.status = data['status']
            user_updated = True
        if user_updated:
            user.save()

        if 'plan_code' in data and "plan_start_at" in data:
            plan_code = data['plan_code']
            # create sub
            plans = list(PricingPlan.objects.filter(code=plan_code))
            if len(plans) != 1:
                raise InvalidException(excArgs=PLAN_MESSAGE)
            plan: PricingPlan = plans[0]

            # TRIAL applies only one time per account, so any existing trial
            # subscription blocks a new one (the old check used `> 1`, which
            # let a second trial through).
            if plan_code == PlanCode.TRIAL.value and Subscription.objects.filter(
                    user=user, pricing_plan=plan).exists():
                raise TrialOneException(excArgs=PLAN_MESSAGE)

            start_plan_at = DateUtil.to_date(data['plan_start_at'],
                                             DateUtil.FORMAT.DD_MM_YYYY_HHMMSS.value)
            sub: Subscription = Subscription(
                limit_token=plan.token_limitations,
                pricing_plan=plan,
                expired_at=_plan_expiry(start_plan_at, plan.duration),
                user=user,
                start_at=start_plan_at,
                status=EntityStatus.ACTIVE.value,
            )
            sub.save()
            sub_id = sub.id

        response_dict = {
            'userId': user_data['userId'],
            'status': user.status,
            'subscriptionId': sub_id,
        }
        return Response(status=status.HTTP_200_OK, data=response_dict)

    def get_user(self, request):
        """Return the caller's profile with all, or one selected, subscription.

        Without a ``subscription_id`` query parameter (or with ``-1``) every
        subscription of the user is listed. With a specific id, only that
        subscription is returned together with a generated ``url`` and
        ``token``; the admin authenticator is used when the subscription's
        plan code is ``SDS_EXTRA_PREMIUM_PACK``.

        Raises:
            NotFoundException: the request carries no ``user_data``, or the
                requested subscription does not exist for this user.
            InvalidException: the user id does not match exactly one profile.
            LockedEntityException: the profile is not active.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs='user')
        user_data = request.user_data
        sub_id = request.query_params.get('subscription_id', None)

        users = list(UserProfile.objects.filter(sync_id=user_data['userId']))
        if len(users) != 1:
            raise InvalidException(excArgs="userId")
        user: UserProfile = users[0]
        if user.status != EntityStatus.ACTIVE.value:
            raise LockedEntityException(excArgs="user")

        # Query parameters arrive as strings, so the "-1" sentinel must be
        # matched as text as well as the int form.
        if sub_id in (None, -1, '-1'):
            subs = Subscription.objects.filter(user=user)
            sub_res = SubscriptionResponse(data=subs, many=True)
            sub_res.is_valid()
            return Response(status=status.HTTP_200_OK, data={
                'userId': user.sync_id,
                'status': user.status,
                'subscriptions': sub_res.data
            })

        subs = Subscription.objects.filter(user=user, id=sub_id)
        sub_res = SubscriptionResponse(data=subs, many=True)
        sub_res.is_valid()
        records = sub_res.data
        if not records:
            # Previously surfaced as a bare IndexError from `records[0]`.
            raise NotFoundException(excArgs='subscription')
        sub = records[0]

        # Gen long token for admin
        gen_x: SdsAuthentication = (
            admin_sds_authenticator
            if sub['plan_code'] == 'SDS_EXTRA_PREMIUM_PACK'
            else sds_authenticator
        )
        return Response(status=status.HTTP_200_OK, data={
            'userId': user.sync_id,
            'status': user.status,
            'subscriptions': sub,
            'url': gen_x.generate_url(user.sync_id, user.id, user.status, sub['id']),
            'token': gen_x.generate_token(user.sync_id, user.id, user.status, sub['id']),
        })

    # TODO (vietanhdev): Make this optional. Comment out first
    # NOTE: Don't remove this code
    # @action(detail=False, url_path="users/gen-token", methods=["POST"])
    # @throw_on_failure(InvalidException(excArgs='data'))
    # def gen_cmc_token(self, request):
    #     data = request.data
    #     if "expiredAt" not in request.data:
    #         raise InvalidException(excArgs='expiredAt')
    #     uid = ProcessUtil.get_random_string(10) if "userId" not in data or data['userId'] is None else data['userId']
    #     m_text = {
    #         'userId': uid,
    #         "expiredAt": data['expiredAt']
    #     }
    #     import os
    #     from ..utils.crypto import ctel_cryptor
    #     import json
    #     iv = os.urandom(16).hex()
    #     e_text = ctel_cryptor.encrypt_ctel(json.dumps(m_text), iv)
    #     e_data = {
    #         'content': e_text,
    #         'iv': iv
    #     }
    #     return Response(status=status.HTTP_200_OK, data={
    #         'token': base64.b64encode(json.dumps(e_data).encode())
    #     })

    @extend_schema(responses=None, tags=['System'])
    @action(detail=False, url_path="healthcheck", methods=["GET"], authentication_classes=[], permission_classes=[])
    def health_check(self, request):
        """Return a static ``{"status": "OK"}`` body; no auth or permissions apply."""
        response = {
            "status": "OK",
        }
        return JsonResponse(status=status.HTTP_200_OK, data=response)

    @extend_schema(responses=None, tags=['System'])
    @action(detail=False, url_path="system_usage", methods=["GET"])
    def usage(self, request):
        """Return ``get_health_report()`` wrapped as ``{"status": "OK", "data": ...}``."""
        data = get_health_report()
        response = {
            "status": "OK",
            "data": data,
        }
        return JsonResponse(status=status.HTTP_200_OK, data=response)