# -*- coding: utf-8 -*-
"""Property-based tests for TaskExecutor.

Uses hypothesis to check general correctness properties of the executor:
data-source routing, cursor advancement on success, FAIL marking on error,
and metadata-driven handling of utility tasks.
"""
import contextlib
import re
import string
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest
from hypothesis import given, settings
from hypothesis import strategies as st

from orchestration.task_executor import TaskExecutor, DataSource
from orchestration.task_registry import TaskRegistry

FILE_VERSION = "v4_shell"

# Expected (includes_fetch, includes_ingest) for every supported data source.
# Keeping the expectation in one table means a data source without an entry
# fails loudly (KeyError) instead of silently passing an if/elif chain.
_EXPECTED_FLOW = {
    "online": (True, False),
    "offline": (False, True),
    "hybrid": (True, True),
}

# Sample exactly the data sources that have an expectation entry.
data_source_st = st.sampled_from(list(_EXPECTED_FLOW))

# Task codes are built only from these prefixes; "ODS_" is deliberately absent.
# NOTE(review): presumably ODS_-prefixed codes take a different executor path — confirm.
_NON_ODS_PREFIXES = ["DWD_", "DWS_", "TASK_", "ETL_", "TEST_"]

task_code_st = st.builds(
    lambda prefix, suffix: prefix + suffix,
    prefix=st.sampled_from(_NON_ODS_PREFIXES),
    suffix=st.text(
        alphabet=string.ascii_uppercase + string.digits + "_",
        min_size=1,
        max_size=15,
    ),
)

# Naive datetimes; the largest window (1440 minutes) stays far from datetime.max.
window_start_st = st.datetimes(min_value=datetime(2020, 1, 1), max_value=datetime(2030, 12, 31))

# Flat "section.key" settings served by the fake config's .get().
# All values are immutable, so sharing one module-level dict is safe.
_CONFIG_VALUES = {
    "app.timezone": "Asia/Shanghai",
    "io.fetch_root": "/tmp/fetch",
    "io.ingest_source_dir": "/tmp/ingest",
    "io.write_pretty_json": False,
    "pipeline.fetch_root": None,
    "pipeline.ingest_source_dir": None,
    "integrity.auto_check": False,
    "run.overlap_seconds": 600,
}


def _make_fake_class(name="FakeTask"):
    """Return a throwaway class whose constructor accepts and ignores any arguments."""
    return type(name, (), {"__init__": lambda self, *a, **kw: None})


def _make_config():
    """Build a MagicMock config.

    ``config.get(key, default)`` looks up ``_CONFIG_VALUES``; ``config["io"]``
    returns the I/O section. Any other section name raises KeyError.
    """
    config = MagicMock()
    config.get = MagicMock(side_effect=lambda key, default=None: _CONFIG_VALUES.get(key, default))
    # Build the section dict per lookup so a mutation by the code under test
    # can never bleed into another lookup or hypothesis example.
    config.__getitem__ = MagicMock(side_effect=lambda k: {
        "io": {"export_root": "/tmp/export", "log_root": "/tmp/log"},
    }[k])
    return config


def _make_executor(registry):
    """Build a TaskExecutor around ``registry`` with every other collaborator mocked."""
    return TaskExecutor(
        config=_make_config(),
        db_ops=MagicMock(),
        api_client=MagicMock(),
        cursor_mgr=MagicMock(),
        run_tracker=MagicMock(),
        task_registry=registry,
        logger=MagicMock(),
    )


def _fake_task_row(task_id, task_code):
    """Return the task-config row served by the patched ``_load_task_config``."""
    return {"task_id": task_id, "task_code": task_code, "store_id": 1, "enabled": True}


@contextlib.contextmanager
def _patched_ingest_pipeline(task_id, task_code, **execute_ingest_kwargs):
    """Patch TaskExecutor's config/ingest hooks for an in-memory ``run_single_task``.

    Args:
        task_id: ``task_id`` placed in the fake task-config row.
        task_code: ``task_code`` placed in the fake task-config row.
        **execute_ingest_kwargs: forwarded to ``patch.object`` for
            ``_execute_ingest`` (e.g. ``return_value=...`` or ``side_effect=...``).

    The integrity-check hook is always stubbed so that every ingest-path
    property test isolates the same surface.
    """
    with (
        patch.object(TaskExecutor, "_load_task_config",
                     return_value=_fake_task_row(task_id, task_code)),
        patch.object(TaskExecutor, "_resolve_ingest_source", return_value=Path("/tmp/src")),
        patch.object(TaskExecutor, "_execute_ingest", **execute_ingest_kwargs),
        patch.object(TaskExecutor, "_maybe_run_integrity_check"),
    ):
        yield


