from __future__ import annotations import csv import datetime as dt import json import re import shutil from collections import Counter, defaultdict from pathlib import Path from typing import Any REPO_ROOT = Path(__file__).resolve().parents[2] HOME = Path.home() CLAUDE_HOME = HOME / ".claude" CODEX_HOME = HOME / ".codex" CURSOR_HOME = HOME / ".cursor" CURSOR_USER_SETTINGS = HOME / "AppData" / "Roaming" / "Cursor" / "User" / "settings.json" VSCODE_INSIDERS_ROOT = Path("C:/Dev/VSCodeInsiders") PROJECT_CURSOR = REPO_ROOT / ".cursor" PROJECT_RULES = PROJECT_CURSOR / "rules" PROJECT_SKILLS = PROJECT_CURSOR / "skills" PROJECT_HOOKS = PROJECT_CURSOR / "hooks" USER_CURSOR_SKILLS = CURSOR_HOME / "skills" USER_CURSOR_AGENTS = CURSOR_HOME / "agents" HISTORY_ROOT = REPO_ROOT / "docs" / "ai-env-history" HISTORY_SUMMARIES = HISTORY_ROOT / "sessions" MANIFEST_PATH = HISTORY_ROOT / "cursor_migration_manifest.json" STAMP = dt.datetime.now(dt.timezone(dt.timedelta(hours=8))).strftime("%Y%m%d-%H%M%S") BACKUP_ROOT = CURSOR_HOME / "backups" / "neozqyy-cursor-migration" / STAMP SKILLS_TO_COPY = [ "agent-introspection-debugging", "claude-api", "code-tour", "codebase-onboarding", "repo-scan", "rules-distill", "search-first", "security-review", "strategic-compact", "tdd-workflow", "claude-agent-roles", "claude-rules-reference", ] EXPLICIT_ONLY_SKILLS = { "agent-introspection-debugging", "claude-agent-roles", "claude-rules-reference", "code-tour", "codebase-onboarding", "repo-scan", "rules-distill", "search-first", "security-review", "tdd-workflow", } NATURAL_TRIGGER_SKILLS = { "claude-api", "neozqyy-cursor-migration", "strategic-compact", } TOPIC_RULES = [ ("AI 开发环境迁移", ["迁移", "cursor", "codex", "claude", "agents", "skills", "mcp", ".cursor", ".claude"]), ("AI 应用与 Prompt", ["app2", "app3", "prompt", "dashscope", "ai_run", "ai_run_logs", "app/ai", "百炼", "千问"]), ("Admin AI 管理后台", ["admin-web", "ai 管理", "ai management", "ai triggers", "airunlogs", "aioperations"]), ("运行时上下文", ["runtime_context", "runtime context", "sandbox", "admin_runtime_context"]), ("后端 API 与服务", ["apps/backend", "fastapi", "router", "service", "schemas", "websocket"]), ("ETL 与财务口径", ["apps/etl", "dwd", "dws", "finance", "consume_money", "revenue", "财务"]), ("数据库与 RLS", ["db/", "migrations", "schema", "rls", "fdw", "view", "postgres"]), ("小程序体验", ["apps/miniprogram", "wxml", "wxss", "微信", "小程序", "board-finance"]), ("任务引擎与触发器", ["task_engine", "task_generator", "trigger", "scheduler", "任务"]), ("审计与文档", ["docs/audit", "audit", "审计", "文档", "docs/"]), ] SECRET_PATTERNS = [ re.compile(r"sk-[A-Za-z0-9_-]{12,}"), re.compile(r"sk-proj-[A-Za-z0-9_-]{12,}"), re.compile(r"(?i)(password|passwd|pwd|token|secret|api[_-]?key)\s*[:=]\s*['\"]?[^'\"\s,;]+"), re.compile(r"postgresql://[^\s'\"`]+"), re.compile(r"mysql://[^\s'\"`]+"), re.compile(r"mongodb(?:\+srv)?://[^\s'\"`]+"), ] FILE_PATTERN = re.compile( r"(?:(?:[A-Za-z]:)?[\\/](?:Project|c)[\\/]NeoZQYY[\\/])?" r"((?:apps|db|docs|tools|scripts|packages|tests|infra|\.cursor|\.claude)[\\/][^\"'\s,;)\]}]+)", re.IGNORECASE, ) manifest: dict[str, Any] = { "generated_at": STAMP, "repo_root": str(REPO_ROOT), "backup_root": str(BACKUP_ROOT), "actions": [], "history_sources": [], } def record(action: str, target: Path, source: Path | None = None, note: str = "") -> None: manifest["actions"].append( { "action": action, "target": str(target), "source": str(source) if source else None, "note": note, } ) def sanitize(text: str, limit: int | None = None) -> str: value = text.replace("\r\n", "\n").replace("\r", "\n") for pattern in SECRET_PATTERNS: value = pattern.sub("[已脱敏]", value) value = re.sub(r"\s+", " ", value).strip() if limit and len(value) > limit: return value[: limit - 1].rstrip() + "…" return value def read_text(path: Path) -> str: return path.read_text(encoding="utf-8", errors="replace") def write_text(path: Path, text: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf-8", newline="\n") record("write", path) def backup(path: Path) -> None: if not path.exists(): return safe = re.sub(r"[^A-Za-z0-9_.-]+", "_", str(path).replace(":", "")) dst = BACKUP_ROOT / safe dst.parent.mkdir(parents=True, exist_ok=True) if path.is_dir(): if dst.exists(): shutil.rmtree(dst) shutil.copytree(path, dst) else: dst.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(path, dst) record("backup", dst, path) def copytree(src: Path, dst: Path) -> None: if dst.exists(): backup(dst) shutil.rmtree(dst) shutil.copytree(src, dst) record("copytree", dst, src) def parse_frontmatter(text: str) -> tuple[dict[str, str], str]: if not text.startswith("---"): return {}, text end = text.find("\n---", 3) if end == -1: return {}, text raw = text[3:end].strip() body = text[end + len("\n---") :].lstrip("\n") meta: dict[str, str] = {} for line in raw.splitlines(): if ":" not in line: continue key, value = line.split(":", 1) meta[key.strip()] = value.strip().strip('"').strip("'") return meta, body def render_frontmatter(meta: dict[str, Any], body: str) -> str: lines = ["---"] for key, value in meta.items(): if isinstance(value, bool): rendered = "true" if value else "false" else: rendered = str(value) lines.append(f"{key}: {rendered}") lines.append("---") return "\n".join(lines) + "\n\n" + body.lstrip("\n") def normalize_file_path(value: str) -> str: value = value.replace("\\\\", "\\").replace("\\", "/").strip() value = re.sub(r"^[A-Za-z]:/Project/NeoZQYY/", "", value, flags=re.IGNORECASE) value = re.sub(r"^/c/Project/NeoZQYY/", "", value, flags=re.IGNORECASE) for marker in ["\\n", "/n", "\\r", "/r", "`", "<", ">", "|"]: value = value.split(marker, 1)[0] ext_match = re.match( r"(.+?\.(?:py|ts|tsx|js|jsx|md|mdc|sql|json|toml|yaml|yml|wxml|wxss|css|scss|html|png|jpg|jpeg|xlsx|csv|txt|bat|ps1))", value, flags=re.IGNORECASE, ) if ext_match: value = ext_match.group(1) value = value.strip("/").rstrip("`").strip() lower = value.lower() if not value or len(value) > 240: return "" if "postgresql://" in lower or "=" in value: return "" if lower.endswith("/.env") or lower.endswith(".env") or ".env.local" in lower: return "" return value def extract_files(text: str) -> set[str]: return {path for match in FILE_PATTERN.finditer(text) if (path := normalize_file_path(match.group(1)))} def extract_message_text(message: Any) -> str: if isinstance(message, str): return message if isinstance(message, dict): content = message.get("content") if isinstance(content, str): return content if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict): if item.get("type") in {"text", "input_text", "output_text"}: parts.append(str(item.get("text", ""))) elif "content" in item and isinstance(item["content"], str): parts.append(item["content"]) elif isinstance(item, str): parts.append(item) return "\n".join(parts) return "" def extract_json_value(data: dict[str, Any], *path: str) -> Any: node: Any = data for key in path: if not isinstance(node, dict): return None node = node.get(key) return node def write_rule(name: str, description: str, globs: str | None, always: bool, body: str) -> None: frontmatter = ["---", f"description: {description}"] if globs: frontmatter.append(f"globs: {globs}") frontmatter.append(f"alwaysApply: {'true' if always else 'false'}") frontmatter.append("---") write_text(PROJECT_RULES / f"{name}.mdc", "\n".join(frontmatter) + "\n\n" + body.strip() + "\n") def create_project_rules() -> None: backup(PROJECT_RULES) write_rule( "neozqyy-core", "NeoZQYY 核心工作规范:中文、调研、验证、审计、dirty tree 保护。", None, True, """ # NeoZQYY 核心规范 - 始终使用中文交流、解释、审计和文档;命令、API 字段、变量名保持原文。 - 以 `AGENTS.md` 为当前权威规则,`CLAUDE.md` 作为历史兼容来源。 - 逻辑改动前先做需求审问和前置调研;用户明确跳过时除外。 - 逻辑改动后运行相关验证,输出 diff 摘要和未覆盖风险。 - 不回滚用户已有改动,不使用破坏性 git 命令,除非用户明确要求。 - 审计记录统一写入 `docs/audit/changes/`,`docs/audit/audit_dashboard.md` 只由脚本生成。 - 历史追溯优先查 `docs/ai-env-history/`、`docs/claude-history/`、`docs/audit/`,再查原始对话。 - 用户偏好模型为 GPT 5.5 与 Claude 4.7;模型选择由 Cursor UI/会话设置控制,规则只保留偏好。 - CLI / Shell 中文处理必须优先确保 UTF-8:Python 用 `encoding="utf-8"` / `PYTHONUTF8=1`,CSV 给 Excel 用 `utf-8-sig`,PowerShell/Node 避免依赖系统默认 ANSI 编码。 - 遇到中文乱码时,不要把乱码输出当作事实;先调整编码重跑,或明确说明终端编码异常并转述可确认的信息。 - Shell 路径和参数含中文、空格或特殊字符时必须正确加引号;复杂中文输出优先用脚本或结构化 API,避免手写脆弱转义。 """, ) write_rule( "backend-fastapi", "FastAPI 后端规则:响应包装、认证、AI 集成、RLS 与测试库。", "apps/backend/**", False, """ # 后端规则 - 2xx 响应经 `ResponseWrapperMiddleware` 包装为 `{ "code": 0, "data": }`。 - 后端内部使用 snake_case,JSON 输出通过 `CamelModel` 转 camelCase。 - admin、miniapp、tenant-admin 三类 JWT aud 不可混用。 - 访问 ETL FDW/RLS 视图前必须设置 `app.current_site_id`。 - AI 集成涉及 DashScope、熔断、限流、预算、缓存和运行日志,改动后必须查审计历史。 - 后端验证默认在 `apps/backend` 下运行,使用测试库,禁止连正式库。 """, ) write_rule( "etl-feiqiu", "飞球 ETL 规则:DWD-DOC 优先、金额口径、DWS 优先、禁止归档目录。", "apps/etl/connectors/feiqiu/**", False, """ # ETL 飞球规则 - 金额、支付、消费链路、字段语义优先参考 `apps/etl/connectors/feiqiu/docs/reports/DWD-DOC/`。 - `consume_money` 禁止直接用于计算,使用 `items_sum` 拆分字段。 - 助教费用必须区分 `assistant_pd_money` 和 `assistant_cx_money`。 - 正向结算使用 `settle_type IN (1, 3)`,禁止随意读取 ODS 做业务计算。 - DWS/DWD 汇总默认保持幂等,禁止 `TRUNCATE`。 - 所有 `_archived/` 目录禁止读取或参考。 """, ) write_rule( "database", "数据库规则:schema 变更、RLS 双 schema、文档同步和验证 SQL。", "db/**,docs/database/**", False, """ # 数据库规则 - 任何 PostgreSQL schema/迁移/DDL/ORM 结构变更必须同步 `docs/database/`。 - 新建 DWS/DWD RLS 视图必须同时创建原 schema 和 `app` schema 视图。 - 回滚需逆序 DROP/ALTER,并提供至少 3 条验证 SQL。 - 默认使用 `TEST_DB_DSN` / `TEST_APP_DB_DSN`,禁止连正式库。 - DDL 基线变更后运行 `tools/db/gen_consolidated_ddl.py` 并同步权威 DDL。 """, ) write_rule( "admin-web", "admin-web 规则:React/Vite/AntD、AI 管理套件与前端验证。", "apps/admin-web/**", False, """ # admin-web 规则 - `admin-web` 是开发/运维后台,不是租户后台。 - 遵循 React + Vite + Ant Design 现有页面和 API 封装风格。 - AI 管理相关改动先查 2026-04-21、2026-04-30 审计记录。 - 前端逻辑改动后优先运行 `pnpm test` / `pnpm lint`,无法运行需说明原因。 """, ) write_rule( "miniprogram", "微信小程序规则:生产小程序、Donut/TDesign、demo 标杆对齐。", "apps/miniprogram/**", False, """ # 小程序规则 - `apps/miniprogram` 是生产小程序,数据来自后端 API。 - UI 样式和展示格式需要参考 `apps/demo-miniprogram` 的 MOCK 标杆。 - 改动关键交互、鉴权、API 字段或页面跳转后必须说明验证路径。 - 涉及微信开发者工具时优先使用 `weixin-devtools-mcp` 或记录手工验证步骤。 """, ) write_rule( "demo-miniprogram-protect", "demo-miniprogram 保护规则:假数据标杆,不删除不迁移到 _DEL。", "apps/demo-miniprogram/**", False, """ # demo-miniprogram 保护 - 本目录是假数据 MOCK 版小程序,用于页面样式和展示格式标杆校对。 - 禁止删除、移入 `_DEL/` 或改造成真实 API 驱动。 - 只有在用户明确要求校正 demo 标杆时才修改。 """, ) write_rule( "audit-history", "历史追溯规则:优先使用精简索引、审计记录和当前代码。", "docs/**", False, """ # 历史追溯规则 - 日常追溯先查 `docs/ai-env-history/README.md`、`docs/claude-history/`、`docs/audit/changes/`。 - 历史摘要只解释来龙去脉,编码前仍以当前文件、当前 diff、当前测试为准。 - 原始 JSONL 可用于追查细节,但不要把密钥、DSN、token 原文写入仓库文档。 """, ) def create_user_skills() -> None: USER_CURSOR_SKILLS.mkdir(parents=True, exist_ok=True) for name in SKILLS_TO_COPY: src = CODEX_HOME / "skills" / name if not src.exists(): src = CLAUDE_HOME / "skills" / name if src.exists(): target = USER_CURSOR_SKILLS / name copytree(src, target) skill_md = target / "SKILL.md" if name in EXPLICIT_ONLY_SKILLS and skill_md.exists(): meta, body = parse_frontmatter(read_text(skill_md)) meta["disable-model-invocation"] = True write_text(skill_md, render_frontmatter(meta, body)) skill = """--- name: neozqyy-cursor-migration description: NeoZQYY AI 开发环境迁移与历史追溯工作流。Use when 需要理解 Claude Code、Codex、Cursor 迁移关系,查历史对话,或恢复用户 AI 开发习惯。 --- # NeoZQYY Cursor 迁移 ## 快速入口 1. 当前权威项目规则:`AGENTS.md` 与 `.cursor/rules/`。 2. 迁移说明:`docs/cursor_migration.md`。 3. 历史索引:`docs/ai-env-history/README.md`。 4. Claude 历史摘要:`docs/claude-history/`。 5. 审计记录:`docs/audit/changes/`。 ## 使用原则 - 先查精简索引,再读原始对话。 - 历史用于理解前因后果,不替代当前代码和测试。 - GPT 5.5 / Claude 4.7 是用户偏好模型;具体模型选择由 Cursor 当前会话或 UI 控制。 """ write_text(USER_CURSOR_SKILLS / "neozqyy-cursor-migration" / "SKILL.md", skill) def create_user_agents() -> None: USER_CURSOR_AGENTS.mkdir(parents=True, exist_ok=True) src_root = CLAUDE_HOME / "agents" if not src_root.exists(): return for src in sorted(src_root.glob("*.md")): text = read_text(src) meta, body = parse_frontmatter(text) name = re.sub(r"[^a-z0-9-]", "-", meta.get("name", src.stem).lower()).strip("-") or src.stem description = meta.get("description") or f"从 Claude Code 迁移的 {name} 子代理。Use proactively when relevant." target = USER_CURSOR_AGENTS / f"{name}.md" write_text(target, f"---\nname: {name}\ndescription: {description}\n---\n\n{body.strip()}\n") def create_project_skills() -> None: backup(PROJECT_SKILLS) commands_dir = REPO_ROOT / ".claude" / "commands" for src in sorted(commands_dir.glob("*.md")): body = read_text(src) title = body.splitlines()[0].lstrip("# ").strip() if body.splitlines() else src.stem description = f"{title}。从 Claude Code 命令迁移为 Cursor project skill;用户要求执行 {src.stem}、/{src.stem} 或相关流程时使用。" content = ( "---\n" f"name: {src.stem}\n" f"description: {description}\n" "disable-model-invocation: true\n" "---\n\n" + body.strip() + "\n" ) write_text(PROJECT_SKILLS / src.stem / "SKILL.md", content) def create_project_hooks() -> None: backup(PROJECT_CURSOR / "hooks.json") backup(PROJECT_HOOKS) hook_config = { "version": 1, "hooks": { "preToolUse": [ { "command": ".venv\\Scripts\\python.exe .cursor\\hooks\\ai_env_guard.py preToolUse", "matcher": "Read|Glob|Edit|Write|ApplyPatch", "timeout": 5, "failClosed": True, } ], "beforeReadFile": [ { "command": ".venv\\Scripts\\python.exe .cursor\\hooks\\ai_env_guard.py beforeReadFile", "matcher": "_archived", "timeout": 5, "failClosed": True, } ], "beforeMCPExecution": [ { "command": ".venv\\Scripts\\python.exe .cursor\\hooks\\ai_env_guard.py beforeMCPExecution", "matcher": "pg-etl|pg-app", "timeout": 5, "failClosed": False, } ], "beforeShellExecution": [ { "command": ".venv\\Scripts\\python.exe .cursor\\hooks\\ai_env_guard.py beforeShellExecution", "matcher": "git\\s+(reset|checkout)|--no-verify|psql|mcp-postgres|PG_DSN|APP_DB_DSN", "timeout": 5, "failClosed": False, } ], "postToolUse": [ { "command": ".venv\\Scripts\\python.exe .cursor\\hooks\\ai_env_guard.py postToolUse", "matcher": "ApplyPatch|Edit|Write", "timeout": 5, "failClosed": False, } ], }, } write_text(PROJECT_CURSOR / "hooks.json", json.dumps(hook_config, ensure_ascii=False, indent=2) + "\n") hook_script = r'''from __future__ import annotations import json import re import sys from pathlib import Path ARCHIVED_RE = re.compile(r"(^|[/\\])_archived([/\\]|$)", re.IGNORECASE) PROD_MCP_RE = re.compile(r"\b(pg-etl|pg-app)\b", re.IGNORECASE) TEST_MCP_RE = re.compile(r"\b(pg-etl-test|pg-app-test)\b", re.IGNORECASE) WRITE_SQL_RE = re.compile( r"\b(insert|update|delete|truncate|drop|alter|create|grant|revoke|merge|copy|call|vacuum|reindex)\b", re.IGNORECASE, ) def load_input() -> dict: try: return json.load(sys.stdin) except Exception: return {} def target_text(data: dict) -> str: chunks = [json.dumps(data, ensure_ascii=False)] for key in ("command", "file_path", "path"): value = data.get(key) if isinstance(value, str): chunks.append(value) tool_input = data.get("tool_input") if isinstance(tool_input, dict): chunks.append(json.dumps(tool_input, ensure_ascii=False)) return "\n".join(chunks) def allow() -> None: print(json.dumps({"permission": "allow"}, ensure_ascii=False)) def before_shell(data: dict) -> None: text = target_text(data) dangerous = [ r"git\s+reset\s+--hard", r"git\s+checkout\s+--", r"--no-verify", r"\bPG_DSN\b", r"\bAPP_DB_DSN\b", r"\bpsql\b", ] if any(re.search(pattern, text, re.IGNORECASE) for pattern in dangerous): print(json.dumps({ "permission": "ask", "user_message": "命令命中 NeoZQYY 高风险规则:请确认是否需要执行。", "agent_message": "执行前说明风险、目标库/文件、回滚方式;生产库默认不执行。", }, ensure_ascii=False)) return allow() def guard_archived(data: dict) -> None: text = target_text(data) if ARCHIVED_RE.search(text): print(json.dumps({ "permission": "deny", "user_message": "已阻断:`_archived/` 是废弃归档目录,禁止读取、搜索或作为实现参考。", "agent_message": "改用当前版本文件;如确需考古,请让用户明确授权并说明目的。", }, ensure_ascii=False)) return allow() def before_mcp(data: dict) -> None: text = target_text(data) if not PROD_MCP_RE.search(text) or TEST_MCP_RE.search(text): allow() return if WRITE_SQL_RE.search(text): print(json.dumps({ "permission": "deny", "user_message": "已阻断:生产库 MCP 写入/DDL 类操作默认禁止。请改用测试库验证,或让用户给出单次明确授权。", "agent_message": "生产库仅允许人工确认后的只读排查;DDL/写入需走单独变更流程。", }, ensure_ascii=False)) return print(json.dumps({ "permission": "ask", "user_message": "检测到生产库 MCP 只读调用。请确认本次查询目的、SQL 是否只读、是否会暴露敏感数据。", "agent_message": "说明查询目的、目标库、SQL 摘要和风险后等待用户确认。", }, ensure_ascii=False)) def post_tool_use(data: dict) -> None: text = target_text(data).replace("\\", "/") hints = [] if "/_archived/" in text or text.endswith("/_archived"): hints.append("检测到 `_archived/` 路径:该目录内容已废弃,禁止作为实现参考。") if "apps/demo-miniprogram/" in text: hints.append("检测到 demo-miniprogram:这是 MOCK 标杆目录,修改前需确认目的。") if re.search(r"(db/|docs/database/|migrations/|schemas/)", text): hints.append("检测到数据库相关改动:如影响 schema,需同步 `docs/database/` 并提供 3 条验证 SQL。") if re.search(r"(apps/backend/|apps/etl/|app/ai/|prompts/)", text): hints.append("检测到后端/ETL/AI 逻辑改动:完成后需运行相关测试并写审计记录。") if hints: print(json.dumps({"additional_context": "\n".join(f"[NeoZQYY] {hint}" for hint in hints)}, ensure_ascii=False)) def main() -> None: mode = sys.argv[1] if len(sys.argv) > 1 else "" data = load_input() if mode in {"preToolUse", "beforeReadFile"}: guard_archived(data) elif mode == "beforeMCPExecution": before_mcp(data) elif mode == "beforeShellExecution": before_shell(data) elif mode == "postToolUse": post_tool_use(data) if __name__ == "__main__": main() ''' write_text(PROJECT_HOOKS / "ai_env_guard.py", hook_script) def summarize_jsonl(path: Path, source: str) -> dict[str, Any]: session_id = path.stem timestamps: list[str] = [] cwd_values: Counter[str] = Counter() user_prompts: list[str] = [] assistant_notes: list[str] = [] tools: Counter[str] = Counter() files: set[str] = set() line_count = 0 with path.open("r", encoding="utf-8", errors="replace") as fh: for line in fh: line_count += 1 raw = line.strip() if not raw: continue files.update(extract_files(raw)) try: data = json.loads(raw) except Exception: continue ts = data.get("timestamp") or extract_json_value(data, "payload", "timestamp") if isinstance(ts, str): timestamps.append(ts) sid = data.get("sessionId") or extract_json_value(data, "payload", "id") if isinstance(sid, str): session_id = sid cwd = data.get("cwd") or extract_json_value(data, "payload", "cwd") if isinstance(cwd, str): cwd_values[cwd] += 1 role = data.get("type") or data.get("role") or extract_json_value(data, "payload", "role") if role == "user": prompt = extract_message_text(data.get("message") or data) if prompt: user_prompts.append(sanitize(prompt, 600)) elif role == "assistant": note = extract_message_text(data.get("message") or data) if note: assistant_notes.append(sanitize(note, 400)) payload = data.get("payload") if isinstance(data.get("payload"), dict) else {} if payload: if payload.get("type") == "function_call": name = payload.get("name") if isinstance(name, str): tools[name] += 1 args = payload.get("arguments") if isinstance(args, str): files.update(extract_files(args)) if payload.get("type") == "message": for item in payload.get("content", []) or []: if isinstance(item, dict): if item.get("type") == "tool_use": name = item.get("name") if isinstance(name, str): tools[name] += 1 files.update(extract_files(json.dumps(item, ensure_ascii=False))) message = data.get("message") if isinstance(message, dict): for item in message.get("content", []) or []: if isinstance(item, dict): if item.get("type") == "tool_use": name = item.get("name") if isinstance(name, str): tools[name] += 1 files.update(extract_files(json.dumps(item, ensure_ascii=False))) attachment = data.get("attachment") if isinstance(attachment, dict): atype = attachment.get("type") if isinstance(atype, str): tools[atype] += 1 first_ts = min(timestamps) if timestamps else "" last_ts = max(timestamps) if timestamps else "" prompts = user_prompts[:12] notes = assistant_notes[:8] risk_flags = [] joined_files = " ".join(sorted(files)).lower() if "db/" in joined_files or "migrations/" in joined_files or "schemas/" in joined_files: risk_flags.append("数据库") if "apps/backend/" in joined_files: risk_flags.append("后端") if "apps/etl/" in joined_files: risk_flags.append("ETL") if "apps/miniprogram/" in joined_files: risk_flags.append("小程序") if any("_archived" in item.lower() for item in files): risk_flags.append("涉及归档目录") return { "source": source, "session_id": session_id, "path": str(path), "first_ts": first_ts, "last_ts": last_ts, "line_count": line_count, "cwd": cwd_values.most_common(1)[0][0] if cwd_values else "", "user_prompts": prompts, "assistant_notes": notes, "tools": dict(tools.most_common(20)), "files": sorted(files), "risk_flags": risk_flags, "topics": classify_topics( "\n".join(user_prompts + assistant_notes + sorted(files) + list(tools.keys())) ), } def classify_topics(text: str) -> list[str]: lower = text.lower() topics = [] for topic, keywords in TOPIC_RULES: if any(keyword.lower() in lower for keyword in keywords): topics.append(topic) return topics or ["未分类"] def write_session_summary(summary: dict[str, Any]) -> None: source = summary["source"] sid = re.sub(r"[^A-Za-z0-9_.-]+", "_", summary["session_id"])[:120] target = HISTORY_SUMMARIES / source / f"{sid}.md" prompts = "\n".join(f"- {item}" for item in summary["user_prompts"]) or "- 未识别" notes = "\n".join(f"- {item}" for item in summary["assistant_notes"]) or "- 未识别" files = "\n".join(f"- `{item}`" for item in summary["files"][:80]) or "- 未识别" tools = "\n".join(f"- `{name}`: {count}" for name, count in summary["tools"].items()) or "- 未识别" risks = "、".join(summary["risk_flags"]) if summary["risk_flags"] else "未识别" body = f"""# {source} 会话 {summary['session_id']} ## 基本信息 - 来源:{source} - 时间:{summary['first_ts']} -> {summary['last_ts']} - 行数:{summary['line_count']} - cwd:`{summary['cwd']}` - 风险标签:{risks} - 主题:{", ".join(summary.get("topics", ["未分类"]))} - 原始记录:`{summary['path']}` ## 用户需求线索 {prompts} ## 助手回应线索 {notes} ## 工具与事件 {tools} ## 文件影响线索 {files} ## 使用说明 本摘要由原始 JSONL 逐行分析生成,用于理解对话前因后果。编码前仍需读取当前文件、git diff、审计记录和测试结果。 """ write_text(target, body) def collect_history_paths() -> dict[str, list[Path]]: return { "claude": sorted((CLAUDE_HOME / "projects" / "C--Project-NeoZQYY").glob("*.jsonl")), "codex": sorted((CODEX_HOME / "sessions").glob("**/*.jsonl")) + sorted((CODEX_HOME / "archived_sessions").glob("*.jsonl")), "cursor": sorted((CURSOR_HOME / "projects" / "c-Project-NeoZQYY" / "agent-transcripts").glob("*/*.jsonl")), } def analyze_histories() -> None: backup(HISTORY_ROOT) sources = collect_history_paths() all_summaries: list[dict[str, Any]] = [] file_index: dict[str, list[dict[str, str]]] = defaultdict(list) for source, paths in sources.items(): manifest["history_sources"].append({"source": source, "count": len(paths)}) for path in paths: summary = summarize_jsonl(path, source) all_summaries.append(summary) write_session_summary(summary) for file_path in summary["files"]: file_index[file_path].append( { "source": source, "session_id": summary["session_id"], "first_ts": summary["first_ts"], "last_ts": summary["last_ts"], "summary": str(HISTORY_SUMMARIES / source / f"{re.sub(r'[^A-Za-z0-9_.-]+', '_', summary['session_id'])[:120]}.md"), } ) HISTORY_ROOT.mkdir(parents=True, exist_ok=True) with (HISTORY_ROOT / "conversation_index.csv").open("w", encoding="utf-8-sig", newline="") as fh: writer = csv.DictWriter( fh, fieldnames=["source", "session_id", "first_ts", "last_ts", "line_count", "cwd", "file_count", "risk_flags", "summary_path", "raw_path"], ) writer.writeheader() for summary in sorted(all_summaries, key=lambda item: (item["first_ts"], item["source"])): sid = re.sub(r"[^A-Za-z0-9_.-]+", "_", summary["session_id"])[:120] writer.writerow( { "source": summary["source"], "session_id": summary["session_id"], "first_ts": summary["first_ts"], "last_ts": summary["last_ts"], "line_count": summary["line_count"], "cwd": summary["cwd"], "file_count": len(summary["files"]), "risk_flags": ";".join(summary["risk_flags"]), "summary_path": str(HISTORY_SUMMARIES / summary["source"] / f"{sid}.md"), "raw_path": summary["path"], } ) record("write", HISTORY_ROOT / "conversation_index.csv") with (HISTORY_ROOT / "file_impact_index.csv").open("w", encoding="utf-8-sig", newline="") as fh: writer = csv.DictWriter(fh, fieldnames=["file", "source", "session_id", "first_ts", "last_ts", "summary"]) writer.writeheader() for file_path, rows in sorted(file_index.items()): for row in rows: writer.writerow({"file": file_path, **row}) record("write", HISTORY_ROOT / "file_impact_index.csv") with (HISTORY_ROOT / "topic_timeline.csv").open("w", encoding="utf-8-sig", newline="") as fh: writer = csv.DictWriter( fh, fieldnames=["topic", "source", "session_id", "first_ts", "last_ts", "risk_flags", "prompt", "summary_path", "raw_path"], ) writer.writeheader() for summary in sorted(all_summaries, key=lambda item: (item["first_ts"], item["source"])): sid = re.sub(r"[^A-Za-z0-9_.-]+", "_", summary["session_id"])[:120] prompt = summary["user_prompts"][0] if summary["user_prompts"] else "" for topic in summary.get("topics", ["未分类"]): writer.writerow( { "topic": topic, "source": summary["source"], "session_id": summary["session_id"], "first_ts": summary["first_ts"], "last_ts": summary["last_ts"], "risk_flags": ";".join(summary["risk_flags"]), "prompt": prompt, "summary_path": str(HISTORY_SUMMARIES / summary["source"] / f"{sid}.md"), "raw_path": summary["path"], } ) record("write", HISTORY_ROOT / "topic_timeline.csv") grouped: dict[str, list[dict[str, Any]]] = defaultdict(list) for summary in all_summaries: for topic in summary.get("topics", ["未分类"]): grouped[topic].append(summary) parts = ["# AI 开发历史主题时间线", ""] for topic in sorted(grouped): parts.append(f"## {topic}") for summary in sorted(grouped[topic], key=lambda item: item["first_ts"])[:80]: sid = re.sub(r"[^A-Za-z0-9_.-]+", "_", summary["session_id"])[:120] prompt = summary["user_prompts"][0] if summary["user_prompts"] else "未识别用户需求" risks = "、".join(summary["risk_flags"]) if summary["risk_flags"] else "无显式风险标签" parts.append( f"- {summary['first_ts']} -> {summary['last_ts']} " f"`{summary['source']}` `{summary['session_id']}`:{prompt} " f"(风险:{risks};摘要:`docs/ai-env-history/sessions/{summary['source']}/{sid}.md`)" ) parts.append("") write_text(HISTORY_ROOT / "topic_timeline.md", "\n".join(parts).rstrip() + "\n") claude_history = CLAUDE_HOME / "history.jsonl" plans = sorted((CLAUDE_HOME / "plans").glob("*.md")) file_history_count = len(list((CLAUDE_HOME / "file-history").glob("*/*"))) if (CLAUDE_HOME / "file-history").exists() else 0 checkpoints = sorted((HOME / "AppData" / "Roaming" / "Cursor" / "User" / "globalStorage" / "anysphere.cursor-commits" / "checkpoints").glob("*/metadata.json")) local_history = sorted((HOME / "AppData" / "Roaming" / "Cursor" / "User" / "History").glob("**/*.json")) catalog = f"""# AI 开发环境历史索引 ## 覆盖范围 - Claude 原始项目会话:{len(sources['claude'])} 个,逐一生成摘要。 - Codex 本地/归档会话:{len(sources['codex'])} 个,逐一生成摘要。 - Cursor 父会话 transcripts:{len(sources['cursor'])} 个,逐一生成摘要。 - Claude 全局 history:`{claude_history}`,存在:{claude_history.exists()}。 - Claude plans:{len(plans)} 份。 - Claude file-history 快照:约 {file_history_count} 个。 - Cursor 本地文件历史 JSON:{len(local_history)} 个。 - Cursor checkpoints:{len(checkpoints)} 个。 ## 主要入口 - 会话索引:`docs/ai-env-history/conversation_index.csv` - 文件影响索引:`docs/ai-env-history/file_impact_index.csv` - 主题时间线:`docs/ai-env-history/topic_timeline.md` - 会话摘要:`docs/ai-env-history/sessions/` - Claude 旧摘要:`docs/claude-history/` - 审计记录:`docs/audit/changes/` ## 使用策略 1. 查“某文件为什么改”:先查 `file_impact_index.csv`,再读对应会话摘要和审计记录。 2. 查“某次对话做了什么”:先查 `conversation_index.csv`,再读摘要;必要时读原始 JSONL。 3. 恢复误改:再查 Claude file-history、Cursor local History 或 Cursor checkpoints。 4. 原始记录可能包含敏感信息;摘要会脱敏常见密钥和 DSN 形态,原文只在本机按需读取。 """ write_text(HISTORY_ROOT / "README.md", catalog) def create_cursor_migration_doc() -> None: doc = """# Cursor AI 开发环境迁移说明 ## 状态 本轮把 NeoZQYY 的 AI 开发环境从 Claude Code / Codex 迁移到 Cursor 原生资产: - 项目规则:`.cursor/rules/` - 项目流程技能:`.cursor/skills/` - 用户技能:`C:\\Users\\Administrator\\.cursor\\skills` - 用户子代理:`C:\\Users\\Administrator\\.cursor\\agents` - 轻量 hooks:`.cursor/hooks.json` 与 `.cursor/hooks/` - 历史索引:`docs/ai-env-history/` ## 去重原则 1. 当前仓库 `AGENTS.md` 是项目权威规则。 2. `CLAUDE.md` 保留为历史兼容来源。 3. `.cursor/rules` 只放短规则和入口,不复制长文。 4. skills 使用“短入口 + reference”,避免每次会话加载过多旧内容。 5. 原始对话逐一分析为摘要和索引,不把原始 JSONL 全文写入仓库。 ## 模型偏好 用户主要使用 GPT 5.5 与 Claude 4.7。Cursor 的实际模型选择由当前会话或 UI 控制;本迁移只把偏好写入规则和说明,不强写不可控的内部模型字段。 ## 内置流程与规则边界 Cursor、GPT 5.5、Claude 4.7 都具备调研、审计、验证、子代理和工具调用能力,但它们不知道 NeoZQYY 的项目事实、历史踩坑、数据库口径、RLS 双 schema、demo 标杆保护和审计路径。因此仍需要项目规则和技能作为触发器与约束。 规则不重复实现模型能力,只固化三类内容: - 项目事实:目录职责、数据库、ETL、后端、前端、小程序约定。 - 用户习惯:中文、先调研、再改动、再验证、再审计。 - 风险边界:生产库禁用、敏感信息不扩散、危险 git/SQL/归档目录保护。 ## 历史入口 - 新索引:`docs/ai-env-history/README.md` - Claude 摘要:`docs/claude-history/README.md` - 审计一览:`docs/audit/audit_dashboard.md` - Codex 迁移说明:`docs/codex_migration.md` ## 用户级资产触发策略 ### 保留自然触发 - `neozqyy-cursor-migration`:NeoZQYY 迁移和历史追溯入口。 - `strategic-compact`:长会话上下文压缩建议。 - `claude-api`:仅在 Anthropic / Claude API 场景触发。 ### 改为显式触发 - `claude-agent-roles`:旧 Claude agents 参考;日常使用 Cursor subagents。 - `claude-rules-reference`:旧 Claude rules 档案;当前权威规则是 `AGENTS.md` 与 `.cursor/rules/`。 - `tdd-workflow`:详细 TDD 参考;日常入口是 `tdd-guide` subagent。 - `security-review`:详细安全 checklist;日常入口是 `security-reviewer` subagent。 - `search-first`:通用搜索优先流程;NeoZQYY 内优先使用 `.cursor/skills/pre-change`。 - `rules-distill`、`repo-scan`、`codebase-onboarding`、`code-tour`、`agent-introspection-debugging`:低频或重型工具,按需显式使用。 保留全部 8 个用户级 subagents:`planner`、`architect`、`code-reviewer`、`security-reviewer`、`database-reviewer`、`python-reviewer`、`tdd-guide`、`refactor-cleaner`。 ## Hooks 强度判断 当前 hooks 保持轻量提醒和 ask,不默认强阻断。建议: - `_archived/` 读取/搜索:已升级为 `failClosed` 强阻断。依据:项目规则明确禁止参考归档目录,误用概率高。 - 危险 git 命令:保持 ask,不建议 `failClosed`。依据:极少数维护场景可能需要,必须由用户明确确认。 - 生产库 MCP / 生产 DSN:只读调用走 ask;写入/DDL 类调用 deny。依据:测试库足够覆盖日常验证,生产库写操作风险高。 - demo-miniprogram:保持提醒,不建议强阻断。依据:它是标杆目录,偶尔仍可能需要按用户明确要求修改。 - DB schema 改动提醒:保持提醒。依据:是否需要文档同步要结合实际变更判断。 ## MCP 策略 延续既有用途: - `pg-etl-test`、`pg-app-test`:测试库,默认可用。 - `pg-etl`、`pg-app`:生产库,默认禁用或 ask,不自动执行。 - `weixin-devtools-mcp`:小程序调试和截图验证。 - `playwright` / `cursor-ide-browser`:Web 前端验证、截图、交互检查。 - `openapi`:后端 API 合同查询。 ## VSCode Insiders 迁移 - 用户数据目录:`C:\\Users\\Administrator\\AppData\\Roaming\\Code - Insiders\\User` - 扩展清单:`C:\\Users\\Administrator\\.vscode-insiders\\extensions\\extensions.json` - Cursor 扩展对照表:`docs/ai-env-history/vscode_insiders_extensions.csv` - 补装脚本:`tools/cursor/install_vscode_insiders_extensions.ps1` - 旧环境 disabled 状态已记录;Cursor 没有稳定 CLI 持久禁用接口,补装后需要在 Cursor UI 中按清单确认禁用状态。 ## 主题时间线 - 主题时间线:`docs/ai-env-history/topic_timeline.md` - 主题 CSV:`docs/ai-env-history/topic_timeline.csv` ## 验证 ```powershell .venv\\Scripts\\python.exe tools\\cursor\\migrate_ai_environment.py --check .venv\\Scripts\\python.exe scripts\\audit\\prescan.py --files "tools/cursor/migrate_ai_environment.py,docs/cursor_migration.md" ``` ## 回滚 迁移脚本会把覆盖前的 Cursor 用户资产和项目 `.cursor` 资产备份到: `C:\\Users\\Administrator\\.cursor\\backups\\neozqyy-cursor-migration\\` 如需回滚,按 manifest 中的源/目标路径恢复即可。 """ write_text(REPO_ROOT / "docs" / "cursor_migration.md", doc) def create_audit_record() -> None: now = dt.datetime.now(dt.timezone(dt.timedelta(hours=8))).strftime("%Y-%m-%d %H:%M:%S") record = f"""# 变更审计记录:Cursor AI 开发环境迁移 | 字段 | 值 | |------|-----| | 日期 | {now} | ## 操作摘要 本轮将 Claude Code 与 Codex 中已经沉淀的 AI 开发习惯迁移到 Cursor 原生结构。迁移目标是让 Cursor 继承中文沟通、前置调研、验证、审计、子代理审查、MCP 测试库优先和历史追溯习惯,同时避免把长规则和原始对话全文堆入常规上下文。 ## 变更文件 ### 新增/修改 - `tools/cursor/migrate_ai_environment.py`:可重复迁移脚本。 - `.cursor/rules/`:Cursor 项目级短规则。 - `.cursor/skills/`:审计、前置调研、文档同步和 spec 收尾流程技能。 - `.cursor/hooks.json`、`.cursor/hooks/ai_env_guard.py`:轻量提醒/保护 hooks。 - `docs/cursor_migration.md`:Cursor 迁移说明。 - `docs/ai-env-history/`:Claude/Codex/Cursor 原始对话精简索引和逐会话摘要。 - `C:\\Users\\Administrator\\.cursor\\skills`:用户级技能迁移。 - `C:\\Users\\Administrator\\.cursor\\agents`:用户级子代理迁移。 ## 数据库变更 无数据库 schema 变更。未执行 DDL,未修改迁移 SQL。 ## 风险与回滚 - 中:Cursor hooks 与 Claude hooks 事件字段不完全一致,本轮只启用轻量提醒和 ask,不启用强阻断。 - 中:历史摘要从原始 JSONL 自动分析,仍可能遗漏自然语言里的隐含决策;关键判断需回看原文和当前代码。 - 低:用户级 Cursor 资产覆盖前已备份,可从 manifest 恢复。 ## 验证 - 运行 `python tools/cursor/migrate_ai_environment.py --check` 校验生成结构。 - 运行 `python scripts/audit/prescan.py --files "<本轮迁移文件>"` 做审计预扫描。 - 解析 `.mcp.json`、`.cursor/hooks.json`、skill/subagent frontmatter。 ## 合规检查 - 文档同步:已新增 `docs/cursor_migration.md` 和 `docs/ai-env-history/README.md`。 - 数据库文档:不适用,本轮无 schema 变更。 - 审计记录:已落盘。 """ write_text(REPO_ROOT / "docs" / "audit" / "changes" / "2026-05-01__cursor_migration.md", record) def update_cursor_cli_config() -> None: path = CURSOR_HOME / "cli-config.json" if path.exists(): backup(path) try: data = json.loads(read_text(path)) except Exception: data = {} else: data = {} data.setdefault("display", {}) data["display"].setdefault("showStatusIndicators", True) data.setdefault("network", {}) data["network"].setdefault("useHttp1ForAgent", True) data.setdefault("permissions", {}) data["permissions"].setdefault("allow", []) data["permissions"].setdefault("deny", []) write_text(path, json.dumps(data, ensure_ascii=False, indent=2) + "\n") def collect_dev_environment_note() -> None: cursor_extensions = CURSOR_HOME / "extensions" / "extensions.json" vscode_user = HOME / "AppData" / "Roaming" / "Code - Insiders" / "User" vscode_settings = vscode_user / "settings.json" vscode_keybindings = vscode_user / "keybindings.json" vscode_extensions = HOME / ".vscode-insiders" / "extensions" / "extensions.json" sync_extensions = sorted((vscode_user / "sync" / "extensions").glob("*.json")) cursor_ids = read_extension_ids(cursor_extensions) vscode_ids = read_extension_ids(vscode_extensions) disabled_ids = read_disabled_synced_extension_ids(sync_extensions) missing_all = sorted(vscode_ids - cursor_ids) disabled_missing = sorted((disabled_ids & vscode_ids) - cursor_ids) install_script = [ "# 从 VSCode Insiders 迁移到 Cursor 的扩展补装脚本", "# 自动生成;安装旧 VSCode Insiders 中已安装但 Cursor 当前缺失的扩展。", "# 旧环境 disabled 状态见 docs/ai-env-history/vscode_insiders_extensions.csv;Cursor 无稳定 CLI 持久禁用接口,需在 UI 中按清单确认。", "$ErrorActionPreference = 'Continue'", "$cursor = 'cursor'", "$timeoutSec = 90", "$extensions = @(", ] for extension_id in missing_all: install_script.append(f' "{extension_id}"') install_script.extend( [ ")", "foreach ($ext in $extensions) {", " Write-Host \"Installing $ext ...\"", " $p = Start-Process -FilePath $cursor -ArgumentList @('--install-extension', $ext) -NoNewWindow -PassThru -Wait:$false", " if (-not $p.WaitForExit($timeoutSec * 1000)) {", " Write-Warning \"Timeout installing $ext; killing process and continuing.\"", " try { Stop-Process -Id $p.Id -Force } catch {}", " continue", " }", " Write-Host \"Finished $ext exit=$($p.ExitCode)\"", "}", "", ] ) if disabled_missing: install_script.append("") install_script.append("# 以下扩展在 VSCode Insiders 同步状态中为 disabled,安装后建议在 Cursor UI 中禁用:") for extension_id in disabled_missing: install_script.append(f"# disabled-before: {extension_id}") write_text(REPO_ROOT / "tools" / "cursor" / "install_vscode_insiders_extensions.ps1", "\n".join(install_script) + "\n") with (HISTORY_ROOT / "vscode_insiders_extensions.csv").open("w", encoding="utf-8-sig", newline="") as fh: writer = csv.DictWriter(fh, fieldnames=["extension_id", "in_vscode_insiders", "in_cursor", "disabled_in_sync", "recommended_action"]) writer.writeheader() for extension_id in sorted(vscode_ids | cursor_ids | disabled_ids): in_vscode = extension_id in vscode_ids in_cursor = extension_id in cursor_ids disabled = extension_id in disabled_ids if in_cursor: action = "已在 Cursor" elif disabled: action = "安装到 Cursor 后按旧环境状态禁用" elif in_vscode: action = "建议安装到 Cursor" else: action = "仅 Cursor 当前存在" writer.writerow( { "extension_id": extension_id, "in_vscode_insiders": in_vscode, "in_cursor": in_cursor, "disabled_in_sync": disabled, "recommended_action": action, } ) record("write", HISTORY_ROOT / "vscode_insiders_extensions.csv") note = f"""# Cursor / VSCode Insiders 开发环境盘点 ## Cursor - 用户设置:`{CURSOR_USER_SETTINGS}`,存在:{CURSOR_USER_SETTINGS.exists()} - 扩展清单:`{cursor_extensions}`,存在:{cursor_extensions.exists()} - 当前 Cursor 扩展数量:{len(cursor_ids)} ## 旧 VSCode Insiders - 根目录:`{VSCODE_INSIDERS_ROOT}`,存在:{VSCODE_INSIDERS_ROOT.exists()} - 用户数据目录:`{vscode_user}`,存在:{vscode_user.exists()} - 设置文件:`{vscode_settings}`,存在:{vscode_settings.exists()} - 键位文件:`{vscode_keybindings}`,存在:{vscode_keybindings.exists()} - 扩展清单:`{vscode_extensions}`,存在:{vscode_extensions.exists()} - VSCode Insiders 扩展数量:{len(vscode_ids)} - 同步状态中禁用扩展数量:{len(disabled_ids)} - Cursor 缺失且建议安装数量:{len(missing_all)} ## 扩展迁移 - 扩展对照表:`docs/ai-env-history/vscode_insiders_extensions.csv` - 安装脚本:`tools/cursor/install_vscode_insiders_extensions.ps1` - 旧环境 disabled 状态已写入扩展对照表;补装后需在 Cursor UI 中按表确认禁用状态。 - 未发现旧环境中已安装 Playwright、PostgreSQL/SQL、微信开发者工具扩展;这些能力当前主要由 MCP 和项目命令承接。 """ write_text(HISTORY_ROOT / "dev_environment_inventory.md", note) def read_extension_ids(path: Path) -> set[str]: if not path.exists(): return set() try: data = json.loads(read_text(path)) except Exception: return set() ids = set() if isinstance(data, list): for item in data: identifier = item.get("identifier") if isinstance(item, dict) else None if isinstance(identifier, dict) and isinstance(identifier.get("id"), str): ids.add(identifier["id"].lower()) return ids def read_disabled_synced_extension_ids(paths: list[Path]) -> set[str]: disabled: set[str] = set() for path in paths: try: wrapper = json.loads(read_text(path)) except Exception: continue contents: list[str] = [] if isinstance(wrapper.get("content"), str): contents.append(wrapper["content"]) sync_data = wrapper.get("syncData") if isinstance(sync_data, dict) and isinstance(sync_data.get("content"), str): contents.append(sync_data["content"]) for content in contents: try: extensions = json.loads(content) except Exception: continue if not isinstance(extensions, list): continue for item in extensions: if not isinstance(item, dict) or not item.get("disabled"): continue identifier = item.get("identifier") if isinstance(identifier, dict) and isinstance(identifier.get("id"), str): disabled.add(identifier["id"].lower()) return disabled def run_migration() -> None: for path in [ PROJECT_CURSOR, PROJECT_RULES, PROJECT_SKILLS, USER_CURSOR_SKILLS, USER_CURSOR_AGENTS, HISTORY_ROOT, CURSOR_USER_SETTINGS, ]: backup(path) create_project_rules() create_project_skills() create_project_hooks() create_user_skills() create_user_agents() update_cursor_cli_config() analyze_histories() collect_dev_environment_note() create_cursor_migration_doc() create_audit_record() write_text(MANIFEST_PATH, json.dumps(manifest, ensure_ascii=False, indent=2) + "\n") def check_generated() -> int: required = [ PROJECT_RULES / "neozqyy-core.mdc", PROJECT_SKILLS / "audit" / "SKILL.md", PROJECT_CURSOR / "hooks.json", USER_CURSOR_SKILLS / "neozqyy-cursor-migration" / "SKILL.md", USER_CURSOR_AGENTS / "planner.md", HISTORY_ROOT / "conversation_index.csv", HISTORY_ROOT / "file_impact_index.csv", REPO_ROOT / "docs" / "cursor_migration.md", REPO_ROOT / "docs" / "audit" / "changes" / "2026-05-01__cursor_migration.md", ] missing = [path for path in required if not path.exists()] for json_path in [PROJECT_CURSOR / "hooks.json", MANIFEST_PATH, CURSOR_HOME / "cli-config.json"]: if json_path.exists(): json.loads(read_text(json_path)) if missing: print("缺失文件:") for path in missing: print(f"- {path}") return 1 print("Cursor 迁移生成文件校验通过。") return 0 if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="迁移 NeoZQYY AI 开发环境到 Cursor。") parser.add_argument("--check", action="store_true", help="只校验生成结果。") args = parser.parse_args() if args.check: raise SystemExit(check_generated()) run_migration() raise SystemExit(check_generated())