Files
Neo-ZQYY/tests/test_dataflow_analyzer.py

1898 lines
75 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
数据流结构分析 — 属性测试
Feature: dataflow-structure-audit
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
# scripts/ops 不是 Python 包,通过 sys.path 导入
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts" / "ops"))
from dataflow_analyzer import ( # noqa: E402
AnalyzerConfig,
ColumnInfo,
FieldInfo,
TableCollectionResult,
collect_all_tables,
dump_collection_results,
flatten_json_tree,
)
from hypothesis import given, settings, assume
import hypothesis.strategies as st
# ---------------------------------------------------------------------------
# Strategies: generate arbitrarily nested JSON (dict / list / scalar mixes)
# ---------------------------------------------------------------------------
# Scalar leaf values; NaN/Inf are excluded so equality comparisons stay stable.
_scalar = st.one_of(
    st.none(),
    st.booleans(),
    st.integers(min_value=-10_000, max_value=10_000),
    st.floats(allow_nan=False, allow_infinity=False),
    st.text(min_size=0, max_size=20),
)
# Keys restricted to non-empty ASCII identifiers (avoids '.' / '[]' and other
# characters that would interfere with path parsing).
_json_key = st.from_regex(r"[A-Za-z][A-Za-z0-9_]{0,9}", fullmatch=True)
def _json_value(max_depth: int = 3):
    """Recursive strategy for nested JSON values, depth-bounded to avoid blowup."""
    if max_depth <= 0:
        return _scalar
    # Defer the recursive call so the strategy tree is built lazily.
    child = st.deferred(lambda: _json_value(max_depth - 1))
    return st.one_of(
        _scalar,
        st.dictionaries(_json_key, child, min_size=0, max_size=4),
        st.lists(child, min_size=0, max_size=3),
    )
# Top-level values must be dicts (matches flatten_json_tree's input contract).
_json_record = st.dictionaries(_json_key, _json_value(3), min_size=0, max_size=5)
# ---------------------------------------------------------------------------
# 参考实现:收集所有叶子节点路径(用于对比验证)
# ---------------------------------------------------------------------------
def _collect_leaves(obj, prefix: str = "") -> set[str]:
"""
独立于被测代码的参考叶子收集器。
返回所有标量叶子节点的路径集合。
"""
if isinstance(obj, dict):
paths: set[str] = set()
for key, val in obj.items():
child = f"{prefix}.{key}" if prefix else key
paths |= _collect_leaves(val, child)
return paths
elif isinstance(obj, list):
arr = f"{prefix}[]" if prefix else "[]"
paths = set()
for item in obj:
paths |= _collect_leaves(item, arr)
return paths
else:
# 标量叶子 — 空前缀的裸标量不会被记录(与实现一致)
return {prefix} if prefix else set()
# ---------------------------------------------------------------------------
# Property 1: correctness of recursive JSON flattening paths
# Validates: Requirements 1.1, 1.3, 1.4
# ---------------------------------------------------------------------------
@given(record=_json_record)
@settings(max_examples=200)
def test_flatten_json_tree_path_correctness(record: dict):
    """
    Feature: dataflow-structure-audit, Property 1: JSON path correctness.

    For any nested JSON object, flatten_json_tree's output must satisfy:
      1. paths separate levels with '.'
      2. array levels are marked with '[]'
      3. depth == number of '.' in the path after removing '[]'
      4. no leaf node is missed
    **Validates: Requirements 1.1, 1.3, 1.4**
    """
    result = flatten_json_tree([record])
    # Reference leaf set, computed independently of the code under test.
    expected_leaves = _collect_leaves(record)
    # --- Property 4: no leaf node missed ---
    actual_paths = set(result.keys())
    assert actual_paths == expected_leaves, (
        f"叶子节点不一致。\n"
        f" 遗漏: {expected_leaves - actual_paths}\n"
        f" 多余: {actual_paths - expected_leaves}"
    )
    for path, fi in result.items():
        # --- Property 1: '.' separates levels ---
        # No consecutive '.', and the path neither starts nor ends with '.'.
        assert not path.startswith("."), f"路径不应以 '.' 开头: {path}"
        stripped = path.replace("[]", "")
        assert not stripped.endswith("."), f"路径不应以 '.' 结尾: {path}"
        assert ".." not in stripped, f"路径不应含连续 '..': {path}"
        # --- Property 2: array marker format ---
        # '[]' may only appear directly after a key or after another '[]':
        # legal shapes are key[], key[][].child (nested arrays), key[].child
        if "[]" in path:
            # A path may not start with '[]' (a key must precede the first '[]').
            assert not path.startswith("[]"), (
                f"路径不应以 '[]' 开头: {path}"
            )
        # --- Property 3: depth consistency ---
        expected_depth = stripped.count(".")
        assert fi.depth == expected_depth, (
            f"depth 不一致: path={path}, "
            f"expected={expected_depth}, actual={fi.depth}"
        )
        # --- Basic field integrity ---
        assert fi.path == path
        assert fi.occurrence >= 1
        assert fi.total_records == 1  # a single record was passed in
# ---------------------------------------------------------------------------
# Property 2: multi-record field-merge completeness and occurrence accuracy
# Validates: Requirements 1.2, 1.5
# ---------------------------------------------------------------------------
@given(records=st.lists(_json_record, min_size=1, max_size=5))
@settings(max_examples=200)
def test_flatten_json_tree_field_merge_completeness(records: list[dict]):
    """
    Feature: dataflow-structure-audit, Property 2: field-merge completeness
    and occurrence-frequency accuracy across multiple records.

    For any list of JSON records, flatten_json_tree's output must satisfy:
      1. merged field-path set == union of every record's leaf-path set
      2. each field's occurrence == number of records actually containing it
      3. each field's occurrence <= total_records
      4. total_records == len(records)
    **Validates: Requirements 1.2, 1.5**
    """
    result = flatten_json_tree(records)
    # Reference computation: the leaf-path set of each individual record.
    per_record_leaves: list[set[str]] = [
        _collect_leaves(rec) for rec in records
    ]
    expected_union = set().union(*per_record_leaves) if per_record_leaves else set()
    # --- Property 1: union completeness ---
    actual_paths = set(result.keys())
    assert actual_paths == expected_union, (
        f"字段路径并集不一致。\n"
        f" 遗漏: {expected_union - actual_paths}\n"
        f" 多余: {actual_paths - expected_union}"
    )
    # --- Property 2: occurrence accuracy ---
    for path, fi in result.items():
        expected_occ = sum(1 for leaves in per_record_leaves if path in leaves)
        assert fi.occurrence == expected_occ, (
            f"occurrence 不一致: path={path}, "
            f"expected={expected_occ}, actual={fi.occurrence}"
        )
        # --- Property 3: occurrence bounded by total_records ---
        assert fi.occurrence <= fi.total_records, (
            f"occurrence 超过 total_records: path={path}, "
            f"occurrence={fi.occurrence}, total_records={fi.total_records}"
        )
    # --- Property 4: total_records consistency ---
    for fi in result.values():
        assert fi.total_records == len(records), (
            f"total_records 不一致: expected={len(records)}, "
            f"actual={fi.total_records}"
        )
# ---------------------------------------------------------------------------
# 单元测试flatten_json_tree 边界情况
# Requirements: 1.1, 1.2, 1.3, 1.4
# ---------------------------------------------------------------------------
from collections import OrderedDict
def test_empty_records():
    """An empty record list yields an empty OrderedDict.

    Requirements: 1.2 (edge case)
    """
    flattened = flatten_json_tree([])
    assert isinstance(flattened, OrderedDict)
    assert not flattened
def test_flat_json():
    """Single-level JSON (no nesting): every field has depth 0.

    Requirements: 1.1
    """
    flattened = flatten_json_tree([{"name": "Alice", "age": 30, "active": True}])
    assert set(flattened) == {"name", "age", "active"}
    for path, info in flattened.items():
        assert info.depth == 0, f"顶层字段 {path} 的 depth 应为 0实际为 {info.depth}"
        assert "." not in info.path, f"顶层字段路径不应含 '.': {info.path}"
        assert info.occurrence == 1
        assert info.total_records == 1
    assert flattened["name"].json_type == "string"
    assert flattened["name"].sample == "Alice"
    assert flattened["age"].json_type == "integer"
    assert flattened["age"].sample == "30"
    assert flattened["active"].json_type == "boolean"
