Files

15 KiB
Raw Permalink Blame History

设计文档SPI 消费力指数Spending Power Index

概述

SPI 是 NeoZQYY 指数体系的第 7 个指数(继 WBI/NCI/RS/OS/MS/ML 之后),粒度为 (site_id, member_id),用于衡量会员在门店内的综合消费力层级。

SPI 采用"主分 + 子分"结构:

  • Level消费水平基于消费金额和客单价的 log1p 压缩加权
  • Speed消费速度基于绝对速度、相对速度、EWMA 速度的加权
  • Stability消费稳定性基于近 90 天周覆盖率

SPI 不继承 MemberIndexBaseTask(该基类为 WBI/NCI 共享的会员分群逻辑SPI 不需要 NEW/OLD/STOP 分群),而是直接继承 BaseIndexTask,自行实现数据提取和评分逻辑。

设计决策

  1. 继承 BaseIndexTask 而非 MemberIndexBaseTaskSPI 不需要会员分群NEW/OLD/STOP所有有消费记录的会员均参与计算。MemberIndexBaseTask 的 _build_member_activity 提取的特征intervals、t_v/t_r/t_a 等)与 SPI 需求不匹配,复用反而增加耦合。
  2. 独立数据提取SPI 需要按周聚合、日消费序列等 MemberIndexBaseTask 不提供的特征,因此自行编写 SQL 提取逻辑。
  3. 金额压缩基数自动校准:首次执行时从门店数据计算中位数作为基数,后续可通过 cfg_index_parameters 手动覆盖。
  4. 子分独立映射Level/Speed/Stability 各自独立做 batch_normalize_to_display使用不同的 index_type 后缀SPI_LEVEL/SPI_SPEED/SPI_STABILITY隔离分位历史。

架构

graph TD
    subgraph 数据来源
        A[dwd.dwd_settlement_head<br/>消费订单]
        B[dwd.dwd_recharge_order<br/>充值订单]
    end

    subgraph SpendingPowerIndexTask
        C[extract_spending_features<br/>提取基础特征]
        D[calculate_level<br/>Level 子分]
        E[calculate_speed<br/>Speed 子分]
        F[calculate_stability<br/>Stability 子分]
        G[calculate_spi_raw<br/>SPI 总分合成]
        H[batch_normalize_to_display<br/>展示分映射]
    end

    subgraph 配置
        I[cfg_index_parameters<br/>index_type='SPI']
    end

    subgraph 输出
        J[dws.dws_member_spending_power_index]
        K[dws.dws_index_percentile_history]
    end

    A --> C
    B --> C
    I --> C
    I --> D
    I --> E
    I --> F
    I --> G
    C --> D
    C --> E
    C --> F
    D --> G
    E --> G
    F --> G
    G --> H
    H --> J
    H --> K

继承体系

BaseTask
  └── BaseDwsTask
        └── BaseIndexTask
              ├── MemberIndexBaseTask   ← WBI / NCI不使用
              ├── RelationIndexTask     ← RS/OS/MS/ML
              ├── MlManualImportTask    ← ML 台账导入
              └── SpendingPowerIndexTask ← SPI新增

组件与接口

SpendingPowerIndexTask

继承 BaseIndexTask,实现以下接口:

