#!/usr/bin/env python3
"""
Analyze why the overnight ETL run is incomplete, and deep-dive SPI data.

Connects to the test database (env ``TEST_DB_DSN``), prints seven diagnostic
sections comparing ODS vs DWD settlement data and member (SPI) consumption,
and writes the captured output as a markdown report under ``SYSTEM_LOG_ROOT``.

Raises:
    RuntimeError: if either required environment variable is unset.
"""
import os
from datetime import datetime, timedelta

import psycopg2
from dotenv import load_dotenv

# Daily-distribution query template shared by the ODS and DWD sections.
# The interpolated table/column names are trusted constants supplied by
# main() below -- never user input -- so identifier interpolation is safe.
_DAILY_DISTRIBUTION_SQL = """
    SELECT
        {col}::date as pay_date,
        COUNT(*) as record_count,
        MIN({col}) as earliest_time,
        MAX({col}) as latest_time
    FROM {table}
    WHERE {col} >= '2026-02-01'
    GROUP BY {col}::date
    ORDER BY pay_date DESC
    LIMIT 15
"""


def _fetch_daily_distribution(cur, table, col):
    """Return per-day (date, count, earliest, latest) rows since 2026-02-01."""
    cur.execute(_DAILY_DISTRIBUTION_SQL.format(table=table, col=col))
    return cur.fetchall()


def _print_distribution(emit, label, rows):
    """Emit one daily-distribution listing: record count and time span per day."""
    emit(f"{label} 最近 15 天数据分布:")
    for pay_date, count, earliest, latest in rows:
        emit(f" {pay_date}: {count:,} 条 ({earliest.strftime('%H:%M')} - {latest.strftime('%H:%M')})")


def _section_diff(emit, ods_data, dwd_data):
    """Section 3: compare ODS vs DWD daily counts and flag days where DWD is short."""
    emit("\n🔄 3. ODS vs DWD 数据差异分析")
    ods_dict = {str(row[0]): row[1] for row in ods_data}
    dwd_dict = {str(row[0]): row[1] for row in dwd_data}
    emit("日期对比 (ODS vs DWD):")
    all_dates = set(ods_dict) | set(dwd_dict)
    for date in sorted(all_dates, reverse=True)[:10]:
        ods_count = ods_dict.get(date, 0)
        dwd_count = dwd_dict.get(date, 0)
        diff = ods_count - dwd_count
        status = "✅" if diff == 0 else f"❌ 缺失 {diff}"
        emit(f" {date}: ODS={ods_count:,}, DWD={dwd_count:,} {status}")


def _section_consumption(cur, emit):
    """Section 4: member consumption split into 30-day / 31-90-day / older buckets."""
    emit("\n💰 4. SPI 消费数据深度分析")
    cur.execute("""
        SELECT
            CASE
                WHEN pay_time >= CURRENT_DATE - INTERVAL '30 days' THEN '近30天'
                WHEN pay_time >= CURRENT_DATE - INTERVAL '90 days' THEN '31-90天'
                ELSE '90天前'
            END as period,
            COUNT(DISTINCT member_id) as member_count,
            COUNT(*) as order_count,
            SUM(pay_amount) as total_amount,
            AVG(pay_amount) as avg_amount,
            PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY pay_amount) as median_amount
        FROM dwd.dwd_settlement_head
        WHERE member_id > 0 -- 排除非会员
        GROUP BY CASE
            WHEN pay_time >= CURRENT_DATE - INTERVAL '30 days' THEN '近30天'
            WHEN pay_time >= CURRENT_DATE - INTERVAL '90 days' THEN '31-90天'
            ELSE '90天前'
        END
        ORDER BY period
    """)
    consumption_data = cur.fetchall()
    emit("会员消费时间分布:")
    for period, member_count, order_count, total_amount, avg_amount, median_amount in consumption_data:
        emit(f" {period}: {member_count:,} 会员, {order_count:,} 订单, 总额 {total_amount:,.2f}, 平均 {avg_amount:.2f}, 中位数 {median_amount:.2f}")


def _section_activity(cur, emit):
    """Section 5: classify members as 30-day active / 90-day-only active / inactive."""
    emit("\n👥 5. 会员消费活跃度分析")
    cur.execute("""
        WITH member_stats AS (
            SELECT
                member_id,
                COUNT(*) as order_count_30d,
                SUM(pay_amount) as total_amount_30d,
                MAX(pay_time) as last_consume_time
            FROM dwd.dwd_settlement_head
            WHERE member_id > 0
              AND pay_time >= CURRENT_DATE - INTERVAL '30 days'
            GROUP BY member_id
        ),
        member_stats_90d AS (
            SELECT
                member_id,
                COUNT(*) as order_count_90d,
                SUM(pay_amount) as total_amount_90d
            FROM dwd.dwd_settlement_head
            WHERE member_id > 0
              AND pay_time >= CURRENT_DATE - INTERVAL '90 days'
            GROUP BY member_id
        )
        SELECT
            CASE
                WHEN m30.total_amount_30d > 0 THEN '30天活跃'
                WHEN m90.total_amount_90d > 0 THEN '仅90天活跃'
                ELSE '非活跃'
            END as activity_level,
            COUNT(*) as member_count,
            AVG(COALESCE(m30.total_amount_30d, 0)) as avg_amount_30d,
            AVG(COALESCE(m90.total_amount_90d, 0)) as avg_amount_90d
        FROM (SELECT DISTINCT member_id FROM dwd.dwd_settlement_head WHERE member_id > 0) all_members
        LEFT JOIN member_stats m30 ON all_members.member_id = m30.member_id
        LEFT JOIN member_stats_90d m90 ON all_members.member_id = m90.member_id
        GROUP BY CASE
            WHEN m30.total_amount_30d > 0 THEN '30天活跃'
            WHEN m90.total_amount_90d > 0 THEN '仅90天活跃'
            ELSE '非活跃'
        END
        ORDER BY member_count DESC
    """)
    activity_data = cur.fetchall()
    emit("会员活跃度分布:")
    # total_members is only used inside the loop, so an empty result set
    # cannot divide by zero (the loop body never runs).
    total_members = sum(row[1] for row in activity_data)
    for activity_level, member_count, avg_30d, avg_90d in activity_data:
        percentage = (member_count / total_members) * 100
        emit(f" {activity_level}: {member_count:,} 人 ({percentage:.1f}%), 30天均消费 {avg_30d:.2f}, 90天均消费 {avg_90d:.2f}")


