- Add CLAUDE.md (root + ETL subdirectory + db subdirectory) consolidating all Kiro steering docs - Add .mcp.json migrated from .kiro/settings/mcp.json (test DBs enabled, prod disabled) - Add .claude/commands/ (audit, doc-sync, db-docs) replacing Kiro skills - Add .claude/hooks/ (session_start, post_edit_audit, stop_audit_check) replacing Kiro hooks - Add .claude/settings.json registering all hooks - Add scripts/audit/prescan.py merging Kiro's audit_flagger + compliance_prescan - Remove .kiro/agents, hooks, scripts, settings, skills, state (migrated or obsolete) - Update .gitignore for Claude Code Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
215 lines
6.9 KiB
Python
215 lines
6.9 KiB
Python
#!/usr/bin/env python3
"""Audit pre-scan — identify changed files, classify risk, run compliance checks.

Merged from .kiro/scripts/audit_flagger.py + change_compliance_prescan.py,
with the .kiro/state dependency removed; the result is written as JSON to
stdout for the /audit command to consume.

Usage:
    python scripts/audit/prescan.py
    python scripts/audit/prescan.py --files "apps/backend/app/routers/foo.py,db/etl_feiqiu/migrations/xxx.sql"

Without --files, the change list is taken from `git status`.
With --files, the given comma-separated file list is used and git is skipped.
"""

import argparse
import json
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta

# All report timestamps use Asia/Shanghai (fixed UTC+8, no DST).
TZ_SHANGHAI = timezone(timedelta(hours=8))

# ── High-risk path rules ──
# First matching rule wins; the label becomes a "dir:<label>" risk tag.

RISK_RULES = [
    (re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"), "etl"),
    (re.compile(r"^apps/backend/app/"), "backend"),
    (re.compile(r"^apps/admin-web/src/"), "admin-web"),
    (re.compile(r"^apps/tenant-admin/src/"), "tenant-admin"),
    (re.compile(r"^apps/miniprogram/(miniapp|miniprogram)/"), "miniprogram"),
    (re.compile(r"^packages/shared/"), "shared"),
    (re.compile(r"^db/"), "db"),
]

# Paths matching any of these are noise and excluded from the audit entirely.
NOISE_PATTERNS = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.claude/"),
    re.compile(r"^tmp/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]

# ── Code → documentation mapping ──
# A changed code file under a key prefix is expected to come with a change to
# at least one of the listed docs (entries ending in "/" match any file below
# that directory).

DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/schemas/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/tenant-admin/src/": ["apps/tenant-admin/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
    "db/etl_feiqiu/migrations/": ["docs/database/"],
    "db/zqyy_app/migrations/": ["docs/database/"],
}

# Changed SQL files matching these are flagged as schema migrations.
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]

# DDL baseline directory and BD manual naming convention, used to detect
# whether the change set already includes the corresponding DB documentation.
DDL_BASELINE_DIR = "docs/database/ddl/"
BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")
def get_changed_files_from_git() -> list[str]:
    """Return changed file paths extracted from `git status --porcelain`.

    Best-effort: any git failure (missing binary, timeout, non-zero exit)
    yields an empty list instead of raising. Rename entries keep only the
    new name; paths are de-quoted, slash-normalized, deduplicated and sorted.
    """
    try:
        proc = subprocess.run(
            ["git", "status", "--porcelain"],
            capture_output=True, text=True, timeout=10,
        )
    except Exception:
        return []
    if proc.returncode != 0:
        return []

    paths: set[str] = set()
    for raw in proc.stdout.splitlines():
        # Porcelain v1 lines look like "XY <path>"; anything shorter is junk.
        if len(raw) < 4:
            continue
        entry = raw[3:].strip()
        if " -> " in entry:
            # Rename/copy entry: the part after the arrow is the new path.
            entry = entry.split(" -> ")[-1]
        entry = entry.strip().strip('"').replace("\\", "/")
        if entry:
            paths.add(entry)
    return sorted(paths)
