Files
Neo-ZQYY/tests/test_etl_refactor_properties.py
Neo b25308c3f4 feat: P1-P3 全栈集成 — 数据库基础 + DWS 扩展 + 小程序鉴权 + 工程化体系
## P1 数据库基础
- zqyy_app: 创建 auth/biz schema、FDW 连接 etl_feiqiu
- etl_feiqiu: 创建 app schema RLS 视图、商品库存预警表
- 清理 assistant_abolish 残留数据

## P2 ETL/DWS 扩展
- 新增 DWS 助教订单贡献度表 (dws.assistant_order_contribution)
- 新增 assistant_order_contribution_task 任务及 RLS 视图
- member_consumption 增加充值字段、assistant_daily 增加处罚字段
- 更新 ODS/DWD/DWS 任务文档及业务规则文档
- 更新 consistency_checker、flow_runner、task_registry 等核心模块

## P3 小程序鉴权系统
- 新增 xcx_auth 路由/schema(微信登录 + JWT)
- 新增 wechat/role/matching/application 服务层
- zqyy_app 鉴权表迁移 + 角色权限种子数据
- auth/dependencies.py 支持小程序 JWT 鉴权

## 文档与审计
- 新增 DOCUMENTATION-MAP 文档导航
- 新增 7 份 BD_Manual 数据库变更文档
- 更新 DDL 基线快照(etl_feiqiu 6 schema + zqyy_app auth)
- 新增全栈集成审计记录、部署检查清单更新
- 新增 BACKLOG 路线图、FDW→Core 迁移计划

## Kiro 工程化
- 新增 5 个 Spec(P1/P2/P3/全栈集成/核心业务)
- 新增审计自动化脚本(agent_on_stop/build_audit_context/compliance_prescan)
- 新增 6 个 Hook(合规检查/会话日志/提交审计等)
- 新增 doc-map steering 文件

## 运维与测试
- 新增 ops 脚本:迁移验证/API 健康检查/ETL 监控/集成报告
- 新增属性测试:test_dws_contribution / test_auth_system
- 清理过期 export 报告文件
- 更新 .gitignore 排除规则
2026-02-26 08:03:53 +08:00

