# -*- coding: utf-8 -*-
"""Monorepo-level property tests — ETL DWS/Flow refactor.

Central home for cross-module property tests that verify the correctness
properties of the refactored code.
"""

import string
import sys
from pathlib import Path

import pytest
from hypothesis import assume, given, settings
from hypothesis import strategies as st

# Put the ETL feiqiu connector root on sys.path so that its top-level
# packages (``cli``, ``orchestration``) can be imported directly below.
_ETL_ROOT = (
    Path(__file__).resolve().parent.parent / "apps" / "etl" / "connectors" / "feiqiu"
)
if str(_ETL_ROOT) not in sys.path:
    sys.path.insert(0, str(_ETL_ROOT))

# These imports must come after the sys.path tweak above, hence E402.
# FlowRunner is used by the Property 6 tests further down in this module.
from cli.main import VALID_LAYERS, parse_layers  # noqa: E402
from orchestration.flow_runner import FlowRunner  # noqa: E402

# ---------------------------------------------------------------------------
# Property 5: --layers parsing correctness
# Feature: etl-dws-flow-refactor, Property 5: --layers parsing correctness
# Validates: Requirements 6.1, 6.2
# ---------------------------------------------------------------------------

# sorted() gives sampled_from a deterministic element order.
_ALL_LAYERS = sorted(VALID_LAYERS)

# Non-empty, duplicate-free subsets of the valid layers. max_size follows the
# number of valid layers so the full set is always reachable.
_layer_subsets = st.lists(
    st.sampled_from(_ALL_LAYERS),
    min_size=1,
    max_size=len(_ALL_LAYERS),
    unique=True,
)

# Letter-only tokens that are not a valid layer name in any letter case.
_invalid_layer = st.text(
    alphabet=string.ascii_letters,
    min_size=1,
    max_size=10,
).filter(lambda s: s.upper() not in VALID_LAYERS)


class TestParseLayersProperty:
    """Property 5: --layers parsing correctness

    # Feature: etl-dws-flow-refactor, Property 5: --layers parsing correctness
    """

    @given(subset=_layer_subsets)
    @settings(max_examples=100)
    def test_valid_subset_parsed_correctly(self, subset: list[str]):
        """Any comma-joined subset of valid layers parses back to exactly that subset (upper-case).

        **Validates: Requirements 6.1, 6.2**
        """
        raw = ",".join(subset)
        result = parse_layers(raw)
        assert set(result) == set(subset)
        assert all(layer == layer.upper() for layer in result)
        # Same length as the input rules out duplicated entries in the result.
        assert len(result) == len(subset)

    @given(
        valid=_layer_subsets,
        bad=_invalid_layer,
    )
    @settings(max_examples=100)
    def test_invalid_layer_raises(self, valid: list[str], bad: str):
        """Input containing an invalid layer name must raise ValueError.

        **Validates: Requirements 6.1, 6.2**
        """
        # ``bad`` is already guaranteed invalid by the _invalid_layer filter,
        # so no extra assume() is needed here.
        raw = ",".join(valid + [bad])
        with pytest.raises(ValueError, match="无效的层名"):
            parse_layers(raw)
# ---------------------------------------------------------------------------
# Property 6: config priority — configured values take precedence over the Registry
# Feature: etl-dws-flow-refactor, Property 6: config priority
# Validates: Requirements 7.2
# ---------------------------------------------------------------------------

# Dotted config key under which each layer's task list is placed in these tests.
_LAYER_CONFIG_KEY = {
    "ODS": "run.ods_tasks",
    "DWD": "run.dwd_tasks",
    "DWS": "run.dws_tasks",
    "INDEX": "run.index_tasks",
}


class _DictConfig:
    """Stand-in for ``AppConfig.get()`` backed by a nested dict, with dotted-path lookup."""

    def __init__(self, data: dict):
        self._data = data

    def get(self, key: str, default=None):
        """Resolve a dotted *key* such as ``"run.ods_tasks"`` in the nested dict.

        Returns *default* as soon as a path segment is missing or the current
        node is not a dict.
        """
        node = self._data
        for k in key.split("."):
            if isinstance(node, dict) and k in node:
                node = node[k]
            else:
                return default
        return node


def _nested_config(dotted_key: str, value) -> dict:
    """Build the nested dict in which ``_DictConfig.get(dotted_key)`` yields *value*.

    ``"run.ods_tasks"`` -> ``{"run": {"ods_tasks": value}}``. Works for any
    number of path segments, matching the lookup in ``_DictConfig.get``.
    """
    node = value
    for part in reversed(dotted_key.split(".")):
        node = {part: node}
    return node


