Files
Neo-ZQYY/scripts/ops/export_etl_result_v2.py

193 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""导出第二次 ETL 执行结果报告(回归验证)。
基于 export_etl_result.py 的逻辑,指向新的 execution_id。
"""
from __future__ import annotations

import json
import os
import sys
from datetime import datetime
from pathlib import Path

import requests

sys.path.insert(0, str(Path(__file__).parent))
from _env_paths import get_output_path
# Backend base URL; override with the ETL_BACKEND_URL environment variable.
BACKEND_URL = os.environ.get("ETL_BACKEND_URL") or "http://localhost:8000"

# Execution to export; override with ETL_EXECUTION_ID so the script can be
# reused for another run without editing the source.
EXECUTION_ID = (
    os.environ.get("ETL_EXECUTION_ID") or "e21e1935-5abf-434f-9984-69c492402db7"
)

# File (next to this script) where the refreshed access token is written.
TOKEN_FILE = Path(__file__).parent / ".monitor_token"

# Refresh token used to obtain a fresh access token automatically.
# SECURITY NOTE(review): a credential is committed in source here. Prefer
# supplying it via the ETL_REFRESH_TOKEN environment variable and rotating the
# embedded value; the literal is kept only as a backward-compatible default.
_DEFAULT_REFRESH_TOKEN = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJzdWIiOiIxIiwic2l0ZV9pZCI6Mjc5MDY4NTQxNTQ0MzI2OSwidHlwZSI6InJlZnJlc2giLCJleHAiOjE3NzIyNjM0NjN9."
    "XYoda5lfxNtTSAGWoLlYhS9cA-hTK9iqK0SqUyn2KV4"
)
REFRESH_TOKEN = os.environ.get("ETL_REFRESH_TOKEN") or _DEFAULT_REFRESH_TOKEN
def get_token() -> str:
    """Refresh and return an access token.

    Posts ``REFRESH_TOKEN`` to ``{BACKEND_URL}/api/auth/refresh``, writes the
    returned access token to ``TOKEN_FILE`` (UTF-8) and returns it.

    Returns:
        The ``access_token`` string from the refresh response.

    Raises:
        RuntimeError: If the endpoint does not answer with HTTP 200, or the
            response body is not JSON containing a non-empty string
            ``access_token``.
    """
    resp = requests.post(
        f"{BACKEND_URL}/api/auth/refresh",
        json={"refresh_token": REFRESH_TOKEN},
        timeout=10,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"刷新 token 失败: {resp.status_code}")

    # A malformed body should fail the same way as a bad status code rather
    # than leaking a ValueError/KeyError/TypeError from the parsing step.
    try:
        token = resp.json()["access_token"]
    except (ValueError, KeyError, TypeError) as err:
        raise RuntimeError("刷新 token 失败: 响应中缺少 access_token") from err
    if not isinstance(token, str) or not token:
        raise RuntimeError("刷新 token 失败: access_token 无效")

    TOKEN_FILE.write_text(token, encoding="utf-8")
    return token
def fetch_history(token: str) -> dict:
    """Find the target execution among the most recent history entries.

    Requests ``{BACKEND_URL}/api/execution/history`` with ``limit=10`` and
    scans the returned items for the one whose ``id`` equals ``EXECUTION_ID``.

    Args:
        token: Access token sent as a ``Bearer`` authorization header.

    Returns:
        The matching history item, or an empty dict when none of the returned
        items matches.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    auth_headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(
        f"{BACKEND_URL}/api/execution/history",
        headers=auth_headers,
        params={"limit": 10},
        timeout=10,
    )
    response.raise_for_status()
    # Lazily scan the entries and stop at the first id match.
    matches = (entry for entry in response.json() if entry.get("id") == EXECUTION_ID)
    return next(matches, {})
