# -*- coding: utf-8 -*-
"""ETL output-directory cleanup service.

Walks every task folder under EXPORT_ROOT, orders the run directories inside
it by the timestamp embedded in the directory name, keeps only the newest N
run records and permanently deletes the rest.

CHANGE 2026-03-27 | Added: automatically clean the output directory before
execution, keeping only the most recent 10 run records per task type.
"""
from __future__ import annotations

import logging
import os
import re
import shutil
from pathlib import Path

logger = logging.getLogger(__name__)

# Run-record directory naming format: {TASK_CODE}-{run_id}-{YYYYMMDD}-{HHMMSS}
# Ordering uses the last two segments (date, time).
_RUN_DIR_PATTERN = re.compile(r"^.+-(\d{8})-(\d{6})$")


def _get_export_root() -> Path:
    """Return the directory named by the EXPORT_ROOT environment variable.

    Raises:
        RuntimeError: if EXPORT_ROOT is unset/empty, or does not point to an
            existing directory.
    """
    val = os.environ.get("EXPORT_ROOT")
    if not val:
        raise RuntimeError(
            "环境变量 EXPORT_ROOT 未设置,无法执行输出目录清理。"
            "请在 .env 中配置 EXPORT_ROOT。"
        )
    p = Path(val)
    if not p.is_dir():
        raise RuntimeError(f"EXPORT_ROOT 路径不存在或不是目录: {p}")
    return p


def _sort_key(dirname: str) -> tuple[str, str]:
    """Extract the (date, time) sort key from a run directory name.

    Larger keys are newer. Both parts are fixed-width digit strings, so plain
    string comparison orders them chronologically.
    """
    m = _RUN_DIR_PATTERN.match(dirname)
    if m:
        return (m.group(1), m.group(2))
    # Names that do not match the format sort as the oldest possible entry,
    # so they are the first candidates for deletion.
    return ("00000000", "000000")


def cleanup_output_dirs(keep: int = 10) -> dict:
    """Prune each task folder under EXPORT_ROOT to its newest ``keep`` runs.

    Every sub-directory of a task folder is treated as a run record; plain
    files are never touched. Failures to list a task folder or to delete a
    run directory are logged, collected in the result and do not stop the
    cleanup of the remaining folders.

    Args:
        keep: Number of newest run directories to retain per task folder.
            ``0`` removes every run directory.

    Returns:
        Summary dict with the keys ``task_folders_scanned`` (int),
        ``dirs_deleted`` (int) and ``errors`` (list of message strings).

    Raises:
        ValueError: if ``keep`` is negative.
        RuntimeError: if EXPORT_ROOT is unset or is not a directory.
    """
    # A negative value would make ``run_dirs[keep:]`` slice from the end and
    # delete an unintended subset, so reject it before touching the disk.
    if keep < 0:
        raise ValueError(f"keep 必须为非负整数,当前值: {keep}")

    export_root = _get_export_root()
    total_scanned = 0
    total_deleted = 0
    errors: list[str] = []

    for task_dir in sorted(export_root.iterdir()):
        if not task_dir.is_dir():
            continue
        total_scanned += 1

        # List all sub-directories (run records). One unreadable task folder
        # must not abort the cleanup of the others.
        try:
            run_dirs = [d for d in task_dir.iterdir() if d.is_dir()]
        except OSError as exc:
            msg = f"读取任务目录失败 {task_dir}: {exc}"
            logger.warning(msg)
            errors.append(msg)
            continue

        if len(run_dirs) <= keep:
            continue

        # Newest first; the name breaks ties between equal timestamps so the
        # choice of what to delete does not depend on listing order.
        run_dirs.sort(key=lambda d: (_sort_key(d.name), d.name), reverse=True)

        for d in run_dirs[keep:]:
            try:
                shutil.rmtree(d)
                total_deleted += 1
            except Exception as exc:
                # Deliberately broad: cleanup is best-effort and a single
                # failed delete must not stop the remaining ones.
                msg = f"删除失败 {d}: {exc}"
                logger.warning(msg)
                errors.append(msg)

    logger.info(
        "输出目录清理完成: 扫描 %d 个任务文件夹, 删除 %d 个运行记录, %d 个错误",
        total_scanned,
        total_deleted,
        len(errors),
    )
    return {
        "task_folders_scanned": total_scanned,
        "dirs_deleted": total_deleted,
        "errors": errors,
    }