336 lines
11 KiB
Python
336 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""migrate_session_dirs — 将旧格式 session_logs 目录迁移到新格式。
|
||
|
||
旧格式:{chatShort}_{HHMMSS}/main_{seq}_{execShort}_{HHMMSS}.md
|
||
新格式:{seq:02d}_{chatShort}_{HHMMSS}/main_{seq}_{execShort}.md
|
||
|
||
迁移规则:
|
||
1. 同一天内,按 chatSessionId 分组,同 chatSession 的多个旧目录合并到一个新目录
|
||
2. 新目录按当天出现顺序分配序号(01_, 02_, ...)
|
||
3. 文件名去掉时间后缀(_HHMMSS)
|
||
4. 更新双索引中所有 entry 的 output_dir
|
||
5. 跨天对话生成 _ref_{chatShort}.md 指引文件
|
||
|
||
用法:
|
||
python scripts/ops/migrate_session_dirs.py --dry-run # 预览变更
|
||
python scripts/ops/migrate_session_dirs.py # 执行迁移
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import sys
|
||
from collections import defaultdict
|
||
|
||
from _env_paths import ensure_repo_root
|
||
|
||
# NOTE(review): presumably makes the repository root the working directory so
# the relative paths below resolve -- confirm against _env_paths.
ensure_repo_root()

# Root of the session logs and the two index files that live next to them.
SESSION_LOG_DIR = os.path.join("docs", "audit", "session_logs")
INDEX_PATH = os.path.join(SESSION_LOG_DIR, "_session_index.json")
INDEX_FULL_PATH = os.path.join(SESSION_LOG_DIR, "_session_index_full.json")

# Old directory name format: {hex8}_{HHMMSS} (no sequence prefix)
OLD_DIR_PATTERN = re.compile(r"^([0-9a-f]{8})_(\d{6})$")
# New directory name format: {seq:02d}_{hex8}_{HHMMSS} (with sequence prefix)
NEW_DIR_PATTERN = re.compile(r"^(\d{2})_([0-9a-f]{8})_(\d{6})$")
# Old file name format: main_{seq}_{hash8}_{HHMMSS}.md or sub_{seq}_{hash8}_{HHMMSS}.md
OLD_FILE_PATTERN = re.compile(r"^(main|sub)_(\d{2})_([0-9a-f]{8})_(\d{6})\.md$")
|
||
|
||
|
||
def load_json(path):
    """Parse the JSON document stored at *path*.

    Returns the decoded content, or an empty dict when *path* is not an
    existing regular file.
    """
    if os.path.isfile(path):
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    return {}
|
||
|
||
|
||
def save_json(path, data):
    """Write *data* to *path* as pretty-printed UTF-8 JSON.

    The document is first written to a sibling ``<path>.tmp`` file and then
    swapped into place with ``os.replace``, so a failed or interrupted dump
    never leaves a truncated file at *path*.

    Raises:
        OSError: if the temporary file cannot be written or moved into place.
        TypeError, ValueError: if *data* cannot be serialized as JSON.
    """
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; it only still
        # exists when something above failed, in which case it is discarded.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
|
||
|
||
|
||
def find_all_day_dirs():
    """Collect every ``YYYY-MM/DD`` directory below ``SESSION_LOG_DIR``.

    Returns a list of paths, ordered by month name and then by day name.
    The list is empty when ``SESSION_LOG_DIR`` is not a directory.
    """
    if not os.path.isdir(SESSION_LOG_DIR):
        return []

    day_paths = []
    for month_name in sorted(os.listdir(SESSION_LOG_DIR)):
        month_path = os.path.join(SESSION_LOG_DIR, month_name)
        if not (re.match(r"^\d{4}-\d{2}$", month_name) and os.path.isdir(month_path)):
            continue
        for day_name in sorted(os.listdir(month_path)):
            day_path = os.path.join(month_path, day_name)
            if re.match(r"^\d{2}$", day_name) and os.path.isdir(day_path):
                day_paths.append(day_path)
    return day_paths