def is_noise(f: str) -> bool:
    """Return True when path *f* matches any noise pattern and must be skipped."""
    for pattern in NOISE_PATTERNS:
        if pattern.search(f):
            return True
    return False
def classify(files: list[str]) -> dict:
    """Classify changed files and build the structured review checklist.

    Noise paths are dropped first; each remaining path is matched against the
    high-risk rules, migration patterns and the code→doc map. The returned
    dict is the JSON payload the /audit command reads from stdout.
    """
    relevant = [p for p in files if not is_noise(p)]

    tags: list[str] = []            # risk tags, first-seen order, deduplicated
    risky: list[str] = []           # files hit by a RISK_RULES pattern
    migrations: list[str] = []      # changed migration SQL files
    missing_docs: list[dict] = []   # code files lacking an expected doc change
    bd_manual_seen = False
    ddl_baseline_seen = False
    code_bucket: list[str] = []
    doc_bucket: set[str] = set()

    def add_tag(tag: str) -> None:
        # Preserve first-seen ordering; repeated tags are ignored.
        if tag not in tags:
            tags.append(tag)

    for path in relevant:
        # High-risk classification: first matching rule wins.
        label = next((lbl for rx, lbl in RISK_RULES if rx.search(path)), None)
        if label is not None:
            risky.append(path)
            add_tag(f"dir:{label}")

        # Loose files sitting directly in the repository root.
        if "/" not in path:
            add_tag("root-file")

        # Migration SQL.
        if any(rx.search(path) for rx in MIGRATION_PATTERNS):
            migrations.append(path)
            add_tag("db-schema-change")

        # BD manual / DDL baseline presence in this change set.
        if BD_MANUAL_PATTERN.search(path):
            bd_manual_seen = True
        if path.startswith(DDL_BASELINE_DIR):
            ddl_baseline_seen = True

        # Bucket into docs / code (a path may land in both buckets).
        if path.endswith(".md") or "/docs/" in path:
            doc_bucket.add(path)
        if path.endswith((".py", ".ts", ".tsx", ".js", ".jsx", ".sql")):
            code_bucket.append(path)

    # Code → documentation coverage check.
    for code_path in code_bucket:
        expected = [
            doc
            for prefix, docs in DOC_MAP.items()
            if code_path.startswith(prefix)
            for doc in docs
        ]
        if not expected:
            continue
        # A doc entry ending in "/" is satisfied by any changed file below it.
        covered = any(
            doc in doc_bucket
            or (doc.endswith("/") and any(d.startswith(doc) for d in doc_bucket))
            for doc in expected
        )
        if not covered:
            missing_docs.append({
                "file": code_path,
                "expected_docs": expected,
            })

    return {
        "scanned_at": datetime.now(TZ_SHANGHAI).strftime("%Y-%m-%d %H:%M:%S"),
        "total_files": len(relevant),
        "all_files": relevant,
        "high_risk_files": sorted(set(risky)),
        "risk_tags": tags,
        "new_migration_sql": migrations,
        "code_without_docs": missing_docs,
        "has_bd_manual": bd_manual_seen,
        "has_ddl_baseline": ddl_baseline_seen,
        "audit_required": len(tags) > 0,
    }
def main() -> None:
    """Entry point: collect changed paths, classify them, dump JSON to stdout."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--files",
        default=None,
        help="逗号分隔的文件列表(跳过 git status)",
    )
    opts = parser.parse_args()

    if opts.files:
        changed = [part.strip() for part in opts.files.split(",") if part.strip()]
    else:
        changed = get_changed_files_from_git()

    if not changed:
        # Nothing to audit — emit a minimal payload and stop.
        print(json.dumps({"audit_required": False, "total_files": 0}, ensure_ascii=False))
        return

    print(json.dumps(classify(changed), indent=2, ensure_ascii=False))
if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        # Last-resort guard: the /audit command always receives valid JSON,
        # even when the scan itself blows up.
        print(json.dumps({"error": str(exc), "audit_required": False}, ensure_ascii=False))