# -*- coding: utf-8 -*-
"""
Troubleshoot problems in the API -> ODS stage:
1. Detect API fields whose columns are missing from the ODS tables
2. Detect whether 0 values from the API became NULL in ODS
"""
import json
import os
import sys
from datetime import datetime
from decimal import Decimal
from pathlib import Path

# Add the project path: the "etl_billiards" directory that sits next to this
# script's parent directory is put first on sys.path. This must happen before
# the `database.connection` import below, which presumably resolves from that
# directory (NOTE(review): confirm the package location).
project_root = Path(__file__).parent.parent / "etl_billiards"
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv

# Load environment variables from the project's .env file; generate_report()
# later reads PG_DSN through os.getenv.
load_dotenv(project_root / ".env")

from database.connection import DatabaseConnection


def load_api_ods_comparison():
    """Load the previously generated API-vs-ODS comparison data.

    Reads ``api_ods_comparison.json`` from the directory containing this
    script.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist.
    """
    source = Path(__file__).parent / "api_ods_comparison.json"
    if not source.exists():
        return {}
    with source.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def get_ods_tables_mapping():
    """Return the mapping from ODS task code to schema-qualified table name.

    A fresh dict is built on every call, so callers may mutate the result
    without affecting later calls.
    """
    task_table_pairs = (
        ("ODS_ASSISTANT_ACCOUNT", "billiards_ods.assistant_accounts_master"),
        ("ODS_SETTLEMENT_RECORDS", "billiards_ods.settlement_records"),
        ("ODS_TABLE_USE", "billiards_ods.table_fee_transactions"),
        ("ODS_ASSISTANT_LEDGER", "billiards_ods.assistant_service_records"),
        ("ODS_ASSISTANT_ABOLISH", "billiards_ods.assistant_cancellation_records"),
        ("ODS_STORE_GOODS_SALES", "billiards_ods.store_goods_sales_records"),
        ("ODS_PAYMENT", "billiards_ods.payment_transactions"),
        ("ODS_REFUND", "billiards_ods.refund_transactions"),
        ("ODS_PLATFORM_COUPON", "billiards_ods.platform_coupon_redemption_records"),
        ("ODS_MEMBER", "billiards_ods.member_profiles"),
        ("ODS_MEMBER_CARD", "billiards_ods.member_stored_value_cards"),
        ("ODS_MEMBER_BALANCE", "billiards_ods.member_balance_changes"),
        ("ODS_RECHARGE_SETTLE", "billiards_ods.recharge_settlements"),
        ("ODS_GROUP_PACKAGE", "billiards_ods.group_buy_packages"),
        ("ODS_GROUP_BUY_REDEMPTION", "billiards_ods.group_buy_redemption_records"),
        ("ODS_INVENTORY_STOCK", "billiards_ods.goods_stock_summary"),
        ("ODS_INVENTORY_CHANGE", "billiards_ods.goods_stock_movements"),
        ("ODS_TABLES", "billiards_ods.site_tables_master"),
        ("ODS_GOODS_CATEGORY", "billiards_ods.stock_goods_category_tree"),
        ("ODS_STORE_GOODS", "billiards_ods.store_goods_master"),
        ("ODS_TABLE_FEE_DISCOUNT", "billiards_ods.table_fee_discount_records"),
        ("ODS_TENANT_GOODS", "billiards_ods.tenant_goods_master"),
    )
    return dict(task_table_pairs)


