"""会员卡余额相关 sandbox 重算实现。 F1-6 沙箱时光机阶段 B Sprint 2 #4 新建。 覆盖指标: - P1-1 储值卡余额(get_member_balance)— sprint 2 #4 设计要点: - **数据源是 SCD2 维度表 `dim_member_card_account`**(非 daily 累计快照), 原生支持时光机回溯,不需要走 dws daily 快照层 - live 模式下 ctx.business_date=today,SCD2 时光机过滤效果等同 `scd2_is_current=1`(假设 ETL SCD2 跑批正确) - sandbox 模式下 ctx.business_date=sandbox_date,SCD2 时光机过滤回溯 到该日期当天结束时仍 active 的卡版本 - 多卡聚合(2026-03-29 修复多卡膨胀)保留:`SUM(balance) GROUP BY tenant_member_id` - NULL → Decimal('0') 沿用原 fdw_queries 行为 时光机 SQL 关键: -- live: WHERE scd2_is_current=1 -- sandbox: WHERE scd2_start_time < (ref_date + 1 day) -- AND (scd2_end_time IS NULL OR scd2_end_time >= (ref_date + 1 day)) -- ref_date+1day 边界 = ref_date 当天结束时仍 active 的版本 口径未拆分: `get_member_balance` 当前直接 SUM 全部 balance 不区分储值卡 / 赠送卡。 本 sprint 保持口径不变,cash/gift 拆分若有业务需求登记到架构演进 backlog。 """ from __future__ import annotations from decimal import Decimal from typing import Any from app.services.runtime_context import RuntimeContext from app.services.sandbox_replay._decorator import runtime_aware from app.trace.decorators import trace_service @trace_service( description_zh="获取会员储值卡余额(sandbox_replay)", description_en="Get member card balance (sandbox_replay)", ) @runtime_aware(metric="member_balance") def get_member_balance( conn: Any, site_id: int, member_ids: list[int], *, etl_conn: Any = None, ctx: RuntimeContext, ) -> dict[int, Decimal]: """批量查询会员储值卡余额(sandbox_replay 版本)。 迁移自 fdw_queries.get_member_balance,行为完全一致(live 模式下), 新增 sandbox 时光机能力。 数据源约束(DQ-7): 通过 tenant_member_id 关联 `app.v_dim_member_card_account`, 禁止使用 settlement_head.member_card_type_name(自 2025-07-21 全为 NULL)。 多卡聚合: 同一会员可能持有多张卡(储值卡 / 赠送卡 / 台费卡 / 活动券 / 酒水卡), 需 SUM(balance) 聚合,避免多卡返回多行造成调用方 dict 覆盖丢失。 Args: conn: zqyy_app 业务库连接 site_id: 门店 ID member_ids: 会员 ID 列表 etl_conn: 可选,显式传 ETL 连接(便于测试 mock) ctx: RuntimeContext(由 @runtime_aware 自动注入) Returns: {member_id: Decimal} 映射。无卡的 member_id 不出现在返回 dict 中 (沿用原 fdw_queries 行为)。SUM 为 NULL 时返回 Decimal('0')。 """ if not member_ids: return {} from app.services.fdw_queries import _fdw_context ref_date = ctx.business_date result: dict[int, Decimal] = {} with _fdw_context(conn, site_id, etl_conn=etl_conn) as cur: # SCD2 时光机过滤:取 ref_date 当天结束时仍 active 的卡版本。 # ref_date+1day 作为右边界,使比较语义在 timestamptz 精度下保持稳定。 # 多卡 SUM 聚合保留(2026-03-29 修复)。 cur.execute( """ SELECT tenant_member_id AS member_id, SUM(balance) AS balance FROM app.v_dim_member_card_account WHERE tenant_member_id = ANY(%s) AND scd2_start_time < (%s::date + INTERVAL '1 day') AND (scd2_end_time IS NULL OR scd2_end_time >= (%s::date + INTERVAL '1 day')) GROUP BY tenant_member_id """, (member_ids, ref_date, ref_date), ) for row in cur.fetchall(): result[row[0]] = ( Decimal(str(row[1])) if row[1] is not None else Decimal("0") ) return result