Files
Neo-ZQYY/docs/spec-input/2026-02-22__etl-aggregation-fix-spec-input.md

8.2 KiB
Raw Permalink Blame History

ETL 聚合修复与生日字段补齐 — Spec 需求输入

本文档汇总 v8 联调中发现的 4 个需要深度修复的问题,合并为一个 Spec。 用户可复制本文档内容开启 Spec 流程。


背景

v8 联调修复了 11 个 BUG其中 4 个的当前修复方式是"临时止血",需要更完整的方案:

编号 原 BUG 当前临时修复 问题
A BUG 2 MAX() 聚合 nickname/level_code 不精确,应按时间取最后记录;且未来档位不同需分段统计
B BUG 3+4 site_id → register_site_id 单门店可用,多门店场景会漏掉跨店消费的会员
C BUG 5 移除 birthday 字段 生日是重要销售线索,应全链路保留 + 支持手动补录
D BUG 11 _safe_int() 下游防御 应从源头规范 DwdLoadTask.load() 返回值格式

需求 A助教月度聚合 — 档位分段统计

现状

  • dws_assistant_monthly_summary 唯一约束:(site_id, assistant_id, stat_month)
  • assistant_monthly_task.py_extract_daily_aggregates()(assistant_id, month) GROUP BY
  • 助教月内如果升级/降级nickname/level_code/level_name 会变化,当前用 MAX() 取值
  • MAX(assistant_level_code) 取数值最大的等级代码,不是时间上最后的等级

目标

  1. 月度汇总按 (assistant_id, stat_month, assistant_level_code) 分组,保留多行,分别统计各档位的业绩
  2. 唯一约束改为 (site_id, assistant_id, stat_month, assistant_level_code)
  3. 工资计算(assistant_salary_task.py)适配:按档位分段计算抽成
  4. nickname 取值改为按时间取最后一条记录(ORDER BY date DESC LIMIT 1 或窗口函数)

受影响的同类场景(需一并处理)

文件 方法 聚合粒度 维度字段 处理方式
assistant_monthly_task.py _extract_daily_aggregates assistant_id + month nickname, level_code, level_name 改为分段保留
assistant_finance_task.py _extract_daily_revenue date + assistant_id nickname 改为按时间取最后
assistant_customer_task.py _extract_service_pairs assistant_id + member_id nickname 改为按时间取最后

涉及变更

  • DDLdws.dws_assistant_monthly_summary 唯一约束变更
  • 代码:assistant_monthly_task.pyassistant_salary_task.pyassistant_finance_task.pyassistant_customer_task.py
  • 迁移脚本:db/etl_feiqiu/migrations/

需求 B多门店会员查询 — register_site_id 潜在问题

现状

  • dim_member 没有 site_id 列,只有 register_site_id(会员注册门店)
  • 所有 DWS 任务用 WHERE register_site_id = %s 筛选会员
  • 单门店场景下无问题

潜在问题

  • 会员在 A 店注册、B 店消费 → B 店的 DWS 任务查不到该会员信息
  • 会员昵称、手机号等维度信息缺失DWS 记录不完整

目标

  1. 评估当前业务是否存在跨店消费场景(如果只有一个门店,可标记为"已知限制"暂不处理)
  2. 如果需要支持多门店:改为通过事实表的 member_id 反查 dim_member,不再按 register_site_id 预筛选
  3. 受影响的任务:所有使用 _extract_member_info(site_id) 的 DWS 任务

建议

  • 当前单门店场景下,标记为"已知限制",在代码中加注释说明
  • 预留多门店扩展方案:将 WHERE register_site_id = %s 改为 WHERE member_id IN (SELECT DISTINCT member_id FROM dwd.事实表 WHERE site_id = %s)

需求 C会员生日字段全链路补齐 + 手动补录

现状

  • 上游 API member_profiles 可能返回生日字段(当前可能全为空)
  • ODS billiards_ods.member_profiles 保留 API 原始 payloadJSON 中有则保留)
  • DWD dim_member 没有 birthday 列 → ODS → DWD 装载时未映射
  • DWS member_visit_task.py 原本引用 birthdayBUG 5 临时修复时移除了

目标

C1ETL 链路生日字段补齐

  1. DWD dim_member 主表(不是 _ex 表)新增 birthday DATE
  2. DWD 装载映射:DwdLoadTask 的列映射中加入 birthday
  3. DWS 恢复 member_birthday 引用:member_visit_task.py 等任务从 dim_member.birthday 读取
  4. SCD2 更新时正常处理 birthday 变化

