# -*- coding: utf-8 -*-
"""
数据流结构分析 — 属性测试
Feature: dataflow-structure-audit
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
# scripts/ops 不是 Python 包,通过 sys.path 导入
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts" / "ops"))
from dataflow_analyzer import ( # noqa: E402
AnalyzerConfig,
ColumnInfo,
FieldInfo,
TableCollectionResult,
collect_all_tables,
dump_collection_results,
flatten_json_tree,
)
from hypothesis import given, settings, assume
import hypothesis.strategies as st
# ---------------------------------------------------------------------------
# Strategies: generate arbitrary nested JSON (dict / list / scalar mixes)
# ---------------------------------------------------------------------------
_scalar = st.one_of(
    st.none(),
    st.booleans(),
    st.integers(min_value=-10_000, max_value=10_000),
    st.floats(allow_nan=False, allow_infinity=False),
    st.text(min_size=0, max_size=20),
)
# Restrict keys to non-empty ASCII alphanumerics (avoids '.' / '[]' and other
# characters that would interfere with path parsing)
_json_key = st.from_regex(r"[A-Za-z][A-Za-z0-9_]{0,9}", fullmatch=True)
def _json_value(max_depth: int = 3):
    """Recursive strategy: nested JSON values, depth-capped to avoid blow-up."""
    if max_depth <= 0:
        return _scalar
    # one shared deferred child strategy for both container kinds
    child = st.deferred(lambda: _json_value(max_depth - 1))
    return st.one_of(
        _scalar,
        st.dictionaries(_json_key, child, min_size=0, max_size=4),
        st.lists(child, min_size=0, max_size=3),
    )
# Top level must be a dict (matches flatten_json_tree's input contract)
_json_record = st.dictionaries(_json_key, _json_value(3), min_size=0, max_size=5)
# ---------------------------------------------------------------------------
# 参考实现:收集所有叶子节点路径(用于对比验证)
# ---------------------------------------------------------------------------
def _collect_leaves(obj, prefix: str = "") -> set[str]:
"""
独立于被测代码的参考叶子收集器。
返回所有标量叶子节点的路径集合。
"""
if isinstance(obj, dict):
paths: set[str] = set()
for key, val in obj.items():
child = f"{prefix}.{key}" if prefix else key
paths |= _collect_leaves(val, child)
return paths
elif isinstance(obj, list):
arr = f"{prefix}[]" if prefix else "[]"
paths = set()
for item in obj:
paths |= _collect_leaves(item, arr)
return paths
else:
# 标量叶子 — 空前缀的裸标量不会被记录(与实现一致)
return {prefix} if prefix else set()
# ---------------------------------------------------------------------------
# Property 1: JSON recursive flattening path correctness
# Validates: Requirements 1.1, 1.3, 1.4
# ---------------------------------------------------------------------------
@given(record=_json_record)
@settings(max_examples=200)
def test_flatten_json_tree_path_correctness(record: dict):
    """
    Feature: dataflow-structure-audit, Property 1: JSON recursive flattening
    path correctness.

    For any nested JSON object, the output of flatten_json_tree must satisfy:
    1. paths use '.' to separate levels
    2. array levels carry a '[]' marker
    3. depth == number of '.' in the path after stripping '[]'
    4. no leaf node is missed

    **Validates: Requirements 1.1, 1.3, 1.4**
    """
    result = flatten_json_tree([record])
    # --- reference leaf set ---
    expected_leaves = _collect_leaves(record)
    # --- property 4: no leaf node is missed ---
    actual_paths = set(result.keys())
    assert actual_paths == expected_leaves, (
        f"叶子节点不一致。\n"
        f" 遗漏: {expected_leaves - actual_paths}\n"
        f" 多余: {actual_paths - expected_leaves}"
    )
    for path, fi in result.items():
        # --- property 1: '.' separates levels ---
        # no consecutive '.', and the path neither starts nor ends with '.'
        assert not path.startswith("."), f"路径不应以 '.' 开头: {path}"
        stripped = path.replace("[]", "")
        assert not stripped.endswith("."), f"路径不应以 '.' 结尾: {path}"
        assert ".." not in stripped, f"路径不应含连续 '..': {path}"
        # --- property 2: array marker format ---
        # a '[]' marker is only legal right after a key or another '[]'
        # legal shapes: key[], key[][].child (nested arrays), key[].child
        if "[]" in path:
            # a path must not start with '[]' (a key must precede the first '[]')
            assert not path.startswith("[]"), (
                f"路径不应以 '[]' 开头: {path}"
            )
        # --- property 3: depth consistency ---
        expected_depth = stripped.count(".")
        assert fi.depth == expected_depth, (
            f"depth 不一致: path={path}, "
            f"expected={expected_depth}, actual={fi.depth}"
        )
        # --- basic field integrity ---
        assert fi.path == path
        assert fi.occurrence >= 1
        assert fi.total_records == 1  # single record
# ---------------------------------------------------------------------------
# Property 2: multi-record field-merge completeness and occurrence accuracy
# Validates: Requirements 1.2, 1.5
# ---------------------------------------------------------------------------
@given(records=st.lists(_json_record, min_size=1, max_size=5))
@settings(max_examples=200)
def test_flatten_json_tree_field_merge_completeness(records: list[dict]):
    """
    Feature: dataflow-structure-audit, Property 2: multi-record field-merge
    completeness and occurrence accuracy.

    For any list of JSON records, the output of flatten_json_tree must satisfy:
    1. the merged field-path set == union of every single record's path set
    2. each field's occurrence == number of records actually containing it
    3. each field's occurrence <= total_records
    4. total_records == len(records)

    **Validates: Requirements 1.2, 1.5**
    """
    result = flatten_json_tree(records)
    # --- reference computation: leaf-path set per record ---
    per_record_leaves: list[set[str]] = [
        _collect_leaves(rec) for rec in records
    ]
    expected_union = set().union(*per_record_leaves) if per_record_leaves else set()
    # --- property 1: union completeness ---
    actual_paths = set(result.keys())
    assert actual_paths == expected_union, (
        f"字段路径并集不一致。\n"
        f" 遗漏: {expected_union - actual_paths}\n"
        f" 多余: {actual_paths - expected_union}"
    )
    # --- property 2: occurrence accuracy ---
    for path, fi in result.items():
        expected_occ = sum(1 for leaves in per_record_leaves if path in leaves)
        assert fi.occurrence == expected_occ, (
            f"occurrence 不一致: path={path}, "
            f"expected={expected_occ}, actual={fi.occurrence}"
        )
        # --- property 3: occurrence <= total_records ---
        assert fi.occurrence <= fi.total_records, (
            f"occurrence 超过 total_records: path={path}, "
            f"occurrence={fi.occurrence}, total_records={fi.total_records}"
        )
    # --- property 4: total_records consistency ---
    for fi in result.values():
        assert fi.total_records == len(records), (
            f"total_records 不一致: expected={len(records)}, "
            f"actual={fi.total_records}"
        )
# ---------------------------------------------------------------------------
# 单元测试:flatten_json_tree 边界情况
# Requirements: 1.1, 1.2, 1.3, 1.4
# ---------------------------------------------------------------------------
from collections import OrderedDict
def test_empty_records():
    """An empty record list yields an empty OrderedDict.

    Requirements: 1.2 boundary
    """
    flattened = flatten_json_tree([])
    assert isinstance(flattened, OrderedDict)
    assert not flattened
def test_flat_json():
    """Single-level JSON (no nesting): every field has depth 0.

    Requirements: 1.1
    """
    result = flatten_json_tree([{"name": "Alice", "age": 30, "active": True}])
    assert set(result) == {"name", "age", "active"}
    for path, fi in result.items():
        assert fi.depth == 0, f"顶层字段 {path} 的 depth 应为 0,实际为 {fi.depth}"
        assert "." not in fi.path, f"顶层字段路径不应含 '.': {fi.path}"
        assert fi.occurrence == 1
        assert fi.total_records == 1
    # type inference and sample capture per field
    assert (result["name"].json_type, result["name"].sample) == ("string", "Alice")
    assert (result["age"].json_type, result["age"].sample) == ("integer", "30")
    assert result["active"].json_type == "boolean"
def test_deep_nesting():
    """Deep nesting (5+ levels): path and depth come out right.

    Requirements: 1.1, 1.4
    """
    # build 6 levels of nesting from the inside out: a.b.c.d.e.f = "leaf"
    record: dict = {"f": "leaf"}
    for key in ("e", "d", "c", "b", "a"):
        record = {key: record}
    result = flatten_json_tree([record])
    assert "a.b.c.d.e.f" in result
    fi = result["a.b.c.d.e.f"]
    assert fi.depth == 5, f"6 层嵌套叶子 depth 应为 5,实际为 {fi.depth}"
    assert fi.json_type == "string"
    assert fi.sample == "leaf"
    assert fi.occurrence == 1
    assert fi.total_records == 1
    # only leaves are recorded — intermediate dict nodes must not appear
    assert len(result) == 1
def test_array_with_nested_objects():
    """Objects nested inside arrays: '[]' markers and path format.

    Requirements: 1.3
    """
    record = {
        "items": [
            {"name": "apple", "price": 3.5},
            {"name": "banana", "tags": ["fruit", "yellow"]},
        ]
    }
    result = flatten_json_tree([record])
    # both items[].name and items[].price must be present
    assert "items[].name" in result
    assert "items[].price" in result
    # depth of items[].name = "items.name".count(".") = 1
    fi_name = result["items[].name"]
    assert fi_name.json_type == "string"
    assert fi_name.depth == 1
    fi_price = result["items[].price"]
    assert fi_price.json_type == "number"
    assert fi_price.depth == 1
    # a scalar inside a nested array: items[].tags[] is itself a leaf path
    assert "items[].tags[]" in result
    fi_tags = result["items[].tags[]"]
    assert fi_tags.json_type == "string"
    assert fi_tags.depth == 1  # "items.tags".count(".") = 1
# ---------------------------------------------------------------------------
# Unit tests: collect_all_tables orchestration logic
# Requirements: 2.1, 2.2, 2.3, 2.4
# ---------------------------------------------------------------------------
# Test specs (trimmed: only the fields collect_all_tables actually needs)
_TEST_SPECS = [
    {
        "code": "ODS_TABLE_A",
        "table": "table_a",
        "endpoint": "/api/table_a",
        "description": "测试表 A",
    },
    {
        "code": "ODS_TABLE_B",
        "table": "table_b",
        "endpoint": "/api/table_b",
        "description": "测试表 B",
    },
    {
        "code": "ODS_TABLE_C",
        "table": "table_c",
        "endpoint": "/api/table_c",
        "description": "测试表 C",
    },
]
def _fake_fetch(spec: dict, limit: int) -> list[dict]:
"""返回简单的假数据,用于验证编排逻辑。"""
return [{"id": 1, "name": "test", "value": 42}]
def test_collect_all_tables_happy_path():
    """All tables collected successfully, with no database connection.

    Requirements: 2.1, 2.2
    """
    cfg = AnalyzerConfig(limit=10)
    results = collect_all_tables(cfg, _TEST_SPECS, fetch_fn=_fake_fetch)
    assert len(results) == 3
    for entry in results:
        assert isinstance(entry, TableCollectionResult)
        assert entry.error is None
        assert entry.record_count == 1
        assert len(entry.json_fields) > 0
        # without a DB connection, the ODS/DWD column lists stay empty
        assert entry.ods_columns == []
        assert entry.dwd_columns == []
def test_collect_all_tables_filter_by_name():
    """config.tables narrows collection to the named tables only.

    Requirements: 2.2
    """
    cfg = AnalyzerConfig(tables=["table_b"])
    results = collect_all_tables(cfg, _TEST_SPECS, fetch_fn=_fake_fetch)
    assert len(results) == 1
    only = results[0]
    assert only.table_name == "table_b"
    assert only.task_code == "ODS_TABLE_B"
def test_collect_all_tables_filter_case_insensitive():
    """Table-name filtering ignores case.

    Requirements: 2.2
    """
    cfg = AnalyzerConfig(tables=["TABLE_A", "Table_C"])
    results = collect_all_tables(cfg, _TEST_SPECS, fetch_fn=_fake_fetch)
    assert len(results) == 2
    assert {r.table_name for r in results} == {"table_a", "table_c"}
def test_collect_all_tables_filter_no_match():
    """A filter that matches nothing yields an empty list.

    Requirements: 2.2
    """
    cfg = AnalyzerConfig(tables=["nonexistent"])
    assert collect_all_tables(cfg, _TEST_SPECS, fetch_fn=_fake_fetch) == []
def test_collect_all_tables_single_table_error_isolation():
    """A fetch failure on one table does not interrupt collection of the rest.

    Requirements: 2.4
    """
    call_count = 0

    def _failing_fetch(spec: dict, limit: int) -> list[dict]:
        # fail only for table_b; every other table returns a single row
        nonlocal call_count
        call_count += 1
        if spec["table"] == "table_b":
            raise RuntimeError("模拟 API 超时")
        return [{"id": 1}]

    config = AnalyzerConfig(limit=5)
    results = collect_all_tables(config, _TEST_SPECS, fetch_fn=_failing_fetch)
    assert len(results) == 3
    assert call_count == 3
    # table_a succeeds
    assert results[0].table_name == "table_a"
    assert results[0].error is None
    assert results[0].record_count == 1
    # table_b fails but still yields a result entry (with the error recorded)
    assert results[1].table_name == "table_b"
    assert results[1].error is not None
    assert "模拟 API 超时" in results[1].error
    assert results[1].record_count == 0
    # table_c succeeds (unaffected by table_b's failure)
    assert results[2].table_name == "table_c"
    assert results[2].error is None
    assert results[2].record_count == 1
def test_collect_all_tables_empty_specs():
    """An empty spec list yields an empty result list.

    Requirements: 2.2
    """
    assert collect_all_tables(AnalyzerConfig(), [], fetch_fn=_fake_fetch) == []
def test_collect_all_tables_preserves_metadata():
    """Collection results carry through the metadata fields of the spec.

    Requirements: 2.2
    """
    cfg = AnalyzerConfig(limit=50)
    results = collect_all_tables(cfg, _TEST_SPECS[:1], fetch_fn=_fake_fetch)
    r = results[0]
    assert r.table_name == "table_a"
    assert r.task_code == "ODS_TABLE_A"
    assert r.description == "测试表 A"
    assert r.endpoint == "/api/table_a"
# ---------------------------------------------------------------------------
# dump_collection_results 单元测试
# Requirements: 5.4, 5.6
# ---------------------------------------------------------------------------
import json as _json
def _make_result(
    table: str = "test_table",
    task_code: str = "ODS_TEST",
    record_count: int = 2,
    error: str | None = None,
) -> TableCollectionResult:
    """Build a minimal TableCollectionResult for tests.

    When `error` is set, the field/column collections are left empty to
    mirror a failed collection run.
    """
    fields = OrderedDict()
    if not error:
        fields["id"] = FieldInfo(
            path="id", json_type="integer", sample="1",
            depth=0, occurrence=2, total_records=record_count,
            samples=["1", "2"],
        )
        fields["name"] = FieldInfo(
            path="name", json_type="string", sample="测试",
            depth=0, occurrence=1, total_records=record_count,
            samples=["测试"],
        )
    ods_cols = [
        ColumnInfo(name="id", data_type="bigint", is_nullable=False,
                   column_default=None, comment="主键", ordinal_position=1),
    ] if not error else []
    dwd_cols_list = [
        ColumnInfo(name="id", data_type="bigint", is_nullable=False,
                   column_default=None, comment="主键", ordinal_position=1),
        ColumnInfo(name="store_id", data_type="integer", is_nullable=False,
                   column_default=None, comment="门店", ordinal_position=2),
    ] if not error else []
    # dwd_tables: simulate a single DWD table (named "dwd_{table}")
    dwd_tables_dict: dict[str, list[ColumnInfo]] = {}
    if not error and dwd_cols_list:
        dwd_tables_dict[f"dwd_{table}"] = dwd_cols_list
    return TableCollectionResult(
        table_name=table,
        task_code=task_code,
        description="测试表",
        endpoint="/api/test",
        record_count=record_count,
        json_fields=fields,
        ods_columns=ods_cols,
        dwd_columns=dwd_cols_list,
        dwd_tables=dwd_tables_dict,
        error=error,
    )
def test_dump_creates_subdirectories(tmp_path):
    """dump_collection_results auto-creates the json_trees/, db_schemas/,
    field_mappings/ and bd_descriptions/ subdirectories.

    Requirements: 5.4
    """
    paths = dump_collection_results([_make_result()], tmp_path)
    for sub in ("json_trees", "db_schemas", "field_mappings", "bd_descriptions"):
        assert (tmp_path / sub).is_dir()
        assert paths[sub] == tmp_path / sub
    assert paths["manifest"] == tmp_path
def test_dump_json_trees_format(tmp_path):
    """json_trees/{table}.json carries the expected field structure (incl. samples).

    Requirements: 5.4
    """
    dump_collection_results([_make_result(table="orders", record_count=10)], tmp_path)
    tree_file = tmp_path / "json_trees" / "orders.json"
    assert tree_file.exists()
    data = _json.loads(tree_file.read_text(encoding="utf-8"))
    assert data["table"] == "orders"
    assert data["total_records"] == 10
    fields = data["fields"]
    assert isinstance(fields, list)
    assert len(fields) == 2
    # structure of the first field entry
    first = fields[0]
    assert first["path"] == "id"
    assert first["json_type"] == "integer"
    assert first["depth"] == 0
    # the samples key must exist and hold a list
    assert "samples" in first
    assert isinstance(first["samples"], list)
def test_dump_db_schemas_format(tmp_path):
    """db_schemas/ods_{table}.json and dwd_{dwd_short}.json carry the column structure.

    Requirements: 5.4
    """
    dump_collection_results([_make_result(table="members")], tmp_path)
    schema_dir = tmp_path / "db_schemas"
    ods_file = schema_dir / "ods_members.json"
    # DWD files are now named after the DWD table (dwd_members), not the ODS table
    dwd_file = schema_dir / "dwd_dwd_members.json"
    assert ods_file.exists()
    assert dwd_file.exists()
    ods = _json.loads(ods_file.read_text(encoding="utf-8"))
    assert ods["schema"] == "ods"
    assert ods["table"] == "members"
    assert len(ods["columns"]) == 1
    assert ods["columns"][0]["name"] == "id"
    dwd = _json.loads(dwd_file.read_text(encoding="utf-8"))
    assert dwd["schema"] == "dwd"
    assert dwd["table"] == "dwd_members"
    assert dwd["ods_source"] == "members"
    assert len(dwd["columns"]) == 2
def test_dump_manifest_format(tmp_path):
    """collection_manifest.json carries a timestamp and the table inventory.

    Requirements: 5.6
    """
    results = [
        _make_result(table="t1", task_code="ODS_T1", record_count=100),
        _make_result(table="t2", task_code="ODS_T2", record_count=0, error="连接失败"),
    ]
    dump_collection_results(results, tmp_path)
    manifest_file = tmp_path / "collection_manifest.json"
    assert manifest_file.exists()
    manifest = _json.loads(manifest_file.read_text(encoding="utf-8"))
    assert "timestamp" in manifest
    tables = manifest["tables"]
    assert isinstance(tables, list)
    assert len(tables) == 2
    t1, t2 = tables
    assert t1["table"] == "t1"
    assert t1["task_code"] == "ODS_T1"
    assert t1["record_count"] == 100
    assert t1["json_field_count"] == 2  # id + name
    assert t1["ods_column_count"] == 1
    assert t1["dwd_column_count"] == 2
    assert t1["dwd_tables"] == ["dwd_t1"]
    assert t1["error"] is None
    assert t2["error"] == "连接失败"
    assert t2["record_count"] == 0
def test_dump_utf8_chinese_content(tmp_path):
    """JSON files are UTF-8 encoded; Chinese characters are not escaped.

    Requirements: 5.4
    """
    dump_collection_results([_make_result()], tmp_path)
    raw = (tmp_path / "json_trees" / "test_table.json").read_text(encoding="utf-8")
    # ensure_ascii=False means Chinese is written verbatim — no \\uXXXX escapes
    assert "测试" in raw
    assert "\\u" not in raw
def test_dump_empty_results(tmp_path):
    """An empty result list still produces the manifest and subdirectories.

    Requirements: 5.4
    """
    paths = dump_collection_results([], tmp_path)
    for sub in ("json_trees", "db_schemas", "field_mappings", "bd_descriptions"):
        assert (tmp_path / sub).is_dir()
    manifest_text = (tmp_path / "collection_manifest.json").read_text(encoding="utf-8")
    manifest = _json.loads(manifest_text)
    assert manifest["tables"] == []
    assert "timestamp" in manifest
    assert "table_map" in manifest
    assert paths["json_trees"] == tmp_path / "json_trees"
# ---------------------------------------------------------------------------
# CLI 入口:build_parser / resolve_output_dir 单元测试
# ---------------------------------------------------------------------------
from analyze_dataflow import build_parser, resolve_output_dir
class TestBuildParser:
    """Argument parsing via build_parser. Requirements: 5.1, 5.2, 5.3"""

    def test_defaults(self):
        """Defaults apply when no arguments are given."""
        args = build_parser().parse_args([])
        assert args.date_from is None
        assert args.date_to is None
        assert args.limit == 200
        assert args.tables is None

    def test_date_range(self):
        """--date-from / --date-to parse correctly."""
        args = build_parser().parse_args(
            ["--date-from", "2025-01-01", "--date-to", "2025-01-15"]
        )
        assert args.date_from == "2025-01-01"
        assert args.date_to == "2025-01-15"

    def test_limit(self):
        """--limit parses as an int."""
        args = build_parser().parse_args(["--limit", "50"])
        assert args.limit == 50

    def test_tables(self):
        """--tables parses as a comma-separated string."""
        args = build_parser().parse_args(
            ["--tables", "settlement_records,payment_transactions"]
        )
        assert args.tables == "settlement_records,payment_transactions"
        # callers may split(',') to obtain a list
        assert args.tables.split(",") == ["settlement_records", "payment_transactions"]

    def test_all_params_combined(self):
        """All parameters supplied at once."""
        args = build_parser().parse_args([
            "--date-from", "2025-06-01",
            "--date-to", "2025-06-30",
            "--limit", "100",
            "--tables", "orders,members",
        ])
        assert args.date_from == "2025-06-01"
        assert args.date_to == "2025-06-30"
        assert args.limit == 100
        assert args.tables == "orders,members"
class TestResolveOutputDir:
    """Output-directory fallback in resolve_output_dir. Requirements: 5.4, 5.5"""

    def test_env_var_takes_priority(self, tmp_path, monkeypatch):
        """SYSTEM_ANALYZE_ROOT wins when it is set."""
        target = tmp_path / "custom_output"
        monkeypatch.setenv("SYSTEM_ANALYZE_ROOT", str(target))
        assert resolve_output_dir() == target
        assert target.is_dir()

    def test_fallback_raises_when_env_missing(self, monkeypatch):
        """A missing SYSTEM_ANALYZE_ROOT raises KeyError."""
        monkeypatch.delenv("SYSTEM_ANALYZE_ROOT", raising=False)
        with pytest.raises(KeyError):
            resolve_output_dir()

    def test_creates_directory(self, tmp_path, monkeypatch):
        """A non-existent directory is created automatically."""
        target = tmp_path / "nested" / "deep" / "dir"
        monkeypatch.setenv("SYSTEM_ANALYZE_ROOT", str(target))
        result = resolve_output_dir()
        assert result.is_dir()

    def test_existing_directory_no_error(self, tmp_path, monkeypatch):
        """An already-existing directory raises no error."""
        target = tmp_path / "already_exists"
        target.mkdir()
        monkeypatch.setenv("SYSTEM_ANALYZE_ROOT", str(target))
        assert resolve_output_dir() == target
        assert target.is_dir()
# ---------------------------------------------------------------------------
# Property 3: 输出文件名格式正确性
# Feature: dataflow-structure-audit, Property 3: 输出文件名格式正确性
# Validates: Requirements 5.6
# ---------------------------------------------------------------------------
import re
from datetime import datetime
from analyze_dataflow import generate_output_filename
@settings(max_examples=100)
@given(dt=st.datetimes(
    min_value=datetime(1970, 1, 1),
    max_value=datetime(9999, 12, 31, 23, 59, 59),
))
def test_output_filename_format(dt: datetime):
    """
    Property 3: output filename format correctness.

    For any valid datetime, generate_output_filename must produce a name that:
    1. matches the pattern dataflow_YYYY-MM-DD_HHMMSS.md
    2. embeds a date/time equal to the input datetime

    Validates: Requirements 5.6
    """
    filename = generate_output_filename(dt)
    # 1. format match
    pattern = r"^dataflow_\d{4}-\d{2}-\d{2}_\d{6}\.md$"
    # BUG FIX: the failure messages used to hard-code the literal '(unknown)'
    # instead of interpolating the actual filename (a broken template), so a
    # failing run never showed the offending value.
    assert re.match(pattern, filename), (
        f"文件名 '{filename}' 不匹配模式 {pattern}"
    )
    # 2. date/time consistency: extract from the filename and compare
    match = re.match(
        r"^dataflow_(\d{4})-(\d{2})-(\d{2})_(\d{2})(\d{2})(\d{2})\.md$",
        filename,
    )
    assert match, f"无法从文件名 '{filename}' 中提取日期时间"
    year, month, day, hour, minute, second = (int(g) for g in match.groups())
    assert year == dt.year
    assert month == dt.month
    assert day == dt.day
    assert hour == dt.hour
    assert minute == dt.minute
    assert second == dt.second
# ---------------------------------------------------------------------------
# 单元测试:Hook 配置文件格式验证
# Validates: Requirements 6.1
# ---------------------------------------------------------------------------
class TestHookConfigFile:
    """Validate the .kiro/hooks/dataflow-analyze.kiro.hook config file format."""

    HOOK_PATH = Path(__file__).resolve().parent.parent / ".kiro" / "hooks" / "dataflow-analyze.kiro.hook"

    def _load(self):
        # shared helper: read and parse the hook file
        # (raises JSONDecodeError on invalid JSON)
        return json.loads(self.HOOK_PATH.read_text(encoding="utf-8"))

    def test_hook_file_is_valid_json(self):
        """The hook file must be valid JSON."""
        data = self._load()
        assert isinstance(data, dict)

    def test_hook_required_fields_exist(self):
        """The hook file must contain name, version, when.type, then.type, then.prompt."""
        data = self._load()
        assert "name" in data
        assert "version" in data
        assert "when" in data and "type" in data["when"]
        assert "then" in data and "type" in data["then"]
        assert "prompt" in data["then"]

    def test_hook_when_type_is_user_triggered(self):
        """when.type must be userTriggered."""
        assert self._load()["when"]["type"] == "userTriggered"

    def test_hook_then_type_is_ask_agent(self):
        """then.type must be askAgent."""
        assert self._load()["then"]["type"] == "askAgent"
# ---------------------------------------------------------------------------
# 新增测试:parse_table_map / parse_fact_mappings / build_field_mappings
# Validates: 三层字段映射构建正确性
# ---------------------------------------------------------------------------
from dataflow_analyzer import (
build_field_mappings,
parse_table_map,
parse_fact_mappings,
)
class TestParseTableMap:
    """Parse TABLE_MAP out of the ETL source."""

    def test_parse_real_file(self):
        """Parsing the real dwd_load_task.py should yield a non-empty dict."""
        result = parse_table_map()
        # only assert when the real source file is present
        if Path("apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py").exists():
            assert len(result) > 0
            # every key must start with "dwd.", every value with "ods."
            for dwd_t, ods_t in result.items():
                assert dwd_t.startswith("dwd."), f"DWD 表名应以 dwd. 开头: {dwd_t}"
                assert ods_t.startswith("ods."), f"ODS 表名应以 ods. 开头: {ods_t}"

    def test_parse_nonexistent_file(self, tmp_path):
        """A missing file yields an empty dict."""
        result = parse_table_map(tmp_path / "nonexistent.py")
        assert result == {}

    def test_parse_file_without_table_map(self, tmp_path):
        """A file with no TABLE_MAP definition yields an empty dict."""
        py_file = tmp_path / "empty.py"
        py_file.write_text("# 空文件\n", encoding="utf-8")
        result = parse_table_map(py_file)
        assert result == {}

    def test_parse_synthetic_table_map(self, tmp_path):
        """Parse a synthetic TABLE_MAP definition."""
        py_file = tmp_path / "task.py"
        py_file.write_text(
            'TABLE_MAP: dict[str, str] = {\n'
            ' "dwd.dim_foo": "ods.foo_master",\n'
            ' "dwd.dwd_bar": "ods.bar_records",\n'
            '}\n',
            encoding="utf-8",
        )
        result = parse_table_map(py_file)
        assert result == {
            "dwd.dim_foo": "ods.foo_master",
            "dwd.dwd_bar": "ods.bar_records",
        }
class TestParseFactMappings:
    """Parse FACT_MAPPINGS out of the ETL source."""

    def test_parse_real_file(self):
        """Parsing the real dwd_load_task.py should yield a non-empty dict."""
        result = parse_fact_mappings()
        # only assert when the real source file is present
        if not Path("apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py").exists():
            return
        assert len(result) > 0
        for dwd_t, mappings in result.items():
            assert dwd_t.startswith("dwd."), f"DWD 表名应以 dwd. 开头: {dwd_t}"
            for entry in mappings:
                assert len(entry) == 3, f"映射条目应为 3 元组: {entry}"

    def test_parse_nonexistent_file(self, tmp_path):
        """A missing file yields an empty dict."""
        assert parse_fact_mappings(tmp_path / "nonexistent.py") == {}
class TestBuildFieldMappings:
    """build_field_mappings three-layer mapping construction."""

    def _make_test_data(self):
        """Build a test TableCollectionResult + TABLE_MAP + FACT_MAPPINGS."""
        fields = OrderedDict()
        fields["id"] = FieldInfo(
            path="id", json_type="integer", sample="123",
            depth=0, occurrence=10, total_records=10,
            samples=["123", "456", "789"],
        )
        fields["name"] = FieldInfo(
            path="name", json_type="string", sample="测试",
            depth=0, occurrence=10, total_records=10,
            samples=["测试", "示例"],
        )
        fields["extra.nested"] = FieldInfo(
            path="extra.nested", json_type="string", sample="嵌套值",
            depth=1, occurrence=5, total_records=10,
            samples=["嵌套值"],
        )
        ods_cols = [
            ColumnInfo(name="id", data_type="BIGINT", is_nullable=False,
                       column_default=None, comment="主键", ordinal_position=1),
            ColumnInfo(name="name", data_type="TEXT", is_nullable=True,
                       column_default=None, comment="名称", ordinal_position=2),
        ]
        dwd_cols_main = [
            ColumnInfo(name="foo_id", data_type="BIGINT", is_nullable=False,
                       column_default=None, comment="主键", ordinal_position=1),
            ColumnInfo(name="name", data_type="TEXT", is_nullable=True,
                       column_default=None, comment="名称", ordinal_position=2),
            ColumnInfo(name="scd2_start_time", data_type="TIMESTAMPTZ", is_nullable=True,
                       column_default=None, comment=None, ordinal_position=3),
        ]
        result = TableCollectionResult(
            table_name="foo_master",
            task_code="ODS_FOO",
            description="测试表",
            endpoint="/api/foo",
            record_count=10,
            json_fields=fields,
            ods_columns=ods_cols,
            dwd_columns=dwd_cols_main,
            dwd_tables={"dim_foo": dwd_cols_main},
        )
        table_map = {"dwd.dim_foo": "ods.foo_master"}
        fact_mappings = {
            "dwd.dim_foo": [("foo_id", "id", None)],
        }
        all_dwd_cols = {"dim_foo": dwd_cols_main}
        return result, table_map, fact_mappings, all_dwd_cols

    def test_anchors_generated(self):
        """Anchor IDs have the expected format."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        assert mapping["anchors"]["api"] == "api-foo-master"
        assert mapping["anchors"]["ods"] == "ods-foo-master"
        assert "dim_foo" in mapping["anchors"]["dwd"]
        assert mapping["anchors"]["dwd"]["dim_foo"] == "dwd-dim-foo"

    def test_json_to_ods_mapping(self):
        """JSON→ODS mapping: mapped and unmapped fields are labelled correctly."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        j2o = mapping["json_to_ods"]
        assert len(j2o) == 3
        # id → mapped
        id_entry = next(e for e in j2o if e["json_path"] == "id")
        assert id_entry["ods_col"] == "id"
        assert id_entry["match_type"] in ("exact", "case_insensitive")
        # name → mapped
        name_entry = next(e for e in j2o if e["json_path"] == "name")
        assert name_entry["ods_col"] == "name"
        # extra.nested → unmapped (no such ODS column)
        nested_entry = next(e for e in j2o if e["json_path"] == "extra.nested")
        assert nested_entry["ods_col"] is None
        assert nested_entry["match_type"] == "unmapped"

    def test_ods_to_dwd_mapping(self):
        """ODS→DWD mapping: the id column maps to dim_foo.foo_id."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        o2d = mapping["ods_to_dwd"]
        # id → foo_id (via FACT_MAPPINGS)
        assert "id" in o2d
        targets = o2d["id"]
        assert any(t["dwd_col"] == "foo_id" and t["dwd_table"] == "dim_foo" for t in targets)
        # name → name (same-name passthrough)
        assert "name" in o2d
        name_targets = o2d["name"]
        assert any(t["dwd_col"] == "name" and t["dwd_table"] == "dim_foo" for t in name_targets)

    def test_dwd_to_ods_mapping(self):
        """DWD→ODS mapping: contains direct mappings and SCD2 columns."""
        result, tm, fm, dwd = self._make_test_data()
        mapping = build_field_mappings(result, tm, fm, dwd)
        d2o = mapping["dwd_to_ods"]
        assert "dim_foo" in d2o
        cols = d2o["dim_foo"]
        # foo_id ← id (explicit FACT_MAPPINGS entry)
        foo_id_entry = next(c for c in cols if c["dwd_col"] == "foo_id")
        assert foo_id_entry["ods_source"] == "id"
        assert foo_id_entry["mapping_type"] == "直接"
        # name ← name (same-name passthrough)
        name_entry = next(c for c in cols if c["dwd_col"] == "name")
        assert name_entry["ods_source"] == "name"
        assert name_entry["mapping_type"] == "直接"
        assert name_entry["note"] == "同名直传"
        # scd2_start_time (SCD2 metadata column)
        scd2_entry = next(c for c in cols if c["dwd_col"] == "scd2_start_time")
        assert scd2_entry["mapping_type"] == "SCD2"
        assert scd2_entry["ods_source"] == "—"

    def test_field_mappings_dumped_to_file(self, tmp_path):
        """dump_collection_results writes the field_mappings JSON file."""
        result, _, _, _ = self._make_test_data()
        dump_collection_results([result], tmp_path)
        fm_file = tmp_path / "field_mappings" / "foo_master.json"
        assert fm_file.exists()
        data = _json.loads(fm_file.read_text(encoding="utf-8"))
        assert data["ods_table"] == "foo_master"
        assert "anchors" in data
        assert "json_to_ods" in data
        assert "ods_to_dwd" in data
        assert "dwd_to_ods" in data

    def test_no_dwd_tables_produces_empty_mappings(self):
        """With no DWD mapping for the ODS table, ods_to_dwd and dwd_to_ods are empty."""
        fields = OrderedDict()
        fields["id"] = FieldInfo(
            path="id", json_type="integer", sample="1",
            depth=0, occurrence=1, total_records=1,
            samples=["1"],
        )
        ods_cols = [
            ColumnInfo(name="id", data_type="BIGINT", is_nullable=False,
                       column_default=None, comment=None, ordinal_position=1),
        ]
        result = TableCollectionResult(
            table_name="orphan_table",
            task_code="ODS_ORPHAN",
            description="无 DWD 映射的表",
            endpoint="/api/orphan",
            record_count=1,
            json_fields=fields,
            ods_columns=ods_cols,
            dwd_columns=[],
            dwd_tables={},
        )
        mapping = build_field_mappings(result, {}, {}, {})
        assert mapping["ods_to_dwd"] == {}
        assert mapping["dwd_to_ods"] == {}
        assert mapping["anchors"]["dwd"] == {}