# Feature: scheduler-refactor, Property 1: the data_source argument determines the execution path
# **Validates: Requirements 1.2**
class TestProperty1DataSourceDeterminesPath:
    """Fetch/ingest stage selection must follow ``_EXPECTED_FLOW``."""

    @given(ds=data_source_st)
    @settings(max_examples=100)
    def test_flow_includes_fetch(self, ds):
        """The fetch stage runs exactly for the data sources marked True in the table."""
        expected_fetch, _ = _EXPECTED_FLOW[ds]
        assert TaskExecutor._flow_includes_fetch(ds) == expected_fetch

    @given(ds=data_source_st)
    @settings(max_examples=100)
    def test_flow_includes_ingest(self, ds):
        """The ingest stage runs exactly for the data sources marked True in the table."""
        _, expected_ingest = _EXPECTED_FLOW[ds]
        assert TaskExecutor._flow_includes_ingest(ds) == expected_ingest

    @given(ds=data_source_st)
    @settings(max_examples=100)
    def test_fetch_and_ingest_consistency(self, ds):
        """Both stage predicates, compared by truthiness, match the expectation row together."""
        fetch = TaskExecutor._flow_includes_fetch(ds)
        ingest = TaskExecutor._flow_includes_ingest(ds)
        # Indexing raises KeyError for an unmapped data source instead of passing silently.
        assert (bool(fetch), bool(ingest)) == _EXPECTED_FLOW[ds]


# Feature: scheduler-refactor, Property 2: a successful task advances the cursor
# **Validates: Requirements 1.3**
class TestProperty2SuccessAdvancesCursor:
    """A SUCCESS result that carries a window advances the cursor to that window."""

    @given(
        task_code=task_code_st,
        window_start=window_start_st,
        window_minutes=st.integers(min_value=1, max_value=1440),
    )
    @settings(max_examples=100)
    def test_success_with_window_advances_cursor(self, task_code, window_start, window_minutes):
        """``cursor_mgr.advance`` is called once with the result window's start/end."""
        window_end = window_start + timedelta(minutes=window_minutes)
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=True, layer="DWD")

        task_result = {
            "status": "SUCCESS",
            "counts": {"fetched": 10, "inserted": 5},
            "window": {"start": window_start, "end": window_end, "minutes": window_minutes},
        }
        executor = _make_executor(registry)
        executor.cursor_mgr.get_or_create.return_value = {"cursor_id": 1, "last_end": None}
        executor.run_tracker.create_run.return_value = 100

        with _patched_ingest_pipeline(42, task_code, return_value=task_result):
            executor.run_single_task(task_code, "test-uuid", 1, "offline")

        executor.cursor_mgr.advance.assert_called_once()
        kw = executor.cursor_mgr.advance.call_args.kwargs
        assert kw["window_start"] == window_start
        assert kw["window_end"] == window_end


# Feature: scheduler-refactor, Property 3: a failed task is marked FAIL and the exception is re-raised
# **Validates: Requirements 1.4**
class TestProperty3FailureMarksFailAndReraises:
    """An exception from the ingest stage is recorded as FAIL and propagates."""

    @given(
        task_code=task_code_st,
        error_msg=st.text(
            alphabet=string.ascii_letters + string.digits + " _-",
            min_size=1,
            max_size=80,
        ),
    )
    @settings(max_examples=100)
    def test_exception_marks_fail_and_reraises(self, task_code, error_msg):
        """The original RuntimeError escapes, and the last ``update_run`` uses status FAIL."""
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=True, layer="DWD")

        executor = _make_executor(registry)
        executor.cursor_mgr.get_or_create.return_value = {"cursor_id": 1, "last_end": None}
        executor.run_tracker.create_run.return_value = 200

        with _patched_ingest_pipeline(99, task_code, side_effect=RuntimeError(error_msg)):
            # re.escape: the generated message may contain regex metacharacters such as "-".
            with pytest.raises(RuntimeError, match=re.escape(error_msg)):
                executor.run_single_task(task_code, "fail-uuid", 1, "offline")

        executor.run_tracker.update_run.assert_called()
        kw = executor.run_tracker.update_run.call_args.kwargs
        assert kw["status"] == "FAIL"


# Feature: scheduler-refactor, Property 4: utility-task handling is determined by registry metadata
# **Validates: Requirements 1.6, 4.2**
class TestProperty4UtilityTaskDeterminedByMetadata:
    """Registry metadata, not the task code, decides whether cursor/run tracking is used."""

    @given(task_code=task_code_st)
    @settings(max_examples=100)
    def test_utility_task_skips_cursor_and_run_tracker(self, task_code):
        """A ``task_type="utility"`` task runs without touching cursors or creating a run."""
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=False, task_type="utility")

        executor = _make_executor(registry)
        mock_task = MagicMock()
        mock_task.execute.return_value = {"status": "SUCCESS", "counts": {}}
        # Replace the factory so the executor receives our mock task instance.
        registry.create_task = MagicMock(return_value=mock_task)

        result = executor.run_single_task(task_code, "util-uuid", 1, "hybrid")

        executor.cursor_mgr.get_or_create.assert_not_called()
        executor.cursor_mgr.advance.assert_not_called()
        executor.run_tracker.create_run.assert_not_called()
        assert result.get("status") == "SUCCESS"

    @given(task_code=task_code_st)
    @settings(max_examples=100)
    def test_non_utility_task_uses_cursor_and_run_tracker(self, task_code):
        """A regular (layer-registered) task fetches its cursor and creates one run record."""
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=True, layer="DWS")

        task_result = {"status": "SUCCESS", "counts": {"fetched": 1}}
        executor = _make_executor(registry)
        executor.cursor_mgr.get_or_create.return_value = {"cursor_id": 1, "last_end": None}
        executor.run_tracker.create_run.return_value = 300

        with _patched_ingest_pipeline(77, task_code, return_value=task_result):
            executor.run_single_task(task_code, "non-util-uuid", 1, "offline")

        executor.cursor_mgr.get_or_create.assert_called_once()
        executor.run_tracker.create_run.assert_called_once()