import redis # ---------------- Redis Client ---------------- class RedisClient: def __init__(self, config_path="config.yaml"): self.redis = redis.Redis( "10.10.12.101", 6379, 0, decode_responses=True, ) def set_online(self, device_id: str): key = f"device:online:{device_id}" self.redis.set(key, 1) def set_offline(self, device_id: str): key = f"device:online:{device_id}" self.redis.delete(key) def is_device_online(self, device_id: str) -> bool: key = f"device:online:{device_id}" return self.redis.exists(key) == 1 def set_device_info(self, device_id: str, info: dict): """ 存储完整设备信息到 redis hash 将 bool 转为 int """ key = f"device:info:{device_id}" # 转换 bool 为 int sanitized_info = { k: (int(v) if isinstance(v, bool) else v) for k, v in info.items() } self.redis.hmset(key, sanitized_info) def get_device_info(self, device_id: str) -> dict: key = f"device:info:{device_id}" return self.redis.hgetall(key) redis_client = RedisClient()