# -*- coding: utf-8 -*- """ 仓库扫描器 — 递归遍历仓库文件系统,返回结构化的文件元信息。 仅执行只读操作:读取文件元信息(大小、类型),不修改任何文件。 遇到权限错误时跳过并记录日志,不中断扫描流程。 """ from __future__ import annotations import fnmatch import logging from pathlib import Path from scripts.audit import FileEntry logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # 排除模式 # --------------------------------------------------------------------------- EXCLUDED_PATTERNS: list[str] = [ ".git", "__pycache__", ".pytest_cache", "*.pyc", ".kiro", ] # --------------------------------------------------------------------------- # 排除匹配逻辑 # --------------------------------------------------------------------------- def _is_excluded(name: str, patterns: list[str]) -> bool: """判断文件/目录名是否匹配任一排除模式。 支持两种模式: - 精确匹配(如 ".git"、"__pycache__") - 通配符匹配(如 "*.pyc"),使用 fnmatch 语义 """ for pat in patterns: if fnmatch.fnmatch(name, pat): return True return False # --------------------------------------------------------------------------- # 递归遍历 # --------------------------------------------------------------------------- def _walk( root: Path, base: Path, exclude: list[str], results: list[FileEntry], ) -> None: """递归遍历 *root* 下的文件和目录,将结果追加到 *results*。 Parameters ---------- root : Path 当前要遍历的目录。 base : Path 仓库根目录,用于计算相对路径。 exclude : list[str] 排除模式列表。 results : list[FileEntry] 收集结果的列表(就地修改)。 """ try: children = sorted(root.iterdir(), key=lambda p: p.name) except (PermissionError, OSError) as exc: logger.warning("无法读取目录 %s: %s", root, exc) return # 用于判断当前目录是否为"空目录"(排除后无可见子项) visible_count = 0 for child in children: if _is_excluded(child.name, exclude): continue visible_count += 1 rel = child.relative_to(base).as_posix() if child.is_dir(): # 先递归子目录,再判断该目录是否为空 sub_start = len(results) _walk(child, base, exclude, results) sub_end = len(results) # 该目录下递归产生的条目数为 0 → 空目录 is_empty = (sub_end == sub_start) results.append(FileEntry( rel_path=rel, is_dir=True, size_bytes=0, extension="", is_empty_dir=is_empty, )) else: # 文件 try: size = child.stat().st_size except (PermissionError, OSError) as exc: logger.warning("无法获取文件信息 %s: %s", child, exc) continue results.append(FileEntry( rel_path=rel, is_dir=False, size_bytes=size, extension=child.suffix.lower(), is_empty_dir=False, )) # 如果 root 是仓库根目录自身,不需要额外处理 # (根目录不作为条目出现在结果中) def scan_repo( root: Path, exclude: list[str] | None = None, ) -> list[FileEntry]: """递归扫描仓库,返回所有文件和目录的元信息列表。 Parameters ---------- root : Path 仓库根目录路径。 exclude : list[str] | None 排除模式列表,默认使用 EXCLUDED_PATTERNS。 Returns ------- list[FileEntry] 按 rel_path 排序的文件/目录元信息列表。 """ if exclude is None: exclude = EXCLUDED_PATTERNS results: list[FileEntry] = [] _walk(root, root, exclude, results) # 按相对路径排序,保证输出稳定 results.sort(key=lambda e: e.rel_path) return results