#!/usr/bin/env python3
"""agent_on_stop — merged agentStop hook script (v3: with LLM summary generation).

Merges the former audit_reminder + change_compliance_prescan +
build_audit_context + session_extract hooks:

1. Full session-log extraction -> docs/audit/session_logs/ (runs whether or
   not any code changed)
2. Call the Bailian API to generate a description for the freshly extracted
   session -> written into both indexes
3. Scan the workspace -> compare against the promptSubmit baseline -> precisely
   detect the changes made during this conversation
4. If no file changed at all -> skip the review steps and exit silently
5. Compliance pre-scan -> .kiro/state/.compliance_state.json
6. Build the audit context -> .kiro/state/.audit_context.json
7. Audit reminder (rate-limited to once per 15 minutes) -> stderr

Change detection is based on comparing a file mtime+size baseline; it does not
depend on git commit history.
Every feature block is isolated with try/except so that a single failure does
not affect the others.
"""
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta

# Same-directory imports: file-baseline module + cwd validation helper.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from file_baseline import scan_workspace, load_baseline, diff_baselines, total_changes
from _ensure_root import ensure_repo_root

# Taipei timezone (UTC+8); all timestamps in state files use this zone.
TZ_TAIPEI = timezone(timedelta(hours=8))
# Minimum interval between audit reminders (rate limit).
MIN_INTERVAL = timedelta(minutes=15)

# Path constants (all relative to the repo root; ensure_repo_root() enforces cwd).
STATE_PATH = os.path.join(".kiro", "state", ".audit_state.json")
COMPLIANCE_PATH = os.path.join(".kiro", "state", ".compliance_state.json")
CONTEXT_PATH = os.path.join(".kiro", "state", ".audit_context.json")
PROMPT_ID_PATH = os.path.join(".kiro", "state", ".last_prompt_id.json")

# Noise paths (used to filter non-business files out of the change list).
NOISE_PATTERNS = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"^tmp/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]

# High-risk paths: changes here are surfaced in the audit context.
HIGH_RISK_PATTERNS = [
    re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"),
    re.compile(r"^apps/backend/app/"),
    re.compile(r"^apps/admin-web/src/"),
    re.compile(r"^apps/miniprogram/"),
    re.compile(r"^packages/shared/"),
    re.compile(r"^db/"),
]

# Code-prefix -> expected documentation mapping (used by the compliance check).
# Values ending in "/" are directory prefixes; others are exact doc file paths.
DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md", "docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md", "docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/schemas/": ["docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/main.py": ["docs/contracts/openapi/backend-api.json"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
}

# API-change detection patterns (routers / auth / schemas / main.py).
API_CHANGE_PATTERNS = [
    re.compile(r"^apps/backend/app/routers/"),
    re.compile(r"^apps/backend/app/auth/"),
    re.compile(r"^apps/backend/app/schemas/"),
    re.compile(r"^apps/backend/app/main\.py$"),
]

# SQL migration file locations.
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]

BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")
DDL_BASELINE_DIR = "docs/database/ddl/"
AUDIT_CHANGES_DIR = "docs/audit/changes/"


def now_taipei() -> datetime:
    """Return the current time as an aware datetime in UTC+8."""
    return datetime.now(TZ_TAIPEI)


def sha1hex(s: str) -> str:
    """Return the hex SHA-1 digest of a UTF-8 encoded string."""
    # NOTE(review): not referenced anywhere in this module — possibly kept for
    # external callers; confirm before removing.
    return hashlib.sha1(s.encode("utf-8")).hexdigest()


def is_noise(f: str) -> bool:
    """True if path *f* matches any noise pattern (non-business file)."""
    return any(p.search(f) for p in NOISE_PATTERNS)


def safe_read_json(path: str) -> dict:
    """Read a JSON file, returning {} on any error (missing/corrupt file)."""
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return {}


