fix(ai): F1-5a 沙箱 batch-run 接入 runtime_context (W1 / 阶段 A 主体)

Neo F1-5 反馈: "让沙箱起到其真正的作用. 真正的模拟日期, 仅能看到沙箱设定日期
及之前日期的数据, 并运行 AI 的各个业务."

调研发现 (4 个并行子代理): batch-run 端点 _run_batch 是空壳 stub
(只 logger.info, 实际不跑 AI), GUC apply_runtime_session_vars 0 处调用
(dead code), 7 张业务表 6 张有 runtime 复合索引唯独 ai_run_logs 漏建,
App2/2a 3 行 _calc_date_range 漏传 ref_date.

本 commit (F1-5a 阶段 A 主体, F1-5b 后续完整 zqyy_app RLS 视图层):

后端核心:
- admin_service.py: _run_batch 真实化 (Semaphore(5)+asyncio.gather+
  return_exceptions=True+ctx_snapshot 防漂移); estimate 入口抓
  RuntimeContext 快照, confirm 取出传给 worker
- admin_ai.py: confirm_batch_run lazy 注入 dispatcher
- admin_service.retry_trigger_job: INSERT 落 runtime_mode +
  sandbox_instance_id 列 (用 runtime_insert_columns helper)
- runtime_context.py: get_runtime_context 加 bind_to_session 参数,
  激活 GUC app.current_business_date / app.current_runtime_mode
- run_log_service.create_log: 启用 bind_to_session=True 试点

App2/2a 3 行 ref_date 修复:
- app2_finance_prompt.py:817 储值卡余额变化板块
- app2_finance_prompt.py:841 日粒度 series + 异常检测窗口
- app2a_finance_area_prompt.py:466 区域日粒度 series

DB:
- migrations/20260505__ai_run_logs_runtime_index.sql:
  补 (site_id, runtime_mode, sandbox_instance_id, created_at DESC) 复合索引

前端:
- AIOperations.tsx: 顶部加 sandbox 模式提示条 (Alert 显示 sandbox_date +
  sandbox_instance_id + 影响范围 + 切回 live 入口)

未做 (留 F1-5b 完整 zqyy_app RLS 视图层一并):
- B1 admin_service 6 处 CURRENT_DATE -> business_date
- B2 fdw_queries 异常分支兜底
- GUC 完整传递 (fdw_queries / page_context 等)
- 测试 3 套 (.gitignore:71 排除, F2-2 入仓时 commit)
- P20 SPEC \xa76/\xa710/\xa711/\xa715 (F1-5b 完整收口后同步更准确)

Neo 决策: docs/_overview/wave1-findings/F1-5-impl-decisions.md

详见 docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md
This commit is contained in:
Neo
2026-05-05 03:01:48 +08:00
parent a99bbd9a74
commit 421e193041
10 changed files with 909 additions and 40 deletions

View File

