# -*- coding: utf-8 -*-
"""
Troubleshoot the API -> ODS stage:

1. Detect API fields that have no matching column in the ODS table.
2. Detect API values of 0 that ended up as NULL in the ODS table.
"""
import json
import os
import sys
from collections import Counter
from datetime import datetime
from decimal import Decimal
from pathlib import Path

# Make the project package importable before pulling in project modules.
project_root = Path(__file__).parent.parent / "etl_billiards"
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv

load_dotenv(project_root / ".env")

from database.connection import DatabaseConnection

# information_schema data_type values treated as numeric; only these columns
# are inspected for the 0 -> NULL problem.
_NUMERIC_DATA_TYPES = frozenset({
    "integer",
    "bigint",
    "smallint",
    "numeric",
    "double precision",
    "real",
    "decimal",
})

# Lower-cased API field names (nested profile objects and system fields)
# that are not reported as missing ODS columns.
_IGNORED_API_FIELDS = frozenset({
    "siteprofile",
    "settleprofile",
    "tableprofile",
    "address",
    "avatar",
    "business_tel",
    "customer_service_qrcode",
    "customer_service_wechat",
    "fixed_pay_qrcode",
    "full_address",
    "latitude",
    "longitude",
    "light_status",
    "light_token",
    "light_type",
    "org_id",
    "prod_env",
    "shop_name",
    "shop_status",
    "site_label",
    "site_type",
    "tenant_site_region_id",
    "wifi_name",
    "wifi_password",
    "attendance_distance",
    "attendance_enabled",
    "auto_light",
})


def load_api_ods_comparison():
    """Load the existing API-ODS comparison file.

    Returns:
        The parsed content of ``api_ods_comparison.json`` located next to
        this script, or an empty dict when the file does not exist.
    """
    comparison_file = Path(__file__).parent / "api_ods_comparison.json"
    if comparison_file.exists():
        with open(comparison_file, "r", encoding="utf-8") as f:
            return json.load(f)
    return {}


def get_ods_tables_mapping():
    """Return the mapping from ODS task code to schema-qualified table name."""
    return {
        "ODS_ASSISTANT_ACCOUNT": "billiards_ods.assistant_accounts_master",
        "ODS_SETTLEMENT_RECORDS": "billiards_ods.settlement_records",
        "ODS_TABLE_USE": "billiards_ods.table_fee_transactions",
        "ODS_ASSISTANT_LEDGER": "billiards_ods.assistant_service_records",
        "ODS_ASSISTANT_ABOLISH": "billiards_ods.assistant_cancellation_records",
        "ODS_STORE_GOODS_SALES": "billiards_ods.store_goods_sales_records",
        "ODS_PAYMENT": "billiards_ods.payment_transactions",
        "ODS_REFUND": "billiards_ods.refund_transactions",
        "ODS_PLATFORM_COUPON": "billiards_ods.platform_coupon_redemption_records",
        "ODS_MEMBER": "billiards_ods.member_profiles",
        "ODS_MEMBER_CARD": "billiards_ods.member_stored_value_cards",
        "ODS_MEMBER_BALANCE": "billiards_ods.member_balance_changes",
        "ODS_RECHARGE_SETTLE": "billiards_ods.recharge_settlements",
        "ODS_GROUP_PACKAGE": "billiards_ods.group_buy_packages",
        "ODS_GROUP_BUY_REDEMPTION": "billiards_ods.group_buy_redemption_records",
        "ODS_INVENTORY_STOCK": "billiards_ods.goods_stock_summary",
        "ODS_INVENTORY_CHANGE": "billiards_ods.goods_stock_movements",
        "ODS_TABLES": "billiards_ods.site_tables_master",
        "ODS_GOODS_CATEGORY": "billiards_ods.stock_goods_category_tree",
        "ODS_STORE_GOODS": "billiards_ods.store_goods_master",
        "ODS_TABLE_FEE_DISCOUNT": "billiards_ods.table_fee_discount_records",
        "ODS_TENANT_GOODS": "billiards_ods.tenant_goods_master",
    }