C2助教手动补录生日独立于 ETL 链路)

  1. 新建表 dwd.dim_member_birthday_manual
    • member_id BIGINT NOT NULL — 会员 ID
    • birthday_value DATE NOT NULL — 生日值
    • recorded_by_assistant_id BIGINT — 补录助教 ID
    • recorded_by_name VARCHAR(50) — 补录人姓名
    • recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW() — 补录时间
    • source VARCHAR(20) DEFAULT 'assistant' — 来源标记
    • 唯一约束:(member_id, recorded_by_assistant_id) — 同一助教对同一客户只保留最新一条
  2. 可能有多个助教对同一客户提交生日信息,需要处理冲突:
    • 保留所有提交记录(不删除)
    • DWS 读取时取最早提交的记录(第一个发现的助教优先),或取最多人提交的相同值
  3. 与 ETL 链路的 dim_member.birthday 隔离:
    • DWS 读取优先级:dim_member_birthday_manual(手动值)> dim_member.birthdayAPI 值)
    • SCD2 更新 dim_member.birthday 时不覆盖手动补录值
  4. 后端 API助教提交生日的接口POST
  5. 小程序:助教端提交客户生日的 UI 入口

涉及变更

  • DDLdwd.dim_member 加列 + 新建 dwd.dim_member_birthday_manual
  • DWD 装载:dwd_load_task.py 列映射
  • DWS 任务:member_visit_task.py 等恢复 birthday 引用
  • 后端 API新增生日提交接口
  • 小程序:助教端 UI
  • 迁移脚本

需求 DDwdLoadTask 返回值格式规范化

现状

  • DwdLoadTask.load() 返回 {"tables": summary, "errors": errors},其中 errorslist[dict]
  • 其他任务返回 {"counts": {"fetched": 0, "inserted": 10, "errors": 0}}errorsint
  • BaseTask._accumulate_counts()list 类型做 setdefault 保留原值
  • flow_runner.pysum() 遇到 list 类型崩溃
  • 当前 _safe_int() 是下游防御,不是根本修复

目标

  1. DwdLoadTask.load() 返回值中 errors 改为 intlen(errors)
  2. 错误详情放到单独的 keyerror_details: list[dict]
  3. BaseTask._accumulate_counts()list 类型做 len() 累加(防御层保留)
  4. flow_runner.py_safe_int() 保留作为最终防御层

涉及变更

  • apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py — load() 返回值
  • apps/etl/connectors/feiqiu/tasks/base_task.py — _accumulate_counts()
  • apps/etl/connectors/feiqiu/orchestration/flow_runner.py — _safe_int() 保留

优先级建议

需求 优先级 理由
D P0 改动最小,消除类型不一致的隐患
C1 P1 生日字段是重要销售线索ETL 链路先通
B P1 加注释标记已知限制,预留扩展方案
A P2 涉及 DDL 变更 + 多任务重写,影响面最大
C2 P2 依赖后端 API + 小程序 UI需要跨团队协调

关键 DDL 参考

dim_member 当前结构(无 birthday

CREATE TABLE dim_member (
    member_id BIGINT,
    system_member_id BIGINT,
    tenant_id BIGINT,
    register_site_id BIGINT,    -- 注意:没有 site_id
    mobile TEXT,
    nickname TEXT,
    member_card_grade_code BIGINT,
    member_card_grade_name TEXT,
    create_time TIMESTAMPTZ,
    update_time TIMESTAMPTZ,
    pay_money_sum NUMERIC(18,2),
    recharge_money_sum NUMERIC(18,2),
    SCD2_start_time TIMESTAMPTZ,
    SCD2_end_time TIMESTAMPTZ,
    SCD2_is_current INT,
    SCD2_version INT,
    PRIMARY KEY (member_id, scd2_start_time)
);

dws_assistant_monthly_summary 当前唯一约束

CONSTRAINT uk_dws_assistant_monthly UNIQUE (site_id, assistant_id, stat_month)
-- 需改为: UNIQUE (site_id, assistant_id, stat_month, assistant_level_code)

DwdLoadTask.load() 当前返回值

return {"tables": summary, "errors": errors}
# errors 是 list[dict],如 [{"table": "dim_assistant_ex", "error": "year -1 is out of range"}]
# 应改为: return {"tables": summary, "errors": len(errors), "error_details": errors}