363 lines
11 KiB
Python
363 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
从后端 API 获取 ETL 执行日志,解析各任务结果,导出执行结果报告。
|
||
|
||
用法:python scripts/ops/export_etl_result.py
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
import sys
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import requests
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
from _env_paths import get_output_path
|
||
|
||
BACKEND_URL = "http://localhost:8000"
|
||
EXECUTION_ID = "dbf0c29a-253a-4705-a1ef-35cd71243d48"
|
||
TOKEN_FILE = Path(__file__).parent / ".monitor_token"
|
||
|
||
|
||
def get_token() -> str:
|
||
if TOKEN_FILE.exists():
|
||
return TOKEN_FILE.read_text(encoding="utf-8").strip()
|
||
return ""
|
||
|
||
|
||
def fetch_history(token: str) -> dict:
|
||
r = requests.get(
|
||
f"{BACKEND_URL}/api/execution/history",
|
||
headers={"Authorization": f"Bearer {token}"},
|
||
params={"limit": 5},
|
||
timeout=10,
|
||
)
|
||
r.raise_for_status()
|
||
for item in r.json():
|
||
if item.get("id") == EXECUTION_ID:
|
||
return item
|
||
return r.json()[0] if r.json() else {}
|
||
|
||
|
||
def fetch_logs(token: str) -> dict:
|
||
r = requests.get(
|
||
f"{BACKEND_URL}/api/execution/{EXECUTION_ID}/logs",
|
||
headers={"Authorization": f"Bearer {token}"},
|
||
timeout=30,
|
||
)
|
||
r.raise_for_status()
|
||
return r.json()
|
||
|
||
|
||
|
||
def parse_log(error_log: str) -> list[dict]:
|
||
"""从 stderr 日志解析各任务的执行结果和计时"""
|
||
results = []
|
||
lines = error_log.split("\n") if error_log else []
|
||
|
||
# 正则:提取时间戳
|
||
ts_re = re.compile(r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]")
|
||
# 正则:任务开始
|
||
start_re = re.compile(r"开始执行(\S+)\s+\((\w+)\)")
|
||
# 正则:ODS 任务完成
|
||
ods_done_re = re.compile(r"(\S+)\s+ODS 任务完成:\s+(\{.*\})")
|
||
# 正则:任务失败
|
||
fail_re = re.compile(r"任务\s+(\S+)\s+失败:\s+(.*)")
|
||
# 正则:DWS 抓取阶段开始
|
||
dws_start_re = re.compile(r"(\S+):\s+抓取阶段开始")
|
||
# 正则:DWS 提取数据
|
||
dws_extract_re = re.compile(r"(\S+):\s+提取数据")
|
||
# 正则:DWD 完成
|
||
dwd_done_re = re.compile(r"(\S+)\s+DWD.*完成|(\S+):\s+DWD.*装载完成")
|
||
# 正则:工具类任务开始
|
||
util_start_re = re.compile(r"(\S+):\s+开始执行工具类任务")
|
||
# 正则:工具类任务失败
|
||
util_fail_re = re.compile(r"(\S+):\s+工具类任务执行失败")
|
||
# 正则:DWS/INDEX 任务完成
|
||
dws_done_re = re.compile(r"(\S+)\s+(?:DWS|INDEX)\s+任务完成")
|
||
# 正则:窗口拆分
|
||
window_re = re.compile(r"(\S+):\s+窗口拆分为\s+(\d+)\s+段")
|
||
|
||
task_starts: dict[str, str] = {} # task_code -> start_timestamp
|
||
task_windows: dict[str, int] = {} # task_code -> window_count
|
||
|
||
for line in lines:
|
||
ts_match = ts_re.search(line)
|
||
ts = ts_match.group(1) if ts_match else ""
|
||
|
||
# 任务开始
|
||
m = start_re.search(line)
|
||
if m:
|
||
task_code = m.group(1)
|
||
task_starts[task_code] = ts
|
||
continue
|
||
|
||
# DWS 抓取阶段开始
|
||
m = dws_start_re.search(line)
|
||
if m:
|
||
task_code = m.group(1)
|
||
if task_code not in task_starts:
|
||
task_starts[task_code] = ts
|
||
continue
|
||
|
||
# 工具类任务开始
|
||
m = util_start_re.search(line)
|
||
if m:
|
||
task_code = m.group(1)
|
||
if task_code not in task_starts:
|
||
task_starts[task_code] = ts
|
||
continue
|
||
|
||
# 窗口拆分
|
||
m = window_re.search(line)
|
||
if m:
|
||
task_windows[m.group(1)] = int(m.group(2))
|
||
continue
|
||
|
||
# ODS 任务完成
|
||
m = ods_done_re.search(line)
|
||
if m:
|
||
task_code = m.group(1)
|
||
stats_str = m.group(2)
|
||
results.append({
|
||
"task": task_code,
|
||
"layer": "ODS",
|
||
"status": "success",
|
||
"start": task_starts.get(task_code, ""),
|
||
"end": ts,
|
||
"windows": task_windows.get(task_code, 0),
|
||
"stats": stats_str,
|
||
})
|
||
continue
|
||
|
||
# 任务失败
|
||
m = fail_re.search(line)
|
||
if m:
|
||
task_code = m.group(1)
|
||
error_msg = m.group(2).strip()
|
||
# 避免重复记录(级联错误会多次出现)
|
||
if not any(r["task"] == task_code for r in results):
|
||
results.append({
|
||
"task": task_code,
|
||
"layer": guess_layer(task_code),
|
||
"status": "failed",
|
||
"start": task_starts.get(task_code, ""),
|
||
"end": ts,
|
||
"windows": task_windows.get(task_code, 0),
|
||
"error": error_msg[:120],
|
||
})
|
||
continue
|
||
|
||
# 检查是否有 DWD_LOAD_FROM_ODS 完成的标记
|
||
for line in lines:
|
||
if "DWD_LOAD_FROM_ODS" in line and "完成" in line:
|
||
ts_match = ts_re.search(line)
|
||
ts = ts_match.group(1) if ts_match else ""
|
||
if not any(r["task"] == "DWD_LOAD_FROM_ODS" for r in results):
|
||
results.append({
|
||
"task": "DWD_LOAD_FROM_ODS",
|
||
"layer": "DWD",
|
||
"status": "success",
|
||
"start": task_starts.get("DWD_LOAD_FROM_ODS", ""),
|
||
"end": ts,
|
||
"windows": 0,
|
||
"stats": "",
|
||
})
|
||
break
|
||
|
||
return results
|
||
|
||
|
||
def guess_layer(task_code: str) -> str:
|
||
if task_code.startswith("ODS_"):
|
||
return "ODS"
|
||
if task_code.startswith("DWD_"):
|
||
return "DWD"
|
||
if task_code.startswith("DWS_"):
|
||
return "DWS"
|
||
if task_code.startswith("INDEX_"):
|
||
return "INDEX"
|
||
return "OTHER"
|
||
|
||
|
||
|
||
def calc_duration(start: str, end: str) -> str:
|
||
"""计算时长"""
|
||
if not start or not end:
|
||
return "—"
|
||
try:
|
||
fmt = "%Y-%m-%d %H:%M:%S"
|
||
s = datetime.strptime(start, fmt)
|
||
e = datetime.strptime(end, fmt)
|
||
delta = (e - s).total_seconds()
|
||
if delta < 60:
|
||
return f"{delta:.1f}s"
|
||
elif delta < 3600:
|
||
return f"{delta / 60:.1f}m"
|
||
else:
|
||
return f"{delta / 3600:.1f}h"
|
||
except Exception:
|
||
return "—"
|
||
|
||
|
||
def generate_report(execution: dict, task_results: list[dict]) -> str:
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
status = execution.get("status", "unknown")
|
||
started = execution.get("started_at", "—")
|
||
finished = execution.get("finished_at", "—")
|
||
duration_ms = execution.get("duration_ms", 0)
|
||
exit_code = execution.get("exit_code", "—")
|
||
|
||
if duration_ms:
|
||
dur_str = f"{duration_ms / 1000:.1f}s ({duration_ms / 60000:.1f}m)"
|
||
else:
|
||
dur_str = "—"
|
||
|
||
success_count = sum(1 for r in task_results if r["status"] == "success")
|
||
failed_count = sum(1 for r in task_results if r["status"] == "failed")
|
||
|
||
lines = [
|
||
"# ETL 执行结果报告",
|
||
"",
|
||
f"> 生成时间:{now}",
|
||
f"> execution_id:{EXECUTION_ID}",
|
||
f"> run_uuid:4ba9d2d365ee4a858f1c4104b1942dc2",
|
||
"",
|
||
"---",
|
||
"",
|
||
"## 执行概览",
|
||
"",
|
||
"| 项目 | 值 |",
|
||
"|------|-----|",
|
||
f"| 状态 | {status} |",
|
||
f"| 开始时间 | {started} |",
|
||
f"| 结束时间 | {finished} |",
|
||
f"| 总时长 | {dur_str} |",
|
||
f"| 退出码 | {exit_code} |",
|
||
f"| 任务总数 | {len(execution.get('task_codes', []))} |",
|
||
f"| 成功 | {success_count} |",
|
||
f"| 失败 | {failed_count} |",
|
||
"",
|
||
"---",
|
||
"",
|
||
"## 任务级结果",
|
||
"",
|
||
"| # | 任务 | 层 | 状态 | 开始 | 结束 | 耗时 | 窗口数 | 备注 |",
|
||
"|---|------|-----|------|------|------|------|--------|------|",
|
||
]
|
||
|
||
for i, r in enumerate(task_results, 1):
|
||
dur = calc_duration(r.get("start", ""), r.get("end", ""))
|
||
note = r.get("stats", r.get("error", ""))
|
||
if len(note) > 60:
|
||
note = note[:57] + "..."
|
||
win = r.get("windows", 0)
|
||
win_str = str(win) if win else "—"
|
||
start_short = r.get("start", "—")
|
||
if start_short and len(start_short) > 8:
|
||
start_short = start_short.split(" ")[-1] if " " in start_short else start_short
|
||
end_short = r.get("end", "—")
|
||
if end_short and len(end_short) > 8:
|
||
end_short = end_short.split(" ")[-1] if " " in end_short else end_short
|
||
|
||
status_emoji = "✅" if r["status"] == "success" else "❌"
|
||
lines.append(
|
||
f"| {i} | {r['task']} | {r['layer']} | {status_emoji} {r['status']} "
|
||
f"| {start_short} | {end_short} | {dur} | {win_str} | {note} |"
|
||
)
|
||
|
||
lines.extend([
|
||
"",
|
||
"---",
|
||
"",
|
||
"## 失败任务分析",
|
||
"",
|
||
])
|
||
|
||
failed_tasks = [r for r in task_results if r["status"] == "failed"]
|
||
if failed_tasks:
|
||
root_cause = failed_tasks[0] if failed_tasks else None
|
||
cascade = failed_tasks[1:] if len(failed_tasks) > 1 else []
|
||
|
||
lines.extend([
|
||
f"### 根因:{root_cause['task']}",
|
||
"",
|
||
f"错误:`{root_cause.get('error', '未知')}`",
|
||
"",
|
||
"### 级联失败",
|
||
"",
|
||
])
|
||
|
||
if cascade:
|
||
for r in cascade:
|
||
lines.append(f"- {r['task']}:InFailedSqlTransaction(事务污染)")
|
||
else:
|
||
lines.append("无级联失败。")
|
||
|
||
lines.extend([
|
||
"",
|
||
"### 修复状态",
|
||
"",
|
||
"代码已修复(4 处改动),待下次执行验证。",
|
||
"详见:`export/SYSTEM/LOGS/2026-02-21__dws_assistant_daily_bug_fix.md`",
|
||
])
|
||
else:
|
||
lines.append("无失败任务。")
|
||
|
||
lines.extend([
|
||
"",
|
||
"---",
|
||
"",
|
||
"## 下一步",
|
||
"",
|
||
"1. 重新提交包含 9 个失败任务的执行,验证修复",
|
||
"2. 运行 ETL Unified Analysis(统一分析)",
|
||
"3. 运行 /audit 审计",
|
||
])
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
def main():
|
||
out_dir = get_output_path("SYSTEM_LOG_ROOT")
|
||
token = get_token()
|
||
|
||
print("获取执行历史...")
|
||
execution = fetch_history(token)
|
||
print(f" 状态: {execution.get('status')}, 时长: {execution.get('duration_ms', 0) / 1000:.1f}s")
|
||
|
||
print("获取执行日志...")
|
||
logs = fetch_logs(token)
|
||
error_log = logs.get("error_log", "")
|
||
print(f" error_log 长度: {len(error_log)} 字符")
|
||
|
||
print("解析任务结果...")
|
||
task_results = parse_log(error_log)
|
||
print(f" 解析到 {len(task_results)} 个任务结果")
|
||
|
||
print("生成报告...")
|
||
report = generate_report(execution, task_results)
|
||
out_file = out_dir / "2026-02-21__etl_run_result.md"
|
||
out_file.write_text(report, encoding="utf-8")
|
||
print(f"执行结果报告已导出: {out_file}")
|
||
|
||
# 保存原始 API 数据
|
||
raw_file = out_dir / "2026-02-21__etl_run_raw.json"
|
||
raw_data = {
|
||
"execution": execution,
|
||
"error_log_length": len(error_log),
|
||
"task_results_parsed": task_results,
|
||
}
|
||
raw_file.write_text(
|
||
json.dumps(raw_data, ensure_ascii=False, indent=2, default=str),
|
||
encoding="utf-8",
|
||
)
|
||
print(f"原始数据已导出: {raw_file}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|