Files
Neo-ZQYY/apps/backend/app/ai/event_bus.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

124 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI 事件广播总线in-process pub/sub
支持按 site_id 订阅的异步事件分发,用于:
- Phase 1.4AI 缓存主动失效 / 更新通知 → admin-web、小程序刷新
- Phase 3.1AI 告警实时推送(告警发生 / 确认 / 忽略)
设计要点:
- 仿 TaskExecutor.subscribe/unsubscribe 模式(单进程共享)
- 每个订阅者独立 asyncio.Queue互不干扰
- 订阅必须指定 site_id全局订阅需显式 site_id=None
- publish 异步写入所有订阅者 queue端点侧通过 get() 消费
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from typing import Any
logger = logging.getLogger(__name__)
@dataclass
class AIEvent:
"""统一事件结构。
type 示例:
- cache_updated — 新缓存写入
- cache_invalidated — 缓存主动失效
- alert_created — 新告警Phase 3.1
- alert_updated — 告警状态变更Phase 3.1
"""
type: str
site_id: int | None
payload: dict[str, Any] = field(default_factory=dict)
class EventBus:
"""单进程事件广播总线。"""
def __init__(self) -> None:
# {site_id | None: [queue, ...]} None 表示全局订阅(收所有 site 事件)
self._subscribers: dict[int | None, list[asyncio.Queue[AIEvent | None]]] = {}
self._lock = asyncio.Lock()
async def subscribe(self, site_id: int | None) -> asyncio.Queue[AIEvent | None]:
"""订阅事件流,返回独立 asyncio.Queue。
site_id=None 表示订阅全部门店事件admin-web 全局监控用)。
site_id=<int> 表示仅订阅该门店事件(小程序或单门店后台)。
unsubscribe 时需将返回的 queue 作为参数传入。
"""
queue: asyncio.Queue[AIEvent | None] = asyncio.Queue()
async with self._lock:
self._subscribers.setdefault(site_id, []).append(queue)
return queue
async def unsubscribe(
self, site_id: int | None, queue: asyncio.Queue[AIEvent | None]
) -> None:
"""解除订阅,从订阅者列表移除 queue。"""
async with self._lock:
subs = self._subscribers.get(site_id, [])
try:
subs.remove(queue)
except ValueError:
pass
if not subs:
self._subscribers.pop(site_id, None)
def publish(self, event: AIEvent) -> int:
"""同步 publish 事件,返回送达的订阅者数。
可从任意线程 / sync 上下文调用(如 dispatcher._write_cache
内部使用 run_coroutine_threadsafe 线程安全写入 queue。
"""
targets = self._collect_targets(event.site_id)
sent = 0
for queue in targets:
try:
# 优先同步调用 put_nowait最常见同一 running loop
queue.put_nowait(event)
sent += 1
except RuntimeError:
# 无 running loop 场景极少,跳过
logger.debug("publish 无 running loop跳过 queue")
return sent
def _collect_targets(self, site_id: int | None) -> list[asyncio.Queue[AIEvent | None]]:
"""收集要推送的订阅者列表:该 site_id 的订阅者 + 全局订阅者。"""
targets: list[asyncio.Queue[AIEvent | None]] = []
if site_id is not None:
targets.extend(self._subscribers.get(site_id, []))
targets.extend(self._subscribers.get(None, []))
return targets
async def close_all(self) -> None:
"""结束时给所有订阅者发哨兵 None通知连接关闭。"""
async with self._lock:
all_queues = [q for subs in self._subscribers.values() for q in subs]
self._subscribers.clear()
for q in all_queues:
try:
q.put_nowait(None)
except Exception:
pass
# ── 单例 ──────────────────────────────────────────────────
_bus: EventBus | None = None
def get_event_bus() -> EventBus:
"""获取全局 EventBus 单例。进程启动时按需创建。"""
global _bus
if _bus is None:
_bus = EventBus()
return _bus