def check_zero_to_null_issues(db: "DatabaseConnection", table_name: str, limit: int = 100):
    """Find numeric ODS columns that are NULL although the payload holds 0.

    The most recent ``limit`` rows with a non-NULL ``payload`` are read from
    ``table_name``. For every numeric column, the payload key with the same
    name (compared case-insensitively, first match wins) is looked up; a row
    counts as affected when that payload value equals 0 while the column is
    NULL.

    Args:
        db: Object exposing ``query(sql, params)``. Rows are assumed to be
            dict-like (accessed with ``row.get`` / ``c["column_name"]``) --
            NOTE(review): confirm against DatabaseConnection.
        table_name: Table to inspect, optionally schema-qualified
            (``schema.table``); the schema defaults to ``public`` for the
            column lookup. It is interpolated into the SQL text unescaped,
            so it must come from a trusted source.
        limit: Maximum number of rows to inspect.

    Returns:
        ``{"issues": [...], "checked_rows": n}`` on success, where each issue
        is ``{"column", "count", "issue"}``; ``{"error": msg, "issues": []}``
        when a query fails or the table has no columns.
    """
    schema, name = table_name.split(".", 1) if "." in table_name else ("public", table_name)
    col_sql = """
        SELECT column_name, data_type, udt_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """

    try:
        cols = db.query(col_sql, (schema, name))
    except Exception as e:
        return {"error": str(e), "issues": []}

    if not cols:
        # No column metadata means the table is absent (or not visible);
        # report it explicitly instead of sending broken SQL to the server.
        return {"error": f"表 {table_name} 不存在或没有任何列", "issues": []}

    # Only numeric columns can suffer from the 0 -> NULL conversion.
    numeric_types = (
        "integer", "bigint", "smallint", "numeric",
        "double precision", "real", "decimal",
    )
    numeric_cols = [c["column_name"] for c in cols if c["data_type"] in numeric_types]
    if not numeric_cols:
        # An empty select list would render as "SELECT payload, FROM ...".
        return {"issues": [], "checked_rows": 0}

    # Column names are double-quoted; the query assumes the table has
    # `payload` and `fetched_at` columns.
    select_list = ", ".join(f'"{c}"' for c in numeric_cols)
    check_sql = f"""
        SELECT payload, {select_list}
        FROM {table_name}
        WHERE payload IS NOT NULL
        ORDER BY fetched_at DESC NULLS LAST
        LIMIT %s
    """

    try:
        rows = db.query(check_sql, (limit,))
    except Exception as e:
        return {"error": str(e), "issues": []}

    # Lower-case each column name once instead of once per row.
    lowered_cols = [(col, col.lower()) for col in numeric_cols]
    zero_to_null_count = {}

    for row in rows:
        payload = row.get("payload")
        if not payload:
            continue

        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except ValueError:
                # Not valid JSON (json.JSONDecodeError is a ValueError): skip.
                continue

        if not isinstance(payload, dict):
            continue

        # Case-insensitive index of the payload; setdefault keeps the first
        # key when several keys differ only by case.
        payload_by_lower_key = {}
        for key, value in payload.items():
            payload_by_lower_key.setdefault(key.lower(), value)

        for col, col_lower in lowered_cols:
            if row.get(col) is not None:
                continue
            # Payload has 0 but the database column is NULL.
            if payload_by_lower_key.get(col_lower) == 0:
                zero_to_null_count[col] = zero_to_null_count.get(col, 0) + 1

    issues = [
        {"column": col, "count": count, "issue": "API 中的 0 值在 ODS 中变成了 NULL"}
        for col, count in zero_to_null_count.items()
    ]
    return {"issues": issues, "checked_rows": len(rows)}


# Nested profile objects and site-level fields that are filtered out of the
# "missing in ODS" list (compared against the lower-cased field name).
_IGNORED_API_FIELDS = frozenset({
    "siteprofile", "settleprofile", "tableprofile", "address", "avatar",
    "business_tel", "customer_service_qrcode", "customer_service_wechat",
    "fixed_pay_qrcode", "full_address", "latitude", "longitude",
    "light_status", "light_token", "light_type", "org_id", "prod_env",
    "shop_name", "shop_status", "site_label", "site_type",
    "tenant_site_region_id", "wifi_name", "wifi_password",
    "attendance_distance", "attendance_enabled", "auto_light",
})


def _infer_column_type(field):
    """Guess a column type for a DDL suggestion from the field's name pattern."""
    if field.endswith("_id"):
        return "BIGINT"
    if field.endswith(("_money", "_amount", "_price")):
        return "NUMERIC(18,2)"
    if field.endswith("_time") or field.startswith(("create", "update")):
        return "TIMESTAMP"
    if field.startswith("is_") or field.endswith("_status"):
        return "INTEGER"
    return "TEXT"


def _collect_missing_fields(comparison):
    """Return one summary entry per task that still has missing fields after filtering."""
    summary = []
    for task_code, data in comparison.items():
        # `or []` also covers an explicit null in the comparison file.
        relevant = [
            f for f in (data.get("missing_in_ods") or [])
            if f.lower() not in _IGNORED_API_FIELDS
        ]
        if relevant:
            summary.append({
                "task_code": task_code,
                "table_name": data.get("table_name"),
                "endpoint": data.get("endpoint"),
                "missing_fields": relevant,
            })
    return summary


