"""AI 事件 WebSocket 推送端点。 提供: - /ws/ai-cache/{site_id} — 缓存更新 / 失效事件 - /ws/ai-alerts/{site_id} — AI 告警事件(Phase 3.1) 协议: - 客户端连接 → 服务端 accept → 订阅 EventBus → 持续 send_json 事件 - 事件格式:{"type": "cache_updated|cache_invalidated|alert_created|...", "site_id": int, "payload": {...}} - 服务端关闭或客户端断开时清理订阅 用 site_id=-1 表示全局订阅(收所有门店事件,admin-web 全局监控用)。 """ from __future__ import annotations import logging from fastapi import APIRouter, WebSocket, WebSocketDisconnect from ..ai.event_bus import AIEvent, get_event_bus logger = logging.getLogger(__name__) ws_router = APIRouter() @ws_router.websocket("/ws/ai-cache/{site_id}") async def ws_ai_cache(websocket: WebSocket, site_id: int) -> None: """AI 缓存事件推送。 site_id=-1 表示订阅全局(收所有门店的 cache_updated / cache_invalidated)。 """ await _serve_event_stream(websocket, site_id, endpoint="ai-cache") @ws_router.websocket("/ws/ai-alerts/{site_id}") async def ws_ai_alerts(websocket: WebSocket, site_id: int) -> None: """AI 告警事件推送(Phase 3.1)。 site_id=-1 表示订阅全局告警。 事件 type: alert_created / alert_updated / budget_exceeded / circuit_opened。 """ await _serve_event_stream(websocket, site_id, endpoint="ai-alerts") async def _serve_event_stream( websocket: WebSocket, site_id: int, endpoint: str, ) -> None: """共享事件流处理逻辑。""" await websocket.accept() # -1 映射为全局订阅(None) subscribe_key: int | None = None if site_id == -1 else site_id logger.info( "WS %s 连接建立: site_id=%s", endpoint, subscribe_key if subscribe_key else "ALL", ) bus = get_event_bus() queue = await bus.subscribe(subscribe_key) try: while True: event = await queue.get() if event is None: break await websocket.send_json({ "type": event.type, "site_id": event.site_id, "payload": event.payload, }) except WebSocketDisconnect: logger.info("WS %s 客户端断开: site_id=%s", endpoint, subscribe_key) except Exception: logger.exception("WS %s 异常: site_id=%s", endpoint, subscribe_key) finally: await bus.unsubscribe(subscribe_key, queue) try: await websocket.close() except Exception: pass