8.1 KiB
实现计划:ETL 调度器重构
概述
将 ETLScheduler(~900 行)拆分为 TaskExecutor(执行层)、PipelineRunner(编排层)、增强版 TaskRegistry(元数据),重构 CLI 参数和配置键,保持向后兼容。采用自底向上的实现顺序:先基础组件,再上层编排,最后 CLI 集成。
任务
-
1. 增强 TaskRegistry,支持元数据注册与查询
-
1.1 扩展 TaskRegistry 类,添加 TaskMeta 数据类和元数据相关方法
- 在
orchestration/task_registry.py中添加TaskMetadataclass(task_class、requires_db_config、layer、task_type) - 修改
register()方法签名,增加可选的requires_db_config、layer、task_type参数 - 添加
get_metadata()、get_tasks_by_layer()、is_utility_task()方法 - 保持
create_task()和get_all_task_codes()接口不变 - 需求: 4.1, 4.4
- 在
-
1.2 更新所有任务注册调用,添加元数据
- 将原
NO_DB_CONFIG_TASKS硬编码集合中的任务标记为requires_db_config=False - 为 ODS 任务添加
layer="ODS",DWD 任务添加layer="DWD",DWS 任务添加layer="DWS",INDEX 任务添加layer="INDEX" - 工具类任务标记
task_type="utility",校验类任务标记task_type="verification" - 需求: 4.1, 4.2, 4.3
- 将原
-
1.3 编写 TaskRegistry 属性测试
- Property 8: TaskRegistry 元数据 round-trip
- 验证: 需求 4.1
-
1.4 编写 TaskRegistry 向后兼容和按层查询属性测试
- Property 9: TaskRegistry 向后兼容默认值
- Property 10: 按层查询任务
- 验证: 需求 4.4, 4.3
-
-
2. 配置键重构与向后兼容
-
2.1 修改
config/defaults.py默认值- 将
app.timezone默认值从Asia/Taipei改为Asia/Shanghai - 将
db.session.timezone默认值从Asia/Taipei改为Asia/Shanghai - 添加
run.data_source键(默认hybrid) - 将
pipeline.fetch_root和pipeline.ingest_source_dir复制到io.fetch_root和io.ingest_source_dir(保留旧键兼容) - 需求: 5.1, 5.2, 5.4
- 将
-
2.2 在
config/settings.py的_normalize()中添加兼容映射逻辑- 旧键
pipeline.flow→ 新键run.data_source(值映射:FULL→hybrid, FETCH_ONLY→online, INGEST_ONLY→offline) - 旧键
pipeline.fetch_root→io.fetch_root,pipeline.ingest_source_dir→io.ingest_source_dir - 新键优先:当新旧键同时存在时,使用新键的值
- 记录弃用警告日志
- 需求: 5.2, 5.3, 5.4, 8.4, 8.5
- 旧键
-
2.3 编写配置映射属性测试
- Property 11: pipeline_flow → data_source 映射一致性
- 验证: 需求 8.1, 8.2, 8.3, 5.2, 8.4
-
-
3. 静态方法归位
-
3.1 将
_map_run_status移至 RunTracker- 在
orchestration/run_tracker.py中添加map_run_status()静态方法(从ETLScheduler._map_run_status复制) - 需求: 7.1
- 在
-
3.2 将
_filter_verify_tables移至校验模块- 在
tasks/verification/下合适的模块中添加filter_verify_tables()函数 - 需求: 7.2
- 在
-
-
4. 检查点 — 确保所有测试通过
- 运行
pytest tests/unit,确保所有测试通过,如有问题请询问用户。
- 运行
-
5. 实现 TaskExecutor(执行层)
-
5.1 创建
orchestration/task_executor.py- 实现
TaskExecutor类,构造函数接收config、db_ops、api_client、cursor_mgr、run_tracker、task_registry、logger - 从
ETLScheduler迁移以下方法:run_tasks、_run_single_task、_execute_fetch、_execute_ingest、_execute_ods_record_and_load、_run_utility_task、_build_fetch_dir、_resolve_ingest_source、_counts_from_fetch、_load_task_config、_maybe_run_integrity_check、_attach_run_file_logger - 将
data_source改为方法参数(替代原self.pipeline_flow全局状态) - 使用
self.task_registry.is_utility_task()替代硬编码的NO_DB_CONFIG_TASKS - 使用
RunTracker.map_run_status()替代self._map_run_status() - 添加
DataSource枚举类(online/offline/hybrid) - 需求: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6
- 实现
-
5.2 编写 TaskExecutor 属性测试
- Property 1: data_source 参数决定执行路径
- Property 2: 成功任务推进游标
- Property 3: 失败任务标记 FAIL 并重新抛出
- Property 4: 工具类任务由元数据决定
- 验证: 需求 1.2, 1.3, 1.4, 1.6, 4.2
-
-
6. 实现 PipelineRunner(编排层)
-
6.1 创建
orchestration/pipeline_runner.py- 实现
PipelineRunner类,构造函数接收config、task_executor、task_registry、db_conn、api_client、logger - 将
PIPELINE_LAYERS常量从scheduler.py迁移至此 - 从
ETLScheduler迁移以下方法:run_pipeline_with_verification(重命名为run)、_run_layer_verification(重命名为_run_verification)、_get_tasks_for_layers(重命名为_resolve_tasks) - 使用
filter_verify_tables()(已移至校验模块)替代原内联静态方法 - 使用
task_registry.get_tasks_by_layer()作为默认任务解析,配置覆盖优先 - 需求: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6
- 实现
-
6.2 编写 PipelineRunner 属性测试
- Property 5: 管道名称→层列表映射
- Property 6: processing_mode 控制执行流程
- Property 7: 管道结果汇总完整性
- 验证: 需求 2.1, 2.3, 2.4, 2.6
-
-
7. 检查点 — 确保所有测试通过
- 运行
pytest tests/unit,确保所有测试通过,如有问题请询问用户。
- 运行
-
8. 重构 CLI 层
-
8.1 重构
cli/main.py参数解析- 添加
--data-source参数(choices: online/offline/hybrid,默认 hybrid) - 保留
--pipeline-flow作为弃用别名,使用时发出DeprecationWarning并映射到--data-source - 更新
build_cli_overrides()将--data-source写入run.data_source配置键 - 需求: 3.1, 3.5, 8.1, 8.2, 8.3
- 添加
-
8.2 重构
cli/main.py的main()函数- 在
try/finally块中管理DatabaseConnection和APIClient的生命周期 - 在
try块内组装TaskExecutor和PipelineRunner(依赖注入) - 管道模式委托
PipelineRunner.run(),传统模式委托TaskExecutor.run_tasks() - 添加
resolve_data_source(args)辅助函数处理新旧参数映射 - 需求: 3.2, 3.3, 3.4, 3.6, 6.1, 6.4
- 在
-
8.3 编写 CLI 参数解析单元测试
- 测试
--data-source新参数正确解析 - 测试
--pipeline-flow旧参数弃用映射 - 测试
--pipeline+--tasks同时使用时的行为 - 需求: 3.1, 3.3, 3.5
- 测试
-
-
9. 清理旧代码与集成
-
9.1 重构
orchestration/scheduler.py为薄包装层- 将
ETLScheduler改为薄包装,内部委托TaskExecutor和PipelineRunner - 保留
ETLScheduler类名和run_tasks()、run_pipeline_with_verification()、close()公共接口,标记为弃用 - 确保 GUI 层(
gui/workers/)等现有调用方无需立即修改 - 需求: 8.1, 8.4
- 将
-
9.2 更新 GUI 工作线程中的调度器引用
- 检查
gui/workers/中对ETLScheduler的使用 - 如有直接引用内部方法,更新为使用新的公共接口
- 需求: 7.3
- 检查
-
9.3 编写集成测试验证端到端流程
- 使用 FakeDB/FakeAPI 验证 CLI → PipelineRunner → TaskExecutor 完整调用链
- 验证传统模式和管道模式均正常工作
- 需求: 9.4
-
-
10. 最终检查点 — 确保所有测试通过
- 运行
pytest tests/unit,确保所有测试通过,如有问题请询问用户。
- 运行
备注
- 标记
*的子任务为可选测试任务,可跳过以加速 MVP - 每个任务引用了具体的需求编号,确保可追溯性
- 检查点确保增量验证,避免问题累积
- 属性测试使用
hypothesis库,验证通用正确性属性 - 单元测试验证具体示例和边界条件
ETLScheduler保留为薄包装层,确保 GUI 等现有调用方平滑过渡