# -*- coding: utf-8 -*-
"""
Unit tests for the flow-tree analyzer (flow_analyzer.py).

Covers:
- parse_imports: import statement parsing, exclusion of stdlib / third-party
  modules, tolerance of syntax errors
- build_flow_tree: recursive construction, circular-import handling
- find_orphan_modules: orphan module detection
- render_flow_report: Markdown rendering, Mermaid diagram, statistics summary
- discover_entry_points: entry-point discovery
- classify_task_type / classify_loader_type: type classification
"""
from __future__ import annotations

from pathlib import Path

import pytest

from scripts.audit import FileEntry, FlowNode
from scripts.audit.flow_analyzer import (
    build_flow_tree,
    classify_loader_type,
    classify_task_type,
    discover_entry_points,
    find_orphan_modules,
    parse_imports,
    render_flow_report,
    _path_to_module_name,
    _parse_bat_python_target,
)


# ---------------------------------------------------------------------------
# parse_imports unit tests
# ---------------------------------------------------------------------------


class TestParseImports:
    """Tests for import statement parsing."""

    @staticmethod
    def _parse_source(directory: Path, text: str, filename: str = "test.py"):
        """Write *text* to *filename* inside *directory* and return parse_imports() of it."""
        target = directory / filename
        target.write_text(text, encoding="utf-8")
        return parse_imports(target)

    def test_absolute_import(self, tmp_path: Path) -> None:
        """Absolute imports of project-internal modules are reported."""
        found = self._parse_source(
            tmp_path, "import cli.main\nimport config.settings\n"
        )
        assert "cli.main" in found
        assert "config.settings" in found

    def test_from_import(self, tmp_path: Path) -> None:
        """``from ... import`` statements are reported."""
        found = self._parse_source(
            tmp_path, "from tasks.base_task import BaseTask\n"
        )
        assert "tasks.base_task" in found

    def test_stdlib_excluded(self, tmp_path: Path) -> None:
        """Standard-library modules are left out of the result."""
        found = self._parse_source(
            tmp_path,
            "import os\nimport sys\nimport json\nfrom pathlib import Path\n",
        )
        assert found == []

    def test_third_party_excluded(self, tmp_path: Path) -> None:
        """Third-party packages are left out of the result."""
        found = self._parse_source(
            tmp_path,
            "import requests\nfrom psycopg2 import sql\nimport flask\n",
        )
        assert found == []

    def test_mixed_imports(self, tmp_path: Path) -> None:
        """With mixed imports only the project-internal modules remain."""
        found = self._parse_source(
            tmp_path,
            "import os\nimport cli.main\nimport requests\n"
            "from loaders.base_loader import BaseLoader\n",
        )
        assert "cli.main" in found
        assert "loaders.base_loader" in found
        assert "os" not in found
        assert "requests" not in found

    def test_syntax_error_returns_empty(self, tmp_path: Path) -> None:
        """A file with a syntax error yields an empty list."""
        found = self._parse_source(tmp_path, "def broken(\n", filename="bad.py")
        assert found == []

    def test_nonexistent_file_returns_empty(self, tmp_path: Path) -> None:
        """A missing file yields an empty list."""
        missing = tmp_path / "nonexistent.py"
        assert parse_imports(missing) == []

    def test_deduplication(self, tmp_path: Path) -> None:
        """A module imported several times is reported once."""
        found = self._parse_source(
            tmp_path,
            "import cli.main\nimport cli.main\nfrom cli.main import main\n",
        )
        assert found.count("cli.main") == 1

    def test_empty_file(self, tmp_path: Path) -> None:
        """An empty file yields an empty list."""
        found = self._parse_source(tmp_path, "", filename="empty.py")
        assert found == []


# ---------------------------------------------------------------------------
# build_flow_tree unit tests
# ---------------------------------------------------------------------------


