sbt-idp/cope2n-api/fwd_api/utils/crypto.py

115 lines
4.6 KiB
Python
Raw Normal View History

2023-11-30 11:19:06 +00:00
import base64
import datetime
import json
import os
from zoneinfo import ZoneInfo
import jwt
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from fwd import settings
from fwd_api.annotation.api import throw_on_failure
from fwd_api.exception.exceptions import InvalidException
2023-12-15 05:43:19 +00:00
from fwd_api.utils.date import default_zone, get_date_time_now, FORMAT
from fwd_api.utils import date as DateUtil
2023-11-30 11:19:06 +00:00
class InternalCryptor:
def __init__(self, key):
self.cryptor = Fernet(key)
@throw_on_failure(e=InvalidException(excArgs="data"))
def encrypt(self, raw_text: str) -> str:
return self.cryptor.encrypt(raw_text.encode('utf-8')).decode('utf-8')
@throw_on_failure(e=InvalidException(excArgs="data"))
def decrypt(self, cipher_text: str) -> str:
return self.cryptor.decrypt(cipher_text.encode('utf-8')).decode('utf-8')
@throw_on_failure(e=InvalidException(excArgs="data"))
def encrypt_json(self, dict_data: dict) -> str:
return self.cryptor.encrypt(json.dumps(dict_data).encode('utf-8')).decode('utf-8')
@throw_on_failure(e=InvalidException(excArgs="data"))
def decrypt_json(self, cipher_text: str) -> dict:
raw_data = self.cryptor.decrypt(cipher_text.encode('utf-8')).decode('utf-8')
data = json.loads(raw_data)
return data
sds_db_encryptor = InternalCryptor(key=settings.DB_ENCRYPT_KEY)
class SdsAuthentication:
key = settings.INTERNAL_SDS_KEY
base_url_query = settings.BASE_UI_URL + "?cope2n-token={}"
def __init__(self, life_time, algorithm):
self.duration_of_token = life_time
self.algorithm = algorithm
def encode_data(self, data: dict) -> str:
return jwt.encode(data, self.key, algorithm=self.algorithm)
def decode_data(self, data: str) -> str:
return jwt.decode(data, self.key, algorithms=self.algorithm)
def generate_token(self, user_id, internal_id, status, sub_id=-1) -> str:
c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token)
c_time.replace(tzinfo=ZoneInfo(default_zone))
payload = {"id": user_id, "expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value),
'internal_id': internal_id, 'status': status, 'subscription_id': sub_id}
return self.encode_data(payload)
def generate_img_token(self, user_id) -> str:
c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token)
c_time.replace(tzinfo=ZoneInfo(default_zone))
payload = {"expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), "internal_id": user_id}
return self.encode_data(payload)
def generate_img_token_v2(self, user_id, sub_id=None, user_sync_id=None) -> str:
c_time = get_date_time_now() + datetime.timedelta(hours=self.duration_of_token)
c_time.replace(tzinfo=ZoneInfo(default_zone))
payload = {"expired_at": DateUtil.to_str(c_time, FORMAT.DD_MM_YYYY_HHMMSS.value), "internal_id": user_id, 'subscription_id': sub_id, "id": user_sync_id}
return self.encode_data(payload)
def generate_url(self, user_id, internal_id, status, sub_id) -> str:
return self.base_url_query.format(self.generate_token(user_id, internal_id, status, sub_id))
sds_authenticator = SdsAuthentication(life_time=settings.AUTH_TOKEN_LIFE_TIME, algorithm="HS512")
admin_sds_authenticator = SdsAuthentication(life_time=settings.AUTH_TOKEN_LIFE_TIME * 10, algorithm="HS512")
image_authenticator = SdsAuthentication(life_time=settings.IMAGE_TOKEN_LIFE_TIME, algorithm="HS512")
class CtelCrypto:
key = settings.CTEL_KEY
cypher = Cipher(algorithms.AES256(key.encode()), modes.CTR(os.urandom(16)))
def encrypt_ctel(self, text: str, init_vector: str) -> str:
# cypher equal to self.cypher if use one encryptor
cypher = Cipher(algorithms.AES256(self.key.encode()), modes.CTR(bytes.fromhex(init_vector)))
encryptor = cypher.encryptor()
en_text = (encryptor.update(text.encode()) + encryptor.finalize())
return en_text.hex()
def decrypt_ctel(self, encrypted_hex_text: str, init_vector: str):
# cypher equal to self.cypher if use one encryptor
cypher = Cipher(algorithms.AES256(self.key.encode()), modes.CTR(bytes.fromhex(init_vector)))
decryptor = cypher.decryptor()
decrypted_text = (decryptor.update(bytes.fromhex(encrypted_hex_text)) + decryptor.finalize())
return decrypted_text.decode()
ctel_cryptor = CtelCrypto()
if __name__ == '__main__':
# Gen fake token
ic = SdsAuthentication(life_time=130, algorithm="HS512")
print(ic.generate_token('xmmkoyfpjc', 49, 1, 1))