Co-Authored-By: OpenAI Codex <codex@openai.com> Co-authored-by: Cursor <cursoragent@cursor.com>
648 lines
25 KiB
Python
648 lines
25 KiB
Python
from __future__ import annotations
|
||
|
||
import csv
|
||
import datetime as dt
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
from collections import Counter, defaultdict
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
|
||
# Third ancestor directory of this file (parents[2]); used as the repository root.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Per-user directories: Claude Code assets are read from CLAUDE_HOME and
# Codex assets are written under CODEX_HOME.
HOME = Path.home()
CLAUDE_HOME = HOME.joinpath(".claude")
CODEX_HOME = HOME.joinpath(".codex")
CODEX_SKILLS = CODEX_HOME.joinpath("skills")
# Destination of the generated session-history archive inside the repository.
HISTORY_ROOT = REPO_ROOT.joinpath("docs", "claude-history")
|
||
|
||
# Regular expressions whose matches are masked before any text is written to
# migrated skills or history summaries. They are applied in list order.
SECRET_PATTERNS = [
    # API keys of the form "sk-..." / "sk-proj-..." with at least 12 key characters.
    re.compile(r"sk-[A-Za-z0-9_-]{12,}"),
    re.compile(r"sk-proj-[A-Za-z0-9_-]{12,}"),
    # "password = value", "api_key: value" style assignments (case-insensitive).
    re.compile(r"(?i)(password|passwd|pwd|token|secret|api[_-]?key)\s*[:=]\s*['\"]?[^'\"\s,;]+"),
    # Database connection URIs. Both "postgresql://" and the shorter
    # "postgres://" scheme are covered; previously only the long form was masked.
    re.compile(r"postgres(?:ql)?://[^\s'\"`]+"),
    re.compile(r"mysql://[^\s'\"`]+"),
    re.compile(r"mongodb(?:\+srv)?://[^\s'\"`]+"),
]
|
||
|
||
|
||
def redact_preserve(text: str) -> str:
    """Mask secrets in *text* without touching its whitespace or layout.

    The literal placeholder ``sk-proj-xxxxx`` is first swapped for an
    "example key" marker; afterwards every regex in ``SECRET_PATTERNS`` is
    applied in list order and each match is replaced by ``[已脱敏]``.
    """
    masked = text.replace("sk-proj-xxxxx", "[示例密钥已脱敏]")
    for secret_re in SECRET_PATTERNS:
        masked = re.sub(secret_re, "[已脱敏]", masked)
    return masked
|
||
|
||
|
||
def redact(text: str, limit: int | None = None) -> str:
    """Return *text* with secrets masked and whitespace collapsed to one line.

    Args:
        text: Raw text to sanitise.
        limit: Optional maximum length of the result. Longer values are cut
            and terminated with an ellipsis character so that the result
            never exceeds ``limit`` characters.

    Returns:
        The redacted, single-line (and possibly truncated) string.
    """
    value = redact_preserve(text)
    value = re.sub(r"\s+", " ", value).strip()
    if limit is None or len(value) <= limit:
        return value
    if limit <= 0:
        # ``limit - 1`` would become a negative slice index and keep almost
        # the whole string; nothing fits into a non-positive budget.
        return ""
    return value[: limit - 1].rstrip() + "…"
|
||
|
||
|
||
def backup(path: Path) -> None:
    """Copy *path* into the Codex backup folder; do nothing if it is missing.

    The copy is stored as
    ``CODEX_HOME/backups/claude-migration/<flattened path>.backup-<YYYYmmdd-HHMMSS>``.
    Directories are copied recursively, files with their metadata.
    """
    if path.exists():
        timestamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")
        destination_root = CODEX_HOME / "backups" / "claude-migration"
        destination_root.mkdir(parents=True, exist_ok=True)
        # Flatten the full path into one file-system-safe name: drop colons,
        # then turn every other run of unusual characters into "_".
        flattened = re.sub(r"[^A-Za-z0-9_.-]+", "_", str(path).replace(":", ""))
        destination = destination_root / f"{flattened}.backup-{timestamp}"
        copier = shutil.copytree if path.is_dir() else shutil.copy2
        copier(path, destination)