def test_deep_nesting():
    """Deep nesting (5+ levels): path and depth are reported correctly.

    Requirements: 1.1, 1.4
    """
    # Build a 6-level nesting programmatically: a.b.c.d.e.f = "leaf"
    record: dict = {"f": "leaf"}
    for key in ("e", "d", "c", "b", "a"):
        record = {key: record}
    flattened = flatten_json_tree([record])
    assert "a.b.c.d.e.f" in flattened
    info = flattened["a.b.c.d.e.f"]
    assert info.depth == 5, f"6 层嵌套叶子 depth 应为 5实际为 {info.depth}"
    assert info.json_type == "string"
    assert info.sample == "leaf"
    assert info.occurrence == 1
    assert info.total_records == 1
    # Intermediate dict nodes must not appear — only leaves are recorded.
    assert len(flattened) == 1
def test_array_with_nested_objects():
    """Objects nested inside arrays: verifies the [] marker and path format.

    Requirements: 1.3
    """
    flattened = flatten_json_tree([{
        "items": [
            {"name": "apple", "price": 3.5},
            {"name": "banana", "tags": ["fruit", "yellow"]},
        ],
    }])
    # Both items[].name and items[].price must be present.
    assert "items[].name" in flattened
    assert "items[].price" in flattened
    # depth of items[].name == "items.name".count(".") == 1
    assert flattened["items[].name"].depth == 1
    assert flattened["items[].name"].json_type == "string"
    assert flattened["items[].price"].depth == 1
    assert flattened["items[].price"].json_type == "number"
    # Scalars inside a nested array: items[].tags[] is itself a leaf path.
    assert "items[].tags[]" in flattened
    assert flattened["items[].tags[]"].depth == 1  # "items.tags".count(".") == 1
    assert flattened["items[].tags[]"].json_type == "string"
# ---------------------------------------------------------------------------
# Unit tests: collect_all_tables orchestration logic
# Requirements: 2.1, 2.2, 2.3, 2.4
# ---------------------------------------------------------------------------
# Test specs, trimmed to only the fields collect_all_tables actually reads.
_TEST_SPECS = [
    {
        "code": "ODS_TABLE_A",
        "table": "table_a",
        "endpoint": "/api/table_a",
        "description": "测试表 A",
    },
    {
        "code": "ODS_TABLE_B",
        "table": "table_b",
        "endpoint": "/api/table_b",
        "description": "测试表 B",
    },
    {
        "code": "ODS_TABLE_C",
        "table": "table_c",
        "endpoint": "/api/table_c",
        "description": "测试表 C",
    },
]
def _fake_fetch(spec: dict, limit: int) -> list[dict]:
"""返回简单的假数据,用于验证编排逻辑。"""
return [{"id": 1, "name": "test", "value": 42}]
def test_collect_all_tables_happy_path():
    """All tables collected successfully, without a database connection.

    Requirements: 2.1, 2.2
    """
    collected = collect_all_tables(AnalyzerConfig(limit=10), _TEST_SPECS,
                                   fetch_fn=_fake_fetch)
    assert len(collected) == 3
    for res in collected:
        assert isinstance(res, TableCollectionResult)
        assert res.error is None
        assert res.record_count == 1
        assert len(res.json_fields) > 0
        # Without a DB connection the ODS/DWD column lists stay empty.
        assert res.ods_columns == []
        assert res.dwd_columns == []
def test_collect_all_tables_filter_by_name():
    """config.tables filtering collects only the named tables.

    Requirements: 2.2
    """
    collected = collect_all_tables(AnalyzerConfig(tables=["table_b"]),
                                   _TEST_SPECS, fetch_fn=_fake_fetch)
    assert len(collected) == 1
    only = collected[0]
    assert only.table_name == "table_b"
    assert only.task_code == "ODS_TABLE_B"
def test_collect_all_tables_filter_case_insensitive():
    """Table-name filtering is case-insensitive.

    Requirements: 2.2
    """
    collected = collect_all_tables(AnalyzerConfig(tables=["TABLE_A", "Table_C"]),
                                   _TEST_SPECS, fetch_fn=_fake_fetch)
    assert len(collected) == 2
    assert {r.table_name for r in collected} == {"table_a", "table_c"}
def test_collect_all_tables_filter_no_match():
    """A filter that matches nothing yields an empty list.

    Requirements: 2.2
    """
    cfg = AnalyzerConfig(tables=["nonexistent"])
    assert collect_all_tables(cfg, _TEST_SPECS, fetch_fn=_fake_fetch) == []
def test_collect_all_tables_single_table_error_isolation():
    """One table's fetch failure must not abort collection of the others.

    Requirements: 2.4
    """
    calls = 0

    def _failing_fetch(spec: dict, limit: int) -> list[dict]:
        nonlocal calls
        calls += 1
        if spec["table"] == "table_b":
            raise RuntimeError("模拟 API 超时")
        return [{"id": 1}]

    collected = collect_all_tables(AnalyzerConfig(limit=5), _TEST_SPECS,
                                   fetch_fn=_failing_fetch)
    assert len(collected) == 3
    assert calls == 3
    ok_a, failed_b, ok_c = collected
    # table_a succeeded.
    assert ok_a.table_name == "table_a"
    assert ok_a.error is None
    assert ok_a.record_count == 1
    # table_b failed, but a result entry still exists and carries the error.
    assert failed_b.table_name == "table_b"
    assert failed_b.error is not None
    assert "模拟 API 超时" in failed_b.error
    assert failed_b.record_count == 0
    # table_c succeeded (unaffected by table_b's failure).
    assert ok_c.table_name == "table_c"
    assert ok_c.error is None
    assert ok_c.record_count == 1
def test_collect_all_tables_empty_specs():
    """An empty specs list yields an empty result list.

    Requirements: 2.2
    """
    assert collect_all_tables(AnalyzerConfig(), [], fetch_fn=_fake_fetch) == []
def test_collect_all_tables_preserves_metadata():
    """Collection results retain the metadata fields from the spec.

    Requirements: 2.2
    """
    collected = collect_all_tables(AnalyzerConfig(limit=50), _TEST_SPECS[:1],
                                   fetch_fn=_fake_fetch)
    first = collected[0]
    assert first.table_name == "table_a"
    assert first.task_code == "ODS_TABLE_A"
    assert first.description == "测试表 A"
    assert first.endpoint == "/api/table_a"
# ---------------------------------------------------------------------------
# dump_collection_results 单元测试
# Requirements: 5.4, 5.6
# ---------------------------------------------------------------------------
import json as _json
def _make_result(
    table: str = "test_table",
    task_code: str = "ODS_TEST",
    record_count: int = 2,
    error: str | None = None,
) -> TableCollectionResult:
    """Build a minimal TableCollectionResult fixture for the dump tests.

    When *error* is set, field/column data is left empty (mirrors a failed
    collection); otherwise two JSON fields, one ODS column and two DWD
    columns are populated.
    """
    fields: OrderedDict = OrderedDict()
    ods_cols: list[ColumnInfo] = []
    dwd_cols: list[ColumnInfo] = []
    if not error:
        fields["id"] = FieldInfo(
            path="id", json_type="integer", sample="1",
            depth=0, occurrence=2, total_records=record_count,
            samples=["1", "2"],
        )
        fields["name"] = FieldInfo(
            path="name", json_type="string", sample="测试",
            depth=0, occurrence=1, total_records=record_count,
            samples=["测试"],
        )
        ods_cols = [
            ColumnInfo(name="id", data_type="bigint", is_nullable=False,
                       column_default=None, comment="主键", ordinal_position=1),
        ]
        dwd_cols = [
            ColumnInfo(name="id", data_type="bigint", is_nullable=False,
                       column_default=None, comment="主键", ordinal_position=1),
            ColumnInfo(name="store_id", data_type="integer", is_nullable=False,
                       column_default=None, comment="门店", ordinal_position=2),
        ]
    # dwd_tables: simulate one DWD table (named "dwd_{table}") when columns exist.
    dwd_tables: dict[str, list[ColumnInfo]] = {}
    if dwd_cols:
        dwd_tables[f"dwd_{table}"] = dwd_cols
    return TableCollectionResult(
        table_name=table,
        task_code=task_code,
        description="测试表",
        endpoint="/api/test",
        record_count=record_count,
        json_fields=fields,
        ods_columns=ods_cols,
        dwd_columns=dwd_cols,
        dwd_tables=dwd_tables,
        error=error,
    )
def test_dump_creates_subdirectories(tmp_path):
    """dump_collection_results auto-creates the json_trees/, db_schemas/,
    field_mappings/ and bd_descriptions/ subdirectories.

    Requirements: 5.4
    """
    paths = dump_collection_results([_make_result()], tmp_path)
    for sub in ("json_trees", "db_schemas", "field_mappings", "bd_descriptions"):
        assert (tmp_path / sub).is_dir()
        assert paths[sub] == tmp_path / sub
    assert paths["manifest"] == tmp_path
