208 lines
8.1 KiB
Python
208 lines
8.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""TaskExecutor 属性测试 - hypothesis 验证执行器的通用正确性属性。"""
|
|
import re
|
|
import string
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from hypothesis import given, settings
|
|
from hypothesis import strategies as st
|
|
|
|
from orchestration.task_executor import TaskExecutor, DataSource
|
|
from orchestration.task_registry import TaskRegistry
|
|
|
|
FILE_VERSION = "v4_shell"


# Strategy: one of the three supported execution modes.
data_source_st = st.sampled_from(["online", "offline", "hybrid"])


# Prefixes marking task codes that do not belong to the ODS layer.
_NON_ODS_PREFIXES = ["DWD_", "DWS_", "TASK_", "ETL_", "TEST_"]

# Strategy: a task code built from a known prefix plus a 1-15 character
# suffix of uppercase letters, digits and underscores.
task_code_st = st.builds(
    lambda head, tail: head + tail,
    head=st.sampled_from(_NON_ODS_PREFIXES),
    tail=st.text(
        alphabet=string.ascii_uppercase + string.digits + "_",
        min_size=1,
        max_size=15,
    ),
)

# Strategy: an arbitrary (naive) window start timestamp within 2020-2030.
window_start_st = st.datetimes(
    min_value=datetime(2020, 1, 1),
    max_value=datetime(2030, 12, 31),
)
def _make_fake_class(name="FakeTask"):
|
|
return type(name, (), {"__init__": lambda self, *a, **kw: None})
|
|
|
|
|
|
def _make_config():
|
|
config = MagicMock()
|
|
config.get = MagicMock(side_effect=lambda key, default=None: {
|
|
"app.timezone": "Asia/Shanghai",
|
|
"io.fetch_root": "/tmp/fetch",
|
|
"io.ingest_source_dir": "/tmp/ingest",
|
|
"io.write_pretty_json": False,
|
|
"pipeline.fetch_root": None,
|
|
"pipeline.ingest_source_dir": None,
|
|
"integrity.auto_check": False,
|
|
"run.overlap_seconds": 600,
|
|
}.get(key, default))
|
|
config.__getitem__ = MagicMock(side_effect=lambda k: {
|
|
"io": {"export_root": "/tmp/export", "log_root": "/tmp/log"},
|
|
}[k])
|
|
return config
|
|
|
|
|
|
def _make_executor(registry):
    """Wire up a TaskExecutor whose collaborators are all mocks except *registry*."""
    collaborators = {
        "config": _make_config(),
        "db_ops": MagicMock(),
        "api_client": MagicMock(),
        "cursor_mgr": MagicMock(),
        "run_tracker": MagicMock(),
        "task_registry": registry,
        "logger": MagicMock(),
    }
    return TaskExecutor(**collaborators)
# Feature: scheduler-refactor, Property 1: the data_source argument determines
# the execution path.
# **Validates: Requirements 1.2**

class TestProperty1DataSourceDeterminesPath:
    """Fetch/ingest inclusion must be a pure function of data_source."""

    @given(ds=data_source_st)
    @settings(max_examples=100)
    def test_flow_includes_fetch(self, ds):
        # Fetch runs for every mode except pure offline.
        expected = ds in {"online", "hybrid"}
        assert TaskExecutor._flow_includes_fetch(ds) == expected

    @given(ds=data_source_st)
    @settings(max_examples=100)
    def test_flow_includes_ingest(self, ds):
        # Ingest runs for every mode except pure online.
        expected = ds in {"offline", "hybrid"}
        assert TaskExecutor._flow_includes_ingest(ds) == expected

    @given(ds=data_source_st)
    @settings(max_examples=100)
    def test_fetch_and_ingest_consistency(self, ds):
        # The (fetch, ingest) pair must match this table for every mode.
        expected = {
            "online": (True, False),
            "offline": (False, True),
            "hybrid": (True, True),
        }[ds]
        actual = (
            TaskExecutor._flow_includes_fetch(ds),
            TaskExecutor._flow_includes_ingest(ds),
        )
        assert actual == expected
# Feature: scheduler-refactor, Property 2: a successful task advances the cursor.
# **Validates: Requirements 1.3**

class TestProperty2SuccessAdvancesCursor:
    """A SUCCESS result carrying a window must advance the cursor to that window."""

    @given(
        task_code=task_code_st,
        window_start=window_start_st,
        window_minutes=st.integers(min_value=1, max_value=1440),
    )
    @settings(max_examples=100)
    def test_success_with_window_advances_cursor(self, task_code, window_start, window_minutes):
        window_end = window_start + timedelta(minutes=window_minutes)

        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=True, layer="DWD")
        executor = _make_executor(registry)
        executor.cursor_mgr.get_or_create.return_value = {"cursor_id": 1, "last_end": None}
        executor.run_tracker.create_run.return_value = 100

        # A successful ingest result that reports the processed window.
        ingest_result = {
            "status": "SUCCESS",
            "counts": {"fetched": 10, "inserted": 5},
            "window": {"start": window_start, "end": window_end, "minutes": window_minutes},
        }
        loaded_config = {
            "task_id": 42, "task_code": task_code, "store_id": 1, "enabled": True}

        with (
            patch.object(TaskExecutor, "_load_task_config", return_value=loaded_config),
            patch.object(TaskExecutor, "_resolve_ingest_source", return_value=Path("/tmp/src")),
            patch.object(TaskExecutor, "_execute_ingest", return_value=ingest_result),
            patch.object(TaskExecutor, "_maybe_run_integrity_check"),
        ):
            executor.run_single_task(task_code, "test-uuid", 1, "offline")

        # The cursor must advance exactly once, to the reported window.
        executor.cursor_mgr.advance.assert_called_once()
        advance_kwargs = executor.cursor_mgr.advance.call_args.kwargs
        assert advance_kwargs["window_start"] == window_start
        assert advance_kwargs["window_end"] == window_end
# Feature: scheduler-refactor, Property 3: a failing task is marked FAIL and
# the exception is re-raised.
# **Validates: Requirements 1.4**

class TestProperty3FailureMarksFailAndReraises:
    """An exception during execution must mark the run FAIL and propagate."""

    @given(
        task_code=task_code_st,
        error_msg=st.text(
            alphabet=string.ascii_letters + string.digits + " _-",
            min_size=1,
            max_size=80,
        ),
    )
    @settings(max_examples=100)
    def test_exception_marks_fail_and_reraises(self, task_code, error_msg):
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=True, layer="DWD")
        executor = _make_executor(registry)
        executor.cursor_mgr.get_or_create.return_value = {"cursor_id": 1, "last_end": None}
        executor.run_tracker.create_run.return_value = 200

        loaded_config = {
            "task_id": 99, "task_code": task_code, "store_id": 1, "enabled": True}

        with (
            patch.object(TaskExecutor, "_load_task_config", return_value=loaded_config),
            patch.object(TaskExecutor, "_resolve_ingest_source", return_value=Path("/tmp/src")),
            patch.object(TaskExecutor, "_execute_ingest", side_effect=RuntimeError(error_msg)),
        ):
            # The original exception must reach the caller unchanged.
            with pytest.raises(RuntimeError, match=re.escape(error_msg)):
                executor.run_single_task(task_code, "fail-uuid", 1, "offline")

        # The run tracker's final update must record FAIL status.
        executor.run_tracker.update_run.assert_called()
        final_kwargs = executor.run_tracker.update_run.call_args.kwargs
        assert final_kwargs["status"] == "FAIL"
# Feature: scheduler-refactor, Property 4: utility tasks are identified by
# registry metadata.
# **Validates: Requirements 1.6, 4.2**

class TestProperty4UtilityTaskDeterminedByMetadata:
    """The registry's task_type metadata decides cursor/run-tracker usage."""

    @given(task_code=task_code_st)
    @settings(max_examples=100)
    def test_utility_task_skips_cursor_and_run_tracker(self, task_code):
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=False, task_type="utility")
        executor = _make_executor(registry)

        # Have the registry hand out a mock task that immediately succeeds.
        fake_task = MagicMock()
        fake_task.execute.return_value = {"status": "SUCCESS", "counts": {}}
        registry.create_task = MagicMock(return_value=fake_task)

        outcome = executor.run_single_task(task_code, "util-uuid", 1, "hybrid")

        # Utility tasks bypass cursor management and run tracking entirely.
        executor.cursor_mgr.get_or_create.assert_not_called()
        executor.cursor_mgr.advance.assert_not_called()
        executor.run_tracker.create_run.assert_not_called()
        assert outcome.get("status") == "SUCCESS"

    @given(task_code=task_code_st)
    @settings(max_examples=100)
    def test_non_utility_task_uses_cursor_and_run_tracker(self, task_code):
        registry = TaskRegistry()
        registry.register(task_code, _make_fake_class(), requires_db_config=True, layer="DWS")
        executor = _make_executor(registry)
        executor.cursor_mgr.get_or_create.return_value = {"cursor_id": 1, "last_end": None}
        executor.run_tracker.create_run.return_value = 300

        loaded_config = {
            "task_id": 77, "task_code": task_code, "store_id": 1, "enabled": True}

        with (
            patch.object(TaskExecutor, "_load_task_config", return_value=loaded_config),
            patch.object(TaskExecutor, "_resolve_ingest_source", return_value=Path("/tmp/src")),
            patch.object(TaskExecutor, "_execute_ingest",
                         return_value={"status": "SUCCESS", "counts": {"fetched": 1}}),
        ):
            executor.run_single_task(task_code, "non-util-uuid", 1, "offline")

        # Regular tasks must open a cursor and create a run record.
        executor.cursor_mgr.get_or_create.assert_called_once()
        executor.run_tracker.create_run.assert_called_once()