#!/usr/bin/env python3
"""agent_on_stop — agentStop 合并 hook 脚本(v3:含 LLM 摘要生成)。

合并原 audit_reminder + change_compliance_prescan + build_audit_context + session_extract:

1. 全量会话记录提取 → docs/audit/session_logs/(无论是否有代码变更)
2. 为刚提取的 session 调用百炼 API 生成 description → 写入双索引
3. 扫描工作区 → 与 promptSubmit 基线对比 → 精确检测本次对话变更
4. 若无任何文件变更 → 跳过审查,静默退出
5. 合规预扫描 → .kiro/state/.compliance_state.json
6. 构建审计上下文 → .kiro/state/.audit_context.json
7. 审计提醒(15 分钟限频)→ stderr

变更检测基于文件 mtime+size 基线对比,不依赖 git commit 历史。
所有功能块用 try/except 隔离,单个失败不影响其他。
"""

# Standard library
import hashlib
import json
import os
import re
import subprocess
import sys
from datetime import datetime, timedelta, timezone

# Sibling hook modules (file baseline helpers + cwd guard).
# The sys.path tweak must run before these local imports.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from file_baseline import scan_workspace, load_baseline, diff_baselines, total_changes
from _ensure_root import ensure_repo_root

# Timezone used for every timestamp this hook writes (Taipei, UTC+8).
TZ_TAIPEI = timezone(timedelta(hours=8))
# Minimum gap between two audit reminders (rate limit).
MIN_INTERVAL = timedelta(minutes=15)

# State-file paths (relative to the repo root).
STATE_PATH = os.path.join(".kiro", "state", ".audit_state.json")
COMPLIANCE_PATH = os.path.join(".kiro", "state", ".compliance_state.json")
CONTEXT_PATH = os.path.join(".kiro", "state", ".audit_context.json")
PROMPT_ID_PATH = os.path.join(".kiro", "state", ".last_prompt_id.json")
# Noise paths (used to drop non-business files from change lists).
NOISE_PATTERNS = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"^tmp/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]

# High-risk paths — changes here flag files for audit attention.
HIGH_RISK_PATTERNS = [
    re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"),
    re.compile(r"^apps/backend/app/"),
    re.compile(r"^apps/admin-web/src/"),
    re.compile(r"^apps/miniprogram/"),
    re.compile(r"^packages/shared/"),
    re.compile(r"^db/"),
]

# Code-path-prefix → expected-documentation mapping (compliance check).
DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md", "docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md", "docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/schemas/": ["docs/contracts/openapi/backend-api.json"],
    "apps/backend/app/main.py": ["docs/contracts/openapi/backend-api.json"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
}

# API-change detection patterns (routers / auth / schemas / main.py).
API_CHANGE_PATTERNS = [
    re.compile(r"^apps/backend/app/routers/"),
    re.compile(r"^apps/backend/app/auth/"),
    re.compile(r"^apps/backend/app/schemas/"),
    re.compile(r"^apps/backend/app/main\.py$"),
]

# Database migration SQL locations.
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]

# BD manual filename pattern and doc/audit directory anchors.
BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")
DDL_BASELINE_DIR = "docs/database/ddl/"
AUDIT_CHANGES_DIR = "docs/audit/changes/"

def now_taipei():
    """Current wall-clock time as a timezone-aware Taipei datetime."""
    return datetime.now(tz=TZ_TAIPEI)
def sha1hex(s: str) -> str:
    """Hex-encoded SHA-1 digest of *s* (UTF-8 encoded)."""
    digest = hashlib.sha1()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()
def is_noise(f: str) -> bool:
    """True when path *f* matches any noise pattern (non-business file)."""
    for pattern in NOISE_PATTERNS:
        if pattern.search(f):
            return True
    return False