|
||
|
||
|
||
def parse_frontmatter(text: str) -> tuple[dict[str, str], str]:
    """Split a ``---`` delimited front-matter header from the rest of *text*.

    Returns:
        ``(meta, body)``. ``meta`` maps each ``key: value`` header line to its
        value with surrounding whitespace and quote characters stripped; lines
        without a colon are ignored. ``body`` is the text after the closing
        delimiter with leading newlines removed. When *text* does not start
        with ``---`` or has no closing delimiter, ``({}, text)`` is returned.
    """
    opener = "---"
    closer = "\n---"
    if not text.startswith(opener):
        return {}, text
    close_at = text.find(closer, len(opener))
    if close_at < 0:
        return {}, text

    header = text[len(opener):close_at].strip()
    remainder = text[close_at + len(closer):].lstrip("\n")

    fields: dict[str, str] = {}
    for entry in header.splitlines():
        key, separator, raw_value = entry.partition(":")
        if not separator:
            continue
        fields[key.strip()] = raw_value.strip().strip('"').strip("'")
    return fields, remainder
|
||
|
||
|
||
def title_case_slug(name: str) -> str:
    """Turn a slug such as ``code-reviewer`` into a display name (``Code Reviewer``).

    Underscores are treated like hyphens. Empty segments produced by leading,
    trailing or repeated separators are skipped so the result never contains
    doubled or stray spaces.
    """
    words = (part for part in name.replace("_", "-").split("-") if part)
    return " ".join(word.capitalize() for word in words)
|
||
|
||
|
||
def write_text(path: Path, text: str) -> None:
    """Write *text* to *path* as UTF-8, creating missing parent directories."""
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(text)
|
||
|
||
|
||
def copytree_contents(src: Path, dst: Path) -> None:
    """Replace *dst* with a fresh copy of everything directly inside *src*.

    An existing *dst* is removed first, then recreated (including parents).
    Sub-directories are copied recursively and files keep their metadata.
    """
    if dst.exists():
        shutil.rmtree(dst)
    dst.mkdir(parents=True, exist_ok=True)
    for entry in src.iterdir():
        copy_one = shutil.copytree if entry.is_dir() else shutil.copy2
        copy_one(entry, dst / entry.name)
|
||
|
||
|
||
def migrate_skills() -> list[str]:
    """Copy every Claude skill directory into the Codex skills folder.

    For each sub-directory of ``CLAUDE_HOME/skills`` that contains a
    ``SKILL.md``: back up an existing destination, copy the directory, rewrite
    ``SKILL.md`` with a normalised front matter plus a migration note, and
    write ``agents/openai.yaml``.

    Returns:
        The normalised names of the migrated skills, in directory sort order.
        Empty when the Claude skills directory does not exist.
    """
    src_root = CLAUDE_HOME / "skills"
    migrated: list[str] = []
    if not src_root.exists():
        return migrated

    for src in sorted(p for p in src_root.iterdir() if p.is_dir()):
        skill_md = src / "SKILL.md"
        if not skill_md.exists():
            continue

        dst = CODEX_SKILLS / src.name
        if dst.exists():
            backup(dst)
        copytree_contents(src, dst)

        original = skill_md.read_text(encoding="utf-8", errors="replace")
        meta, body = parse_frontmatter(original)
        # Lower-case the name and replace anything outside [a-z0-9-] with "-".
        name = re.sub(r"[^a-z0-9-]", "-", meta.get("name", src.name).lower()).strip("-") or src.name
        # The description ends up in the front matter, so it gets the same
        # secret masking as the body instead of being copied verbatim.
        description = redact_preserve(meta.get("description") or f"从 Claude Code 迁移的 {src.name} 工作流。")
        body = redact_preserve(body)
        new_text = (
            "---\n"
            f"name: {name}\n"
            f"description: {description} 从 Claude Code 迁移;当用户提到 ${name}、{src.name}、原 Claude skill,或需要该工作流时使用。\n"
            "---\n\n"
            # Report the directory that was actually read rather than a
            # path hard-coded for one particular machine/user.
            f"> 迁移说明:本 skill 从 `{src}` 转换而来。"
            "如内容包含 Claude Code 专属命令,请按 Codex 当前工具等价替换。\n\n"
            + body
        )
        write_text(dst / "SKILL.md", new_text)

        agents_dir = dst / "agents"
        agents_dir.mkdir(exist_ok=True)
        # Double quotes would terminate the quoted YAML scalar below.
        short = redact(description, 120).replace('"', "'")
        openai_yaml = (
            f'display_name: "{title_case_slug(name)}"\n'
            f'short_description: "{short}"\n'
            f'default_prompt: "使用 {name} 处理当前任务,遵循从 Claude Code 迁移来的工作流。"\n'
        )
        write_text(agents_dir / "openai.yaml", openai_yaml)
        migrated.append(name)
    return migrated
