Files
AILab/bbit_ai/app/config/redis.py
T
2025-12-29 16:30:36 +08:00

48 lines
1.2 KiB
Python

import redis
# ---------------- Redis Client ----------------
class RedisClient:
    """Thin wrapper around a ``redis.Redis`` connection for device state.

    Two key families are used:

    * ``device:online:<device_id>`` -- a plain key whose existence marks the
      device as online.
    * ``device:info:<device_id>`` -- a hash holding the device's info fields.
    """

    DEFAULT_HOST = "10.10.12.101"
    DEFAULT_PORT = 6379
    DEFAULT_DB = 0

    def __init__(
        self,
        config_path="config.yaml",
        *,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        db: int = DEFAULT_DB,
    ):
        """Create the underlying Redis connection object.

        Args:
            config_path: Accepted for backward compatibility; it is not read
                by this class.
            host: Redis server host.
            port: Redis server port.
            db: Redis logical database index.
        """
        # decode_responses=True makes redis-py return str instead of bytes.
        self.redis = redis.Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True,
        )

    @staticmethod
    def _online_key(device_id: str) -> str:
        """Return the key whose presence marks *device_id* as online."""
        return f"device:online:{device_id}"

    @staticmethod
    def _info_key(device_id: str) -> str:
        """Return the key of the hash that stores *device_id*'s info."""
        return f"device:info:{device_id}"

    def set_online(self, device_id: str):
        """Mark the device as online by setting its presence key to 1."""
        self.redis.set(self._online_key(device_id), 1)

    def set_offline(self, device_id: str):
        """Mark the device as offline by deleting its presence key."""
        self.redis.delete(self._online_key(device_id))

    def is_device_online(self, device_id: str) -> bool:
        """Return True if the device's presence key exists."""
        return self.redis.exists(self._online_key(device_id)) == 1

    def set_device_info(self, device_id: str, info: dict):
        """Store the device's info fields in a Redis hash.

        ``bool`` values are converted to ``int`` (True -> 1, False -> 0)
        before writing; every other value is passed through unchanged.
        Fields already present in the hash but absent from *info* are left
        as they are.

        Args:
            device_id: Identifier used to build the hash key.
            info: Field/value mapping to write. An empty mapping is a no-op,
                because redis-py rejects a hash write with no fields.
        """
        sanitized_info = {
            field: (int(value) if isinstance(value, bool) else value)
            for field, value in info.items()
        }
        if not sanitized_info:
            return
        # hset(mapping=...) replaces the deprecated hmset().
        self.redis.hset(self._info_key(device_id), mapping=sanitized_info)

    def get_device_info(self, device_id: str) -> dict:
        """Return all fields of the device's info hash.

        The result is whatever ``hgetall`` returns for the key (an empty
        dict when nothing is stored).
        """
        return self.redis.hgetall(self._info_key(device_id))
# Shared module-level client, instantiated with default arguments when this
# module is imported.
redis_client = RedisClient()