Files
Neo-ZQYY/scripts/ops/_revalidate_data_consistency.py

194 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
重新验证 ODS 和 DWD 数据一致性,查明之前分析的矛盾
"""
import os
import psycopg2
from datetime import datetime
from dotenv import load_dotenv
def _print_samples(title, rows, extra_labels=()):
    """Print *title* and one line per sample row; return True if anything was printed.

    Each row starts with (id, time, amount); any further columns are printed
    with the matching label from *extra_labels*.
    """
    if not rows:
        return False
    print(title)
    for row in rows:
        parts = [f"ID: {row[0]}", f"时间: {row[1]}", f"金额: {row[2]}"]
        parts += [f"{label}: {value}" for label, value in zip(extra_labels, row[3:])]
        print(" " + ", ".join(parts))
    return True


def _compare_day(cur, day):
    """Compare ODS and DWD settlement rows paid on *day* and print difference samples."""
    print(f"\n--- {day} 详细分析 ---")
    cur.execute("""
        SELECT COUNT(*), COUNT(DISTINCT id) FROM ods.settlement_records
        WHERE paytime::date = %s
    """, (day,))
    ods_count, ods_ids = cur.fetchone()
    cur.execute("""
        SELECT COUNT(*), COUNT(DISTINCT order_settle_id) FROM dwd.dwd_settlement_head
        WHERE pay_time::date = %s
    """, (day,))
    dwd_count, dwd_ids = cur.fetchone()
    print(f"ODS: {ods_count:,}")
    print(f"DWD: {dwd_count:,}")
    print(f"差异: {ods_count - dwd_count:,}")
    if ods_ids != ods_count or dwd_ids != dwd_count:
        # Duplicate (or NULL) IDs make raw row counts misleading; show ID-level counts too.
        print(f"唯一ID: ODS {ods_ids:,} / DWD {dwd_ids:,}")

    # Equal row counts do not imply equal ID sets (N missing + N extra cancel out),
    # so both anti-joins always run.
    cur.execute("""
        SELECT o.id, o.paytime, o.payamount, o.settlestatus, o.settletype
        FROM ods.settlement_records o
        LEFT JOIN dwd.dwd_settlement_head d ON o.id = d.order_settle_id
        WHERE o.paytime::date = %s AND d.order_settle_id IS NULL
        ORDER BY o.paytime, o.id
        LIMIT 5
    """, (day,))
    found_missing = _print_samples(
        "缺失记录样本:", cur.fetchall(), ("结算状态", "结算类型")
    )
    cur.execute("""
        SELECT d.order_settle_id, d.pay_time, d.pay_amount
        FROM dwd.dwd_settlement_head d
        LEFT JOIN ods.settlement_records o ON d.order_settle_id = o.id
        WHERE d.pay_time::date = %s AND o.id IS NULL
        ORDER BY d.pay_time, d.order_settle_id
        LIMIT 5
    """, (day,))
    found_extra = _print_samples("多余记录样本:", cur.fetchall())
    if not (found_missing or found_extra):
        print("未发现缺失或多余记录")


def _print_key_columns(cur, title, schema, table, columns):
    """Print type/nullability of the expected *columns* of schema.table, flagging absent ones."""
    # psycopg2 adapts a Python tuple to a parenthesised SQL list for `IN %s`.
    cur.execute("""
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
          AND column_name IN %s
        ORDER BY column_name
    """, (schema, table, tuple(columns)))
    rows = cur.fetchall()
    print(title)
    for col_name, data_type, nullable in rows:
        print(f" {col_name}: {data_type} ({'NULL' if nullable == 'YES' else 'NOT NULL'})")
    absent = sorted(set(columns) - {row[0] for row in rows})
    if absent:
        print(f" 未找到字段: {', '.join(absent)}")


def _check_id_mapping(cur, start, end):
    """Print ODS->DWD ID join statistics for ODS rows paid between *start* and *end* (inclusive)."""
    cur.execute("""
        SELECT
            COUNT(*) AS total_ods,
            COUNT(DISTINCT o.id) AS unique_ods_ids,
            COUNT(d.order_settle_id) AS matched_dwd,
            COUNT(DISTINCT d.order_settle_id) AS unique_dwd_ids
        FROM ods.settlement_records o
        LEFT JOIN dwd.dwd_settlement_head d ON o.id = d.order_settle_id
        WHERE o.paytime::date BETWEEN %s AND %s
    """, (start, end))
    total_ods, unique_ods, matched_dwd, unique_dwd = cur.fetchone()
    print("ID 映射统计:")
    print(f" ODS 总记录: {total_ods:,}")
    print(f" ODS 唯一ID: {unique_ods:,}")
    print(f" DWD 匹配记录: {matched_dwd:,}")
    print(f" DWD 唯一ID: {unique_dwd:,}")


def _print_timestamp_columns(cur):
    """Print DWD settlement-head columns whose names contain created/updated/processed."""
    # Executed WITHOUT parameters on purpose: psycopg2 only treats `%` as a
    # placeholder when arguments are passed, so the LIKE wildcards stay literal.
    cur.execute("""
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = 'dwd' AND table_name = 'dwd_settlement_head'
          AND (column_name LIKE '%created%' OR column_name LIKE '%updated%' OR column_name LIKE '%processed%')
    """)
    rows = cur.fetchall()
    if rows:
        print("发现时间戳字段:")
        for (name,) in rows:
            print(f" {name}")
    else:
        print("未发现时间戳字段")


def _compare_totals(cur):
    """Print whole-table row totals and an ID-level match/missing/extra summary."""
    cur.execute("SELECT COUNT(*) FROM ods.settlement_records")
    total_ods = cur.fetchone()[0]
    cur.execute("SELECT COUNT(*) FROM dwd.dwd_settlement_head")
    total_dwd = cur.fetchone()[0]
    print(f"ODS 总记录数: {total_ods:,}")
    print(f"DWD 总记录数: {total_dwd:,}")
    print(f"总体差异: {total_ods - total_dwd:,}")
    # Always run: equal totals can hide offsetting missing/extra IDs. Comparing
    # DISTINCT non-NULL IDs keeps duplicate rows from multiplying join counts and
    # keeps NULL IDs from being mis-labelled by the IS NULL tests.
    cur.execute("""
        SELECT
            CASE
                WHEN o.id IS NULL THEN 'DWD多余'
                WHEN d.order_settle_id IS NULL THEN 'DWD缺失'
                ELSE '匹配'
            END AS status,
            COUNT(*) AS count
        FROM (
            SELECT DISTINCT id FROM ods.settlement_records WHERE id IS NOT NULL
        ) o
        FULL OUTER JOIN (
            SELECT DISTINCT order_settle_id FROM dwd.dwd_settlement_head
            WHERE order_settle_id IS NOT NULL
        ) d ON o.id = d.order_settle_id
        GROUP BY 1
        ORDER BY 1
    """)
    print("数据匹配状态(按唯一ID):")
    for status, count in cur.fetchall():
        print(f" {status}: {count:,}")


def main(test_dates=('2026-02-13', '2026-02-12', '2026-02-11'),
         mapping_range=('2026-02-10', '2026-02-14')):
    """Re-verify ODS vs DWD settlement consistency and print a diagnostic report.

    Reads the connection string from the TEST_DB_DSN environment variable
    (after loading a .env file) and issues read-only queries.

    Args:
        test_dates: dates ('YYYY-MM-DD') to analyse row by row in section 1.
        mapping_range: inclusive (start, end) date range for the ID-mapping stats.

    Raises:
        RuntimeError: if TEST_DB_DSN is not set.
    """
    load_dotenv()
    test_db_dsn = os.environ.get('TEST_DB_DSN')
    if not test_db_dsn:
        raise RuntimeError("TEST_DB_DSN 环境变量未设置")
    print("🔍 重新验证数据一致性")
    print("=" * 40)
    conn = psycopg2.connect(test_db_dsn)
    try:
        # `with conn` only commits/rolls back the transaction; psycopg2 does not
        # close the connection there, hence the explicit close in `finally`.
        with conn, conn.cursor() as cur:
            print("\n📊 1. 重新统计 ODS vs DWD 记录数")
            for test_date in test_dates:
                _compare_day(cur, test_date)

            print("\n🔍 2. 检查字段映射和数据类型")
            _print_key_columns(cur, "ODS 关键字段:", "ods", "settlement_records",
                               ("id", "paytime", "payamount", "memberid"))
            _print_key_columns(cur, "DWD 关键字段:", "dwd", "dwd_settlement_head",
                               ("order_settle_id", "pay_time", "pay_amount", "member_id"))

            print("\n🔄 3. 检查数据转换问题")
            _check_id_mapping(cur, *mapping_range)

            print("\n⏰ 4. 检查 ETL 处理时间戳")
            _print_timestamp_columns(cur)

            print("\n📈 5. 总体数据量对比")
            _compare_totals(cur)
    finally:
        conn.close()


if __name__ == "__main__":
    main()