feat(backend): F1-6 sprint1 sandbox_replay 模块脚手架 + get_last_visit_days 迁移试点 (W1)

F1-5b 完成后启动 F1-6 沙箱时光机阶段 B,Sprint 1 范围:
1. 建立 sandbox_replay 模块脚手架
2. 实现 @runtime_aware decorator(自动注入 RuntimeContext)
3. 试点迁移 1 个指标:get_last_visit_days(P1-4 距上次到店天数)
4. MCP 端到端 4a/4b 双口径走查

新增文件:
- apps/backend/app/services/sandbox_replay/__init__.py(模块入口 + re-export)
- apps/backend/app/services/sandbox_replay/_decorator.py(@runtime_aware 实现)
- apps/backend/app/services/sandbox_replay/consumption_replay.py(get_last_visit_days 试点)
- docs/_overview/wave1-findings/F1-6-tasks.md(F1-6 任务清单 4 个 sprint)
- docs/audit/changes/2026-05-05__f1_6_sprint1_sandbox_replay_kickoff.md(Sprint 1 审计)

修改:
- apps/backend/app/services/fdw_queries.py:218-238
  get_last_visit_days 改 thin wrapper,委托 sandbox_replay.consumption_replay
  保持 75+ 现有调用点无感兼容

关键 bug 修复:@trace_service + @runtime_aware 嵌套 sig.bind 失败
- 现象:仅 FastAPI 请求 + 有活跃 TraceContext 时复现
  (直接 Python 脚本 get_current_trace 返回 None,跳过 _build_params_dict)
- 根因:functools.wraps 设置 __wrapped__ 让 inspect.signature 追溯原函数
  → bind 时缺 keyword-only 必传 ctx → TypeError
- 修复:_decorator.py 不用 functools.wraps,手动复制元信息但不设
  __wrapped__,inspect.signature 看到 wrapper 自身 (*args, **kwargs)

测试:
- unit test 10/10 PASS(本地 .gitignore:71 不入仓)
- 直接 customer_service.get_customer_detail PASS(独立诊断脚本)
- etl_conn 复用模式 PASS

MCP 双口径(member=2799207087163141 黄先生):
- 4a live (today=2026-05-05): daysSinceVisit=32(05-05 - 04-03)
- 4b sandbox=2026-04-20: daysSinceVisit=31(04-20 - 03-20)
  通过插 walkthrough 测试快照 stat_date=2026-04-15 演示
  (测试库 dws_member_consumption_summary 仅 stat_date=2026-05-01 一行,
   sandbox=4-20 时被视图层 stat_date <= business_date_now() 过滤)
- 测试快照已清理,sandbox 已切回 live

Sprint 2-4 待启动(见 F1-6-tasks.md):
- Sprint 2:5 个会员 P1 指标(余额/60d 消费/累计交易/GMV/累计服务客户数)
- Sprint 3:5 个助教/门店 P1 + MP-2 完整(含 ETL 改造)
- Sprint 4:5 个 P2 算法重算指标

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Neo
2026-05-05 23:53:19 +08:00
parent 5d4da0ae8c
commit 9f1e35d71a
6 changed files with 522 additions and 51 deletions

View File