class SpendingPowerIndexTask(BaseIndexTask):
    INDEX_TYPE = "SPI"
    
    DEFAULT_PARAMS = {
        # 窗口参数
        'spend_window_short_days': 30,
        'spend_window_long_days': 90,
        'ewma_alpha_daily_spend': 0.3,
        # 金额压缩基数(初始默认值,可被自动校准或配置表覆盖)
        'amount_base_spend_30': 500.0,
        'amount_base_spend_90': 1500.0,
        'amount_base_ticket_90': 200.0,
        'amount_base_recharge_90': 1000.0,
        'amount_base_speed_abs': 100.0,
        'amount_base_ewma_90': 50.0,
        # Level 子分权重
        'w_level_spend_30': 0.30,
        'w_level_spend_90': 0.35,
        'w_level_ticket_90': 0.20,
        'w_level_recharge_90': 0.15,
        # Speed 子分权重
        'w_speed_abs': 0.50,
        'w_speed_rel': 0.30,
        'w_speed_ewma': 0.20,
        # 总分权重
        'weight_level': 0.60,
        'weight_speed': 0.30,
        'weight_stability': 0.10,
        # 稳定性参数
        'stability_window_days': 90,
        'use_stability': 1,
        # 映射与平滑
        'percentile_lower': 5,
        'percentile_upper': 95,
        'compression_mode': 1,  # log1p
        'use_smoothing': 1,
        'ewma_alpha': 0.2,
        # 速度计算
        'speed_epsilon': 1e-6,
    }

    # --- 必须实现的抽象方法 ---
    def get_task_code(self) -> str: ...
    def get_target_table(self) -> str: ...
    def get_primary_keys(self) -> List[str]: ...
    def get_index_type(self) -> str: ...
    
    # --- 核心执行流程 ---
    def execute(self, context=None) -> Dict[str, Any]: ...
    
    # --- 数据提取 ---
    def _extract_spending_features(self, site_id, params) -> Dict[int, SPIMemberFeatures]: ...
    def _extract_recharge_features(self, site_id, params) -> Dict[int, RechargeFeatures]: ...
    def _calibrate_amount_bases(self, features, params) -> Dict[str, float]: ...
    
    # --- 子分计算(纯函数,可独立测试) ---
    @staticmethod
    def compute_level(features, params) -> float: ...
    @staticmethod
    def compute_speed(features, params) -> float: ...
    @staticmethod
    def compute_stability(features, params) -> float: ...
    @staticmethod
    def compute_spi_raw(level, speed, stability, params) -> float: ...
    
    # --- 持久化 ---
    def _save_spi_data(self, data_list, site_id) -> int: ...

关键设计:子分计算为静态方法

compute_levelcompute_speedcompute_stabilitycompute_spi_raw 设计为 @staticmethod,不依赖数据库或任务实例状态。这使得属性测试可以直接调用这些纯函数,无需 mock 数据库连接。

SPIMemberFeatures 数据类

@dataclass
class SPIMemberFeatures:
    """SPI 计算所需的会员级特征"""
    member_id: int
    site_id: int
    
    # 基础特征
    spend_30: float = 0.0          # 近30天消费总额
    spend_90: float = 0.0          # 近90天消费总额
    recharge_90: float = 0.0       # 近90天充值总额
    orders_30: int = 0             # 近30天消费笔数
    orders_90: int = 0             # 近90天消费笔数
    visit_days_30: int = 0         # 近30天消费日数按天去重
    visit_days_90: int = 0         # 近90天消费日数按天去重
    avg_ticket_90: float = 0.0     # 90天客单价
    active_weeks_90: int = 0       # 近90天有消费的自然周数
    daily_spend_ewma_90: float = 0.0  # 日消费 EWMA
    
    # 子分
    score_level_raw: float = 0.0
    score_speed_raw: float = 0.0
    score_stability_raw: float = 0.0
    
    # 展示分(归一化后填充)
    score_level_display: float = 0.0
    score_speed_display: float = 0.0
    score_stability_display: float = 0.0
    
    # 总分
    raw_score: float = 0.0
    display_score: float = 0.0

数据模型

dws.dws_member_spending_power_index 表结构

