# -*- coding: utf-8 -*-
"""
Backfill missing column values from the ODS payload.

For every (table, column) pair listed in ``BACKFILL_CONFIGS``, rows whose
column is NULL are updated with the value extracted from the row's JSON
``payload`` column, cast to the column's declared type.
"""
import os
import sys
from pathlib import Path

# The project package directory must be on sys.path before the project-local
# import below can be resolved, hence the imports that follow this statement.
project_root = Path(__file__).parent.parent / "etl_billiards"
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv
load_dotenv(project_root / ".env")

from database.connection import DatabaseConnection

# Backfill configuration: (table name, [(db_col, payload_json_expr), ...])
BACKFILL_CONFIGS = [
    # settlement_records - fields nested inside settleList
    ("billiards_ods.settlement_records", [
        ("plcouponsaleamount", "(payload->'settleList')->>'plCouponSaleAmount'"),
        ("mervousalesamount", "(payload->'settleList')->>'merVouSalesAmount'"),
        ("electricitymoney", "(payload->'settleList')->>'electricityMoney'"),
        ("realelectricitymoney", "(payload->'settleList')->>'realElectricityMoney'"),
        ("electricityadjustmoney", "(payload->'settleList')->>'electricityAdjustMoney'"),
    ]),
    # recharge_settlements
    ("billiards_ods.recharge_settlements", [
        ("plcouponsaleamount", "(payload->'settleList')->>'plCouponSaleAmount'"),
        ("mervousalesamount", "(payload->'settleList')->>'merVouSalesAmount'"),
        ("electricitymoney", "(payload->'settleList')->>'electricityMoney'"),
        ("realelectricitymoney", "(payload->'settleList')->>'realElectricityMoney'"),
        ("electricityadjustmoney", "(payload->'settleList')->>'electricityAdjustMoney'"),
    ]),
    # member_balance_changes
    ("billiards_ods.member_balance_changes", [
        ("principal_before", "payload->>'principal_before'"),
        ("principal_after", "payload->>'principal_after'"),
        ("principal_data", "payload->>'principal_data'"),
    ]),
    # member_stored_value_cards
    ("billiards_ods.member_stored_value_cards", [
        ("principal_balance", "payload->>'principal_balance'"),
        ("member_grade", "payload->>'member_grade'"),
        ("rechargefreezebalance", "payload->>'rechargeFreezeBalance'"),
        ("able_share_member_discount", "payload->>'able_share_member_discount'"),
        ("electricity_deduct_radio", "payload->>'electricity_deduct_radio'"),
        ("electricity_discount", "payload->>'electricity_discount'"),
        ("electricitycarddeduct", "payload->>'electricityCardDeduct'"),
    ]),
    # member_profiles
    ("billiards_ods.member_profiles", [
        ("pay_money_sum", "payload->>'pay_money_sum'"),
        ("recharge_money_sum", "payload->>'recharge_money_sum'"),
        ("person_tenant_org_id", "payload->>'person_tenant_org_id'"),
        ("person_tenant_org_name", "payload->>'person_tenant_org_name'"),
        ("register_source", "payload->>'register_source'"),
    ]),
    # table_fee_transactions
    ("billiards_ods.table_fee_transactions", [
        ("activity_discount_amount", "payload->>'activity_discount_amount'"),
        ("real_service_money", "payload->>'real_service_money'"),
        ("order_consumption_type", "payload->>'order_consumption_type'"),
    ]),
    # assistant_service_records
    ("billiards_ods.assistant_service_records", [
        ("real_service_money", "payload->>'real_service_money'"),
        ("assistantteamname", "payload->>'assistantTeamName'"),
    ]),
    # store_goods_sales_records
    ("billiards_ods.store_goods_sales_records", [
        ("coupon_share_money", "payload->>'coupon_share_money'"),
    ]),
    # group_buy_redemption_records
    ("billiards_ods.group_buy_redemption_records", [
        ("coupon_sale_id", "payload->>'coupon_sale_id'"),
        ("member_discount_money", "payload->>'member_discount_money'"),
        ("assistant_share_money", "payload->>'assistant_share_money'"),
        ("table_share_money", "payload->>'table_share_money'"),
        ("goods_share_money", "payload->>'goods_share_money'"),
        ("recharge_share_money", "payload->>'recharge_share_money'"),
    ]),
    # site_tables_master
    ("billiards_ods.site_tables_master", [
        ("order_id", "payload->>'order_id'"),
    ]),
    # store_goods_master
    ("billiards_ods.store_goods_master", [
        ("commodity_code", "payload->>'commodity_code'"),
        ("not_sale", "payload->>'not_sale'"),
    ]),
    # table_fee_discount_records
    ("billiards_ods.table_fee_discount_records", [
        ("table_name", "payload->>'table_name'"),
        ("table_price", "payload->>'table_price'"),
        ("charge_free", "payload->>'charge_free'"),
        ("area_type_id", "payload->>'area_type_id'"),
        ("site_table_area_id", "payload->>'site_table_area_id'"),
        ("site_table_area_name", "payload->>'site_table_area_name'"),
    ]),
    # tenant_goods_master
    ("billiards_ods.tenant_goods_master", [
        ("not_sale", "payload->>'not_sale'"),
    ]),
    # group_buy_packages
    ("billiards_ods.group_buy_packages", [
        ("sort", "payload->>'sort'"),
        ("is_first_limit", "payload->>'is_first_limit'"),
    ]),
]