def safe_read_json(path):
    """Best-effort JSON load: return {} when *path* is missing, unreadable
    or does not contain valid JSON."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception:
        return {}
def write_json(path, data):
    """Write *data* to *path* as pretty-printed UTF-8 JSON.

    The parent directory is created on demand. The previous version fell
    back to `os.makedirs(".kiro/state")` when the path had no directory
    component, creating an unrelated directory as a side effect for bare
    filenames; now a directory is only created when the path actually has
    a parent.
    """
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2, ensure_ascii=False)
def git_diff_stat():
    """Return the output of `git diff --stat HEAD`, or "" on any failure,
    non-zero exit, or timeout."""
    try:
        proc = subprocess.run(
            ["git", "diff", "--stat", "HEAD"],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=15,
        )
    except Exception:
        return ""
    if proc.returncode != 0:
        return ""
    return proc.stdout.strip()
def git_diff_files(files, max_total=30000, max_per_file=15000):
    """Collect concrete diff text for *files*.

    Tracked files are diffed with `git diff HEAD -- <file>`; untracked
    files are read directly and rendered as a synthetic new-file diff.
    Each file's chunk is capped at *max_per_file* characters and the whole
    result at roughly *max_total*. Returns "" for an empty list; files
    that fail to diff or read are skipped silently.
    """
    if not files:
        return ""
    pieces = []
    used = 0
    for path in files:
        if used >= max_total:
            pieces.append(f"\n[TRUNCATED: diff exceeds {max_total // 1000}KB]")
            break
        try:
            proc = subprocess.run(
                ["git", "diff", "HEAD", "--", path],
                capture_output=True, text=True,
                encoding="utf-8", errors="replace", timeout=10,
            )
            text = ""
            if proc.returncode == 0 and proc.stdout.strip():
                text = proc.stdout.strip()
            elif os.path.isfile(path):
                # Untracked new file: synthesize a diff from its content.
                try:
                    with open(path, "r", encoding="utf-8", errors="replace") as fh:
                        body = fh.read(max_per_file + 100)
                    text = f"--- /dev/null\n+++ b/{path}\n@@ -0,0 +1 @@\n" + body
                except Exception:
                    continue

            if not text:
                continue
            if len(text) > max_per_file:
                text = text[:max_per_file] + f"\n[TRUNCATED: {path} diff too long]"
            pieces.append(text)
            used += len(text)
        except Exception:
            continue
    return "\n".join(pieces)
def get_latest_prompt_log():
    """Return the newest prompt log (latest filename), truncated to 3000 chars.

    Returns "" when the log directory or any log file is unavailable.
    """
    log_dir = os.path.join("docs", "audit", "prompt_logs")
    if not os.path.isdir(log_dir):
        return ""
    try:
        candidates = [n for n in os.listdir(log_dir) if n.startswith("prompt_log_")]
        if not candidates:
            return ""
        # Filenames embed timestamps, so the lexicographic max is the newest.
        with open(os.path.join(log_dir, max(candidates)), "r", encoding="utf-8") as fh:
            text = fh.read()
        if len(text) > 3000:
            return text[:3000] + "\n[TRUNCATED]"
        return text
    except Exception:
        return ""
# ── Step 1: detect changes via the file baseline ──
def detect_changes_via_baseline():
    """Scan the workspace and diff it against the promptSubmit baseline.

    Returns (all_changed_files, external_files, diff_result, no_change):
    - all_changed_files: every non-noise file added/modified this session
    - external_files: always [] for now (see note below)
    - diff_result: full diff dict {added, modified, deleted}
    - no_change: True when nothing meaningful changed (or no baseline exists)
    """
    baseline = load_baseline()
    current = scan_workspace(".")

    # No baseline (first run or baseline lost) — nothing to compare against.
    if not baseline:
        return [], [], {"added": [], "modified": [], "deleted": []}, True

    diff = diff_baselines(baseline, current)
    if total_changes(diff) == 0:
        return [], [], diff, True

    # Changed = added + modified; deleted files no longer exist and are
    # excluded from risk evaluation.
    changed = sorted(set(diff["added"]) | set(diff["modified"]))
    meaningful = [path for path in changed if not is_noise(path)]
    if not meaningful:
        return [], [], diff, True

    # "External" changes cannot be told apart from Kiro's own writes — both
    # bump mtime — and precise attribution would need a write log from the
    # Kiro runtime, which is not available. To avoid false positives we no
    # longer report any change as external.
    external = []

    return meaningful, external, diff, False
# ── Step 3: compliance pre-scan ──
def do_compliance_prescan(all_files):
    """Pre-scan this session's changed files for compliance signals.

    Classifies each non-noise path (migration SQL, generic SQL, docs, code,
    audit artifacts), flags API-surface changes whose OpenAPI spec was not
    updated in the same session, and pairs code files with the docs DOC_MAP
    expects them to update. Writes the findings plus a `needs_check` flag
    and timestamp to COMPLIANCE_PATH, and returns the raw result dict.
    """
    result = {
        "new_migration_sql": [],
        "new_or_modified_sql": [],
        "code_without_docs": [],
        "new_files": [],
        "has_bd_manual": False,
        "has_audit_record": False,
        "has_ddl_baseline": False,
        "api_changed": False,
        "openapi_spec_stale": False,
    }

    code_files = []
    doc_files = set()

    for f in all_files:
        if is_noise(f):
            continue
        for mp in MIGRATION_PATTERNS:
            if mp.search(f):
                result["new_migration_sql"].append(f)
                break
        if f.endswith(".sql"):
            result["new_or_modified_sql"].append(f)
        if BD_MANUAL_PATTERN.search(f):
            result["has_bd_manual"] = True
        if f.startswith(AUDIT_CHANGES_DIR):
            result["has_audit_record"] = True
        if f.startswith(DDL_BASELINE_DIR):
            result["has_ddl_baseline"] = True
        if f.endswith(".md") or "/docs/" in f:
            doc_files.add(f)
        if f.endswith((".py", ".ts", ".tsx", ".js", ".jsx")):
            code_files.append(f)
        # Detect changes to API-surface files (routers/auth/schemas/main.py).
        for ap in API_CHANGE_PATTERNS:
            if ap.search(f):
                result["api_changed"] = True
                break

    # API changed but the OpenAPI spec was not touched → mark it stale.
    if result["api_changed"] and "docs/contracts/openapi/backend-api.json" not in all_files:
        result["openapi_spec_stale"] = True

    for cf in code_files:
        # Gather the docs each matching DOC_MAP prefix expects to be updated.
        expected_docs = []
        for prefix, docs in DOC_MAP.items():
            if cf.startswith(prefix):
                expected_docs.extend(docs)
        if expected_docs:
            has_doc = False
            for ed in expected_docs:
                if ed in doc_files:
                    has_doc = True
                    break
                # A trailing-slash entry means "any changed doc under this dir".
                if ed.endswith("/") and any(d.startswith(ed) for d in doc_files):
                    has_doc = True
                    break
            if not has_doc:
                result["code_without_docs"].append({
                    "file": cf,
                    "expected_docs": expected_docs,
                })

    needs_check = bool(
        result["new_migration_sql"]
        or result["code_without_docs"]
        or result["openapi_spec_stale"]
    )

    now = now_taipei()
    write_json(COMPLIANCE_PATH, {
        "needs_check": needs_check,
        "scanned_at": now.isoformat(),
        **result,
    })
    return result
# ── Step 4: build the audit context ──
def do_build_audit_context(all_files, diff_result, compliance):
    """Assemble the audit context consumed by /audit and write it to CONTEXT_PATH.

    Merges git-status risk files (recorded in the audit state) with this
    session's baseline diff, extracts high-risk files plus their diffs, and
    bundles the compliance findings and the latest prompt log.
    """
    now = now_taipei()
    audit_state = safe_read_json(STATE_PATH)
    prompt_info = safe_read_json(PROMPT_ID_PATH)

    # Use the changed_files recorded in audit_state (risk files from
    # git status) merged with this conversation's baseline diff.
    git_changed = audit_state.get("changed_files", [])
    session_changed = all_files  # files changed during this conversation

    # Union of both sources, deduplicated.
    all_changed = sorted(set(git_changed + session_changed))

    high_risk_files = [
        f for f in all_changed
        if any(p.search(f) for p in HIGH_RISK_PATTERNS)
    ]

    diff_stat = git_diff_stat()
    high_risk_diff = git_diff_files(high_risk_files)
    prompt_log = get_latest_prompt_log()

    # Lists are capped (100 files / 50 per diff bucket) to bound file size.
    context = {
        "built_at": now.isoformat(),
        "prompt_id": prompt_info.get("prompt_id", "unknown"),
        "prompt_at": prompt_info.get("at", ""),
        "audit_required": audit_state.get("audit_required", False),
        "db_docs_required": audit_state.get("db_docs_required", False),
        "reasons": audit_state.get("reasons", []),
        "changed_files": all_changed[:100],
        "high_risk_files": high_risk_files,
        "session_diff": {
            "added": diff_result.get("added", [])[:50],
            "modified": diff_result.get("modified", [])[:50],
            "deleted": diff_result.get("deleted", [])[:50],
        },
        "compliance": {
            "code_without_docs": compliance.get("code_without_docs", []),
            "new_migration_sql": compliance.get("new_migration_sql", []),
            "has_bd_manual": compliance.get("has_bd_manual", False),
            "has_audit_record": compliance.get("has_audit_record", False),
            "has_ddl_baseline": compliance.get("has_ddl_baseline", False),
            "api_changed": compliance.get("api_changed", False),
            "openapi_spec_stale": compliance.get("openapi_spec_stale", False),
        },
        "diff_stat": diff_stat,
        "high_risk_diff": high_risk_diff,
        "latest_prompt_log": prompt_log,
    }

    write_json(CONTEXT_PATH, context)
# ── Step 5: audit reminder (15-minute rate limit) ──
def do_audit_reminder(real_files):
    """Emit a rate-limited audit reminder on stderr, then exit(0).

    Stays silent when no audit is pending, when nothing changed this
    session, or when the previous reminder was less than MIN_INTERVAL ago.
    """
    state = safe_read_json(STATE_PATH)
    if not state.get("audit_required"):
        return
    if not real_files:
        # Nothing changed — no reminder needed.
        return

    now = now_taipei()
    previous = state.get("last_reminded_at")
    if previous:
        try:
            elapsed = now - datetime.fromisoformat(previous)
            if elapsed < MIN_INTERVAL:
                return
        except Exception:
            pass  # unparseable timestamp → treat as "long ago"

    state["last_reminded_at"] = now.isoformat()
    write_json(STATE_PATH, state)

    reasons = state.get("reasons", [])
    reason_text = ", ".join(reasons) if reasons else "high-risk paths changed"

    # Informational only — exit(0) so the agent does not treat this as an
    # error and run an audit by itself; the audit trail is produced when the
    # user manually triggers /audit.
    sys.stderr.write(
        f"[AUDIT REMINDER] Pending audit ({reason_text}), "
        f"{len(real_files)} files changed this session. "
        f"Run /audit to sync. (15min rate limit)\n"
    )
    sys.exit(0)
# ── Step 6: full session-log extraction ──
def do_full_session_extract():
    """Extract the current execution's full conversation from Kiro globalStorage.

    Delegates to scripts/ops/extract_kiro_session.py; only the newest
    un-indexed execution is pulled, so nothing is extracted twice.
    """
    # Import the extractor lazily to keep hook startup cheap.
    here = os.path.dirname(os.path.abspath(__file__))
    ops_dir = os.path.normpath(os.path.join(here, "..", "..", "scripts", "ops"))
    if ops_dir not in sys.path:
        sys.path.insert(0, ops_dir)

    try:
        from extract_kiro_session import extract_latest
    except ImportError:
        return  # extractor unavailable — skip silently

    # globalStorage location: env-var override, else the default APPDATA path.
    default_storage = os.path.join(
        os.environ.get("APPDATA", ""), "Kiro", "User", "globalStorage"
    )
    storage = os.environ.get("KIRO_GLOBAL_STORAGE", default_storage)

    extract_latest(storage, os.getcwd())
def _extract_summary_content(md_content: str) -> str:
|
||
"""从 session log markdown 中提取适合生成摘要的内容。
|
||
|
||
策略:如果"用户输入"包含 CONTEXT TRANSFER(跨轮续接),
|
||
则替换为简短标注,避免历史背景干扰本轮摘要生成。
|
||
"""
|
||
import re
|
||
# 检测用户输入中是否包含 context transfer
|
||
ct_pattern = re.compile(r"## 2\. 用户输入\s*\n```\s*\n.*?CONTEXT TRANSFER", re.DOTALL)
|
||
if ct_pattern.search(md_content):
|
||
# 替换"用户输入"section 为简短标注
|
||
# 匹配从 "## 2. 用户输入" 到下一个 "## 3." 之间的内容
|
||
md_content = re.sub(
|
||
r"(## 2\. 用户输入)\s*\n```[\s\S]*?```\s*\n(?=## 3\.)",
|
||
r"\1\n\n[本轮为 Context Transfer 续接,用户输入为历史多轮摘要,已省略。请基于执行摘要和对话记录中的实际工具调用判断本轮工作。]\n\n",
|
||
md_content,
|
||
)
|
||
return md_content
|
||
|
||
|
||
# ── Step 7: generate an LLM summary for the latest session ──
# System prompt sent to the Bailian (DashScope OpenAI-compatible) chat API
# when summarizing a single execution. This string is model input, i.e.
# runtime behavior — it intentionally stays in Chinese, byte-for-byte.
_SUMMARY_SYSTEM_PROMPT = """你是一个专业的技术对话分析师。你的任务是为 AI 编程助手的一轮执行(execution)生成简洁的中文摘要。