@@ -0,0 +1,42 @@
"""沙箱时光机重算引擎 (Sandbox Replay Engine)
F1-6 沙箱时光机阶段 B 启动模块。
设计意图:
P20 沙箱(`runtime_mode='sandbox' + sandbox_business_date`)的"时光机"语义
要求所有业务指标按 sandbox_business_date 重新累计/截断,而非展示 ETL
跑批的最新累计。F1-5a/F1-5b 已建立基础设施(RuntimeContext + app 视图
业务日上界),本模块统一所有"runtime-aware"业务读取路径。
完整 spec:
docs/_overview/sandbox-replay-engine-spec.md
模块结构(随 sprint 推进逐步填充):
sandbox_replay/
├── __init__.py # 本文件,re-export 主要 API
├── _decorator.py # @runtime_aware decorator
├── consumption_replay.py # 消费/到店/累计交易(P1-2/3/4/12/13)
├── balance_replay.py # 会员余额(P1-1)— sprint 2
├── assistant_metrics_replay.py # 助教课时/收入/客户(P1-5/6/7/8)— sprint 3
├── salary_replay.py # MP-2 完整 daily salary(P2-17)— sprint 3
├── adjustments_replay.py # Excel 修正截断(P2-19)— sprint 3
├── tasks_replay.py # 任务完成率(P2-18)— sprint 4
├── rs_replay.py # RS 算法重算(P2-15)— sprint 4
└── intimacy_replay.py # 客户黏性(P2-16)— sprint 4
接口契约(sprint 1):
@runtime_aware(metric='last_visit_days')
def get_last_visit_days(conn, site_id: int, member_ids: list[int]) -> dict[int, int | None]:
'''函数体内自动接收 RuntimeContext,可读 ctx.is_sandbox / ctx.business_date。'''
...
"""
from app.services.sandbox_replay._decorator import runtime_aware
from app.services.sandbox_replay.consumption_replay import (
get_last_visit_days as get_last_visit_days_replay,
)
__all__ = [
"runtime_aware",
"get_last_visit_days_replay",
]

View File

@@ -0,0 +1,104 @@
"""runtime_aware decorator: 自动注入 RuntimeContext 的函数装饰器。
F1-6 沙箱时光机阶段 B Sprint 1 核心组件。
使用模式:
@runtime_aware(metric='last_visit_days')
def get_last_visit_days(
conn,
site_id: int,
member_ids: list[int],
*,
ctx: RuntimeContext, # 由 decorator 自动注入
) -> dict[int, int | None]:
# 函数内可直接读 ctx.is_sandbox / ctx.business_date /
# ctx.sandbox_instance_id 等
ref_date = ctx.business_date # sandbox 模式下取 sandbox_date
...
设计要点:
1. decorator 通过 site_id 参数(必须显式传)调用 get_runtime_context
2. ctx 作为 keyword-only 参数注入,被装饰函数显式接收
3. 兼容现有调用:若调用方已传 ctx kwarg,跳过自动注入(便于测试 mock)
4. 不强制 cache RuntimeContext(get_runtime_context 内部已无缓存,每次查 DB
是预期行为,F1-5a 共识)
性能考虑:
每次调用查 biz.site_runtime_context 一次(单行 SELECT,~1ms),
可接受。后续若需性能优化,可在 _fdw_context 层做 connection-level 缓存。
"""
from __future__ import annotations
import inspect
from typing import Any, Callable
from app.services.runtime_context import RuntimeContext, get_runtime_context
# 全局开关:metric_name → 监控/审计用,目前仅记录到 logger,后续可接 trace
_REGISTERED_METRICS: dict[str, Callable] = {}
def runtime_aware(metric: str) -> Callable[[Callable], Callable]:
"""decorator 工厂:自动注入 RuntimeContext。
被装饰函数签名约束:
- 第一个位置参数 conn(数据库连接)
- 第二个位置参数 site_id(多门店隔离 + ctx 查询键)
- 接受 keyword-only `ctx: RuntimeContext` 参数
Args:
metric: 指标名,用于注册表 + 日志 + 后续 trace 标注
Returns:
装饰器
"""
def _decorator(func: Callable) -> Callable:
sig = inspect.signature(func)
if "ctx" not in sig.parameters:
raise TypeError(
f"runtime_aware: {func.__name__} 必须接受 keyword-only `ctx: RuntimeContext` 参数"
)
_REGISTERED_METRICS[metric] = func
def _wrapper(*args: Any, **kwargs: Any) -> Any:
# 若调用方显式传了 ctx(测试场景),跳过自动注入
if "ctx" in kwargs:
return func(*args, **kwargs)
# 解析 conn + site_id(必须是前两个位置参数)
if len(args) < 2:
raise TypeError(
f"runtime_aware {func.__name__}: 必须传 (conn, site_id, ...)"
)
conn, site_id = args[0], args[1]
ctx = get_runtime_context(site_id, conn=conn)
return func(*args, ctx=ctx, **kwargs)
# 手动复制元信息(不用 functools.wraps,避免 __wrapped__ 让外层
# @trace_service 的 inspect.signature 追溯到原函数,导致 sig.bind
# 因 ctx 必传参数缺失而抛 TypeError)
_wrapper.__name__ = func.__name__
_wrapper.__qualname__ = func.__qualname__
_wrapper.__module__ = func.__module__
_wrapper.__doc__ = func.__doc__
# 暴露注册元数据,便于测试 / 调试
_wrapper.__runtime_aware_metric__ = metric # type: ignore[attr-defined]
return _wrapper
return _decorator
def list_registered_metrics() -> list[str]:
"""返回所有已注册的 runtime_aware 指标名,用于审计/调试。"""
return sorted(_REGISTERED_METRICS.keys())
__all__ = [
"runtime_aware",
"list_registered_metrics",
"RuntimeContext",
]

