在准备环境前提交次全部更改。

This commit is contained in:
Neo
2026-02-19 08:35:13 +08:00
parent ded6dfb9d8
commit 4eac07da47
1387 changed files with 6107191 additions and 33002 deletions

View File

@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
"""
仓库治理只读审计 — 共享数据模型
定义审计脚本各模块共用的 dataclass 和枚举类型。
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
# ---------------------------------------------------------------------------
# 文件元信息
# ---------------------------------------------------------------------------
@dataclass
class FileEntry:
"""单个文件/目录的元信息。"""
rel_path: str # 相对于仓库根目录的路径
is_dir: bool # 是否为目录
size_bytes: int # 文件大小(目录为 0
extension: str # 文件扩展名(小写,含点号)
is_empty_dir: bool # 是否为空目录
# ---------------------------------------------------------------------------
# 用途分类与处置标签
# ---------------------------------------------------------------------------
class Category(str, Enum):
"""文件用途分类。"""
CORE_CODE = "核心代码"
CONFIG = "配置"
DATABASE_DEF = "数据库定义"
TEST = "测试"
DOCS = "文档"
SCRIPTS = "脚本工具"
GUI = "GUI"
BUILD_DEPLOY = "构建与部署"
LOG_OUTPUT = "日志与输出"
TEMP_DEBUG = "临时与调试"
OTHER = "其他"
class Disposition(str, Enum):
"""处置标签。"""
KEEP = "保留"
CANDIDATE_DELETE = "候选删除"
CANDIDATE_ARCHIVE = "候选归档"
NEEDS_REVIEW = "待确认"
# ---------------------------------------------------------------------------
# 文件清单条目
# ---------------------------------------------------------------------------
@dataclass
class InventoryItem:
"""清单条目:路径 + 分类 + 处置 + 说明。"""
rel_path: str
category: Category
disposition: Disposition
description: str
# ---------------------------------------------------------------------------
# 流程树节点
# ---------------------------------------------------------------------------
@dataclass
class FlowNode:
"""流程树节点。"""
name: str # 节点名称(模块名/类名/函数名)
source_file: str # 所在源文件路径
node_type: str # 类型entry / module / class / function
children: list[FlowNode] = field(default_factory=list)
# ---------------------------------------------------------------------------
# 文档对齐
# ---------------------------------------------------------------------------
@dataclass
class DocMapping:
"""文档与代码的映射关系。"""
doc_path: str # 文档文件路径
doc_topic: str # 文档主题
related_code: list[str] # 关联的代码文件/模块
status: str # 状态aligned / stale / conflict / orphan
@dataclass
class AlignmentIssue:
"""对齐问题。"""
doc_path: str # 文档路径
issue_type: str # stale / conflict / missing
description: str # 问题描述
related_code: str # 关联代码路径

View File

@@ -0,0 +1,608 @@
# -*- coding: utf-8 -*-
"""
文档对齐分析器 — 检查文档与代码之间的映射关系、过期点、冲突点和缺失点。
文档来源:
- docs/ 目录(.md, .txt, .csv, .json
- 根目录 README.md
- 各模块内的 README.md
- .kiro/steering/ 引导文件
- docs/test-json-doc/ API 响应样本
"""
from __future__ import annotations
import json
import re
from datetime import datetime, timezone
from pathlib import Path
from scripts.audit import AlignmentIssue, DocMapping
# ---------------------------------------------------------------------------
# 常量
# ---------------------------------------------------------------------------
# 文档文件扩展名
_DOC_EXTENSIONS = {".md", ".txt", ".csv"}
# 核心代码目录——缺少文档时应报告
_CORE_CODE_DIRS = {
"tasks",
"loaders",
"orchestration",
"quality",
"models",
"utils",
"api",
"scd",
"config",
"database",
}
# ODS 表中的通用元数据列,比对时忽略
_ODS_META_COLUMNS = {"content_hash", "payload", "created_at", "updated_at", "id"}
# SQL 关键字,解析 DDL 列名时排除
_SQL_KEYWORDS = {
"primary", "key", "not", "null", "default", "unique", "check",
"references", "foreign", "constraint", "index", "create", "table",
"if", "exists", "serial", "bigserial", "true", "false",
}
# ---------------------------------------------------------------------------
# 安全读取文件(编码回退)
# ---------------------------------------------------------------------------
def _safe_read(path: Path) -> str:
"""尝试以 utf-8 → gbk → latin-1 回退读取文件内容。"""
for enc in ("utf-8", "gbk", "latin-1"):
try:
return path.read_text(encoding=enc)
except (UnicodeDecodeError, UnicodeError):
continue
return ""
# ---------------------------------------------------------------------------
# scan_docs — 扫描所有文档来源
# ---------------------------------------------------------------------------
def scan_docs(repo_root: Path) -> list[str]:
"""扫描所有文档文件路径,返回相对路径列表(已排序)。
文档来源:
1. docs/ 目录下的 .md, .txt, .csv, .json 文件
2. 根目录 README.md
3. 各模块内的 README.md如 gui/README.md
4. .kiro/steering/ 引导文件
"""
results: list[str] = []
def _rel(p: Path) -> str:
"""返回归一化的正斜杠相对路径。"""
return str(p.relative_to(repo_root)).replace("\\", "/")
# 1. docs/ 目录(递归,含 test-json-doc 下的 .json
docs_dir = repo_root / "docs"
if docs_dir.is_dir():
for p in docs_dir.rglob("*"):
if p.is_file():
ext = p.suffix.lower()
if ext in _DOC_EXTENSIONS or ext == ".json":
results.append(_rel(p))
# 2. 根目录 README.md
root_readme = repo_root / "README.md"
if root_readme.is_file():
results.append("README.md")
# 3. 各模块内的 README.md
for child in sorted(repo_root.iterdir()):
if child.is_dir() and child.name not in ("docs", ".kiro"):
readme = child / "README.md"
if readme.is_file():
results.append(_rel(readme))
# 4. .kiro/steering/
steering_dir = repo_root / ".kiro" / "steering"
if steering_dir.is_dir():
for p in sorted(steering_dir.iterdir()):
if p.is_file():
results.append(_rel(p))
return sorted(set(results))
# ---------------------------------------------------------------------------
# extract_code_references — 从文档提取代码引用
# ---------------------------------------------------------------------------
def extract_code_references(doc_path: Path) -> list[str]:
"""从文档中提取代码引用(反引号内的文件路径、类名、函数名等)。
规则:
- 提取反引号内的内容
- 跳过单字符引用
- 跳过纯数字/版本号
- 反斜杠归一化为正斜杠
- 去重
"""
if not doc_path.is_file():
return []
text = _safe_read(doc_path)
if not text:
return []
# 提取反引号内容
backtick_refs = re.findall(r"`([^`]+)`", text)
seen: set[str] = set()
results: list[str] = []
for raw in backtick_refs:
ref = raw.strip()
# 归一化反斜杠
ref = ref.replace("\\", "/")
# 跳过单字符
if len(ref) <= 1:
continue
# 跳过纯数字和版本号
if re.fullmatch(r"[\d.]+", ref):
continue
# 去重
if ref in seen:
continue
seen.add(ref)
results.append(ref)
return results
# ---------------------------------------------------------------------------
# check_reference_validity — 检查引用有效性
# ---------------------------------------------------------------------------
def check_reference_validity(ref: str, repo_root: Path) -> bool:
"""检查文档中的代码引用是否仍然有效。
检查策略:
1. 直接作为文件/目录路径检查
2. 去掉 FQ-ETL/ 前缀后检查(兼容旧文档引用)
3. 将点号路径转为文件路径检查(如 config.settings → config/settings.py
"""
# 1. 直接路径
if (repo_root / ref).exists():
return True
# 2. 去掉旧包名前缀(兼容历史文档)
for prefix in ("FQ-ETL/", "etl_billiards/"):
if ref.startswith(prefix):
stripped = ref[len(prefix):]
if (repo_root / stripped).exists():
return True
# 3. 点号模块路径 → 文件路径
if "." in ref and "/" not in ref:
as_path = ref.replace(".", "/") + ".py"
if (repo_root / as_path).exists():
return True
# 也可能是目录(包)
as_dir = ref.replace(".", "/")
if (repo_root / as_dir).is_dir():
return True
return False
# ---------------------------------------------------------------------------
# find_undocumented_modules — 找出缺少文档的核心代码模块
# ---------------------------------------------------------------------------
def find_undocumented_modules(
repo_root: Path,
documented: set[str],
) -> list[str]:
"""找出缺少文档的核心代码模块。
只检查 _CORE_CODE_DIRS 中的 .py 文件(排除 __init__.py
返回已排序的相对路径列表。
"""
undocumented: list[str] = []
for core_dir in sorted(_CORE_CODE_DIRS):
dir_path = repo_root / core_dir
if not dir_path.is_dir():
continue
for py_file in dir_path.rglob("*.py"):
if py_file.name == "__init__.py":
continue
rel = str(py_file.relative_to(repo_root))
# 归一化路径分隔符
rel = rel.replace("\\", "/")
if rel not in documented:
undocumented.append(rel)
return sorted(undocumented)
# ---------------------------------------------------------------------------
# DDL / 数据字典解析辅助函数
# ---------------------------------------------------------------------------
def _parse_ddl_tables(sql: str) -> dict[str, set[str]]:
"""从 DDL SQL 中提取表名和列名。
返回 {表名: {列名集合}} 字典。
支持带 schema 前缀的表名(如 dwd.dim_member → dim_member
"""
tables: dict[str, set[str]] = {}
# 匹配 CREATE TABLE [IF NOT EXISTS] [schema.]table_name (
create_re = re.compile(
r"CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?"
r"(?:\w+\.)?(\w+)\s*\(",
re.IGNORECASE,
)
for match in create_re.finditer(sql):
table_name = match.group(1)
# 找到对应的括号内容
start = match.end()
depth = 1
pos = start
while pos < len(sql) and depth > 0:
if sql[pos] == "(":
depth += 1
elif sql[pos] == ")":
depth -= 1
pos += 1
body = sql[start:pos - 1]
columns: set[str] = set()
# 逐行提取列名——取每行第一个标识符
for line in body.split("\n"):
line = line.strip().rstrip(",")
if not line:
continue
# 提取第一个单词
col_match = re.match(r"(\w+)", line)
if col_match:
col_name = col_match.group(1).lower()
# 排除 SQL 关键字
if col_name not in _SQL_KEYWORDS:
columns.add(col_name)
tables[table_name] = columns
return tables
def _parse_dictionary_tables(md: str) -> dict[str, set[str]]:
"""从数据字典 Markdown 中提取表名和字段名。
约定:
- 表名出现在 ## 标题中(可能带反引号)
- 字段名出现在 Markdown 表格的第一列
- 跳过表头行(含"字段"字样)和分隔行(含 ---
"""
tables: dict[str, set[str]] = {}
current_table: str | None = None
for line in md.split("\n"):
# 匹配 ## 标题中的表名
heading = re.match(r"^##\s+`?(\w+)`?", line)
if heading:
current_table = heading.group(1)
tables[current_table] = set()
continue
if current_table is None:
continue
# 跳过分隔行
if re.match(r"^\s*\|[-\s|]+\|\s*$", line):
continue
# 解析表格行
row_match = re.match(r"^\s*\|\s*(\S+)", line)
if row_match:
field = row_match.group(1)
# 跳过表头(含"字段"字样)
if field in ("字段",):
continue
tables[current_table].add(field)
return tables
# ---------------------------------------------------------------------------
# check_ddl_vs_dictionary — DDL 与数据字典比对
# ---------------------------------------------------------------------------
def check_ddl_vs_dictionary(repo_root: Path) -> list[AlignmentIssue]:
"""比对 DDL 文件与数据字典文档的覆盖度。
检查:
1. DDL 中有但字典中没有的表 → missing
2. 同名表中 DDL 有但字典没有的列 → conflict
"""
issues: list[AlignmentIssue] = []
# 收集所有 DDL 表定义
ddl_tables: dict[str, set[str]] = {}
db_dir = repo_root / "database"
if db_dir.is_dir():
for sql_file in sorted(db_dir.glob("schema_*.sql")):
content = _safe_read(sql_file)
for tbl, cols in _parse_ddl_tables(content).items():
if tbl in ddl_tables:
ddl_tables[tbl] |= cols
else:
ddl_tables[tbl] = set(cols)
# 收集所有数据字典表定义
dict_tables: dict[str, set[str]] = {}
docs_dir = repo_root / "docs"
if docs_dir.is_dir():
for dict_file in sorted(docs_dir.glob("*dictionary*.md")):
content = _safe_read(dict_file)
for tbl, fields in _parse_dictionary_tables(content).items():
if tbl in dict_tables:
dict_tables[tbl] |= fields
else:
dict_tables[tbl] = set(fields)
# 比对
for tbl, ddl_cols in sorted(ddl_tables.items()):
if tbl not in dict_tables:
issues.append(AlignmentIssue(
doc_path="docs/*dictionary*.md",
issue_type="missing",
description=f"DDL 定义了表 `{tbl}`,但数据字典中未收录",
related_code=f"database/schema_*.sql ({tbl})",
))
else:
# 检查列差异
dict_cols = dict_tables[tbl]
missing_cols = ddl_cols - dict_cols
for col in sorted(missing_cols):
issues.append(AlignmentIssue(
doc_path="docs/*dictionary*.md",
issue_type="conflict",
description=f"表 `{tbl}` 的列 `{col}` 在 DDL 中存在但数据字典中缺失",
related_code=f"database/schema_*.sql ({tbl}.{col})",
))
return issues
# ---------------------------------------------------------------------------
# check_api_samples_vs_parsers — API 样本与解析器比对
# ---------------------------------------------------------------------------
def check_api_samples_vs_parsers(repo_root: Path) -> list[AlignmentIssue]:
"""比对 API 响应样本与 ODS 表结构的一致性。
策略:
1. 扫描 docs/test-json-doc/ 下的 .json 文件
2. 提取 JSON 中的顶层字段名
3. 从 ODS DDL 中查找同名表
4. 比对字段差异(忽略 ODS 元数据列)
"""
issues: list[AlignmentIssue] = []
sample_dir = repo_root / "docs" / "test-json-doc"
if not sample_dir.is_dir():
return issues
# 收集 ODS 表定义(保留全部列,比对时忽略元数据列)
ods_tables: dict[str, set[str]] = {}
db_dir = repo_root / "database"
if db_dir.is_dir():
for sql_file in sorted(db_dir.glob("schema_*ODS*.sql")):
content = _safe_read(sql_file)
for tbl, cols in _parse_ddl_tables(content).items():
ods_tables[tbl] = cols
# 逐个样本文件比对
for json_file in sorted(sample_dir.glob("*.json")):
entity_name = json_file.stem # 文件名(不含扩展名)作为实体名
# 解析 JSON 样本
try:
content = _safe_read(json_file)
data = json.loads(content)
except (json.JSONDecodeError, ValueError):
continue
# 提取顶层字段名
sample_fields: set[str] = set()
if isinstance(data, list) and data:
# 数组格式——取第一个元素的键
first = data[0]
if isinstance(first, dict):
sample_fields = set(first.keys())
elif isinstance(data, dict):
sample_fields = set(data.keys())
if not sample_fields:
continue
# 查找匹配的 ODS 表
matched_table: str | None = None
matched_cols: set[str] = set()
for tbl, cols in ods_tables.items():
# 表名包含实体名(如 test_entity 匹配 ods.test_entity
tbl_lower = tbl.lower()
entity_lower = entity_name.lower()
if entity_lower in tbl_lower or tbl_lower == entity_lower:
matched_table = tbl
matched_cols = cols
break
if matched_table is None:
continue
# 比对:样本中有但 ODS 表中没有的字段
extra_fields = sample_fields - matched_cols
for field in sorted(extra_fields):
issues.append(AlignmentIssue(
doc_path=f"docs/test-json-doc/{json_file.name}",
issue_type="conflict",
description=(
f"API 样本字段 `{field}` 在 ODS 表 `{matched_table}` 中未定义"
),
related_code=f"database/schema_*ODS*.sql ({matched_table})",
))
return issues
# ---------------------------------------------------------------------------
# build_mappings — 构建文档与代码的映射关系
# ---------------------------------------------------------------------------
def build_mappings(
doc_paths: list[str],
repo_root: Path,
) -> list[DocMapping]:
"""为每份文档建立与代码模块的映射关系。"""
mappings: list[DocMapping] = []
for doc_rel in doc_paths:
doc_path = repo_root / doc_rel
refs = extract_code_references(doc_path)
# 确定关联代码和状态
valid_refs: list[str] = []
has_stale = False
for ref in refs:
if check_reference_validity(ref, repo_root):
valid_refs.append(ref)
else:
has_stale = True
# 推断文档主题(取文件名或第一行标题)
topic = _infer_topic(doc_path, doc_rel)
if not refs:
status = "orphan"
elif has_stale:
status = "stale"
else:
status = "aligned"
mappings.append(DocMapping(
doc_path=doc_rel,
doc_topic=topic,
related_code=valid_refs,
status=status,
))
return mappings
def _infer_topic(doc_path: Path, doc_rel: str) -> str:
"""从文档推断主题——优先取 Markdown 一级标题,否则用文件名。"""
if doc_path.is_file() and doc_path.suffix.lower() in (".md", ".txt"):
try:
text = _safe_read(doc_path)
for line in text.split("\n"):
line = line.strip()
if line.startswith("# "):
return line[2:].strip()
except Exception:
pass
return doc_rel
# ---------------------------------------------------------------------------
# render_alignment_report — 生成 Markdown 格式的文档对齐报告
# ---------------------------------------------------------------------------
def render_alignment_report(
mappings: list[DocMapping],
issues: list[AlignmentIssue],
repo_root: str,
) -> str:
"""生成 Markdown 格式的文档对齐报告。
分区:映射关系表、过期点列表、冲突点列表、缺失点列表、统计摘要。
"""
lines: list[str] = []
# --- 头部 ---
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
lines.append("# 文档对齐报告")
lines.append("")
lines.append(f"- 生成时间:{now}")
lines.append(f"- 仓库路径:`{repo_root}`")
lines.append("")
# --- 映射关系 ---
lines.append("## 映射关系")
lines.append("")
if mappings:
lines.append("| 文档路径 | 主题 | 关联代码 | 状态 |")
lines.append("|---|---|---|---|")
for m in mappings:
code_str = ", ".join(f"`{c}`" for c in m.related_code) if m.related_code else ""
lines.append(f"| `{m.doc_path}` | {m.doc_topic} | {code_str} | {m.status} |")
else:
lines.append("未发现文档映射关系。")
lines.append("")
# --- 按 issue_type 分组 ---
stale = [i for i in issues if i.issue_type == "stale"]
conflict = [i for i in issues if i.issue_type == "conflict"]
missing = [i for i in issues if i.issue_type == "missing"]
# --- 过期点 ---
lines.append("## 过期点")
lines.append("")
if stale:
lines.append("| 文档路径 | 描述 | 关联代码 |")
lines.append("|---|---|---|")
for i in stale:
lines.append(f"| `{i.doc_path}` | {i.description} | `{i.related_code}` |")
else:
lines.append("未发现过期点。")
lines.append("")
# --- 冲突点 ---
lines.append("## 冲突点")
lines.append("")
if conflict:
lines.append("| 文档路径 | 描述 | 关联代码 |")
lines.append("|---|---|---|")
for i in conflict:
lines.append(f"| `{i.doc_path}` | {i.description} | `{i.related_code}` |")
else:
lines.append("未发现冲突点。")
lines.append("")
# --- 缺失点 ---
lines.append("## 缺失点")
lines.append("")
if missing:
lines.append("| 文档路径 | 描述 | 关联代码 |")
lines.append("|---|---|---|")
for i in missing:
lines.append(f"| `{i.doc_path}` | {i.description} | `{i.related_code}` |")
else:
lines.append("未发现缺失点。")
lines.append("")
# --- 统计摘要 ---
lines.append("## 统计摘要")
lines.append("")
lines.append(f"- 文档总数:{len(mappings)}")
lines.append(f"- 过期点数量:{len(stale)}")
lines.append(f"- 冲突点数量:{len(conflict)}")
lines.append(f"- 缺失点数量:{len(missing)}")
lines.append("")
return "\n".join(lines)

View File

@@ -0,0 +1,618 @@
# -*- coding: utf-8 -*-
"""
流程树分析器 — 通过静态分析 Python 源码的 import 语句和类继承关系,
构建从入口到末端模块的调用树。
仅执行只读操作:读取并解析 Python 源文件,不修改任何文件。
"""
from __future__ import annotations
import ast
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from scripts.audit import FileEntry, FlowNode
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 项目内部包名列表(顶层目录中属于项目代码的包)
# ---------------------------------------------------------------------------
_PROJECT_PACKAGES: set[str] = {
"cli", "config", "api", "database", "tasks", "loaders",
"scd", "orchestration", "quality", "models", "utils",
"gui", "scripts",
}
# ---------------------------------------------------------------------------
# 已知的第三方包和标准库顶层模块(用于排除非项目导入)
# ---------------------------------------------------------------------------
_KNOWN_THIRD_PARTY: set[str] = {
"psycopg2", "requests", "dateutil", "python_dateutil",
"dotenv", "openpyxl", "PySide6", "flask", "pyinstaller",
"PyInstaller", "hypothesis", "pytest", "_pytest", "py",
"pluggy", "pkg_resources", "setuptools", "pip", "wheel",
"tzdata", "six", "certifi", "urllib3", "charset_normalizer",
"idna", "shiboken6",
}
def _is_project_module(module_name: str) -> bool:
"""判断模块名是否属于项目内部模块。"""
top = module_name.split(".")[0]
if top in _PROJECT_PACKAGES:
return True
return False
def _is_stdlib_or_third_party(module_name: str) -> bool:
"""判断模块名是否属于标准库或已知第三方包。"""
top = module_name.split(".")[0]
if top in _KNOWN_THIRD_PARTY:
return True
# 检查标准库
if top in sys.stdlib_module_names:
return True
return False
# ---------------------------------------------------------------------------
# 文件读取(多编码回退)
# ---------------------------------------------------------------------------
def _read_source(filepath: Path) -> str | None:
"""读取 Python 源文件内容,尝试 utf-8 → gbk → latin-1 回退。
返回文件内容字符串,读取失败时返回 None。
"""
for encoding in ("utf-8", "gbk", "latin-1"):
try:
return filepath.read_text(encoding=encoding)
except (UnicodeDecodeError, UnicodeError):
continue
except (OSError, PermissionError) as exc:
logger.warning("无法读取文件 %s: %s", filepath, exc)
return None
logger.warning("无法以任何编码读取文件 %s", filepath)
return None
# ---------------------------------------------------------------------------
# 路径 ↔ 模块名转换
# ---------------------------------------------------------------------------
def _path_to_module_name(rel_path: str) -> str:
"""将相对路径转换为 Python 模块名。
例如:
- "cli/main.py""cli.main"
- "cli/__init__.py""cli"
- "tasks/dws/assistant.py""tasks.dws.assistant"
"""
p = rel_path.replace("\\", "/")
if p.endswith("/__init__.py"):
p = p[: -len("/__init__.py")]
elif p.endswith(".py"):
p = p[:-3]
return p.replace("/", ".")
def _module_to_path(module_name: str) -> str:
"""将模块名转换为相对文件路径(优先 .py 文件)。
例如:
- "cli.main""cli/main.py"
- "cli""cli/__init__.py"
"""
return module_name.replace(".", "/") + ".py"
# ---------------------------------------------------------------------------
# parse_imports — 解析 Python 文件的 import 语句
# ---------------------------------------------------------------------------
def parse_imports(filepath: Path) -> list[str]:
"""使用 ast 模块解析 Python 文件的 import 语句,返回被导入的本地模块列表。
- 仅返回项目内部模块(排除标准库和第三方包)
- 结果去重
- 语法错误或文件不存在时返回空列表
"""
if not filepath.exists():
return []
source = _read_source(filepath)
if source is None:
return []
try:
tree = ast.parse(source, filename=str(filepath))
except SyntaxError:
logger.warning("语法错误,无法解析 %s", filepath)
return []
modules: list[str] = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
name = alias.name
if _is_project_module(name) and not _is_stdlib_or_third_party(name):
modules.append(name)
elif isinstance(node, ast.ImportFrom):
if node.module and node.level == 0:
name = node.module
if _is_project_module(name) and not _is_stdlib_or_third_party(name):
modules.append(name)
# 去重并保持顺序
seen: set[str] = set()
result: list[str] = []
for m in modules:
if m not in seen:
seen.add(m)
result.append(m)
return result
# ---------------------------------------------------------------------------
# build_flow_tree — 从入口递归追踪 import 链,构建流程树
# ---------------------------------------------------------------------------
def build_flow_tree(
repo_root: Path,
entry_file: str,
_visited: set[str] | None = None,
) -> FlowNode:
"""从指定入口文件出发,递归追踪 import 链,构建流程树。
Parameters
----------
repo_root : Path
仓库根目录。
entry_file : str
入口文件的相对路径(如 "cli/main.py")。
_visited : set[str] | None
内部使用,防止循环导入导致无限递归。
Returns
-------
FlowNode
以入口文件为根的流程树。
"""
is_root = _visited is None
if _visited is None:
_visited = set()
module_name = _path_to_module_name(entry_file)
node_type = "entry" if is_root else "module"
_visited.add(entry_file)
filepath = repo_root / entry_file
children: list[FlowNode] = []
if filepath.exists():
imported_modules = parse_imports(filepath)
for mod in imported_modules:
child_path = _module_to_path(mod)
# 如果 .py 文件不存在,尝试 __init__.py
if not (repo_root / child_path).exists():
alt_path = mod.replace(".", "/") + "/__init__.py"
if (repo_root / alt_path).exists():
child_path = alt_path
if child_path not in _visited:
child_node = build_flow_tree(repo_root, child_path, _visited)
children.append(child_node)
return FlowNode(
name=module_name,
source_file=entry_file,
node_type=node_type,
children=children,
)
# ---------------------------------------------------------------------------
# 批处理文件解析
# ---------------------------------------------------------------------------
def _parse_bat_python_target(bat_path: Path) -> str | None:
"""从批处理文件中解析 python -m 命令的目标模块名。
返回模块名(如 "cli.main"),未找到时返回 None。
"""
if not bat_path.exists():
return None
content = _read_source(bat_path)
if content is None:
return None
# 匹配 python -m module.name 或 python3 -m module.name
pattern = re.compile(r"python[3]?\s+-m\s+([\w.]+)", re.IGNORECASE)
for line in content.splitlines():
m = pattern.search(line)
if m:
return m.group(1)
return None
# ---------------------------------------------------------------------------
# 入口点识别
# ---------------------------------------------------------------------------
def discover_entry_points(repo_root: Path) -> list[dict[str, str]]:
"""识别项目的所有入口点。
返回字典列表,每个字典包含:
- type: 入口类型CLI / GUI / 批处理 / 运维脚本)
- file: 相对路径
- description: 简要说明
识别规则:
- cli/main.py → CLI 入口
- gui/main.py → GUI 入口
- *.bat 文件 → 解析其中的 python -m 命令
- scripts/*.py含 if __name__ == "__main__",排除 __init__.py 和 audit/ 子目录)
"""
entries: list[dict[str, str]] = []
# CLI 入口
cli_main = repo_root / "cli" / "main.py"
if cli_main.exists():
entries.append({
"type": "CLI",
"file": "cli/main.py",
"description": "CLI 主入口 (`python -m cli.main`)",
})
# GUI 入口
gui_main = repo_root / "gui" / "main.py"
if gui_main.exists():
entries.append({
"type": "GUI",
"file": "gui/main.py",
"description": "GUI 主入口 (`python -m gui.main`)",
})
# 批处理文件
for bat in sorted(repo_root.glob("*.bat")):
target = _parse_bat_python_target(bat)
desc = f"批处理脚本"
if target:
desc += f",调用 `{target}`"
entries.append({
"type": "批处理",
"file": bat.name,
"description": desc,
})
# 运维脚本scripts/ 下的 .py 文件(排除 __init__.py 和 audit/ 子目录)
scripts_dir = repo_root / "scripts"
if scripts_dir.is_dir():
for py_file in sorted(scripts_dir.glob("*.py")):
if py_file.name == "__init__.py":
continue
# 检查是否包含 if __name__ == "__main__"
source = _read_source(py_file)
if source and '__name__' in source and '__main__' in source:
rel = py_file.relative_to(repo_root).as_posix()
entries.append({
"type": "运维脚本",
"file": rel,
"description": f"运维脚本 `{py_file.name}`",
})
return entries
# ---------------------------------------------------------------------------
# 任务类型和加载器类型区分
# ---------------------------------------------------------------------------
def classify_task_type(rel_path: str) -> str:
"""根据文件路径区分任务类型。
返回值:
- "ODS 抓取任务"
- "DWD 加载任务"
- "DWS 汇总任务"
- "校验任务"
- "Schema 初始化任务"
- "任务"(无法细分时的默认值)
"""
p = rel_path.replace("\\", "/").lower()
if "verification/" in p or "verification\\" in p:
return "校验任务"
if "dws/" in p or "dws\\" in p:
return "DWS 汇总任务"
# 文件名级别判断
basename = p.rsplit("/", 1)[-1] if "/" in p else p
if basename.startswith("ods_") or basename.startswith("ods."):
return "ODS 抓取任务"
if basename.startswith("dwd_") or basename.startswith("dwd."):
return "DWD 加载任务"
if basename.startswith("dws_"):
return "DWS 汇总任务"
if "init" in basename and "schema" in basename:
return "Schema 初始化任务"
return "任务"
def classify_loader_type(rel_path: str) -> str:
"""根据文件路径区分加载器类型。
返回值:
- "维度加载器 (SCD2)"
- "事实表加载器"
- "ODS 通用加载器"
- "加载器"(无法细分时的默认值)
"""
p = rel_path.replace("\\", "/").lower()
if "dimensions/" in p or "dimensions\\" in p:
return "维度加载器 (SCD2)"
if "facts/" in p or "facts\\" in p:
return "事实表加载器"
if "ods/" in p or "ods\\" in p:
return "ODS 通用加载器"
return "加载器"
# ---------------------------------------------------------------------------
# find_orphan_modules — 找出未被任何入口直接或间接引用的 Python 模块
# ---------------------------------------------------------------------------
def find_orphan_modules(
repo_root: Path,
all_entries: list[FileEntry],
reachable: set[str],
) -> list[str]:
"""找出未被任何入口直接或间接引用的 Python 模块。
排除规则(不视为孤立):
- __init__.py 文件
- tests/ 目录下的文件
- scripts/audit/ 目录下的文件(审计脚本自身)
- 目录条目
- 非 .py 文件
- 不属于项目包的文件
返回按路径排序的孤立模块列表。
"""
orphans: list[str] = []
for entry in all_entries:
# 跳过目录
if entry.is_dir:
continue
# 只关注 .py 文件
if entry.extension != ".py":
continue
rel = entry.rel_path.replace("\\", "/")
# 排除 __init__.py
if rel.endswith("/__init__.py") or rel == "__init__.py":
continue
# 排除测试文件
if rel.startswith("tests/") or rel.startswith("tests\\"):
continue
# 排除审计脚本自身
if rel.startswith("scripts/audit/") or rel.startswith("scripts\\audit\\"):
continue
# 只检查属于项目包的文件
top_dir = rel.split("/")[0] if "/" in rel else ""
if top_dir not in _PROJECT_PACKAGES:
continue
# 不在可达集合中 → 孤立
if rel not in reachable:
orphans.append(rel)
orphans.sort()
return orphans
# ---------------------------------------------------------------------------
# 统计辅助
# ---------------------------------------------------------------------------
def _count_nodes_by_type(trees: list[FlowNode]) -> dict[str, int]:
"""递归统计流程树中各类型节点的数量。"""
counts: dict[str, int] = {"entry": 0, "module": 0, "class": 0, "function": 0}
def _walk(node: FlowNode) -> None:
t = node.node_type
counts[t] = counts.get(t, 0) + 1
for child in node.children:
_walk(child)
for tree in trees:
_walk(tree)
return counts
def _count_tasks_and_loaders(trees: list[FlowNode]) -> tuple[int, int]:
"""统计流程树中任务模块和加载器模块的数量。"""
tasks = 0
loaders = 0
seen: set[str] = set()
def _walk(node: FlowNode) -> None:
nonlocal tasks, loaders
if node.source_file in seen:
return
seen.add(node.source_file)
sf = node.source_file.replace("\\", "/")
if sf.startswith("tasks/") and not sf.endswith("__init__.py"):
base = sf.rsplit("/", 1)[-1]
if not base.startswith("base_"):
tasks += 1
if sf.startswith("loaders/") and not sf.endswith("__init__.py"):
base = sf.rsplit("/", 1)[-1]
if not base.startswith("base_"):
loaders += 1
for child in node.children:
_walk(child)
for tree in trees:
_walk(tree)
return tasks, loaders
# ---------------------------------------------------------------------------
# 类型标注辅助
# ---------------------------------------------------------------------------
def _get_type_annotation(source_file: str) -> str:
"""根据源文件路径返回类型标注字符串(用于报告中的节点标注)。"""
sf = source_file.replace("\\", "/")
if sf.startswith("tasks/"):
return f" [{classify_task_type(sf)}]"
if sf.startswith("loaders/"):
return f" [{classify_loader_type(sf)}]"
return ""
# ---------------------------------------------------------------------------
# Mermaid 图生成
# ---------------------------------------------------------------------------
def _render_mermaid(trees: list[FlowNode]) -> str:
"""生成 Mermaid 流程图代码。"""
lines: list[str] = ["```mermaid", "graph TD"]
seen_edges: set[tuple[str, str]] = set()
node_ids: dict[str, str] = {}
counter = [0]
def _node_id(name: str) -> str:
if name not in node_ids:
node_ids[name] = f"N{counter[0]}"
counter[0] += 1
return node_ids[name]
def _walk(node: FlowNode) -> None:
nid = _node_id(node.name)
annotation = _get_type_annotation(node.source_file)
label = f"{node.name}{annotation}"
# 声明节点
lines.append(f" {nid}[\"`{label}`\"]")
for child in node.children:
cid = _node_id(child.name)
edge = (nid, cid)
if edge not in seen_edges:
seen_edges.add(edge)
lines.append(f" {nid} --> {cid}")
_walk(child)
for tree in trees:
_walk(tree)
lines.append("```")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# 缩进文本树生成
# ---------------------------------------------------------------------------
def _render_text_tree(trees: list[FlowNode]) -> str:
"""生成缩进文本形式的流程树。"""
lines: list[str] = []
seen: set[str] = set()
def _walk(node: FlowNode, depth: int) -> None:
indent = " " * depth
annotation = _get_type_annotation(node.source_file)
line = f"{indent}- `{node.name}` (`{node.source_file}`){annotation}"
lines.append(line)
key = node.source_file
if key in seen:
# 已展开过,不再递归(避免循环)
if node.children:
lines.append(f"{indent} - *(已展开)*")
return
seen.add(key)
for child in node.children:
_walk(child, depth + 1)
for tree in trees:
_walk(tree, 0)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# render_flow_report — 生成 Markdown 格式的流程树报告
# ---------------------------------------------------------------------------
def render_flow_report(
trees: list[FlowNode],
orphans: list[str],
repo_root: str,
) -> str:
"""生成 Markdown 格式的流程树报告(含 Mermaid 图和缩进文本)。
报告结构:
1. 头部(时间戳、仓库路径)
2. Mermaid 流程图
3. 缩进文本树
4. 孤立模块列表
5. 统计摘要
"""
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
sections: list[str] = []
# --- 头部 ---
sections.append("# 项目流程树报告\n")
sections.append(f"- 生成时间: {timestamp}")
sections.append(f"- 仓库路径: `{repo_root}`\n")
# --- Mermaid 图 ---
sections.append("## 流程图Mermaid\n")
sections.append(_render_mermaid(trees))
sections.append("")
# --- 缩进文本树 ---
sections.append("## 流程树(缩进文本)\n")
sections.append(_render_text_tree(trees))
sections.append("")
# --- 孤立模块 ---
sections.append("## 孤立模块\n")
if orphans:
for o in orphans:
sections.append(f"- `{o}`")
else:
sections.append("未发现孤立模块。")
sections.append("")
# --- 统计摘要 ---
entry_count = sum(1 for t in trees if t.node_type == "entry")
task_count, loader_count = _count_tasks_and_loaders(trees)
orphan_count = len(orphans)
sections.append("## 统计摘要\n")
sections.append(f"| 指标 | 数量 |")
sections.append(f"|------|------|")
sections.append(f"| 入口点 | {entry_count} |")
sections.append(f"| 任务 | {task_count} |")
sections.append(f"| 加载器 | {loader_count} |")
sections.append(f"| 孤立模块 | {orphan_count} |")
sections.append("")
return "\n".join(sections)

View File

@@ -0,0 +1,449 @@
# -*- coding: utf-8 -*-
"""
文件清单分析器 — 对扫描结果进行用途分类和处置标签分配。
分类规则按优先级从高到低排列:
1. tmp/ 下所有文件 → 临时与调试 / 候选删除或候选归档
2. logs/、export/ 下的运行时产出 → 日志与输出 / 候选归档
3. *.lnk、*.rar 文件 → 其他 / 候选删除
4. 空目录 → 其他 / 候选删除
5. 核心代码目录tasks/ 等)→ 核心代码 / 保留
6. config/ → 配置 / 保留
7. database/*.sql、database/migrations/ → 数据库定义 / 保留
8. database/*.py → 核心代码 / 保留
9. tests/ → 测试 / 保留
10. docs/ → 文档 / 保留
11. scripts/ 下的 .py 文件 → 脚本工具 / 保留
12. gui/ → GUI / 保留
13. 构建与部署文件 → 构建与部署 / 保留
14. 其余 → 其他 / 待确认
"""
from __future__ import annotations
import os
from collections import Counter
from datetime import datetime, timezone
from itertools import groupby
from scripts.audit import Category, Disposition, FileEntry, InventoryItem
# ---------------------------------------------------------------------------
# 常量
# ---------------------------------------------------------------------------
# 核心代码顶层目录
_CORE_CODE_DIRS = (
"tasks/", "loaders/", "scd/", "orchestration/",
"quality/", "models/", "utils/", "api/",
)
# 构建与部署文件名(根目录级别)
_BUILD_DEPLOY_BASENAMES = {"setup.py", "build_exe.py"}
# 构建与部署扩展名
_BUILD_DEPLOY_EXTENSIONS = {".bat", ".sh", ".ps1"}
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _top_dir(rel_path: str) -> str:
"""返回相对路径的第一级目录名(含尾部斜杠),如 'tmp/foo.py''tmp/'"""
idx = rel_path.find("/")
if idx == -1:
return ""
return rel_path[: idx + 1]
def _basename(rel_path: str) -> str:
"""返回路径的最后一段文件名。"""
return rel_path.rsplit("/", 1)[-1]
def _is_init_py(rel_path: str) -> bool:
"""判断路径是否为 __init__.py。"""
return _basename(rel_path) == "__init__.py"
# ---------------------------------------------------------------------------
# classify — 核心分类函数
# ---------------------------------------------------------------------------
def classify(entry: FileEntry) -> InventoryItem:
"""根据路径、扩展名等规则对单个文件/目录进行分类和标签分配。
规则按优先级从高到低依次匹配,首个命中的规则决定分类和处置。
"""
path = entry.rel_path
top = _top_dir(path)
ext = entry.extension.lower()
base = _basename(path)
# --- 优先级 1: tmp/ 下所有文件 ---
if top == "tmp/" or path == "tmp":
return _classify_tmp(entry)
# --- 优先级 2: logs/、export/ 下的运行时产出 ---
if top in ("logs/", "export/") or path in ("logs", "export"):
return _classify_runtime_output(entry)
# --- 优先级 3: .lnk / .rar 文件 ---
if ext in (".lnk", ".rar"):
return InventoryItem(
rel_path=path,
category=Category.OTHER,
disposition=Disposition.CANDIDATE_DELETE,
description=f"快捷方式/压缩包文件(`{ext}`),建议删除",
)
# --- 优先级 4: 空目录 ---
if entry.is_empty_dir:
return InventoryItem(
rel_path=path,
category=Category.OTHER,
disposition=Disposition.CANDIDATE_DELETE,
description="空目录,建议删除",
)
# --- 优先级 5: 核心代码目录 ---
if any(path.startswith(d) or path + "/" == d for d in _CORE_CODE_DIRS):
return InventoryItem(
rel_path=path,
category=Category.CORE_CODE,
disposition=Disposition.KEEP,
description=f"核心代码(`{top.rstrip('/')}`",
)
# --- 优先级 6: config/ ---
if top == "config/" or path == "config":
return InventoryItem(
rel_path=path,
category=Category.CONFIG,
disposition=Disposition.KEEP,
description="配置文件",
)
# --- 优先级 7: database/*.sql 和 database/migrations/ ---
if top == "database/" or path == "database":
return _classify_database(entry)
# --- 优先级 8: tests/ ---
if top == "tests/" or path == "tests":
return InventoryItem(
rel_path=path,
category=Category.TEST,
disposition=Disposition.KEEP,
description="测试文件",
)
# --- 优先级 9: docs/ ---
if top == "docs/" or path == "docs":
return InventoryItem(
rel_path=path,
category=Category.DOCS,
disposition=Disposition.KEEP,
description="文档",
)
# --- 优先级 10: scripts/ 下的 .py 文件 ---
if top == "scripts/" or path == "scripts":
cat = Category.SCRIPTS
if ext == ".py" or entry.is_dir:
return InventoryItem(
rel_path=path,
category=cat,
disposition=Disposition.KEEP,
description="脚本工具",
)
return InventoryItem(
rel_path=path,
category=cat,
disposition=Disposition.NEEDS_REVIEW,
description="脚本目录下的非 Python 文件,需确认用途",
)
# --- 优先级 11: gui/ ---
if top == "gui/" or path == "gui":
return InventoryItem(
rel_path=path,
category=Category.GUI,
disposition=Disposition.KEEP,
description="GUI 模块",
)
# --- 优先级 12: 构建与部署 ---
if base in _BUILD_DEPLOY_BASENAMES or ext in _BUILD_DEPLOY_EXTENSIONS:
return InventoryItem(
rel_path=path,
category=Category.BUILD_DEPLOY,
disposition=Disposition.KEEP,
description="构建与部署文件",
)
# --- 优先级 13: cli/ ---
if top == "cli/" or path == "cli":
return InventoryItem(
rel_path=path,
category=Category.CORE_CODE,
disposition=Disposition.KEEP,
description="CLI 入口模块",
)
# --- 优先级 14: 已知根目录文件 ---
if "/" not in path:
return _classify_root_file(entry)
# --- 兜底 ---
return InventoryItem(
rel_path=path,
category=Category.OTHER,
disposition=Disposition.NEEDS_REVIEW,
description="未匹配已知规则,需人工确认用途",
)
# ---------------------------------------------------------------------------
# 子分类函数
# ---------------------------------------------------------------------------
def _classify_tmp(entry: FileEntry) -> InventoryItem:
"""tmp/ 目录下的文件分类。
默认候选删除;有意义的 .py 文件标记为候选归档。
"""
ext = entry.extension.lower()
base = _basename(entry.rel_path)
# 空目录直接候选删除
if entry.is_empty_dir:
return InventoryItem(
rel_path=entry.rel_path,
category=Category.TEMP_DEBUG,
disposition=Disposition.CANDIDATE_DELETE,
description="临时目录下的空目录",
)
# .py 文件可能有参考价值 → 候选归档
if ext == ".py" and len(base) > 4:
return InventoryItem(
rel_path=entry.rel_path,
category=Category.TEMP_DEBUG,
disposition=Disposition.CANDIDATE_ARCHIVE,
description="临时 Python 脚本,可能有参考价值",
)
return InventoryItem(
rel_path=entry.rel_path,
category=Category.TEMP_DEBUG,
disposition=Disposition.CANDIDATE_DELETE,
description="临时/调试文件,建议删除",
)
def _classify_runtime_output(entry: FileEntry) -> InventoryItem:
"""logs/、export/ 目录下的运行时产出分类。
__init__.py 保留(包标记),其余候选归档。
"""
if _is_init_py(entry.rel_path):
return InventoryItem(
rel_path=entry.rel_path,
category=Category.LOG_OUTPUT,
disposition=Disposition.KEEP,
description="包初始化文件",
)
return InventoryItem(
rel_path=entry.rel_path,
category=Category.LOG_OUTPUT,
disposition=Disposition.CANDIDATE_ARCHIVE,
description="运行时产出,建议归档",
)
def _classify_database(entry: FileEntry) -> InventoryItem:
"""database/ 目录下的文件分类。"""
path = entry.rel_path
ext = entry.extension.lower()
# migrations/ 子目录
if "migrations/" in path or path.endswith("migrations"):
return InventoryItem(
rel_path=path,
category=Category.DATABASE_DEF,
disposition=Disposition.KEEP,
description="数据库迁移脚本",
)
# .sql 文件
if ext == ".sql":
return InventoryItem(
rel_path=path,
category=Category.DATABASE_DEF,
disposition=Disposition.KEEP,
description="数据库 DDL/DML 脚本",
)
# .py 文件 → 核心代码
if ext == ".py":
return InventoryItem(
rel_path=path,
category=Category.CORE_CODE,
disposition=Disposition.KEEP,
description="数据库操作模块",
)
# 目录本身
if entry.is_dir:
if entry.is_empty_dir:
return InventoryItem(
rel_path=path,
category=Category.OTHER,
disposition=Disposition.CANDIDATE_DELETE,
description="数据库目录下的空目录",
)
return InventoryItem(
rel_path=path,
category=Category.DATABASE_DEF,
disposition=Disposition.KEEP,
description="数据库子目录",
)
# 其他文件
return InventoryItem(
rel_path=path,
category=Category.DATABASE_DEF,
disposition=Disposition.NEEDS_REVIEW,
description="数据库目录下的非标准文件,需确认",
)
def _classify_root_file(entry: FileEntry) -> InventoryItem:
"""根目录散落文件的分类。"""
ext = entry.extension.lower()
base = _basename(entry.rel_path)
# 已知构建文件
if base in _BUILD_DEPLOY_BASENAMES or ext in _BUILD_DEPLOY_EXTENSIONS:
return InventoryItem(
rel_path=entry.rel_path,
category=Category.BUILD_DEPLOY,
disposition=Disposition.KEEP,
description="构建与部署文件",
)
# 已知配置文件
if base in (
"requirements.txt", "pytest.ini", ".env", ".env.example",
".gitignore", ".flake8", "pyproject.toml",
):
return InventoryItem(
rel_path=entry.rel_path,
category=Category.CONFIG,
disposition=Disposition.KEEP,
description="项目配置文件",
)
# README
if base.lower().startswith("readme"):
return InventoryItem(
rel_path=entry.rel_path,
category=Category.DOCS,
disposition=Disposition.KEEP,
description="项目说明文档",
)
# 其他根目录文件 → 待确认
return InventoryItem(
rel_path=entry.rel_path,
category=Category.OTHER,
disposition=Disposition.NEEDS_REVIEW,
description=f"根目录散落文件(`{base}`),需确认用途",
)
# ---------------------------------------------------------------------------
# build_inventory — 批量分类
# ---------------------------------------------------------------------------
def build_inventory(entries: list[FileEntry]) -> list[InventoryItem]:
"""对所有文件条目执行分类,返回清单列表。"""
return [classify(e) for e in entries]
# ---------------------------------------------------------------------------
# render_inventory_report — Markdown 渲染
# ---------------------------------------------------------------------------
def render_inventory_report(items: list[InventoryItem], repo_root: str) -> str:
"""生成 Markdown 格式的文件清单报告。
报告结构:
- 头部:标题、生成时间、仓库路径
- 主体:按 Category 分组的表格
- 尾部:统计摘要
"""
lines: list[str] = []
# --- 头部 ---
now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
lines.append("# 文件清单报告")
lines.append("")
lines.append(f"- 生成时间:{now}")
lines.append(f"- 仓库路径:`{repo_root}`")
lines.append("")
# --- 按分类分组 ---
# 保持 Category 枚举定义顺序
cat_order = {c: i for i, c in enumerate(Category)}
sorted_items = sorted(items, key=lambda it: cat_order[it.category])
for cat, group in groupby(sorted_items, key=lambda it: it.category):
group_list = list(group)
lines.append(f"## {cat.value}")
lines.append("")
lines.append("| 相对路径 | 处置标签 | 简要说明 |")
lines.append("|---|---|---|")
for item in group_list:
lines.append(
f"| `{item.rel_path}` | {item.disposition.value} | {item.description} |"
)
lines.append("")
# --- 统计摘要 ---
lines.append("## 统计摘要")
lines.append("")
# 各分类计数
cat_counter: Counter[Category] = Counter()
disp_counter: Counter[Disposition] = Counter()
for item in items:
cat_counter[item.category] += 1
disp_counter[item.disposition] += 1
lines.append("### 按用途分类")
lines.append("")
lines.append("| 分类 | 数量 |")
lines.append("|---|---|")
for cat in Category:
count = cat_counter.get(cat, 0)
if count > 0:
lines.append(f"| {cat.value} | {count} |")
lines.append("")
lines.append("### 按处置标签")
lines.append("")
lines.append("| 标签 | 数量 |")
lines.append("|---|---|")
for disp in Disposition:
count = disp_counter.get(disp, 0)
if count > 0:
lines.append(f"| {disp.value} | {count} |")
lines.append("")
lines.append(f"**总计:{len(items)} 个条目**")
lines.append("")
return "\n".join(lines)

View File

@@ -0,0 +1,255 @@
# -*- coding: utf-8 -*-
"""
审计主入口 — 依次调用扫描器和三个分析器,生成三份报告到 docs/audit/repo/。
仅在 docs/audit/repo/ 目录下创建文件,不修改仓库中的任何现有文件。
"""
from __future__ import annotations
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from scripts.audit.scanner import scan_repo
from scripts.audit.inventory_analyzer import (
build_inventory,
render_inventory_report,
)
from scripts.audit.flow_analyzer import (
build_flow_tree,
discover_entry_points,
find_orphan_modules,
render_flow_report,
)
from scripts.audit.doc_alignment_analyzer import (
build_mappings,
check_api_samples_vs_parsers,
check_ddl_vs_dictionary,
find_undocumented_modules,
render_alignment_report,
scan_docs,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 仓库根目录自动检测
# ---------------------------------------------------------------------------
def _detect_repo_root() -> Path:
"""从当前文件向上查找仓库根目录。
判断依据:包含 cli/ 目录或 .git/ 目录的祖先目录。
"""
current = Path(__file__).resolve().parent
for parent in (current, *current.parents):
if (parent / "cli").is_dir() or (parent / ".git").is_dir():
return parent
# 回退:假设 scripts/audit/ 在仓库根目录下
return current.parent.parent
# ---------------------------------------------------------------------------
# 报告输出目录
# ---------------------------------------------------------------------------
def _ensure_report_dir(repo_root: Path) -> Path:
"""检查并创建 docs/audit/repo/ 目录。
如果目录已存在则直接返回;不存在则创建。
创建失败时抛出 RuntimeError因为无法输出报告
"""
audit_dir = repo_root / "docs" / "audit" / "repo"
if audit_dir.is_dir():
return audit_dir
try:
audit_dir.mkdir(parents=True, exist_ok=True)
except OSError as exc:
raise RuntimeError(f"无法创建报告输出目录 {audit_dir}: {exc}") from exc
logger.info("已创建报告输出目录: %s", audit_dir)
return audit_dir
# ---------------------------------------------------------------------------
# 报告头部元信息注入
# ---------------------------------------------------------------------------
_HEADER_PATTERN = re.compile(r"生成时间[:]")
_ISO_TS_PATTERN = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z")
# 匹配非 ISO 格式的时间戳行,用于替换
_NON_ISO_TS_LINE = re.compile(
r"([-*]\s*生成时间[:]\s*)\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}"
)
def _inject_header(report: str, timestamp: str, repo_path: str) -> str:
"""确保报告头部包含 ISO 格式时间戳和仓库路径。
- 已有 ISO 时间戳 → 不修改
- 有非 ISO 时间戳 → 替换为 ISO 格式
- 无头部 → 在标题后注入
"""
if _HEADER_PATTERN.search(report):
# 已有头部——检查时间戳格式是否为 ISO
if _ISO_TS_PATTERN.search(report):
return report
# 非 ISO 格式 → 替换时间戳
report = _NON_ISO_TS_LINE.sub(
lambda m: m.group(1) + timestamp, report,
)
# 同时确保仓库路径使用统一值(用 lambda 避免反斜杠转义问题)
safe_path = repo_path
report = re.sub(
r"([-*]\s*仓库路径[:]\s*)`[^`]*`",
lambda m: m.group(1) + "`" + safe_path + "`",
report,
)
return report
# 无头部 → 在第一个标题行之后插入
lines = report.split("\n")
insert_idx = 1
for i, line in enumerate(lines):
if line.startswith("# "):
insert_idx = i + 1
break
header_lines = [
"",
f"- 生成时间: {timestamp}",
f"- 仓库路径: `{repo_path}`",
"",
]
lines[insert_idx:insert_idx] = header_lines
return "\n".join(lines)
# ---------------------------------------------------------------------------
# 主函数
# ---------------------------------------------------------------------------
def run_audit(repo_root: Path | None = None) -> None:
"""执行完整审计流程,生成三份报告到 docs/audit/repo/。
Parameters
----------
repo_root : Path | None
仓库根目录。为 None 时自动检测。
"""
# 1. 确定仓库根目录
if repo_root is None:
repo_root = _detect_repo_root()
repo_root = repo_root.resolve()
repo_path_str = str(repo_root)
logger.info("审计开始 — 仓库路径: %s", repo_path_str)
# 2. 检查/创建输出目录
audit_dir = _ensure_report_dir(repo_root)
# 3. 生成 UTC 时间戳(所有报告共用)
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# 4. 扫描仓库
logger.info("正在扫描仓库文件...")
entries = scan_repo(repo_root)
logger.info("扫描完成,共 %d 个条目", len(entries))
# 5. 文件清单报告
logger.info("正在生成文件清单报告...")
try:
inventory_items = build_inventory(entries)
inventory_report = render_inventory_report(inventory_items, repo_path_str)
inventory_report = _inject_header(inventory_report, timestamp, repo_path_str)
(audit_dir / "file_inventory.md").write_text(
inventory_report, encoding="utf-8",
)
logger.info("文件清单报告已写入: file_inventory.md")
except Exception:
logger.exception("生成文件清单报告时出错")
# 6. 流程树报告
logger.info("正在生成流程树报告...")
try:
entry_points = discover_entry_points(repo_root)
trees = []
reachable: set[str] = set()
for ep in entry_points:
ep_file = ep["file"]
# 批处理文件不构建流程树
if not ep_file.endswith(".py"):
continue
tree = build_flow_tree(repo_root, ep_file)
trees.append(tree)
# 收集可达模块
_collect_reachable(tree, reachable)
orphans = find_orphan_modules(repo_root, entries, reachable)
flow_report = render_flow_report(trees, orphans, repo_path_str)
flow_report = _inject_header(flow_report, timestamp, repo_path_str)
(audit_dir / "flow_tree.md").write_text(
flow_report, encoding="utf-8",
)
logger.info("流程树报告已写入: flow_tree.md")
except Exception:
logger.exception("生成流程树报告时出错")
# 7. 文档对齐报告
logger.info("正在生成文档对齐报告...")
try:
doc_paths = scan_docs(repo_root)
mappings = build_mappings(doc_paths, repo_root)
issues = []
issues.extend(check_ddl_vs_dictionary(repo_root))
issues.extend(check_api_samples_vs_parsers(repo_root))
# 缺失文档检测
documented: set[str] = set()
for m in mappings:
documented.update(m.related_code)
undoc_modules = find_undocumented_modules(repo_root, documented)
from scripts.audit import AlignmentIssue
for mod in undoc_modules:
issues.append(AlignmentIssue(
doc_path="",
issue_type="missing",
description=f"核心代码模块 `{mod}` 缺少对应文档",
related_code=mod,
))
alignment_report = render_alignment_report(mappings, issues, repo_path_str)
alignment_report = _inject_header(alignment_report, timestamp, repo_path_str)
(audit_dir / "doc_alignment.md").write_text(
alignment_report, encoding="utf-8",
)
logger.info("文档对齐报告已写入: doc_alignment.md")
except Exception:
logger.exception("生成文档对齐报告时出错")
logger.info("审计完成 — 报告输出目录: %s", audit_dir)
# ---------------------------------------------------------------------------
# 辅助:收集可达模块
# ---------------------------------------------------------------------------
def _collect_reachable(node, reachable: set[str]) -> None:
"""递归收集流程树中所有节点的 source_file。"""
reachable.add(node.source_file)
for child in node.children:
_collect_reachable(child, reachable)
# ---------------------------------------------------------------------------
# 入口
# ---------------------------------------------------------------------------
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
run_audit()

View File

@@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
"""
仓库扫描器 — 递归遍历仓库文件系统,返回结构化的文件元信息。
仅执行只读操作:读取文件元信息(大小、类型),不修改任何文件。
遇到权限错误时跳过并记录日志,不中断扫描流程。
"""
from __future__ import annotations
import fnmatch
import logging
from pathlib import Path
from scripts.audit import FileEntry
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# 排除模式
# ---------------------------------------------------------------------------
EXCLUDED_PATTERNS: list[str] = [
".git",
"__pycache__",
".pytest_cache",
"*.pyc",
".kiro",
]
# ---------------------------------------------------------------------------
# 排除匹配逻辑
# ---------------------------------------------------------------------------
def _is_excluded(name: str, patterns: list[str]) -> bool:
"""判断文件/目录名是否匹配任一排除模式。
支持两种模式:
- 精确匹配(如 ".git""__pycache__"
- 通配符匹配(如 "*.pyc"),使用 fnmatch 语义
"""
for pat in patterns:
if fnmatch.fnmatch(name, pat):
return True
return False
# ---------------------------------------------------------------------------
# 递归遍历
# ---------------------------------------------------------------------------
def _walk(
root: Path,
base: Path,
exclude: list[str],
results: list[FileEntry],
) -> None:
"""递归遍历 *root* 下的文件和目录,将结果追加到 *results*。
Parameters
----------
root : Path
当前要遍历的目录。
base : Path
仓库根目录,用于计算相对路径。
exclude : list[str]
排除模式列表。
results : list[FileEntry]
收集结果的列表(就地修改)。
"""
try:
children = sorted(root.iterdir(), key=lambda p: p.name)
except (PermissionError, OSError) as exc:
logger.warning("无法读取目录 %s: %s", root, exc)
return
# 用于判断当前目录是否为"空目录"(排除后无可见子项)
visible_count = 0
for child in children:
if _is_excluded(child.name, exclude):
continue
visible_count += 1
rel = child.relative_to(base).as_posix()
if child.is_dir():
# 先递归子目录,再判断该目录是否为空
sub_start = len(results)
_walk(child, base, exclude, results)
sub_end = len(results)
# 该目录下递归产生的条目数为 0 → 空目录
is_empty = (sub_end == sub_start)
results.append(FileEntry(
rel_path=rel,
is_dir=True,
size_bytes=0,
extension="",
is_empty_dir=is_empty,
))
else:
# 文件
try:
size = child.stat().st_size
except (PermissionError, OSError) as exc:
logger.warning("无法获取文件信息 %s: %s", child, exc)
continue
results.append(FileEntry(
rel_path=rel,
is_dir=False,
size_bytes=size,
extension=child.suffix.lower(),
is_empty_dir=False,
))
# 如果 root 是仓库根目录自身,不需要额外处理
# (根目录不作为条目出现在结果中)
def scan_repo(
root: Path,
exclude: list[str] | None = None,
) -> list[FileEntry]:
"""递归扫描仓库,返回所有文件和目录的元信息列表。
Parameters
----------
root : Path
仓库根目录路径。
exclude : list[str] | None
排除模式列表,默认使用 EXCLUDED_PATTERNS。
Returns
-------
list[FileEntry]
按 rel_path 排序的文件/目录元信息列表。
"""
if exclude is None:
exclude = EXCLUDED_PATTERNS
results: list[FileEntry] = []
_walk(root, root, exclude, results)
# 按相对路径排序,保证输出稳定
results.sort(key=lambda e: e.rel_path)
return results