"""从 ETL 日志中提取 DWS_MEMBER_VISIT 错误上下文""" import os, re from pathlib import Path from dotenv import load_dotenv load_dotenv(Path(__file__).resolve().parents[2] / ".env") LOG_ROOT = os.environ.get("LOG_ROOT") if not LOG_ROOT: raise RuntimeError("LOG_ROOT 未设置") # 找到最近的日志文件 log_dir = Path(LOG_ROOT) log_file = None for f in sorted(log_dir.glob("*.log"), key=lambda x: x.stat().st_mtime, reverse=True): log_file = f break if not log_file: raise RuntimeError(f"在 {LOG_ROOT} 中未找到日志文件") print(f"日志文件: {log_file}") print(f"大小: {log_file.stat().st_size / 1024:.1f} KB") print("=" * 60) # 读取日志,找 DWS_MEMBER_VISIT 相关行 with open(log_file, 'r', encoding='utf-8') as f: lines = f.readlines() # 找到 MEMBER_VISIT 相关的所有行 visit_lines = [] capture = False for i, line in enumerate(lines): if 'MEMBER_VISIT' in line or 'member_visit' in line: # 取前后 3 行上下文 start = max(0, i - 3) end = min(len(lines), i + 4) visit_lines.append((i, lines[start:end])) if 'uk_dws_member_visit' in line: start = max(0, i - 5) end = min(len(lines), i + 6) visit_lines.append((i, lines[start:end])) print(f"找到 {len(visit_lines)} 处 MEMBER_VISIT 相关日志") print() seen = set() for line_no, context in visit_lines: if line_no in seen: continue seen.add(line_no) print(f"--- 行 {line_no + 1} ---") for l in context: print(l.rstrip()) print() # 额外:检查是否有 DETAIL 行(PostgreSQL 错误详情) print("=" * 60) print("搜索 PostgreSQL 错误详情:") for i, line in enumerate(lines): if 'DETAIL' in line and 'member_visit' in lines[max(0, i-5):i+1].__repr__(): print(f"行 {i+1}: {line.rstrip()}") if '重复键' in line or 'duplicate key' in line.lower(): print(f"行 {i+1}: {line.rstrip()}") # 检查窗口切片信息 print() print("=" * 60) print("DWS_MEMBER_VISIT 窗口切片:") for i, line in enumerate(lines): if 'DWS_MEMBER_VISIT' in line and ('窗口' in line or 'window' in line.lower() or '切片' in line or '日期范围' in line): print(f"行 {i+1}: {line.rstrip()}") print("\n诊断完成。")