8.2 KiB
8.2 KiB
ETL 聚合修复与生日字段补齐 — Spec 需求输入
本文档汇总 v8 联调中发现的 4 个需要深度修复的问题,合并为一个 Spec。 用户可复制本文档内容开启 Spec 流程。
背景
v8 联调修复了 11 个 BUG,其中 4 个的当前修复方式是"临时止血",需要更完整的方案:
| 编号 | 原 BUG | 当前临时修复 | 问题 |
|---|---|---|---|
| A | BUG 2 | MAX() 聚合 nickname/level_code | 不精确,应按时间取最后记录;且未来档位不同需分段统计 |
| B | BUG 3+4 | site_id → register_site_id | 单门店可用,多门店场景会漏掉跨店消费的会员 |
| C | BUG 5 | 移除 birthday 字段 | 生日是重要销售线索,应全链路保留 + 支持手动补录 |
| D | BUG 11 | _safe_int() 下游防御 | 应从源头规范 DwdLoadTask.load() 返回值格式 |
需求 A:助教月度聚合 — 档位分段统计
现状
dws_assistant_monthly_summary唯一约束:(site_id, assistant_id, stat_month)assistant_monthly_task.py的_extract_daily_aggregates()按(assistant_id, month)GROUP BY- 助教月内如果升级/降级,nickname/level_code/level_name 会变化,当前用
MAX()取值 MAX(assistant_level_code)取数值最大的等级代码,不是时间上最后的等级
目标
- 月度汇总按
(assistant_id, stat_month, assistant_level_code)分组,保留多行,分别统计各档位的业绩 - 唯一约束改为
(site_id, assistant_id, stat_month, assistant_level_code) - 工资计算(
assistant_salary_task.py)适配:按档位分段计算抽成 - nickname 取值改为按时间取最后一条记录(
ORDER BY date DESC LIMIT 1或窗口函数)
受影响的同类场景(需一并处理)
| 文件 | 方法 | 聚合粒度 | 维度字段 | 处理方式 |
|---|---|---|---|---|
assistant_monthly_task.py |
_extract_daily_aggregates |
assistant_id + month | nickname, level_code, level_name | 改为分段保留 |
assistant_finance_task.py |
_extract_daily_revenue |
date + assistant_id | nickname | 改为按时间取最后 |
assistant_customer_task.py |
_extract_service_pairs |
assistant_id + member_id | nickname | 改为按时间取最后 |
涉及变更
- DDL:
dws.dws_assistant_monthly_summary唯一约束变更 - 代码:
assistant_monthly_task.py、assistant_salary_task.py、assistant_finance_task.py、assistant_customer_task.py - 迁移脚本:
db/etl_feiqiu/migrations/
需求 B:多门店会员查询 — register_site_id 潜在问题
现状
dim_member没有site_id列,只有register_site_id(会员注册门店)- 所有 DWS 任务用
WHERE register_site_id = %s筛选会员 - 单门店场景下无问题
潜在问题
- 会员在 A 店注册、B 店消费 → B 店的 DWS 任务查不到该会员信息
- 会员昵称、手机号等维度信息缺失,DWS 记录不完整
目标
- 评估当前业务是否存在跨店消费场景(如果只有一个门店,可标记为"已知限制"暂不处理)
- 如果需要支持多门店:改为通过事实表的
member_id反查dim_member,不再按register_site_id预筛选 - 受影响的任务:所有使用
_extract_member_info(site_id)的 DWS 任务
建议
- 当前单门店场景下,标记为"已知限制",在代码中加注释说明
- 预留多门店扩展方案:将
WHERE register_site_id = %s改为WHERE member_id IN (SELECT DISTINCT member_id FROM dwd.事实表 WHERE site_id = %s)
需求 C:会员生日字段全链路补齐 + 手动补录
现状
- 上游 API
member_profiles可能返回生日字段(当前可能全为空) - ODS
billiards_ods.member_profiles保留 API 原始 payload(JSON 中有则保留) - DWD
dim_member没有birthday列 → ODS → DWD 装载时未映射 - DWS
member_visit_task.py原本引用birthday,BUG 5 临时修复时移除了
目标
C1:ETL 链路生日字段补齐
- DWD
dim_member主表(不是_ex表)新增birthday DATE列 - DWD 装载映射:
DwdLoadTask的列映射中加入birthday - DWS 恢复
member_birthday引用:member_visit_task.py等任务从dim_member.birthday读取 - SCD2 更新时正常处理 birthday 变化
C2:助教手动补录生日(独立于 ETL 链路)
- 新建表
dwd.dim_member_birthday_manual:member_id BIGINT NOT NULL— 会员 IDbirthday_value DATE NOT NULL— 生日值recorded_by_assistant_id BIGINT— 补录助教 IDrecorded_by_name VARCHAR(50)— 补录人姓名recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()— 补录时间source VARCHAR(20) DEFAULT 'assistant'— 来源标记- 唯一约束:
(member_id, recorded_by_assistant_id)— 同一助教对同一客户只保留最新一条
- 可能有多个助教对同一客户提交生日信息,需要处理冲突:
- 保留所有提交记录(不删除)
- DWS 读取时取最早提交的记录(第一个发现的助教优先),或取最多人提交的相同值
- 与 ETL 链路的
dim_member.birthday隔离:- DWS 读取优先级:
dim_member_birthday_manual(手动值)>dim_member.birthday(API 值) - SCD2 更新
dim_member.birthday时不覆盖手动补录值
- DWS 读取优先级:
- 后端 API:助教提交生日的接口(POST)
- 小程序:助教端提交客户生日的 UI 入口
涉及变更
- DDL:
dwd.dim_member加列 + 新建dwd.dim_member_birthday_manual - DWD 装载:
dwd_load_task.py列映射 - DWS 任务:
member_visit_task.py等恢复 birthday 引用 - 后端 API:新增生日提交接口
- 小程序:助教端 UI
- 迁移脚本
需求 D:DwdLoadTask 返回值格式规范化
现状
DwdLoadTask.load()返回{"tables": summary, "errors": errors},其中errors是list[dict]- 其他任务返回
{"counts": {"fetched": 0, "inserted": 10, "errors": 0}},errors是int BaseTask._accumulate_counts()对list类型做setdefault保留原值flow_runner.py的sum()遇到list类型崩溃- 当前
_safe_int()是下游防御,不是根本修复
目标
DwdLoadTask.load()返回值中errors改为int(len(errors))- 错误详情放到单独的 key(如
error_details: list[dict]) BaseTask._accumulate_counts()对list类型做len()累加(防御层保留)flow_runner.py的_safe_int()保留作为最终防御层
涉及变更
apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py— load() 返回值apps/etl/connectors/feiqiu/tasks/base_task.py— _accumulate_counts()apps/etl/connectors/feiqiu/orchestration/flow_runner.py— _safe_int() 保留
优先级建议
| 需求 | 优先级 | 理由 |
|---|---|---|
| D | P0 | 改动最小,消除类型不一致的隐患 |
| C1 | P1 | 生日字段是重要销售线索,ETL 链路先通 |
| B | P1 | 加注释标记已知限制,预留扩展方案 |
| A | P2 | 涉及 DDL 变更 + 多任务重写,影响面最大 |
| C2 | P2 | 依赖后端 API + 小程序 UI,需要跨团队协调 |
关键 DDL 参考
dim_member 当前结构(无 birthday)
CREATE TABLE dim_member (
member_id BIGINT,
system_member_id BIGINT,
tenant_id BIGINT,
register_site_id BIGINT, -- 注意:没有 site_id
mobile TEXT,
nickname TEXT,
member_card_grade_code BIGINT,
member_card_grade_name TEXT,
create_time TIMESTAMPTZ,
update_time TIMESTAMPTZ,
pay_money_sum NUMERIC(18,2),
recharge_money_sum NUMERIC(18,2),
SCD2_start_time TIMESTAMPTZ,
SCD2_end_time TIMESTAMPTZ,
SCD2_is_current INT,
SCD2_version INT,
PRIMARY KEY (member_id, scd2_start_time)
);
dws_assistant_monthly_summary 当前唯一约束
CONSTRAINT uk_dws_assistant_monthly UNIQUE (site_id, assistant_id, stat_month)
-- 需改为: UNIQUE (site_id, assistant_id, stat_month, assistant_level_code)
DwdLoadTask.load() 当前返回值
return {"tables": summary, "errors": errors}
# errors 是 list[dict],如 [{"table": "dim_assistant_ex", "error": "year -1 is out of range"}]
# 应改为: return {"tables": summary, "errors": len(errors), "error_details": errors}