Files
Neo-ZQYY/.kiro/scripts/prompt_on_submit.py

232 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""prompt_on_submit — promptSubmit 合并 hook 脚本v2文件基线模式
合并原 audit_flagger + prompt_audit_log 的功能:
1. 扫描工作区文件 → 保存基线快照 → .kiro/state/.file_baseline.json
2. 基于基线文件列表做风险判定 → .kiro/state/.audit_state.json
3. 记录 prompt 日志 → docs/audit/prompt_logs/
变更检测不再依赖 git status解决不常 commit 导致的误判问题)。
风险判定仍基于 git status因为需要知道哪些文件相对于 commit 有变化)。
所有功能块用 try/except 隔离,单个失败不影响其他。
"""
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta
# 同目录导入文件基线模块 + cwd 校验
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from file_baseline import scan_workspace, save_baseline
from _ensure_root import ensure_repo_root
TZ_TAIPEI = timezone(timedelta(hours=8))
# ── 风险规则(来自 audit_flagger ──
RISK_RULES = [
(re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"), "etl"),
(re.compile(r"^apps/backend/app/"), "backend"),
(re.compile(r"^apps/admin-web/src/"), "admin-web"),
(re.compile(r"^apps/miniprogram/(miniapp|miniprogram)/"), "miniprogram"),
(re.compile(r"^packages/shared/"), "shared"),
(re.compile(r"^db/"), "db"),
]
NOISE_PATTERNS = [
re.compile(r"^docs/audit/"),
re.compile(r"^\.kiro/"),
re.compile(r"^tmp/"),
re.compile(r"^\.hypothesis/"),
]
DB_PATTERNS = [
re.compile(r"^db/"),
re.compile(r"/migrations/"),
re.compile(r"\.sql$"),
re.compile(r"\.prisma$"),
]
STATE_PATH = os.path.join(".kiro", "state", ".audit_state.json")
PROMPT_ID_PATH = os.path.join(".kiro", "state", ".last_prompt_id.json")
def now_taipei():
return datetime.now(TZ_TAIPEI)
def sha1hex(s: str) -> str:
return hashlib.sha1(s.encode("utf-8")).hexdigest()
def get_git_changed_files() -> list[str]:
"""通过 git status 获取变更文件(仅用于风险判定,不用于变更检测)"""
try:
r = subprocess.run(
["git", "status", "--porcelain"],
capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=10
)
if r.returncode != 0:
return []
except Exception:
return []
files = []
for line in r.stdout.splitlines():
if len(line) < 4:
continue
path = line[3:].strip()
if " -> " in path:
path = path.split(" -> ")[-1]
path = path.strip().strip('"').replace("\\", "/")
if path:
files.append(path)
return files
def is_noise(f: str) -> bool:
return any(p.search(f) for p in NOISE_PATTERNS)
def safe_read_json(path):
if not os.path.isfile(path):
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def write_json(path, data):
os.makedirs(os.path.dirname(path) or os.path.join(".kiro", "state"), exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# ── 功能块 1风险标记基于 git status判定哪些文件需要审计 ──
def do_audit_flag(git_files, now):
files = sorted(set(f for f in git_files if not is_noise(f)))
if not files:
write_json(STATE_PATH, {
"audit_required": False,
"db_docs_required": False,
"reasons": [],
"changed_files": [],
"change_fingerprint": "",
"marked_at": now.isoformat(),
"last_reminded_at": None,
})
return
reasons = []
audit_required = False
db_docs_required = False
for f in files:
for pattern, label in RISK_RULES:
if pattern.search(f):
audit_required = True
tag = f"dir:{label}"
if tag not in reasons:
reasons.append(tag)
if "/" not in f:
audit_required = True
if "root-file" not in reasons:
reasons.append("root-file")
if any(p.search(f) for p in DB_PATTERNS):
db_docs_required = True
if "db-schema-change" not in reasons:
reasons.append("db-schema-change")
fp = sha1hex("\n".join(files))
# 保留已有 last_reminded_at
last_reminded = None
existing = safe_read_json(STATE_PATH)
if existing.get("change_fingerprint") == fp:
last_reminded = existing.get("last_reminded_at")
write_json(STATE_PATH, {
"audit_required": audit_required,
"db_docs_required": db_docs_required,
"reasons": reasons,
"changed_files": files[:50],
"change_fingerprint": fp,
"marked_at": now.isoformat(),
"last_reminded_at": last_reminded,
})
# ── 功能块 2Prompt 日志 ──
def do_prompt_log(now):
prompt_id = f"P{now.strftime('%Y%m%d-%H%M%S')}"
prompt_raw = os.environ.get("USER_PROMPT", "")
if len(prompt_raw) > 20000:
prompt_raw = prompt_raw[:5000] + "\n[TRUNCATED: prompt too long]"
summary = " ".join(prompt_raw.split()).strip()
if len(summary) > 120:
summary = summary[:120] + ""
if not summary:
summary = "(empty prompt)"
log_dir = os.path.join("docs", "audit", "prompt_logs")
os.makedirs(log_dir, exist_ok=True)
filename = f"prompt_log_{now.strftime('%Y%m%d_%H%M%S')}.md"
entry = f"""- [{prompt_id}] {now.strftime('%Y-%m-%d %H:%M:%S %z')}
- summary: {summary}
- prompt:
```text
{prompt_raw}
```
"""
with open(os.path.join(log_dir, filename), "w", encoding="utf-8") as f:
f.write(entry)
write_json(PROMPT_ID_PATH, {"prompt_id": prompt_id, "at": now.isoformat()})
# ── 功能块 3文件基线快照替代 git snapshot ──
def do_file_baseline():
"""扫描工作区文件 mtime+size保存为基线快照。
agentStop 时再扫一次对比,即可精确检测本次对话期间的变更。
"""
baseline = scan_workspace(".")
save_baseline(baseline)
def main():
ensure_repo_root()
now = now_taipei()
# 功能块 3文件基线快照最先执行记录对话开始时的文件状态
try:
do_file_baseline()
except Exception:
pass
# 功能块 1风险标记仍用 git status因为需要知道未提交的变更
try:
git_files = get_git_changed_files()
do_audit_flag(git_files, now)
except Exception:
pass
# 功能块 2Prompt 日志
try:
do_prompt_log(now)
except Exception:
pass
if __name__ == "__main__":
try:
main()
except Exception:
pass