Files
AILab/bbit_ai/app/config/redis.py
T
2026-02-04 13:58:18 +08:00

72 lines
1.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import redis
# ---------------- Redis Client ----------------
class RedisClient:
    """Thin convenience wrapper around a ``redis.Redis`` connection.

    Provides helpers for per-device info hashes stored under
    ``device:info:<device_id>`` and for plain key/value entries with
    transparent JSON encoding and decoding.
    """

    def __init__(self, host: str = "10.10.12.101", port: int = 6379, db: int = 0):
        """Create the underlying Redis client.

        Args:
            host: Redis server address.
            port: Redis server port.
            db: Index of the Redis logical database to use.
        """
        self.redis = redis.Redis(
            host=host,
            port=port,
            db=db,
            # Have redis-py return ``str`` values instead of raw ``bytes``.
            decode_responses=True,
        )

    # Deprecated: online state used to be tracked by ``set_online`` /
    # ``set_offline`` helpers that set or deleted ``device:online:<device_id>``
    # in response to webhook notifications. Those notifications were not
    # timely enough, so online state is now left entirely to device status.

    def set_device_info(self, device_id: str, info: dict):
        """Store a device's full info mapping in a Redis hash.

        Boolean values are converted to ``int`` (``True`` -> 1,
        ``False`` -> 0) because redis-py does not accept ``bool`` values.
        All other values are passed through unchanged.

        Args:
            device_id: Device identifier; the hash key is
                ``device:info:<device_id>``.
            info: Field/value mapping to write. An empty mapping is a
                no-op, since there is nothing to write and redis-py
                rejects empty mappings.
        """
        if not info:
            return
        fields = {
            field: int(value) if isinstance(value, bool) else value
            for field, value in info.items()
        }
        # ``hmset`` is deprecated in redis-py; ``hset`` with ``mapping=`` is
        # the supported way to write several hash fields at once.
        self.redis.hset(f"device:info:{device_id}", mapping=fields)

    def get_device_info(self, device_id: str) -> dict:
        """Return every field of the ``device:info:<device_id>`` hash.

        Values are returned exactly as Redis provides them; booleans that
        were stored as integers are not converted back.
        """
        return self.redis.hgetall(f"device:info:{device_id}")

    def get_value(self, key: str):
        """Fetch ``key`` and decode it as JSON when possible.

        Returns:
            ``None`` when the key does not exist; the decoded JSON value
            when the stored text is valid JSON; otherwise the stored text
            unchanged (including an empty string).
        """
        value = self.redis.get(key)
        # Compare against None explicitly so a stored empty string is not
        # mistaken for a missing key.
        if value is None:
            return None
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return value

    def set_value(self, key: str, value, expire: int = None):
        """Store ``value`` under ``key``, optionally with a time-to-live.

        Args:
            key: Redis key to write.
            value: Value to store. ``dict`` and ``list`` values are
                serialized to a JSON string first; anything else is
                handed to Redis as-is.
            expire: Time-to-live in seconds. ``None`` or ``0`` stores the
                key without an expiry.
        """
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        # ``expire or None`` keeps the original semantics: a falsy expire
        # (None or 0) means "no TTL" rather than sending ``EX 0``.
        self.redis.set(key, value, ex=expire or None)
# Shared module-level client instance, created when this module is imported.
redis_client = RedisClient()