# 实现计划:ETL 调度器重构 ## 概述 将 `ETLScheduler`(~900 行)拆分为 TaskExecutor(执行层)、PipelineRunner(编排层)、增强版 TaskRegistry(元数据),重构 CLI 参数和配置键,保持向后兼容。采用自底向上的实现顺序:先基础组件,再上层编排,最后 CLI 集成。 ## 任务 - [x] 1. 增强 TaskRegistry,支持元数据注册与查询 - [x] 1.1 扩展 TaskRegistry 类,添加 TaskMeta 数据类和元数据相关方法 - 在 `orchestration/task_registry.py` 中添加 `TaskMeta` dataclass(`task_class`、`requires_db_config`、`layer`、`task_type`) - 修改 `register()` 方法签名,增加可选的 `requires_db_config`、`layer`、`task_type` 参数 - 添加 `get_metadata()`、`get_tasks_by_layer()`、`is_utility_task()` 方法 - 保持 `create_task()` 和 `get_all_task_codes()` 接口不变 - _需求: 4.1, 4.4_ - [x] 1.2 更新所有任务注册调用,添加元数据 - 将原 `NO_DB_CONFIG_TASKS` 硬编码集合中的任务标记为 `requires_db_config=False` - 为 ODS 任务添加 `layer="ODS"`,DWD 任务添加 `layer="DWD"`,DWS 任务添加 `layer="DWS"`,INDEX 任务添加 `layer="INDEX"` - 工具类任务标记 `task_type="utility"`,校验类任务标记 `task_type="verification"` - _需求: 4.1, 4.2, 4.3_ - [x] 1.3 编写 TaskRegistry 属性测试 - **Property 8: TaskRegistry 元数据 round-trip** - **验证: 需求 4.1** - [x] 1.4 编写 TaskRegistry 向后兼容和按层查询属性测试 - **Property 9: TaskRegistry 向后兼容默认值** - **Property 10: 按层查询任务** - **验证: 需求 4.4, 4.3** - [x] 2. 配置键重构与向后兼容 - [x] 2.1 修改 `config/defaults.py` 默认值 - 将 `app.timezone` 默认值从 `Asia/Taipei` 改为 `Asia/Shanghai` - 将 `db.session.timezone` 默认值从 `Asia/Taipei` 改为 `Asia/Shanghai` - 添加 `run.data_source` 键(默认 `hybrid`) - 将 `pipeline.fetch_root` 和 `pipeline.ingest_source_dir` 复制到 `io.fetch_root` 和 `io.ingest_source_dir`(保留旧键兼容) - _需求: 5.1, 5.2, 5.4_ - [x] 2.2 在 `config/settings.py` 的 `_normalize()` 中添加兼容映射逻辑 - 旧键 `pipeline.flow` → 新键 `run.data_source`(值映射:FULL→hybrid, FETCH_ONLY→online, INGEST_ONLY→offline) - 旧键 `pipeline.fetch_root` → `io.fetch_root`,`pipeline.ingest_source_dir` → `io.ingest_source_dir` - 新键优先:当新旧键同时存在时,使用新键的值 - 记录弃用警告日志 - _需求: 5.2, 5.3, 5.4, 8.4, 8.5_ - [x] 2.3 编写配置映射属性测试 - **Property 11: pipeline_flow → data_source 映射一致性** - **验证: 需求 8.1, 8.2, 8.3, 5.2, 8.4** - [x] 3. 静态方法归位 - [x] 3.1 将 `_map_run_status` 移至 RunTracker - 在 `orchestration/run_tracker.py` 中添加 `map_run_status()` 静态方法(从 `ETLScheduler._map_run_status` 复制) - _需求: 7.1_ - [x] 3.2 将 `_filter_verify_tables` 移至校验模块 - 在 `tasks/verification/` 下合适的模块中添加 `filter_verify_tables()` 函数 - _需求: 7.2_ - [x] 4. 检查点 — 确保所有测试通过 - 运行 `pytest tests/unit`,确保所有测试通过,如有问题请询问用户。 - [x] 5. 实现 TaskExecutor(执行层) - [x] 5.1 创建 `orchestration/task_executor.py` - 实现 `TaskExecutor` 类,构造函数接收 `config`、`db_ops`、`api_client`、`cursor_mgr`、`run_tracker`、`task_registry`、`logger` - 从 `ETLScheduler` 迁移以下方法:`run_tasks`、`_run_single_task`、`_execute_fetch`、`_execute_ingest`、`_execute_ods_record_and_load`、`_run_utility_task`、`_build_fetch_dir`、`_resolve_ingest_source`、`_counts_from_fetch`、`_load_task_config`、`_maybe_run_integrity_check`、`_attach_run_file_logger` - 将 `data_source` 改为方法参数(替代原 `self.pipeline_flow` 全局状态) - 使用 `self.task_registry.is_utility_task()` 替代硬编码的 `NO_DB_CONFIG_TASKS` - 使用 `RunTracker.map_run_status()` 替代 `self._map_run_status()` - 添加 `DataSource` 枚举类(`online`/`offline`/`hybrid`) - _需求: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6_ - [x] 5.2 编写 TaskExecutor 属性测试 - **Property 1: data_source 参数决定执行路径** - **Property 2: 成功任务推进游标** - **Property 3: 失败任务标记 FAIL 并重新抛出** - **Property 4: 工具类任务由元数据决定** - **验证: 需求 1.2, 1.3, 1.4, 1.6, 4.2** - [x] 6. 实现 PipelineRunner(编排层) - [x] 6.1 创建 `orchestration/pipeline_runner.py` - 实现 `PipelineRunner` 类,构造函数接收 `config`、`task_executor`、`task_registry`、`db_conn`、`api_client`、`logger` - 将 `PIPELINE_LAYERS` 常量从 `scheduler.py` 迁移至此 - 从 `ETLScheduler` 迁移以下方法:`run_pipeline_with_verification`(重命名为 `run`)、`_run_layer_verification`(重命名为 `_run_verification`)、`_get_tasks_for_layers`(重命名为 `_resolve_tasks`) - 使用 `filter_verify_tables()`(已移至校验模块)替代原内联静态方法 - 使用 `task_registry.get_tasks_by_layer()` 作为默认任务解析,配置覆盖优先 - _需求: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6_ - [x] 6.2 编写 PipelineRunner 属性测试 - **Property 5: 管道名称→层列表映射** - **Property 6: processing_mode 控制执行流程** - **Property 7: 管道结果汇总完整性** - **验证: 需求 2.1, 2.3, 2.4, 2.6** - [x] 7. 检查点 — 确保所有测试通过 - 运行 `pytest tests/unit`,确保所有测试通过,如有问题请询问用户。 - [x] 8. 重构 CLI 层 - [x] 8.1 重构 `cli/main.py` 参数解析 - 添加 `--data-source` 参数(choices: online/offline/hybrid,默认 hybrid) - 保留 `--pipeline-flow` 作为弃用别名,使用时发出 `DeprecationWarning` 并映射到 `--data-source` - 更新 `build_cli_overrides()` 将 `--data-source` 写入 `run.data_source` 配置键 - _需求: 3.1, 3.5, 8.1, 8.2, 8.3_ - [x] 8.2 重构 `cli/main.py` 的 `main()` 函数 - 在 `try/finally` 块中管理 `DatabaseConnection` 和 `APIClient` 的生命周期 - 在 `try` 块内组装 `TaskExecutor` 和 `PipelineRunner`(依赖注入) - 管道模式委托 `PipelineRunner.run()`,传统模式委托 `TaskExecutor.run_tasks()` - 添加 `resolve_data_source(args)` 辅助函数处理新旧参数映射 - _需求: 3.2, 3.3, 3.4, 3.6, 6.1, 6.4_ - [x] 8.3 编写 CLI 参数解析单元测试 - 测试 `--data-source` 新参数正确解析 - 测试 `--pipeline-flow` 旧参数弃用映射 - 测试 `--pipeline` + `--tasks` 同时使用时的行为 - _需求: 3.1, 3.3, 3.5_ - [x] 9. 清理旧代码与集成 - [x] 9.1 重构 `orchestration/scheduler.py` 为薄包装层 - 将 `ETLScheduler` 改为薄包装,内部委托 `TaskExecutor` 和 `PipelineRunner` - 保留 `ETLScheduler` 类名和 `run_tasks()`、`run_pipeline_with_verification()`、`close()` 公共接口,标记为弃用 - 确保 GUI 层(`gui/workers/`)等现有调用方无需立即修改 - _需求: 8.1, 8.4_ - [x] 9.2 更新 GUI 工作线程中的调度器引用 - 检查 `gui/workers/` 中对 `ETLScheduler` 的使用 - 如有直接引用内部方法,更新为使用新的公共接口 - _需求: 7.3_ - [x] 9.3 编写集成测试验证端到端流程 - 使用 FakeDB/FakeAPI 验证 CLI → PipelineRunner → TaskExecutor 完整调用链 - 验证传统模式和管道模式均正常工作 - _需求: 9.4_ - [x] 10. 最终检查点 — 确保所有测试通过 - 运行 `pytest tests/unit`,确保所有测试通过,如有问题请询问用户。 ## 备注 - 标记 `*` 的子任务为可选测试任务,可跳过以加速 MVP - 每个任务引用了具体的需求编号,确保可追溯性 - 检查点确保增量验证,避免问题累积 - 属性测试使用 `hypothesis` 库,验证通用正确性属性 - 单元测试验证具体示例和边界条件 - `ETLScheduler` 保留为薄包装层,确保 GUI 等现有调用方平滑过渡