Files
Neo-ZQYY/.kiro/scripts/build_audit_context.py

175 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""build_audit_context — 合并所有前置 hook 产出,生成统一审计上下文快照。
读取:
- .kiro/state/.audit_state.jsonaudit-flagger 产出:风险判定、变更文件列表)
- .kiro/state/.compliance_state.jsonchange-compliance 产出:文档缺失、迁移状态)
- .kiro/state/.last_prompt_id.jsonprompt-audit-log 产出Prompt ID 溯源)
- git diff --stat HEAD变更统计摘要
- git diff HEAD仅高风险文件的 diff截断到合理长度
输出:.kiro/state/.audit_context.jsonaudit-writer 子代理的唯一输入)
"""
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta
TZ_TAIPEI = timezone(timedelta(hours=8))
CONTEXT_PATH = os.path.join(".kiro", "state", ".audit_context.json")
# 高风险路径(只对这些文件取 diff避免 diff 过大)
HIGH_RISK_PATTERNS = [
re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"),
re.compile(r"^apps/backend/app/"),
re.compile(r"^apps/admin-web/src/"),
re.compile(r"^apps/miniprogram/"),
re.compile(r"^packages/shared/"),
re.compile(r"^db/"),
]
def safe_read_json(path):
if not os.path.isfile(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def git_diff_stat():
try:
r = subprocess.run(
["git", "diff", "--stat", "HEAD"],
capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=15
)
return r.stdout.strip() if r.returncode == 0 else ""
except Exception:
return ""
def git_diff_files(files, max_total=30000):
"""获取指定文件的 git diff截断到 max_total 字符"""
if not files:
return ""
# 分批取 diff避免命令行过长
all_diff = []
total_len = 0
for f in files:
if total_len >= max_total:
all_diff.append(f"\n[TRUNCATED: diff exceeds {max_total // 1000}KB limit]")
break
try:
r = subprocess.run(
["git", "diff", "HEAD", "--", f],
capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=10
)
if r.returncode == 0 and r.stdout.strip():
chunk = r.stdout.strip()
# 单文件 diff 截断
if len(chunk) > 5000:
chunk = chunk[:5000] + f"\n[TRUNCATED: {f} diff too long]"
all_diff.append(chunk)
total_len += len(chunk)
except Exception:
continue
return "\n".join(all_diff)
def get_latest_prompt_log():
"""获取最新的 prompt log 文件内容(用于溯源)"""
log_dir = os.path.join("docs", "audit", "prompt_logs")
if not os.path.isdir(log_dir):
return ""
try:
files = sorted(
[f for f in os.listdir(log_dir) if f.startswith("prompt_log_")],
reverse=True
)
if not files:
return ""
latest = os.path.join(log_dir, files[0])
with open(latest, "r", encoding="utf-8") as f:
content = f.read()
# 截断过长内容
if len(content) > 3000:
content = content[:3000] + "\n[TRUNCATED]"
return content
except Exception:
return ""
def main():
now = datetime.now(TZ_TAIPEI)
# 读取前置 hook 产出
audit_state = safe_read_json(os.path.join(".kiro", "state", ".audit_state.json"))
compliance = safe_read_json(os.path.join(".kiro", "state", ".compliance_state.json"))
prompt_id_info = safe_read_json(os.path.join(".kiro", "state", ".last_prompt_id.json"))
# 从 audit_state 提取高风险文件
changed_files = audit_state.get("changed_files", [])
high_risk_files = [
f for f in changed_files
if any(p.search(f) for p in HIGH_RISK_PATTERNS)
]
# 获取 diff仅高风险文件
diff_stat = git_diff_stat()
high_risk_diff = git_diff_files(high_risk_files)
# 获取最新 prompt log
prompt_log = get_latest_prompt_log()
# 构建统一上下文
context = {
"built_at": now.isoformat(),
"prompt_id": prompt_id_info.get("prompt_id", "unknown"),
"prompt_at": prompt_id_info.get("at", ""),
# 来自 audit-flagger
"audit_required": audit_state.get("audit_required", False),
"db_docs_required": audit_state.get("db_docs_required", False),
"reasons": audit_state.get("reasons", []),
"changed_files": changed_files,
"high_risk_files": high_risk_files,
# 来自 change-compliance-prescan
"compliance": {
"code_without_docs": compliance.get("code_without_docs", []),
"new_migration_sql": compliance.get("new_migration_sql", []),
"has_bd_manual": compliance.get("has_bd_manual", False),
"has_audit_record": compliance.get("has_audit_record", False),
"has_ddl_baseline": compliance.get("has_ddl_baseline", False),
},
# git 摘要
"diff_stat": diff_stat,
"high_risk_diff": high_risk_diff,
# prompt 溯源
"latest_prompt_log": prompt_log,
}
os.makedirs(os.path.join(".kiro", "state"), exist_ok=True)
with open(CONTEXT_PATH, "w", encoding="utf-8") as f:
json.dump(context, f, indent=2, ensure_ascii=False)
# 输出摘要到 stdout
print(f"audit_context built: {len(changed_files)} files, "
f"{len(high_risk_files)} high-risk, "
f"{len(compliance.get('code_without_docs', []))} docs missing")
if __name__ == "__main__":
try:
main()
except Exception as e:
sys.stderr.write(f"build_audit_context failed: {e}\n")
sys.exit(1)