# information_schema ``data_type`` -> SQL type the payload text is cast to.
# Column types not listed here receive the payload text unchanged.
_CAST_BY_COLUMN_TYPE = {
    "numeric": "numeric",
    "double precision": "numeric",
    "real": "numeric",
    "decimal": "numeric",
    "integer": "bigint",
    "bigint": "bigint",
    "smallint": "bigint",
    "boolean": "boolean",
    "timestamp": "timestamp",
    "timestamp without time zone": "timestamp",
    # Cast to timestamptz, not timestamp: a plain ``timestamp`` cast silently
    # discards any UTC offset present in the payload text.
    "timestamp with time zone": "timestamptz",
}


def column_exists(db, table: str, column: str) -> bool:
    """Return True if *column* exists on *table*.

    Args:
        db: Connection object exposing ``query(sql, params)``.
        table: Schema-qualified table name in ``schema.table`` form.
        column: Column name; compared in lower case.
    """
    schema, tbl = table.split(".")
    result = db.query(
        """
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = %s
        """,
        (schema, tbl, column.lower()),
    )
    return bool(result)


def get_column_type(db, table: str, column: str) -> str:
    """Return the information_schema ``data_type`` of *column* on *table*.

    Falls back to ``"text"`` when the column is not found.

    Args:
        db: Connection object exposing ``query(sql, params)``; rows must be
            indexable by column name.
        table: Schema-qualified table name in ``schema.table`` form.
        column: Column name; compared in lower case.
    """
    schema, tbl = table.split(".")
    result = db.query(
        """
        SELECT data_type FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = %s
        """,
        (schema, tbl, column.lower()),
    )
    return result[0]["data_type"] if result else "text"


def _build_update_sql(table: str, db_col: str, payload_expr: str, col_type: str) -> str:
    """Build the UPDATE that fills NULL *db_col* values from *payload_expr*."""
    cast_type = _CAST_BY_COLUMN_TYPE.get(col_type)
    if cast_type is None:
        # Text-like (or unrecognised) column: store the payload text as is.
        source_expr = payload_expr
        value_expr = payload_expr
    else:
        # An empty string cannot be cast to numeric/bigint/boolean/timestamp
        # and would abort the whole UPDATE; treat it as "no value" instead.
        source_expr = f"NULLIF({payload_expr}, '')"
        value_expr = f"{source_expr}::{cast_type}"
    return f"""
        UPDATE {table}
        SET "{db_col}" = {value_expr}
        WHERE "{db_col}" IS NULL
          AND {source_expr} IS NOT NULL
    """


def _run_backfill(db) -> None:
    """Process every configured column and print a per-column report."""
    print("=" * 70)
    print("ODS Payload Backfill Script")
    print("=" * 70)

    total_updates = 0
    errors = []

    for table, columns in BACKFILL_CONFIGS:
        print(f"\n[{table}]")

        for db_col, payload_expr in columns:
            # Columns missing from the target table are skipped, not errors.
            if not column_exists(db, table, db_col):
                print(f"  {db_col}: SKIP (column not found)")
                continue

            # The declared column type decides how the payload text is cast.
            col_type = get_column_type(db, table, db_col)
            sql = _build_update_sql(table, db_col, payload_expr, col_type)

            try:
                db.execute(sql)
                db.commit()

                # Report how many rows now hold a value for this column.
                count_sql = f"""
                    SELECT COUNT(*) as cnt FROM {table} WHERE "{db_col}" IS NOT NULL
                """
                cnt = db.query(count_sql)[0]["cnt"]
                print(f"  {db_col}: OK (now {cnt} non-null)")
                total_updates += 1
            except Exception as e:
                # One failing column must not stop the rest of the run.
                db.rollback()
                err_msg = str(e).split("\n")[0][:80]
                print(f"  {db_col}: ERROR - {err_msg}")
                errors.append((table, db_col, err_msg))

    print("\n" + "=" * 70)
    print(f"Completed: {total_updates} columns processed")
    if errors:
        print(f"Errors: {len(errors)}")
        for t, c, e in errors:
            print(f"  - {t}.{c}: {e}")


def main():
    """Connect using the ``PG_DSN`` environment variable and run the backfill."""
    dsn = os.getenv("PG_DSN")
    if not dsn:
        print("Error: PG_DSN not set")
        return

    db = DatabaseConnection(dsn)
    try:
        _run_backfill(db)
    finally:
        # Close the connection even if a metadata lookup raises.
        db.close()


if __name__ == "__main__":
    main()