fix(ai): F1-5a wire sandbox batch-run into runtime_context (W1 / phase A main body)

Neo's F1-5 feedback: "Make the sandbox actually do its job: a truly simulated date,
only data on or before the sandbox date is visible, and every AI business flow can be run."

Investigation findings (4 parallel sub-agents): the batch-run endpoint's _run_batch
is an empty stub (it only calls logger.info and never actually runs the AI); the GUC
helper apply_runtime_session_vars has 0 call sites (dead code); of the 7 business
tables, 6 have a runtime composite index and only ai_run_logs is missing one; and
3 call sites in App2/2a fail to pass ref_date into _calc_date_range.

This commit (F1-5a, phase A main body; F1-5b will follow with the full zqyy_app RLS view layer):

Backend core:
- admin_service.py: _run_batch is now a real implementation (Semaphore(5) +
  asyncio.gather + return_exceptions=True + ctx_snapshot to prevent drift);
  the estimate entry point captures a RuntimeContext snapshot, and confirm
  pops it and hands it to the worker
- admin_ai.py: confirm_batch_run lazily injects the dispatcher
- admin_service.retry_trigger_job: the INSERT now writes the runtime_mode +
  sandbox_instance_id columns (via the runtime_insert_columns helper)
- runtime_context.py: get_runtime_context gains a bind_to_session parameter
  that activates the GUCs app.current_business_date /
  app.current_runtime_mode (see the sketch after this list)
- run_log_service.create_log: pilots bind_to_session=True
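
A minimal sketch of the bind_to_session idea. runtime_context.py itself is not part
of the diff below, so the dataclass fields, the _resolve_context lookup, and the exact
signatures here are assumptions for illustration, not the shipped code:

    from dataclasses import dataclass
    from datetime import date

    from app.database import get_connection  # same import the diff below uses


    @dataclass
    class RuntimeContext:
        mode: str                             # "live" | "sandbox"
        business_date: date                   # sandbox date in sandbox mode, today otherwise
        sandbox_date: date | None = None
        sandbox_instance_id: str | None = None


    def _resolve_context(site_id: int) -> RuntimeContext:
        # Placeholder: the real module looks up the site's sandbox settings.
        return RuntimeContext(mode="live", business_date=date.today())


    def get_runtime_context(site_id: int, bind_to_session: bool = False) -> RuntimeContext:
        ctx = _resolve_context(site_id)
        if bind_to_session:
            # Activate the session GUCs so RLS policies and SQL defaults can
            # read them via current_setting(...).
            conn = get_connection()
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT set_config('app.current_business_date', %s, false), "
                    "set_config('app.current_runtime_mode', %s, false)",
                    (ctx.business_date.isoformat(), ctx.mode),
                )
        return ctx


    def runtime_insert_columns(site_id: int) -> tuple[str, str, tuple]:
        # Column names, placeholders and params that callers splice into an INSERT,
        # as retry_trigger_job does in the diff below.
        ctx = get_runtime_context(site_id)
        return "runtime_mode, sandbox_instance_id", "%s, %s", (ctx.mode, ctx.sandbox_instance_id)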

App2/2a: the 3-line ref_date fix (illustrated by the sketch below):
- app2_finance_prompt.py:817 stored-value card balance-change section
- app2_finance_prompt.py:841 day-granularity series + anomaly-detection window
- app2a_finance_area_prompt.py:466 regional day-granularity series
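
The fix at all three call sites is the same ref_date plumbing. Illustrative sketch
only (the real _calc_date_range signature is not shown in this commit; the shape
below just makes the before/after concrete):

    from datetime import date, timedelta

    def _calc_date_range(days: int, ref_date: date | None = None) -> tuple[date, date]:
        # Trailing window of `days` days ending at ref_date (defaults to today).
        end = ref_date or date.today()
        return end - timedelta(days=days - 1), end

    # Before: the three call sites omitted ref_date, silently anchoring every
    # window to the real today, even in sandbox mode:
    #     start, end = _calc_date_range(30)
    # After: they pass the runtime context's business date, so sandbox runs get
    # sandbox-relative windows:
    #     start, end = _calc_date_range(30, ref_date=ctx.business_date)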

DB:
- migrations/20260505__ai_run_logs_runtime_index.sql:
  adds the missing (site_id, runtime_mode, sandbox_instance_id, created_at DESC) composite index

Frontend:
- AIOperations.tsx: adds a sandbox-mode banner at the top (an Alert showing
  sandbox_date + sandbox_instance_id + the affected scope + an entry point to
  switch back to live)

Not done (deferred to F1-5b, to land together with the full zqyy_app RLS view layer):
- B1: the 6 CURRENT_DATE -> business_date spots in admin_service
- B2: fallback handling for the fdw_queries exception branch
- full GUC propagation (fdw_queries / page_context, etc.)
- the 3 test suites (excluded by .gitignore:71; to be committed when F2-2 brings them into the repo)
- P20 SPEC §6/§10/§11/§15 (more accurate to sync these after F1-5b fully closes out)

Neo's decisions: docs/_overview/wave1-findings/F1-5-impl-decisions.md

Details: docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md
Neo
2026-05-05 03:01:48 +08:00
parent a99bbd9a74
commit 421e193041
10 changed files with 909 additions and 40 deletions


@@ -13,9 +13,18 @@ import asyncio
import logging
import uuid
from datetime import datetime, timezone, timedelta
from typing import TYPE_CHECKING, Any

from app.ai.budget_tracker import BudgetTracker
from app.database import get_connection
from app.services.runtime_context import (
    RuntimeContext,
    get_runtime_context,
    runtime_insert_columns,
)

if TYPE_CHECKING:
    from app.ai.dispatcher import AIDispatcher

logger = logging.getLogger(__name__)
@@ -25,13 +34,22 @@ AVG_TOKENS_PER_CALL = 2000
# TTL for the in-memory batch-run store
_BATCH_TTL_SECONDS = 600  # 10 minutes

# F1-5a: batch-run concurrency cap
# Neo's decision: N=5 (pairs with the dispatcher's existing circuit_breaker / rate_limiter and stays within DashScope's 1000 RPM limit)
_BATCH_CONCURRENCY = 5


class AdminAIService:
    """Aggregation service for the AI monitoring admin backend."""

    def __init__(self, budget_tracker: BudgetTracker | None = None) -> None:
        self._budget = budget_tracker
        self._batch_store: dict[str, dict] = {}  # batch_id → {params, expires_at}
        self._batch_store: dict[str, dict] = {}  # batch_id → {params, ctx_snapshot, expires_at}
        self._dispatcher: AIDispatcher | None = None  # F1-5a: injected at lifespan startup

    def set_dispatcher(self, dispatcher: AIDispatcher) -> None:
        """F1-5a: inject the dispatcher at lifespan startup so _run_batch can actually execute AI calls."""
        self._dispatcher = dispatcher

    # ── Dashboard ─────────────────────────────────────────
@@ -344,29 +362,37 @@ class AdminAIService:
        return _row_to_dict(cols, row)

    async def retry_trigger_job(self, job_id: int) -> int:
        """Create a new trigger_job (is_forced=true) and return the new job_id."""
        """Create a new trigger_job (is_forced=true) and return the new job_id.

        F1-5a: the INSERT explicitly writes the runtime_mode + sandbox_instance_id columns,
        keeping them consistent with the original trigger_job's runtime context
        (relying on column defaults would drop the sandbox marker on retry).
        """
        original = await self.get_trigger_job(job_id)
        if original is None:
            raise ValueError(f"trigger_job {job_id} does not exist")

        site_id = original["site_id"]
        cols, placeholders, runtime_params = runtime_insert_columns(site_id)

        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    f"""
                    INSERT INTO biz.ai_trigger_jobs
                        (event_type, member_id, site_id, connector_type,
                         payload, app_chain, is_forced, status)
                         payload, app_chain, is_forced, status, {cols})
                    VALUES (%s, %s, %s, %s, %s, %s, true, 'pending')
                    VALUES (%s, %s, %s, %s, %s, %s, true, 'pending', {placeholders})
                    RETURNING id
                    """,
                    (
                        original["event_type"],
                        original.get("member_id"),
                        original["site_id"],
                        site_id,
                        original.get("connector_type", "feiqiu"),
                        original.get("payload"),
                        original.get("app_chain"),
                        *runtime_params,
                    ),
                )
                new_id = cur.fetchone()[0]
@@ -576,22 +602,41 @@ class AdminAIService:
    async def estimate_batch(
        self, app_types: list[str], member_ids: list[int], site_id: int,
    ) -> dict:
        """Generate a batch_id, store it in memory (TTL 10 min), and return the estimate."""
        """Generate a batch_id, store it in memory (TTL 10 min), and return the estimate.

        F1-5a: the estimate stage captures a RuntimeContext snapshot into _batch_store;
        confirm pops it and passes it to _run_batch, so Neo switching sandbox mode
        between estimate and confirm cannot cause data drift.
        """
        self._cleanup_expired_batches()
        batch_id = uuid.uuid4().hex
        estimated_calls = len(app_types) * len(member_ids)
        estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL

        # F1-5a: capture the ctx_snapshot
        ctx_snapshot = get_runtime_context(site_id)

        self._batch_store[batch_id] = {
            "params": {
                "app_types": app_types,
                "member_ids": member_ids,
                "site_id": site_id,
                "batch_id": batch_id,  # used by _run_batch for the triggered_by tag
            },
            "ctx_snapshot": ctx_snapshot,
            "expires_at": datetime.now(timezone.utc) + timedelta(seconds=_BATCH_TTL_SECONDS),
        }

        logger.info(
            "Batch run estimate: batch_id=%s apps=%s members=%d site_id=%s mode=%s sandbox_date=%s",
            batch_id,
            app_types,
            len(member_ids),
            site_id,
            ctx_snapshot.mode,
            ctx_snapshot.sandbox_date.isoformat() if ctx_snapshot.sandbox_date else None,
        )

        return {
            "batch_id": batch_id,
            "estimated_calls": estimated_calls,
@@ -599,7 +644,7 @@
        }

    async def confirm_batch(self, batch_id: str) -> None:
        """Pop the params and run the batch asynchronously."""
        """Pop the params + ctx_snapshot and run the batch asynchronously."""
        self._cleanup_expired_batches()
        entry = self._batch_store.pop(batch_id, None)
@@ -607,28 +652,77 @@
            raise ValueError(f"batch_id invalid or expired: {batch_id}")

        params = entry["params"]
        ctx_snapshot: RuntimeContext = entry["ctx_snapshot"]

        logger.info(
            "Batch run confirmed: batch_id=%s apps=%s members=%d site_id=%s",
            "Batch run confirmed: batch_id=%s apps=%s members=%d site_id=%s mode=%s",
            batch_id,
            params["app_types"],
            len(params["member_ids"]),
            params["site_id"],
            ctx_snapshot.mode,
        )

        # Run in the background (the actual call chain is handled by the dispatcher injected at the route layer)
        asyncio.create_task(
            self._run_batch(params["app_types"], params["member_ids"], params["site_id"])
        )
        # F1-5a: run in the background, passing ctx_snapshot so Neo switching modes mid-run has no effect
        asyncio.create_task(self._run_batch(params, ctx_snapshot))

    async def _run_batch(
        self, app_types: list[str], member_ids: list[int], site_id: int,
        self, params: dict[str, Any], ctx_snapshot: RuntimeContext,
    ) -> None:
        """Background batch execution (placeholder implementation; actually driven by the dispatcher)."""
        """F1-5a: the real background batch execution.

        - Semaphore(_BATCH_CONCURRENCY=5) caps concurrency so we don't trip DashScope rate limits
        - asyncio.gather + return_exceptions=True: one member failing does not take the others down
        - the context explicitly carries business_date (from ctx_snapshot); prompt builders read the sandbox date from it
        - triggered_by=f"batch:{batch_id}": tags ai_run_logs, the basis for Wave 2 progress queries
        """
        if self._dispatcher is None:
            logger.error(
                "Batch run failed: dispatcher not injected batch_id=%s",
                params.get("batch_id"),
            )
            return

        app_types: list[str] = params["app_types"]
        member_ids: list[int] = params["member_ids"]
        site_id: int = params["site_id"]
        batch_id: str = params["batch_id"]

        sem = asyncio.Semaphore(_BATCH_CONCURRENCY)
        triggered_by = f"batch:{batch_id}"
        business_date_iso = ctx_snapshot.business_date.isoformat()

        async def _run_one(app_type: str, member_id: int) -> None:
            async with sem:
                try:
                    await self._dispatcher.run_single_app(
                        app_type=app_type,
                        context={
                            "site_id": site_id,
                            "member_id": member_id,
                            "business_date": business_date_iso,
                        },
                        triggered_by=triggered_by,
                    )
                except Exception:
                    # A single failure has already been written to ai_run_logs; just log it here so other members are unaffected
                    logger.exception(
                        "Batch run step failed batch_id=%s app=%s member=%s",
                        batch_id, app_type, member_id,
                    )

        tasks = [
            asyncio.create_task(_run_one(at, mid))
            for at in app_types
            for mid in member_ids
        ]

        logger.info(
            "Batch run started: apps=%s members=%d site_id=%s",
            app_types, len(member_ids), site_id,
            "Batch run started: batch_id=%s tasks=%d concurrency=%d mode=%s",
            batch_id, len(tasks), _BATCH_CONCURRENCY, ctx_snapshot.mode,
        )
        # The actual execution logic is driven at the route layer via dispatcher.handle_trigger;
        # this method only logs, so the service layer does not depend on a dispatcher instance
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("Batch run finished: batch_id=%s tasks=%d", batch_id, len(tasks))

    def _cleanup_expired_batches(self) -> None:
        """Clean up expired batch entries."""