Files
Neo-ZQYY/scripts/ops/_rerun_failed_tasks.py

286 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
单独重跑集成测试中失败的 DWS/INDEX 任务,验证 bugfix 效果。
使用与集成测试相同的参数:
- flow: api_full(但只跑 DWS/INDEX 层)
- processing_mode: full_window
- window: 2025-11-01 ~ 2026-02-26
- window_split_days: 30
- force_full: True
通过后端 API 提交,与集成测试路径一致。
"""
import os
import sys
import json
import time
import requests
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
# Load environment variables from the ".env" file found in parents[2] of this
# script's resolved path, i.e. two directories above the script's own folder.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
# Time zone used for the timestamps this script produces.
TZ = ZoneInfo("Asia/Shanghai")
# Base URL of the backend API that every request in this script targets.
BASE_URL = "http://localhost:8000"
# Tasks that failed previously (DWS_MEMBER_VISIT is the root cause; the rest
# are cascading failures).
FAILED_TASKS = [
    # Root-cause tasks
    "DWS_MEMBER_VISIT",
    "DWS_MEMBER_CONSUMPTION",  # _extract_card_balances has the same bug; needs verification
    # DWS tasks that failed in cascade
    "DWS_FINANCE_DAILY",
    "DWS_FINANCE_RECHARGE",
    "DWS_FINANCE_INCOME_STRUCTURE",
    "DWS_FINANCE_DISCOUNT_DETAIL",
    "DWS_ASSISTANT_MONTHLY",
    "DWS_ASSISTANT_FINANCE",
    # INDEX layer (depends on DWS)
    "DWS_WINBACK_INDEX",
    "DWS_NEWCONV_INDEX",
    "DWS_RELATION_INDEX",
    "DWS_SPENDING_POWER_INDEX",
]
def login(timeout: float = 30.0) -> str:
    """Log in to the backend and return a JWT access token.

    Posts the script's fixed admin credentials to ``/api/auth/login`` under
    ``BASE_URL``.

    Args:
        timeout: Seconds to wait for the backend before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled server.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        requests.HTTPError: The backend answered with a non-2xx status.
        requests.RequestException: The request failed or timed out.
        KeyError: The response JSON has no ``access_token`` field.
    """
    # NOTE(review): credentials are hard-coded; fine for a local ops script,
    # but consider reading them from the environment.
    credentials = {
        "username": "admin",
        "password": "admin123",
    }
    resp = requests.post(
        f"{BASE_URL}/api/auth/login",
        json=credentials,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]
def submit_task(token: str, tasks: list[str], timeout: float = 30.0) -> dict:
    """Submit an ETL run for ``tasks`` through the backend API.

    The payload uses a fixed configuration: flow ``api_full``, processing mode
    ``full_window``, a custom window from 2025-11-01 00:00 to 2026-02-26 23:59,
    ``window_split_days`` of 30, ``force_full`` enabled and ``dry_run``
    disabled.

    Args:
        token: JWT sent as a Bearer token in the ``Authorization`` header.
        tasks: Task codes to include in the run.
        timeout: Seconds to wait for the backend before giving up, so a
            stalled server cannot hang the script forever.

    Returns:
        The decoded JSON body of the ``/api/execution/run`` response.

    Raises:
        requests.HTTPError: The backend answered with a non-2xx status.
        requests.RequestException: The request failed or timed out.
    """
    headers = {"Authorization": f"Bearer {token}"}
    payload = {
        "flow": "api_full",
        "processing_mode": "full_window",
        "window_mode": "custom",
        "window_start": "2025-11-01 00:00",
        "window_end": "2026-02-26 23:59",
        "window_split": "day",
        "window_split_days": 30,
        "force_full": True,
        "dry_run": False,
        "tasks": tasks,
    }
    resp = requests.post(
        f"{BASE_URL}/api/execution/run",
        json=payload,
        headers=headers,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
def poll_execution(token: str, execution_id: str, timeout_minutes: int = 60) -> dict:
"""轮询执行状态"""
headers = {"Authorization": f"Bearer {token}"}
start = time.time()
last_log_count = 0
while True:
elapsed = time.time() - start
if elapsed > timeout_minutes * 60:
print(f"\n超时({timeout_minutes}分钟),停止等待")
return {"status": "timeout"}
try:
# 查询状态
resp = requests.get(f"{BASE_URL}/api/execution/queue", headers=headers)
resp.raise_for_status()
queue = resp.json()
current = None
for item in queue.get("items", []):
if item.get("execution_id") == execution_id:
current = item
break
if current is None:
# 可能已完成,查历史
resp2 = requests.get(
f"{BASE_URL}/api/execution/{execution_id}/logs",
headers=headers, params={"offset": 0, "limit": 5000}
)
if resp2.status_code == 200:
logs_data = resp2.json()
logs = logs_data.get("logs", [])
# 打印新日志
for log in logs[last_log_count:]:
ts = log.get("timestamp", "")
msg = log.get("message", "")
level = log.get("level", "INFO")
if level in ("ERROR", "CRITICAL"):
print(f" ❌ [{ts}] {msg}")
elif level == "WARNING":
print(f" ⚠️ [{ts}] {msg}")
elif "成功" in msg or "完成" in msg or "SUCCESS" in msg.upper():
print(f" ✅ [{ts}] {msg}")
else:
print(f" [{ts}] {msg}")
last_log_count = len(logs)
print(f"\n执行已结束({elapsed:.0f}s")
return {"status": "completed", "elapsed": elapsed}
status = current.get("status", "unknown")
progress = current.get("progress", "")
mins = int(elapsed) // 60
secs = int(elapsed) % 60
print(f"\r [{mins:02d}:{secs:02d}] 状态={status} {progress}", end="", flush=True)
# 获取日志
resp3 = requests.get(
f"{BASE_URL}/api/execution/{execution_id}/logs",
headers=headers, params={"offset": last_log_count, "limit": 200}
)
if resp3.status_code == 200:
logs_data = resp3.json()
logs = logs_data.get("logs", [])
for log in logs:
ts = log.get("timestamp", "")
msg = log.get("message", "")
level = log.get("level", "INFO")
if level in ("ERROR", "CRITICAL"):
print(f"\n ❌ [{ts}] {msg}")
elif level == "WARNING":
print(f"\n ⚠️ [{ts}] {msg}")
last_log_count += len(logs)
if status in ("completed", "failed", "cancelled"):
exit_code = current.get("exit_code")
print(f"\n执行结束: status={status}, exit_code={exit_code}, 耗时={elapsed:.0f}s")
return {"status": status, "exit_code": exit_code, "elapsed": elapsed}
except requests.RequestException as e:
print(f"\n 请求异常: {e}")
time.sleep(15)
def get_final_logs(token: str, execution_id: str, timeout: float = 30.0) -> list[dict]:
    """Fetch the log entries of an execution in a single request.

    Requests up to 10000 entries starting at offset 0.

    Args:
        token: JWT sent as a Bearer token in the ``Authorization`` header.
        execution_id: Identifier of the execution whose logs are wanted.
        timeout: Seconds to wait for the backend before giving up, so a
            stalled server cannot hang the script forever.

    Returns:
        The ``logs`` list from the response, or an empty list when the backend
        does not answer with HTTP 200 or the response has no ``logs`` key.

    Raises:
        requests.RequestException: The request failed or timed out.
    """
    headers = {"Authorization": f"Bearer {token}"}
    resp = requests.get(
        f"{BASE_URL}/api/execution/{execution_id}/logs",
        headers=headers,
        params={"offset": 0, "limit": 10000},
        timeout=timeout,
    )
    if resp.status_code != 200:
        return []
    return resp.json().get("logs", [])
def analyze_logs(logs: list[dict], tasks: "list[str] | None" = None) -> dict:
    """Summarise log entries into errors, warnings and per-task outcomes.

    A task is matched by plain substring search of its name in the message.
    A message containing "任务完成" or "SUCCESS" (case-insensitive) marks the
    matched tasks as ``"SUCCESS"``; one containing "失败", "FAILED" or "ERROR"
    (case-insensitive) marks them as ``"FAILED"``. If a single message carries
    both kinds of marker, ``"FAILED"`` wins. Across messages the last matching
    one determines the recorded outcome.

    Args:
        logs: Log entries; each is read via its ``message`` and ``level`` keys.
            A missing or null message is treated as an empty string and a
            missing or null level as ``"INFO"``.
        tasks: Task names to look for. Defaults to ``FAILED_TASKS``.

    Returns:
        A dict with ``errors`` (messages at ERROR/CRITICAL level), ``warnings``
        (messages at WARNING level) and ``task_results`` (task name mapped to
        ``"SUCCESS"`` or ``"FAILED"``; tasks never mentioned are absent).
    """
    task_names = FAILED_TASKS if tasks is None else tasks
    errors: list[str] = []
    warnings: list[str] = []
    task_results: dict[str, str] = {}

    for entry in logs:
        raw_msg = entry.get("message")
        # Guard against null / non-string messages so one odd entry cannot
        # abort the whole analysis.
        msg = "" if raw_msg is None else str(raw_msg)
        level = entry.get("level") or "INFO"

        if level in ("ERROR", "CRITICAL"):
            errors.append(msg)
        elif level == "WARNING":
            warnings.append(msg)

        # Upper-case once instead of once per keyword check.
        upper_msg = msg.upper()
        succeeded = "任务完成" in msg or "SUCCESS" in upper_msg
        failed = "失败" in msg or "FAILED" in upper_msg or "ERROR" in upper_msg
        if not (succeeded or failed):
            continue

        # A failure marker takes precedence over a success marker found in the
        # same message.
        outcome = "FAILED" if failed else "SUCCESS"
        for task in task_names:
            if task in msg:
                task_results[task] = outcome

    return {
        "errors": errors,
        "warnings": warnings,
        "task_results": task_results,
    }
def main():
    """Re-run the previously failed tasks and report the outcome.

    Steps: check that the backend answers, log in, submit ``FAILED_TASKS``,
    poll until the execution ends (up to 60 minutes), fetch and analyse the
    logs, print a summary, and, when the ``SYSTEM_LOG_ROOT`` environment
    variable is set, save the summary as JSON in that directory.

    Raises:
        SystemExit: With code 1 when the backend is unreachable or the submit
            response carries no ``execution_id``.
    """
    now = datetime.now(TZ)
    print("=== 失败任务重跑验证 ===")
    print(f"时间: {now.isoformat()}")
    print(f"任务数: {len(FAILED_TASKS)}")
    print(f"任务列表: {', '.join(FAILED_TASKS)}")
    print()

    # 1. Check that the backend is reachable.
    try:
        resp = requests.get(f"{BASE_URL}/api/health", timeout=5)
        print(f"后端状态: {resp.status_code}")
    except requests.RequestException:
        print("❌ 后端未启动,请先启动后端服务")
        print(" cd apps/backend && uvicorn app.main:app --reload --port 8000")
        sys.exit(1)

    # 2. Log in.
    print("登录中...")
    token = login()
    print("登录成功")

    # 3. Submit the tasks.
    print(f"\n提交 {len(FAILED_TASKS)} 个失败任务重跑...")
    result = submit_task(token, FAILED_TASKS)
    execution_id = result.get("execution_id")
    print(f"execution_id: {execution_id}")
    if not execution_id:
        # Without an id there is nothing to monitor; polling for "None" would
        # only burn the whole timeout before reporting anything useful.
        print("❌ 提交响应中缺少 execution_id,无法监控执行")
        print(f" 响应: {result}")
        sys.exit(1)

    # 4. Monitor the execution.
    print("\n开始监控执行...")
    poll_result = poll_execution(token, execution_id, timeout_minutes=60)

    # 5. Fetch the full logs and analyse them.
    print("\n获取完整日志...")
    logs = get_final_logs(token, execution_id)
    print(f"日志行数: {len(logs)}")
    analysis = analyze_logs(logs)

    # 6. Print the summary.
    print(f"\n{'='*60}")
    print("=== 重跑结果 ===")
    print(f"{'='*60}")
    print(f"执行状态: {poll_result.get('status')}")
    print(f"退出码: {poll_result.get('exit_code', 'N/A')}")
    print(f"耗时: {poll_result.get('elapsed', 0):.0f}s")
    print(f"错误数: {len(analysis['errors'])}")
    print(f"警告数: {len(analysis['warnings'])}")

    print("\n--- 任务级结果 ---")
    for task in FAILED_TASKS:
        status = analysis['task_results'].get(task, "未检测到")
        # Distinct marker per outcome; the original expression produced an
        # empty string on every branch.
        icon = "✅" if status == "SUCCESS" else "❌" if status == "FAILED" else "⚠️"
        print(f" {icon} {task}: {status}")

    if analysis['errors']:
        print("\n--- 错误详情 ---")
        # Show at most 20 errors, each truncated to 200 characters.
        for i, err in enumerate(analysis['errors'][:20], 1):
            print(f" {i}. {err[:200]}")

    if analysis['warnings']:
        print("\n--- 警告详情前10条---")
        # Show at most 10 warnings, each truncated to 200 characters.
        for i, warn in enumerate(analysis['warnings'][:10], 1):
            print(f" {i}. {warn[:200]}")

    # 7. Save the summary to a file when a log root is configured.
    log_root = os.environ.get("SYSTEM_LOG_ROOT")
    if log_root:
        log_dir = Path(log_root)
        log_dir.mkdir(parents=True, exist_ok=True)
        log_file = log_dir / f"{now.strftime('%Y%m%d')}_rerun_failed_tasks.json"
        with open(log_file, "w", encoding="utf-8") as f:
            json.dump({
                "execution_id": execution_id,
                "tasks": FAILED_TASKS,
                "poll_result": poll_result,
                "analysis": analysis,
                "log_count": len(logs),
            }, f, ensure_ascii=False, indent=2, default=str)
        print(f"\n日志已保存: {log_file}")
# Run the re-run verification only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()