|
||
|
||
|
||
def scan_day_dir(day_dir):
    """List the old-format session directories inside *day_dir*.

    Returns ``[(dir_name, chat_short, hms, full_path), ...]`` sorted by
    directory name. Entries that are not directories, that already use the
    new naming scheme, or that match neither scheme are left out.
    """
    pending = []
    for name in sorted(os.listdir(day_dir)):
        path = os.path.join(day_dir, name)
        # Non-directories and already-migrated directories are ignored.
        if not os.path.isdir(path) or NEW_DIR_PATTERN.match(name):
            continue
        matched = OLD_DIR_PATTERN.match(name)
        if matched is not None:
            chat_short, hms = matched.group(1), matched.group(2)
            pending.append((name, chat_short, hms, path))
    return pending
|
||
|
||
|
||
def rename_file(old_name):
    """Map an old-format file name to its new name (time suffix removed).

    main_01_abc12345_013337.md -> main_01_abc12345.md
    sub_02_def67890_013337.md  -> sub_02_def67890.md

    Names that do not match the old format are returned unchanged.
    """
    matched = OLD_FILE_PATTERN.match(old_name)
    if matched is None:
        return old_name
    kind, seq, hash8 = matched.group(1, 2, 3)
    return "_".join((kind, seq, hash8)) + ".md"
|
||
|
||
|
||
def build_migration_plan(day_dir):
    """Build the migration plan for a single day directory.

    Old-format directories found by ``scan_day_dir`` are grouped by chat short
    id, and every group is merged into one new-format directory. Groups are
    ordered by their earliest HHMMSS and numbered after the highest sequence
    number already used by a new-format directory in *day_dir*. A group whose
    chat short id already has a new-format directory (a partial earlier
    migration) reuses that directory instead of receiving a new number.

    This function only reads the file system; nothing is moved or created.

    Args:
        day_dir: path of the day directory to inspect.

    Returns:
        A list with one dict per chat group (empty when there is nothing to
        migrate)::

            {
                "old_dirs": [(dir_name, full_path), ...],
                "chat_short": str,
                "first_hms": str,      # earliest HHMMSS of the group
                "new_dir_name": str,   # {seq:02d}_{chatShort}_{firstHms}
                "new_dir_path": str,   # os.path.join(day_dir, new_dir_name)
                "file_renames": [(old_full_path, old_name, new_name), ...],
            }

        Only ``.md`` files are listed in ``file_renames``.
    """
    old_dirs = scan_day_dir(day_dir)
    if not old_dirs:
        return []

    # Group by chat short id: several old directories of one chat session
    # are merged into a single new directory.
    groups = defaultdict(list)
    for dir_name, chat_short, hms, full_path in old_dirs:
        groups[chat_short].append((dir_name, hms, full_path))

    # Order the groups by their earliest HHMMSS; sequence numbers follow it.
    sorted_groups = sorted(groups.items(), key=lambda g: min(h for _, h, _ in g[1]))

    # Scan the existing new-format directories once. This yields both the
    # highest sequence number in use and, per chat short id, the directory
    # to reuse. Sorting makes the choice deterministic if a chat id somehow
    # has more than one new-format directory (lowest sequence wins), and
    # non-directories are ignored so a stray file is never picked as target.
    existing_by_chat = {}
    max_seq = 0
    for name in sorted(os.listdir(day_dir)):
        m = NEW_DIR_PATTERN.match(name)
        if not m or not os.path.isdir(os.path.join(day_dir, name)):
            continue
        max_seq = max(max_seq, int(m.group(1)))
        existing_by_chat.setdefault(m.group(2), name)
    next_seq = max_seq + 1

    plan = []
    for chat_short, dirs in sorted_groups:
        dirs.sort(key=lambda x: x[1])  # oldest directory first
        first_hms = dirs[0][1]

        new_dir_name = existing_by_chat.get(chat_short)
        if new_dir_name is None:
            new_dir_name = f"{next_seq:02d}_{chat_short}_{first_hms}"
            next_seq += 1

        # Collect the rename plan for every markdown file of the group.
        file_renames = [
            (os.path.join(full_path, fname), fname, rename_file(fname))
            for _dir_name, _hms, full_path in dirs
            for fname in sorted(os.listdir(full_path))
            if fname.endswith(".md")
        ]

        plan.append({
            "old_dirs": [(d, p) for d, _, p in dirs],
            "chat_short": chat_short,
            "first_hms": first_hms,
            "new_dir_name": new_dir_name,
            "new_dir_path": os.path.join(day_dir, new_dir_name),
            "file_renames": file_renames,
        })

    return plan
