# -*- coding: utf-8 -*-
"""
Monitor the current ETL run and export a result report to SYSTEM_LOG_ROOT
once it finishes.

The script polls the backend execution-history API for the run whose
``run_uuid`` equals ``TARGET_RUN_UUID``. When that run reaches a terminal
status, it fetches the run log from the API, parses per-task results out of
it, and writes a Markdown report together with the raw API record.

Usage: python scripts/ops/monitor_etl_run.py
"""
from __future__ import annotations

import json
import sys
import time
from datetime import datetime
from pathlib import Path

import requests

sys.path.insert(0, str(Path(__file__).parent))
from _env_paths import get_output_path

BACKEND_URL = "http://localhost:8000"
TARGET_RUN_UUID = "4ba9d2d365ee4a858f1c4104b1942dc2"
POLL_INTERVAL = 30  # seconds between polls
MAX_WAIT_SECONDS = 60 * 60  # stop monitoring after one hour (120 polls at 30 s)

# Statuses from the history API that this script treats as "run finished".
TERMINAL_STATUSES = ("success", "failed", "completed", "error", "stopped")

# Task-name prefixes / layer markers recognised when parsing the run log.
_TASK_PREFIXES = ("ODS_", "DWS_", "DWD_")
_LAYER_MARKERS = ("ODS", "DWS", "DWD")

# Cache file for a JWT pasted by the user (next to this script).
_TOKEN_FILE = Path(__file__).parent / ".monitor_token"


def _auth_headers(token: str) -> dict[str, str]:
    """Return headers carrying *token* as a Bearer credential; empty if *token* is empty."""
    return {"Authorization": f"Bearer {token}"} if token else {}


def get_auth_token() -> str:
    """Obtain a JWT for the backend API.

    A token cached in ``.monitor_token`` is reused when a probe request to the
    execution-history endpoint returns HTTP 200. Otherwise the user is asked
    to paste one; a non-empty answer is cached for next time. An empty answer
    (or a closed stdin) yields "" so callers fall back to unauthenticated
    requests.
    """
    if _TOKEN_FILE.exists():
        token = _TOKEN_FILE.read_text(encoding="utf-8").strip()
        try:
            resp = requests.get(
                f"{BACKEND_URL}/api/execution/history",
                headers={"Authorization": f"Bearer {token}"},
                params={"limit": 1},
                timeout=5,
            )
        except requests.RequestException:
            resp = None  # backend unreachable: cached token cannot be verified
        if resp is not None and resp.status_code == 200:
            return token

    # Cached token missing or rejected: ask the user for a fresh one.
    print("需要登录后端获取 token。请在浏览器中登录后,")
    print("从浏览器 DevTools > Application > Local Storage 中复制 token,")
    print("或直接输入(留空跳过,使用无认证模式):")
    try:
        token = input("JWT Token: ").strip()
    except EOFError:
        token = ""  # non-interactive stdin: behave like an empty answer
    if token:
        _TOKEN_FILE.write_text(token, encoding="utf-8")
    return token


def poll_execution_status(token: str) -> dict | None:
    """Fetch the five newest executions and return the record for TARGET_RUN_UUID.

    The history payload may be a bare list or a dict wrapping the list under
    ``items`` or ``data``. If the target run is not among the returned
    records, the newest record is returned instead (with a warning), so the
    caller may be looking at a different run. Returns ``None`` on a non-200
    response, a connection failure, or any other request/parse error.
    """
    try:
        resp = requests.get(
            f"{BACKEND_URL}/api/execution/history",
            headers=_auth_headers(token),
            params={"limit": 5},
            timeout=10,
        )
        if resp.status_code != 200:
            print(f" API 返回 {resp.status_code}: {resp.text[:200]}")
            return None
        data = resp.json()
        items = data if isinstance(data, list) else data.get("items", data.get("data", []))
        items = items or []
        for item in items:
            if item.get("run_uuid") == TARGET_RUN_UUID:
                return item
        if items:
            # Fallback kept from the original design, but made visible.
            latest = items[0]
            print(f" 未找到 run_uuid={TARGET_RUN_UUID},回退到最新执行 {latest.get('run_uuid')}", end=" ")
            return latest
        return None
    except requests.exceptions.ConnectionError:
        print(" 后端连接失败,可能已停止")
        return None
    except Exception as e:  # polling boundary: report and keep the monitor alive
        print(f" API 请求异常: {e}")
        return None


def extract_log_from_api(token: str) -> str | None:
    """Download the run log for TARGET_RUN_UUID; return its text, or None on failure."""
    try:
        resp = requests.get(
            f"{BACKEND_URL}/api/execution/log/{TARGET_RUN_UUID}",
            headers=_auth_headers(token),
            timeout=30,
        )
    except requests.RequestException as e:
        print(f" 日志获取失败: {e}")
        return None
    return resp.text if resp.status_code == 200 else None


