Files
Neo-ZQYY/scripts/ops/etl_monitor.py
Neo b25308c3f4 feat: P1-P3 全栈集成 — 数据库基础 + DWS 扩展 + 小程序鉴权 + 工程化体系
## P1 数据库基础
- zqyy_app: 创建 auth/biz schema、FDW 连接 etl_feiqiu
- etl_feiqiu: 创建 app schema RLS 视图、商品库存预警表
- 清理 assistant_abolish 残留数据

## P2 ETL/DWS 扩展
- 新增 DWS 助教订单贡献度表 (dws.assistant_order_contribution)
- 新增 assistant_order_contribution_task 任务及 RLS 视图
- member_consumption 增加充值字段、assistant_daily 增加处罚字段
- 更新 ODS/DWD/DWS 任务文档及业务规则文档
- 更新 consistency_checker、flow_runner、task_registry 等核心模块

## P3 小程序鉴权系统
- 新增 xcx_auth 路由/schema(微信登录 + JWT)
- 新增 wechat/role/matching/application 服务层
- zqyy_app 鉴权表迁移 + 角色权限种子数据
- auth/dependencies.py 支持小程序 JWT 鉴权

## 文档与审计
- 新增 DOCUMENTATION-MAP 文档导航
- 新增 7 份 BD_Manual 数据库变更文档
- 更新 DDL 基线快照(etl_feiqiu 6 schema + zqyy_app auth)
- 新增全栈集成审计记录、部署检查清单更新
- 新增 BACKLOG 路线图、FDW→Core 迁移计划

## Kiro 工程化
- 新增 5 个 Spec(P1/P2/P3/全栈集成/核心业务)
- 新增审计自动化脚本(agent_on_stop/build_audit_context/compliance_prescan)
- 新增 6 个 Hook(合规检查/会话日志/提交审计等)
- 新增 doc-map steering 文件

## 运维与测试
- 新增 ops 脚本:迁移验证/API 健康检查/ETL 监控/集成报告
- 新增属性测试:test_dws_contribution / test_auth_system
- 清理过期 export 报告文件
- 更新 .gitignore 排除规则
2026-02-26 08:03:53 +08:00

