Files
Neo-ZQYY/scripts/ops/_diagnose_etl_issues_v2.py

165 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""诊断 ETL 联调问题 2 和 3 的补充查询"""
import os, sys
from pathlib import Path
from dotenv import load_dotenv
# Load the .env file that sits two directories above this script's own
# directory, then require PG_DSN to be present (from .env or the environment).
load_dotenv(Path(__file__).resolve().parents[2].joinpath(".env"))
PG_DSN = os.getenv("PG_DSN")
if not PG_DSN:
    # Fail fast: every diagnostic query below needs a database connection.
    raise RuntimeError("PG_DSN 未设置")
import psycopg2
import psycopg2.extras
conn = psycopg2.connect(PG_DSN)
# autocommit: each statement runs in its own transaction, so no transaction is
# held open between the read-only diagnostic queries below.
conn.autocommit = True


def q(sql, params=None):
    """Run *sql* and return every result row as a dict keyed by column name.

    Args:
        sql: SQL text to execute.
        params: Optional query parameters forwarded to ``cursor.execute``
            (psycopg2 ``%s`` / ``%(name)s`` placeholders). ``None`` (the
            default) executes *sql* verbatim, exactly as before.

    Returns:
        list of ``RealDictRow``: all rows produced by the statement.
    """
    # The cursor is closed by the ``with`` block; the connection stays open
    # and is shared by every call.
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(sql, params)
        return cur.fetchall()
# ============================================================
# Issue 2, deep dive: DWS_MEMBER_VISIT unique-constraint violation
# ============================================================
print("=" * 60)
print("问题 2 深入: DWS_MEMBER_VISIT 唯一约束冲突")
print("=" * 60)
# Hypothesis: the task deletes-then-inserts per visit_date window slice. If the
# biz_date calculation puts the same order into two slices, the first slice
# deletes + inserts it, while the second slice does not delete it (its
# biz_date is outside that slice) but still tries to insert it again.
# Check how pay_time maps to biz_date in dwd_settlement_head: biz_date rolls
# back one day for payments made before 08:00.
#
# "hour < 8" is exactly the condition under which biz_date != pay_date, so the
# filter lives in WHERE instead of re-spelling both grouped expressions in
# HAVING. Window aggregates are evaluated before LIMIT, so total_groups /
# total_records cover every group, not just the 20 sampled rows.
# Positional GROUP BY avoids ambiguity with any same-named table column.
rows = q("""
    SELECT
        pay_time::date AS pay_date,
        (CASE WHEN EXTRACT(HOUR FROM pay_time) < 8
              THEN (pay_time - INTERVAL '1 day')::date
              ELSE pay_time::date END) AS biz_date,
        COUNT(*) AS cnt,
        COUNT(*) OVER () AS total_groups,
        SUM(COUNT(*)) OVER () AS total_records
    FROM dwd.dwd_settlement_head
    WHERE member_id IS NOT NULL AND member_id != 0
      AND pay_time IS NOT NULL
      AND EXTRACT(HOUR FROM pay_time) < 8
    GROUP BY 1, 2
    ORDER BY pay_date
    LIMIT 20
