# -*- coding: utf-8 -*-
"""End-to-end integration tests for the orchestration flow.

Exercises the CLI -> PipelineRunner -> TaskExecutor call chain.
All dependencies are mocked, so no real database is required.

Requirement: 9.4
"""
import warnings
from unittest.mock import MagicMock, patch, PropertyMock

import pytest

from orchestration.task_executor import TaskExecutor, DataSource
from orchestration.pipeline_runner import PipelineRunner
from orchestration.task_registry import TaskRegistry


# ---------------------------------------------------------------------------
# Helper: build a minimal usable mock config
# ---------------------------------------------------------------------------

def _make_config(**overrides):
    """Build a MagicMock that behaves like an AppConfig.

    Args:
        **overrides: Dotted config keys (e.g. ``"run.data_source"``) mapped to
            the values that should replace the defaults below.

    Returns:
        A MagicMock whose ``get(key, default)`` reads from the merged settings
        and whose ``config["io"]`` returns the ``export_root`` / ``log_root``
        pair taken from those same settings. Any other subscript key raises
        ``KeyError``.
    """
    store = {
        "app.timezone": "Asia/Shanghai",
        "app.store_id": 1,
        "io.fetch_root": "/tmp/fetch",
        "io.ingest_source_dir": "",
        "io.write_pretty_json": False,
        "io.export_root": "/tmp/export",
        "io.log_root": "/tmp/logs",
        "pipeline.fetch_root": None,
        "pipeline.ingest_source_dir": None,
        "run.ods_tasks": [],
        "run.dws_tasks": [],
        "run.index_tasks": [],
        "run.data_source": "hybrid",
        "verification.ods_use_local_json": False,
        "verification.skip_ods_when_fetch_before_verify": True,
    }
    store.update(overrides)

    def _get(k, d=None):
        return store.get(k, d)

    def _getitem(k):
        # Build the section from ``store`` so that overrides of the io.* keys
        # are visible through subscript access as well as through get().
        sections = {
            "io": {
                "export_root": store["io.export_root"],
                "log_root": store["io.log_root"],
            },
        }
        return sections[k]

    config = MagicMock()
    config.get = MagicMock(side_effect=_get)
    config.__getitem__ = MagicMock(side_effect=_getitem)
    return config


# ---------------------------------------------------------------------------
# Helper: a fake task class that can be registered with TaskRegistry
# ---------------------------------------------------------------------------

class _FakeTask:
    """Minimal fake task whose execute() returns a fixed result."""

    def __init__(self, config, db_ops, api_client, logger):
        pass

    def execute(self, cursor_data):
        return {"status": "SUCCESS", "counts": {"fetched": 5, "inserted": 3}}


# ===========================================================================
# Test 1: traditional mode - TaskExecutor.run_tasks end to end
# ===========================================================================

class TestTraditionalModeE2E:
    """Traditional mode: TaskExecutor.run_tasks end to end."""

    def test_run_tasks_executes_utility_task_and_returns_results(self):
        """Utility tasks take the _run_utility_task path, skipping cursor and run records."""
        config = _make_config()
        registry = TaskRegistry()
        registry.register(
            "FAKE_UTIL",
            _FakeTask,
            requires_db_config=False,
            task_type="utility",
        )

        cursor_mgr = MagicMock()
        run_tracker = MagicMock()

        executor = TaskExecutor(
            config=config,
            db_ops=MagicMock(),
            api_client=MagicMock(),
            cursor_mgr=cursor_mgr,
            run_tracker=run_tracker,
            task_registry=registry,
            logger=MagicMock(),
        )

        results = executor.run_tasks(["FAKE_UTIL"], data_source="hybrid")

        assert len(results) == 1
        # A successful utility task is expected to be reported as "成功";
        # the other spellings are tolerated as well.
        assert results[0]["status"] in ("成功", "完成", "SUCCESS")
        # Utility tasks must not touch the cursor manager or the run tracker.
        cursor_mgr.get_or_create.assert_not_called()
        run_tracker.create_run.assert_not_called()


# ===========================================================================
# Test 2: pipeline mode - PipelineRunner -> TaskExecutor end to end
# ===========================================================================

