# -*- coding: utf-8 -*-
"""Audit entry point.

Runs the repository scanner followed by the three analyzers and writes three
reports into docs/audit/repo/.

Files are only created under docs/audit/repo/; no other existing file in the
repository is meant to be modified.
"""

from __future__ import annotations

import logging
import re
from datetime import datetime, timezone
from pathlib import Path

from scripts.audit.scanner import scan_repo
from scripts.audit.inventory_analyzer import (
    build_inventory,
    render_inventory_report,
)
from scripts.audit.flow_analyzer import (
    build_flow_tree,
    discover_entry_points,
    find_orphan_modules,
    render_flow_report,
)
from scripts.audit.doc_alignment_analyzer import (
    build_mappings,
    check_api_samples_vs_parsers,
    check_ddl_vs_dictionary,
    find_undocumented_modules,
    render_alignment_report,
    scan_docs,
)

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Repository root auto-detection
# ---------------------------------------------------------------------------

def _detect_repo_root() -> Path:
    """Walk upwards from this file to find the repository root.

    The root is the nearest directory (starting with the one containing this
    file) that holds a ``cli/`` or a ``.git/`` sub-directory.

    Returns
    -------
    Path
        The detected root, or the grandparent of this file's directory when
        no ancestor carries either marker.
    """
    here = Path(__file__).resolve().parent
    # Fallback: assume scripts/audit/ sits directly under the repository root.
    fallback = here.parent.parent
    return next(
        (
            candidate
            for candidate in (here, *here.parents)
            if any((candidate / marker).is_dir() for marker in ("cli", ".git"))
        ),
        fallback,
    )


# ---------------------------------------------------------------------------
# Report output directory
# ---------------------------------------------------------------------------

def _ensure_report_dir(repo_root: Path) -> Path:
    """Return ``<repo_root>/docs/audit/repo``, creating it when missing.

    Parameters
    ----------
    repo_root : Path
        Repository root under which the report directory lives.

    Returns
    -------
    Path
        The report output directory.

    Raises
    ------
    RuntimeError
        If the directory cannot be created (reports could not be written).
    """
    target = repo_root / "docs" / "audit" / "repo"
    if not target.is_dir():
        try:
            target.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            raise RuntimeError(f"无法创建报告输出目录 {target}: {exc}") from exc
        # Only logged when the directory did not exist beforehand.
        logger.info("已创建报告输出目录: %s", target)
    return target


# ---------------------------------------------------------------------------
# Report header metadata injection
# ---------------------------------------------------------------------------

