from __future__ import annotations import json import re import sys from pathlib import Path ARCHIVED_RE = re.compile(r"(^|[/\\])_archived([/\\]|$)", re.IGNORECASE) PROD_MCP_RE = re.compile(r"\b(pg-etl|pg-app)\b", re.IGNORECASE) TEST_MCP_RE = re.compile(r"\b(pg-etl-test|pg-app-test)\b", re.IGNORECASE) WRITE_SQL_RE = re.compile( r"\b(insert|update|delete|truncate|drop|alter|create|grant|revoke|merge|copy|call|vacuum|reindex)\b", re.IGNORECASE, ) def load_input() -> dict: try: return json.load(sys.stdin) except Exception: return {} def target_text(data: dict) -> str: chunks = [json.dumps(data, ensure_ascii=False)] for key in ("command", "file_path", "path"): value = data.get(key) if isinstance(value, str): chunks.append(value) tool_input = data.get("tool_input") if isinstance(tool_input, dict): chunks.append(json.dumps(tool_input, ensure_ascii=False)) return "\n".join(chunks) def allow() -> None: print(json.dumps({"permission": "allow"}, ensure_ascii=False)) def before_shell(data: dict) -> None: text = target_text(data) dangerous = [ r"git\s+reset\s+--hard", r"git\s+checkout\s+--", r"--no-verify", r"\bPG_DSN\b", r"\bAPP_DB_DSN\b", r"\bpsql\b", ] if any(re.search(pattern, text, re.IGNORECASE) for pattern in dangerous): print(json.dumps({ "permission": "ask", "user_message": "命令命中 NeoZQYY 高风险规则:请确认是否需要执行。", "agent_message": "执行前说明风险、目标库/文件、回滚方式;生产库默认不执行。", }, ensure_ascii=False)) return allow() def guard_archived(data: dict) -> None: text = target_text(data) if ARCHIVED_RE.search(text): print(json.dumps({ "permission": "deny", "user_message": "已阻断:`_archived/` 是废弃归档目录,禁止读取、搜索或作为实现参考。", "agent_message": "改用当前版本文件;如确需考古,请让用户明确授权并说明目的。", }, ensure_ascii=False)) return allow() def before_mcp(data: dict) -> None: text = target_text(data) if not PROD_MCP_RE.search(text) or TEST_MCP_RE.search(text): allow() return if WRITE_SQL_RE.search(text): print(json.dumps({ "permission": "deny", "user_message": "已阻断:生产库 MCP 写入/DDL 类操作默认禁止。请改用测试库验证,或让用户给出单次明确授权。", "agent_message": "生产库仅允许人工确认后的只读排查;DDL/写入需走单独变更流程。", }, ensure_ascii=False)) return print(json.dumps({ "permission": "ask", "user_message": "检测到生产库 MCP 只读调用。请确认本次查询目的、SQL 是否只读、是否会暴露敏感数据。", "agent_message": "说明查询目的、目标库、SQL 摘要和风险后等待用户确认。", }, ensure_ascii=False)) def post_tool_use(data: dict) -> None: text = target_text(data).replace("\\", "/") hints = [] if "/_archived/" in text or text.endswith("/_archived"): hints.append("检测到 `_archived/` 路径:该目录内容已废弃,禁止作为实现参考。") if "apps/demo-miniprogram/" in text: hints.append("检测到 demo-miniprogram:这是 MOCK 标杆目录,修改前需确认目的。") if re.search(r"(db/|docs/database/|migrations/|schemas/)", text): hints.append("检测到数据库相关改动:如影响 schema,需同步 `docs/database/` 并提供 3 条验证 SQL。") if re.search(r"(apps/backend/|apps/etl/|app/ai/|prompts/)", text): hints.append("检测到后端/ETL/AI 逻辑改动:完成后需运行相关测试并写审计记录。") if hints: print(json.dumps({"additional_context": "\n".join(f"[NeoZQYY] {hint}" for hint in hints)}, ensure_ascii=False)) def main() -> None: mode = sys.argv[1] if len(sys.argv) > 1 else "" data = load_input() if mode in {"preToolUse", "beforeReadFile"}: guard_archived(data) elif mode == "beforeMCPExecution": before_mcp(data) elif mode == "beforeShellExecution": before_shell(data) elif mode == "postToolUse": post_tool_use(data) if __name__ == "__main__": main()