def write_json(path: str, data) -> None:
    """Write *data* as pretty-printed JSON, creating the parent directory."""
    # NOTE(review): when path has no directory component, this creates
    # .kiro/state as a fallback but the file itself is still written to the
    # cwd — likely unintended; all current callers pass paths with a parent.
    os.makedirs(os.path.dirname(path) or os.path.join(".kiro", "state"), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)


def git_diff_stat() -> str:
    """Return `git diff --stat HEAD` output, or "" on any failure/timeout."""
    try:
        r = subprocess.run(
            ["git", "diff", "--stat", "HEAD"],
            capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=15
        )
        return r.stdout.strip() if r.returncode == 0 else ""
    except Exception:
        return ""


def git_diff_files(files, max_total=30000, max_per_file=15000):
    """Collect the actual diff content for *files*.

    Uses `git diff HEAD` for tracked files; untracked new files have their
    raw content wrapped in a synthetic unified-diff header. Output is capped
    at *max_per_file* chars per file and *max_total* chars overall; truncation
    is marked explicitly. Failures on individual files are skipped silently.
    """
    if not files:
        return ""
    all_diff = []
    total_len = 0
    for f in files:
        # Stop early once the overall budget is exhausted.
        if total_len >= max_total:
            all_diff.append(f"\n[TRUNCATED: diff exceeds {max_total // 1000}KB]")
            break
        try:
            # Try `git diff HEAD` first.
            r = subprocess.run(
                ["git", "diff", "HEAD", "--", f],
                capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=10
            )
            chunk = ""
            if r.returncode == 0 and r.stdout.strip():
                chunk = r.stdout.strip()
            elif os.path.isfile(f):
                # Untracked new file: read its content directly as the "diff".
                try:
                    with open(f, "r", encoding="utf-8", errors="replace") as fh:
                        # Read slightly past the cap so truncation is detectable.
                        file_content = fh.read(max_per_file + 100)
                    chunk = f"--- /dev/null\n+++ b/{f}\n@@ -0,0 +1 @@\n" + file_content
                except Exception:
                    continue
            if chunk:
                if len(chunk) > max_per_file:
                    chunk = chunk[:max_per_file] + f"\n[TRUNCATED: {f} diff too long]"
                all_diff.append(chunk)
                total_len += len(chunk)
        except Exception:
            continue
    return "\n".join(all_diff)


def get_latest_prompt_log() -> str:
    """Return (up to 3000 chars of) the newest prompt_log_* file, or ""."""
    log_dir = os.path.join("docs", "audit", "prompt_logs")
    if not os.path.isdir(log_dir):
        return ""
    try:
        # Filenames embed timestamps, so reverse-lexicographic sort puts the
        # newest log first — presumably; verify against the log writer.
        files = sorted(
            [f for f in os.listdir(log_dir) if f.startswith("prompt_log_")],
            reverse=True
        )
        if not files:
            return ""
        with open(os.path.join(log_dir, files[0]), "r", encoding="utf-8") as f:
            content = f.read()
        return content[:3000] + "\n[TRUNCATED]" if len(content) > 3000 else content
    except Exception:
        return ""


# ── Step 1: detect changes via the file baseline ──
def detect_changes_via_baseline():
    """Scan the workspace and diff it against the promptSubmit baseline.

    Returns (all_changed_files, external_files, diff_result, no_change):
    - all_changed_files: every file changed during this conversation
      (added + modified)
    - external_files: currently always [] (could later be refined via Kiro
      write logs)
    - diff_result: the full diff result {added, modified, deleted}
    - no_change: whether nothing changed at all
    """
    before = load_baseline()
    after = scan_workspace(".")
    if not before:
        # No baseline (first run or baseline lost): nothing to compare
        # against, so report "no change" and skip the review steps.
        return [], [], {"added": [], "modified": [], "deleted": []}, True
    diff = diff_baselines(before, after)
    count = total_changes(diff)
    if count == 0:
        return [], [], diff, True
    # All changed files = added + modified (deleted files no longer exist and
    # do not participate in risk assessment).
    all_changed = sorted(set(diff["added"] + diff["modified"]))
    # Drop noise files.
    real_files = [f for f in all_changed if not is_noise(f)]
    if not real_files:
        return [], [], diff, True
    # External changes: all baseline-detected changes are recorded here,
    # because Kiro's own writes also touch mtimes — so "external" really means
    # "everything changed during this conversation", Kiro or not.
    # Precise separation would need a write-file list from the Kiro runtime,
    # which is not available yet.
    external_files = []  # no longer falsely reported as external changes
    return real_files, external_files, diff, False


