# Files
# feiqiu-ETL/etl_billiards/scripts/build_dwd_from_ods.py
# 2025-11-30 07:19:05 +08:00

# 426 lines
# 14 KiB
# Python

# -*- coding: utf-8 -*-
"""Populate PRD DWD tables from ODS payload snapshots."""
from __future__ import annotations
import argparse
import os
import sys
import psycopg2
# Ordered (step_name, sql) pairs. main() executes them one after another on a
# single cursor inside one transaction and prints "[OK] <step_name>" per step.
# Dimension upserts (dim_*) are listed first, fact loads (fact_*) after them.
# Every statement reads from the billiards_ods schema and writes to
# billiards_dwd.
SQL_STEPS: list[tuple[str, str]] = [
# Tenants: distinct non-NULL tenant_id values collected from three ODS tables.
# tenant_name/status are the literals 'default'/'active'; on conflict only
# updated_at is refreshed.
(
"dim_tenant",
"""
INSERT INTO billiards_dwd.dim_tenant (tenant_id, tenant_name, status)
SELECT DISTINCT tenant_id, 'default' AS tenant_name, 'active' AS status
FROM (
SELECT tenant_id FROM billiards_ods.ods_order_settle
UNION SELECT tenant_id FROM billiards_ods.ods_order_receipt_detail
UNION SELECT tenant_id FROM billiards_ods.ods_member_profile
) s
WHERE tenant_id IS NOT NULL
ON CONFLICT (tenant_id) DO UPDATE SET updated_at = now();
""",
),
# Sites: one row per site_id, with tenant_id taken as MAX(tenant_id) over the
# three source tables. On conflict only updated_at is refreshed.
# NOTE(review): DISTINCT looks redundant here because GROUP BY site_id already
# yields one row per site_id.
(
"dim_site",
"""
INSERT INTO billiards_dwd.dim_site (site_id, tenant_id, site_name, status)
SELECT DISTINCT site_id, MAX(tenant_id) AS tenant_id, 'default' AS site_name, 'active' AS status
FROM (
SELECT site_id, tenant_id FROM billiards_ods.ods_order_settle
UNION SELECT site_id, tenant_id FROM billiards_ods.ods_order_receipt_detail
UNION SELECT site_id, tenant_id FROM billiards_ods.ods_table_info
) s
WHERE site_id IS NOT NULL
GROUP BY site_id
ON CONFLICT (site_id) DO UPDATE SET updated_at = now();
""",
),
# Product categories: upsert of every selected column from ods_goods_category.
# NOTE(review): DISTINCT only collapses fully identical rows. If ODS can hold
# several differing rows for one category_id, PostgreSQL rejects the statement
# ("ON CONFLICT DO UPDATE command cannot affect row a second time") — confirm
# that category_id is unique in the source. The same caveat applies to the
# other DO UPDATE steps below.
(
"dim_product_category",
"""
INSERT INTO billiards_dwd.dim_product_category (category_id, category_name, parent_id, level_no, status)
SELECT DISTINCT category_id, category_name, parent_id, level_no, status
FROM billiards_ods.ods_goods_category
WHERE category_id IS NOT NULL
ON CONFLICT (category_id) DO UPDATE SET
category_name = EXCLUDED.category_name,
parent_id = EXCLUDED.parent_id,
level_no = EXCLUDED.level_no,
status = EXCLUDED.status;
""",
),
# Products from ods_store_product. goods_code and unit are inserted as NULL
# and are not part of the conflict update; sale_price is stored as
# default_price.
(
"dim_product",
"""
INSERT INTO billiards_dwd.dim_product (goods_id, goods_name, goods_code, category_id, category_name, unit, default_price, status)
SELECT DISTINCT goods_id, goods_name, NULL::TEXT AS goods_code, category_id, category_name, NULL::TEXT AS unit, sale_price AS default_price, status
FROM billiards_ods.ods_store_product
WHERE goods_id IS NOT NULL
ON CONFLICT (goods_id) DO UPDATE SET
goods_name = EXCLUDED.goods_name,
category_id = EXCLUDED.category_id,
category_name = EXCLUDED.category_name,
default_price = EXCLUDED.default_price,
status = EXCLUDED.status,
updated_at = now();
""",
),
# Adds goods that appear in sale items with just (goods_id, goods_name).
# DO NOTHING leaves rows already written by the dim_product step untouched,
# which is why this step runs after it.
(
"dim_product_from_sales",
"""
INSERT INTO billiards_dwd.dim_product (goods_id, goods_name)
SELECT DISTINCT goods_id, goods_name
FROM billiards_ods.ods_store_sale_item
WHERE goods_id IS NOT NULL
ON CONFLICT (goods_id) DO NOTHING;
""",
),
# Card types derived from ods_member_card; name and discount rate are
# overwritten on conflict.
(
"dim_member_card_type",
"""
INSERT INTO billiards_dwd.dim_member_card_type (card_type_id, card_type_name, discount_rate)
SELECT DISTINCT card_type_id, card_type_name, discount_rate
FROM billiards_ods.ods_member_card
WHERE card_type_id IS NOT NULL
ON CONFLICT (card_type_id) DO UPDATE SET
card_type_name = EXCLUDED.card_type_name,
discount_rate = EXCLUDED.discount_rate;
""",
),
# Members: profile rows LEFT JOINed to the member's card type (used as
# member_type_id/member_type_name). total_recharge_amount and
# total_consumed_amount are inserted as NULL and are not touched on conflict;
# prof.remarks is stored in the remark column.
# NOTE(review): a member with more than one distinct card type in
# ods_member_card would produce several rows for the same
# (site_id, member_id), which ON CONFLICT DO UPDATE cannot process in a single
# statement — confirm the one-card-type-per-member assumption.
(
"dim_member",
"""
INSERT INTO billiards_dwd.dim_member (
site_id, member_id, tenant_id, member_name, nickname, gender, birthday, mobile,
member_type_id, member_type_name, status, register_time, last_visit_time,
balance, total_recharge_amount, total_consumed_amount, wechat_id, alipay_id, remark
)
SELECT DISTINCT
prof.site_id,
prof.member_id,
prof.tenant_id,
prof.member_name,
prof.nickname,
prof.gender,
prof.birthday,
prof.mobile,
card.member_type_id,
card.member_type_name,
prof.status,
prof.register_time,
prof.last_visit_time,
prof.balance,
NULL::NUMERIC AS total_recharge_amount,
NULL::NUMERIC AS total_consumed_amount,
prof.wechat_id,
prof.alipay_id,
prof.remarks
FROM billiards_ods.ods_member_profile prof
LEFT JOIN (
SELECT DISTINCT site_id, member_id, card_type_id AS member_type_id, card_type_name AS member_type_name
FROM billiards_ods.ods_member_card
) card
ON prof.site_id = card.site_id AND prof.member_id = card.member_id
WHERE prof.member_id IS NOT NULL
ON CONFLICT (site_id, member_id) DO UPDATE SET
member_name = EXCLUDED.member_name,
nickname = EXCLUDED.nickname,
gender = EXCLUDED.gender,
birthday = EXCLUDED.birthday,
mobile = EXCLUDED.mobile,
member_type_id = EXCLUDED.member_type_id,
member_type_name = EXCLUDED.member_type_name,
status = EXCLUDED.status,
register_time = EXCLUDED.register_time,
last_visit_time = EXCLUDED.last_visit_time,
balance = EXCLUDED.balance,
wechat_id = EXCLUDED.wechat_id,
alipay_id = EXCLUDED.alipay_id,
remark = EXCLUDED.remark,
updated_at = now();
""",
),
# Tables: full-column upsert from ods_table_info keyed by table_id.
(
"dim_table",
"""
INSERT INTO billiards_dwd.dim_table (table_id, site_id, table_code, table_name, table_type, area_name, status, created_time, updated_time)
SELECT DISTINCT table_id, site_id, table_code, table_name, table_type, area_name, status, created_time, updated_time
FROM billiards_ods.ods_table_info
WHERE table_id IS NOT NULL
ON CONFLICT (table_id) DO UPDATE SET
site_id = EXCLUDED.site_id,
table_code = EXCLUDED.table_code,
table_name = EXCLUDED.table_name,
table_type = EXCLUDED.table_type,
area_name = EXCLUDED.area_name,
status = EXCLUDED.status,
created_time = EXCLUDED.created_time,
updated_time = EXCLUDED.updated_time;
""",
),
# Assistants: upsert from ods_assistant_account keyed by assistant_id.
(
"dim_assistant",
"""
INSERT INTO billiards_dwd.dim_assistant (assistant_id, assistant_name, mobile, status)
SELECT DISTINCT assistant_id, assistant_name, mobile, status
FROM billiards_ods.ods_assistant_account
WHERE assistant_id IS NOT NULL
ON CONFLICT (assistant_id) DO UPDATE SET
assistant_name = EXCLUDED.assistant_name,
mobile = EXCLUDED.mobile,
status = EXCLUDED.status,
updated_at = now();
""",
),
# Pay methods seen in payment records. is_stored_value is always inserted as
# FALSE and is not modified on conflict; status is the literal 'active'.
(
"dim_pay_method",
"""
INSERT INTO billiards_dwd.dim_pay_method (pay_method_code, pay_method_name, is_stored_value, status)
SELECT DISTINCT pay_method_code, pay_method_name, FALSE AS is_stored_value, 'active' AS status
FROM billiards_ods.ods_payment_record
WHERE pay_method_code IS NOT NULL
ON CONFLICT (pay_method_code) DO UPDATE SET
pay_method_name = EXCLUDED.pay_method_name,
status = EXCLUDED.status,
updated_at = now();
""",
),
# Coupon platforms: the code doubles as the platform name; existing rows are
# left unchanged.
(
"dim_coupon_platform",
"""
INSERT INTO billiards_dwd.dim_coupon_platform (platform_code, platform_name)
SELECT DISTINCT platform_code, platform_code AS platform_name
FROM billiards_ods.ods_platform_coupon_log
WHERE platform_code IS NOT NULL
ON CONFLICT (platform_code) DO NOTHING;
""",
),
# All fact_* steps below are insert-only: ON CONFLICT ... DO NOTHING means a
# row whose key already exists in DWD is never updated by a re-run.
# Sale items: member_id is inserted as NULL and a NULL is_gift becomes FALSE.
(
"fact_sale_item",
"""
INSERT INTO billiards_dwd.fact_sale_item (
site_id, sale_item_id, order_trade_no, order_settle_id, member_id,
goods_id, category_id, quantity, original_amount, discount_amount,
final_amount, is_gift, sale_time
)
SELECT
site_id,
sale_item_id,
order_trade_no,
order_settle_id,
NULL::BIGINT AS member_id,
goods_id,
category_id,
quantity,
original_amount,
discount_amount,
final_amount,
COALESCE(is_gift, FALSE),
sale_time
FROM billiards_ods.ods_store_sale_item
ON CONFLICT (site_id, sale_item_id) DO NOTHING;
""",
),
# Table usage: member_discount_amount is the constant 0, the ODS
# discount_amount is stored as manual_discount_amount, is_canceled is always
# FALSE and cancel_time always NULL.
(
"fact_table_usage",
"""
INSERT INTO billiards_dwd.fact_table_usage (
site_id, ledger_id, order_trade_no, order_settle_id, table_id,
member_id, start_time, end_time, duration_minutes,
original_table_fee, member_discount_amount, manual_discount_amount,
final_table_fee, is_canceled, cancel_time
)
SELECT
site_id,
ledger_id,
order_trade_no,
order_settle_id,
table_id,
member_id,
start_time,
end_time,
duration_minutes,
original_table_fee,
0::NUMERIC AS member_discount_amount,
discount_amount AS manual_discount_amount,
final_table_fee,
FALSE AS is_canceled,
NULL::TIMESTAMPTZ AS cancel_time
FROM billiards_ods.ods_table_use_log
ON CONFLICT (site_id, ledger_id) DO NOTHING;
""",
),
# Assistant service: same constant fill-ins as fact_table_usage, plus
# assist_type_code inserted as NULL.
(
"fact_assistant_service",
"""
INSERT INTO billiards_dwd.fact_assistant_service (
site_id, ledger_id, order_trade_no, order_settle_id, assistant_id,
assist_type_code, member_id, start_time, end_time, duration_minutes,
original_fee, member_discount_amount, manual_discount_amount,
final_fee, is_canceled, cancel_time
)
SELECT
site_id,
ledger_id,
order_trade_no,
order_settle_id,
assistant_id,
NULL::TEXT AS assist_type_code,
member_id,
start_time,
end_time,
duration_minutes,
original_fee,
0::NUMERIC AS member_discount_amount,
discount_amount AS manual_discount_amount,
final_fee,
FALSE AS is_canceled,
NULL::TIMESTAMPTZ AS cancel_time
FROM billiards_ods.ods_assistant_service_log
ON CONFLICT (site_id, ledger_id) DO NOTHING;
""",
),
# Coupon usage: package_id is inserted as NULL; the other columns are copied
# from ods_platform_coupon_log.
(
"fact_coupon_usage",
"""
INSERT INTO billiards_dwd.fact_coupon_usage (
site_id, coupon_id, package_id, order_trade_no, order_settle_id,
member_id, platform_code, status, deduct_amount, settle_price, used_time
)
SELECT
site_id,
coupon_id,
NULL::BIGINT AS package_id,
order_trade_no,
order_settle_id,
member_id,
platform_code,
status,
deduct_amount,
settle_price,
used_time
FROM billiards_ods.ods_platform_coupon_log
ON CONFLICT (site_id, coupon_id) DO NOTHING;
""",
),
# Payments: column-for-column copy from ods_payment_record.
(
"fact_payment",
"""
INSERT INTO billiards_dwd.fact_payment (
site_id, pay_id, order_trade_no, order_settle_id, member_id,
pay_method_code, pay_amount, pay_time, relate_type, relate_id
)
SELECT
site_id,
pay_id,
order_trade_no,
order_settle_id,
member_id,
pay_method_code,
pay_amount,
pay_time,
relate_type,
relate_id
FROM billiards_ods.ods_payment_record
ON CONFLICT (site_id, pay_id) DO NOTHING;
""",
),
# Refunds: column-for-column copy from ods_refund_record.
(
"fact_refund",
"""
INSERT INTO billiards_dwd.fact_refund (
site_id, refund_id, order_trade_no, order_settle_id, member_id,
pay_method_code, refund_amount, refund_time, status
)
SELECT
site_id,
refund_id,
order_trade_no,
order_settle_id,
member_id,
pay_method_code,
refund_amount,
refund_time,
status
FROM billiards_ods.ods_refund_record
ON CONFLICT (site_id, refund_id) DO NOTHING;
""",
),
# Balance changes: relate_type and pay_method_code are inserted as NULL; the
# other columns are copied from ods_balance_change.
(
"fact_balance_change",
"""
INSERT INTO billiards_dwd.fact_balance_change (
site_id, change_id, member_id, change_type, relate_type, relate_id,
pay_method_code, change_amount, balance_before, balance_after, change_time
)
SELECT
site_id,
change_id,
member_id,
change_type,
NULL::TEXT AS relate_type,
relate_id,
NULL::TEXT AS pay_method_code,
change_amount,
balance_before,
balance_after,
change_time
FROM billiards_ods.ods_balance_change
ON CONFLICT (site_id, change_id) DO NOTHING;
""",
),
]
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the DWD build script.

    Options:
        --dsn: PostgreSQL DSN; defaults to the PG_DSN environment variable
            (None when neither is supplied).
        --timeout: connect timeout in seconds, as an int; defaults to the
            PG_CONNECT_TIMEOUT environment variable, or 10 when that variable
            is unset or empty.

    Returns:
        The populated namespace with ``dsn`` and ``timeout`` attributes.
    """
    parser = argparse.ArgumentParser(description="Build DWD tables from ODS payloads (PRD schema).")
    parser.add_argument(
        "--dsn",
        default=os.environ.get("PG_DSN"),
        help="PostgreSQL DSN (fallback PG_DSN env)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        # Hand argparse the raw string: it applies type=int to a string default
        # only when --timeout is absent from the command line. A malformed
        # PG_CONNECT_TIMEOUT therefore becomes a regular usage error instead of
        # a ValueError raised while building the parser, which used to break
        # --help and explicit --timeout values as well.
        default=os.environ.get("PG_CONNECT_TIMEOUT") or "10",
        help="connect_timeout seconds (capped at 20, default 10)",
    )
    return parser.parse_args()
def main() -> int:
    """Run every SQL_STEPS statement in a single transaction.

    Returns:
        0 when all steps executed and the transaction was committed,
        1 when connecting or any step failed (the transaction is rolled back),
        2 when no DSN was supplied via --dsn or PG_DSN.
    """
    args = parse_args()
    if not args.dsn:
        print("Missing DSN. Use --dsn or PG_DSN.", file=sys.stderr)
        return 2

    # Clamp the connect timeout into the 1..20 second range.
    timeout_val = max(1, min(args.timeout, 20))

    # Connection errors go through the same "[FAIL]" / exit-code-1 path as
    # step errors instead of escaping as an unhandled traceback.
    try:
        conn = psycopg2.connect(args.dsn, connect_timeout=timeout_val)
    except Exception as exc:  # pragma: no cover - operational script
        print(f"[FAIL] {exc}", file=sys.stderr)
        return 1

    # All steps share one transaction: either every step lands or none does.
    conn.autocommit = False
    try:
        with conn.cursor() as cur:
            for name, sql in SQL_STEPS:
                cur.execute(sql)
                print(f"[OK] {name}")
        conn.commit()
    except Exception as exc:  # pragma: no cover - operational script
        # A rollback on an already broken connection can raise as well; report
        # it separately so it never hides the error that caused the failure.
        try:
            conn.rollback()
        except Exception as rollback_exc:
            print(f"[WARN] rollback failed: {rollback_exc}", file=sys.stderr)
        print(f"[FAIL] {exc}", file=sys.stderr)
        return 1
    finally:
        # Best-effort close: the outcome has already been decided above.
        try:
            conn.close()
        except Exception:
            pass
    print("DWD build complete.")
    return 0
if __name__ == "__main__":
raise SystemExit(main())