|
||
|
||
|
||
def execute_migration(plan, day_dir, dry_run=False):
    """Carry out the migration plan of one day directory.

    For every plan item the target directory is created, the listed files are
    moved into it under their new names, and the old directories are removed
    once they are empty. With ``dry_run=True`` nothing is changed on disk; the
    intended MOVE / RMDIR operations are printed instead.

    File-system failures never abort the run: they are collected as messages
    so the caller can still process the remaining items (and update the
    indexes) and report everything at the end. If the target directory of an
    item cannot be created, that item is skipped entirely.

    Args:
        plan: list of plan items as produced by ``build_migration_plan``.
        day_dir: the day directory the plan belongs to (not used here; kept
            for interface compatibility).
        dry_run: when true, only print what would be done.

    Returns:
        ``(moved_dirs, moved_files, errors)`` -- number of old directories
        removed, number of files moved, and a list of error messages.
    """
    moved_dirs = 0
    moved_files = 0
    errors = []

    for item in plan:
        new_dir_path = item["new_dir_path"]

        if not dry_run:
            try:
                os.makedirs(new_dir_path, exist_ok=True)
            except OSError as e:
                # Without a target directory nothing of this item can be
                # moved; record the failure and leave the old dirs untouched.
                errors.append(f"mkdir {new_dir_path}: {e}")
                continue

        # Move the files into the new directory.
        for old_src, _old_fname, new_fname in item["file_renames"]:
            dst = os.path.join(new_dir_path, new_fname)
            # Target already present (e.g. from a partial earlier run): skip.
            if os.path.isfile(dst):
                continue
            if dry_run:
                print(f" MOVE {old_src} → {dst}")
                continue
            try:
                shutil.move(old_src, dst)
            except OSError as e:
                errors.append(f"move {old_src}: {e}")
            else:
                moved_files += 1

        # Remove the old directories that are now empty.
        for dir_name, dir_path in item["old_dirs"]:
            # Same name as the target directory: nothing to delete.
            if dir_name == item["new_dir_name"]:
                continue
            if dry_run:
                print(f" RMDIR {dir_path}")
                continue
            try:
                remaining = os.listdir(dir_path)
                if remaining:
                    # Never delete a directory that still holds files.
                    errors.append(f"rmdir {dir_path}: not empty ({remaining})")
                else:
                    os.rmdir(dir_path)
                    moved_dirs += 1
            except OSError as e:
                errors.append(f"rmdir {dir_path}: {e}")

    return moved_dirs, moved_files, errors