def test_dump_json_trees_format(tmp_path):
    """json_trees/{table}.json carries the correct field structure (incl. samples).

    Requirements: 5.4
    """
    dump_collection_results([_make_result(table="orders", record_count=10)], tmp_path)
    tree_file = tmp_path / "json_trees" / "orders.json"
    assert tree_file.exists()
    payload = _json.loads(tree_file.read_text(encoding="utf-8"))
    assert payload["table"] == "orders"
    assert payload["total_records"] == 10
    assert isinstance(payload["fields"], list)
    assert len(payload["fields"]) == 2
    # Per-field structure.
    first = payload["fields"][0]
    assert first["path"] == "id"
    assert first["json_type"] == "integer"
    assert first["depth"] == 0
    # The samples field must be present and be a list.
    assert "samples" in first
    assert isinstance(first["samples"], list)
def test_dump_db_schemas_format(tmp_path):
    """db_schemas/ods_{table}.json and dwd_{dwd_short}.json carry the column layout.

    Requirements: 5.4
    """
    dump_collection_results([_make_result(table="members")], tmp_path)
    schema_dir = tmp_path / "db_schemas"
    ods_file = schema_dir / "ods_members.json"
    # The DWD filename now uses the DWD table name (dwd_members), not the ODS name.
    dwd_file = schema_dir / "dwd_dwd_members.json"
    assert ods_file.exists()
    assert dwd_file.exists()
    ods = _json.loads(ods_file.read_text(encoding="utf-8"))
    assert ods["schema"] == "ods"
    assert ods["table"] == "members"
    assert len(ods["columns"]) == 1
    assert ods["columns"][0]["name"] == "id"
    dwd = _json.loads(dwd_file.read_text(encoding="utf-8"))
    assert dwd["schema"] == "dwd"
    assert dwd["table"] == "dwd_members"
    assert dwd["ods_source"] == "members"
    assert len(dwd["columns"]) == 2
def test_dump_manifest_format(tmp_path):
    """collection_manifest.json carries a timestamp and the table inventory.

    Requirements: 5.6
    """
    dump_collection_results(
        [
            _make_result(table="t1", task_code="ODS_T1", record_count=100),
            _make_result(table="t2", task_code="ODS_T2", record_count=0, error="连接失败"),
        ],
        tmp_path,
    )
    manifest_file = tmp_path / "collection_manifest.json"
    assert manifest_file.exists()
    manifest = _json.loads(manifest_file.read_text(encoding="utf-8"))
    assert "timestamp" in manifest
    assert isinstance(manifest["tables"], list)
    assert len(manifest["tables"]) == 2
    ok_entry, failed_entry = manifest["tables"]
    assert ok_entry["table"] == "t1"
    assert ok_entry["task_code"] == "ODS_T1"
    assert ok_entry["record_count"] == 100
    assert ok_entry["json_field_count"] == 2  # id + name
    assert ok_entry["ods_column_count"] == 1
    assert ok_entry["dwd_column_count"] == 2
    assert ok_entry["dwd_tables"] == ["dwd_t1"]
    assert ok_entry["error"] is None
    assert failed_entry["error"] == "连接失败"
    assert failed_entry["record_count"] == 0
def test_dump_utf8_chinese_content(tmp_path):
    """JSON files are UTF-8 encoded; Chinese characters are written unescaped.

    Requirements: 5.4
    """
    dump_collection_results([_make_result()], tmp_path)
    raw = (tmp_path / "json_trees" / "test_table.json").read_text(encoding="utf-8")
    # ensure_ascii=False means Chinese is written directly, never as \uXXXX.
    assert "测试" in raw
    assert "\\u" not in raw
def test_dump_empty_results(tmp_path):
    """An empty result list still produces the manifest and subdirectories.

    Requirements: 5.4
    """
    paths = dump_collection_results([], tmp_path)
    for sub in ("json_trees", "db_schemas", "field_mappings", "bd_descriptions"):
        assert (tmp_path / sub).is_dir()
    manifest = _json.loads(
        (tmp_path / "collection_manifest.json").read_text(encoding="utf-8")
    )
    assert manifest["tables"] == []
    assert "timestamp" in manifest
    assert "table_map" in manifest
    assert paths["json_trees"] == tmp_path / "json_trees"
# ---------------------------------------------------------------------------
# CLI 入口build_parser / resolve_output_dir 单元测试
# ---------------------------------------------------------------------------
from analyze_dataflow import build_parser, resolve_output_dir
class TestBuildParser:
    """build_parser argument-parsing tests. Requirements: 5.1, 5.2, 5.3"""

    @staticmethod
    def _parse(argv):
        """Build a fresh parser and parse *argv*."""
        return build_parser().parse_args(argv)

    def test_defaults(self):
        """Defaults are used when no arguments are given."""
        args = self._parse([])
        assert args.date_from is None
        assert args.date_to is None
        assert args.limit == 200
        assert args.tables is None

    def test_date_range(self):
        """--date-from / --date-to parse correctly."""
        args = self._parse(["--date-from", "2025-01-01", "--date-to", "2025-01-15"])
        assert args.date_from == "2025-01-01"
        assert args.date_to == "2025-01-15"

    def test_limit(self):
        """--limit is parsed as an int."""
        args = self._parse(["--limit", "50"])
        assert args.limit == 50

    def test_tables(self):
        """--tables is parsed as a single comma-separated string."""
        args = self._parse(["--tables", "settlement_records,payment_transactions"])
        assert args.tables == "settlement_records,payment_transactions"
        # Callers may split(',') it into a list themselves.
        assert args.tables.split(",") == ["settlement_records", "payment_transactions"]

    def test_all_params_combined(self):
        """All parameters supplied together."""
        args = self._parse([
            "--date-from", "2025-06-01",
            "--date-to", "2025-06-30",
            "--limit", "100",
            "--tables", "orders,members",
        ])
        assert args.date_from == "2025-06-01"
        assert args.date_to == "2025-06-30"
        assert args.limit == 100
        assert args.tables == "orders,members"
class TestResolveOutputDir:
    """resolve_output_dir output-directory fallback tests. Requirements: 5.4, 5.5"""

    def test_env_var_takes_priority(self, tmp_path, monkeypatch):
        """SYSTEM_ANALYZE_ROOT wins when it is set."""
        wanted = tmp_path / "custom_output"
        monkeypatch.setenv("SYSTEM_ANALYZE_ROOT", str(wanted))
        assert resolve_output_dir() == wanted
        assert wanted.is_dir()

    def test_fallback_raises_when_env_missing(self, monkeypatch):
        """A missing SYSTEM_ANALYZE_ROOT raises KeyError."""
        monkeypatch.delenv("SYSTEM_ANALYZE_ROOT", raising=False)
        with pytest.raises(KeyError):
            resolve_output_dir()

    def test_creates_directory(self, tmp_path, monkeypatch):
        """The directory is created automatically when absent."""
        wanted = tmp_path / "nested" / "deep" / "dir"
        monkeypatch.setenv("SYSTEM_ANALYZE_ROOT", str(wanted))
        assert resolve_output_dir().is_dir()

    def test_existing_directory_no_error(self, tmp_path, monkeypatch):
        """An already-existing directory does not raise."""
        wanted = tmp_path / "already_exists"
        wanted.mkdir()
        monkeypatch.setenv("SYSTEM_ANALYZE_ROOT", str(wanted))
        assert resolve_output_dir() == wanted
        assert wanted.is_dir()
# ---------------------------------------------------------------------------
# Property 3: 输出文件名格式正确性
# Feature: dataflow-structure-audit, Property 3: 输出文件名格式正确性
# Validates: Requirements 5.6
# ---------------------------------------------------------------------------
import re
from datetime import datetime
from analyze_dataflow import generate_output_filename
@settings(max_examples=100)
@given(dt=st.datetimes(
    min_value=datetime(1970, 1, 1),
    max_value=datetime(9999, 12, 31, 23, 59, 59),
))
def test_output_filename_format(dt: datetime):
    """
    Property 3: output filename format correctness.

    For any valid datetime, generate_output_filename must produce a name that:
      1. matches the pattern dataflow_YYYY-MM-DD_HHMMSS.md
      2. embeds a date-time equal to the input datetime
    Validates: Requirements 5.6
    """
    filename = generate_output_filename(dt)
    # 1. Pattern match.
    pattern = r"^dataflow_\d{4}-\d{2}-\d{2}_\d{6}\.md$"
    # BUG FIX: these assertion messages previously hard-coded the literal
    # text '(unknown)' instead of interpolating the generated filename,
    # making failures undiagnosable (the second message was an f-string
    # with no placeholder at all).
    assert re.match(pattern, filename), (
        f"文件名 '{filename}' 不匹配模式 {pattern}"
    )
    # 2. Date-time consistency: extract the fields back out and compare.
    match = re.match(
        r"^dataflow_(\d{4})-(\d{2})-(\d{2})_(\d{2})(\d{2})(\d{2})\.md$",
        filename,
    )
    assert match, f"无法从文件名 '{filename}' 中提取日期时间"
    year, month, day, hour, minute, second = (int(g) for g in match.groups())
    assert year == dt.year
    assert month == dt.month
    assert day == dt.day
    assert hour == dt.hour
    assert minute == dt.minute
    assert second == dt.second