class TestBuildFlowTree:
    """Tests for flow-tree construction."""

    @staticmethod
    def _make_package(repo_root: Path, package: str, modules: dict[str, str]) -> None:
        """Create *package* under *repo_root* with an empty ``__init__.py``.

        *modules* maps file names to the source text written into each file.
        """
        package_dir = repo_root / package
        package_dir.mkdir()
        (package_dir / "__init__.py").write_text("", encoding="utf-8")
        for filename, text in modules.items():
            (package_dir / filename).write_text(text, encoding="utf-8")

    def test_single_file_no_imports(self, tmp_path: Path) -> None:
        """A single file without imports produces a leaf node."""
        self._make_package(tmp_path, "cli", {"main.py": "def main(): pass\n"})

        root = build_flow_tree(tmp_path, "cli/main.py")

        assert root.name == "cli.main"
        assert root.source_file == "cli/main.py"
        assert root.children == []

    def test_simple_import_chain(self, tmp_path: Path) -> None:
        """A simple import chain produces the matching child node."""
        # cli/main.py -> config/settings.py
        self._make_package(
            tmp_path, "cli", {"main.py": "from config.settings import AppConfig\n"}
        )
        self._make_package(
            tmp_path, "config", {"settings.py": "class AppConfig: pass\n"}
        )

        root = build_flow_tree(tmp_path, "cli/main.py")

        assert root.name == "cli.main"
        assert len(root.children) == 1
        only_child = root.children[0]
        assert only_child.name == "config.settings"

    def test_circular_import_no_infinite_loop(self, tmp_path: Path) -> None:
        """Circular imports must not cause unbounded recursion."""
        # a -> b -> a (cycle)
        self._make_package(
            tmp_path,
            "utils",
            {
                "a.py": "from utils.b import func_b\n",
                "b.py": "from utils.a import func_a\n",
            },
        )

        # Must not raise RecursionError.
        root = build_flow_tree(tmp_path, "utils/a.py")
        assert root.name == "utils.a"

    def test_entry_node_type(self, tmp_path: Path) -> None:
        """The CLI entry file is tagged with the ``entry`` node type."""
        self._make_package(tmp_path, "cli", {"main.py": "def main(): pass\n"})

        root = build_flow_tree(tmp_path, "cli/main.py")
        assert root.node_type == "entry"
# ---------------------------------------------------------------------------
# find_orphan_modules unit tests
# ---------------------------------------------------------------------------


class TestFindOrphanModules:
    """Tests for orphan module detection."""

    @staticmethod
    def _py_file(rel_path: str, size: int) -> FileEntry:
        """Build a FileEntry for a regular ``.py`` file of the given size."""
        return FileEntry(rel_path, False, size, ".py", False)

    def test_all_reachable(self, tmp_path: Path) -> None:
        """When every module is reachable the orphan list is empty."""
        entries = [
            self._py_file("cli/main.py", 100),
            self._py_file("config/settings.py", 200),
        ]
        reachable = {"cli/main.py", "config/settings.py"}
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_orphan_detected(self, tmp_path: Path) -> None:
        """An unreachable module is reported as an orphan."""
        entries = [
            self._py_file("cli/main.py", 100),
            self._py_file("utils/orphan.py", 50),
        ]
        reachable = {"cli/main.py"}
        orphans = find_orphan_modules(tmp_path, entries, reachable)
        assert "utils/orphan.py" in orphans

    def test_init_files_excluded(self, tmp_path: Path) -> None:
        """``__init__.py`` is never treated as an orphan."""
        entries = [self._py_file("cli/__init__.py", 0)]
        reachable: set[str] = set()
        orphans = find_orphan_modules(tmp_path, entries, reachable)
        assert "cli/__init__.py" not in orphans

    def test_test_files_excluded(self, tmp_path: Path) -> None:
        """Test files are never treated as orphans."""
        entries = [self._py_file("tests/unit/test_something.py", 100)]
        reachable: set[str] = set()
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_audit_scripts_excluded(self, tmp_path: Path) -> None:
        """The audit scripts themselves are never treated as orphans."""
        entries = [self._py_file("scripts/audit/scanner.py", 100)]
        reachable: set[str] = set()
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_directories_excluded(self, tmp_path: Path) -> None:
        """Directory entries never appear in the orphan list."""
        entries = [FileEntry("cli", True, 0, "", False)]
        reachable: set[str] = set()
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_sorted_output(self, tmp_path: Path) -> None:
        """The orphan list is sorted by path."""
        entries = [
            self._py_file("utils/z.py", 50),
            self._py_file("utils/a.py", 50),
            self._py_file("cli/orphan.py", 50),
        ]
        reachable: set[str] = set()
        orphans = find_orphan_modules(tmp_path, entries, reachable)
        assert orphans == sorted(orphans)


