"""Diagnose why upstream API data only reaches 2/14, and count the members
that actually fall inside the SPI 30-day window.

One-off diagnostic script: it connects to PostgreSQL via ``PG_DSN`` (taken
from the environment after loading the project's ``.env``), queries the
``ods`` and ``dwd`` schemas, and prints what it finds.
"""
import os
from pathlib import Path

from dotenv import load_dotenv

# parents[0] is this script's own directory, so the .env is expected two
# directory levels above it.
load_dotenv(Path(__file__).resolve().parents[2] / ".env")

PG_DSN = os.environ.get("PG_DSN")
if not PG_DSN:
    raise RuntimeError("PG_DSN 未设置")

# psycopg2 is imported only after the DSN check, keeping the original order.
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(PG_DSN)
# Autocommit: each statement runs in its own transaction, so one failed probe
# query does not block the ones that follow.
conn.autocommit = True

# RealDictCursor returns every row as a dict keyed by column name.
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

# 1. List every table in the `ods` schema, alphabetically.
cur.execute(
    "SELECT table_name FROM information_schema.tables "
    "WHERE table_schema = 'ods' ORDER BY table_name"
)
ods_tables = [r['table_name'] for r in cur.fetchall()]
# Show at most 10 names; append "..." only when the list was actually cut.
ods_preview = ', '.join(ods_tables[:10])
if len(ods_tables) > 10:
    ods_preview += "..."
print(f"ODS 表 ({len(ods_tables)} 张): {ods_preview}")

# 2. Keep the ODS tables whose name mentions payment or settlement.
payment_tables = [t for t in ods_tables if 'payment' in t or 'settle' in t]
print(f"\n结算/支付相关 ODS 表: {payment_tables}")

# 3. For each payment/settlement ODS table, report its row count and latest
#    load time. `etl_loaded_at` is tried first; if that query fails (e.g. the
#    table has no such column), fall back to the table's timestamp columns.
from psycopg2 import sql

for tname in payment_tables:
    # Quote schema and table as identifiers instead of splicing raw text, so
    # names that need quoting (mixed case, special characters) still work.
    table_ident = sql.SQL("{}.{}").format(sql.Identifier('ods'), sql.Identifier(tname))
    try:
        cur.execute(
            sql.SQL(
                "SELECT MAX(etl_loaded_at) AS max_loaded, COUNT(*) AS cnt FROM {}"
            ).format(table_ident)
        )
        r = cur.fetchone()
        print(f" {tname}: {r['cnt']} 行, max_loaded={r['max_loaded']}")
    except psycopg2.Error:
        # Reset the session after the failed statement (harmless while
        # autocommit is on).
        conn.rollback()
        # Fall back to other time columns.
        try:
            # With parameters passed to execute(), `%%` is sent as a literal `%`.
            cur.execute(
                "SELECT column_name FROM information_schema.columns "
                "WHERE table_schema='ods' AND table_name=%s "
                "AND data_type LIKE 'timestamp%%' "
                "ORDER BY ordinal_position LIMIT 5",
                (tname,),
            )
            ts_cols = [r['column_name'] for r in cur.fetchall()]
            print(f" {tname}: 时间列={ts_cols}")
            if ts_cols:
                # NOTE(review): heuristic - uses the last of (at most) the first
                # five timestamp columns by ordinal position; confirm this is
                # the intended column.
                ts_col = ts_cols[-1]
                cur.execute(
                    sql.SQL(
                        "SELECT MAX({}) AS max_ts, COUNT(*) AS cnt FROM {}"
                    ).format(sql.Identifier(ts_col), table_ident)
                )
                r = cur.fetchone()
                print(f" {ts_col}: max={r['max_ts']}, cnt={r['cnt']}")
        except Exception as e2:
            # Best-effort reporting: print the failure and move on.
            conn.rollback()
            print(f" {tname}: 查询失败 ({e2})")

# 4. When `ods_payment` exists, print the MIN ~ MAX range of whichever of its
#    known time columns (pay_time, create_time, updated_at) are present.
if 'ods_payment' in ods_tables:
    print("\nods_payment pay_time 分布(最近):")
    try:
        cur.execute("""
            SELECT column_name FROM information_schema.columns
            WHERE table_schema='ods' AND table_name='ods_payment'
              AND column_name IN ('pay_time', 'create_time', 'updated_at')
        """)
        present_cols = [row['column_name'] for row in cur.fetchall()]
        print(f" 可用时间列: {present_cols}")
        # Column names come only from the fixed IN-list above, so splicing
        # them into the query text is safe here.
        for time_col in present_cols:
            cur.execute(f"SELECT MIN({time_col}) AS min_t, MAX({time_col}) AS max_t FROM ods.ods_payment")
            bounds = cur.fetchone()
            lo, hi = bounds['min_t'], bounds['max_t']
            print(f" {time_col}: {lo} ~ {hi}")
    except Exception as err:
        conn.rollback()
        print(f" 查询失败: {err}")