# ── Step 3: compliance pre-scan ──
def do_compliance_prescan(all_files):
    """Scan the changed files for compliance signals and persist the result.

    Writes the aggregate (plus a derived ``needs_check`` flag) to
    COMPLIANCE_PATH and returns the raw result dict.
    """
    result = {
        "new_migration_sql": [],
        "new_or_modified_sql": [],
        "code_without_docs": [],
        "new_files": [],
        "has_bd_manual": False,
        "has_audit_record": False,
        "has_ddl_baseline": False,
        "api_changed": False,
        "openapi_spec_stale": False,
    }
    code_files = []
    doc_files = set()
    for f in all_files:
        if is_noise(f):
            continue
        for mp in MIGRATION_PATTERNS:
            if mp.search(f):
                result["new_migration_sql"].append(f)
                break
        if f.endswith(".sql"):
            result["new_or_modified_sql"].append(f)
        if BD_MANUAL_PATTERN.search(f):
            result["has_bd_manual"] = True
        if f.startswith(AUDIT_CHANGES_DIR):
            result["has_audit_record"] = True
        if f.startswith(DDL_BASELINE_DIR):
            result["has_ddl_baseline"] = True
        if f.endswith(".md") or "/docs/" in f:
            doc_files.add(f)
        if f.endswith((".py", ".ts", ".tsx", ".js", ".jsx")):
            code_files.append(f)
        # Detect API-related file changes.
        for ap in API_CHANGE_PATTERNS:
            if ap.search(f):
                result["api_changed"] = True
                break
    # API changed but the OpenAPI spec was not updated in step -> mark stale.
    if result["api_changed"] and "docs/contracts/openapi/backend-api.json" not in all_files:
        result["openapi_spec_stale"] = True
    for cf in code_files:
        expected_docs = []
        for prefix, docs in DOC_MAP.items():
            if cf.startswith(prefix):
                expected_docs.extend(docs)
        if expected_docs:
            has_doc = False
            for ed in expected_docs:
                # Exact doc file touched in this change set.
                if ed in doc_files:
                    has_doc = True
                    break
                # Directory-prefix expectation: any doc under it counts.
                if ed.endswith("/") and any(d.startswith(ed) for d in doc_files):
                    has_doc = True
                    break
            if not has_doc:
                result["code_without_docs"].append({
                    "file": cf,
                    "expected_docs": expected_docs,
                })
    needs_check = bool(
        result["new_migration_sql"]
        or result["code_without_docs"]
        or result["openapi_spec_stale"]
    )
    now = now_taipei()
    write_json(COMPLIANCE_PATH, {
        "needs_check": needs_check,
        "scanned_at": now.isoformat(),
        **result,
    })
    return result


