Files
Neo-ZQYY/scripts/audit/prescan.py
Neo 8228b3fa37 chore: migrate IDE environment from Kiro to Claude Code
- Add CLAUDE.md (root + ETL subdirectory + db subdirectory) consolidating all Kiro steering docs
- Add .mcp.json migrated from .kiro/settings/mcp.json (test DBs enabled, prod disabled)
- Add .claude/commands/ (audit, doc-sync, db-docs) replacing Kiro skills
- Add .claude/hooks/ (session_start, post_edit_audit, stop_audit_check) replacing Kiro hooks
- Add .claude/settings.json registering all hooks
- Add scripts/audit/prescan.py merging Kiro's audit_flagger + compliance_prescan
- Remove .kiro/agents, hooks, scripts, settings, skills, state (migrated or obsolete)
- Update .gitignore for Claude Code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-05 15:48:08 +08:00

215 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Audit pre-scan — identify changed files, classify risk, run compliance checks.

Merged from .kiro/scripts/audit_flagger.py + change_compliance_prescan.py.
Drops the .kiro/state dependency and writes JSON straight to stdout for the
/audit command to consume.

Usage:
    python scripts/audit/prescan.py
    python scripts/audit/prescan.py --files "apps/backend/app/routers/foo.py,db/etl_feiqiu/migrations/xxx.sql"

