"""Re-run the failed DWS/INDEX tasks one at a time.

Each task is launched in its own subprocess so that a failure in one task
cannot cascade into the next. The window/split arguments match the ones used
by the integration test.
"""

import os
import sys
import subprocess
import time
from pathlib import Path
from datetime import datetime

from dotenv import load_dotenv
from zoneinfo import ZoneInfo

load_dotenv(Path(__file__).resolve().parents[2] / ".env")

TZ = ZoneInfo("Asia/Shanghai")
ETL_CWD = Path(__file__).resolve().parents[2] / "apps" / "etl" / "connectors" / "feiqiu"

# Per-task wall-clock limit (seconds) passed to subprocess.run.
TASK_TIMEOUT = 600

FAILED_TASKS = [
    "DWS_MEMBER_VISIT",
    "DWS_MEMBER_CONSUMPTION",
    "DWS_FINANCE_DAILY",
    "DWS_FINANCE_RECHARGE",
    "DWS_FINANCE_INCOME_STRUCTURE",
    "DWS_FINANCE_DISCOUNT_DETAIL",
    "DWS_ASSISTANT_MONTHLY",
    "DWS_ASSISTANT_FINANCE",
    "DWS_WINBACK_INDEX",
    "DWS_NEWCONV_INDEX",
    "DWS_RELATION_INDEX",
    "DWS_SPENDING_POWER_INDEX",
]

COMMON_ARGS = [
    "--window-start", "2025-11-01 00:00:00",
    "--window-end", "2026-02-27 00:00:00",
    "--window-split-days", "30",
    "--force-full",
]


def _as_text(value: "str | bytes | None") -> str:
    """Return captured subprocess output as ``str``.

    ``subprocess.TimeoutExpired.stdout``/``stderr`` may be ``bytes`` even when
    ``text=True`` was requested, or ``None`` if nothing was captured.
    """
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def run_single_task(task: str) -> dict:
    """Run one ETL task in a fresh subprocess and summarise the result.

    Args:
        task: Task code (one of FAILED_TASKS). Codes ending in ``_INDEX`` run
            on the ``INDEX`` layer; all others run on ``DWS``.

    Returns:
        A dict with keys ``task``, ``exit_code``, ``elapsed`` (seconds),
        ``success``, ``errors`` (first 5 ERROR/CRITICAL lines) and
        ``output`` (stdout followed by stderr).

    Raises:
        subprocess.TimeoutExpired: if the task exceeds TASK_TIMEOUT seconds.
        OSError: if the ``uv`` executable cannot be launched.
    """
    layer = "INDEX" if task.endswith("_INDEX") else "DWS"
    cmd = [
        "uv", "run", "--package", "etl-feiqiu",
        "python", "-m", "cli.main",
        "--layers", layer,
        "--tasks", task,
        *COMMON_ARGS,
    ]
    start = time.time()
    proc = subprocess.run(
        cmd,
        cwd=str(ETL_CWD),
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
        timeout=TASK_TIMEOUT,
    )
    elapsed = time.time() - start
    output = proc.stdout + proc.stderr

    # Heuristic: a zero exit code alone is not trusted; any line containing
    # an ERROR/CRITICAL marker also marks the task as failed.
    error_lines = [
        line.strip()
        for line in output.splitlines()
        if "ERROR" in line or "CRITICAL" in line
    ]

    return {
        "task": task,
        "exit_code": proc.returncode,
        "elapsed": elapsed,
        "success": proc.returncode == 0 and not error_lines,
        "errors": error_lines[:5],
        "output": output,
    }


def _resolve_log_dir() -> Path:
    """Return the directory named by SYSTEM_LOG_ROOT, creating it if needed.

    Raises:
        RuntimeError: if SYSTEM_LOG_ROOT is unset or empty.
    """
    log_root = os.environ.get("SYSTEM_LOG_ROOT")
    if not log_root:
        raise RuntimeError("SYSTEM_LOG_ROOT 未设置")
    log_dir = Path(log_root)
    log_dir.mkdir(parents=True, exist_ok=True)
    return log_dir


