Files
ZQYY.FQ-ETL/scripts/audit/inventory_analyzer.py

450 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
文件清单分析器 — 对扫描结果进行用途分类和处置标签分配。
分类规则按优先级从高到低排列:
1. tmp/ 下所有文件 → 临时与调试 / 候选删除或候选归档
2. logs/、export/ 下的运行时产出 → 日志与输出 / 候选归档(__init__.py 保留)
3. *.lnk、*.rar 文件 → 其他 / 候选删除
4. 空目录 → 其他 / 候选删除
5. 核心代码目录(tasks/ 等)→ 核心代码 / 保留
6. config/ → 配置 / 保留
7. database/ → *.sql、migrations/ 为数据库定义 / 保留;*.py 为核心代码 / 保留
8. tests/ → 测试 / 保留
9. docs/ → 文档 / 保留
10. scripts/ → 脚本工具 / .py 文件与目录保留,其余待确认
11. gui/ → GUI / 保留
12. 构建与部署文件 → 构建与部署 / 保留
13. cli/ → 核心代码 / 保留
14. 根目录散落文件 → 已知配置文件为配置 / 保留;README 为文档 / 保留;其余为其他 / 待确认
15. 其余 → 其他 / 待确认
"""
from __future__ import annotations
import os
from collections import Counter
from datetime import datetime, timezone
from itertools import groupby
from scripts.audit import Category, Disposition, FileEntry, InventoryItem
# ---------------------------------------------------------------------------
# 常量
# ---------------------------------------------------------------------------
# Top-level directories (with trailing slash) whose contents count as core code.
_CORE_CODE_DIRS = (
    "tasks/",
    "loaders/",
    "scd/",
    "orchestration/",
    "quality/",
    "models/",
    "utils/",
    "api/",
)
# File names treated as build/deploy files (compared against a path's base name).
_BUILD_DEPLOY_BASENAMES = {
    "setup.py",
    "build_exe.py",
}
# File extensions treated as build/deploy files.
_BUILD_DEPLOY_EXTENSIONS = {
    ".bat",
    ".sh",
    ".ps1",
}
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _top_dir(rel_path: str) -> str:
    """Return the first path segment of *rel_path*, including its trailing slash.

    For example ``'tmp/foo.py'`` yields ``'tmp/'``. A path that contains no
    ``/`` (a root-level entry) yields the empty string.
    """
    head, slash, _rest = rel_path.partition("/")
    # ``slash`` is empty when the path has no separator at all.
    return head + slash if slash else ""
def _basename(rel_path: str) -> str:
    """Return the last ``/``-separated segment of *rel_path* (the file name)."""
    _parent, _slash, tail = rel_path.rpartition("/")
    return tail
def _is_init_py(rel_path: str) -> bool:
    """Tell whether the file name of *rel_path* is exactly ``__init__.py``."""
    filename = _basename(rel_path)
    return filename == "__init__.py"
# ---------------------------------------------------------------------------
# classify — 核心分类函数
# ---------------------------------------------------------------------------
def classify(entry: FileEntry) -> InventoryItem:
    """Classify one scanned file or directory and assign its disposition.

    Rules are tried from highest to lowest priority; the first rule that
    matches decides both the category and the disposition.

    Args:
        entry: The scanned entry. This function reads ``rel_path`` (treated
            as a ``/``-separated relative path), ``extension``, ``is_dir``
            and ``is_empty_dir``.

    Returns:
        The ``InventoryItem`` describing ``entry.rel_path``.
    """
    path = entry.rel_path
    ext = entry.extension.lower()
    base = _basename(path)
    # First path segment without a trailing slash. It is also set for a
    # top-level entry such as "tasks" (where ``_top_dir`` returns ""), so a
    # single comparison covers a directory itself and everything beneath it.
    root = path.partition("/")[0]

    def item(category, disposition, description):
        # Every rule below reports on the same path; only these three vary.
        return InventoryItem(
            rel_path=path,
            category=category,
            disposition=disposition,
            description=description,
        )

    # --- Priority 1: everything under tmp/ ---
    if root == "tmp":
        return _classify_tmp(entry)

    # --- Priority 2: runtime output under logs/ and export/ ---
    if root in ("logs", "export"):
        return _classify_runtime_output(entry)

    # --- Priority 3: .lnk / .rar files ---
    if ext in (".lnk", ".rar"):
        return item(
            Category.OTHER,
            Disposition.CANDIDATE_DELETE,
            f"快捷方式/压缩包文件(`{ext}`),建议删除",
        )

    # --- Priority 4: empty directories ---
    if entry.is_empty_dir:
        return item(Category.OTHER, Disposition.CANDIDATE_DELETE, "空目录,建议删除")

    # --- Priority 5: core code directories ---
    if any(path.startswith(d) or path + "/" == d for d in _CORE_CODE_DIRS):
        # Use ``root`` rather than ``_top_dir(path)`` so that the directory
        # entry itself (e.g. "tasks") is labelled with its name, not "".
        return item(Category.CORE_CODE, Disposition.KEEP, f"核心代码(`{root}`)")

    # --- Priority 6: config/ ---
    if root == "config":
        return item(Category.CONFIG, Disposition.KEEP, "配置文件")

    # --- Priority 7: database/ (SQL, migrations, Python modules) ---
    if root == "database":
        return _classify_database(entry)

    # --- Priority 8: tests/ ---
    if root == "tests":
        return item(Category.TEST, Disposition.KEEP, "测试文件")

    # --- Priority 9: docs/ ---
    if root == "docs":
        return item(Category.DOCS, Disposition.KEEP, "文档")

    # --- Priority 10: scripts/ (.py files and directories are kept) ---
    if root == "scripts":
        if ext == ".py" or entry.is_dir:
            return item(Category.SCRIPTS, Disposition.KEEP, "脚本工具")
        return item(
            Category.SCRIPTS,
            Disposition.NEEDS_REVIEW,
            "脚本目录下的非 Python 文件,需确认用途",
        )

    # --- Priority 11: gui/ ---
    if root == "gui":
        return item(Category.GUI, Disposition.KEEP, "GUI 模块")

    # --- Priority 12: build and deploy files ---
    # Checked before cli/ on purpose: a build script inside cli/ is still
    # reported as a build/deploy file.
    if base in _BUILD_DEPLOY_BASENAMES or ext in _BUILD_DEPLOY_EXTENSIONS:
        return item(Category.BUILD_DEPLOY, Disposition.KEEP, "构建与部署文件")

    # --- Priority 13: cli/ ---
    if root == "cli":
        return item(Category.CORE_CODE, Disposition.KEEP, "CLI 入口模块")

    # --- Priority 14: remaining root-level entries ---
    if "/" not in path:
        return _classify_root_file(entry)

    # --- Fallback: nothing matched ---
    return item(
        Category.OTHER,
        Disposition.NEEDS_REVIEW,
        "未匹配已知规则,需人工确认用途",
    )
# ---------------------------------------------------------------------------
# 子分类函数
# ---------------------------------------------------------------------------
def _classify_tmp(entry: FileEntry) -> InventoryItem:
    """Classify an entry that lives under ``tmp/``.

    Every result is filed under ``Category.TEMP_DEBUG``. Empty directories
    and ordinary files become deletion candidates; a ``.py`` file whose file
    name is longer than four characters becomes an archive candidate.
    """
    suffix = entry.extension.lower()
    filename = _basename(entry.rel_path)

    if entry.is_empty_dir:
        disposition = Disposition.CANDIDATE_DELETE
        note = "临时目录下的空目录"
    elif suffix == ".py" and len(filename) > 4:
        # NOTE(review): the length test presumably filters out throwaway
        # one-letter names such as "a.py" — confirm the intent.
        disposition = Disposition.CANDIDATE_ARCHIVE
        note = "临时 Python 脚本,可能有参考价值"
    else:
        disposition = Disposition.CANDIDATE_DELETE
        note = "临时/调试文件,建议删除"

    return InventoryItem(
        rel_path=entry.rel_path,
        category=Category.TEMP_DEBUG,
        disposition=disposition,
        description=note,
    )
def _classify_runtime_output(entry: FileEntry) -> InventoryItem:
    """Classify an entry that lives under ``logs/`` or ``export/``.

    Every result is filed under ``Category.LOG_OUTPUT``. ``__init__.py``
    files are kept; anything else is an archive candidate.
    """
    if _is_init_py(entry.rel_path):
        disposition, note = Disposition.KEEP, "包初始化文件"
    else:
        disposition, note = Disposition.CANDIDATE_ARCHIVE, "运行时产出,建议归档"

    return InventoryItem(
        rel_path=entry.rel_path,
        category=Category.LOG_OUTPUT,
        disposition=disposition,
        description=note,
    )
def _classify_database(entry: FileEntry) -> InventoryItem:
    """Classify an entry that lives under ``database/``.

    Checks run in this order, first match wins: migrations, ``.sql`` files,
    ``.py`` files, directories (empty or not), then any other file.
    """
    rel = entry.rel_path
    suffix = entry.extension.lower()

    # The migrations test is a plain substring/suffix match on the whole
    # path, so it also fires for nested or similarly named directories.
    if "migrations/" in rel or rel.endswith("migrations"):
        verdict = (Category.DATABASE_DEF, Disposition.KEEP, "数据库迁移脚本")
    elif suffix == ".sql":
        verdict = (Category.DATABASE_DEF, Disposition.KEEP, "数据库 DDL/DML 脚本")
    elif suffix == ".py":
        # Python modules under database/ count as core code.
        verdict = (Category.CORE_CODE, Disposition.KEEP, "数据库操作模块")
    elif entry.is_dir:
        if entry.is_empty_dir:
            verdict = (
                Category.OTHER,
                Disposition.CANDIDATE_DELETE,
                "数据库目录下的空目录",
            )
        else:
            verdict = (Category.DATABASE_DEF, Disposition.KEEP, "数据库子目录")
    else:
        verdict = (
            Category.DATABASE_DEF,
            Disposition.NEEDS_REVIEW,
            "数据库目录下的非标准文件,需确认",
        )

    category, disposition, note = verdict
    return InventoryItem(
        rel_path=rel,
        category=category,
        disposition=disposition,
        description=note,
    )
def _classify_root_file(entry: FileEntry) -> InventoryItem:
    """Classify a loose entry that sits directly in the repository root.

    Known build/deploy files, known project configuration files and README
    files (case-insensitive prefix match) are kept; anything else is flagged
    for manual review.
    """
    suffix = entry.extension.lower()
    filename = _basename(entry.rel_path)
    known_config_names = (
        "requirements.txt", "pytest.ini", ".env", ".env.example",
        ".gitignore", ".flake8", "pyproject.toml",
    )

    if filename in _BUILD_DEPLOY_BASENAMES or suffix in _BUILD_DEPLOY_EXTENSIONS:
        verdict = (Category.BUILD_DEPLOY, Disposition.KEEP, "构建与部署文件")
    elif filename in known_config_names:
        verdict = (Category.CONFIG, Disposition.KEEP, "项目配置文件")
    elif filename.lower().startswith("readme"):
        verdict = (Category.DOCS, Disposition.KEEP, "项目说明文档")
    else:
        verdict = (
            Category.OTHER,
            Disposition.NEEDS_REVIEW,
            f"根目录散落文件(`{filename}`),需确认用途",
        )

    category, disposition, note = verdict
    return InventoryItem(
        rel_path=entry.rel_path,
        category=category,
        disposition=disposition,
        description=note,
    )
# ---------------------------------------------------------------------------
# build_inventory — 批量分类
# ---------------------------------------------------------------------------
def build_inventory(entries: list[FileEntry]) -> list[InventoryItem]:
    """Classify every entry and return the results in input order."""
    return list(map(classify, entries))
# ---------------------------------------------------------------------------
# render_inventory_report — Markdown 渲染
# ---------------------------------------------------------------------------
def render_inventory_report(items: list[InventoryItem], repo_root: str) -> str:
    """Render the inventory as a Markdown report.

    Layout of the report:
    - header: title, generation time (UTC) and repository path;
    - body: one table per category, in ``Category`` definition order;
    - footer: counts per category and per disposition, plus a grand total.

    Args:
        items: Classified inventory items to report on.
        repo_root: Repository path shown in the header.

    Returns:
        The report as a single newline-joined string.
    """
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    out: list[str] = [
        "# 文件清单报告",
        "",
        f"- 生成时间:{generated_at}",
        f"- 仓库路径:`{repo_root}`",
        "",
    ]

    # --- One table per category ---
    # Sorting by enum position keeps sections in ``Category`` definition
    # order; the sort is stable, so items keep their input order per section.
    rank = {member: position for position, member in enumerate(Category)}
    ordered = sorted(items, key=lambda entry: rank[entry.category])
    for category, members in groupby(ordered, key=lambda entry: entry.category):
        out += [
            f"## {category.value}",
            "",
            "| 相对路径 | 处置标签 | 简要说明 |",
            "|---|---|---|",
        ]
        out.extend(
            f"| `{row.rel_path}` | {row.disposition.value} | {row.description} |"
            for row in members
        )
        out.append("")

    # --- Summary ---
    per_category = Counter(entry.category for entry in items)
    per_disposition = Counter(entry.disposition for entry in items)

    out += [
        "## 统计摘要",
        "",
        "### 按用途分类",
        "",
        "| 分类 | 数量 |",
        "|---|---|",
    ]
    # Categories with no items are left out of the table.
    out.extend(
        f"| {member.value} | {per_category[member]} |"
        for member in Category
        if per_category[member] > 0
    )

    out += [
        "",
        "### 按处置标签",
        "",
        "| 标签 | 数量 |",
        "|---|---|",
    ]
    # Dispositions with no items are left out as well.
    out.extend(
        f"| {member.value} | {per_disposition[member]} |"
        for member in Disposition
        if per_disposition[member] > 0
    )

    out += [
        "",
        f"**总计:{len(items)} 个条目**",
        "",
    ]
    return "\n".join(out)