# -*- coding: utf-8 -*- """admin-web 端沙箱验证脚本。 不依赖前端 / 浏览器,直接调后端的 service 实现,校验 admin-web 各页面所依赖的核心数据 在 sandbox 模式下是否符合"按 sandbox_date 截断 / 隔离"的预期。 覆盖页面: - RuntimeContext → /api/admin/runtime-context, /api/config/runtime-context - AIRunLogs → list_run_logs / get_run_log(看 prompt 中 current_time) - AIOperations → namespace_ai_target_id / runtime_insert_columns(缓存命名隔离) - TaskManager → biz.coach_tasks 按 (runtime_mode, sandbox_instance_id) 隔离 - TriggerManager → biz.trigger_jobs 是全局共享(不应被 site 隔离) - AIDashboard → 调用次数等指标按真实时间统计(不受沙箱影响) 不覆盖(需手工 UI 验证): - 浏览器层 RuntimeContext 切换 Modal 关闭、Steps 弹窗 - AIRunLogs Drawer 渲染 - AIOperations 4 个 Card 交互 运行: python tools/db/verify_admin_web_sandbox.py [--sandbox-date 2025-09-01] [--site-id N] """ from __future__ import annotations import argparse import asyncio import json import os import sys from datetime import date, datetime, timedelta from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[2] BACKEND = ROOT / "apps" / "backend" sys.path.insert(0, str(BACKEND)) import psycopg2 # noqa: E402 import psycopg2.extensions # noqa: E402 from dotenv import load_dotenv # noqa: E402 load_dotenv(ROOT / ".env", override=False) os.environ["APP_DB_DSN"] = os.environ.get("TEST_APP_DB_DSN") or os.environ["APP_DB_DSN"] os.environ["PG_DSN"] = os.environ.get("TEST_PG_DSN") or os.environ["PG_DSN"] _etl = psycopg2.extensions.parse_dsn(os.environ["PG_DSN"]) _app = psycopg2.extensions.parse_dsn(os.environ["APP_DB_DSN"]) os.environ.setdefault("DB_HOST", _app.get("host", "localhost")) os.environ.setdefault("DB_PORT", str(_app.get("port", "5432"))) os.environ.setdefault("DB_USER", _app.get("user", "")) os.environ.setdefault("DB_PASSWORD", _app.get("password", "")) os.environ.setdefault("ETL_DB_HOST", _etl.get("host", "localhost")) os.environ.setdefault("ETL_DB_PORT", str(_etl.get("port", "5432"))) os.environ.setdefault("ETL_DB_USER", _etl.get("user", "")) os.environ.setdefault("ETL_DB_PASSWORD", _etl.get("password", "")) os.environ.setdefault("ETL_DB_NAME", _etl.get("dbname", "test_etl_feiqiu")) # --------------------------------------------------------------------------- # 公共工具(与 verify_sandbox_end_to_end.py 复用) # --------------------------------------------------------------------------- def switch_runtime(site_id: int, mode: str, sandbox_date: date | None = None) -> None: from app.database import get_connection from app.services.runtime_context import new_sandbox_instance_id instance_id = None if mode == "live" else new_sandbox_instance_id() sb_date = None if mode == "live" else sandbox_date conn = get_connection() try: with conn.cursor() as cur: cur.execute( """ INSERT INTO biz.site_runtime_context (site_id, mode, sandbox_date, sandbox_instance_id, ai_mode, status, updated_at, updated_by) VALUES (%s, %s, %s, %s, 'live', 'active', NOW(), NULL) ON CONFLICT (site_id) DO UPDATE SET mode = EXCLUDED.mode, sandbox_date = EXCLUDED.sandbox_date, sandbox_instance_id = EXCLUDED.sandbox_instance_id, updated_at = NOW(), updated_by = NULL """, (site_id, mode, sb_date, instance_id), ) conn.commit() finally: conn.close() def pick_test_site() -> int: pg_dsn = os.environ["PG_DSN"] conn = psycopg2.connect(pg_dsn) try: with conn.cursor() as cur: cur.execute( "SELECT site_id FROM dws.dws_finance_area_daily " "GROUP BY site_id ORDER BY COUNT(*) DESC LIMIT 1" ) row = cur.fetchone() if not row: raise SystemExit("etl 测试库无 finance_area_daily 数据") return int(row[0]) finally: conn.close() # --------------------------------------------------------------------------- # 各 admin-web 页面专项检查 # --------------------------------------------------------------------------- def check_runtime_context_api(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """RuntimeContext 页 / Banner 用:核对 API 返回结构。""" from app.routers.admin_runtime_context import _context_response from app.services.runtime_context import get_runtime_context ctx = get_runtime_context(site_id) resp = _context_response(ctx).model_dump() rows: list[tuple[str, Any, str]] = [] rows.append(("ctx.mode", resp["mode"], "PASS" if resp["mode"] == "sandbox" else f"FAIL (got {resp['mode']})")) rows.append(("ctx.is_sandbox", resp["is_sandbox"], "PASS" if resp["is_sandbox"] else "FAIL")) # sandbox_date / business_date 在 model_dump 里是 date 对象,不是字符串 sb_d = resp["sandbox_date"] bd_d = resp["business_date"] sb_d_iso = sb_d.isoformat() if hasattr(sb_d, "isoformat") else str(sb_d) bd_d_iso = bd_d.isoformat() if hasattr(bd_d, "isoformat") else str(bd_d) rows.append(("ctx.sandbox_date", sb_d_iso, "PASS" if sb_d_iso == bd.isoformat() else f"FAIL (got {sb_d_iso})")) rows.append(("ctx.business_date", bd_d_iso, "PASS" if bd_d_iso == bd.isoformat() else f"FAIL (got {bd_d_iso})")) rows.append(("ctx.sandbox_instance_id", resp["sandbox_instance_id"], "PASS" if resp["sandbox_instance_id"] and resp["sandbox_instance_id"].startswith("sbx_") else "FAIL")) return rows async def check_ai_run_logs_have_sandbox_prompt(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """AIRunLogs 页:本轮触发的 App3 / App2 prompt 中 current_time 应等于 sandbox_date。 不实际调 AI(避免烧 token),但**直接构建 prompt 字符串**,校验 prompt JSON 内 current_time。 这就是 admin-web 用户点 AIRunLogs Drawer 看到的 'Request Prompt' 字段内容。 """ rows: list[tuple[str, Any, str]] = [] # === App3 prompt === try: from app.ai.prompts import app3_clue_prompt # 取一个有消费记录的 member_id(直接从 dwd 表拿,避开视图业务日上界) pg_dsn = os.environ["PG_DSN"] member_id = None conn_etl = psycopg2.connect(pg_dsn) try: with conn_etl.cursor() as cur: cur.execute( "SELECT tenant_member_id FROM dwd.dwd_assistant_service_log " "WHERE is_delete = 0 AND site_id = %s AND tenant_member_id IS NOT NULL " "AND create_time::date <= %s " "LIMIT 1", (site_id, bd), ) r = cur.fetchone() if r: member_id = r[0] finally: conn_etl.close() if member_id: prompt_str = await app3_clue_prompt.build_prompt({"site_id": site_id, "member_id": member_id}) payload = json.loads(prompt_str) current_time = payload.get("current_time") d_in_prompt = datetime.strptime(current_time, "%Y-%m-%d %H:%M").date() rows.append(( "App3 prompt.current_time", current_time, "PASS" if d_in_prompt == bd else f"FAIL (expected {bd})", )) rows.append(( "App3 prompt.member_id", payload.get("member_id"), "OK" if payload.get("member_id") == member_id else "FAIL", )) else: rows.append(("App3 prompt", None, "SKIP (no member with service_log)")) except Exception as exc: rows.append(("App3 prompt", None, f"ERROR: {type(exc).__name__}: {str(exc)[:120]}")) # === App2 prompt === try: from app.ai.prompts import app2_finance_prompt # 注意:App2 的 time_dimension 是业务侧枚举(this_month/last_month/...), # 不是 board_service 的内部枚举(month/quarter/...) ctx = {"site_id": site_id, "time_dimension": "this_month"} prompt_str = await app2_finance_prompt.build_prompt(ctx) payload = json.loads(prompt_str) current_time = payload.get("当前时间", "") d_in_prompt = datetime.strptime(current_time, "%Y-%m-%d %H:%M").date() if current_time else None rows.append(( "App2 prompt.当前时间", current_time, "PASS" if d_in_prompt == bd else f"FAIL (expected {bd})", )) # 财务窗口结束日应 <= sandbox_date period = payload.get("当期日期范围", "") if period and "~" in period: end_str = period.split("~")[-1].strip() try: end_d = datetime.strptime(end_str, "%Y-%m-%d").date() rows.append(( "App2 prompt.当期日期范围(end)", end_str, "PASS" if end_d <= bd else f"FAIL (>{bd})", )) except Exception: rows.append(("App2 prompt.当期日期范围(end)", end_str, "OK (无法解析)")) except Exception as exc: rows.append(("App2 prompt", None, f"ERROR: {type(exc).__name__}: {str(exc)[:120]}")) return rows def check_ai_cache_namespace_isolation(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """AIOperations 页 Card 2 缓存失效:sandbox 模式下 target_id 应被加 sandbox_instance_id 前缀。""" from app.services.runtime_context import ( get_runtime_context, namespace_ai_target_id, runtime_insert_columns, ) ctx = get_runtime_context(site_id) rows: list[tuple[str, Any, str]] = [] # 1. namespace_ai_target_id raw = "12345" namespaced = namespace_ai_target_id(site_id, raw) expected_prefix = ctx.sandbox_instance_id + ":" rows.append(( "namespace_ai_target_id('12345')", namespaced, "PASS" if namespaced.startswith(expected_prefix) and namespaced.endswith(":12345") else "FAIL", )) # 2. runtime_insert_columns 应该返回 ('runtime_mode, sandbox_instance_id', '%s, %s', ['sandbox', sbx_id]) cols, placeholders, values = runtime_insert_columns(site_id) rows.append(( "runtime_insert_columns.cols", cols, "PASS" if cols == "runtime_mode, sandbox_instance_id" else "FAIL", )) rows.append(( "runtime_insert_columns.values[0]", values[0], "PASS" if values[0] == "sandbox" else f"FAIL (got {values[0]})", )) rows.append(( "runtime_insert_columns.values[1]", values[1], "PASS" if values[1] and values[1].startswith("sbx_") else f"FAIL (got {values[1]})", )) return rows def check_task_manager_isolation(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """TaskManager 页:sandbox 模式下查 coach_tasks 应只返回 sandbox 实例的记录。""" from app.database import get_connection from app.services.runtime_context import get_runtime_context, task_runtime_filter ctx = get_runtime_context(site_id) rows: list[tuple[str, Any, str]] = [] runtime_clause, runtime_params = task_runtime_filter(site_id, alias="ct") # 1. 校验 task_runtime_filter SQL 片段含 runtime_mode/sandbox_instance_id rows.append(( "task_runtime_filter clause", runtime_clause.strip(), "PASS" if "runtime_mode" in runtime_clause and "sandbox_instance_id" in runtime_clause else "FAIL", )) rows.append(( "task_runtime_filter params", runtime_params, "PASS" if runtime_params and runtime_params[0] == "sandbox" else f"FAIL (got {runtime_params})", )) # 2. 实查 biz.coach_tasks 全量 vs sandbox 过滤后的差 conn = get_connection() try: with conn.cursor() as cur: cur.execute( "SELECT COUNT(*), COUNT(*) FILTER (WHERE runtime_mode = 'sandbox' AND sandbox_instance_id = %s) " "FROM biz.coach_tasks ct WHERE ct.site_id = %s", (ctx.sandbox_instance_id, site_id), ) total_all, total_sbx = cur.fetchone() sql = ( "SELECT COUNT(*) FROM biz.coach_tasks ct " "WHERE ct.site_id = %s" + runtime_clause ) cur.execute(sql, [site_id, *runtime_params]) filtered = cur.fetchone()[0] conn.commit() finally: conn.close() rows.append(( "biz.coach_tasks 全量 (site)", total_all, "OK", )) rows.append(( "biz.coach_tasks 仅 sandbox 实例", total_sbx, "OK", )) rows.append(( "task_runtime_filter 过滤后 = sandbox 实例?", filtered, "PASS" if filtered == total_sbx else f"FAIL (filtered={filtered}, expected={total_sbx})", )) return rows def check_trigger_manager_global(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """TriggerManager 页:biz.trigger_jobs 是**全局共享表**,sandbox 切换不应停掉它。""" from app.database import get_connection rows: list[tuple[str, Any, str]] = [] conn = get_connection() try: with conn.cursor() as cur: cur.execute("SELECT COUNT(*), COUNT(*) FILTER (WHERE status NOT IN ('paused_by_sandbox')) FROM biz.trigger_jobs") total, active_like = cur.fetchone() conn.commit() finally: conn.close() rows.append(("biz.trigger_jobs 总数", total, "OK")) rows.append(( "biz.trigger_jobs 未被 sandbox 暂停", active_like, "PASS" if active_like == total else f"FAIL (有 {total - active_like} 条被 sandbox 错误暂停)", )) return rows async def check_dashboard_uses_real_time(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """AIDashboard 页:'今日'调用次数应用真实日期窗口(CURRENT_DATE),不被沙箱拉到 sandbox_date。""" from app.services.ai.admin_service import AdminAIService svc = AdminAIService() rows: list[tuple[str, Any, str]] = [] try: data = await svc.get_dashboard(site_id=site_id, range_days=1) # data 含 today_total 等字段,时间窗口由 CURRENT_DATE 决定 rows.append(( "Dashboard.range=1 today_total", data.get("today_total", "?"), "OK (按真实时间,不受沙箱影响)", )) rows.append(( "Dashboard.app_health 数量", len(data.get("app_health", [])), "OK", )) rows.append(( "Dashboard.budget 字段存在", "budget" in data, "PASS" if "budget" in data else "FAIL", )) except Exception as exc: rows.append(("AIDashboard.get_dashboard", None, f"ERROR: {type(exc).__name__}: {str(exc)[:120]}")) return rows async def check_ai_run_logs_query(site_id: int, bd: date) -> list[tuple[str, Any, str]]: """AIRunLogs 列表 API 仍按真实 created_at 排序(写入时间是真实系统时间,不受沙箱影响)。""" from app.services.ai.admin_service import AdminAIService svc = AdminAIService() rows: list[tuple[str, Any, str]] = [] try: resp = await svc.list_run_logs({"site_id": site_id}, page=1, page_size=5) items = resp.get("items", []) rows.append(("list_run_logs total", resp.get("total"), "OK")) rows.append(("list_run_logs page items", len(items), "OK")) if items: latest = items[0] rows.append(("list_run_logs[0].created_at", str(latest.get("created_at"))[:19], "OK (写入按真实时间)")) except Exception as exc: rows.append(("list_run_logs", None, f"ERROR: {type(exc).__name__}: {str(exc)[:120]}")) return rows # --------------------------------------------------------------------------- # 报告渲染 # --------------------------------------------------------------------------- def render_report(site_id: int, bd: date, sections: list[tuple[str, list[tuple[str, Any, str]]]]) -> str: out: list[str] = [] out.append(f"# admin-web 沙箱验证报告\n") out.append(f"- site_id: `{site_id}`") out.append(f"- sandbox_date: `{bd.isoformat()}`") out.append(f"- 生成时间: `{datetime.now().isoformat(timespec='seconds')}`") out.append(f"- 范围: admin-web 后端 service 实现 + AI prompt 构建 + 缓存 / 任务隔离\n") pass_n = 0 fail_n = 0 for title, rows in sections: out.append(f"## {title}\n") out.append("| 检查项 | 取值 | 结果 |") out.append("|---|---|---|") for label, val, status in rows: out.append(f"| `{label}` | {val} | {status} |") if status.startswith("PASS"): pass_n += 1 elif status.startswith("FAIL") or status.startswith("ERROR"): fail_n += 1 out.append("") out.append(f"## 汇总\n") out.append(f"- PASS: {pass_n}") out.append(f"- FAIL/ERROR: {fail_n}\n") if fail_n == 0: out.append("**结论:PASS — admin-web 各页面后端依赖在 sandbox 下行为符合预期。**") else: out.append("**结论:FAIL — 见上表。**") return "\n".join(out) async def main_async(args: argparse.Namespace) -> None: bd = datetime.strptime(args.sandbox_date, "%Y-%m-%d").date() site_id = args.site_id or pick_test_site() print(f"[admin-verify] site_id={site_id}, sandbox_date={bd}") print(f"[admin-verify] 切换到 sandbox({bd})") switch_runtime(site_id, "sandbox", bd) sections: list[tuple[str, list[tuple[str, Any, str]]]] = [] try: sections.append(("RuntimeContext 页 / Banner(/api/admin/runtime-context)", check_runtime_context_api(site_id, bd))) sections.append(("AIRunLogs 抽屉(Request Prompt 内 current_time)", await check_ai_run_logs_have_sandbox_prompt(site_id, bd))) sections.append(("AIRunLogs 列表(按真实时间 created_at)", await check_ai_run_logs_query(site_id, bd))) sections.append(("AIOperations Card 2 缓存命名隔离", check_ai_cache_namespace_isolation(site_id, bd))) sections.append(("TaskManager 队列 / 历史 任务隔离", check_task_manager_isolation(site_id, bd))) sections.append(("TriggerManager 全局触发器(不应被 sandbox 暂停)", check_trigger_manager_global(site_id, bd))) sections.append(("AIDashboard 指标(按真实时间)", await check_dashboard_uses_real_time(site_id, bd))) finally: if not args.keep_sandbox: print(f"[admin-verify] 还原到 live") switch_runtime(site_id, "live") md = render_report(site_id, bd, sections) out_path = ROOT / args.out out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(md, encoding="utf-8") print(f"[admin-verify] 报告写入 {out_path.relative_to(ROOT)}") print() if "## 汇总" in md: print(md.split("## 汇总")[1]) def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--sandbox-date", default="2025-09-01") parser.add_argument("--site-id", type=int, default=None) parser.add_argument("--keep-sandbox", action="store_true") parser.add_argument("--out", default="docs/database/changes/2026-05-02__sandbox_admin_web_verify_report.md") args = parser.parse_args() asyncio.run(main_async(args)) if __name__ == "__main__": main()