# Neo-ZQYY/scripts/ops/export_etl_result.py
# -*- coding: utf-8 -*-
"""
从后端 API 获取 ETL 执行日志,解析各任务结果,导出执行结果报告。
用法python scripts/ops/export_etl_result.py
"""
from __future__ import annotations
import json
import re
import sys
from datetime import datetime
from pathlib import Path
import requests
sys.path.insert(0, str(Path(__file__).parent))
from _env_paths import get_output_path
BACKEND_URL = "http://localhost:8000"
EXECUTION_ID = "dbf0c29a-253a-4705-a1ef-35cd71243d48"
TOKEN_FILE = Path(__file__).parent / ".monitor_token"
def get_token() -> str:
    """Return the monitor token stored in TOKEN_FILE, or "" if the file is absent."""
    if not TOKEN_FILE.exists():
        return ""
    return TOKEN_FILE.read_text(encoding="utf-8").strip()
def fetch_history(token: str) -> dict:
    """Fetch recent execution history and pick the target execution record.

    Looks for EXECUTION_ID among the latest 5 executions; if not found,
    falls back to the most recent one, or {} when history is empty.

    Args:
        token: Bearer token for the backend API.

    Returns:
        The matching execution dict (or fallback as described above).

    Raises:
        requests.HTTPError: If the backend returns a non-2xx status.
    """
    r = requests.get(
        f"{BACKEND_URL}/api/execution/history",
        headers={"Authorization": f"Bearer {token}"},
        params={"limit": 5},
        timeout=10,
    )
    r.raise_for_status()
    # Parse the response body once — the original re-parsed it per access.
    history = r.json()
    for item in history:
        if item.get("id") == EXECUTION_ID:
            return item
    return history[0] if history else {}
def fetch_logs(token: str) -> dict:
    """Download the full log payload for EXECUTION_ID from the backend API."""
    url = f"{BACKEND_URL}/api/execution/{EXECUTION_ID}/logs"
    headers = {"Authorization": f"Bearer {token}"}
    resp = requests.get(url, headers=headers, timeout=30)
    resp.raise_for_status()
    return resp.json()
def parse_log(error_log: str) -> list[dict]:
    """Parse per-task execution results and timings from the stderr log.

    Args:
        error_log: Raw stderr text of one ETL run; relevant lines carry a
            ``[YYYY-mm-dd HH:MM:SS]`` timestamp prefix.

    Returns:
        One dict per detected task, with keys ``task``, ``layer``,
        ``status`` ("success"/"failed"), ``start``, ``end``, ``windows``,
        and either ``stats`` (success) or ``error`` (failure, truncated to
        120 chars).
    """
    results: list[dict] = []
    lines = error_log.split("\n") if error_log else []
    # Timestamp prefix, e.g. "[2026-02-21 10:00:00]".
    ts_re = re.compile(r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]")
    # Task start marker.
    start_re = re.compile(r"开始执行(\S+)\s+\((\w+)\)")
    # ODS task completion, with a JSON-ish stats payload.
    ods_done_re = re.compile(r"(\S+)\s+ODS 任务完成:\s+(\{.*\})")
    # Task failure marker.
    fail_re = re.compile(r"任务\s+(\S+)\s+失败:\s+(.*)")
    # DWS fetch-phase start (counts as the task's start time).
    dws_start_re = re.compile(r"(\S+):\s+抓取阶段开始")
    # Utility-task start.
    util_start_re = re.compile(r"(\S+):\s+开始执行工具类任务")
    # Window-split count.
    window_re = re.compile(r"(\S+):\s+窗口拆分为\s+(\d+)\s+段")
    # (Removed four regexes that were compiled but never used:
    # DWS extract, DWD done, utility-task fail, DWS/INDEX done.)
    task_starts: dict[str, str] = {}   # task_code -> start timestamp
    task_windows: dict[str, int] = {}  # task_code -> window count
    for line in lines:
        ts_match = ts_re.search(line)
        ts = ts_match.group(1) if ts_match else ""
        # Explicit task start.
        m = start_re.search(line)
        if m:
            task_starts[m.group(1)] = ts
            continue
        # DWS fetch-phase start: only record when no explicit start was seen.
        m = dws_start_re.search(line)
        if m:
            task_starts.setdefault(m.group(1), ts)
            continue
        # Utility-task start: same "first timestamp wins" rule.
        m = util_start_re.search(line)
        if m:
            task_starts.setdefault(m.group(1), ts)
            continue
        # Window split count.
        m = window_re.search(line)
        if m:
            task_windows[m.group(1)] = int(m.group(2))
            continue
        # ODS task completion.
        m = ods_done_re.search(line)
        if m:
            task_code = m.group(1)
            results.append({
                "task": task_code,
                "layer": "ODS",
                "status": "success",
                "start": task_starts.get(task_code, ""),
                "end": ts,
                "windows": task_windows.get(task_code, 0),
                "stats": m.group(2),
            })
            continue
        # Task failure.
        m = fail_re.search(line)
        if m:
            task_code = m.group(1)
            # Cascading errors repeat the failure line; record only the first.
            if not any(r["task"] == task_code for r in results):
                results.append({
                    "task": task_code,
                    "layer": guess_layer(task_code),
                    "status": "failed",
                    "start": task_starts.get(task_code, ""),
                    "end": ts,
                    "windows": task_windows.get(task_code, 0),
                    "error": m.group(2).strip()[:120],
                })
            continue
    # DWD_LOAD_FROM_ODS has no ODS-style completion line; look for its
    # completion marker separately. (Loop variable renamed: the original
    # shadowed `line` from the loop above.)
    for log_line in lines:
        if "DWD_LOAD_FROM_ODS" in log_line and "完成" in log_line:
            ts_match = ts_re.search(log_line)
            ts = ts_match.group(1) if ts_match else ""
            if not any(r["task"] == "DWD_LOAD_FROM_ODS" for r in results):
                results.append({
                    "task": "DWD_LOAD_FROM_ODS",
                    "layer": "DWD",
                    "status": "success",
                    "start": task_starts.get("DWD_LOAD_FROM_ODS", ""),
                    "end": ts,
                    "windows": 0,
                    "stats": "",
                })
            break
    return results
