Files
Neo-ZQYY/.kiro/scripts/agent_on_stop.py

651 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""agent_on_stop — agentStop 合并 hook 脚本v3含 LLM 摘要生成)。
合并原 audit_reminder + change_compliance_prescan + build_audit_context + session_extract
1. 全量会话记录提取 → docs/audit/session_logs/(无论是否有代码变更)
2. 为刚提取的 session 调用百炼 API 生成 description → 写入双索引
3. 扫描工作区 → 与 promptSubmit 基线对比 → 精确检测本次对话变更
4. 若无任何文件变更 → 跳过审查,静默退出
5. 合规预扫描 → .kiro/state/.compliance_state.json
6. 构建审计上下文 → .kiro/state/.audit_context.json
7. 审计提醒15 分钟限频)→ stderr
变更检测基于文件 mtime+size 基线对比,不依赖 git commit 历史。
所有功能块用 try/except 隔离,单个失败不影响其他。
"""
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta
# 同目录导入文件基线模块 + cwd 校验
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from file_baseline import scan_workspace, load_baseline, diff_baselines, total_changes
from _ensure_root import ensure_repo_root
# Fixed UTC+8 offset returned by now_taipei().
TZ_TAIPEI = timezone(timedelta(hours=8))
# Minimum time between two audit reminders (see do_audit_reminder).
MIN_INTERVAL = timedelta(minutes=15)
# Path constants (relative to the repository root / current working directory)
STATE_PATH = os.path.join(".kiro", "state", ".audit_state.json")
COMPLIANCE_PATH = os.path.join(".kiro", "state", ".compliance_state.json")
CONTEXT_PATH = os.path.join(".kiro", "state", ".audit_context.json")
PROMPT_ID_PATH = os.path.join(".kiro", "state", ".last_prompt_id.json")
# Noise paths (used to filter non-business files out of the change list)
NOISE_PATTERNS = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"^tmp/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]
# High-risk paths
HIGH_RISK_PATTERNS = [
    re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"),
    re.compile(r"^apps/backend/app/"),
    re.compile(r"^apps/admin-web/src/"),
    re.compile(r"^apps/miniprogram/"),
    re.compile(r"^packages/shared/"),
    re.compile(r"^db/"),
]
# Documentation map (used by the compliance check): code path prefix -> docs
# of which at least one is expected to change together with that code.
# An entry ending in "/" is treated as a directory prefix.
DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md", "docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md", "docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/schemas/": ["docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/main.py": ["docs/contracts/openapi/backend-api.json"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
}
# API-change detection patterns (routers / auth / schemas / main.py)
API_CHANGE_PATTERNS = [
    re.compile(r"^apps/backend/app/routers/"),
    re.compile(r"^apps/backend/app/auth/"),
    re.compile(r"^apps/backend/app/schemas/"),
    re.compile(r"^apps/backend/app/main\.py$"),
]
# SQL files under these locations are reported as migrations by the pre-scan.
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]
# Changed files matching these set has_bd_manual / has_ddl_baseline /
# has_audit_record in the compliance result.
BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")
DDL_BASELINE_DIR = "docs/database/ddl/"
AUDIT_CHANGES_DIR = "docs/audit/changes/"
def now_taipei():
    """Return the current time as a timezone-aware datetime in TZ_TAIPEI."""
    return datetime.now(tz=TZ_TAIPEI)
def sha1hex(s: str) -> str:
    """Return the hexadecimal SHA-1 digest of *s* encoded as UTF-8."""
    hasher = hashlib.sha1()
    hasher.update(s.encode("utf-8"))
    return hasher.hexdigest()
def is_noise(f: str) -> bool:
    """Return True when path *f* matches at least one entry of NOISE_PATTERNS."""
    for pattern in NOISE_PATTERNS:
        if pattern.search(f):
            return True
    return False
def safe_read_json(path):
    """Load the JSON document stored at *path*, falling back to ``{}``.

    An empty dict is returned when *path* is not a regular file or when
    opening/decoding it fails for any reason; this helper never raises for
    an unreadable or malformed file.
    """
    if os.path.isfile(path):
        try:
            with open(path, "r", encoding="utf-8") as handle:
                return json.load(handle)
        except Exception:
            # Best-effort read: a broken state file is treated as "no state".
            pass
    return {}
def write_json(path, data):
    """Serialize *data* as indented UTF-8 JSON into *path*.

    The parent directory of *path* is created first. When *path* has no
    directory component, ``.kiro/state`` is created instead (the file itself
    is still written to *path*).
    """
    parent = os.path.dirname(path)
    if not parent:
        parent = os.path.join(".kiro", "state")
    os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as handle:
        # ensure_ascii=False keeps non-ASCII text readable in the state files.
        json.dump(data, handle, ensure_ascii=False, indent=2)
def git_diff_stat():
    """Return the stripped output of ``git diff --stat HEAD``.

    An empty string is returned when git exits with a non-zero status or
    when running it fails for any reason (missing binary, timeout, ...).
    """
    command = ["git", "diff", "--stat", "HEAD"]
    try:
        proc = subprocess.run(
            command,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=15,
        )
        if proc.returncode != 0:
            return ""
        return proc.stdout.strip()
    except Exception:
        return ""
def git_diff_files(files, max_total=30000, max_per_file=15000):
    """Collect diff text for *files*, bounded in size.

    For each path ``git diff HEAD -- <path>`` is tried first. When that
    yields nothing (non-zero exit or empty output) and the path is an
    existing file, its content is read and wrapped in a synthetic
    "new file" diff header instead.

    Args:
        files: iterable of paths relative to the current directory.
        max_total: once the accumulated length reaches this value, a
            truncation marker is appended and the remaining files are skipped.
        max_per_file: a single chunk longer than this is cut and marked.

    Returns:
        The chunks joined with newlines, or ``""`` when *files* is empty.
        Files whose processing raises are skipped.
    """
    if not files:
        return ""

    pieces = []
    used = 0
    for path in files:
        if used >= max_total:
            pieces.append(f"\n[TRUNCATED: diff exceeds {max_total // 1000}KB]")
            break
        try:
            # Try git diff HEAD first.
            proc = subprocess.run(
                ["git", "diff", "HEAD", "--", path],
                capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=10
            )
            tracked_diff = proc.stdout.strip() if proc.returncode == 0 else ""
            if tracked_diff:
                piece = tracked_diff
            elif os.path.isfile(path):
                # No git diff available: use the file content itself as the diff.
                try:
                    with open(path, "r", encoding="utf-8", errors="replace") as handle:
                        body = handle.read(max_per_file + 100)
                except Exception:
                    continue
                piece = f"--- /dev/null\n+++ b/{path}\n@@ -0,0 +1 @@\n" + body
            else:
                piece = ""

            if not piece:
                continue
            if len(piece) > max_per_file:
                piece = piece[:max_per_file] + f"\n[TRUNCATED: {path} diff too long]"
            pieces.append(piece)
            used += len(piece)
        except Exception:
            continue
    return "\n".join(pieces)
def get_latest_prompt_log():
    """Return the content of the newest prompt log, capped at 3000 characters.

    "Newest" is the lexicographically greatest file name starting with
    ``prompt_log_`` inside ``docs/audit/prompt_logs``. Content longer than
    3000 characters is cut and suffixed with ``[TRUNCATED]``. An empty string
    is returned when the directory or a matching file is missing, or on any
    read error.
    """
    log_dir = os.path.join("docs", "audit", "prompt_logs")
    if not os.path.isdir(log_dir):
        return ""
    try:
        candidates = [name for name in os.listdir(log_dir) if name.startswith("prompt_log_")]
        if not candidates:
            return ""
        newest = max(candidates)
        with open(os.path.join(log_dir, newest), "r", encoding="utf-8") as handle:
            text = handle.read()
        if len(text) > 3000:
            return text[:3000] + "\n[TRUNCATED]"
        return text
    except Exception:
        return ""
# ── Step 1: detect changes against the file baseline ──
def detect_changes_via_baseline():
    """Scan the workspace and compare it with the stored baseline.

    Returns:
        A tuple ``(all_changed_files, external_files, diff_result, no_change)``:

        - all_changed_files: sorted added + modified paths with noise paths
          removed (deleted files no longer exist and are not included).
        - external_files: always an empty list. Separating agent writes from
          external writes would need a write log from the runtime, which is
          not available, so nothing is reported as external.
        - diff_result: the full diff ``{added, modified, deleted}``.
        - no_change: True when there is no baseline, the diff is empty, or
          every added/modified path is noise.
    """
    before = load_baseline()
    after = scan_workspace(".")

    if not before:
        # Without a baseline (first run or baseline lost) nothing can be
        # compared, so this is reported as "no change".
        return [], [], {"added": [], "modified": [], "deleted": []}, True

    diff = diff_baselines(before, after)
    if total_changes(diff) == 0:
        return [], [], diff, True

    candidates = sorted(set(diff["added"] + diff["modified"]))
    real_files = [path for path in candidates if not is_noise(path)]
    if real_files:
        return real_files, [], diff, False
    return [], [], diff, True
# ── Step 3: compliance pre-scan ──
def do_compliance_prescan(all_files):
    """Classify the changed files and persist the result to COMPLIANCE_PATH.

    Args:
        all_files: paths (forward-slash, repo-relative) changed in this
            session. Noise paths are ignored.

    Returns:
        A dict with the keys ``new_migration_sql``, ``new_or_modified_sql``,
        ``code_without_docs``, ``new_files`` (never populated here),
        ``has_bd_manual``, ``has_audit_record``, ``has_ddl_baseline``,
        ``api_changed`` and ``openapi_spec_stale``. The same dict, plus
        ``needs_check`` and ``scanned_at``, is written to COMPLIANCE_PATH.
    """
    result = {
        "new_migration_sql": [],
        "new_or_modified_sql": [],
        "code_without_docs": [],
        "new_files": [],
        "has_bd_manual": False,
        "has_audit_record": False,
        "has_ddl_baseline": False,
        "api_changed": False,
        "openapi_spec_stale": False,
    }
    code_files = []
    doc_files = set()
    for f in all_files:
        if is_noise(f):
            continue
        if any(mp.search(f) for mp in MIGRATION_PATTERNS):
            result["new_migration_sql"].append(f)
        if f.endswith(".sql"):
            result["new_or_modified_sql"].append(f)
        if BD_MANUAL_PATTERN.search(f):
            result["has_bd_manual"] = True
        if f.startswith(AUDIT_CHANGES_DIR):
            result["has_audit_record"] = True
        if f.startswith(DDL_BASELINE_DIR):
            result["has_ddl_baseline"] = True
        # Paths under the repo-root "docs/" tree do not contain "/docs/" (there
        # is no leading slash), so they need their own test. Without it,
        # DOC_MAP targets such as docs/contracts/openapi/backend-api.json could
        # never count as updated documentation.
        if f.endswith(".md") or "/docs/" in f or f.startswith("docs/"):
            doc_files.add(f)
        if f.endswith((".py", ".ts", ".tsx", ".js", ".jsx")):
            code_files.append(f)
        # Detect changes to API-related files.
        if any(ap.search(f) for ap in API_CHANGE_PATTERNS):
            result["api_changed"] = True

    # API changed but the OpenAPI spec was not updated alongside it → stale.
    if result["api_changed"] and "docs/contracts/openapi/backend-api.json" not in all_files:
        result["openapi_spec_stale"] = True

    def _doc_touched(expected):
        # Exact file match, or any changed doc under an expected directory.
        if expected in doc_files:
            return True
        return expected.endswith("/") and any(d.startswith(expected) for d in doc_files)

    for cf in code_files:
        expected_docs = []
        for prefix, docs in DOC_MAP.items():
            if cf.startswith(prefix):
                expected_docs.extend(docs)
        # One touched expected doc is enough to consider the code documented.
        if expected_docs and not any(_doc_touched(ed) for ed in expected_docs):
            result["code_without_docs"].append({
                "file": cf,
                "expected_docs": expected_docs,
            })

    needs_check = bool(
        result["new_migration_sql"]
        or result["code_without_docs"]
        or result["openapi_spec_stale"]
    )
    now = now_taipei()
    write_json(COMPLIANCE_PATH, {
        "needs_check": needs_check,
        "scanned_at": now.isoformat(),
        **result,
    })
    return result
# ── Step 4: build the audit context ──
def do_build_audit_context(all_files, diff_result, compliance):
    """Assemble the audit context and write it to CONTEXT_PATH.

    Args:
        all_files: files changed during this session.
        diff_result: baseline diff with ``added`` / ``modified`` / ``deleted``.
        compliance: result of the compliance pre-scan (may be empty).

    The context merges ``changed_files`` from the stored audit state with
    *all_files*, selects the paths matching HIGH_RISK_PATTERNS, and attaches
    the git diff stat, the diff of the high-risk files and the latest prompt
    log.
    """
    now = now_taipei()
    audit_state = safe_read_json(STATE_PATH)
    prompt_info = safe_read_json(PROMPT_ID_PATH)

    # Union of the files recorded in the audit state and the files changed in
    # this session, de-duplicated and sorted.
    merged = sorted(set(audit_state.get("changed_files", []) + all_files))

    def _is_high_risk(path):
        return any(p.search(path) for p in HIGH_RISK_PATTERNS)

    high_risk_files = list(filter(_is_high_risk, merged))

    diff_stat = git_diff_stat()
    high_risk_diff = git_diff_files(high_risk_files)
    prompt_log = get_latest_prompt_log()

    # Each diff bucket is capped at 50 entries.
    session_diff = {
        bucket: diff_result.get(bucket, [])[:50]
        for bucket in ("added", "modified", "deleted")
    }

    compliance_view = {
        key: compliance.get(key, [])
        for key in ("code_without_docs", "new_migration_sql")
    }
    for flag in (
        "has_bd_manual",
        "has_audit_record",
        "has_ddl_baseline",
        "api_changed",
        "openapi_spec_stale",
    ):
        compliance_view[flag] = compliance.get(flag, False)

    context = {
        "built_at": now.isoformat(),
        "prompt_id": prompt_info.get("prompt_id", "unknown"),
        "prompt_at": prompt_info.get("at", ""),
        "audit_required": audit_state.get("audit_required", False),
        "db_docs_required": audit_state.get("db_docs_required", False),
        "reasons": audit_state.get("reasons", []),
        "changed_files": merged[:100],
        "high_risk_files": high_risk_files,
        "session_diff": session_diff,
        "compliance": compliance_view,
        "diff_stat": diff_stat,
        "high_risk_diff": high_risk_diff,
        "latest_prompt_log": prompt_log,
    }
    write_json(CONTEXT_PATH, context)
# ── Step 5: audit reminder (rate-limited to once per 15 minutes) ──
def do_audit_reminder(real_files):
    """Write an informational audit reminder to stderr, then exit with 0.

    Nothing happens when the stored audit state does not require an audit,
    when *real_files* is empty, or when the previous reminder is more recent
    than MIN_INTERVAL. Otherwise ``last_reminded_at`` is updated in
    STATE_PATH, the reminder is written, and ``sys.exit(0)`` is raised.
    """
    state = safe_read_json(STATE_PATH)
    # No pending audit, or nothing changed this session → no reminder.
    if not state.get("audit_required") or not real_files:
        return

    now = now_taipei()
    last_str = state.get("last_reminded_at")
    if last_str:
        try:
            elapsed = now - datetime.fromisoformat(last_str)
            if elapsed < MIN_INTERVAL:
                return
        except Exception:
            # An unparsable or incomparable timestamp does not block the reminder.
            pass

    state["last_reminded_at"] = now.isoformat()
    write_json(STATE_PATH, state)

    reasons = state.get("reasons", [])
    if reasons:
        reason_text = ", ".join(reasons)
    else:
        reason_text = "high-risk paths changed"

    # Informational only: exit(0) so the agent does not treat this as an error
    # and start an audit on its own; the audit is triggered manually via /audit.
    message = (
        f"[AUDIT REMINDER] Pending audit ({reason_text}), "
        f"{len(real_files)} files changed this session. "
        f"Run /audit to sync. (15min rate limit)\n"
    )
    sys.stderr.write(message)
    sys.exit(0)
# ── Step 6: full session record extraction ──
def do_full_session_extract():
    """Run the session extractor from ``scripts/ops/extract_kiro_session.py``.

    The ``scripts/ops`` directory (two levels above this script) is added to
    ``sys.path`` and ``extract_latest`` is imported lazily; when the extractor
    module cannot be imported the function returns silently.

    The globalStorage location comes from ``KIRO_GLOBAL_STORAGE`` or defaults
    to ``%APPDATA%/Kiro/User/globalStorage``; the current working directory
    is passed as the workspace path. Which executions are extracted is decided
    by ``extract_latest`` itself (presumably only the latest un-indexed one —
    verify against the extractor).
    """
    here = os.path.dirname(os.path.abspath(__file__))
    scripts_ops = os.path.normpath(os.path.join(here, "..", "..", "scripts", "ops"))
    if scripts_ops not in sys.path:
        sys.path.insert(0, scripts_ops)

    # Imported here rather than at module level to avoid the import cost at startup.
    try:
        from extract_kiro_session import extract_latest
    except ImportError:
        return  # extractor not present: skip silently

    default_storage = os.path.join(
        os.environ.get("APPDATA", ""), "Kiro", "User", "globalStorage"
    )
    global_storage = os.environ.get("KIRO_GLOBAL_STORAGE", default_storage)
    extract_latest(global_storage, os.getcwd())
def _extract_summary_content(md_content: str) -> str:
    """Prepare session-log markdown for summary generation.

    When the fenced "用户输入" (user input) section contains the marker
    ``CONTEXT TRANSFER`` (a continuation carrying a multi-round history), that
    section's body is replaced by a short notice so the history does not
    distort the summary of the current round.

    Only the text inside the user-input section is inspected: a mention of
    ``CONTEXT TRANSFER`` elsewhere in the log leaves the user input untouched.
    Each user-input section is evaluated on its own, so concatenated logs are
    handled per section.

    Args:
        md_content: markdown of one or more session logs.

    Returns:
        The markdown with qualifying user-input sections elided; unchanged
        when no section qualifies.
    """
    # From the "## 2. 用户输入" heading, through its fenced block, up to the
    # fence that closes right before the "## 3." heading.
    section_pattern = re.compile(
        r"(## 2\. 用户输入)\s*\n```\s*\n([\s\S]*?)```\s*\n(?=## 3\.)"
    )
    notice = (
        "\n\n[本轮为 Context Transfer 续接,用户输入为历史多轮摘要,已省略。请基于执行摘要和对话记录中的实际工具调用判断本轮工作。]\n\n"
    )

    def _elide(match):
        # Keep sections whose own body does not carry the marker.
        if "CONTEXT TRANSFER" not in match.group(2):
            return match.group(0)
        return match.group(1) + notice

    return section_pattern.sub(_elide, md_content)
# ── Step 7: generate an LLM summary for the latest session ──
# System prompt sent with every summary request in do_generate_description().
# It is runtime text consumed by the model, so it is kept verbatim (in Chinese).
_SUMMARY_SYSTEM_PROMPT = """你是一个专业的技术对话分析师。你的任务是为 AI 编程助手的一轮执行execution生成简洁的中文摘要。
背景一个对话chatSession包含多轮执行execution。每轮执行 = 用户发一条消息 → AI 完成响应。你收到的是单轮执行的完整记录。
摘要规则:
1. 只描述本轮执行实际完成的工作,不要描述历史背景
2. 列出完成的功能点/任务(一轮可能完成多个)
3. 包含关键技术细节文件路径、模块名、数据库表、API 端点等
4. bug 修复要说明原因和方案
5. 不写过程性描述("用户说..."),只写结果
6. 内容太短或无实质内容的,写"无实质内容"
7. 不限字数,信息完整优先,避免截断失真
重要:
- "执行摘要"(📋)是最可靠的信息源,优先基于它判断本轮做了什么
- 如果"用户输入"包含 CONTEXT TRANSFER那是之前多轮的历史摘要不是本轮工作
- 对话记录中的实际工具调用和文件变更才是本轮的真实操作
请直接输出摘要,不要添加任何前缀或解释。"""
def do_generate_description():
    """Generate missing descriptions for main-conversation index entries.

    For index entries that are not sub-entries and have no ``description``,
    the matching ``main_*.md`` logs are read, sent to the OpenAI-compatible
    endpoint configured through ``BAILIAN_API_KEY`` / ``BAILIAN_MODEL`` /
    ``BAILIAN_BASE_URL``, and the returned text is stored in both indexes.

    The function returns without doing anything when the API key is unset,
    the index helpers or the OpenAI client are unavailable, or nothing needs
    a description. At most 10 entries (the newest by ``startTime``) are
    processed per call; a failure on one entry does not stop the others.
    """
    from dotenv import load_dotenv
    load_dotenv()

    api_key = os.environ.get("BAILIAN_API_KEY", "")
    if not api_key:
        return
    model = os.environ.get("BAILIAN_MODEL", "qwen-plus")
    base_url = os.environ.get("BAILIAN_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")

    here = os.path.dirname(os.path.abspath(__file__))
    scripts_ops = os.path.normpath(os.path.join(here, "..", "..", "scripts", "ops"))
    if scripts_ops not in sys.path:
        sys.path.insert(0, scripts_ops)
    try:
        from extract_kiro_session import load_index, save_index, load_full_index, save_full_index
    except ImportError:
        return

    index = load_index()
    entries = index.get("entries", {})
    if not entries:
        return

    # Main-conversation entries that still lack a description.
    pending = [
        (eid, ent)
        for eid, ent in entries.items()
        if not ent.get("is_sub") and not ent.get("description")
    ]
    if not pending:
        return

    # Cap the work done inside the hook to avoid timeouts; backlogs are meant
    # for the standalone batch_generate_summaries.py script.
    MAX_PER_RUN = 10
    if len(pending) > MAX_PER_RUN:
        # Newest first (startTime descending).
        pending.sort(key=lambda pair: pair[1].get("startTime", ""), reverse=True)
        pending = pending[:MAX_PER_RUN]

    try:
        from openai import OpenAI
        client = OpenAI(api_key=api_key, base_url=base_url)
    except Exception:
        return

    full_index = load_full_index()
    full_entries = full_index.get("entries", {})
    generated = 0

    for eid, entry in pending:
        out_dir = entry.get("output_dir", "")
        if not out_dir or not os.path.isdir(out_dir):
            continue

        # Prefer main_*.md files carrying the short execution id; otherwise
        # fall back to every main_*.md in the directory.
        short_id = eid[:8]
        all_main = sorted(
            name for name in os.listdir(out_dir)
            if name.startswith("main_") and name.endswith(".md")
        )
        main_files = [name for name in all_main if short_id in name] or all_main
        if not main_files:
            continue

        parts = []
        for name in main_files:
            try:
                with open(os.path.join(out_dir, name), "r", encoding="utf-8") as handle:
                    parts.append(handle.read())
            except Exception:
                continue
        if not parts:
            continue

        content = _extract_summary_content("\n\n---\n\n".join(parts))
        if len(content) > 60000:
            content = content[:60000] + "\n\n[TRUNCATED]"

        try:
            resp = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": _SUMMARY_SYSTEM_PROMPT},
                    {"role": "user", "content": f"请为以下单轮执行记录生成摘要:\n\n{content}"},
                ],
                max_tokens=4096,
            )
            description = resp.choices[0].message.content.strip()
        except Exception:
            continue  # one failed entry must not affect the others
        if not description:
            continue

        # Update both indexes in memory; they are saved once after the loop.
        entries[eid]["description"] = description
        if eid in full_entries:
            full_entries[eid]["description"] = description
        generated += 1

    if generated > 0:
        save_index(index)
        save_full_index(full_index)
def main():
    """Run the agentStop pipeline.

    Order: session extraction → baseline change detection → compliance
    pre-scan → audit context → audit reminder. When nothing changed, the
    review steps are skipped unless ``--force-rebuild`` is on the command
    line, in which case the pre-scan and context are rebuilt (without a
    reminder). Each step after change detection is isolated so that its
    failure does not stop the following steps.
    """
    ensure_repo_root()
    now = now_taipei()  # NOTE(review): currently unused
    force_rebuild = "--force-rebuild" in sys.argv

    # Session extraction runs on every stop, whether or not files changed.
    try:
        do_full_session_extract()
    except Exception:
        pass

    # Detect changes against the file baseline.
    real_files, _external, diff_result, no_change = detect_changes_via_baseline()

    # Nothing changed → skip every review step (unless --force-rebuild).
    if no_change and not force_rebuild:
        return

    changed = real_files or []

    # Compliance pre-scan over the files changed in this session.
    try:
        compliance = do_compliance_prescan(changed)
    except Exception:
        compliance = {}

    # Build the audit context.
    try:
        do_build_audit_context(changed, diff_result, compliance)
    except Exception:
        pass

    # A forced rebuild without changes stops here: no reminder is issued.
    if no_change:
        return

    # Audit reminder (informational; its exit(0) is absorbed here).
    try:
        do_audit_reminder(real_files)
    except SystemExit:
        pass  # informational exit(0): nothing to re-raise
    except Exception:
        pass
if __name__ == "__main__":
    # Script entry point: an explicit exit keeps its exit code, while any
    # other error is swallowed so the hook itself never fails the agent stop.
    try:
        main()
    except SystemExit as stop:
        raise SystemExit(stop.code)
    except Exception:
        pass