668 lines
26 KiB
Python
668 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
单元测试 — 流程树分析器 (flow_analyzer.py)
|
||
|
||
覆盖:
|
||
- parse_imports: import 语句解析、标准库/第三方排除、语法错误容错
|
||
- build_flow_tree: 递归构建、循环导入处理
|
||
- find_orphan_modules: 孤立模块检测
|
||
- render_flow_report: Markdown 渲染、Mermaid 图、统计摘要
|
||
- discover_entry_points: 入口点识别
|
||
- classify_task_type / classify_loader_type: 类型区分
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
|
||
from scripts.audit import FileEntry, FlowNode
|
||
from scripts.audit.flow_analyzer import (
|
||
build_flow_tree,
|
||
classify_loader_type,
|
||
classify_task_type,
|
||
discover_entry_points,
|
||
find_orphan_modules,
|
||
parse_imports,
|
||
render_flow_report,
|
||
_path_to_module_name,
|
||
_parse_bat_python_target,
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# parse_imports 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestParseImports:
|
||
"""import 语句解析测试。"""
|
||
|
||
def test_absolute_import(self, tmp_path: Path) -> None:
|
||
"""绝对导入项目内部模块应被识别。"""
|
||
f = tmp_path / "test.py"
|
||
f.write_text("import cli.main\nimport config.settings\n", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert "cli.main" in result
|
||
assert "config.settings" in result
|
||
|
||
def test_from_import(self, tmp_path: Path) -> None:
|
||
"""from ... import 语句应被识别。"""
|
||
f = tmp_path / "test.py"
|
||
f.write_text("from tasks.base_task import BaseTask\n", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert "tasks.base_task" in result
|
||
|
||
def test_stdlib_excluded(self, tmp_path: Path) -> None:
|
||
"""标准库模块应被排除。"""
|
||
f = tmp_path / "test.py"
|
||
f.write_text("import os\nimport sys\nimport json\nfrom pathlib import Path\n", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert result == []
|
||
|
||
def test_third_party_excluded(self, tmp_path: Path) -> None:
|
||
"""第三方包应被排除。"""
|
||
f = tmp_path / "test.py"
|
||
f.write_text("import requests\nfrom psycopg2 import sql\nimport flask\n", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert result == []
|
||
|
||
def test_mixed_imports(self, tmp_path: Path) -> None:
|
||
"""混合导入应只保留项目内部模块。"""
|
||
f = tmp_path / "test.py"
|
||
f.write_text(
|
||
"import os\nimport cli.main\nimport requests\nfrom loaders.base_loader import BaseLoader\n",
|
||
encoding="utf-8",
|
||
)
|
||
result = parse_imports(f)
|
||
assert "cli.main" in result
|
||
assert "loaders.base_loader" in result
|
||
assert "os" not in result
|
||
assert "requests" not in result
|
||
|
||
def test_syntax_error_returns_empty(self, tmp_path: Path) -> None:
|
||
"""语法错误的文件应返回空列表。"""
|
||
f = tmp_path / "bad.py"
|
||
f.write_text("def broken(\n", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert result == []
|
||
|
||
def test_nonexistent_file_returns_empty(self, tmp_path: Path) -> None:
|
||
"""不存在的文件应返回空列表。"""
|
||
result = parse_imports(tmp_path / "nonexistent.py")
|
||
assert result == []
|
||
|
||
def test_deduplication(self, tmp_path: Path) -> None:
|
||
"""重复导入应去重。"""
|
||
f = tmp_path / "test.py"
|
||
f.write_text("import cli.main\nimport cli.main\nfrom cli.main import main\n", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert result.count("cli.main") == 1
|
||
|
||
def test_empty_file(self, tmp_path: Path) -> None:
|
||
"""空文件应返回空列表。"""
|
||
f = tmp_path / "empty.py"
|
||
f.write_text("", encoding="utf-8")
|
||
result = parse_imports(f)
|
||
assert result == []
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# build_flow_tree 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestBuildFlowTree:
|
||
"""流程树构建测试。"""
|
||
|
||
def test_single_file_no_imports(self, tmp_path: Path) -> None:
|
||
"""无导入的单文件应生成叶节点。"""
|
||
cli_dir = tmp_path / "cli"
|
||
cli_dir.mkdir()
|
||
(cli_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
(cli_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")
|
||
|
||
tree = build_flow_tree(tmp_path, "cli/main.py")
|
||
assert tree.name == "cli.main"
|
||
assert tree.source_file == "cli/main.py"
|
||
assert tree.children == []
|
||
|
||
def test_simple_import_chain(self, tmp_path: Path) -> None:
|
||
"""简单导入链应正确构建子节点。"""
|
||
# cli/main.py → config/settings.py
|
||
cli_dir = tmp_path / "cli"
|
||
cli_dir.mkdir()
|
||
(cli_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
(cli_dir / "main.py").write_text(
|
||
"from config.settings import AppConfig\n", encoding="utf-8"
|
||
)
|
||
|
||
config_dir = tmp_path / "config"
|
||
config_dir.mkdir()
|
||
(config_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
(config_dir / "settings.py").write_text("class AppConfig: pass\n", encoding="utf-8")
|
||
|
||
tree = build_flow_tree(tmp_path, "cli/main.py")
|
||
assert tree.name == "cli.main"
|
||
assert len(tree.children) == 1
|
||
assert tree.children[0].name == "config.settings"
|
||
|
||
def test_circular_import_no_infinite_loop(self, tmp_path: Path) -> None:
|
||
"""循环导入不应导致无限递归。"""
|
||
pkg = tmp_path / "utils"
|
||
pkg.mkdir()
|
||
(pkg / "__init__.py").write_text("", encoding="utf-8")
|
||
# a → b → a(循环)
|
||
(pkg / "a.py").write_text("from utils.b import func_b\n", encoding="utf-8")
|
||
(pkg / "b.py").write_text("from utils.a import func_a\n", encoding="utf-8")
|
||
|
||
# 不应抛出 RecursionError
|
||
tree = build_flow_tree(tmp_path, "utils/a.py")
|
||
assert tree.name == "utils.a"
|
||
|
||
def test_entry_node_type(self, tmp_path: Path) -> None:
|
||
"""CLI 入口文件应标记为 entry 类型。"""
|
||
cli_dir = tmp_path / "cli"
|
||
cli_dir.mkdir()
|
||
(cli_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
(cli_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")
|
||
|
||
tree = build_flow_tree(tmp_path, "cli/main.py")
|
||
assert tree.node_type == "entry"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# find_orphan_modules 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestFindOrphanModules:
|
||
"""孤立模块检测测试。"""
|
||
|
||
def test_all_reachable(self, tmp_path: Path) -> None:
|
||
"""所有模块都可达时应返回空列表。"""
|
||
entries = [
|
||
FileEntry("cli/main.py", False, 100, ".py", False),
|
||
FileEntry("config/settings.py", False, 200, ".py", False),
|
||
]
|
||
reachable = {"cli/main.py", "config/settings.py"}
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert orphans == []
|
||
|
||
def test_orphan_detected(self, tmp_path: Path) -> None:
|
||
"""不可达的模块应被标记为孤立。"""
|
||
entries = [
|
||
FileEntry("cli/main.py", False, 100, ".py", False),
|
||
FileEntry("utils/orphan.py", False, 50, ".py", False),
|
||
]
|
||
reachable = {"cli/main.py"}
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert "utils/orphan.py" in orphans
|
||
|
||
def test_init_files_excluded(self, tmp_path: Path) -> None:
|
||
"""__init__.py 不应被视为孤立模块。"""
|
||
entries = [
|
||
FileEntry("cli/__init__.py", False, 0, ".py", False),
|
||
]
|
||
reachable: set[str] = set()
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert "cli/__init__.py" not in orphans
|
||
|
||
def test_test_files_excluded(self, tmp_path: Path) -> None:
|
||
"""测试文件不应被视为孤立模块。"""
|
||
entries = [
|
||
FileEntry("tests/unit/test_something.py", False, 100, ".py", False),
|
||
]
|
||
reachable: set[str] = set()
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert orphans == []
|
||
|
||
def test_audit_scripts_excluded(self, tmp_path: Path) -> None:
|
||
"""审计脚本自身不应被视为孤立模块。"""
|
||
entries = [
|
||
FileEntry("scripts/audit/scanner.py", False, 100, ".py", False),
|
||
]
|
||
reachable: set[str] = set()
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert orphans == []
|
||
|
||
def test_directories_excluded(self, tmp_path: Path) -> None:
|
||
"""目录条目不应出现在孤立列表中。"""
|
||
entries = [
|
||
FileEntry("cli", True, 0, "", False),
|
||
]
|
||
reachable: set[str] = set()
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert orphans == []
|
||
|
||
def test_sorted_output(self, tmp_path: Path) -> None:
|
||
"""孤立模块列表应按路径排序。"""
|
||
entries = [
|
||
FileEntry("utils/z.py", False, 50, ".py", False),
|
||
FileEntry("utils/a.py", False, 50, ".py", False),
|
||
FileEntry("cli/orphan.py", False, 50, ".py", False),
|
||
]
|
||
reachable: set[str] = set()
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
assert orphans == sorted(orphans)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# render_flow_report 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestRenderFlowReport:
|
||
"""流程树报告渲染测试。"""
|
||
|
||
def test_header_contains_timestamp_and_path(self) -> None:
|
||
"""报告头部应包含时间戳和仓库路径。"""
|
||
trees = [FlowNode("cli.main", "cli/main.py", "entry", [])]
|
||
report = render_flow_report(trees, [], "/repo")
|
||
assert "生成时间:" in report
|
||
assert "`/repo`" in report
|
||
|
||
def test_contains_mermaid_block(self) -> None:
|
||
"""报告应包含 Mermaid 代码块。"""
|
||
trees = [FlowNode("cli.main", "cli/main.py", "entry", [])]
|
||
report = render_flow_report(trees, [], "/repo")
|
||
assert "```mermaid" in report
|
||
assert "graph TD" in report
|
||
|
||
def test_contains_indented_text(self) -> None:
|
||
"""报告应包含缩进文本形式的流程树。"""
|
||
child = FlowNode("config.settings", "config/settings.py", "module", [])
|
||
root = FlowNode("cli.main", "cli/main.py", "entry", [child])
|
||
report = render_flow_report([root], [], "/repo")
|
||
assert "`cli.main`" in report
|
||
assert "`config.settings`" in report
|
||
|
||
def test_orphan_section(self) -> None:
|
||
"""报告应包含孤立模块列表。"""
|
||
trees = [FlowNode("cli.main", "cli/main.py", "entry", [])]
|
||
orphans = ["utils/orphan.py", "models/unused.py"]
|
||
report = render_flow_report(trees, orphans, "/repo")
|
||
assert "孤立模块" in report
|
||
assert "`utils/orphan.py`" in report
|
||
assert "`models/unused.py`" in report
|
||
|
||
def test_no_orphans_message(self) -> None:
|
||
"""无孤立模块时应显示提示信息。"""
|
||
trees = [FlowNode("cli.main", "cli/main.py", "entry", [])]
|
||
report = render_flow_report(trees, [], "/repo")
|
||
assert "未发现孤立模块" in report
|
||
|
||
def test_statistics_summary(self) -> None:
|
||
"""报告应包含统计摘要。"""
|
||
trees = [FlowNode("cli.main", "cli/main.py", "entry", [])]
|
||
report = render_flow_report(trees, ["a.py"], "/repo")
|
||
assert "统计摘要" in report
|
||
assert "入口点" in report
|
||
assert "任务" in report
|
||
assert "加载器" in report
|
||
assert "孤立模块" in report
|
||
|
||
def test_task_type_annotation(self) -> None:
|
||
"""任务模块应带有类型标注。"""
|
||
task_node = FlowNode("tasks.ods_member", "tasks/ods_member.py", "module", [])
|
||
root = FlowNode("cli.main", "cli/main.py", "entry", [task_node])
|
||
report = render_flow_report([root], [], "/repo")
|
||
assert "ODS" in report
|
||
|
||
def test_loader_type_annotation(self) -> None:
|
||
"""加载器模块应带有类型标注。"""
|
||
loader_node = FlowNode(
|
||
"loaders.dimensions.member", "loaders/dimensions/member.py", "module", []
|
||
)
|
||
root = FlowNode("cli.main", "cli/main.py", "entry", [loader_node])
|
||
report = render_flow_report([root], [], "/repo")
|
||
assert "维度" in report or "SCD2" in report
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# discover_entry_points 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestDiscoverEntryPoints:
|
||
"""入口点识别测试。"""
|
||
|
||
def test_cli_entry(self, tmp_path: Path) -> None:
|
||
"""应识别 CLI 入口。"""
|
||
cli_dir = tmp_path / "cli"
|
||
cli_dir.mkdir()
|
||
(cli_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")
|
||
|
||
entries = discover_entry_points(tmp_path)
|
||
cli_entries = [e for e in entries if e["type"] == "CLI"]
|
||
assert len(cli_entries) == 1
|
||
assert cli_entries[0]["file"] == "cli/main.py"
|
||
|
||
def test_gui_entry(self, tmp_path: Path) -> None:
|
||
"""应识别 GUI 入口。"""
|
||
gui_dir = tmp_path / "gui"
|
||
gui_dir.mkdir()
|
||
(gui_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")
|
||
|
||
entries = discover_entry_points(tmp_path)
|
||
gui_entries = [e for e in entries if e["type"] == "GUI"]
|
||
assert len(gui_entries) == 1
|
||
|
||
def test_bat_entry(self, tmp_path: Path) -> None:
|
||
"""应识别批处理文件入口。"""
|
||
(tmp_path / "run_etl.bat").write_text(
|
||
"@echo off\npython -m cli.main %*\n", encoding="utf-8"
|
||
)
|
||
|
||
entries = discover_entry_points(tmp_path)
|
||
bat_entries = [e for e in entries if e["type"] == "批处理"]
|
||
assert len(bat_entries) == 1
|
||
assert "cli.main" in bat_entries[0]["description"]
|
||
|
||
def test_script_entry(self, tmp_path: Path) -> None:
|
||
"""应识别运维脚本入口。"""
|
||
scripts_dir = tmp_path / "scripts"
|
||
scripts_dir.mkdir()
|
||
(scripts_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
(scripts_dir / "rebuild_db.py").write_text(
|
||
'if __name__ == "__main__": pass\n', encoding="utf-8"
|
||
)
|
||
|
||
entries = discover_entry_points(tmp_path)
|
||
script_entries = [e for e in entries if e["type"] == "运维脚本"]
|
||
assert len(script_entries) == 1
|
||
assert script_entries[0]["file"] == "scripts/rebuild_db.py"
|
||
|
||
def test_init_py_excluded_from_scripts(self, tmp_path: Path) -> None:
|
||
"""scripts/__init__.py 不应被识别为入口。"""
|
||
scripts_dir = tmp_path / "scripts"
|
||
scripts_dir.mkdir()
|
||
(scripts_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
|
||
entries = discover_entry_points(tmp_path)
|
||
script_entries = [e for e in entries if e["type"] == "运维脚本"]
|
||
assert all(e["file"] != "scripts/__init__.py" for e in script_entries)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# classify_task_type / classify_loader_type 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestClassifyTypes:
|
||
"""任务类型和加载器类型区分测试。"""
|
||
|
||
def test_ods_task(self) -> None:
|
||
assert "ODS" in classify_task_type("tasks/ods_member.py")
|
||
|
||
def test_dwd_task(self) -> None:
|
||
assert "DWD" in classify_task_type("tasks/dwd_load.py")
|
||
|
||
def test_dws_task(self) -> None:
|
||
assert "DWS" in classify_task_type("tasks/dws/assistant_daily.py")
|
||
|
||
def test_verification_task(self) -> None:
|
||
assert "校验" in classify_task_type("tasks/verification/balance_check.py")
|
||
|
||
def test_schema_init_task(self) -> None:
|
||
assert "Schema" in classify_task_type("tasks/init_ods_schema.py")
|
||
|
||
def test_dimension_loader(self) -> None:
|
||
result = classify_loader_type("loaders/dimensions/member.py")
|
||
assert "维度" in result or "SCD2" in result
|
||
|
||
def test_fact_loader(self) -> None:
|
||
assert "事实" in classify_loader_type("loaders/facts/order.py")
|
||
|
||
def test_ods_loader(self) -> None:
|
||
assert "ODS" in classify_loader_type("loaders/ods/generic.py")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# _path_to_module_name 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestPathToModuleName:
|
||
"""路径到模块名转换测试。"""
|
||
|
||
def test_simple_file(self) -> None:
|
||
assert _path_to_module_name("cli/main.py") == "cli.main"
|
||
|
||
def test_init_file(self) -> None:
|
||
assert _path_to_module_name("cli/__init__.py") == "cli"
|
||
|
||
def test_nested_path(self) -> None:
|
||
assert _path_to_module_name("tasks/dws/assistant.py") == "tasks.dws.assistant"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# _parse_bat_python_target 单元测试
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class TestParseBatPythonTarget:
|
||
"""批处理文件 Python 命令解析测试。"""
|
||
|
||
def test_module_invocation(self, tmp_path: Path) -> None:
|
||
bat = tmp_path / "run.bat"
|
||
bat.write_text("@echo off\npython -m cli.main %*\n", encoding="utf-8")
|
||
assert _parse_bat_python_target(bat) == "cli.main"
|
||
|
||
def test_no_python_command(self, tmp_path: Path) -> None:
|
||
bat = tmp_path / "run.bat"
|
||
bat.write_text("@echo off\necho hello\n", encoding="utf-8")
|
||
assert _parse_bat_python_target(bat) is None
|
||
|
||
def test_nonexistent_file(self, tmp_path: Path) -> None:
|
||
assert _parse_bat_python_target(tmp_path / "missing.bat") is None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 属性测试 — Property 9 & 10(hypothesis)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
import os
|
||
import string
|
||
|
||
from hypothesis import given, settings, assume
|
||
from hypothesis import strategies as st
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 辅助:项目包名列表(与 flow_analyzer 中 _PROJECT_PACKAGES 一致)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_PROJECT_PACKAGES_LIST = [
|
||
"cli", "config", "api", "database", "tasks", "loaders",
|
||
"scd", "orchestration", "quality", "models", "utils",
|
||
"gui", "scripts",
|
||
]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Property 9: 流程树节点 source_file 有效性
|
||
# Feature: repo-audit, Property 9: 流程树节点 source_file 有效性
|
||
# Validates: Requirements 2.7
|
||
#
|
||
# 策略:在临时目录中随机生成 1~5 个项目内部模块文件,
|
||
# 其中一个作为入口,其他文件通过 import 语句相互引用。
|
||
# 构建流程树后,遍历所有节点验证 source_file 非空且文件存在。
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _collect_all_nodes(node: FlowNode) -> list[FlowNode]:
|
||
"""递归收集流程树中所有节点。"""
|
||
result = [node]
|
||
for child in node.children:
|
||
result.extend(_collect_all_nodes(child))
|
||
return result
|
||
|
||
|
||
# 生成合法的 Python 标识符作为模块文件名
|
||
_module_name_st = st.from_regex(r"[a-z][a-z0-9_]{0,8}", fullmatch=True).filter(
|
||
lambda s: s not in {"__init__", ""}
|
||
)
|
||
|
||
|
||
@st.composite
|
||
def project_layout(draw):
|
||
"""生成一个随机的项目布局:包名、模块文件名列表、以及模块间的 import 关系。
|
||
|
||
返回 (package, module_names, imports_map)
|
||
- package: 项目包名(如 "cli")
|
||
- module_names: 模块文件名列表(不含 .py 后缀),第一个为入口
|
||
- imports_map: dict[str, list[str]],每个模块导入的其他模块列表
|
||
"""
|
||
package = draw(st.sampled_from(_PROJECT_PACKAGES_LIST))
|
||
n_modules = draw(st.integers(min_value=1, max_value=5))
|
||
module_names = draw(
|
||
st.lists(
|
||
_module_name_st,
|
||
min_size=n_modules,
|
||
max_size=n_modules,
|
||
unique=True,
|
||
)
|
||
)
|
||
# 确保至少有一个模块
|
||
assume(len(module_names) >= 1)
|
||
|
||
# 为每个模块随机选择要导入的其他模块(子集)
|
||
imports_map: dict[str, list[str]] = {}
|
||
for i, mod in enumerate(module_names):
|
||
# 只能导入列表中的其他模块
|
||
others = [m for m in module_names if m != mod]
|
||
if others:
|
||
imported = draw(
|
||
st.lists(st.sampled_from(others), max_size=len(others), unique=True)
|
||
)
|
||
else:
|
||
imported = []
|
||
imports_map[mod] = imported
|
||
|
||
return package, module_names, imports_map
|
||
|
||
|
||
@given(layout=project_layout())
|
||
@settings(max_examples=100)
|
||
def test_property9_flow_tree_source_file_validity(layout, tmp_path_factory):
|
||
"""Property 9: 流程树中每个节点的 source_file 非空且对应文件在仓库中实际存在。
|
||
|
||
**Feature: repo-audit, Property 9: 流程树节点 source_file 有效性**
|
||
**Validates: Requirements 2.7**
|
||
"""
|
||
package, module_names, imports_map = layout
|
||
tmp_path = tmp_path_factory.mktemp("prop9")
|
||
|
||
# 创建包目录和 __init__.py
|
||
pkg_dir = tmp_path / package
|
||
pkg_dir.mkdir(parents=True, exist_ok=True)
|
||
(pkg_dir / "__init__.py").write_text("", encoding="utf-8")
|
||
|
||
# 创建每个模块文件,写入 import 语句
|
||
for mod in module_names:
|
||
lines = []
|
||
for imp in imports_map[mod]:
|
||
lines.append(f"from {package}.{imp} import *")
|
||
lines.append("") # 确保文件非空
|
||
(pkg_dir / f"{mod}.py").write_text("\n".join(lines), encoding="utf-8")
|
||
|
||
# 以第一个模块为入口构建流程树
|
||
entry_rel = f"{package}/{module_names[0]}.py"
|
||
tree = build_flow_tree(tmp_path, entry_rel)
|
||
|
||
# 遍历所有节点,验证 source_file 有效性
|
||
all_nodes = _collect_all_nodes(tree)
|
||
for node in all_nodes:
|
||
# source_file 应为非空字符串
|
||
assert isinstance(node.source_file, str), (
|
||
f"source_file 应为字符串,实际为 {type(node.source_file)}"
|
||
)
|
||
assert node.source_file != "", "source_file 不应为空字符串"
|
||
|
||
# 对应文件应在仓库中实际存在
|
||
full_path = tmp_path / node.source_file
|
||
assert full_path.exists(), (
|
||
f"source_file '{node.source_file}' 对应的文件不存在: {full_path}"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Property 10: 孤立模块检测正确性
|
||
# Feature: repo-audit, Property 10: 孤立模块检测正确性
|
||
# Validates: Requirements 2.8
|
||
#
|
||
# 策略:生成随机的 FileEntry 列表(模拟项目中的 .py 文件),
|
||
# 生成随机的 reachable 集合(是 FileEntry 路径的子集),
|
||
# 调用 find_orphan_modules 验证:
|
||
# 1. 返回的每个孤立模块都不在 reachable 集合中
|
||
# 2. reachable 集合中的每个模块都不在孤立列表中
|
||
#
|
||
# 注意:find_orphan_modules 会排除 __init__.py、tests/、scripts/audit/ 下的文件,
|
||
# 以及不属于 _PROJECT_PACKAGES 的子目录文件。生成器需要考虑这些排除规则。
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# 生成属于项目包的 .py 文件路径(排除被 find_orphan_modules 忽略的路径)
|
||
_eligible_packages = [
|
||
p for p in _PROJECT_PACKAGES_LIST
|
||
if p not in ("scripts",) # scripts 下只有 scripts/audit/ 会被排除,但为简化直接排除
|
||
]
|
||
|
||
|
||
@st.composite
|
||
def orphan_test_data(draw):
|
||
"""生成 (file_entries, reachable_set) 用于测试 find_orphan_modules。
|
||
|
||
只生成"合格"的文件条目(属于项目包、非 __init__.py、非 tests/、非 scripts/audit/),
|
||
这样可以精确验证 reachable 与 orphan 的互斥关系。
|
||
"""
|
||
# 生成 1~10 个合格的 .py 文件路径
|
||
n_files = draw(st.integers(min_value=1, max_value=10))
|
||
paths: list[str] = []
|
||
for _ in range(n_files):
|
||
pkg = draw(st.sampled_from(_eligible_packages))
|
||
fname = draw(_module_name_st)
|
||
path = f"{pkg}/{fname}.py"
|
||
paths.append(path)
|
||
|
||
# 去重
|
||
paths = list(dict.fromkeys(paths))
|
||
assume(len(paths) >= 1)
|
||
|
||
# 构建 FileEntry 列表
|
||
entries = [
|
||
FileEntry(rel_path=p, is_dir=False, size_bytes=100, extension=".py", is_empty_dir=False)
|
||
for p in paths
|
||
]
|
||
|
||
# 随机选择一个子集作为 reachable
|
||
reachable = set(draw(
|
||
st.lists(st.sampled_from(paths), max_size=len(paths), unique=True)
|
||
))
|
||
|
||
return entries, reachable
|
||
|
||
|
||
@given(data=orphan_test_data())
|
||
@settings(max_examples=100)
|
||
def test_property10_orphan_module_detection(data, tmp_path_factory):
|
||
"""Property 10: 孤立模块与可达模块互斥——孤立列表中的模块不在 reachable 中,
|
||
reachable 中的模块不在孤立列表中。
|
||
|
||
**Feature: repo-audit, Property 10: 孤立模块检测正确性**
|
||
**Validates: Requirements 2.8**
|
||
"""
|
||
entries, reachable = data
|
||
tmp_path = tmp_path_factory.mktemp("prop10")
|
||
|
||
orphans = find_orphan_modules(tmp_path, entries, reachable)
|
||
|
||
orphan_set = set(orphans)
|
||
|
||
# 验证 1: 孤立模块不应出现在 reachable 集合中
|
||
overlap = orphan_set & reachable
|
||
assert overlap == set(), (
|
||
f"孤立模块与可达集合存在交集: {overlap}"
|
||
)
|
||
|
||
# 验证 2: reachable 中的模块不应出现在孤立列表中
|
||
for r in reachable:
|
||
assert r not in orphan_set, (
|
||
f"可达模块 '{r}' 不应出现在孤立列表中"
|
||
)
|
||
|
||
# 验证 3: 孤立列表应已排序
|
||
assert orphans == sorted(orphans), "孤立模块列表应按路径排序"
|