|
||
|
||
|
||
def migrate_agents() -> list[str]:
    """Convert Claude agent definitions into one Codex reference skill.

    Every ``*.md`` file in ``CLAUDE_HOME/agents`` is written (redacted, without
    its front matter) to ``claude-agent-roles/references/<file stem>.md``. A
    ``SKILL.md`` with a role table and an ``agents/openai.yaml`` are generated
    next to them. An existing skill directory is backed up and removed first.

    Returns:
        The role names (front-matter ``name`` or the file stem), or an empty
        list when the Claude agents directory does not exist.
    """
    src_root = CLAUDE_HOME / "agents"
    if not src_root.exists():
        return []

    skill_dir = CODEX_SKILLS / "claude-agent-roles"
    if skill_dir.exists():
        backup(skill_dir)
        shutil.rmtree(skill_dir)
    refs = skill_dir / "references"
    refs.mkdir(parents=True, exist_ok=True)

    # (role name, description, reference file stem)
    roles: list[tuple[str, str, str]] = []
    for src in sorted(src_root.glob("*.md")):
        text = src.read_text(encoding="utf-8", errors="replace")
        meta, body = parse_frontmatter(text)
        name = meta.get("name", src.stem)
        description = meta.get("description", "")
        roles.append((name, description, src.stem))
        write_text(refs / f"{src.stem}.md", redact_preserve(body))

    # The reference column must use the file stem: that is the name the file
    # was written under, and it can differ from the front-matter name.
    table = "\n".join(
        f"| `{name}` | {redact(description, 160)} | `references/{stem}.md` |"
        for name, description, stem in roles
    )
    skill_md = f"""---
name: claude-agent-roles
description: 从 Claude Code 迁移的自定义 agent 角色参考。Use when 用户提到 planner、architect、code-reviewer、security-reviewer、database-reviewer、python-reviewer、tdd-guide、refactor-cleaner,或要求沿用 Claude Code agent/角色/多视角审查习惯时使用。
---

# Claude Agent Roles

本 skill 保存原 Claude Code 自定义 agent 的角色提示词。Codex 当前不能一比一注册这些 Claude agent;使用时读取对应 reference,把它当作角色视角、检查清单或审查框架。

## 角色映射

| 角色 | 用途 | 参考文件 |
|------|------|----------|
{table}

## 使用规则

1. 用户明确点名某个角色时,读取对应 `references/*.md`。
2. 复杂功能、架构调整、重大重构时优先参考 `planner` 与 `architect`。
3. 代码修改后优先参考 `code-reviewer`;涉及认证、权限、数据库、密钥、用户输入时叠加 `security-reviewer` 或 `database-reviewer`。
4. Bug 修复和新功能需要测试设计时参考 `tdd-guide`。
5. 不要声称已经启动 Claude agent;用“按迁移角色检查/规划”描述即可。
"""
    write_text(skill_dir / "SKILL.md", skill_md)
    write_text(
        skill_dir / "agents" / "openai.yaml",
        'display_name: "Claude Agent Roles"\n'
        'short_description: "迁移自 Claude Code 的 planner、architect、reviewer 等角色参考。"\n'
        'default_prompt: "按迁移自 Claude Code 的角色习惯,对当前任务进行规划、审查或安全检查。"\n',
    )
    return [name for name, _, _ in roles]
