Files
ZQYY.FQ-ETL/tests/unit/test_audit_doc_alignment.py

695 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
单元测试 — 文档对齐分析器 (doc_alignment_analyzer.py)
覆盖:
- scan_docs 文档来源识别
- extract_code_references 代码引用提取
- check_reference_validity 引用有效性检查
- find_undocumented_modules 缺失文档检测
- check_ddl_vs_dictionary DDL 与数据字典比对
- check_api_samples_vs_parsers API 样本与解析器比对
- render_alignment_report 报告渲染
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from scripts.audit import AlignmentIssue, DocMapping
from scripts.audit.doc_alignment_analyzer import (
_parse_ddl_tables,
_parse_dictionary_tables,
build_mappings,
check_api_samples_vs_parsers,
check_ddl_vs_dictionary,
check_reference_validity,
extract_code_references,
find_undocumented_modules,
render_alignment_report,
scan_docs,
)
# ---------------------------------------------------------------------------
# scan_docs
# ---------------------------------------------------------------------------
class TestScanDocs:
    """Tests for scan_docs: which files are recognised as documentation."""

    def test_finds_docs_dir_md(self, tmp_path: Path) -> None:
        """A Markdown file under docs/ is listed as docs/<name>."""
        docs_dir = tmp_path / "docs"
        docs_dir.mkdir()
        docs_dir.joinpath("guide.md").write_text("# Guide", encoding="utf-8")
        found = scan_docs(tmp_path)
        assert "docs/guide.md" in found

    def test_finds_root_readme(self, tmp_path: Path) -> None:
        """A README.md at the repository root is listed."""
        readme = tmp_path / "README.md"
        readme.write_text("# Readme", encoding="utf-8")
        assert "README.md" in scan_docs(tmp_path)

    def test_finds_dev_notes(self, tmp_path: Path) -> None:
        """Markdown under the dev-notes directory is listed."""
        notes_dir = tmp_path / "开发笔记"
        notes_dir.mkdir()
        notes_dir.joinpath("记录.md").write_text("笔记", encoding="utf-8")
        found = scan_docs(tmp_path)
        assert "开发笔记/记录.md" in found

    def test_finds_module_readme(self, tmp_path: Path) -> None:
        """A README.md inside a module directory is listed."""
        module_dir = tmp_path / "gui"
        module_dir.mkdir()
        module_dir.joinpath("README.md").write_text("# GUI", encoding="utf-8")
        found = scan_docs(tmp_path)
        assert "gui/README.md" in found

    def test_finds_steering_files(self, tmp_path: Path) -> None:
        """Markdown under .kiro/steering is listed."""
        steering_dir = tmp_path.joinpath(".kiro", "steering")
        steering_dir.mkdir(parents=True)
        steering_dir.joinpath("tech.md").write_text("# Tech", encoding="utf-8")
        found = scan_docs(tmp_path)
        assert ".kiro/steering/tech.md" in found

    def test_finds_json_samples(self, tmp_path: Path) -> None:
        """JSON samples under docs/test-json-doc are listed."""
        samples_dir = tmp_path.joinpath("docs", "test-json-doc")
        samples_dir.mkdir(parents=True)
        samples_dir.joinpath("member.json").write_text("[]", encoding="utf-8")
        found = scan_docs(tmp_path)
        assert "docs/test-json-doc/member.json" in found

    def test_empty_repo_returns_empty(self, tmp_path: Path) -> None:
        """An empty repository yields an empty list."""
        assert scan_docs(tmp_path) == []

    def test_results_sorted(self, tmp_path: Path) -> None:
        """The returned list is already in sorted order."""
        docs_dir = tmp_path / "docs"
        docs_dir.mkdir()
        # Written deliberately out of alphabetical order.
        docs_dir.joinpath("z.md").write_text("z", encoding="utf-8")
        docs_dir.joinpath("a.md").write_text("a", encoding="utf-8")
        tmp_path.joinpath("README.md").write_text("r", encoding="utf-8")
        found = scan_docs(tmp_path)
        assert found == sorted(found)