def _find_task_name(line: str) -> str | None:
    """Return the first whitespace-separated ODS_/DWS_/DWD_ token of *line*, minus a trailing colon."""
    for word in line.split():
        if word.startswith(_TASK_PREFIXES):
            return word.rstrip(":：")
    return None


def _extract_error(line: str) -> str:
    """Return the text after the last "失败:" (ASCII or full-width colon), else "未知错误"."""
    for sep in ("失败:", "失败："):
        if sep in line:
            return line.rpartition(sep)[2].strip()
    return "未知错误"


def parse_task_results_from_log(log_text: str) -> list[dict]:
    """Parse per-task outcomes from an ETL run log.

    Lines are classified by substring matching:
      * start   - contains "开始执行" and an ODS/DWS/DWD layer marker;
      * success - contains "任务完成" and the name of the task currently running;
      * failure - contains both "任务" and "失败"; text after "失败:" is the error.

    Returns one dict per finished task with keys ``task``, ``status``
    ("success" or "failed"), ``start``, ``end`` and either ``stats`` or
    ``error``. An empty or ``None`` log yields an empty list.
    """
    results: list[dict] = []
    current_task: str | None = None
    task_start_time: str | None = None

    # splitlines() also strips the "\r" left behind by CRLF logs.
    for line in (log_text or "").splitlines():
        # Task start. The layer test must be grouped: the previous
        # `a and b or c or d` form matched every line that mentioned DWS/DWD,
        # so completion/failure lines overwrote the recorded start time.
        if "开始执行" in line and any(marker in line for marker in _LAYER_MARKERS):
            name = _find_task_name(line)
            if name is not None:
                current_task = name
                task_start_time = extract_timestamp(line)

        # Task success: only attributed to the task currently in progress.
        if current_task and "任务完成" in line and current_task in line:
            results.append({
                "task": current_task,
                "status": "success",
                "start": task_start_time,
                "end": extract_timestamp(line),
                "stats": extract_stats(line),
            })
            current_task = None

        # Task failure: may name any task, not only the one in progress.
        if "任务" in line and "失败" in line:
            name = _find_task_name(line)
            if name is not None:
                ts = extract_timestamp(line)
                results.append({
                    "task": name,
                    "status": "failed",
                    "start": task_start_time if current_task == name else ts,
                    "end": ts,
                    "error": _extract_error(line),
                })
                if current_task == name:
                    current_task = None

    return results


def extract_timestamp(line: str) -> str:
    """Return the text inside the first ``[...]`` pair (e.g. "2026-02-21 15:29:21"), or ""."""
    start = line.find("[")
    if start == -1:
        return ""
    end = line.find("]", start + 1)
    if end == -1:
        return ""  # a "]" that only appears before "[" no longer raises
    return line[start + 1:end]


def extract_stats(line: str) -> str:
    """Return the first balanced ``{...}`` span of *line* (braces included), or "".

    Brace depth is tracked so nested dicts such as ``{'a': {'b': 1}}`` are not
    cut off at the first closing brace. Unbalanced braces yield "".
    """
    start = line.find("{")
    if start == -1:
        return ""
    depth = 0
    for pos, ch in enumerate(line[start:], start):
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return line[start:pos + 1]
    return ""


def _pick(record: dict, *keys: str, default: object = "—") -> object:
    """Return the first value in *record* under *keys* that is not None, else *default*."""
    for key in keys:
        value = record.get(key)
        if value is not None:
            return value
    return default


def _md_cell(value: object) -> str:
    """Render *value* for a Markdown table cell: escape pipes, flatten newlines."""
    return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")


