# -*- coding: utf-8 -*- """导出第二次 ETL 执行结果报告(回归验证)。 基于 export_etl_result.py 的逻辑,指向新的 execution_id。 """ from __future__ import annotations import json import sys from datetime import datetime from pathlib import Path import requests sys.path.insert(0, str(Path(__file__).parent)) from _env_paths import get_output_path BACKEND_URL = "http://localhost:8000" EXECUTION_ID = "e21e1935-5abf-434f-9984-69c492402db7" TOKEN_FILE = Path(__file__).parent / ".monitor_token" # refresh_token(用于自动刷新) REFRESH_TOKEN = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." "eyJzdWIiOiIxIiwic2l0ZV9pZCI6Mjc5MDY4NTQxNTQ0MzI2OSwidHlwZSI6InJlZnJlc2giLCJleHAiOjE3NzIyNjM0NjN9." "XYoda5lfxNtTSAGWoLlYhS9cA-hTK9iqK0SqUyn2KV4" ) def get_token() -> str: """刷新并返回 access_token。""" resp = requests.post( f"{BACKEND_URL}/api/auth/refresh", json={"refresh_token": REFRESH_TOKEN}, timeout=10, ) if resp.status_code != 200: raise RuntimeError(f"刷新 token 失败: {resp.status_code}") token = resp.json()["access_token"] TOKEN_FILE.write_text(token, encoding="utf-8") return token def fetch_history(token: str) -> dict: r = requests.get( f"{BACKEND_URL}/api/execution/history", headers={"Authorization": f"Bearer {token}"}, params={"limit": 10}, timeout=10, ) r.raise_for_status() for item in r.json(): if item.get("id") == EXECUTION_ID: return item return {} def fetch_logs(token: str) -> dict: r = requests.get( f"{BACKEND_URL}/api/execution/{EXECUTION_ID}/logs", headers={"Authorization": f"Bearer {token}"}, timeout=60, ) r.raise_for_status() return r.json() def main(): out_dir = get_output_path("SYSTEM_LOG_ROOT") token = get_token() print("获取执行历史...") execution = fetch_history(token) if not execution: print(f"❌ 未找到 execution_id={EXECUTION_ID}") sys.exit(1) status = execution.get("status", "unknown") duration_ms = execution.get("duration_ms", 0) exit_code = execution.get("exit_code") started = execution.get("started_at", "") finished = execution.get("finished_at", "") task_codes = execution.get("task_codes", []) summary = execution.get("summary") print(f" 状态: {status}, 耗时: {duration_ms / 1000:.1f}s, exit_code: {exit_code}") print(f" 任务数: {len(task_codes)}") print("获取执行日志...") logs = fetch_logs(token) output_log = logs.get("output_log", "") or "" error_log = logs.get("error_log", "") or "" print(f" output_log: {len(output_log)} 字符, error_log: {len(error_log)} 字符") # 生成报告 now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") lines = [ "# ETL 回归执行结果报告(第二次)", "", f"> 生成时间:{now}", f"> execution_id:{EXECUTION_ID}", f"> 目的:验证 DWS_ASSISTANT_DAILY 修复 + 补跑上次失败的 31 个任务", "", "---", "", "## 执行概览", "", "| 项目 | 值 |", "|------|-----|", f"| 状态 | {status} |", f"| 开始时间 | {started} |", f"| 结束时间 | {finished} |", f"| 总时长 | {duration_ms / 1000:.1f}s ({duration_ms / 60000:.1f}m) |", f"| 退出码 | {exit_code} |", f"| 任务总数 | {len(task_codes)} |", "", ] if summary: lines.extend([ "## Summary(CLI 输出)", "", "```", json.dumps(summary, ensure_ascii=False, indent=2) if isinstance(summary, dict) else str(summary), "```", "", ]) # 输出日志摘要 if error_log: # 尝试从 error_log 提取任务级结果 lines.extend([ "## 执行日志(error_log 末尾 100 行)", "", "```", ]) err_lines = error_log.strip().split("\n") for line in err_lines[-100:]: lines.append(line) lines.extend(["```", ""]) if output_log: lines.extend([ "## 执行日志(output_log 末尾 50 行)", "", "```", ]) out_lines = output_log.strip().split("\n") for line in out_lines[-50:]: lines.append(line) lines.extend(["```", ""]) # 与第一次执行的对比 lines.extend([ "---", "", "## 与第一次执行的对比", "", "| 项目 | 第一次 | 第二次(本次) |", "|------|--------|---------------|", f"| 任务数 | 41 | {len(task_codes)} |", f"| 状态 | success (exit_code=0) | {status} (exit_code={exit_code}) |", "| 耗时 | 590.7s (9.8m) | {:.1f}s ({:.1f}m) |".format(duration_ms / 1000, duration_ms / 60000), "| 成功 | 10/41 | 待分析 |", "| 失败 | 31/41 | 待分析 |", "| 根因 | DWS_ASSISTANT_DAILY SQL 字段错误 | — |", "", ]) report = "\n".join(lines) out_file = out_dir / "2026-02-21__etl_run_result_v2.md" out_file.write_text(report, encoding="utf-8") print(f"✅ 报告已导出: {out_file}") # 保存原始数据 raw_file = out_dir / "2026-02-21__etl_run_raw_v2.json" raw_data = { "execution": execution, "output_log_length": len(output_log), "error_log_length": len(error_log), "output_log_tail_200": "\n".join(output_log.strip().split("\n")[-200:]) if output_log else "", "error_log_tail_200": "\n".join(error_log.strip().split("\n")[-200:]) if error_log else "", } raw_file.write_text( json.dumps(raw_data, ensure_ascii=False, indent=2, default=str), encoding="utf-8", ) print(f"✅ 原始数据已导出: {raw_file}") if __name__ == "__main__": main()