import json import redis # ---------------- Redis Client ---------------- class RedisClient: def __init__(self): self.redis = redis.Redis( "10.10.12.101", 6379, 0, decode_responses=True, ) # 已废弃 之前使用webhookds的方式通知 但是因为通知不及时的原因,现在在线状态全盘交给device status # def set_online(self, device_id: str): # key = f"device:online:{device_id}" # self.redis.set(key, 1) # # def set_offline(self, device_id: str): # key = f"device:online:{device_id}" # self.redis.delete(key) def set_device_info(self, device_id: str, info: dict): """ 存储完整设备信息到 redis hash 将 bool 转为 int """ key = f"device:info:{device_id}" # 转换 bool 为 int sanitized_info = { k: (int(v) if isinstance(v, bool) else v) for k, v in info.items() } self.redis.hmset(key, sanitized_info) def get_device_info(self, device_id: str) -> dict: key = f"device:info:{device_id}" raw_info = self.redis.hgetall(key) return raw_info def get_value(self, key: str): """ 获取 Redis 中的值,如果是 JSON 字符串自动解析为 dict """ value = self.redis.get(key) if not value: return None try: return json.loads(value) except json.JSONDecodeError: return value def set_value(self, key: str, value, expire: int = None): """ 存储 key-value,value 可以是 dict,自动序列化为 JSON expire 单位秒 """ if isinstance(value, (dict, list)): value = json.dumps(value) if expire: self.redis.set(key, value, ex=expire) else: self.redis.set(key, value) redis_client = RedisClient()