# ---------------------------------------------------------------------------
# 新增测试:多示例值采集(samples 字段)
# ---------------------------------------------------------------------------
def test_flatten_json_tree_collects_multiple_samples():
    """flatten_json_tree gathers several distinct sample values per field."""
    records = [
        {"status": 1, "name": "Alice"},
        {"status": 2, "name": "Bob"},
        {"status": 1, "name": "Charlie"},
        {"status": 3, "name": "Alice"},
    ]
    result = flatten_json_tree(records)
    assert "status" in result
    # status has 3 distinct values
    assert set(result["status"].samples) == {"1", "2", "3"}
    # name has 3 distinct values
    assert set(result["name"].samples) == {"Alice", "Bob", "Charlie"}
def test_flatten_json_tree_samples_limit():
    """The number of samples never exceeds MAX_SAMPLES."""
    from dataflow_analyzer import MAX_SAMPLES
    # produce more distinct values than MAX_SAMPLES
    rows = [{"val": i} for i in range(MAX_SAMPLES + 5)]
    flattened = flatten_json_tree(rows)
    assert len(flattened["val"].samples) <= MAX_SAMPLES
def test_flatten_json_tree_samples_skip_none():
    """None values are never collected into samples."""
    rows = [{"x": value} for value in (None, 42, None, 99)]
    flattened = flatten_json_tree(rows)
    samples = flattened["x"].samples
    assert "None" not in samples
    assert set(samples) == {"42", "99"}