def _failure_record(task: str, elapsed: float, errors: list, output: str) -> dict:
    """Build a result dict for a task that never produced an exit code."""
    return {
        "task": task,
        "success": False,
        "elapsed": elapsed,
        "exit_code": -1,
        "errors": errors,
        "output": output,
    }


def main():
    """Re-run every task in FAILED_TASKS, print a summary and write a log.

    Exits the process with status 0 if all tasks succeeded, 1 otherwise.

    Raises:
        RuntimeError: if SYSTEM_LOG_ROOT is not set (checked before any task
            runs, so a misconfigured environment does not waste a full run).
    """
    # Fail fast: validate the log destination up front instead of after
    # potentially len(FAILED_TASKS) * TASK_TIMEOUT seconds of work.
    log_dir = _resolve_log_dir()

    now = datetime.now(TZ)
    print(f"{'='*60}")
    print("失败任务逐个重跑验证 v2")
    print(f"时间: {now.isoformat()}")
    print(f"任务数: {len(FAILED_TASKS)}")
    print(f"{'='*60}\n")

    results = []
    total_start = time.time()

    for i, task in enumerate(FAILED_TASKS, 1):
        print(f"[{i}/{len(FAILED_TASKS)}] {task} ...", end=" ", flush=True)
        task_start = time.time()
        try:
            r = run_single_task(task)
        except subprocess.TimeoutExpired as exc:
            # Keep whatever output was captured before the child was killed.
            partial = _as_text(exc.stdout) + _as_text(exc.stderr)
            results.append(
                _failure_record(task, time.time() - task_start, ["TIMEOUT"], partial)
            )
            print("⏰ TIMEOUT")
            continue
        except OSError as exc:
            # e.g. `uv` not on PATH: record it and keep going so the log is
            # still written instead of aborting the whole run.
            results.append(
                _failure_record(
                    task, time.time() - task_start, [f"LAUNCH FAILED: {exc}"], ""
                )
            )
            print(f"❌ LAUNCH FAILED: {exc}")
            continue

        results.append(r)
        icon = "✅" if r["success"] else "❌"
        print(f"{icon} ({r['elapsed']:.0f}s, exit={r['exit_code']})")
        if not r["success"] and r["errors"]:
            for err in r["errors"][:3]:
                print(f" ⚠ {err[:200]}")

    total_elapsed = time.time() - total_start

    # Summary
    ok = [r for r in results if r["success"]]
    fail = [r for r in results if not r["success"]]

    print(f"\n{'='*60}")
    print(f"汇总: {len(ok)}/{len(results)} 成功, {len(fail)} 失败, 总耗时 {total_elapsed:.0f}s")
    print(f"{'='*60}")

    if fail:
        print("\n失败任务:")
        for r in fail:
            print(f" ❌ {r['task']}: exit={r['exit_code']}")
            for err in r.get("errors", [])[:3]:
                print(f" {err[:200]}")

    # Persist the full per-task output.
    log_file = log_dir / f"{now.strftime('%Y%m%d')}_rerun_v2.log"
    with open(log_file, "w", encoding="utf-8") as f:
        f.write(f"汇总: {len(ok)}/{len(results)} 成功\n")
        f.write(f"总耗时: {total_elapsed:.0f}s\n\n")
        for r in results:
            icon = "OK" if r["success"] else "FAIL"
            f.write(f"[{icon}] {r['task']} ({r['elapsed']:.0f}s)\n")
            if not r["success"]:
                f.write(f" errors: {r.get('errors', [])}\n")
            f.write(f"--- output ---\n{r['output']}\n{'='*40}\n")

    print(f"\n日志: {log_file}")
    sys.exit(1 if fail else 0)


if __name__ == "__main__":
    main()