|
||
|
||
|
||
def migrate_rules() -> list[str]:
    """Copy the Claude rule files into a Codex reference skill.

    Every ``*.md`` below ``CLAUDE_HOME/rules`` (searched recursively) is
    written, redacted, to ``claude-rules-reference/references/`` under the same
    relative path. A ``SKILL.md`` listing the copied files and an
    ``agents/openai.yaml`` are generated alongside. An existing skill directory
    is backed up and removed first.

    Returns:
        POSIX-style relative paths of the copied rule files, or an empty list
        when the Claude rules directory does not exist.
    """
    src_root = CLAUDE_HOME / "rules"
    if not src_root.exists():
        return []

    skill_dir = CODEX_SKILLS / "claude-rules-reference"
    if skill_dir.exists():
        backup(skill_dir)
        shutil.rmtree(skill_dir)
    refs = skill_dir / "references"
    refs.mkdir(parents=True, exist_ok=True)

    copied: list[str] = []
    for src in sorted(src_root.rglob("*.md")):
        rel = src.relative_to(src_root)
        target = refs / rel
        write_text(target, redact_preserve(src.read_text(encoding="utf-8", errors="replace")))
        copied.append(rel.as_posix())

    list_text = "\n".join(f"- `references/{path}`" for path in copied)
    # The origin shown in the document is the directory actually read, not a
    # path hard-coded for one particular machine/user.
    skill_md = f"""---
name: claude-rules-reference
description: 从 Claude Code 迁移的个人工程规则、中文工作流、Python/TypeScript/Web 编码规范、安全、测试、审查和性能偏好。Use when 需要沿用用户之前的 Claude Code 使用习惯、steering/rules/pre-prompt,或处理代码风格、测试、安全、评审、Web 设计质量要求时使用。
---

# Claude Rules Reference

本 skill 保存原 `{src_root}`。优先使用当前仓库 `AGENTS.md`,当用户要求沿用旧习惯、或任务涉及代码风格/测试/安全/审查/Web 体验时,再读取相关 reference。

## 可用参考

{list_text}

## 读取建议

- 中文通用习惯:读取 `references/zh/README.md` 及同目录相关主题。
- Python:读取 `references/python/*.md`。
- TypeScript/前端:读取 `references/typescript/*.md` 与 `references/web/*.md`。
- 安全、测试、代码审查:按主题读取对应文件,不要一次性加载全部。
"""
    write_text(skill_dir / "SKILL.md", skill_md)
    write_text(
        skill_dir / "agents" / "openai.yaml",
        'display_name: "Claude Rules Reference"\n'
        'short_description: "迁移自 Claude Code 的个人规则、steering 和工程偏好。"\n'
        'default_prompt: "沿用用户从 Claude Code 迁移来的工程规则和审查习惯处理当前任务。"\n',
    )
    return copied
|
||
|
||
|
||
def migrate_global_agents() -> None:
    """Write the global habits document to ``CODEX_HOME/AGENTS.md``.

    An existing file is backed up first and then overwritten. The document is
    fixed text except for the skills location, which is taken from
    ``CODEX_SKILLS`` so that it matches the directory the skills were really
    migrated to.
    """
    target = CODEX_HOME / "AGENTS.md"
    backup(target)
    text = f"""# 用户全局习惯(由 Claude Code 迁移)

## 语言与沟通

- 默认使用简体中文回复、解释、状态更新和审计记录。
- 技术术语、命令、API 字段、变量名保持原文。
- 先读上下文再动手;不确定时提出关键问题,但对低风险配置/文档迁移可直接执行。
- 回复要高信号、少套话;给出实际结果、验证状态和剩余风险。

## 工作方式

- 尊重既有代码风格和项目约定,优先复用现有模式。
- 每一处改动都应能追溯到用户请求;不要顺手做无关重构。
- 小步实施,保持可验证、可回滚。
- 复杂功能、重构、多模块改动前先做规划和影响分析。
- Bug 修复和新功能优先考虑测试驱动:先确认复现或 RED,再实现,再验证 GREEN。
- 修改代码后要说明改了哪些文件、为什么改、怎么验证、哪些风险未覆盖。

## 审查与安全偏好

- 代码修改后进行代码审查视角检查。
- 涉及认证、授权、数据库、文件系统、用户输入、外部 API、密钥、支付/财务时,必须叠加安全审查。
- 禁止硬编码密钥、令牌、密码和生产 DSN;日志和文档中避免暴露敏感信息。
- 数据库查询优先参数化,Schema 变更必须同步文档和回滚/验证步骤。

## 角色与 skill 迁移

- 原 Claude Code agents 已迁移为 Codex skill:`claude-agent-roles`。
- 原 Claude Code rules 已迁移为 Codex skill:`claude-rules-reference`。
- 原 Claude Code skills 已迁移到 `{CODEX_SKILLS}`。
- 当用户提到旧角色或旧 skill 时,优先读取对应 Codex skill/reference,而不是重新发明流程。

## 历史追溯

- Claude Code 历史摘要归档在仓库 `docs/claude-history/`。
- 需要追踪“哪次对话改了什么、影响什么”时,先查 `session_index.csv` 和 `file_index.csv`,再读对应 `sessions/*.md`。
- 历史摘要是追溯材料,不是当前事实来源;真正编码前仍需读取当前文件、git diff、审计记录和测试结果。
"""
    write_text(target, text)
|
||
|
||
|
||
def safe_rel(path: str) -> str:
    """Normalise *path* to forward slashes and drop a known project-root prefix.

    The prefix comparison is case-insensitive; the returned remainder keeps
    the original casing. Paths outside the known roots are returned with only
    the slash normalisation applied.
    """
    posix = path.replace("\\", "/")
    folded = posix.lower()
    prefixes = (
        "c:/project/neozqyy/",
        "/c/project/neozqyy/",
        "c:/neozqyy/",
        "/c/neozqyy/",
    )
    matched = next((prefix for prefix in prefixes if folded.startswith(prefix)), None)
    return posix if matched is None else posix[len(matched):]
