# -*- coding: utf-8 -*-
"""
Sync newly added ODS fields into the DWD database tables.

1. Detect DWD tables that are missing the columns corresponding to new ODS fields.
2. Generate (and execute) DDL for them. The ODS -> DWD mappings below were
   extracted from FACT_MAPPINGS in dwd_load_task.py and are hard-coded here;
   that file is not read at runtime.
"""
import json
import os
import sys
from datetime import datetime
from pathlib import Path

# Make the project package importable when this script is run directly.
project_root = Path(__file__).parent.parent / "etl_billiards"
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv  # noqa: E402  (needs sys.path set up first)

load_dotenv(project_root / ".env")

from database.connection import DatabaseConnection  # noqa: E402

# ODS -> DWD table mapping (extracted from dwd_load_task.py).
# Kept for reference; main() drives everything from NEW_FIELDS_TO_DWD.
ODS_TO_DWD_MAP = {
    "billiards_ods.table_fee_transactions": [
        "billiards_dwd.dim_site",
        "billiards_dwd.dim_site_ex",
        "billiards_dwd.dwd_table_fee_log",
        "billiards_dwd.dwd_table_fee_log_ex",
    ],
    "billiards_ods.site_tables_master": [
        "billiards_dwd.dim_table",
        "billiards_dwd.dim_table_ex",
    ],
    "billiards_ods.assistant_accounts_master": [
        "billiards_dwd.dim_assistant",
        "billiards_dwd.dim_assistant_ex",
    ],
    "billiards_ods.assistant_service_records": [
        "billiards_dwd.dwd_assistant_service_log",
        "billiards_dwd.dwd_assistant_service_log_ex",
    ],
    "billiards_ods.assistant_cancellation_records": [
        "billiards_dwd.dwd_assistant_trash_event",
        "billiards_dwd.dwd_assistant_trash_event_ex",
    ],
    "billiards_ods.store_goods_sales_records": [
        "billiards_dwd.dwd_store_goods_sale",
        "billiards_dwd.dwd_store_goods_sale_ex",
    ],
    "billiards_ods.payment_transactions": [
        "billiards_dwd.dwd_payment",
    ],
    "billiards_ods.member_profiles": [
        "billiards_dwd.dim_member",
        "billiards_dwd.dim_member_ex",
    ],
    "billiards_ods.member_stored_value_cards": [
        "billiards_dwd.dim_member_card_account",
        "billiards_dwd.dim_member_card_account_ex",
    ],
    "billiards_ods.member_balance_changes": [
        "billiards_dwd.dwd_member_balance_change",
        "billiards_dwd.dwd_member_balance_change_ex",
    ],
    "billiards_ods.settlement_records": [
        "billiards_dwd.dwd_settlement_head",
        "billiards_dwd.dwd_settlement_head_ex",
    ],
    "billiards_ods.recharge_settlements": [
        "billiards_dwd.dwd_recharge_order",
        "billiards_dwd.dwd_recharge_order_ex",
    ],
    "billiards_ods.group_buy_packages": [
        "billiards_dwd.dim_groupbuy_package",
        "billiards_dwd.dim_groupbuy_package_ex",
    ],
    "billiards_ods.group_buy_redemption_records": [
        "billiards_dwd.dwd_groupbuy_redemption",
        "billiards_dwd.dwd_groupbuy_redemption_ex",
    ],
    "billiards_ods.table_fee_discount_records": [
        "billiards_dwd.dwd_table_fee_adjust",
        "billiards_dwd.dwd_table_fee_adjust_ex",
    ],
    "billiards_ods.tenant_goods_master": [
        "billiards_dwd.dim_tenant_goods",
        "billiards_dwd.dim_tenant_goods_ex",
    ],
    "billiards_ods.store_goods_master": [
        "billiards_dwd.dim_store_goods",
        "billiards_dwd.dim_store_goods_ex",
    ],
}

