223 lines
8.2 KiB
Python
223 lines
8.2 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""端到端流程集成测试
|
||
|
||
验证 CLI → PipelineRunner → TaskExecutor 完整调用链。
|
||
使用 mock 依赖,不需要真实数据库。
|
||
|
||
需求: 9.4
|
||
"""
|
||
from unittest.mock import MagicMock, patch, PropertyMock
|
||
import pytest
|
||
|
||
from orchestration.task_executor import TaskExecutor, DataSource
|
||
from orchestration.pipeline_runner import PipelineRunner
|
||
from orchestration.task_registry import TaskRegistry
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Helper: build a minimal usable mock config
# ---------------------------------------------------------------------------
|
||
def _make_config(**overrides):
|
||
"""构造一个行为类似 AppConfig 的 MagicMock。"""
|
||
store = {
|
||
"app.timezone": "Asia/Shanghai",
|
||
"app.store_id": 1,
|
||
"io.fetch_root": "/tmp/fetch",
|
||
"io.ingest_source_dir": "",
|
||
"io.write_pretty_json": False,
|
||
"io.export_root": "/tmp/export",
|
||
"io.log_root": "/tmp/logs",
|
||
"pipeline.fetch_root": None,
|
||
"pipeline.ingest_source_dir": None,
|
||
"run.ods_tasks": [],
|
||
"run.dws_tasks": [],
|
||
"run.index_tasks": [],
|
||
"run.data_source": "hybrid",
|
||
"verification.ods_use_local_json": False,
|
||
"verification.skip_ods_when_fetch_before_verify": True,
|
||
}
|
||
store.update(overrides)
|
||
|
||
config = MagicMock()
|
||
config.get = MagicMock(side_effect=lambda k, d=None: store.get(k, d))
|
||
config.__getitem__ = MagicMock(side_effect=lambda k: {
|
||
"io": {"export_root": "/tmp/export", "log_root": "/tmp/logs"},
|
||
}[k])
|
||
return config
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Helper: a fake task class that can be registered with TaskRegistry
# ---------------------------------------------------------------------------
|
||
class _FakeTask:
|
||
"""最小假任务,execute() 返回固定结果。"""
|
||
def __init__(self, config, db_ops, api_client, logger):
|
||
pass
|
||
|
||
def execute(self, cursor_data):
|
||
return {"status": "SUCCESS", "counts": {"fetched": 5, "inserted": 3}}
|
||
|
||
|
||
# ===========================================================================
# Test 1: traditional mode — TaskExecutor.run_tasks end to end
# ===========================================================================
|
||
class TestTraditionalModeE2E:
    """Traditional mode: exercise ``TaskExecutor.run_tasks`` end to end."""

    def test_run_tasks_executes_utility_task_and_returns_results(self):
        """A utility task takes the ``_run_utility_task`` path, skipping cursor and run records."""
        task_registry = TaskRegistry()
        task_registry.register(
            "FAKE_UTIL",
            _FakeTask,
            requires_db_config=False,
            task_type="utility",
        )

        # Keep handles on these two mocks so the "not called" checks at the
        # end can inspect them after the run.
        cursor_mgr = MagicMock()
        run_tracker = MagicMock()

        collaborators = {
            "config": _make_config(),
            "db_ops": MagicMock(),
            "api_client": MagicMock(),
            "cursor_mgr": cursor_mgr,
            "run_tracker": run_tracker,
            "task_registry": task_registry,
            "logger": MagicMock(),
        }
        executor = TaskExecutor(**collaborators)

        results = executor.run_tasks(["FAKE_UTIL"], data_source="hybrid")

        assert len(results) == 1

        # When a utility task succeeds, run_tasks wraps its status as "成功";
        # the other success labels are tolerated as well.
        accepted_statuses = ("成功", "完成", "SUCCESS")
        first_result = results[0]
        assert first_result["status"] in accepted_statuses

        # A utility task must not touch the cursor or the run records.
        cursor_mgr.get_or_create.assert_not_called()
        run_tracker.create_run.assert_not_called()
|
||
|
||
|
||
# ===========================================================================
# Test 2: pipeline mode — PipelineRunner → TaskExecutor end to end
# ===========================================================================
|
||
class TestPipelineModeE2E:
    """Pipeline mode: ``PipelineRunner.run`` → ``TaskExecutor.run_tasks`` end to end."""

    @staticmethod
    def _build_runner(executor, registry):
        """Wire a PipelineRunner to the given executor and registry; every other dependency is a mock."""
        return PipelineRunner(
            config=_make_config(),
            task_executor=executor,
            task_registry=registry,
            db_conn=MagicMock(),
            api_client=MagicMock(),
            logger=MagicMock(),
        )

    def test_pipeline_delegates_to_executor_and_returns_structure(self):
        """PipelineRunner resolves layers to tasks, then delegates execution to TaskExecutor."""
        executor = MagicMock()
        canned_result = {
            "task_code": "FAKE_ODS",
            "status": "成功",
            "counts": {"fetched": 10, "inserted": 8},
        }
        executor.run_tasks.return_value = [canned_result]

        registry = TaskRegistry()
        registry.register("FAKE_ODS", _FakeTask, layer="ODS")

        runner = self._build_runner(executor, registry)
        result = runner.run(
            pipeline="api_ods",
            processing_mode="increment_only",
            data_source="hybrid",
        )

        # Shape of the returned summary.
        assert result["status"] == "SUCCESS"
        assert result["pipeline"] == "api_ods"
        assert result["layers"] == ["ODS"]
        assert isinstance(result["results"], list)

        # The executor was invoked exactly once, with the data source passed
        # through as a keyword argument.
        executor.run_tasks.assert_called_once()
        _, call_kwargs = executor.run_tasks.call_args
        assert call_kwargs["data_source"] == "hybrid"

    def test_pipeline_verify_only_skips_increment(self):
        """verify_only mode skips the incremental ETL and only runs verification."""
        executor = MagicMock()
        executor.run_tasks.return_value = []

        runner = self._build_runner(executor, TaskRegistry())

        # The verification framework may not be installed, so stub out
        # _run_verification on this runner instance.
        stub_verification = patch.object(
            runner,
            "_run_verification",
            return_value={"status": "COMPLETED"},
        )
        with stub_verification:
            result = runner.run(
                pipeline="api_ods",
                processing_mode="verify_only",
                data_source="hybrid",
            )

        assert result["status"] == "SUCCESS"
        # With verify_only and fetch_before_verify=False, run_tasks is never called.
        executor.run_tasks.assert_not_called()
|
||
|
||
|
||
# ===========================================================================
# Test 3: ETLScheduler thin-wrapper delegation
# ===========================================================================
|
||
class TestSchedulerThinWrapper:
    """The thin ETLScheduler wrapper delegates to TaskExecutor / PipelineRunner."""

    def test_scheduler_delegates_run_tasks(self):
        """``run_tasks()`` is delegated to the internal task_executor."""
        from orchestration.scheduler import ETLScheduler
        import warnings

        def _section(k):
            # Answers config["db"] / config["api"]; any other key raises KeyError.
            return {
                "db": {
                    "dsn": "postgresql://fake:5432/test",
                    "session": {"timezone": "Asia/Shanghai"},
                    "connect_timeout_sec": 5,
                },
                "api": {
                    "base_url": "https://fake.api",
                    "token": "fake-token",
                    "timeout_sec": 30,
                    "retries": {"max_attempts": 3},
                },
            }[k]

        def _lookup(k, d=None):
            # Answers config.get("dotted.key", default).
            return {
                "run.data_source": "hybrid",
                "run.tasks": ["FAKE"],
                "app.timezone": "Asia/Shanghai",
            }.get(k, d)

        mock_config = MagicMock()
        mock_config.__getitem__ = MagicMock(side_effect=_section)
        mock_config.get = MagicMock(side_effect=_lookup)

        # Patch out resource creation so no real connection is attempted.
        with patch("orchestration.scheduler.DatabaseConnection"), \
                patch("orchestration.scheduler.DatabaseOperations"), \
                patch("orchestration.scheduler.APIClient"), \
                patch("orchestration.scheduler.CursorManager"), \
                patch("orchestration.scheduler.RunTracker"), \
                patch("orchestration.scheduler.TaskExecutor"), \
                patch("orchestration.scheduler.PipelineRunner"):

            # DeprecationWarning is silenced only while the scheduler is constructed.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                scheduler = ETLScheduler(mock_config, MagicMock())

            # run_tasks is delegated.
            scheduler.run_tasks(["TEST_TASK"])
            scheduler.task_executor.run_tasks.assert_called_once()

            # run_pipeline_with_verification is delegated.
            scheduler.run_pipeline_with_verification(pipeline="api_ods")
            scheduler.pipeline_runner.run.assert_called_once()
|