#!/usr/bin/env python3 """audit_flagger — 判断 git 工作区是否存在高风险改动,写入 .kiro/.audit_state.json 替代原 PowerShell 版本,避免 Windows PowerShell 5.1 解析器 bug。 """ import hashlib import json import os import re import subprocess import sys from datetime import datetime, timezone, timedelta TZ_TAIPEI = timezone(timedelta(hours=8)) RISK_RULES = [ (re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"), "etl"), (re.compile(r"^apps/backend/app/"), "backend"), (re.compile(r"^apps/admin-web/src/"), "admin-web"), (re.compile(r"^apps/miniprogram/(miniapp|miniprogram)/"), "miniprogram"), (re.compile(r"^packages/shared/"), "shared"), (re.compile(r"^db/"), "db"), ] NOISE_PATTERNS = [ re.compile(r"^docs/audit/"), re.compile(r"^\.kiro/"), # .kiro 配置变更不触发业务审计 re.compile(r"^tmp/"), re.compile(r"^\.hypothesis/"), ] DB_PATTERNS = [ re.compile(r"^db/"), re.compile(r"/migrations/"), re.compile(r"\.sql$"), re.compile(r"\.prisma$"), ] STATE_PATH = os.path.join(".kiro", ".audit_state.json") def now_taipei(): return datetime.now(TZ_TAIPEI).isoformat() def sha1hex(s: str) -> str: return hashlib.sha1(s.encode("utf-8")).hexdigest() def get_changed_files() -> list[str]: """从 git status --porcelain 提取变更文件路径""" try: result = subprocess.run( ["git", "status", "--porcelain"], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: return [] except Exception: return [] files = [] for line in result.stdout.splitlines(): if len(line) < 4: continue path = line[3:].strip() if " -> " in path: path = path.split(" -> ")[-1] path = path.strip().strip('"').replace("\\", "/") if path: files.append(path) return files def is_noise(f: str) -> bool: return any(p.search(f) for p in NOISE_PATTERNS) def write_state(state: dict): os.makedirs(".kiro", exist_ok=True) with open(STATE_PATH, "w", encoding="utf-8") as fh: json.dump(state, fh, indent=2, ensure_ascii=False) def main(): # 非 git 仓库直接退出 try: r = subprocess.run( ["git", "rev-parse", "--is-inside-work-tree"], capture_output=True, text=True, timeout=5 ) if r.returncode != 0: return except Exception: return all_files = get_changed_files() files = sorted(set(f for f in all_files if not is_noise(f))) now = now_taipei() if not files: write_state({ "audit_required": False, "db_docs_required": False, "reasons": [], "changed_files": [], "change_fingerprint": "", "marked_at": now, "last_reminded_at": None, }) return reasons = [] audit_required = False db_docs_required = False for f in files: for pattern, label in RISK_RULES: if pattern.search(f): audit_required = True tag = f"dir:{label}" if tag not in reasons: reasons.append(tag) # 根目录散文件 if "/" not in f: audit_required = True if "root-file" not in reasons: reasons.append("root-file") # DB 文档触发 if any(p.search(f) for p in DB_PATTERNS): db_docs_required = True if "db-schema-change" not in reasons: reasons.append("db-schema-change") fp = sha1hex("\n".join(files)) # 保留已有状态的 last_reminded_at last_reminded = None if os.path.isfile(STATE_PATH): try: with open(STATE_PATH, "r", encoding="utf-8") as fh: existing = json.load(fh) if existing.get("change_fingerprint") == fp: last_reminded = existing.get("last_reminded_at") except Exception: pass write_state({ "audit_required": audit_required, "db_docs_required": db_docs_required, "reasons": reasons, "changed_files": files[:50], "change_fingerprint": fp, "marked_at": now, "last_reminded_at": last_reminded, }) if __name__ == "__main__": try: main() except Exception: # 绝不阻塞 prompt 提交 pass