背景:一个对话(chatSession)包含多轮执行(execution)。每轮执行 = 用户发一条消息 → AI 完成响应。你收到的是单轮执行的完整记录。

摘要规则:
1. 只描述本轮执行实际完成的工作,不要描述历史背景
2. 列出完成的功能点/任务(一轮可能完成多个)
3. 包含关键技术细节:文件路径、模块名、数据库表、API 端点等
4. bug 修复要说明原因和方案
5. 不写过程性描述("用户说..."),只写结果
6. 内容太短或无实质内容的,写"无实质内容"
7. 不限字数,信息完整优先,避免截断失真

重要:
- "执行摘要"(📋)是最可靠的信息源,优先基于它判断本轮做了什么
- 如果"用户输入"包含 CONTEXT TRANSFER,那是之前多轮的历史摘要,不是本轮工作
- 对话记录中的实际工具调用和文件变更才是本轮的真实操作

请直接输出摘要,不要添加任何前缀或解释。"""
def do_generate_description():
    """Generate LLM summaries for main-conversation entries lacking a description.

    For up to MAX_PER_RUN of the newest un-summarized entries, reads the
    entry's main_*.md session logs, calls the Bailian (DashScope
    OpenAI-compatible) chat API, and writes the resulting description into
    both the light and the full session index. Returns silently when the
    API key, the extractor module, or the OpenAI client is unavailable.
    """
    from dotenv import load_dotenv
    load_dotenv()

    api_key = os.environ.get("BAILIAN_API_KEY", "")
    if not api_key:
        return

    model = os.environ.get("BAILIAN_MODEL", "qwen-plus")
    base_url = os.environ.get("BAILIAN_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1")

    # Make the extractor module (index load/save helpers) importable.
    scripts_ops = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "scripts", "ops")
    scripts_ops = os.path.normpath(scripts_ops)
    if scripts_ops not in sys.path:
        sys.path.insert(0, scripts_ops)

    try:
        from extract_kiro_session import load_index, save_index, load_full_index, save_full_index
    except ImportError:
        return

    index = load_index()
    entries = index.get("entries", {})
    if not entries:
        return

    # Collect every main-conversation entry still lacking a description.
    targets = []
    for eid, ent in entries.items():
        if ent.get("is_sub"):
            continue
        if not ent.get("description"):
            targets.append((eid, ent))

    if not targets:
        return

    # Cap the per-run workload in the agentStop hook to avoid timeouts;
    # backlog processing is handled by the standalone
    # batch_generate_summaries.py script.
    MAX_PER_RUN = 10
    if len(targets) > MAX_PER_RUN:
        # Prefer the newest entries (descending startTime).
        targets.sort(key=lambda t: t[1].get("startTime", ""), reverse=True)
        targets = targets[:MAX_PER_RUN]

    try:
        from openai import OpenAI
        client = OpenAI(api_key=api_key, base_url=base_url)
    except Exception:
        return

    full_index = load_full_index()
    full_entries = full_index.get("entries", {})
    generated = 0

    for target_eid, target_entry in targets:
        out_dir = target_entry.get("output_dir", "")
        if not out_dir or not os.path.isdir(out_dir):
            continue

        # Locate the main_*.md files belonging to this entry.
        main_files = sorted(
            f for f in os.listdir(out_dir)
            if f.startswith("main_") and f.endswith(".md")
            and target_eid[:8] in f  # match by executionId short code
        )
        if not main_files:
            # Fallback: take every main file in the directory.
            main_files = sorted(
                f for f in os.listdir(out_dir)
                if f.startswith("main_") and f.endswith(".md")
            )
        if not main_files:
            continue

        content_parts = []
        for mf in main_files:
            try:
                with open(os.path.join(out_dir, mf), "r", encoding="utf-8") as fh:
                    content_parts.append(fh.read())
            except Exception:
                continue
        if not content_parts:
            continue

        content = "\n\n---\n\n".join(content_parts)
        content = _extract_summary_content(content)
        if len(content) > 60000:
            content = content[:60000] + "\n\n[TRUNCATED]"

        try:
            resp = client.chat.completions.create(
                model=model,
                messages=[
                    {"role": "system", "content": _SUMMARY_SYSTEM_PROMPT},
                    {"role": "user", "content": f"请为以下单轮执行记录生成摘要:\n\n{content}"},
                ],
                max_tokens=4096,
            )
            description = resp.choices[0].message.content.strip()
        except Exception:
            continue  # one failed entry must not block the others

        if not description:
            continue

        # Update both indexes in memory.
        entries[target_eid]["description"] = description
        if target_eid in full_entries:
            full_entries[target_eid]["description"] = description
        generated += 1

    # Persist once at the end.
    if generated > 0:
        save_index(index)
        save_full_index(full_index)
def main():
    """agentStop entry point.

    Order: (1) always extract the full session log; (2) generate LLM
    descriptions for the freshly extracted entries; (3) diff the workspace
    against the promptSubmit baseline; then, only when something changed
    (or --force-rebuild was given), run the compliance pre-scan, rebuild
    the audit context, and possibly emit the rate-limited reminder. Every
    step is isolated in try/except so one failure cannot break the others.

    Fixes: `do_generate_description` was defined and documented as step 2
    of this hook but never invoked — it is now called; the unused local
    `now = now_taipei()` has been removed.
    """
    ensure_repo_root()
    force_rebuild = "--force-rebuild" in sys.argv

    # Full session-log extraction (always runs, even with no file changes).
    try:
        do_full_session_extract()
    except Exception:
        pass

    # LLM summary generation for the just-extracted session (module doc
    # step 2). Returns immediately when no API key is configured.
    try:
        do_generate_description()
    except Exception:
        pass

    # Step 1: detect changes via the file baseline.
    real_files, external_files, diff_result, no_change = detect_changes_via_baseline()

    # No file changes at all → skip every review step (unless forced).
    if no_change and not force_rebuild:
        return

    # --force-rebuild with no changes: still rebuild context from git status.
    if no_change and force_rebuild:
        try:
            compliance = do_compliance_prescan(real_files or [])
        except Exception:
            compliance = {}
        try:
            do_build_audit_context(real_files or [], diff_result, compliance)
        except Exception:
            pass
        return

    # Compliance pre-scan over the files changed this conversation.
    compliance = {}
    try:
        compliance = do_compliance_prescan(real_files)
    except Exception:
        pass

    # Build the audit context.
    try:
        do_build_audit_context(real_files, diff_result, compliance)
    except Exception:
        pass

    # Audit reminder (informational exit(0); never triggers a self-audit).
    try:
        do_audit_reminder(real_files)
    except SystemExit:
        pass  # informational exit(0) — no need to re-raise
    except Exception:
        pass
if __name__ == "__main__":
    try:
        main()
    except SystemExit as e:
        # Preserve explicit exit codes (e.g. the reminder's exit(0)).
        sys.exit(e.code)
    except Exception:
        # A hook must never crash the agent: swallow anything unexpected.
        pass