# ---------------------------------------------------------------------------
# 单元测试Hook 配置文件格式验证
# Validates: Requirements 6.1
# ---------------------------------------------------------------------------
class TestHookConfigFile:
    """Validate the format of .kiro/hooks/dataflow-analyze.kiro.hook."""

    HOOK_PATH = Path(__file__).resolve().parent.parent / ".kiro" / "hooks" / "dataflow-analyze.kiro.hook"

    def _load(self):
        """Read and JSON-decode the hook file (raises JSONDecodeError if invalid)."""
        return json.loads(self.HOOK_PATH.read_text(encoding="utf-8"))

    def test_hook_file_is_valid_json(self):
        """The hook file must be valid JSON."""
        data = self._load()
        assert isinstance(data, dict)

    def test_hook_required_fields_exist(self):
        """The hook file must define name, version, when.type, then.type, then.prompt."""
        data = self._load()
        assert "name" in data
        assert "version" in data
        assert "when" in data and "type" in data["when"]
        assert "then" in data and "type" in data["then"]
        assert "prompt" in data["then"]

    def test_hook_when_type_is_user_triggered(self):
        """when.type must be userTriggered."""
        assert self._load()["when"]["type"] == "userTriggered"

    def test_hook_then_type_is_ask_agent(self):
        """then.type must be askAgent."""
        assert self._load()["then"]["type"] == "askAgent"
# ---------------------------------------------------------------------------
# 新增测试parse_table_map / parse_fact_mappings / build_field_mappings
# Validates: 三层字段映射构建正确性
# ---------------------------------------------------------------------------
from dataflow_analyzer import (
build_field_mappings,
parse_table_map,
parse_fact_mappings,
)
class TestParseTableMap:
    """Parsing TABLE_MAP out of the ETL source."""

    def test_parse_real_file(self):
        """Parsing the real dwd_load_task.py yields a non-empty mapping."""
        result = parse_table_map()
        # Only assert when the real file is present in this checkout.
        if Path("apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py").exists():
            assert len(result) > 0
            # Every key is "dwd."-prefixed and every value "ods."-prefixed.
            for dwd_name, ods_name in result.items():
                assert dwd_name.startswith("dwd."), f"DWD 表名应以 dwd. 开头: {dwd_name}"
                assert ods_name.startswith("ods."), f"ODS 表名应以 ods. 开头: {ods_name}"

    def test_parse_nonexistent_file(self, tmp_path):
        """A missing file yields an empty dict."""
        assert parse_table_map(tmp_path / "nonexistent.py") == {}

    def test_parse_file_without_table_map(self, tmp_path):
        """A file without a TABLE_MAP definition yields an empty dict."""
        source = tmp_path / "empty.py"
        source.write_text("# 空文件\n", encoding="utf-8")
        assert parse_table_map(source) == {}

    def test_parse_synthetic_table_map(self, tmp_path):
        """A synthetic TABLE_MAP definition parses correctly."""
        source = tmp_path / "task.py"
        source.write_text(
            'TABLE_MAP: dict[str, str] = {\n'
            ' "dwd.dim_foo": "ods.foo_master",\n'
            ' "dwd.dwd_bar": "ods.bar_records",\n'
            '}\n',
            encoding="utf-8",
        )
        assert parse_table_map(source) == {
            "dwd.dim_foo": "ods.foo_master",
            "dwd.dwd_bar": "ods.bar_records",
        }
class TestParseFactMappings:
    """Parsing FACT_MAPPINGS out of the ETL source."""

    def test_parse_real_file(self):
        """Parsing the real dwd_load_task.py yields a non-empty mapping."""
        result = parse_fact_mappings()
        if Path("apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py").exists():
            assert len(result) > 0
            for dwd_name, entries in result.items():
                assert dwd_name.startswith("dwd."), f"DWD 表名应以 dwd. 开头: {dwd_name}"
                for entry in entries:
                    assert len(entry) == 3, f"映射条目应为 3 元组: {entry}"

    def test_parse_nonexistent_file(self, tmp_path):
        """A missing file yields an empty dict."""
        assert parse_fact_mappings(tmp_path / "nonexistent.py") == {}
class TestBuildFieldMappings:
    """Three-layer field-mapping construction via build_field_mappings."""

    def _make_test_data(self):
        """Build a test TableCollectionResult + TABLE_MAP + FACT_MAPPINGS.

        The fixture models one ODS table (foo_master) feeding one DWD table
        (dim_foo) with an explicit foo_id←id mapping plus an SCD2 column.
        """
        fields = OrderedDict()
        fields["id"] = FieldInfo(
            path="id", json_type="integer", sample="123",
            depth=0, occurrence=10, total_records=10,
            samples=["123", "456", "789"],
        )
        fields["name"] = FieldInfo(
            path="name", json_type="string", sample="测试",
            depth=0, occurrence=10, total_records=10,
            samples=["测试", "示例"],
        )
        # Nested field deliberately absent from the ODS columns below,
        # so it must come out labelled "unmapped".
        fields["extra.nested"] = FieldInfo(
            path="extra.nested", json_type="string", sample="嵌套值",
            depth=1, occurrence=5, total_records=10,
            samples=["嵌套值"],
        )
        ods_cols = [
            ColumnInfo(name="id", data_type="BIGINT", is_nullable=False,
                       column_default=None, comment="主键", ordinal_position=1),
            ColumnInfo(name="name", data_type="TEXT", is_nullable=True,
                       column_default=None, comment="名称", ordinal_position=2),
        ]
        dwd_cols_main = [
            ColumnInfo(name="foo_id", data_type="BIGINT", is_nullable=False,
                       column_default=None, comment="主键", ordinal_position=1),
            ColumnInfo(name="name", data_type="TEXT", is_nullable=True,
                       column_default=None, comment="名称", ordinal_position=2),
            ColumnInfo(name="scd2_start_time", data_type="TIMESTAMPTZ", is_nullable=True,
                       column_default=None, comment=None, ordinal_position=3),
        ]
        result = TableCollectionResult(
            table_name="foo_master",
            task_code="ODS_FOO",
            description="测试表",
            endpoint="/api/foo",
            record_count=10,
            json_fields=fields,
            ods_columns=ods_cols,
            dwd_columns=dwd_cols_main,
            dwd_tables={"dim_foo": dwd_cols_main},
        )
        table_map = {"dwd.dim_foo": "ods.foo_master"}
        fact_mappings = {
            "dwd.dim_foo": [("foo_id", "id", None)],
        }
        all_dwd_cols = {"dim_foo": dwd_cols_main}
        return result, table_map, fact_mappings, all_dwd_cols

    def test_anchors_generated(self):
        """Anchor IDs have the expected slugified format."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        assert mapping["anchors"]["api"] == "api-foo-master"
        assert mapping["anchors"]["ods"] == "ods-foo-master"
        assert "dim_foo" in mapping["anchors"]["dwd"]
        assert mapping["anchors"]["dwd"]["dim_foo"] == "dwd-dim-foo"

    def test_json_to_ods_mapping(self):
        """JSON→ODS mapping: mapped and unmapped fields are labelled correctly."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        j2o = mapping["json_to_ods"]
        assert len(j2o) == 3
        # id → mapped
        id_entry = next(e for e in j2o if e["json_path"] == "id")
        assert id_entry["ods_col"] == "id"
        assert id_entry["match_type"] in ("exact", "case_insensitive")
        # name → mapped
        name_entry = next(e for e in j2o if e["json_path"] == "name")
        assert name_entry["ods_col"] == "name"
        # extra.nested → unmapped (no such ODS column)
        nested_entry = next(e for e in j2o if e["json_path"] == "extra.nested")
        assert nested_entry["ods_col"] is None
        assert nested_entry["match_type"] == "unmapped"

    def test_ods_to_dwd_mapping(self):
        """ODS→DWD mapping: the id column maps to dim_foo.foo_id."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        o2d = mapping["ods_to_dwd"]
        # id → foo_id (via the explicit FACT_MAPPINGS entry)
        assert "id" in o2d
        targets = o2d["id"]
        assert any(t["dwd_col"] == "foo_id" and t["dwd_table"] == "dim_foo" for t in targets)
        # name → name (same-name passthrough)
        assert "name" in o2d
        name_targets = o2d["name"]
        assert any(t["dwd_col"] == "name" and t["dwd_table"] == "dim_foo" for t in name_targets)

    def test_dwd_to_ods_mapping(self):
        """DWD→ODS mapping: contains direct mappings and SCD2 columns."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        d2o = mapping["dwd_to_ods"]
        assert "dim_foo" in d2o
        cols = d2o["dim_foo"]
        # foo_id ← id (explicit FACT_MAPPINGS entry)
        foo_id_entry = next(c for c in cols if c["dwd_col"] == "foo_id")
        assert foo_id_entry["ods_source"] == "id"
        assert foo_id_entry["mapping_type"] == "直接"
        # name ← name (same-name passthrough)
        name_entry = next(c for c in cols if c["dwd_col"] == "name")
        assert name_entry["ods_source"] == "name"
        assert name_entry["mapping_type"] == "直接"
        assert name_entry["note"] == "同名直传"
        # scd2_start_time (SCD2 metadata column, no ODS source)
        scd2_entry = next(c for c in cols if c["dwd_col"] == "scd2_start_time")
        assert scd2_entry["mapping_type"] == "SCD2"
        assert scd2_entry["ods_source"] == ""

    def test_field_mappings_dumped_to_file(self, tmp_path):
        """dump_collection_results writes the field_mappings JSON file."""
        result, _, _, _ = self._make_test_data()
        dump_collection_results([result], tmp_path)
        fm_file = tmp_path / "field_mappings" / "foo_master.json"
        assert fm_file.exists()
        data = _json.loads(fm_file.read_text(encoding="utf-8"))
        assert data["ods_table"] == "foo_master"
        assert "anchors" in data
        assert "json_to_ods" in data
        assert "ods_to_dwd" in data
        assert "dwd_to_ods" in data

    def test_no_dwd_tables_produces_empty_mappings(self):
        """Without any DWD tables, ods_to_dwd and dwd_to_ods stay empty."""
        fields = OrderedDict()
        fields["id"] = FieldInfo(
            path="id", json_type="integer", sample="1",
            depth=0, occurrence=1, total_records=1,
            samples=["1"],
        )
        ods_cols = [
            ColumnInfo(name="id", data_type="BIGINT", is_nullable=False,
                       column_default=None, comment=None, ordinal_position=1),
        ]
        result = TableCollectionResult(
            table_name="orphan_table",
            task_code="ODS_ORPHAN",
            description="无 DWD 映射的表",
            endpoint="/api/orphan",
            record_count=1,
            json_fields=fields,
            ods_columns=ods_cols,
            dwd_columns=[],
            dwd_tables={},
        )
        mapping = build_field_mappings(result, {}, {}, {})
        assert mapping["ods_to_dwd"] == {}
        assert mapping["dwd_to_ods"] == {}
        assert mapping["anchors"]["dwd"] == {}
