#!/usr/bin/env python3
"""build_audit_context -- merge all preceding hook outputs into one audit context snapshot.

Reads:
- .kiro/.audit_state.json (written by audit-flagger: risk verdict, changed file list)
- .kiro/.compliance_state.json (written by change-compliance: missing docs, migration status)
- .kiro/.last_prompt_id.json (written by prompt-audit-log: prompt ID for traceability)
- git diff --stat HEAD (summary of change statistics)
- git diff HEAD (diffs of high-risk files only, truncated to a reasonable length)

Writes: .kiro/.audit_context.json (the sole input of the audit-writer sub-agent)
"""

import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta

# Fixed UTC+8 offset used to timestamp the snapshot.
TZ_TAIPEI = timezone(timedelta(hours=8))

CONTEXT_PATH = os.path.join(".kiro", ".audit_context.json")

# High-risk paths (diffs are collected only for these files, to keep the diff small).
HIGH_RISK_PATTERNS = [
    re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"),
    re.compile(r"^apps/backend/app/"),
    re.compile(r"^apps/admin-web/src/"),
    re.compile(r"^apps/miniprogram/"),
    re.compile(r"^packages/shared/"),
    re.compile(r"^db/"),
]

# Failures that are expected when shelling out to git: git not installed,
# timeout, or an argument that cannot be passed to a process (non-string,
# embedded NUL byte). All of them degrade to "no output".
_GIT_ERRORS = (OSError, ValueError, TypeError, subprocess.SubprocessError)