# ---------------------------------------------------------------------------
# extract_code_references
# ---------------------------------------------------------------------------
class TestExtractCodeReferences:
    """Tests for extract_code_references: pulling code references out of a doc."""

    @staticmethod
    def _doc_with(tmp_path: Path, text: str) -> Path:
        """Write *text* to doc.md under *tmp_path* and return the file path."""
        target = tmp_path / "doc.md"
        target.write_text(text, encoding="utf-8")
        return target

    def test_extracts_backtick_paths(self, tmp_path: Path) -> None:
        """A backtick-quoted file path is returned as a reference."""
        source = self._doc_with(tmp_path, "使用 `tasks/base_task.py` 作为基类")
        found = extract_code_references(source)
        assert "tasks/base_task.py" in found

    def test_extracts_class_names(self, tmp_path: Path) -> None:
        """A backtick-quoted class name is returned as a reference."""
        source = self._doc_with(tmp_path, "继承 `BaseTask` 类")
        found = extract_code_references(source)
        assert "BaseTask" in found

    def test_skips_single_char(self, tmp_path: Path) -> None:
        """Single-character backtick spans produce no references."""
        source = self._doc_with(tmp_path, "变量 `x` 和 `y`")
        assert extract_code_references(source) == []

    def test_skips_pure_numbers(self, tmp_path: Path) -> None:
        """Version strings and numeric IDs produce no references."""
        source = self._doc_with(tmp_path, "版本 `2.0.0` 和 ID `12345`")
        assert extract_code_references(source) == []

    def test_deduplicates(self, tmp_path: Path) -> None:
        """A reference repeated in the document appears once in the result."""
        source = self._doc_with(tmp_path, "`foo.py` 和 `foo.py` 重复")
        found = extract_code_references(source)
        assert found.count("foo.py") == 1

    def test_nonexistent_file_returns_empty(self, tmp_path: Path) -> None:
        """A missing document yields an empty list."""
        absent = tmp_path / "nonexistent.md"
        assert extract_code_references(absent) == []

    def test_normalizes_backslash(self, tmp_path: Path) -> None:
        """Backslash path separators are reported with forward slashes."""
        source = self._doc_with(tmp_path, "路径 `tasks\\base_task.py`")
        found = extract_code_references(source)
        assert "tasks/base_task.py" in found
# ---------------------------------------------------------------------------
# check_reference_validity
# ---------------------------------------------------------------------------
class TestCheckReferenceValidity:
    """Tests for check_reference_validity: does a reference resolve in the repo."""

    def test_valid_file_path(self, tmp_path: Path) -> None:
        """A path to an existing file is valid."""
        tasks_dir = tmp_path / "tasks"
        tasks_dir.mkdir()
        tasks_dir.joinpath("base.py").write_text("", encoding="utf-8")
        verdict = check_reference_validity("tasks/base.py", tmp_path)
        assert verdict is True

    def test_invalid_file_path(self, tmp_path: Path) -> None:
        """A path to a file that does not exist is invalid."""
        verdict = check_reference_validity("nonexistent/file.py", tmp_path)
        assert verdict is False

    def test_strips_legacy_prefix(self, tmp_path: Path) -> None:
        """The legacy package prefix etl_billiards/ and the current root
        directory prefix FQ-ETL/ are both accepted."""
        tasks_dir = tmp_path / "tasks"
        tasks_dir.mkdir()
        tasks_dir.joinpath("x.py").write_text("", encoding="utf-8")
        for prefixed in ("etl_billiards/tasks/x.py", "FQ-ETL/tasks/x.py"):
            assert check_reference_validity(prefixed, tmp_path) is True

    def test_directory_path(self, tmp_path: Path) -> None:
        """A reference naming an existing directory is valid."""
        tmp_path.joinpath("loaders").mkdir()
        verdict = check_reference_validity("loaders", tmp_path)
        assert verdict is True

    def test_dotted_module_path(self, tmp_path: Path) -> None:
        """A dotted module path resolves to the matching .py file."""
        config_dir = tmp_path / "config"
        config_dir.mkdir()
        config_dir.joinpath("settings.py").write_text("", encoding="utf-8")
        verdict = check_reference_validity("config.settings", tmp_path)
        assert verdict is True
