329 lines
10 KiB
Python
329 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
监控当前 ETL 执行状态,完成后导出执行结果报告到 SYSTEM_LOG_ROOT。
|
||
|
||
通过后端 API 轮询执行历史,检测 run_uuid 对应的执行是否完成。
|
||
完成后从浏览器日志或 API 提取任务级结果,生成 Markdown 报告。
|
||
|
||
用法:python scripts/ops/monitor_etl_run.py
|
||
"""
|
||
from __future__ import annotations

import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path

import requests

sys.path.insert(0, str(Path(__file__).parent))
from _env_paths import get_output_path
|
||
|
||
# Backend base URL. Override with ETL_MONITOR_BACKEND_URL. A trailing "/" is
# stripped because request URLs are built as f"{BACKEND_URL}/api/...".
BACKEND_URL = (os.environ.get("ETL_MONITOR_BACKEND_URL") or "http://localhost:8000").rstrip("/")

# run_uuid of the ETL execution to monitor. Override with ETL_MONITOR_RUN_UUID
# so the script can be reused for other runs without editing the source.
TARGET_RUN_UUID = os.environ.get("ETL_MONITOR_RUN_UUID") or "4ba9d2d365ee4a858f1c4104b1942dc2"

POLL_INTERVAL = 30  # seconds between two history polls
|
||
|
||
|
||
def get_auth_token() -> str:
    """Return a JWT for the backend API, asking the user for one if needed.

    This function does not log in by itself. It first tries the token cached
    in ``.monitor_token`` next to this script, and keeps it if one
    ``GET /api/execution/history`` request with it returns HTTP 200.
    Otherwise it asks the user to paste a token copied from the browser's
    Local Storage. A non-empty answer is cached back to the same file.

    Returns:
        The token to send as ``Authorization: Bearer <token>``, or ``""``
        when the user skips the prompt or stdin is not interactive (callers
        then run without authentication).
    """
    token_file = Path(__file__).parent / ".monitor_token"

    if token_file.exists():
        token = token_file.read_text(encoding="utf-8").strip()
        # Check the cached token with a minimal request (limit=1).
        try:
            r = requests.get(
                f"{BACKEND_URL}/api/execution/history",
                headers={"Authorization": f"Bearer {token}"},
                params={"limit": 1},
                timeout=5,
            )
        except requests.exceptions.RequestException as e:
            # Backend unreachable or request failed: say why, then fall
            # through to the prompt instead of aborting.
            print(f" 缓存 token 校验失败: {e}")
        else:
            if r.status_code == 200:
                return token

    # No usable cached token: ask the user.
    print("需要登录后端获取 token。请在浏览器中登录后,")
    print("从浏览器 DevTools > Application > Local Storage 中复制 token,")
    print("或直接输入(留空跳过,使用无认证模式):")
    try:
        token = input("JWT Token: ").strip()
    except EOFError:
        # stdin closed/redirected (non-interactive run): unauthenticated mode.
        return ""

    if token:
        try:
            token_file.write_text(token, encoding="utf-8")
            # The file holds a bearer credential; restrict it to the owner
            # where the platform supports POSIX permissions.
            token_file.chmod(0o600)
        except OSError as e:
            # Caching is a convenience; the token is still usable this run.
            print(f" token 缓存写入失败: {e}")
    return token
|
||
|
||
|
||
def poll_execution_status(token: str, limit: int = 5) -> dict | None:
    """Fetch recent execution history and return the record of the target run.

    Calls ``GET {BACKEND_URL}/api/execution/history?limit=<limit>``. The body
    may be a bare list of records or a dict wrapping the list under
    ``"items"`` or ``"data"``.

    Args:
        token: JWT for the ``Authorization`` header; ``""`` sends no header.
        limit: number of most recent history records to request.

    Returns:
        The record whose ``run_uuid`` equals ``TARGET_RUN_UUID``. If the
        records carry no ``run_uuid`` field at all, the newest record is
        returned as a best-effort fallback. ``None`` on HTTP/network/parse
        errors, when the history is empty, or when the records do carry run
        ids but none of them is the target (a different run must not be
        reported as ours).
    """
    headers = {"Authorization": f"Bearer {token}"} if token else {}

    try:
        r = requests.get(
            f"{BACKEND_URL}/api/execution/history",
            headers=headers,
            params={"limit": limit},
            timeout=10,
        )
        if r.status_code != 200:
            print(f" API 返回 {r.status_code}: {r.text[:200]}")
            return None

        data = r.json()
        items = data if isinstance(data, list) else data.get("items", data.get("data", []))
        # Tolerate a null list and skip entries that are not record dicts.
        records = [item for item in items or [] if isinstance(item, dict)]

        for record in records:
            if record.get("run_uuid") == TARGET_RUN_UUID:
                return record

        if any("run_uuid" in record for record in records):
            # The API identifies runs, just not ours among the newest `limit`
            # records. Returning another run here would make the caller treat
            # that run's status as the target's.
            print(f" 最近 {len(records)} 条记录中未找到 run_uuid={TARGET_RUN_UUID}")
            return None

        # Records expose no run id at all: fall back to the newest one.
        return records[0] if records else None

    except requests.exceptions.ConnectionError:
        print(" 后端连接失败,可能已停止")
        return None
    except Exception as e:
        print(f" API 请求异常: {e}")
        return None
|
||
|
||
|
||
def extract_log_from_api(token: str) -> str | None:
    """Fetch the execution log of ``TARGET_RUN_UUID`` from the backend.

    Calls ``GET {BACKEND_URL}/api/execution/log/{TARGET_RUN_UUID}`` and returns
    the response body as text.

    Args:
        token: JWT for the ``Authorization`` header; ``""`` sends no header.

    Returns:
        The log text on HTTP 200, otherwise ``None``. This is best effort:
        failures are printed instead of raised, so the caller can still write
        a report without task-level details.
    """
    headers = {"Authorization": f"Bearer {token}"} if token else {}

    try:
        r = requests.get(
            f"{BACKEND_URL}/api/execution/log/{TARGET_RUN_UUID}",
            headers=headers,
            timeout=30,
        )
    except Exception as e:
        # Same policy as poll_execution_status: report the problem, don't crash.
        print(f" 日志 API 请求异常: {e}")
        return None

    if r.status_code != 200:
        print(f" 日志 API 返回 {r.status_code}: {r.text[:200]}")
        return None
    return r.text
|
||
|
||
|
||
def parse_task_results_from_log(log_text: str) -> list[dict]:
|
||
"""从日志文本解析各任务的执行结果"""
|
||
results = []
|
||
lines = log_text.split("\n") if log_text else []
|
||
|
||
current_task = None
|
||
task_start_time = None
|
||
|
||
for line in lines:
|
||
# 检测任务开始
|
||
if "开始执行" in line and "ODS" in line or "DWS" in line or "DWD" in line:
|
||
# 提取时间戳
|
||
ts = extract_timestamp(line)
|
||
# 提取任务名
|
||
for token in line.split():
|
||
if token.startswith("ODS_") or token.startswith("DWS_") or token.startswith("DWD_"):
|
||
task_name = token.rstrip(":")
|
||
current_task = task_name
|
||
task_start_time = ts
|
||
break
|
||
|
||
# 检测任务完成
|
||
if current_task and "任务完成" in line and current_task in line:
|
||
ts = extract_timestamp(line)
|
||
# 提取统计信息
|
||
stats = extract_stats(line)
|
||
results.append({
|
||
"task": current_task,
|
||
"status": "success",
|
||
"start": task_start_time,
|
||
"end": ts,
|
||
"stats": stats,
|
||
})
|
||
current_task = None
|
||
|
||
# 检测任务失败
|
||
if "任务" in line and "失败" in line:
|
||
ts = extract_timestamp(line)
|
||
for token in line.split():
|
||
if token.startswith("ODS_") or token.startswith("DWS_") or token.startswith("DWD_"):
|
||
task_name = token.rstrip(":")
|
||
# 提取错误信息
|
||
error_msg = line.split("失败:")[-1].strip() if "失败:" in line else "未知错误"
|
||
results.append({
|
||
"task": task_name,
|
||
"status": "failed",
|
||
"start": task_start_time if current_task == task_name else ts,
|
||
"end": ts,
|
||
"error": error_msg,
|
||
})
|
||
if current_task == task_name:
|
||
current_task = None
|
||
break
|
||
|
||
return results
|
||
|
||
|
||
def extract_timestamp(line: str) -> str:
|
||
"""从日志行提取时间戳"""
|
||
# 格式: [2026-02-21 15:29:21]
|
||
if "[" in line and "]" in line:
|
||
start = line.index("[") + 1
|
||
end = line.index("]", start)
|
||
return line[start:end]
|
||
return ""
|
||
|
||
|
||
def extract_stats(line: str) -> str:
|
||
"""从日志行提取统计信息"""
|
||
if "{" in line and "}" in line:
|
||
start = line.index("{")
|
||
end = line.index("}") + 1
|
||
return line[start:end]
|
||
return ""
|
||
|
||
|
||
|
||
def generate_report(execution: dict, task_results: list[dict]) -> str:
|
||
"""生成执行结果 Markdown 报告"""
|
||
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
status = execution.get("status", "unknown")
|
||
start_time = execution.get("started_at", execution.get("start_time", "—"))
|
||
end_time = execution.get("ended_at", execution.get("end_time", "—"))
|
||
duration = execution.get("duration", "—")
|
||
exit_code = execution.get("exit_code", "—")
|
||
|
||
lines = [
|
||
f"# ETL 执行结果报告",
|
||
f"",
|
||
f"> 生成时间:{now}",
|
||
f"> run_uuid:{TARGET_RUN_UUID}",
|
||
f"",
|
||
f"---",
|
||
f"",
|
||
f"## 执行概览",
|
||
f"",
|
||
f"| 项目 | 值 |",
|
||
f"|------|-----|",
|
||
f"| 状态 | {status} |",
|
||
f"| 开始时间 | {start_time} |",
|
||
f"| 结束时间 | {end_time} |",
|
||
f"| 时长 | {duration} |",
|
||
f"| 退出码 | {exit_code} |",
|
||
f"",
|
||
]
|
||
|
||
# 任务级结果
|
||
if task_results:
|
||
success_count = sum(1 for r in task_results if r["status"] == "success")
|
||
failed_count = sum(1 for r in task_results if r["status"] == "failed")
|
||
|
||
lines.extend([
|
||
f"## 任务级结果",
|
||
f"",
|
||
f"成功:{success_count} | 失败:{failed_count} | 总计:{len(task_results)}",
|
||
f"",
|
||
f"| # | 任务 | 状态 | 开始 | 结束 | 备注 |",
|
||
f"|---|------|------|------|------|------|",
|
||
])
|
||
|
||
for i, r in enumerate(task_results, 1):
|
||
note = r.get("stats", r.get("error", ""))
|
||
if len(note) > 80:
|
||
note = note[:77] + "..."
|
||
lines.append(
|
||
f"| {i} | {r['task']} | {r['status']} | {r.get('start', '—')} | {r.get('end', '—')} | {note} |"
|
||
)
|
||
|
||
lines.append("")
|
||
|
||
# 已知问题
|
||
lines.extend([
|
||
f"## 已知问题",
|
||
f"",
|
||
f"### DWS_ASSISTANT_DAILY 字段引用错误(已修复)",
|
||
f"",
|
||
f"根因:`_extract_trash_records` SQL 引用了 `dwd_assistant_trash_event` 中不存在的字段。",
|
||
f"级联影响:9 个任务失败(DWS_ASSISTANT_DAILY 及其下游 + ODS_SETTLEMENT_RECORDS/PAYMENT/REFUND/BUILD_ORDER_SUMMARY)。",
|
||
f"修复状态:代码已修复,待下次执行验证。",
|
||
f"详见:`export/SYSTEM/LOGS/2026-02-21__dws_assistant_daily_bug_fix.md`",
|
||
f"",
|
||
f"---",
|
||
f"",
|
||
f"## 下一步",
|
||
f"",
|
||
f"1. 重新提交包含失败任务的执行,验证修复",
|
||
f"2. 运行 ETL Data Consistency Check",
|
||
f"3. 运行 /audit 审计",
|
||
])
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _export_results(execution: dict, token: str, out_dir: Path) -> None:
    """Fetch the run log, build the report and write report + raw JSON to *out_dir*."""
    log_text = extract_log_from_api(token)
    task_results = parse_task_results_from_log(log_text) if log_text else []

    report = generate_report(execution, task_results)
    # NOTE(review): assumes get_output_path returns a pathlib.Path (it is
    # joined with "/" below). Create it so the first write cannot fail on a
    # fresh checkout.
    out_dir.mkdir(parents=True, exist_ok=True)
    out_file = out_dir / "2026-02-21__etl_run_result.md"
    out_file.write_text(report, encoding="utf-8")
    print(f"\n执行结果报告已导出: {out_file}")

    # Also keep the raw API record; default=str stringifies non-JSON values.
    raw_file = out_dir / "2026-02-21__etl_run_raw.json"
    raw_file.write_text(
        json.dumps(execution, ensure_ascii=False, indent=2, default=str),
        encoding="utf-8",
    )
    print(f"原始数据已导出: {raw_file}")


def main() -> int:
    """Monitor the target ETL run and export its result report when it finishes.

    Polls ``poll_execution_status`` every ``POLL_INTERVAL`` seconds for at most
    one hour. Runs non-interactively: the token is read from ``.monitor_token``
    if present, otherwise requests are sent unauthenticated.

    Returns:
        0 after the report and raw JSON were written, 1 when no terminal status
        was seen within the polling budget.
    """
    out_dir = get_output_path("SYSTEM_LOG_ROOT")
    print("ETL 执行监控启动")
    print(f" 目标 run_uuid: {TARGET_RUN_UUID}")
    print(f" 轮询间隔: {POLL_INTERVAL}s")
    print(f" 输出目录: {out_dir}")
    print()

    # Deliberately non-interactive (unlike get_auth_token, which prompts):
    # use the cached token if there is one, else no token.
    token = ""
    token_file = Path(__file__).parent / ".monitor_token"
    if token_file.exists():
        token = token_file.read_text(encoding="utf-8").strip()

    terminal_statuses = ("success", "failed", "completed", "error", "stopped")
    max_wait_seconds = 60 * 60  # give up after 60 minutes
    # Derive the poll count from the interval so the time limit stays one hour
    # if POLL_INTERVAL changes (120 polls at the default 30 s).
    max_polls = max(1, max_wait_seconds // max(POLL_INTERVAL, 1))

    for poll_count in range(1, max_polls + 1):
        now = datetime.now().strftime("%H:%M:%S")
        print(f"[{now}] 轮询 #{poll_count}...", end=" ")

        execution = poll_execution_status(token)
        if execution is None:
            print("未获取到执行信息")
        else:
            status = execution.get("status", "unknown")
            print(f"状态: {status}")
            if status in terminal_statuses:
                print(f"\n执行已完成,状态: {status}")
                _export_results(execution, token, out_dir)
                return 0

        # Do not sleep after the final poll; there is nothing left to wait for.
        if poll_count < max_polls:
            time.sleep(POLL_INTERVAL)

    print(f"\n超过最大轮询次数 ({max_polls}),退出监控")
    return 1
|
||
|
||
|
||
if __name__ == "__main__":
    # Use main()'s return value as the process exit status so callers and
    # schedulers can detect a timeout; sys.exit(None) still exits with 0.
    sys.exit(main())
|