# -*- coding: utf-8 -*-
"""
File inventory analyzer: assigns a purpose category and a disposition label
to every scanned entry.

Rules are evaluated in priority order (highest first); the first match wins:

 1. tmp/                    -> TEMP_DEBUG, CANDIDATE_DELETE or CANDIDATE_ARCHIVE
 2. logs/, export/          -> LOG_OUTPUT, CANDIDATE_ARCHIVE (__init__.py: KEEP)
 3. *.lnk, *.rar            -> OTHER, CANDIDATE_DELETE
 4. empty directories       -> OTHER, CANDIDATE_DELETE
 5. core code directories   -> CORE_CODE, KEEP
 6. config/                 -> CONFIG, KEEP
 7. database/               -> delegated to _classify_database
 8. tests/                  -> TEST, KEEP
 9. docs/                   -> DOCS, KEEP
10. scripts/                -> SCRIPTS, KEEP (.py files and directories),
                               otherwise NEEDS_REVIEW
11. gui/                    -> GUI, KEEP
12. build & deploy files    -> BUILD_DEPLOY, KEEP
13. cli/                    -> CORE_CODE, KEEP
14. root-level entries      -> delegated to _classify_root_file
15. everything else         -> OTHER, NEEDS_REVIEW
"""

from __future__ import annotations

import os
from collections import Counter
from datetime import datetime, timezone
from itertools import groupby

from scripts.audit import Category, Disposition, FileEntry, InventoryItem

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Top-level directories that hold core code (each with a trailing slash).
_CORE_CODE_DIRS = (
    "tasks/",
    "loaders/",
    "scd/",
    "orchestration/",
    "quality/",
    "models/",
    "utils/",
    "api/",
)

# File names treated as build & deploy files.
_BUILD_DEPLOY_BASENAMES = {"setup.py", "build_exe.py"}

# File extensions treated as build & deploy files.
_BUILD_DEPLOY_EXTENSIONS = {".bat", ".sh", ".ps1"}

