feat(ai): F1-5b Wave A 中段 沙箱业务日全栈架构主体收口 (W1)

完成 F1-5b 任务:
- T1 RuntimeContext unit 测试基础(36 case PASS,本地不入仓走 .gitignore:71)
- A1 admin_service.py 4 处 CURRENT_DATE → business_date 改造
  - _get_range_stats / _get_7d_trend / _get_app_distribution
  - 上下界双全(下界 - 6 days + 上界 < + 1 day,Step 4b 暴露原 PR
    上界缺失,sandbox=4-20 时 trend_7d 漏 4-21~5-01 数据 → 修补)
  - 全局聚合 list_trigger_jobs / get_budget 保留 CURRENT_DATE
    (Neo D 决策选 A: 多 site 时全局无单一业务日)
- A2 fdw_queries:113 / 2552 异常分支兜底 + 三层 fallback + warning
  - conn=None 也尝试 get_runtime_context(自开 conn)
  - RuntimeContext 不可用降级真实 today + logger.warning
- A3 _fdw_context docstring 显式登记唯一 ETL 入口架构契约
  (D2 完整且统一: 所有 ETL 视图查询通过 _fdw_context 自动 SET 三个
   GUC: site_id / business_date / runtime_mode)
- 防御 hook post_edit_business_date_check.py
  Wave 2 后续 PR 引回 CURRENT_DATE / date.today() 即提醒

双口径验证(§3.1 4a + 4b):
- 4a live: dashboard trend_7d 2 条 4-30~5-01 (真实今天)
- 4b sandbox=2026-04-20: trend_7d 1 条仅 4-20 (业务日上界生效硬证据)
- pytest test_runtime_context 36/36 全过

未完(下一批 Wave A): T2 integration / UI-1/2/4 / MP-3/5 / MP-1 / BE-1
F1-5b-tasks.md 新增 + audit 记录已就位

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Neo
2026-05-05 15:01:51 +08:00
parent a045625d48
commit af02446740
6 changed files with 780 additions and 13 deletions

View File