# ── Step 4: build the audit context ──
def do_build_audit_context(all_files, diff_result, compliance):
    """Assemble and persist the audit context JSON (CONTEXT_PATH).

    Merges the git-status-derived changed files recorded in the audit state
    with this session's baseline diff, collects high-risk diffs and the latest
    prompt log, and writes everything to CONTEXT_PATH.
    """
    now = now_taipei()
    audit_state = safe_read_json(STATE_PATH)
    prompt_info = safe_read_json(PROMPT_ID_PATH)
    # Merge changed_files from the audit state (risk files from git status)
    # with this conversation's baseline diff.
    git_changed = audit_state.get("changed_files", [])
    session_changed = all_files  # files changed during this conversation
    # Union of the two sources, de-duplicated.
    all_changed = sorted(set(git_changed + session_changed))
    high_risk_files = [
        f for f in all_changed if any(p.search(f) for p in HIGH_RISK_PATTERNS)
    ]
    diff_stat = git_diff_stat()
    high_risk_diff = git_diff_files(high_risk_files)
    prompt_log = get_latest_prompt_log()
    context = {
        "built_at": now.isoformat(),
        "prompt_id": prompt_info.get("prompt_id", "unknown"),
        "prompt_at": prompt_info.get("at", ""),
        "audit_required": audit_state.get("audit_required", False),
        "db_docs_required": audit_state.get("db_docs_required", False),
        "reasons": audit_state.get("reasons", []),
        # Caps keep the context file a manageable size.
        "changed_files": all_changed[:100],
        "high_risk_files": high_risk_files,
        "session_diff": {
            "added": diff_result.get("added", [])[:50],
            "modified": diff_result.get("modified", [])[:50],
            "deleted": diff_result.get("deleted", [])[:50],
        },
        "compliance": {
            "code_without_docs": compliance.get("code_without_docs", []),
            "new_migration_sql": compliance.get("new_migration_sql", []),
            "has_bd_manual": compliance.get("has_bd_manual", False),
            "has_audit_record": compliance.get("has_audit_record", False),
            "has_ddl_baseline": compliance.get("has_ddl_baseline", False),
            "api_changed": compliance.get("api_changed", False),
            "openapi_spec_stale": compliance.get("openapi_spec_stale", False),
        },
        "diff_stat": diff_stat,
        "high_risk_diff": high_risk_diff,
        "latest_prompt_log": prompt_log,
    }
    write_json(CONTEXT_PATH, context)


# ── Step 5: audit reminder (15-minute rate limit) ──
def do_audit_reminder(real_files):
    """Emit an informational audit reminder on stderr, at most every 15 min.

    Exits with status 0 after printing so the agent does not interpret the
    message as an error.
    """
    state = safe_read_json(STATE_PATH)
    if not state.get("audit_required"):
        return
    # No reminder when nothing changed.
    if not real_files:
        return
    now = now_taipei()
    last_str = state.get("last_reminded_at")
    if last_str:
        try:
            last = datetime.fromisoformat(last_str)
            if (now - last) < MIN_INTERVAL:
                return
        except Exception:
            # Unparseable timestamp: treat as "never reminded".
            pass
    state["last_reminded_at"] = now.isoformat()
    write_json(STATE_PATH, state)
    reasons = state.get("reasons", [])
    reason_text = ", ".join(reasons) if reasons else "high-risk paths changed"
    # Informational only; exit(0) keeps the agent from treating this as an
    # error and running the audit on its own.
    # Audit record-keeping is done exclusively via a user-triggered /audit.
    sys.stderr.write(
        f"[AUDIT REMINDER] Pending audit ({reason_text}), "
        f"{len(real_files)} files changed this session. "
        f"Run /audit to sync. (15min rate limit)\n"
    )
    sys.exit(0)


# ── Step 6: full session-log extraction ──
def do_full_session_extract():
    """Extract the current execution's full conversation from Kiro globalStorage.

    Delegates to the core logic in scripts/ops/extract_kiro_session.py.
    Only the latest un-indexed execution is extracted, to avoid duplicates.
    """
    # Import the extractor lazily (avoids import cost at startup).
    scripts_ops = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "scripts", "ops")
    scripts_ops = os.path.normpath(scripts_ops)
    if scripts_ops not in sys.path:
        sys.path.insert(0, scripts_ops)
    try:
        from extract_kiro_session import extract_latest
    except ImportError:
        return  # extractor missing -> skip silently
    # globalStorage path: from env var or the default (Windows APPDATA) location.
    global_storage = os.environ.get(
        "KIRO_GLOBAL_STORAGE",
        os.path.join(os.environ.get("APPDATA", ""), "Kiro", "User", "globalStorage")
    )
    workspace_path = os.getcwd()
    extract_latest(global_storage, workspace_path)