# 5. SPI 30-day window analysis by canonical_member_id.
#    For one site, collect settlements paid in the last 90 days, resolve each
#    to a member id, and compare per-member 30-day vs 90-day spend.
import statistics

print("\n" + "=" * 60)
print("SPI 30天窗口 canonical_member_id 分析")
# canonical_member_id: the settlement's own member_id unless it is 0, else the
#   tenant_member_id of the joined member-card account row.
# settle_type IN (1, 3): NOTE(review) presumably the consumption settle types;
#   confirm against the source system.
# Site: MIN(site_id) picks the same site on every run, whereas
#   DISTINCT ... LIMIT 1 without ORDER BY could return any site (or NULL).
cur.execute("""
WITH consume_source AS (
    SELECT
        COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
        s.pay_time,
        COALESCE(s.pay_amount, 0) AS pay_amount
    FROM dwd.dwd_settlement_head s
    LEFT JOIN dwd.dim_member_card_account mca
        ON s.member_card_account_id = mca.member_card_id
        AND mca.scd2_is_current = 1
        AND mca.register_site_id = s.site_id
        AND COALESCE(mca.is_delete, 0) = 0
    WHERE s.site_id = (SELECT MIN(site_id) FROM dwd.dwd_settlement_head)
        AND s.settle_type IN (1, 3)
        AND s.pay_time >= NOW() - INTERVAL '90 days'
)
SELECT
    canonical_member_id,
    SUM(pay_amount) AS spend_90,
    SUM(CASE WHEN pay_time >= NOW() - INTERVAL '30 days' THEN pay_amount ELSE 0 END) AS spend_30,
    COUNT(*) AS orders_90,
    SUM(CASE WHEN pay_time >= NOW() - INTERVAL '30 days' THEN 1 ELSE 0 END) AS orders_30
FROM consume_source
WHERE canonical_member_id > 0
GROUP BY canonical_member_id
""")
rows = cur.fetchall()
total = len(rows)
has_30 = sum(1 for r in rows if float(r['spend_30']) > 0)
zero_30 = total - has_30
print(f" 90天有消费会员: {total}")
# With no qualifying members the percentages would divide by zero and the
# medians would have no data, so only report them when rows exist.
if total:
    print(f" 30天有消费: {has_30} ({has_30 / total * 100:.1f}%)")
    print(f" 30天无消费: {zero_30} ({zero_30 / total * 100:.1f}%)")
    # statistics.median averages the two middle values for an even count,
    # giving a true median rather than the upper-middle element.
    spend_30_median = statistics.median(float(r['spend_30']) for r in rows)
    spend_90_median = statistics.median(float(r['spend_90']) for r in rows)
    print(f" spend_30 中位数: {spend_30_median:.2f}")
    print(f" spend_90 中位数: {spend_90_median:.2f}")

# 6. Latest upstream data as seen from DWD: for each table below, print the
#    MAX() of whichever of pay_time / create_time it actually has.
print("\n" + "=" * 60)
print("DWD 各表最新 pay_time / create_time:")
for dwd_table in ('dwd_settlement_head', 'dwd_assistant_service_log', 'dwd_table_fee_log'):
    try:
        cur.execute(
            "SELECT column_name FROM information_schema.columns "
            f"WHERE table_schema='dwd' AND table_name='{dwd_table}' "
            "AND column_name IN ('pay_time', 'create_time') ORDER BY column_name"
        )
        time_cols = [row['column_name'] for row in cur.fetchall()]
        for time_col in time_cols:
            cur.execute(f"SELECT MAX({time_col}) AS max_t FROM dwd.{dwd_table}")
            latest = cur.fetchone()['max_t']
            print(f" {dwd_table}.{time_col}: {latest}")
    except Exception as err:
        conn.rollback()
        print(f" {dwd_table}: {err}")

# All checks have run: release the database connection, then report completion.
conn.close()
print("\n诊断完成。")