# ---------------------------------------------------------------------------
# 新增测试多示例值采集samples 字段)
# ---------------------------------------------------------------------------
def test_flatten_json_tree_collects_multiple_samples():
    """flatten_json_tree gathers several distinct sample values per field."""
    flattened = flatten_json_tree([
        {"status": 1, "name": "Alice"},
        {"status": 2, "name": "Bob"},
        {"status": 1, "name": "Charlie"},
        {"status": 3, "name": "Alice"},
    ])
    # status has three distinct values.
    assert "status" in flattened
    assert set(flattened["status"].samples) == {"1", "2", "3"}
    # name has three distinct values as well.
    assert set(flattened["name"].samples) == {"Alice", "Bob", "Charlie"}
def test_flatten_json_tree_samples_limit():
    """The number of collected samples never exceeds MAX_SAMPLES."""
    from dataflow_analyzer import MAX_SAMPLES
    # Produce more distinct values than MAX_SAMPLES allows.
    flattened = flatten_json_tree([{"val": i} for i in range(MAX_SAMPLES + 5)])
    assert len(flattened["val"].samples) <= MAX_SAMPLES
def test_flatten_json_tree_samples_skip_none():
    """None values are never collected into samples."""
    flattened = flatten_json_tree([
        {"x": None},
        {"x": 42},
        {"x": None},
        {"x": 99},
    ])
    info = flattened["x"]
    assert "None" not in info.samples
    assert set(info.samples) == {"42", "99"}
# ---------------------------------------------------------------------------
# 新增测试parse_bd_manual_fields / load_bd_descriptions / dump_bd_descriptions
# ---------------------------------------------------------------------------
from dataflow_analyzer import (
parse_bd_manual_fields,
dump_bd_descriptions,
)
class TestParseBdManualFields:
    """Parse field descriptions out of BD_manual Markdown documents."""

    def test_parse_standard_format(self, tmp_path):
        """The standard BD_manual table layout parses correctly."""
        doc = tmp_path / "BD_manual_test.md"
        doc.write_text(
            "# test 测试表\n\n"
            "## 字段说明\n\n"
            "| 序号 | 字段名 | 类型 | 可空 | 说明 |\n"
            "|------|--------|------|------|------|\n"
            "| 1 | id | BIGINT | NO | 主键 ID |\n"
            "| 2 | name | TEXT | YES | 名称字段 |\n"
            "| 3 | status | INTEGER | YES | 状态1=启用0=禁用 |\n",
            encoding="utf-8",
        )
        result = parse_bd_manual_fields(doc)
        assert result["id"] == "主键 ID"
        assert result["name"] == "名称字段"
        assert result["status"] == "状态1=启用0=禁用"

    def test_parse_nonexistent_file(self, tmp_path):
        """A missing file yields an empty dict."""
        result = parse_bd_manual_fields(tmp_path / "nonexistent.md")
        assert result == {}

    def test_parse_no_field_table(self, tmp_path):
        """A document without a field-description table yields an empty dict."""
        doc = tmp_path / "empty.md"
        doc.write_text("# 空文档\n\n没有字段说明。\n", encoding="utf-8")
        result = parse_bd_manual_fields(doc)
        assert result == {}

    def test_parse_dwd_format_with_pk_column(self, tmp_path):
        """The DWD layout (with an extra primary-key column) also parses."""
        doc = tmp_path / "BD_manual_dim_test.md"
        doc.write_text(
            "# dim_test 测试维表\n\n"
            "## 字段说明\n\n"
            "| 序号 | 字段名 | 类型 | 可空 | 主键 | 说明 |\n"
            "|------|--------|------|------|------|------|\n"
            "| 1 | test_id | BIGINT | NO | PK | 唯一标识 |\n"
            "| 2 | label | TEXT | YES | | 标签名 |\n",
            encoding="utf-8",
        )
        result = parse_bd_manual_fields(doc)
        assert result["test_id"] == "唯一标识"
        assert result["label"] == "标签名"

    def test_parse_real_ods_doc(self):
        """Parse the real ODS BD_manual document (only when present)."""
        doc = Path("apps/etl/connectors/feiqiu/docs/database/ODS/main/BD_manual_assistant_accounts_master.md")
        if doc.exists():
            result = parse_bd_manual_fields(doc)
            assert len(result) > 0
            # The id field should be documented.
            assert "id" in result

    def test_parse_real_dwd_doc(self):
        """Parse the real DWD BD_manual document (only when present)."""
        doc = Path("apps/etl/connectors/feiqiu/docs/database/DWD/main/BD_manual_dim_assistant.md")
        if doc.exists():
            result = parse_bd_manual_fields(doc)
            assert len(result) > 0
            assert "assistant_id" in result
class TestDumpBdDescriptions:
    """dump_bd_descriptions writes one BD-description JSON file per table."""

    def test_dump_creates_files(self, tmp_path):
        """A bd_descriptions/<table>.json file is produced per result."""
        # NOTE(review): the module header imports plain `json`, and `_json` has
        # no visible module-level binding in this chunk; bind it locally the
        # same way the other tests in this file do — TODO confirm whether a
        # module-level `import json as _json` exists elsewhere in the file.
        import json as _json

        results = [_make_result(table="test_table")]
        dump_bd_descriptions(results, tmp_path)
        bd_file = tmp_path / "bd_descriptions" / "test_table.json"
        assert bd_file.exists()
        data = _json.loads(bd_file.read_text(encoding="utf-8"))
        assert data["ods_table"] == "test_table"
        assert "ods_fields" in data
        assert "dwd_fields" in data

    def test_dump_empty_results(self, tmp_path):
        """An empty result list creates the output directory without raising."""
        dump_bd_descriptions([], tmp_path)
        assert (tmp_path / "bd_descriptions").is_dir()