|
||
|
||
|
||
def classify_area(path: str) -> str:
    """Map a file path to the label of the project area it belongs to.

    The path is first made project-relative with ``safe_rel``. Application
    prefixes are checked first, then database files (``db/`` or ``*.sql``),
    documentation, and scripts/tools; anything else is labelled ``其他``.
    """
    rel = safe_rel(path)
    app_areas = (
        ("apps/backend/", "后端"),
        ("apps/etl/", "ETL"),
        ("apps/miniprogram/", "小程序"),
        ("apps/admin-web/", "admin-web"),
        ("apps/tenant-admin/", "tenant-admin"),
    )
    for prefix, label in app_areas:
        if rel.startswith(prefix):
            return label
    if rel.startswith("db/") or rel.endswith(".sql"):
        return "数据库"
    if rel.startswith("docs/"):
        return "文档"
    if rel.startswith(("scripts/", "tools/")):
        return "脚本/工具"
    return "其他"
|
||
|
||
|
||
def extract_text_from_content(content: Any) -> str:
    """Return the plain text carried by a message ``content`` value.

    A string is returned unchanged. For a list, the ``text`` fields of all
    dict items whose ``type`` is ``"text"`` are joined with newlines. Any
    other value yields an empty string.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    return "\n".join(
        str(block.get("text", ""))
        for block in content
        if isinstance(block, dict) and block.get("type") == "text"
    )
|
||
|
||
|
||
def first_sql_summary(sql: str) -> str:
    """Summarise a SQL statement as ``"<VERB>: <tables>"``.

    The verb is the leading SQL keyword (upper-cased) or ``"SQL"`` when the
    statement starts with something else. Table names are the identifiers that
    follow ``from``/``join``/``into``/``update``/``table``/``view``; at most
    eight are listed, sorted and de-duplicated. When no table is found, the
    redacted statement (truncated to 240 characters) is used instead.
    """
    command = re.match(r"\s*(select|insert|update|delete|create|alter|drop|with|explain|truncate)\b", sql, re.I)
    verb = command.group(1).upper() if command else "SQL"
    # An identifier may be followed by whitespace, end of input, or common SQL
    # punctuation (";", ",", parentheses); requiring whitespace alone missed
    # statements such as "SELECT * FROM users;".
    tables = sorted(
        set(re.findall(r"\b(?:from|join|into|update|table|view)\s+([a-zA-Z_][\w.]*)(?=[\s;,()]|$)", sql, re.I))
    )
    if tables:
        return f"{verb}: {', '.join(tables[:8])}"
    # Only the fallback shows statement text, so redact only here.
    return f"{verb}: {redact(sql, 240)}"
|
||
|
||
|
||
def summarize_session(path: Path) -> dict[str, Any]:
    """Build a redacted summary of one Claude JSONL session file.

    The file is read line by line. Lines that are not valid JSON, or that are
    valid JSON but not an object, are counted and otherwise ignored. From the
    remaining records the function collects timestamps, git branches, working
    directories, user/assistant text excerpts and, for every ``tool_use``
    item, the files written or read, shell commands, SQL statements, tool
    names and sub-agent hints, plus a set of risk flags.

    Returns:
        A dict with the keys ``session_id``, ``source_path``, ``bytes``,
        ``lines``, ``first_ts``, ``last_ts``, ``branches``, ``cwds``,
        ``user_prompts``, ``assistant_notes``, ``touched_files``,
        ``read_files``, ``commands``, ``sql_ops``, ``tools``, ``mcp_tools``,
        ``agents``, ``areas`` and ``risk_flags``. Counter-based entries are
        lists of ``(value, count)`` pairs.
    """
    session_id = path.stem
    timestamps: list[str] = []
    branches: set[str] = set()
    cwds: set[str] = set()
    user_prompts: list[str] = []
    assistant_notes: list[str] = []
    touched: Counter[str] = Counter()
    read_files: Counter[str] = Counter()
    commands: Counter[str] = Counter()
    sql_ops: Counter[str] = Counter()
    tools: Counter[str] = Counter()
    mcp_tools: Counter[str] = Counter()
    agents: Counter[str] = Counter()
    risk_flags: set[str] = set()
    line_count = 0

    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            line_count += 1
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(obj, dict):
                # Valid JSON that is not an object (list, string, number...)
                # has no fields to read; skip it instead of failing on .get().
                continue

            ts = obj.get("timestamp")
            if isinstance(ts, str):
                timestamps.append(ts)
            branch = obj.get("gitBranch")
            if isinstance(branch, str) and branch:
                branches.add(branch)
            cwd = obj.get("cwd")
            if isinstance(cwd, str) and cwd:
                cwds.add(cwd)

            msg = obj.get("message")
            if not isinstance(msg, dict):
                continue
            role = msg.get("role")
            content = msg.get("content")
            if role == "user":
                text = extract_text_from_content(content)
                # Meta records and compaction summaries are not user goals.
                if text and not obj.get("isMeta") and not obj.get("isCompactSummary"):
                    user_prompts.append(redact(text, 260))
            elif role == "assistant":
                text = extract_text_from_content(content)
                if text:
                    assistant_notes.append(redact(text, 220))

            if isinstance(content, list):
                for item in content:
                    if not isinstance(item, dict) or item.get("type") != "tool_use":
                        continue
                    name = str(item.get("name", "unknown"))
                    tools[name] += 1
                    if name.startswith("mcp__"):
                        mcp_tools[name] += 1
                    inp = item.get("input") if isinstance(item.get("input"), dict) else {}

                    file_path = inp.get("file_path")
                    if isinstance(file_path, str):
                        rel = safe_rel(file_path)
                        if name in {"Edit", "Write", "MultiEdit"}:
                            touched[rel] += 1
                        elif name == "Read":
                            read_files[rel] += 1
                    path_value = inp.get("path")
                    if isinstance(path_value, str) and name in {"Write", "Edit", "mcp__weixin-devtools-mcp__screenshot"}:
                        touched[safe_rel(path_value)] += 1

                    command = inp.get("command")
                    if isinstance(command, str):
                        cmd = redact(command, 180)
                        commands[cmd] += 1
                        lowered = command.lower()
                        if "git reset --hard" in lowered or "git clean" in lowered:
                            risk_flags.add("包含高风险 git 清理命令")
                        if "drop table" in lowered or "truncate" in lowered:
                            risk_flags.add("包含高风险数据库命令")
                        if ".env" in lowered:
                            risk_flags.add("命令涉及环境文件")

                    sql = inp.get("sql")
                    if isinstance(sql, str):
                        summary = first_sql_summary(sql)
                        sql_ops[summary] += 1
                        lowered_sql = sql.lower()
                        if re.search(r"\b(drop|truncate|delete)\b", lowered_sql):
                            risk_flags.add("包含删除/回滚类 SQL")

                    subagent = inp.get("subagent_type") or inp.get("description")
                    if isinstance(subagent, str) and name == "Agent":
                        agents[redact(subagent, 120)] += 1

    areas = Counter(classify_area(p) for p in touched)
    # Timestamps are compared as strings.
    first_ts = min(timestamps) if timestamps else ""
    last_ts = max(timestamps) if timestamps else ""
    return {
        "session_id": session_id,
        "source_path": str(path),
        "bytes": path.stat().st_size,
        "lines": line_count,
        "first_ts": first_ts,
        "last_ts": last_ts,
        "branches": sorted(branches),
        "cwds": sorted(cwds),
        "user_prompts": user_prompts[:12],
        "assistant_notes": assistant_notes[:8],
        "touched_files": touched.most_common(),
        "read_files": read_files.most_common(30),
        "commands": commands.most_common(40),
        "sql_ops": sql_ops.most_common(30),
        "tools": tools.most_common(30),
        "mcp_tools": mcp_tools.most_common(20),
        "agents": agents.most_common(20),
        "areas": areas.most_common(),
        "risk_flags": sorted(risk_flags),
    }
|
||
|
||
|
||
def session_markdown(summary: dict[str, Any]) -> str:
    """Render one session summary dict as a Markdown report.

    Each list section falls back to a fixed placeholder line when it has no
    entries. Files are capped at 80 entries, commands and SQL operations at
    30, tools at 20 and assistant notes at 8.
    """

    def bullets(lines: Any, placeholder: str) -> str:
        # An empty join result is falsy, which selects the placeholder.
        return "\n".join(lines) or placeholder

    session_id = summary["session_id"]
    first_ts = summary["first_ts"]
    last_ts = summary["last_ts"]
    source_path = summary["source_path"]
    size_bytes = summary["bytes"]
    line_total = summary["lines"]
    branch_text = ", ".join(summary["branches"]) or "未记录"
    cwd_text = ", ".join(summary["cwds"]) or "未记录"
    areas = ", ".join(f"{area}({count})" for area, count in summary["areas"]) or "未识别"

    goals = bullets((f"- {p}" for p in summary["user_prompts"]), "- 未提取到用户目标")
    files = bullets(
        (f"- `{path}`:{count} 次写入/编辑" for path, count in summary["touched_files"][:80]),
        "- 未检测到写入/编辑工具",
    )
    commands = bullets(
        (f"- `{cmd}`:{count} 次" for cmd, count in summary["commands"][:30]),
        "- 未检测到 Bash 命令",
    )
    sql_ops = bullets(
        (f"- {op}:{count} 次" for op, count in summary["sql_ops"][:30]),
        "- 未检测到 SQL 工具调用",
    )
    tools = bullets((f"- `{tool}`:{count} 次" for tool, count in summary["tools"][:20]), "- 无")
    agents = bullets(
        (f"- {agent}:{count} 次" for agent, count in summary["agents"]),
        "- 未检测到 Claude Agent 调用",
    )
    risks = bullets(
        (f"- {flag}" for flag in summary["risk_flags"]),
        "- 未从工具调用中检测到显式高风险信号",
    )
    notes = bullets((f"- {note}" for note in summary["assistant_notes"][:8]), "- 未提取")

    return f"""# Claude 会话摘要:{session_id}