# ---------------------------------------------------------------------------
# render_flow_report unit tests
# ---------------------------------------------------------------------------


class TestRenderFlowReport:
    """Tests for rendering the flow-tree report."""

    @staticmethod
    def _cli_root(children: list[FlowNode] | None = None) -> FlowNode:
        """Build the ``cli.main`` entry node, optionally with children."""
        return FlowNode(
            "cli.main", "cli/main.py", "entry", [] if children is None else children
        )

    def test_header_contains_timestamp_and_path(self) -> None:
        """The report header carries a timestamp label and the repo path."""
        report = render_flow_report([self._cli_root()], [], "/repo")
        assert "生成时间:" in report
        assert "`/repo`" in report

    def test_contains_mermaid_block(self) -> None:
        """The report contains a Mermaid code block."""
        report = render_flow_report([self._cli_root()], [], "/repo")
        assert "```mermaid" in report
        assert "graph TD" in report

    def test_contains_indented_text(self) -> None:
        """The report contains the flow tree as indented text."""
        child = FlowNode("config.settings", "config/settings.py", "module", [])
        report = render_flow_report([self._cli_root([child])], [], "/repo")
        assert "`cli.main`" in report
        assert "`config.settings`" in report

    def test_orphan_section(self) -> None:
        """The report lists the orphan modules."""
        orphans = ["utils/orphan.py", "models/unused.py"]
        report = render_flow_report([self._cli_root()], orphans, "/repo")
        assert "孤立模块" in report
        assert "`utils/orphan.py`" in report
        assert "`models/unused.py`" in report

    def test_no_orphans_message(self) -> None:
        """Without orphans the report shows the "none found" notice."""
        report = render_flow_report([self._cli_root()], [], "/repo")
        assert "未发现孤立模块" in report

    def test_statistics_summary(self) -> None:
        """The report contains the statistics summary."""
        report = render_flow_report([self._cli_root()], ["a.py"], "/repo")
        for label in ("统计摘要", "入口点", "任务", "加载器", "孤立模块"):
            assert label in report

    def test_task_type_annotation(self) -> None:
        """Task modules are annotated with their type."""
        task_node = FlowNode("tasks.ods_member", "tasks/ods_member.py", "module", [])
        report = render_flow_report([self._cli_root([task_node])], [], "/repo")
        assert "ODS" in report

    def test_loader_type_annotation(self) -> None:
        """Loader modules are annotated with their type."""
        loader_node = FlowNode(
            "loaders.dimensions.member", "loaders/dimensions/member.py", "module", []
        )
        report = render_flow_report([self._cli_root([loader_node])], [], "/repo")
        assert "维度" in report or "SCD2" in report


# ---------------------------------------------------------------------------
# discover_entry_points unit tests
# ---------------------------------------------------------------------------


