#!/usr/bin/env python3
"""
Investigate the ETL execution timeline in depth.

Scans the ``*.log`` files under the directory named by the ``LOG_ROOT``
environment variable and prints a report meant to help find out why data is
still missing after the early-morning full refresh.
"""

import os
import glob
from pathlib import Path
from datetime import datetime, timedelta

try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv is only a convenience for loading a .env file; without it
    # the script still works when LOG_ROOT is set in the process environment.
    def load_dotenv(*args, **kwargs):
        """No-op stand-in used when python-dotenv is not installed."""
        return False


# Timestamp layout expected inside the leading "[...]" of a log line.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'
# How far back a log file's mtime may be to count as "recent".
_RECENT_WINDOW = timedelta(hours=24)
# Number of most recent logs that get a detailed analysis.
_DETAIL_LIMIT = 5
# Maximum number of error lines echoed per log.
_MAX_ERRORS_SHOWN = 3
# Logs modified between 00:00 and this hour count as "early morning" runs.
_MORNING_END_HOUR = 8


def _collect_logs(log_dir):
    """Return ``(path, mtime)`` pairs for each ``*.log`` in *log_dir*, oldest first.

    Each file is stat'ed exactly once; a file that disappears between the
    directory listing and the stat call is skipped instead of aborting the
    whole investigation.
    """
    stamped = []
    for path in log_dir.glob("*.log"):
        try:
            stamped.append((path.stat().st_mtime, path))
        except OSError:
            continue
    # Sort on the raw timestamp only (stable for ties), never on the Path.
    stamped.sort(key=lambda item: item[0])
    return [(path, datetime.fromtimestamp(ts)) for ts, path in stamped]


def _leading_timestamp(line):
    """Return the text before the first ``]`` of *line* with every ``[`` removed."""
    return line.split(']')[0].replace('[', '')


def _summarize_log(content):
    """Extract the key facts from the text of one log file.

    Returns a dict with the keys ``start_time``, ``end_time``, ``flow_type``,
    ``window_info``, ``dwd_info`` (each ``None`` when no line matched; the
    last matching line wins) and ``errors`` (list of stripped lines that
    contain ``ERROR`` or ``失败``).
    """
    summary = {
        'start_time': None,
        'end_time': None,
        'flow_type': None,
        'window_info': None,
        'dwd_info': None,
        'errors': [],
    }
    for line in content.split('\n'):
        if '开始运行任务' in line and 'run_uuid' in line:
            summary['start_time'] = _leading_timestamp(line)
            # The flow type is read from the task-start line itself.
            if 'api_full' in line:
                summary['flow_type'] = 'api_full (全量)'
            elif 'api_ods_dwd' in line:
                summary['flow_type'] = 'api_ods_dwd (增量)'
            else:
                summary['flow_type'] = '未知'
        if 'ETL运行完成' in line:
            summary['end_time'] = _leading_timestamp(line)
        if 'force_full' in line or '时间窗口' in line:
            summary['window_info'] = line.strip()
        if 'DWD_LOAD_FROM_ODS' in line and ('开始' in line or '完成' in line):
            summary['dwd_info'] = line.strip()
        if 'ERROR' in line or '失败' in line:
            summary['errors'].append(line.strip())
    return summary