# ---------------------------------------------------------------------------
# find_undocumented_modules
# ---------------------------------------------------------------------------
class TestFindUndocumentedModules:
    """Tests for find_undocumented_modules: code modules lacking documentation."""

    def test_finds_undocumented(self, tmp_path: Path) -> None:
        """A module under tasks/ with no documentation is reported."""
        package = tmp_path / "tasks"
        package.mkdir()
        package.joinpath("__init__.py").write_text("", encoding="utf-8")
        package.joinpath("ods_task.py").write_text("", encoding="utf-8")
        missing = find_undocumented_modules(tmp_path, set())
        assert "tasks/ods_task.py" in missing

    def test_excludes_init(self, tmp_path: Path) -> None:
        """__init__.py files never appear in the result."""
        package = tmp_path / "tasks"
        package.mkdir()
        package.joinpath("__init__.py").write_text("", encoding="utf-8")
        missing = find_undocumented_modules(tmp_path, set())
        assert not any("__init__" in entry for entry in missing)

    def test_documented_module_excluded(self, tmp_path: Path) -> None:
        """A module listed in the documented set is not reported."""
        package = tmp_path / "tasks"
        package.mkdir()
        package.joinpath("ods_task.py").write_text("", encoding="utf-8")
        already_documented = {"tasks/ods_task.py"}
        missing = find_undocumented_modules(tmp_path, already_documented)
        assert "tasks/ods_task.py" not in missing

    def test_non_core_dirs_ignored(self, tmp_path: Path) -> None:
        """gui/ is not in the core code directory list, so it must not be reported."""
        gui_package = tmp_path / "gui"
        gui_package.mkdir()
        gui_package.joinpath("main.py").write_text("", encoding="utf-8")
        missing = find_undocumented_modules(tmp_path, set())
        assert not any("gui/" in entry for entry in missing)

    def test_results_sorted(self, tmp_path: Path) -> None:
        """The returned list is already in sorted order."""
        package = tmp_path / "tasks"
        package.mkdir()
        # Created in reverse alphabetical order on purpose.
        package.joinpath("z_task.py").write_text("", encoding="utf-8")
        package.joinpath("a_task.py").write_text("", encoding="utf-8")
        missing = find_undocumented_modules(tmp_path, set())
        assert missing == sorted(missing)
# ---------------------------------------------------------------------------
# _parse_ddl_tables / _parse_dictionary_tables
# ---------------------------------------------------------------------------
class TestParseDdlTables:
    """Tests for _parse_ddl_tables: extracting tables and columns from DDL text."""

    def test_extracts_table_and_columns(self) -> None:
        """The table name and each of its columns are extracted."""
        ddl = (
            "\n"
            "CREATE TABLE IF NOT EXISTS dim_member (\n"
            "member_id BIGINT,\n"
            "nickname TEXT,\n"
            "mobile TEXT,\n"
            "PRIMARY KEY (member_id)\n"
            ");\n"
        )
        tables = _parse_ddl_tables(ddl)
        assert "dim_member" in tables
        columns = tables["dim_member"]
        for expected_column in ("member_id", "nickname", "mobile"):
            assert expected_column in columns

    def test_handles_schema_prefix(self) -> None:
        """A schema-qualified table is keyed by its bare table name."""
        ddl = "CREATE TABLE billiards_dwd.dim_site (\n site_id BIGINT\n);"
        tables = _parse_ddl_tables(ddl)
        assert "dim_site" in tables

    def test_excludes_sql_keywords(self) -> None:
        """The PRIMARY KEY clause does not yield a 'primary' column."""
        ddl = (
            "\n"
            "CREATE TABLE test_tbl (\n"
            "id INTEGER,\n"
            "PRIMARY KEY (id)\n"
            ");\n"
        )
        tables = _parse_ddl_tables(ddl)
        columns = tables.get("test_tbl", set())
        assert "primary" not in columns
