Files

191 lines
8.9 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 实施计划ETL DWS/Flow 重构
## 概述
按 4 个阶段顺序实施BaseDwsTask 模板方法重构 → --layers CLI 参数 → 任务依赖声明 → 关键词/路径重命名。每个阶段完成后运行回归测试。
## 任务
- [x] 1. BaseDwsTask 默认模板方法
- [x] 1.1 在 BaseDwsTask 中添加 DATE_COL 类属性和默认 extract()/load() 实现
- 添加 `DATE_COL: str | None = None` 类属性
- 添加 `_do_extract(self, context) -> list[dict]` 抽象方法raise NotImplementedError
- 实现默认 `extract()`:调用 `_do_extract()` 并包装为标准字典
- 实现默认 `load()`delete_existing_data + bulk_insert返回标准统计字典
- _Requirements: 1.1, 1.2, 1.4, 1.5_
- [x] 1.2 编写 BaseDwsTask 默认模板方法的属性测试
- **Property 1: 默认 extract() 返回标准结构**
- **Validates: Requirements 1.1, 1.4**
- **Property 2: 默认 load() 幂等写入与标准统计**
- **Validates: Requirements 1.2, 1.5**
- [x] 1.3 迁移 DWS 子类使用默认模板方法
- 为每个 DWS 子类声明 DATE_COL
- 将各子类的 extract() 逻辑迁移到 _do_extract()
- 移除与默认 load() 行为一致的子类 load() 覆盖
- 保留有自定义逻辑的子类覆盖(如 AssistantSalaryTask 的月度删除)
- 涉及文件assistant_daily_task.py, assistant_monthly_task.py, assistant_customer_task.py, assistant_finance_task.py, member_consumption_task.py, member_visit_task.py, finance_daily_task.py, finance_recharge_task.py, finance_income_task.py, finance_discount_task.py
- _Requirements: 1.1, 1.2, 1.3_
- [x] 1.4 运行现有 DWS 单元测试确认无回归
- `cd apps/etl/pipelines/feiqiu && pytest tests/unit/test_dws_tasks.py -v`
- _Requirements: 11.1_
- [x] 2. 公共辅助方法提取与财务基类
- [x] 2.1 创建 dws_helpers.py 公共辅助模块
- 创建 `tasks/dws/dws_helpers.py`
- 提取 mask_mobile()、calc_days_since()、parse_id_list()、safe_division() 等函数
- 更新各 DWS 子类的导入,替换内联实现为 dws_helpers 调用
- _Requirements: 2.1, 2.2_
- [x] 2.2 编写 dws_helpers 函数等价性属性测试
- **Property 3: dws_helpers 函数等价性**
- **Validates: Requirements 2.3**
- [x] 2.3 创建 FinanceBaseTask 共享提取层
- 创建 `tasks/dws/finance_base_task.py`
- 从 FinanceDailyTask 提取共享方法_extract_settlement_summary, _extract_recharge_summary, _extract_groupbuy_summary, _extract_platform_summary
- 迁移 FinanceDailyTask, FinanceRechargeTask, FinanceIncomeStructureTask, FinanceDiscountDetailTask 继承 FinanceBaseTask
- _Requirements: 3.1, 3.2, 3.3_
- [x] 3. MV 刷新与数据清理合并 + MemberIndexBaseTask 模板
- [x] 3.1 创建 DwsMaintenanceTask 合并任务
- 创建 `tasks/dws/maintenance_task.py`
- 合并 BaseMvRefreshTask 和 DwsRetentionCleanupTask 的核心逻辑
- 在 TaskRegistry 中注册 DWS_MAINTENANCE移除原三个任务注册
- 更新 `tasks/dws/__init__.py` 导出
- _Requirements: 4.1, 4.2, 4.3, 4.4, 4.5_
- [x] 3.2 编写 DwsMaintenanceTask 属性测试和单元测试
- **Property 4: DwsMaintenanceTask 配置控制**
- **Validates: Requirements 4.3, 4.4**
- 单元测试:执行顺序(先刷新后清理)、注册项替换
- [x] 3.3 重构 MemberIndexBaseTask 模板方法
- 在 MemberIndexBaseTask 中实现 execute() 模板方法
- 添加 _calculate_scores() 和 _save_results() 抽象方法
- 迁移 WinbackIndexTask 和 NewconvIndexTask 使用新模板
- _Requirements: 5.1, 5.2, 5.3, 5.4_
- [x] 4. 检查点 - 阶段 1 回归测试
- 运行 `cd apps/etl/pipelines/feiqiu && pytest tests/unit -v` 确保所有测试通过
- 确保所有测试通过,如有问题请询问用户
- [x] 5. --layers CLI 参数与统一层解析
- [x] 5.1 实现 --layers CLI 参数
- 在 cli/main.py 中添加 `--layers` 参数
- 实现 parse_layers() 函数:解析逗号分隔的层名,校验合法性
- 添加 --layers 和 --pipeline 互斥校验
- 更新 main() 函数:当指定 --layers 时,构造层列表传递给 PipelineRunner
- _Requirements: 6.1, 6.2, 6.3, 6.4_
- [x] 5.2 编写 --layers 解析属性测试
- **Property 5: --layers 解析正确性**
- **Validates: Requirements 6.1, 6.2**
- [x] 5.3 统一 _resolve_tasks() 去掉硬编码回退
- 移除 _resolve_tasks() 中所有硬编码回退列表
- 统一走 TaskRegistry.get_tasks_by_layer() 获取任务
- 保留配置优先级run.ods_tasks / run.dws_tasks / run.index_tasks > Registry
- 空 Registry + 无配置时记录警告并返回空列表
- _Requirements: 7.1, 7.2, 7.3_
- [x] 5.4 编写配置优先级属性测试
- **Property 6: 配置优先级——配置值优先于 Registry**
- **Validates: Requirements 7.2**
- [x] 5.5 实现 DWS/INDEX 层轻量级校验
- 当 --layers 包含 DWS 或 INDEX 时,跳过完整性校验或仅执行行数校验
- _Requirements: 6.5_
- [x] 6. 任务依赖声明与拓扑排序
- [x] 6.1 扩展 TaskMeta 添加 depends_on 字段
- 在 TaskMeta 数据类中添加 `depends_on: list[str] = field(default_factory=list)`
- 更新 TaskRegistry.register() 接受 depends_on 参数
- 为已知依赖关系添加声明DWS_ASSISTANT_FINANCE → DWS_ASSISTANT_SALARY 等)
- _Requirements: 8.1, 8.2_
- [x] 6.2 实现拓扑排序函数
- 创建 `orchestration/topological_sort.py`
- 实现 Kahn's algorithm 拓扑排序
- 处理循环依赖检测(抛出 ValueError
- 处理缺失依赖警告(记录日志继续执行)
- 在 PipelineRunner._resolve_tasks() 中集成拓扑排序
- _Requirements: 8.3, 8.4, 8.5_
- [x] 6.3 编写拓扑排序属性测试
- **Property 7: 拓扑排序正确性**
- **Validates: Requirements 8.3**
- **Property 8: 循环依赖检测**
- **Validates: Requirements 8.4**
- [x] 7. 检查点 - 阶段 2+3 回归测试
- 运行 `cd apps/etl/pipelines/feiqiu && pytest tests/unit -v` 确保所有测试通过
- 运行 `cd C:\NeoZQYY && pytest tests/ -v` 确保 Monorepo 属性测试通过
- 确保所有测试通过,如有问题请询问用户
- [x] 8. 关键词重命名 pipeline → flow
- [x] 8.1 重命名 PipelineRunner → FlowRunner
-`orchestration/pipeline_runner.py` 重命名为 `orchestration/flow_runner.py`
- 类名 PipelineRunner → FlowRunner
- 常量 PIPELINE_LAYERS → FLOW_LAYERS
- 更新所有导入引用task_executor.py, cli/main.py, scheduler.py 等)
- _Requirements: 9.1, 9.2, 9.5_
- [x] 8.2 更新 CLI 参数 --pipeline → --flow
-`--pipeline` 重命名为 `--flow`
- 保留 `--pipeline` 作为已弃用别名(使用 argparse dest 映射)
- 使用已弃用参数时输出 DeprecationWarning
- 更新 --layers 互斥校验同时检查 --flow 和 --pipeline
- _Requirements: 9.3, 9.4_
- [x] 8.3 更新所有日志消息和注释中的 pipeline 术语
- 全局搜索替换日志中的 "Pipeline" / "pipeline" → "Flow" / "flow"
- 更新代码注释中的术语
- _Requirements: 9.6_
- [x] 9. 路径重命名 pipelines → connectors
- [x] 9.1 重命名目录 apps/etl/pipelines → apps/etl/connectors
- 执行目录重命名
- 更新 pyproject.toml workspace 成员声明
- 更新所有 Python 导入路径
- _Requirements: 10.1, 10.2, 10.4_
- [x] 9.2 更新所有配置文件、脚本和文档中的路径引用
- 更新 .env / .env.template 中的路径
- 更新 run_etl.bat / run_etl.sh 中的路径
- 更新 scripts/ 目录下引用旧路径的脚本
- _Requirements: 10.3_
- [x] 9.3 运行全量测试确认路径重命名无回归
- `cd apps/etl/connectors/feiqiu && pytest tests/unit -v`
- `cd C:\NeoZQYY && pytest tests/ -v`
- _Requirements: 10.5_
- [x] 10. 文档同步更新
- [x] 10.1 更新架构文档和模块文档
- 更新 `docs/etl-feiqiu-architecture.md`:类名、方法名、术语、路径
- 更新 `apps/etl/connectors/feiqiu/docs/` 下所有文档
- 更新 `tasks/README.md`:任务列表和继承关系
- _Requirements: 12.1, 12.2, 12.4_
- [x] 10.2 更新 CLI 帮助文本和示例
- 更新 cli/main.py 中的 epilog 示例
- 更新 argparse 帮助文本反映 --layers 和 --flow 参数
- _Requirements: 12.3_
- [x] 11. 最终检查点 - 全量回归测试
- 运行 `cd apps/etl/connectors/feiqiu && pytest tests/unit -v`
- 运行 `cd C:\NeoZQYY && pytest tests/ -v`
- 确保所有测试通过,如有问题请询问用户
- _Requirements: 11.1, 11.2, 11.3, 11.4, 11.5_
## 备注
- 标记 `*` 的子任务为可选,可跳过以加速 MVP
- 每个任务引用具体需求编号以确保可追溯性
- 检查点确保增量验证
- 属性测试验证普遍正确性属性,单元测试验证具体示例和边界条件
- 阶段 4关键词/路径重命名)风险最高,建议在独立 Git 分支上执行