def _extract_summary_content(md_content: str) -> str:
    """Prepare session-log markdown for summary generation.

    Strategy: if the "user input" section contains a CONTEXT TRANSFER
    (multi-turn continuation), replace it with a short marker so the historical
    background does not pollute this turn's summary.
    """
    # NOTE(review): redundant — re is already imported at module level.
    import re
    # Detect whether the user-input section contains a context transfer.
    ct_pattern = re.compile(r"## 2\. 用户输入\s*\n```\s*\n.*?CONTEXT TRANSFER", re.DOTALL)
    if ct_pattern.search(md_content):
        # Replace the "user input" section with a short marker.
        # Matches everything from "## 2. 用户输入" up to the next "## 3.".
        md_content = re.sub(
            r"(## 2\. 用户输入)\s*\n```[\s\S]*?```\s*\n(?=## 3\.)",
            r"\1\n\n[本轮为 Context Transfer 续接,用户输入为历史多轮摘要,已省略。请基于执行摘要和对话记录中的实际工具调用判断本轮工作。]\n\n",
            md_content,
        )
    return md_content


# ── Step 7: generate an LLM summary for the latest session ──
# System prompt sent verbatim to the Bailian API (kept in Chinese on purpose —
# it is runtime data, not a comment).
_SUMMARY_SYSTEM_PROMPT = """你是一个专业的技术对话分析师。你的任务是为 AI 编程助手的一轮执行(execution)生成简洁的中文摘要。
背景:一个对话(chatSession)包含多轮执行(execution)。每轮执行 = 用户发一条消息 → AI 完成响应。你收到的是单轮执行的完整记录。
摘要规则:
1. 只描述本轮执行实际完成的工作,不要描述历史背景
2. 列出完成的功能点/任务(一轮可能完成多个)
3. 包含关键技术细节:文件路径、模块名、数据库表、API 端点等
4. bug 修复要说明原因和方案
5. 不写过程性描述("用户说..."),只写结果
6. 内容太短或无实质内容的,写"无实质内容"
7. 不限字数,信息完整优先,避免截断失真
重要:
- "执行摘要"(📋)是最可靠的信息源,优先基于它判断本轮做了什么
- 如果"用户输入"包含 CONTEXT TRANSFER,那是之前多轮的历史摘要,不是本轮工作
- 对话记录中的实际工具调用和文件变更才是本轮的真实操作
请直接输出摘要,不要添加任何前缀或解释。"""