class TestParseDictionaryTables:
    """Tests for _parse_dictionary_tables: reading a Markdown data dictionary."""

    def test_extracts_table_and_fields(self) -> None:
        """A heading plus table rows yields the table name and its fields."""
        dictionary = (
            "## dim_member\n"
            "| 字段 | 类型 | 说明 |\n"
            "|------|------|------|\n"
            "| member_id | BIGINT | 会员ID |\n"
            "| nickname | TEXT | 昵称 |\n"
        )
        tables = _parse_dictionary_tables(dictionary)
        assert "dim_member" in tables
        fields = tables["dim_member"]
        for expected_field in ("member_id", "nickname"):
            assert expected_field in fields

    def test_skips_header_row(self) -> None:
        """The table header cell is not treated as a field name."""
        dictionary = (
            "## dim_test\n"
            "| 字段 | 类型 |\n"
            "|------|------|\n"
            "| col_a | INT |\n"
        )
        tables = _parse_dictionary_tables(dictionary)
        fields = tables.get("dim_test", set())
        assert "字段" not in fields

    def test_handles_backtick_table_name(self) -> None:
        """A backtick-wrapped heading is keyed by the bare table name."""
        dictionary = "## `dim_goods`\n\n| 字段 |\n| goods_id |"
        tables = _parse_dictionary_tables(dictionary)
        assert "dim_goods" in tables
# ---------------------------------------------------------------------------
# check_ddl_vs_dictionary
# ---------------------------------------------------------------------------
class TestCheckDdlVsDictionary:
    """Tests for check_ddl_vs_dictionary: comparing DDL with the data dictionary."""

    @staticmethod
    def _seed_repo(root: Path, ddl: str, dictionary: str) -> None:
        """Write one DDL file under database/ and one dictionary under docs/."""
        database_dir = root / "database"
        database_dir.mkdir()
        database_dir.joinpath("schema_test.sql").write_text(ddl, encoding="utf-8")
        docs_dir = root / "docs"
        docs_dir.mkdir()
        docs_dir.joinpath("dwd_main_tables_dictionary.md").write_text(
            dictionary, encoding="utf-8"
        )

    def test_detects_missing_table_in_dictionary(self, tmp_path: Path) -> None:
        """A table present in the DDL but absent from the dictionary is 'missing'."""
        self._seed_repo(
            tmp_path,
            ddl="CREATE TABLE dim_orphan (\n id BIGINT\n);",
            dictionary="## dim_other\n\n| 字段 |\n| id |",
        )
        found = check_ddl_vs_dictionary(tmp_path)
        assert any(
            issue.issue_type == "missing" and "dim_orphan" in issue.description
            for issue in found
        )

    def test_detects_column_mismatch(self, tmp_path: Path) -> None:
        """A DDL column absent from the dictionary is reported as a 'conflict'."""
        self._seed_repo(
            tmp_path,
            ddl="CREATE TABLE dim_x (\n id BIGINT,\n extra_col TEXT\n);",
            dictionary="## dim_x\n\n| 字段 | 类型 |\n|---|---|\n| id | BIGINT |",
        )
        found = check_ddl_vs_dictionary(tmp_path)
        assert any(
            issue.issue_type == "conflict" and "extra_col" in issue.description
            for issue in found
        )

    def test_no_issues_when_aligned(self, tmp_path: Path) -> None:
        """Matching DDL and dictionary produce no issues."""
        self._seed_repo(
            tmp_path,
            ddl="CREATE TABLE dim_ok (\n id BIGINT\n);",
            dictionary="## dim_ok\n\n| 字段 | 类型 |\n|---|---|\n| id | BIGINT |",
        )
        found = check_ddl_vs_dictionary(tmp_path)
        assert len(found) == 0