def _make_runner(config_data: dict, registry_by_layer: dict) -> FlowRunner:
    """Build a minimal FlowRunner carrying only what ``_resolve_tasks`` needs.

    Args:
        config_data: nested dict served through ``_DictConfig``.
        registry_by_layer: upper-case layer name -> task codes returned by the
            mocked ``task_registry.get_tasks_by_layer``.
    """
    import logging
    from unittest.mock import MagicMock

    registry = MagicMock()
    # Lookups are upper-cased; unknown layers yield an empty list.
    registry.get_tasks_by_layer = MagicMock(
        side_effect=lambda layer: registry_by_layer.get(layer.upper(), [])
    )

    # object.__new__ skips FlowRunner.__init__, so the attributes are wired up
    # by hand instead of constructing a real config/registry.
    runner = object.__new__(FlowRunner)
    runner.config = _DictConfig(config_data)
    runner.task_registry = registry
    runner.logger = logging.getLogger("test_prop6")
    return runner


# Task codes: an upper-case letter followed by 2-20 upper-case letters,
# digits or underscores.
_task_code_p6 = st.from_regex(r"[A-Z][A-Z0-9_]{2,20}", fullmatch=True)
_task_list_p6 = st.lists(_task_code_p6, min_size=1, max_size=8, unique=True)
# Derived from _LAYER_CONFIG_KEY so the two layer lists cannot drift apart.
_layer_p6 = st.sampled_from(list(_LAYER_CONFIG_KEY))


class TestConfigPriorityProperty:
    """Property 6: config priority — configured values take precedence over the Registry

    # Feature: etl-dws-flow-refactor, Property 6: config priority
    """

    @given(layer=_layer_p6, config_tasks=_task_list_p6, registry_tasks=_task_list_p6)
    @settings(max_examples=100)
    def test_config_overrides_registry(
        self, layer: str, config_tasks: list[str], registry_tasks: list[str]
    ):
        """Any layer + non-empty configured task list -> the configured value is returned, Registry ignored.

        **Validates: Requirements 7.2**
        """
        # Skip examples where both lists hold the same tasks; they would not
        # clearly show which source won.
        assume(set(config_tasks) != set(registry_tasks))
        config_data = _nested_config(_LAYER_CONFIG_KEY[layer], config_tasks)
        runner = _make_runner(config_data, {layer: registry_tasks})

        result = runner._resolve_tasks([layer])

        assert result == config_tasks

    @given(layer=_layer_p6, registry_tasks=_task_list_p6)
    @settings(max_examples=100)
    def test_empty_config_falls_back_to_registry(
        self, layer: str, registry_tasks: list[str]
    ):
        """With nothing configured for the layer, the Registry result is used.

        **Validates: Requirements 7.2**
        """
        runner = _make_runner({}, {layer: registry_tasks})

        result = runner._resolve_tasks([layer])

        assert result == registry_tasks


# ---------------------------------------------------------------------------
# Helpers for Properties 7 & 8: lightweight TaskMeta stand-in + minimal Registry
# ---------------------------------------------------------------------------

import re  # noqa: E402
from dataclasses import dataclass, field as dc_field  # noqa: E402

from orchestration.topological_sort import topological_sort  # noqa: E402


@dataclass
class _FakeMeta:
    # Task codes this task depends on.
    depends_on: list[str] = dc_field(default_factory=list)


class _FakeRegistry:
    """Minimal Registry stand-in that only provides ``get_metadata()``."""

    def __init__(self, deps: dict[str, list[str]]):
        self._deps = deps

    def get_metadata(self, code: str):
        """Return a ``_FakeMeta`` for *code*; unknown codes have no dependencies."""
        return _FakeMeta(depends_on=list(self._deps.get(code, [])))


def _mentions_task(message: str, code: str) -> bool:
    """Return True if *code* appears in *message* as a complete task code.

    A plain substring test would let ``TASK_1`` match inside ``TASK_10``; the
    trailing-digit look-ahead rules that out. Only a trailing digit is
    excluded (not a full word boundary) so a code directly adjacent to
    non-ASCII text in the message still matches.
    """
    return re.search(re.escape(code) + r"(?![0-9])", message) is not None


# ---------------------------------------------------------------------------
# Hypothesis strategies: DAG generation & cyclic-graph generation
# ---------------------------------------------------------------------------


