"""Diagnose three issues found during ETL integration testing.

1. Why DWS_ASSISTANT_SALARY reports ins=0.
2. Why DWS_MEMBER_VISIT hits a unique-constraint violation.
3. What triggers the SPI base-calibration WARNING.
"""

import os
import sys
from pathlib import Path

import psycopg2
import psycopg2.extras
from dotenv import load_dotenv

# Load .env from two directories above this script's own directory
# (parents[0] is the script's directory, parents[2] its grandparent).
load_dotenv(Path(__file__).resolve().parents[2] / ".env")

PG_DSN = os.environ.get("PG_DSN")
if not PG_DSN:
    raise RuntimeError("PG_DSN 未设置")

# Autocommit: the queries in this script are plain SELECTs, so there is no
# transaction to manage, and a failing statement does not leave the session
# in an aborted-transaction state for the queries that follow it.
conn = psycopg2.connect(PG_DSN)
conn.autocommit = True
def q(sql: str, params=None) -> list:
    """Execute *sql* on the module-level connection ``conn`` and fetch all rows.

    Args:
        sql: SQL text to execute.
        params: Optional sequence or mapping of query parameters, passed
            straight to ``cursor.execute`` so the driver binds the values
            instead of having them formatted into *sql*. The default ``None``
            behaves exactly like the original one-argument call.

    Returns:
        A list of ``RealDictRow`` objects (from ``RealDictCursor``), so
        columns are accessed by name, e.g. ``row["cnt"]``.
    """
    with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
        cur.execute(sql, params)
        return cur.fetchall()
print("=" * 60)
print("问题 1: DWS_ASSISTANT_SALARY 源数据检查")
print("=" * 60)

# Check whether dws_assistant_monthly_summary has any data, per month.
rows = q("SELECT stat_month, COUNT(*) AS cnt FROM dws.dws_assistant_monthly_summary GROUP BY stat_month ORDER BY stat_month")
if rows:
    print("dws_assistant_monthly_summary 数据分布(按月):")
    for r in rows:
        print(f" {r['stat_month']}: {r['cnt']} 行")
else:
    print("dws_assistant_monthly_summary: 无数据!")

# Current contents of salary_calc, per salary month.
rows2 = q("SELECT salary_month, COUNT(*) AS cnt FROM dws.dws_assistant_salary_calc GROUP BY salary_month ORDER BY salary_month")
if rows2:
    print("\ndws_assistant_salary_calc 已有数据:")
    for r in rows2:
        print(f" {r['salary_month']}: {r['cnt']} 行")
else:
    print("\ndws_assistant_salary_calc: 无数据")

# NOTE: the conclusion below is a fixed note written for the 2026-02-27 ETL
# run. It is printed unconditionally and is NOT derived from the query
# results above. It records the author's analysis of _should_skip_run():
# day=27 > run_days (default 5), so the salary task is skipped. That ETL
# logic is not inspected by this script.
print("\n结论: ETL 运行日期 2026-02-27,day=27 > run_days(默认5)")
print(" _should_skip_run() 返回 True,任务被跳过,这是设计行为。")
print(" 工资计算仅在月初前5天运行(可通过 dws.salary.run_days 配置)。")
print()
print("=" * 60)
print("问题 2: DWS_MEMBER_VISIT 唯一约束冲突")
print("=" * 60)

# Check whether the source data contains duplicate
# (site_id, member_id, order_settle_id) keys. This presumably mirrors the
# unique key of the visit table; confirm against its DDL.
# COUNT(*) OVER () is evaluated after GROUP BY/HAVING but before LIMIT, so
# total_groups is the real number of duplicate groups, not the (at most 10)
# rows returned.
rows3 = q("""
    SELECT site_id, member_id, order_settle_id, COUNT(*) AS cnt,
           COUNT(*) OVER () AS total_groups
    FROM dwd.dwd_settlement_head
    WHERE member_id IS NOT NULL AND member_id != 0
    GROUP BY site_id, member_id, order_settle_id
    HAVING COUNT(*) > 1
    ORDER BY cnt DESC
    LIMIT 10
""")
if rows3:
    print(f"dwd_settlement_head 中有 {rows3[0]['total_groups']} 组重复 (site_id, member_id, order_settle_id):")
    for r in rows3:
        print(f" site={r['site_id']}, member={r['member_id']}, order={r['order_settle_id']}, cnt={r['cnt']}")
else:
    print("dwd_settlement_head 中无重复 (site_id, member_id, order_settle_id)")
# Check for cross-window duplicates already in the target table. Per the
# author's note, delete-before-insert deletes by date window, but the same
# order may span windows.
# ORDER BY makes the LIMIT-ed sample deterministic, using the same ordering
# as the source-table check.
rows4 = q("""
    SELECT site_id, member_id, order_settle_id, COUNT(*) AS cnt
    FROM dws.dws_member_visit_detail
    GROUP BY site_id, member_id, order_settle_id
    HAVING COUNT(*) > 1
    ORDER BY cnt DESC
    LIMIT 10
""")
if rows4:
    print("\ndws_member_visit_detail 中已有重复:")
    for r in rows4:
        print(f" site={r['site_id']}, member={r['member_id']}, order={r['order_settle_id']}, cnt={r['cnt']}")
