#!/usr/bin/env python3 """审计一览表生成脚本 — 解析模块 从 docs/audit/changes/ 目录扫描审计源记录 Markdown 文件, 提取结构化信息(日期、标题、修改文件、风险等级、变更类型、影响模块)。 """ from __future__ import annotations import os import re from dataclasses import dataclass, field from pathlib import Path # --------------------------------------------------------------------------- # 常量 # --------------------------------------------------------------------------- # 文件名格式:YYYY-MM-DD__slug.md _FILENAME_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})__(.+)\.md$") # 文件路径 → 功能模块映射(按最长前缀优先匹配) MODULE_MAP: dict[str, str] = { "api/": "API 层", "tasks/ods": "ODS 层", "tasks/dwd": "DWD 层", "tasks/dws": "DWS 层", "tasks/index": "指数算法", "loaders/": "数据装载", "database/": "数据库", "orchestration/": "调度", "config/": "配置", "cli/": "CLI", "models/": "模型", "scd/": "SCD2", "docs/": "文档", "scripts/": "脚本工具", "tests/": "测试", "quality/": "质量校验", "gui/": "GUI", "utils/": "工具库", } # 按前缀长度降序排列,确保最长前缀优先匹配 _SORTED_PREFIXES: list[tuple[str, str]] = sorted( MODULE_MAP.items(), key=lambda kv: len(kv[0]), reverse=True ) # 所有合法模块名称(含兜底"其他") VALID_MODULES: frozenset[str] = frozenset(MODULE_MAP.values()) | {"其他"} # --------------------------------------------------------------------------- # 数据类 # --------------------------------------------------------------------------- @dataclass class AuditEntry: """从单个审计源记录文件解析出的结构化数据""" date: str # YYYY-MM-DD,从文件名提取 slug: str # 文件名中 __ 后的标识符 title: str # Markdown 一级标题 filename: str # 源文件名(不含路径) changed_files: list[str] = field(default_factory=list) # 修改的文件路径列表 modules: set[str] = field(default_factory=set) # 影响的功能模块集合 risk_level: str = "未知" # 风险等级:高/中/低/极低 change_type: str = "功能" # 变更类型:bugfix/功能/文档/重构/清理 projects: set[str] = field(default_factory=set) # 所属项目集合 # --------------------------------------------------------------------------- # 项目归属分类 # --------------------------------------------------------------------------- # 文件路径 → 项目归属映射(按最长前缀优先匹配) PROJECT_MAP: dict[str, str] = { "apps/etl/connectors/feiqiu/": "ETL-feiqiu", "apps/backend/": "后端", "apps/admin-web/": "管理后台", "apps/miniprogram/": "小程序", "gui/": "桌面GUI", "packages/shared/": "共享包", "db/etl_feiqiu/": "ETL-feiqiu", "db/zqyy_app/": "后端", "db/fdw/": "跨库(FDW)", "db/": "数据库", } _SORTED_PROJECT_PREFIXES: list[tuple[str, str]] = sorted( PROJECT_MAP.items(), key=lambda kv: len(kv[0]), reverse=True ) def classify_project(filepath: str) -> str: """根据 PROJECT_MAP 将文件路径映射到所属项目。 对于 monorepo 迁移前的历史审计记录,changed_files 可能是 ETL 内部相对路径 (如 api/xxx.py、tasks/dwd/xxx.py),此时回退匹配 MODULE_MAP 前缀归为 ETL-feiqiu。 无任何前缀命中时返回 "项目级"。 """ normalized = filepath.replace("\\", "/").lstrip("./") for prefix, project_name in _SORTED_PROJECT_PREFIXES: if normalized.startswith(prefix): return project_name # 回退:匹配 ETL 内部模块前缀(历史记录兼容) for prefix in MODULE_MAP: if normalized.startswith(prefix): return "ETL-feiqiu" return "项目级" # --------------------------------------------------------------------------- # 模块分类 # --------------------------------------------------------------------------- def classify_module(filepath: str) -> str: """根据 MODULE_MAP 将文件路径映射到功能模块。 匹配规则:按前缀长度降序逐一比较,首个命中即返回。 无任何前缀命中时返回 "其他"。 """ # 统一为正斜杠,去除前导 ./ 或 / normalized = filepath.replace("\\", "/").lstrip("./") for prefix, module_name in _SORTED_PREFIXES: if normalized.startswith(prefix): return module_name return "其他" # --------------------------------------------------------------------------- # 解析辅助函数 # --------------------------------------------------------------------------- def _extract_title(content: str) -> str | None: """从 Markdown 内容中提取第一个一级标题(# ...)。""" for line in content.splitlines(): stripped = line.strip() if stripped.startswith("# "): return stripped[2:].strip() return None # 匹配"修改文件清单"/"文件清单"/"Changed"/"变更范围"/"变更摘要" 等章节标题 _FILE_SECTION_RE = re.compile( r"^##\s+.*(修改文件|文件清单|Changed|变更范围|变更摘要).*$", re.IGNORECASE, ) # 从表格行提取文件路径:| `path` | ... 或 | path | ... _TABLE_FILE_RE = re.compile( r"^\|\s*`?([^`|]+?)`?\s*\|" ) # 从列表行提取文件路径:- path 或 - `path`(忽略纯描述行) _LIST_FILE_RE = re.compile( r"^[-*]\s+`?([^\s`(]+\.[a-zA-Z0-9_]+)`?" ) # 从含 → 的行提取源路径和目标路径 _ARROW_PATH_RE = re.compile( r"`([^`]+?)`\s*→\s*`([^`]+?)`" ) # 子章节标题(### ...),用于在文件清单章节内继续扫描 _SUB_HEADING_RE = re.compile(r"^###\s+") def _extract_changed_files(content: str) -> list[str]: """从审计文件内容中提取修改文件路径列表。 扫描策略: 1. 找到"修改文件清单"/"文件清单"/"Changed"/"变更范围"等二级章节 2. 在该章节内解析表格行和列表行中的文件路径 3. 遇到下一个同级(##)章节时停止 """ lines = content.splitlines() results: list[str] = [] in_section = False for line in lines: stripped = line.strip() if _FILE_SECTION_RE.match(stripped): in_section = True continue # 遇到下一个二级章节,退出扫描 if in_section and stripped.startswith("## ") and not _FILE_SECTION_RE.match(stripped): break if not in_section: continue # 跳过表头分隔行 if re.match(r"^\|[-\s|:]+\|$", stripped): continue # 跳过子章节标题(### 新增文件 等),但继续扫描 if _SUB_HEADING_RE.match(stripped): continue # 尝试表格行 m = _TABLE_FILE_RE.match(stripped) if m: path = m.group(1).strip() # 排除表头行("文件"、"文件/对象" 等) if path and not re.match(r"^(文件|File|路径|对象)", path, re.IGNORECASE): results.append(path) continue # 尝试含 → 的移动/重命名行(提取源和目标路径) m_arrow = _ARROW_PATH_RE.search(stripped) if m_arrow: src, dst = m_arrow.group(1).strip(), m_arrow.group(2).strip() if "/" in src: results.append(src) if "/" in dst: results.append(dst) continue # 尝试列表行 m = _LIST_FILE_RE.match(stripped) if m: path = m.group(1).strip() if path and "/" in path: results.append(path) continue return results # 风险等级关键词(按优先级排列) _RISK_KEYWORDS: list[tuple[str, str]] = [ ("极低", "极低"), ("低", "低"), ("中", "中"), ("高", "高"), ] # 匹配风险相关章节标题 _RISK_SECTION_RE = re.compile( r"^##\s+.*(风险|Risk).*$", re.IGNORECASE ) def _extract_risk_level(content: str) -> str: """从审计文件内容中提取风险等级。 扫描策略(按优先级): 1. 头部元数据行:`- 风险等级:低` 或 `- 风险:极低` 2. 风险相关二级章节内的关键词 3. 兜底:全文搜索含"风险"的行 """ lines = content.splitlines() # 策略 1:头部元数据(通常在前 15 行内) _meta_risk_re = re.compile(r"^-\s*风险[等级]*[::]\s*(.+)$") for line in lines[:15]: m = _meta_risk_re.match(line.strip()) if m: val = m.group(1) if "极低" in val: return "极低" if "高" in val: return "高" if "中" in val: return "中" if "低" in val: return "低" # 策略 2:风险相关二级章节 in_section = False section_text = "" for line in lines: stripped = line.strip() if _RISK_SECTION_RE.match(stripped): in_section = True continue if in_section and stripped.startswith("## "): break if in_section: section_text += stripped + " " # 策略 3:兜底全文搜索含"风险"的行 if not section_text: for line in lines: if "风险" in line: section_text += line.strip() + " " if not section_text: return "未知" # 按优先级匹配:先检查"极低",再检查独立的"高/中/低" if "极低" in section_text: return "极低" if re.search(r"风险[::]\s*高|高风险", section_text): return "高" if re.search(r"风险[::]\s*中|中等风险", section_text): return "中" # "纯文档" 等描述中含"低"但不含"极低"时匹配为"低" if re.search(r"风险[::]\s*低|低风险|风险.*低", section_text): return "低" # 推断:描述中含"纯文档/无运行时影响/纯分析"等表述视为极低 if re.search(r"纯文档|无运行时影响|纯分析|无逻辑改动|无代码", section_text): return "极低" return "未知" # 变更类型推断关键词 _CHANGE_TYPE_PATTERNS: list[tuple[str, str]] = [ ("bugfix", "bugfix"), ("bug", "bugfix"), ("修复", "bugfix"), ("重构", "重构"), ("清理", "清理"), ("纯文档", "文档"), ("无逻辑改动", "文档"), ("文档", "文档"), ] def _infer_change_type(content: str) -> str: """从审计文件内容推断变更类型。 按优先级扫描关键词,首个命中即返回。 默认返回 "功能"。 """ lower = content.lower() for keyword, ctype in _CHANGE_TYPE_PATTERNS: if keyword in lower: return ctype return "功能" # --------------------------------------------------------------------------- # 核心解析函数 # --------------------------------------------------------------------------- def parse_audit_file(filepath: str | Path) -> AuditEntry | None: """解析单个审计源记录文件,返回 AuditEntry。 文件名必须符合 YYYY-MM-DD__slug.md 格式,否则返回 None 并打印警告。 """ filepath = Path(filepath) filename = filepath.name # 校验文件名格式 m = _FILENAME_RE.match(filename) if not m: print(f"[警告] 文件名格式不符,已跳过:{filename}") return None date_str = m.group(1) slug = m.group(2) # 读取文件内容 try: content = filepath.read_text(encoding="utf-8") except (UnicodeDecodeError, OSError) as exc: print(f"[警告] 无法读取文件,已跳过:{filename}({exc})") return None # 提取标题(缺失时用 slug 兜底) title = _extract_title(content) or slug # 提取修改文件列表 changed_files = _extract_changed_files(content) # 推导影响模块 if changed_files: modules = {classify_module(f) for f in changed_files} projects = {classify_project(f) for f in changed_files} else: modules = {"其他"} projects = {"项目级"} # 提取风险等级 risk_level = _extract_risk_level(content) # 推断变更类型 change_type = _infer_change_type(content) return AuditEntry( date=date_str, slug=slug, title=title, filename=filename, changed_files=changed_files, modules=modules, risk_level=risk_level, change_type=change_type, projects=projects, ) def scan_audit_dir(dirpath: str | Path) -> list[AuditEntry]: """扫描审计目录,返回按日期倒序排列的 AuditEntry 列表。 跳过非 .md 文件和格式不合规的文件。 目录为空或不存在时返回空列表。 """ dirpath = Path(dirpath) if not dirpath.is_dir(): return [] entries: list[AuditEntry] = [] for child in sorted(dirpath.iterdir()): if not child.is_file() or child.suffix != ".md": continue entry = parse_audit_file(child) if entry is not None: entries.append(entry) # 按日期倒序 entries.sort(key=lambda e: e.date, reverse=True) return entries # --------------------------------------------------------------------------- # 渲染函数 # --------------------------------------------------------------------------- def render_timeline_table(entries: list[AuditEntry]) -> str: """按时间倒序生成 Markdown 表格。 输入的 entries 应已按日期倒序排列(由 scan_audit_dir 保证)。 空列表时返回"暂无审计记录"提示。 """ if not entries: return "> 暂无审计记录\n" lines: list[str] = [ "| 日期 | 项目 | 需求摘要 | 变更类型 | 影响模块 | 风险 | 详情 |", "|------|------|----------|----------|----------|------|------|", ] for e in entries: modules_str = ", ".join(sorted(e.modules)) projects_str = ", ".join(sorted(e.projects)) link = f"[链接](changes/{e.filename})" lines.append( f"| {e.date} | {projects_str} | {e.title} | {e.change_type} | {modules_str} | {e.risk_level} | {link} |" ) return "\n".join(lines) + "\n" def render_module_index(entries: list[AuditEntry]) -> str: """按模块分组生成 Markdown 章节。 每个模块一个三级标题 + 表格,模块按字母序排列。 空列表时返回"暂无审计记录"提示。 """ if not entries: return "> 暂无审计记录\n" # 按模块分组 module_entries: dict[str, list[AuditEntry]] = {} for e in entries: for mod in e.modules: module_entries.setdefault(mod, []).append(e) sections: list[str] = [] for mod in sorted(module_entries.keys()): mod_list = module_entries[mod] section_lines: list[str] = [ f"### {mod}", "", "| 日期 | 需求摘要 | 变更类型 | 风险 | 详情 |", "|------|----------|----------|------|------|", ] for e in mod_list: link = f"[链接](changes/{e.filename})" section_lines.append( f"| {e.date} | {e.title} | {e.change_type} | {e.risk_level} | {link} |" ) sections.append("\n".join(section_lines) + "\n") return "\n".join(sections) def render_project_index(entries: list[AuditEntry]) -> str: """按项目分组生成 Markdown 章节。 每个项目一个三级标题 + 表格,项目按固定顺序排列。 空列表时返回"暂无审计记录"提示。 """ if not entries: return "> 暂无审计记录\n" # 固定排序:ETL 连接器优先,然后各 APP,最后项目级 _PROJECT_ORDER = [ "ETL-feiqiu", "后端", "管理后台", "小程序", "桌面GUI", "共享包", "跨库(FDW)", "数据库", "项目级", ] project_entries: dict[str, list[AuditEntry]] = {} for e in entries: for proj in e.projects: project_entries.setdefault(proj, []).append(e) def sort_key(name: str) -> int: try: return _PROJECT_ORDER.index(name) except ValueError: return len(_PROJECT_ORDER) sections: list[str] = [] for proj in sorted(project_entries.keys(), key=sort_key): proj_list = project_entries[proj] section_lines: list[str] = [ f"### {proj}", "", "| 日期 | 需求摘要 | 变更类型 | 影响模块 | 风险 | 详情 |", "|------|----------|----------|----------|------|------|", ] for e in proj_list: modules_str = ", ".join(sorted(e.modules)) link = f"[链接](changes/{e.filename})" section_lines.append( f"| {e.date} | {e.title} | {e.change_type} | {modules_str} | {e.risk_level} | {link} |" ) sections.append("\n".join(section_lines) + "\n") return "\n".join(sections) def render_dashboard(entries: list[AuditEntry]) -> str: """组合时间线和模块索引生成完整 dashboard Markdown 文档。 包含:标题、生成时间戳、时间线视图、模块索引视图。 """ from datetime import datetime timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") parts: list[str] = [ "# 审计一览表", "", f"> 自动生成于 {timestamp},请勿手动编辑。", "", "## 时间线视图", "", render_timeline_table(entries), "## 项目索引", "", render_project_index(entries), "## 模块索引", "", render_module_index(entries), ] return "\n".join(parts) # --------------------------------------------------------------------------- # 主入口 # --------------------------------------------------------------------------- def main() -> None: """扫描审计源记录 → 解析 → 渲染 → 写入 audit_dashboard.md。""" audit_dir = Path("docs/audit/changes") output_path = Path("docs/audit/audit_dashboard.md") # 扫描并解析 entries = scan_audit_dir(audit_dir) # 渲染完整 dashboard content = render_dashboard(entries) # 确保输出目录存在 output_path.parent.mkdir(parents=True, exist_ok=True) # 写入文件 output_path.write_text(content, encoding="utf-8") # 输出摘要 print(f"已解析 {len(entries)} 条审计记录") print(f"输出文件:{output_path}") if __name__ == "__main__": main()