def check_zero_to_null_issues(db: DatabaseConnection, table_name: str, limit: int = 100) -> dict:
    """Check whether payload values of 0 are stored as NULL in an ODS table.

    The most recent ``limit`` rows with a non-NULL ``payload`` are read. For
    every numeric column, the payload key that equals the column name
    case-insensitively is compared with the column value.

    Args:
        db: Connection whose ``query(sql, params)`` returns rows that support
            ``.get(column_name)``.
        table_name: Table to inspect, optionally schema-qualified
            (``schema.table``); ``public`` is assumed when no schema is given.
        limit: Maximum number of rows to inspect.

    Returns:
        ``{"issues": [...], "checked_rows": n}`` on success, where each issue
        is ``{"column", "count", "issue"}``. When a query raises, the result is
        ``{"error": message, "issues": []}`` instead.
    """
    schema, name = table_name.split(".", 1) if "." in table_name else ("public", table_name)

    col_sql = (
        " SELECT column_name, data_type, udt_name"
        " FROM information_schema.columns"
        " WHERE table_schema = %s AND table_name = %s"
        " ORDER BY ordinal_position "
    )
    try:
        cols = db.query(col_sql, (schema, name))
    except Exception as e:
        return {"error": str(e), "issues": []}

    numeric_cols = [c["column_name"] for c in cols if c["data_type"] in _NUMERIC_DATA_TYPES]

    # Build the select list with join so that a table without numeric columns
    # yields "SELECT payload FROM ..." rather than the invalid
    # "SELECT payload, FROM ...".
    select_list = ", ".join(["payload", *(f'"{c}"' for c in numeric_cols)])
    check_sql = (
        f" SELECT {select_list}"
        f" FROM {table_name}"
        " WHERE payload IS NOT NULL"
        " ORDER BY fetched_at DESC NULLS LAST"
        " LIMIT %s "
    )
    try:
        rows = db.query(check_sql, (limit,))
    except Exception as e:
        return {"error": str(e), "issues": []}

    zero_to_null_count = Counter()
    for row in rows:
        payload = row.get("payload")
        if not payload:
            continue
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                # Unparseable payload text: skip this row.
                continue
        if not isinstance(payload, dict):
            continue

        # Index the payload once per row by lower-cased key. setdefault keeps
        # the first key when several differ only by case.
        payload_by_lower_key = {}
        for key, value in payload.items():
            payload_by_lower_key.setdefault(key.lower(), value)

        for col in numeric_cols:
            payload_value = payload_by_lower_key.get(col.lower())
            # NOTE: False == 0 in Python, so a JSON false also matches here.
            if payload_value == 0 and row.get(col) is None:
                zero_to_null_count[col] += 1

    issues = [
        {"column": col, "count": count, "issue": "API 中的 0 值在 ODS 中变成了 NULL"}
        for col, count in zero_to_null_count.items()
    ]
    return {"issues": issues, "checked_rows": len(rows)}


def _collect_missing_fields(comparison):
    """Return one summary entry per task that has non-ignored missing fields."""
    missing_summary = []
    for task_code, data in comparison.items():
        filtered_missing = [
            f for f in data.get("missing_in_ods", [])
            if f.lower() not in _IGNORED_API_FIELDS
        ]
        if filtered_missing:
            missing_summary.append({
                "task_code": task_code,
                "table_name": data.get("table_name"),
                "endpoint": data.get("endpoint"),
                "missing_fields": filtered_missing,
            })
    return missing_summary


def _infer_column_type(field):
    """Guess a column type from the field name; checks run in priority order."""
    if field.endswith("_id"):
        return "BIGINT"
    if field.endswith(("_money", "_amount", "_price")):
        return "NUMERIC(18,2)"
    if field.endswith("_time") or field.startswith(("create", "update")):
        return "TIMESTAMP"
    if field.startswith("is_") or field.endswith("_status"):
        return "INTEGER"
    return "TEXT"


def _build_ddl_statements(missing_summary):
    """Return one ALTER TABLE ... ADD COLUMN statement per missing field."""
    return [
        f"ALTER TABLE {item['table_name']} ADD COLUMN IF NOT EXISTS {field} {_infer_column_type(field)};"
        for item in missing_summary
        for field in item["missing_fields"]
    ]


