Files

16 KiB
Raw Permalink Blame History

设计文档ETL 调度器重构

概述

本次重构将 ETLScheduler(约 900 行,职责混乱的"上帝类")拆分为三层清晰的架构:

  1. CLI 层cli/main.py):参数解析、配置加载、资源创建与释放
  2. PipelineRunnerorchestration/pipeline_runner.py):管道定义、层→任务映射、校验编排
  3. TaskExecutororchestration/task_executor.py):单任务执行、游标管理、运行记录

核心设计原则:单个任务是最小执行单元,管道模式只是"调度拼接"。每层通过依赖注入接收协作对象,不自行创建资源,便于独立测试。

架构

分层架构图

graph TD
    CLI["CLI 层<br/>cli/main.py<br/>参数解析 · 配置加载 · 资源管理"]
    PR["PipelineRunner<br/>orchestration/pipeline_runner.py<br/>管道定义 · 层→任务映射 · 校验编排"]
    TE["TaskExecutor<br/>orchestration/task_executor.py<br/>单任务执行 · 游标管理 · 运行记录"]
    TR["TaskRegistry<br/>orchestration/task_registry.py<br/>任务注册 · 元数据查询"]
    CM["CursorManager"]
    RT["RunTracker"]
    DB["DatabaseConnection"]
    API["APIClient"]

    CLI -->|"创建并注入"| PR
    CLI -->|"创建并注入"| TE
    CLI -->|"管理生命周期"| DB
    CLI -->|"管理生命周期"| API
    PR -->|"委托执行"| TE
    PR -->|"查询任务"| TR
    TE -->|"查询元数据"| TR
    TE -->|"管理游标"| CM
    TE -->|"记录运行"| RT
    TE -->|"使用"| DB
    TE -->|"使用"| API

调用流程

传统模式--tasks

CLI → TaskExecutor.run_tasks([task_codes]) → TaskExecutor._run_single_task() × N

管道模式--pipeline

CLI → PipelineRunner.run(pipeline, processing_mode, ...) 
    → PipelineRunner._resolve_tasks(layers) 
    → TaskExecutor.run_tasks([resolved_tasks])
    → [可选] PipelineRunner._run_verification(layers, ...)

组件与接口

TaskExecutor

负责单任务执行的完整生命周期。从原 ETLScheduler 中提取 _run_single_task_execute_fetch_execute_ingest_execute_ods_record_and_load_run_utility_task 等方法。

class TaskExecutor:
    def __init__(
        self,
        config: AppConfig,
        db_ops: DatabaseOperations,
        api_client: APIClient,
        cursor_mgr: CursorManager,
        run_tracker: RunTracker,
        task_registry: TaskRegistry,
        logger: logging.Logger,
    ):
        ...

    def run_tasks(
        self,
        task_codes: list[str],
        data_source: str = "hybrid",  # online / offline / hybrid
    ) -> list[dict[str, Any]]:
        """批量执行任务列表,返回每个任务的结果。"""
        ...

    def run_single_task(
        self,
        task_code: str,
        run_uuid: str,
        store_id: int,
        data_source: str = "hybrid",
    ) -> dict[str, Any]:
        """执行单个任务的完整生命周期。"""
        ...

关键变化:

  • data_source 作为显式参数传入,不再读取 self.pipeline_flow 全局状态
  • 工具类任务判断通过 TaskRegistry.get_metadata(task_code) 查询,不再硬编码
  • 不自行创建 DatabaseConnectionAPIClient

PipelineRunner

负责管道编排。从原 ETLScheduler 中提取 run_pipeline_with_verification_run_layer_verification_get_tasks_for_layers 等方法。