# New ODS fields that must be synced to DWD (taken from the investigation report).
# Format: {ods_table: [(ods_col, dwd_col, dwd_table, col_type), ...]}
NEW_FIELDS_TO_DWD = {
    "billiards_ods.table_fee_transactions": [
        ("activity_discount_amount", "activity_discount_amount", "billiards_dwd.dwd_table_fee_log", "NUMERIC(18,2)"),
        ("real_service_money", "real_service_money", "billiards_dwd.dwd_table_fee_log", "NUMERIC(18,2)"),
        ("order_consumption_type", "order_consumption_type", "billiards_dwd.dwd_table_fee_log_ex", "INTEGER"),
    ],
    "billiards_ods.assistant_service_records": [
        ("real_service_money", "real_service_money", "billiards_dwd.dwd_assistant_service_log", "NUMERIC(18,2)"),
        ("assistantteamname", "assistant_team_name", "billiards_dwd.dwd_assistant_service_log_ex", "TEXT"),
    ],
    "billiards_ods.assistant_cancellation_records": [
        ("tenant_id", "tenant_id", "billiards_dwd.dwd_assistant_trash_event", "BIGINT"),
    ],
    "billiards_ods.store_goods_sales_records": [
        ("coupon_share_money", "coupon_share_money", "billiards_dwd.dwd_store_goods_sale", "NUMERIC(18,2)"),
    ],
    "billiards_ods.payment_transactions": [
        ("tenant_id", "tenant_id", "billiards_dwd.dwd_payment", "BIGINT"),
    ],
    "billiards_ods.member_profiles": [
        ("pay_money_sum", "pay_money_sum", "billiards_dwd.dim_member", "NUMERIC(18,2)"),
        ("recharge_money_sum", "recharge_money_sum", "billiards_dwd.dim_member", "NUMERIC(18,2)"),
        ("person_tenant_org_id", "person_tenant_org_id", "billiards_dwd.dim_member_ex", "BIGINT"),
        ("person_tenant_org_name", "person_tenant_org_name", "billiards_dwd.dim_member_ex", "TEXT"),
        ("register_source", "register_source", "billiards_dwd.dim_member_ex", "TEXT"),
    ],
    "billiards_ods.member_stored_value_cards": [
        ("principal_balance", "principal_balance", "billiards_dwd.dim_member_card_account", "NUMERIC(18,2)"),
        ("member_grade", "member_grade", "billiards_dwd.dim_member_card_account", "INTEGER"),
        ("able_share_member_discount", "able_share_member_discount", "billiards_dwd.dim_member_card_account_ex", "BOOLEAN"),
        ("electricity_deduct_radio", "electricity_deduct_radio", "billiards_dwd.dim_member_card_account_ex", "NUMERIC(10,4)"),
        ("electricity_discount", "electricity_discount", "billiards_dwd.dim_member_card_account_ex", "NUMERIC(10,4)"),
        ("electricitycarddeduct", "electricity_card_deduct", "billiards_dwd.dim_member_card_account_ex", "BOOLEAN"),
        ("rechargefreezebalance", "recharge_freeze_balance", "billiards_dwd.dim_member_card_account_ex", "NUMERIC(18,2)"),
    ],
    "billiards_ods.member_balance_changes": [
        ("principal_after", "principal_after", "billiards_dwd.dwd_member_balance_change", "NUMERIC(18,2)"),
        ("principal_before", "principal_before", "billiards_dwd.dwd_member_balance_change", "NUMERIC(18,2)"),
        ("principal_data", "principal_change_amount", "billiards_dwd.dwd_member_balance_change", "NUMERIC(18,2)"),
    ],
    "billiards_ods.settlement_records": [
        ("tenant_id", "tenant_id", "billiards_dwd.dwd_settlement_head", "BIGINT"),
    ],
    "billiards_ods.recharge_settlements": [
        ("tenant_id", "tenant_id", "billiards_dwd.dwd_recharge_order", "BIGINT"),
    ],
    "billiards_ods.group_buy_packages": [
        ("sort", "sort", "billiards_dwd.dim_groupbuy_package", "INTEGER"),
        ("is_first_limit", "is_first_limit", "billiards_dwd.dim_groupbuy_package", "BOOLEAN"),
        ("tenantcouponsaleorderitemid", "tenant_coupon_sale_order_item_id", "billiards_dwd.dim_groupbuy_package_ex", "BIGINT"),
    ],
    "billiards_ods.group_buy_redemption_records": [
        ("coupon_sale_id", "coupon_sale_id", "billiards_dwd.dwd_groupbuy_redemption", "BIGINT"),
        ("member_discount_money", "member_discount_money", "billiards_dwd.dwd_groupbuy_redemption", "NUMERIC(18,2)"),
        ("assistant_share_money", "assistant_share_money", "billiards_dwd.dwd_groupbuy_redemption_ex", "NUMERIC(18,2)"),
        ("table_share_money", "table_share_money", "billiards_dwd.dwd_groupbuy_redemption_ex", "NUMERIC(18,2)"),
        ("goods_share_money", "goods_share_money", "billiards_dwd.dwd_groupbuy_redemption_ex", "NUMERIC(18,2)"),
        ("recharge_share_money", "recharge_share_money", "billiards_dwd.dwd_groupbuy_redemption_ex", "NUMERIC(18,2)"),
    ],
    "billiards_ods.site_tables_master": [
        ("order_id", "order_id", "billiards_dwd.dim_table", "BIGINT"),
    ],
    "billiards_ods.store_goods_master": [
        ("commodity_code", "commodity_code", "billiards_dwd.dim_store_goods", "TEXT"),
        ("not_sale", "not_sale", "billiards_dwd.dim_store_goods", "INTEGER"),
    ],
    "billiards_ods.table_fee_discount_records": [
        ("table_name", "table_name", "billiards_dwd.dwd_table_fee_adjust", "TEXT"),
        ("table_price", "table_price", "billiards_dwd.dwd_table_fee_adjust", "NUMERIC(18,2)"),
        ("charge_free", "charge_free", "billiards_dwd.dwd_table_fee_adjust", "BOOLEAN"),
        ("area_type_id", "area_type_id", "billiards_dwd.dwd_table_fee_adjust_ex", "BIGINT"),
        ("site_table_area_id", "site_table_area_id", "billiards_dwd.dwd_table_fee_adjust_ex", "BIGINT"),
        ("site_table_area_name", "site_table_area_name", "billiards_dwd.dwd_table_fee_adjust_ex", "TEXT"),
        ("sitename", "site_name", "billiards_dwd.dwd_table_fee_adjust_ex", "TEXT"),
        ("tenant_name", "tenant_name", "billiards_dwd.dwd_table_fee_adjust_ex", "TEXT"),
    ],
    "billiards_ods.tenant_goods_master": [
        ("not_sale", "not_sale", "billiards_dwd.dim_tenant_goods", "INTEGER"),
    ],
}