def fetch_logs(token: str) -> dict:
    """Download the logs of the execution identified by ``EXECUTION_ID``.

    Args:
        token: Access token sent as a ``Bearer`` authorization header.

    Returns:
        The parsed JSON body of the logs endpoint.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    endpoint = f"{BACKEND_URL}/api/execution/{EXECUTION_ID}/logs"
    auth_headers = {"Authorization": f"Bearer {token}"}
    # Longer timeout than the history call (60s vs 10s).
    response = requests.get(endpoint, headers=auth_headers, timeout=60)
    response.raise_for_status()
    return response.json()
def _tail_lines(text: str, count: int) -> list[str]:
    """Return the last ``count`` lines of ``text`` after stripping outer whitespace."""
    return text.strip().split("\n")[-count:]


def _log_section(heading: str, text: str, count: int) -> list[str]:
    """Build report lines: ``heading``, then the tail of ``text`` in a fenced block."""
    return [heading, "", "```", *_tail_lines(text, count), "```", ""]


def main() -> None:
    """Export the Markdown report and raw JSON dump for ``EXECUTION_ID``.

    Steps: refresh the access token, look up the execution in the history
    endpoint, download its logs, then write two files into the directory
    returned by ``get_output_path("SYSTEM_LOG_ROOT")``:

    * ``2026-02-21__etl_run_result_v2.md`` - the human-readable report.
    * ``2026-02-21__etl_run_raw_v2.json`` - the execution record plus the last
      200 lines of each log.

    Exits with status 1 when the execution is not found in the history.
    """
    out_dir = get_output_path("SYSTEM_LOG_ROOT")
    token = get_token()

    print("获取执行历史...")
    execution = fetch_history(token)
    if not execution:
        print(f"❌ 未找到 execution_id={EXECUTION_ID}")
        sys.exit(1)

    status = execution.get("status", "unknown")
    # ``or`` (not a .get default) so an explicit null in the payload - e.g. an
    # execution that has not finished - does not break the arithmetic/len().
    duration_ms = execution.get("duration_ms") or 0
    exit_code = execution.get("exit_code")
    started = execution.get("started_at", "")
    finished = execution.get("finished_at", "")
    task_codes = execution.get("task_codes") or []
    summary = execution.get("summary")
    duration_s = duration_ms / 1000
    duration_min = duration_ms / 60000
    print(f" 状态: {status}, 耗时: {duration_s:.1f}s, exit_code: {exit_code}")
    print(f" 任务数: {len(task_codes)}")

    print("获取执行日志...")
    logs = fetch_logs(token)
    output_log = logs.get("output_log", "") or ""
    error_log = logs.get("error_log", "") or ""
    print(f" output_log: {len(output_log)} 字符, error_log: {len(error_log)} 字符")

    # Build the report.
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    lines = [
        "# ETL 回归执行结果报告(第二次)",
        "",
        f"> 生成时间:{now}",
        f"> execution_id:{EXECUTION_ID}",
        "> 目的:验证 DWS_ASSISTANT_DAILY 修复 + 补跑上次失败的 31 个任务",
        "",
        "---",
        "",
        "## 执行概览",
        "",
        "| 项目 | 值 |",
        "|------|-----|",
        f"| 状态 | {status} |",
        f"| 开始时间 | {started} |",
        f"| 结束时间 | {finished} |",
        f"| 总时长 | {duration_s:.1f}s ({duration_min:.1f}m) |",
        f"| 退出码 | {exit_code} |",
        f"| 任务总数 | {len(task_codes)} |",
        "",
    ]

    if summary:
        if isinstance(summary, dict):
            summary_text = json.dumps(summary, ensure_ascii=False, indent=2)
        else:
            summary_text = str(summary)
        lines.extend(["## Summary(CLI 输出)", "", "```", summary_text, "```", ""])

    # Log excerpts: only the tail of each log goes into the report.
    if error_log:
        lines.extend(_log_section("## 执行日志(error_log 末尾 100 行)", error_log, 100))
    if output_log:
        lines.extend(_log_section("## 执行日志(output_log 末尾 50 行)", output_log, 50))

    # Comparison with the first run; the first-run column is fixed text.
    lines.extend([
        "---",
        "",
        "## 与第一次执行的对比",
        "",
        "| 项目 | 第一次 | 第二次(本次) |",
        "|------|--------|---------------|",
        f"| 任务数 | 41 | {len(task_codes)} |",
        f"| 状态 | success (exit_code=0) | {status} (exit_code={exit_code}) |",
        f"| 耗时 | 590.7s (9.8m) | {duration_s:.1f}s ({duration_min:.1f}m) |",
        "| 成功 | 10/41 | 待分析 |",
        "| 失败 | 31/41 | 待分析 |",
        "| 根因 | DWS_ASSISTANT_DAILY SQL 字段错误 | — |",
        "",
    ])

    report = "\n".join(lines)
    out_file = out_dir / "2026-02-21__etl_run_result_v2.md"
    out_file.write_text(report, encoding="utf-8")
    print(f"✅ 报告已导出: {out_file}")

    # Save the raw data alongside the report.
    raw_file = out_dir / "2026-02-21__etl_run_raw_v2.json"
    raw_data = {
        "execution": execution,
        "output_log_length": len(output_log),
        "error_log_length": len(error_log),
        # An empty log yields "" here: "".strip().split("\n") == [""].
        "output_log_tail_200": "\n".join(_tail_lines(output_log, 200)),
        "error_log_tail_200": "\n".join(_tail_lines(error_log, 200)),
    }
    raw_file.write_text(
        json.dumps(raw_data, ensure_ascii=False, indent=2, default=str),
        encoding="utf-8",
    )
    print(f"✅ 原始数据已导出: {raw_file}")
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()