Files
Neo-ZQYY/apps/backend/app/services/runtime_context.py
Neo 421e193041 fix(ai): F1-5a 沙箱 batch-run 接入 runtime_context (W1 / 阶段 A 主体)
Neo F1-5 反馈: "让沙箱起到其真正的作用. 真正的模拟日期, 仅能看到沙箱设定日期
及之前日期的数据, 并运行 AI 的各个业务."

调研发现 (4 个并行子代理): batch-run 端点 _run_batch 是空壳 stub
(只 logger.info, 实际不跑 AI), GUC apply_runtime_session_vars 0 处调用
(dead code), 7 张业务表 6 张有 runtime 复合索引唯独 ai_run_logs 漏建,
App2/2a 3 行 _calc_date_range 漏传 ref_date.

本 commit (F1-5a 阶段 A 主体, F1-5b 后续完整 zqyy_app RLS 视图层):

后端核心:
- admin_service.py: _run_batch 真实化 (Semaphore(5)+asyncio.gather+
  return_exceptions=True+ctx_snapshot 防漂移); estimate 入口抓
  RuntimeContext 快照, confirm 取出传给 worker
- admin_ai.py: confirm_batch_run lazy 注入 dispatcher
- admin_service.retry_trigger_job: INSERT 落 runtime_mode +
  sandbox_instance_id 列 (用 runtime_insert_columns helper)
- runtime_context.py: get_runtime_context 加 bind_to_session 参数,
  激活 GUC app.current_business_date / app.current_runtime_mode
- run_log_service.create_log: 启用 bind_to_session=True 试点

App2/2a 3 行 ref_date 修复:
- app2_finance_prompt.py:817 储值卡余额变化板块
- app2_finance_prompt.py:841 日粒度 series + 异常检测窗口
- app2a_finance_area_prompt.py:466 区域日粒度 series

DB:
- migrations/20260505__ai_run_logs_runtime_index.sql:
  补 (site_id, runtime_mode, sandbox_instance_id, created_at DESC) 复合索引

前端:
- AIOperations.tsx: 顶部加 sandbox 模式提示条 (Alert 显示 sandbox_date +
  sandbox_instance_id + 影响范围 + 切回 live 入口)

未做 (留 F1-5b 完整 zqyy_app RLS 视图层一并):
- B1 admin_service 6 处 CURRENT_DATE -> business_date
- B2 fdw_queries 异常分支兜底
- GUC 完整传递 (fdw_queries / page_context 等)
- 测试 3 套 (.gitignore:71 排除, F2-2 入仓时 commit)
- P20 SPEC \xa76/\xa710/\xa711/\xa715 (F1-5b 完整收口后同步更准确)

Neo 决策: docs/_overview/wave1-findings/F1-5-impl-decisions.md

详见 docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md
2026-05-05 03:01:48 +08:00

