"""AI 事件广播总线(in-process pub/sub)。 支持按 site_id 订阅的异步事件分发,用于: - Phase 1.4:AI 缓存主动失效 / 更新通知 → admin-web、小程序刷新 - Phase 3.1:AI 告警实时推送(告警发生 / 确认 / 忽略) 设计要点: - 仿 TaskExecutor.subscribe/unsubscribe 模式(单进程共享) - 每个订阅者独立 asyncio.Queue,互不干扰 - 订阅必须指定 site_id(全局订阅需显式 site_id=None) - publish 异步写入所有订阅者 queue;端点侧通过 get() 消费 """ from __future__ import annotations import asyncio import logging from dataclasses import dataclass, field from typing import Any logger = logging.getLogger(__name__) @dataclass class AIEvent: """统一事件结构。 type 示例: - cache_updated — 新缓存写入 - cache_invalidated — 缓存主动失效 - alert_created — 新告警(Phase 3.1) - alert_updated — 告警状态变更(Phase 3.1) """ type: str site_id: int | None payload: dict[str, Any] = field(default_factory=dict) class EventBus: """单进程事件广播总线。""" def __init__(self) -> None: # {site_id | None: [queue, ...]} None 表示全局订阅(收所有 site 事件) self._subscribers: dict[int | None, list[asyncio.Queue[AIEvent | None]]] = {} self._lock = asyncio.Lock() async def subscribe(self, site_id: int | None) -> asyncio.Queue[AIEvent | None]: """订阅事件流,返回独立 asyncio.Queue。 site_id=None 表示订阅全部门店事件(admin-web 全局监控用)。 site_id= 表示仅订阅该门店事件(小程序或单门店后台)。 unsubscribe 时需将返回的 queue 作为参数传入。 """ queue: asyncio.Queue[AIEvent | None] = asyncio.Queue() async with self._lock: self._subscribers.setdefault(site_id, []).append(queue) return queue async def unsubscribe( self, site_id: int | None, queue: asyncio.Queue[AIEvent | None] ) -> None: """解除订阅,从订阅者列表移除 queue。""" async with self._lock: subs = self._subscribers.get(site_id, []) try: subs.remove(queue) except ValueError: pass if not subs: self._subscribers.pop(site_id, None) def publish(self, event: AIEvent) -> int: """同步 publish 事件,返回送达的订阅者数。 可从任意线程 / sync 上下文调用(如 dispatcher._write_cache)。 内部使用 run_coroutine_threadsafe 线程安全写入 queue。 """ targets = self._collect_targets(event.site_id) sent = 0 for queue in targets: try: # 优先同步调用 put_nowait(最常见:同一 running loop) queue.put_nowait(event) sent += 1 except RuntimeError: # 无 running loop 场景极少,跳过 logger.debug("publish 无 running loop:跳过 queue") return sent def _collect_targets(self, site_id: int | None) -> list[asyncio.Queue[AIEvent | None]]: """收集要推送的订阅者列表:该 site_id 的订阅者 + 全局订阅者。""" targets: list[asyncio.Queue[AIEvent | None]] = [] if site_id is not None: targets.extend(self._subscribers.get(site_id, [])) targets.extend(self._subscribers.get(None, [])) return targets async def close_all(self) -> None: """结束时给所有订阅者发哨兵 None,通知连接关闭。""" async with self._lock: all_queues = [q for subs in self._subscribers.values() for q in subs] self._subscribers.clear() for q in all_queues: try: q.put_nowait(None) except Exception: pass # ── 单例 ────────────────────────────────────────────────── _bus: EventBus | None = None def get_event_bus() -> EventBus: """获取全局 EventBus 单例。进程启动时按需创建。""" global _bus if _bus is None: _bus = EventBus() return _bus