171 lines
5.0 KiB
Python
171 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
||
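"""file_baseline — a standalone baseline snapshot system based on file mtime + size.

It does not depend on git commit history. Instead it scans the workspace and
records an (mtime, size) fingerprint for every file, so that file changes made
"during this conversation" can be precisely detected between promptSubmit and
agentStop.

Usage:
    from file_baseline import scan_workspace, diff_baselines, save_baseline, load_baseline
"""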
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
from typing import TypedDict
|
||
|
||
# Default location of the persisted baseline snapshot (a relative path).
BASELINE_PATH = os.path.join(".kiro", "state", ".file_baseline.json")

# Directory names excluded while scanning (aligned with .gitignore, plus extras).
# NOTE(review): ".coverage" is usually a file rather than a directory — confirm
# whether it was meant to be listed among the excluded file names instead.
EXCLUDE_DIRS = {
    ".git", ".venv", "venv", "ENV", "env",
    "node_modules", "__pycache__", ".hypothesis", ".pytest_cache",
    ".idea", ".vscode", ".specstory",
    "build", "dist", "eggs", ".eggs",
    "export", "reports", "tmp",
    "htmlcov", ".coverage",
    # Kiro runtime state does not take part in business change detection.
    ".kiro",
}

# File suffixes excluded while scanning (compared against the lower-cased
# extension of each file name).
EXCLUDE_SUFFIXES = {
    ".pyc", ".pyo", ".pyd", ".so", ".egg", ".whl",
    ".log", ".jsonl", ".lnk",
    ".swp", ".swo",
}

# File names excluded while scanning (matched exactly, not as patterns).
EXCLUDE_NAMES = {
    ".DS_Store", "Thumbs.db", "desktop.ini",
}

# Allow-list of business directories: only these top-level directories are
# scanned recursively, in addition to the loose files in the root directory.
# This avoids walking deep cache directories such as .vite/deps.
SCAN_ROOTS = [
    "apps",
    "packages",
    "db",
    "docs",
    "scripts",
    "tests",
]
|
||
|
||
|
||
class FileEntry(TypedDict):
    """Fingerprint of a single file inside a baseline snapshot."""

    mtime: float  # modification time, taken from the stat result's st_mtime
    size: int  # file size, taken from the stat result's st_size
|
||
|
||
|
||
class DiffResult(TypedDict):
    """Result of comparing two baseline snapshots, as lists of file paths."""

    added: list[str]  # paths present only in the newer snapshot
    modified: list[str]  # paths in both snapshots whose mtime or size differs
    deleted: list[str]  # paths present only in the older snapshot
|
||
|
||
|
||
def _should_exclude_dir(dirname: str) -> bool:
    """Tell whether a directory must be skipped during scanning.

    A directory is skipped when its name starts with "." or when the name is
    listed in ``EXCLUDE_DIRS``.
    """
    if dirname.startswith("."):
        return True
    return dirname in EXCLUDE_DIRS
|
||
|
||
|
||
def _should_exclude_file(filename: str) -> bool:
    """Tell whether a file must be skipped during scanning.

    A file is skipped when its exact name is listed in ``EXCLUDE_NAMES`` or
    when its lower-cased extension is listed in ``EXCLUDE_SUFFIXES``.
    """
    suffix = os.path.splitext(filename)[1].lower()
    return filename in EXCLUDE_NAMES or suffix in EXCLUDE_SUFFIXES
|
||
|
||
|
||
def scan_workspace(root: str = ".") -> dict[str, FileEntry]:
    """Scan the workspace and return ``{relative path: {mtime, size}}``.

    Two sources are combined:

    1. Regular files located directly in ``root``. Symlinks are not followed
       here, so symlinked entries are not reported.
    2. Every file found while walking the directories named in ``SCAN_ROOTS``,
       pruning sub-directories rejected by ``_should_exclude_dir``. These
       files are stat'ed with ``os.stat``, which follows symlinks.

    Files rejected by ``_should_exclude_file`` are omitted in both cases.

    Args:
        root: Workspace root directory to scan.

    Returns:
        A dict keyed by the path relative to ``root`` (with "/" separators),
        whose values hold the file's ``mtime`` and ``size``. Files that cannot
        be stat'ed are skipped, and an unreadable ``root`` contributes no
        root-level files.
    """
    result: dict[str, FileEntry] = {}

    # 1. Loose files in the root directory (pyproject.toml, .env, ...).
    try:
        # The context manager releases the directory handle deterministically,
        # even if iteration is cut short by an OSError.
        with os.scandir(root) as entries:
            for entry in entries:
                if not entry.is_file(follow_symlinks=False):
                    continue
                if _should_exclude_file(entry.name):
                    continue
                try:
                    st = entry.stat(follow_symlinks=False)
                except OSError:
                    # The file vanished or is unreadable: leave it out.
                    continue
                rel = entry.name.replace("\\", "/")
                result[rel] = {"mtime": st.st_mtime, "size": st.st_size}
    except OSError:
        # Best effort: a missing or unreadable root yields no root-level files.
        pass

    # 2. Recursive scan of the allow-listed business directories.
    for scan_root in SCAN_ROOTS:
        top = os.path.join(root, scan_root)
        if not os.path.isdir(top):
            continue
        for dirpath, dirnames, filenames in os.walk(top):
            # Prune in place so os.walk never descends into excluded dirs.
            dirnames[:] = [d for d in dirnames if not _should_exclude_dir(d)]
            for fname in filenames:
                if _should_exclude_file(fname):
                    continue
                full = os.path.join(dirpath, fname)
                try:
                    st = os.stat(full)
                except OSError:
                    # The file vanished or is unreadable: leave it out.
                    continue
                rel = os.path.relpath(full, root).replace("\\", "/")
                result[rel] = {"mtime": st.st_mtime, "size": st.st_size}

    return result
|
||
|
||
|
||
def diff_baselines(
    before: dict[str, FileEntry],
    after: dict[str, FileEntry],
) -> DiffResult:
    """Compare two snapshots and report added / modified / deleted paths.

    A path found in both snapshots counts as modified when either its mtime
    or its size differs. Each returned list is sorted.
    """
    old_paths = before.keys()
    new_paths = after.keys()

    # Key views support set algebra directly.
    changed = [
        path
        for path in sorted(old_paths & new_paths)
        if before[path]["mtime"] != after[path]["mtime"]
        or before[path]["size"] != after[path]["size"]
    ]

    return {
        "added": sorted(new_paths - old_paths),
        "modified": changed,
        "deleted": sorted(old_paths - new_paths),
    }
|
||
|
||
|
||
def save_baseline(data: dict[str, FileEntry], path: str = BASELINE_PATH) -> None:
    """Save a baseline snapshot to a JSON file.

    Args:
        data: Snapshot mapping of relative path to ``{mtime, size}``.
        path: Destination file. Missing parent directories are created.

    Raises:
        OSError: If the parent directory or the file cannot be created or
            written.
        TypeError: If ``data`` contains values that JSON cannot serialize.
    """
    # Serialize before opening the file: opening with "w" truncates it, so a
    # serialization failure must not leave an empty or partial baseline behind.
    payload = json.dumps(data, ensure_ascii=False)

    # A bare file name has no parent directory to create; it is written to
    # the current directory as-is.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    with open(path, "w", encoding="utf-8") as f:
        f.write(payload)
|
||
|
||
|
||
def load_baseline(path: str = BASELINE_PATH) -> dict[str, FileEntry]:
    """Load a baseline snapshot, returning an empty dict when none is usable.

    Args:
        path: JSON file to read.

    Returns:
        The stored snapshot mapping. An empty dict is returned when the file
        does not exist, cannot be read, does not contain valid JSON, or its
        top-level JSON value is not an object.
    """
    # Only regular files (or links to them) are opened.
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # OSError: unreadable, or removed after the check above.
        # ValueError: malformed JSON or undecodable bytes.
        # RecursionError: pathologically nested JSON.
        return {}
    # Callers rely on a mapping; anything else (list, null, number, ...) is
    # treated like a missing baseline instead of failing later.
    return data if isinstance(data, dict) else {}
|
||
|
||
|
||
def total_changes(diff: DiffResult) -> int:
    """Return the total number of changed files across all three categories."""
    return sum(len(diff[bucket]) for bucket in ("added", "modified", "deleted"))
|