|
||
|
||
|
||
def update_indexes(all_plans):
    """Rewrite ``output_dir`` in both index files according to the plans.

    Old output_dir format: docs/audit/session_logs/2026-03/03/b6b5e1fd_013337
    New output_dir format: docs/audit/session_logs/2026-03/03/01_b6b5e1fd_013337

    Args:
        all_plans: list of ``(plan_list, day_dir)`` pairs, where ``plan_list``
            holds plan items with ``"new_dir_path"`` and ``"old_dirs"`` keys.

    Returns:
        The number of entries updated, summed over both index files. An index
        file is only rewritten when at least one of its entries changed.
    """
    # Mapping: old directory path -> new directory path, with "/" separators.
    dir_map = {}
    for plan_list, _day_dir in all_plans:
        for item in plan_list:
            new_path = item["new_dir_path"].replace("\\", "/")
            for _dir_name, dir_path in item["old_dirs"]:
                dir_map[dir_path.replace("\\", "/")] = new_path

    if not dir_map:
        return 0

    updated = 0
    for idx_path in (INDEX_PATH, INDEX_FULL_PATH):
        data = load_json(idx_path)
        entries = data.get("entries", {}) if isinstance(data, dict) else {}
        if not isinstance(entries, dict):
            continue

        changed = False
        for ent in entries.values():
            if not isinstance(ent, dict):
                continue
            old_dir = ent.get("output_dir", "")
            if not isinstance(old_dir, str):
                continue
            # Defensive: compare in the same normalised form as the map keys,
            # so a value stored with backslashes or a trailing slash is still
            # recognised.
            new_dir = dir_map.get(old_dir.replace("\\", "/").rstrip("/"))
            if new_dir is not None:
                ent["output_dir"] = new_dir
                changed = True
                updated += 1

        if changed:
            save_json(idx_path, data)

    return updated
|
||
|
||
|
||
def main():
    """Command-line entry point.

    Builds a migration plan for every day directory, then either prints the
    plan (``--dry-run``) or executes it, updates both indexes and prints a
    summary including up to 20 collected error messages.
    """
    import argparse

    cli = argparse.ArgumentParser(description="迁移 session_logs 目录到新格式")
    cli.add_argument("--dry-run", action="store_true", help="预览变更,不实际执行")
    opts = cli.parse_args()

    day_dirs = find_all_day_dirs()
    if not day_dirs:
        print("[migrate] 未找到任何 day_dir")
        return

    # Keep only the day directories that actually have something to migrate.
    candidates = ((build_migration_plan(path), path) for path in day_dirs)
    total_plans = [(plan, path) for plan, path in candidates if plan]
    if not total_plans:
        print("[migrate] 所有目录已是新格式,无需迁移")
        return

    total_items = sum(len(plan) for plan, _ in total_plans)
    total_files = sum(
        len(item["file_renames"]) for plan, _ in total_plans for item in plan
    )
    print(f"[migrate] 共 {len(day_dirs)} 个 day_dir,{total_items} 个对话组,{total_files} 个文件待迁移")

    if args_dry_run := opts.dry_run:
        print("\n=== DRY RUN ===\n")
        for plan, day_dir in total_plans:
            rel = os.path.relpath(day_dir)
            print(f"\n--- {rel} ---")
            for item in plan:
                old_names = [name for name, _ in item["old_dirs"]]
                print(f" {' + '.join(old_names)} → {item['new_dir_name']}/")
                for _, old_fname, new_fname in item["file_renames"]:
                    if old_fname == new_fname:
                        print(f" {old_fname} (不变)")
                    else:
                        print(f" {old_fname} → {new_fname}")
        print(f"\n[dry-run] 共 {total_items} 个对话组,{total_files} 个文件")
        return

    # Execute the migration, one day directory at a time.
    all_moved_dirs = all_moved_files = 0
    all_errors = []
    for plan, day_dir in total_plans:
        md, mf, errs = execute_migration(plan, day_dir, dry_run=False)
        all_moved_dirs, all_moved_files = all_moved_dirs + md, all_moved_files + mf
        all_errors += errs
        rel = os.path.relpath(day_dir)
        print(f"[migrate] {rel}: {mf} files moved, {md} dirs removed")

    # Point the index entries at the new directories.
    idx_updated = update_indexes(total_plans)
    print(f"[migrate] 索引更新: {idx_updated} entries")

    if all_errors:
        print(f"\n[migrate] {len(all_errors)} 个错误:")
        for e in all_errors[:20]:
            print(f" ✗ {e}")

    print(f"\n[migrate] 完成: {all_moved_files} files, {all_moved_dirs} dirs removed, {idx_updated} index entries updated")
|
||
|
||
|
||
# Run the migration only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|