CREATE TABLE dws.dws_member_spending_power_index (
    spi_id          BIGSERIAL PRIMARY KEY,
    site_id         INTEGER NOT NULL,
    member_id       BIGINT NOT NULL,
    
    -- 基础特征
    spend_30        NUMERIC(14,2) DEFAULT 0,
    spend_90        NUMERIC(14,2) DEFAULT 0,
    recharge_90     NUMERIC(14,2) DEFAULT 0,
    orders_30       INTEGER DEFAULT 0,
    orders_90       INTEGER DEFAULT 0,
    visit_days_30   INTEGER DEFAULT 0,
    visit_days_90   INTEGER DEFAULT 0,
    avg_ticket_90   NUMERIC(14,2) DEFAULT 0,
    active_weeks_90 INTEGER DEFAULT 0,
    daily_spend_ewma_90 NUMERIC(14,2) DEFAULT 0,
    
    -- 子分Raw
    score_level_raw     NUMERIC(10,4) DEFAULT 0,
    score_speed_raw     NUMERIC(10,4) DEFAULT 0,
    score_stability_raw NUMERIC(10,4) DEFAULT 0,
    
    -- 子分Display 0-10
    score_level_display     NUMERIC(5,2) DEFAULT 0,
    score_speed_display     NUMERIC(5,2) DEFAULT 0,
    score_stability_display NUMERIC(5,2) DEFAULT 0,
    
    -- 总分
    raw_score       NUMERIC(10,4) DEFAULT 0,
    display_score   NUMERIC(5,2) DEFAULT 0,
    
    -- 元数据
    calc_time       TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at      TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- 唯一约束(业务主键)
CREATE UNIQUE INDEX idx_spi_site_member 
    ON dws.dws_member_spending_power_index (site_id, member_id);

-- 查询索引
CREATE INDEX idx_spi_display_score 
    ON dws.dws_member_spending_power_index (site_id, display_score DESC);

cfg_index_parameters 新增种子数据

db/etl_feiqiu/seeds/seed_index_parameters.sql 中追加 index_type='SPI' 的参数行,格式与现有 WBI/NCI 参数一致。

执行流程

sequenceDiagram
    participant Scheduler as 调度器
    participant Task as SpendingPowerIndexTask
    participant DB as PostgreSQL
    participant Base as BaseIndexTask

    Scheduler->>Task: execute(context)
    Task->>DB: 获取 site_id
    Task->>Base: load_index_parameters('SPI')
    Base->>DB: SELECT FROM cfg_index_parameters
    Base-->>Task: params dict
    
    Task->>DB: 提取消费订单近90天
    Task->>DB: 提取充值订单近90天
    Task->>Task: 聚合会员级特征
    Task->>Task: 校准金额压缩基数(如需)
    
    loop 每个会员
        Task->>Task: compute_level(features, params)
        Task->>Task: compute_speed(features, params)
        Task->>Task: compute_stability(features, params)
        Task->>Task: compute_spi_raw(L, S, P, params)
    end
    
    Task->>Base: batch_normalize_to_display(SPI raw scores)
    Task->>Base: batch_normalize_to_display(Level raw scores)
    Task->>Base: batch_normalize_to_display(Speed raw scores)
    Task->>Base: batch_normalize_to_display(Stability raw scores)
    
    Task->>DB: DELETE FROM dws_member_spending_power_index WHERE site_id = %s
    Task->>DB: INSERT INTO dws_member_spending_power_index (batch)
    Task->>Base: save_percentile_history('SPI')
    Task-->>Scheduler: {status, member_count, records_inserted}

正确性属性

正确性属性Correctness Property是系统在所有合法执行路径上都应成立的行为特征——本质上是对"系统应该做什么"的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。

以下属性基于需求文档中的验收标准推导,每个属性都是可通过 hypothesis 属性测试验证的全称量化命题。子分计算函数(compute_levelcompute_speedcompute_stabilitycompute_spi_raw)设计为纯静态方法,不依赖数据库,可直接用于属性测试。

Property 1: SPI 总分非负性

For any 非负的 Level、Speed、Stability 子分和非负的权重参数,compute_spi_raw(L, S, P, params) 的返回值应为非负。

推导:SPI_raw = w_L × L + w_S × S + w_P × P,当所有子分 ≥ 0 且所有权重 ≥ 0 时,加权和必然 ≥ 0。

Validates: Requirements 6.1, 10.1

Property 2: Level 子分关于消费金额单调非递减

For any 非负的特征值和参数,若仅增加 spend_30spend_90(其他特征不变),compute_level 的返回值不应减少。

推导:L 中每一项形如 w × ln(1 + x/M)ln(1 + x/M) 关于 x 单调递增(x ≥ 0, M > 0),权重 w ≥ 0,因此增加任一消费金额项只会使 L 增加或不变。

Validates: Requirements 3.1, 10.2

Property 3: Speed 子分关于 spend_30 单调非递减

For any 非负的特征值和参数,若仅增加 spend_30(其他特征不变),compute_speed 的返回值不应减少。

推导:

  • V_abs = ln(1 + spend_30 / (max(visit_days_30, 1) × V0)):关于 spend_30 单调递增
  • V_rel = ln((spend_30/30 + ε) / (spend_90/90 + ε))spend_30 增加使分子增大,max(0, V_rel) 不减
  • V_ewma:不依赖 spend_30不变
  • 三项加权和中前两项不减,第三项不变,总和不减

Validates: Requirements 4.1, 4.4, 10.3

Property 4: Stability 子分取值范围 [0, 1]

For any active_weeks_90 在 [0, 13] 范围内,compute_stability 的返回值应在 [0, 1] 范围内。

推导:P = active_weeks_90 / 13,当 active_weeks_90 ∈ {0, 1, ..., 13} 时,P ∈ [0, 1]

Validates: Requirements 5.2, 5.4, 10.4

Property 5: Display Score 取值范围 [0, 10]

For any 非空的 raw_score 列表(所有值非负),经 batch_normalize_to_display 映射后,所有 display_score 应在 [0.00, 10.00] 范围内。

推导:batch_normalize_to_display 内部先 Winsorize 到 [P5, P95],再 MinMax 映射到 [0, 10],最后 max(0, min(10, score)) 截断。

Validates: Requirements 6.6, 10.5

错误处理

场景 处理方式
门店无消费/充值数据 返回 {'status': 'skipped', 'reason': 'no_data'},不写入任何记录
cfg_index_parameters 中缺少 SPI 参数 使用 DEFAULT_PARAMS 字典中的默认值,日志 WARNING
金额压缩基数为 0 或负数 使用 DEFAULT_PARAMS 中的默认基数,日志 WARNING
orders_90 = 0 导致除零 avg_ticket_90 = spend_90 / max(orders_90, 1),分母至少为 1
visit_days_30 = 0 导致除零 V_abs 公式中 max(visit_days_30, 1),分母至少为 1
v_30 和 v_90 均为 0 导致 V_rel 除零 使用 εspeed_epsilon默认 1e-6防除零
所有会员 raw_score 相同 batch_normalize_to_displaymax - min < ε 时返回 5.0
数据库写入失败 事务回滚,抛出异常由调度器处理
EWMA 分位历史不存在(首次执行) 不平滑,直接使用当前分位点

测试策略

属性测试hypothesis

属性测试位于 tests/ 目录Monorepo 级),使用 hypothesis 库。