def _print_log_details(index, log_file, mtime):
    """Print the detailed analysis of one log file, numbered *index*."""
    print(f"\n--- 日志 {index}: {log_file.name} ---")
    print(f"修改时间: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")

    try:
        content = log_file.read_text(encoding='utf-8')
    except (OSError, UnicodeDecodeError) as e:
        print(f" 读取失败: {e}")
        return

    summary = _summarize_log(content)
    start_time = summary['start_time']
    end_time = summary['end_time']
    errors = summary['errors']

    print(f" 执行类型: {summary['flow_type'] or '未知'}")
    print(f" 开始时间: {start_time or '未找到'}")
    print(f" 结束时间: {end_time or '未找到'}")

    if summary['window_info']:
        print(f" 窗口信息: {summary['window_info']}")
    if summary['dwd_info']:
        print(f" DWD 处理: {summary['dwd_info']}")
    if errors:
        print(f" 错误数量: {len(errors)}")
        for error in errors[:_MAX_ERRORS_SHOWN]:
            print(f" - {error}")

    # Run duration, only when both endpoints were found in the log.
    if start_time and end_time:
        try:
            start_dt = datetime.strptime(start_time, _TIMESTAMP_FORMAT)
            end_dt = datetime.strptime(end_time, _TIMESTAMP_FORMAT)
        except ValueError:
            # Say so explicitly instead of silently omitting the duration.
            print(" 执行时长: 无法解析时间戳")
        else:
            print(f" 执行时长: {end_dt - start_dt}")


def _report_morning_runs(recent_logs, now):
    """Print the analysis of logs modified between 00:00 and 08:00 of *now*'s day."""
    print("\n🌅 今天凌晨执行分析:")
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    morning_end = now.replace(
        hour=_MORNING_END_HOUR, minute=0, second=0, microsecond=0
    )

    morning_logs = [
        (log_file, mtime)
        for log_file, mtime in recent_logs
        if today_start <= mtime <= morning_end
    ]

    if not morning_logs:
        print("❌ 未找到今天凌晨的 ETL 执行记录!")
        print("这可能解释了为什么有数据缺失。")
        return

    print(f"找到 {len(morning_logs)} 个凌晨执行的日志:")
    for log_file, mtime in morning_logs:
        print(f" {mtime.strftime('%H:%M:%S')} - {log_file.name}")

        try:
            content = log_file.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError) as e:
            print(f" 读取失败: {e}")
            continue

        # Check whether the run included the DWD load step at all.
        if 'DWD_LOAD_FROM_ODS' not in content:
            print(" ❌ 未包含 DWD 处理")
            continue

        print(" ✅ 包含 DWD 处理")
        # Echo every line that mentions the processed time window.
        for line in content.split('\n'):
            if '时间窗口' in line or 'window' in line.lower():
                print(f" 📅 {line.strip()}")


def main():
    """Print the ETL execution-timeline report for the logs under ``LOG_ROOT``.

    Raises:
        RuntimeError: if ``LOG_ROOT`` is unset/empty, or does not name an
            existing directory (an empty listing from a wrong path would
            otherwise be reported as "no ETL run happened").
    """
    # Load environment variables (from a .env file when python-dotenv is present).
    load_dotenv()

    log_root = os.environ.get('LOG_ROOT')
    if not log_root:
        raise RuntimeError("LOG_ROOT 环境变量未设置")

    log_dir = Path(log_root)
    if not log_dir.is_dir():
        raise RuntimeError(f"LOG_ROOT 目录不存在: {log_dir}")

    print("🔍 深度调查 ETL 执行时间线")
    print("=" * 60)

    # All log files, ordered by modification time (oldest first).
    all_logs = _collect_logs(log_dir)
    print(f"找到 {len(all_logs)} 个日志文件")

    # Keep only the logs modified within the last 24 hours.
    now = datetime.now()
    cutoff = now - _RECENT_WINDOW
    recent_logs = [(path, mtime) for path, mtime in all_logs if mtime >= cutoff]

    print(f"\n📅 最近 24 小时内的日志文件 ({len(recent_logs)} 个):")
    for log_file, mtime in recent_logs:
        print(f" {mtime.strftime('%Y-%m-%d %H:%M:%S')} - {log_file.name}")

    # Detailed analysis of the most recent logs.
    print("\n🔍 详细分析最近的日志:")
    for index, (log_file, mtime) in enumerate(recent_logs[-_DETAIL_LIMIT:], start=1):
        _print_log_details(index, log_file, mtime)

    # Pay special attention to today's early-morning runs.
    _report_morning_runs(recent_logs, now)


if __name__ == "__main__":
    main()