# -*- coding: utf-8 -*- """ 添加缺失的 DWD 列到数据库 根据 ODS 新增字段,在对应的 DWD 表中添加相关列 """ import psycopg2 DSN = 'postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test' # DWD 表缺失字段定义:表名 -> [(列名, 类型, 注释)] # 根据计划,核心业务字段放主表,扩展字段放 _ex 表 MISSING_COLUMNS = { # 结算表 - 核心金额字段放主表 'billiards_dwd.dwd_settlement_head': [ ('electricity_money', 'NUMERIC(18,2)', '电费金额'), ('real_electricity_money', 'NUMERIC(18,2)', '实际电费金额'), ('electricity_adjust_money', 'NUMERIC(18,2)', '电费调整金额'), ('pl_coupon_sale_amount', 'NUMERIC(18,2)', '平台券销售额'), ('mervou_sales_amount', 'NUMERIC(18,2)', '商户券销售额'), ], 'billiards_dwd.dwd_settlement_head_ex': [ ('settle_list', 'JSONB', '结算明细列表'), ], # 台费流水表 'billiards_dwd.dwd_table_fee_log': [ ('activity_discount_amount', 'NUMERIC(18,2)', '活动折扣金额'), ('real_service_money', 'NUMERIC(18,2)', '实际服务费金额'), ], 'billiards_dwd.dwd_table_fee_log_ex': [ ('order_consumption_type', 'INT', '订单消费类型'), ], # 助教服务流水表 'billiards_dwd.dwd_assistant_service_log': [ ('real_service_money', 'NUMERIC(18,2)', '实际服务费金额'), ], 'billiards_dwd.dwd_assistant_service_log_ex': [ ('assistant_team_name', 'TEXT', '助教团队名称'), ], # 团购核销记录表 'billiards_dwd.dwd_groupbuy_redemption': [ ('member_discount_money', 'NUMERIC(18,2)', '会员折扣金额'), ('coupon_sale_id', 'BIGINT', '优惠券销售ID'), ], 'billiards_dwd.dwd_groupbuy_redemption_ex': [ ('table_share_money', 'NUMERIC(18,2)', '台费分摊金额'), ('table_service_share_money', 'NUMERIC(18,2)', '台费服务分摊金额'), ('goods_share_money', 'NUMERIC(18,2)', '商品分摊金额'), ('good_service_share_money', 'NUMERIC(18,2)', '商品服务分摊金额'), ('assistant_share_money', 'NUMERIC(18,2)', '助教分摊金额'), ('assistant_service_share_money', 'NUMERIC(18,2)', '助教服务分摊金额'), ('recharge_share_money', 'NUMERIC(18,2)', '充值分摊金额'), ], # 台费调整记录表 'billiards_dwd.dwd_table_fee_adjust': [ ('table_name', 'TEXT', '台桌名称'), ('table_price', 'NUMERIC(18,2)', '台桌价格'), ('charge_free', 'BOOLEAN', '是否免费'), ], 'billiards_dwd.dwd_table_fee_adjust_ex': [ ('area_type_id', 'BIGINT', '区域类型ID'), ('site_table_area_id', 'BIGINT', '门店台区ID'), ('site_table_area_name', 'TEXT', '门店台区名称'), ('site_name', 'TEXT', '门店名称'), ('tenant_name', 'TEXT', '租户名称'), ], # 会员储值卡维度表 'billiards_dwd.dim_member_card_account': [ ('principal_balance', 'NUMERIC(18,2)', '本金余额'), ('member_grade', 'INT', '会员等级'), ], 'billiards_dwd.dim_member_card_account_ex': [ ('able_share_member_discount', 'BOOLEAN', '是否可共享会员折扣'), ('electricity_deduct_radio', 'NUMERIC(18,4)', '电费扣减比例'), ('electricity_discount', 'NUMERIC(18,4)', '电费折扣'), ('electricity_card_deduct', 'BOOLEAN', '电费卡扣'), ('recharge_freeze_balance', 'NUMERIC(18,2)', '充值冻结余额'), ], # 会员维度表 'billiards_dwd.dim_member': [ ('pay_money_sum', 'NUMERIC(18,2)', '累计支付金额'), ('recharge_money_sum', 'NUMERIC(18,2)', '累计充值金额'), ], 'billiards_dwd.dim_member_ex': [ ('person_tenant_org_id', 'BIGINT', '人员租户组织ID'), ('person_tenant_org_name', 'TEXT', '人员租户组织名称'), ('register_source', 'TEXT', '注册来源'), ], # 会员余额变更表 'billiards_dwd.dwd_member_balance_change': [ ('principal_before', 'NUMERIC(18,2)', '变动前本金'), ('principal_after', 'NUMERIC(18,2)', '变动后本金'), ], 'billiards_dwd.dwd_member_balance_change_ex': [ ('principal_data', 'TEXT', '本金变动数据'), ], # 团购套餐维度表 'billiards_dwd.dim_groupbuy_package': [ ('sort', 'INT', '排序'), ('is_first_limit', 'BOOLEAN', '是否首单限制'), ], 'billiards_dwd.dim_groupbuy_package_ex': [ ('tenant_coupon_sale_order_item_id', 'BIGINT', '租户券销售订单项ID'), ], # 门店商品维度表 'billiards_dwd.dim_store_goods': [ ('commodity_code', 'TEXT', '商品编码'), ('not_sale', 'BOOLEAN', '是否停售'), ], # 台桌维度表 'billiards_dwd.dim_table': [ ('order_id', 'BIGINT', '订单ID'), ], # 租户商品维度表 'billiards_dwd.dim_tenant_goods': [ ('not_sale', 'BOOLEAN', '是否停售'), ], # 助教作废记录表 'billiards_dwd.dwd_assistant_cancel_log': [ ('tenant_id', 'BIGINT', '租户ID'), ], # 商品销售流水表 'billiards_dwd.dwd_goods_sale_log': [ ('coupon_share_money', 'NUMERIC(18,2)', '优惠券分摊金额'), ], # 支付流水表 'billiards_dwd.dwd_payment': [ ('tenant_id', 'BIGINT', '租户ID'), ], } def get_existing_columns(conn, schema, table): """获取表已有的列""" sql = """ SELECT column_name FROM information_schema.columns WHERE table_schema = %s AND table_name = %s """ with conn.cursor() as cur: cur.execute(sql, (schema, table)) return {row[0].lower() for row in cur.fetchall()} def table_exists(conn, schema, table): """检查表是否存在""" sql = """ SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = %s AND table_name = %s ) """ with conn.cursor() as cur: cur.execute(sql, (schema, table)) return cur.fetchone()[0] def add_column(conn, full_table, col_name, col_type, comment): """添加列""" sql = f'ALTER TABLE {full_table} ADD COLUMN IF NOT EXISTS "{col_name}" {col_type}' comment_sql = f"COMMENT ON COLUMN {full_table}.\"{col_name}\" IS '{comment}'" with conn.cursor() as cur: cur.execute(sql) cur.execute(comment_sql) conn.commit() print(f" [OK] 添加列: {col_name} ({col_type})") def main(): conn = psycopg2.connect(DSN) print("=" * 80) print("添加缺失的 DWD 列") print("=" * 80) total_added = 0 total_skipped = 0 tables_not_found = [] for full_table, columns in MISSING_COLUMNS.items(): schema, table = full_table.split('.') if not table_exists(conn, schema, table): print(f"\n[跳过] 表不存在: {full_table}") tables_not_found.append(full_table) continue print(f"\n处理表: {full_table}") existing = get_existing_columns(conn, schema, table) for col_name, col_type, comment in columns: if col_name.lower() in existing: print(f" [跳过] 列已存在: {col_name}") total_skipped += 1 else: add_column(conn, full_table, col_name, col_type, comment) total_added += 1 conn.close() print("\n" + "=" * 80) print(f"完成: 添加 {total_added} 列, 跳过 {total_skipped} 列") if tables_not_found: print(f"未找到的表: {len(tables_not_found)}") for t in tables_not_found: print(f" - {t}") print("=" * 80) if __name__ == '__main__': main()