327 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""Monorepo 级属性测试 — ETL DWS/Flow 重构
集中存放跨模块的属性测试,验证重构后的正确性属性。
"""
import sys
import string
from pathlib import Path
import pytest
from hypothesis import given, settings, assume
from hypothesis import strategies as st
# 将 ETL feiqiu 模块加入 sys.path以便直接导入
_ETL_ROOT = Path(__file__).resolve().parent.parent / "apps" / "etl" / "connectors" / "feiqiu"
if str(_ETL_ROOT) not in sys.path:
sys.path.insert(0, str(_ETL_ROOT))
from cli.main import parse_layers, VALID_LAYERS # noqa: E402
# ---------------------------------------------------------------------------
# Property 5: --layers 解析正确性
# Feature: etl-dws-flow-refactor, Property 5: --layers 解析正确性
# Validates: Requirements 6.1, 6.2
# ---------------------------------------------------------------------------
_ALL_LAYERS = sorted(VALID_LAYERS)
_layer_subsets = st.lists(
st.sampled_from(_ALL_LAYERS),
min_size=1,
max_size=4,
unique=True,
)
_invalid_layer = st.text(
alphabet=string.ascii_letters,
min_size=1,
max_size=10,
).filter(lambda s: s.upper() not in VALID_LAYERS)
class TestParseLayersProperty:
"""Property 5: --layers 解析正确性
# Feature: etl-dws-flow-refactor, Property 5: --layers 解析正确性
"""
@given(subset=_layer_subsets)
@settings(max_examples=100)
def test_valid_subset_parsed_correctly(self, subset: list[str]):
"""任意合法层子集经逗号拼接后parse_layers 返回恰好该子集(大写)。
**Validates: Requirements 6.1, 6.2**
"""
raw = ",".join(subset)
result = parse_layers(raw)
assert set(result) == set(subset)
assert all(l == l.upper() for l in result)
assert len(result) == len(subset)
@given(
valid=_layer_subsets,
bad=_invalid_layer,
)
@settings(max_examples=100)
def test_invalid_layer_raises(self, valid: list[str], bad: str):
"""包含无效层名时必须抛出 ValueError。
**Validates: Requirements 6.1, 6.2**
"""
assume(bad.upper() not in VALID_LAYERS)
raw = ",".join(valid + [bad])
with pytest.raises(ValueError, match="无效的层名"):
parse_layers(raw)
from orchestration.flow_runner import FlowRunner # noqa: E402
# ---------------------------------------------------------------------------
# Property 6: 配置优先级——配置值优先于 Registry
# Feature: etl-dws-flow-refactor, Property 6: 配置优先级
# Validates: Requirements 7.2
# ---------------------------------------------------------------------------
_LAYER_CONFIG_KEY = {
"ODS": "run.ods_tasks",
"DWD": "run.dwd_tasks",
"DWS": "run.dws_tasks",
"INDEX": "run.index_tasks",
}
class _DictConfig:
"""模拟 AppConfig.get(),支持点号路径查找。"""
def __init__(self, data: dict):
self._data = data
def get(self, key: str, default=None):
keys = key.split(".")
node = self._data
for k in keys:
if isinstance(node, dict) and k in node:
node = node[k]
else:
return default
return node
def _make_runner(config_data: dict, registry_by_layer: dict) -> FlowRunner:
"""构造最小 FlowRunner 实例,仅满足 _resolve_tasks 所需。"""
import logging
from unittest.mock import MagicMock
config = _DictConfig(config_data)
registry = MagicMock()
registry.get_tasks_by_layer = MagicMock(
side_effect=lambda layer: registry_by_layer.get(layer.upper(), [])
)
runner = object.__new__(FlowRunner)
runner.config = config
runner.task_registry = registry
runner.logger = logging.getLogger("test_prop6")
return runner
_task_code_p6 = st.from_regex(r"[A-Z][A-Z0-9_]{2,20}", fullmatch=True)
_task_list_p6 = st.lists(_task_code_p6, min_size=1, max_size=8, unique=True)
_layer_p6 = st.sampled_from(["ODS", "DWD", "DWS", "INDEX"])
class TestConfigPriorityProperty:
"""Property 6: 配置优先级——配置值优先于 Registry
# Feature: etl-dws-flow-refactor, Property 6: 配置优先级
"""
@given(layer=_layer_p6, config_tasks=_task_list_p6, registry_tasks=_task_list_p6)
@settings(max_examples=100)
def test_config_overrides_registry(
self, layer: str, config_tasks: list[str], registry_tasks: list[str]
):
"""任意层名 + 非空配置任务列表 → 返回配置值,忽略 Registry。
**Validates: Requirements 7.2**
"""
assume(set(config_tasks) != set(registry_tasks))
key = _LAYER_CONFIG_KEY[layer]
parts = key.split(".")
config_data = {parts[0]: {parts[1]: config_tasks}}
runner = _make_runner(config_data, {layer: registry_tasks})
result = runner._resolve_tasks([layer])
assert result == config_tasks
@given(layer=_layer_p6, registry_tasks=_task_list_p6)
@settings(max_examples=100)
def test_empty_config_falls_back_to_registry(
self, layer: str, registry_tasks: list[str]
):
"""配置为空时回退到 Registry 结果。
**Validates: Requirements 7.2**
"""
runner = _make_runner({}, {layer: registry_tasks})
result = runner._resolve_tasks([layer])
assert result == registry_tasks
from orchestration.topological_sort import topological_sort # noqa: E402
# ---------------------------------------------------------------------------
# Property 7 & 8 辅助:轻量 TaskMeta 替身 + 简易 Registry
# ---------------------------------------------------------------------------
from dataclasses import dataclass, field as dc_field
@dataclass
class _FakeMeta:
depends_on: list[str] = dc_field(default_factory=list)
layer: str | None = None
class _FakeRegistry:
"""最小 Registry 替身,仅提供 get_metadata()。"""
def __init__(self, deps: dict[str, list[str]]):
self._deps = deps
def get_metadata(self, code: str):
if code in self._deps:
return _FakeMeta(depends_on=self._deps[code])
return _FakeMeta()
# ---------------------------------------------------------------------------
# Hypothesis 策略DAG 生成 & 含环图生成
# ---------------------------------------------------------------------------
@st.composite
def dag_strategy(draw):
"""生成随机 DAG(task_codes, deps_dict)。
通过仅允许依赖索引更小的任务来保证无环。
"""
n = draw(st.integers(min_value=1, max_value=10))
codes = [f"TASK_{i}" for i in range(n)]
deps: dict[str, list[str]] = {}
for i, code in enumerate(codes):
if i > 0:
possible = codes[:i]
chosen = draw(
st.lists(
st.sampled_from(possible),
max_size=min(3, i),
unique=True,
)
)
deps[code] = chosen
else:
deps[code] = []
return codes, deps
@st.composite
def cyclic_graph_strategy(draw):
"""生成一定包含至少一个环的有向图:(task_codes, deps_dict, cycle_tasks)。"""
n = draw(st.integers(min_value=2, max_value=8))
codes = [f"TASK_{i}" for i in range(n)]
deps: dict[str, list[str]] = {code: [] for code in codes}
# 选取 2-4 个任务构成环
cycle_size = draw(st.integers(min_value=2, max_value=min(4, n)))
cycle_tasks = draw(
st.lists(
st.sampled_from(codes),
min_size=cycle_size,
max_size=cycle_size,
unique=True,
)
)
for i in range(len(cycle_tasks)):
next_task = cycle_tasks[(i + 1) % len(cycle_tasks)]
if next_task not in deps[cycle_tasks[i]]:
deps[cycle_tasks[i]].append(next_task)
return codes, deps, cycle_tasks
# ---------------------------------------------------------------------------
# Property 7: 拓扑排序正确性
# Feature: etl-dws-flow-refactor, Property 7: 拓扑排序正确性
# Validates: Requirements 8.3
# ---------------------------------------------------------------------------
class TestTopologicalSortProperty:
"""Property 7: 拓扑排序正确性
# Feature: etl-dws-flow-refactor, Property 7: 拓扑排序正确性
"""
@given(data=dag_strategy())
@settings(max_examples=100)
def test_dependencies_precede_dependents(self, data):
"""对于任意 DAG排序结果中每个任务的依赖都排在它之前。
**Validates: Requirements 8.3**
"""
codes, deps = data
registry = _FakeRegistry(deps)
result = topological_sort(codes, registry)
# 结果包含且仅包含输入任务(无丢失、无重复)
assert set(result) == set(codes)
assert len(result) == len(codes)
# 每个任务的依赖(在列表内的)都排在它之前
pos = {code: idx for idx, code in enumerate(result)}
for code in codes:
for dep in deps.get(code, []):
if dep in pos:
assert pos[dep] < pos[code], (
f"依赖 {dep} 应排在 {code} 之前,"
f"但位置为 {pos[dep]} vs {pos[code]}"
)
# ---------------------------------------------------------------------------
# Property 8: 循环依赖检测
# Feature: etl-dws-flow-refactor, Property 8: 循环依赖检测
# Validates: Requirements 8.4
# ---------------------------------------------------------------------------
class TestCyclicDependencyProperty:
"""Property 8: 循环依赖检测
# Feature: etl-dws-flow-refactor, Property 8: 循环依赖检测
"""
@given(data=cyclic_graph_strategy())
@settings(max_examples=100)
def test_cycle_raises_value_error(self, data):
"""对于任意含环有向图topological_sort 应抛出 ValueError。
**Validates: Requirements 8.4**
"""
codes, deps, cycle_tasks = data
registry = _FakeRegistry(deps)
with pytest.raises(ValueError, match="循环依赖"):
topological_sort(codes, registry)
@given(data=cyclic_graph_strategy())
@settings(max_examples=100)
def test_cycle_error_mentions_involved_tasks(self, data):
"""错误信息中应包含环中涉及的至少一个任务代码。
**Validates: Requirements 8.4**
"""
codes, deps, cycle_tasks = data
registry = _FakeRegistry(deps)
with pytest.raises(ValueError) as exc_info:
topological_sort(codes, registry)
msg = str(exc_info.value)
# 至少一个环中的任务出现在错误信息中
assert any(t in msg for t in cycle_tasks), (
f"错误信息 '{msg}' 中未包含环任务 {cycle_tasks} 中的任何一个"
)