def get_db_table_columns(db: DatabaseConnection, table_name: str) -> set:
    """Return the lower-cased column names of ``table_name``.

    ``table_name`` is ``"schema.table"``; without a dot the schema defaults to
    ``"public"``. If information_schema has no matching rows (e.g. the table
    does not exist) an empty set is returned rather than an exception.
    """
    schema, name = table_name.split(".", 1) if "." in table_name else ("public", table_name)
    sql = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
    """
    rows = db.query(sql, (schema, name))
    return {r["column_name"].lower() for r in rows}


def _probe_table_columns(db: DatabaseConnection, table_name: str) -> tuple:
    """Return ``(columns, reason)``; ``columns`` is None if the table is unusable.

    ``reason`` is an empty string on success, otherwise a human-readable
    explanation of why the table was rejected.
    """
    try:
        cols = get_db_table_columns(db, table_name)
    except Exception as e:
        # If the connection is not in autocommit mode, a failed query leaves
        # PostgreSQL's transaction aborted and every later statement would
        # fail too; roll back so the remaining tables can still be processed.
        db.rollback()
        return None, str(e)
    if not cols:
        # A missing table (or one whose columns this user cannot see) yields
        # zero rows instead of an error, so detect it explicitly here.
        return None, "information_schema 中未找到该表的列"
    return cols, ""


def _sync_columns(db: DatabaseConnection) -> tuple:
    """Add every column from NEW_FIELDS_TO_DWD that the DWD table lacks.

    Returns ``(all_ddl, executed_ddl, failed_ddl, skipped)`` where
    ``failed_ddl`` holds ``(ddl, error)`` pairs and ``skipped`` holds
    ``(dwd_table, dwd_col, reason)`` for columns whose table was unusable.
    """
    all_ddl = []
    executed_ddl = []
    failed_ddl = []
    skipped = []
    # dwd_table -> (columns or None, reason); query each table's catalog once
    # instead of once per field.
    table_cache = {}

    for ods_table, fields in NEW_FIELDS_TO_DWD.items():
        print(f"\n处理 ODS 表: {ods_table}")
        for _ods_col, dwd_col, dwd_table, col_type in fields:
            if dwd_table not in table_cache:
                table_cache[dwd_table] = _probe_table_columns(db, dwd_table)
            dwd_cols, reason = table_cache[dwd_table]

            if dwd_cols is None:
                print(f" [跳过] DWD 表 {dwd_table} 不存在或无法访问: {reason}")
                skipped.append((dwd_table, dwd_col, reason))
                continue

            if dwd_col.lower() in dwd_cols:
                print(f" [存在] {dwd_table}.{dwd_col}")
                continue

            # Identifiers come from the hard-coded mapping above, not user input.
            ddl = f'ALTER TABLE {dwd_table} ADD COLUMN IF NOT EXISTS "{dwd_col}" {col_type};'
            all_ddl.append(ddl)

            try:
                db.execute(ddl)
                db.commit()
            except Exception as e:
                db.rollback()
                failed_ddl.append((ddl, str(e)))
                print(f" [失败] {dwd_table}.{dwd_col} - {e}")
            else:
                executed_ddl.append(ddl)
                # Keep the cached column set current for later fields of this table.
                dwd_cols.add(dwd_col.lower())
                print(f" [新增] {dwd_table}.{dwd_col} ({col_type})")

    return all_ddl, executed_ddl, failed_ddl, skipped


def _print_summary(all_ddl: list, executed_ddl: list, failed_ddl: list, skipped: list) -> None:
    """Print the run totals and the details of every failed DDL."""
    print("\n" + "=" * 80)
    print("执行汇总")
    print("=" * 80)
    print(f"总计生成 DDL: {len(all_ddl)} 条")
    print(f"执行成功: {len(executed_ddl)} 条")
    print(f"执行失败: {len(failed_ddl)} 条")
    print(f"跳过: {len(skipped)} 条")

    if failed_ddl:
        print("\n失败的 DDL:")
        for ddl, err in failed_ddl:
            print(f" - {ddl}")
            print(f" 错误: {err}")


def _write_log(all_ddl: list, executed_ddl: list, failed_ddl: list, skipped: list) -> Path:
    """Write the run log as JSON next to this script and return its path."""
    log_file = Path(__file__).parent / "sync_dwd_columns_log.json"
    log = {
        "executed_at": datetime.now().isoformat(),
        "total_ddl": len(all_ddl),
        "success_count": len(executed_ddl),
        "failed_count": len(failed_ddl),
        "executed_ddl": executed_ddl,
        "failed_ddl": [{"ddl": d, "error": e} for d, e in failed_ddl],
        "skipped_count": len(skipped),
        "skipped": [{"table": t, "column": c, "reason": r} for t, c, r in skipped],
    }
    with open(log_file, "w", encoding="utf-8") as f:
        json.dump(log, f, ensure_ascii=False, indent=2)
    return log_file


def main():
    """Sync missing DWD columns and write a JSON log.

    Returns True when no DDL failed, False otherwise (including when the
    ``PG_DSN`` environment variable is missing).
    """
    print("=" * 80)
    print("ODS → DWD 字段同步脚本")
    print("时间:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    print("=" * 80)

    # Connect to the database.
    dsn = os.getenv("PG_DSN")
    if not dsn:
        print("[错误] 未找到 PG_DSN 环境变量")
        return False

    db = DatabaseConnection(dsn)
    try:
        all_ddl, executed_ddl, failed_ddl, skipped = _sync_columns(db)
    finally:
        # Always release the connection, even if an unexpected error escapes.
        db.close()

    _print_summary(all_ddl, executed_ddl, failed_ddl, skipped)
    log_file = _write_log(all_ddl, executed_ddl, failed_ddl, skipped)
    print(f"\n执行日志已保存到: {log_file}")

    return len(failed_ddl) == 0


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)