210 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""ETL 全流程联调监控脚本
每 30 秒轮询执行状态和日志,检测错误/警告,最长等待 30 分钟。
将监控结果输出为 JSON 供后续报告生成使用。
用法: python scripts/ops/etl_monitor.py <execution_id> <jwt_token>
"""
import json
import re
import sys
import time
import urllib.request
from datetime import datetime, timezone
BASE_URL = "http://localhost:8000"
POLL_INTERVAL = 30  # seconds between polls
MAX_IDLE_MINUTES = 30
MAX_IDLE_SECONDS = MAX_IDLE_MINUTES * 60

# Match genuine ERROR/CRITICAL log lines while excluding JSON stat lines
# such as "'errors': 0".  BUGFIX: the lookahead must be anchored before the
# greedy scan — in the previous form ".*(?!.*'errors':\s*0)" the engine
# could backtrack past the lookahead, making it a no-op, so every
# ERROR/CRITICAL line matched regardless of the stats exclusion.
ERROR_PATTERN = re.compile(
    r"^(?!.*'errors':\s*0).*\b(ERROR|CRITICAL)\b", re.IGNORECASE
)
# Start of a Python traceback dump.
TRACEBACK_PATTERN = re.compile(r"Traceback \(most recent call last\)")
# Exception lines relayed from a subprocess's stderr channel.
EXCEPTION_PATTERN = re.compile(r"^\[stderr\].*Exception:", re.IGNORECASE)
WARNING_PATTERN = re.compile(r"\bWARNING\b", re.IGNORECASE)
def api_get(path: str, token: str) -> dict:
    """Issue an authenticated GET against the local API and decode the JSON body."""
    url = f"{BASE_URL}{path}"
    headers = {"Authorization": f"Bearer {token}"}
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=15) as response:
        raw = response.read().decode()
    return json.loads(raw)
def classify_log_lines(lines: list[str]) -> dict:
    """Split log lines into error and warning buckets.

    Returns ``{"errors": [...], "warnings": [...]}``. Each error entry
    records the 1-based line number and stripped text; traceback/exception
    hits additionally carry up to 5 surrounding lines of context.
    """
    buckets: dict = {"errors": [], "warnings": []}
    total = len(lines)
    for idx, raw in enumerate(lines):
        if TRACEBACK_PATTERN.search(raw) or EXCEPTION_PATTERN.search(raw):
            # Keep 5 lines before and after for debugging context.
            lo = max(0, idx - 5)
            hi = min(total, idx + 6)
            buckets["errors"].append({
                "line_no": idx + 1,
                "text": raw.strip(),
                "context": [ctx.strip() for ctx in lines[lo:hi]],
            })
            continue
        # JSON stat fragments like "'errors': 0" are not real errors/warnings.
        if "'errors':" in raw:
            continue
        if ERROR_PATTERN.search(raw):
            buckets["errors"].append(
                {"line_no": idx + 1, "text": raw.strip(), "context": []}
            )
        elif WARNING_PATTERN.search(raw):
            buckets["warnings"].append({"line_no": idx + 1, "text": raw.strip()})
    return buckets
def monitor(execution_id: str, token: str) -> dict:
    """Main monitoring loop: poll logs and status until the execution
    finishes (success/failed/cancelled) or no new log output arrives for
    MAX_IDLE_MINUTES. Returns the full monitoring result as a dict,
    including classified errors/warnings, the poll history and the raw log.
    """
    print(f"[监控] 开始监控 execution_id={execution_id}")
    print(f"[监控] 轮询间隔={POLL_INTERVAL}s, 最长空闲={MAX_IDLE_MINUTES}min")
    start_time = datetime.now(timezone.utc)
    last_log_count = 0            # number of log lines seen on the previous poll
    last_new_log_time = time.time()  # wall-clock time of the last log growth
    poll_count = 0
    all_log_text = ""             # latest full log snapshot (API returns it whole)
    final_status = "unknown"
    final_exit_code = None
    poll_history = []
    while True:
        poll_count += 1
        now = datetime.now(timezone.utc).isoformat()
        # Fetch the execution's log output; a transient failure just skips
        # this round rather than aborting the monitor.
        try:
            logs_data = api_get(f"/api/execution/{execution_id}/logs", token)
        except Exception as e:
            print(f"[监控] #{poll_count} {now} 日志获取失败: {e}")
            time.sleep(POLL_INTERVAL)
            continue
        log_text = logs_data.get("output_log") or ""
        lines = log_text.split("\n") if log_text else []
        current_count = len(lines)
        new_lines = current_count - last_log_count
        if new_lines > 0:
            # Log grew: reset the idle timer and keep the newest snapshot.
            last_new_log_time = time.time()
            all_log_text = log_text
        # Fetch execution status from the recent-history endpoint.
        # NOTE(review): hist_data is iterated as a list of records even though
        # api_get is annotated -> dict; presumably the endpoint returns a JSON
        # array — confirm against the API.
        try:
            hist_data = api_get("/api/execution/history?limit=10", token)
            this_exec = next((h for h in hist_data if h["id"] == execution_id), None)
            status = this_exec["status"] if this_exec else "unknown"
            exit_code = this_exec.get("exit_code") if this_exec else None
            duration_ms = this_exec.get("duration_ms") if this_exec else None
        except Exception as e:
            print(f"[监控] #{poll_count} {now} 状态获取失败: {e}")
            status = "unknown"
            exit_code = None
            duration_ms = None
        poll_record = {
            "poll": poll_count,
            "time": now,
            "log_lines": current_count,
            "new_lines": new_lines,
            "status": status,
        }
        poll_history.append(poll_record)
        # Echo the last few new log lines for live progress visibility.
        if new_lines > 0:
            recent = lines[last_log_count:current_count]
            for line in recent[-3:]:
                print(f"  {line.strip()}")
        print(
            f"[监控] #{poll_count} {now} | "
            f"日志行={current_count}(+{new_lines}) | 状态={status}"
        )
        # Must update AFTER slicing `recent` above — order matters here.
        last_log_count = current_count
        # Terminal states end the loop.
        if status in ("success", "failed", "cancelled"):
            final_status = status
            final_exit_code = exit_code
            print(f"[监控] 任务完成: status={status}, exit_code={exit_code}, duration_ms={duration_ms}")
            break
        # Idle timeout: give up if the log has not grown for too long.
        idle_seconds = time.time() - last_new_log_time
        if idle_seconds > MAX_IDLE_SECONDS:
            print(f"[监控] 超时警告: {MAX_IDLE_MINUTES}分钟无新日志")
            final_status = "timeout_warning"
            break
        time.sleep(POLL_INTERVAL)
    end_time = datetime.now(timezone.utc)
    # Classify the accumulated log into errors/warnings for the report.
    all_lines = all_log_text.split("\n") if all_log_text else []
    classified = classify_log_lines(all_lines)
    result = {
        "execution_id": execution_id,
        "start_time": start_time.isoformat(),
        "end_time": end_time.isoformat(),
        "monitor_duration_s": (end_time - start_time).total_seconds(),
        "final_status": final_status,
        "final_exit_code": final_exit_code,
        "total_log_lines": len(all_lines),
        "total_polls": poll_count,
        "errors": classified["errors"],
        "warnings": classified["warnings"],
        "error_count": len(classified["errors"]),
        "warning_count": len(classified["warnings"]),
        "poll_history": poll_history,
        "full_log": all_log_text,
    }
    return result
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("用法: python scripts/ops/etl_monitor.py <execution_id> <jwt_token>")
        sys.exit(1)
    execution_id, jwt_token = sys.argv[1], sys.argv[2]
    monitor_result = monitor(execution_id, jwt_token)

    # Persist the result as JSON (read later by the report-generation script).
    import os
    from pathlib import Path

    from dotenv import load_dotenv

    # Resolve SYSTEM_LOG_ROOT from the repository-root .env file.
    env_file = Path(__file__).resolve().parent.parent.parent / ".env"
    load_dotenv(env_file)
    system_log_root = os.environ.get("SYSTEM_LOG_ROOT")
    if not system_log_root:
        raise RuntimeError("SYSTEM_LOG_ROOT 环境变量未设置")
    output_dir = Path(system_log_root)
    output_dir.mkdir(parents=True, exist_ok=True)

    today = datetime.now().strftime("%Y-%m-%d")
    # The full log is too large to embed in the JSON summary — save it
    # to its own text file alongside the slim summary.
    summary = dict(monitor_result)
    full_log = summary.pop("full_log")
    summary_path = output_dir / f"{today}__etl_monitor_result.json"
    summary_path.write_text(
        json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    log_path = output_dir / f"{today}__etl_full_log.txt"
    log_path.write_text(full_log, encoding="utf-8")

    print(f"[监控] 结果已保存: {summary_path}")
    print(f"[监控] 完整日志已保存: {log_path}")
    print(
        f"[监控] 最终状态: {monitor_result['final_status']}, "
        f"错误数: {monitor_result['error_count']}, "
        f"警告数: {monitor_result['warning_count']}"
    )