@@ -21,6 +21,7 @@ from app.ai.budget_tracker import BudgetTracker
from app.database import get_connection
from app.services.runtime_context import (
RuntimeContext,
as_runtime_today_param,
get_runtime_context,
runtime_insert_columns,
)
@@ -94,12 +95,24 @@ class AdminAIService:
"""指定时间段内的调用次数、成功率、Token 消耗、平均延迟。
字段名沿用 today_* 前缀以兼容前端 DashboardResponse schema。
F1-5b A1: 时间窗口基准日:
- site_id 非 None: 用 RuntimeContext.business_date(sandbox 模式取虚拟日)
- site_id 为 None: 全局聚合,用 PG CURRENT_DATE(全局视图无单一业务日)
"""
site_clause, site_params = _site_filter(site_id)
if date_from and date_to:
time_clause = "created_at >= %s::date AND created_at < (%s::date + INTERVAL '1 day')"
time_params: tuple = (date_from, date_to)
elif site_id is not None:
days = range_days if range_days and range_days > 0 else 1
today = as_runtime_today_param(site_id)
time_clause = (
"created_at >= %s::date - (%s::int - 1) * INTERVAL '1 day' "
"AND created_at < %s::date + INTERVAL '1 day'"
)
time_params = (today, days, today)
else:
days = range_days if range_days and range_days > 0 else 1
time_clause = (
@@ -142,8 +155,26 @@ class AdminAIService:
}
async def _get_7d_trend(self, site_id: int | None) -> list[dict]:
"""近 7 天按日聚合。"""
site_clause, params = _site_filter(site_id)
"""近 7 天按日聚合。
F1-5b A1: site_id 非 None 用业务日上下界(sandbox 取虚拟日);
site_id None 全局聚合用 CURRENT_DATE。
2026-05-05 修补:同时加上界 < %s + 1day,sandbox 不漏未来数据。
"""
site_clause, site_params = _site_filter(site_id)
if site_id is not None:
today = as_runtime_today_param(site_id)
time_clause = (
"created_at >= %s::date - INTERVAL '6 days' "
"AND created_at < %s::date + INTERVAL '1 day'"
)
params = (today, today) + site_params
else:
time_clause = (
"created_at >= CURRENT_DATE - INTERVAL '6 days' "
"AND created_at < CURRENT_DATE + INTERVAL '1 day'"
)
params = site_params
conn = get_connection()
try:
with conn.cursor() as cur:
@@ -154,7 +185,7 @@ class AdminAIService:
COUNT(*) AS calls,
COUNT(*) FILTER (WHERE status = 'success') AS success_count
FROM biz.ai_run_logs
WHERE created_at >= CURRENT_DATE - INTERVAL '6 days'
WHERE {time_clause}
{site_clause}
GROUP BY day
ORDER BY day
@@ -176,8 +207,26 @@ class AdminAIService:
]
async def _get_app_distribution(self, site_id: int | None) -> list[dict]:
"""各 App 调用占比。"""
site_clause, params = _site_filter(site_id)
"""各 App 调用占比。
F1-5b A1: site_id 非 None 用业务日上下界(sandbox 取虚拟日);
site_id None 全局聚合用 CURRENT_DATE。
2026-05-05 修补:同时加上界 < %s + 1day,sandbox 不漏未来数据。
"""
site_clause, site_params = _site_filter(site_id)
if site_id is not None:
today = as_runtime_today_param(site_id)
time_clause = (
"created_at >= %s::date - INTERVAL '6 days' "
"AND created_at < %s::date + INTERVAL '1 day'"
)
params = (today, today) + site_params
else:
time_clause = (
"created_at >= CURRENT_DATE - INTERVAL '6 days' "
"AND created_at < CURRENT_DATE + INTERVAL '1 day'"
)
params = site_params
conn = get_connection()
try:
with conn.cursor() as cur:
@@ -185,7 +234,7 @@ class AdminAIService:
f"""
SELECT app_type, COUNT(*) AS cnt
FROM biz.ai_run_logs
WHERE created_at >= CURRENT_DATE - INTERVAL '6 days'
WHERE {time_clause}
{site_clause}
GROUP BY app_type
ORDER BY cnt DESC

View File

@@ -93,6 +93,21 @@ def _fdw_context(conn: Any, site_id: int, *, etl_conn: Any = None):
不传时新建连接并在 yield 后自动关闭。避免同一请求内多次新建连接(每次 ~2.6s)。
CHANGE 2026-05-02 | 同时设置 app.current_business_date / app.current_runtime_mode
供 RLS 视图层C 方案做日期上界裁剪。conn=None 时降级 live。
F1-5b A3 架构契约(D2"完整且统一"):
本函数是 backend 访问 ETL 库 (etl_feiqiu) 的**唯一入口**,任何 ETL 视图查询
都必须通过 `with _fdw_context(...) as cur` 形式调用。本函数内部 SET 三个 GUC:
- app.current_site_id (RLS 多门店隔离)
- app.current_business_date (sandbox 业务日上界,激活 ETL 库 26 个 v_* 视图裁剪)
- app.current_runtime_mode (live/sandbox 标识)
禁止绕过 `_fdw_context` 直接 `_get_etl_connection` + cur.execute,
否则 sandbox 业务日上界在该路径不生效。
与 `apply_runtime_session_vars` 的关系:
- apply_runtime_session_vars 设 business_date + runtime_mode (业务库 conn 用)
- _fdw_context 额外设 site_id (ETL 库 RLS 必需)
两者互补,不重叠。zqyy_app conn 调 apply_runtime_session_vars,
ETL conn 走 _fdw_context。
"""
from app.services.runtime_context import (
MODE_LIVE,
@@ -104,15 +119,18 @@ def _fdw_context(conn: Any, site_id: int, *, etl_conn: Any = None):
bd_str = ""
rt_mode = MODE_LIVE
try:
if conn is not None:
ctx = get_runtime_context(site_id, conn=conn)
bd_str = ctx.business_date.isoformat()
rt_mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE
else:
from datetime import date as _date
bd_str = _date.today().isoformat()
# F1-5b A2: conn=None 也尝试取 ctx(get_runtime_context 内部自开 conn),
# 确保 sandbox 业务日上界在所有调用路径生效。
ctx = get_runtime_context(site_id, conn=conn) if conn is not None else get_runtime_context(site_id)
bd_str = ctx.business_date.isoformat()
rt_mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE
except Exception:
# 三层 fallback: RuntimeContext 完全不可用 → 降级真实 today + warning
from datetime import date as _date
logger.warning(
"F1-5b A2: RuntimeContext 不可用,fdw_context 降级真实 today (site_id=%s)",
site_id, exc_info=True,
)
bd_str = _date.today().isoformat()
rt_mode = MODE_LIVE
@@ -2549,6 +2567,12 @@ def _get_weekly_visits_batch(
from datetime import date as _date, timedelta as _timedelta
if ref_date is None:
# F1-5b A2: ref_date 应由调用方从 RuntimeContext.business_date 显式传入,
# 此处降级真实 today 仅作 last-resort 兜底,违反 sandbox 上界
logger.warning(
"F1-5b A2: _get_weekly_visits_batch 收到 ref_date=None,"
"调用方应从 RuntimeContext.business_date 显式传入,降级真实 today"
)
ref_date = _date.today()
elif hasattr(ref_date, "date") and not isinstance(ref_date, _date):
ref_date = ref_date.date()