# -*- coding: utf-8 -*-
"""
Audit entry point: runs the scanner and the three analyzers in turn and writes three reports to docs/audit/repo/.

Files are created only under docs/audit/repo/; no existing file in the repository is modified.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import re
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from scripts.audit.scanner import scan_repo
|
||
from scripts.audit.inventory_analyzer import (
|
||
build_inventory,
|
||
render_inventory_report,
|
||
)
|
||
from scripts.audit.flow_analyzer import (
|
||
build_flow_tree,
|
||
discover_entry_points,
|
||
find_orphan_modules,
|
||
render_flow_report,
|
||
)
|
||
from scripts.audit.doc_alignment_analyzer import (
|
||
build_mappings,
|
||
check_api_samples_vs_parsers,
|
||
check_ddl_vs_dictionary,
|
||
find_undocumented_modules,
|
||
render_alignment_report,
|
||
scan_docs,
|
||
)
|
||
|
||
# Module-level logger, named after this module, used by every function below.
logger = logging.getLogger(__name__)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 仓库根目录自动检测
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _detect_repo_root(start: Path | None = None) -> Path:
    """Locate the repository root by walking upwards from a directory.

    A directory is taken as the root when it contains a ``cli/`` directory
    or a ``.git`` entry.  ``.git`` is accepted both as a directory and as a
    regular file, because linked worktrees and submodules store a
    ``gitdir: ...`` pointer file there instead of a directory.

    Parameters
    ----------
    start : Path | None
        Directory to begin the search from.  When None, the directory
        containing this file is used.

    Returns
    -------
    Path
        The nearest matching directory, the starting directory included.
        When nothing matches, the grandparent of the starting directory is
        returned, on the assumption that this file lives two levels below
        the root (``scripts/audit/``).
    """
    if start is None:
        current = Path(__file__).resolve().parent
    else:
        current = Path(start).resolve()

    for candidate in (current, *current.parents):
        # exists() rather than is_dir(): ".git" may be a gitfile.
        if (candidate / "cli").is_dir() or (candidate / ".git").exists():
            return candidate

    # Fallback: assume scripts/audit/ sits directly under the repository root.
    return current.parent.parent
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 报告输出目录
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _ensure_report_dir(repo_root: Path) -> Path:
    """Return ``<repo_root>/docs/audit/repo``, creating it when absent.

    Parameters
    ----------
    repo_root : Path
        Repository root under which the report directory lives.

    Returns
    -------
    Path
        The report output directory.

    Raises
    ------
    RuntimeError
        When the directory does not exist and cannot be created; the
        underlying OSError is chained as the cause.
    """
    target = repo_root.joinpath("docs", "audit", "repo")
    if not target.is_dir():
        try:
            target.mkdir(parents=True, exist_ok=True)
        except OSError as err:
            raise RuntimeError(f"无法创建报告输出目录 {target}: {err}") from err
        else:
            # Logged only when this call actually created the directory.
            logger.info("已创建报告输出目录: %s", target)
    return target
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 报告头部元信息注入
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Label that introduces the generation timestamp.  The label may be followed
# by an ASCII colon or a fullwidth colon (U+FF1A); the latter is written as an
# escape so that it survives character normalisation of this source file.
_HEADER_PATTERN = re.compile(r"生成时间[:\uff1a]")
# UTC timestamp in the form YYYY-MM-DDTHH:MM:SSZ.
_ISO_TS_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z")
# Timestamp bullet whose value is "YYYY-MM-DD HH:MM:SS"; group 1 is the
# bullet and label prefix that is kept when the value is replaced.
_NON_ISO_TS_LINE = re.compile(
    r"([-*]\s*生成时间[:\uff1a]\s*)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}"
)
# Repository-path bullet with a back-quoted value; group 1 is the prefix.
_REPO_PATH_LINE = re.compile(r"([-*]\s*仓库路径[:\uff1a]\s*)`[^`]*`")


def _inject_header(report: str, timestamp: str, repo_path: str) -> str:
    """Ensure the report carries an ISO timestamp and the repository path.

    Three cases are handled:

    - The timestamp header line already holds an ISO timestamp: the report
      is returned unchanged.
    - The header line holds a ``YYYY-MM-DD HH:MM:SS`` timestamp: it is
      replaced with ``timestamp`` and any back-quoted repository path bullet
      is rewritten to ``repo_path``.
    - No timestamp header exists: a header is inserted right after the first
      line starting with ``"# "`` (or after the first line when the report
      has no such title).

    The ISO check looks only at the header line itself, so an ISO timestamp
    that merely appears somewhere in the report body does not stop an
    outdated header from being rewritten.

    Parameters
    ----------
    report : str
        Rendered report text.
    timestamp : str
        Timestamp to place in the header.
    repo_path : str
        Repository path to place in the header.

    Returns
    -------
    str
        The report text with the header guaranteed as described above.
    """
    header_line = next(
        (line for line in report.splitlines() if _HEADER_PATTERN.search(line)),
        None,
    )

    if header_line is not None:
        if _ISO_TS_PATTERN.search(header_line):
            return report
        # Callable replacements keep backslashes in the inserted values
        # (for example Windows paths) from being read as regex escapes.
        report = _NON_ISO_TS_LINE.sub(lambda m: m.group(1) + timestamp, report)
        report = _REPO_PATH_LINE.sub(
            lambda m: m.group(1) + "`" + repo_path + "`",
            report,
        )
        return report

    # No header yet: insert one after the first top-level title line.
    lines = report.split("\n")
    insert_idx = next(
        (idx + 1 for idx, line in enumerate(lines) if line.startswith("# ")),
        1,
    )
    lines[insert_idx:insert_idx] = [
        "",
        f"- 生成时间: {timestamp}",
        f"- 仓库路径: `{repo_path}`",
        "",
    ]
    return "\n".join(lines)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 主函数
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_audit(repo_root: Path | None = None) -> None:
    """Run the whole audit and write three reports into docs/audit/repo/.

    The repository is scanned once; the file inventory, flow tree and
    documentation alignment reports are then produced one after another.
    A failure while producing one report is logged with its traceback and
    does not prevent the remaining reports from being attempted.

    Parameters
    ----------
    repo_root : Path | None
        Repository root directory.  Detected automatically when None.
    """
    # 1. Resolve the repository root.
    root = (_detect_repo_root() if repo_root is None else repo_root).resolve()
    repo_path_str = str(root)
    logger.info("审计开始 — 仓库路径: %s", repo_path_str)

    # 2. Make sure the output directory exists.
    audit_dir = _ensure_report_dir(root)

    # 3. One UTC timestamp shared by all reports.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # 4. Scan the repository.
    logger.info("正在扫描仓库文件...")
    entries = scan_repo(root)
    logger.info("扫描完成,共 %d 个条目", len(entries))

    # 5. File inventory report.
    logger.info("正在生成文件清单报告...")
    try:
        inventory_text = _render_inventory(entries, timestamp, repo_path_str)
        (audit_dir / "file_inventory.md").write_text(
            inventory_text, encoding="utf-8",
        )
        logger.info("文件清单报告已写入: file_inventory.md")
    except Exception:
        logger.exception("生成文件清单报告时出错")

    # 6. Flow tree report.
    logger.info("正在生成流程树报告...")
    try:
        flow_text = _render_flow(root, entries, timestamp, repo_path_str)
        (audit_dir / "flow_tree.md").write_text(flow_text, encoding="utf-8")
        logger.info("流程树报告已写入: flow_tree.md")
    except Exception:
        logger.exception("生成流程树报告时出错")

    # 7. Documentation alignment report.
    logger.info("正在生成文档对齐报告...")
    try:
        alignment_text = _render_alignment(root, timestamp, repo_path_str)
        (audit_dir / "doc_alignment.md").write_text(
            alignment_text, encoding="utf-8",
        )
        logger.info("文档对齐报告已写入: doc_alignment.md")
    except Exception:
        logger.exception("生成文档对齐报告时出错")

    logger.info("审计完成 — 报告输出目录: %s", audit_dir)


def _render_inventory(entries, timestamp: str, repo_path_str: str) -> str:
    """Build the file inventory report text, header included."""
    items = build_inventory(entries)
    text = render_inventory_report(items, repo_path_str)
    return _inject_header(text, timestamp, repo_path_str)


def _render_flow(
    repo_root: Path, entries, timestamp: str, repo_path_str: str,
) -> str:
    """Build the flow tree report text, header included."""
    flow_trees = []
    visited: set[str] = set()
    for entry_point in discover_entry_points(repo_root):
        source = entry_point["file"]
        # Entry points that are not .py files get no flow tree.
        if source.endswith(".py"):
            flow_tree = build_flow_tree(repo_root, source)
            flow_trees.append(flow_tree)
            # Record every module this tree reaches.
            _collect_reachable(flow_tree, visited)

    orphan_modules = find_orphan_modules(repo_root, entries, visited)
    text = render_flow_report(flow_trees, orphan_modules, repo_path_str)
    return _inject_header(text, timestamp, repo_path_str)


def _render_alignment(
    repo_root: Path, timestamp: str, repo_path_str: str,
) -> str:
    """Build the documentation alignment report text, header included."""
    doc_mappings = build_mappings(scan_docs(repo_root), repo_root)

    found_issues = [
        *check_ddl_vs_dictionary(repo_root),
        *check_api_samples_vs_parsers(repo_root),
    ]

    # Modules without documentation: everything not referenced by a mapping.
    covered = {
        code for mapping in doc_mappings for code in mapping.related_code
    }
    undocumented = find_undocumented_modules(repo_root, covered)
    from scripts.audit import AlignmentIssue
    found_issues.extend(
        AlignmentIssue(
            doc_path="—",
            issue_type="missing",
            description=f"核心代码模块 `{mod}` 缺少对应文档",
            related_code=mod,
        )
        for mod in undocumented
    )

    text = render_alignment_report(doc_mappings, found_issues, repo_path_str)
    return _inject_header(text, timestamp, repo_path_str)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 辅助:收集可达模块
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _collect_reachable(node, reachable: set[str]) -> None:
    """Add the ``source_file`` of ``node`` and all its descendants to ``reachable``.

    The tree is walked through each node's ``children``; ``reachable`` is
    updated in place and nothing is returned.
    """
    pending = [node]
    while pending:
        current = pending.pop()
        reachable.add(current.source_file)
        # Push children reversed so they are popped in their original order.
        pending.extend(list(current.children)[::-1])
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 入口
|
||
# ---------------------------------------------------------------------------
|
||
|
||
if __name__ == "__main__":
    # Run as a script: send INFO-level logs to the default handler, then
    # audit the automatically detected repository.
    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        level=logging.INFO,
    )
    run_audit()
|