16 KiB
设计文档:ETL 调度器重构
概述
本次重构将 ETLScheduler(约 900 行,职责混乱的"上帝类")拆分为三层清晰的架构:
- CLI 层(
cli/main.py):参数解析、配置加载、资源创建与释放 - PipelineRunner(
orchestration/pipeline_runner.py):管道定义、层→任务映射、校验编排 - TaskExecutor(
orchestration/task_executor.py):单任务执行、游标管理、运行记录
核心设计原则:单个任务是最小执行单元,管道模式只是"调度拼接"。每层通过依赖注入接收协作对象,不自行创建资源,便于独立测试。
架构
分层架构图
graph TD
CLI["CLI 层<br/>cli/main.py<br/>参数解析 · 配置加载 · 资源管理"]
PR["PipelineRunner<br/>orchestration/pipeline_runner.py<br/>管道定义 · 层→任务映射 · 校验编排"]
TE["TaskExecutor<br/>orchestration/task_executor.py<br/>单任务执行 · 游标管理 · 运行记录"]
TR["TaskRegistry<br/>orchestration/task_registry.py<br/>任务注册 · 元数据查询"]
CM["CursorManager"]
RT["RunTracker"]
DB["DatabaseConnection"]
API["APIClient"]
CLI -->|"创建并注入"| PR
CLI -->|"创建并注入"| TE
CLI -->|"管理生命周期"| DB
CLI -->|"管理生命周期"| API
PR -->|"委托执行"| TE
PR -->|"查询任务"| TR
TE -->|"查询元数据"| TR
TE -->|"管理游标"| CM
TE -->|"记录运行"| RT
TE -->|"使用"| DB
TE -->|"使用"| API
调用流程
传统模式(--tasks):
CLI → TaskExecutor.run_tasks([task_codes]) → TaskExecutor._run_single_task() × N
管道模式(--pipeline):
CLI → PipelineRunner.run(pipeline, processing_mode, ...)
→ PipelineRunner._resolve_tasks(layers)
→ TaskExecutor.run_tasks([resolved_tasks])
→ [可选] PipelineRunner._run_verification(layers, ...)
组件与接口
TaskExecutor
负责单任务执行的完整生命周期。从原 ETLScheduler 中提取 _run_single_task、_execute_fetch、_execute_ingest、_execute_ods_record_and_load、_run_utility_task 等方法。
class TaskExecutor:
def __init__(
self,
config: AppConfig,
db_ops: DatabaseOperations,
api_client: APIClient,
cursor_mgr: CursorManager,
run_tracker: RunTracker,
task_registry: TaskRegistry,
logger: logging.Logger,
):
...
def run_tasks(
self,
task_codes: list[str],
data_source: str = "hybrid", # online / offline / hybrid
) -> list[dict[str, Any]]:
"""批量执行任务列表,返回每个任务的结果。"""
...
def run_single_task(
self,
task_code: str,
run_uuid: str,
store_id: int,
data_source: str = "hybrid",
) -> dict[str, Any]:
"""执行单个任务的完整生命周期。"""
...
关键变化:
data_source作为显式参数传入,不再读取self.pipeline_flow全局状态- 工具类任务判断通过
TaskRegistry.get_metadata(task_code)查询,不再硬编码 - 不自行创建
DatabaseConnection或APIClient
PipelineRunner
负责管道编排。从原 ETLScheduler 中提取 run_pipeline_with_verification、_run_layer_verification、_get_tasks_for_layers 等方法。
class PipelineRunner:
# 管道定义(从 scheduler.py 模块级常量迁移至此)
PIPELINE_LAYERS: dict[str, list[str]] = {
"api_ods": ["ODS"],
"api_ods_dwd": ["ODS", "DWD"],
"api_full": ["ODS", "DWD", "DWS", "INDEX"],
"ods_dwd": ["DWD"],
"dwd_dws": ["DWS"],
"dwd_dws_index": ["DWS", "INDEX"],
"dwd_index": ["INDEX"],
}
def __init__(
self,
config: AppConfig,
task_executor: TaskExecutor,
task_registry: TaskRegistry,
db_conn: DatabaseConnection,
api_client: APIClient,
logger: logging.Logger,
):
...
def run(
self,
pipeline: str,
processing_mode: str = "increment_only",
data_source: str = "hybrid",
window_start: datetime | None = None,
window_end: datetime | None = None,
window_split: str | None = None,
task_codes: list[str] | None = None,
fetch_before_verify: bool = False,
verify_tables: list[str] | None = None,
) -> dict[str, Any]:
"""执行管道,返回汇总结果。"""
...
def _resolve_tasks(self, layers: list[str]) -> list[str]:
"""根据层列表解析任务代码,优先查询 TaskRegistry 元数据。"""
...
def _run_verification(self, layers, window_start, window_end, ...):
"""执行后置校验(从原 _run_layer_verification 迁移)。"""
...
TaskRegistry(增强)
在现有注册功能基础上增加元数据支持。
@dataclass
class TaskMeta:
"""任务元数据"""
task_class: type
requires_db_config: bool = True
layer: str | None = None # "ODS" / "DWD" / "DWS" / "INDEX" / None
task_type: str = "etl" # "etl" / "utility" / "verification"
class TaskRegistry:
def __init__(self):
self._tasks: dict[str, TaskMeta] = {}
def register(
self,
task_code: str,
task_class: type,
requires_db_config: bool = True,
layer: str | None = None,
task_type: str = "etl",
):
"""注册任务类及其元数据。"""
self._tasks[task_code.upper()] = TaskMeta(
task_class=task_class,
requires_db_config=requires_db_config,
layer=layer,
task_type=task_type,
)
def create_task(self, task_code, config, db_connection, api_client, logger):
"""创建任务实例(保持原有接口不变)。"""
...
def get_metadata(self, task_code: str) -> TaskMeta | None:
"""查询任务元数据。"""
...
def get_tasks_by_layer(self, layer: str) -> list[str]:
"""获取指定层的所有任务代码。"""
...
def is_utility_task(self, task_code: str) -> bool:
"""判断是否为工具类任务(不需要游标/运行记录)。"""
meta = self.get_metadata(task_code)
return meta is not None and not meta.requires_db_config
def get_all_task_codes(self) -> list[str]:
"""获取所有已注册的任务代码(保持原有接口)。"""
...
CLI 层重构
# cli/main.py 核心流程伪代码
def main():
args = parse_args()
config = AppConfig.load(build_cli_overrides(args))
# 资源创建
db_conn = DatabaseConnection(...)
api_client = APIClient(...)
try:
# 组装依赖
db_ops = DatabaseOperations(db_conn)
cursor_mgr = CursorManager(db_conn)
run_tracker = RunTracker(db_conn)
registry = default_registry
executor = TaskExecutor(config, db_ops, api_client, cursor_mgr, run_tracker, registry, logger)
if args.pipeline:
runner = PipelineRunner(config, executor, registry, db_conn, api_client, logger)
runner.run(
pipeline=args.pipeline,
processing_mode=args.processing_mode,
data_source=resolve_data_source(args),
...
)
else:
task_codes = config.get("run.tasks")
data_source = resolve_data_source(args)
executor.run_tasks(task_codes, data_source=data_source)
finally:
db_conn.close()
参数映射
| 旧参数 | 旧值 | 新参数 | 新值 | 说明 |
|---|---|---|---|---|
--pipeline-flow |
FULL |
--data-source |
hybrid |
在线抓取 + 本地入库 |
--pipeline-flow |
FETCH_ONLY |
--data-source |
online |
仅在线抓取落盘 |
--pipeline-flow |
INGEST_ONLY |
--data-source |
offline |
仅本地清洗入库 |
静态方法归位
| 方法 | 原位置 | 新位置 | 理由 |
|---|---|---|---|
_map_run_status |
ETLScheduler |
RunTracker |
状态映射是运行记录的职责 |
_filter_verify_tables |
ETLScheduler |
tasks/verification/ 模块 |
校验表过滤是校验模块的职责 |
数据模型
TaskMeta(新增)
@dataclass
class TaskMeta:
task_class: type # 任务类引用
requires_db_config: bool = True # 是否需要数据库任务配置(游标/运行记录)
layer: str | None = None # 所属层:"ODS"/"DWD"/"DWS"/"INDEX"/None
task_type: str = "etl" # 任务类型:"etl"/"utility"/"verification"
DataSource 枚举
class DataSource(str, Enum):
ONLINE = "online" # 仅在线抓取(原 FETCH_ONLY)
OFFLINE = "offline" # 仅本地入库(原 INGEST_ONLY)
HYBRID = "hybrid" # 抓取 + 入库(原 FULL)
配置键映射
| 旧键 | 新键 | 默认值 |
|---|---|---|
app.timezone |
app.timezone |
Asia/Shanghai(原 Asia/Taipei) |
pipeline.flow |
run.data_source |
hybrid |
pipeline.fetch_root |
io.fetch_root |
export/JSON |
pipeline.ingest_source_dir |
io.ingest_source_dir |
"" |
任务执行结果(不变)
# 单任务结果
{
"task_code": str,
"status": str, # "SUCCESS" / "FAIL" / "SKIP"
"counts": {
"fetched": int,
"inserted": int,
"updated": int,
"skipped": int,
"errors": int,
},
"window": {"start": datetime, "end": datetime, "minutes": int} | None,
"dump_dir": str | None,
}
# 管道结果
{
"status": str,
"pipeline": str,
"layers": list[str],
"results": list[dict], # 各任务结果
"verification_summary": dict | None, # 校验汇总
}
正确性属性
正确性属性是一种在系统所有有效执行中都应成立的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。
Property 1:data_source 参数决定执行路径
对于任意 任务代码和任意 data_source 值(online/offline/hybrid),TaskExecutor 执行该任务时,抓取阶段执行当且仅当 data_source 为 online 或 hybrid,入库阶段执行当且仅当 data_source 为 offline 或 hybrid。
验证:需求 1.2
Property 2:成功任务推进游标
对于任意 非工具类任务,当任务执行成功且返回包含有效 window(含 start 和 end)的结果时,CursorManager.advance 应被调用且参数与返回的窗口一致。
验证:需求 1.3
Property 3:失败任务标记 FAIL 并重新抛出
对于任意 非工具类任务,当任务执行过程中抛出异常时,RunTracker 应被更新为 FAIL 状态,且该异常应被重新抛出给调用方。
验证:需求 1.4
Property 4:工具类任务由元数据决定
对于任意 任务代码,TaskExecutor 是否跳过游标管理和运行记录,取决于 TaskRegistry 中该任务的 requires_db_config 元数据。当 requires_db_config=False 时跳过,否则执行完整生命周期。
验证:需求 1.6, 4.2
Property 5:管道名称→层列表映射
对于任意 有效的管道名称,PipelineRunner 解析出的层列表应与 PIPELINE_LAYERS 字典中的定义完全一致。
验证:需求 2.1
Property 6:processing_mode 控制执行流程
对于任意 processing_mode 值,增量 ETL 执行当且仅当模式包含 increment(即 increment_only 或 increment_verify),校验流程执行当且仅当模式包含 verify(即 verify_only 或 increment_verify)。
验证:需求 2.3, 2.4
Property 7:管道结果汇总完整性
对于任意 一组任务执行结果,PipelineRunner 返回的汇总字典应包含 status、pipeline、layers、results 字段,且 results 列表长度等于实际执行的任务数。
验证:需求 2.6
Property 8:TaskRegistry 元数据 round-trip
对于任意 任务代码、任务类和元数据组合(requires_db_config、layer、task_type),注册后通过 get_metadata 查询应返回相同的元数据值。
验证:需求 4.1
Property 9:TaskRegistry 向后兼容默认值
对于任意 使用旧接口(仅 task_code 和 task_class)注册的任务,查询元数据应返回 requires_db_config=True、layer=None、task_type="etl"。
验证:需求 4.4
Property 10:按层查询任务
对于任意 注册了 layer 元数据的任务集合,get_tasks_by_layer(layer) 返回的任务代码集合应等于所有 layer 匹配的已注册任务代码集合。
验证:需求 4.3
Property 11:pipeline_flow → data_source 映射一致性
对于任意 旧 pipeline_flow 值(FULL/FETCH_ONLY/INGEST_ONLY),映射到 data_source 的结果应与预定义映射表一致:FULL→hybrid、FETCH_ONLY→online、INGEST_ONLY→offline。同样,配置键 pipeline.flow 应自动映射到 run.data_source。
验证:需求 8.1, 8.2, 8.3, 5.2, 8.4
错误处理
TaskExecutor 错误处理
- 任务执行异常:更新 RunTracker 状态为 FAIL(含 error_message),然后重新抛出异常
- 游标推进失败:记录错误日志,不影响任务结果(任务本身已成功)
- 任务配置不存在:返回
{"status": "SKIP"}结果,不抛异常
PipelineRunner 错误处理
- 单个任务失败:记录错误,继续执行后续任务(与当前行为一致)
- 校验框架未安装:返回
{"status": "SKIPPED"}并记录警告 - 无效管道名称:抛出
ValueError
CLI 错误处理
- 配置加载失败:
SystemExit并输出错误信息 - 资源创建失败:
SystemExit并输出错误信息 - 执行过程异常:记录错误日志,
finally块确保资源释放,返回非零退出码
弃用警告
- 使用 Python
warnings.warn(DeprecationWarning)发出弃用警告 - 同时在日志中记录映射详情,便于运维排查
测试策略
单元测试
使用 pytest + 现有的 FakeDB/FakeAPI 测试工具(tests/unit/task_test_utils.py)。
TaskExecutor 测试:
- 注入 mock 依赖(FakeDB、FakeAPI、mock CursorManager、mock RunTracker)
- 验证成功/失败/跳过三种路径
- 验证工具类任务不触发游标/运行记录
- 验证 data_source 参数正确控制抓取/入库阶段
PipelineRunner 测试:
- 注入 mock TaskExecutor
- 验证不同 processing_mode 下的执行流程
- 验证管道→层→任务的解析链
TaskRegistry 测试:
- 验证元数据注册和查询
- 验证向后兼容(无元数据注册)
- 验证按层查询
配置兼容性测试:
- 验证旧键→新键映射
- 验证优先级规则
- 验证默认值变更
属性测试
使用 hypothesis 库进行属性测试,每个属性至少运行 100 次迭代。
每个属性测试必须用注释标注对应的设计属性编号:
# Feature: scheduler-refactor, Property 8: TaskRegistry 元数据 round-trip
属性测试覆盖:
- Property 1: data_source 参数决定执行路径
- Property 2: 成功任务推进游标
- Property 3: 失败任务标记 FAIL 并重新抛出
- Property 4: 工具类任务由元数据决定
- Property 5: 管道名称→层列表映射
- Property 6: processing_mode 控制执行流程
- Property 7: 管道结果汇总完整性
- Property 8: TaskRegistry 元数据 round-trip
- Property 9: TaskRegistry 向后兼容默认值
- Property 10: 按层查询任务
- Property 11: pipeline_flow → data_source 映射一致性