11 KiB
实现计划:ETL 聚合修复与生日字段补齐
概述
按优先级(D → C1 → B → A → C2)逐步实施,每个需求独立可部署。代码变更集中在 ETL Connector 的 tasks 层和后端 API,DDL 变更通过迁移脚本执行。
任务
-
1. 需求 D:DwdLoadTask 返回值格式规范化
-
1.1 修改 DwdLoadTask.load() 返回值格式
- 在
apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py的load()方法中,将return {"tables": summary, "errors": errors}改为return {"tables": summary, "errors": len(errors), "error_details": errors} - 需求: 1.1
- 在
-
1.2 增强 BaseTask._accumulate_counts() 防御层
- 在
apps/etl/connectors/feiqiu/tasks/base_task.py的_accumulate_counts()方法中,增加isinstance(value, list)分支,将 list 转为len()后累加 - 需求: 1.2
- 在
-
1.3 编写属性测试:DwdLoadTask 返回值格式一致性
- Property 1: DwdLoadTask 返回值格式一致性
- 验证: 需求 1.1
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py
-
1.4 编写属性测试:_accumulate_counts 类型安全累加
- Property 2: _accumulate_counts 类型安全累加
- 验证: 需求 1.2
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py
-
-
2. 检查点 — 需求 D 完成
- 确保所有测试通过,ask the user if questions arise.
-
3. 需求 C1:会员生日字段 ETL 链路补齐
-
3.1 编写迁移脚本:ODS/DWD 加 birthday 列
- 创建
db/etl_feiqiu/migrations/2026-02-22__C1_dim_member_add_birthday.sql - ODS:
ALTER TABLE ods.member_profiles ADD COLUMN IF NOT EXISTS birthday DATE; - DWD:
ALTER TABLE dwd.dim_member ADD COLUMN IF NOT EXISTS birthday DATE; - 包含回滚语句和验证 SQL
- 需求: 4.1
- 创建
-
[-] 3.1a 在测试库执行迁移脚本 C1
- 在
test_etl_feiqiu上执行2026-02-22__C1_dim_member_add_birthday.sql - 执行验证 SQL 确认列已添加
- 需求: 4.1
- 在
-
3.2 更新 ODS 入库逻辑:提取 birthday 字段
- 在 ODS 入库任务中增加
birthday字段的 JSON 提取映射 - 确认
ods_tasks.py中member_profiles的字段列表包含birthday - 需求: 4.2
- 在 ODS 入库任务中增加
-
3.3 验证 DwdLoadTask 自动列映射包含 birthday
- DwdLoadTask 通过
_get_columns()自动读取 DWD 表列名,确认birthday被自动包含在列映射中 - SCD2 变化检测自动包含所有非 SCD2 元数据列,确认
birthday参与变化检测 - 需求: 4.2, 4.3
- DwdLoadTask 通过
-
3.4 恢复 DWS 任务中的 birthday 引用
- 修改
member_visit_task.py的_extract_member_info()SQL,加入birthday字段 - 修改
member_consumption_task.py的_extract_member_info()SQL,加入birthday字段 - 修改 DWS 任务的
transform()方法,将member_birthday写入输出记录 - 需求: 4.4
- 修改
-
3.5 编写属性测试:birthday ODS→DWD 装载正确性
- Property 7: birthday ODS→DWD 装载正确性
- 验证: 需求 4.2
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py
-
3.6 编写属性测试:birthday SCD2 变化检测
- Property 8: birthday SCD2 变化检测
- 验证: 需求 4.3
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py
-
-
4. 检查点 — 需求 C1 完成
- 确保所有测试通过,ask the user if questions arise.
-
5. 需求 B:多门店会员查询支持
-
5.1 修改 member_visit_task.py 的 _extract_member_info()
- 将
WHERE register_site_id = %s改为通过dwd_settlement_head事实表的tenant_member_id反查 - 需求: 3.1, 3.2
- 将
-
5.2 修改 member_consumption_task.py 的 _extract_member_info()
- 将
WHERE register_site_id = %s改为通过dwd_settlement_head事实表的tenant_member_id反查 - 同时修改
dim_member_card_account查询,改为通过事实表反查 - 需求: 3.1, 3.2
- 将
-
5.3 修改 assistant_customer_task.py 的 _extract_member_info()
- 将
WHERE register_site_id = %s改为通过dwd_assistant_service_log事实表的tenant_member_id反查 - 需求: 3.1, 3.2
- 将
-
5.4 修改 finance_recharge_task.py 的 dim_member_card_account 查询
- 将
WHERE register_site_id = %s改为通过事实表反查 - 需求: 3.1
- 将
-
5.5 编写属性测试:跨店会员可查
- Property 6: 跨店会员可查
- 验证: 需求 3.1, 3.2
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_multi_store_properties.py
-
-
6. 检查点 — 需求 B 完成
- 确保所有测试通过,ask the user if questions arise.
-
7. 需求 A:助教月度聚合按档位分段统计
-
7.1 编写迁移脚本:唯一约束变更
- 创建
db/etl_feiqiu/migrations/2026-02-22__A_monthly_summary_uk_change.sql - DROP 旧约束
uk_dws_assistant_monthly,ADD 新约束(site_id, assistant_id, stat_month, assistant_level_code) - 包含回滚语句和验证 SQL
- 需求: 2.2
- 创建
-
7.1a 在测试库执行迁移脚本 A
- 在
test_etl_feiqiu上执行2026-02-22__A_monthly_summary_uk_change.sql - 执行验证 SQL 确认约束已变更(
SELECT conname FROM pg_constraint ...) - 需求: 2.2
- 在
-
7.2 修改 AssistantMonthlyTask._extract_daily_aggregates()
- GROUP BY 加入
assistant_level_code, assistant_level_name - nickname 改为
(ARRAY_AGG(assistant_nickname ORDER BY stat_date DESC))[1] - 需求: 2.1, 2.3
- GROUP BY 加入
-
7.3 修改 AssistantMonthlyTask._process_month() 适配多行
- 确认
_process_month()能正确处理同一助教多个档位的聚合数据 - 每行使用自己的
assistant_level_code进行档位匹配和排名计算 - 需求: 2.1
- 确认
-
7.4 修改 AssistantSalaryTask 适配档位分段工资计算
_extract_monthly_summary()已能返回多行(同一助教不同档位)transform()遍历每行分别计算工资,按档位使用对应的level_price和tier- 需求: 2.4
-
7.5 修改 AssistantFinanceTask._extract_daily_revenue() nickname 取值
- 将
MAX(s.nickname)改为(ARRAY_AGG(s.nickname ORDER BY s.start_use_time DESC))[1] - 需求: 2.5
- 将
-
7.6 修改 AssistantCustomerTask._extract_service_pairs() nickname 取值
- 将
MAX(assistant_nickname)改为(ARRAY_AGG(assistant_nickname ORDER BY service_date DESC))[1] - 需求: 2.6
- 将
-
7.7 编写属性测试:档位分段聚合正确性
- Property 3: 档位分段聚合正确性
- 验证: 需求 2.1
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py
-
7.8 编写属性测试:nickname 按时间倒序取值
- Property 4: nickname 按时间倒序取值
- 验证: 需求 2.3, 2.5, 2.6
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py
-
7.9 编写属性测试:工资按档位分段计算
- Property 5: 工资按档位分段计算
- 验证: 需求 2.4
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py
-
-
8. 检查点 — 需求 A 完成
- 确保所有测试通过,ask the user if questions arise.
-
9. 需求 C2:助教手动补录会员生日
-
9.1 编写迁移脚本:创建 member_birthday_manual 表
- 创建
db/zqyy_app/migrations/2026-02-22__C2_member_birthday_manual.sql - 在
zqyy_app/test_zqyy_app中创建member_birthday_manual表 - 包含回滚语句和验证 SQL
- 需求: 5.1
- 创建
-
9.1a 在测试库执行迁移脚本 C2
- 在
test_zqyy_app上执行2026-02-22__C2_member_birthday_manual.sql - 执行验证 SQL 确认表和约束已创建
- 需求: 5.1
- 在
-
9.2 编写 FDW 映射脚本:ETL 库读取业务库
- 创建
db/fdw/setup_fdw_reverse.sql(etl_feiqiu → zqyy_app 方向) - 创建
db/fdw/setup_fdw_reverse_test.sql(test 环境版本) - 在 ETL 库中创建
fdw_app.member_birthday_manual外部表 - 需求: 5.3
- 创建
-
9.2a 在测试库执行 FDW 映射脚本
- 在
test_etl_feiqiu上执行setup_fdw_reverse_test.sql - 验证
fdw_app.member_birthday_manual外部表可读取 - 需求: 5.3
- 在
-
9.3 修改 DWS 任务:生日读取优先级逻辑
- 在
member_visit_task.py和member_consumption_task.py的_extract_member_info()中,使用COALESCE(fdw_app.member_birthday_manual, dim_member.birthday)逻辑 - 增加 FDW 连接失败的降级处理
- 需求: 5.4
- 在
-
9.4 实现后端 API:生日提交接口
- 创建
apps/backend/app/routers/member_birthday.py - 实现
POST /member-birthday接口,执行 UPSERT - 创建 Pydantic schema
apps/backend/app/schemas/member_birthday.py - 在
apps/backend/app/main.py中注册路由 - 需求: 5.5
- 创建
-
9.5 编写属性测试:生日 UPSERT 幂等性
- Property 9: 生日 UPSERT 幂等性
- 验证: 需求 5.2, 5.5
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py
-
9.6 编写属性测试:手动补录优先于 API 来源
- Property 10: 手动补录优先于 API 来源
- 验证: 需求 5.4
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py
-
9.7 编写属性测试:SCD2 更新不影响手动补录表
- Property 11: SCD2 更新不影响手动补录表
- 验证: 需求 5.6
- 文件:
apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py
-
-
10. 收尾:主 DDL 合并与文档更新
-
10.1 合并迁移变更到主 DDL 文件
- 将
birthday列定义合并到db/etl_feiqiu/schemas/ods.sql(member_profiles表) - 将
birthday列定义合并到db/etl_feiqiu/schemas/dwd.sql(dim_member表) - 将新唯一约束合并到
db/etl_feiqiu/schemas/dws.sql(dws_assistant_monthly_summary表) - 将
member_birthday_manual表定义合并到db/zqyy_app/schemas/init.sql - 将 FDW 反向映射合并到
db/fdw/setup_fdw.sql和db/fdw/setup_fdw_test.sql
- 将
-
10.2 更新数据库文档
- 在
docs/database/中新建或更新受影响表的文档:dim_member:新增birthday列说明dws_assistant_monthly_summary:唯一约束变更说明member_birthday_manual:新建表文档(含 FDW 映射说明)
- 遵循现有
BD_Manual_*.md命名规范
- 在
-
10.3 最终验证
- 确保所有测试通过
- 确认主 DDL 文件与测试库实际结构一致
- ask the user if questions arise
-
备注
- 标记
*的任务为可选测试任务,可跳过以加速 MVP - 每个需求独立可部署,检查点确保增量验证
- 迁移脚本需在
test_etl_feiqiu/test_zqyy_app上先行验证 - 属性测试使用 hypothesis,每个属性最少 100 次迭代
- 单元测试使用 FakeDB/FakeAPI,不依赖真实数据库