def generate_report():
    """Generate the full API -> ODS troubleshooting report.

    Prints three sections (missing ODS columns, 0 -> NULL checks, summary
    with suggested DDL) and writes ``api_ods_issue_report.json`` next to
    this script.

    Returns:
        The report dict (``generated_at``, ``missing_fields``,
        ``zero_to_null_issues``), or ``None`` when the comparison file is
        missing/empty or the ``PG_DSN`` environment variable is not set.
    """
    print("=" * 80)
    print("API -> ODS 字段排查报告")
    print("生成时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    print("=" * 80)

    # Load the comparison data produced by an earlier run.
    comparison = load_api_ods_comparison()
    if not comparison:
        print("\n[错误] 未找到 API-ODS 对比文件 (api_ods_comparison.json)")
        print("请先运行 compare_api_ods_fields.py 生成对比数据")
        return

    # Section 1: API fields missing from the ODS tables.
    print("\n" + "=" * 80)
    print("一、API 字段在 ODS 表中缺失的情况")
    print("=" * 80)

    missing_summary = _collect_missing_fields(comparison)
    if missing_summary:
        for item in missing_summary:
            print(f"\n【{item['task_code']}】")
            print(f" 表名: {item['table_name']}")
            print(f" 端点: {item['endpoint']}")
            print(f" 缺失字段 ({len(item['missing_fields'])} 个):")
            for field in item['missing_fields']:
                print(f" - {field}")
    else:
        print("\n没有发现明显缺失的业务字段。")

    # Section 2: 0 values that became NULL.
    print("\n" + "=" * 80)
    print("二、检查 API 中的 0 值在 ODS 中是否变成了 NULL")
    print("=" * 80)

    try:
        dsn = os.getenv("PG_DSN")
        if not dsn:
            # NOTE(review): this aborts the whole report (no summary, no JSON
            # file), unlike a connection failure below which continues.
            print("[错误] 未找到 PG_DSN 环境变量")
            return
        db = DatabaseConnection(dsn)
        try:
            zero_null_issues = []
            for task_code, table_name in get_ods_tables_mapping().items():
                print(f"\n检查 {task_code} ({table_name})...")
                result = check_zero_to_null_issues(db, table_name)

                if result.get("error"):
                    print(f" [错误] {result['error']}")
                    continue

                if result.get("issues"):
                    zero_null_issues.append({
                        "task_code": task_code,
                        "table_name": table_name,
                        "checked_rows": result["checked_rows"],
                        "issues": result["issues"],
                    })
                    for issue in result["issues"]:
                        print(f" [发现问题] 列 '{issue['column']}': {issue['count']} 条记录 - {issue['issue']}")
                else:
                    print(f" [正常] 检查了 {result['checked_rows']} 条记录,未发现 0 转 NULL 问题")
        finally:
            # Release the connection even when a check raises part-way.
            db.close()
    except Exception as e:
        print(f"\n[错误] 数据库连接失败: {e}")
        zero_null_issues = []

    # Section 3: summary.
    print("\n" + "=" * 80)
    print("三、问题汇总")
    print("=" * 80)

    print("\n1. 需要添加的 ODS 表列:")
    if missing_summary:
        all_ddl = [
            f"ALTER TABLE {item['table_name']} ADD COLUMN IF NOT EXISTS {field} {_infer_column_type(field)};"
            for item in missing_summary
            for field in item['missing_fields']
        ]
        print("\n生成的 DDL 语句:")
        for ddl in all_ddl:
            print(f" {ddl}")
    else:
        print(" 无")

    print("\n2. 需要修复的 0 转 NULL 问题:")
    if zero_null_issues:
        for item in zero_null_issues:
            print(f"\n 【{item['task_code']}】({item['table_name']})")
            for issue in item['issues']:
                print(f" - 列 '{issue['column']}': {issue['count']} 条记录受影响")
    else:
        print(" 未发现明显的 0 转 NULL 问题")

    # Persist the report next to this script.
    report = {
        "generated_at": datetime.now().isoformat(),
        "missing_fields": missing_summary,
        "zero_to_null_issues": zero_null_issues,
    }
    report_path = Path(__file__).parent / "api_ods_issue_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)

    print(f"\n报告已保存到: {report_path}")
    return report


# Script entry point: generate the report only when this file is run directly,
# not when it is imported.
if __name__ == "__main__":
    generate_report()