| 字段 | 值 |
|------|----|
| 时间范围 | {first_ts} -> {last_ts} |
| 原始记录 | `{source_path}` |
| 大小 | {size_bytes} bytes / {line_total} lines |
| 分支 | {branch_text} |
| 目录 | {cwd_text} |
| 影响范围 | {areas} |

## 用户目标摘录(已脱敏)

{goals}

## 可能修改的文件

{files}

## 运行过的命令(已脱敏)

{commands}

## 数据库/SQL 操作摘要

{sql_ops}

## 工具调用概览

{tools}

## Agent/子任务线索

{agents}

## 助手过程摘要摘录(已脱敏)

{notes}

## 风险与追溯提示

{risks}

> 本摘要由脚本从 Claude JSONL 工具调用和消息元数据中生成,不替代 `git diff`、审计记录、测试结果和当前代码事实。需要深挖时再读取原始 JSONL,并继续做脱敏处理。
"""
|
||
|
||
|
||
def migrate_history() -> dict[str, int]:
    """Archive redacted summaries of the project's Claude sessions in the repo.

    Reads every ``*.jsonl`` in ``CLAUDE_HOME/projects/C--Project-NeoZQYY`` and
    (re)creates ``HISTORY_ROOT`` with one Markdown summary per session, a
    session index CSV, a per-file reverse index CSV and a README. An existing
    ``HISTORY_ROOT`` is backed up and removed first.

    Returns:
        ``{"sessions": <number of sessions>, "files": <number of edited files>}``;
        both are 0 when the source directory does not exist.
    """
    source = CLAUDE_HOME / "projects" / "C--Project-NeoZQYY"
    if not source.exists():
        return {"sessions": 0, "files": 0}

    if HISTORY_ROOT.exists():
        backup(HISTORY_ROOT)
        shutil.rmtree(HISTORY_ROOT)
    sessions_dir = HISTORY_ROOT / "sessions"
    sessions_dir.mkdir(parents=True, exist_ok=True)

    summaries = [summarize_session(path) for path in sorted(source.glob("*.jsonl"), key=lambda p: p.stat().st_mtime)]
    # Final order is by recorded timestamp; the stable sort keeps the mtime
    # order above for sessions with equal (or missing) timestamps.
    summaries.sort(key=lambda item: item["last_ts"] or item["first_ts"])

    file_index: dict[str, list[dict[str, Any]]] = defaultdict(list)
    with (HISTORY_ROOT / "session_index.csv").open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["session_id", "first_ts", "last_ts", "bytes", "lines", "branches", "areas", "touched_count", "risk_flags", "summary_file", "source_path"])
        for summary in summaries:
            summary_file = sessions_dir / f"{summary['session_id']}.md"
            write_text(summary_file, session_markdown(summary))
            areas = "; ".join(f"{area}:{count}" for area, count in summary["areas"])
            writer.writerow([
                summary["session_id"],
                summary["first_ts"],
                summary["last_ts"],
                summary["bytes"],
                summary["lines"],
                "; ".join(summary["branches"]),
                areas,
                len(summary["touched_files"]),
                "; ".join(summary["risk_flags"]),
                # Always "/" separated, matching the links in file_index.csv
                # regardless of the operating system the script runs on.
                summary_file.relative_to(REPO_ROOT).as_posix(),
                summary["source_path"],
            ])
            for path, count in summary["touched_files"]:
                file_index[path].append({"session_id": summary["session_id"], "count": count, "last_ts": summary["last_ts"]})

    with (HISTORY_ROOT / "file_index.csv").open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["file_path", "session_id", "last_ts", "edit_count", "session_summary"])
        for file_path in sorted(file_index):
            for row in sorted(file_index[file_path], key=lambda item: item["last_ts"]):
                writer.writerow([
                    file_path,
                    row["session_id"],
                    row["last_ts"],
                    row["count"],
                    f"docs/claude-history/sessions/{row['session_id']}.md",
                ])

    recent_lines = "\n".join(
        f"- `{s['session_id']}`:{s['first_ts']} -> {s['last_ts']},修改 {len(s['touched_files'])} 个文件,范围 {', '.join(a for a, _ in s['areas']) or '未识别'}"
        for s in summaries[-20:]
    )
    # The README names the directory that was actually read instead of a path
    # hard-coded for one particular machine/user.
    readme = f"""# Claude Code 历史摘要归档

