#!/usr/bin/env python3
"""change_compliance_prescan — pre-scan changed files and emit items needing compliance review.

Invoked by the askAgent hook at agentStop time. Produces a condensed review
checklist for the LLM so it does not waste tokens scanning files itself.

Output (stdout, read by askAgent):
- "NO_CHECK_NEEDED" when no review is required
- a structured JSON checklist otherwise
"""

import json
import os
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta

TZ_TAIPEI = timezone(timedelta(hours=8))
STATE_PATH = os.path.join(".kiro", ".audit_state.json")

# Document mapping defined by the doc-map:
# code path prefix -> docs that should be updated in the same change
DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
}

# DDL baseline directory (defined in the doc-map)
DDL_BASELINE_DIR = "docs/database/ddl/"

# Migration script locations
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]

# DB manual documents
BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")

# Audit record directory
AUDIT_CHANGES_DIR = "docs/audit/changes/"

# Noise paths excluded from compliance checks
NOISE = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"^tmp/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]

COMPLIANCE_STATE_PATH = os.path.join(".kiro", ".compliance_state.json")


def safe_read_json(path):
    """Read a JSON file, returning {} on any failure (missing, unreadable, invalid)."""
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        # Best-effort read: a corrupt state file must not break the hook.
        return {}


def get_changed_files():
    """Return changed file paths from the audit state, falling back to `git status`.

    Paths from git are normalized to forward slashes, rename lines keep only
    the destination, and the result is sorted and de-duplicated.
    Returns [] when neither source yields anything (or git fails/times out).
    """
    state = safe_read_json(STATE_PATH)
    files = state.get("changed_files", [])
    if files:
        return files
    # Fall back to `git status --porcelain`
    try:
        r = subprocess.run(
            ["git", "status", "--porcelain"],
            capture_output=True, text=True, timeout=10,
        )
        if r.returncode != 0:
            return []
        result = []
        for line in r.stdout.splitlines():
            # Porcelain lines are "XY <path>"; anything shorter carries no path.
            if len(line) < 4:
                continue
            path = line[3:].strip().strip('"').replace("\\", "/")
            if " -> " in path:
                # Rename entry: keep the destination path only.
                path = path.split(" -> ")[-1]
            if path:
                result.append(path)
        return sorted(set(result))
    except Exception:
        # git missing / timeout: treat as no changes rather than blocking.
        return []


def is_noise(f):
    """True when *f* matches any noise pattern and should be ignored."""
    return any(p.search(f) for p in NOISE)


def _empty_result():
    """Fresh classification result with no findings (all lists empty, flags False)."""
    return {
        "new_migration_sql": [],    # newly added migration SQL
        "new_or_modified_sql": [],  # all SQL changes
        "code_without_docs": [],    # code changes lacking a matching doc change
        "new_files": [],            # new files (directory-convention check) — never populated here
        "has_bd_manual": False,     # BD_Manual doc changed?
        "has_audit_record": False,  # audit record changed?
        "has_ddl_baseline": False,  # DDL baseline changed?
    }


def classify_files(files):
    """Classify changed files into the compliance review checklist.

    Noise paths are skipped. Code files whose DOC_MAP prefix matches are
    checked for an accompanying doc change (exact path, or prefix match when
    the mapped doc ends with "/").
    """
    result = _empty_result()

    code_files = []
    doc_files = set()

    for f in files:
        if is_noise(f):
            continue

        # Migration SQL (first matching pattern wins)
        for mp in MIGRATION_PATTERNS:
            if mp.search(f):
                result["new_migration_sql"].append(f)
                break

        # Any SQL change
        if f.endswith(".sql"):
            result["new_or_modified_sql"].append(f)

        # BD_Manual documents
        if BD_MANUAL_PATTERN.search(f):
            result["has_bd_manual"] = True

        # Audit records
        # NOTE(review): "^docs/audit/" is also a NOISE pattern, so this branch
        # is unreachable as written — confirm whether audit records should be
        # exempt from the noise filter.
        if f.startswith(AUDIT_CHANGES_DIR):
            result["has_audit_record"] = True

        # DDL baseline
        if f.startswith(DDL_BASELINE_DIR):
            result["has_ddl_baseline"] = True

        # Documentation files
        if f.endswith(".md") or "/docs/" in f:
            doc_files.add(f)

        # Code files (not docs, not config)
        if f.endswith((".py", ".ts", ".tsx", ".js", ".jsx")):
            code_files.append(f)

    # For each code file, check whether any expected doc was also changed.
    for cf in code_files:
        expected_docs = []
        for prefix, docs in DOC_MAP.items():
            if cf.startswith(prefix):
                expected_docs.extend(docs)
        if expected_docs:
            has_doc = False
            for ed in expected_docs:
                if ed in doc_files:
                    has_doc = True
                    break
                # Directory-level mapping: any doc under that directory counts.
                if ed.endswith("/"):
                    if any(d.startswith(ed) for d in doc_files):
                        has_doc = True
                        break
            if not has_doc:
                result["code_without_docs"].append({
                    "file": cf,
                    "expected_docs": expected_docs,
                })

    return result


def save_compliance_state(result, needs_check):
    """Persist the compliance scan result for the audit-writer sub-agent to read."""
    os.makedirs(".kiro", exist_ok=True)
    now = datetime.now(TZ_TAIPEI)
    state = {
        # bool() guards against callers passing a truthy list by mistake.
        "needs_check": bool(needs_check),
        "scanned_at": now.isoformat(),
        **result,
    }
    with open(COMPLIANCE_STATE_PATH, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2, ensure_ascii=False)


def main():
    """Scan changed files; print NO_CHECK_NEEDED or a JSON review checklist."""
    files = get_changed_files()
    # Filter noise; no real changes means nothing to review.
    real_files = [f for f in files if not is_noise(f)]
    if not real_files:
        save_compliance_state(_empty_result(), False)
        print("NO_CHECK_NEEDED")
        return

    result = classify_files(real_files)

    # Review is needed when migrations were added or code changed without docs.
    # Fixed: coerce to bool — previously the raw list was persisted as
    # "needs_check". NOTE(review): the original expression also had a redundant
    # clause `new_migration_sql and not has_ddl_baseline` (always subsumed by
    # the first term); it may have been meant to read `new_or_modified_sql` —
    # confirm intent.
    needs_check = bool(
        result["new_migration_sql"] or result["code_without_docs"]
    )

    # Always persist the result, even when no review is needed.
    save_compliance_state(result, needs_check)

    if not needs_check:
        print("NO_CHECK_NEEDED")
        return

    # Emit the condensed JSON checklist for LLM review.
    print(json.dumps(result, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        # Never block the agent: log the failure and report "nothing to check".
        print(f"change_compliance_prescan failed: {exc}", file=sys.stderr)
        print("NO_CHECK_NEEDED")