209 lines
7.9 KiB
Python
209 lines
7.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
从 ODS payload 回填缺失的列值
|
|
"""
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Resolve to an absolute path first: when ``__file__`` is relative (e.g. on
# Python < 3.9 running ``python this_script.py`` from its own directory),
# ``.parent.parent`` of a bare filename collapses to ".", which would point
# project_root at the wrong directory.
project_root = Path(__file__).resolve().parent.parent / "etl_billiards"

# Make the etl_billiards package importable (needed for ``database.connection``).
sys.path.insert(0, str(project_root))

from dotenv import load_dotenv

# Load environment variables from etl_billiards/.env; main() reads PG_DSN from
# the environment afterwards.
load_dotenv(project_root / ".env")

# Must stay below the sys.path.insert above: this package lives under project_root.
from database.connection import DatabaseConnection
|
|
|
|
|
|
def _payload_col(db_col, key=None):
    """Pair *db_col* with a SQL expression reading top-level payload key *key* as text.

    *key* defaults to *db_col* for columns named exactly like their payload key.
    """
    source_key = db_col if key is None else key
    return (db_col, f"payload->>'{source_key}'")


def _settle_list_col(db_col, key):
    """Pair *db_col* with a SQL expression reading *key* nested under ``settleList`` as text."""
    return (db_col, f"(payload->'settleList')->>'{key}'")


def _settle_list_cols():
    """Build a fresh list of the settleList-derived amount columns shared by settlement tables."""
    return [
        _settle_list_col("plcouponsaleamount", "plCouponSaleAmount"),
        _settle_list_col("mervousalesamount", "merVouSalesAmount"),
        _settle_list_col("electricitymoney", "electricityMoney"),
        _settle_list_col("realelectricitymoney", "realElectricityMoney"),
        _settle_list_col("electricityadjustmoney", "electricityAdjustMoney"),
    ]


# Backfill configuration: (schema.table, [(db_col, payload_jsonb_expr), ...]).
# Every expression uses ``->>`` and therefore yields text.
BACKFILL_CONFIGS = [
    # settlement_records - fields inside settleList
    ("billiards_ods.settlement_records", _settle_list_cols()),
    # recharge_settlements - same settleList fields
    ("billiards_ods.recharge_settlements", _settle_list_cols()),
    ("billiards_ods.member_balance_changes", [
        _payload_col("principal_before"),
        _payload_col("principal_after"),
        _payload_col("principal_data"),
    ]),
    ("billiards_ods.member_stored_value_cards", [
        _payload_col("principal_balance"),
        _payload_col("member_grade"),
        _payload_col("rechargefreezebalance", "rechargeFreezeBalance"),
        _payload_col("able_share_member_discount"),
        _payload_col("electricity_deduct_radio"),
        _payload_col("electricity_discount"),
        _payload_col("electricitycarddeduct", "electricityCardDeduct"),
    ]),
    ("billiards_ods.member_profiles", [
        _payload_col("pay_money_sum"),
        _payload_col("recharge_money_sum"),
        _payload_col("person_tenant_org_id"),
        _payload_col("person_tenant_org_name"),
        _payload_col("register_source"),
    ]),
    ("billiards_ods.table_fee_transactions", [
        _payload_col("activity_discount_amount"),
        _payload_col("real_service_money"),
        _payload_col("order_consumption_type"),
    ]),
    ("billiards_ods.assistant_service_records", [
        _payload_col("real_service_money"),
        _payload_col("assistantteamname", "assistantTeamName"),
    ]),
    ("billiards_ods.store_goods_sales_records", [
        _payload_col("coupon_share_money"),
    ]),
    ("billiards_ods.group_buy_redemption_records", [
        _payload_col("coupon_sale_id"),
        _payload_col("member_discount_money"),
        _payload_col("assistant_share_money"),
        _payload_col("table_share_money"),
        _payload_col("goods_share_money"),
        _payload_col("recharge_share_money"),
    ]),
    ("billiards_ods.site_tables_master", [
        _payload_col("order_id"),
    ]),
    ("billiards_ods.store_goods_master", [
        _payload_col("commodity_code"),
        _payload_col("not_sale"),
    ]),
    ("billiards_ods.table_fee_discount_records", [
        _payload_col("table_name"),
        _payload_col("table_price"),
        _payload_col("charge_free"),
        _payload_col("area_type_id"),
        _payload_col("site_table_area_id"),
        _payload_col("site_table_area_name"),
    ]),
    ("billiards_ods.tenant_goods_master", [
        _payload_col("not_sale"),
    ]),
    ("billiards_ods.group_buy_packages", [
        _payload_col("sort"),
        _payload_col("is_first_limit"),
    ]),
]
|
|
|
|
|
|
def column_exists(db, table: str, column: str) -> bool:
    """Report whether *column* is defined on *table*, given as ``schema.table``.

    The lookup goes through ``information_schema.columns``. The column name is
    lower-cased first; the schema and table names are used as given.
    """
    schema_name, table_name = table.split(".")
    lookup_params = (schema_name, table_name, column.lower())
    matches = db.query("""
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = %s
    """, lookup_params)
    return bool(matches)
|
|
|
|
|
|
def get_column_type(db, table: str, column: str) -> str:
    """Return the ``information_schema`` ``data_type`` of *column* on *table*.

    *table* is written as ``schema.table``. The column name is lower-cased for
    the lookup. Falls back to ``"text"`` when the query returns no rows.
    """
    schema_name, table_name = table.split(".")
    rows = db.query("""
        SELECT data_type FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = %s
    """, (schema_name, table_name, column.lower()))
    if not rows:
        return "text"
    first_row = rows[0]
    return first_row["data_type"]
|
|
|
|
|
|
def _build_cast_expr(payload_expr: str, col_type: str) -> str:
    """Return a SQL expression converting the text from *payload_expr* to suit *col_type*.

    *col_type* is an ``information_schema.columns.data_type`` value.
    ``->>`` always yields text, and PostgreSQL only allows conversions from
    text to other types as explicit casts. Every non-text target therefore
    needs an explicit cast; text-like targets are returned unchanged.
    Empty strings are mapped to NULL for non-text targets because, e.g.,
    ``''::numeric`` raises and would abort the whole column update.
    """
    if col_type in ("numeric", "double precision", "real", "decimal"):
        target = "numeric"
    elif col_type in ("integer", "bigint", "smallint"):
        target = "bigint"
    elif col_type == "boolean":
        target = "boolean"
    elif col_type == "timestamp with time zone":
        # Cast straight to timestamptz: ::timestamp would silently discard
        # any UTC offset contained in the payload string.
        target = "timestamptz"
    elif col_type in ("timestamp", "timestamp without time zone"):
        target = "timestamp"
    elif col_type in ("date", "json", "jsonb"):
        target = col_type
    else:
        return payload_expr  # text-like column: keep as is
    return f"NULLIF({payload_expr}, '')::{target}"


def _run_backfill(db) -> None:
    """Apply every BACKFILL_CONFIGS entry through *db*; print per-column results and a summary."""
    total_updates = 0
    errors = []

    for table, columns in BACKFILL_CONFIGS:
        print(f"\n[{table}]")

        for db_col, payload_expr in columns:
            # Check column exists
            if not column_exists(db, table, db_col):
                print(f" {db_col}: SKIP (column not found)")
                continue

            # column_exists/get_column_type look up the lower-cased name, so
            # quote that same identifier in SQL; otherwise a mixed-case config
            # entry would pass the check and then fail in the UPDATE.
            col = db_col.lower()
            cast_expr = _build_cast_expr(payload_expr, get_column_type(db, table, db_col))

            # Table/column identifiers come from the hard-coded BACKFILL_CONFIGS,
            # not from user input, so f-string interpolation is acceptable here.
            sql = f"""
                UPDATE {table}
                SET "{col}" = {cast_expr}
                WHERE "{col}" IS NULL
                  AND ({payload_expr}) IS NOT NULL
            """

            try:
                db.execute(sql)
                db.commit()

                # Report the column's total non-null count (not only rows touched now).
                count_sql = f"""
                    SELECT COUNT(*) as cnt FROM {table}
                    WHERE "{col}" IS NOT NULL
                """
                cnt = db.query(count_sql)[0]["cnt"]
                print(f" {db_col}: OK (now {cnt} non-null)")
                total_updates += 1
            except Exception as e:
                # Deliberate per-column boundary: undo this column only, record
                # the first line of the error, and continue with the rest.
                db.rollback()
                err_msg = str(e).split("\n")[0][:80]
                print(f" {db_col}: ERROR - {err_msg}")
                errors.append((table, db_col, err_msg))

    print("\n" + "=" * 70)
    print(f"Completed: {total_updates} columns processed")
    if errors:
        print(f"Errors: {len(errors)}")
        for t, c, e in errors:
            print(f" - {t}.{c}: {e}")


def main():
    """Fill NULL ODS columns from the corresponding values in each row's ``payload``.

    Reads the connection string from the ``PG_DSN`` environment variable and
    returns early with an error message if it is unset. For each column in
    BACKFILL_CONFIGS that exists in the database, runs
    ``UPDATE ... SET col = <payload value> WHERE col IS NULL``, committing each
    column separately. The connection is closed even if an unexpected error
    escapes the per-column handling.
    """
    dsn = os.getenv("PG_DSN")
    if not dsn:
        print("Error: PG_DSN not set")
        return

    db = DatabaseConnection(dsn)

    print("=" * 70)
    print("ODS Payload Backfill Script")
    print("=" * 70)

    try:
        _run_backfill(db)
    finally:
        db.close()


if __name__ == "__main__":
    main()
|