Files
Neo-ZQYY/tests/test_property_6_consistency_check.py

329 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Feature: dataflow-field-completion, Property 6: 数据一致性检查正确性
**Validates: Requirements 16.2, 16.3**
对于任意 ODS 行和对应的 DWD 行,黑盒测试检查器应能正确识别:
(a) ODS 中存在但 DWD 中缺失的字段
(b) ODS 与 DWD 之间值不一致的字段
测试策略:
- 使用 hypothesis 生成随机 API 字段集合和 ODS 列集合
- 使用 hypothesis 生成随机 DWD 列集合、ODS 列集合和 FACT_MAPPINGS 列表
- 验证属性:
1. check_api_vs_ods_fields当 API 字段是 ODS 列的子集时,结果应为 passed
2. check_api_vs_ods_fields当 API 字段不在 ODS 列中时missing_fields > 0
3. check_ods_vs_dwd_mappings当所有 DWD 列都有映射时,结果应为 passed
4. check_ods_vs_dwd_mappings当 DWD 列无映射源时missing_fields > 0
5. total_fields = passed_fields + missing_fields + mismatch_fields
6. field_results 列表长度 = total_fields
"""
from __future__ import annotations
import sys
from pathlib import Path
from hypothesis import given, settings, HealthCheck, assume
import hypothesis.strategies as st
# ── Put the ETL connector directory on sys.path so its modules can be imported ──
# The directory is resolved relative to this file: two levels up, then
# apps/etl/connectors/feiqiu.
_ETL_ROOT = Path(__file__).resolve().parents[1].joinpath(
    "apps", "etl", "connectors", "feiqiu"
)
# Insert at the front only when it is not already listed, so repeated imports
# of this module do not pile up duplicate entries.
if sys.path.count(str(_ETL_ROOT)) == 0:
    sys.path.insert(0, str(_ETL_ROOT))
from quality.consistency_checker import (
check_api_vs_ods_fields,
check_ods_vs_dwd_mappings,
ODS_META_COLUMNS,
)
from tasks.dwd.dwd_load_task import DwdLoadTask
# Lower-cased SCD2 column names. Per the original note, check_ods_vs_dwd_mappings
# excludes these columns internally, so generated names must avoid them.
_SCD_COLS_LOWER = {scd_col.lower() for scd_col in DwdLoadTask.SCD_COLS}

# ══════════════════════════════════════════════════════════════════
# Strategies: generate valid column-name sets
# ══════════════════════════════════════════════════════════════════

# Column-name strategy: a lowercase letter followed by 1-19 lowercase letters,
# digits or underscores (total length 2-20). Names that collide with SCD2
# columns or ODS metadata columns are filtered out.
_col_name = st.from_regex(r"[a-z][a-z0-9_]{1,19}", fullmatch=True).filter(
    lambda c: not (c in _SCD_COLS_LOWER or c in ODS_META_COLUMNS)
)

# Non-empty set of column names (1-15 entries).
_col_set = st.frozensets(_col_name, min_size=1, max_size=15).map(set)

# Possibly empty set of column names (0-15 entries); min_size defaults to 0.
_col_set_maybe_empty = st.frozensets(_col_name, max_size=15).map(set)
# ══════════════════════════════════════════════════════════════════
# Property 6a: when the API fields are a subset of the ODS columns, the result is passed
# ══════════════════════════════════════════════════════════════════
@given(common=_col_set, extra_ods=_col_set_maybe_empty)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_api_subset_of_ods_passes(common: set[str], extra_ods: set[str]):
    """
    **Validates: Requirements 16.2, 16.3**

    When the API field set is a subset of the ODS column set,
    check_api_vs_ods_fields must report passed=True with no missing fields.
    """
    # ODS columns = the shared columns plus optional ODS-only extras, which
    # guarantees that every API field is covered by an ODS column.
    ods_columns = common.union(extra_ods)
    api_fields = common

    result = check_api_vs_ods_fields(api_fields, ods_columns)

    assert result.passed is True, (
        f"API 字段 {api_fields} 是 ODS 列 {ods_columns} 的子集,但 passed={result.passed}"
    )
    assert result.missing_fields == 0
# ══════════════════════════════════════════════════════════════════
# Property 6b: when API fields are absent from the ODS columns, missing_fields > 0
# ══════════════════════════════════════════════════════════════════
@given(common=_col_set_maybe_empty, api_only=_col_set, ods_only=_col_set_maybe_empty)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_api_fields_not_in_ods_detected_as_missing(
    common: set[str], api_only: set[str], ods_only: set[str]
):
    """
    **Validates: Requirements 16.2, 16.3**

    When at least one API field does not appear among the ODS columns, the
    check must report passed=False and count those fields as missing.
    """
    # Strip anything that also occurs on the ODS side so the remaining names
    # are truly API-only; discard the example if nothing is left.
    api_only_clean = api_only.difference(common, ods_only)
    assume(bool(api_only_clean))

    ods_columns = common.union(ods_only)
    api_fields = common.union(api_only_clean)

    result = check_api_vs_ods_fields(api_fields, ods_columns)

    assert result.passed is False, (
        f"API 有 {len(api_only_clean)} 个字段不在 ODS 中,但 passed=True"
    )
    assert result.missing_fields >= len(api_only_clean), (
        f"期望 missing_fields >= {len(api_only_clean)},实际 {result.missing_fields}"
    )
# ══════════════════════════════════════════════════════════════════
# Property 6c: when every DWD column has a mapping, the result is passed
# ══════════════════════════════════════════════════════════════════
@st.composite
def _fully_mapped_scenario(draw):
    """Generate a scenario in which every DWD column has a mapping source.

    The scenario mixes two kinds of columns:
    - auto-mapped columns, which carry the same name in ODS and DWD;
    - explicitly mapped columns, whose DWD name differs from the ODS source
      name and which are paired through a FACT_MAPPINGS-style entry.

    Returns:
        A ``(dwd_columns, ods_columns, fact_mappings)`` tuple where
        ``dwd_columns`` and ``ods_columns`` are sets of names and
        ``fact_mappings`` is a list of ``(dwd_col, ods_col, None)`` tuples
        (the third slot is always ``None`` here; its meaning is defined by the
        checker — presumably an optional cast, TODO confirm).
    """
    small_name_sets = st.frozensets(_col_name, max_size=8).map(set)

    # Auto-mapped columns: identical name on both sides.
    auto_cols = draw(small_name_sets)
    # Explicitly mapped DWD columns, kept disjoint from the auto-mapped ones.
    explicit_dwd = draw(small_name_sets) - auto_cols

    # ODS source names must not coincide with any DWD column name, otherwise
    # they would look like auto-mappings. Filtering at the element level (and
    # drawing exactly as many names as needed) avoids rejecting whole examples
    # and avoids drawing surplus names that are thrown away afterwards.
    taken = auto_cols | explicit_dwd
    needed = len(explicit_dwd)
    source_names = draw(
        st.frozensets(
            _col_name.filter(lambda c: c not in taken),
            min_size=needed,
            max_size=needed,
        )
    )

    # Pair names in sorted order so the mapping is deterministic per example.
    ods_list = sorted(source_names)
    dwd_list = sorted(explicit_dwd)
    fact_mappings = [(d, o, None) for d, o in zip(dwd_list, ods_list)]

    dwd_columns = auto_cols | explicit_dwd
    ods_columns = auto_cols | set(ods_list)
    # An empty DWD table has nothing to check; skip such examples.
    assume(len(dwd_columns) > 0)
    return dwd_columns, ods_columns, fact_mappings
@given(scenario=_fully_mapped_scenario())
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_all_dwd_cols_mapped_passes(scenario):
    """
    **Validates: Requirements 16.2, 16.3**

    When every non-SCD2 DWD column has a mapping source (explicit or
    automatic), the check must report passed=True with zero missing and zero
    mismatched fields.
    """
    dwd_columns, ods_columns, fact_mappings = scenario
    # An empty mapping list is passed to the checker as None.
    mappings_arg = fact_mappings or None

    result = check_ods_vs_dwd_mappings(
        "dwd.test_table",
        "ods.test_table",
        dwd_columns,
        ods_columns,
        mappings_arg,
    )

    assert result.passed is True, (
        f"所有 DWD 列都有映射但 passed=False。"
        f" missing={result.missing_fields}, mismatch={result.mismatch_fields}"
    )
    assert result.missing_fields == 0
    assert result.mismatch_fields == 0
# ══════════════════════════════════════════════════════════════════
# Property 6d: when a DWD column has no mapping source, missing_fields > 0
# ══════════════════════════════════════════════════════════════════
@given(
    mapped_cols=_col_set_maybe_empty,
    orphan_cols=_col_set,
    ods_cols=_col_set_maybe_empty,
)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_unmapped_dwd_cols_detected_as_missing(
    mapped_cols: set[str], orphan_cols: set[str], ods_cols: set[str]
):
    """
    **Validates: Requirements 16.2, 16.3**

    When the DWD columns include at least one column without any mapping
    source, the check must report passed=False and count it as missing.
    """
    # Orphans must not share a name with mapped_cols or ods_cols; a same-named
    # ODS column is expected to serve as an automatic mapping source.
    orphan_clean = orphan_cols.difference(mapped_cols, ods_cols)
    assume(bool(orphan_clean))

    # mapped_cols appear on both sides, so they are covered by same-name mapping.
    ods_columns = mapped_cols.union(ods_cols)
    dwd_columns = mapped_cols.union(orphan_clean)

    # No explicit mappings are supplied (None).
    result = check_ods_vs_dwd_mappings(
        "dwd.test_table",
        "ods.test_table",
        dwd_columns,
        ods_columns,
        None,
    )

    assert result.passed is False, (
        f"DWD 有 {len(orphan_clean)} 个无映射列,但 passed=True"
    )
    assert result.missing_fields >= len(orphan_clean), (
        f"期望 missing_fields >= {len(orphan_clean)},实际 {result.missing_fields}"
    )
# ══════════════════════════════════════════════════════════════════
# Property 6e: total_fields = passed_fields + missing_fields + mismatch_fields
# ══════════════════════════════════════════════════════════════════
@given(api_fields=_col_set, ods_columns=_col_set)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_api_vs_ods_field_counts_consistent(api_fields: set[str], ods_columns: set[str]):
    """
    **Validates: Requirements 16.2, 16.3**

    For check_api_vs_ods_fields the counters must add up:
    total_fields = passed_fields + missing_fields + mismatch_fields
    """
    result = check_api_vs_ods_fields(api_fields, ods_columns)

    # Sum of the three per-outcome counters reported by the checker.
    actual_sum = sum(
        (result.passed_fields, result.missing_fields, result.mismatch_fields)
    )

    assert result.total_fields == actual_sum, (
        f"total_fields={result.total_fields} != "
        f"passed({result.passed_fields}) + missing({result.missing_fields}) + "
        f"mismatch({result.mismatch_fields}) = {actual_sum}"
    )
@given(dwd_columns=_col_set, ods_columns=_col_set)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_ods_vs_dwd_field_counts_consistent(dwd_columns: set[str], ods_columns: set[str]):
    """
    **Validates: Requirements 16.2, 16.3**

    For check_ods_vs_dwd_mappings the counters must add up:
    total_fields = passed_fields + missing_fields + mismatch_fields
    """
    dwd_table, ods_table = "dwd.test_table", "ods.test_table"
    # No explicit mappings are supplied (None).
    result = check_ods_vs_dwd_mappings(
        dwd_table, ods_table, dwd_columns, ods_columns, None
    )

    # Sum of the three per-outcome counters reported by the checker.
    actual_sum = sum(
        (result.passed_fields, result.missing_fields, result.mismatch_fields)
    )

    assert result.total_fields == actual_sum, (
        f"total_fields={result.total_fields} != "
        f"passed({result.passed_fields}) + missing({result.missing_fields}) + "
        f"mismatch({result.mismatch_fields}) = {actual_sum}"
    )
# ══════════════════════════════════════════════════════════════════
# Property 6f: len(field_results) = total_fields
# ══════════════════════════════════════════════════════════════════
@given(api_fields=_col_set, ods_columns=_col_set)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_api_vs_ods_field_results_length(api_fields: set[str], ods_columns: set[str]):
    """
    **Validates: Requirements 16.2, 16.3**

    The field_results list returned by check_api_vs_ods_fields must contain
    exactly total_fields entries.
    """
    result = check_api_vs_ods_fields(api_fields, ods_columns)

    assert result.total_fields == len(result.field_results), (
        f"field_results 长度 {len(result.field_results)} != total_fields {result.total_fields}"
    )
@given(dwd_columns=_col_set, ods_columns=_col_set)
@settings(
    max_examples=100,
    deadline=None,
    suppress_health_check=[HealthCheck.function_scoped_fixture, HealthCheck.too_slow],
)
def test_ods_vs_dwd_field_results_length(dwd_columns: set[str], ods_columns: set[str]):
    """
    **Validates: Requirements 16.2, 16.3**

    The field_results list returned by check_ods_vs_dwd_mappings must contain
    exactly total_fields entries.
    """
    dwd_table, ods_table = "dwd.test_table", "ods.test_table"
    # No explicit mappings are supplied (None).
    result = check_ods_vs_dwd_mappings(
        dwd_table, ods_table, dwd_columns, ods_columns, None
    )

    assert result.total_fields == len(result.field_results), (
        f"field_results 长度 {len(result.field_results)} != total_fields {result.total_fields}"
    )