sbt-idp/cope2n-api/fwd_api/utils/redis.py
2024-02-06 10:14:44 +07:00

48 lines
1.7 KiB
Python

import redis
import json
from datetime import datetime, timedelta
from django.conf import settings
class RedisUtils:
def __init__(self, host=settings.REDIS_HOST, port=settings.REDIS_PORT):
self.redis_client = redis.Redis(host=host, port=port, decode_responses=True)
def set_cache(self, request_id, image_index, data):
"""
request_id: str
data: dict
image_index: int
"""
self.redis_client.hset(request_id, image_index, json.dumps(data))
self.redis_client.expire(request_id, 3600)
def get_all_cache(self, request_id):
resutlt = {}
for key, value in self.redis_client.hgetall(request_id).items():
resutlt[key] = json.loads(value)
return resutlt
def get_specific_cache(self, request_id, key):
return json.loads(self.redis_client.hget(request_id, key))
def get_size(self, request_id):
return self.redis_client.hlen(request_id)
def remove_cache(self, request_id):
self.redis_client.delete(request_id)
if __name__ == '__main__':
_host = "127.0.0.1"
_port = 6379
Yujii_A = RedisUtils(_host, _port)
Yujii_A.set_cache("SAP123", 1, {"status": 1})
Yujii_A.set_cache("SAP123", 2, {"status": 2})
Yujii_A.set_cache("SAP123", 3, {"status": 3})
print("[INFO]: data for request_id: {}".format(Yujii_A.get_all_cache("SAP123")))
print("[INFO]: len for request_id: {}".format(Yujii_A.get_size("SAP123")))
Yujii_A.remove_cache("SAP123")
print("[INFO]: data for request_id: {}".format(Yujii_A.get_all_cache("SAP123")))
print("[INFO]: len for request_id: {}".format(Yujii_A.get_size("SAP123")))