#!/usr/bin/env python3
"""Audit pre-scan — identify changed files, classify risk, check doc compliance.

Merged from .kiro/scripts/audit_flagger.py + change_compliance_prescan.py,
with the .kiro/state dependency removed; the result is emitted as JSON on
stdout for the /audit command to consume.

Usage:
    python scripts/audit/prescan.py
    python scripts/audit/prescan.py --files "apps/backend/app/routers/foo.py,db/etl_feiqiu/migrations/xxx.sql"

Without --files, the change list is taken from ``git status``.
With --files, the given comma-separated list is used and git is skipped.
"""

import argparse
import json
import re
import subprocess
import sys
from datetime import datetime, timedelta, timezone

# Asia/Shanghai (UTC+8) — all report timestamps use this zone.
TZ_SHANGHAI = timezone(timedelta(hours=8))

# ── High-risk path rules: (path pattern, risk label) ──
RISK_RULES = [
    (re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"), "etl"),
    (re.compile(r"^apps/backend/app/"), "backend"),
    (re.compile(r"^apps/admin-web/src/"), "admin-web"),
    (re.compile(r"^apps/tenant-admin/src/"), "tenant-admin"),
    (re.compile(r"^apps/miniprogram/(miniapp|miniprogram)/"), "miniprogram"),
    (re.compile(r"^packages/shared/"), "shared"),
    (re.compile(r"^db/"), "db"),
]

# Paths that never trigger an audit (reports, tool state, temp files, images).
NOISE_PATTERNS = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.claude/"),
    re.compile(r"^tmp/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]

# ── Code → expected-documentation mapping ──
# A value ending in "/" means "any changed doc under that directory counts".
DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/schemas/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/tenant-admin/src/": ["apps/tenant-admin/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
    "db/etl_feiqiu/migrations/": ["docs/database/"],
    "db/zqyy_app/migrations/": ["docs/database/"],
}

# Migration SQL files — any match tags the change set as a schema change.
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]

DDL_BASELINE_DIR = "docs/database/ddl/"
BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")


def get_changed_files_from_git() -> list[str]:
    """Return changed file paths from ``git status --porcelain``.

    Returns a sorted, de-duplicated list; returns ``[]`` on any git failure
    (not a repo, git missing, timeout) so callers degrade gracefully.
    """
    try:
        result = subprocess.run(
            ["git", "status", "--porcelain"],
            capture_output=True,
            text=True,
            timeout=10,
        )
    except Exception:
        return []
    if result.returncode != 0:
        return []

    files = []
    for line in result.stdout.splitlines():
        # Porcelain v1 format: two status chars + space, path starts at col 3.
        if len(line) < 4:
            continue
        path = line[3:].strip()
        # Renames appear as "old -> new"; keep the new path.
        if " -> " in path:
            path = path.split(" -> ")[-1]
        # Drop git's quoting of special-character paths; normalize separators.
        path = path.strip().strip('"').replace("\\", "/")
        if path:
            files.append(path)
    return sorted(set(files))


def is_noise(f: str) -> bool:
    """True if *f* matches a noise pattern and should be ignored entirely."""
    return any(p.search(f) for p in NOISE_PATTERNS)


def _doc_coverage_gaps(code_files: list[str], doc_files: set[str]) -> list[dict]:
    """Return code files whose expected companion docs were not also changed.

    Each entry is ``{"file": <code file>, "expected_docs": [<doc paths>]}``.
    A mapped doc path ending in "/" is satisfied by any changed doc under it.
    """
    gaps = []
    for cf in code_files:
        expected_docs = []
        for prefix, docs in DOC_MAP.items():
            if cf.startswith(prefix):
                expected_docs.extend(docs)
        if not expected_docs:
            # No mapping for this code path — nothing to enforce.
            continue
        has_doc = False
        for ed in expected_docs:
            if ed in doc_files:
                has_doc = True
                break
            if ed.endswith("/") and any(d.startswith(ed) for d in doc_files):
                has_doc = True
                break
        if not has_doc:
            gaps.append({
                "file": cf,
                "expected_docs": expected_docs,
            })
    return gaps


def classify(files: list[str]) -> dict:
    """Classify changed files and build the structured review checklist.

    Noise paths are filtered out first; the remainder is scanned for
    high-risk directories, new migration SQL, BD manual / DDL baseline
    updates, and code changed without its mapped documentation.
    """
    real_files = [f for f in files if not is_noise(f)]

    risk_tags: list[str] = []
    high_risk_files: list[str] = []
    new_migration_sql: list[str] = []
    has_bd_manual = False
    has_ddl_baseline = False
    code_files: list[str] = []
    doc_files: set[str] = set()

    for f in real_files:
        # High-risk directory classification (first matching rule wins).
        for pattern, label in RISK_RULES:
            if pattern.search(f):
                high_risk_files.append(f)
                tag = f"dir:{label}"
                if tag not in risk_tags:
                    risk_tags.append(tag)
                break

        # Loose files at the repository root.
        if "/" not in f and "root-file" not in risk_tags:
            risk_tags.append("root-file")

        # Migration SQL → schema change.
        for mp in MIGRATION_PATTERNS:
            if mp.search(f):
                new_migration_sql.append(f)
                if "db-schema-change" not in risk_tags:
                    risk_tags.append("db-schema-change")
                break

        # BD Manual / DDL baseline presence.
        if BD_MANUAL_PATTERN.search(f):
            has_bd_manual = True
        if f.startswith(DDL_BASELINE_DIR):
            has_ddl_baseline = True

        # Bucket into docs vs. code (a .sql file counts as code).
        if f.endswith(".md") or "/docs/" in f:
            doc_files.add(f)
        if f.endswith((".py", ".ts", ".tsx", ".js", ".jsx", ".sql")):
            code_files.append(f)

    code_without_docs = _doc_coverage_gaps(code_files, doc_files)

    return {
        "scanned_at": datetime.now(TZ_SHANGHAI).strftime("%Y-%m-%d %H:%M:%S"),
        "total_files": len(real_files),
        "all_files": real_files,
        "high_risk_files": sorted(set(high_risk_files)),
        "risk_tags": risk_tags,
        "new_migration_sql": new_migration_sql,
        "code_without_docs": code_without_docs,
        "has_bd_manual": has_bd_manual,
        "has_ddl_baseline": has_ddl_baseline,
        "audit_required": len(risk_tags) > 0,
    }


def main():
    """Parse arguments, gather the change list, and print the JSON report."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--files",
        help="逗号分隔的文件列表(跳过 git status)",
        default=None,
    )
    args = parser.parse_args()

    if args.files:
        files = [f.strip() for f in args.files.split(",") if f.strip()]
    else:
        files = get_changed_files_from_git()

    if not files:
        # Nothing changed — emit a minimal "no audit needed" payload.
        print(json.dumps({"audit_required": False, "total_files": 0}, ensure_ascii=False))
        return

    result = classify(files)
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Always emit valid JSON so the /audit consumer never sees a traceback.
        print(json.dumps({"error": str(e), "audit_required": False}, ensure_ascii=False))