Files
Neo-ZQYY/tests/test_etl_refactor_properties.py

326 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""Monorepo 级属性测试 — ETL DWS/Flow 重构
集中存放跨模块的属性测试,验证重构后的正确性属性。
"""
import sys
import string
from pathlib import Path
import pytest
from hypothesis import given, settings, assume
from hypothesis import strategies as st
# Put the ETL "feiqiu" connector directory at the front of sys.path so that
# its modules can be imported directly by the import statements that follow.
_ETL_ROOT = Path(__file__).resolve().parent.parent.joinpath(
    "apps", "etl", "connectors", "feiqiu"
)
if str(_ETL_ROOT) not in sys.path:
    # Index 0 gives the connector directory priority over other entries.
    sys.path.insert(0, str(_ETL_ROOT))
from cli.main import parse_layers, VALID_LAYERS # noqa: E402
# ---------------------------------------------------------------------------
# Property 5: --layers 解析正确性
# Feature: etl-dws-flow-refactor, Property 5: --layers 解析正确性
# Validates: Requirements 6.1, 6.2
# ---------------------------------------------------------------------------
# Sorted so that sampling sees the valid layer names in a stable order.
_ALL_LAYERS = sorted(VALID_LAYERS)

# Non-empty, duplicate-free lists of at most four valid layer names.
_layer_subsets = st.lists(
    st.sampled_from(_ALL_LAYERS), min_size=1, max_size=4, unique=True
)

# Short ASCII-letter strings whose upper-cased form is not a valid layer name.
_invalid_layer = st.text(
    alphabet=string.ascii_letters, min_size=1, max_size=10
).filter(lambda candidate: candidate.upper() not in VALID_LAYERS)
class TestParseLayersProperty:
    """Property 5: correctness of ``--layers`` parsing.

    Feature: etl-dws-flow-refactor, Property 5.
    """

    @given(subset=_layer_subsets)
    @settings(max_examples=100)
    def test_valid_subset_parsed_correctly(self, subset: list[str]):
        """A comma-joined subset of valid layers parses back to that subset.

        The parsed result must contain exactly the given layers, each in
        upper case, with no extra or missing entries.

        **Validates: Requirements 6.1, 6.2**
        """
        parsed = parse_layers(",".join(subset))

        assert set(parsed) == set(subset)
        assert all(name == name.upper() for name in parsed)
        assert len(parsed) == len(subset)

    @given(
        valid=_layer_subsets,
        bad=_invalid_layer,
    )
    @settings(max_examples=100)
    def test_invalid_layer_raises(self, valid: list[str], bad: str):
        """An input containing an invalid layer name must raise ValueError.

        **Validates: Requirements 6.1, 6.2**
        """
        # Discard any generated name that happens to be a valid layer.
        assume(bad.upper() not in VALID_LAYERS)

        # The invalid name is appended after the valid ones.
        raw = ",".join([*valid, bad])
        with pytest.raises(ValueError, match="无效的层名"):
            parse_layers(raw)
from orchestration.flow_runner import FlowRunner # noqa: E402
# ---------------------------------------------------------------------------
# Property 6: 配置优先级——配置值优先于 Registry
# Feature: etl-dws-flow-refactor, Property 6: 配置优先级
# Validates: Requirements 7.2
# ---------------------------------------------------------------------------
# Maps each layer name to the dotted config key used by these tests to supply
# that layer's task list, e.g. "ODS" -> "run.ods_tasks".
_LAYER_CONFIG_KEY = {
    layer: f"run.{layer.lower()}_tasks"
    for layer in ("ODS", "DWD", "DWS", "INDEX")
}
class _DictConfig:
    """Stand-in for ``AppConfig.get()`` backed by a nested dict.

    Keys are dotted paths such as ``"run.ods_tasks"``; each segment selects
    one level of nesting.
    """

    def __init__(self, data: dict):
        self._data = data

    def get(self, key: str, default=None):
        """Return the value stored at dotted path *key*.

        *default* is returned as soon as a segment is missing or the value
        reached so far is not a dict.
        """
        current = self._data
        for segment in key.split("."):
            if not isinstance(current, dict) or segment not in current:
                return default
            current = current[segment]
        return current
def _make_runner(config_data: dict, registry_by_layer: dict) -> FlowRunner:
    """Build a bare ``FlowRunner`` intended only for calling ``_resolve_tasks``.

    Args:
        config_data: Nested dict exposed through a ``_DictConfig``.
        registry_by_layer: Maps upper-case layer names to the task list the
            mocked registry returns for that layer; unknown layers yield
            an empty list.

    Returns:
        A ``FlowRunner`` created without running its ``__init__``, with only
        ``config``, ``task_registry`` and ``logger`` assigned.
    """
    import logging
    from unittest.mock import MagicMock

    def _tasks_for(layer):
        # Lookup is by the upper-cased layer name.
        return registry_by_layer.get(layer.upper(), [])

    fake_registry = MagicMock()
    fake_registry.get_tasks_by_layer = MagicMock(side_effect=_tasks_for)

    # object.__new__ allocates the instance while skipping FlowRunner.__init__.
    runner = object.__new__(FlowRunner)
    runner.config = _DictConfig(config_data)
    runner.task_registry = fake_registry
    runner.logger = logging.getLogger("test_prop6")
    return runner
# Task codes: an upper-case letter followed by 2-20 of [A-Z0-9_].
_task_code_p6 = st.from_regex(r"[A-Z][A-Z0-9_]{2,20}", fullmatch=True)

# Non-empty lists of up to eight distinct task codes.
_task_list_p6 = st.lists(
    elements=_task_code_p6,
    min_size=1,
    max_size=8,
    unique=True,
)

# Layer names, taken from the config-key table so the two cannot drift apart
# (yields "ODS", "DWD", "DWS", "INDEX" in that order).
_layer_p6 = st.sampled_from(list(_LAYER_CONFIG_KEY))
class TestConfigPriorityProperty:
    """Property 6: configuration priority - config values win over the Registry.

    Feature: etl-dws-flow-refactor, Property 6.
    """

    @given(layer=_layer_p6, config_tasks=_task_list_p6, registry_tasks=_task_list_p6)
    @settings(max_examples=100)
    def test_config_overrides_registry(
        self, layer: str, config_tasks: list[str], registry_tasks: list[str]
    ):
        """A non-empty configured task list is returned; the Registry is ignored.

        **Validates: Requirements 7.2**
        """
        # Only keep examples where the two sources contain different tasks,
        # so the result reveals which source was used.
        assume(set(config_tasks) != set(registry_tasks))

        # Every key in the table has the form "<section>.<option>".
        section, option = _LAYER_CONFIG_KEY[layer].split(".")
        runner = _make_runner(
            {section: {option: config_tasks}},
            {layer: registry_tasks},
        )

        assert runner._resolve_tasks([layer]) == config_tasks

    @given(layer=_layer_p6, registry_tasks=_task_list_p6)
    @settings(max_examples=100)
    def test_empty_config_falls_back_to_registry(
        self, layer: str, registry_tasks: list[str]
    ):
        """With an empty configuration the Registry result is returned.

        **Validates: Requirements 7.2**
        """
        runner = _make_runner({}, {layer: registry_tasks})

        assert runner._resolve_tasks([layer]) == registry_tasks
from orchestration.topological_sort import topological_sort # noqa: E402
# ---------------------------------------------------------------------------
# Property 7 & 8 辅助:轻量 TaskMeta 替身 + 简易 Registry
# ---------------------------------------------------------------------------
from dataclasses import dataclass, field as dc_field
@dataclass
class _FakeMeta:
    """Lightweight TaskMeta substitute that carries only ``depends_on``."""

    # Codes of the tasks this task depends on; a fresh empty list per instance.
    depends_on: list[str] = dc_field(default_factory=list)
class _FakeRegistry:
    """Minimal Registry substitute that offers only ``get_metadata()``."""

    def __init__(self, deps: dict[str, list[str]]):
        # Maps a task code to the list of task codes it depends on.
        self._deps = deps

    def get_metadata(self, code: str):
        """Return a ``_FakeMeta`` for *code*; unknown codes get no dependencies."""
        if code not in self._deps:
            return _FakeMeta()
        return _FakeMeta(depends_on=self._deps[code])
# ---------------------------------------------------------------------------
# Hypothesis 策略: DAG 生成 & 含环图生成
# ---------------------------------------------------------------------------
@st.composite
def dag_strategy(draw):
    """Generate a random DAG as ``(task_codes, deps_dict)``.

    Acyclicity is guaranteed because a task may only depend on tasks with a
    smaller index. Each task gets at most three distinct dependencies.
    """
    size = draw(st.integers(min_value=1, max_value=10))
    codes = [f"TASK_{i}" for i in range(size)]

    deps: dict[str, list[str]] = {}
    for index, code in enumerate(codes):
        if index == 0:
            # The first task has nothing earlier to depend on.
            deps[code] = []
            continue
        earlier = codes[:index]
        deps[code] = draw(
            st.lists(
                st.sampled_from(earlier),
                max_size=min(3, index),
                unique=True,
            )
        )
    return codes, deps
@st.composite
def cyclic_graph_strategy(draw):
    """Generate a directed graph that is guaranteed to contain a cycle.

    Returns ``(task_codes, deps_dict, cycle_tasks)``. Between two and four
    distinct tasks are chosen and linked into a ring: each depends on the
    next one in ``cycle_tasks`` and the last depends on the first. All other
    tasks have no dependencies.
    """
    size = draw(st.integers(min_value=2, max_value=8))
    codes = [f"TASK_{i}" for i in range(size)]
    deps: dict[str, list[str]] = {code: [] for code in codes}

    # Pick the 2-4 tasks that will form the cycle.
    ring_len = draw(st.integers(min_value=2, max_value=min(4, size)))
    cycle_tasks = draw(
        st.lists(
            st.sampled_from(codes),
            min_size=ring_len,
            max_size=ring_len,
            unique=True,
        )
    )

    # Rotating the list by one pairs every task with its successor in the ring.
    successors = cycle_tasks[1:] + cycle_tasks[:1]
    for task, successor in zip(cycle_tasks, successors):
        if successor not in deps[task]:
            deps[task].append(successor)
    return codes, deps, cycle_tasks
# ---------------------------------------------------------------------------
# Property 7: 拓扑排序正确性
# Feature: etl-dws-flow-refactor, Property 7: 拓扑排序正确性
# Validates: Requirements 8.3
# ---------------------------------------------------------------------------
class TestTopologicalSortProperty:
    """Property 7: correctness of topological sorting.

    Feature: etl-dws-flow-refactor, Property 7.
    """

    @given(data=dag_strategy())
    @settings(max_examples=100)
    def test_dependencies_precede_dependents(self, data):
        """For any DAG, every task's dependencies are ordered before the task.

        **Validates: Requirements 8.3**
        """
        codes, deps = data
        result = topological_sort(codes, _FakeRegistry(deps))

        # The output holds exactly the input tasks: none lost, none repeated.
        assert set(result) == set(codes)
        assert len(result) == len(codes)

        # Each dependency that appears in the output must come before the
        # task that depends on it.
        pos = {task: index for index, task in enumerate(result)}
        for code in codes:
            for dep in deps.get(code, []):
                if dep not in pos:
                    continue
                assert pos[dep] < pos[code], (
                    f"依赖 {dep} 应排在 {code} 之前,"
                    f"但位置为 {pos[dep]} vs {pos[code]}"
                )
# ---------------------------------------------------------------------------
# Property 8: 循环依赖检测
# Feature: etl-dws-flow-refactor, Property 8: 循环依赖检测
# Validates: Requirements 8.4
# ---------------------------------------------------------------------------
class TestCyclicDependencyProperty:
    """Property 8: detection of cyclic dependencies.

    Feature: etl-dws-flow-refactor, Property 8.
    """

    @given(data=cyclic_graph_strategy())
    @settings(max_examples=100)
    def test_cycle_raises_value_error(self, data):
        """For any directed graph with a cycle, topological_sort raises ValueError.

        **Validates: Requirements 8.4**
        """
        # The third element (the cycle members) is not needed here.
        codes, deps, _ = data

        with pytest.raises(ValueError, match="循环依赖"):
            topological_sort(codes, _FakeRegistry(deps))

    @given(data=cyclic_graph_strategy())
    @settings(max_examples=100)
    def test_cycle_error_mentions_involved_tasks(self, data):
        """The error message names at least one task that is part of the cycle.

        **Validates: Requirements 8.4**
        """
        codes, deps, cycle_tasks = data

        with pytest.raises(ValueError) as exc_info:
            topological_sort(codes, _FakeRegistry(deps))

        msg = str(exc_info.value)
        # At least one of the cycle's tasks has to show up in the message.
        mentioned = [task for task in cycle_tasks if task in msg]
        assert mentioned, (
            f"错误信息 '{msg}' 中未包含环任务 {cycle_tasks} 中的任何一个"
        )