# ---------------------------------------------------------------------------
# 新增测试gen_dataflow_report.py 报告生成器
# ---------------------------------------------------------------------------
from gen_dataflow_report import generate_report, _esc, _format_samples, _is_enum_like
class TestReportHelpers:
    """Unit tests for the report generator's small helper functions."""

    def test_esc_pipe(self):
        """Pipes are escaped so Markdown table cells stay intact."""
        assert _esc("a|b") == "a\\|b"

    def test_esc_newline(self):
        """Newlines are collapsed to a space."""
        assert _esc("a\nb") == "a b"

    def test_esc_none(self):
        """The empty string passes through as the empty string."""
        assert _esc("") == ""

    def test_format_samples_basic(self):
        """Every sample value appears back-quoted in the formatted output."""
        formatted = _format_samples(["a", "b", "c"])
        for token in ("`a`", "`b`", "`c`"):
            assert token in formatted

    def test_format_samples_truncate(self):
        """Overlong sample values are truncated with an ellipsis."""
        assert "..." in _format_samples(["x" * 50])

    def test_format_samples_empty(self):
        """An empty sample list formats to the empty string."""
        assert _format_samples([]) == ""

    def test_is_enum_like_true(self):
        """A few distinct values over enough records counts as enum-like."""
        assert _is_enum_like(["1", "2", "3"], 100) is True

    def test_is_enum_like_false_too_many(self):
        """More than 8 distinct values is not enum-like."""
        assert _is_enum_like([str(i) for i in range(10)], 100) is False

    def test_is_enum_like_false_single(self):
        """A single distinct value is not enum-like."""
        assert _is_enum_like(["1"], 100) is False

    def test_is_enum_like_false_few_records(self):
        """Too few records is not enum-like."""
        assert _is_enum_like(["1", "2"], 3) is False