""")
total_groups = rows[0]['total_groups'] if rows else 0
print(f"\npay_date != biz_date 的记录组数: {total_groups}")
if rows:
    print(f" 涉及记录总数: {rows[0]['total_records']}")
    for r in rows[:5]:
        print(f" pay_date={r['pay_date']}, biz_date={r['biz_date']}, cnt={r['cnt']}")
# Look for stale rows left by earlier runs: the new run's delete only covers
# its own window, so anything dated before the window is never cleaned up.
# Current window: 2025-11-01 ~ 2026-02-27, split into 30-day slices:
#   slice 1: 2025-11-01 ~ 2025-11-30
#   slice 2: 2025-12-01 ~ 2025-12-30
#   slice 3: 2025-12-31 ~ 2026-01-29
#   slice 4: 2026-01-30 ~ 2026-02-27
# Rows from e.g. 2025-07 ~ 2025-10 written by a previous run would survive.
rows2 = q("""
SELECT visit_date, COUNT(*) AS cnt
FROM dws.dws_member_visit_detail
WHERE visit_date < '2025-11-01'
GROUP BY visit_date
ORDER BY visit_date
""")
print(f"\n2025-11-01 之前的残留数据: {len(rows2)} 个日期")
if rows2:
    # rows2 is ordered by visit_date, so the ends give the date range.
    earliest, latest = rows2[0], rows2[-1]
    total = sum(day['cnt'] for day in rows2)
    print(f" 总计: {total}")
    print(f" 最早: {earliest['visit_date']}")
    print(f" 最晚: {latest['visit_date']}")
# Key check: do any orders present in the stale (pre-window) rows also fall
# inside the current run's window? Such an order would be inserted again by the
# new run while its old row (outside the delete range) survives, which is one
# way to hit the unique constraint.
#
# The new-window side must be limited to orders whose biz_date lies in the run
# window (2025-11-01 ~ 2026-02-27). Presumably every stale row was built from an
# order that is still in dwd_settlement_head, so without this filter the join
# matches trivially and the check proves nothing.
# NOTE(review): assumes the task assigns orders to slices by this biz_date
# (pay_time rolled back one day before 08:00); confirm against the DWS task.
# COUNT(*) OVER () is evaluated before LIMIT, so `total` is the full overlap
# count even though only 10 sample rows are returned.
rows3 = q("""
    WITH old_data AS (
        SELECT site_id, member_id, order_settle_id
        FROM dws.dws_member_visit_detail
        WHERE visit_date < '2025-11-01'
    ),
    new_window AS (
        SELECT DISTINCT order_settle_id, member_id, site_id
        FROM dwd.dwd_settlement_head
        WHERE member_id IS NOT NULL AND member_id != 0
          AND (CASE WHEN EXTRACT(HOUR FROM pay_time) < 8
                    THEN (pay_time - INTERVAL '1 day')::date
                    ELSE pay_time::date END)
              BETWEEN DATE '2025-11-01' AND DATE '2026-02-27'
    )
    SELECT o.site_id, o.member_id, o.order_settle_id,
           COUNT(*) OVER () AS total
    FROM old_data o
    JOIN new_window n ON o.site_id = n.site_id
                     AND o.member_id = n.member_id
                     AND o.order_settle_id = n.order_settle_id
    LIMIT 10
""")
overlap_total = rows3[0]['total'] if rows3 else 0
print(f"\n旧数据与新窗口重叠的 order: {overlap_total}")
if rows3:
    for r in rows3[:5]:
        print(f" site={r['site_id']}, member={r['member_id']}, order={r['order_settle_id']}")
# ============================================================
# Issue 3: SPI baseline-calibration WARNING
# ============================================================
print()
for banner_line in ("=" * 60, "问题 3: SPI 基数校准 WARNING", "=" * 60):
    print(banner_line)
# First list the actual column names of dws_member_consumption_summary,
# in table-definition order.
rows4 = q("""
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'dws' AND table_name = 'dws_member_consumption_summary'
ORDER BY ordinal_position
""")
print("dws_member_consumption_summary 列:")
cols = [col_row['column_name'] for col_row in rows4]
first_twenty = ', '.join(cols[:20])
print(f" {first_twenty}")
# Only mention the total when the preview above was truncated.
if cols[20:]:
    print(f" ... 共 {len(cols)}")
# Summary statistics (row count, mean/min/max display_score) of the SPI
# result table.
rows5 = q("""
SELECT COUNT(*) AS cnt,
AVG(display_score) AS avg_score,
MIN(display_score) AS min_score,
MAX(display_score) AS max_score
FROM dws.dws_member_spending_power_index
""")
if rows5:
    spi_stats = rows5[0]
    print(f"\ndws_member_spending_power_index: {spi_stats['cnt']}")
    print(f" 平均分: {spi_stats['avg_score']}, 最低: {spi_stats['min_score']}, 最高: {spi_stats['max_score']}")
# Medians of the raw sub-score columns (level / speed / stability) stored in
# the SPI table.
rows6 = q("""
SELECT
COUNT(*) AS total,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY score_level_raw) AS median_level,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY score_speed_raw) AS median_speed,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY score_stability_raw) AS median_stability
FROM dws.dws_member_spending_power_index
""")
if rows6:
    medians = rows6[0]
    print(f" 子分中位数: level={medians['median_level']}, speed={medians['median_speed']}, stability={medians['median_stability']}")
# Count members with positive consumption vs. zero/missing consumption.
rows7 = q("""
SELECT
COUNT(*) AS total,
COUNT(*) FILTER (WHERE total_consume_amount > 0) AS has_consume,
COUNT(*) FILTER (WHERE total_consume_amount = 0 OR total_consume_amount IS NULL) AS no_consume
FROM dws.dws_member_consumption_summary
""")
if rows7:
    consume = rows7[0]
    print(f"\n消费汇总: 总 {consume['total']} 会员, 有消费 {consume['has_consume']}, 无消费 {consume['no_consume']}")
# All diagnostics done: release the shared connection.
conn.close()
print("\n诊断完成。")