# Source listing metadata: 206 lines, 7.0 KiB, Python
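"""
ETL 执行监控脚本 — 轮询 API 日志,检测 ERROR/WARNING,等待任务完成。
ETL execution monitor: polls the API logs, reports ERROR/WARNING lines, and waits for the job to finish.

用法: python scripts/ops/_etl_monitor.py <execution_id> [interval_seconds]   (默认 30 秒 / default 30)
"""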
|
||
import sys, time, re, json, os
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
|
||
import requests
|
||
|
||
# Connection settings. Each value can be overridden through the environment so
# credentials do not have to be edited into source; the defaults are the
# previous hard-coded values (local development setup).
API_BASE = os.environ.get("ETL_MONITOR_API_BASE", "http://localhost:8000").rstrip("/")
USERNAME = os.environ.get("ETL_MONITOR_USERNAME", "admin")
PASSWORD = os.environ.get("ETL_MONITOR_PASSWORD", "admin123")

# Keyword detection (case-insensitive). scan_log_lines classifies a line as an
# error when ERROR_KEYWORDS matches, otherwise as a warning when
# WARNING_KEYWORDS matches. Word boundaries keep e.g. "errors=0" from matching.
ERROR_KEYWORDS = re.compile(r"\b(ERROR|CRITICAL)\b|Traceback|Exception", re.IGNORECASE)
WARNING_KEYWORDS = re.compile(r"\bWARNING\b", re.IGNORECASE)
|
||
|
||
|
||
def login(timeout: float = 30.0) -> str:
    """Authenticate against the API and return a bearer access token.

    Args:
        timeout: Seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on an unresponsive server,
            which would freeze the monitor.

    Returns:
        The ``access_token`` field of the login response JSON.

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout,
            or a non-2xx status (via ``raise_for_status``).
        KeyError: if the response JSON has no ``access_token`` field.
    """
    resp = requests.post(
        f"{API_BASE}/api/auth/login",
        json={"username": USERNAME, "password": PASSWORD},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]
|
||
|
||
|
||
def get_logs(token: str, eid: str, timeout: float = 30.0) -> dict:
    """Fetch the log payload of execution *eid*.

    Args:
        token: Bearer token returned by ``login``.
        eid: Execution id, interpolated into the URL path.
        timeout: Seconds to wait for the HTTP response, so a stuck server
            cannot hang the polling loop.

    Returns:
        The decoded JSON body (presumably a dict; the monitor reads its
        ``output_log`` key).

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout,
            or a non-2xx status (e.g. an expired token).
    """
    resp = requests.get(
        f"{API_BASE}/api/execution/{eid}/logs",
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
|
||
|
||
|
||
def get_history(token: str, timeout: float = 30.0) -> list:
    """Fetch the execution history list.

    Args:
        token: Bearer token returned by ``login``.
        timeout: Seconds to wait for the HTTP response, so a stuck server
            cannot hang the polling loop.

    Returns:
        The decoded JSON body; expected to be a list of execution records
        (dicts with at least an ``id``/``execution_id`` and ``status``).

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout,
            or a non-2xx status (e.g. an expired token).
    """
    resp = requests.get(
        f"{API_BASE}/api/execution/history",
        headers={"Authorization": f"Bearer {token}"},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
|
||
|
||
|
||
def find_execution_status(history: list, eid: str) -> dict | None:
|
||
for item in history:
|
||
# API 返回 "id" 字段
|
||
if item.get("id") == eid or item.get("execution_id") == eid:
|
||
return item
|
||
return None
|
||
|
||
|
||
def scan_log_lines(log_text: str, seen_count: int) -> tuple[list, list, int]:
|
||
"""扫描日志行,返回 (errors, warnings, new_seen_count)"""
|
||
lines = log_text.split("\n") if log_text else []
|
||
errors = []
|
||
warnings = []
|
||
for i, line in enumerate(lines):
|
||
if i < seen_count:
|
||
continue
|
||
if ERROR_KEYWORDS.search(line):
|
||
errors.append(line.strip())
|
||
elif WARNING_KEYWORDS.search(line):
|
||
warnings.append(line.strip())
|
||
return errors, warnings, len(lines)
|
||
|
||
|
||
def get_last_timestamp(log_text: str) -> str | None:
|
||
"""提取日志中最后一个时间戳"""
|
||
matches = re.findall(r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]", log_text or "")
|
||
return matches[-1] if matches else None
|
||
|
||
|
||
def get_current_task(log_text: str) -> str | None:
|
||
"""提取当前正在执行的任务名"""
|
||
matches = re.findall(r"开始执行(\w+)", log_text or "")
|
||
return matches[-1] if matches else None
|
||
|
||
|
||
def main():
|
||
if len(sys.argv) < 2:
|
||
print("用法: python scripts/ops/_etl_monitor.py <execution_id>")
|
||
sys.exit(1)
|
||
|
||
eid = sys.argv[1]
|
||
interval = int(sys.argv[2]) if len(sys.argv) > 2 else 30
|
||
|
||
print(f"[监控] execution_id={eid}, 轮询间隔={interval}s")
|
||
print(f"[监控] 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||
print("-" * 60)
|
||
|
||
token = login()
|
||
print("[监控] 登录成功")
|
||
|
||
all_errors: list[str] = []
|
||
all_warnings: list[str] = []
|
||
seen_lines = 0
|
||
last_log_time = None
|
||
no_new_log_since = None
|
||
check_count = 0
|
||
|
||
while True:
|
||
check_count += 1
|
||
try:
|
||
# 检查执行状态
|
||
history = get_history(token)
|
||
exec_info = find_execution_status(history, eid)
|
||
|
||
status = exec_info.get("status", "unknown") if exec_info else "unknown"
|
||
|
||
# 获取日志
|
||
log_data = get_logs(token, eid)
|
||
log_text = log_data.get("output_log", "") or ""
|
||
|
||
# 扫描新日志行
|
||
new_errors, new_warnings, seen_lines = scan_log_lines(log_text, seen_lines)
|
||
all_errors.extend(new_errors)
|
||
all_warnings.extend(new_warnings)
|
||
|
||
# 提取当前进度信息
|
||
current_task = get_current_task(log_text)
|
||
last_ts = get_last_timestamp(log_text)
|
||
|
||
# 超时检测
|
||
if last_ts:
|
||
if last_ts != last_log_time:
|
||
last_log_time = last_ts
|
||
no_new_log_since = None
|
||
else:
|
||
if no_new_log_since is None:
|
||
no_new_log_since = datetime.now()
|
||
elapsed = (datetime.now() - no_new_log_since).total_seconds()
|
||
if elapsed > 1800: # 30 分钟
|
||
print(f"[超时警告] 连续 {elapsed/60:.0f} 分钟无新日志输出!")
|
||
|
||
# 输出状态
|
||
log_line_count = len(log_text.split("\n")) if log_text else 0
|
||
print(
|
||
f"[检查 #{check_count}] {datetime.now().strftime('%H:%M:%S')} | "
|
||
f"状态={status} | 日志行={log_line_count} | "
|
||
f"当前任务={current_task or '?'} | "
|
||
f"最后日志={last_ts or '?'} | "
|
||
f"新ERROR={len(new_errors)} 新WARNING={len(new_warnings)}"
|
||
)
|
||
|
||
# 输出新发现的错误/警告
|
||
for e in new_errors:
|
||
print(f" ❌ ERROR: {e[:200]}")
|
||
for w in new_warnings:
|
||
print(f" ⚠️ WARNING: {w[:200]}")
|
||
|
||
# 任务完成检测
|
||
if status in ("success", "failed", "cancelled"):
|
||
print("-" * 60)
|
||
print(f"[完成] 任务状态: {status}")
|
||
if exec_info:
|
||
print(f" 开始时间: {exec_info.get('started_at', '?')}")
|
||
print(f" 结束时间: {exec_info.get('finished_at', '?')}")
|
||
dur_ms = exec_info.get("duration_ms")
|
||
if dur_ms:
|
||
print(f" 时长: {dur_ms/1000:.1f}s ({dur_ms/60000:.1f}m)")
|
||
print(f" 退出码: {exec_info.get('exit_code', '?')}")
|
||
break
|
||
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"[网络错误] {e}")
|
||
# token 可能过期,重新登录
|
||
try:
|
||
token = login()
|
||
print("[监控] 重新登录成功")
|
||
except Exception:
|
||
pass
|
||
|
||
time.sleep(interval)
|
||
|
||
# 最终汇总
|
||
print("\n" + "=" * 60)
|
||
print("监控汇总")
|
||
print("=" * 60)
|
||
print(f"总检查次数: {check_count}")
|
||
print(f"ERROR 总数: {len(all_errors)}")
|
||
print(f"WARNING 总数: {len(all_warnings)}")
|
||
|
||
if all_errors:
|
||
print("\n--- 所有 ERROR ---")
|
||
for i, e in enumerate(all_errors, 1):
|
||
print(f" {i}. {e[:300]}")
|
||
|
||
if all_warnings:
|
||
print("\n--- 所有 WARNING ---")
|
||
for i, w in enumerate(all_warnings, 1):
|
||
print(f" {i}. {w[:300]}")
|
||
|
||
# 输出 JSON 摘要供后续任务使用
|
||
summary = {
|
||
"execution_id": eid,
|
||
"final_status": status,
|
||
"total_checks": check_count,
|
||
"error_count": len(all_errors),
|
||
"warning_count": len(all_warnings),
|
||
"errors": all_errors,
|
||
"warnings": all_warnings,
|
||
"exit_code": exec_info.get("exit_code") if exec_info else None,
|
||
"started_at": exec_info.get("started_at") if exec_info else None,
|
||
"ended_at": exec_info.get("finished_at") if exec_info else None,
|
||
"duration_ms": exec_info.get("duration_ms") if exec_info else None,
|
||
}
|
||
print(f"\n[JSON摘要]\n{json.dumps(summary, ensure_ascii=False, indent=2)}")
|
||
|
||
|
||
if __name__ == "__main__":
    # Start the monitor only when run as a script, not when imported.
    main()
|