"""消费/到店相关 sandbox 重算实现。 F1-6 沙箱时光机阶段 B Sprint 1/2 试点迁移。 覆盖指标(渐进): - P1-4 距上次到店天数(get_last_visit_days)— sprint 1 已迁移 - P1-2 60 天消费(get_consumption_60d)— sprint 2 ☚ - P1-3 累计消费总额 — sprint 2 - P1-12 累计交易笔数 — sprint 2 设计原则: - 通过 @runtime_aware decorator 自动注入 RuntimeContext - 函数体内显式使用 ctx.business_date / ctx.is_sandbox - 与 fdw_queries 中的原始实现保持完全行为一致(回归安全) - 原 fdw_queries.* 保留为 thin wrapper 兼容现有调用 """ from __future__ import annotations from decimal import Decimal from typing import Any from app.services.runtime_context import RuntimeContext from app.services.sandbox_replay._decorator import runtime_aware from app.trace.decorators import trace_service @trace_service( description_zh="获取最后到店天数(sandbox_replay)", description_en="Get last visit days (sandbox_replay)", ) @runtime_aware(metric="last_visit_days") def get_last_visit_days( conn: Any, site_id: int, member_ids: list[int], *, etl_conn: Any = None, ctx: RuntimeContext, ) -> dict[int, int | None]: """批量查询客户距上次到店天数(sandbox_replay 版本)。 迁移自 fdw_queries.get_last_visit_days(F1-5b 已实现 sandbox 行为)。 本函数纯重构,行为完全一致: - 取业务日为基准(ctx.business_date,sandbox 模式下为 sandbox_date) - 仅查 stat_date <= business_date 的快照行 - 实时计算 days = business_date - last_consume_date Args: conn: zqyy_app 业务库连接 site_id: 门店 ID member_ids: 待查会员列表 etl_conn: 可选,显式传 ETL 连接(便于测试 mock) ctx: RuntimeContext(由 @runtime_aware 自动注入,测试时也可显式传) Returns: {member_id: days_since_visit} 映射,无记录的会员不在结果中。 last_consume_date 为 NULL 时,值为 None。 """ if not member_ids: return {} # 延迟 import 避免循环依赖 from app.services.fdw_queries import _fdw_context ref_date = ctx.business_date result: dict[int, int | None] = {} with _fdw_context(conn, site_id, etl_conn=etl_conn) as cur: cur.execute( """ SELECT DISTINCT ON (member_id) member_id, last_consume_date, stat_date FROM app.v_dws_member_consumption_summary WHERE member_id = ANY(%s) AND stat_date <= %s ORDER BY member_id, stat_date DESC """, (member_ids, ref_date), ) for row in cur.fetchall(): mid = row[0] last_consume = row[1] if last_consume is None: result[mid] = None continue try: if hasattr(last_consume, "date"): last_consume = last_consume.date() days = (ref_date - last_consume).days result[mid] = max(days, 0) except Exception: result[mid] = None return result # ── P1-2 60 天消费 ────────────────────────────────────── @trace_service( description_zh="获取近 60 天消费金额(sandbox_replay)", description_en="Get 60-day consumption (sandbox_replay)", ) @runtime_aware(metric="consumption_60d") def get_consumption_60d( conn: Any, site_id: int, member_id: int, *, etl_conn: Any = None, ctx: RuntimeContext, ) -> Decimal | None: """查询客户近 60 天消费金额(sandbox_replay 版本)。 迁移自 fdw_queries.get_consumption_60d。行为完全一致: - 取业务日为基准(ctx.business_date,sandbox 模式下为 sandbox_date) - 仅查 stat_date <= business_date 的快照行(显式上界,与视图过滤一致) - 取最新 stat_date 行的 consume_amount_60d 口径(与 board-customer spend60 维度统一): - items_sum,60 天滚动窗口,日粒度 - 该字段由 ETL DWS 跑批时计算,sandbox 切到历史日期时,视图自动按 stat_date <= business_date_now() 过滤,取当时 ETL 写入的近 60 天值 Args: conn: zqyy_app 业务库连接 site_id: 门店 ID member_id: 单个会员 ID(保持原 API 签名) etl_conn: 可选,显式传 ETL 连接(便于测试 mock) ctx: RuntimeContext(由 @runtime_aware 自动注入) Returns: Decimal 金额或 None(无快照 / 字段为 NULL 时) """ # 延迟 import 避免循环依赖 from app.services.fdw_queries import _fdw_context ref_date = ctx.business_date with _fdw_context(conn, site_id, etl_conn=etl_conn) as cur: cur.execute( """ SELECT consume_amount_60d FROM app.v_dws_member_consumption_summary WHERE member_id = %s AND stat_date <= %s ORDER BY stat_date DESC LIMIT 1 """, (member_id, ref_date), ) row = cur.fetchone() return Decimal(str(row[0])) if row and row[0] is not None else None