Files
Neo-ZQYY/scripts/ops/parse_kiro_log_final_v2.py

424 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Kiro Agent 执行日志全量解析器 - 最终版 v2
修正 Hash 规律分析,基于实际验证结果。
"""
import json
from pathlib import Path
from datetime import datetime, timezone, timedelta
# === Paths (hard-coded to a local dump of Kiro's VS Code globalStorage) ===
BASE = Path(r"C:\temp\FX\globalStorage\kiro.kiroagent")
# Opaque 32-hex directory/file names observed in the dump. Their derivation is
# analysed in sections 3/8 of the generated report; the ❓ entries there mean
# these are still unverified guesses (profile hash / execution hash / etc.).
EXEC_ID_DIR = "4d8f681b2b78799de676ab36904d08c7"
ACTION_LOG_ID = "414d1636299d2b9e4ce7e17fb11f63e9"
LOG_FILE_HASH = "794adc0617d71b2231c20ac9d101c7a6"
DIFF_PARENT_HASH = "74a08cf8613c7dec4db7b264470db812"
# Full execution-log JSON and the parent directory of file-diff snapshots.
LOG_PATH = BASE / EXEC_ID_DIR / ACTION_LOG_ID / LOG_FILE_HASH
DIFF_DIR = BASE / EXEC_ID_DIR / DIFF_PARENT_HASH
# Session state: directory name is base64url(workspace path) with "==" -> "__",
# file name is the chatSessionId UUID (verified in report section 3.1).
SESSION_DIR = BASE / "workspace-sessions" / "ZDpcVXNlcnNcTmVvXGRlc2t0b3BcVA__"
SESSION_JSON = SESSION_DIR / "dba26892-8e58-447f-ad82-bc9459a51753.json"
OUT_MD = Path(r"C:\temp\FX\kiro_full_session_record.md")  # markdown report output
CST = timezone(timedelta(hours=8))  # China Standard Time (UTC+8)
def ts(ms):
    """Format a millisecond epoch timestamp as a CST date-time string.

    Falsy input (None, 0, missing) yields the placeholder "N/A".
    """
    if ms:
        return datetime.fromtimestamp(ms / 1000, tz=CST).strftime("%Y-%m-%d %H:%M:%S")
    return "N/A"
def trunc(s, n=3000):
    """Clamp *s* to at most *n* characters, noting the original length.

    Non-string values are converted with str() and returned untouched.
    """
    if not isinstance(s, str):
        return str(s)
    if len(s) <= n:
        return s
    suffix = f"\n... [截断,原文共 {len(s)} 字符]"
    return s[:n] + suffix
def safe_json(obj, n=5000):
    """Pretty-print *obj* as JSON, truncated to at most *n* characters.

    Uses ensure_ascii=False so CJK text stays readable in the report.
    """
    dumped = json.dumps(obj, ensure_ascii=False, indent=2)
    if len(dumped) > n:
        return dumped[:n] + f"\n... [截断,原文共 {len(dumped)} 字符]"
    return dumped
def parse_messages(messages):
    """Normalise raw context messages into simplified per-message dicts.

    Each output item carries the message index, role, messageId, forcedRole
    and a reduced view of its entries. Non-dict entries are silently skipped;
    unknown entry types are kept with just their key names for inspection.
    """
    out = []
    for idx, raw in enumerate(messages):
        reduced = []
        for item in raw.get("entries", []):
            if not isinstance(item, dict):
                continue
            kind = item.get("type", "unknown")
            if kind == "text":
                reduced.append({"type": "text", "text": item.get("text", "")})
            elif kind == "toolUse":
                reduced.append({
                    "type": "toolUse",
                    "id": item.get("id"),
                    "name": item.get("name"),
                    "args": item.get("args", {}),
                    "requestMessageId": item.get("requestMessageId"),
                })
            elif kind == "toolUseResponse":
                reduced.append({
                    "type": "toolUseResponse",
                    "id": item.get("id"),
                    "name": item.get("name"),
                    "args": item.get("args", {}),
                    "message": item.get("message", ""),
                    "success": item.get("success"),
                })
            elif kind == "document":
                doc = item.get("document", {})
                reduced.append({
                    "type": "document",
                    "doc_type": doc.get("type"),
                    "target": doc.get("target"),
                    "doc_keys": list(doc),
                })
            else:
                # Unrecognised entry type: record its shape only.
                reduced.append({"type": kind, "raw_keys": list(item)})
        out.append({
            "index": idx,
            "role": raw.get("role", "?"),
            "messageId": raw.get("messageId", "?"),
            "forcedRole": raw.get("forcedRole"),
            "entries": reduced,
        })
    return out
def parse_actions(actions):
    """Flatten raw action records into a chronological timeline of dicts.

    Timestamps are rendered via ts(); optional fields (subExecutionId,
    endTime, intentResult, input, output) appear only when present.
    """
    steps = []
    for idx, raw in enumerate(actions):
        step = {
            "index": idx,
            "actionId": raw.get("actionId"),
            "actionType": raw.get("actionType"),
            "actionState": raw.get("actionState"),
            "emittedAt": ts(raw.get("emittedAt")),
        }
        sub_id = raw.get("subExecutionId")
        if sub_id:
            step["subExecutionId"] = sub_id
        end = raw.get("endTime")
        if end:
            step["endTime"] = ts(end)
        for key in ("intentResult", "input", "output"):
            if key in raw:
                step[key] = raw[key]
        steps.append(step)
    return steps
def collect_diffs():
    """Load every file-version snapshot found under DIFF_DIR.

    Returns a mapping "subdir/filename" -> {"size": bytes, "content": text}.
    Missing DIFF_DIR yields an empty dict; undecodable bytes are replaced.
    """
    snapshots = {}
    if DIFF_DIR.exists():
        for subdir in sorted(DIFF_DIR.iterdir()):
            if not subdir.is_dir():
                continue
            for item in subdir.iterdir():
                if item.is_file():
                    snapshots[f"{subdir.name}/{item.name}"] = {
                        "size": item.stat().st_size,
                        "content": item.read_text(encoding="utf-8", errors="replace"),
                    }
    return snapshots
def collect_ids(log, conversation, timeline):
    """Gather every identifier/hash referenced by the execution log.

    List buckets preserve first-seen order and contain no duplicates or
    falsy values. diff_hashes lists the diff subdirectory names on disk.
    """
    refs = {
        "executionId": log.get("executionId"),
        "chatSessionId": log.get("chatSessionId"),
        "actionIds": [], "subExecutionIds": [],
        "toolUseIds": [], "messageIds": [],
        "requestMessageIds": [], "diff_hashes": [],
    }

    def remember(bucket, value):
        # Append once, keeping first-appearance order.
        if value and value not in refs[bucket]:
            refs[bucket].append(value)

    for step in timeline:
        remember("actionIds", step.get("actionId"))
        remember("subExecutionIds", step.get("subExecutionId"))
    for msg in conversation:
        remember("messageIds", msg.get("messageId"))
        for entry in msg.get("entries", []):
            remember("toolUseIds", entry.get("id"))
            remember("requestMessageIds", entry.get("requestMessageId"))
    if DIFF_DIR.exists():
        refs["diff_hashes"] = [d.name for d in sorted(DIFF_DIR.iterdir()) if d.is_dir()]
    return refs
def gen_md(meta, input_msg, conversation, timeline, diffs, ids, usage, session_info):
    """Render the parsed execution log into one markdown report string.

    Args:
        meta: session metadata key/value pairs (section 1).
        input_msg: the user's original prompt text (section 2).
        conversation: output of parse_messages() (section 4).
        timeline: output of parse_actions() (section 5).
        diffs: output of collect_diffs() (section 6).
        ids: output of collect_ids() (section 3).
        usage: the log's "usageSummary" list (section 7).
        session_info: parsed session JSON dict, or None (section 1 extra).

    Returns:
        The complete markdown document.
    """
    L = []
    L.append("# Kiro Agent 会话全量记录\n")
    L.append(f"> 生成时间: {datetime.now(CST).strftime('%Y-%m-%d %H:%M:%S')} CST\n")
    # --- 1. Session metadata ---
    L.append("## 1. 会话元数据\n")
    L.append("| 字段 | 值 |")
    L.append("|------|-----|")
    for k, v in meta.items():
        L.append(f"| {k} | `{v}` |")
    L.append("")
    if session_info:
        L.append("### 会话配置\n")
        L.append(f"- title: `{session_info.get('title')}`")
        # dateCreated may be stored as a string — coerce before formatting.
        L.append(f"- dateCreated: `{ts(int(session_info.get('dateCreated', 0)))}`")
        L.append(f"- workspaceDirectory: `{session_info.get('workspaceDirectory')}`")
        L.append("")
    # --- 2. Original user input ---
    L.append("## 2. 用户原始输入\n")
    L.append(f"```\n{trunc(input_msg, 5000)}\n```\n")
    # --- 3. ID reference relationships ---
    L.append("## 3. ID/Hash 引用关系图\n")
    L.append("### 3.1 目录层级 Hash已验证\n")
    L.append("| Hash | 位置 | 验证结果 |")
    L.append("|------|------|----------|")
    L.append(f"| `ZDpcVXNlcnNcTmVvXGRlc2t0b3BcVA__` | workspace-sessions 子目录 | ✅ Base64url 编码 workspace 路径 `d:\\Users\\Neo\\desktop\\T``==` 填充替换为 `__` |")
    L.append(f"| `dba26892-8e58-447f-ad82-bc9459a51753` | session 文件名 | ✅ UUID v4即 chatSessionId |")
    L.append(f"| `{EXEC_ID_DIR}` | 顶层目录 | ❓ 32 位 hexMD5 格式),非 workspace 路径的 MD5可能是 profile/account hash |")
    L.append(f"| `{ACTION_LOG_ID}` | 二级目录 | ❓ 32 位 hex非 executionId 的 MD5可能是 execution 内部标识的 hash |")
    L.append(f"| `{LOG_FILE_HASH}` | 日志文件名 | ❓ 32 位 hex可能是固定 schema 标识或内容 hash |")
    L.append(f"| `{DIFF_PARENT_HASH}` | diff 父目录 | ❓ 32 位 hex与 execution 关联 |")
    L.append(f"| `f62de366d0006e17ea00a01f6624aabf` | 执行索引文件 | ❓ 固定文件名,出现在 workspace 和 default 两处 |")
    L.append(f"| diff 子目录8位 hex | diff 快照 | 文件内容的短 hash |")
    L.append(f"| `tooluse_*` | toolUse entry id | `tooluse_` 前缀 + 随机串(如 `tooluse_fileTree`、`tooluse_DH4yvpYUCRxZsulG2G2bU1` |")
    L.append("")
    L.append("### 3.2 运行时 ID\n")
    L.append(f"- executionId: `{ids['executionId']}`")
    L.append(f"- chatSessionId: `{ids['chatSessionId']}`")
    L.append(f"- subExecutionIds: `{ids['subExecutionIds']}`")
    L.append(f"\ntoolUseIds ({len(ids['toolUseIds'])} 个):\n")
    for tid in ids["toolUseIds"]:
        L.append(f"- `{tid}`")
    L.append(f"\nmessageIds ({len(ids['messageIds'])} 个):\n")
    for mid in ids["messageIds"]:
        L.append(f"- `{mid}`")
    L.append(f"\nrequestMessageIds ({len(ids['requestMessageIds'])} 个):\n")
    for rmid in ids["requestMessageIds"]:
        L.append(f"- `{rmid}`")
    L.append(f"\ndiff_hashes: `{ids['diff_hashes']}`\n")
    # --- 4. Full conversation transcript ---
    L.append("## 4. 对话全量记录\n")
    h = sum(1 for m in conversation if m['role'] == 'human')
    b = sum(1 for m in conversation if m['role'] == 'bot')
    t = sum(1 for m in conversation if m['role'] == 'tool')
    L.append(f"{len(conversation)} 条消息human={h}, bot={b}, tool={t}\n")
    for msg in conversation:
        emoji = {"human": "👤", "bot": "🤖", "tool": "🔧"}.get(msg["role"], "")
        L.append(f"### Msg {msg['index']}: {emoji} {msg['role'].upper()} `{msg['messageId']}`\n")
        for entry in msg["entries"]:
            et = entry["type"]
            if et == "text":
                text = entry["text"]
                if not text:
                    L.append("*(空)*\n")
                elif len(text) > 10000 and msg["role"] == "human":
                    # Very long human "text" entries are injected system prompts;
                    # fold them behind a <details> block.
                    L.append(f"**[系统提示词]** ({len(text)} 字符)\n")
                    L.append("<details><summary>展开</summary>\n")
                    L.append(f"```\n{trunc(text, 20000)}\n```\n</details>\n")
                else:
                    L.append(f"```\n{trunc(text, 5000)}\n```\n")
            elif et == "toolUse":
                L.append(f"**[🔧 调用]** `{entry['name']}` id=`{entry['id']}`\n")
                L.append(f"```json\n{safe_json(entry['args'], 5000)}\n```\n")
            elif et == "toolUseResponse":
                # FIX: both branches of this conditional were the empty string
                # (the pass/fail marker characters had been lost), so success and
                # failure rendered identically. Restore visible markers.
                ok = "✅" if entry.get("success") else "❌"
                L.append(f"**[📋 结果]** `{entry['name']}` {ok} id=`{entry['id']}`\n")
                if entry.get("message"):
                    L.append(f"```\n{trunc(entry['message'], 5000)}\n```\n")
                if entry.get("args"):
                    L.append(f"<details><summary>完整参数</summary>\n")
                    L.append(f"```json\n{safe_json(entry['args'], 5000)}\n```\n</details>\n")
            elif et == "document":
                L.append(f"**[📄 文档]** type=`{entry.get('doc_type')}` target=`{entry.get('target')}`\n")
            else:
                L.append(f"**[{et}]** keys={entry.get('raw_keys')}\n")
    # --- 5. Actions timeline ---
    L.append("## 5. Actions 时间线\n")
    L.append(f"{len(timeline)}\n")
    for step in timeline:
        L.append(f"### Step {step['index']}: `{step.get('actionType','?')}` [{step.get('actionState','?')}] @ {step.get('emittedAt','?')}\n")
        L.append(f"- actionId: `{step.get('actionId')}`")
        if step.get("subExecutionId"):
            L.append(f"- subExecutionId: `{step['subExecutionId']}`")
        if step.get("endTime"):
            L.append(f"- endTime: {step['endTime']}")
        for k in ("intentResult", "input", "output"):
            if k in step:
                L.append(f"- {k}:\n```json\n{safe_json(step[k], 5000)}\n```")
        L.append("")
    # --- 6. File version snapshots (diffs) ---
    L.append("## 6. 文件版本快照\n")
    if diffs:
        for path, info in diffs.items():
            lang = "python" if path.endswith(".py") else "markdown" if path.endswith(".md") else ""
            L.append(f"### `{path}` ({info['size']} bytes)\n")
            L.append(f"```{lang}\n{trunc(info['content'], 5000)}\n```\n")
    else:
        L.append("*(无)*\n")
    # --- 7. Resource usage ---
    L.append("## 7. 资源消耗\n")
    if usage:
        L.append("| 工具 | 消耗 | 单位 |")
        L.append("|------|------|------|")
        total = 0
        for u in usage:
            tools = ", ".join(u.get("usedTools", ["-"]))
            amt = u.get("usage", 0)
            total += amt
            L.append(f"| {tools} | {amt} | {u.get('unit','?')} |")
        L.append(f"| **合计** | **{total:.4f}** | |")
        L.append("")
    # --- 8. On-disk storage layout (static reference text) ---
    L.append("## 8. Kiro 日志存储结构\n")
    L.append("```")
    L.append("globalStorage/")
    L.append("├── state.vscdb # VS Code 状态 SQLite")
    L.append("├── state.vscdb.backup")
    L.append("├── storage.json # 窗口/主题/工作区配置")
    L.append("└── kiro.kiroagent/")
    L.append(" ├── config.json # 上下文提供者列表")
    L.append(" ├── profile.json # 用户 profileARN")
    L.append(" ├── .migrations/ # 迁移标记")
    L.append(f" ├── <32位hex>/ # 推测: profile/account hash")
    L.append(f" │ ├── f62de366... # 执行索引 JSON")
    L.append(f" │ ├── <32位hex>/ # 推测: execution 相关 hash")
    L.append(f" │ │ └── <32位hex> # 完整执行日志 JSON")
    L.append(f" │ └── <32位hex>/ # diff 快照集合")
    L.append(f" │ └── <8位hex>/ # 文件内容短 hash")
    L.append(f" │ └── <filename> # 文件快照")
    L.append(" ├── default/")
    L.append(" │ └── f62de366... # 默认执行索引(空)")
    L.append(" ├── dev_data/tokens_generated.jsonl")
    L.append(" ├── index/")
    L.append(" │ ├── docs.sqlite")
    L.append(" │ └── globalContext.json")
    L.append(" └── workspace-sessions/")
    L.append(" └── <base64url(workspace_path)>/ # == 替换为 __")
    L.append(" ├── sessions.json # 会话列表")
    L.append(" └── <chatSessionId>.json # 会话状态")
    L.append("```\n")
    L.append("### Hash 命名规律总结\n")
    L.append("| 类型 | 格式 | 规律 |")
    L.append("|------|------|------|")
    L.append("| workspace-sessions 子目录 | Base64url | workspace 绝对路径的 base64url 编码,`==` 填充替换为 `__` |")
    L.append("| session 文件名 | UUID v4 | 即 chatSessionId |")
    L.append("| 顶层 32 位 hex 目录 | MD5 格式 | 非 workspace 路径 MD5推测为 profile ARN 或 account ID 的 hash |")
    L.append("| 二级 32 位 hex 目录 | MD5 格式 | 非 executionId 的 MD5推测为内部标识 hash |")
    L.append("| 日志文件名 | 32 位 hex | 可能是固定 schema 标识 |")
    L.append("| 执行索引文件名 | 32 位 hex | 固定值 `f62de366...`,出现在多处 |")
    L.append("| diff 子目录 | 8 位 hex | 文件内容短 hash |")
    L.append("| toolUse ID | `tooluse_` + 随机串 | 特殊的有 `tooluse_fileTree`(内置工具) |")
    L.append("")
    # --- 9. Execution-log JSON schema (static reference text) ---
    L.append("## 9. 执行日志 JSON Schema\n")
    L.append("```")
    L.append("{")
    L.append(' "executionId": "UUID",')
    L.append(' "chatSessionId": "UUID",')
    L.append(' "workflowType": "chat-agent",')
    L.append(' "autonomyMode": "Autopilot",')
    L.append(' "status": "succeed",')
    L.append(' "startTime": <毫秒时间戳>,')
    L.append(' "endTime": <毫秒时间戳>,')
    L.append(' "contextUsagePercentage": <float>,')
    L.append(' "input": {')
    L.append(' "data": {')
    L.append(' "messages": [{ "role": "user", "content": [{ "text": "..." }] }],')
    L.append(' "chatSessionId": "UUID"')
    L.append(' },')
    L.append(' "documents": []')
    L.append(' },')
    L.append(' "actions": [')
    L.append(' {')
    L.append(' "type": "AgentExecutionAction",')
    L.append(' "executionId": "UUID",')
    L.append(' "actionId": "UUID",')
    L.append(' "actionType": "intent|tool|generation|subAgent",')
    L.append(' "actionState": "start|succeed|failed",')
    L.append(' "chatSessionId": "UUID",')
    L.append(' "emittedAt": <毫秒时间戳>,')
    L.append(' "endTime?": <毫秒时间戳>,')
    L.append(' "subExecutionId?": "UUID",')
    L.append(' "intentResult?": { "classification": "do|..." },')
    L.append(' "input?": { "toolName": "...", ... },')
    L.append(' "output?": { "text": "...", ... }')
    L.append(' }')
    L.append(' ],')
    L.append(' "context": {')
    L.append(' "messages": [')
    L.append(' {')
    L.append(' "role": "human|bot|tool",')
    L.append(' "messageId": "UUID",')
    L.append(' "forcedRole": null,')
    L.append(' "entries": [')
    L.append(' { "type": "text", "text": "..." },')
    L.append(' { "type": "toolUse", "id": "tooluse_*", "name": "...", "args": {...}, "requestMessageId": "UUID" },')
    L.append(' { "type": "toolUseResponse", "id": "tooluse_*", "name": "...", "args": {...}, "message": "...", "success": bool },')
    L.append(' { "type": "document", "document": { "type": "...", "target": "...", ... } }')
    L.append(' ]')
    L.append(' }')
    L.append(' ]')
    L.append(' },')
    L.append(' "usageSummary": [')
    L.append(' { "usedTools?": ["toolName"], "unit": "credit", "unitPlural": "credits", "usage": <float> }')
    L.append(' ],')
    L.append(' "result": { "status": "success", "executionId": "UUID", "result": "..." }')
    L.append("}")
    L.append("```\n")
    return "\n".join(L)
def main():
    """Parse the dumped Kiro execution log and emit the markdown report.

    Reads LOG_PATH (and SESSION_JSON when present), runs all parsers, writes
    OUT_MD, then prints a short summary to stdout.
    """
    print("=== Kiro Agent 日志全量解析 v2 ===\n")
    log = json.loads(LOG_PATH.read_text(encoding="utf-8"))
    print(f"日志: {LOG_PATH.stat().st_size / 1024:.1f} KB")
    start = log.get('startTime', 0)
    end = log.get('endTime', 0)
    meta = {
        "executionId": log.get("executionId"),
        "chatSessionId": log.get("chatSessionId"),
        "workflowType": log.get("workflowType"),
        "autonomyMode": log.get("autonomyMode"),
        "status": log.get("status"),
        "startTime": ts(log.get("startTime")),
        "endTime": ts(log.get("endTime")),
        "duration": f"{(end - start) / 1000:.1f}s",
        "contextUsage": f"{log.get('contextUsagePercentage',0):.2f}%",
    }
    # Collect the user's prompt text from input.data.messages; entries may use
    # either "content" or "entries" as the key for the parts list.
    pieces = []
    for message in log.get("input", {}).get("data", {}).get("messages", []):
        for part in message.get("content", message.get("entries", [])):
            if isinstance(part, dict) and part.get("text"):
                pieces.append(part["text"] + "\n")
    input_text = "".join(pieces)
    conversation = parse_messages(log.get("context", {}).get("messages", []))
    timeline = parse_actions(log.get("actions", []))
    diffs = collect_diffs()
    ids = collect_ids(log, conversation, timeline)
    session_info = None
    if SESSION_JSON.exists():
        session_info = json.loads(SESSION_JSON.read_text(encoding="utf-8"))
    usage = log.get("usageSummary", [])
    md = gen_md(meta, input_text, conversation, timeline, diffs, ids, usage, session_info)
    OUT_MD.write_text(md, encoding="utf-8")
    line_count = len(md.splitlines())
    size_kb = OUT_MD.stat().st_size / 1024
    print(f"输出: {OUT_MD} ({size_kb:.1f} KB, {line_count} 行)")
    role_counts = {"human": 0, "bot": 0, "tool": 0}
    for m in conversation:
        if m['role'] in role_counts:
            role_counts[m['role']] += 1
    print(f"消息: human={role_counts['human']}, "
          f"bot={role_counts['bot']}, "
          f"tool={role_counts['tool']}")
    tool_calls = sum(1 for m in conversation for e in m['entries'] if e['type'] == 'toolUse')
    print(f"工具调用: {tool_calls}")
    print(f"Actions: {len(timeline)}, Diffs: {len(diffs)}")
    print(f"toolUseIds: {len(ids['toolUseIds'])}, messageIds: {len(ids['messageIds'])}")


if __name__ == "__main__":
    main()