class TestDiscoverEntryPoints:
    """Tests for entry-point discovery."""

    @staticmethod
    def _of_type(entries, kind: str) -> list:
        """Return the discovered entries whose ``"type"`` equals *kind*."""
        return [entry for entry in entries if entry["type"] == kind]

    def test_cli_entry(self, tmp_path: Path) -> None:
        """The CLI entry point is discovered."""
        cli_dir = tmp_path / "cli"
        cli_dir.mkdir()
        (cli_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")

        cli_entries = self._of_type(discover_entry_points(tmp_path), "CLI")
        assert len(cli_entries) == 1
        assert cli_entries[0]["file"] == "cli/main.py"

    def test_gui_entry(self, tmp_path: Path) -> None:
        """The GUI entry point is discovered."""
        gui_dir = tmp_path / "gui"
        gui_dir.mkdir()
        (gui_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")

        gui_entries = self._of_type(discover_entry_points(tmp_path), "GUI")
        assert len(gui_entries) == 1

    def test_bat_entry(self, tmp_path: Path) -> None:
        """Batch-file entry points are discovered."""
        (tmp_path / "run_etl.bat").write_text(
            "@echo off\npython -m cli.main %*\n", encoding="utf-8"
        )

        bat_entries = self._of_type(discover_entry_points(tmp_path), "批处理")
        assert len(bat_entries) == 1
        assert "cli.main" in bat_entries[0]["description"]

    def test_script_entry(self, tmp_path: Path) -> None:
        """Operations-script entry points are discovered."""
        scripts_dir = tmp_path / "scripts"
        scripts_dir.mkdir()
        (scripts_dir / "__init__.py").write_text("", encoding="utf-8")
        (scripts_dir / "rebuild_db.py").write_text(
            'if __name__ == "__main__": pass\n', encoding="utf-8"
        )

        script_entries = self._of_type(discover_entry_points(tmp_path), "运维脚本")
        assert len(script_entries) == 1
        assert script_entries[0]["file"] == "scripts/rebuild_db.py"

    def test_init_py_excluded_from_scripts(self, tmp_path: Path) -> None:
        """``scripts/__init__.py`` is not reported as an entry point."""
        scripts_dir = tmp_path / "scripts"
        scripts_dir.mkdir()
        (scripts_dir / "__init__.py").write_text("", encoding="utf-8")

        script_entries = self._of_type(discover_entry_points(tmp_path), "运维脚本")
        assert all(e["file"] != "scripts/__init__.py" for e in script_entries)


# ---------------------------------------------------------------------------
# classify_task_type / classify_loader_type unit tests
# ---------------------------------------------------------------------------


class TestClassifyTypes:
    """Tests for telling task types and loader types apart."""

    def test_ods_task(self) -> None:
        """An ``ods_*`` task file is labelled with "ODS"."""
        assert "ODS" in classify_task_type("tasks/ods_member.py")

    def test_dwd_task(self) -> None:
        """A ``dwd_*`` task file is labelled with "DWD"."""
        assert "DWD" in classify_task_type("tasks/dwd_load.py")

    def test_dws_task(self) -> None:
        """A task under ``tasks/dws/`` is labelled with "DWS"."""
        assert "DWS" in classify_task_type("tasks/dws/assistant_daily.py")

    def test_verification_task(self) -> None:
        """A task under ``tasks/verification/`` carries the verification label."""
        assert "校验" in classify_task_type("tasks/verification/balance_check.py")

    def test_schema_init_task(self) -> None:
        """A schema-initialisation task is labelled with "Schema"."""
        assert "Schema" in classify_task_type("tasks/init_ods_schema.py")

    def test_dimension_loader(self) -> None:
        """A loader under ``loaders/dimensions/`` carries a dimension/SCD2 label."""
        label = classify_loader_type("loaders/dimensions/member.py")
        assert "维度" in label or "SCD2" in label

    def test_fact_loader(self) -> None:
        """A loader under ``loaders/facts/`` carries the fact label."""
        assert "事实" in classify_loader_type("loaders/facts/order.py")

    def test_ods_loader(self) -> None:
        """A loader under ``loaders/ods/`` is labelled with "ODS"."""
        assert "ODS" in classify_loader_type("loaders/ods/generic.py")


# ---------------------------------------------------------------------------
# _path_to_module_name unit tests
# ---------------------------------------------------------------------------


class TestPathToModuleName:
    """Tests for converting a relative path to a dotted module name."""

    def test_simple_file(self) -> None:
        """A file path maps to ``package.module``."""
        assert _path_to_module_name("cli/main.py") == "cli.main"

    def test_init_file(self) -> None:
        """An ``__init__.py`` path maps to the package name."""
        assert _path_to_module_name("cli/__init__.py") == "cli"

    def test_nested_path(self) -> None:
        """Every directory level becomes one dotted component."""
        assert _path_to_module_name("tasks/dws/assistant.py") == "tasks.dws.assistant"


# ---------------------------------------------------------------------------
# _parse_bat_python_target unit tests
# ---------------------------------------------------------------------------


class TestParseBatPythonTarget:
    """Tests for extracting the Python target from a batch file."""

    def test_module_invocation(self, tmp_path: Path) -> None:
        """``python -m <module>`` yields the module name."""
        bat = tmp_path / "run.bat"
        bat.write_text("@echo off\npython -m cli.main %*\n", encoding="utf-8")
        assert _parse_bat_python_target(bat) == "cli.main"

    def test_no_python_command(self, tmp_path: Path) -> None:
        """A batch file without a Python command yields ``None``."""
        bat = tmp_path / "run.bat"
        bat.write_text("@echo off\necho hello\n", encoding="utf-8")
        assert _parse_bat_python_target(bat) is None

    def test_nonexistent_file(self, tmp_path: Path) -> None:
        """A missing batch file yields ``None``."""
        assert _parse_bat_python_target(tmp_path / "missing.bat") is None


# ---------------------------------------------------------------------------
# Property tests — Property 9 & 10 (hypothesis)
# ---------------------------------------------------------------------------

import keyword
import os
import string

from hypothesis import given, settings, assume
from hypothesis import strategies as st

# ---------------------------------------------------------------------------
# Helper: list of project package names
# (kept in sync with _PROJECT_PACKAGES in flow_analyzer)
# ---------------------------------------------------------------------------
_PROJECT_PACKAGES_LIST = [
    "cli",
    "config",
    "api",
    "database",
    "tasks",
    "loaders",
    "scd",
    "orchestration",
    "quality",
    "models",
    "utils",
    "gui",
    "scripts",
]

# ---------------------------------------------------------------------------
# Property 9: validity of flow-tree node source_file
# Feature: repo-audit, Property 9: validity of flow-tree node source_file
# Validates: Requirements 2.7
#
# Strategy: generate 1-5 project-internal module files in a temporary
# directory; the first one is the entry point and the modules reference each
# other through import statements. After building the flow tree, walk every
# node and verify that source_file is non-empty and the file exists.
# ---------------------------------------------------------------------------


def _collect_all_nodes(node: FlowNode) -> list[FlowNode]:
    """Return every node of the flow tree rooted at *node* in pre-order.

    Implemented with an explicit stack so that the traversal does not depend
    on the interpreter recursion limit.
    """
    ordered: list[FlowNode] = []
    pending = [node]
    while pending:
        current = pending.pop()
        ordered.append(current)
        # Reverse so that the first child is popped (and visited) first.
        pending.extend(reversed(current.children))
    return ordered


# Module file names: short lowercase identifiers. Python keywords are rejected
# because a name such as "if" would turn the generated
# ``from <package>.<name> import *`` line into a syntax error, which makes the
# example exercise nothing beyond the entry file itself.
_module_name_st = st.from_regex(r"[a-z][a-z0-9_]{0,8}", fullmatch=True).filter(
    lambda s: not keyword.iskeyword(s)
)


@st.composite
def project_layout(draw):
    """Generate a random project layout.

    Returns a tuple ``(package, module_names, imports_map)``:

    - package: project package name (e.g. ``"cli"``)
    - module_names: unique module file names without the ``.py`` suffix;
      the first one is the entry point
    - imports_map: ``dict[str, list[str]]`` mapping each module to the other
      modules it imports
    """
    package = draw(st.sampled_from(_PROJECT_PACKAGES_LIST))
    n_modules = draw(st.integers(min_value=1, max_value=5))
    # min_size == max_size >= 1, so the list is never empty.
    module_names = draw(
        st.lists(
            _module_name_st,
            min_size=n_modules,
            max_size=n_modules,
            unique=True,
        )
    )

    # For every module pick a random subset of the *other* modules to import.
    imports_map: dict[str, list[str]] = {}
    for mod in module_names:
        others = [m for m in module_names if m != mod]
        if others:
            imported = draw(
                st.lists(st.sampled_from(others), max_size=len(others), unique=True)
            )
        else:
            imported = []
        imports_map[mod] = imported

    return package, module_names, imports_map


@given(layout=project_layout())
@settings(max_examples=100)
def test_property9_flow_tree_source_file_validity(layout, tmp_path_factory):
    """Property 9: every flow-tree node has a non-empty source_file that exists.

    **Feature: repo-audit, Property 9: validity of flow-tree node source_file**
    **Validates: Requirements 2.7**
    """
    package, module_names, imports_map = layout
    tmp_path = tmp_path_factory.mktemp("prop9")

    # Create the package directory and its __init__.py.
    pkg_dir = tmp_path / package
    pkg_dir.mkdir(parents=True, exist_ok=True)
    (pkg_dir / "__init__.py").write_text("", encoding="utf-8")

    # Create every module file and write its import statements.
    for mod in module_names:
        lines = [f"from {package}.{imp} import *" for imp in imports_map[mod]]
        # The trailing empty element terminates the last import with a newline;
        # a module without imports is written as an empty file.
        lines.append("")
        (pkg_dir / f"{mod}.py").write_text("\n".join(lines), encoding="utf-8")

    # Build the flow tree with the first module as the entry point.
    entry_rel = f"{package}/{module_names[0]}.py"
    tree = build_flow_tree(tmp_path, entry_rel)

    # Walk every node and check source_file.
    for node in _collect_all_nodes(tree):
        # source_file must be a non-empty string.
        assert isinstance(node.source_file, str), (
            f"source_file 应为字符串,实际为 {type(node.source_file)}"
        )
        assert node.source_file != "", "source_file 不应为空字符串"

        # The referenced file must exist inside the repository.
        full_path = tmp_path / node.source_file
        assert full_path.exists(), (
            f"source_file '{node.source_file}' 对应的文件不存在: {full_path}"
        )


# ---------------------------------------------------------------------------
# Property 10: correctness of orphan module detection
# Feature: repo-audit, Property 10: correctness of orphan module detection
# Validates: Requirements 2.8
#
# Strategy: generate a random list of FileEntry objects (simulating the .py
# files of a project) and a random reachable set (a subset of the FileEntry
# paths), then call find_orphan_modules and verify:
#   1. No returned orphan module is in the reachable set.
#   2.
#      No module in the reachable set appears in the orphan list.
#
# Note: find_orphan_modules excludes __init__.py and files under tests/ and
# scripts/audit/, as well as files in subdirectories that do not belong to
# _PROJECT_PACKAGES. The generators have to account for these exclusion rules.
# ---------------------------------------------------------------------------

# Packages used to generate .py paths (paths ignored by find_orphan_modules
# are avoided). Under scripts only scripts/audit/ is excluded, but the whole
# package is dropped here for simplicity.
_eligible_packages = [
    package for package in _PROJECT_PACKAGES_LIST if package != "scripts"
]


@st.composite
def orphan_test_data(draw):
    """Generate ``(file_entries, reachable_set)`` for find_orphan_modules tests.

    Only "eligible" file entries are produced (inside a project package, not
    ``__init__.py``, not under ``tests/`` or ``scripts/audit/``), so that the
    mutual exclusion between reachable and orphan modules can be checked
    exactly.
    """
    # Draw 1-10 eligible .py paths; duplicates are possible at this stage.
    file_count = draw(st.integers(min_value=1, max_value=10))
    drawn_paths: list[str] = []
    for _ in range(file_count):
        package = draw(st.sampled_from(_eligible_packages))
        stem = draw(_module_name_st)
        drawn_paths.append(f"{package}/{stem}.py")

    # De-duplicate while keeping first-seen order.
    paths = list(dict.fromkeys(drawn_paths))
    assume(len(paths) >= 1)

    # One FileEntry per unique path.
    entries = [
        FileEntry(
            rel_path=path,
            is_dir=False,
            size_bytes=100,
            extension=".py",
            is_empty_dir=False,
        )
        for path in paths
    ]

    # A random subset of the paths plays the role of the reachable set.
    reachable_paths = draw(
        st.lists(st.sampled_from(paths), max_size=len(paths), unique=True)
    )
    return entries, set(reachable_paths)


@given(data=orphan_test_data())
@settings(max_examples=100)
def test_property10_orphan_module_detection(data, tmp_path_factory):
    """Property 10: orphan modules and reachable modules are mutually exclusive.

    No module in the orphan list is in ``reachable`` and no module in
    ``reachable`` is in the orphan list.

    **Feature: repo-audit, Property 10: correctness of orphan module detection**
    **Validates: Requirements 2.8**
    """
    entries, reachable = data
    tmp_path = tmp_path_factory.mktemp("prop10")

    orphans = find_orphan_modules(tmp_path, entries, reachable)
    orphan_set = set(orphans)

    # Check 1: no orphan module may be part of the reachable set.
    overlap = orphan_set & reachable
    assert not overlap, (
        f"孤立模块与可达集合存在交集: {overlap}"
    )

    # Check 2: no reachable module may be part of the orphan list.
    for r in reachable:
        assert r not in orphan_set, (
            f"可达模块 '{r}' 不应出现在孤立列表中"
        )

    # Check 3: the orphan list must already be sorted.
    assert orphans == sorted(orphans), "孤立模块列表应按路径排序"