# ---------------------------------------------------------------------------
# check_api_samples_vs_parsers
# ---------------------------------------------------------------------------
class TestCheckApiSamplesVsParsers:
    """Tests for check_api_samples_vs_parsers: JSON samples versus ODS tables."""

    # ODS table definition shared by the tests; only the table name varies.
    _ODS_DDL = (
        "CREATE TABLE billiards_ods.{table} (\n"
        " id BIGINT,\n"
        " name TEXT,\n"
        " content_hash TEXT,\n"
        " payload JSONB\n"
        ");"
    )

    @staticmethod
    def _seed_repo(
        root: Path,
        sample_name: str,
        records: list[dict[str, object]],
        ods_ddl: str,
    ) -> None:
        """Write one JSON sample and the ODS schema file under *root*."""
        samples_dir = root.joinpath("docs", "test-json-doc")
        samples_dir.mkdir(parents=True)
        samples_dir.joinpath(sample_name).write_text(
            json.dumps(records), encoding="utf-8"
        )
        database_dir = root / "database"
        database_dir.mkdir()
        database_dir.joinpath("schema_ODS_doc.sql").write_text(
            ods_ddl, encoding="utf-8"
        )

    def test_detects_json_field_not_in_ods(self, tmp_path: Path) -> None:
        """A sample field with no matching ODS column is reported."""
        # The JSON sample carries extra_field; the ODS table does not.
        self._seed_repo(
            tmp_path,
            "test_entity.json",
            [{"id": 1, "name": "a", "extra_field": "x"}],
            self._ODS_DDL.format(table="test_entity"),
        )
        found = check_api_samples_vs_parsers(tmp_path)
        assert any("extra_field" in issue.description for issue in found)

    def test_no_issues_when_aligned(self, tmp_path: Path) -> None:
        """A sample whose fields all exist in the ODS table yields no issues."""
        self._seed_repo(
            tmp_path,
            "aligned_entity.json",
            [{"id": 1, "name": "a"}],
            self._ODS_DDL.format(table="aligned_entity"),
        )
        found = check_api_samples_vs_parsers(tmp_path)
        assert len(found) == 0

    def test_skips_when_no_ods_table(self, tmp_path: Path) -> None:
        """A sample with no corresponding ODS table yields no issues."""
        self._seed_repo(tmp_path, "unknown.json", [{"a": 1}], "-- empty")
        found = check_api_samples_vs_parsers(tmp_path)
        assert len(found) == 0
# ---------------------------------------------------------------------------
# render_alignment_report
# ---------------------------------------------------------------------------
class TestRenderAlignmentReport:
    """Tests for render_alignment_report: the rendered Markdown report."""

    def test_contains_all_sections(self) -> None:
        """An empty report still carries every section heading."""
        rendered = render_alignment_report([], [], "/repo")
        headings = ("## 映射关系", "## 过期点", "## 冲突点", "## 缺失点", "## 统计摘要")
        for heading in headings:
            assert heading in rendered

    def test_contains_header_metadata(self) -> None:
        """The header shows a generation-time label and the repo path."""
        rendered = render_alignment_report([], [], "/repo")
        for fragment in ("生成时间", "`/repo`"):
            assert fragment in rendered

    def test_contains_iso_timestamp(self) -> None:
        """The report contains an ISO-style timestamp with 'T' and 'Z'."""
        import re

        rendered = render_alignment_report([], [], "/repo")
        iso_pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z"
        assert re.search(iso_pattern, rendered)

    def test_mapping_table_rendered(self) -> None:
        """A mapping's doc path, related code and status all appear."""
        entry = DocMapping(
            doc_path="docs/guide.md",
            doc_topic="项目文档",
            related_code=["tasks/base.py"],
            status="aligned",
        )
        rendered = render_alignment_report([entry], [], "/repo")
        for fragment in ("`docs/guide.md`", "`tasks/base.py`", "aligned"):
            assert fragment in rendered

    def test_stale_issues_rendered(self) -> None:
        """A stale issue's description appears alongside the stale heading."""
        stale = AlignmentIssue(
            doc_path="docs/old.md",
            issue_type="stale",
            description="引用了已删除的文件",
            related_code="tasks/deleted.py",
        )
        rendered = render_alignment_report([], [stale], "/repo")
        assert "引用了已删除的文件" in rendered
        assert "## 过期点" in rendered

    def test_conflict_issues_rendered(self) -> None:
        """A conflict issue's description appears in the report."""
        conflict = AlignmentIssue(
            doc_path="docs/dict.md",
            issue_type="conflict",
            description="字段不一致",
            related_code="database/schema.sql",
        )
        rendered = render_alignment_report([], [conflict], "/repo")
        assert "字段不一致" in rendered

    def test_missing_issues_rendered(self) -> None:
        """A missing issue's description appears in the report."""
        absent = AlignmentIssue(
            doc_path="docs/dict.md",
            issue_type="missing",
            description="缺少表定义",
            related_code="database/schema.sql",
        )
        rendered = render_alignment_report([], [absent], "/repo")
        assert "缺少表定义" in rendered

    def test_summary_counts(self) -> None:
        """The summary counts issues per type and the number of documents."""
        all_issues = [
            AlignmentIssue("a", "stale", "d1", "c1"),
            AlignmentIssue("b", "stale", "d2", "c2"),
            AlignmentIssue("c", "conflict", "d3", "c3"),
            AlignmentIssue("d", "missing", "d4", "c4"),
        ]
        all_mappings = [DocMapping("x", "t", [], "aligned")]
        rendered = render_alignment_report(all_mappings, all_issues, "/repo")
        # NOTE(review): the expected labels below have the count directly
        # after the label text, with no separator; confirm this matches the
        # renderer's actual output.
        expected_lines = ("过期点数量2", "冲突点数量1", "缺失点数量1", "文档总数1")
        for line in expected_lines:
            assert line in rendered

    def test_empty_report(self) -> None:
        """With no issues, each section states that nothing was found."""
        rendered = render_alignment_report([], [], "/repo")
        placeholders = ("未发现过期点", "未发现冲突点", "未发现缺失点", "过期点数量0")
        for text in placeholders:
            assert text in rendered