View File

@@ -0,0 +1,97 @@
"""消费/到店相关 sandbox 重算实现。
F1-6 沙箱时光机阶段 B Sprint 1 试点迁移。
覆盖指标(渐进):
- P1-4 距上次到店天数(get_last_visit_days)— sprint 1 已迁移
- P1-2 60 天消费 — sprint 2
- P1-3 累计消费总额 — sprint 2
- P1-12 累计交易笔数 — sprint 2
- P1-13 累计 GMV — sprint 2
设计原则:
- 通过 @runtime_aware decorator 自动注入 RuntimeContext
- 函数体内显式使用 ctx.business_date / ctx.is_sandbox
- 与 fdw_queries 中的原始实现保持完全行为一致(回归安全)
- 原 fdw_queries.get_last_visit_days 保留为 thin wrapper 兼容现有调用
"""
from __future__ import annotations
from typing import Any
from app.services.runtime_context import RuntimeContext
from app.services.sandbox_replay._decorator import runtime_aware
from app.trace.decorators import trace_service
@trace_service(
description_zh="获取最后到店天数(sandbox_replay)",
description_en="Get last visit days (sandbox_replay)",
)
@runtime_aware(metric="last_visit_days")
def get_last_visit_days(
conn: Any,
site_id: int,
member_ids: list[int],
*,
etl_conn: Any = None,
ctx: RuntimeContext,
) -> dict[int, int | None]:
"""批量查询客户距上次到店天数(sandbox_replay 版本)。
迁移自 fdw_queries.get_last_visit_days(F1-5b 已实现 sandbox 行为)。
本函数纯重构,行为完全一致:
- 取业务日为基准(ctx.business_date,sandbox 模式下为 sandbox_date)
- 仅查 stat_date <= business_date 的快照行
- 实时计算 days = business_date - last_consume_date
Args:
conn: zqyy_app 业务库连接
site_id: 门店 ID
member_ids: 待查会员列表
etl_conn: 可选,显式传 ETL 连接(便于测试 mock)
ctx: RuntimeContext(由 @runtime_aware 自动注入,测试时也可显式传)
Returns:
{member_id: days_since_visit} 映射,无记录的会员不在结果中。
last_consume_date 为 NULL 时,值为 None。
"""
if not member_ids:
return {}
# 延迟 import 避免循环依赖
from app.services.fdw_queries import _fdw_context
ref_date = ctx.business_date
result: dict[int, int | None] = {}
with _fdw_context(conn, site_id, etl_conn=etl_conn) as cur:
cur.execute(
"""
SELECT DISTINCT ON (member_id)
member_id,
last_consume_date,
stat_date
FROM app.v_dws_member_consumption_summary
WHERE member_id = ANY(%s)
AND stat_date <= %s
ORDER BY member_id, stat_date DESC
""",
(member_ids, ref_date),
)
for row in cur.fetchall():
mid = row[0]
last_consume = row[1]
if last_consume is None:
result[mid] = None
continue
try:
if hasattr(last_consume, "date"):
last_consume = last_consume.date()
days = (ref_date - last_consume).days
result[mid] = max(days, 0)
except Exception:
result[mid] = None
return result