每个属性测试对应设计文档中的一个 Property最少运行 100 次迭代。

测试文件:tests/test_spi_properties.py

# Feature: spi-spending-power-index, Property 1: SPI 总分非负性
@given(
    level=st.floats(min_value=0, max_value=100),
    speed=st.floats(min_value=0, max_value=100),
    stability=st.floats(min_value=0, max_value=1),
)
@settings(max_examples=200)
def test_spi_raw_non_negative(level, speed, stability):
    params = SpendingPowerIndexTask.DEFAULT_PARAMS
    result = SpendingPowerIndexTask.compute_spi_raw(level, speed, stability, params)
    assert result >= 0

属性测试库:hypothesis(已在项目依赖中)

单元测试

单元测试位于 apps/etl/connectors/feiqiu/tests/unit/,使用 FakeDB/FakeAPI 工具。

重点覆盖:

  • 边界情况:全零输入、单一极大值输入
  • 配置回退:参数缺失时使用默认值
  • 任务注册:验证 task_registry 中 SPI 任务的注册信息
  • use_stability=0 时稳定性子分不参与计算

测试配置

  • 属性测试:cd C:\NeoZQYY && pytest tests/test_spi_properties.py -v
  • 单元测试:cd apps/etl/connectors/feiqiu && pytest tests/unit/test_spi_task.py -v
  • 每个属性测试标注 @settings(max_examples=200)
  • 每个属性测试注释引用设计文档 Property 编号