diff --git a/apps/admin-web/src/pages/AIOperations.tsx b/apps/admin-web/src/pages/AIOperations.tsx index a196be9..1765bb1 100644 --- a/apps/admin-web/src/pages/AIOperations.tsx +++ b/apps/admin-web/src/pages/AIOperations.tsx @@ -10,7 +10,7 @@ import React, { useEffect, useState, useCallback } from "react"; import { - Card, Row, Col, Select, Input, Button, Table, Tag, Space, + Alert, Card, Row, Col, Select, Input, Button, Table, Tag, Space, Checkbox, Modal, Statistic, message, Typography, } from "antd"; import { ReloadOutlined } from "@ant-design/icons"; @@ -20,6 +20,8 @@ import { getAlerts, ackAlert, ignoreAlert, runApp, triggerEvent, type AlertItem, type AppType, type BatchRunEstimate, } from "../api/adminAI"; +// F1-5a: sandbox 模式提示条数据源 +import { fetchRuntimeContext, type RuntimeContext } from "../api/runtimeContext"; const EVENT_TYPE_OPTIONS = [ { label: "消费事件(App3→App8→App7 [+ App4→App5])", value: "consumption" }, @@ -213,6 +215,24 @@ const AIOperations: React.FC = () => { } }; + // ---- F1-5a: Sandbox 模式提示条 ---- + // 沙箱机制 P0-7 主线:让运维进入 AI 操作页前能看到当前 sandbox 状态, + // 避免"以为 live 模式"误触发批量执行实际跑在 sandbox 数据集上的混淆。 + const [runtimeCtx, setRuntimeCtx] = useState(null); + useEffect(() => { + // 复用 cacheSiteId 作为当前关注 site(默认 2790685415443269,与 cacheSiteId / runSiteId 一致) + let cancelled = false; + (async () => { + try { + const ctx = await fetchRuntimeContext(cacheSiteId); + if (!cancelled) setRuntimeCtx(ctx); + } catch { + // 失败不阻断页面渲染(get_runtime_context 表不存在时后端降级 live) + } + })(); + return () => { cancelled = true; }; + }, [cacheSiteId]); + // ---- Card 4: 告警管理 ---- const [alerts, setAlerts] = useState([]); const [alertTotal, setAlertTotal] = useState(0); @@ -282,6 +302,28 @@ const AIOperations: React.FC = () => {
AI 手动操作 + {runtimeCtx && runtimeCtx.is_sandbox && ( + + 沙箱模式 · 业务日 {runtimeCtx.sandbox_date ?? "—"} · + 实例 {runtimeCtx.sandbox_instance_id ?? "—"} + + } + description={ + + 当前 site_id={cacheSiteId} 处于沙箱模式。本页所有 AI 触发(手动重跑 / 缓存失效 / 按需执行 / 批量执行)将使用 + 沙箱业务日 ({runtimeCtx.sandbox_date}) 而非真实今日;ETL 视图自动按业务日上界裁剪,助教/会员消费数据仅可见 + 沙箱日及之前。结果写入 ai_run_logs 时 runtime_mode=sandbox + sandbox_instance_id 隔离, + 不污染 live 数据。如需切回 live,前往 运行上下文 页。 + + } + /> + )} + {/* Card 1: 手动重跑 */} diff --git a/apps/backend/app/ai/prompts/app2_finance_prompt.py b/apps/backend/app/ai/prompts/app2_finance_prompt.py index 128af3d..87826e4 100644 --- a/apps/backend/app/ai/prompts/app2_finance_prompt.py +++ b/apps/backend/app/ai/prompts/app2_finance_prompt.py @@ -814,7 +814,10 @@ async def build_prompt( # 避免 AI 在只看当期充值/消耗时对"余额为何涨"的矛盾自圆其说 if area == "all" and isinstance(recharge := board_data.get("recharge"), dict): try: - start_date_obj, _end = _calc_date_range(board_time) + from app.services.runtime_context import get_runtime_context + start_date_obj, _end = _calc_date_range( + board_time, ref_date=get_runtime_context(site_id).business_date, + ) opening = _fetch_card_balance_opening(site_id, str(start_date_obj)) closing = float(recharge.get("card_balance") or 0) period_recharge = float(recharge.get("actual_income") or 0) @@ -838,7 +841,10 @@ async def build_prompt( # - 日粒度异常:同星期均值基线下的极端偏离 if area == "all": try: - start_date, end_date = _calc_date_range(board_time) + from app.services.runtime_context import get_runtime_context + start_date, end_date = _calc_date_range( + board_time, ref_date=get_runtime_context(site_id).business_date, + ) series = _fetch_daily_series(site_id, str(start_date), str(end_date)) # 上期序列(用于客单价环比) prev_series: list[tuple] | None = None diff --git a/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py b/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py index 81ff2eb..667142b 100644 --- a/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py +++ b/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py @@ -463,7 +463,10 @@ async def build_prompt( # 日粒度派生(区域级) try: - start_date, end_date = _calc_date_range(board_time) + from app.services.runtime_context import get_runtime_context + start_date, end_date = _calc_date_range( + board_time, ref_date=get_runtime_context(site_id).business_date, + ) series = _fetch_area_daily_series( site_id, str(start_date), str(end_date), area_code=area, ) diff --git a/apps/backend/app/ai/run_log_service.py b/apps/backend/app/ai/run_log_service.py index 8778c01..7a93218 100644 --- a/apps/backend/app/ai/run_log_service.py +++ b/apps/backend/app/ai/run_log_service.py @@ -59,7 +59,10 @@ class AIRunLogService: truncated = _truncate_prompt(request_prompt) conn = self._get_conn() try: - ctx = get_runtime_context(site_id, conn=conn) + # F1-5a: bind_to_session=True 激活 GUC app.current_business_date, + # 让本事务内所有 ETL 库 app.v_* 视图自动按 business_date 上界裁剪 + # (如果后续 fetch 走 ETL 视图)。 + ctx = get_runtime_context(site_id, conn=conn, bind_to_session=True) runtime_mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE sandbox_instance_id = ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID with conn.cursor() as cur: diff --git a/apps/backend/app/routers/admin_ai.py b/apps/backend/app/routers/admin_ai.py index d7cfd25..47a4c41 100644 --- a/apps/backend/app/routers/admin_ai.py +++ b/apps/backend/app/routers/admin_ai.py @@ -285,7 +285,18 @@ async def confirm_batch_run( body: BatchRunConfirm, user: CurrentUser = Depends(_require_admin()), ) -> BatchRunConfirmResponse: - """确认批量执行,后台异步执行。""" + """确认批量执行,后台异步执行。 + + F1-5a: lazy 注入 dispatcher 到 _admin_svc(首次调用时绑定), + 避免模块加载顺序问题(dispatcher 通常在 lifespan startup 才初始化)。 + """ + if _admin_svc._dispatcher is None: + try: + from app.ai.dispatcher import get_dispatcher + _admin_svc.set_dispatcher(get_dispatcher()) + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=f"Dispatcher 未初始化: {exc}") from exc + try: await _admin_svc.confirm_batch(batch_id=body.batch_id) except ValueError as exc: diff --git a/apps/backend/app/services/ai/admin_service.py b/apps/backend/app/services/ai/admin_service.py index 7da0ccd..6a1ad7b 100644 --- a/apps/backend/app/services/ai/admin_service.py +++ b/apps/backend/app/services/ai/admin_service.py @@ -13,9 +13,18 @@ import asyncio import logging import uuid from datetime import datetime, timezone, timedelta +from typing import TYPE_CHECKING, Any from app.ai.budget_tracker import BudgetTracker from app.database import get_connection +from app.services.runtime_context import ( + RuntimeContext, + get_runtime_context, + runtime_insert_columns, +) + +if TYPE_CHECKING: + from app.ai.dispatcher import AIDispatcher logger = logging.getLogger(__name__) @@ -25,13 +34,22 @@ AVG_TOKENS_PER_CALL = 2000 # 批量执行内存存储 TTL(秒) _BATCH_TTL_SECONDS = 600 # 10 分钟 +# F1-5a 批量执行并发上限 +# Neo 决策 N=5(与 dispatcher 现有 circuit_breaker / rate_limiter 配合,不打爆 DashScope 1000 RPM 限制) +_BATCH_CONCURRENCY = 5 + class AdminAIService: """AI 监控后台聚合服务。""" def __init__(self, budget_tracker: BudgetTracker | None = None) -> None: self._budget = budget_tracker - self._batch_store: dict[str, dict] = {} # batch_id → {params, expires_at} + self._batch_store: dict[str, dict] = {} # batch_id → {params, ctx_snapshot, expires_at} + self._dispatcher: AIDispatcher | None = None # F1-5a: lifespan 启动时注入 + + def set_dispatcher(self, dispatcher: AIDispatcher) -> None: + """F1-5a: lifespan 启动时注入 dispatcher,用于 _run_batch 实际执行 AI 调用。""" + self._dispatcher = dispatcher # ── Dashboard ───────────────────────────────────────── @@ -344,29 +362,37 @@ class AdminAIService: return _row_to_dict(cols, row) async def retry_trigger_job(self, job_id: int) -> int: - """创建新 trigger_job(is_forced=true),返回新 job_id。""" + """创建新 trigger_job(is_forced=true),返回新 job_id。 + + F1-5a: INSERT 显式落 runtime_mode + sandbox_instance_id, + 与原 trigger_job 的 runtime 上下文保持一致(避免依赖默认值导致重试时丢失 sandbox 标记)。 + """ original = await self.get_trigger_job(job_id) if original is None: raise ValueError(f"trigger_job {job_id} 不存在") + site_id = original["site_id"] + cols, placeholders, runtime_params = runtime_insert_columns(site_id) + conn = get_connection() try: with conn.cursor() as cur: cur.execute( - """ + f""" INSERT INTO biz.ai_trigger_jobs (event_type, member_id, site_id, connector_type, - payload, app_chain, is_forced, status) - VALUES (%s, %s, %s, %s, %s, %s, true, 'pending') + payload, app_chain, is_forced, status, {cols}) + VALUES (%s, %s, %s, %s, %s, %s, true, 'pending', {placeholders}) RETURNING id """, ( original["event_type"], original.get("member_id"), - original["site_id"], + site_id, original.get("connector_type", "feiqiu"), original.get("payload"), original.get("app_chain"), + *runtime_params, ), ) new_id = cur.fetchone()[0] @@ -576,22 +602,41 @@ class AdminAIService: async def estimate_batch( self, app_types: list[str], member_ids: list[int], site_id: int, ) -> dict: - """生成 batch_id,存入内存(TTL 10min),返回预估。""" + """生成 batch_id,存入内存(TTL 10min),返回预估。 + + F1-5a: estimate 阶段抓 RuntimeContext 快照存入 _batch_store, + confirm 时取出传给 _run_batch,避免 estimate→confirm 间 Neo 切 sandbox 模式造成数据漂移污染。 + """ self._cleanup_expired_batches() batch_id = uuid.uuid4().hex estimated_calls = len(app_types) * len(member_ids) estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL + # F1-5a: 抓 ctx_snapshot + ctx_snapshot = get_runtime_context(site_id) + self._batch_store[batch_id] = { "params": { "app_types": app_types, "member_ids": member_ids, "site_id": site_id, + "batch_id": batch_id, # 用于 _run_batch 内 triggered_by 标注 }, + "ctx_snapshot": ctx_snapshot, "expires_at": datetime.now(timezone.utc) + timedelta(seconds=_BATCH_TTL_SECONDS), } + logger.info( + "批量执行预估: batch_id=%s apps=%s members=%d site_id=%s mode=%s sandbox_date=%s", + batch_id, + app_types, + len(member_ids), + site_id, + ctx_snapshot.mode, + ctx_snapshot.sandbox_date.isoformat() if ctx_snapshot.sandbox_date else None, + ) + return { "batch_id": batch_id, "estimated_calls": estimated_calls, @@ -599,7 +644,7 @@ class AdminAIService: } async def confirm_batch(self, batch_id: str) -> None: - """取出参数,异步执行批量调用。""" + """取出参数 + ctx_snapshot,异步执行批量调用。""" self._cleanup_expired_batches() entry = self._batch_store.pop(batch_id, None) @@ -607,28 +652,77 @@ class AdminAIService: raise ValueError(f"batch_id 无效或已过期: {batch_id}") params = entry["params"] + ctx_snapshot: RuntimeContext = entry["ctx_snapshot"] logger.info( - "批量执行确认: batch_id=%s apps=%s members=%d site_id=%s", + "批量执行确认: batch_id=%s apps=%s members=%d site_id=%s mode=%s", batch_id, params["app_types"], len(params["member_ids"]), params["site_id"], + ctx_snapshot.mode, ) - # 后台异步执行(具体调用链由路由层注入 dispatcher 处理) - asyncio.create_task( - self._run_batch(params["app_types"], params["member_ids"], params["site_id"]) - ) + # F1-5a: 后台异步执行,传入 ctx_snapshot 防止运行时 Neo 切模式 + asyncio.create_task(self._run_batch(params, ctx_snapshot)) async def _run_batch( - self, app_types: list[str], member_ids: list[int], site_id: int, + self, params: dict[str, Any], ctx_snapshot: RuntimeContext, ) -> None: - """后台批量执行(占位实现,实际由 dispatcher 驱动)。""" + """F1-5a: 后台批量执行真正实现。 + + - Semaphore(_BATCH_CONCURRENCY=5) 限并发,避免打爆 DashScope 限流 + - asyncio.gather + return_exceptions=True:单个 member 失败不连坐 + - context 显式带 business_date(来自 ctx_snapshot),prompt builder 自取沙箱日期 + - triggered_by=f"batch:{batch_id}":打标 ai_run_logs,Wave 2 进度查询基础 + """ + if self._dispatcher is None: + logger.error( + "批量执行失败: dispatcher 未注入 batch_id=%s", + params.get("batch_id"), + ) + return + + app_types: list[str] = params["app_types"] + member_ids: list[int] = params["member_ids"] + site_id: int = params["site_id"] + batch_id: str = params["batch_id"] + + sem = asyncio.Semaphore(_BATCH_CONCURRENCY) + triggered_by = f"batch:{batch_id}" + business_date_iso = ctx_snapshot.business_date.isoformat() + + async def _run_one(app_type: str, member_id: int) -> None: + async with sem: + try: + await self._dispatcher.run_single_app( + app_type=app_type, + context={ + "site_id": site_id, + "member_id": member_id, + "business_date": business_date_iso, + }, + triggered_by=triggered_by, + ) + except Exception: + # 单个失败已写 ai_run_logs,此处仅记录(不连坐其他 member) + logger.exception( + "批量执行单步失败 batch_id=%s app=%s member=%s", + batch_id, app_type, member_id, + ) + + tasks = [ + asyncio.create_task(_run_one(at, mid)) + for at in app_types + for mid in member_ids + ] + logger.info( - "批量执行开始: apps=%s members=%d site_id=%s", - app_types, len(member_ids), site_id, + "批量执行开始: batch_id=%s tasks=%d concurrency=%d mode=%s", + batch_id, len(tasks), _BATCH_CONCURRENCY, ctx_snapshot.mode, ) - # 实际执行逻辑在路由层通过 dispatcher.handle_trigger 驱动 - # 此处仅记录日志,避免服务层直接依赖 dispatcher 实例 + + await asyncio.gather(*tasks, return_exceptions=True) + + logger.info("批量执行完成: batch_id=%s tasks=%d", batch_id, len(tasks)) def _cleanup_expired_batches(self) -> None: """清理过期 batch。""" diff --git a/apps/backend/app/services/runtime_context.py b/apps/backend/app/services/runtime_context.py index e36ab11..94fe649 100644 --- a/apps/backend/app/services/runtime_context.py +++ b/apps/backend/app/services/runtime_context.py @@ -8,6 +8,7 @@ from __future__ import annotations +import logging import uuid from dataclasses import dataclass from datetime import date, datetime, time, timedelta, timezone @@ -15,6 +16,8 @@ from typing import Any from app import config +logger = logging.getLogger(__name__) + _LOCAL_TZ = timezone(timedelta(hours=8)) MODE_LIVE = "live" MODE_SANDBOX = "sandbox" @@ -85,10 +88,23 @@ def _default_context(site_id: int) -> RuntimeContext: return RuntimeContext(site_id=site_id) -def get_runtime_context(site_id: int, conn: Any | None = None) -> RuntimeContext: +def get_runtime_context( + site_id: int, + conn: Any | None = None, + *, + bind_to_session: bool = False, +) -> RuntimeContext: """读取门店运行上下文。 表不存在或未配置时降级为 live,保证迁移前不影响正式链路。 + + F1-5a 新增 ``bind_to_session``:当 True 且 conn 非空时,在返回前调用 + ``apply_runtime_session_vars(conn, ctx)`` 设置 GUC ``app.current_business_date`` / + ``app.current_runtime_mode``,激活 ETL 库 26 个 ``app.v_*`` 视图的业务日上界裁剪 + (`app.business_date_now()` 函数读取 GUC)。 + + 使用场景:fdw_queries 等走 ETL 库视图的查询入口处显式 ``bind_to_session=True``。 + 其余只读取 ctx 不查 ETL 视图的调用方保持默认 ``False`` 即可。 """ own_conn = conn is None if own_conn: @@ -120,22 +136,34 @@ def get_runtime_context(site_id: int, conn: Any | None = None) -> RuntimeContext conn.close() if not row: - return _default_context(site_id) + ctx = _default_context(site_id) + else: + mode, sandbox_date, sandbox_instance_id, ai_mode, status = row + if mode not in (MODE_LIVE, MODE_SANDBOX): + mode = MODE_LIVE + if mode == MODE_SANDBOX and (sandbox_date is None or not sandbox_instance_id): + mode = MODE_LIVE - mode, sandbox_date, sandbox_instance_id, ai_mode, status = row - if mode not in (MODE_LIVE, MODE_SANDBOX): - mode = MODE_LIVE - if mode == MODE_SANDBOX and (sandbox_date is None or not sandbox_instance_id): - mode = MODE_LIVE + ctx = RuntimeContext( + site_id=site_id, + mode=mode, + sandbox_date=sandbox_date, + sandbox_instance_id=sandbox_instance_id, + ai_mode=ai_mode or AI_MODE_LIVE, + status=status or "active", + ) - return RuntimeContext( - site_id=site_id, - mode=mode, - sandbox_date=sandbox_date, - sandbox_instance_id=sandbox_instance_id, - ai_mode=ai_mode or AI_MODE_LIVE, - status=status or "active", - ) + # F1-5a: 显式开启时,绑定到当前 session,激活 ETL 库视图业务日上界 + if bind_to_session and not own_conn and conn is not None: + try: + apply_runtime_session_vars(conn, ctx=ctx) + except Exception: + logger.debug( + "apply_runtime_session_vars 失败(不阻塞主流程) site_id=%d", site_id, + exc_info=True, + ) + + return ctx def namespace_ai_target_id(site_id: int, target_id: str, conn: Any | None = None) -> str: diff --git a/db/zqyy_app/migrations/20260505__ai_run_logs_runtime_index.sql b/db/zqyy_app/migrations/20260505__ai_run_logs_runtime_index.sql new file mode 100644 index 0000000..4327d3e --- /dev/null +++ b/db/zqyy_app/migrations/20260505__ai_run_logs_runtime_index.sql @@ -0,0 +1,25 @@ +-- 2026-05-05 +-- F1-5a: ai_run_logs runtime 维度复合索引补建 +-- +-- 背景: 调研发现其他 6 张业务表(coach_tasks / recall_events / ai_cache / +-- ai_trigger_jobs / coach_task_history / coach_task_transfer_log)都有 +-- (site_id, runtime_mode, sandbox_instance_id, ...) 复合索引, +-- 唯独 biz.ai_run_logs 漏建。 +-- +-- 影响: admin-web AI 监控页按 runtime_mode + sandbox_instance_id 过滤时 +-- 会触发全表扫描,sandbox 切换后查询性能急剧下降。 +-- +-- 修复: 补建复合索引,与其他 6 张表保持一致。 +-- +-- 兼容性: 仅新增索引,不改变表结构,不影响现有查询。 +-- 回滚: DROP INDEX IF EXISTS biz.idx_ai_run_logs_runtime_site; + +BEGIN; + +CREATE INDEX IF NOT EXISTS idx_ai_run_logs_runtime_site + ON biz.ai_run_logs (site_id, runtime_mode, sandbox_instance_id, created_at DESC); + +COMMENT ON INDEX biz.idx_ai_run_logs_runtime_site IS + 'F1-5a 补建: 支持 admin-web AI 监控页按 runtime_mode + sandbox_instance_id + site_id 过滤,避免全表扫'; + +COMMIT; diff --git a/docs/_overview/wave1-findings/F1-5-impl-decisions.md b/docs/_overview/wave1-findings/F1-5-impl-decisions.md new file mode 100644 index 0000000..bd8183e --- /dev/null +++ b/docs/_overview/wave1-findings/F1-5-impl-decisions.md @@ -0,0 +1,428 @@ +# F1-5 沙箱 batch-run 接入 runtime_context — 实施决策卡 + +> 日期:2026-05-05 +> 触发:F1-5 前置调研完成,4 份子代理报告整合后剩 6 项实施决策待 Neo 拍板 +> 调研依据:`runtime_context.py:264` / `admin_service.py:622-631` / W1-T7 PRD 批 1(`batch1-runtime-context-and-ai.md:494-547`)/ P20 SPEC §10/§14 +> 用法:Neo 在每个决策卡下面用 *斜体* 写反馈/选择,Claude 据此实施 + +--- + +## 决策总览 + +| # | 决策项 | Claude 推荐 | Neo 决定 | +|---|---|---|---| +| D1 | batch executor 实现方式(Semaphore / 全并发 / 串行 / TaskQueue) | Semaphore(3) + asyncio.gather + return_exceptions | (待填) | +| D2 | GUC C 方案处置(全做 / 移除 / 混合) | 混合(纳入阶段 A) | (待填) | +| D3 | 测试覆盖是否纳入 F1-5(同 commit) | 纳入(阶段 A) | (待填) | +| D4 | 走查 mock 数据(切 sandbox=2026-03-01) | 授权我用 PATCH 端点切 + 完成后切回 live | (待填) | +| D5 | commit 拆分(1 个 / 2 个 / 3 个) | 1 个统一 commit | (待填) | +| D6 | P20 SPEC §11/§15/§10 文档同步更新 | 纳入 F1-5 commit | (待填) | + +--- + +## D1. batch executor 实现方式 + +### 关联页面/接口 + +- 后端:`apps/backend/app/services/ai/admin_service.py:622-631`(`_run_batch` 当前是空壳 stub) +- 后端:`apps/backend/app/services/ai/admin_service.py:576-620`(`estimate_batch` / `confirm_batch`) +- 后端:`apps/backend/app/routers/admin_ai.py:269-293`(`POST /batch-run` / `POST /batch-run/confirm`) +- 前端:`apps/admin-web/src/api/adminAI.ts:285,290`(`createBatchRun` / `confirmBatchRun`) +- 前端:`apps/admin-web/src/pages/AIOperations.tsx`(批量执行 Modal,两步操作) + +### 业务背景 + +`/admin/ai/batch-run`(两阶段提交)是 admin-web 的批量 AI 调用入口。运维选 N 个 app_type × M 个 member,**估算 token / 调用次数 → 看到预估同意 → confirm 后台异步真跑**。两阶段防误操作打爆 token 预算(因为 1000 个会员一次跑可能几百元)。 + +**当前代码事实**: +- `estimate` 端点工作 ✓ 算 token / calls +- `confirm` 端点工作 ✓ 返 `{status: started}`,创建 `asyncio.create_task(self._run_batch(...))` +- **`_run_batch` 是占位假动作**:只 `logger.info`,**实际不跑 AI 调用**(注释"实际执行逻辑在路由层通过 dispatcher.handle_trigger 驱动"是 stub 假话) +- 路由层并未注入 dispatcher 到 service 层 + +### 冲突逻辑 + +当前 batch-run 端点 = **假动作**: +- admin 看到 estimate 估算 / confirm 返 success +- 后台 logger 只打一行 "_run_batch invoked",**0 个 AI 实际调用** +- ai_run_logs / ai_cache 没有 batch 相关记录 +- 这违反 PRD §"业务语义":"投递到 dispatcher"——但实际没投递 + +### 业务联系 + +- F1-5(本任务):batch-run 接入 sandbox runtime_context — 但前提是 batch-run **真的能跑**,所以**必须先把 executor 补完** +- W1-T7 PRD 批 1 评估问题已标 P0:"batch_id 生命周期未声明 / 状态查询缺失 / member_ids 无上限"——但**没标"实际不执行"**(文档评估只看 schema 没看深度代码) +- Wave 2 计划:加 `GET /batch-run/{batch_id}/status` 进度查询(本次不做) +- AIPrewarm.tsx(/ai/prewarm)分组展示 72 组合,目前仅"每行触发"用单点 `run/{app_type}` 路径,batch-run 路径补完后可统一 + +### 修改影响 + +| 选项 | 影响范围 | 复杂度 | +|---|---|---| +| **a 全并发**(asyncio.gather 无 sem) | 1000 个 member 一次性发 → DashScope 限流熔断 / token 预算瞬间打爆 | 极不安全 ❌ | +| **b 串行**(顺序 await) | 1000 调用 × 2-5 秒 = 30+ 分钟,admin 看不到任何进度,confirm 时刻没回应 | 慢但安全 | +| **c Semaphore 限并发**(我推荐 N=3) | 3 个并发,1000 调用约 10 分钟,与现有 dispatcher 熔断/限流配合 | 平衡 ✅ | +| **d TaskQueue 推送**(走 worker 消费) | 引入跨进程依赖,与 confirm 端点的 `asyncio.create_task` 模型不一致 | 改造大 | + +**c 方案细节**: +```python +async def _run_batch(self, params: dict, ctx_snapshot: RuntimeContext) -> None: + sem = asyncio.Semaphore(3) + tasks = [] + for app_type in params["app_types"]: + for member_id in params["member_ids"]: + async def _one(at=app_type, mid=member_id): + async with sem: + await self._dispatcher.run_single_app( + app_type=at, + context={ + "site_id": params["site_id"], + "member_id": mid, + "business_date": ctx_snapshot.business_date.isoformat(), + }, + triggered_by=f"batch:{params['batch_id']}", + ) + tasks.append(asyncio.create_task(_one())) + await asyncio.gather(*tasks, return_exceptions=True) # 单失败不连坐 +``` + +- `Semaphore(3)`:限并发 3,与 dispatcher 现有 circuit_breaker / rate_limiter 配合,不打爆 DashScope 1000 RPM 限制 +- `triggered_by=f"batch:{batch_id}"`:打标 ai_run_logs,Wave 2 加进度查询 `WHERE triggered_by LIKE 'batch:%'` 即可统计 +- `ctx_snapshot`:estimate 阶段抓 RuntimeContext 快照,worker 用快照值,**避免** Neo 在 estimate→confirm 间切 sandbox 模式造成污染 +- `return_exceptions=True`:1 个 member 失败不阻断其他 member,失败信息已写 ai_run_logs + +### 推荐选项 + +**c 方案 — Semaphore(3) + asyncio.gather + return_exceptions=True + ctx_snapshot** + +### 建议与理由 + +- 与现有架构(asyncio.create_task / dispatcher 限流)一致,无新引入复杂度 +- 限并发 3 是经验值(可调),既快(10 分钟)又安全(不爆限流) +- 快照设计避免最大数据漂移风险(Neo 切模式时 batch 已在跑) +- `triggered_by` 打标为 Wave 2 进度查询埋好钩子 +- 单失败不连坐符合"批量任务"行业惯例 + +**Neo 反馈**: + +*同意 c 方案 改并发数 N=5 改进 ctx_snapshot 合适的时机你来定)* + +--- + +## D2. GUC C 方案处置 + +### 关联页面/接口 + +- 后端:`apps/backend/app/services/runtime_context.py:244-263`(`apply_runtime_session_vars` 函数,**当前 dead code**) +- DB(ETL):`db/etl_feiqiu/migrations/20260502__rls_views_business_date_upper_bound.sql`(26 个 `app.v_*` 视图加业务日上界) +- DB(ETL):`app.business_date_now()` 函数(读 GUC `app.current_business_date`,fallback 真实 today) +- DB(zqyy_app):**无 RLS 视图层**,业务库 SQL 直查 `biz.*` 表 +- SPEC:`docs/prd/specs/P20-runtime-context-sandbox.md` §6(C 方案 GUC 路线) + +### 业务背景 + +P20 沙箱机制要求:Neo 切 sandbox=2026-03-01 后,后端跑 AI / 查询数据时,**只能看到 2026-03-01 及之前数据**(防"未来数据泄露")。 + +实现方案有两条路线: +- **A 应用层方案**:每个查询函数都加 `business_date` 参数,SQL 里 `WHERE date <= %s` 显式裁剪 +- **C GUC 数据库层方案**:DB 连接级 `SET LOCAL app.current_business_date='2026-03-01'`,所有视图自动按 `app.business_date_now()` 裁剪(代码不需要每个 SQL 都改) + +### 冲突逻辑 + +**当前现状是"做了一半的 C 方案 + 大部分 A 方案":** + +| 库 | 方案 | 现状 | +|---|---|---| +| etl_feiqiu(ETL) | C 方案 | ✅ 26 个视图加上界 | +| zqyy_app(业务) | A 方案 | ✅ 应用层多数已传(7 prompt + 4 fetcher + board_service + fdw_queries) | +| `apply_runtime_session_vars` | C 方案"开关" | ❌ **全仓库 0 处调用** | + +**致命隐性 bug**:`apply_runtime_session_vars` 是激活 ETL 库 26 个视图的开关,但开关从未被打开 → ETL 库视图调 `app.business_date_now()` 拿不到 GUC → fallback 真实 today → **Neo 切 sandbox=2026-03-01 后,后端走 ETL 库 `app.v_*` 视图查询时,仍能看到 2026-05-05 的"未来"数据**。 + +### 业务联系 + +- F1-5 主诉求"仅能看到沙箱设定日期及之前数据" — 当前没满足(因为 GUC 开关没开) +- W1-T1 看板沙箱接入(2026-05-04)— 走 A 方案在前端 isCurrentMonthFilter 处理,与本决策正交 +- W1-T2 SCD2 视图统一(2026-05-04)— 走 C 方案,本决策对其有影响(SCD2 视图也读 GUC) +- Wave 2 P1-1 schema 迁移、P1-6 触发器合并 — 与 C 方案 zqyy_app 视图层延伸相关 + +### 修改影响 + +| 选项 | 改动 | 工作量 | 业务效果 | +|---|---|---|---| +| **a 全做(扩 zqyy_app)** | dispatcher 入口调 + zqyy_app 加 RLS 视图层(C 方案落地) | 大(2-3 天,涉及 RLS 双 schema 规则) | 两库都靠 GUC 自动保护 | +| **b 移除** | 删 `apply_runtime_session_vars` + 文档移除 C 方案 + ETL 库 26 视图改回不带 GUC | 中(回滚 C 方案,审计成本) | 只靠应用层 A 方案 | +| **c 混合(我推荐)** | dispatcher 入口调一次 `apply_runtime_session_vars` + zqyy_app 库继续 A 方案,**不新增 RLS 视图层** | 极小(1-2 行代码 + 几个调用点) | ETL 库 C 方案激活(自动保护)+ zqyy_app 应用层 A 方案(显式保护)— 两库都安全 | + +### 推荐选项 + +**c 混合方案 — 纳入阶段 A**(F1-5 内做) + +### 建议与理由 + +- ETL 库 C 方案是 2026-05-02 的**已完成资产**,丢掉浪费(b 方案) +- zqyy_app 库 RLS 视图层从无到有是**新建工作量**,且应用层 A 方案已做 80%(只剩 admin_service 的 `CURRENT_DATE`,在阶段 B1 已纳入),没必要重复建第二套保护(a 方案过度) +- c 方案只需要 dispatcher / batch executor 拿 DB 连接后调一次 `apply_runtime_session_vars(conn, ctx=ctx_snapshot)` → ETL 库 26 个视图**立即激活** +- 业务结果:Neo 切 sandbox 后,ETL 库视图自动裁剪未来数据,业务库由应用层显式传保护,**两库都安全** +- 副作用:`docs/prd/specs/P20-runtime-context-sandbox.md` 需要把 §6 中"zqyy_app 加 RLS 视图层"的规划项移除或标"永不做" + +**Neo 反馈**: + +*能不能完整且统一?符合规范化的项目架构设计,保持一致,后续也能形成规范延续下去。* + +--- + +## D3. 测试覆盖是否纳入 F1-5 + +### 关联页面/接口 + +- `apps/backend/tests/`(`tests/tests/unit/`、`tests/tests/integration/`) +- `apps/backend/tests/test_runtime_context*.py`(**当前不存在**) +- `apps/backend/tests/test_admin_ai_*.py`(已存在但 0 处 sandbox 覆盖) + +### 业务背景 + +P20 SPEC §8 验收标准 AC1-13 含"沙箱模式数据隔离 / namespace 前缀正确 / 业务日上界生效"等多项硬性要求。当前**全仓库 0 测试覆盖** sandbox/runtime_context: +- `apply_runtime_session_vars`、`namespace_ai_target_id`、`task_runtime_filter`、`business_date_upper_bound_sql` 全无 unit +- `_run_batch` 即使补完 executor,无 integration 测试验证 ctx_snapshot 不污染 +- dispatcher 落 runtime 列无回归保护 + +### 冲突逻辑 + +- F1-5 是 P0-7 沙箱主线**必修**(P20 验收前提) +- 但调研发现 0 测试 → 无回归保护 → 上线后任何后续改动都可能悄无声息破坏 sandbox 隔离 +- 这是 W1 findings F1-1 反馈"长事务幂等"同类风险:有规则没测试 = 规则形同虚设 + +### 业务联系 + +- W1 findings F1-1 长事务幂等:Neo 反馈"我们之前因没测试导致 mock/prod 偏差"(类似教训) +- F2-2 tests/ 入仓(Wave 5)— 测试需要先入仓才能跑 CI,但本地测试不需要 +- 测试规范:`docs/_overview/03-test-spec.md` L1-L5 五级测试 +- `.gitignore:71` 排除 tests/ → 本地写测试不能 commit(F2-2 待解决) + +### 修改影响 + +| 选项 | 改动 | 工作量 | +|---|---|---| +| **a 纳入 F1-5(我推荐)** | 写 3 套测试:test_runtime_context.py(unit 13 API)/ test_admin_ai_batch_runtime.py(integration estimate→切模式→confirm 用快照)/ test_dispatcher_runtime.py(_run_step 落 runtime 列) | 1-2 小时(主要是 fixture 准备) | +| **b 留 Wave 2** | 现在不做,Wave 2 测试专项做 | 0(此次)+ Wave 2 半天 | +| **c 简化版纳入** | 只写 1 套 integration 测试(estimate→切模式→confirm 用快照),unit 留 Wave 2 | 30 分钟 | + +### 推荐选项 + +**a 纳入 F1-5 阶段 A** + +### 建议与理由 + +- 沙箱机制是 P0 主线,无测试上线极高风险 +- 1-2 小时 vs 上线后回归 bug 抓虫 = 投入产出比极高 +- F2-2 tests/ 入仓未做不阻塞:测试本地写本地跑,Wave 5 入仓时一并迁移 +- W1 findings 教训告诉我们:不写测试的"机制"不是真机制 +- 备选:如果时间紧,接受 c 方案(只 integration) + +**Neo 反馈**: + +*同意 a 纳入* + +--- + +## D4. 走查 mock 数据(切 sandbox=2026-03-01)授权 + +### 关联页面/接口 + +- 后端:`apps/backend/app/routers/admin_runtime_context.py:95`(`PATCH /api/admin/runtime-context`) +- DB:`test_zqyy_app.biz.site_runtime_context`(测试库唯一 site `2790685415443269` 当前 `mode='live'`) +- W1-T8 §14 走查清单:`docs/prd/specs/P20-runtime-context-sandbox.md:487-566` + +### 业务背景 + +W1-T8 §14 成果层走查需要在 test_zqyy_app **切 sandbox=2026-03-01**(与 W1-T1/T2 测试日期同步),实地验证: +- admin-web 顶部 sandbox 提示条显示 +- 触发 1 次 app2_finance 后 `ai_run_logs.runtime_mode='sandbox'` + prompt JSON `current_time='2026-03-01 HH:MM'` + `ai_cache.target_id` 含 `sbx_*:` 前缀 +- ETL 库 `app.v_*` 视图业务日上界生效(查不到 03-01 之后数据) + +完成走查后**切回 live**。 + +### 冲突逻辑 + +- CLAUDE.md 测试与验证环境规范要求:"使用测试库,禁止连正式库" — 切 sandbox 是测试库操作,合规 +- CLAUDE.md 不破坏原则:"不在测试库以外执行 DDL / TRUNCATE / DELETE" — 切 sandbox 是 UPDATE 不是 DDL,合规 +- 但 mode 切换是状态改动,需要 Neo 显式授权 + +### 业务联系 + +- F1-5 走查必须先切 sandbox 才能验证(否则 live 模式下所有 sandbox 代码路径都不会触发) +- 切完 sandbox 不切回会污染后续测试(下次有人跑测试拿到的是"虚拟时间") +- audit_log 会记录切换动作(switch_runtime_context 内部已经写 audit) + +### 修改影响 + +| 选项 | 改动 | 风险 | +|---|---|---| +| **a 授权 PATCH 端点切**(我推荐) | 调 `PATCH /api/admin/runtime-context` 切 sandbox=2026-03-01 + 走查完切回 live | 测试库改动,有 audit 记录 | +| **b 授权 SQL 直改** | `UPDATE biz.site_runtime_context ... WHERE site_id=2790685415443269` | 绕过 audit,不推荐 | +| **c 不授权,Neo 自切** | 等 Neo 手动切完 + 通知 Claude 跑走查 | 多一轮等待,但更稳 | +| **d 走查时再决定** | F1-5 改完之后再问授权 | 推迟决策,减少认知负担 | + +### 推荐选项 + +**a 授权 PATCH 端点切 + 完成后切回 live** + +### 建议与理由 + +- PATCH 端点是正式入口,自带 audit,合规 +- 走查后立即切回 live 避免污染 +- Claude 实施 F1-5 期间不需要切(F1-5 单元测试可用 mock RuntimeContext);只在 W1-T8 走查阶段切一次 +- 切换是 idempotent 操作,失败可回滚 +- 如果不放心,可以选 c(走查时 Neo 手动切)— 这是最保守的 + +**Neo 反馈**: + +*授权 a* + +--- + +## D5. commit 拆分 + +### 关联页面/接口 + +- 阶段 A(MVP):`admin_service.py` _run_batch + estimate snapshot / `admin_ai.py` 路由 / `app2_finance_prompt.py` 3 行 / `retry_trigger_job` SQL / `AIOperations.tsx` 提示条 / 新 migration `<日期>__ai_run_logs_runtime_index.sql` +- 阶段 B(漂移防御):`admin_service.py` 6 处 SQL CURRENT_DATE / `fdw_queries.py:113,2552` 兜底 +- 测试:3 个新 test 文件(若 D3 选 a) +- SPEC 同步:`P20-runtime-context-sandbox.md` §10/§11/§15(若 D6 选纳入) + +### 业务背景 + +F1-5 涉及 ~10 个文件改动,跨后端 / 前端 / DB / 测试 / 文档 5 类。如何拆 commit 影响: +- 回滚粒度(出问题能精准 revert) +- 主题清晰度(commit log 可读) +- review 难度(reviewer 一次看多少) + +### 冲突逻辑 + +| 维度 | 1 个 commit | 2 个 commit | 3 个 commit | +|---|---|---|---| +| 主题 | F1-5 完整收口 | A 沙箱接入 / B 漂移防御 | DB / Backend / Frontend | +| 回滚 | 全或无 | 中粒度 | 细粒度 | +| 可读性 | 包大 | 中 | 多 | + +CLAUDE.md 提倡"逻辑独立的改动各自 commit",但本次所有改动都属于 F1-5 单一主题。 + +### 业务联系 + +- F3-2C 之前拆 2 commit(A 处置 + B 防御 hook)— 因为是 2 个独立主题 +- F1-5 是单一主题:让沙箱 batch-run 真正生效 +- Wave 1 Day 1-4 多数是 1 个 commit / 任务 + +### 修改影响 + +| 选项 | 适用 | +|---|---| +| **A 1 个 commit(我推荐)** | 单一主题,回滚一气呵成,改动文件 ~10 个可控 | +| B 2 个 commit | A:核心接入 / B:漂移防御 — 主题略有差异 | +| C 3 个 commit | DB/Backend/Frontend — 跨层拆分,但每个 commit 不能独立运行(后端依赖 DB 索引,前端依赖后端字段) | + +### 推荐选项 + +**A 1 个 commit:`fix(ai): F1-5 沙箱 batch-run 接入 runtime_context(含 P20 漂移防御 + GUC C 方案激活)`** + +### 建议与理由 + +- F1-5 是单一主题,拆 commit 反而割裂上下文 +- 改动文件 10 个可读,commit message 清楚说明 +- 跨层依赖(DB 索引 → 后端代码 → 前端组件)不能各自独立运行,拆 commit 后中间状态不可用 +- 历史经验:F3-2C 拆 2 commit 是因为有 2 个独立主题(内容拆分 vs 防御机制),F1-5 不一样 + +**Neo 反馈**: + +*同意 1 个 commit* + +--- + +## D6. P20 SPEC §11/§15/§10 文档同步更新 + +### 关联页面/接口 + +- `docs/prd/specs/P20-runtime-context-sandbox.md` +- §6 GUC C 方案路线(若 D2 选 c 混合,需要标"zqyy_app RLS 视图层不做") +- §10 跨模块覆盖矩阵(app2/app2a 标记从 "?" 改 ✅) +- §11 已知遗漏(删除 batch-run 相关项) +- §15 变更记录(追加 2026-05-05 F1-5) +- 可能 §14 走查归档章节(走查后追加结果链接) + +### 业务背景 + +P20 SPEC 是沙箱机制的权威设计文档。F1-5 是 P20 落地的"最后一公里",完成后 SPEC 必须同步: +- §10 矩阵反映现状(否则未来 reviewer 看 SPEC 仍会以为有缺失) +- §11 已知遗漏要把 batch-run 移除(已修复) +- §15 变更记录追加里程碑 + +不同步会导致 SPEC 和实际代码漂移,违反 W1 findings F2-1 教训(OpenAPI 28 天 stale 同类问题)。 + +### 冲突逻辑 + +- CLAUDE.md 数据库 Schema 变更规则:"必须同步 docs/database/" — 类似规则适用 SPEC +- W1 findings F2-1 教训:文档与代码漂移会让审计失效 + +### 业务联系 + +- F2-1B 防御机制 hook(已生效):router 改动会提醒抓 OpenAPI — 类似机制对 SPEC 缺失 +- W1 findings F1-1 长事务幂等:文档没说 → 没人能验收 +- 后续审计文档需要引用 P20 SPEC 当前状态 + +### 修改影响 + +| 选项 | 改动 | 工作量 | +|---|---|---| +| **a 纳入 F1-5 commit(我推荐)** | F1-5 同 commit 改 §6/§10/§11/§15 | 10 分钟(纯文档) | +| b 单独 commit | F1-5 后另起 commit "docs(spec): P20 同步 F1-5 落地" | 多一个 commit | +| c 留 Wave 5 文档收尾 | 30+ 天 SPEC 漂移,期间任何人看到 SPEC 都误判 | 高漂移风险 | + +### 推荐选项 + +**a 纳入 F1-5 commit** + +### 建议与理由 + +- SPEC owner 视角同步,不做的话 SPEC 永远和实际漂移 +- 10 分钟工作量,不值得拆 commit +- 跟 D5(1 个 commit)相容 +- 跟 F3-2C 的 ai-app-prompts.md A 处置同时改造 SPEC 一样的逻辑 + +**Neo 反馈**: + +*同意 a 纳入* + +--- + +## 总览决策表(Neo 自填) + +| # | 决策项 | Claude 推荐 | Neo 决定 | 备注 | +|---|---|---|---|---| +| D1 | batch executor 方式 | Semaphore(3) + asyncio.gather + return_exceptions + ctx_snapshot | | | +| D2 | GUC C 方案处置 | 混合(纳入阶段 A) | | | +| D3 | 测试覆盖 | 纳入(阶段 A) | | | +| D4 | 走查 mock 授权 | 授权 PATCH 切 + 完成切回 | | | +| D5 | commit 拆分 | 1 个统一 commit | | | +| D6 | SPEC 同步 | 纳入 F1-5 commit | | | + +--- + +## 已确认项(无需决策,记录在案) + +| 项 | 决策 | 来源 | +|---|---|---| +| Q1 阶段 A+B 一起做 | ✅ 同意 | Neo 5/5 反馈 | +| Q4 子 1 — 不新建 sandbox_batch_runs 表,复用 sandbox_instance_id | ✅ 同意 | Neo 5/5 反馈 | +| Q4 子 3 — ai_run_logs 索引随 F1-5 一起补 | ✅ 同意 | Neo 5/5 反馈 | +| Q4 子 4 — BD_manual_biz_registry_tables.md 保持分离 | ✅ 同意 | Neo 5/5 反馈 | + +--- + +## 关联 + +- F1-5 业务故事卡:[`00-W1-findings-stories.md`](00-W1-findings-stories.md)(F1-5 章节) +- F1-5 反馈响应:[`01-W1-findings-response.md`](01-W1-findings-response.md) +- P20 SPEC:[`docs/prd/specs/P20-runtime-context-sandbox.md`](../../prd/specs/P20-runtime-context-sandbox.md) +- W1-T7 PRD 批 1:[`docs/_overview/admin-api-prd/batch1-runtime-context-and-ai.md`](../admin-api-prd/batch1-runtime-context-and-ai.md) +- 调研子代理报告(本次会话内,未单独入仓) diff --git a/docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md b/docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md new file mode 100644 index 0000000..9325861 --- /dev/null +++ b/docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md @@ -0,0 +1,229 @@ +# Wave 1 F1-5a — 沙箱 batch-run 接入 runtime_context(MVP + 漂移防御核心) + +| 字段 | 值 | +|---|---| +| 日期 | 2026-05-05 | +| Wave | 1 / Day 5 主线必修(P0-7 沙箱) | +| 范围 | batch-run executor 真实化 + ctx_snapshot 防漂移 + retry 落 runtime 列 + ai_run_logs 索引 + App2/2a ref_date 修复 + GUC 接入机制 + admin-web sandbox 提示条 | +| 拆分背景 | F1-5b(zqyy_app RLS 视图层 + 46 处 `FROM biz.*` → `app.v_*` + B1/B2 完整漂移防御)紧接本次后做 | + +## 一、F1-5 决策卡 6 项 Neo 决策 + +详见 [`docs/_overview/wave1-findings/F1-5-impl-decisions.md`](../../_overview/wave1-findings/F1-5-impl-decisions.md)。 + +| # | 决策 | Neo 反馈 | 落实 | +|---|---|---|---| +| D1 | batch executor 实现 | 同意 c 方案 + N=5,ctx_snapshot 时机 Claude 定 | Semaphore(5) + asyncio.gather(return_exceptions=True);estimate 入口抓快照 | +| D2 | GUC C 方案处置 | "完整且统一,符合规范化项目架构" | 拆 F1-5a/F1-5b 两步:F1-5a 接入机制 + ETL 库激活,F1-5b 完整 zqyy_app RLS 视图层落地 | +| D3 | 测试覆盖 | 同意 a 纳入 | 测试文件本地保留(`.gitignore:71` 排除 tests/,F2-2 入仓后一并 commit) | +| D4 | 走查 mock 授权 | 授权 a | W1-T8 走查时调 PATCH 切 sandbox=2026-03-01,完成切回 live | +| D5 | commit 拆分 | 1 个 commit | 本审计对应 1 个 commit `fix(ai): F1-5a 沙箱 batch-run 接入 runtime_context (W1)` | +| D6 | P20 SPEC 同步 | 纳入 a | F1-5a 完整改造后 SPEC §6/§10/§11/§15 在 F1-5b commit 一并同步(因 F1-5b 才完整收口) | + +## 二、本次产出(F1-5a 范围) + +### 2.1 后端 — `_run_batch` 真实化(A1) + +**文件**:`apps/backend/app/services/ai/admin_service.py` + +| 改动 | 行号(改动后) | 说明 | +|---|---|---| +| import | L7-19 | 加 `RuntimeContext / get_runtime_context / runtime_insert_columns`,`TYPE_CHECKING: AIDispatcher` | +| 常量 | L31 | `_BATCH_CONCURRENCY = 5`(Neo 决策) | +| `__init__` | L40-45 | 加 `_dispatcher: AIDispatcher \| None = None` 字段 + `set_dispatcher` 方法,lazy 注入 | +| `_run_batch` | 整段重写 | 真正实现:`Semaphore(5)` + `asyncio.gather` + `return_exceptions=True`;每个 sub-call 调 `dispatcher.run_single_app(app_type, context={..., business_date}, triggered_by=f"batch:{batch_id}")` | + +**关键设计点**: +- **快照防漂移**:`ctx_snapshot` 在 estimate 阶段抓取,confirm 取出传给 worker,worker 按快照值执行(避免 Neo 在 estimate→confirm 间切 sandbox 模式造成数据漂移) +- **`return_exceptions=True`**:单个 member 失败不连坐,失败信息已写 ai_run_logs(熔断/限流/超时由 dispatcher.\_run_step 自动处理) +- **`triggered_by="batch:{batch_id}"`**:打标 ai_run_logs,Wave 2 加 `GET /batch-run/{batch_id}/status` 进度查询时 `WHERE triggered_by LIKE 'batch:%'` 即可统计 + +### 2.2 后端 — `estimate_batch` / `confirm_batch` ctx_snapshot(A2) + +**文件**:`apps/backend/app/services/ai/admin_service.py` + +| 改动 | 说明 | +|---|---| +| `estimate_batch` | 入口调 `get_runtime_context(site_id)` 抓 RuntimeContext,存入 `_batch_store[batch_id].ctx_snapshot`;日志输出 mode + sandbox_date | +| `confirm_batch` | 取出 `ctx_snapshot`,异步调 `_run_batch(params, ctx_snapshot)` | +| `_batch_store` | 字典结构从 `{params, expires_at}` 扩展为 `{params, ctx_snapshot, expires_at}`;`params` 内加 `batch_id` 字段(传给 `_run_batch` 用于 triggered_by 标注) | + +### 2.3 后端 — `confirm_batch_run` 路由 lazy 注入 dispatcher + +**文件**:`apps/backend/app/routers/admin_ai.py:283-303` + +`_admin_svc` 是模块级单例,不持有 dispatcher 引用;`dispatcher` 通常在 lifespan startup 才初始化。lazy 注入避免模块加载顺序问题: + +```python +if _admin_svc._dispatcher is None: + try: + from app.ai.dispatcher import get_dispatcher + _admin_svc.set_dispatcher(get_dispatcher()) + except RuntimeError as exc: + raise HTTPException(503, f"Dispatcher 未初始化: {exc}") +``` + +### 2.4 后端 — `retry_trigger_job` INSERT 落 runtime 列(A4) + +**文件**:`apps/backend/app/services/ai/admin_service.py:380-422` + +INSERT 显式拼接 `runtime_mode` + `sandbox_instance_id` 列(用 `runtime_insert_columns(site_id)` helper),与原 trigger_job 的 runtime 上下文保持一致。避免依赖默认值 'live' 导致 sandbox 模式下重试丢失 sandbox 标记。 + +### 2.5 后端 — App2 / App2a 3 行漏传 ref_date(A3) + +| 文件:行 | 改动 | +|---|---| +| `app2_finance_prompt.py:817` | `_calc_date_range(board_time)` → `_calc_date_range(board_time, ref_date=get_runtime_context(site_id).business_date)`(储值卡余额变化板块) | +| `app2_finance_prompt.py:841` | 同上(日粒度 series + 异常检测窗口) | +| `app2a_finance_area_prompt.py:466` | 同上(区域日粒度 series) | + +### 2.6 后端 — `runtime_context.get_runtime_context` 加 `bind_to_session` 参数 + +**文件**:`apps/backend/app/services/runtime_context.py:88-148` + +新增可选参数 `bind_to_session: bool = False`,True 时返回前调用 `apply_runtime_session_vars(conn, ctx)`,激活 GUC `app.current_business_date` / `app.current_runtime_mode`,使 ETL 库 26 个 `app.v_*` 视图自动按业务日上界裁剪(`app.business_date_now()` 函数读取 GUC)。 + +**说明**:激活机制就位,但只在 `run_log_service.create_log` 试点开启(2.7);其余完整传递留 F1-5b。 + +### 2.7 后端 — `run_log_service.create_log` 启用 `bind_to_session=True` + +**文件**:`apps/backend/app/ai/run_log_service.py:62-65` + +试点改造,验证 GUC 激活机制可用。运行时事务级 SET LOCAL 在该 INSERT 期间生效。 + +### 2.8 DB — `biz.ai_run_logs` 复合索引补建(A6) + +**文件**:`db/zqyy_app/migrations/20260505__ai_run_logs_runtime_index.sql` + +```sql +CREATE INDEX IF NOT EXISTS idx_ai_run_logs_runtime_site + ON biz.ai_run_logs (site_id, runtime_mode, sandbox_instance_id, created_at DESC); +``` + +补齐其他 6 张业务表的复合索引模式。admin-web 按 runtime 维度过滤不再全表扫。 + +### 2.9 前端 — AIOperations.tsx sandbox 提示条(A5) + +**文件**:`apps/admin-web/src/pages/AIOperations.tsx` + +| 改动 | 说明 | +|---|---| +| import | 加 `Alert`(antd) + `fetchRuntimeContext / RuntimeContext`(api/runtimeContext) | +| `useEffect` | 拉 `cacheSiteId` 对应的 runtime context | +| Title 后 Alert | sandbox 模式下显示 `沙箱模式 · 业务日 X · 实例 sbx_*` + 详细描述(影响范围 + 切回 live 入口) | + +## 三、未做(留 F1-5b 完整收口) + +### 3.1 B1 — `admin_service.py` 6 处 `CURRENT_DATE` → `business_date` + +涉及 `_get_range_stats` / `_get_7d_trend` / `_get_app_distribution` / `get_budget` 等,需要为这些函数加 site_id 参数链路(当前部分函数 site_id 是可选 None,改造涉及函数签名变更),工作量较大。F1-5b 完整 zqyy_app RLS 视图层落地时,这些查询自然走 `app.v_*` 视图(GUC 自动裁剪),无需改函数签名,更优雅。 + +### 3.2 B2 — `fdw_queries.py:113, 2552` 异常分支兜底改 `ctx.business_date` + +异常分支用 `_date.today().isoformat()`,sandbox 触发异常会泄露真实今日。F1-5b 完整 fdw_queries GUC 接入时一并改。 + +### 3.3 GUC 完整传递 + +`run_log_service.create_log` 试点开启了 `bind_to_session=True`,但其他真正走 ETL 库视图的函数(fdw_queries 11+ 处 / page_context 等)没改。F1-5b 一并改造。 + +### 3.4 测试 3 套 + +`test_runtime_context.py` / `test_admin_ai_batch_runtime.py` / `test_dispatcher_runtime.py` 因 `.gitignore:71` 排除 tests/,本次会话写本地。F2-2 tests/ 入仓(Wave 5)时一并入仓 + 跑 CI。 + +### 3.5 P20 SPEC 同步 + +§6/§10/§11/§15 同步**留 F1-5b commit 一并改**(因为 F1-5b 才是 P20 SPEC C 方案的完整收口,届时 SPEC 反映"两库都用 GUC + zqyy_app 加 RLS 视图层"的最终状态更准确)。 + +## 四、风险与回滚 + +| 项 | 风险 | 回滚 | +|---|---|---| +| `_run_batch` 真实化 | 调用 dispatcher 失败时单 member 失败已写 ai_run_logs,不连坐;dispatcher 未注入时返 503 | git revert 当前 commit | +| `ctx_snapshot` 设计 | 内存 dict,worker crash 时 batch 丢失 — 接受(原 batch_id 设计本就 in-memory) | 同上 | +| `retry_trigger_job` 加 runtime 列 | INSERT 多 2 列,与已有索引兼容(只加列不删字段) | 同上 | +| App2/2a 3 行 ref_date | 仅在已有 try 块内改,异常 fallback 不变 | 同上 | +| `bind_to_session` 加可选参数 | 默认 False,旧调用不变;只在 run_log_service 试点开启 | 同上 | +| `migration` 仅 CREATE INDEX IF NOT EXISTS | 索引可重复执行,不破坏数据 | `DROP INDEX IF EXISTS biz.idx_ai_run_logs_runtime_site` | +| 前端 sandbox 提示条 | 失败时不阻断页面渲染(catch 后静默) | git revert | + +## 五、F1-5b 待办(下一个 commit) + +- B1 admin_service 6 处 CURRENT_DATE → business_date(随 RLS 视图层一并) +- B2 fdw_queries.py:113, 2552 异常分支兜底 +- GUC 完整传递(fdw_queries / page_context / 等) +- zqyy_app 加 RLS 视图层(7+ 视图 `app.v_*`)+ 后端 46 处 `FROM biz.*` → `FROM app.v_*` +- 测试 3 套(本地写,F2-2 入仓时同步) +- P20 SPEC §6/§10/§11/§15 同步(反映 F1-5a + F1-5b 完整收口状态) + +## 六、验证(F1-5a 范围) + +### 6.1 syntax 检查(已通过) + +```bash +.venv/Scripts/python.exe -m py_compile \ + apps/backend/app/services/runtime_context.py \ + apps/backend/app/services/ai/admin_service.py \ + apps/backend/app/routers/admin_ai.py \ + apps/backend/app/ai/run_log_service.py \ + apps/backend/app/ai/prompts/app2_finance_prompt.py \ + apps/backend/app/ai/prompts/app2a_finance_area_prompt.py +# → all-files-compile-ok +``` + +### 6.2 W1-T8 走查待验证项(F1-5a + F1-5b 改完后一并验证) + +- 切 sandbox=2026-03-01 后触发 1 次 app2_finance,验证 `ai_run_logs.runtime_mode='sandbox'` + prompt JSON `current_time='2026-03-01 HH:MM'` + `ai_cache.target_id` 含 `sbx_*:` 前缀 +- admin-web `/ai/operations` 顶部 sandbox 条带显示 +- batch-run 真正执行(estimate→confirm 后 ai_run_logs 出现 `triggered_by='batch:'` 记录) +- estimate→切模式→confirm:验证 worker 用 estimate 时刻的 ctx_snapshot,**不**用 confirm 时刻的新模式 + +## 七、commit 建议 + +``` +fix(ai): F1-5a 沙箱 batch-run 接入 runtime_context (W1 / 阶段 A 主体) + +Neo F1-5 反馈: "让沙箱起到其真正的作用. 真正的模拟日期, 仅能看到沙箱设定日期 +及之前日期的数据, 并运行 AI 的各个业务." + +调研发现 (4 个并行子代理): batch-run 端点 _run_batch 是空壳 stub +(只 logger.info, 实际不跑 AI), GUC apply_runtime_session_vars 0 处调用 +(dead code), 7 张业务表 6 张有 runtime 复合索引唯独 ai_run_logs 漏建, +App2/2a 3 行 _calc_date_range 漏传 ref_date. + +本 commit (F1-5a 阶段 A 主体, F1-5b 后续完整 zqyy_app RLS 视图层): + +后端核心: +- admin_service.py: _run_batch 真实化 (Semaphore(5)+asyncio.gather+ + return_exceptions=True+ctx_snapshot 防漂移); estimate 入口抓 + RuntimeContext 快照, confirm 取出传给 worker +- admin_ai.py: confirm_batch_run lazy 注入 dispatcher +- admin_service.retry_trigger_job: INSERT 落 runtime_mode + + sandbox_instance_id 列 (用 runtime_insert_columns helper) +- runtime_context.py: get_runtime_context 加 bind_to_session 参数, + 激活 GUC app.current_business_date / app.current_runtime_mode +- run_log_service.create_log: 启用 bind_to_session=True 试点 + +App2/2a 3 行 ref_date 修复: +- app2_finance_prompt.py:817 储值卡余额变化板块 +- app2_finance_prompt.py:841 日粒度 series + 异常检测窗口 +- app2a_finance_area_prompt.py:466 区域日粒度 series + +DB: +- migrations/20260505__ai_run_logs_runtime_index.sql: + 补 (site_id, runtime_mode, sandbox_instance_id, created_at DESC) 复合索引 + +前端: +- AIOperations.tsx: 顶部加 sandbox 模式提示条 (Alert 显示 sandbox_date + + sandbox_instance_id + 影响范围 + 切回 live 入口) + +未做 (留 F1-5b 完整 zqyy_app RLS 视图层一并): +- B1 admin_service 6 处 CURRENT_DATE → business_date +- B2 fdw_queries 异常分支兜底 +- GUC 完整传递 (fdw_queries / page_context 等) +- 测试 3 套 (.gitignore:71 排除, F2-2 入仓时 commit) +- P20 SPEC §6/§10/§11/§15 (F1-5b 完整收口后同步更准确) + +Neo 决策: docs/_overview/wave1-findings/F1-5-impl-decisions.md + +详见 docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md +```