import base64
import datetime
import json
import logging
import os
from zoneinfo import ZoneInfo

from django.db import transaction
from django.http import JsonResponse
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiExample
from rest_framework import status, viewsets
from rest_framework.decorators import action
from rest_framework.decorators import authentication_classes, permission_classes
from rest_framework.response import Response

from fwd import settings

from ..annotation.api import throw_on_failure
from ..constant.common import USER_MESSAGE, EntityStatus, PLAN_MESSAGE, PlanCode
from ..exception.exceptions import InvalidException, NotFoundException, LockedEntityException, TrialOneException, \
    LimitReachedException, NotAuthenticatedException
from ..models import UserProfile, PricingPlan, Subscription
from ..request.HealcheckSerializer import HealthCheckSerializer
from ..request.LoginRequest import LoginRequest
from ..request.UpsertUserRequest import UpsertUserRequest
from ..response.SubscriptionResponse import SubscriptionResponse
from ..utils import ProcessUtil, DateUtil
from ..utils.CryptoUtils import sds_authenticator, admin_sds_authenticator, SdsAuthentication
from ..utils.DateUtil import default_zone

logger = logging.getLogger(__name__)


def _create_active_subscription(user, plan, start_at):
    """Create, save and return an ACTIVE subscription of ``plan`` for ``user``.

    The expiry is ``start_at`` plus ``plan.duration`` days. A naive
    ``start_at`` is first localised to ``default_zone``; an aware one is
    left untouched so the instant it denotes never shifts.
    """
    # The previous code called ``c_time.replace(tzinfo=...)`` and discarded the
    # result, so the zone was never applied. Localising the start (rather than
    # only the expiry) keeps both stored datetimes in the same zone.
    # NOTE(review): assumes the datetime columns accept aware values - confirm
    # against the project's USE_TZ setting.
    if start_at.tzinfo is None:
        start_at = start_at.replace(tzinfo=ZoneInfo(default_zone))
    expired_at = start_at + datetime.timedelta(days=plan.duration)

    subscription = Subscription(
        limit_token=plan.token_limitations,
        pricing_plan=plan,
        expired_at=expired_at,
        user=user,
        start_at=start_at,
        status=EntityStatus.ACTIVE.value,
    )
    subscription.save()
    return subscription


