# -*- coding: utf-8 -*- """ 单元测试 — 文档对齐分析器 (doc_alignment_analyzer.py) 覆盖: - scan_docs 文档来源识别 - extract_code_references 代码引用提取 - check_reference_validity 引用有效性检查 - find_undocumented_modules 缺失文档检测 - check_ddl_vs_dictionary DDL 与数据字典比对 - check_api_samples_vs_parsers API 样本与解析器比对 - render_alignment_report 报告渲染 """ from __future__ import annotations import json from pathlib import Path import pytest from scripts.audit import AlignmentIssue, DocMapping from scripts.audit.doc_alignment_analyzer import ( _parse_ddl_tables, _parse_dictionary_tables, build_mappings, check_api_samples_vs_parsers, check_ddl_vs_dictionary, check_reference_validity, extract_code_references, find_undocumented_modules, render_alignment_report, scan_docs, ) # --------------------------------------------------------------------------- # scan_docs # --------------------------------------------------------------------------- class TestScanDocs: """文档来源识别测试。""" def test_finds_docs_dir_md(self, tmp_path: Path) -> None: (tmp_path / "docs").mkdir() (tmp_path / "docs" / "guide.md").write_text("# Guide", encoding="utf-8") result = scan_docs(tmp_path) assert "docs/guide.md" in result def test_finds_root_readme(self, tmp_path: Path) -> None: (tmp_path / "README.md").write_text("# Readme", encoding="utf-8") result = scan_docs(tmp_path) assert "README.md" in result def test_finds_dev_notes(self, tmp_path: Path) -> None: (tmp_path / "开发笔记").mkdir() (tmp_path / "开发笔记" / "记录.md").write_text("笔记", encoding="utf-8") result = scan_docs(tmp_path) assert "开发笔记/记录.md" in result def test_finds_module_readme(self, tmp_path: Path) -> None: (tmp_path / "gui").mkdir() (tmp_path / "gui" / "README.md").write_text("# GUI", encoding="utf-8") result = scan_docs(tmp_path) assert "gui/README.md" in result def test_finds_steering_files(self, tmp_path: Path) -> None: steering = tmp_path / ".kiro" / "steering" steering.mkdir(parents=True) (steering / "tech.md").write_text("# Tech", encoding="utf-8") result = scan_docs(tmp_path) assert ".kiro/steering/tech.md" in result def test_finds_json_samples(self, tmp_path: Path) -> None: sample_dir = tmp_path / "docs" / "test-json-doc" sample_dir.mkdir(parents=True) (sample_dir / "member.json").write_text("[]", encoding="utf-8") result = scan_docs(tmp_path) assert "docs/test-json-doc/member.json" in result def test_empty_repo_returns_empty(self, tmp_path: Path) -> None: result = scan_docs(tmp_path) assert result == [] def test_results_sorted(self, tmp_path: Path) -> None: (tmp_path / "docs").mkdir() (tmp_path / "docs" / "z.md").write_text("z", encoding="utf-8") (tmp_path / "docs" / "a.md").write_text("a", encoding="utf-8") (tmp_path / "README.md").write_text("r", encoding="utf-8") result = scan_docs(tmp_path) assert result == sorted(result) # --------------------------------------------------------------------------- # extract_code_references # --------------------------------------------------------------------------- class TestExtractCodeReferences: """代码引用提取测试。""" def test_extracts_backtick_paths(self, tmp_path: Path) -> None: doc = tmp_path / "doc.md" doc.write_text("使用 `tasks/base_task.py` 作为基类", encoding="utf-8") refs = extract_code_references(doc) assert "tasks/base_task.py" in refs def test_extracts_class_names(self, tmp_path: Path) -> None: doc = tmp_path / "doc.md" doc.write_text("继承 `BaseTask` 类", encoding="utf-8") refs = extract_code_references(doc) assert "BaseTask" in refs def test_skips_single_char(self, tmp_path: Path) -> None: doc = tmp_path / "doc.md" doc.write_text("变量 `x` 和 `y`", encoding="utf-8") refs = extract_code_references(doc) assert refs == [] def test_skips_pure_numbers(self, tmp_path: Path) -> None: doc = tmp_path / "doc.md" doc.write_text("版本 `2.0.0` 和 ID `12345`", encoding="utf-8") refs = extract_code_references(doc) assert refs == [] def test_deduplicates(self, tmp_path: Path) -> None: doc = tmp_path / "doc.md" doc.write_text("`foo.py` 和 `foo.py` 重复", encoding="utf-8") refs = extract_code_references(doc) assert refs.count("foo.py") == 1 def test_nonexistent_file_returns_empty(self, tmp_path: Path) -> None: refs = extract_code_references(tmp_path / "nonexistent.md") assert refs == [] def test_normalizes_backslash(self, tmp_path: Path) -> None: doc = tmp_path / "doc.md" doc.write_text("路径 `tasks\\base_task.py`", encoding="utf-8") refs = extract_code_references(doc) assert "tasks/base_task.py" in refs # --------------------------------------------------------------------------- # check_reference_validity # --------------------------------------------------------------------------- class TestCheckReferenceValidity: """引用有效性检查测试。""" def test_valid_file_path(self, tmp_path: Path) -> None: (tmp_path / "tasks").mkdir() (tmp_path / "tasks" / "base.py").write_text("", encoding="utf-8") assert check_reference_validity("tasks/base.py", tmp_path) is True def test_invalid_file_path(self, tmp_path: Path) -> None: assert check_reference_validity("nonexistent/file.py", tmp_path) is False def test_strips_legacy_prefix(self, tmp_path: Path) -> None: """兼容旧包名前缀(etl_billiards/)和当前根目录前缀(FQ-ETL/)""" (tmp_path / "tasks").mkdir() (tmp_path / "tasks" / "x.py").write_text("", encoding="utf-8") assert check_reference_validity("etl_billiards/tasks/x.py", tmp_path) is True assert check_reference_validity("FQ-ETL/tasks/x.py", tmp_path) is True def test_directory_path(self, tmp_path: Path) -> None: (tmp_path / "loaders").mkdir() assert check_reference_validity("loaders", tmp_path) is True def test_dotted_module_path(self, tmp_path: Path) -> None: (tmp_path / "config").mkdir() (tmp_path / "config" / "settings.py").write_text("", encoding="utf-8") assert check_reference_validity("config.settings", tmp_path) is True # --------------------------------------------------------------------------- # find_undocumented_modules # --------------------------------------------------------------------------- class TestFindUndocumentedModules: """缺失文档检测测试。""" def test_finds_undocumented(self, tmp_path: Path) -> None: tasks_dir = tmp_path / "tasks" tasks_dir.mkdir() (tasks_dir / "__init__.py").write_text("", encoding="utf-8") (tasks_dir / "ods_task.py").write_text("", encoding="utf-8") result = find_undocumented_modules(tmp_path, set()) assert "tasks/ods_task.py" in result def test_excludes_init(self, tmp_path: Path) -> None: tasks_dir = tmp_path / "tasks" tasks_dir.mkdir() (tasks_dir / "__init__.py").write_text("", encoding="utf-8") result = find_undocumented_modules(tmp_path, set()) assert all("__init__" not in r for r in result) def test_documented_module_excluded(self, tmp_path: Path) -> None: tasks_dir = tmp_path / "tasks" tasks_dir.mkdir() (tasks_dir / "ods_task.py").write_text("", encoding="utf-8") result = find_undocumented_modules(tmp_path, {"tasks/ods_task.py"}) assert "tasks/ods_task.py" not in result def test_non_core_dirs_ignored(self, tmp_path: Path) -> None: """gui/ 不在核心代码目录列表中,不应被检测。""" gui_dir = tmp_path / "gui" gui_dir.mkdir() (gui_dir / "main.py").write_text("", encoding="utf-8") result = find_undocumented_modules(tmp_path, set()) assert all("gui/" not in r for r in result) def test_results_sorted(self, tmp_path: Path) -> None: tasks_dir = tmp_path / "tasks" tasks_dir.mkdir() (tasks_dir / "z_task.py").write_text("", encoding="utf-8") (tasks_dir / "a_task.py").write_text("", encoding="utf-8") result = find_undocumented_modules(tmp_path, set()) assert result == sorted(result) # --------------------------------------------------------------------------- # _parse_ddl_tables / _parse_dictionary_tables # --------------------------------------------------------------------------- class TestParseDdlTables: """DDL 解析测试。""" def test_extracts_table_and_columns(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS dim_member ( member_id BIGINT, nickname TEXT, mobile TEXT, PRIMARY KEY (member_id) ); """ result = _parse_ddl_tables(sql) assert "dim_member" in result assert "member_id" in result["dim_member"] assert "nickname" in result["dim_member"] assert "mobile" in result["dim_member"] def test_handles_schema_prefix(self) -> None: sql = "CREATE TABLE billiards_dwd.dim_site (\n site_id BIGINT\n);" result = _parse_ddl_tables(sql) assert "dim_site" in result def test_excludes_sql_keywords(self) -> None: sql = """ CREATE TABLE test_tbl ( id INTEGER, PRIMARY KEY (id) ); """ result = _parse_ddl_tables(sql) assert "primary" not in result.get("test_tbl", set()) class TestParseDictionaryTables: """数据字典解析测试。""" def test_extracts_table_and_fields(self) -> None: md = """## dim_member | 字段 | 类型 | 说明 | |------|------|------| | member_id | BIGINT | 会员ID | | nickname | TEXT | 昵称 | """ result = _parse_dictionary_tables(md) assert "dim_member" in result assert "member_id" in result["dim_member"] assert "nickname" in result["dim_member"] def test_skips_header_row(self) -> None: md = """## dim_test | 字段 | 类型 | |------|------| | col_a | INT | """ result = _parse_dictionary_tables(md) assert "字段" not in result.get("dim_test", set()) def test_handles_backtick_table_name(self) -> None: md = "## `dim_goods`\n\n| 字段 |\n| goods_id |" result = _parse_dictionary_tables(md) assert "dim_goods" in result # --------------------------------------------------------------------------- # check_ddl_vs_dictionary # --------------------------------------------------------------------------- class TestCheckDdlVsDictionary: """DDL 与数据字典比对测试。""" def test_detects_missing_table_in_dictionary(self, tmp_path: Path) -> None: # DDL 有表,字典没有 db_dir = tmp_path / "database" db_dir.mkdir() (db_dir / "schema_test.sql").write_text( "CREATE TABLE dim_orphan (\n id BIGINT\n);", encoding="utf-8", ) docs_dir = tmp_path / "docs" docs_dir.mkdir() (docs_dir / "dwd_main_tables_dictionary.md").write_text( "## dim_other\n\n| 字段 |\n| id |", encoding="utf-8", ) issues = check_ddl_vs_dictionary(tmp_path) missing = [i for i in issues if i.issue_type == "missing"] assert any("dim_orphan" in i.description for i in missing) def test_detects_column_mismatch(self, tmp_path: Path) -> None: db_dir = tmp_path / "database" db_dir.mkdir() (db_dir / "schema_test.sql").write_text( "CREATE TABLE dim_x (\n id BIGINT,\n extra_col TEXT\n);", encoding="utf-8", ) docs_dir = tmp_path / "docs" docs_dir.mkdir() (docs_dir / "dwd_main_tables_dictionary.md").write_text( "## dim_x\n\n| 字段 | 类型 |\n|---|---|\n| id | BIGINT |", encoding="utf-8", ) issues = check_ddl_vs_dictionary(tmp_path) conflict = [i for i in issues if i.issue_type == "conflict"] assert any("extra_col" in i.description for i in conflict) def test_no_issues_when_aligned(self, tmp_path: Path) -> None: db_dir = tmp_path / "database" db_dir.mkdir() (db_dir / "schema_test.sql").write_text( "CREATE TABLE dim_ok (\n id BIGINT\n);", encoding="utf-8", ) docs_dir = tmp_path / "docs" docs_dir.mkdir() (docs_dir / "dwd_main_tables_dictionary.md").write_text( "## dim_ok\n\n| 字段 | 类型 |\n|---|---|\n| id | BIGINT |", encoding="utf-8", ) issues = check_ddl_vs_dictionary(tmp_path) assert len(issues) == 0 # --------------------------------------------------------------------------- # check_api_samples_vs_parsers # --------------------------------------------------------------------------- class TestCheckApiSamplesVsParsers: """API 样本与解析器比对测试。""" def test_detects_json_field_not_in_ods(self, tmp_path: Path) -> None: # JSON 样本有 extra_field,ODS 没有 sample_dir = tmp_path / "docs" / "test-json-doc" sample_dir.mkdir(parents=True) (sample_dir / "test_entity.json").write_text( json.dumps([{"id": 1, "name": "a", "extra_field": "x"}]), encoding="utf-8", ) db_dir = tmp_path / "database" db_dir.mkdir() (db_dir / "schema_ODS_doc.sql").write_text( "CREATE TABLE billiards_ods.test_entity (\n" " id BIGINT,\n name TEXT,\n" " content_hash TEXT,\n payload JSONB\n);", encoding="utf-8", ) issues = check_api_samples_vs_parsers(tmp_path) assert any("extra_field" in i.description for i in issues) def test_no_issues_when_aligned(self, tmp_path: Path) -> None: sample_dir = tmp_path / "docs" / "test-json-doc" sample_dir.mkdir(parents=True) (sample_dir / "aligned_entity.json").write_text( json.dumps([{"id": 1, "name": "a"}]), encoding="utf-8", ) db_dir = tmp_path / "database" db_dir.mkdir() (db_dir / "schema_ODS_doc.sql").write_text( "CREATE TABLE billiards_ods.aligned_entity (\n" " id BIGINT,\n name TEXT,\n" " content_hash TEXT,\n payload JSONB\n);", encoding="utf-8", ) issues = check_api_samples_vs_parsers(tmp_path) assert len(issues) == 0 def test_skips_when_no_ods_table(self, tmp_path: Path) -> None: sample_dir = tmp_path / "docs" / "test-json-doc" sample_dir.mkdir(parents=True) (sample_dir / "unknown.json").write_text( json.dumps([{"a": 1}]), encoding="utf-8", ) db_dir = tmp_path / "database" db_dir.mkdir() (db_dir / "schema_ODS_doc.sql").write_text("-- empty", encoding="utf-8") issues = check_api_samples_vs_parsers(tmp_path) assert len(issues) == 0 # --------------------------------------------------------------------------- # render_alignment_report # --------------------------------------------------------------------------- class TestRenderAlignmentReport: """报告渲染测试。""" def test_contains_all_sections(self) -> None: report = render_alignment_report([], [], "/repo") assert "## 映射关系" in report assert "## 过期点" in report assert "## 冲突点" in report assert "## 缺失点" in report assert "## 统计摘要" in report def test_contains_header_metadata(self) -> None: report = render_alignment_report([], [], "/repo") assert "生成时间" in report assert "`/repo`" in report def test_contains_iso_timestamp(self) -> None: report = render_alignment_report([], [], "/repo") # ISO 格式时间戳包含 T 和 Z import re assert re.search(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", report) def test_mapping_table_rendered(self) -> None: mappings = [ DocMapping( doc_path="docs/guide.md", doc_topic="项目文档", related_code=["tasks/base.py"], status="aligned", ) ] report = render_alignment_report(mappings, [], "/repo") assert "`docs/guide.md`" in report assert "`tasks/base.py`" in report assert "aligned" in report def test_stale_issues_rendered(self) -> None: issues = [ AlignmentIssue( doc_path="docs/old.md", issue_type="stale", description="引用了已删除的文件", related_code="tasks/deleted.py", ) ] report = render_alignment_report([], issues, "/repo") assert "引用了已删除的文件" in report assert "## 过期点" in report def test_conflict_issues_rendered(self) -> None: issues = [ AlignmentIssue( doc_path="docs/dict.md", issue_type="conflict", description="字段不一致", related_code="database/schema.sql", ) ] report = render_alignment_report([], issues, "/repo") assert "字段不一致" in report def test_missing_issues_rendered(self) -> None: issues = [ AlignmentIssue( doc_path="docs/dict.md", issue_type="missing", description="缺少表定义", related_code="database/schema.sql", ) ] report = render_alignment_report([], issues, "/repo") assert "缺少表定义" in report def test_summary_counts(self) -> None: issues = [ AlignmentIssue("a", "stale", "d1", "c1"), AlignmentIssue("b", "stale", "d2", "c2"), AlignmentIssue("c", "conflict", "d3", "c3"), AlignmentIssue("d", "missing", "d4", "c4"), ] mappings = [DocMapping("x", "t", [], "aligned")] report = render_alignment_report(mappings, issues, "/repo") assert "过期点数量:2" in report assert "冲突点数量:1" in report assert "缺失点数量:1" in report assert "文档总数:1" in report def test_empty_report(self) -> None: report = render_alignment_report([], [], "/repo") assert "未发现过期点" in report assert "未发现冲突点" in report assert "未发现缺失点" in report assert "过期点数量:0" in report # --------------------------------------------------------------------------- # 属性测试 — Property 11 / 12 / 16 (hypothesis) # hypothesis 与 pytest 的 function-scoped fixture (tmp_path) 不兼容, # 因此在测试内部使用 tempfile.mkdtemp 自行管理临时目录。 # --------------------------------------------------------------------------- import shutil import tempfile from hypothesis import given, settings from hypothesis import strategies as st from scripts.audit.doc_alignment_analyzer import _CORE_CODE_DIRS class TestPropertyStaleReferenceDetection: """Feature: repo-audit, Property 11: 过期引用检测 *对于任意* 文档中提取的代码引用,若该引用指向的文件路径在仓库中不存在, 则 check_reference_validity 应返回 False。 Validates: Requirements 3.3 """ _safe_name = st.from_regex(r"[a-z][a-z0-9_]{1,12}", fullmatch=True) @given( existing_names=st.lists( _safe_name, min_size=1, max_size=5, unique=True, ), missing_names=st.lists( _safe_name, min_size=1, max_size=5, unique=True, ), ) @settings(max_examples=100) def test_nonexistent_path_returns_false( self, existing_names: list[str], missing_names: list[str], ) -> None: """不存在的文件路径引用应返回 False。""" tmp = Path(tempfile.mkdtemp()) try: for name in existing_names: (tmp / f"{name}.py").write_text("# ok", encoding="utf-8") existing_set = set(existing_names) # 只检查确实不存在的名称 truly_missing = [n for n in missing_names if n not in existing_set] for name in truly_missing: ref = f"nonexistent_dir/{name}.py" result = check_reference_validity(ref, tmp) assert result is False, ( f"引用 '{ref}' 指向不存在的文件,但返回了 True" ) finally: shutil.rmtree(tmp, ignore_errors=True) @given( existing_names=st.lists( _safe_name, min_size=1, max_size=5, unique=True, ), ) @settings(max_examples=100) def test_existing_path_returns_true( self, existing_names: list[str], ) -> None: """存在的文件路径引用应返回 True。""" tmp = Path(tempfile.mkdtemp()) try: for name in existing_names: (tmp / f"{name}.py").write_text("# ok", encoding="utf-8") for name in existing_names: ref = f"{name}.py" result = check_reference_validity(ref, tmp) assert result is True, ( f"引用 '{ref}' 指向存在的文件,但返回了 False" ) finally: shutil.rmtree(tmp, ignore_errors=True) class TestPropertyMissingDocDetection: """Feature: repo-audit, Property 12: 缺失文档检测 *对于任意* 核心代码模块集合和已文档化模块集合, find_undocumented_modules 返回的缺失列表应恰好等于核心模块集合与已文档化集合的差集。 Validates: Requirements 3.5 """ _core_dir = st.sampled_from(list(_CORE_CODE_DIRS)) _module_name = st.from_regex(r"[a-z][a-z0-9_]{1,10}", fullmatch=True) @given( core_dir=_core_dir, module_names=st.lists( _module_name, min_size=2, max_size=6, unique=True, ), doc_fraction=st.floats(min_value=0.0, max_value=1.0), ) @settings(max_examples=100) def test_undocumented_equals_difference( self, core_dir: str, module_names: list[str], doc_fraction: float, ) -> None: """返回的缺失列表应恰好等于核心模块与已文档化集合的差集。""" tmp = Path(tempfile.mkdtemp()) try: code_dir = tmp / core_dir code_dir.mkdir(parents=True, exist_ok=True) all_modules: set[str] = set() for name in module_names: (code_dir / f"{name}.py").write_text("# module", encoding="utf-8") all_modules.add(f"{core_dir}/{name}.py") split_idx = int(len(module_names) * doc_fraction) documented = { f"{core_dir}/{n}.py" for n in module_names[:split_idx] } result = find_undocumented_modules(tmp, documented) expected = sorted(all_modules - documented) assert result == expected, ( f"期望缺失列表 {expected},实际得到 {result}" ) finally: shutil.rmtree(tmp, ignore_errors=True) class TestPropertyAlignmentReportSections: """Feature: repo-audit, Property 16: 文档对齐报告分区完整性 *对于任意* render_alignment_report 的输出,Markdown 文本应包含 "映射关系"、"过期点"、"冲突点"、"缺失点"四个分区标题。 Validates: Requirements 3.8 """ _issue_type = st.sampled_from(["stale", "conflict", "missing"]) _text = st.text( alphabet=st.characters( whitelist_categories=("L", "N", "P"), blacklist_characters="\x00", ), min_size=1, max_size=30, ) _mapping_st = st.builds( DocMapping, doc_path=_text, doc_topic=_text, related_code=st.lists(_text, max_size=3), status=st.sampled_from(["aligned", "stale", "conflict", "orphan"]), ) _issue_st = st.builds( AlignmentIssue, doc_path=_text, issue_type=_issue_type, description=_text, related_code=_text, ) @given( mappings=st.lists(_mapping_st, max_size=5), issues=st.lists(_issue_st, max_size=8), ) @settings(max_examples=100) def test_report_contains_four_sections( self, mappings: list[DocMapping], issues: list[AlignmentIssue], ) -> None: """报告应包含四个分区标题。""" report = render_alignment_report(mappings, issues, "/test/repo") required_sections = ["## 映射关系", "## 过期点", "## 冲突点", "## 缺失点"] for section in required_sections: assert section in report, ( f"报告中缺少分区标题 '{section}'" )