121 lines
3.6 KiB
Python
121 lines
3.6 KiB
Python
"""深入诊断 SPI 数据时间分布"""
|
|
import os
|
|
from pathlib import Path
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(Path(__file__).resolve().parents[2] / ".env")
|
|
|
|
PG_DSN = os.environ.get("PG_DSN")
|
|
if not PG_DSN:
|
|
raise RuntimeError("PG_DSN 未设置")
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
conn = psycopg2.connect(PG_DSN)
|
|
conn.autocommit = True
|
|
|
|
def q(sql, params=None):
    """Execute *sql* on the shared connection and return every result row.

    The cursor is a ``RealDictCursor``, so each row is a dict-like record
    keyed by column name and callers read values as ``row["col"]``.

    :param sql: SQL text; may contain psycopg2 ``%s`` placeholders.
    :param params: values for the placeholders. Any falsy value (``None``,
        an empty tuple/list) is replaced by an empty tuple.
    :return: the list produced by ``cursor.fetchall()``.
    """
    bound = params if params else ()
    factory = psycopg2.extras.RealDictCursor
    with conn.cursor(cursor_factory=factory) as cur:
        cur.execute(sql, bound)
        result = cur.fetchall()
    return result


print("SPI 数据时间分布深入诊断")
print("=" * 60)

# 1. Whole-table overview of dwd_settlement_head: the pay_time span plus row
#    counts, split by settle_type (1/3 are reported as "consumption", 2 as
#    "recharge" in the output below).
rows = q("""
    SELECT
        MIN(pay_time) AS min_pay, MAX(pay_time) AS max_pay,
        COUNT(*) AS total,
        COUNT(*) FILTER (WHERE settle_type IN (1, 3)) AS consume_cnt,
        COUNT(*) FILTER (WHERE settle_type = 2) AS recharge_cnt
    FROM dwd.dwd_settlement_head
""")
# An aggregate query with no GROUP BY returns exactly one row.
overview = rows[0]
print("dwd_settlement_head 全量:")
print(
    f" 总记录: {overview['total']}, "
    f"消费(1,3): {overview['consume_cnt']}, "
    f"充值(2): {overview['recharge_cnt']}"
)
print(f" pay_time 范围: {overview['min_pay']} ~ {overview['max_pay']}")


# 2. The 20 most recent dates that have at least one settle_type IN (1, 3)
#    row, newest first, with the number of rows on each date.
#    NOTE(review): if pay_time is timestamptz, pay_time::date depends on the
#    session TimeZone — confirm it matches the intended day boundary.
rows2 = q("""
    SELECT pay_time::date AS pay_date, COUNT(*) AS cnt
    FROM dwd.dwd_settlement_head
    WHERE settle_type IN (1, 3)
    GROUP BY pay_time::date
    ORDER BY pay_date DESC
    LIMIT 20
""")
print("\n消费记录(settle_type IN 1,3) 最近 20 天:")
for day_row in rows2:
    print(f" {day_row['pay_date']}: {day_row['cnt']} 条")


# 3. SPI uses NOW() - 90 days as its cut-off; show the database clock and the
#    resulting 90-day and 30-day cut-off timestamps.
rows3 = q("SELECT NOW() AS now_ts, NOW() - INTERVAL '90 days' AS cutoff_90, NOW() - INTERVAL '30 days' AS cutoff_30")
window = rows3[0]
print("\n时间窗口:")
for label, key in (
    ("NOW()", "now_ts"),
    ("90天截止", "cutoff_90"),
    ("30天截止", "cutoff_30"),
):
    print(f" {label}: {window[key]}")


# 4. Inside the rolling 90-day window, break settle_type IN (1, 3) rows down
#    by calendar month: row count, distinct member_id count, and summed
#    pay_amount (NULL amounts counted as 0). Because the window is not
#    month-aligned, the first and last month buckets can be partial.
#
#    GROUP BY uses the expression itself (as section 2 does) instead of the
#    output alias "month": PostgreSQL resolves a bare GROUP BY name as an input
#    column before an output column, so the alias form would silently group by
#    a different value if the table has (or gains) a column named "month".
#    ORDER BY resolves a bare name to the output column first, so it can keep
#    using the alias.
rows4 = q("""
    SELECT
        DATE_TRUNC('month', pay_time)::date AS month,
        COUNT(*) AS cnt,
        COUNT(DISTINCT member_id) AS members,
        SUM(COALESCE(pay_amount, 0)) AS total_amount
    FROM dwd.dwd_settlement_head
    WHERE settle_type IN (1, 3)
      AND pay_time >= NOW() - INTERVAL '90 days'
    GROUP BY DATE_TRUNC('month', pay_time)::date
    ORDER BY month
""")
print("\n90天窗口内消费记录按月分布:")
for r in rows4:
    print(f" {r['month']}: {r['cnt']} 条, {r['members']} 会员, 金额 {r['total_amount']}")


# 5. Totals for settle_type IN (1, 3) rows inside the 30-day window: row
#    count, distinct members, summed pay_amount, and the pay_time span.
#    If the window holds no rows, the SUM/MIN/MAX columns come back as None.
rows5 = q("""
    SELECT
        COUNT(*) AS cnt,
        COUNT(DISTINCT member_id) AS members,
        SUM(COALESCE(pay_amount, 0)) AS total_amount,
        MIN(pay_time) AS min_pay,
        MAX(pay_time) AS max_pay
    FROM dwd.dwd_settlement_head
    WHERE settle_type IN (1, 3)
      AND pay_time >= NOW() - INTERVAL '30 days'
""")
last30 = rows5[0]
print("\n30天窗口内消费记录:")
print(
    f" 记录数: {last30['cnt']}, "
    f"会员数: {last30['members']}, "
    f"总金额: {last30['total_amount']}"
)
print(f" 时间范围: {last30['min_pay']} ~ {last30['max_pay']}")


# 6. Check the latest data in the ODS layer: pay_time span and total row count
#    of ods.ods_settlement_records (no settle_type filter applied here).
rows6 = q("""
    SELECT
        MIN(pay_time) AS min_pay, MAX(pay_time) AS max_pay, COUNT(*) AS cnt
    FROM ods.ods_settlement_records
""")
ods_span = rows6[0]
print("\nods_settlement_records:")
print(f" 记录数: {ods_span['cnt']}, pay_time 范围: {ods_span['min_pay']} ~ {ods_span['max_pay']}")


# 7. Check whether the DWD layer has any data from mid-February on, i.e.
#    pay_time >= since_date, broken down per date (all settle_type values).
#
#    The query must GROUP BY the date expression: selecting pay_time::date
#    next to COUNT(*) without it is rejected by PostgreSQL ("column must
#    appear in the GROUP BY clause"), which aborted the script here before
#    the connection was closed.
since_date = "2026-02-10"  # cut-off date, bound as a query parameter
rows7 = q("""
    SELECT pay_time::date AS pay_date, COUNT(*) AS cnt
    FROM dwd.dwd_settlement_head
    WHERE pay_time >= %s
    GROUP BY pay_time::date
    ORDER BY pay_date
""", (since_date,))
print(f"\nDWD {since_date} 之后的数据:")
if rows7:
    for r in rows7:
        print(f" {r['pay_date']}: {r['cnt']} 条")
else:
    print(" 无数据!")

conn.close()
print("\n诊断完成。")