Without --files, the changed-file list is taken from `git status`.
With --files, the given comma-separated list is used and git is skipped.
"""
import argparse
import json
import re
import subprocess
import sys
from datetime import datetime, timezone, timedelta

# Asia/Shanghai (UTC+8) — the "scanned_at" timestamp in the report uses this zone.
TZ_SHANGHAI = timezone(timedelta(hours=8))
# ── High-risk path rules ──
# First matching rule wins; the label becomes a "dir:<label>" risk tag.
RISK_RULES = [
    (re.compile(r"^apps/etl/connectors/feiqiu/(api|cli|config|database|loaders|models|orchestration|scd|tasks|utils|quality)/"), "etl"),
    (re.compile(r"^apps/backend/app/"), "backend"),
    (re.compile(r"^apps/admin-web/src/"), "admin-web"),
    (re.compile(r"^apps/tenant-admin/src/"), "tenant-admin"),
    (re.compile(r"^apps/miniprogram/(miniapp|miniprogram)/"), "miniprogram"),
    (re.compile(r"^packages/shared/"), "shared"),
    (re.compile(r"^db/"), "db"),
]

# Paths that are excluded from the scan entirely (audit output, IDE config,
# scratch dirs, test caches, images).
NOISE_PATTERNS = [
    re.compile(r"^docs/audit/"),
    re.compile(r"^\.kiro/"),
    re.compile(r"^\.claude/"),
    re.compile(r"^tmp/"),
    re.compile(r"^\.hypothesis/"),
    re.compile(r"\.png$"),
    re.compile(r"\.jpg$"),
]

# ── Code → documentation mapping ──
# A changed code file under a key prefix is expected to come with a change to
# at least one of the listed docs (a trailing "/" value means "any file under
# that directory").
DOC_MAP = {
    "apps/backend/app/routers/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/backend/app/services/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/auth/": ["apps/backend/docs/API-REFERENCE.md", "apps/backend/README.md"],
    "apps/backend/app/schemas/": ["apps/backend/docs/API-REFERENCE.md"],
    "apps/etl/connectors/feiqiu/tasks/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/loaders/": ["apps/etl/connectors/feiqiu/docs/etl_tasks/"],
    "apps/etl/connectors/feiqiu/scd/": ["apps/etl/connectors/feiqiu/docs/business-rules/scd2_rules.md"],
    "apps/etl/connectors/feiqiu/orchestration/": ["apps/etl/connectors/feiqiu/docs/architecture/"],
    "apps/admin-web/src/": ["apps/admin-web/README.md"],
    "apps/tenant-admin/src/": ["apps/tenant-admin/README.md"],
    "apps/miniprogram/": ["apps/miniprogram/README.md"],
    "packages/shared/": ["packages/shared/README.md"],
    "db/etl_feiqiu/migrations/": ["docs/database/"],
    "db/zqyy_app/migrations/": ["docs/database/"],
}

# Migration SQL files — any match raises the "db-schema-change" risk tag.
MIGRATION_PATTERNS = [
    re.compile(r"^db/etl_feiqiu/migrations/.*\.sql$"),
    re.compile(r"^db/zqyy_app/migrations/.*\.sql$"),
    re.compile(r"^db/fdw/.*\.sql$"),
]

# DDL baseline directory and BD manual naming pattern — their presence in the
# change set is surfaced as has_ddl_baseline / has_bd_manual flags.
DDL_BASELINE_DIR = "docs/database/ddl/"
BD_MANUAL_PATTERN = re.compile(r"^docs/database/BD_Manual_.*\.md$")
def get_changed_files_from_git() -> list[str]:
    """Collect changed file paths via ``git status --porcelain``.

    Returns a sorted, de-duplicated list of repo-relative paths using forward
    slashes. Returns an empty list when git is unavailable, fails, or times out.
    """
    try:
        proc = subprocess.run(
            ["git", "status", "--porcelain"],
            capture_output=True, text=True, timeout=10,
        )
    except Exception:
        # git missing, timeout, etc. — treat as "no changes detectable".
        return []
    if proc.returncode != 0:
        return []

    seen: set[str] = set()
    for raw in proc.stdout.splitlines():
        # Porcelain lines are "XY <path>" (or "XY <old> -> <new>" for renames);
        # anything shorter cannot carry a path.
        if len(raw) < 4:
            continue
        candidate = raw[3:].strip()
        if " -> " in candidate:
            # For renames, keep only the destination path.
            candidate = candidate.split(" -> ")[-1]
        candidate = candidate.strip().strip('"').replace("\\", "/")
        if candidate:
            seen.add(candidate)
    return sorted(seen)
def is_noise(f: str) -> bool:
    """Return True when *f* matches one of the noise paths the scan ignores."""
    for pattern in NOISE_PATTERNS:
        if pattern.search(f):
            return True
    return False
def classify(files: list[str]) -> dict:
    """Classify changed files and produce a structured review checklist.

    Noise paths are dropped first; the remaining files are tagged by risk
    directory, checked for migration SQL / BD manual / DDL baseline presence,
    and cross-checked against the code→documentation map.
    """
    real_files = [path for path in files if not is_noise(path)]

    risk_tags: list[str] = []
    high_risk_files: list[str] = []
    new_migration_sql: list[str] = []
    code_without_docs: list[dict] = []
    has_bd_manual = False
    has_ddl_baseline = False
    code_files: list[str] = []
    doc_files: set[str] = set()

    def add_tag(tag: str) -> None:
        # Preserve first-seen ordering while avoiding duplicates.
        if tag not in risk_tags:
            risk_tags.append(tag)

    for path in real_files:
        # High-risk directory classification — first matching rule wins.
        label = next(
            (lbl for pattern, lbl in RISK_RULES if pattern.search(path)), None
        )
        if label is not None:
            high_risk_files.append(path)
            add_tag(f"dir:{label}")

        # Loose files sitting at the repository root.
        if "/" not in path:
            add_tag("root-file")

        # Migration SQL.
        if any(mp.search(path) for mp in MIGRATION_PATTERNS):
            new_migration_sql.append(path)
            add_tag("db-schema-change")

        # BD Manual / DDL baseline flags.
        has_bd_manual = has_bd_manual or bool(BD_MANUAL_PATTERN.search(path))
        has_ddl_baseline = has_ddl_baseline or path.startswith(DDL_BASELINE_DIR)

        # Bucket into docs vs code (a path can land in both buckets).
        if path.endswith(".md") or "/docs/" in path:
            doc_files.add(path)
        if path.endswith((".py", ".ts", ".tsx", ".js", ".jsx", ".sql")):
            code_files.append(path)

    # Code → documentation mapping check.
    for code_path in code_files:
        expected: list[str] = []
        for prefix, docs in DOC_MAP.items():
            if code_path.startswith(prefix):
                expected.extend(docs)
        if not expected:
            continue
        covered = False
        for doc in expected:
            # Exact doc changed, or — for "directory" expectations ending in
            # "/" — any changed doc under that directory counts.
            if doc in doc_files or (
                doc.endswith("/") and any(d.startswith(doc) for d in doc_files)
            ):
                covered = True
                break
        if not covered:
            code_without_docs.append({"file": code_path, "expected_docs": expected})

    return {
        "scanned_at": datetime.now(TZ_SHANGHAI).strftime("%Y-%m-%d %H:%M:%S"),
        "total_files": len(real_files),
        "all_files": real_files,
        "high_risk_files": sorted(set(high_risk_files)),
        "risk_tags": risk_tags,
        "new_migration_sql": new_migration_sql,
        "code_without_docs": code_without_docs,
        "has_bd_manual": has_bd_manual,
        "has_ddl_baseline": has_ddl_baseline,
        "audit_required": len(risk_tags) > 0,
    }
def main() -> None:
    """CLI entry point: resolve the changed-file list, classify, print JSON.

    With --files, the comma-separated list is used directly; otherwise the
    list comes from `git status`. An empty change set short-circuits to a
    minimal payload so callers can skip the audit cheaply.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--files",
        # Fix: help text previously ended with an unbalanced "(".
        help="逗号分隔的文件列表(跳过 git status)",
        default=None,
    )
    args = parser.parse_args()

    if args.files:
        files = [f.strip() for f in args.files.split(",") if f.strip()]
    else:
        files = get_changed_files_from_git()

    if not files:
        # Nothing changed: compact one-line payload, no indent.
        print(json.dumps({"audit_required": False, "total_files": 0}, ensure_ascii=False))
        return

    result = classify(files)
    print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # Top-level boundary: never crash the calling hook/command — surface
        # the failure as JSON on stdout instead.
        # NOTE(review): the process still exits 0 on error; confirm callers
        # key off "error"/"audit_required" rather than the exit code.
        print(json.dumps({"error": str(e), "audit_required": False}, ensure_ascii=False))