else:
    print("\ndws_member_visit_detail 中无重复(当前数据干净)")
# visit_date distribution of the target table: number of distinct dates,
# the earliest and latest date with their row counts, and the overall total.
# Used to look for orders that straddle date windows.
rows5 = q("""
    SELECT visit_date, COUNT(*) AS cnt
    FROM dws.dws_member_visit_detail
    GROUP BY visit_date
    ORDER BY visit_date
""")
print(f"\ndws_member_visit_detail visit_date 分布: {len(rows5)} 个日期")
if rows5:
    earliest, latest = rows5[0], rows5[-1]
    print(f" 最早: {earliest['visit_date']} ({earliest['cnt']} 行)")
    print(f" 最晚: {latest['visit_date']} ({latest['cnt']} 行)")
    total = sum(day_row["cnt"] for day_row in rows5)
    print(f" 总计: {total} 行")
# Key check: does the same (order_settle_id, member_id) appear on more than
# one date? The author suspects the biz_date computation pushes an order
# across windows.
# NOTE(review): this query uses pay_time::date as a proxy, not biz_date;
# confirm that the pay date is the right stand-in.
# COUNT(*) OVER () gives the total number of offending groups; LIMIT only
# caps the sample rows printed below.
rows6 = q("""
    SELECT order_settle_id, member_id,
           COUNT(DISTINCT pay_time::date) AS date_cnt,
           COUNT(*) OVER () AS total_groups
    FROM dwd.dwd_settlement_head
    WHERE member_id IS NOT NULL AND member_id != 0
    GROUP BY order_settle_id, member_id
    HAVING COUNT(DISTINCT pay_time::date) > 1
    LIMIT 5
""")
if rows6:
    print(f"\n同一 order 出现在多个 pay_date: {rows6[0]['total_groups']} 组")
    for r in rows6:
        print(f" order={r['order_settle_id']}, member={r['member_id']}, date_cnt={r['date_cnt']}")
else:
    print("\n同一 order 不跨日期")
# Check whether SCD2 history causes one order_settle_id to map to several
# member_ids.
# COUNT(*) OVER () gives the total number of such orders, independent of
# LIMIT. ORDER BY makes the sample deterministic (worst offenders first).
rows7 = q("""
    SELECT order_settle_id, COUNT(DISTINCT member_id) AS member_cnt,
           COUNT(*) OVER () AS total_groups
    FROM dwd.dwd_settlement_head
    WHERE member_id IS NOT NULL AND member_id != 0
    GROUP BY order_settle_id
    HAVING COUNT(DISTINCT member_id) > 1
    ORDER BY member_cnt DESC
    LIMIT 10
""")
if rows7:
    print(f"\n同一 order_settle_id 对应多个 member_id: {rows7[0]['total_groups']} 组")
    for r in rows7:
        print(f" order={r['order_settle_id']}, member_cnt={r['member_cnt']}")
else:
    print("\n同一 order_settle_id 不对应多个 member_id")
print()
print("=" * 60)
print("问题 3: SPI 基数校准 WARNING")
print("=" * 60)

# SPI source data: total member count, how many have spend_30d > 0 and
# spend_90d > 0, and the median of each spend column.
# An aggregate query without GROUP BY always yields exactly one row.
rows8 = q("""
    SELECT
        COUNT(*) AS total_members,
        COUNT(*) FILTER (WHERE spend_30d > 0) AS has_spend_30,
        COUNT(*) FILTER (WHERE spend_90d > 0) AS has_spend_90,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY spend_30d) AS median_spend_30,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY spend_90d) AS median_spend_90
    FROM dws.dws_member_consumption_summary
""")
if rows8:
    r = rows8[0]
    print("dws_member_consumption_summary:")
    # (printed label, result column) pairs, in output order.
    for label, column in (
        ("总会员数", "total_members"),
        ("30天有消费", "has_spend_30"),
        ("90天有消费", "has_spend_90"),
        ("30天消费中位数", "median_spend_30"),
        ("90天消费中位数", "median_spend_90"),
    ):
        print(f" {label}: {r[column]}")
# SPI result table: row count and average display_score.
rows9 = q("SELECT COUNT(*) AS cnt, AVG(display_score) AS avg_score FROM dws.dws_member_spending_power_index")
if rows9:
    r = rows9[0]
    spi_rows, spi_avg = r["cnt"], r["avg_score"]
    print(f"\ndws_member_spending_power_index: {spi_rows} 行, 平均分: {spi_avg}")

# Release the connection. There is no try/finally around the queries, so
# this is reached only when every query above succeeded.
conn.close()
print("\n诊断完成。")