def guess_layer(task_code: str) -> str:
    """Infer the warehouse layer from a task code's prefix ("OTHER" if unknown)."""
    prefix_to_layer = (
        ("ODS_", "ODS"),
        ("DWD_", "DWD"),
        ("DWS_", "DWS"),
        ("INDEX_", "INDEX"),
    )
    for prefix, layer in prefix_to_layer:
        if task_code.startswith(prefix):
            return layer
    return "OTHER"
def calc_duration(start: str, end: str) -> str:
    """Format the elapsed time between two "%Y-%m-%d %H:%M:%S" timestamps.

    Returns "" when either timestamp is missing or unparsable; otherwise a
    short human-readable value in seconds, minutes, or hours.
    """
    if not (start and end):
        return ""
    fmt = "%Y-%m-%d %H:%M:%S"
    try:
        seconds = (
            datetime.strptime(end, fmt) - datetime.strptime(start, fmt)
        ).total_seconds()
    except Exception:
        return ""
    if seconds < 60:
        return f"{seconds:.1f}s"
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    return f"{seconds / 3600:.1f}h"
def generate_report(execution: dict, task_results: list[dict]) -> str:
    """Render the markdown execution report.

    Args:
        execution: Execution summary from the history API; keys read are
            status / started_at / finished_at / duration_ms / exit_code /
            task_codes.
        task_results: Per-task dicts as produced by parse_log().

    Returns:
        The complete markdown report as a single string.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    status = execution.get("status", "unknown")
    started = execution.get("started_at", "")
    finished = execution.get("finished_at", "")
    duration_ms = execution.get("duration_ms", 0)
    exit_code = execution.get("exit_code", "")
    # duration_ms may be 0/missing when the run never finished.
    if duration_ms:
        dur_str = f"{duration_ms / 1000:.1f}s ({duration_ms / 60000:.1f}m)"
    else:
        dur_str = ""
    success_count = sum(1 for r in task_results if r["status"] == "success")
    failed_count = sum(1 for r in task_results if r["status"] == "failed")
    lines = [
        "# ETL 执行结果报告",
        "",
        f"> 生成时间:{now}",
        f"> execution_id{EXECUTION_ID}",
        # Fixed: this literal had an f-prefix but no placeholder.
        "> run_uuid4ba9d2d365ee4a858f1c4104b1942dc2",
        "",
        "---",
        "",
        "## 执行概览",
        "",
        "| 项目 | 值 |",
        "|------|-----|",
        f"| 状态 | {status} |",
        f"| 开始时间 | {started} |",
        f"| 结束时间 | {finished} |",
        f"| 总时长 | {dur_str} |",
        f"| 退出码 | {exit_code} |",
        f"| 任务总数 | {len(execution.get('task_codes', []))} |",
        f"| 成功 | {success_count} |",
        f"| 失败 | {failed_count} |",
        "",
        "---",
        "",
        "## 任务级结果",
        "",
        "| # | 任务 | 层 | 状态 | 开始 | 结束 | 耗时 | 窗口数 | 备注 |",
        "|---|------|-----|------|------|------|------|--------|------|",
    ]
    for i, r in enumerate(task_results, 1):
        dur = calc_duration(r.get("start", ""), r.get("end", ""))
        # Success rows carry "stats"; failed rows carry "error".
        note = r.get("stats", r.get("error", ""))
        if len(note) > 60:
            note = note[:57] + "..."
        win = r.get("windows", 0)
        win_str = str(win) if win else ""
        # Keep only the HH:MM:SS part of the timestamps for table width.
        start_short = r.get("start", "")
        if start_short and len(start_short) > 8:
            start_short = start_short.split(" ")[-1] if " " in start_short else start_short
        end_short = r.get("end", "")
        if end_short and len(end_short) > 8:
            end_short = end_short.split(" ")[-1] if " " in end_short else end_short
        # NOTE(review): the original conditional produced the empty string on
        # BOTH branches — the success/failure emoji appear to have been lost
        # in transit. Behavior kept; restore the glyphs if the originals are known.
        status_emoji = ""
        lines.append(
            f"| {i} | {r['task']} | {r['layer']} | {status_emoji} {r['status']} "
            f"| {start_short} | {end_short} | {dur} | {win_str} | {note} |"
        )
    lines.extend([
        "",
        "---",
        "",
        "## 失败任务分析",
        "",
    ])
    failed_tasks = [r for r in task_results if r["status"] == "failed"]
    if failed_tasks:
        # First failure is treated as the root cause, the rest as cascade
        # (dropped redundant ternaries: failed_tasks is non-empty here).
        root_cause = failed_tasks[0]
        cascade = failed_tasks[1:]
        lines.extend([
            f"### 根因:{root_cause['task']}",
            "",
            f"错误:`{root_cause.get('error', '未知')}`",
            "",
            "原因:`_extract_trash_records` SQL 引用了 `dwd_assistant_trash_event` 中不存在的字段 `assistant_service_id`。",
            "",
            "### 级联失败",
            "",
        ])
        if cascade:
            for r in cascade:
                lines.append(f"- {r['task']}InFailedSqlTransaction事务污染")
        else:
            lines.append("无级联失败。")
        lines.extend([
            "",
            "### 修复状态",
            "",
            "代码已修复4 处改动),待下次执行验证。",
            "详见:`export/SYSTEM/LOGS/2026-02-21__dws_assistant_daily_bug_fix.md`",
        ])
    else:
        lines.append("无失败任务。")
    lines.extend([
        "",
        "---",
        "",
        "## 下一步",
        "",
        "1. 重新提交包含 9 个失败任务的执行,验证修复",
        "2. 运行 ETL Data Consistency Check",
        "3. 运行 /audit 审计",
    ])
    return "\n".join(lines)
def main():
    """Pull the ETL execution record and logs, parse them, and write the
    markdown report plus a raw JSON dump to the ops output directory."""
    out_dir = get_output_path("SYSTEM_LOG_ROOT")
    token = get_token()

    print("获取执行历史...")
    execution = fetch_history(token)
    print(f" 状态: {execution.get('status')}, 时长: {execution.get('duration_ms', 0) / 1000:.1f}s")

    print("获取执行日志...")
    logs = fetch_logs(token)
    error_log = logs.get("error_log", "")
    print(f" error_log 长度: {len(error_log)} 字符")

    print("解析任务结果...")
    task_results = parse_log(error_log)
    print(f" 解析到 {len(task_results)} 个任务结果")

    print("生成报告...")
    report = generate_report(execution, task_results)
    report_path = out_dir / "2026-02-21__etl_run_result.md"
    report_path.write_text(report, encoding="utf-8")
    print(f"执行结果报告已导出: {report_path}")

    # Keep the raw API payload next to the rendered report for auditing.
    raw_path = out_dir / "2026-02-21__etl_run_raw.json"
    raw_payload = {
        "execution": execution,
        "error_log_length": len(error_log),
        "task_results_parsed": task_results,
    }
    raw_path.write_text(
        json.dumps(raw_payload, ensure_ascii=False, indent=2, default=str),
        encoding="utf-8",
    )
    print(f"原始数据已导出: {raw_path}")
if __name__ == "__main__":
main()