Files
AILab/bbit_ai/app/db/postgres/ws_manager.py
T
2025-12-29 16:30:36 +08:00

51 lines
1.6 KiB
Python

import asyncio
import logging
from typing import List, Optional
from uuid import UUID

from fastapi import WebSocket
class ConnectionManager:
    """Registry of live WebSocket connections with channel-filtered broadcasts.

    Every entry of ``active_connections`` is a dict with the keys ``"ws"``,
    ``"user_id"``, ``"dept_id"`` and ``"proj_id"``.  ``proj_id`` identifies
    the notification channel the connection registered for: 0 is the
    online-status channel, 1 is the livestock-vehicle-entry channel.
    """

    # Channel identifiers stored under the "proj_id" key of each entry.
    PROJ_ONLINE_STATUS = 0
    PROJ_LIVESTOCK_VEHICLE = 1

    def __init__(self):
        # One dict per connection: the websocket plus its user information.
        self.active_connections: List[dict] = []
        # Every read and write of active_connections happens under this lock.
        self.lock = asyncio.Lock()

    async def connect(
        self, websocket: WebSocket, user_id: UUID, dept_id: str, proj_id: int
    ):
        """Accept ``websocket`` and register it with its user metadata.

        Args:
            websocket: The connection to accept and track.
            user_id: Identifier of the connected user.
            dept_id: Department of the user; used to filter ``noticeSentinel``.
            proj_id: Channel selector (0: online status, 1: livestock vehicle
                entry).
        """
        # The handshake is awaited before taking the lock so that it does not
        # hold up other registrations or broadcasts.
        await websocket.accept()
        async with self.lock:
            self.active_connections.append(
                {
                    "ws": websocket,
                    "user_id": user_id,
                    "dept_id": dept_id,
                    "proj_id": proj_id,
                }
            )

    async def disconnect(self, websocket: WebSocket):
        """Remove every registered entry whose socket equals ``websocket``.

        Unknown sockets are ignored; the socket itself is not closed here.
        """
        async with self.lock:
            self.active_connections = [
                entry
                for entry in self.active_connections
                if entry["ws"] != websocket
            ]

    async def _broadcast(self, message: dict, should_receive) -> None:
        """Send ``message`` to each connection accepted by ``should_receive``.

        ``should_receive`` is called with a connection entry (dict) and
        returns a truthy value for recipients.  A connection whose
        ``send_json`` raises is logged and dropped from the registry, so one
        dead socket neither aborts delivery to the remaining recipients nor
        keeps failing on later broadcasts.
        """
        logger = logging.getLogger(__name__)
        # Sends stay under the lock, as before, so concurrent broadcasts
        # never interleave their writes on the same socket.
        async with self.lock:
            survivors = []
            for entry in self.active_connections:
                if should_receive(entry):
                    try:
                        await entry["ws"].send_json(message)
                    except Exception:
                        # Boundary catch: any send failure means the socket
                        # is unusable, so log it and skip re-registering it.
                        logger.warning(
                            "Dropping websocket of user %s after failed send",
                            entry["user_id"],
                            exc_info=True,
                        )
                        continue
                survivors.append(entry)
            self.active_connections = survivors

    async def noticeOnlineStatus(self, message: dict):
        """Send ``message`` to every connection on the online-status channel."""
        await self._broadcast(
            message,
            lambda entry: entry["proj_id"] == self.PROJ_ONLINE_STATUS,
        )

    async def noticeSentinel(
        self, message: dict, target_departments: Optional[List[UUID]] = None
    ):
        """Send ``message`` to livestock-vehicle-channel connections by department.

        Args:
            message: JSON-serialisable payload.
            target_departments: Departments allowed to receive the message.
                When it is ``None`` or empty, nothing is sent.
        """
        if not target_departments:
            return
        # dept_id is registered as a string while targets are UUIDs; compare
        # their string forms so the two representations can actually match.
        wanted = {str(dept) for dept in target_departments}
        await self._broadcast(
            message,
            lambda entry: entry["proj_id"] == self.PROJ_LIVESTOCK_VEHICLE
            and str(entry["dept_id"]) in wanted,
        )