# Matches the "generated at" label that marks an existing report header.
_HEADER_PATTERN = re.compile(r"生成时间[::]")
_ISO_TS_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z") # 匹配非 ISO 格式的时间戳行,用于替换 _NON_ISO_TS_LINE = re.compile( r"([-*]\s*生成时间[::]\s*)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}" ) def _inject_header(report: str, timestamp: str, repo_path: str) -> str: """确保报告头部包含 ISO 格式时间戳和仓库路径。 - 已有 ISO 时间戳 → 不修改 - 有非 ISO 时间戳 → 替换为 ISO 格式 - 无头部 → 在标题后注入 """ if _HEADER_PATTERN.search(report): # 已有头部——检查时间戳格式是否为 ISO if _ISO_TS_PATTERN.search(report): return report # 非 ISO 格式 → 替换时间戳 report = _NON_ISO_TS_LINE.sub( lambda m: m.group(1) + timestamp, report, ) # 同时确保仓库路径使用统一值(用 lambda 避免反斜杠转义问题) safe_path = repo_path report = re.sub( r"([-*]\s*仓库路径[::]\s*)`[^`]*`", lambda m: m.group(1) + "`" + safe_path + "`", report, ) return report # 无头部 → 在第一个标题行之后插入 lines = report.split("\n") insert_idx = 1 for i, line in enumerate(lines): if line.startswith("# "): insert_idx = i + 1 break header_lines = [ "", f"- 生成时间: {timestamp}", f"- 仓库路径: `{repo_path}`", "", ] lines[insert_idx:insert_idx] = header_lines return "\n".join(lines) # --------------------------------------------------------------------------- # 主函数 # --------------------------------------------------------------------------- def run_audit(repo_root: Path | None = None) -> None: """执行完整审计流程,生成三份报告到 docs/audit/repo/。 Parameters ---------- repo_root : Path | None 仓库根目录。为 None 时自动检测。 """ # 1. 确定仓库根目录 if repo_root is None: repo_root = _detect_repo_root() repo_root = repo_root.resolve() repo_path_str = str(repo_root) logger.info("审计开始 — 仓库路径: %s", repo_path_str) # 2. 检查/创建输出目录 audit_dir = _ensure_report_dir(repo_root) # 3. 生成 UTC 时间戳(所有报告共用) timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") # 4. 扫描仓库 logger.info("正在扫描仓库文件...") entries = scan_repo(repo_root) logger.info("扫描完成,共 %d 个条目", len(entries)) # 5. 
文件清单报告 logger.info("正在生成文件清单报告...") try: inventory_items = build_inventory(entries) inventory_report = render_inventory_report(inventory_items, repo_path_str) inventory_report = _inject_header(inventory_report, timestamp, repo_path_str) (audit_dir / "file_inventory.md").write_text( inventory_report, encoding="utf-8", ) logger.info("文件清单报告已写入: file_inventory.md") except Exception: logger.exception("生成文件清单报告时出错") # 6. 流程树报告 logger.info("正在生成流程树报告...") try: entry_points = discover_entry_points(repo_root) trees = [] reachable: set[str] = set() for ep in entry_points: ep_file = ep["file"] # 批处理文件不构建流程树 if not ep_file.endswith(".py"): continue tree = build_flow_tree(repo_root, ep_file) trees.append(tree) # 收集可达模块 _collect_reachable(tree, reachable) orphans = find_orphan_modules(repo_root, entries, reachable) flow_report = render_flow_report(trees, orphans, repo_path_str) flow_report = _inject_header(flow_report, timestamp, repo_path_str) (audit_dir / "flow_tree.md").write_text( flow_report, encoding="utf-8", ) logger.info("流程树报告已写入: flow_tree.md") except Exception: logger.exception("生成流程树报告时出错") # 7. 
文档对齐报告 logger.info("正在生成文档对齐报告...") try: doc_paths = scan_docs(repo_root) mappings = build_mappings(doc_paths, repo_root) issues = [] issues.extend(check_ddl_vs_dictionary(repo_root)) issues.extend(check_api_samples_vs_parsers(repo_root)) # 缺失文档检测 documented: set[str] = set() for m in mappings: documented.update(m.related_code) undoc_modules = find_undocumented_modules(repo_root, documented) from scripts.audit import AlignmentIssue for mod in undoc_modules: issues.append(AlignmentIssue( doc_path="—", issue_type="missing", description=f"核心代码模块 `{mod}` 缺少对应文档", related_code=mod, )) alignment_report = render_alignment_report(mappings, issues, repo_path_str) alignment_report = _inject_header(alignment_report, timestamp, repo_path_str) (audit_dir / "doc_alignment.md").write_text( alignment_report, encoding="utf-8", ) logger.info("文档对齐报告已写入: doc_alignment.md") except Exception: logger.exception("生成文档对齐报告时出错") logger.info("审计完成 — 报告输出目录: %s", audit_dir) # --------------------------------------------------------------------------- # 辅助:收集可达模块 # --------------------------------------------------------------------------- def _collect_reachable(node, reachable: set[str]) -> None: """递归收集流程树中所有节点的 source_file。""" reachable.add(node.source_file) for child in node.children: _collect_reachable(child, reachable) # --------------------------------------------------------------------------- # 入口 # --------------------------------------------------------------------------- if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) run_audit()