def _run_zero_to_null_checks(dsn):
    """Run the 0 -> NULL check on every mapped ODS table, printing progress.

    Returns the per-table findings, or an empty list when anything raises.
    """
    zero_null_issues = []
    try:
        db = DatabaseConnection(dsn)
        try:
            for task_code, table_name in get_ods_tables_mapping().items():
                print(f"\n检查 {task_code} ({table_name})...")
                result = check_zero_to_null_issues(db, table_name)
                if result.get("error"):
                    print(f" [错误] {result['error']}")
                    continue
                if result.get("issues"):
                    zero_null_issues.append({
                        "task_code": task_code,
                        "table_name": table_name,
                        "checked_rows": result["checked_rows"],
                        "issues": result["issues"],
                    })
                    for issue in result["issues"]:
                        print(f" [发现问题] 列 '{issue['column']}': {issue['count']} 条记录 - {issue['issue']}")
                else:
                    print(f" [正常] 检查了 {result['checked_rows']} 条记录,未发现 0 转 NULL 问题")
        finally:
            # Close the connection even when a check raises part-way through.
            db.close()
    except Exception as e:
        print(f"\n[错误] 数据库连接失败: {e}")
        return []
    return zero_null_issues


def generate_report():
    """Generate the full troubleshooting report.

    Prints the missing-field section, the 0 -> NULL section and a summary
    with suggested DDL, then writes ``api_ods_issue_report.json`` next to
    this script.

    Returns:
        The report dict that was written, or ``None`` when the comparison
        file is missing/empty or the ``PG_DSN`` environment variable is not
        set (no report file is written in those cases).
    """
    print("=" * 80)
    print("API -> ODS 字段排查报告")
    print("生成时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    print("=" * 80)

    comparison = load_api_ods_comparison()
    if not comparison:
        print("\n[错误] 未找到 API-ODS 对比文件 (api_ods_comparison.json)")
        print("请先运行 compare_api_ods_fields.py 生成对比数据")
        return

    # Section 1: API fields missing from the ODS tables.
    print("\n" + "=" * 80)
    print("一、API 字段在 ODS 表中缺失的情况")
    print("=" * 80)

    missing_summary = _collect_missing_fields(comparison)
    if missing_summary:
        for item in missing_summary:
            print(f"\n【{item['task_code']}】")
            print(f" 表名: {item['table_name']}")
            print(f" 端点: {item['endpoint']}")
            print(f" 缺失字段 ({len(item['missing_fields'])} 个):")
            for field in item["missing_fields"]:
                print(f" - {field}")
    else:
        print("\n没有发现明显缺失的业务字段。")

    # Section 2: 0 values that became NULL.
    print("\n" + "=" * 80)
    print("二、检查 API 中的 0 值在 ODS 中是否变成了 NULL")
    print("=" * 80)

    dsn = os.getenv("PG_DSN")
    if not dsn:
        print("[错误] 未找到 PG_DSN 环境变量")
        return
    zero_null_issues = _run_zero_to_null_checks(dsn)

    # Section 3: summary.
    print("\n" + "=" * 80)
    print("三、问题汇总")
    print("=" * 80)

    print("\n1. 需要添加的 ODS 表列:")
    if missing_summary:
        print("\n生成的 DDL 语句:")
        for ddl in _build_ddl_statements(missing_summary):
            print(f" {ddl}")
    else:
        print(" 无")

    print("\n2. 需要修复的 0 转 NULL 问题:")
    if zero_null_issues:
        for item in zero_null_issues:
            print(f"\n 【{item['task_code']}】({item['table_name']})")
            for issue in item["issues"]:
                print(f" - 列 '{issue['column']}': {issue['count']} 条记录受影响")
    else:
        print(" 未发现明显的 0 转 NULL 问题")

    # Persist the report next to this script.
    report = {
        "generated_at": datetime.now().isoformat(),
        "missing_fields": missing_summary,
        "zero_to_null_issues": zero_null_issues,
    }
    report_path = Path(__file__).parent / "api_ods_issue_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"\n报告已保存到: {report_path}")

    return report


if __name__ == "__main__":
    generate_report()