# ---------------------------------------------------------------------------
# 新增测试:parse_bd_manual_fields / load_bd_descriptions / dump_bd_descriptions
# ---------------------------------------------------------------------------
from dataflow_analyzer import (
parse_bd_manual_fields,
dump_bd_descriptions,
)
class TestParseBdManualFields:
    """Parse field descriptions out of BD_manual Markdown documents."""

    def test_parse_standard_format(self, tmp_path):
        """Parse the standard BD_manual format."""
        doc = tmp_path / "BD_manual_test.md"
        doc.write_text(
            "# test 测试表\n\n"
            "## 字段说明\n\n"
            "| 序号 | 字段名 | 类型 | 可空 | 说明 |\n"
            "|------|--------|------|------|------|\n"
            "| 1 | id | BIGINT | NO | 主键 ID |\n"
            "| 2 | name | TEXT | YES | 名称字段 |\n"
            "| 3 | status | INTEGER | YES | 状态:1=启用,0=禁用 |\n",
            encoding="utf-8",
        )
        result = parse_bd_manual_fields(doc)
        assert result["id"] == "主键 ID"
        assert result["name"] == "名称字段"
        assert result["status"] == "状态:1=启用,0=禁用"

    def test_parse_nonexistent_file(self, tmp_path):
        """A missing file yields an empty dict."""
        result = parse_bd_manual_fields(tmp_path / "nonexistent.md")
        assert result == {}

    def test_parse_no_field_table(self, tmp_path):
        """A file with no field-description table yields an empty dict."""
        doc = tmp_path / "empty.md"
        doc.write_text("# 空文档\n\n没有字段说明。\n", encoding="utf-8")
        result = parse_bd_manual_fields(doc)
        assert result == {}

    def test_parse_dwd_format_with_pk_column(self, tmp_path):
        """The DWD format (with a primary-key column) also parses correctly."""
        doc = tmp_path / "BD_manual_dim_test.md"
        doc.write_text(
            "# dim_test 测试维表\n\n"
            "## 字段说明\n\n"
            "| 序号 | 字段名 | 类型 | 可空 | 主键 | 说明 |\n"
            "|------|--------|------|------|------|------|\n"
            "| 1 | test_id | BIGINT | NO | PK | 唯一标识 |\n"
            "| 2 | label | TEXT | YES | | 标签名 |\n",
            encoding="utf-8",
        )
        result = parse_bd_manual_fields(doc)
        assert result["test_id"] == "唯一标识"
        assert result["label"] == "标签名"

    def test_parse_real_ods_doc(self):
        """Parse a real ODS BD_manual document (when present)."""
        doc = Path("apps/etl/connectors/feiqiu/docs/database/ODS/main/BD_manual_assistant_accounts_master.md")
        if doc.exists():
            result = parse_bd_manual_fields(doc)
            assert len(result) > 0
            # the id field should be present
            assert "id" in result

    def test_parse_real_dwd_doc(self):
        """Parse a real DWD BD_manual document (when present)."""
        doc = Path("apps/etl/connectors/feiqiu/docs/database/DWD/main/BD_manual_dim_assistant.md")
        if doc.exists():
            result = parse_bd_manual_fields(doc)
            assert len(result) > 0
            assert "assistant_id" in result