class TestGenerateReport:
    """Integration tests: generate_report over a minimal on-disk dataset."""

    def _setup_data_dir(self, tmp_path):
        """Build the smallest data-directory layout generate_report consumes.

        Writes collection_manifest.json plus one file each under json_trees/,
        db_schemas/, field_mappings/ and bd_descriptions/ for a single table
        (test_table → dwd_test).  Returns tmp_path.
        """
        # NOTE(review): the module header imports plain `json`; `_json` has no
        # visible module-level binding in this chunk, so bind it locally the
        # same way _setup_diff_data_dir does — TODO confirm whether a
        # module-level `import json as _json` exists elsewhere in the file.
        import json as _json

        # collection_manifest.json
        manifest = {
            "timestamp": "2026-02-16T22:00:00+08:00",
            "table_map": {"dwd.dwd_test": "ods.test_table"},
            "tables": [{
                "table": "test_table",
                "task_code": "ODS_TEST",
                "description": "测试表",
                "record_count": 10,
                "json_field_count": 2,
                "ods_column_count": 2,
                "dwd_tables": ["dwd_test"],
                "dwd_column_count": 3,
                "error": None,
            }],
        }
        (tmp_path / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        # json_trees/test_table.json
        jt_dir = tmp_path / "json_trees"
        jt_dir.mkdir()
        jt = {
            "table": "test_table", "total_records": 10,
            "fields": [
                {"path": "id", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 10, "total_records": 10,
                 "samples": ["1", "2", "3"]},
                {"path": "status", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 10, "total_records": 10,
                 "samples": ["0", "1", "2"]},
            ],
        }
        (jt_dir / "test_table.json").write_text(
            _json.dumps(jt, ensure_ascii=False), encoding="utf-8"
        )
        # db_schemas/
        db_dir = tmp_path / "db_schemas"
        db_dir.mkdir()
        ods = {"schema": "ods", "table": "test_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": "主键", "ordinal_position": 1},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
        ]}
        (db_dir / "ods_test_table.json").write_text(
            _json.dumps(ods, ensure_ascii=False), encoding="utf-8"
        )
        dwd = {"schema": "dwd", "table": "dwd_test", "ods_source": "test_table", "columns": [
            {"name": "test_id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
            {"name": "scd2_start_time", "data_type": "TIMESTAMPTZ", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 3},
        ]}
        (db_dir / "dwd_dwd_test.json").write_text(
            _json.dumps(dwd, ensure_ascii=False), encoding="utf-8"
        )
        # field_mappings/test_table.json
        fm_dir = tmp_path / "field_mappings"
        fm_dir.mkdir()
        fm = {
            "ods_table": "test_table",
            "anchors": {
                "api": "api-test-table", "ods": "ods-test-table",
                "dwd": {"dwd_test": "dwd-dwd-test"},
            },
            "json_to_ods": [
                {"json_path": "id", "ods_col": "id", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
                {"json_path": "status", "ods_col": "status", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
            ],
            "ods_to_dwd": {
                "id": [{"dwd_table": "dwd_test", "dwd_col": "test_id", "cast": None, "note": "字段重命名"}],
                "status": [{"dwd_table": "dwd_test", "dwd_col": "status", "cast": None, "note": "同名直传"}],
            },
            "dwd_to_ods": {
                "dwd_test": [
                    {"dwd_col": "test_id", "type": "BIGINT", "ods_source": "id", "mapping_type": "直接", "note": "字段重命名"},
                    {"dwd_col": "status", "type": "INTEGER", "ods_source": "status", "mapping_type": "直接", "note": "同名直传"},
                    {"dwd_col": "scd2_start_time", "type": "TIMESTAMPTZ", "ods_source": "", "mapping_type": "SCD2", "note": "SCD2 元数据"},
                ],
            },
        }
        (fm_dir / "test_table.json").write_text(
            _json.dumps(fm, ensure_ascii=False), encoding="utf-8"
        )
        # bd_descriptions/test_table.json
        bd_dir = tmp_path / "bd_descriptions"
        bd_dir.mkdir()
        bd = {
            "ods_table": "test_table",
            "ods_fields": {"id": "主键 ID", "status": "状态0=禁用1=启用"},
            "dwd_fields": {"dwd_test": {"test_id": "唯一标识", "status": "状态"}},
        }
        (bd_dir / "test_table.json").write_text(
            _json.dumps(bd, ensure_ascii=False), encoding="utf-8"
        )
        return tmp_path

    def test_report_contains_json_field_count(self, tmp_path):
        """The overview table has an API JSON field-count column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "API JSON 字段数" in report

    def test_report_contains_field_diff(self, tmp_path):
        """The report contains the field-diff comparison section."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "API↔ODS↔DWD 字段对比差异" in report

    def test_report_contains_biz_desc_in_coverage(self, tmp_path):
        """The coverage table has a business-description column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        # the 2.3 coverage table header should carry the business-description column
        lines = report.split("\n")
        coverage_header = [l for l in lines if "业务描述" in l and "JSON 字段数" in l]
        assert len(coverage_header) > 0

    def test_report_contains_biz_desc_in_api_section(self, tmp_path):
        """The API source-field table shows business descriptions."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "主键 ID" in report  # from bd_descriptions

    def test_report_contains_biz_desc_in_ods_section(self, tmp_path):
        """The ODS schema table has a business-description column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        # the ODS table header should include the business-description column
        assert "| # | ODS 列名 | 类型 | ← JSON 源 | → DWD 目标 | 业务描述 |" in report

    def test_report_contains_biz_desc_in_dwd_section(self, tmp_path):
        """The DWD schema table has a business-description column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "| # | DWD 列名 | 类型 | ← ODS 来源 | 转换 | 业务描述 |" in report

    def test_report_contains_enum_samples(self, tmp_path):
        """Enum-like fields list their enumerated values."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        # status has 3 distinct values and should be classified as an enum
        assert "枚举值" in report

    def test_report_anchors_present(self, tmp_path):
        """The report contains the anchor tags for all three layers."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert '<a id="api-test-table"></a>' in report
        assert '<a id="ods-test-table"></a>' in report
        assert '<a id="dwd-dwd-test"></a>' in report
class TestFieldDiffSubTables:
    """Generation logic for the "1.1" per-table field-diff sub-tables."""
    def _setup_diff_data_dir(self, tmp_path):
        """Build fixture data containing several kinds of field diffs.

        alpha_table is constructed with: 1 unmapped flat JSON field
        (extra_flat), 2 unmapped nested JSON fields (nested.a / nested.b),
        2 ODS columns with no DWD target (ods_only_col / no_dwd_col) and
        2 DWD columns with no ODS source (scd2_ver / derived_flag).
        Returns tmp_path.
        """
        import json as _json
        manifest = {
            "timestamp": "2026-02-17T22:00:00+08:00",
            "table_map": {"dwd.dwd_alpha": "ods.alpha_table"},
            "tables": [{
                "table": "alpha_table",
                "task_code": "ODS_ALPHA",
                "description": "差异测试表",
                "record_count": 50,
                "json_field_count": 6,
                "ods_column_count": 5,
                "dwd_tables": ["dwd_alpha"],
                "dwd_column_count": 5,
                "error": None,
            }],
        }
        (tmp_path / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        # json_trees
        jt_dir = tmp_path / "json_trees"
        jt_dir.mkdir()
        jt = {
            "table": "alpha_table", "total_records": 50,
            "fields": [
                {"path": "id", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["1"]},
                {"path": "name", "json_type": "string", "sample": "foo",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["foo"]},
                {"path": "extra_flat", "json_type": "string", "sample": "x",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["x"]},
                {"path": "nested.a", "json_type": "string", "sample": "na",
                 "depth": 1, "occurrence": 50, "total_records": 50, "samples": ["na"]},
                {"path": "nested.b", "json_type": "string", "sample": "nb",
                 "depth": 1, "occurrence": 50, "total_records": 50, "samples": ["nb"]},
                {"path": "status", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["0", "1"]},
            ],
        }
        (jt_dir / "alpha_table.json").write_text(
            _json.dumps(jt, ensure_ascii=False), encoding="utf-8"
        )
        # db_schemas
        db_dir = tmp_path / "db_schemas"
        db_dir.mkdir()
        ods = {"schema": "ods", "table": "alpha_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "name", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 3},
            {"name": "ods_only_col", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 4},
            {"name": "no_dwd_col", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 5},
        ]}
        (db_dir / "ods_alpha_table.json").write_text(
            _json.dumps(ods, ensure_ascii=False), encoding="utf-8"
        )
        dwd = {"schema": "dwd", "table": "dwd_alpha", "ods_source": "alpha_table", "columns": [
            {"name": "alpha_id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "name", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 3},
            {"name": "scd2_ver", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 4},
            {"name": "derived_flag", "data_type": "BOOLEAN", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 5},
        ]}
        (db_dir / "dwd_dwd_alpha.json").write_text(
            _json.dumps(dwd, ensure_ascii=False), encoding="utf-8"
        )
        # field_mappings — construct the various diff cases
        fm_dir = tmp_path / "field_mappings"
        fm_dir.mkdir()
        fm = {
            "ods_table": "alpha_table",
            "anchors": {
                "api": "api-alpha-table", "ods": "ods-alpha-table",
                "dwd": {"dwd_alpha": "dwd-dwd-alpha"},
            },
            "json_to_ods": [
                {"json_path": "id", "ods_col": "id", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
                {"json_path": "name", "ods_col": "name", "match_type": "exact",
                 "json_type": "string", "occurrence_pct": 100.0},
                {"json_path": "status", "ods_col": "status", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
                # flat (top-level) field left unmapped
                {"json_path": "extra_flat", "ods_col": None, "match_type": "unmapped",
                 "json_type": "string", "occurrence_pct": 100.0},
                # nested fields left unmapped
                {"json_path": "nested.a", "ods_col": None, "match_type": "unmapped",
                 "json_type": "string", "occurrence_pct": 100.0},
                {"json_path": "nested.b", "ods_col": None, "match_type": "unmapped",
                 "json_type": "string", "occurrence_pct": 100.0},
            ],
            "ods_to_dwd": {
                "id": [{"dwd_table": "dwd_alpha", "dwd_col": "alpha_id", "cast": None, "note": "重命名"}],
                "name": [{"dwd_table": "dwd_alpha", "dwd_col": "name", "cast": None, "note": ""}],
                "status": [{"dwd_table": "dwd_alpha", "dwd_col": "status", "cast": None, "note": ""}],
                # ods_only_col and no_dwd_col are absent here → ODS→DWD unmapped
            },
            "dwd_to_ods": {
                "dwd_alpha": [
                    {"dwd_col": "alpha_id", "type": "BIGINT", "ods_source": "id",
                     "mapping_type": "直接", "note": "重命名"},
                    {"dwd_col": "name", "type": "TEXT", "ods_source": "name",
                     "mapping_type": "直接", "note": ""},
                    {"dwd_col": "status", "type": "INTEGER", "ods_source": "status",
                     "mapping_type": "直接", "note": ""},
                    # DWD columns with no ODS source
                    {"dwd_col": "scd2_ver", "type": "INTEGER", "ods_source": "",
                     "mapping_type": "SCD2", "note": "SCD2 元数据"},
                    {"dwd_col": "derived_flag", "type": "BOOLEAN", "ods_source": "",
                     "mapping_type": "派生", "note": "派生列"},
                ],
            },
        }
        (fm_dir / "alpha_table.json").write_text(
            _json.dumps(fm, ensure_ascii=False), encoding="utf-8"
        )
        # bd_descriptions
        bd_dir = tmp_path / "bd_descriptions"
        bd_dir.mkdir()
        bd = {
            "ods_table": "alpha_table",
            "ods_fields": {"id": "主键", "name": "名称"},
            "dwd_fields": {"dwd_alpha": {"alpha_id": "标识", "name": "名称"}},
        }
        (bd_dir / "alpha_table.json").write_text(
            _json.dumps(bd, ensure_ascii=False), encoding="utf-8"
        )
        return tmp_path
    def test_summary_table_has_links(self, tmp_path):
        """Non-zero diff counts in the summary table carry jump links."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        # API→ODS unmapped = 3 (1 flat + 2 nested), so a link is expected
        assert "[3](#diff-alpha-table)" in report
    def test_summary_table_zero_no_link(self, tmp_path):
        """A diff count of 0 in the summary table carries no link."""
        import json as _json
        data_dir = self._setup_diff_data_dir(tmp_path)
        # add a fully-mapped table so all of its diff columns are 0
        manifest = _json.loads((data_dir / "collection_manifest.json").read_text(encoding="utf-8"))
        manifest["tables"].append({
            "table": "zero_table", "task_code": "ODS_ZERO", "description": "零差异",
            "record_count": 5, "json_field_count": 1, "ods_column_count": 1,
            "dwd_tables": ["dwd_zero"], "dwd_column_count": 1, "error": None,
        })
        (data_dir / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        fm_z = {
            "ods_table": "zero_table",
            "anchors": {"api": "api-zero-table", "ods": "ods-zero-table",
                        "dwd": {"dwd_zero": "dwd-dwd-zero"}},
            "json_to_ods": [{"json_path": "id", "ods_col": "id", "match_type": "exact",
                             "json_type": "integer", "occurrence_pct": 100.0}],
            "ods_to_dwd": {"id": [{"dwd_table": "dwd_zero", "dwd_col": "id", "cast": None, "note": ""}]},
            "dwd_to_ods": {"dwd_zero": [{"dwd_col": "id", "type": "BIGINT", "ods_source": "id",
                                         "mapping_type": "直接", "note": ""}]},
        }
        (data_dir / "field_mappings" / "zero_table.json").write_text(
            _json.dumps(fm_z, ensure_ascii=False), encoding="utf-8"
        )
        ods_z = {"schema": "ods", "table": "zero_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1}]}
        (data_dir / "db_schemas" / "ods_zero_table.json").write_text(
            _json.dumps(ods_z, ensure_ascii=False), encoding="utf-8"
        )
        dwd_z = {"schema": "dwd", "table": "dwd_zero", "ods_source": "zero_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1}]}
        (data_dir / "db_schemas" / "dwd_dwd_zero.json").write_text(
            _json.dumps(dwd_z, ensure_ascii=False), encoding="utf-8"
        )
        jt_z = {"table": "zero_table", "total_records": 5, "fields": [
            {"path": "id", "json_type": "integer", "sample": "1",
             "depth": 0, "occurrence": 5, "total_records": 5, "samples": ["1"]}]}
        (data_dir / "json_trees" / "zero_table.json").write_text(
            _json.dumps(jt_z, ensure_ascii=False), encoding="utf-8"
        )
        bd_z = {"ods_table": "zero_table", "ods_fields": {}, "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "zero_table.json").write_text(
            _json.dumps(bd_z, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        # the zero_table row should contain "| 0 |" and no link
        zero_line = [l for l in report.split("\n") if "zero_table" in l and "| 0 |" in l]
        assert len(zero_line) > 0
        assert "#diff-zero-table" not in report
    def test_diff_anchor_present(self, tmp_path):
        """The sub-table includes its anchor tag."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert '<a id="diff-alpha-table"></a>' in report
    def test_diff_subtable_title(self, tmp_path):
        """Sub-tables carry incrementally numbered headings."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "#### 1.1.1 alpha_table 字段差异明细" in report
        # the literal placeholder "1.1.x" must not appear
        assert "1.1.x" not in report
    def test_api_unmapped_flat_bold(self, tmp_path):
        """Unmapped flat-field rows are rendered in bold."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "**[`extra_flat`](#api-alpha-table)**" in report
        assert "**⚠️ 未映射**" in report
    def test_api_unmapped_nested_in_details(self, tmp_path):
        """Unmapped nested fields live inside a collapsed <details> block."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "<details><summary>API→ODS 未映射(嵌套对象)" in report
        assert "nested.a" in report
        assert "nested.b" in report
        assert "</details>" in report
    def test_ods_no_dwd_fields_bold(self, tmp_path):
        """ODS→DWD unmapped field rows are rendered in bold."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        # ods_only_col and no_dwd_col are not present in ods_to_dwd
        assert "**[`ods_only_col`](#ods-alpha-table)**" in report
        assert "**[`no_dwd_col`](#ods-alpha-table)**" in report
    def test_dwd_no_ods_fields_bold(self, tmp_path):
        """DWD rows without an ODS source are rendered in bold."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "**[`scd2_ver`](#dwd-dwd-alpha)**" in report
        assert "**[`derived_flag`](#dwd-dwd-alpha)**" in report
        assert "**⚠️ 无 ODS 源**" in report
    def test_no_diff_table_skipped(self, tmp_path):
        """Tables without any diff get no sub-table."""
        import json as _json
        data_dir = self._setup_diff_data_dir(tmp_path)
        # add a diff-free table
        manifest = _json.loads((data_dir / "collection_manifest.json").read_text(encoding="utf-8"))
        manifest["tables"].append({
            "table": "clean_table",
            "task_code": "ODS_CLEAN",
            "description": "无差异表",
            "record_count": 10,
            "json_field_count": 1,
            "ods_column_count": 1,
            "dwd_tables": ["dwd_clean"],
            "dwd_column_count": 1,
            "error": None,
        })
        (data_dir / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        # build fully-matching field_mappings
        fm_clean = {
            "ods_table": "clean_table",
            "anchors": {"api": "api-clean-table", "ods": "ods-clean-table",
                        "dwd": {"dwd_clean": "dwd-dwd-clean"}},
            "json_to_ods": [
                {"json_path": "id", "ods_col": "id", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
            ],
            "ods_to_dwd": {"id": [{"dwd_table": "dwd_clean", "dwd_col": "id",
                                   "cast": None, "note": ""}]},
            "dwd_to_ods": {"dwd_clean": [
                {"dwd_col": "id", "type": "BIGINT", "ods_source": "id",
                 "mapping_type": "直接", "note": ""},
            ]},
        }
        (data_dir / "field_mappings" / "clean_table.json").write_text(
            _json.dumps(fm_clean, ensure_ascii=False), encoding="utf-8"
        )
        ods_clean = {"schema": "ods", "table": "clean_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
        ]}
        (data_dir / "db_schemas" / "ods_clean_table.json").write_text(
            _json.dumps(ods_clean, ensure_ascii=False), encoding="utf-8"
        )
        dwd_clean = {"schema": "dwd", "table": "dwd_clean", "ods_source": "clean_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
        ]}
        (data_dir / "db_schemas" / "dwd_dwd_clean.json").write_text(
            _json.dumps(dwd_clean, ensure_ascii=False), encoding="utf-8"
        )
        jt_clean = {
            "table": "clean_table", "total_records": 10,
            "fields": [{"path": "id", "json_type": "integer", "sample": "1",
                        "depth": 0, "occurrence": 10, "total_records": 10, "samples": ["1"]}],
        }
        (data_dir / "json_trees" / "clean_table.json").write_text(
            _json.dumps(jt_clean, ensure_ascii=False), encoding="utf-8"
        )
        bd_clean = {"ods_table": "clean_table", "ods_fields": {}, "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "clean_table.json").write_text(
            _json.dumps(bd_clean, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        # clean_table must not get a diff sub-table
        assert "clean_table 字段差异明细" not in report
        # but alpha_table must
        assert "alpha_table 字段差异明细" in report
    def test_reason_includes_nested_and_scd2(self, tmp_path):
        """The diff reason mentions nested objects and SCD2 columns."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "嵌套对象 2 个" in report
        assert "SCD2/派生列 2 个" in report
class TestDiffSubTableColumns:
    """Column layout of the per-table diff sub-tables.

    The speculative "guessed purpose" / "confidence" columns were removed from
    the report (left for manual follow-up instead), so these tests pin the
    remaining headers and cell contents.
    """

    def _diff_data_dir(self, tmp_path):
        """Reuse TestFieldDiffSubTables' fixture builder for the shared layout.

        TestFieldDiffSubTables is defined earlier in this module, so it is
        referenced directly instead of re-importing this test module by name
        (the old `from test_dataflow_analyzer import ...` broke whenever the
        module was collected under a different import path).
        """
        return TestFieldDiffSubTables()._setup_diff_data_dir(tmp_path)

    def test_flat_unmapped_header(self, tmp_path):
        """The flat-unmapped sub-table keeps sample/description columns only."""
        report = generate_report(self._diff_data_dir(tmp_path))
        assert "| # | JSON 字段 | 示例值 | 说明 | 状态 |" in report

    def test_dwd_no_ods_header(self, tmp_path):
        """The DWD-without-ODS-source sub-table keeps the description column only."""
        report = generate_report(self._diff_data_dir(tmp_path))
        assert "| # | DWD 表 | DWD 列 | 说明 | 状态 |" in report

    def test_section_numbering_incremental(self, tmp_path):
        """Multiple diff sub-tables are numbered 1.1.1, 1.1.2, ... in order."""
        import json as _json

        data_dir = self._diff_data_dir(tmp_path)
        # register a second diffing table so two sub-tables are generated
        manifest = _json.loads((data_dir / "collection_manifest.json").read_text(encoding="utf-8"))
        manifest["tables"].append({
            "table": "beta_table", "task_code": "ODS_BETA", "description": "第二表",
            "record_count": 10, "json_field_count": 1, "ods_column_count": 1,
            "dwd_tables": ["dwd_beta"], "dwd_column_count": 2, "error": None,
        })
        (data_dir / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        fm_b = {
            "ods_table": "beta_table",
            "anchors": {"api": "api-beta-table", "ods": "ods-beta-table",
                        "dwd": {"dwd_beta": "dwd-dwd-beta"}},
            "json_to_ods": [{"json_path": "id", "ods_col": "id", "match_type": "exact",
                             "json_type": "integer", "occurrence_pct": 100.0}],
            "ods_to_dwd": {"id": [{"dwd_table": "dwd_beta", "dwd_col": "id", "cast": None, "note": ""}]},
            "dwd_to_ods": {"dwd_beta": [
                {"dwd_col": "id", "type": "BIGINT", "ods_source": "id", "mapping_type": "直接", "note": ""},
                # scd2_flag has no ODS source → beta_table also gets a diff sub-table
                {"dwd_col": "scd2_flag", "type": "INTEGER", "ods_source": "", "mapping_type": "SCD2", "note": ""},
            ]},
        }
        (data_dir / "field_mappings" / "beta_table.json").write_text(
            _json.dumps(fm_b, ensure_ascii=False), encoding="utf-8"
        )
        ods_b = {"schema": "ods", "table": "beta_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1}]}
        (data_dir / "db_schemas" / "ods_beta_table.json").write_text(
            _json.dumps(ods_b, ensure_ascii=False), encoding="utf-8"
        )
        dwd_b = {"schema": "dwd", "table": "dwd_beta", "ods_source": "beta_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "scd2_flag", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
        ]}
        (data_dir / "db_schemas" / "dwd_dwd_beta.json").write_text(
            _json.dumps(dwd_b, ensure_ascii=False), encoding="utf-8"
        )
        jt_b = {"table": "beta_table", "total_records": 10, "fields": [
            {"path": "id", "json_type": "integer", "sample": "1",
             "depth": 0, "occurrence": 10, "total_records": 10, "samples": ["1"]}]}
        (data_dir / "json_trees" / "beta_table.json").write_text(
            _json.dumps(jt_b, ensure_ascii=False), encoding="utf-8"
        )
        bd_b = {"ods_table": "beta_table", "ods_fields": {}, "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "beta_table.json").write_text(
            _json.dumps(bd_b, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        assert "#### 1.1.1 alpha_table 字段差异明细" in report
        assert "#### 1.1.2 beta_table 字段差异明细" in report

    def test_sample_value_in_flat_unmapped(self, tmp_path):
        """Flat unmapped fields show their sample value from json_trees."""
        report = generate_report(self._diff_data_dir(tmp_path))
        flat_rows = [l for l in report.split("\n") if "extra_flat" in l and "未映射" in l]
        assert len(flat_rows) >= 1
        assert "`x`" in flat_rows[0]

    def test_bd_desc_in_ods_no_json(self, tmp_path):
        """The ODS-without-JSON-source sub-table shows bd_descriptions text."""
        import json as _json

        data_dir = self._diff_data_dir(tmp_path)
        bd = {"ods_table": "alpha_table",
              "ods_fields": {"ods_only_col": "仅ODS存在的测试列"},
              "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "alpha_table.json").write_text(
            _json.dumps(bd, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        ods_only_rows = [l for l in report.split("\n") if "ods_only_col" in l and "无 JSON 源" in l]
        assert len(ods_only_rows) >= 1
        assert "仅ODS存在的测试列" in ods_only_rows[0]