本目录由 `tools/codex/migrate_claude_assets.py` 从 `{source}` 同名项目历史生成,用于迁移到 Codex 后的追本溯源。

## 文件说明

- `session_index.csv`:会话级索引,按 session 记录时间范围、影响范围、风险标签、摘要文件。
- `file_index.csv`:文件反向索引,回答“哪个会话改过这个文件”。
- `sessions/*.md`:每个 Claude JSONL 会话的脱敏摘要。

## 最近 20 个会话

{recent_lines}

## 使用方式

1. 查某个文件历史:在 `file_index.csv` 搜索文件路径。
2. 查某次会话影响:打开对应 `sessions/<session_id>.md`。
3. 需要完整细节时,再回到原始 JSONL;读取前注意脱敏。

## 注意

摘要基于工具调用和消息元数据自动生成,不能替代当前代码、审计文档和测试结果。编码前仍需读取当前文件和 `git diff`。
"""
    write_text(HISTORY_ROOT / "README.md", readme)
    return {"sessions": len(summaries), "files": len(file_index)}
|
||
|
||
|
||
def update_migration_doc(result: dict[str, Any]) -> None:
    """Rewrite the migration-result section of ``docs/codex_migration.md``.

    Everything from the section heading onwards is discarded and regenerated
    from *result*, which must provide ``skills``, ``agents`` and ``rules``
    (lists) and ``history`` (a dict with ``sessions`` and ``files``). When the
    document does not exist yet it is created with a default title.
    """
    doc = REPO_ROOT / "docs" / "codex_migration.md"
    existing = doc.read_text(encoding="utf-8", errors="replace") if doc.exists() else "# Codex 迁移配置说明\n"
    marker = "## 本次深度迁移结果"
    # Keep only the text before a previously generated section, so running
    # the script again replaces the section instead of appending another one.
    existing = existing.split(marker)[0].rstrip()
    # The locations are taken from CODEX_HOME / CODEX_SKILLS so the document
    # matches where the files were really written.
    section = f"""