class TestDumpBdDescriptions:
    """dump_bd_descriptions writes one BD-description JSON per table."""

    def test_dump_creates_files(self, tmp_path):
        """A bd_descriptions/<table>.json file is produced for each table."""
        results = [_make_result(table="test_table")]
        dump_bd_descriptions(results, tmp_path)
        bd_file = tmp_path / "bd_descriptions" / "test_table.json"
        assert bd_file.exists()
        # BUG FIX: the original called `_json.loads`, but `_json` was never
        # imported in this scope (NameError at runtime). Use the module-level
        # `json` import from the file header instead.
        data = json.loads(bd_file.read_text(encoding="utf-8"))
        assert data["ods_table"] == "test_table"
        assert "ods_fields" in data
        assert "dwd_fields" in data

    def test_dump_empty_results(self, tmp_path):
        """An empty result list still creates the output directory, no error."""
        dump_bd_descriptions([], tmp_path)
        assert (tmp_path / "bd_descriptions").is_dir()
# ---------------------------------------------------------------------------
# 新增测试:gen_dataflow_report.py 报告生成器
# ---------------------------------------------------------------------------
from gen_dataflow_report import generate_report, _esc, _format_samples, _is_enum_like
class TestReportHelpers:
    """Unit tests for the report generator's small helper functions."""

    def test_esc_pipe(self):
        """Pipe characters are escaped for Markdown tables."""
        assert _esc("a|b") == "a\\|b"

    def test_esc_newline(self):
        """Newlines collapse to a single space."""
        assert _esc("a\nb") == "a b"

    def test_esc_none(self):
        """An empty string passes through unchanged."""
        assert _esc("") == ""

    def test_format_samples_basic(self):
        """Every sample value is rendered as inline code."""
        rendered = _format_samples(["a", "b", "c"])
        for sample in ("a", "b", "c"):
            assert f"`{sample}`" in rendered

    def test_format_samples_truncate(self):
        """Over-long sample values get truncated with an ellipsis."""
        rendered = _format_samples(["x" * 50])
        assert "..." in rendered

    def test_format_samples_empty(self):
        """No samples → empty string."""
        assert _format_samples([]) == ""

    def test_is_enum_like_true(self):
        """Few distinct values over enough records is enum-like."""
        assert _is_enum_like(["1", "2", "3"], 100) is True

    def test_is_enum_like_false_too_many(self):
        """More than 8 distinct values is not enum-like."""
        assert _is_enum_like([str(i) for i in range(10)], 100) is False

    def test_is_enum_like_false_single(self):
        """A single distinct value is not enum-like."""
        assert _is_enum_like(["1"], 100) is False

    def test_is_enum_like_false_few_records(self):
        """Too few records is not enum-like."""
        assert _is_enum_like(["1", "2"], 3) is False