def safe_read_json(path):
    """Return the JSON object stored at *path*, or ``{}`` when it is unusable.

    ``{}`` is returned when the file does not exist, cannot be read or
    decoded, is not valid JSON, or holds a top-level value that is not a JSON
    object. Callers use ``.get()`` on the result, so a list or scalar must
    never be handed back.
    """
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def _run_git(args, timeout):
    """Run ``git <args>`` and return its stripped stdout, or ``""`` on any failure."""
    try:
        result = subprocess.run(
            ["git", *args],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except _GIT_ERRORS:
        return ""
    if result.returncode != 0:
        return ""
    return result.stdout.strip()


def git_diff_stat():
    """Return the output of ``git diff --stat HEAD``, or ``""`` if git fails."""
    return _run_git(["diff", "--stat", "HEAD"], timeout=15)


def git_diff_files(files, max_total=30000, max_per_file=5000):
    """Return the concatenated ``git diff HEAD`` output of *files*.

    Args:
        files: Iterable of paths to diff. git is invoked once per file so the
            command line never grows too long.
        max_total: Budget, in characters, for the diff text of all files
            together. Once it is exhausted a ``[TRUNCATED: ...]`` marker is
            appended and the remaining files are skipped.
        max_per_file: Maximum number of diff characters kept for one file
            before its own ``[TRUNCATED: ...]`` marker is appended.

    Returns:
        The per-file diffs joined by newlines; ``""`` when *files* is empty or
        no file produced a diff. Files whose diff cannot be obtained are
        skipped silently.
    """
    if not files:
        return ""

    limit_note = f"\n[TRUNCATED: diff exceeds {max_total // 1000}KB limit]"
    pieces = []
    used = 0
    for path in files:
        remaining = max_total - used
        if remaining <= 0:
            pieces.append(limit_note)
            break

        chunk = _run_git(["diff", "HEAD", "--", path], timeout=10)
        if not chunk:
            continue

        # Per-file truncation.
        if len(chunk) > max_per_file:
            chunk = chunk[:max_per_file] + f"\n[TRUNCATED: {path} diff too long]"

        # Enforce the overall budget on this chunk as well; otherwise the
        # result could overshoot max_total by a whole per-file chunk and the
        # last file could cross the limit without any marker being added.
        if len(chunk) > remaining:
            pieces.append(chunk[:remaining])
            pieces.append(limit_note)
            break

        pieces.append(chunk)
        used += len(chunk)
    return "\n".join(pieces)


def get_latest_prompt_log(max_chars=3000):
    """Return the content of the newest prompt log file (for traceability).

    The newest file is the entry of ``docs/audit/prompt_logs`` whose name
    starts with ``prompt_log_`` and sorts last by name.

    Args:
        max_chars: Maximum number of characters returned before a
            ``[TRUNCATED]`` marker is appended.

    Returns:
        The (possibly truncated) file content, or ``""`` when the directory or
        a matching file is missing or cannot be read.
    """
    log_dir = os.path.join("docs", "audit", "prompt_logs")
    try:
        names = [n for n in os.listdir(log_dir) if n.startswith("prompt_log_")]
        if not names:
            return ""
        latest = os.path.join(log_dir, max(names))
        with open(latest, "r", encoding="utf-8") as f:
            # One character more than the limit is enough to detect overflow
            # without loading an arbitrarily large log into memory.
            content = f.read(max_chars + 1)
    except (OSError, ValueError):
        return ""

    # Truncate overly long content.
    if len(content) > max_chars:
        content = content[:max_chars] + "\n[TRUNCATED]"
    return content


def _is_high_risk(path):
    """Return True when *path* is a string matching one of HIGH_RISK_PATTERNS."""
    return isinstance(path, str) and any(p.search(path) for p in HIGH_RISK_PATTERNS)


def main():
    """Build the unified audit context and write it to CONTEXT_PATH.

    Missing or malformed hook state files are treated as empty, so the
    snapshot is still produced with default values. A one-line summary is
    printed to stdout.
    """
    now = datetime.now(TZ_TAIPEI)

    # Read the outputs of the preceding hooks.
    audit_state = safe_read_json(os.path.join(".kiro", ".audit_state.json"))
    compliance = safe_read_json(os.path.join(".kiro", ".compliance_state.json"))
    prompt_id_info = safe_read_json(os.path.join(".kiro", ".last_prompt_id.json"))

    # Extract the high-risk files from audit_state. A null or non-list value
    # is treated as "no changed files" instead of crashing further down.
    changed_files = audit_state.get("changed_files") or []
    if not isinstance(changed_files, list):
        changed_files = []
    high_risk_files = [f for f in changed_files if _is_high_risk(f)]

    # Collect diffs (high-risk files only).
    diff_stat = git_diff_stat()
    high_risk_diff = git_diff_files(high_risk_files)

    # Fetch the latest prompt log.
    prompt_log = get_latest_prompt_log()

    # Build the unified context.
    context = {
        "built_at": now.isoformat(),
        "prompt_id": prompt_id_info.get("prompt_id", "unknown"),
        "prompt_at": prompt_id_info.get("at", ""),

        # From audit-flagger
        "audit_required": audit_state.get("audit_required", False),
        "db_docs_required": audit_state.get("db_docs_required", False),
        "reasons": audit_state.get("reasons", []),
        "changed_files": changed_files,
        "high_risk_files": high_risk_files,

        # From change-compliance-prescan
        "compliance": {
            "code_without_docs": compliance.get("code_without_docs", []),
            "new_migration_sql": compliance.get("new_migration_sql", []),
            "has_bd_manual": compliance.get("has_bd_manual", False),
            "has_audit_record": compliance.get("has_audit_record", False),
            "has_ddl_baseline": compliance.get("has_ddl_baseline", False),
        },

        # git summary
        "diff_stat": diff_stat,
        "high_risk_diff": high_risk_diff,

        # Prompt traceability
        "latest_prompt_log": prompt_log,
    }

    os.makedirs(os.path.dirname(CONTEXT_PATH), exist_ok=True)
    # Write to a sibling temp file and swap it in, so the consumer never sees
    # a half-written snapshot if serialization or the write fails midway.
    tmp_path = CONTEXT_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(context, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, CONTEXT_PATH)

    # Print a summary to stdout.
    docs_missing = len(compliance.get("code_without_docs") or [])
    print(f"audit_context built: {len(changed_files)} files, "
          f"{len(high_risk_files)} high-risk, "
          f"{docs_missing} docs missing")


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        sys.stderr.write(f"build_audit_context failed: {e}\n")
        sys.exit(1)