sbt-idp/cope2n-api/fwd_api/utils/crypto.py
2023-12-15 12:43:19 +07:00

115 lines
4.6 KiB
Python

import base64
import datetime
import json
import os
from zoneinfo import ZoneInfo
import jwt
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from fwd import settings
from fwd_api.annotation.api import throw_on_failure
from fwd_api.exception.exceptions import InvalidException
from fwd_api.utils.date import default_zone, get_date_time_now, FORMAT
from fwd_api.utils import date as DateUtil
class InternalCryptor:
def __init__(self, key):
self.cryptor = Fernet(key)
@throw_on_failure(e=InvalidException(excArgs="data"))
def encrypt(self, raw_text: str) -> str:
return self.cryptor.encrypt(raw_text.encode('utf-8')).decode('utf-8')
@throw_on_failure(e=InvalidException(excArgs="data"))
def decrypt(self, cipher_text: str) -> str:
return self.cryptor.decrypt(cipher_text.encode('utf-8')).decode('utf-8')
@throw_on_failure(e=InvalidException(excArgs="data"))
def encrypt_json(self, dict_data: dict) -> str:
return self.cryptor.encrypt(json.dumps(dict_data).encode('utf-8')).decode('utf-8')
@throw_on_failure(e=InvalidException(excArgs="data"))
def decrypt_json(self, cipher_text: str) -> dict:
raw_data = self.cryptor.decrypt(cipher_text.encode('utf-8')).decode('utf-8')
data = json.loads(raw_data)
return data
sds_db_encryptor = InternalCryptor(key=settings.DB_ENCRYPT_KEY)
class SdsAuthentication:
key = settings.INTERNAL_SDS_KEY
base_url_query = settings.BASE_UI_URL + "?cope2n-token={}"
def __init__(self, life_time, algorithm):
self.duration_of_token = life_time
self.algorithm = algorithm
def encode_data(self, data: dict) -> str:
return jwt.encode(data, self.key, algorithm=self.algorithm)
def decode_data(self, data: str) -> str:
return jwt.decode(data, self.key, algorithms=self.algorithm)
def generate_token(self, user_id, internal_id, status, sub_id=-1) -> str:
c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token)
c_time.replace(tzinfo=ZoneInfo(default_zone))
payload = {"id": user_id, "expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value),
'internal_id': internal_id, 'status': status, 'subscription_id': sub_id}
return self.encode_data(payload)
def generate_img_token(self, user_id) -> str:
c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token)
c_time.replace(tzinfo=ZoneInfo(default_zone))
payload = {"expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), "internal_id": user_id}
return self.encode_data(payload)
def generate_img_token_v2(self, user_id, sub_id=None, user_sync_id=None) -> str:
c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token)
c_time.replace(tzinfo=ZoneInfo(default_zone))
payload = {"expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), "internal_id": user_id, 'subscription_id': sub_id, "id": user_sync_id}
return self.encode_data(payload)
def generate_url(self, user_id, internal_id, status, sub_id) -> str:
return self.base_url_query.format(self.generate_token(user_id, internal_id, status, sub_id))
sds_authenticator = SdsAuthentication(life_time=settings.AUTH_TOKEN_LIFE_TIME, algorithm="HS512")
admin_sds_authenticator = SdsAuthentication(life_time=settings.AUTH_TOKEN_LIFE_TIME * 10, algorithm="HS512")
image_authenticator = SdsAuthentication(life_time=settings.IMAGE_TOKEN_LIFE_TIME, algorithm="HS512")
class CtelCrypto:
key = settings.CTEL_KEY
cypher = Cipher(algorithms.AES256(key.encode()), modes.CTR(os.urandom(16)))
def encrypt_ctel(self, text: str, init_vector: str) -> str:
# cypher equal to self.cypher if use one encryptor
cypher = Cipher(algorithms.AES256(self.key.encode()), modes.CTR(bytes.fromhex(init_vector)))
encryptor = cypher.encryptor()
en_text = (encryptor.update(text.encode()) + encryptor.finalize())
return en_text.hex()
def decrypt_ctel(self, encrypted_hex_text: str, init_vector: str):
# cypher equal to self.cypher if use one encryptor
cypher = Cipher(algorithms.AES256(self.key.encode()), modes.CTR(bytes.fromhex(init_vector)))
decryptor = cypher.decryptor()
decrypted_text = (decryptor.update(bytes.fromhex(encrypted_hex_text)) + decryptor.finalize())
return decrypted_text.decode()
ctel_cryptor = CtelCrypto()
if __name__ == '__main__':
# Gen fake token
ic = SdsAuthentication(life_time=130, algorithm="HS512")
print(ic.generate_token('xmmkoyfpjc', 49, 1, 1))