class PipelineRunner:
    # 管道定义(从 scheduler.py 模块级常量迁移至此)
    PIPELINE_LAYERS: dict[str, list[str]] = {
        "api_ods": ["ODS"],
        "api_ods_dwd": ["ODS", "DWD"],
        "api_full": ["ODS", "DWD", "DWS", "INDEX"],
        "ods_dwd": ["DWD"],
        "dwd_dws": ["DWS"],
        "dwd_dws_index": ["DWS", "INDEX"],
        "dwd_index": ["INDEX"],
    }

    def __init__(
        self,
        config: AppConfig,
        task_executor: TaskExecutor,
        task_registry: TaskRegistry,
        db_conn: DatabaseConnection,
        api_client: APIClient,
        logger: logging.Logger,
    ):
        ...

    def run(
        self,
        pipeline: str,
        processing_mode: str = "increment_only",
        data_source: str = "hybrid",
        window_start: datetime | None = None,
        window_end: datetime | None = None,
        window_split: str | None = None,
        task_codes: list[str] | None = None,
        fetch_before_verify: bool = False,
        verify_tables: list[str] | None = None,
    ) -> dict[str, Any]:
        """执行管道,返回汇总结果。"""
        ...

    def _resolve_tasks(self, layers: list[str]) -> list[str]:
        """根据层列表解析任务代码,优先查询 TaskRegistry 元数据。"""
        ...

    def _run_verification(self, layers, window_start, window_end, ...):
        """执行后置校验(从原 _run_layer_verification 迁移)。"""
        ...

TaskRegistry增强

在现有注册功能基础上增加元数据支持。

@dataclass
class TaskMeta:
    """任务元数据"""
    task_class: type
    requires_db_config: bool = True
    layer: str | None = None        # "ODS" / "DWD" / "DWS" / "INDEX" / None
    task_type: str = "etl"          # "etl" / "utility" / "verification"

class TaskRegistry:
    def __init__(self):
        self._tasks: dict[str, TaskMeta] = {}

    def register(
        self,
        task_code: str,
        task_class: type,
        requires_db_config: bool = True,
        layer: str | None = None,
        task_type: str = "etl",
    ):
        """注册任务类及其元数据。"""
        self._tasks[task_code.upper()] = TaskMeta(
            task_class=task_class,
            requires_db_config=requires_db_config,
            layer=layer,
            task_type=task_type,
        )

    def create_task(self, task_code, config, db_connection, api_client, logger):
        """创建任务实例(保持原有接口不变)。"""
        ...

    def get_metadata(self, task_code: str) -> TaskMeta | None:
        """查询任务元数据。"""
        ...

    def get_tasks_by_layer(self, layer: str) -> list[str]:
        """获取指定层的所有任务代码。"""
        ...

    def is_utility_task(self, task_code: str) -> bool:
        """判断是否为工具类任务(不需要游标/运行记录)。"""
        meta = self.get_metadata(task_code)
        return meta is not None and not meta.requires_db_config

    def get_all_task_codes(self) -> list[str]:
        """获取所有已注册的任务代码(保持原有接口)。"""
        ...

CLI 层重构

# cli/main.py 核心流程伪代码

def main():
    args = parse_args()
    config = AppConfig.load(build_cli_overrides(args))

    # 资源创建
    db_conn = DatabaseConnection(...)
    api_client = APIClient(...)

    try:
        # 组装依赖
        db_ops = DatabaseOperations(db_conn)
        cursor_mgr = CursorManager(db_conn)
        run_tracker = RunTracker(db_conn)
        registry = default_registry

        executor = TaskExecutor(config, db_ops, api_client, cursor_mgr, run_tracker, registry, logger)

        if args.pipeline:
            runner = PipelineRunner(config, executor, registry, db_conn, api_client, logger)
            runner.run(
                pipeline=args.pipeline,
                processing_mode=args.processing_mode,
                data_source=resolve_data_source(args),
                ...
            )
        else:
            task_codes = config.get("run.tasks")
            data_source = resolve_data_source(args)
            executor.run_tasks(task_codes, data_source=data_source)
    finally:
        db_conn.close()

参数映射