class TestGenerateReport:
    """Integration tests for generate_report over a synthetic data directory."""

    def _setup_data_dir(self, tmp_path):
        """Build the minimal on-disk data layout generate_report consumes."""
        # BUG FIX: `_json` was used throughout this method but never imported
        # here (NameError at runtime); import it locally like the sibling
        # fixture builders do.
        import json as _json
        # collection_manifest.json — one table with one DWD target.
        manifest = {
            "timestamp": "2026-02-16T22:00:00+08:00",
            "table_map": {"dwd.dwd_test": "ods.test_table"},
            "tables": [{
                "table": "test_table",
                "task_code": "ODS_TEST",
                "description": "测试表",
                "record_count": 10,
                "json_field_count": 2,
                "ods_column_count": 2,
                "dwd_tables": ["dwd_test"],
                "dwd_column_count": 3,
                "error": None,
            }],
        }
        (tmp_path / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        # json_trees/test_table.json — API-side flattened field tree.
        jt_dir = tmp_path / "json_trees"
        jt_dir.mkdir()
        jt = {
            "table": "test_table", "total_records": 10,
            "fields": [
                {"path": "id", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 10, "total_records": 10,
                 "samples": ["1", "2", "3"]},
                {"path": "status", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 10, "total_records": 10,
                 "samples": ["0", "1", "2"]},
            ],
        }
        (jt_dir / "test_table.json").write_text(
            _json.dumps(jt, ensure_ascii=False), encoding="utf-8"
        )
        # db_schemas/ — ODS and DWD column metadata.
        db_dir = tmp_path / "db_schemas"
        db_dir.mkdir()
        ods = {"schema": "ods", "table": "test_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": "主键", "ordinal_position": 1},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
        ]}
        (db_dir / "ods_test_table.json").write_text(
            _json.dumps(ods, ensure_ascii=False), encoding="utf-8"
        )
        dwd = {"schema": "dwd", "table": "dwd_test", "ods_source": "test_table", "columns": [
            {"name": "test_id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
            {"name": "scd2_start_time", "data_type": "TIMESTAMPTZ", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 3},
        ]}
        (db_dir / "dwd_dwd_test.json").write_text(
            _json.dumps(dwd, ensure_ascii=False), encoding="utf-8"
        )
        # field_mappings/test_table.json — API→ODS→DWD lineage.
        fm_dir = tmp_path / "field_mappings"
        fm_dir.mkdir()
        fm = {
            "ods_table": "test_table",
            "anchors": {
                "api": "api-test-table", "ods": "ods-test-table",
                "dwd": {"dwd_test": "dwd-dwd-test"},
            },
            "json_to_ods": [
                {"json_path": "id", "ods_col": "id", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
                {"json_path": "status", "ods_col": "status", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
            ],
            "ods_to_dwd": {
                "id": [{"dwd_table": "dwd_test", "dwd_col": "test_id", "cast": None, "note": "字段重命名"}],
                "status": [{"dwd_table": "dwd_test", "dwd_col": "status", "cast": None, "note": "同名直传"}],
            },
            "dwd_to_ods": {
                "dwd_test": [
                    {"dwd_col": "test_id", "type": "BIGINT", "ods_source": "id", "mapping_type": "直接", "note": "字段重命名"},
                    {"dwd_col": "status", "type": "INTEGER", "ods_source": "status", "mapping_type": "直接", "note": "同名直传"},
                    {"dwd_col": "scd2_start_time", "type": "TIMESTAMPTZ", "ods_source": "—", "mapping_type": "SCD2", "note": "SCD2 元数据"},
                ],
            },
        }
        (fm_dir / "test_table.json").write_text(
            _json.dumps(fm, ensure_ascii=False), encoding="utf-8"
        )
        # bd_descriptions/test_table.json — business descriptions.
        bd_dir = tmp_path / "bd_descriptions"
        bd_dir.mkdir()
        bd = {
            "ods_table": "test_table",
            "ods_fields": {"id": "主键 ID", "status": "状态:0=禁用,1=启用"},
            "dwd_fields": {"dwd_test": {"test_id": "唯一标识", "status": "状态"}},
        }
        (bd_dir / "test_table.json").write_text(
            _json.dumps(bd, ensure_ascii=False), encoding="utf-8"
        )
        return tmp_path

    def test_report_contains_json_field_count(self, tmp_path):
        """The overview table has the API JSON field-count column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "API JSON 字段数" in report

    def test_report_contains_field_diff(self, tmp_path):
        """The report includes the field-diff section."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "API↔ODS↔DWD 字段对比差异" in report

    def test_report_contains_biz_desc_in_coverage(self, tmp_path):
        """The coverage table header includes the business-description column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        lines = report.split("\n")
        # Section 2.3 coverage header should mention both columns.
        coverage_header = [line for line in lines if "业务描述" in line and "JSON 字段数" in line]
        assert len(coverage_header) > 0

    def test_report_contains_biz_desc_in_api_section(self, tmp_path):
        """API source-field table carries descriptions from bd_descriptions."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "主键 ID" in report  # comes from bd_descriptions

    def test_report_contains_biz_desc_in_ods_section(self, tmp_path):
        """ODS table section header includes the business-description column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "| # | ODS 列名 | 类型 | ← JSON 源 | → DWD 目标 | 业务描述 |" in report

    def test_report_contains_biz_desc_in_dwd_section(self, tmp_path):
        """DWD table section header includes the business-description column."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "| # | DWD 列名 | 类型 | ← ODS 来源 | 转换 | 业务描述 |" in report

    def test_report_contains_enum_samples(self, tmp_path):
        """Enum-like fields list their enum values."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        # `status` has 3 distinct values across 10 records → enum-like.
        assert "枚举值" in report

    def test_report_anchors_present(self, tmp_path):
        """The report emits the anchor labels declared in field_mappings."""
        data_dir = self._setup_data_dir(tmp_path)
        report = generate_report(data_dir)
        # BUG FIX: the original three assertions were `assert '' in report`
        # (always true — the anchor tag literals were evidently stripped).
        # Assert the anchor ids from the fixture's `anchors` block instead.
        # NOTE(review): if the report wraps these in HTML anchor tags,
        # tighten these assertions to the full tag literal.
        assert "api-test-table" in report
        assert "ods-test-table" in report
        assert "dwd-dwd-test" in report
class TestFieldDiffSubTables:
    """Tests for the generation of the 1.1 field-diff sub-tables."""

    def _setup_diff_data_dir(self, tmp_path):
        """Build test data containing every diff category the report handles."""
        import json as _json
        manifest = {
            "timestamp": "2026-02-17T22:00:00+08:00",
            "table_map": {"dwd.dwd_alpha": "ods.alpha_table"},
            "tables": [{
                "table": "alpha_table",
                "task_code": "ODS_ALPHA",
                "description": "差异测试表",
                "record_count": 50,
                "json_field_count": 6,
                "ods_column_count": 5,
                "dwd_tables": ["dwd_alpha"],
                "dwd_column_count": 5,
                "error": None,
            }],
        }
        (tmp_path / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        # json_trees — flat + nested fields, one enum-like field.
        jt_dir = tmp_path / "json_trees"
        jt_dir.mkdir()
        jt = {
            "table": "alpha_table", "total_records": 50,
            "fields": [
                {"path": "id", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["1"]},
                {"path": "name", "json_type": "string", "sample": "foo",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["foo"]},
                {"path": "extra_flat", "json_type": "string", "sample": "x",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["x"]},
                {"path": "nested.a", "json_type": "string", "sample": "na",
                 "depth": 1, "occurrence": 50, "total_records": 50, "samples": ["na"]},
                {"path": "nested.b", "json_type": "string", "sample": "nb",
                 "depth": 1, "occurrence": 50, "total_records": 50, "samples": ["nb"]},
                {"path": "status", "json_type": "integer", "sample": "1",
                 "depth": 0, "occurrence": 50, "total_records": 50, "samples": ["0", "1"]},
            ],
        }
        (jt_dir / "alpha_table.json").write_text(
            _json.dumps(jt, ensure_ascii=False), encoding="utf-8"
        )
        # db_schemas — two ODS-only columns, two DWD-only columns.
        db_dir = tmp_path / "db_schemas"
        db_dir.mkdir()
        ods = {"schema": "ods", "table": "alpha_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "name", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 3},
            {"name": "ods_only_col", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 4},
            {"name": "no_dwd_col", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 5},
        ]}
        (db_dir / "ods_alpha_table.json").write_text(
            _json.dumps(ods, ensure_ascii=False), encoding="utf-8"
        )
        dwd = {"schema": "dwd", "table": "dwd_alpha", "ods_source": "alpha_table", "columns": [
            {"name": "alpha_id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "name", "data_type": "TEXT", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
            {"name": "status", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 3},
            {"name": "scd2_ver", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 4},
            {"name": "derived_flag", "data_type": "BOOLEAN", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 5},
        ]}
        (db_dir / "dwd_dwd_alpha.json").write_text(
            _json.dumps(dwd, ensure_ascii=False), encoding="utf-8"
        )
        # field_mappings — deliberately construct every diff category.
        fm_dir = tmp_path / "field_mappings"
        fm_dir.mkdir()
        fm = {
            "ods_table": "alpha_table",
            "anchors": {
                "api": "api-alpha-table", "ods": "ods-alpha-table",
                "dwd": {"dwd_alpha": "dwd-dwd-alpha"},
            },
            "json_to_ods": [
                {"json_path": "id", "ods_col": "id", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
                {"json_path": "name", "ods_col": "name", "match_type": "exact",
                 "json_type": "string", "occurrence_pct": 100.0},
                {"json_path": "status", "ods_col": "status", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
                # Flat field with no ODS mapping.
                {"json_path": "extra_flat", "ods_col": None, "match_type": "unmapped",
                 "json_type": "string", "occurrence_pct": 100.0},
                # Nested fields with no ODS mapping.
                {"json_path": "nested.a", "ods_col": None, "match_type": "unmapped",
                 "json_type": "string", "occurrence_pct": 100.0},
                {"json_path": "nested.b", "ods_col": None, "match_type": "unmapped",
                 "json_type": "string", "occurrence_pct": 100.0},
            ],
            "ods_to_dwd": {
                "id": [{"dwd_table": "dwd_alpha", "dwd_col": "alpha_id", "cast": None, "note": "重命名"}],
                "name": [{"dwd_table": "dwd_alpha", "dwd_col": "name", "cast": None, "note": ""}],
                "status": [{"dwd_table": "dwd_alpha", "dwd_col": "status", "cast": None, "note": ""}],
                # ods_only_col and no_dwd_col absent → ODS→DWD unmapped.
            },
            "dwd_to_ods": {
                "dwd_alpha": [
                    {"dwd_col": "alpha_id", "type": "BIGINT", "ods_source": "id",
                     "mapping_type": "直接", "note": "重命名"},
                    {"dwd_col": "name", "type": "TEXT", "ods_source": "name",
                     "mapping_type": "直接", "note": ""},
                    {"dwd_col": "status", "type": "INTEGER", "ods_source": "status",
                     "mapping_type": "直接", "note": ""},
                    # DWD columns with no ODS source.
                    {"dwd_col": "scd2_ver", "type": "INTEGER", "ods_source": "—",
                     "mapping_type": "SCD2", "note": "SCD2 元数据"},
                    {"dwd_col": "derived_flag", "type": "BOOLEAN", "ods_source": "—",
                     "mapping_type": "派生", "note": "派生列"},
                ],
            },
        }
        (fm_dir / "alpha_table.json").write_text(
            _json.dumps(fm, ensure_ascii=False), encoding="utf-8"
        )
        # bd_descriptions
        bd_dir = tmp_path / "bd_descriptions"
        bd_dir.mkdir()
        bd = {
            "ods_table": "alpha_table",
            "ods_fields": {"id": "主键", "name": "名称"},
            "dwd_fields": {"dwd_alpha": {"alpha_id": "标识", "name": "名称"}},
        }
        (bd_dir / "alpha_table.json").write_text(
            _json.dumps(bd, ensure_ascii=False), encoding="utf-8"
        )
        return tmp_path

    def test_summary_table_has_links(self, tmp_path):
        """Non-zero diff counts in the summary table link to the detail table."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        # API→ODS unmapped = 3 (1 flat + 2 nested) → must carry a link.
        assert "[3](#diff-alpha-table)" in report

    def test_summary_table_zero_no_link(self, tmp_path):
        """Zero diff counts in the summary table must not carry a link."""
        import json as _json
        data_dir = self._setup_diff_data_dir(tmp_path)
        # Add a fully-mapped table so every diff column is zero for it.
        manifest = _json.loads((data_dir / "collection_manifest.json").read_text(encoding="utf-8"))
        manifest["tables"].append({
            "table": "zero_table", "task_code": "ODS_ZERO", "description": "零差异",
            "record_count": 5, "json_field_count": 1, "ods_column_count": 1,
            "dwd_tables": ["dwd_zero"], "dwd_column_count": 1, "error": None,
        })
        (data_dir / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        fm_z = {
            "ods_table": "zero_table",
            "anchors": {"api": "api-zero-table", "ods": "ods-zero-table",
                        "dwd": {"dwd_zero": "dwd-dwd-zero"}},
            "json_to_ods": [{"json_path": "id", "ods_col": "id", "match_type": "exact",
                             "json_type": "integer", "occurrence_pct": 100.0}],
            "ods_to_dwd": {"id": [{"dwd_table": "dwd_zero", "dwd_col": "id", "cast": None, "note": ""}]},
            "dwd_to_ods": {"dwd_zero": [{"dwd_col": "id", "type": "BIGINT", "ods_source": "id",
                                         "mapping_type": "直接", "note": ""}]},
        }
        (data_dir / "field_mappings" / "zero_table.json").write_text(
            _json.dumps(fm_z, ensure_ascii=False), encoding="utf-8"
        )
        ods_z = {"schema": "ods", "table": "zero_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1}]}
        (data_dir / "db_schemas" / "ods_zero_table.json").write_text(
            _json.dumps(ods_z, ensure_ascii=False), encoding="utf-8"
        )
        dwd_z = {"schema": "dwd", "table": "dwd_zero", "ods_source": "zero_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1}]}
        (data_dir / "db_schemas" / "dwd_dwd_zero.json").write_text(
            _json.dumps(dwd_z, ensure_ascii=False), encoding="utf-8"
        )
        jt_z = {"table": "zero_table", "total_records": 5, "fields": [
            {"path": "id", "json_type": "integer", "sample": "1",
             "depth": 0, "occurrence": 5, "total_records": 5, "samples": ["1"]}]}
        (data_dir / "json_trees" / "zero_table.json").write_text(
            _json.dumps(jt_z, ensure_ascii=False), encoding="utf-8"
        )
        bd_z = {"ods_table": "zero_table", "ods_fields": {}, "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "zero_table.json").write_text(
            _json.dumps(bd_z, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        # The zero_table row should contain "| 0 |" and no diff link.
        zero_line = [l for l in report.split("\n") if "zero_table" in l and "| 0 |" in l]
        assert len(zero_line) > 0
        assert "#diff-zero-table" not in report

    def test_diff_anchor_present(self, tmp_path):
        """Each diff sub-table carries its anchor label."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        # BUG FIX: original was `assert '' in report` (always true — anchor
        # tag literal stripped). Assert the diff anchor id instead.
        assert "diff-alpha-table" in report

    def test_diff_subtable_title(self, tmp_path):
        """Diff sub-tables carry incrementing numbered headings."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "#### 1.1.1 alpha_table 字段差异明细" in report
        # A placeholder "1.1.x" numbering must never leak into the output.
        assert "1.1.x" not in report

    def test_api_unmapped_flat_bold(self, tmp_path):
        """Flat unmapped field rows are rendered in bold."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "**[`extra_flat`](#api-alpha-table)**" in report
        assert "**⚠️ 未映射**" in report

    def test_api_unmapped_nested_in_details(self, tmp_path):
        """Nested unmapped fields go inside a collapsible <details> block."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "API→ODS 未映射(嵌套对象)" in report
        assert "nested.a" in report
        assert "nested.b" in report
        # BUG FIX: the original assertion was a string literal broken across
        # two lines (SyntaxError — the HTML tag was stripped from the source).
        # NOTE(review): reconstructed as the <details> markers; confirm
        # against the generator's actual output.
        assert "<details>" in report
        assert "</details>" in report

    def test_ods_no_dwd_fields_bold(self, tmp_path):
        """ODS→DWD unmapped field rows are rendered in bold."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        # ods_only_col and no_dwd_col are absent from ods_to_dwd.
        assert "**[`ods_only_col`](#ods-alpha-table)**" in report
        assert "**[`no_dwd_col`](#ods-alpha-table)**" in report

    def test_dwd_no_ods_fields_bold(self, tmp_path):
        """DWD columns without an ODS source are rendered in bold."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "**[`scd2_ver`](#dwd-dwd-alpha)**" in report
        assert "**[`derived_flag`](#dwd-dwd-alpha)**" in report
        assert "**⚠️ 无 ODS 源**" in report

    def test_no_diff_table_skipped(self, tmp_path):
        """Tables without any diff must not get a diff sub-table."""
        import json as _json
        data_dir = self._setup_diff_data_dir(tmp_path)
        # Add a table with perfect end-to-end mappings.
        manifest = _json.loads((data_dir / "collection_manifest.json").read_text(encoding="utf-8"))
        manifest["tables"].append({
            "table": "clean_table",
            "task_code": "ODS_CLEAN",
            "description": "无差异表",
            "record_count": 10,
            "json_field_count": 1,
            "ods_column_count": 1,
            "dwd_tables": ["dwd_clean"],
            "dwd_column_count": 1,
            "error": None,
        })
        (data_dir / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        fm_clean = {
            "ods_table": "clean_table",
            "anchors": {"api": "api-clean-table", "ods": "ods-clean-table",
                        "dwd": {"dwd_clean": "dwd-dwd-clean"}},
            "json_to_ods": [
                {"json_path": "id", "ods_col": "id", "match_type": "exact",
                 "json_type": "integer", "occurrence_pct": 100.0},
            ],
            "ods_to_dwd": {"id": [{"dwd_table": "dwd_clean", "dwd_col": "id",
                                   "cast": None, "note": ""}]},
            "dwd_to_ods": {"dwd_clean": [
                {"dwd_col": "id", "type": "BIGINT", "ods_source": "id",
                 "mapping_type": "直接", "note": ""},
            ]},
        }
        (data_dir / "field_mappings" / "clean_table.json").write_text(
            _json.dumps(fm_clean, ensure_ascii=False), encoding="utf-8"
        )
        ods_clean = {"schema": "ods", "table": "clean_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
        ]}
        (data_dir / "db_schemas" / "ods_clean_table.json").write_text(
            _json.dumps(ods_clean, ensure_ascii=False), encoding="utf-8"
        )
        dwd_clean = {"schema": "dwd", "table": "dwd_clean", "ods_source": "clean_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
        ]}
        (data_dir / "db_schemas" / "dwd_dwd_clean.json").write_text(
            _json.dumps(dwd_clean, ensure_ascii=False), encoding="utf-8"
        )
        jt_clean = {
            "table": "clean_table", "total_records": 10,
            "fields": [{"path": "id", "json_type": "integer", "sample": "1",
                        "depth": 0, "occurrence": 10, "total_records": 10, "samples": ["1"]}],
        }
        (data_dir / "json_trees" / "clean_table.json").write_text(
            _json.dumps(jt_clean, ensure_ascii=False), encoding="utf-8"
        )
        bd_clean = {"ods_table": "clean_table", "ods_fields": {}, "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "clean_table.json").write_text(
            _json.dumps(bd_clean, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        # clean_table must not get a diff sub-table …
        assert "clean_table 字段差异明细" not in report
        # … but alpha_table still does.
        assert "alpha_table 字段差异明细" in report

    def test_reason_includes_nested_and_scd2(self, tmp_path):
        """The diff reason mentions nested objects and SCD2/derived columns."""
        data_dir = self._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "嵌套对象 2 个" in report
        assert "SCD2/派生列 2 个" in report
class TestDiffSubTableColumns:
    """Column layout of the diff sub-tables (guess/confidence columns removed;
    those are now handled manually)."""

    # NOTE: TestFieldDiffSubTables is defined in this same module. The
    # original tests re-imported it via `from test_dataflow_analyzer import
    # TestFieldDiffSubTables`, a fragile self-import that depends on the test
    # file's own name being importable; reference the class directly instead.

    def test_flat_unmapped_header(self, tmp_path):
        """Flat-unmapped sub-table has sample/description columns only."""
        data_dir = TestFieldDiffSubTables()._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "| # | JSON 字段 | 示例值 | 说明 | 状态 |" in report

    def test_dwd_no_ods_header(self, tmp_path):
        """DWD-without-ODS-source sub-table has a description column only."""
        data_dir = TestFieldDiffSubTables()._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        assert "| # | DWD 表 | DWD 列 | 说明 | 状态 |" in report

    def test_section_numbering_incremental(self, tmp_path):
        """Multiple diff sub-tables are numbered 1.1.1, 1.1.2, ..."""
        import json as _json
        data_dir = TestFieldDiffSubTables()._setup_diff_data_dir(tmp_path)
        manifest = _json.loads((data_dir / "collection_manifest.json").read_text(encoding="utf-8"))
        manifest["tables"].append({
            "table": "beta_table", "task_code": "ODS_BETA", "description": "第二表",
            "record_count": 10, "json_field_count": 1, "ods_column_count": 1,
            "dwd_tables": ["dwd_beta"], "dwd_column_count": 2, "error": None,
        })
        (data_dir / "collection_manifest.json").write_text(
            _json.dumps(manifest, ensure_ascii=False), encoding="utf-8"
        )
        fm_b = {
            "ods_table": "beta_table",
            "anchors": {"api": "api-beta-table", "ods": "ods-beta-table",
                        "dwd": {"dwd_beta": "dwd-dwd-beta"}},
            "json_to_ods": [{"json_path": "id", "ods_col": "id", "match_type": "exact",
                             "json_type": "integer", "occurrence_pct": 100.0}],
            "ods_to_dwd": {"id": [{"dwd_table": "dwd_beta", "dwd_col": "id", "cast": None, "note": ""}]},
            "dwd_to_ods": {"dwd_beta": [
                {"dwd_col": "id", "type": "BIGINT", "ods_source": "id", "mapping_type": "直接", "note": ""},
                # One SCD2 column so beta_table also has a diff sub-table.
                {"dwd_col": "scd2_flag", "type": "INTEGER", "ods_source": "—", "mapping_type": "SCD2", "note": ""},
            ]},
        }
        (data_dir / "field_mappings" / "beta_table.json").write_text(
            _json.dumps(fm_b, ensure_ascii=False), encoding="utf-8"
        )
        ods_b = {"schema": "ods", "table": "beta_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1}]}
        (data_dir / "db_schemas" / "ods_beta_table.json").write_text(
            _json.dumps(ods_b, ensure_ascii=False), encoding="utf-8"
        )
        dwd_b = {"schema": "dwd", "table": "dwd_beta", "ods_source": "beta_table", "columns": [
            {"name": "id", "data_type": "BIGINT", "is_nullable": False,
             "column_default": None, "comment": None, "ordinal_position": 1},
            {"name": "scd2_flag", "data_type": "INTEGER", "is_nullable": True,
             "column_default": None, "comment": None, "ordinal_position": 2},
        ]}
        (data_dir / "db_schemas" / "dwd_dwd_beta.json").write_text(
            _json.dumps(dwd_b, ensure_ascii=False), encoding="utf-8"
        )
        jt_b = {"table": "beta_table", "total_records": 10, "fields": [
            {"path": "id", "json_type": "integer", "sample": "1",
             "depth": 0, "occurrence": 10, "total_records": 10, "samples": ["1"]}]}
        (data_dir / "json_trees" / "beta_table.json").write_text(
            _json.dumps(jt_b, ensure_ascii=False), encoding="utf-8"
        )
        bd_b = {"ods_table": "beta_table", "ods_fields": {}, "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "beta_table.json").write_text(
            _json.dumps(bd_b, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        assert "#### 1.1.1 alpha_table 字段差异明细" in report
        assert "#### 1.1.2 beta_table 字段差异明细" in report

    def test_sample_value_in_flat_unmapped(self, tmp_path):
        """Flat unmapped fields show the sample value from json_trees."""
        data_dir = TestFieldDiffSubTables()._setup_diff_data_dir(tmp_path)
        report = generate_report(data_dir)
        lines = report.split("\n")
        flat_rows = [l for l in lines if "extra_flat" in l and "未映射" in l]
        assert len(flat_rows) >= 1
        assert "`x`" in flat_rows[0]

    def test_bd_desc_in_ods_no_json(self, tmp_path):
        """ODS-without-JSON-source rows show the bd_descriptions text."""
        import json as _json
        data_dir = TestFieldDiffSubTables()._setup_diff_data_dir(tmp_path)
        bd = {"ods_table": "alpha_table",
              "ods_fields": {"ods_only_col": "仅ODS存在的测试列"},
              "dwd_fields": {}}
        (data_dir / "bd_descriptions" / "alpha_table.json").write_text(
            _json.dumps(bd, ensure_ascii=False), encoding="utf-8"
        )
        report = generate_report(data_dir)
        lines = report.split("\n")
        ods_only_rows = [l for l in lines if "ods_only_col" in l and "无 JSON 源" in l]
        assert len(ods_only_rows) >= 1
        assert "仅ODS存在的测试列" in ods_only_rows[0]