Files

8.3 KiB
Raw Permalink Blame History

需求文档ETL 调度器重构

简介

当前 orchestration/scheduler.py(约 900 行)中的 ETLScheduler 类承担了过多职责单任务执行、管道编排、资源管理。CLI 参数命名混乱(--pipeline vs --pipeline-flow vs --processing-mode全局状态耦合严重配置键语义重叠。本次重构将调度器拆分为三层架构CLI → PipelineRunner → TaskExecutor重新设计参数命名消除全局状态依赖使每层可独立测试。

术语表

  • TaskExecutor:任务执行器,负责单个 ETL 任务的执行、游标管理和运行记录
  • PipelineRunner:管道运行器,负责管道定义、层→任务映射、校验编排
  • TaskRegistry:任务注册表,管理所有已注册的任务类及其元数据
  • DataSource:数据源模式,取代原 pipeline.flow,表示数据来自在线 APIonline)、本地 JSONoffline)或混合模式(hybrid
  • ProcessingMode:处理模式,控制 ETL 执行策略(仅增量 / 仅校验 / 增量+校验)
  • Pipeline:管道,定义一组按层顺序执行的 ETL 任务集合(如 api_full = ODS → DWD → DWS → INDEX
  • CursorManager:游标管理器,管理任务的时间水位(上次处理到哪里)
  • RunTracker:运行记录器,在 etl_admin Schema 中记录每次任务执行的状态和统计

需求

需求 1架构分层 — TaskExecutor执行层

用户故事: 作为开发者,我希望单任务执行逻辑独立封装在 TaskExecutor 中,以便可以脱离管道上下文独立测试和复用。

验收标准

  1. THE TaskExecutor SHALL 封装单个任务的完整执行生命周期:创建运行记录、执行任务、更新游标、记录结果
  2. WHEN TaskExecutor 执行一个任务时THE TaskExecutor SHALL 接收显式的 data_source 参数,而非读取全局状态
  3. WHEN 任务执行成功且返回有效时间窗口时THE TaskExecutor SHALL 推进该任务的游标水位
  4. WHEN 任务执行过程中发生异常时THE TaskExecutor SHALL 将运行记录状态更新为 FAIL 并重新抛出异常
  5. THE TaskExecutor SHALL 通过构造函数接收 db_opsapi_clientcursor_managerrun_trackertask_registry 等依赖,而非自行创建
  6. WHEN 执行工具类任务(如 INIT_ODS_SCHEMATHE TaskExecutor SHALL 跳过游标管理和运行记录,直接执行任务

需求 2架构分层 — PipelineRunner编排层

用户故事: 作为开发者,我希望管道编排逻辑独立封装在 PipelineRunner 中,以便管道定义和校验流程可以独立演进。

验收标准

  1. THE PipelineRunner SHALL 根据管道名称解析出需要执行的层列表(如 api_full["ODS", "DWD", "DWS", "INDEX"]
  2. WHEN PipelineRunner 执行管道时THE PipelineRunner SHALL 委托 TaskExecutor 逐个执行任务,而非直接操作数据库或 API
  3. WHEN 处理模式为 verify_onlyTHE PipelineRunner SHALL 跳过增量 ETL仅执行校验流程
  4. WHEN 处理模式为 increment_verifyTHE PipelineRunner SHALL 先执行增量 ETL再执行校验流程
  5. THE PipelineRunner SHALL 根据层列表自动选择对应的任务代码,支持配置覆盖
  6. WHEN 管道执行完成时THE PipelineRunner SHALL 汇总所有任务的执行结果并返回统一的结果字典

需求 3架构分层 — CLI 层重构

用户故事: 作为运维人员,我希望 CLI 参数命名清晰、语义无歧义,以便快速理解和正确使用各种执行模式。

验收标准

  1. THE CLI SHALL 将 --pipeline-flowFULL/FETCH_ONLY/INGEST_ONLY重命名为 --data-sourceonline/offline/hybrid并保留旧名称作为别名
  2. THE CLI SHALL 保留 --pipeline 参数用于管道模式,保留 --tasks 参数用于传统模式
  3. WHEN 用户同时指定 --pipeline--tasksTHE CLI SHALL 将 --tasks 作为管道内的任务过滤器
  4. THE CLI SHALL 保留 --processing-modeincrement_only/verify_only/increment_verify参数不变
  5. WHEN 用户使用旧参数名 --pipeline-flowTHE CLI SHALL 发出弃用警告并将值映射到新的 --data-source 参数
  6. THE CLI SHALL 仅负责参数解析和配置加载,将执行逻辑委托给 PipelineRunner 或 TaskExecutor

需求 4任务分类元数据化

用户故事: 作为开发者,我希望任务的分类信息(是否需要数据库配置、所属层等)由任务注册表管理,而非硬编码在调度器中。

验收标准

  1. THE TaskRegistry SHALL 支持在注册任务时附带元数据(requires_db_configlayertask_type
  2. WHEN TaskExecutor 需要判断任务是否为工具类任务时THE TaskExecutor SHALL 查询 TaskRegistry 的元数据,而非检查硬编码集合
  3. WHEN PipelineRunner 需要根据层获取任务列表时THE PipelineRunner SHALL 查询 TaskRegistry 的 layer 元数据
  4. THE TaskRegistry SHALL 保持向后兼容,无元数据的任务默认为 requires_db_config=Truelayer=None

需求 5配置键重构

用户故事: 作为运维人员,我希望配置键命名合理、语义清晰,以便正确配置 ETL 系统的运行参数。

验收标准

  1. THE AppConfig SHALL 将 app.timezone 默认值从 Asia/Taipei 改为 Asia/Shanghai
  2. THE AppConfig SHALL 将 pipeline.flow 配置键重命名为 run.data_source,并保留旧键作为兼容别名
  3. WHEN 配置中同时存在旧键 pipeline.flow 和新键 run.data_sourceTHE AppConfig SHALL 优先使用新键的值
  4. THE AppConfig SHALL 将 pipeline.fetch_rootpipeline.ingest_source_dir 移至 io 命名空间下(io.fetch_rootio.ingest_source_dir

需求 6资源管理与生命周期

用户故事: 作为开发者,我希望数据库连接和 API 客户端的创建与关闭由 CLI 层统一管理,以便确保资源正确释放。

验收标准

  1. THE CLI SHALL 在 finally 块中关闭数据库连接和 API 客户端,确保异常情况下资源也能释放
  2. THE TaskExecutor SHALL 通过依赖注入接收已创建的数据库连接和 API 客户端,而非自行创建
  3. THE PipelineRunner SHALL 通过依赖注入接收已创建的数据库连接和 API 客户端,而非自行创建
  4. WHEN CLI 创建资源时THE CLI SHALL 使用 Python 上下文管理器(with 语句)或 try/finally 模式管理生命周期

需求 7静态方法归位

用户故事: 作为开发者,我希望与调度器无关的静态工具方法移至合适的模块,以便保持类的职责单一。

验收标准

  1. THE _map_run_status 方法 SHALL 从 ETLScheduler 移至 RunTracker 或独立的工具模块
  2. THE _filter_verify_tables 方法 SHALL 从 ETLScheduler 移至校验相关模块
  3. WHEN 静态方法被移动后THE 原调用方 SHALL 更新导入路径以引用新位置

需求 8向后兼容与过渡

用户故事: 作为运维人员,我希望重构后的系统在过渡期内兼容旧的 CLI 参数和配置键,以便平滑迁移。

验收标准

  1. WHEN 用户使用旧参数 --pipeline-flow FULLTHE CLI SHALL 将其等价映射为 --data-source hybrid 并发出弃用警告
  2. WHEN 用户使用旧参数 --pipeline-flow FETCH_ONLYTHE CLI SHALL 将其等价映射为 --data-source online 并发出弃用警告
  3. WHEN 用户使用旧参数 --pipeline-flow INGEST_ONLYTHE CLI SHALL 将其等价映射为 --data-source offline 并发出弃用警告
  4. WHEN 配置文件中使用旧键 pipeline.flowTHE AppConfig SHALL 自动映射到新键 run.data_source
  5. THE 系统 SHALL 在日志中记录所有弃用映射,便于运维人员逐步迁移

需求 9可测试性

用户故事: 作为开发者,我希望重构后的每一层都可以独立进行单元测试,以便快速验证逻辑正确性。

验收标准

  1. THE TaskExecutor SHALL 支持通过注入 mock 依赖FakeDB、FakeAPI进行单元测试无需真实数据库
  2. THE PipelineRunner SHALL 支持通过注入 mock TaskExecutor 进行单元测试,无需执行真实任务
  3. THE TaskRegistry SHALL 支持在测试中创建独立实例,不依赖全局 default_registry
  4. WHEN 运行单元测试时THE 测试 SHALL 验证各层之间的交互契约(调用参数、返回值格式)