import base64 import datetime import json import os from zoneinfo import ZoneInfo import jwt from cryptography.fernet import Fernet from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from fwd import settings from fwd_api.annotation.api import throw_on_failure from fwd_api.exception.exceptions import InvalidException from fwd_api.utils.date import default_zone, get_date_time_now, FORMAT from fwd_api.utils import date as DateUtil class InternalCryptor: def __init__(self, key): self.cryptor = Fernet(key) @throw_on_failure(e=InvalidException(excArgs="data")) def encrypt(self, raw_text: str) -> str: return self.cryptor.encrypt(raw_text.encode('utf-8')).decode('utf-8') @throw_on_failure(e=InvalidException(excArgs="data")) def decrypt(self, cipher_text: str) -> str: return self.cryptor.decrypt(cipher_text.encode('utf-8')).decode('utf-8') @throw_on_failure(e=InvalidException(excArgs="data")) def encrypt_json(self, dict_data: dict) -> str: return self.cryptor.encrypt(json.dumps(dict_data).encode('utf-8')).decode('utf-8') @throw_on_failure(e=InvalidException(excArgs="data")) def decrypt_json(self, cipher_text: str) -> dict: raw_data = self.cryptor.decrypt(cipher_text.encode('utf-8')).decode('utf-8') data = json.loads(raw_data) return data sds_db_encryptor = InternalCryptor(key=settings.DB_ENCRYPT_KEY) class SdsAuthentication: key = settings.INTERNAL_SDS_KEY base_url_query = settings.BASE_UI_URL + "?cope2n-token={}" def __init__(self, life_time, algorithm): self.duration_of_token = life_time self.algorithm = algorithm def encode_data(self, data: dict) -> str: return jwt.encode(data, self.key, algorithm=self.algorithm) def decode_data(self, data: str) -> str: return jwt.decode(data, self.key, algorithms=self.algorithm) def generate_token(self, user_id, internal_id, status, sub_id=-1) -> str: c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token) c_time.replace(tzinfo=ZoneInfo(default_zone)) payload = {"id": user_id, "expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), 'internal_id': internal_id, 'status': status, 'subscription_id': sub_id} return self.encode_data(payload) def generate_img_token(self, user_id) -> str: c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token) c_time.replace(tzinfo=ZoneInfo(default_zone)) payload = {"expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), "internal_id": user_id} return self.encode_data(payload) def generate_img_token_v2(self, user_id, sub_id=None, user_sync_id=None) -> str: c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token) c_time.replace(tzinfo=ZoneInfo(default_zone)) payload = {"expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), "internal_id": user_id, 'subscription_id': sub_id, "id": user_sync_id} return self.encode_data(payload) def generate_url(self, user_id, internal_id, status, sub_id) -> str: return self.base_url_query.format(self.generate_token(user_id, internal_id, status, sub_id)) sds_authenticator = SdsAuthentication(life_time=settings.AUTH_TOKEN_LIFE_TIME, algorithm="HS512") admin_sds_authenticator = SdsAuthentication(life_time=settings.AUTH_TOKEN_LIFE_TIME * 10, algorithm="HS512") image_authenticator = SdsAuthentication(life_time=settings.IMAGE_TOKEN_LIFE_TIME, algorithm="HS512") class CtelCrypto: key = settings.CTEL_KEY cypher = Cipher(algorithms.AES256(key.encode()), modes.CTR(os.urandom(16))) def encrypt_ctel(self, text: str, init_vector: str) -> str: # cypher equal to self.cypher if use one encryptor cypher = Cipher(algorithms.AES256(self.key.encode()), modes.CTR(bytes.fromhex(init_vector))) encryptor = cypher.encryptor() en_text = (encryptor.update(text.encode()) + encryptor.finalize()) return en_text.hex() def decrypt_ctel(self, encrypted_hex_text: str, init_vector: str): # cypher equal to self.cypher if use one encryptor cypher = Cipher(algorithms.AES256(self.key.encode()), modes.CTR(bytes.fromhex(init_vector))) decryptor = cypher.decryptor() decrypted_text = (decryptor.update(bytes.fromhex(encrypted_hex_text)) + decryptor.finalize()) return decrypted_text.decode() ctel_cryptor = CtelCrypto() if __name__ == '__main__': # Gen fake token ic = SdsAuthentication(life_time=130, algorithm="HS512") print(ic.generate_token('xmmkoyfpjc', 49, 1, 1))