@@ -10,7 +10,7 @@
import React, { useEffect, useState, useCallback } from "react"; import React, { useEffect, useState, useCallback } from "react";
import { import {
Card, Row, Col, Select, Input, Button, Table, Tag, Space, Alert, Card, Row, Col, Select, Input, Button, Table, Tag, Space,
Checkbox, Modal, Statistic, message, Typography, Checkbox, Modal, Statistic, message, Typography,
} from "antd"; } from "antd";
import { ReloadOutlined } from "@ant-design/icons"; import { ReloadOutlined } from "@ant-design/icons";
@@ -20,6 +20,8 @@ import {
getAlerts, ackAlert, ignoreAlert, runApp, triggerEvent, getAlerts, ackAlert, ignoreAlert, runApp, triggerEvent,
type AlertItem, type AppType, type BatchRunEstimate, type AlertItem, type AppType, type BatchRunEstimate,
} from "../api/adminAI"; } from "../api/adminAI";
// F1-5a: sandbox 模式提示条数据源
import { fetchRuntimeContext, type RuntimeContext } from "../api/runtimeContext";
const EVENT_TYPE_OPTIONS = [ const EVENT_TYPE_OPTIONS = [
{ label: "消费事件App3→App8→App7 [+ App4→App5]", value: "consumption" }, { label: "消费事件App3→App8→App7 [+ App4→App5]", value: "consumption" },
@@ -213,6 +215,24 @@ const AIOperations: React.FC = () => {
} }
}; };
// ---- F1-5a: Sandbox 模式提示条 ----
// 沙箱机制 P0-7 主线:让运维进入 AI 操作页前能看到当前 sandbox 状态,
// 避免"以为 live 模式"误触发批量执行实际跑在 sandbox 数据集上的混淆。
const [runtimeCtx, setRuntimeCtx] = useState<RuntimeContext | null>(null);
useEffect(() => {
// 复用 cacheSiteId 作为当前关注 site(默认 2790685415443269,与 cacheSiteId / runSiteId 一致)
let cancelled = false;
(async () => {
try {
const ctx = await fetchRuntimeContext(cacheSiteId);
if (!cancelled) setRuntimeCtx(ctx);
} catch {
// 失败不阻断页面渲染(get_runtime_context 表不存在时后端降级 live)
}
})();
return () => { cancelled = true; };
}, [cacheSiteId]);
// ---- Card 4: 告警管理 ---- // ---- Card 4: 告警管理 ----
const [alerts, setAlerts] = useState<AlertItem[]>([]); const [alerts, setAlerts] = useState<AlertItem[]>([]);
const [alertTotal, setAlertTotal] = useState(0); const [alertTotal, setAlertTotal] = useState(0);
@@ -282,6 +302,28 @@ const AIOperations: React.FC = () => {
<div> <div>
<Title level={4} style={{ marginBottom: 16 }}>AI </Title> <Title level={4} style={{ marginBottom: 16 }}>AI </Title>
{runtimeCtx && runtimeCtx.is_sandbox && (
<Alert
type="warning"
showIcon
style={{ marginBottom: 16 }}
message={
<span>
<strong></strong> · <strong>{runtimeCtx.sandbox_date ?? "—"}</strong> ·
<code>{runtimeCtx.sandbox_instance_id ?? "—"}</code>
</span>
}
description={
<span>
site_id={cacheSiteId} AI ( / / / )使
({runtimeCtx.sandbox_date}) ;ETL ,/
ai_run_logs runtime_mode=sandbox + sandbox_instance_id ,
live live, <a href="/settings/runtime-context"></a>
</span>
}
/>
)}
<Row gutter={16} style={{ marginBottom: 16 }}> <Row gutter={16} style={{ marginBottom: 16 }}>
{/* Card 1: 手动重跑 */} {/* Card 1: 手动重跑 */}
<Col span={12}> <Col span={12}>

View File

@@ -814,7 +814,10 @@ async def build_prompt(
# 避免 AI 在只看当期充值/消耗时对"余额为何涨"的矛盾自圆其说 # 避免 AI 在只看当期充值/消耗时对"余额为何涨"的矛盾自圆其说
if area == "all" and isinstance(recharge := board_data.get("recharge"), dict): if area == "all" and isinstance(recharge := board_data.get("recharge"), dict):
try: try:
start_date_obj, _end = _calc_date_range(board_time) from app.services.runtime_context import get_runtime_context
start_date_obj, _end = _calc_date_range(
board_time, ref_date=get_runtime_context(site_id).business_date,
)
opening = _fetch_card_balance_opening(site_id, str(start_date_obj)) opening = _fetch_card_balance_opening(site_id, str(start_date_obj))
closing = float(recharge.get("card_balance") or 0) closing = float(recharge.get("card_balance") or 0)
period_recharge = float(recharge.get("actual_income") or 0) period_recharge = float(recharge.get("actual_income") or 0)
@@ -838,7 +841,10 @@ async def build_prompt(
# - 日粒度异常:同星期均值基线下的极端偏离 # - 日粒度异常:同星期均值基线下的极端偏离
if area == "all": if area == "all":
try: try:
start_date, end_date = _calc_date_range(board_time) from app.services.runtime_context import get_runtime_context
start_date, end_date = _calc_date_range(
board_time, ref_date=get_runtime_context(site_id).business_date,
)
series = _fetch_daily_series(site_id, str(start_date), str(end_date)) series = _fetch_daily_series(site_id, str(start_date), str(end_date))
# 上期序列(用于客单价环比) # 上期序列(用于客单价环比)
prev_series: list[tuple] | None = None prev_series: list[tuple] | None = None

View File

@@ -463,7 +463,10 @@ async def build_prompt(
# 日粒度派生(区域级) # 日粒度派生(区域级)
try: try:
start_date, end_date = _calc_date_range(board_time) from app.services.runtime_context import get_runtime_context
start_date, end_date = _calc_date_range(
board_time, ref_date=get_runtime_context(site_id).business_date,
)
series = _fetch_area_daily_series( series = _fetch_area_daily_series(
site_id, str(start_date), str(end_date), area_code=area, site_id, str(start_date), str(end_date), area_code=area,
) )

View File

@@ -59,7 +59,10 @@ class AIRunLogService:
truncated = _truncate_prompt(request_prompt) truncated = _truncate_prompt(request_prompt)
conn = self._get_conn() conn = self._get_conn()
try: try:
ctx = get_runtime_context(site_id, conn=conn) # F1-5a: bind_to_session=True 激活 GUC app.current_business_date,
# 让本事务内所有 ETL 库 app.v_* 视图自动按 business_date 上界裁剪
# (如果后续 fetch 走 ETL 视图)。
ctx = get_runtime_context(site_id, conn=conn, bind_to_session=True)
runtime_mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE runtime_mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE
sandbox_instance_id = ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID sandbox_instance_id = ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID
with conn.cursor() as cur: with conn.cursor() as cur:

View File

@@ -285,7 +285,18 @@ async def confirm_batch_run(
body: BatchRunConfirm, body: BatchRunConfirm,
user: CurrentUser = Depends(_require_admin()), user: CurrentUser = Depends(_require_admin()),
) -> BatchRunConfirmResponse: ) -> BatchRunConfirmResponse:
"""确认批量执行,后台异步执行。""" """确认批量执行,后台异步执行。
F1-5a: lazy 注入 dispatcher 到 _admin_svc(首次调用时绑定),
避免模块加载顺序问题(dispatcher 通常在 lifespan startup 才初始化)。
"""
if _admin_svc._dispatcher is None:
try:
from app.ai.dispatcher import get_dispatcher
_admin_svc.set_dispatcher(get_dispatcher())
except RuntimeError as exc:
raise HTTPException(status_code=503, detail=f"Dispatcher 未初始化: {exc}") from exc
try: try:
await _admin_svc.confirm_batch(batch_id=body.batch_id) await _admin_svc.confirm_batch(batch_id=body.batch_id)
except ValueError as exc: except ValueError as exc:

View File

@@ -13,9 +13,18 @@ import asyncio
import logging import logging
import uuid import uuid
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from typing import TYPE_CHECKING, Any
from app.ai.budget_tracker import BudgetTracker from app.ai.budget_tracker import BudgetTracker
from app.database import get_connection from app.database import get_connection
from app.services.runtime_context import (
RuntimeContext,
get_runtime_context,
runtime_insert_columns,
)
if TYPE_CHECKING:
from app.ai.dispatcher import AIDispatcher
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -25,13 +34,22 @@ AVG_TOKENS_PER_CALL = 2000
# 批量执行内存存储 TTL # 批量执行内存存储 TTL
_BATCH_TTL_SECONDS = 600 # 10 分钟 _BATCH_TTL_SECONDS = 600 # 10 分钟
# F1-5a 批量执行并发上限
# Neo 决策 N=5与 dispatcher 现有 circuit_breaker / rate_limiter 配合,不打爆 DashScope 1000 RPM 限制)
_BATCH_CONCURRENCY = 5
class AdminAIService: class AdminAIService:
"""AI 监控后台聚合服务。""" """AI 监控后台聚合服务。"""
def __init__(self, budget_tracker: BudgetTracker | None = None) -> None: def __init__(self, budget_tracker: BudgetTracker | None = None) -> None:
self._budget = budget_tracker self._budget = budget_tracker
self._batch_store: dict[str, dict] = {} # batch_id → {params, expires_at} self._batch_store: dict[str, dict] = {} # batch_id → {params, ctx_snapshot, expires_at}
self._dispatcher: AIDispatcher | None = None # F1-5a: lifespan 启动时注入
def set_dispatcher(self, dispatcher: AIDispatcher) -> None:
"""F1-5a: lifespan 启动时注入 dispatcher用于 _run_batch 实际执行 AI 调用。"""
self._dispatcher = dispatcher
# ── Dashboard ───────────────────────────────────────── # ── Dashboard ─────────────────────────────────────────
@@ -344,29 +362,37 @@ class AdminAIService:
return _row_to_dict(cols, row) return _row_to_dict(cols, row)
async def retry_trigger_job(self, job_id: int) -> int: async def retry_trigger_job(self, job_id: int) -> int:
"""创建新 trigger_jobis_forced=true返回新 job_id。""" """创建新 trigger_jobis_forced=true返回新 job_id。
F1-5a: INSERT 显式落 runtime_mode + sandbox_instance_id
与原 trigger_job 的 runtime 上下文保持一致(避免依赖默认值导致重试时丢失 sandbox 标记)。
"""
original = await self.get_trigger_job(job_id) original = await self.get_trigger_job(job_id)
if original is None: if original is None:
raise ValueError(f"trigger_job {job_id} 不存在") raise ValueError(f"trigger_job {job_id} 不存在")
site_id = original["site_id"]
cols, placeholders, runtime_params = runtime_insert_columns(site_id)
conn = get_connection() conn = get_connection()
try: try:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute( cur.execute(
""" f"""
INSERT INTO biz.ai_trigger_jobs INSERT INTO biz.ai_trigger_jobs
(event_type, member_id, site_id, connector_type, (event_type, member_id, site_id, connector_type,
payload, app_chain, is_forced, status) payload, app_chain, is_forced, status, {cols})
VALUES (%s, %s, %s, %s, %s, %s, true, 'pending') VALUES (%s, %s, %s, %s, %s, %s, true, 'pending', {placeholders})
RETURNING id RETURNING id
""", """,
( (
original["event_type"], original["event_type"],
original.get("member_id"), original.get("member_id"),
original["site_id"], site_id,
original.get("connector_type", "feiqiu"), original.get("connector_type", "feiqiu"),
original.get("payload"), original.get("payload"),
original.get("app_chain"), original.get("app_chain"),
*runtime_params,
), ),
) )
new_id = cur.fetchone()[0] new_id = cur.fetchone()[0]
@@ -576,22 +602,41 @@ class AdminAIService:
async def estimate_batch( async def estimate_batch(
self, app_types: list[str], member_ids: list[int], site_id: int, self, app_types: list[str], member_ids: list[int], site_id: int,
) -> dict: ) -> dict:
"""生成 batch_id存入内存TTL 10min返回预估。""" """生成 batch_id存入内存TTL 10min返回预估。
F1-5a: estimate 阶段抓 RuntimeContext 快照存入 _batch_store
confirm 时取出传给 _run_batch避免 estimate→confirm 间 Neo 切 sandbox 模式造成数据漂移污染。
"""
self._cleanup_expired_batches() self._cleanup_expired_batches()
batch_id = uuid.uuid4().hex batch_id = uuid.uuid4().hex
estimated_calls = len(app_types) * len(member_ids) estimated_calls = len(app_types) * len(member_ids)
estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL
# F1-5a: 抓 ctx_snapshot
ctx_snapshot = get_runtime_context(site_id)
self._batch_store[batch_id] = { self._batch_store[batch_id] = {
"params": { "params": {
"app_types": app_types, "app_types": app_types,
"member_ids": member_ids, "member_ids": member_ids,
"site_id": site_id, "site_id": site_id,
"batch_id": batch_id, # 用于 _run_batch 内 triggered_by 标注
}, },
"ctx_snapshot": ctx_snapshot,
"expires_at": datetime.now(timezone.utc) + timedelta(seconds=_BATCH_TTL_SECONDS), "expires_at": datetime.now(timezone.utc) + timedelta(seconds=_BATCH_TTL_SECONDS),
} }
logger.info(
"批量执行预估: batch_id=%s apps=%s members=%d site_id=%s mode=%s sandbox_date=%s",
batch_id,
app_types,
len(member_ids),
site_id,
ctx_snapshot.mode,
ctx_snapshot.sandbox_date.isoformat() if ctx_snapshot.sandbox_date else None,
)
return { return {
"batch_id": batch_id, "batch_id": batch_id,
"estimated_calls": estimated_calls, "estimated_calls": estimated_calls,
@@ -599,7 +644,7 @@ class AdminAIService:
} }
async def confirm_batch(self, batch_id: str) -> None: async def confirm_batch(self, batch_id: str) -> None:
"""取出参数,异步执行批量调用。""" """取出参数 + ctx_snapshot,异步执行批量调用。"""
self._cleanup_expired_batches() self._cleanup_expired_batches()
entry = self._batch_store.pop(batch_id, None) entry = self._batch_store.pop(batch_id, None)
@@ -607,28 +652,77 @@ class AdminAIService:
raise ValueError(f"batch_id 无效或已过期: {batch_id}") raise ValueError(f"batch_id 无效或已过期: {batch_id}")
params = entry["params"] params = entry["params"]
ctx_snapshot: RuntimeContext = entry["ctx_snapshot"]
logger.info( logger.info(
"批量执行确认: batch_id=%s apps=%s members=%d site_id=%s", "批量执行确认: batch_id=%s apps=%s members=%d site_id=%s mode=%s",
batch_id, batch_id,
params["app_types"], params["app_types"],
len(params["member_ids"]), len(params["member_ids"]),
params["site_id"], params["site_id"],
ctx_snapshot.mode,
) )
# 后台异步执行(具体调用链由路由层注入 dispatcher 处理) # F1-5a: 后台异步执行,传入 ctx_snapshot 防止运行时 Neo 切模式
asyncio.create_task( asyncio.create_task(self._run_batch(params, ctx_snapshot))
self._run_batch(params["app_types"], params["member_ids"], params["site_id"])
)
async def _run_batch( async def _run_batch(
self, app_types: list[str], member_ids: list[int], site_id: int, self, params: dict[str, Any], ctx_snapshot: RuntimeContext,
) -> None: ) -> None:
"""后台批量执行(占位实现,实际由 dispatcher 驱动)。""" """F1-5a: 后台批量执行真正实现。
- Semaphore(_BATCH_CONCURRENCY=5) 限并发,避免打爆 DashScope 限流
- asyncio.gather + return_exceptions=True单个 member 失败不连坐
- context 显式带 business_date来自 ctx_snapshotprompt builder 自取沙箱日期
- triggered_by=f"batch:{batch_id}":打标 ai_run_logsWave 2 进度查询基础
"""
if self._dispatcher is None:
logger.error(
"批量执行失败: dispatcher 未注入 batch_id=%s",
params.get("batch_id"),
)
return
app_types: list[str] = params["app_types"]
member_ids: list[int] = params["member_ids"]
site_id: int = params["site_id"]
batch_id: str = params["batch_id"]
sem = asyncio.Semaphore(_BATCH_CONCURRENCY)
triggered_by = f"batch:{batch_id}"
business_date_iso = ctx_snapshot.business_date.isoformat()
async def _run_one(app_type: str, member_id: int) -> None:
async with sem:
try:
await self._dispatcher.run_single_app(
app_type=app_type,
context={
"site_id": site_id,
"member_id": member_id,
"business_date": business_date_iso,
},
triggered_by=triggered_by,
)
except Exception:
# 单个失败已写 ai_run_logs此处仅记录不连坐其他 member
logger.exception(
"批量执行单步失败 batch_id=%s app=%s member=%s",
batch_id, app_type, member_id,
)
tasks = [
asyncio.create_task(_run_one(at, mid))
for at in app_types
for mid in member_ids
]
logger.info( logger.info(
"批量执行开始: apps=%s members=%d site_id=%s", "批量执行开始: batch_id=%s tasks=%d concurrency=%d mode=%s",
app_types, len(member_ids), site_id, batch_id, len(tasks), _BATCH_CONCURRENCY, ctx_snapshot.mode,
) )
# 实际执行逻辑在路由层通过 dispatcher.handle_trigger 驱动
# 此处仅记录日志,避免服务层直接依赖 dispatcher 实例 await asyncio.gather(*tasks, return_exceptions=True)
logger.info("批量执行完成: batch_id=%s tasks=%d", batch_id, len(tasks))
def _cleanup_expired_batches(self) -> None: def _cleanup_expired_batches(self) -> None:
"""清理过期 batch。""" """清理过期 batch。"""

View File

@@ -8,6 +8,7 @@
from __future__ import annotations from __future__ import annotations
import logging
import uuid import uuid
from dataclasses import dataclass from dataclasses import dataclass
from datetime import date, datetime, time, timedelta, timezone from datetime import date, datetime, time, timedelta, timezone
@@ -15,6 +16,8 @@ from typing import Any
from app import config from app import config
logger = logging.getLogger(__name__)
_LOCAL_TZ = timezone(timedelta(hours=8)) _LOCAL_TZ = timezone(timedelta(hours=8))
MODE_LIVE = "live" MODE_LIVE = "live"
MODE_SANDBOX = "sandbox" MODE_SANDBOX = "sandbox"
@@ -85,10 +88,23 @@ def _default_context(site_id: int) -> RuntimeContext:
return RuntimeContext(site_id=site_id) return RuntimeContext(site_id=site_id)
def get_runtime_context(site_id: int, conn: Any | None = None) -> RuntimeContext: def get_runtime_context(
site_id: int,
conn: Any | None = None,
*,
bind_to_session: bool = False,
) -> RuntimeContext:
"""读取门店运行上下文。 """读取门店运行上下文。
表不存在或未配置时降级为 live保证迁移前不影响正式链路。 表不存在或未配置时降级为 live保证迁移前不影响正式链路。
F1-5a 新增 ``bind_to_session``:当 True 且 conn 非空时,在返回前调用
``apply_runtime_session_vars(conn, ctx)`` 设置 GUC ``app.current_business_date`` /
``app.current_runtime_mode``,激活 ETL 库 26 个 ``app.v_*`` 视图的业务日上界裁剪
(`app.business_date_now()` 函数读取 GUC)。
使用场景:fdw_queries 等走 ETL 库视图的查询入口处显式 ``bind_to_session=True``。
其余只读取 ctx 不查 ETL 视图的调用方保持默认 ``False`` 即可。
""" """
own_conn = conn is None own_conn = conn is None
if own_conn: if own_conn:
@@ -120,22 +136,34 @@ def get_runtime_context(site_id: int, conn: Any | None = None) -> RuntimeContext
conn.close() conn.close()
if not row: if not row:
return _default_context(site_id) ctx = _default_context(site_id)
else:
mode, sandbox_date, sandbox_instance_id, ai_mode, status = row
if mode not in (MODE_LIVE, MODE_SANDBOX):
mode = MODE_LIVE
if mode == MODE_SANDBOX and (sandbox_date is None or not sandbox_instance_id):
mode = MODE_LIVE
mode, sandbox_date, sandbox_instance_id, ai_mode, status = row ctx = RuntimeContext(
if mode not in (MODE_LIVE, MODE_SANDBOX): site_id=site_id,
mode = MODE_LIVE mode=mode,
if mode == MODE_SANDBOX and (sandbox_date is None or not sandbox_instance_id): sandbox_date=sandbox_date,
mode = MODE_LIVE sandbox_instance_id=sandbox_instance_id,
ai_mode=ai_mode or AI_MODE_LIVE,
status=status or "active",
)
return RuntimeContext( # F1-5a: 显式开启时,绑定到当前 session,激活 ETL 库视图业务日上界
site_id=site_id, if bind_to_session and not own_conn and conn is not None:
mode=mode, try:
sandbox_date=sandbox_date, apply_runtime_session_vars(conn, ctx=ctx)
sandbox_instance_id=sandbox_instance_id, except Exception:
ai_mode=ai_mode or AI_MODE_LIVE, logger.debug(
status=status or "active", "apply_runtime_session_vars 失败(不阻塞主流程) site_id=%d", site_id,
) exc_info=True,
)
return ctx
def namespace_ai_target_id(site_id: int, target_id: str, conn: Any | None = None) -> str: def namespace_ai_target_id(site_id: int, target_id: str, conn: Any | None = None) -> str:

View File

@@ -0,0 +1,25 @@
-- 2026-05-05
-- F1-5a: ai_run_logs runtime 维度复合索引补建
--
-- 背景: 调研发现其他 6 张业务表(coach_tasks / recall_events / ai_cache /
-- ai_trigger_jobs / coach_task_history / coach_task_transfer_log)都有
-- (site_id, runtime_mode, sandbox_instance_id, ...) 复合索引,
-- 唯独 biz.ai_run_logs 漏建。
--
-- 影响: admin-web AI 监控页按 runtime_mode + sandbox_instance_id 过滤时
-- 会触发全表扫描,sandbox 切换后查询性能急剧下降。
--
-- 修复: 补建复合索引,与其他 6 张表保持一致。
--
-- 兼容性: 仅新增索引,不改变表结构,不影响现有查询。
-- 回滚: DROP INDEX IF EXISTS biz.idx_ai_run_logs_runtime_site;
BEGIN;
CREATE INDEX IF NOT EXISTS idx_ai_run_logs_runtime_site
ON biz.ai_run_logs (site_id, runtime_mode, sandbox_instance_id, created_at DESC);
COMMENT ON INDEX biz.idx_ai_run_logs_runtime_site IS
'F1-5a 补建: 支持 admin-web AI 监控页按 runtime_mode + sandbox_instance_id + site_id 过滤,避免全表扫';
COMMIT;

View File

@@ -0,0 +1,428 @@
# F1-5 沙箱 batch-run 接入 runtime_context — 实施决策卡
> 日期:2026-05-05
> 触发:F1-5 前置调研完成,4 份子代理报告整合后剩 6 项实施决策待 Neo 拍板
> 调研依据:`runtime_context.py:264` / `admin_service.py:622-631` / W1-T7 PRD 批 1(`batch1-runtime-context-and-ai.md:494-547`)/ P20 SPEC §10/§14
> 用法:Neo 在每个决策卡下面用 *斜体* 写反馈/选择,Claude 据此实施
---
## 决策总览
| # | 决策项 | Claude 推荐 | Neo 决定 |
|---|---|---|---|
| D1 | batch executor 实现方式(Semaphore / 全并发 / 串行 / TaskQueue) | Semaphore(3) + asyncio.gather + return_exceptions | (待填) |
| D2 | GUC C 方案处置(全做 / 移除 / 混合) | 混合(纳入阶段 A) | (待填) |
| D3 | 测试覆盖是否纳入 F1-5(同 commit) | 纳入(阶段 A) | (待填) |
| D4 | 走查 mock 数据(切 sandbox=2026-03-01) | 授权我用 PATCH 端点切 + 完成后切回 live | (待填) |
| D5 | commit 拆分(1 个 / 2 个 / 3 个) | 1 个统一 commit | (待填) |
| D6 | P20 SPEC §11/§15/§10 文档同步更新 | 纳入 F1-5 commit | (待填) |
---
## D1. batch executor 实现方式
### 关联页面/接口
- 后端:`apps/backend/app/services/ai/admin_service.py:622-631`(`_run_batch` 当前是空壳 stub)
- 后端:`apps/backend/app/services/ai/admin_service.py:576-620`(`estimate_batch` / `confirm_batch`)
- 后端:`apps/backend/app/routers/admin_ai.py:269-293`(`POST /batch-run` / `POST /batch-run/confirm`)
- 前端:`apps/admin-web/src/api/adminAI.ts:285,290`(`createBatchRun` / `confirmBatchRun`)
- 前端:`apps/admin-web/src/pages/AIOperations.tsx`(批量执行 Modal,两步操作)
### 业务背景
`/admin/ai/batch-run`(两阶段提交)是 admin-web 的批量 AI 调用入口。运维选 N 个 app_type × M 个 member,**估算 token / 调用次数 → 看到预估同意 → confirm 后台异步真跑**。两阶段防误操作打爆 token 预算(因为 1000 个会员一次跑可能几百元)。
**当前代码事实**:
- `estimate` 端点工作 ✓ 算 token / calls
- `confirm` 端点工作 ✓ 返 `{status: started}`,创建 `asyncio.create_task(self._run_batch(...))`
- **`_run_batch` 是占位假动作**:只 `logger.info`,**实际不跑 AI 调用**(注释"实际执行逻辑在路由层通过 dispatcher.handle_trigger 驱动"是 stub 假话)
- 路由层并未注入 dispatcher 到 service 层
### 冲突逻辑
当前 batch-run 端点 = **假动作**:
- admin 看到 estimate 估算 / confirm 返 success
- 后台 logger 只打一行 "_run_batch invoked",**0 个 AI 实际调用**
- ai_run_logs / ai_cache 没有 batch 相关记录
- 这违反 PRD §"业务语义":"投递到 dispatcher"——但实际没投递
### 业务联系
- F1-5(本任务):batch-run 接入 sandbox runtime_context — 但前提是 batch-run **真的能跑**,所以**必须先把 executor 补完**
- W1-T7 PRD 批 1 评估问题已标 P0:"batch_id 生命周期未声明 / 状态查询缺失 / member_ids 无上限"——但**没标"实际不执行"**(文档评估只看 schema 没看深度代码)
- Wave 2 计划:加 `GET /batch-run/{batch_id}/status` 进度查询(本次不做)
- AIPrewarm.tsx(/ai/prewarm)分组展示 72 组合,目前仅"每行触发"用单点 `run/{app_type}` 路径,batch-run 路径补完后可统一
### 修改影响
| 选项 | 影响范围 | 复杂度 |
|---|---|---|
| **a 全并发**(asyncio.gather 无 sem) | 1000 个 member 一次性发 → DashScope 限流熔断 / token 预算瞬间打爆 | 极不安全 ❌ |
| **b 串行**(顺序 await) | 1000 调用 × 2-5 秒 = 30+ 分钟,admin 看不到任何进度,confirm 时刻没回应 | 慢但安全 |
| **c Semaphore 限并发**(我推荐 N=3) | 3 个并发,1000 调用约 10 分钟,与现有 dispatcher 熔断/限流配合 | 平衡 ✅ |
| **d TaskQueue 推送**(走 worker 消费) | 引入跨进程依赖,与 confirm 端点的 `asyncio.create_task` 模型不一致 | 改造大 |
**c 方案细节**:
```python
async def _run_batch(self, params: dict, ctx_snapshot: RuntimeContext) -> None:
sem = asyncio.Semaphore(3)
tasks = []
for app_type in params["app_types"]:
for member_id in params["member_ids"]:
async def _one(at=app_type, mid=member_id):
async with sem:
await self._dispatcher.run_single_app(
app_type=at,
context={
"site_id": params["site_id"],
"member_id": mid,
"business_date": ctx_snapshot.business_date.isoformat(),
},
triggered_by=f"batch:{params['batch_id']}",
)
tasks.append(asyncio.create_task(_one()))
await asyncio.gather(*tasks, return_exceptions=True) # 单失败不连坐
```
- `Semaphore(3)`:限并发 3,与 dispatcher 现有 circuit_breaker / rate_limiter 配合,不打爆 DashScope 1000 RPM 限制
- `triggered_by=f"batch:{batch_id}"`:打标 ai_run_logs,Wave 2 加进度查询 `WHERE triggered_by LIKE 'batch:<id>%'` 即可统计
- `ctx_snapshot`:estimate 阶段抓 RuntimeContext 快照,worker 用快照值,**避免** Neo 在 estimate→confirm 间切 sandbox 模式造成污染
- `return_exceptions=True`:1 个 member 失败不阻断其他 member,失败信息已写 ai_run_logs
### 推荐选项
**c 方案 — Semaphore(3) + asyncio.gather + return_exceptions=True + ctx_snapshot**
### 建议与理由
- 与现有架构(asyncio.create_task / dispatcher 限流)一致,无新引入复杂度
- 限并发 3 是经验值(可调),既快(10 分钟)又安全(不爆限流)
- 快照设计避免最大数据漂移风险(Neo 切模式时 batch 已在跑)
- `triggered_by` 打标为 Wave 2 进度查询埋好钩子
- 单失败不连坐符合"批量任务"行业惯例
**Neo 反馈**:
*同意 c 方案 改并发数 N=5 改进 ctx_snapshot 合适的时机你来定)*
---
## D2. GUC C 方案处置
### 关联页面/接口
- 后端:`apps/backend/app/services/runtime_context.py:244-263`(`apply_runtime_session_vars` 函数,**当前 dead code**)
- DB(ETL):`db/etl_feiqiu/migrations/20260502__rls_views_business_date_upper_bound.sql`(26 个 `app.v_*` 视图加业务日上界)
- DB(ETL):`app.business_date_now()` 函数(读 GUC `app.current_business_date`,fallback 真实 today)
- DB(zqyy_app):**无 RLS 视图层**,业务库 SQL 直查 `biz.*`
- SPEC:`docs/prd/specs/P20-runtime-context-sandbox.md` §6(C 方案 GUC 路线)
### 业务背景
P20 沙箱机制要求:Neo 切 sandbox=2026-03-01 后,后端跑 AI / 查询数据时,**只能看到 2026-03-01 及之前数据**(防"未来数据泄露")。
实现方案有两条路线:
- **A 应用层方案**:每个查询函数都加 `business_date` 参数,SQL 里 `WHERE date <= %s` 显式裁剪
- **C GUC 数据库层方案**:DB 连接级 `SET LOCAL app.current_business_date='2026-03-01'`,所有视图自动按 `app.business_date_now()` 裁剪(代码不需要每个 SQL 都改)
### 冲突逻辑
**当前现状是"做了一半的 C 方案 + 大部分 A 方案":**
| 库 | 方案 | 现状 |
|---|---|---|
| etl_feiqiu(ETL) | C 方案 | ✅ 26 个视图加上界 |
| zqyy_app(业务) | A 方案 | ✅ 应用层多数已传(7 prompt + 4 fetcher + board_service + fdw_queries) |
| `apply_runtime_session_vars` | C 方案"开关" | ❌ **全仓库 0 处调用** |
**致命隐性 bug**:`apply_runtime_session_vars` 是激活 ETL 库 26 个视图的开关,但开关从未被打开 → ETL 库视图调 `app.business_date_now()` 拿不到 GUC → fallback 真实 today → **Neo 切 sandbox=2026-03-01 后,后端走 ETL 库 `app.v_*` 视图查询时,仍能看到 2026-05-05 的"未来"数据**
### 业务联系
- F1-5 主诉求"仅能看到沙箱设定日期及之前数据" — 当前没满足(因为 GUC 开关没开)
- W1-T1 看板沙箱接入(2026-05-04)— 走 A 方案在前端 isCurrentMonthFilter 处理,与本决策正交
- W1-T2 SCD2 视图统一(2026-05-04)— 走 C 方案,本决策对其有影响(SCD2 视图也读 GUC)
- Wave 2 P1-1 schema 迁移、P1-6 触发器合并 — 与 C 方案 zqyy_app 视图层延伸相关
### 修改影响
| 选项 | 改动 | 工作量 | 业务效果 |
|---|---|---|---|
| **a 全做(扩 zqyy_app)** | dispatcher 入口调 + zqyy_app 加 RLS 视图层(C 方案落地) | 大(2-3 天,涉及 RLS 双 schema 规则) | 两库都靠 GUC 自动保护 |
| **b 移除** | 删 `apply_runtime_session_vars` + 文档移除 C 方案 + ETL 库 26 视图改回不带 GUC | 中(回滚 C 方案,审计成本) | 只靠应用层 A 方案 |
| **c 混合(我推荐)** | dispatcher 入口调一次 `apply_runtime_session_vars` + zqyy_app 库继续 A 方案,**不新增 RLS 视图层** | 极小(1-2 行代码 + 几个调用点) | ETL 库 C 方案激活(自动保护)+ zqyy_app 应用层 A 方案(显式保护)— 两库都安全 |
### 推荐选项
**c 混合方案 — 纳入阶段 A**(F1-5 内做)
### 建议与理由
- ETL 库 C 方案是 2026-05-02 的**已完成资产**,丢掉浪费(b 方案)
- zqyy_app 库 RLS 视图层从无到有是**新建工作量**,且应用层 A 方案已做 80%(只剩 admin_service 的 `CURRENT_DATE`,在阶段 B1 已纳入),没必要重复建第二套保护(a 方案过度)
- c 方案只需要 dispatcher / batch executor 拿 DB 连接后调一次 `apply_runtime_session_vars(conn, ctx=ctx_snapshot)` → ETL 库 26 个视图**立即激活**
- 业务结果:Neo 切 sandbox 后,ETL 库视图自动裁剪未来数据,业务库由应用层显式传保护,**两库都安全**
- 副作用:`docs/prd/specs/P20-runtime-context-sandbox.md` 需要把 §6 中"zqyy_app 加 RLS 视图层"的规划项移除或标"永不做"
**Neo 反馈**:
*能不能完整且统一?符合规范化的项目架构设计,保持一致,后续也能形成规范延续下去。*
---
## D3. 测试覆盖是否纳入 F1-5
### 关联页面/接口
- `apps/backend/tests/`(`tests/tests/unit/``tests/tests/integration/`)
- `apps/backend/tests/test_runtime_context*.py`(**当前不存在**)
- `apps/backend/tests/test_admin_ai_*.py`(已存在但 0 处 sandbox 覆盖)
### 业务背景
P20 SPEC §8 验收标准 AC1-13 含"沙箱模式数据隔离 / namespace 前缀正确 / 业务日上界生效"等多项硬性要求。当前**全仓库 0 测试覆盖** sandbox/runtime_context:
- `apply_runtime_session_vars``namespace_ai_target_id``task_runtime_filter``business_date_upper_bound_sql` 全无 unit
- `_run_batch` 即使补完 executor,无 integration 测试验证 ctx_snapshot 不污染
- dispatcher 落 runtime 列无回归保护
### 冲突逻辑
- F1-5 是 P0-7 沙箱主线**必修**(P20 验收前提)
- 但调研发现 0 测试 → 无回归保护 → 上线后任何后续改动都可能悄无声息破坏 sandbox 隔离
- 这是 W1 findings F1-1 反馈"长事务幂等"同类风险:有规则没测试 = 规则形同虚设
### 业务联系
- W1 findings F1-1 长事务幂等:Neo 反馈"我们之前因没测试导致 mock/prod 偏差"(类似教训)
- F2-2 tests/ 入仓(Wave 5)— 测试需要先入仓才能跑 CI,但本地测试不需要
- 测试规范:`docs/_overview/03-test-spec.md` L1-L5 五级测试
- `.gitignore:71` 排除 tests/ → 本地写测试不能 commit(F2-2 待解决)
### 修改影响
| 选项 | 改动 | 工作量 |
|---|---|---|
| **a 纳入 F1-5(我推荐)** | 写 3 套测试:test_runtime_context.py(unit 13 API)/ test_admin_ai_batch_runtime.py(integration estimate→切模式→confirm 用快照)/ test_dispatcher_runtime.py(_run_step 落 runtime 列) | 1-2 小时(主要是 fixture 准备) |
| **b 留 Wave 2** | 现在不做,Wave 2 测试专项做 | 0(此次)+ Wave 2 半天 |
| **c 简化版纳入** | 只写 1 套 integration 测试(estimate→切模式→confirm 用快照),unit 留 Wave 2 | 30 分钟 |
### 推荐选项
**a 纳入 F1-5 阶段 A**
### 建议与理由
- 沙箱机制是 P0 主线,无测试上线极高风险
- 1-2 小时 vs 上线后回归 bug 抓虫 = 投入产出比极高
- F2-2 tests/ 入仓未做不阻塞:测试本地写本地跑,Wave 5 入仓时一并迁移
- W1 findings 教训告诉我们:不写测试的"机制"不是真机制
- 备选:如果时间紧,接受 c 方案(只 integration)
**Neo 反馈**:
*同意 a 纳入*
---
## D4. 走查 mock 数据(切 sandbox=2026-03-01)授权
### 关联页面/接口
- 后端:`apps/backend/app/routers/admin_runtime_context.py:95`(`PATCH /api/admin/runtime-context`)
- DB:`test_zqyy_app.biz.site_runtime_context`(测试库唯一 site `2790685415443269` 当前 `mode='live'`)
- W1-T8 §14 走查清单:`docs/prd/specs/P20-runtime-context-sandbox.md:487-566`
### 业务背景
W1-T8 §14 成果层走查需要在 test_zqyy_app **切 sandbox=2026-03-01**(与 W1-T1/T2 测试日期同步),实地验证:
- admin-web 顶部 sandbox 提示条显示
- 触发 1 次 app2_finance 后 `ai_run_logs.runtime_mode='sandbox'` + prompt JSON `current_time='2026-03-01 HH:MM'` + `ai_cache.target_id``sbx_*:` 前缀
- ETL 库 `app.v_*` 视图业务日上界生效(查不到 03-01 之后数据)
完成走查后**切回 live**。
### 冲突逻辑
- CLAUDE.md 测试与验证环境规范要求:"使用测试库,禁止连正式库" — 切 sandbox 是测试库操作,合规
- CLAUDE.md 不破坏原则:"不在测试库以外执行 DDL / TRUNCATE / DELETE" — 切 sandbox 是 UPDATE 不是 DDL,合规
- 但 mode 切换是状态改动,需要 Neo 显式授权
### 业务联系
- F1-5 走查必须先切 sandbox 才能验证(否则 live 模式下所有 sandbox 代码路径都不会触发)
- 切完 sandbox 不切回会污染后续测试(下次有人跑测试拿到的是"虚拟时间")
- audit_log 会记录切换动作(switch_runtime_context 内部已经写 audit)
### 修改影响
| 选项 | 改动 | 风险 |
|---|---|---|
| **a 授权 PATCH 端点切**(我推荐) | 调 `PATCH /api/admin/runtime-context` 切 sandbox=2026-03-01 + 走查完切回 live | 测试库改动,有 audit 记录 |
| **b 授权 SQL 直改** | `UPDATE biz.site_runtime_context ... WHERE site_id=2790685415443269` | 绕过 audit,不推荐 |
| **c 不授权,Neo 自切** | 等 Neo 手动切完 + 通知 Claude 跑走查 | 多一轮等待,但更稳 |
| **d 走查时再决定** | F1-5 改完之后再问授权 | 推迟决策,减少认知负担 |
### 推荐选项
**a 授权 PATCH 端点切 + 完成后切回 live**
### 建议与理由
- PATCH 端点是正式入口,自带 audit,合规
- 走查后立即切回 live 避免污染
- Claude 实施 F1-5 期间不需要切(F1-5 单元测试可用 mock RuntimeContext);只在 W1-T8 走查阶段切一次
- 切换是 idempotent 操作,失败可回滚
- 如果不放心,可以选 c(走查时 Neo 手动切)— 这是最保守的
**Neo 反馈**:
*授权 a*
---
## D5. commit 拆分
### 关联页面/接口
- 阶段 A(MVP):`admin_service.py` _run_batch + estimate snapshot / `admin_ai.py` 路由 / `app2_finance_prompt.py` 3 行 / `retry_trigger_job` SQL / `AIOperations.tsx` 提示条 / 新 migration `<日期>__ai_run_logs_runtime_index.sql`
- 阶段 B(漂移防御):`admin_service.py` 6 处 SQL CURRENT_DATE / `fdw_queries.py:113,2552` 兜底
- 测试:3 个新 test 文件(若 D3 选 a)
- SPEC 同步:`P20-runtime-context-sandbox.md` §10/§11/§15(若 D6 选纳入)
### 业务背景
F1-5 涉及 ~10 个文件改动,跨后端 / 前端 / DB / 测试 / 文档 5 类。如何拆 commit 影响:
- 回滚粒度(出问题能精准 revert)
- 主题清晰度(commit log 可读)
- review 难度(reviewer 一次看多少)
### 冲突逻辑
| 维度 | 1 个 commit | 2 个 commit | 3 个 commit |
|---|---|---|---|
| 主题 | F1-5 完整收口 | A 沙箱接入 / B 漂移防御 | DB / Backend / Frontend |
| 回滚 | 全或无 | 中粒度 | 细粒度 |
| 可读性 | 包大 | 中 | 多 |
CLAUDE.md 提倡"逻辑独立的改动各自 commit",但本次所有改动都属于 F1-5 单一主题。
### 业务联系
- F3-2C 之前拆 2 commit(A 处置 + B 防御 hook)— 因为是 2 个独立主题
- F1-5 是单一主题:让沙箱 batch-run 真正生效
- Wave 1 Day 1-4 多数是 1 个 commit / 任务
### 修改影响
| 选项 | 适用 |
|---|---|
| **A 1 个 commit(我推荐)** | 单一主题,回滚一气呵成,改动文件 ~10 个可控 |
| B 2 个 commit | A:核心接入 / B:漂移防御 — 主题略有差异 |
| C 3 个 commit | DB/Backend/Frontend — 跨层拆分,但每个 commit 不能独立运行(后端依赖 DB 索引,前端依赖后端字段) |
### 推荐选项
**A 1 个 commit:`fix(ai): F1-5 沙箱 batch-run 接入 runtime_context(含 P20 漂移防御 + GUC C 方案激活)`**
### 建议与理由
- F1-5 是单一主题,拆 commit 反而割裂上下文
- 改动文件 10 个可读,commit message 清楚说明
- 跨层依赖(DB 索引 → 后端代码 → 前端组件)不能各自独立运行,拆 commit 后中间状态不可用
- 历史经验:F3-2C 拆 2 commit 是因为有 2 个独立主题(内容拆分 vs 防御机制),F1-5 不一样
**Neo 反馈**:
*同意 1 个 commit*
---
## D6. P20 SPEC §11/§15/§10 文档同步更新
### 关联页面/接口
- `docs/prd/specs/P20-runtime-context-sandbox.md`
- §6 GUC C 方案路线(若 D2 选 c 混合,需要标"zqyy_app RLS 视图层不做")
- §10 跨模块覆盖矩阵(app2/app2a 标记从 "?" 改 ✅)
- §11 已知遗漏(删除 batch-run 相关项)
- §15 变更记录(追加 2026-05-05 F1-5)
- 可能 §14 走查归档章节(走查后追加结果链接)
### 业务背景
P20 SPEC 是沙箱机制的权威设计文档。F1-5 是 P20 落地的"最后一公里",完成后 SPEC 必须同步:
- §10 矩阵反映现状(否则未来 reviewer 看 SPEC 仍会以为有缺失)
- §11 已知遗漏要把 batch-run 移除(已修复)
- §15 变更记录追加里程碑
不同步会导致 SPEC 和实际代码漂移,违反 W1 findings F2-1 教训(OpenAPI 28 天 stale 同类问题)。
### 冲突逻辑
- CLAUDE.md 数据库 Schema 变更规则:"必须同步 docs/database/" — 类似规则适用 SPEC
- W1 findings F2-1 教训:文档与代码漂移会让审计失效
### 业务联系
- F2-1B 防御机制 hook(已生效):router 改动会提醒抓 OpenAPI — 类似机制对 SPEC 缺失
- W1 findings F1-1 长事务幂等:文档没说 → 没人能验收
- 后续审计文档需要引用 P20 SPEC 当前状态
### 修改影响
| 选项 | 改动 | 工作量 |
|---|---|---|
| **a 纳入 F1-5 commit(我推荐)** | F1-5 同 commit 改 §6/§10/§11/§15 | 10 分钟(纯文档) |
| b 单独 commit | F1-5 后另起 commit "docs(spec): P20 同步 F1-5 落地" | 多一个 commit |
| c 留 Wave 5 文档收尾 | 30+ 天 SPEC 漂移,期间任何人看到 SPEC 都误判 | 高漂移风险 |
### 推荐选项
**a 纳入 F1-5 commit**
### 建议与理由
- SPEC owner 视角同步,不做的话 SPEC 永远和实际漂移
- 10 分钟工作量,不值得拆 commit
- 跟 D5(1 个 commit)相容
- 跟 F3-2C 的 ai-app-prompts.md A 处置同时改造 SPEC 一样的逻辑
**Neo 反馈**:
*同意 a 纳入*
---
## 总览决策表(Neo 自填)
| # | 决策项 | Claude 推荐 | Neo 决定 | 备注 |
|---|---|---|---|---|
| D1 | batch executor 方式 | Semaphore(3) + asyncio.gather + return_exceptions + ctx_snapshot | | |
| D2 | GUC C 方案处置 | 混合(纳入阶段 A) | | |
| D3 | 测试覆盖 | 纳入(阶段 A) | | |
| D4 | 走查 mock 授权 | 授权 PATCH 切 + 完成切回 | | |
| D5 | commit 拆分 | 1 个统一 commit | | |
| D6 | SPEC 同步 | 纳入 F1-5 commit | | |
---
## 已确认项(无需决策,记录在案)
| 项 | 决策 | 来源 |
|---|---|---|
| Q1 阶段 A+B 一起做 | ✅ 同意 | Neo 5/5 反馈 |
| Q4 子 1 — 不新建 sandbox_batch_runs 表,复用 sandbox_instance_id | ✅ 同意 | Neo 5/5 反馈 |
| Q4 子 3 — ai_run_logs 索引随 F1-5 一起补 | ✅ 同意 | Neo 5/5 反馈 |
| Q4 子 4 — BD_manual_biz_registry_tables.md 保持分离 | ✅ 同意 | Neo 5/5 反馈 |
---
## 关联
- F1-5 业务故事卡:[`00-W1-findings-stories.md`](00-W1-findings-stories.md)(F1-5 章节)
- F1-5 反馈响应:[`01-W1-findings-response.md`](01-W1-findings-response.md)
- P20 SPEC:[`docs/prd/specs/P20-runtime-context-sandbox.md`](../../prd/specs/P20-runtime-context-sandbox.md)
- W1-T7 PRD 批 1:[`docs/_overview/admin-api-prd/batch1-runtime-context-and-ai.md`](../admin-api-prd/batch1-runtime-context-and-ai.md)
- 调研子代理报告(本次会话内,未单独入仓)

View File

@@ -0,0 +1,229 @@
# Wave 1 F1-5a — 沙箱 batch-run 接入 runtime_context(MVP + 漂移防御核心)
| 字段 | 值 |
|---|---|
| 日期 | 2026-05-05 |
| Wave | 1 / Day 5 主线必修(P0-7 沙箱) |
| 范围 | batch-run executor 真实化 + ctx_snapshot 防漂移 + retry 落 runtime 列 + ai_run_logs 索引 + App2/2a ref_date 修复 + GUC 接入机制 + admin-web sandbox 提示条 |
| 拆分背景 | F1-5b(zqyy_app RLS 视图层 + 46 处 `FROM biz.*``app.v_*` + B1/B2 完整漂移防御)紧接本次后做 |
## 一、F1-5 决策卡 6 项 Neo 决策
详见 [`docs/_overview/wave1-findings/F1-5-impl-decisions.md`](../../_overview/wave1-findings/F1-5-impl-decisions.md)。
| # | 决策 | Neo 反馈 | 落实 |
|---|---|---|---|
| D1 | batch executor 实现 | 同意 c 方案 + N=5,ctx_snapshot 时机 Claude 定 | Semaphore(5) + asyncio.gather(return_exceptions=True);estimate 入口抓快照 |
| D2 | GUC C 方案处置 | "完整且统一,符合规范化项目架构" | 拆 F1-5a/F1-5b 两步:F1-5a 接入机制 + ETL 库激活,F1-5b 完整 zqyy_app RLS 视图层落地 |
| D3 | 测试覆盖 | 同意 a 纳入 | 测试文件本地保留(`.gitignore:71` 排除 tests/,F2-2 入仓后一并 commit) |
| D4 | 走查 mock 授权 | 授权 a | W1-T8 走查时调 PATCH 切 sandbox=2026-03-01,完成切回 live |
| D5 | commit 拆分 | 1 个 commit | 本审计对应 1 个 commit `fix(ai): F1-5a 沙箱 batch-run 接入 runtime_context (W1)` |
| D6 | P20 SPEC 同步 | 纳入 a | F1-5a 完整改造后 SPEC §6/§10/§11/§15 在 F1-5b commit 一并同步(因 F1-5b 才完整收口) |
## 二、本次产出(F1-5a 范围)
### 2.1 后端 — `_run_batch` 真实化(A1)
**文件**:`apps/backend/app/services/ai/admin_service.py`
| 改动 | 行号(改动后) | 说明 |
|---|---|---|
| import | L7-19 | 加 `RuntimeContext / get_runtime_context / runtime_insert_columns`,`TYPE_CHECKING: AIDispatcher` |
| 常量 | L31 | `_BATCH_CONCURRENCY = 5`(Neo 决策) |
| `__init__` | L40-45 | 加 `_dispatcher: AIDispatcher \| None = None` 字段 + `set_dispatcher` 方法,lazy 注入 |
| `_run_batch` | 整段重写 | 真正实现:`Semaphore(5)` + `asyncio.gather` + `return_exceptions=True`;每个 sub-call 调 `dispatcher.run_single_app(app_type, context={..., business_date}, triggered_by=f"batch:{batch_id}")` |
**关键设计点**:
- **快照防漂移**:`ctx_snapshot` 在 estimate 阶段抓取,confirm 取出传给 worker,worker 按快照值执行(避免 Neo 在 estimate→confirm 间切 sandbox 模式造成数据漂移)
- **`return_exceptions=True`**:单个 member 失败不连坐,失败信息已写 ai_run_logs(熔断/限流/超时由 dispatcher.\_run_step 自动处理)
- **`triggered_by="batch:{batch_id}"`**:打标 ai_run_logs,Wave 2 加 `GET /batch-run/{batch_id}/status` 进度查询时 `WHERE triggered_by LIKE 'batch:<id>%'` 即可统计
### 2.2 后端 — `estimate_batch` / `confirm_batch` ctx_snapshot(A2)
**文件**:`apps/backend/app/services/ai/admin_service.py`
| 改动 | 说明 |
|---|---|
| `estimate_batch` | 入口调 `get_runtime_context(site_id)` 抓 RuntimeContext,存入 `_batch_store[batch_id].ctx_snapshot`;日志输出 mode + sandbox_date |
| `confirm_batch` | 取出 `ctx_snapshot`,异步调 `_run_batch(params, ctx_snapshot)` |
| `_batch_store` | 字典结构从 `{params, expires_at}` 扩展为 `{params, ctx_snapshot, expires_at}`;`params` 内加 `batch_id` 字段(传给 `_run_batch` 用于 triggered_by 标注) |
### 2.3 后端 — `confirm_batch_run` 路由 lazy 注入 dispatcher
**文件**:`apps/backend/app/routers/admin_ai.py:283-303`
`_admin_svc` 是模块级单例,不持有 dispatcher 引用;`dispatcher` 通常在 lifespan startup 才初始化。lazy 注入避免模块加载顺序问题:
```python
if _admin_svc._dispatcher is None:
try:
from app.ai.dispatcher import get_dispatcher
_admin_svc.set_dispatcher(get_dispatcher())
except RuntimeError as exc:
raise HTTPException(503, f"Dispatcher 未初始化: {exc}")
```
### 2.4 后端 — `retry_trigger_job` INSERT 落 runtime 列(A4)
**文件**:`apps/backend/app/services/ai/admin_service.py:380-422`
INSERT 显式拼接 `runtime_mode` + `sandbox_instance_id` 列(用 `runtime_insert_columns(site_id)` helper),与原 trigger_job 的 runtime 上下文保持一致。避免依赖默认值 'live' 导致 sandbox 模式下重试丢失 sandbox 标记。
### 2.5 后端 — App2 / App2a 3 行漏传 ref_date(A3)
| 文件:行 | 改动 |
|---|---|
| `app2_finance_prompt.py:817` | `_calc_date_range(board_time)``_calc_date_range(board_time, ref_date=get_runtime_context(site_id).business_date)`(储值卡余额变化板块) |
| `app2_finance_prompt.py:841` | 同上(日粒度 series + 异常检测窗口) |
| `app2a_finance_area_prompt.py:466` | 同上(区域日粒度 series) |
### 2.6 后端 — `runtime_context.get_runtime_context` 加 `bind_to_session` 参数
**文件**:`apps/backend/app/services/runtime_context.py:88-148`
新增可选参数 `bind_to_session: bool = False`,True 时返回前调用 `apply_runtime_session_vars(conn, ctx)`,激活 GUC `app.current_business_date` / `app.current_runtime_mode`,使 ETL 库 26 个 `app.v_*` 视图自动按业务日上界裁剪(`app.business_date_now()` 函数读取 GUC)。
**说明**:激活机制就位,但只在 `run_log_service.create_log` 试点开启(2.7);其余完整传递留 F1-5b。
### 2.7 后端 — `run_log_service.create_log` 启用 `bind_to_session=True`
**文件**:`apps/backend/app/ai/run_log_service.py:62-65`
试点改造,验证 GUC 激活机制可用。运行时事务级 SET LOCAL 在该 INSERT 期间生效。
### 2.8 DB — `biz.ai_run_logs` 复合索引补建(A6)
**文件**:`db/zqyy_app/migrations/20260505__ai_run_logs_runtime_index.sql`
```sql
CREATE INDEX IF NOT EXISTS idx_ai_run_logs_runtime_site
ON biz.ai_run_logs (site_id, runtime_mode, sandbox_instance_id, created_at DESC);
```
补齐其他 6 张业务表的复合索引模式。admin-web 按 runtime 维度过滤不再全表扫。
### 2.9 前端 — AIOperations.tsx sandbox 提示条(A5)
**文件**:`apps/admin-web/src/pages/AIOperations.tsx`
| 改动 | 说明 |
|---|---|
| import | 加 `Alert`(antd) + `fetchRuntimeContext / RuntimeContext`(api/runtimeContext) |
| `useEffect` | 拉 `cacheSiteId` 对应的 runtime context |
| Title 后 Alert | sandbox 模式下显示 `沙箱模式 · 业务日 X · 实例 sbx_*` + 详细描述(影响范围 + 切回 live 入口) |
## 三、未做(留 F1-5b 完整收口)
### 3.1 B1 — `admin_service.py` 6 处 `CURRENT_DATE` → `business_date`
涉及 `_get_range_stats` / `_get_7d_trend` / `_get_app_distribution` / `get_budget` 等,需要为这些函数加 site_id 参数链路(当前部分函数 site_id 是可选 None,改造涉及函数签名变更),工作量较大。F1-5b 完整 zqyy_app RLS 视图层落地时,这些查询自然走 `app.v_*` 视图(GUC 自动裁剪),无需改函数签名,更优雅。
### 3.2 B2 — `fdw_queries.py:113, 2552` 异常分支兜底改 `ctx.business_date`
异常分支用 `_date.today().isoformat()`,sandbox 触发异常会泄露真实今日。F1-5b 完整 fdw_queries GUC 接入时一并改。
### 3.3 GUC 完整传递
`run_log_service.create_log` 试点开启了 `bind_to_session=True`,但其他真正走 ETL 库视图的函数(fdw_queries 11+ 处 / page_context 等)没改。F1-5b 一并改造。
### 3.4 测试 3 套
`test_runtime_context.py` / `test_admin_ai_batch_runtime.py` / `test_dispatcher_runtime.py``.gitignore:71` 排除 tests/,本次会话写本地。F2-2 tests/ 入仓(Wave 5)时一并入仓 + 跑 CI。
### 3.5 P20 SPEC 同步
§6/§10/§11/§15 同步**留 F1-5b commit 一并改**(因为 F1-5b 才是 P20 SPEC C 方案的完整收口,届时 SPEC 反映"两库都用 GUC + zqyy_app 加 RLS 视图层"的最终状态更准确)。
## 四、风险与回滚
| 项 | 风险 | 回滚 |
|---|---|---|
| `_run_batch` 真实化 | 调用 dispatcher 失败时单 member 失败已写 ai_run_logs,不连坐;dispatcher 未注入时返 503 | git revert 当前 commit |
| `ctx_snapshot` 设计 | 内存 dict,worker crash 时 batch 丢失 — 接受(原 batch_id 设计本就 in-memory) | 同上 |
| `retry_trigger_job` 加 runtime 列 | INSERT 多 2 列,与已有索引兼容(只加列不删字段) | 同上 |
| App2/2a 3 行 ref_date | 仅在已有 try 块内改,异常 fallback 不变 | 同上 |
| `bind_to_session` 加可选参数 | 默认 False,旧调用不变;只在 run_log_service 试点开启 | 同上 |
| `migration` 仅 CREATE INDEX IF NOT EXISTS | 索引可重复执行,不破坏数据 | `DROP INDEX IF EXISTS biz.idx_ai_run_logs_runtime_site` |
| 前端 sandbox 提示条 | 失败时不阻断页面渲染(catch 后静默) | git revert |
## 五、F1-5b 待办(下一个 commit)
- B1 admin_service 6 处 CURRENT_DATE → business_date(随 RLS 视图层一并)
- B2 fdw_queries.py:113, 2552 异常分支兜底
- GUC 完整传递(fdw_queries / page_context / 等)
- zqyy_app 加 RLS 视图层(7+ 视图 `app.v_*`)+ 后端 46 处 `FROM biz.*``FROM app.v_*`
- 测试 3 套(本地写,F2-2 入仓时同步)
- P20 SPEC §6/§10/§11/§15 同步(反映 F1-5a + F1-5b 完整收口状态)
## 六、验证(F1-5a 范围)
### 6.1 syntax 检查(已通过)
```bash
.venv/Scripts/python.exe -m py_compile \
apps/backend/app/services/runtime_context.py \
apps/backend/app/services/ai/admin_service.py \
apps/backend/app/routers/admin_ai.py \
apps/backend/app/ai/run_log_service.py \
apps/backend/app/ai/prompts/app2_finance_prompt.py \
apps/backend/app/ai/prompts/app2a_finance_area_prompt.py
# → all-files-compile-ok
```
### 6.2 W1-T8 走查待验证项(F1-5a + F1-5b 改完后一并验证)
- 切 sandbox=2026-03-01 后触发 1 次 app2_finance,验证 `ai_run_logs.runtime_mode='sandbox'` + prompt JSON `current_time='2026-03-01 HH:MM'` + `ai_cache.target_id``sbx_*:` 前缀
- admin-web `/ai/operations` 顶部 sandbox 条带显示
- batch-run 真正执行(estimate→confirm 后 ai_run_logs 出现 `triggered_by='batch:<batch_id>'` 记录)
- estimate→切模式→confirm:验证 worker 用 estimate 时刻的 ctx_snapshot,**不**用 confirm 时刻的新模式
## 七、commit 建议
```
fix(ai): F1-5a 沙箱 batch-run 接入 runtime_context (W1 / 阶段 A 主体)
Neo F1-5 反馈: "让沙箱起到其真正的作用. 真正的模拟日期, 仅能看到沙箱设定日期
及之前日期的数据, 并运行 AI 的各个业务."
调研发现 (4 个并行子代理): batch-run 端点 _run_batch 是空壳 stub
(只 logger.info, 实际不跑 AI), GUC apply_runtime_session_vars 0 处调用
(dead code), 7 张业务表 6 张有 runtime 复合索引唯独 ai_run_logs 漏建,
App2/2a 3 行 _calc_date_range 漏传 ref_date.
本 commit (F1-5a 阶段 A 主体, F1-5b 后续完整 zqyy_app RLS 视图层):
后端核心:
- admin_service.py: _run_batch 真实化 (Semaphore(5)+asyncio.gather+
return_exceptions=True+ctx_snapshot 防漂移); estimate 入口抓
RuntimeContext 快照, confirm 取出传给 worker
- admin_ai.py: confirm_batch_run lazy 注入 dispatcher
- admin_service.retry_trigger_job: INSERT 落 runtime_mode +
sandbox_instance_id 列 (用 runtime_insert_columns helper)
- runtime_context.py: get_runtime_context 加 bind_to_session 参数,
激活 GUC app.current_business_date / app.current_runtime_mode
- run_log_service.create_log: 启用 bind_to_session=True 试点
App2/2a 3 行 ref_date 修复:
- app2_finance_prompt.py:817 储值卡余额变化板块
- app2_finance_prompt.py:841 日粒度 series + 异常检测窗口
- app2a_finance_area_prompt.py:466 区域日粒度 series
DB:
- migrations/20260505__ai_run_logs_runtime_index.sql:
补 (site_id, runtime_mode, sandbox_instance_id, created_at DESC) 复合索引
前端:
- AIOperations.tsx: 顶部加 sandbox 模式提示条 (Alert 显示 sandbox_date +
sandbox_instance_id + 影响范围 + 切回 live 入口)
未做 (留 F1-5b 完整 zqyy_app RLS 视图层一并):
- B1 admin_service 6 处 CURRENT_DATE → business_date
- B2 fdw_queries 异常分支兜底
- GUC 完整传递 (fdw_queries / page_context 等)
- 测试 3 套 (.gitignore:71 排除, F2-2 入仓时 commit)
- P20 SPEC §6/§10/§11/§15 (F1-5b 完整收口后同步更准确)
Neo 决策: docs/_overview/wave1-findings/F1-5-impl-decisions.md
详见 docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md
```