@st.composite
def dag_strategy(draw):
    """Generate a random DAG as ``(task_codes, deps_dict)``.

    Acyclicity is guaranteed by only letting a task depend on tasks with a
    smaller index.
    """
    n = draw(st.integers(min_value=1, max_value=10))
    codes = [f"TASK_{i}" for i in range(n)]
    # The first task has nothing earlier to depend on.
    deps: dict[str, list[str]] = {codes[0]: []}
    for i, code in enumerate(codes[1:], start=1):
        deps[code] = draw(
            st.lists(
                st.sampled_from(codes[:i]),
                max_size=min(3, i),
                unique=True,
            )
        )
    return codes, deps


@st.composite
def cyclic_graph_strategy(draw):
    """Generate a directed graph that always contains at least one cycle.

    Returns ``(task_codes, deps_dict, cycle_tasks)`` where ``cycle_tasks`` are
    the 2-4 distinct tasks forming the ring; all other tasks have no
    dependencies.
    """
    n = draw(st.integers(min_value=2, max_value=8))
    codes = [f"TASK_{i}" for i in range(n)]
    deps: dict[str, list[str]] = {code: [] for code in codes}

    # Pick 2-4 distinct tasks to form the cycle.
    cycle_size = draw(st.integers(min_value=2, max_value=min(4, n)))
    cycle_tasks = draw(
        st.lists(
            st.sampled_from(codes),
            min_size=cycle_size,
            max_size=cycle_size,
            unique=True,
        )
    )
    # Each task depends on the next one and the last wraps around to the
    # first. cycle_tasks is unique, so every entry touched here is still
    # empty and receives exactly one edge.
    for task, next_task in zip(cycle_tasks, cycle_tasks[1:] + cycle_tasks[:1]):
        deps[task].append(next_task)
    return codes, deps, cycle_tasks


# ---------------------------------------------------------------------------
# Property 7: topological sort correctness
# Feature: etl-dws-flow-refactor, Property 7: topological sort correctness
# Validates: Requirements 8.3
# ---------------------------------------------------------------------------


class TestTopologicalSortProperty:
    """Property 7: topological sort correctness

    # Feature: etl-dws-flow-refactor, Property 7: topological sort correctness
    """

    @given(data=dag_strategy())
    @settings(max_examples=100)
    def test_dependencies_precede_dependents(self, data):
        """For any DAG, every task's dependencies appear before it in the sorted result.

        **Validates: Requirements 8.3**
        """
        codes, deps = data
        registry = _FakeRegistry(deps)

        result = topological_sort(codes, registry)

        # The result holds exactly the input tasks (nothing lost, no duplicates).
        assert set(result) == set(codes)
        assert len(result) == len(codes)

        # Every in-list dependency of each task is placed before it.
        pos = {code: idx for idx, code in enumerate(result)}
        for code in codes:
            for dep in deps.get(code, []):
                if dep in pos:
                    assert pos[dep] < pos[code], (
                        f"依赖 {dep} 应排在 {code} 之前,"
                        f"但位置为 {pos[dep]} vs {pos[code]}"
                    )


# ---------------------------------------------------------------------------
# Property 8: cyclic dependency detection
# Feature: etl-dws-flow-refactor, Property 8: cyclic dependency detection
# Validates: Requirements 8.4
# ---------------------------------------------------------------------------


class TestCyclicDependencyProperty:
    """Property 8: cyclic dependency detection

    # Feature: etl-dws-flow-refactor, Property 8: cyclic dependency detection
    """

    @given(data=cyclic_graph_strategy())
    @settings(max_examples=100)
    def test_cycle_raises_value_error(self, data):
        """For any directed graph containing a cycle, topological_sort raises ValueError.

        **Validates: Requirements 8.4**
        """
        codes, deps, _cycle_tasks = data
        registry = _FakeRegistry(deps)
        with pytest.raises(ValueError, match="循环依赖"):
            topological_sort(codes, registry)

    @given(data=cyclic_graph_strategy())
    @settings(max_examples=100)
    def test_cycle_error_mentions_involved_tasks(self, data):
        """The error message names at least one task code that is part of the cycle.

        **Validates: Requirements 8.4**
        """
        codes, deps, cycle_tasks = data
        registry = _FakeRegistry(deps)
        with pytest.raises(ValueError) as exc_info:
            topological_sort(codes, registry)

        msg = str(exc_info.value)
        # At least one task of the cycle must appear in the message.
        assert any(_mentions_task(msg, t) for t in cycle_tasks), (
            f"错误信息 '{msg}' 中未包含环任务 {cycle_tasks} 中的任何一个"
        )