def _section_filter_check(cur, emit):
    """Section 6: 90-day order distribution by amount bucket (spots unexpected filtering)."""
    emit("\n🔍 6. 数据过滤检查")
    cur.execute("""
        SELECT
            CASE
                WHEN pay_amount = 0 THEN '零消费'
                WHEN pay_amount > 0 AND pay_amount <= 50 THEN '小额(≤50)'
                WHEN pay_amount > 50 AND pay_amount <= 200 THEN '中额(50-200)'
                WHEN pay_amount > 200 THEN '大额(>200)'
            END as amount_range,
            COUNT(*) as order_count,
            COUNT(DISTINCT member_id) as member_count
        FROM dwd.dwd_settlement_head
        WHERE pay_time >= CURRENT_DATE - INTERVAL '90 days'
          AND member_id > 0
        GROUP BY CASE
            WHEN pay_amount = 0 THEN '零消费'
            WHEN pay_amount > 0 AND pay_amount <= 50 THEN '小额(≤50)'
            WHEN pay_amount > 50 AND pay_amount <= 200 THEN '中额(50-200)'
            WHEN pay_amount > 200 THEN '大额(>200)'
        END
        ORDER BY order_count DESC
    """)
    amount_ranges = cur.fetchall()
    emit("90天内消费金额分布:")
    for amount_range, order_count, member_count in amount_ranges:
        emit(f" {amount_range}: {order_count:,} 订单, {member_count:,} 会员")


def _section_spring_festival(cur, emit):
    """Section 7: per-day member order stats for the 2026 Spring Festival window."""
    emit("\n🎊 7. 春节期间数据详细检查")
    cur.execute("""
        SELECT
            pay_time::date as pay_date,
            COUNT(*) as order_count,
            COUNT(DISTINCT member_id) as member_count,
            SUM(pay_amount) as total_amount,
            AVG(pay_amount) as avg_amount
        FROM dwd.dwd_settlement_head
        WHERE pay_time::date BETWEEN '2026-02-10' AND '2026-02-28'
          AND member_id > 0
        GROUP BY pay_time::date
        ORDER BY pay_date
    """)
    spring_festival_data = cur.fetchall()
    emit("春节期间每日数据:")
    for pay_date, order_count, member_count, total_amount, avg_amount in spring_festival_data:
        # NOTE(review): GROUP BY rows always have order_count >= 1, so the
        # else branch below is effectively dead; kept for behavior parity.
        if order_count > 0:
            emit(f" {pay_date}: {order_count:,} 订单, {member_count:,} 会员, 总额 {total_amount:,.2f}, 均额 {avg_amount:.2f}")
        else:
            emit(f" {pay_date}: 无数据")


def main():
    """Run all diagnostic sections against TEST_DB_DSN and persist the report."""
    load_dotenv()
    test_db_dsn = os.environ.get('TEST_DB_DSN')
    system_log_root = os.environ.get('SYSTEM_LOG_ROOT')
    if not test_db_dsn or not system_log_root:
        raise RuntimeError("环境变量未设置")

    report_lines = []

    def emit(text=""):
        # Mirror every console line into the report buffer so the markdown
        # report contains exactly what was printed.
        print(text)
        report_lines.append(str(text))

    emit("🔍 深度分析 ETL 执行问题和 SPI 数据完整性")
    emit("=" * 60)

    with psycopg2.connect(test_db_dsn) as conn:
        with conn.cursor() as cur:
            # 1. ODS daily distribution.
            emit("\n📊 1. ODS 数据时间分布分析")
            ods_data = _fetch_daily_distribution(cur, "ods.settlement_records", "paytime")
            _print_distribution(emit, "ODS", ods_data)

            # 2. DWD daily distribution (note the differing column name).
            emit("\n📊 2. DWD 数据时间分布分析")
            dwd_data = _fetch_daily_distribution(cur, "dwd.dwd_settlement_head", "pay_time")
            _print_distribution(emit, "DWD", dwd_data)

            _section_diff(emit, ods_data, dwd_data)
            _section_consumption(cur, emit)
            _section_activity(cur, emit)
            _section_filter_check(cur, emit)
            _section_spring_festival(cur, emit)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = f"{system_log_root}/deep_etl_spi_analysis_{timestamp}.md"
    # BUG FIX: the original script announced a generated report but never
    # wrote any file. Persist the captured output so the claim is true.
    with open(report_path, "w", encoding="utf-8") as fh:
        fh.write("\n".join(report_lines) + "\n")
    print(f"\n📝 详细分析报告已生成: {report_path}")


if __name__ == "__main__":
    main()