Files
ZQYY.FQ-ETL/tests/unit/test_audit_flow.py

668 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
单元测试 — 流程树分析器 (flow_analyzer.py)
覆盖:
- parse_imports: import 语句解析、标准库/第三方排除、语法错误容错
- build_flow_tree: 递归构建、循环导入处理
- find_orphan_modules: 孤立模块检测
- render_flow_report: Markdown 渲染、Mermaid 图、统计摘要
- discover_entry_points: 入口点识别
- classify_task_type / classify_loader_type: 类型区分
"""
from __future__ import annotations

import keyword
from pathlib import Path

import pytest

from scripts.audit import FileEntry, FlowNode
from scripts.audit.flow_analyzer import (
    build_flow_tree,
    classify_loader_type,
    classify_task_type,
    discover_entry_points,
    find_orphan_modules,
    parse_imports,
    render_flow_report,
    _path_to_module_name,
    _parse_bat_python_target,
)
# ---------------------------------------------------------------------------
# parse_imports 单元测试
# ---------------------------------------------------------------------------
class TestParseImports:
    """Unit tests for ``parse_imports`` (import-statement extraction)."""

    def test_absolute_import(self, tmp_path: Path) -> None:
        """Absolute imports of project-internal modules are reported."""
        source = tmp_path / "test.py"
        source.write_text("import cli.main\nimport config.settings\n", encoding="utf-8")
        found = parse_imports(source)
        for expected in ("cli.main", "config.settings"):
            assert expected in found

    def test_from_import(self, tmp_path: Path) -> None:
        """``from ... import`` statements are reported by their module name."""
        source = tmp_path / "test.py"
        source.write_text("from tasks.base_task import BaseTask\n", encoding="utf-8")
        assert "tasks.base_task" in parse_imports(source)

    def test_stdlib_excluded(self, tmp_path: Path) -> None:
        """Standard-library modules are left out of the result."""
        source = tmp_path / "test.py"
        code = "import os\nimport sys\nimport json\nfrom pathlib import Path\n"
        source.write_text(code, encoding="utf-8")
        assert parse_imports(source) == []

    def test_third_party_excluded(self, tmp_path: Path) -> None:
        """Third-party packages are left out of the result."""
        source = tmp_path / "test.py"
        code = "import requests\nfrom psycopg2 import sql\nimport flask\n"
        source.write_text(code, encoding="utf-8")
        assert parse_imports(source) == []

    def test_mixed_imports(self, tmp_path: Path) -> None:
        """With mixed imports only project-internal modules are kept."""
        source = tmp_path / "test.py"
        code = "import os\nimport cli.main\nimport requests\nfrom loaders.base_loader import BaseLoader\n"
        source.write_text(code, encoding="utf-8")
        found = parse_imports(source)
        for kept in ("cli.main", "loaders.base_loader"):
            assert kept in found
        for dropped in ("os", "requests"):
            assert dropped not in found

    def test_syntax_error_returns_empty(self, tmp_path: Path) -> None:
        """A file with a syntax error yields an empty list."""
        broken = tmp_path / "bad.py"
        broken.write_text("def broken(\n", encoding="utf-8")
        assert parse_imports(broken) == []

    def test_nonexistent_file_returns_empty(self, tmp_path: Path) -> None:
        """A path that does not exist yields an empty list."""
        missing = tmp_path / "nonexistent.py"
        assert parse_imports(missing) == []

    def test_deduplication(self, tmp_path: Path) -> None:
        """A module imported several times appears only once."""
        source = tmp_path / "test.py"
        code = "import cli.main\nimport cli.main\nfrom cli.main import main\n"
        source.write_text(code, encoding="utf-8")
        found = parse_imports(source)
        assert found.count("cli.main") == 1

    def test_empty_file(self, tmp_path: Path) -> None:
        """An empty file yields an empty list."""
        blank = tmp_path / "empty.py"
        blank.write_text("", encoding="utf-8")
        assert parse_imports(blank) == []
# ---------------------------------------------------------------------------
# build_flow_tree 单元测试
# ---------------------------------------------------------------------------
class TestBuildFlowTree:
    """Unit tests for ``build_flow_tree`` (flow-tree construction)."""

    @staticmethod
    def _write_package(root: Path, package: str, modules: dict[str, str]) -> None:
        """Create ``root/package`` with an empty ``__init__.py`` plus the given files.

        ``modules`` maps a file name to its source text; files are written in
        the mapping's insertion order.
        """
        pkg_dir = root / package
        pkg_dir.mkdir()
        (pkg_dir / "__init__.py").write_text("", encoding="utf-8")
        for filename, body in modules.items():
            (pkg_dir / filename).write_text(body, encoding="utf-8")

    def test_single_file_no_imports(self, tmp_path: Path) -> None:
        """A single file without imports produces a leaf node."""
        self._write_package(tmp_path, "cli", {"main.py": "def main(): pass\n"})
        root = build_flow_tree(tmp_path, "cli/main.py")
        assert root.name == "cli.main"
        assert root.source_file == "cli/main.py"
        assert root.children == []

    def test_simple_import_chain(self, tmp_path: Path) -> None:
        """A simple import chain (cli/main.py -> config/settings.py) yields one child."""
        self._write_package(
            tmp_path, "cli", {"main.py": "from config.settings import AppConfig\n"}
        )
        self._write_package(
            tmp_path, "config", {"settings.py": "class AppConfig: pass\n"}
        )
        root = build_flow_tree(tmp_path, "cli/main.py")
        assert root.name == "cli.main"
        assert len(root.children) == 1
        only_child = root.children[0]
        assert only_child.name == "config.settings"

    def test_circular_import_no_infinite_loop(self, tmp_path: Path) -> None:
        """A circular import (a -> b -> a) must not recurse forever."""
        self._write_package(
            tmp_path,
            "utils",
            {
                "a.py": "from utils.b import func_b\n",
                "b.py": "from utils.a import func_a\n",
            },
        )
        # Building the tree must complete without raising RecursionError.
        root = build_flow_tree(tmp_path, "utils/a.py")
        assert root.name == "utils.a"

    def test_entry_node_type(self, tmp_path: Path) -> None:
        """The CLI entry file is tagged with the ``entry`` node type."""
        self._write_package(tmp_path, "cli", {"main.py": "def main(): pass\n"})
        root = build_flow_tree(tmp_path, "cli/main.py")
        assert root.node_type == "entry"
# ---------------------------------------------------------------------------
# find_orphan_modules 单元测试
# ---------------------------------------------------------------------------
class TestFindOrphanModules:
    """Unit tests for ``find_orphan_modules`` (orphan-module detection)."""

    @staticmethod
    def _py_entry(rel_path: str, size: int) -> FileEntry:
        """Build a non-directory ``.py`` ``FileEntry`` for ``rel_path``."""
        return FileEntry(rel_path, False, size, ".py", False)

    def test_all_reachable(self, tmp_path: Path) -> None:
        """When every module is reachable the result is empty."""
        entries = [
            self._py_entry("cli/main.py", 100),
            self._py_entry("config/settings.py", 200),
        ]
        reachable = {"cli/main.py", "config/settings.py"}
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_orphan_detected(self, tmp_path: Path) -> None:
        """A module that is not reachable is reported as an orphan."""
        entries = [
            self._py_entry("cli/main.py", 100),
            self._py_entry("utils/orphan.py", 50),
        ]
        reachable = {"cli/main.py"}
        detected = find_orphan_modules(tmp_path, entries, reachable)
        assert "utils/orphan.py" in detected

    def test_init_files_excluded(self, tmp_path: Path) -> None:
        """``__init__.py`` files are never reported as orphans."""
        entries = [self._py_entry("cli/__init__.py", 0)]
        reachable: set[str] = set()
        detected = find_orphan_modules(tmp_path, entries, reachable)
        assert "cli/__init__.py" not in detected

    def test_test_files_excluded(self, tmp_path: Path) -> None:
        """Test files are never reported as orphans."""
        entries = [self._py_entry("tests/unit/test_something.py", 100)]
        reachable: set[str] = set()
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_audit_scripts_excluded(self, tmp_path: Path) -> None:
        """The audit scripts themselves are never reported as orphans."""
        entries = [self._py_entry("scripts/audit/scanner.py", 100)]
        reachable: set[str] = set()
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_directories_excluded(self, tmp_path: Path) -> None:
        """Directory entries never appear in the orphan list."""
        entries = [FileEntry("cli", True, 0, "", False)]
        reachable: set[str] = set()
        assert find_orphan_modules(tmp_path, entries, reachable) == []

    def test_sorted_output(self, tmp_path: Path) -> None:
        """The orphan list is sorted by path."""
        entries = [
            self._py_entry(rel_path, 50)
            for rel_path in ("utils/z.py", "utils/a.py", "cli/orphan.py")
        ]
        reachable: set[str] = set()
        detected = find_orphan_modules(tmp_path, entries, reachable)
        assert detected == sorted(detected)
# ---------------------------------------------------------------------------
# render_flow_report 单元测试
# ---------------------------------------------------------------------------
class TestRenderFlowReport:
    """Unit tests for ``render_flow_report`` (flow-tree report rendering)."""

    @staticmethod
    def _cli_root(*children: FlowNode) -> FlowNode:
        """Return a fresh ``cli.main`` entry node holding the given children."""
        return FlowNode("cli.main", "cli/main.py", "entry", list(children))

    def test_header_contains_timestamp_and_path(self) -> None:
        """The report header carries a timestamp label and the repository path."""
        rendered = render_flow_report([self._cli_root()], [], "/repo")
        assert "生成时间:" in rendered
        assert "`/repo`" in rendered

    def test_contains_mermaid_block(self) -> None:
        """The report contains a Mermaid code block."""
        rendered = render_flow_report([self._cli_root()], [], "/repo")
        assert "```mermaid" in rendered
        assert "graph TD" in rendered

    def test_contains_indented_text(self) -> None:
        """The report lists both root and child nodes of the flow tree as text."""
        settings_node = FlowNode("config.settings", "config/settings.py", "module", [])
        rendered = render_flow_report([self._cli_root(settings_node)], [], "/repo")
        assert "`cli.main`" in rendered
        assert "`config.settings`" in rendered

    def test_orphan_section(self) -> None:
        """The report lists the orphan modules it was given."""
        orphan_paths = ["utils/orphan.py", "models/unused.py"]
        rendered = render_flow_report([self._cli_root()], orphan_paths, "/repo")
        assert "孤立模块" in rendered
        assert "`utils/orphan.py`" in rendered
        assert "`models/unused.py`" in rendered

    def test_no_orphans_message(self) -> None:
        """With no orphan modules a notice is shown instead."""
        rendered = render_flow_report([self._cli_root()], [], "/repo")
        assert "未发现孤立模块" in rendered

    def test_statistics_summary(self) -> None:
        """The report contains the statistics summary and its labels."""
        rendered = render_flow_report([self._cli_root()], ["a.py"], "/repo")
        for label in ("统计摘要", "入口点", "任务", "加载器", "孤立模块"):
            assert label in rendered

    def test_task_type_annotation(self) -> None:
        """Task modules are annotated with their type."""
        ods_task = FlowNode("tasks.ods_member", "tasks/ods_member.py", "module", [])
        rendered = render_flow_report([self._cli_root(ods_task)], [], "/repo")
        assert "ODS" in rendered

    def test_loader_type_annotation(self) -> None:
        """Loader modules are annotated with their type."""
        dim_loader = FlowNode(
            "loaders.dimensions.member", "loaders/dimensions/member.py", "module", []
        )
        rendered = render_flow_report([self._cli_root(dim_loader)], [], "/repo")
        assert "维度" in rendered or "SCD2" in rendered
# ---------------------------------------------------------------------------
# discover_entry_points 单元测试
# ---------------------------------------------------------------------------
class TestDiscoverEntryPoints:
    """Unit tests for ``discover_entry_points`` (entry-point discovery)."""

    @staticmethod
    def _of_type(found: list, kind: str) -> list:
        """Return the discovered entries whose ``"type"`` equals ``kind``."""
        return [e for e in found if e["type"] == kind]

    def test_cli_entry(self, tmp_path: Path) -> None:
        """The CLI entry point is discovered."""
        package_dir = tmp_path / "cli"
        package_dir.mkdir()
        (package_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")
        cli_entries = self._of_type(discover_entry_points(tmp_path), "CLI")
        assert len(cli_entries) == 1
        assert cli_entries[0]["file"] == "cli/main.py"

    def test_gui_entry(self, tmp_path: Path) -> None:
        """The GUI entry point is discovered."""
        package_dir = tmp_path / "gui"
        package_dir.mkdir()
        (package_dir / "main.py").write_text("def main(): pass\n", encoding="utf-8")
        gui_entries = self._of_type(discover_entry_points(tmp_path), "GUI")
        assert len(gui_entries) == 1

    def test_bat_entry(self, tmp_path: Path) -> None:
        """A batch file is discovered as an entry point."""
        batch_file = tmp_path / "run_etl.bat"
        batch_file.write_text("@echo off\npython -m cli.main %*\n", encoding="utf-8")
        bat_entries = self._of_type(discover_entry_points(tmp_path), "批处理")
        assert len(bat_entries) == 1
        assert "cli.main" in bat_entries[0]["description"]

    def test_script_entry(self, tmp_path: Path) -> None:
        """An operations script is discovered as an entry point."""
        package_dir = tmp_path / "scripts"
        package_dir.mkdir()
        (package_dir / "__init__.py").write_text("", encoding="utf-8")
        (package_dir / "rebuild_db.py").write_text(
            'if __name__ == "__main__": pass\n', encoding="utf-8"
        )
        script_entries = self._of_type(discover_entry_points(tmp_path), "运维脚本")
        assert len(script_entries) == 1
        assert script_entries[0]["file"] == "scripts/rebuild_db.py"

    def test_init_py_excluded_from_scripts(self, tmp_path: Path) -> None:
        """``scripts/__init__.py`` is not reported as an entry point."""
        package_dir = tmp_path / "scripts"
        package_dir.mkdir()
        (package_dir / "__init__.py").write_text("", encoding="utf-8")
        script_entries = self._of_type(discover_entry_points(tmp_path), "运维脚本")
        assert all(e["file"] != "scripts/__init__.py" for e in script_entries)
# ---------------------------------------------------------------------------
# classify_task_type / classify_loader_type 单元测试
# ---------------------------------------------------------------------------
class TestClassifyTypes:
    """Unit tests for ``classify_task_type`` and ``classify_loader_type``."""

    def test_ods_task(self) -> None:
        """An ``ods_`` task path is labelled with "ODS"."""
        label = classify_task_type("tasks/ods_member.py")
        assert "ODS" in label

    def test_dwd_task(self) -> None:
        """A ``dwd_`` task path is labelled with "DWD"."""
        label = classify_task_type("tasks/dwd_load.py")
        assert "DWD" in label

    def test_dws_task(self) -> None:
        """A task under ``tasks/dws/`` is labelled with "DWS"."""
        label = classify_task_type("tasks/dws/assistant_daily.py")
        assert "DWS" in label

    def test_verification_task(self) -> None:
        """A task under ``tasks/verification/`` gets the verification label."""
        label = classify_task_type("tasks/verification/balance_check.py")
        assert "校验" in label

    def test_schema_init_task(self) -> None:
        """A schema-initialisation task path is labelled with "Schema"."""
        label = classify_task_type("tasks/init_ods_schema.py")
        assert "Schema" in label

    def test_dimension_loader(self) -> None:
        """A loader under ``loaders/dimensions/`` gets a dimension or SCD2 label."""
        label = classify_loader_type("loaders/dimensions/member.py")
        assert "维度" in label or "SCD2" in label

    def test_fact_loader(self) -> None:
        """A loader under ``loaders/facts/`` gets the fact label."""
        label = classify_loader_type("loaders/facts/order.py")
        assert "事实" in label

    def test_ods_loader(self) -> None:
        """A loader under ``loaders/ods/`` is labelled with "ODS"."""
        label = classify_loader_type("loaders/ods/generic.py")
        assert "ODS" in label
# ---------------------------------------------------------------------------
# _path_to_module_name 单元测试
# ---------------------------------------------------------------------------
class TestPathToModuleName:
    """Unit tests for ``_path_to_module_name`` (path -> dotted module name)."""

    def test_simple_file(self) -> None:
        """A file directly inside a package maps to ``package.module``."""
        module_name = _path_to_module_name("cli/main.py")
        assert module_name == "cli.main"

    def test_init_file(self) -> None:
        """A package ``__init__.py`` maps to the package name itself."""
        module_name = _path_to_module_name("cli/__init__.py")
        assert module_name == "cli"

    def test_nested_path(self) -> None:
        """Every directory level of a nested path becomes a dotted component."""
        module_name = _path_to_module_name("tasks/dws/assistant.py")
        assert module_name == "tasks.dws.assistant"
# ---------------------------------------------------------------------------
# _parse_bat_python_target 单元测试
# ---------------------------------------------------------------------------
class TestParseBatPythonTarget:
    """Unit tests for ``_parse_bat_python_target`` (Python command in batch files)."""

    def test_module_invocation(self, tmp_path: Path) -> None:
        """A ``python -m <module>`` line yields the module name."""
        script = tmp_path / "run.bat"
        script.write_text("@echo off\npython -m cli.main %*\n", encoding="utf-8")
        target = _parse_bat_python_target(script)
        assert target == "cli.main"

    def test_no_python_command(self, tmp_path: Path) -> None:
        """A batch file without a Python command yields ``None``."""
        script = tmp_path / "run.bat"
        script.write_text("@echo off\necho hello\n", encoding="utf-8")
        target = _parse_bat_python_target(script)
        assert target is None

    def test_nonexistent_file(self, tmp_path: Path) -> None:
        """A batch file that does not exist yields ``None``."""
        missing = tmp_path / "missing.bat"
        assert _parse_bat_python_target(missing) is None
# ---------------------------------------------------------------------------
# Property-based tests — Property 9 & 10 (hypothesis)
# ---------------------------------------------------------------------------
import os
import string
from hypothesis import given, settings, assume
from hypothesis import strategies as st
# ---------------------------------------------------------------------------
# 辅助:项目包名列表(与 flow_analyzer 中 _PROJECT_PACKAGES 一致)
# ---------------------------------------------------------------------------
# Top-level project package names used by the generators below.
# NOTE(review): meant to mirror _PROJECT_PACKAGES in flow_analyzer — keep in sync.
_PROJECT_PACKAGES_LIST = [
    "cli",
    "config",
    "api",
    "database",
    "tasks",
    "loaders",
    "scd",
    "orchestration",
    "quality",
    "models",
    "utils",
    "gui",
    "scripts",
]
# ---------------------------------------------------------------------------
# Property 9: 流程树节点 source_file 有效性
# Feature: repo-audit, Property 9: 流程树节点 source_file 有效性
# Validates: Requirements 2.7
#
# 策略:在临时目录中随机生成 1~5 个项目内部模块文件,
# 其中一个作为入口,其他文件通过 import 语句相互引用。
# 构建流程树后,遍历所有节点验证 source_file 非空且文件存在。
# ---------------------------------------------------------------------------
def _collect_all_nodes(node: FlowNode) -> list[FlowNode]:
    """Return ``node`` followed by all of its descendants in pre-order.

    Children are visited in the order they appear in ``node.children``, and
    each child's whole subtree is emitted before the next sibling.
    """
    return [node] + [
        descendant
        for child in node.children
        for descendant in _collect_all_nodes(child)
    ]
# Strategy for module file names: short lowercase Python identifiers.
# The pattern always starts with a letter, so it can never yield "" or
# "__init__"; what it *can* yield is a hard keyword ("if", "as", "class", ...).
# Those are rejected because a generated "from pkg.if import *" line is a
# syntax error and would make the generated module unparsable.
_module_name_st = st.from_regex(r"[a-z][a-z0-9_]{0,8}", fullmatch=True).filter(
    lambda s: not keyword.iskeyword(s)
)
@st.composite
def project_layout(draw):
    """Generate a random project layout: a package, its modules and their imports.

    Returns a tuple ``(package, module_names, imports_map)``:

    - ``package``: a project package name (e.g. ``"cli"``), sampled from
      ``_PROJECT_PACKAGES_LIST``.
    - ``module_names``: 1 to 5 unique module file names without the ``.py``
      suffix; the first one is used as the entry module.
    - ``imports_map``: ``dict[str, list[str]]`` mapping every module to the
      (possibly empty) list of other modules it imports.
    """
    package = draw(st.sampled_from(_PROJECT_PACKAGES_LIST))
    n_modules = draw(st.integers(min_value=1, max_value=5))
    # min_size == max_size == n_modules (>= 1), so the list is never empty.
    module_names = draw(
        st.lists(
            _module_name_st,
            min_size=n_modules,
            max_size=n_modules,
            unique=True,
        )
    )

    # For each module pick a random subset of the *other* modules to import.
    imports_map: dict[str, list[str]] = {}
    for mod in module_names:
        others = [m for m in module_names if m != mod]
        if others:
            imported = draw(
                st.lists(st.sampled_from(others), max_size=len(others), unique=True)
            )
        else:
            # sampled_from() rejects an empty sequence, so a lone module
            # simply imports nothing.
            imported = []
        imports_map[mod] = imported
    return package, module_names, imports_map
@given(layout=project_layout())
@settings(max_examples=100)
def test_property9_flow_tree_source_file_validity(layout, tmp_path_factory):
    """Property 9: every flow-tree node has a non-empty ``source_file`` that exists.

    Feature: repo-audit, Property 9 (flow-tree node ``source_file`` validity).
    Validates: Requirements 2.7.
    """
    package, module_names, imports_map = layout
    repo_root = tmp_path_factory.mktemp("prop9")

    # Lay out the package directory with an empty __init__.py.
    pkg_dir = repo_root / package
    pkg_dir.mkdir(parents=True, exist_ok=True)
    (pkg_dir / "__init__.py").write_text("", encoding="utf-8")

    # One file per module: its import statements, one per line, followed by
    # an empty trailing element in the joined text.
    for mod in module_names:
        lines = [f"from {package}.{imp} import *" for imp in imports_map[mod]]
        lines.append("")
        (pkg_dir / f"{mod}.py").write_text("\n".join(lines), encoding="utf-8")

    # The first generated module serves as the entry point.
    tree = build_flow_tree(repo_root, f"{package}/{module_names[0]}.py")

    for node in _collect_all_nodes(tree):
        # source_file must be a non-empty string ...
        assert isinstance(node.source_file, str), (
            f"source_file 应为字符串,实际为 {type(node.source_file)}"
        )
        assert node.source_file != "", "source_file 不应为空字符串"
        # ... that points at a file which really exists in the repository.
        full_path = repo_root / node.source_file
        assert full_path.exists(), (
            f"source_file '{node.source_file}' 对应的文件不存在: {full_path}"
        )
# ---------------------------------------------------------------------------
# Property 10: 孤立模块检测正确性
# Feature: repo-audit, Property 10: 孤立模块检测正确性
# Validates: Requirements 2.8
#
# 策略:生成随机的 FileEntry 列表(模拟项目中的 .py 文件),
# 生成随机的 reachable 集合(是 FileEntry 路径的子集),
# 调用 find_orphan_modules 验证:
# 1. 返回的每个孤立模块都不在 reachable 集合中
# 2. reachable 集合中的每个模块都不在孤立列表中
#
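# Note: find_orphan_modules excludes __init__.py and files under tests/ and scripts/audit/,
# as well as files in subdirectories that do not belong to _PROJECT_PACKAGES. The generators must respect these exclusion rules.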
# ---------------------------------------------------------------------------
# Packages used to generate project .py paths that find_orphan_modules does
# not ignore. Only scripts/audit/ is excluded under "scripts", but the whole
# package is left out here to keep the generator simple.
_eligible_packages = [
    pkg_name for pkg_name in _PROJECT_PACKAGES_LIST if pkg_name != "scripts"
]
@st.composite
def orphan_test_data(draw):
    """Generate ``(file_entries, reachable_set)`` for testing ``find_orphan_modules``.

    Only "eligible" file entries are produced (inside a project package, not
    ``__init__.py``, not under ``tests/`` or ``scripts/audit/``), so that the
    mutual exclusion between reachable and orphan modules can be checked.
    """
    # Draw 1-10 candidate paths; the dict keeps first-seen order and drops
    # duplicates.
    file_count = draw(st.integers(min_value=1, max_value=10))
    seen: dict[str, None] = {}
    for _ in range(file_count):
        pkg = draw(st.sampled_from(_eligible_packages))
        fname = draw(_module_name_st)
        seen.setdefault(f"{pkg}/{fname}.py")
    paths = list(seen)
    assume(len(paths) >= 1)

    entries = []
    for p in paths:
        entries.append(
            FileEntry(rel_path=p, is_dir=False, size_bytes=100, extension=".py", is_empty_dir=False)
        )

    # A random subset of the generated paths acts as the reachable set.
    reachable_paths = draw(
        st.lists(st.sampled_from(paths), max_size=len(paths), unique=True)
    )
    reachable = set(reachable_paths)
    return entries, reachable
@given(data=orphan_test_data())
@settings(max_examples=100)
def test_property10_orphan_module_detection(data, tmp_path_factory):
    """Property 10: orphan modules and reachable modules are mutually exclusive.

    No module in the orphan list is in ``reachable`` and no module in
    ``reachable`` is in the orphan list; the orphan list is also sorted.

    Feature: repo-audit, Property 10 (orphan-module detection correctness).
    Validates: Requirements 2.8.
    """
    entries, reachable = data
    repo_root = tmp_path_factory.mktemp("prop10")
    orphans = find_orphan_modules(repo_root, entries, reachable)
    orphan_set = set(orphans)

    # Check 1: no orphan module may appear in the reachable set.
    overlap = orphan_set & reachable
    assert overlap == set(), (
        f"孤立模块与可达集合存在交集: {overlap}"
    )

    # Check 2: no reachable module may appear in the orphan list.
    for r in reachable:
        assert r not in orphan_set, (
            f"可达模块 '{r}' 不应出现在孤立列表中"
        )

    # Check 3: the orphan list must already be sorted by path.
    expected_order = sorted(orphans)
    assert orphans == expected_order, "孤立模块列表应按路径排序"