# 实现计划:ETL 聚合修复与生日字段补齐 ## 概述 按优先级(D → C1 → B → A → C2)逐步实施,每个需求独立可部署。代码变更集中在 ETL Connector 的 tasks 层和后端 API,DDL 变更通过迁移脚本执行。 ## 任务 - [x] 1. 需求 D:DwdLoadTask 返回值格式规范化 - [x] 1.1 修改 DwdLoadTask.load() 返回值格式 - 在 `apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py` 的 `load()` 方法中,将 `return {"tables": summary, "errors": errors}` 改为 `return {"tables": summary, "errors": len(errors), "error_details": errors}` - _需求: 1.1_ - [x] 1.2 增强 BaseTask._accumulate_counts() 防御层 - 在 `apps/etl/connectors/feiqiu/tasks/base_task.py` 的 `_accumulate_counts()` 方法中,增加 `isinstance(value, list)` 分支,将 list 转为 `len()` 后累加 - _需求: 1.2_ - [x] 1.3 编写属性测试:DwdLoadTask 返回值格式一致性 - **Property 1: DwdLoadTask 返回值格式一致性** - **验证: 需求 1.1** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py` - [x] 1.4 编写属性测试:_accumulate_counts 类型安全累加 - **Property 2: _accumulate_counts 类型安全累加** - **验证: 需求 1.2** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py` - [x] 2. 检查点 — 需求 D 完成 - 确保所有测试通过,ask the user if questions arise. - [x] 3. 需求 C1:会员生日字段 ETL 链路补齐 - [x] 3.1 编写迁移脚本:ODS/DWD 加 birthday 列 - 创建 `db/etl_feiqiu/migrations/2026-02-22__C1_dim_member_add_birthday.sql` - ODS: `ALTER TABLE ods.member_profiles ADD COLUMN IF NOT EXISTS birthday DATE;` - DWD: `ALTER TABLE dwd.dim_member ADD COLUMN IF NOT EXISTS birthday DATE;` - 包含回滚语句和验证 SQL - _需求: 4.1_ - [-] 3.1a 在测试库执行迁移脚本 C1 - 在 `test_etl_feiqiu` 上执行 `2026-02-22__C1_dim_member_add_birthday.sql` - 执行验证 SQL 确认列已添加 - _需求: 4.1_ - [x] 3.2 更新 ODS 入库逻辑:提取 birthday 字段 - 在 ODS 入库任务中增加 `birthday` 字段的 JSON 提取映射 - 确认 `ods_tasks.py` 中 `member_profiles` 的字段列表包含 `birthday` - _需求: 4.2_ - [x] 3.3 验证 DwdLoadTask 自动列映射包含 birthday - DwdLoadTask 通过 `_get_columns()` 自动读取 DWD 表列名,确认 `birthday` 被自动包含在列映射中 - SCD2 变化检测自动包含所有非 SCD2 元数据列,确认 `birthday` 参与变化检测 - _需求: 4.2, 4.3_ - [x] 3.4 恢复 DWS 任务中的 birthday 引用 - 修改 `member_visit_task.py` 的 `_extract_member_info()` SQL,加入 `birthday` 字段 - 修改 `member_consumption_task.py` 的 `_extract_member_info()` SQL,加入 `birthday` 字段 - 修改 DWS 任务的 `transform()` 方法,将 `member_birthday` 写入输出记录 - _需求: 4.4_ - [x] 3.5 编写属性测试:birthday ODS→DWD 装载正确性 - **Property 7: birthday ODS→DWD 装载正确性** - **验证: 需求 4.2** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py` - [x] 3.6 编写属性测试:birthday SCD2 变化检测 - **Property 8: birthday SCD2 变化检测** - **验证: 需求 4.3** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py` - [x] 4. 检查点 — 需求 C1 完成 - 确保所有测试通过,ask the user if questions arise. - [x] 5. 需求 B:多门店会员查询支持 - [x] 5.1 修改 member_visit_task.py 的 _extract_member_info() - 将 `WHERE register_site_id = %s` 改为通过 `dwd_settlement_head` 事实表的 `tenant_member_id` 反查 - _需求: 3.1, 3.2_ - [x] 5.2 修改 member_consumption_task.py 的 _extract_member_info() - 将 `WHERE register_site_id = %s` 改为通过 `dwd_settlement_head` 事实表的 `tenant_member_id` 反查 - 同时修改 `dim_member_card_account` 查询,改为通过事实表反查 - _需求: 3.1, 3.2_ - [x] 5.3 修改 assistant_customer_task.py 的 _extract_member_info() - 将 `WHERE register_site_id = %s` 改为通过 `dwd_assistant_service_log` 事实表的 `tenant_member_id` 反查 - _需求: 3.1, 3.2_ - [x] 5.4 修改 finance_recharge_task.py 的 dim_member_card_account 查询 - 将 `WHERE register_site_id = %s` 改为通过事实表反查 - _需求: 3.1_ - [x] 5.5 编写属性测试:跨店会员可查 - **Property 6: 跨店会员可查** - **验证: 需求 3.1, 3.2** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_multi_store_properties.py` - [x] 6. 检查点 — 需求 B 完成 - 确保所有测试通过,ask the user if questions arise. - [x] 7. 需求 A:助教月度聚合按档位分段统计 - [x] 7.1 编写迁移脚本:唯一约束变更 - 创建 `db/etl_feiqiu/migrations/2026-02-22__A_monthly_summary_uk_change.sql` - DROP 旧约束 `uk_dws_assistant_monthly`,ADD 新约束 `(site_id, assistant_id, stat_month, assistant_level_code)` - 包含回滚语句和验证 SQL - _需求: 2.2_ - [x] 7.1a 在测试库执行迁移脚本 A - 在 `test_etl_feiqiu` 上执行 `2026-02-22__A_monthly_summary_uk_change.sql` - 执行验证 SQL 确认约束已变更(`SELECT conname FROM pg_constraint ...`) - _需求: 2.2_ - [x] 7.2 修改 AssistantMonthlyTask._extract_daily_aggregates() - GROUP BY 加入 `assistant_level_code, assistant_level_name` - nickname 改为 `(ARRAY_AGG(assistant_nickname ORDER BY stat_date DESC))[1]` - _需求: 2.1, 2.3_ - [x] 7.3 修改 AssistantMonthlyTask._process_month() 适配多行 - 确认 `_process_month()` 能正确处理同一助教多个档位的聚合数据 - 每行使用自己的 `assistant_level_code` 进行档位匹配和排名计算 - _需求: 2.1_ - [x] 7.4 修改 AssistantSalaryTask 适配档位分段工资计算 - `_extract_monthly_summary()` 已能返回多行(同一助教不同档位) - `transform()` 遍历每行分别计算工资,按档位使用对应的 `level_price` 和 `tier` - _需求: 2.4_ - [x] 7.5 修改 AssistantFinanceTask._extract_daily_revenue() nickname 取值 - 将 `MAX(s.nickname)` 改为 `(ARRAY_AGG(s.nickname ORDER BY s.start_use_time DESC))[1]` - _需求: 2.5_ - [x] 7.6 修改 AssistantCustomerTask._extract_service_pairs() nickname 取值 - 将 `MAX(assistant_nickname)` 改为 `(ARRAY_AGG(assistant_nickname ORDER BY service_date DESC))[1]` - _需求: 2.6_ - [x] 7.7 编写属性测试:档位分段聚合正确性 - **Property 3: 档位分段聚合正确性** - **验证: 需求 2.1** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py` - [x] 7.8 编写属性测试:nickname 按时间倒序取值 - **Property 4: nickname 按时间倒序取值** - **验证: 需求 2.3, 2.5, 2.6** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py` - [x] 7.9 编写属性测试:工资按档位分段计算 - **Property 5: 工资按档位分段计算** - **验证: 需求 2.4** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py` - [x] 8. 检查点 — 需求 A 完成 - 确保所有测试通过,ask the user if questions arise. - [x] 9. 需求 C2:助教手动补录会员生日 - [x] 9.1 编写迁移脚本:创建 member_birthday_manual 表 - 创建 `db/zqyy_app/migrations/2026-02-22__C2_member_birthday_manual.sql` - 在 `zqyy_app` / `test_zqyy_app` 中创建 `member_birthday_manual` 表 - 包含回滚语句和验证 SQL - _需求: 5.1_ - [x] 9.1a 在测试库执行迁移脚本 C2 - 在 `test_zqyy_app` 上执行 `2026-02-22__C2_member_birthday_manual.sql` - 执行验证 SQL 确认表和约束已创建 - _需求: 5.1_ - [x] 9.2 编写 FDW 映射脚本:ETL 库读取业务库 - 创建 `db/fdw/setup_fdw_reverse.sql`(etl_feiqiu → zqyy_app 方向) - 创建 `db/fdw/setup_fdw_reverse_test.sql`(test 环境版本) - 在 ETL 库中创建 `fdw_app.member_birthday_manual` 外部表 - _需求: 5.3_ - [x] 9.2a 在测试库执行 FDW 映射脚本 - 在 `test_etl_feiqiu` 上执行 `setup_fdw_reverse_test.sql` - 验证 `fdw_app.member_birthday_manual` 外部表可读取 - _需求: 5.3_ - [x] 9.3 修改 DWS 任务:生日读取优先级逻辑 - 在 `member_visit_task.py` 和 `member_consumption_task.py` 的 `_extract_member_info()` 中,使用 `COALESCE(fdw_app.member_birthday_manual, dim_member.birthday)` 逻辑 - 增加 FDW 连接失败的降级处理 - _需求: 5.4_ - [x] 9.4 实现后端 API:生日提交接口 - 创建 `apps/backend/app/routers/member_birthday.py` - 实现 `POST /member-birthday` 接口,执行 UPSERT - 创建 Pydantic schema `apps/backend/app/schemas/member_birthday.py` - 在 `apps/backend/app/main.py` 中注册路由 - _需求: 5.5_ - [x] 9.5 编写属性测试:生日 UPSERT 幂等性 - **Property 9: 生日 UPSERT 幂等性** - **验证: 需求 5.2, 5.5** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py` - [x] 9.6 编写属性测试:手动补录优先于 API 来源 - **Property 10: 手动补录优先于 API 来源** - **验证: 需求 5.4** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py` - [x] 9.7 编写属性测试:SCD2 更新不影响手动补录表 - **Property 11: SCD2 更新不影响手动补录表** - **验证: 需求 5.6** - 文件:`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py` - [x] 10. 收尾:主 DDL 合并与文档更新 - [x] 10.1 合并迁移变更到主 DDL 文件 - 将 `birthday` 列定义合并到 `db/etl_feiqiu/schemas/ods.sql`(`member_profiles` 表) - 将 `birthday` 列定义合并到 `db/etl_feiqiu/schemas/dwd.sql`(`dim_member` 表) - 将新唯一约束合并到 `db/etl_feiqiu/schemas/dws.sql`(`dws_assistant_monthly_summary` 表) - 将 `member_birthday_manual` 表定义合并到 `db/zqyy_app/schemas/init.sql` - 将 FDW 反向映射合并到 `db/fdw/setup_fdw.sql` 和 `db/fdw/setup_fdw_test.sql` - [x] 10.2 更新数据库文档 - 在 `docs/database/` 中新建或更新受影响表的文档: - `dim_member`:新增 `birthday` 列说明 - `dws_assistant_monthly_summary`:唯一约束变更说明 - `member_birthday_manual`:新建表文档(含 FDW 映射说明) - 遵循现有 `BD_Manual_*.md` 命名规范 - [x] 10.3 最终验证 - 确保所有测试通过 - 确认主 DDL 文件与测试库实际结构一致 - ask the user if questions arise ## 备注 - 标记 `*` 的任务为可选测试任务,可跳过以加速 MVP - 每个需求独立可部署,检查点确保增量验证 - 迁移脚本需在 `test_etl_feiqiu` / `test_zqyy_app` 上先行验证 - 属性测试使用 hypothesis,每个属性最少 100 次迭代 - 单元测试使用 FakeDB/FakeAPI,不依赖真实数据库