def do_generate_description():
    """Generate descriptions for main-conversation entries lacking one.

    Calls the Bailian API (OpenAI-compatible endpoint) and writes results into
    both indexes. Silently returns if the API key, the extractor module, or
    the openai package is unavailable.
    """
    from dotenv import load_dotenv
    load_dotenv()
    api_key = os.environ.get("BAILIAN_API_KEY", "")
    if not api_key:
        return
    model = os.environ.get("BAILIAN_MODEL", "qwen-plus")
    base_url = os.environ.get("BAILIAN_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")
    scripts_ops = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "scripts", "ops")
    scripts_ops = os.path.normpath(scripts_ops)
    if scripts_ops not in sys.path:
        sys.path.insert(0, scripts_ops)
    try:
        from extract_kiro_session import load_index, save_index, load_full_index, save_full_index
    except ImportError:
        return
    index = load_index()
    entries = index.get("entries", {})
    if not entries:
        return
    # Collect every main-conversation entry that lacks a description.
    targets = []
    for eid, ent in entries.items():
        if ent.get("is_sub"):
            continue
        if not ent.get("description"):
            targets.append((eid, ent))
    if not targets:
        return
    # Cap the work done in the agent_on_stop path to avoid timeouts; backlog
    # is handled by the standalone batch_generate_summaries.py script.
    MAX_PER_RUN = 10
    if len(targets) > MAX_PER_RUN:
        # Prefer the newest entries (descending startTime).
        targets.sort(key=lambda t: t[1].get("startTime", ""), reverse=True)
        targets = targets[:MAX_PER_RUN]
    try:
        from openai import OpenAI
        client = OpenAI(api_key=api_key, base_url=base_url)
    except Exception:
        return
    full_index = load_full_index()
    full_entries = full_index.get("entries", {})
    generated = 0
    for target_eid, target_entry in targets:
        out_dir = target_entry.get("output_dir", "")
        if not out_dir or not os.path.isdir(out_dir):
            continue
        # Find the main_*.md file(s) belonging to this entry.
        main_files = sorted(
            f for f in os.listdir(out_dir)
            if f.startswith("main_") and f.endswith(".md")
            and target_eid[:8] in f  # matched by executionId short code
        )
        if not main_files:
            # Fallback: take every main file in the directory.
            main_files = sorted(
                f for f in os.listdir(out_dir)
                if f.startswith("main_") and f.endswith(".md")
            )
        if not main_files:
            continue
        content_parts = []
        for mf in main_files:
            try:
                with open(os.path.join(out_dir, mf), "r", encoding="utf-8") as fh:
                    content_parts.append(fh.read())
            except Exception:
                continue
        if not content_parts:
            continue
        content = "\n\n---\n\n".join(content_parts)
        content = _extract_summary_content(content)
        # Keep the request within the model's context budget.
        if len(content) > 60000:
            content = content[:60000] + "\n\n[TRUNCATED]"
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": _SUMMARY_SYSTEM_PROMPT},
                    {"role": "user", "content": f"请为以下单轮执行记录生成摘要:\n\n{content}"},
                ],
                max_tokens=4096,
            )
            description = resp.choices[0].message.content.strip()
        except Exception:
            continue  # one failure must not affect the others
        if not description:
            continue
        # Update both indexes in memory.
        entries[target_eid]["description"] = description
        if target_eid in full_entries:
            full_entries[target_eid]["description"] = description
        generated += 1
    # Save once, in bulk.
    if generated > 0:
        save_index(index)
        save_full_index(full_index)


def main():
    """Hook entry point: run all steps, each isolated from the others."""
    ensure_repo_root()
    now = now_taipei()
    force_rebuild = "--force-rebuild" in sys.argv
    # Full session-log extraction: runs on every conversation, with or without
    # file changes.
    try:
        do_full_session_extract()
    except Exception:
        pass
    # Step 1: detect changes via the file baseline.
    real_files, external_files, diff_result, no_change = detect_changes_via_baseline()
    # No file changed at all -> skip every review step (unless --force-rebuild).
    if no_change and not force_rebuild:
        return
    # --force-rebuild with no changes: still rebuild the context from git status.
    if no_change and force_rebuild:
        try:
            compliance = do_compliance_prescan(real_files or [])
        except Exception:
            compliance = {}
        try:
            do_build_audit_context(real_files or [], diff_result, compliance)
        except Exception:
            pass
        return
    # Step 2: compliance pre-scan (over the files changed this conversation).
    compliance = {}
    try:
        compliance = do_compliance_prescan(real_files)
    except Exception:
        pass
    # Step 4: build the audit context.
    try:
        do_build_audit_context(real_files, diff_result, compliance)
    except Exception:
        pass
    # Step 7: audit reminder (informational, exit(0); never triggers the agent
    # to self-audit).
    try:
        do_audit_reminder(real_files)
    except SystemExit:
        pass  # informational exit(0); no need to re-raise
    except Exception:
        pass


if __name__ == "__main__":
    try:
        main()
    except SystemExit as e:
        sys.exit(e.code)
    except Exception:
        # Deliberate top-level swallow: a hook script must never fail the
        # surrounding agent run.
        pass