227 lines
7.7 KiB
Python
227 lines
7.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
添加缺失的 DWD 列到数据库
|
|
根据 ODS 新增字段,在对应的 DWD 表中添加相关列
|
|
"""
|
|
import psycopg2
|
|
|
|
DSN = 'postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test'
|
|
|
|
# DWD 表缺失字段定义:表名 -> [(列名, 类型, 注释)]
|
|
# 根据计划,核心业务字段放主表,扩展字段放 _ex 表
|
|
MISSING_COLUMNS = {
|
|
# 结算表 - 核心金额字段放主表
|
|
'billiards_dwd.dwd_settlement_head': [
|
|
('electricity_money', 'NUMERIC(18,2)', '电费金额'),
|
|
('real_electricity_money', 'NUMERIC(18,2)', '实际电费金额'),
|
|
('electricity_adjust_money', 'NUMERIC(18,2)', '电费调整金额'),
|
|
('pl_coupon_sale_amount', 'NUMERIC(18,2)', '平台券销售额'),
|
|
('mervou_sales_amount', 'NUMERIC(18,2)', '商户券销售额'),
|
|
],
|
|
'billiards_dwd.dwd_settlement_head_ex': [
|
|
('settle_list', 'JSONB', '结算明细列表'),
|
|
],
|
|
|
|
# 台费流水表
|
|
'billiards_dwd.dwd_table_fee_log': [
|
|
('activity_discount_amount', 'NUMERIC(18,2)', '活动折扣金额'),
|
|
('real_service_money', 'NUMERIC(18,2)', '实际服务费金额'),
|
|
],
|
|
'billiards_dwd.dwd_table_fee_log_ex': [
|
|
('order_consumption_type', 'INT', '订单消费类型'),
|
|
],
|
|
|
|
# 助教服务流水表
|
|
'billiards_dwd.dwd_assistant_service_log': [
|
|
('real_service_money', 'NUMERIC(18,2)', '实际服务费金额'),
|
|
],
|
|
'billiards_dwd.dwd_assistant_service_log_ex': [
|
|
('assistant_team_name', 'TEXT', '助教团队名称'),
|
|
],
|
|
|
|
# 团购核销记录表
|
|
'billiards_dwd.dwd_groupbuy_redemption': [
|
|
('member_discount_money', 'NUMERIC(18,2)', '会员折扣金额'),
|
|
('coupon_sale_id', 'BIGINT', '优惠券销售ID'),
|
|
],
|
|
'billiards_dwd.dwd_groupbuy_redemption_ex': [
|
|
('table_share_money', 'NUMERIC(18,2)', '台费分摊金额'),
|
|
('table_service_share_money', 'NUMERIC(18,2)', '台费服务分摊金额'),
|
|
('goods_share_money', 'NUMERIC(18,2)', '商品分摊金额'),
|
|
('good_service_share_money', 'NUMERIC(18,2)', '商品服务分摊金额'),
|
|
('assistant_share_money', 'NUMERIC(18,2)', '助教分摊金额'),
|
|
('assistant_service_share_money', 'NUMERIC(18,2)', '助教服务分摊金额'),
|
|
('recharge_share_money', 'NUMERIC(18,2)', '充值分摊金额'),
|
|
],
|
|
|
|
# 台费调整记录表
|
|
'billiards_dwd.dwd_table_fee_adjust': [
|
|
('table_name', 'TEXT', '台桌名称'),
|
|
('table_price', 'NUMERIC(18,2)', '台桌价格'),
|
|
('charge_free', 'BOOLEAN', '是否免费'),
|
|
],
|
|
'billiards_dwd.dwd_table_fee_adjust_ex': [
|
|
('area_type_id', 'BIGINT', '区域类型ID'),
|
|
('site_table_area_id', 'BIGINT', '门店台区ID'),
|
|
('site_table_area_name', 'TEXT', '门店台区名称'),
|
|
('site_name', 'TEXT', '门店名称'),
|
|
('tenant_name', 'TEXT', '租户名称'),
|
|
],
|
|
|
|
# 会员储值卡维度表
|
|
'billiards_dwd.dim_member_card_account': [
|
|
('principal_balance', 'NUMERIC(18,2)', '本金余额'),
|
|
('member_grade', 'INT', '会员等级'),
|
|
],
|
|
'billiards_dwd.dim_member_card_account_ex': [
|
|
('able_share_member_discount', 'BOOLEAN', '是否可共享会员折扣'),
|
|
('electricity_deduct_radio', 'NUMERIC(18,4)', '电费扣减比例'),
|
|
('electricity_discount', 'NUMERIC(18,4)', '电费折扣'),
|
|
('electricity_card_deduct', 'BOOLEAN', '电费卡扣'),
|
|
('recharge_freeze_balance', 'NUMERIC(18,2)', '充值冻结余额'),
|
|
],
|
|
|
|
# 会员维度表
|
|
'billiards_dwd.dim_member': [
|
|
('pay_money_sum', 'NUMERIC(18,2)', '累计支付金额'),
|
|
('recharge_money_sum', 'NUMERIC(18,2)', '累计充值金额'),
|
|
],
|
|
'billiards_dwd.dim_member_ex': [
|
|
('person_tenant_org_id', 'BIGINT', '人员租户组织ID'),
|
|
('person_tenant_org_name', 'TEXT', '人员租户组织名称'),
|
|
('register_source', 'TEXT', '注册来源'),
|
|
],
|
|
|
|
# 会员余额变更表
|
|
'billiards_dwd.dwd_member_balance_change': [
|
|
('principal_before', 'NUMERIC(18,2)', '变动前本金'),
|
|
('principal_after', 'NUMERIC(18,2)', '变动后本金'),
|
|
],
|
|
'billiards_dwd.dwd_member_balance_change_ex': [
|
|
('principal_data', 'TEXT', '本金变动数据'),
|
|
],
|
|
|
|
# 团购套餐维度表
|
|
'billiards_dwd.dim_groupbuy_package': [
|
|
('sort', 'INT', '排序'),
|
|
('is_first_limit', 'BOOLEAN', '是否首单限制'),
|
|
],
|
|
'billiards_dwd.dim_groupbuy_package_ex': [
|
|
('tenant_coupon_sale_order_item_id', 'BIGINT', '租户券销售订单项ID'),
|
|
],
|
|
|
|
# 门店商品维度表
|
|
'billiards_dwd.dim_store_goods': [
|
|
('commodity_code', 'TEXT', '商品编码'),
|
|
('not_sale', 'BOOLEAN', '是否停售'),
|
|
],
|
|
|
|
# 台桌维度表
|
|
'billiards_dwd.dim_table': [
|
|
('order_id', 'BIGINT', '订单ID'),
|
|
],
|
|
|
|
# 租户商品维度表
|
|
'billiards_dwd.dim_tenant_goods': [
|
|
('not_sale', 'BOOLEAN', '是否停售'),
|
|
],
|
|
|
|
# 助教作废记录表
|
|
'billiards_dwd.dwd_assistant_cancel_log': [
|
|
('tenant_id', 'BIGINT', '租户ID'),
|
|
],
|
|
|
|
# 商品销售流水表
|
|
'billiards_dwd.dwd_goods_sale_log': [
|
|
('coupon_share_money', 'NUMERIC(18,2)', '优惠券分摊金额'),
|
|
],
|
|
|
|
# 支付流水表
|
|
'billiards_dwd.dwd_payment': [
|
|
('tenant_id', 'BIGINT', '租户ID'),
|
|
],
|
|
}
|
|
|
|
|
|
def get_existing_columns(conn, schema, table):
|
|
"""获取表已有的列"""
|
|
sql = """
|
|
SELECT column_name
|
|
FROM information_schema.columns
|
|
WHERE table_schema = %s AND table_name = %s
|
|
"""
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, (schema, table))
|
|
return {row[0].lower() for row in cur.fetchall()}
|
|
|
|
|
|
def table_exists(conn, schema, table):
|
|
"""检查表是否存在"""
|
|
sql = """
|
|
SELECT EXISTS (
|
|
SELECT 1 FROM information_schema.tables
|
|
WHERE table_schema = %s AND table_name = %s
|
|
)
|
|
"""
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, (schema, table))
|
|
return cur.fetchone()[0]
|
|
|
|
|
|
def add_column(conn, full_table, col_name, col_type, comment):
|
|
"""添加列"""
|
|
sql = f'ALTER TABLE {full_table} ADD COLUMN IF NOT EXISTS "{col_name}" {col_type}'
|
|
comment_sql = f"COMMENT ON COLUMN {full_table}.\"{col_name}\" IS '{comment}'"
|
|
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql)
|
|
cur.execute(comment_sql)
|
|
conn.commit()
|
|
print(f" [OK] 添加列: {col_name} ({col_type})")
|
|
|
|
|
|
def main():
|
|
conn = psycopg2.connect(DSN)
|
|
|
|
print("=" * 80)
|
|
print("添加缺失的 DWD 列")
|
|
print("=" * 80)
|
|
|
|
total_added = 0
|
|
total_skipped = 0
|
|
tables_not_found = []
|
|
|
|
for full_table, columns in MISSING_COLUMNS.items():
|
|
schema, table = full_table.split('.')
|
|
|
|
if not table_exists(conn, schema, table):
|
|
print(f"\n[跳过] 表不存在: {full_table}")
|
|
tables_not_found.append(full_table)
|
|
continue
|
|
|
|
print(f"\n处理表: {full_table}")
|
|
|
|
existing = get_existing_columns(conn, schema, table)
|
|
|
|
for col_name, col_type, comment in columns:
|
|
if col_name.lower() in existing:
|
|
print(f" [跳过] 列已存在: {col_name}")
|
|
total_skipped += 1
|
|
else:
|
|
add_column(conn, full_table, col_name, col_type, comment)
|
|
total_added += 1
|
|
|
|
conn.close()
|
|
|
|
print("\n" + "=" * 80)
|
|
print(f"完成: 添加 {total_added} 列, 跳过 {total_skipped} 列")
|
|
if tables_not_found:
|
|
print(f"未找到的表: {len(tables_not_found)}")
|
|
for t in tables_not_found:
|
|
print(f" - {t}")
|
|
print("=" * 80)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|