{marker}

- 用户全局习惯已写入 `{CODEX_HOME / 'AGENTS.md'}`。
- Claude skills 已迁移 {len(result['skills'])} 个到 `{CODEX_SKILLS}`:{', '.join(result['skills'])}。
- Claude agents 已迁移为 Codex skill `claude-agent-roles`,包含 {len(result['agents'])} 个角色参考。
- Claude rules 已迁移为 Codex skill `claude-rules-reference`,包含 {len(result['rules'])} 个规则文件。
- NeoZQYY Claude 会话历史已摘要归档到 `docs/claude-history/`:{result['history']['sessions']} 个会话,{result['history']['files']} 个被编辑文件索引。

### 追溯入口

- 会话索引:`docs/claude-history/session_index.csv`
- 文件索引:`docs/claude-history/file_index.csv`
- 会话摘要:`docs/claude-history/sessions/`
"""
    write_text(doc, existing + section)
|
||
|
||
|
||
def main() -> None:
    """Run all migration steps in order and print the result as JSON.

    Skills, agents, rules and history are migrated first; the global
    ``AGENTS.md`` and the repository migration document are written afterwards.
    """
    for directory in (CODEX_HOME, CODEX_SKILLS):
        directory.mkdir(exist_ok=True)

    skills = migrate_skills()
    agents = migrate_agents()
    rules = migrate_rules()
    history = migrate_history()
    result = {"skills": skills, "agents": agents, "rules": rules, "history": history}

    migrate_global_agents()
    update_migration_doc(result)
    print(json.dumps(result, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
|