# ---------------------------------------------------------------------------
# 属性测试 — Property 11 / 12 / 16 (hypothesis)
# hypothesis 与 pytest 的 function-scoped fixture (tmp_path) 不兼容,
# 因此在测试内部使用 tempfile.mkdtemp 自行管理临时目录。
# ---------------------------------------------------------------------------
import shutil
import tempfile
from hypothesis import given, settings
from hypothesis import strategies as st
from scripts.audit.doc_alignment_analyzer import _CORE_CODE_DIRS
class TestPropertyStaleReferenceDetection:
    """Feature: repo-audit, Property 11: stale reference detection.

    For any code reference extracted from a document, if the file path it
    points to does not exist in the repository, check_reference_validity
    should return False.

    Validates: Requirements 3.3
    """

    _safe_name = st.from_regex(r"[a-z][a-z0-9_]{1,12}", fullmatch=True)

    @given(
        existing_names=st.lists(_safe_name, min_size=1, max_size=5, unique=True),
        missing_names=st.lists(_safe_name, min_size=1, max_size=5, unique=True),
    )
    @settings(max_examples=100)
    def test_nonexistent_path_returns_false(
        self,
        existing_names: list[str],
        missing_names: list[str],
    ) -> None:
        """A reference to a file that does not exist must return False."""
        workdir = Path(tempfile.mkdtemp())
        try:
            for stem in existing_names:
                workdir.joinpath(f"{stem}.py").write_text("# ok", encoding="utf-8")

            created = set(existing_names)
            for stem in missing_names:
                # Only check names that were not created above.
                if stem in created:
                    continue
                ref = f"nonexistent_dir/{stem}.py"
                outcome = check_reference_validity(ref, workdir)
                assert outcome is False, (
                    f"引用 '{ref}' 指向不存在的文件,但返回了 True"
                )
        finally:
            shutil.rmtree(workdir, ignore_errors=True)

    @given(
        existing_names=st.lists(_safe_name, min_size=1, max_size=5, unique=True),
    )
    @settings(max_examples=100)
    def test_existing_path_returns_true(
        self,
        existing_names: list[str],
    ) -> None:
        """A reference to a file that exists must return True."""
        workdir = Path(tempfile.mkdtemp())
        try:
            # Create every file first, then validate every reference.
            for stem in existing_names:
                workdir.joinpath(f"{stem}.py").write_text("# ok", encoding="utf-8")

            for stem in existing_names:
                ref = f"{stem}.py"
                outcome = check_reference_validity(ref, workdir)
                assert outcome is True, (
                    f"引用 '{ref}' 指向存在的文件,但返回了 False"
                )
        finally:
            shutil.rmtree(workdir, ignore_errors=True)