def generate_report(execution: dict, task_results: list[dict]) -> str:
    """Render the Markdown result report for *execution* and its parsed *task_results*.

    The overview table reads ``status``, ``started_at``/``start_time``,
    ``ended_at``/``end_time``, ``duration`` and ``exit_code`` from the API
    record, showing "—" for missing or null values. The "已知问题" and
    "下一步" sections are static text.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    status = _md_cell(_pick(execution, "status", default="unknown"))
    start_time = _md_cell(_pick(execution, "started_at", "start_time"))
    end_time = _md_cell(_pick(execution, "ended_at", "end_time"))
    duration = _md_cell(_pick(execution, "duration"))
    exit_code = _md_cell(_pick(execution, "exit_code"))

    lines = [
        "# ETL 执行结果报告",
        "",
        f"> 生成时间:{now}",
        f"> run_uuid:{TARGET_RUN_UUID}",
        "",
        "---",
        "",
        "## 执行概览",
        "",
        "| 项目 | 值 |",
        "|------|-----|",
        f"| 状态 | {status} |",
        f"| 开始时间 | {start_time} |",
        f"| 结束时间 | {end_time} |",
        f"| 时长 | {duration} |",
        f"| 退出码 | {exit_code} |",
        "",
    ]

    if task_results:
        success_count = sum(1 for r in task_results if r["status"] == "success")
        failed_count = sum(1 for r in task_results if r["status"] == "failed")
        lines.extend([
            "## 任务级结果",
            "",
            f"成功:{success_count} | 失败:{failed_count} | 总计:{len(task_results)}",
            "",
            "| # | 任务 | 状态 | 开始 | 结束 | 备注 |",
            "|---|------|------|------|------|------|",
        ])
        for i, r in enumerate(task_results, 1):
            note = r.get("stats", r.get("error", ""))
            if len(note) > 80:
                note = note[:77] + "..."
            start = r.get("start") or "—"
            end = r.get("end") or "—"
            lines.append(
                f"| {i} | {r['task']} | {r['status']} | {start} | {end} | {_md_cell(note)} |"
            )
        lines.append("")

    # Static notes about the known failure and follow-up steps.
    lines.extend([
        "## 已知问题",
        "",
        "### DWS_ASSISTANT_DAILY 字段引用错误(已修复)",
        "",
        "根因:`_extract_trash_records` SQL 引用了 `dwd_assistant_trash_event` 中不存在的字段。",
        "级联影响:9 个任务失败(DWS_ASSISTANT_DAILY 及其下游 + ODS_SETTLEMENT_RECORDS/PAYMENT/REFUND/BUILD_ORDER_SUMMARY)。",
        "修复状态:代码已修复,待下次执行验证。",
        "详见:`export/SYSTEM/LOGS/2026-02-21__dws_assistant_daily_bug_fix.md`",
        "",
        "---",
        "",
        "## 下一步",
        "",
        "1. 重新提交包含失败任务的执行,验证修复",
        "2. 运行 ETL Data Consistency Check",
        "3. 运行 /audit 审计",
    ])
    return "\n".join(lines)


def _export_results(execution: dict, token: str, out_dir: Path) -> None:
    """Write the Markdown report and the raw API record for a finished run into *out_dir*."""
    log_text = extract_log_from_api(token)
    task_results = parse_task_results_from_log(log_text) if log_text else []

    report = generate_report(execution, task_results)
    out_file = out_dir / "2026-02-21__etl_run_result.md"
    out_file.write_text(report, encoding="utf-8")
    print(f"\n执行结果报告已导出: {out_file}")

    # Keep the untouched API record alongside the report for later inspection.
    raw_file = out_dir / "2026-02-21__etl_run_raw.json"
    raw_file.write_text(
        json.dumps(execution, ensure_ascii=False, indent=2, default=str),
        encoding="utf-8",
    )
    print(f"原始数据已导出: {raw_file}")


def main() -> None:
    """Poll until the target run finishes or MAX_WAIT_SECONDS elapses, then export results."""
    out_dir = Path(get_output_path("SYSTEM_LOG_ROOT"))
    print("ETL 执行监控启动")
    print(f" 目标 run_uuid: {TARGET_RUN_UUID}")
    print(f" 轮询间隔: {POLL_INTERVAL}s")
    print(f" 输出目录: {out_dir}")
    print()

    # Non-interactive: reuse a cached token if present, otherwise go
    # unauthenticated. get_auth_token() is not called because it may prompt.
    token = ""
    if _TOKEN_FILE.exists():
        token = _TOKEN_FILE.read_text(encoding="utf-8").strip()

    max_polls = max(1, MAX_WAIT_SECONDS // POLL_INTERVAL)
    for poll_count in range(1, max_polls + 1):
        now = datetime.now().strftime("%H:%M:%S")
        print(f"[{now}] 轮询 #{poll_count}...", end=" ")

        execution = poll_execution_status(token)
        if execution is None:
            print("未获取到执行信息")
        else:
            status = execution.get("status", "unknown")
            print(f"状态: {status}")
            if status in TERMINAL_STATUSES:
                print(f"\n执行已完成,状态: {status}")
                out_dir.mkdir(parents=True, exist_ok=True)
                _export_results(execution, token, out_dir)
                return

        # No point sleeping after the final attempt.
        if poll_count < max_polls:
            time.sleep(POLL_INTERVAL)

    print(f"\n超过最大轮询次数 ({max_polls}),退出监控")


if __name__ == "__main__":
    main()