# Root-level file names recognised as project configuration.
_ROOT_CONFIG_BASENAMES = frozenset(
    {
        "requirements.txt",
        "pytest.ini",
        ".env",
        ".env.example",
        ".gitignore",
        ".flake8",
        "pyproject.toml",
    }
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _top_dir(rel_path: str) -> str:
    """Return the first path component with a trailing slash.

    ``'tmp/foo.py'`` gives ``'tmp/'``; a path without any ``/`` gives ``''``.
    """
    head, sep, _ = rel_path.partition("/")
    return head + sep if sep else ""


def _basename(rel_path: str) -> str:
    """Return the last ``/``-separated component of *rel_path*."""
    return rel_path.rsplit("/", 1)[-1]


def _is_init_py(rel_path: str) -> bool:
    """Return True when the last path component is ``__init__.py``."""
    return _basename(rel_path) == "__init__.py"


def _matching_dir(rel_path: str, dir_prefixes) -> str | None:
    """Return the first prefix that *rel_path* equals or lies under.

    Each prefix must end with ``/``. ``'tasks'`` and ``'tasks/a.py'`` both
    match ``'tasks/'``; ``'tasks_old/a.py'`` does not. Returns None when no
    prefix matches.
    """
    for prefix in dir_prefixes:
        if rel_path.startswith(prefix) or rel_path + "/" == prefix:
            return prefix
    return None


def _is_under(rel_path: str, *dir_prefixes: str) -> bool:
    """Return True when *rel_path* is, or lies under, one of the directories."""
    return _matching_dir(rel_path, dir_prefixes) is not None


def _make_item(
    rel_path: str,
    category: Category,
    disposition: Disposition,
    description: str,
) -> InventoryItem:
    """Build an InventoryItem from its four fields."""
    return InventoryItem(
        rel_path=rel_path,
        category=category,
        disposition=disposition,
        description=description,
    )


def _escape_table_cell(text: str) -> str:
    """Escape ``|`` so *text* cannot terminate a Markdown table cell early."""
    return text.replace("|", "\\|")


# ---------------------------------------------------------------------------
# classify: the core classification function
# ---------------------------------------------------------------------------

def classify(entry: FileEntry) -> InventoryItem:
    """Classify a single file or directory entry.

    Rules are tried from highest to lowest priority; the first rule that
    matches decides both the category and the disposition.

    Args:
        entry: Scan result; this function reads ``rel_path``, ``extension``,
            ``is_dir`` and ``is_empty_dir``.

    Returns:
        The InventoryItem describing the entry.
    """
    path = entry.rel_path
    ext = entry.extension.lower()
    base = _basename(path)

    # Priority 1: everything under tmp/
    if _is_under(path, "tmp/"):
        return _classify_tmp(entry)

    # Priority 2: runtime output under logs/ and export/
    if _is_under(path, "logs/", "export/"):
        return _classify_runtime_output(entry)

    # Priority 3: .lnk / .rar files
    if ext in (".lnk", ".rar"):
        return _make_item(
            path,
            Category.OTHER,
            Disposition.CANDIDATE_DELETE,
            f"快捷方式/压缩包文件(`{ext}`),建议删除",
        )

    # Priority 4: empty directories
    if entry.is_empty_dir:
        return _make_item(
            path,
            Category.OTHER,
            Disposition.CANDIDATE_DELETE,
            "空目录,建议删除",
        )

    # Priority 5: core code directories.
    # The description names the matched directory rather than the path's
    # first component, because the latter is empty when the entry is the
    # directory itself (e.g. rel_path == "tasks").
    core_dir = _matching_dir(path, _CORE_CODE_DIRS)
    if core_dir is not None:
        return _make_item(
            path,
            Category.CORE_CODE,
            Disposition.KEEP,
            f"核心代码(`{core_dir.rstrip('/')}`)",
        )

    # Priority 6: config/
    if _is_under(path, "config/"):
        return _make_item(path, Category.CONFIG, Disposition.KEEP, "配置文件")

    # Priority 7: database/ (SQL, migrations, Python modules, ...)
    if _is_under(path, "database/"):
        return _classify_database(entry)

    # Priority 8: tests/
    if _is_under(path, "tests/"):
        return _make_item(path, Category.TEST, Disposition.KEEP, "测试文件")

    # Priority 9: docs/
    if _is_under(path, "docs/"):
        return _make_item(path, Category.DOCS, Disposition.KEEP, "文档")

    # Priority 10: scripts/ (Python files and directories are kept,
    # anything else needs a manual look)
    if _is_under(path, "scripts/"):
        if ext == ".py" or entry.is_dir:
            return _make_item(
                path, Category.SCRIPTS, Disposition.KEEP, "脚本工具"
            )
        return _make_item(
            path,
            Category.SCRIPTS,
            Disposition.NEEDS_REVIEW,
            "脚本目录下的非 Python 文件,需确认用途",
        )

    # Priority 11: gui/
    if _is_under(path, "gui/"):
        return _make_item(path, Category.GUI, Disposition.KEEP, "GUI 模块")

    # Priority 12: build & deploy files (matched by name or extension,
    # at any depth not claimed by an earlier rule)
    if base in _BUILD_DEPLOY_BASENAMES or ext in _BUILD_DEPLOY_EXTENSIONS:
        return _make_item(
            path, Category.BUILD_DEPLOY, Disposition.KEEP, "构建与部署文件"
        )

    # Priority 13: cli/
    if _is_under(path, "cli/"):
        return _make_item(
            path, Category.CORE_CODE, Disposition.KEEP, "CLI 入口模块"
        )

    # Priority 14: entries directly in the repository root
    if "/" not in path:
        return _classify_root_file(entry)

    # Fallback: nothing matched
    return _make_item(
        path,
        Category.OTHER,
        Disposition.NEEDS_REVIEW,
        "未匹配已知规则,需人工确认用途",
    )


# ---------------------------------------------------------------------------
# Sub-classifiers
# ---------------------------------------------------------------------------

def _classify_tmp(entry: FileEntry) -> InventoryItem:
    """Classify an entry located under tmp/.

    Everything is a deletion candidate by default. A ``.py`` file whose file
    name is longer than four characters is proposed for archiving instead.
    """
    path = entry.rel_path

    # Empty directories are always deletion candidates.
    if entry.is_empty_dir:
        return _make_item(
            path,
            Category.TEMP_DEBUG,
            Disposition.CANDIDATE_DELETE,
            "临时目录下的空目录",
        )

    # Python scripts may still be useful as reference material.
    # NOTE(review): the length threshold presumably filters out throw-away
    # names such as "a.py" - confirm the intent with the rule owner.
    if entry.extension.lower() == ".py" and len(_basename(path)) > 4:
        return _make_item(
            path,
            Category.TEMP_DEBUG,
            Disposition.CANDIDATE_ARCHIVE,
            "临时 Python 脚本,可能有参考价值",
        )

    return _make_item(
        path,
        Category.TEMP_DEBUG,
        Disposition.CANDIDATE_DELETE,
        "临时/调试文件,建议删除",
    )


def _classify_runtime_output(entry: FileEntry) -> InventoryItem:
    """Classify an entry located under logs/ or export/.

    ``__init__.py`` is kept; everything else is an archive candidate.
    """
    path = entry.rel_path
    if _is_init_py(path):
        return _make_item(
            path, Category.LOG_OUTPUT, Disposition.KEEP, "包初始化文件"
        )
    return _make_item(
        path,
        Category.LOG_OUTPUT,
        Disposition.CANDIDATE_ARCHIVE,
        "运行时产出,建议归档",
    )


def _classify_database(entry: FileEntry) -> InventoryItem:
    """Classify an entry located under database/.

    Checks, in order: a ``migrations`` path segment, ``.sql`` files, ``.py``
    files, directories (empty or not), and finally any other file.
    """
    path = entry.rel_path
    ext = entry.extension.lower()

    # Anything inside (or being) a directory named exactly "migrations".
    # Matching whole path segments keeps names such as "old_migrations"
    # from being treated as the migrations directory.
    if "migrations" in path.split("/"):
        return _make_item(
            path, Category.DATABASE_DEF, Disposition.KEEP, "数据库迁移脚本"
        )

    # SQL scripts
    if ext == ".sql":
        return _make_item(
            path,
            Category.DATABASE_DEF,
            Disposition.KEEP,
            "数据库 DDL/DML 脚本",
        )

    # Python modules count as core code
    if ext == ".py":
        return _make_item(
            path, Category.CORE_CODE, Disposition.KEEP, "数据库操作模块"
        )

    # Directories
    if entry.is_dir:
        if entry.is_empty_dir:
            return _make_item(
                path,
                Category.OTHER,
                Disposition.CANDIDATE_DELETE,
                "数据库目录下的空目录",
            )
        return _make_item(
            path, Category.DATABASE_DEF, Disposition.KEEP, "数据库子目录"
        )

    # Any other file
    return _make_item(
        path,
        Category.DATABASE_DEF,
        Disposition.NEEDS_REVIEW,
        "数据库目录下的非标准文件,需确认",
    )


def _classify_root_file(entry: FileEntry) -> InventoryItem:
    """Classify an entry that sits directly in the repository root."""
    path = entry.rel_path
    ext = entry.extension.lower()
    base = _basename(path)

    # Known build files
    if base in _BUILD_DEPLOY_BASENAMES or ext in _BUILD_DEPLOY_EXTENSIONS:
        return _make_item(
            path, Category.BUILD_DEPLOY, Disposition.KEEP, "构建与部署文件"
        )

    # Known configuration files
    if base in _ROOT_CONFIG_BASENAMES:
        return _make_item(
            path, Category.CONFIG, Disposition.KEEP, "项目配置文件"
        )

    # README in any letter case and with any extension
    if base.lower().startswith("readme"):
        return _make_item(
            path, Category.DOCS, Disposition.KEEP, "项目说明文档"
        )

    # Anything else in the root needs manual confirmation
    return _make_item(
        path,
        Category.OTHER,
        Disposition.NEEDS_REVIEW,
        f"根目录散落文件(`{base}`),需确认用途",
    )


# ---------------------------------------------------------------------------
# build_inventory: batch classification
# ---------------------------------------------------------------------------

def build_inventory(entries: list[FileEntry]) -> list[InventoryItem]:
    """Classify every entry and return the items in input order."""
    return [classify(entry) for entry in entries]


# ---------------------------------------------------------------------------
# render_inventory_report: Markdown rendering
# ---------------------------------------------------------------------------

def _summary_table(title: str, label: str, members, counts: Counter) -> list[str]:
    """Render one summary sub-section, skipping members with a zero count."""
    section = [title, "", f"| {label} | 数量 |", "|---|---|"]
    for member in members:
        count = counts[member]
        if count > 0:
            section.append(f"| {member.value} | {count} |")
    section.append("")
    return section


def render_inventory_report(items: list[InventoryItem], repo_root: str) -> str:
    """Render the inventory as a Markdown report.

    Layout:
      - header: title, generation time (UTC), repository path
      - body: one table per Category, in enum definition order
      - footer: counts per category and per disposition, plus the total

    Args:
        items: Classified inventory items.
        repo_root: Repository path shown in the report header.

    Returns:
        The report as a single newline-joined string.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    lines: list[str] = [
        "# 文件清单报告",
        "",
        f"- 生成时间:{now}",
        f"- 仓库路径:`{repo_root}`",
        "",
    ]

    # Group by category. groupby only merges adjacent items, so sort first;
    # sorted() is stable, which keeps the input order within each category.
    cat_order = {cat: index for index, cat in enumerate(Category)}
    ordered = sorted(items, key=lambda it: cat_order[it.category])
    for cat, group in groupby(ordered, key=lambda it: it.category):
        lines.append(f"## {cat.value}")
        lines.append("")
        lines.append("| 相对路径 | 处置标签 | 简要说明 |")
        lines.append("|---|---|---|")
        for item in group:
            # Escape pipes so unusual paths or descriptions cannot break
            # the table layout.
            rel_path = _escape_table_cell(item.rel_path)
            description = _escape_table_cell(item.description)
            lines.append(
                f"| `{rel_path}` | {item.disposition.value} | {description} |"
            )
        lines.append("")

    # Summary
    lines.append("## 统计摘要")
    lines.append("")
    cat_counter: Counter[Category] = Counter(it.category for it in items)
    disp_counter: Counter[Disposition] = Counter(it.disposition for it in items)
    lines.extend(_summary_table("### 按用途分类", "分类", Category, cat_counter))
    lines.extend(_summary_table("### 按处置标签", "标签", Disposition, disp_counter))

    lines.append(f"**总计:{len(items)} 个条目**")
    lines.append("")
    return "\n".join(lines)