Files
feiqiu-ETL/tmp/check_api_ods_issues.py
2026-02-04 21:39:01 +08:00

296 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
排查 API -> ODS 环节的问题:
1. 检测 API 字段在 ODS 表中缺失的列
2. 检测 API 中的 0 值在 ODS 中是否变成了 NULL
"""
import json
import os
import sys
from datetime import datetime
from decimal import Decimal
from pathlib import Path
# 添加项目路径
project_root = Path(__file__).parent.parent / "etl_billiards"
sys.path.insert(0, str(project_root))
from dotenv import load_dotenv
load_dotenv(project_root / ".env")
from database.connection import DatabaseConnection
def load_api_ods_comparison(path=None):
    """Load the API-vs-ODS comparison JSON produced by compare_api_ods_fields.py.

    Args:
        path: Optional location of the comparison file (str or Path). When
            omitted, ``api_ods_comparison.json`` next to this script is used.

    Returns:
        The parsed JSON content, or ``{}`` when the file does not exist.

    Raises:
        json.JSONDecodeError: if the file exists but is not valid JSON.
    """
    if path is None:
        comparison_file = Path(__file__).parent / "api_ods_comparison.json"
    else:
        comparison_file = Path(path)
    if not comparison_file.exists():
        return {}
    with comparison_file.open("r", encoding="utf-8") as f:
        return json.load(f)
def get_ods_tables_mapping():
    """Return a fresh ordered dict: ODS task code -> fully-qualified ODS table name.

    Iteration order is the order in which the report inspects the tables.
    """
    # Task codes are valid identifiers, so keyword arguments keep the mapping compact.
    mapping = dict(
        ODS_ASSISTANT_ACCOUNT="billiards_ods.assistant_accounts_master",
        ODS_SETTLEMENT_RECORDS="billiards_ods.settlement_records",
        ODS_TABLE_USE="billiards_ods.table_fee_transactions",
        ODS_ASSISTANT_LEDGER="billiards_ods.assistant_service_records",
        ODS_ASSISTANT_ABOLISH="billiards_ods.assistant_cancellation_records",
        ODS_STORE_GOODS_SALES="billiards_ods.store_goods_sales_records",
        ODS_PAYMENT="billiards_ods.payment_transactions",
        ODS_REFUND="billiards_ods.refund_transactions",
        ODS_PLATFORM_COUPON="billiards_ods.platform_coupon_redemption_records",
        ODS_MEMBER="billiards_ods.member_profiles",
        ODS_MEMBER_CARD="billiards_ods.member_stored_value_cards",
        ODS_MEMBER_BALANCE="billiards_ods.member_balance_changes",
        ODS_RECHARGE_SETTLE="billiards_ods.recharge_settlements",
        ODS_GROUP_PACKAGE="billiards_ods.group_buy_packages",
        ODS_GROUP_BUY_REDEMPTION="billiards_ods.group_buy_redemption_records",
        ODS_INVENTORY_STOCK="billiards_ods.goods_stock_summary",
        ODS_INVENTORY_CHANGE="billiards_ods.goods_stock_movements",
        ODS_TABLES="billiards_ods.site_tables_master",
        ODS_GOODS_CATEGORY="billiards_ods.stock_goods_category_tree",
        ODS_STORE_GOODS="billiards_ods.store_goods_master",
        ODS_TABLE_FEE_DISCOUNT="billiards_ods.table_fee_discount_records",
        ODS_TENANT_GOODS="billiards_ods.tenant_goods_master",
    )
    return mapping
def check_zero_to_null_issues(db: "DatabaseConnection", table_name: str, limit: int = 100):
    """Find numeric ODS columns that are NULL while the raw payload holds 0.

    Reads the column list of ``table_name`` from ``information_schema``, then
    fetches up to ``limit`` of the most recent rows (ordered by ``fetched_at``)
    that have a payload. For every numeric column it looks up the payload key
    with the same name (case-insensitive; the first matching key wins) and
    counts rows where the payload value equals 0 but the column is NULL.

    Args:
        db: Connection exposing ``query(sql, params)`` that returns a list of
            dict-like rows.
        table_name: ``schema.table``; a bare name defaults to schema ``public``.
            It is interpolated into SQL, so it must come from trusted config.
        limit: Maximum number of recent rows to inspect.

    Returns:
        ``{"issues": [...], "checked_rows": n}`` on success, where each issue is
        ``{"column", "count", "issue"}``; ``{"error": msg, "issues": []}`` when a
        query fails or the table has no visible columns.
    """
    schema, name = table_name.split(".", 1) if "." in table_name else ("public", table_name)
    col_sql = """
    SELECT column_name, data_type, udt_name
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position
    """
    try:
        cols = db.query(col_sql, (schema, name))
    except Exception as e:
        # NOTE(review): with psycopg2-style connections a failed statement may
        # leave the transaction aborted; confirm DatabaseConnection rolls back.
        return {"error": str(e), "issues": []}
    if not cols:
        return {"error": f"未找到表 {table_name} 的列信息", "issues": []}

    # Only numeric columns can suffer from the 0 -> NULL conversion problem.
    numeric_cols = [
        c["column_name"] for c in cols
        if c["data_type"] in ("integer", "bigint", "smallint", "numeric", "double precision", "real", "decimal")
    ]
    if not numeric_cols:
        # Nothing to compare; also avoids generating "SELECT payload, FROM ...".
        return {"issues": [], "checked_rows": 0}

    # Quote identifiers, doubling embedded double quotes per SQL rules.
    quoted_cols = ", ".join('"' + c.replace('"', '""') + '"' for c in numeric_cols)
    check_sql = f"""
    SELECT payload, {quoted_cols}
    FROM {table_name}
    WHERE payload IS NOT NULL
    ORDER BY fetched_at DESC NULLS LAST
    LIMIT %s
    """
    try:
        rows = db.query(check_sql, (limit,))
    except Exception as e:
        return {"error": str(e), "issues": []}

    zero_to_null_count = {}
    for row in rows:
        payload = row.get("payload")
        if not payload:
            continue
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                continue
        if not isinstance(payload, dict):
            continue
        # Case-insensitive index built once per row; setdefault keeps the first
        # key in iteration order, matching a first-match linear scan.
        payload_by_lower = {}
        for key, value in payload.items():
            payload_by_lower.setdefault(key.lower(), value)
        for col in numeric_cols:
            # A missing key yields None, which never equals 0.
            if payload_by_lower.get(col.lower()) == 0 and row.get(col) is None:
                zero_to_null_count[col] = zero_to_null_count.get(col, 0) + 1

    issues = [
        {"column": col, "count": count, "issue": "API 中的 0 值在 ODS 中变成了 NULL"}
        for col, count in zero_to_null_count.items()
    ]
    return {"issues": issues, "checked_rows": len(rows)}
# Fields listed as "missing in ODS" that are nested objects (e.g. siteprofile)
# or system fields rather than business columns; excluded from the report.
# Matched against the lowercased field name.
_IGNORED_MISSING_FIELDS = frozenset((
    "siteprofile", "settleprofile", "tableprofile", "address", "avatar",
    "business_tel", "customer_service_qrcode", "customer_service_wechat",
    "fixed_pay_qrcode", "full_address", "latitude", "longitude",
    "light_status", "light_token", "light_type", "org_id", "prod_env",
    "shop_name", "shop_status", "site_label", "site_type",
    "tenant_site_region_id", "wifi_name", "wifi_password",
    "attendance_distance", "attendance_enabled", "auto_light",
))


def _collect_missing_fields(comparison):
    """Return, per task, the API business fields absent from the ODS table."""
    missing_summary = []
    for task_code, data in comparison.items():
        # "or []" also tolerates an explicit null in the comparison JSON.
        missing = data.get("missing_in_ods") or []
        filtered_missing = [f for f in missing if f.lower() not in _IGNORED_MISSING_FIELDS]
        if filtered_missing:
            missing_summary.append({
                "task_code": task_code,
                "table_name": data.get("table_name"),
                "endpoint": data.get("endpoint"),
                "missing_fields": filtered_missing,
            })
    return missing_summary


def _print_missing_fields(missing_summary):
    """Print section 1: missing business fields grouped by task."""
    if not missing_summary:
        print("\n没有发现明显缺失的业务字段。")
        return
    for item in missing_summary:
        print(f"\n{item['task_code']}")
        print(f" 表名: {item['table_name']}")
        print(f" 端点: {item['endpoint']}")
        print(f" 缺失字段 ({len(item['missing_fields'])} 个):")
        for field in item['missing_fields']:
            print(f" - {field}")


def _check_all_zero_to_null():
    """Run the 0 -> NULL check on every ODS table; return the tables with issues.

    Returns an empty list when PG_DSN is unset or the connection fails. Results
    gathered before an unexpected mid-loop error are kept, and the connection
    is always closed.
    """
    dsn = os.getenv("PG_DSN")
    if not dsn:
        print("[错误] 未找到 PG_DSN 环境变量")
        return []
    try:
        db = DatabaseConnection(dsn)
    except Exception as e:
        print(f"\n[错误] 数据库连接失败: {e}")
        return []

    zero_null_issues = []
    try:
        for task_code, table_name in get_ods_tables_mapping().items():
            print(f"\n检查 {task_code} ({table_name})...")
            result = check_zero_to_null_issues(db, table_name)
            if result.get("error"):
                print(f" [错误] {result['error']}")
                continue
            if result.get("issues"):
                zero_null_issues.append({
                    "task_code": task_code,
                    "table_name": table_name,
                    "checked_rows": result["checked_rows"],
                    "issues": result["issues"],
                })
                for issue in result["issues"]:
                    print(f" [发现问题] 列 '{issue['column']}': {issue['count']} 条记录 - {issue['issue']}")
            else:
                print(f" [正常] 检查了 {result['checked_rows']} 条记录,未发现 0 转 NULL 问题")
    except Exception as e:
        # Best-effort report: keep what was collected so far.
        print(f"\n[错误] 数据库检查失败: {e}")
    finally:
        db.close()
    return zero_null_issues


def _infer_column_type(field):
    """Guess a PostgreSQL column type from an API field name (naming heuristic only)."""
    if field.endswith("_id"):
        return "BIGINT"
    if field.endswith(("_money", "_amount", "_price")):
        return "NUMERIC(18,2)"
    if field.endswith("_time") or field.startswith(("create", "update")):
        return "TIMESTAMP"
    if field.startswith("is_") or field.endswith("_status"):
        return "INTEGER"
    return "TEXT"


def _build_ddl(missing_summary):
    """Return one ALTER TABLE ... ADD COLUMN IF NOT EXISTS statement per missing field."""
    # Field names are left unquoted, so PostgreSQL folds them to lowercase.
    return [
        f"ALTER TABLE {item['table_name']} ADD COLUMN IF NOT EXISTS {field} {_infer_column_type(field)};"
        for item in missing_summary
        for field in item['missing_fields']
    ]


def _print_summary(missing_summary, zero_null_issues):
    """Print section 3: suggested DDL and the list of 0 -> NULL problems."""
    print("\n" + "=" * 80)
    print("三、问题汇总")
    print("=" * 80)
    print("\n1. 需要添加的 ODS 表列:")
    if missing_summary:
        print("\n生成的 DDL 语句:")
        for ddl in _build_ddl(missing_summary):
            print(f" {ddl}")
    else:
        print(" 未发现需要添加的列")
    print("\n2. 需要修复的 0 转 NULL 问题:")
    if zero_null_issues:
        for item in zero_null_issues:
            print(f"\n【{item['task_code']}】({item['table_name']})")
            for issue in item['issues']:
                print(f" - 列 '{issue['column']}': {issue['count']} 条记录受影响")
    else:
        print(" 未发现明显的 0 转 NULL 问题")


def _save_report(missing_summary, zero_null_issues):
    """Write the report JSON next to this script and return the report dict."""
    report = {
        "generated_at": datetime.now().isoformat(),
        "missing_fields": missing_summary,
        "zero_to_null_issues": zero_null_issues,
    }
    report_path = Path(__file__).parent / "api_ods_issue_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"\n报告已保存到: {report_path}")
    return report


def generate_report():
    """Generate the full API -> ODS troubleshooting report.

    Sections:
        1. API fields missing from ODS tables (from api_ods_comparison.json).
        2. Numeric columns whose API value 0 became NULL in ODS (needs PG_DSN).
        3. Summary with suggested DDL.

    The report is also written to ``api_ods_issue_report.json``.

    Returns:
        The report dict, or None when the comparison file is missing or empty.
    """
    print("=" * 80)
    print("API -> ODS 字段排查报告")
    print("生成时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    print("=" * 80)

    comparison = load_api_ods_comparison()
    if not comparison:
        print("\n[错误] 未找到 API-ODS 对比文件 (api_ods_comparison.json)")
        print("请先运行 compare_api_ods_fields.py 生成对比数据")
        return None

    print("\n" + "=" * 80)
    print("一、API 字段在 ODS 表中缺失的情况")
    print("=" * 80)
    missing_summary = _collect_missing_fields(comparison)
    _print_missing_fields(missing_summary)

    print("\n" + "=" * 80)
    print("二、检查 API 中的 0 值在 ODS 中是否变成了 NULL")
    print("=" * 80)
    # A missing DSN or DB failure no longer aborts the report: the section-1
    # results are still summarized and saved.
    zero_null_issues = _check_all_zero_to_null()

    _print_summary(missing_summary, zero_null_issues)
    return _save_report(missing_summary, zero_null_issues)
# Script entry point: build the report only when run directly, not on import.
if __name__ == "__main__":
    generate_report()