class TestPropertyMissingDocDetection:
    """Feature: repo-audit, Property 12: missing documentation detection.

    For any set of core code modules and any set of documented modules, the
    list returned by find_undocumented_modules should be exactly the set
    difference between the core modules and the documented ones.

    Validates: Requirements 3.5
    """

    _core_dir = st.sampled_from(list(_CORE_CODE_DIRS))
    _module_name = st.from_regex(r"[a-z][a-z0-9_]{1,10}", fullmatch=True)

    @given(
        core_dir=_core_dir,
        module_names=st.lists(_module_name, min_size=2, max_size=6, unique=True),
        doc_fraction=st.floats(min_value=0.0, max_value=1.0),
    )
    @settings(max_examples=100)
    def test_undocumented_equals_difference(
        self,
        core_dir: str,
        module_names: list[str],
        doc_fraction: float,
    ) -> None:
        """The result equals the sorted difference of all and documented modules."""
        workdir = Path(tempfile.mkdtemp())
        try:
            package_dir = workdir / core_dir
            package_dir.mkdir(parents=True, exist_ok=True)

            for stem in module_names:
                package_dir.joinpath(f"{stem}.py").write_text(
                    "# module", encoding="utf-8"
                )

            # Repo-relative paths, in the same order as module_names.
            relative_paths = [f"{core_dir}/{stem}.py" for stem in module_names]
            all_modules: set[str] = set(relative_paths)

            # The leading fraction of the modules counts as documented.
            cutoff = int(len(module_names) * doc_fraction)
            documented = set(relative_paths[:cutoff])

            result = find_undocumented_modules(workdir, documented)
            expected = sorted(all_modules - documented)
            assert result == expected, (
                f"期望缺失列表 {expected},实际得到 {result}"
            )
        finally:
            shutil.rmtree(workdir, ignore_errors=True)
class TestPropertyAlignmentReportSections:
    """Feature: repo-audit, Property 16: alignment report section completeness.

    For any output of render_alignment_report, the Markdown text should
    contain the four section headings: mapping (映射关系), stale (过期点),
    conflict (冲突点) and missing (缺失点).

    Validates: Requirements 3.8
    """

    _issue_type = st.sampled_from(["stale", "conflict", "missing"])

    # Short text drawn from letters, numbers and punctuation, never NUL.
    _text = st.text(
        alphabet=st.characters(
            whitelist_categories=("L", "N", "P"),
            blacklist_characters="\x00",
        ),
        min_size=1,
        max_size=30,
    )

    _mapping_st = st.builds(
        DocMapping,
        doc_path=_text,
        doc_topic=_text,
        related_code=st.lists(_text, max_size=3),
        status=st.sampled_from(["aligned", "stale", "conflict", "orphan"]),
    )

    _issue_st = st.builds(
        AlignmentIssue,
        doc_path=_text,
        issue_type=_issue_type,
        description=_text,
        related_code=_text,
    )

    @given(
        mappings=st.lists(_mapping_st, max_size=5),
        issues=st.lists(_issue_st, max_size=8),
    )
    @settings(max_examples=100)
    def test_report_contains_four_sections(
        self,
        mappings: list[DocMapping],
        issues: list[AlignmentIssue],
    ) -> None:
        """The report must contain all four section headings."""
        rendered = render_alignment_report(mappings, issues, "/test/repo")
        required_sections = ("## 映射关系", "## 过期点", "## 冲突点", "## 缺失点")
        for section in required_sections:
            assert section in rendered, (
                f"报告中缺少分区标题 '{section}'"
            )