292 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""业务运行上下文与业务时钟服务。
该模块是开发/测试沙箱的统一控制层:
- live 模式:沿用真实系统日期和正式数据。
- sandbox 模式:业务上假设今天是配置的历史日期,并用 sandbox_instance_id 隔离写入。
"""
from __future__ import annotations
import logging
import uuid
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta, timezone
from typing import Any
from app import config
logger = logging.getLogger(__name__)
_LOCAL_TZ = timezone(timedelta(hours=8))
MODE_LIVE = "live"
MODE_SANDBOX = "sandbox"
AI_MODE_LIVE = "live"
LIVE_INSTANCE_ID = "live"
@dataclass(frozen=True)
class RuntimeContext:
"""单门店当前业务运行上下文。"""
site_id: int
mode: str = MODE_LIVE
business_day_start_hour: int = config.BUSINESS_DAY_START_HOUR
sandbox_date: date | None = None
sandbox_instance_id: str | None = None
ai_mode: str = AI_MODE_LIVE
status: str = "active"
@property
def is_sandbox(self) -> bool:
return self.mode == MODE_SANDBOX and self.sandbox_date is not None
@property
def business_date(self) -> date:
if self.is_sandbox and self.sandbox_date is not None:
return self.sandbox_date
now = datetime.now(_LOCAL_TZ)
today = now.date()
if now.hour < self.business_day_start_hour:
return today - timedelta(days=1)
return today
@property
def business_now(self) -> datetime:
if not self.is_sandbox:
return datetime.now(_LOCAL_TZ)
now = datetime.now(_LOCAL_TZ)
return datetime.combine(self.business_date, now.timetz(), tzinfo=_LOCAL_TZ)
@property
def active_sandbox_instance_id(self) -> str | None:
if not self.is_sandbox:
return None
return self.sandbox_instance_id
def to_dict(self) -> dict[str, Any]:
return {
"site_id": self.site_id,
"mode": self.mode,
"business_day_start_hour": self.business_day_start_hour,
"business_date": self.business_date.isoformat(),
"business_now": self.business_now.isoformat(),
"sandbox_date": self.sandbox_date.isoformat() if self.sandbox_date else None,
"sandbox_instance_id": self.sandbox_instance_id,
"ai_mode": self.ai_mode,
"status": self.status,
"is_sandbox": self.is_sandbox,
}
def new_sandbox_instance_id() -> str:
"""生成新的沙箱实例 ID。"""
return f"sbx_{uuid.uuid4().hex[:24]}"
def _default_context(site_id: int) -> RuntimeContext:
return RuntimeContext(site_id=site_id)
def get_runtime_context(
site_id: int,
conn: Any | None = None,
*,
bind_to_session: bool = False,
) -> RuntimeContext:
"""读取门店运行上下文。
表不存在或未配置时降级为 live保证迁移前不影响正式链路。
F1-5a 新增 ``bind_to_session``:当 True 且 conn 非空时,在返回前调用
``apply_runtime_session_vars(conn, ctx)`` 设置 GUC ``app.current_business_date`` /
``app.current_runtime_mode``,激活 ETL 库 26 个 ``app.v_*`` 视图的业务日上界裁剪
(`app.business_date_now()` 函数读取 GUC)。
使用场景:fdw_queries 等走 ETL 库视图的查询入口处显式 ``bind_to_session=True``。
其余只读取 ctx 不查 ETL 视图的调用方保持默认 ``False`` 即可。
"""
own_conn = conn is None
if own_conn:
from app.database import get_connection
conn = get_connection()
try:
with conn.cursor() as cur:
try:
cur.execute(
"""
SELECT mode, sandbox_date, sandbox_instance_id, ai_mode, status
FROM biz.site_runtime_context
WHERE site_id = %s
""",
(site_id,),
)
except Exception:
if own_conn:
conn.rollback()
return _default_context(site_id)
row = cur.fetchone()
if own_conn:
conn.commit()
finally:
if own_conn:
conn.close()
if not row:
ctx = _default_context(site_id)
else:
mode, sandbox_date, sandbox_instance_id, ai_mode, status = row
if mode not in (MODE_LIVE, MODE_SANDBOX):
mode = MODE_LIVE
if mode == MODE_SANDBOX and (sandbox_date is None or not sandbox_instance_id):
mode = MODE_LIVE
ctx = RuntimeContext(
site_id=site_id,
mode=mode,
sandbox_date=sandbox_date,
sandbox_instance_id=sandbox_instance_id,
ai_mode=ai_mode or AI_MODE_LIVE,
status=status or "active",
)
# F1-5a: 显式开启时,绑定到当前 session,激活 ETL 库视图业务日上界
if bind_to_session and not own_conn and conn is not None:
try:
apply_runtime_session_vars(conn, ctx=ctx)
except Exception:
logger.debug(
"apply_runtime_session_vars 失败(不阻塞主流程) site_id=%d", site_id,
exc_info=True,
)
return ctx
def namespace_ai_target_id(site_id: int, target_id: str, conn: Any | None = None) -> str:
"""按当前上下文转换 AI cache target_id。
前端和调用方继续使用原始 target_id沙箱命名空间在后端统一处理。
"""
ctx = get_runtime_context(site_id, conn=conn)
if not ctx.is_sandbox or not ctx.sandbox_instance_id:
return target_id
return f"{ctx.sandbox_instance_id}:{target_id}"
def task_runtime_filter(
site_id: int,
*,
alias: str = "",
conn: Any | None = None,
) -> tuple[str, list[Any]]:
"""返回 coach_tasks 等表的运行上下文过滤条件。"""
ctx = get_runtime_context(site_id, conn=conn)
prefix = f"{alias}." if alias else ""
if ctx.is_sandbox and ctx.sandbox_instance_id:
return (
f" AND {prefix}runtime_mode = %s AND {prefix}sandbox_instance_id = %s",
[MODE_SANDBOX, ctx.sandbox_instance_id],
)
return (
f" AND COALESCE({prefix}runtime_mode, 'live') = %s "
f"AND COALESCE({prefix}sandbox_instance_id, %s) = %s",
[MODE_LIVE, LIVE_INSTANCE_ID, LIVE_INSTANCE_ID],
)
def runtime_insert_columns(site_id: int, conn: Any | None = None) -> tuple[str, str, list[Any]]:
"""返回 INSERT SQL 片段:列名、占位符和值。"""
ctx = get_runtime_context(site_id, conn=conn)
return (
"runtime_mode, sandbox_instance_id",
"%s, %s",
[
MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE,
ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID,
],
)
def runtime_update_assignments(site_id: int, conn: Any | None = None) -> tuple[str, list[Any]]:
"""返回 UPDATE SQL 片段,用于把运行上下文写回已有记录。"""
ctx = get_runtime_context(site_id, conn=conn)
return (
"runtime_mode = %s, sandbox_instance_id = %s",
[
MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE,
ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID,
],
)
def as_runtime_now_param(site_id: int, conn: Any | None = None) -> datetime:
"""返回可传给 SQL 的业务当前时间。"""
return get_runtime_context(site_id, conn=conn).business_now
def as_runtime_today_param(site_id: int, conn: Any | None = None) -> date:
"""返回可传给 SQL 的业务当前日期。"""
return get_runtime_context(site_id, conn=conn).business_date
def as_runtime_year_month_param(site_id: int, conn: Any | None = None) -> str:
"""返回 'YYYY-MM' 形式的业务年月,用于 performance 等月度查询。"""
bd = get_runtime_context(site_id, conn=conn).business_date
return f"{bd.year:04d}-{bd.month:02d}"
def as_runtime_business_now_str(site_id: int, conn: Any | None = None, fmt: str = "%Y-%m-%d %H:%M:%S") -> str:
"""返回业务当前时间的格式化字符串,用于 AI prompts 中的 current_time。"""
return get_runtime_context(site_id, conn=conn).business_now.strftime(fmt)
def business_date_upper_bound_sql(
site_id: int,
*,
column: str,
alias: str = "",
cast: str | None = None,
conn: Any | None = None,
) -> tuple[str, list[Any]]:
"""返回业务日上界 SQL 片段。
sandbox 模式下,强制把 ``column`` 限制在业务日及之前(避免读到「未来」数据)。
live 模式下返回空片段,不影响任何逻辑。
cast 用于把 timestamp/timestamptz 列裁剪成日期再比较,例如 ``cast='date'``。
"""
ctx = get_runtime_context(site_id, conn=conn)
if not ctx.is_sandbox:
return ("", [])
prefix = f"{alias}." if alias else ""
expr = f"{prefix}{column}"
if cast:
expr = f"({expr})::{cast}"
return (f" AND {expr} <= %s", [ctx.business_date])
def apply_runtime_session_vars(conn: Any, ctx: RuntimeContext | None = None, *, site_id: int | None = None) -> None:
"""在已有数据库连接上设置 ``app.current_business_date`` 等 GUC 变量。
供 RLS 视图层C 方案)使用:视图通过 ``current_setting('app.current_business_date', true)``
读取业务日,再对事实/维度表做日期上界裁剪。
无论 live / sandbox 都设置该变量live 下视图仍按真实 ``CURRENT_DATE`` 行为。
"""
if ctx is None:
if site_id is None:
raise ValueError("apply_runtime_session_vars 需要 ctx 或 site_id 之一")
ctx = get_runtime_context(site_id, conn=conn)
bd = ctx.business_date.isoformat()
mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE
with conn.cursor() as cur:
cur.execute(
"SELECT set_config('app.current_business_date', %s, true), "
"set_config('app.current_runtime_mode', %s, true)",
(bd, mode),
)