旧参数 旧值 新参数 新值 说明
--pipeline-flow FULL --data-source hybrid 在线抓取 + 本地入库
--pipeline-flow FETCH_ONLY --data-source online 仅在线抓取落盘
--pipeline-flow INGEST_ONLY --data-source offline 仅本地清洗入库

静态方法归位

方法 原位置 新位置 理由
_map_run_status ETLScheduler RunTracker 状态映射是运行记录的职责
_filter_verify_tables ETLScheduler tasks/verification/ 模块 校验表过滤是校验模块的职责

数据模型

TaskMeta新增

@dataclass
class TaskMeta:
    task_class: type                    # 任务类引用
    requires_db_config: bool = True     # 是否需要数据库任务配置(游标/运行记录)
    layer: str | None = None            # 所属层:"ODS"/"DWD"/"DWS"/"INDEX"/None
    task_type: str = "etl"              # 任务类型:"etl"/"utility"/"verification"

DataSource 枚举

class DataSource(str, Enum):
    ONLINE = "online"    # 仅在线抓取(原 FETCH_ONLY
    OFFLINE = "offline"  # 仅本地入库(原 INGEST_ONLY
    HYBRID = "hybrid"    # 抓取 + 入库(原 FULL

配置键映射

旧键 新键 默认值
app.timezone app.timezone Asia/Shanghai(原 Asia/Taipei
pipeline.flow run.data_source hybrid
pipeline.fetch_root io.fetch_root export/JSON
pipeline.ingest_source_dir io.ingest_source_dir ""

任务执行结果(不变)

# 单任务结果
{
    "task_code": str,
    "status": str,       # "SUCCESS" / "FAIL" / "SKIP"
    "counts": {
        "fetched": int,
        "inserted": int,
        "updated": int,
        "skipped": int,
        "errors": int,
    },
    "window": {"start": datetime, "end": datetime, "minutes": int} | None,
    "dump_dir": str | None,
}

# 管道结果
{
    "status": str,
    "pipeline": str,
    "layers": list[str],
    "results": list[dict],              # 各任务结果
    "verification_summary": dict | None, # 校验汇总
}

正确性属性

正确性属性是一种在系统所有有效执行中都应成立的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。

Property 1data_source 参数决定执行路径

对于任意 任务代码和任意 data_sourceonline/offline/hybridTaskExecutor 执行该任务时,抓取阶段执行当且仅当 data_sourceonlinehybrid,入库阶段执行当且仅当 data_sourceofflinehybrid

验证:需求 1.2

Property 2成功任务推进游标

对于任意 非工具类任务,当任务执行成功且返回包含有效 window(含 startend的结果时CursorManager.advance 应被调用且参数与返回的窗口一致。

验证:需求 1.3

Property 3失败任务标记 FAIL 并重新抛出

对于任意 非工具类任务当任务执行过程中抛出异常时RunTracker 应被更新为 FAIL 状态,且该异常应被重新抛出给调用方。

验证:需求 1.4

Property 4工具类任务由元数据决定

对于任意 任务代码TaskExecutor 是否跳过游标管理和运行记录,取决于 TaskRegistry 中该任务的 requires_db_config 元数据。当 requires_db_config=False 时跳过,否则执行完整生命周期。

验证:需求 1.6, 4.2

Property 5管道名称→层列表映射

对于任意 有效的管道名称PipelineRunner 解析出的层列表应与 PIPELINE_LAYERS 字典中的定义完全一致。

验证:需求 2.1

Property 6processing_mode 控制执行流程

对于任意 processing_mode 值,增量 ETL 执行当且仅当模式包含 increment(即 increment_onlyincrement_verify),校验流程执行当且仅当模式包含 verify(即 verify_onlyincrement_verify)。

验证:需求 2.3, 2.4

Property 7管道结果汇总完整性

对于任意 一组任务执行结果PipelineRunner 返回的汇总字典应包含 statuspipelinelayersresults 字段,且 results 列表长度等于实际执行的任务数。

验证:需求 2.6

Property 8TaskRegistry 元数据 round-trip

对于任意 任务代码、任务类和元数据组合requires_db_config、layer、task_type注册后通过 get_metadata 查询应返回相同的元数据值。

验证:需求 4.1

Property 9TaskRegistry 向后兼容默认值

对于任意 使用旧接口(仅 task_code 和 task_class注册的任务查询元数据应返回 requires_db_config=Truelayer=Nonetask_type="etl"

验证:需求 4.4

Property 10按层查询任务

对于任意 注册了 layer 元数据的任务集合,get_tasks_by_layer(layer) 返回的任务代码集合应等于所有 layer 匹配的已注册任务代码集合。

验证:需求 4.3

Property 11pipeline_flow → data_source 映射一致性

对于任意pipeline_flowFULL/FETCH_ONLY/INGEST_ONLY映射到 data_source 的结果应与预定义映射表一致FULL→hybrid、FETCH_ONLY→online、INGEST_ONLY→offline。同样配置键 pipeline.flow 应自动映射到 run.data_source

验证:需求 8.1, 8.2, 8.3, 5.2, 8.4

错误处理

TaskExecutor 错误处理

  • 任务执行异常:更新 RunTracker 状态为 FAIL含 error_message然后重新抛出异常
  • 游标推进失败:记录错误日志,不影响任务结果(任务本身已成功)
  • 任务配置不存在:返回 {"status": "SKIP"} 结果,不抛异常

PipelineRunner 错误处理

  • 单个任务失败:记录错误,继续执行后续任务(与当前行为一致)
  • 校验框架未安装:返回 {"status": "SKIPPED"} 并记录警告
  • 无效管道名称:抛出 ValueError

CLI 错误处理

  • 配置加载失败:SystemExit 并输出错误信息
  • 资源创建失败:SystemExit 并输出错误信息
  • 执行过程异常:记录错误日志,finally 块确保资源释放,返回非零退出码

弃用警告

  • 使用 Python warnings.warn(DeprecationWarning) 发出弃用警告
  • 同时在日志中记录映射详情,便于运维排查

测试策略

单元测试

使用 pytest + 现有的 FakeDB/FakeAPI 测试工具(tests/unit/task_test_utils.py)。

TaskExecutor 测试

  • 注入 mock 依赖FakeDB、FakeAPI、mock CursorManager、mock RunTracker
  • 验证成功/失败/跳过三种路径
  • 验证工具类任务不触发游标/运行记录
  • 验证 data_source 参数正确控制抓取/入库阶段

PipelineRunner 测试

  • 注入 mock TaskExecutor
  • 验证不同 processing_mode 下的执行流程
  • 验证管道→层→任务的解析链

TaskRegistry 测试

  • 验证元数据注册和查询
  • 验证向后兼容(无元数据注册)
  • 验证按层查询

配置兼容性测试

  • 验证旧键→新键映射
  • 验证优先级规则
  • 验证默认值变更

属性测试

使用 hypothesis 库进行属性测试,每个属性至少运行 100 次迭代。

每个属性测试必须用注释标注对应的设计属性编号:

# Feature: scheduler-refactor, Property 8: TaskRegistry 元数据 round-trip

属性测试覆盖

  • Property 1: data_source 参数决定执行路径
  • Property 2: 成功任务推进游标
  • Property 3: 失败任务标记 FAIL 并重新抛出
  • Property 4: 工具类任务由元数据决定
  • Property 5: 管道名称→层列表映射
  • Property 6: processing_mode 控制执行流程
  • Property 7: 管道结果汇总完整性
  • Property 8: TaskRegistry 元数据 round-trip
  • Property 9: TaskRegistry 向后兼容默认值
  • Property 10: 按层查询任务
  • Property 11: pipeline_flow → data_source 映射一致性