class CtelUserViewSet(viewsets.ViewSet):
    """Endpoints for login, user upsert/lookup, token generation and health check."""

    lookup_field = "username"

    # @extend_schema(request=LoginRequest, responses=None, tags=['users'], examples=[
    #     OpenApiExample(
    #         'ex1',
    #         summary='Sample Login',
    #         description='Sample Login',
    #         value={
    #             'username': 'admin',
    #             'password': 'admin'
    #         }
    #     ),
    # ])
    @extend_schema(request={
        'multipart/form-data': {
            'type': 'object',
            'properties': {
                'username': {
                    'type': 'string',
                },
                'password': {
                    'type': 'string',
                },
            },
            # A list, not a set: OpenAPI expects an ordered, serialisable array.
            'required': ['username', 'password']
        }
    }, responses=None, tags=['ocr'])
    @action(detail=False, url_path="login", methods=["POST"])
    def login(self, request):
        """Authenticate against the configured FI account and return a token.

        The credentials are compared with ``settings.FI_USER_NAME`` and
        ``settings.FI_PASSWORD``. On success the matching ``UserProfile``,
        the ``FI_PLAN`` pricing plan and a subscription are created if they
        do not exist yet, and a token is generated for that subscription.

        Raises:
            NotAuthenticatedException: the credentials do not match.
            InvalidException: more than one profile exists for the FI user.
            TrialOneException: more than one subscription or plan exists.
        """
        serializer = LoginRequest(data=request.data)
        serializer.is_valid(raise_exception=True)
        data = serializer.validated_data

        # NOTE(review): plain ``!=`` is not a constant-time comparison.
        if data['username'] != settings.FI_USER_NAME or data['password'] != settings.FI_PASSWORD:
            raise NotAuthenticatedException()

        users = UserProfile.objects.filter(sync_id=settings.FI_USER_NAME)
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if len(users) == 0:
            user = UserProfile(sync_id=settings.FI_USER_NAME, status=EntityStatus.ACTIVE.value)
            user.save()
        else:
            user = users[0]

        subs = Subscription.objects.filter(user=user)
        if len(subs) > 1:
            raise TrialOneException(excArgs=PLAN_MESSAGE)
        if len(subs) == 0:
            p_code = "FI_PLAN"
            plans = PricingPlan.objects.filter(code=p_code)
            if len(plans) > 1:
                raise TrialOneException(excArgs=PLAN_MESSAGE)
            if len(plans) == 0:
                plan = PricingPlan(code=p_code, duration=365, token_limitations=999999)
                plan.save()
            else:
                plan: PricingPlan = plans[0]
            sub: Subscription = _create_active_subscription(user, plan, DateUtil.get_date_time_now())
        else:
            sub = subs[0]

        return Response(status=status.HTTP_200_OK, data={
            'user_id': 'SBT',
            'user_name': settings.FI_USER_NAME,
            'token': sds_authenticator.generate_token(user_id=settings.FI_USER_NAME, internal_id=user.id,
                                                      status=EntityStatus.ACTIVE.value, sub_id=sub.id)
        })

    @extend_schema(request=None, responses=None, tags=['users'])
    @action(detail=False, url_path="users/info", methods=["GET"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def get_be_user(self, request):
        """Return the sync id of the user resolved by ``ProcessUtil.get_user``."""
        user_data = ProcessUtil.get_user(request)
        return Response(status=status.HTTP_200_OK, data={
            'userId': user_data.user.sync_id,
            'message': 'User is valid'
        })

    @extend_schema(request=None, responses=None, tags=['users'], methods=['GET'], parameters=[
        OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
        OpenApiParameter("subscription_id", OpenApiTypes.STR, OpenApiParameter.QUERY, description="Subscription id"),
    ])
    @extend_schema(request=UpsertUserRequest, responses=None, tags=['users'], methods=['POST'], parameters=[
        OpenApiParameter("user", OpenApiTypes.STR, OpenApiParameter.HEADER, description="CMC/SDS encrypted token"),
    ], examples=[
        OpenApiExample(
            'Example',
            summary='Request Sample',
            description='Status : 0 ( active ) / 1 (inactive). Default is 0.\n'
                        'Datetime Format: dd/mm/YYYY HH:MM:SS\n'
                        'Plan code ( TRIAL / BASIC / ADVANCED ) ( TRIAL apply only one-time per user account )',
            value={
                'plan_code': 'A03',
                'plan_start_at': '01/04/2023 00:00:00',
                'status': 1,
                'email': 'abc@mamil.vn',
                'name': 'Pham Van A'
            },
            request_only=True,
            response_only=False
        ),
    ])
    @action(detail=False, url_path="users", methods=["POST", "GET"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def user_view(self, request):
        """Dispatch ``POST`` to ``upsert_user`` and any other method to ``get_user``."""
        if request.method == "POST":
            return self.upsert_user(request)
        return self.get_user(request)

    @transaction.atomic
    def upsert_user(self, request):
        """Create or update the caller's profile and optionally open a subscription.

        ``request.user_data`` must already be present on the request
        (presumably attached by an authentication layer - verify against the
        caller). ``name``, ``email`` and ``status`` update the profile; when
        both ``plan_code`` and ``plan_start_at`` are supplied a new ACTIVE
        subscription is created.

        Returns:
            Response with ``userId``, ``status`` and ``subscriptionId``
            (``-1`` when no subscription was created).

        Raises:
            NotFoundException: ``request.user_data`` is missing.
            InvalidException: invalid payload, duplicate profiles, or the
                plan code does not match exactly one pricing plan.
            TrialOneException: the user already has a TRIAL subscription.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs=USER_MESSAGE)

        user_data = request.user_data
        user_updated: bool = False
        sub_id = -1

        # Check request
        serializer = UpsertUserRequest(data=request.data)
        if not serializer.is_valid():
            logger.warning("Invalid upsert-user payload: %s", serializer.errors)
            raise InvalidException(excArgs=list(serializer.errors.keys()))
        data = serializer.validated_data

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) > 1:
            raise InvalidException(excArgs=USER_MESSAGE)
        if len(users) == 0:
            user = UserProfile(sync_id=user_data['userId'])
            user_updated = True
        else:
            user = users[0]

        if 'name' in data:
            user.full_name = data['name']
            user_updated = True
        if 'email' in data:
            user.email = data['email']
            user_updated = True
        if 'status' in data:
            user.status = data['status']
            user_updated = True
        if user_updated:
            user.save()

        if 'plan_code' in data and "plan_start_at" in data:
            plan_code = data['plan_code']
            plans = PricingPlan.objects.filter(code=plan_code)
            if len(plans) != 1:
                raise InvalidException(excArgs=PLAN_MESSAGE)
            plan: PricingPlan = plans[0]

            if plan_code == PlanCode.TRIAL.value:
                # TRIAL is one-time per account: any existing TRIAL subscription
                # blocks a new one (the former ``> 1`` check let a second through).
                if Subscription.objects.filter(user=user, pricing_plan=plan).exists():
                    raise TrialOneException(excArgs=PLAN_MESSAGE)

            # create sub
            start_plan_at = DateUtil.to_date(data['plan_start_at'], DateUtil.FORMAT.DD_MM_YYYY_HHMMSS.value)
            sub: Subscription = _create_active_subscription(user, plan, start_plan_at)
            sub_id = sub.id

        response_dict = {
            'userId': user_data['userId'],
            'status': user.status,
            'subscriptionId': sub_id,
        }
        return Response(status=status.HTTP_200_OK, data=response_dict)

    def get_user(self, request):
        """Return the caller's profile with all subscriptions, or one subscription plus token.

        Without a ``subscription_id`` query parameter (or with ``-1``) every
        subscription of the user is listed. With an id, that subscription is
        returned together with a generated ``url`` and ``token``.

        Raises:
            NotFoundException: ``request.user_data`` is missing, or the
                requested subscription does not belong to the user.
            InvalidException: the user id does not match exactly one profile.
            LockedEntityException: the user is not ACTIVE.
        """
        if not hasattr(request, 'user_data'):
            raise NotFoundException(excArgs='user')

        user_data = request.user_data
        sub_id = request.query_params.get('subscription_id', None)

        users = UserProfile.objects.filter(sync_id=user_data['userId'])
        if len(users) != 1:
            raise InvalidException(excArgs="userId")
        user: UserProfile = users.first()
        if user.status != EntityStatus.ACTIVE.value:
            raise LockedEntityException(excArgs="user")

        # Query parameters arrive as strings, so the "no subscription" sentinel
        # returned by the upsert endpoint shows up here as "-1".
        if sub_id is None or str(sub_id) == "-1":
            subs = Subscription.objects.filter(user=user)
            sub_res = SubscriptionResponse(data=subs, many=True)
            sub_res.is_valid()
            return Response(status=status.HTTP_200_OK, data={
                'userId': user.sync_id,
                'status': user.status,
                'subscriptions': sub_res.data
            })

        subs = Subscription.objects.filter(user=user, id=sub_id)
        sub_res = SubscriptionResponse(data=subs, many=True)
        sub_res.is_valid()
        if not sub_res.data:
            # Previously an unknown id surfaced as an IndexError on ``data[0]``.
            raise NotFoundException(excArgs='subscription')
        sub = sub_res.data[0]

        # Gen long token for admin
        gen_x: SdsAuthentication = (
            admin_sds_authenticator if sub['plan_code'] == 'SDS_EXTRA_PREMIUM_PACK' else sds_authenticator
        )
        return Response(status=status.HTTP_200_OK, data={
            'userId': user.sync_id,
            'status': user.status,
            'subscriptions': sub,
            'url': gen_x.generate_url(user.sync_id, user.id, user.status, sub['id']),
            'token': gen_x.generate_token(user.sync_id, user.id, user.status, sub['id']),
        })

    # @extend_schema(request={
    #     'application/json': {
    #         'type': 'object',
    #         'properties': {
    #             'userId': {
    #                 'type': 'string',
    #                 'example': "C2H5OH"
    #             },
    #             'expiredAt': {
    #                 'type': 'string',
    #                 'example': "21/12/2023 00:00:00"
    #             }
    #         },
    #         'required': {'collection_id', 'source', 'files'}
    #     }
    # }, responses=None, tags=['users'],
    #     description="API simulating the CMC token passed to SDS. Used as the value of the `user` header")
    @action(detail=False, url_path="users/gen-token", methods=["POST"])
    @throw_on_failure(InvalidException(excArgs='data'))
    def gen_cmc_token(self, request):
        """Build an encrypted token from ``userId`` and ``expiredAt``.

        ``expiredAt`` is required. A missing or null ``userId`` is replaced by
        a 10-character random string. The payload is encrypted with
        ``ctel_cryptor.encrypt_ctel`` using a fresh random IV, and the
        ``{content, iv}`` JSON document is returned base64-encoded.

        Raises:
            InvalidException: ``expiredAt`` is missing from the request body.
        """
        from ..utils.CryptoUtils import ctel_cryptor

        data = request.data
        if "expiredAt" not in data:
            raise InvalidException(excArgs='expiredAt')

        uid = data.get('userId')
        if uid is None:
            uid = ProcessUtil.get_random_string(10)

        m_text = {
            'userId': uid,
            "expiredAt": data['expiredAt']
        }
        iv = os.urandom(16).hex()
        e_text = ctel_cryptor.encrypt_ctel(json.dumps(m_text), iv)
        e_data = {
            'content': e_text,
            'iv': iv
        }
        # Decode so the payload carries text rather than bytes, independent of
        # how the active renderer treats ``bytes`` values.
        token = base64.b64encode(json.dumps(e_data).encode()).decode()
        return Response(status=status.HTTP_200_OK, data={
            'token': token
        })

    @action(detail=False, url_path="healthcheck", methods=["GET"], authentication_classes=[], permission_classes=[])
    def health_check(self, request):
        """Return a static "OK" payload; no authentication or permissions required."""
        # Perform any checks to determine the health status of the application
        # TODO: check database connectivity, S3 service availability, etc.
        serializer = HealthCheckSerializer(data={
            'status': "OK",
            'message': 'Application is running smoothly.'
        })
        serializer.is_valid(raise_exception=True)
        return JsonResponse(status=status.HTTP_200_OK, data=serializer.data)