包含多个会话的累积代码变更: - backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔 - admin-web: ETL 状态页、任务管理、调度配置、登录优化 - miniprogram: 看板页面、聊天集成、UI 组件、导航更新 - etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强 - tenant-admin: 项目初始化 - db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8) - packages/shared: 枚举和工具函数更新 - tools: 数据库工具、报表生成、健康检查 - docs: PRD/架构/部署/合约文档更新 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
210 lines
7.3 KiB
Python
210 lines
7.3 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""ETL 全流程联调监控脚本
|
||
|
||
每 30 秒轮询执行状态和日志,检测错误/警告,最长等待 30 分钟。
|
||
将监控结果输出为 JSON 供后续报告生成使用。
|
||
|
||
用法: python scripts/ops/etl_monitor.py <execution_id> <jwt_token>
|
||
"""
|
||
import json
|
||
import re
|
||
import sys
|
||
import time
|
||
import urllib.request
|
||
from datetime import datetime, timezone
|
||
|
||
BASE_URL = "http://localhost:8000"
|
||
POLL_INTERVAL = 30 # 秒
|
||
MAX_IDLE_MINUTES = 30
|
||
MAX_IDLE_SECONDS = MAX_IDLE_MINUTES * 60
|
||
|
||
# 精确匹配真正的错误/警告日志行(排除 JSON 统计中的 'errors': 0 等误报)
|
||
ERROR_PATTERN = re.compile(
|
||
r"\b(ERROR|CRITICAL)\b.*(?!.*'errors':\s*0)", re.IGNORECASE
|
||
)
|
||
TRACEBACK_PATTERN = re.compile(r"Traceback \(most recent call last\)")
|
||
EXCEPTION_PATTERN = re.compile(r"^\[stderr\].*Exception:", re.IGNORECASE)
|
||
WARNING_PATTERN = re.compile(r"\bWARNING\b", re.IGNORECASE)
|
||
|
||
|
||
def api_get(path: str, token: str) -> dict:
|
||
req = urllib.request.Request(
|
||
f"{BASE_URL}{path}",
|
||
headers={"Authorization": f"Bearer {token}"},
|
||
)
|
||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||
return json.loads(resp.read().decode())
|
||
|
||
|
||
def classify_log_lines(lines: list[str]) -> dict:
|
||
"""分类日志行,返回错误和警告。"""
|
||
errors, warnings = [], []
|
||
for i, line in enumerate(lines):
|
||
if TRACEBACK_PATTERN.search(line) or EXCEPTION_PATTERN.search(line):
|
||
# 收集上下文(前后各 5 行)
|
||
ctx_start = max(0, i - 5)
|
||
ctx_end = min(len(lines), i + 6)
|
||
errors.append({
|
||
"line_no": i + 1,
|
||
"text": line.strip(),
|
||
"context": [l.strip() for l in lines[ctx_start:ctx_end]],
|
||
})
|
||
elif ERROR_PATTERN.search(line) and "'errors':" not in line:
|
||
errors.append({"line_no": i + 1, "text": line.strip(), "context": []})
|
||
elif WARNING_PATTERN.search(line) and "'errors':" not in line:
|
||
warnings.append({"line_no": i + 1, "text": line.strip()})
|
||
return {"errors": errors, "warnings": warnings}
|
||
|
||
|
||
def monitor(execution_id: str, token: str) -> dict:
|
||
"""主监控循环。返回完整监控结果。"""
|
||
print(f"[监控] 开始监控 execution_id={execution_id}")
|
||
print(f"[监控] 轮询间隔={POLL_INTERVAL}s, 最长空闲={MAX_IDLE_MINUTES}min")
|
||
|
||
start_time = datetime.now(timezone.utc)
|
||
last_log_count = 0
|
||
last_new_log_time = time.time()
|
||
poll_count = 0
|
||
all_log_text = ""
|
||
final_status = "unknown"
|
||
final_exit_code = None
|
||
poll_history = []
|
||
|
||
while True:
|
||
poll_count += 1
|
||
now = datetime.now(timezone.utc).isoformat()
|
||
|
||
# 获取日志
|
||
try:
|
||
logs_data = api_get(f"/api/execution/{execution_id}/logs", token)
|
||
except Exception as e:
|
||
print(f"[监控] #{poll_count} {now} 日志获取失败: {e}")
|
||
time.sleep(POLL_INTERVAL)
|
||
continue
|
||
|
||
log_text = logs_data.get("output_log") or ""
|
||
lines = log_text.split("\n") if log_text else []
|
||
current_count = len(lines)
|
||
new_lines = current_count - last_log_count
|
||
|
||
if new_lines > 0:
|
||
last_new_log_time = time.time()
|
||
all_log_text = log_text
|
||
|
||
# 获取执行状态
|
||
try:
|
||
hist_data = api_get("/api/execution/history?limit=10", token)
|
||
this_exec = next((h for h in hist_data if h["id"] == execution_id), None)
|
||
status = this_exec["status"] if this_exec else "unknown"
|
||
exit_code = this_exec.get("exit_code") if this_exec else None
|
||
duration_ms = this_exec.get("duration_ms") if this_exec else None
|
||
except Exception as e:
|
||
print(f"[监控] #{poll_count} {now} 状态获取失败: {e}")
|
||
status = "unknown"
|
||
exit_code = None
|
||
duration_ms = None
|
||
|
||
poll_record = {
|
||
"poll": poll_count,
|
||
"time": now,
|
||
"log_lines": current_count,
|
||
"new_lines": new_lines,
|
||
"status": status,
|
||
}
|
||
poll_history.append(poll_record)
|
||
|
||
# 打印最新几行日志
|
||
if new_lines > 0:
|
||
recent = lines[last_log_count:current_count]
|
||
for line in recent[-3:]:
|
||
print(f" {line.strip()}")
|
||
|
||
print(
|
||
f"[监控] #{poll_count} {now} | "
|
||
f"日志行={current_count}(+{new_lines}) | 状态={status}"
|
||
)
|
||
|
||
last_log_count = current_count
|
||
|
||
# 检查完成条件
|
||
if status in ("success", "failed", "cancelled"):
|
||
final_status = status
|
||
final_exit_code = exit_code
|
||
print(f"[监控] 任务完成: status={status}, exit_code={exit_code}, duration_ms={duration_ms}")
|
||
break
|
||
|
||
# 检查超时
|
||
idle_seconds = time.time() - last_new_log_time
|
||
if idle_seconds > MAX_IDLE_SECONDS:
|
||
print(f"[监控] 超时警告: {MAX_IDLE_MINUTES}分钟无新日志")
|
||
final_status = "timeout_warning"
|
||
break
|
||
|
||
time.sleep(POLL_INTERVAL)
|
||
|
||
end_time = datetime.now(timezone.utc)
|
||
|
||
# 分类日志
|
||
all_lines = all_log_text.split("\n") if all_log_text else []
|
||
classified = classify_log_lines(all_lines)
|
||
|
||
result = {
|
||
"execution_id": execution_id,
|
||
"start_time": start_time.isoformat(),
|
||
"end_time": end_time.isoformat(),
|
||
"monitor_duration_s": (end_time - start_time).total_seconds(),
|
||
"final_status": final_status,
|
||
"final_exit_code": final_exit_code,
|
||
"total_log_lines": len(all_lines),
|
||
"total_polls": poll_count,
|
||
"errors": classified["errors"],
|
||
"warnings": classified["warnings"],
|
||
"error_count": len(classified["errors"]),
|
||
"warning_count": len(classified["warnings"]),
|
||
"poll_history": poll_history,
|
||
"full_log": all_log_text,
|
||
}
|
||
|
||
return result
|
||
|
||
|
||
if __name__ == "__main__":
|
||
if len(sys.argv) < 3:
|
||
print("用法: python scripts/ops/etl_monitor.py <execution_id> <jwt_token>")
|
||
sys.exit(1)
|
||
|
||
exec_id = sys.argv[1]
|
||
jwt = sys.argv[2]
|
||
result = monitor(exec_id, jwt)
|
||
|
||
# 输出结果到 JSON 文件(临时位置,后续报告脚本读取)
|
||
import os
|
||
from pathlib import Path
|
||
from dotenv import load_dotenv
|
||
|
||
# 加载环境变量
|
||
root_env = Path(__file__).resolve().parent.parent.parent / ".env"
|
||
load_dotenv(root_env)
|
||
|
||
log_root = os.environ.get("SYSTEM_LOG_ROOT")
|
||
if not log_root:
|
||
raise RuntimeError("SYSTEM_LOG_ROOT 环境变量未设置")
|
||
|
||
out_dir = Path(log_root)
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
date_str = datetime.now().strftime("%Y-%m-%d")
|
||
out_path = out_dir / f"{date_str}__etl_monitor_result.json"
|
||
|
||
with open(out_path, "w", encoding="utf-8") as f:
|
||
# 不输出完整日志到 JSON(太大),单独保存
|
||
result_slim = {k: v for k, v in result.items() if k != "full_log"}
|
||
json.dump(result_slim, f, ensure_ascii=False, indent=2)
|
||
|
||
log_path = out_dir / f"{date_str}__etl_full_log.txt"
|
||
with open(log_path, "w", encoding="utf-8") as f:
|
||
f.write(result["full_log"])
|
||
|
||
print(f"[监控] 结果已保存: {out_path}")
|
||
print(f"[监控] 完整日志已保存: {log_path}")
|
||
print(f"[监控] 最终状态: {result['final_status']}, 错误数: {result['error_count']}, 警告数: {result['warning_count']}")
|