class TestPipelineModeE2E:
    """Pipeline mode: PipelineRunner.run -> TaskExecutor.run_tasks end to end."""

    def test_pipeline_delegates_to_executor_and_returns_structure(self):
        """PipelineRunner resolves layers to tasks, then delegates to TaskExecutor."""
        executor = MagicMock()
        executor.run_tasks.return_value = [
            {
                "task_code": "FAKE_ODS",
                "status": "成功",
                "counts": {"fetched": 10, "inserted": 8},
            },
        ]

        registry = TaskRegistry()
        registry.register("FAKE_ODS", _FakeTask, layer="ODS")

        config = _make_config()

        runner = PipelineRunner(
            config=config,
            task_executor=executor,
            task_registry=registry,
            db_conn=MagicMock(),
            api_client=MagicMock(),
            logger=MagicMock(),
        )

        result = runner.run(
            pipeline="api_ods",
            processing_mode="increment_only",
            data_source="hybrid",
        )

        # Shape of the returned summary.
        assert result["status"] == "SUCCESS"
        assert result["pipeline"] == "api_ods"
        assert result["layers"] == ["ODS"]
        assert isinstance(result["results"], list)

        # The executor was invoked exactly once with the requested data source.
        executor.run_tasks.assert_called_once()
        call_args = executor.run_tasks.call_args
        assert call_args[1]["data_source"] == "hybrid"

    def test_pipeline_verify_only_skips_increment(self):
        """verify_only mode skips the incremental ETL and only runs verification."""
        executor = MagicMock()
        executor.run_tasks.return_value = []

        registry = TaskRegistry()
        config = _make_config()

        runner = PipelineRunner(
            config=config,
            task_executor=executor,
            task_registry=registry,
            db_conn=MagicMock(),
            api_client=MagicMock(),
            logger=MagicMock(),
        )

        # The verification framework may not be installed, so stub out
        # _run_verification.
        with patch.object(
            runner, "_run_verification", return_value={"status": "COMPLETED"}
        ):
            result = runner.run(
                pipeline="api_ods",
                processing_mode="verify_only",
                data_source="hybrid",
            )

        assert result["status"] == "SUCCESS"
        # With verify_only and fetch_before_verify=False, run_tasks is not called.
        executor.run_tasks.assert_not_called()


# ===========================================================================
# Test 3: ETLScheduler thin-wrapper delegation
# ===========================================================================

class TestSchedulerThinWrapper:
    """ETLScheduler's thin wrapper delegates to TaskExecutor / PipelineRunner."""

    def test_scheduler_delegates_run_tasks(self):
        """run_tasks() delegates to the internal task_executor."""
        from orchestration.scheduler import ETLScheduler

        sections = {
            "db": {
                "dsn": "postgresql://fake:5432/test",
                "session": {"timezone": "Asia/Shanghai"},
                "connect_timeout_sec": 5,
            },
            "api": {
                "base_url": "https://fake.api",
                "token": "fake-token",
                "timeout_sec": 30,
                "retries": {"max_attempts": 3},
            },
        }
        settings = {
            "run.data_source": "hybrid",
            "run.tasks": ["FAKE"],
            "app.timezone": "Asia/Shanghai",
        }

        mock_config = MagicMock()
        mock_config.__getitem__ = MagicMock(side_effect=lambda k: sections[k])
        mock_config.get = MagicMock(
            side_effect=lambda k, d=None: settings.get(k, d)
        )

        # Patch out resource creation so no real connections are opened.
        with patch("orchestration.scheduler.DatabaseConnection"), \
             patch("orchestration.scheduler.DatabaseOperations"), \
             patch("orchestration.scheduler.APIClient"), \
             patch("orchestration.scheduler.CursorManager"), \
             patch("orchestration.scheduler.RunTracker"), \
             patch("orchestration.scheduler.TaskExecutor"), \
             patch("orchestration.scheduler.PipelineRunner"):
            # Silence DeprecationWarning while constructing the scheduler.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                scheduler = ETLScheduler(mock_config, MagicMock())

            # run_tasks delegation
            scheduler.run_tasks(["TEST_TASK"])
            scheduler.task_executor.run_tasks.assert_called_once()

            # run_pipeline_with_verification delegation
            scheduler.run_pipeline_with_verification(pipeline="api_ods")
            scheduler.pipeline_runner.run.assert_called_once()