229 lines
11 KiB
Markdown
229 lines
11 KiB
Markdown
# 实现计划:ETL 聚合修复与生日字段补齐
|
||
|
||
## 概述
|
||
|
||
按优先级(D → C1 → B → A → C2)逐步实施,每个需求独立可部署。代码变更集中在 ETL Connector 的 tasks 层和后端 API,DDL 变更通过迁移脚本执行。
|
||
|
||
## 任务
|
||
|
||
- [x] 1. 需求 D:DwdLoadTask 返回值格式规范化
|
||
- [x] 1.1 修改 DwdLoadTask.load() 返回值格式
|
||
- 在 `apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 的 `load()` 方法中,将 `return {"tables": summary, "errors": errors}` 改为 `return {"tables": summary, "errors": len(errors), "error_details": errors}`
|
||
- _需求: 1.1_
|
||
|
||
- [x] 1.2 增强 BaseTask._accumulate_counts() 防御层
|
||
- 在 `apps/etl/connectors/feiqiu/tasks/base_task.py` 的 `_accumulate_counts()` 方法中,增加 `isinstance(value, list)` 分支,将 list 转为 `len()` 后累加
|
||
- _需求: 1.2_
|
||
|
||
- [x] 1.3 编写属性测试:DwdLoadTask 返回值格式一致性
|
||
- **Property 1: DwdLoadTask 返回值格式一致性**
|
||
- **验证: 需求 1.1**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py`
|
||
|
||
- [x] 1.4 编写属性测试:_accumulate_counts 类型安全累加
|
||
- **Property 2: _accumulate_counts 类型安全累加**
|
||
- **验证: 需求 1.2**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py`
|
||
|
||
- [x] 2. 检查点 — 需求 D 完成
|
||
- 确保所有测试通过,ask the user if questions arise.
|
||
|
||
- [x] 3. 需求 C1:会员生日字段 ETL 链路补齐
|
||
- [x] 3.1 编写迁移脚本:ODS/DWD 加 birthday 列
|
||
- 创建 `db/etl_feiqiu/migrations/2026-02-22__C1_dim_member_add_birthday.sql`
|
||
- ODS: `ALTER TABLE ods.member_profiles ADD COLUMN IF NOT EXISTS birthday DATE;`
|
||
- DWD: `ALTER TABLE dwd.dim_member ADD COLUMN IF NOT EXISTS birthday DATE;`
|
||
- 包含回滚语句和验证 SQL
|
||
- _需求: 4.1_
|
||
|
||
- [-] 3.1a 在测试库执行迁移脚本 C1
|
||
- 在 `test_etl_feiqiu` 上执行 `2026-02-22__C1_dim_member_add_birthday.sql`
|
||
- 执行验证 SQL 确认列已添加
|
||
- _需求: 4.1_
|
||
|
||
- [x] 3.2 更新 ODS 入库逻辑:提取 birthday 字段
|
||
- 在 ODS 入库任务中增加 `birthday` 字段的 JSON 提取映射
|
||
- 确认 `ods_tasks.py` 中 `member_profiles` 的字段列表包含 `birthday`
|
||
- _需求: 4.2_
|
||
|
||
- [x] 3.3 验证 DwdLoadTask 自动列映射包含 birthday
|
||
- DwdLoadTask 通过 `_get_columns()` 自动读取 DWD 表列名,确认 `birthday` 被自动包含在列映射中
|
||
- SCD2 变化检测自动包含所有非 SCD2 元数据列,确认 `birthday` 参与变化检测
|
||
- _需求: 4.2, 4.3_
|
||
|
||
- [x] 3.4 恢复 DWS 任务中的 birthday 引用
|
||
- 修改 `member_visit_task.py` 的 `_extract_member_info()` SQL,加入 `birthday` 字段
|
||
- 修改 `member_consumption_task.py` 的 `_extract_member_info()` SQL,加入 `birthday` 字段
|
||
- 修改 DWS 任务的 `transform()` 方法,将 `member_birthday` 写入输出记录
|
||
- _需求: 4.4_
|
||
|
||
- [x] 3.5 编写属性测试:birthday ODS→DWD 装载正确性
|
||
- **Property 7: birthday ODS→DWD 装载正确性**
|
||
- **验证: 需求 4.2**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py`
|
||
|
||
- [x] 3.6 编写属性测试:birthday SCD2 变化检测
|
||
- **Property 8: birthday SCD2 变化检测**
|
||
- **验证: 需求 4.3**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py`
|
||
|
||
- [x] 4. 检查点 — 需求 C1 完成
|
||
- 确保所有测试通过,ask the user if questions arise.
|
||
|
||
- [x] 5. 需求 B:多门店会员查询支持
|
||
- [x] 5.1 修改 member_visit_task.py 的 _extract_member_info()
|
||
- 将 `WHERE register_site_id = %s` 改为通过 `dwd_settlement_head` 事实表的 `tenant_member_id` 反查
|
||
- _需求: 3.1, 3.2_
|
||
|
||
- [x] 5.2 修改 member_consumption_task.py 的 _extract_member_info()
|
||
- 将 `WHERE register_site_id = %s` 改为通过 `dwd_settlement_head` 事实表的 `tenant_member_id` 反查
|
||
- 同时修改 `dim_member_card_account` 查询,改为通过事实表反查
|
||
- _需求: 3.1, 3.2_
|
||
|
||
- [x] 5.3 修改 assistant_customer_task.py 的 _extract_member_info()
|
||
- 将 `WHERE register_site_id = %s` 改为通过 `dwd_assistant_service_log` 事实表的 `tenant_member_id` 反查
|
||
- _需求: 3.1, 3.2_
|
||
|
||
- [x] 5.4 修改 finance_recharge_task.py 的 dim_member_card_account 查询
|
||
- 将 `WHERE register_site_id = %s` 改为通过事实表反查
|
||
- _需求: 3.1_
|
||
|
||
- [x] 5.5 编写属性测试:跨店会员可查
|
||
- **Property 6: 跨店会员可查**
|
||
- **验证: 需求 3.1, 3.2**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_multi_store_properties.py`
|
||
|
||
- [x] 6. 检查点 — 需求 B 完成
|
||
- 确保所有测试通过,ask the user if questions arise.
|
||
|
||
- [x] 7. 需求 A:助教月度聚合按档位分段统计
|
||
- [x] 7.1 编写迁移脚本:唯一约束变更
|
||
- 创建 `db/etl_feiqiu/migrations/2026-02-22__A_monthly_summary_uk_change.sql`
|
||
- DROP 旧约束 `uk_dws_assistant_monthly`,ADD 新约束 `(site_id, assistant_id, stat_month, assistant_level_code)`
|
||
- 包含回滚语句和验证 SQL
|
||
- _需求: 2.2_
|
||
|
||
- [x] 7.1a 在测试库执行迁移脚本 A
|
||
- 在 `test_etl_feiqiu` 上执行 `2026-02-22__A_monthly_summary_uk_change.sql`
|
||
- 执行验证 SQL 确认约束已变更(`SELECT conname FROM pg_constraint ...`)
|
||
- _需求: 2.2_
|
||
|
||
- [x] 7.2 修改 AssistantMonthlyTask._extract_daily_aggregates()
|
||
- GROUP BY 加入 `assistant_level_code, assistant_level_name`
|
||
- nickname 改为 `(ARRAY_AGG(assistant_nickname ORDER BY stat_date DESC))[1]`
|
||
- _需求: 2.1, 2.3_
|
||
|
||
- [x] 7.3 修改 AssistantMonthlyTask._process_month() 适配多行
|
||
- 确认 `_process_month()` 能正确处理同一助教多个档位的聚合数据
|
||
- 每行使用自己的 `assistant_level_code` 进行档位匹配和排名计算
|
||
- _需求: 2.1_
|
||
|
||
- [x] 7.4 修改 AssistantSalaryTask 适配档位分段工资计算
|
||
- `_extract_monthly_summary()` 已能返回多行(同一助教不同档位)
|
||
- `transform()` 遍历每行分别计算工资,按档位使用对应的 `level_price` 和 `tier`
|
||
- _需求: 2.4_
|
||
|
||
- [x] 7.5 修改 AssistantFinanceTask._extract_daily_revenue() nickname 取值
|
||
- 将 `MAX(s.nickname)` 改为 `(ARRAY_AGG(s.nickname ORDER BY s.start_use_time DESC))[1]`
|
||
- _需求: 2.5_
|
||
|
||
- [x] 7.6 修改 AssistantCustomerTask._extract_service_pairs() nickname 取值
|
||
- 将 `MAX(assistant_nickname)` 改为 `(ARRAY_AGG(assistant_nickname ORDER BY service_date DESC))[1]`
|
||
- _需求: 2.6_
|
||
|
||
- [x] 7.7 编写属性测试:档位分段聚合正确性
|
||
- **Property 3: 档位分段聚合正确性**
|
||
- **验证: 需求 2.1**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py`
|
||
|
||
- [x] 7.8 编写属性测试:nickname 按时间倒序取值
|
||
- **Property 4: nickname 按时间倒序取值**
|
||
- **验证: 需求 2.3, 2.5, 2.6**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py`
|
||
|
||
- [x] 7.9 编写属性测试:工资按档位分段计算
|
||
- **Property 5: 工资按档位分段计算**
|
||
- **验证: 需求 2.4**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py`
|
||
|
||
- [x] 8. 检查点 — 需求 A 完成
|
||
- 确保所有测试通过,ask the user if questions arise.
|
||
|
||
- [x] 9. 需求 C2:助教手动补录会员生日
|
||
- [x] 9.1 编写迁移脚本:创建 member_birthday_manual 表
|
||
- 创建 `db/zqyy_app/migrations/2026-02-22__C2_member_birthday_manual.sql`
|
||
- 在 `zqyy_app` / `test_zqyy_app` 中创建 `member_birthday_manual` 表
|
||
- 包含回滚语句和验证 SQL
|
||
- _需求: 5.1_
|
||
|
||
- [x] 9.1a 在测试库执行迁移脚本 C2
|
||
- 在 `test_zqyy_app` 上执行 `2026-02-22__C2_member_birthday_manual.sql`
|
||
- 执行验证 SQL 确认表和约束已创建
|
||
- _需求: 5.1_
|
||
|
||
- [x] 9.2 编写 FDW 映射脚本:ETL 库读取业务库
|
||
- 创建 `db/fdw/setup_fdw_reverse.sql`(etl_feiqiu → zqyy_app 方向)
|
||
- 创建 `db/fdw/setup_fdw_reverse_test.sql`(test 环境版本)
|
||
- 在 ETL 库中创建 `fdw_app.member_birthday_manual` 外部表
|
||
- _需求: 5.3_
|
||
|
||
- [x] 9.2a 在测试库执行 FDW 映射脚本
|
||
- 在 `test_etl_feiqiu` 上执行 `setup_fdw_reverse_test.sql`
|
||
- 验证 `fdw_app.member_birthday_manual` 外部表可读取
|
||
- _需求: 5.3_
|
||
|
||
- [x] 9.3 修改 DWS 任务:生日读取优先级逻辑
|
||
- 在 `member_visit_task.py` 和 `member_consumption_task.py` 的 `_extract_member_info()` 中,使用 `COALESCE(fdw_app.member_birthday_manual, dim_member.birthday)` 逻辑
|
||
- 增加 FDW 连接失败的降级处理
|
||
- _需求: 5.4_
|
||
|
||
- [x] 9.4 实现后端 API:生日提交接口
|
||
- 创建 `apps/backend/app/routers/member_birthday.py`
|
||
- 实现 `POST /member-birthday` 接口,执行 UPSERT
|
||
- 创建 Pydantic schema `apps/backend/app/schemas/member_birthday.py`
|
||
- 在 `apps/backend/app/main.py` 中注册路由
|
||
- _需求: 5.5_
|
||
|
||
- [x] 9.5 编写属性测试:生日 UPSERT 幂等性
|
||
- **Property 9: 生日 UPSERT 幂等性**
|
||
- **验证: 需求 5.2, 5.5**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py`
|
||
|
||
- [x] 9.6 编写属性测试:手动补录优先于 API 来源
|
||
- **Property 10: 手动补录优先于 API 来源**
|
||
- **验证: 需求 5.4**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py`
|
||
|
||
- [x] 9.7 编写属性测试:SCD2 更新不影响手动补录表
|
||
- **Property 11: SCD2 更新不影响手动补录表**
|
||
- **验证: 需求 5.6**
|
||
- 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py`
|
||
|
||
- [x] 10. 收尾:主 DDL 合并与文档更新
|
||
- [x] 10.1 合并迁移变更到主 DDL 文件
|
||
- 将 `birthday` 列定义合并到 `db/etl_feiqiu/schemas/ods.sql`(`member_profiles` 表)
|
||
- 将 `birthday` 列定义合并到 `db/etl_feiqiu/schemas/dwd.sql`(`dim_member` 表)
|
||
- 将新唯一约束合并到 `db/etl_feiqiu/schemas/dws.sql`(`dws_assistant_monthly_summary` 表)
|
||
- 将 `member_birthday_manual` 表定义合并到 `db/zqyy_app/schemas/init.sql`
|
||
- 将 FDW 反向映射合并到 `db/fdw/setup_fdw.sql` 和 `db/fdw/setup_fdw_test.sql`
|
||
|
||
- [x] 10.2 更新数据库文档
|
||
- 在 `docs/database/` 中新建或更新受影响表的文档:
|
||
- `dim_member`:新增 `birthday` 列说明
|
||
- `dws_assistant_monthly_summary`:唯一约束变更说明
|
||
- `member_birthday_manual`:新建表文档(含 FDW 映射说明)
|
||
- 遵循现有 `BD_Manual_*.md` 命名规范
|
||
|
||
- [x] 10.3 最终验证
|
||
- 确保所有测试通过
|
||
- 确认主 DDL 文件与测试库实际结构一致
|
||
- ask the user if questions arise
|
||
|
||
## 备注
|
||
|
||
- 标记 `*` 的任务为可选测试任务,可跳过以加速 MVP
|
||
- 每个需求独立可部署,检查点确保增量验证
|
||
- 迁移脚本需在 `test_etl_feiqiu` / `test_zqyy_app` 上先行验证
|
||
- 属性测试使用 hypothesis,每个属性最少 100 次迭代
|
||
- 单元测试使用 FakeDB/FakeAPI,不依赖真实数据库
|