426 lines
14 KiB
Python
426 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Populate PRD DWD tables from ODS payload snapshots."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
|
|
import psycopg2
|
|
|
|
|
|
SQL_STEPS: list[tuple[str, str]] = [
|
|
(
|
|
"dim_tenant",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_tenant (tenant_id, tenant_name, status)
|
|
SELECT DISTINCT tenant_id, 'default' AS tenant_name, 'active' AS status
|
|
FROM (
|
|
SELECT tenant_id FROM billiards_ods.ods_order_settle
|
|
UNION SELECT tenant_id FROM billiards_ods.ods_order_receipt_detail
|
|
UNION SELECT tenant_id FROM billiards_ods.ods_member_profile
|
|
) s
|
|
WHERE tenant_id IS NOT NULL
|
|
ON CONFLICT (tenant_id) DO UPDATE SET updated_at = now();
|
|
""",
|
|
),
|
|
(
|
|
"dim_site",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_site (site_id, tenant_id, site_name, status)
|
|
SELECT DISTINCT site_id, MAX(tenant_id) AS tenant_id, 'default' AS site_name, 'active' AS status
|
|
FROM (
|
|
SELECT site_id, tenant_id FROM billiards_ods.ods_order_settle
|
|
UNION SELECT site_id, tenant_id FROM billiards_ods.ods_order_receipt_detail
|
|
UNION SELECT site_id, tenant_id FROM billiards_ods.ods_table_info
|
|
) s
|
|
WHERE site_id IS NOT NULL
|
|
GROUP BY site_id
|
|
ON CONFLICT (site_id) DO UPDATE SET updated_at = now();
|
|
""",
|
|
),
|
|
(
|
|
"dim_product_category",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_product_category (category_id, category_name, parent_id, level_no, status)
|
|
SELECT DISTINCT category_id, category_name, parent_id, level_no, status
|
|
FROM billiards_ods.ods_goods_category
|
|
WHERE category_id IS NOT NULL
|
|
ON CONFLICT (category_id) DO UPDATE SET
|
|
category_name = EXCLUDED.category_name,
|
|
parent_id = EXCLUDED.parent_id,
|
|
level_no = EXCLUDED.level_no,
|
|
status = EXCLUDED.status;
|
|
""",
|
|
),
|
|
(
|
|
"dim_product",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_product (goods_id, goods_name, goods_code, category_id, category_name, unit, default_price, status)
|
|
SELECT DISTINCT goods_id, goods_name, NULL::TEXT AS goods_code, category_id, category_name, NULL::TEXT AS unit, sale_price AS default_price, status
|
|
FROM billiards_ods.ods_store_product
|
|
WHERE goods_id IS NOT NULL
|
|
ON CONFLICT (goods_id) DO UPDATE SET
|
|
goods_name = EXCLUDED.goods_name,
|
|
category_id = EXCLUDED.category_id,
|
|
category_name = EXCLUDED.category_name,
|
|
default_price = EXCLUDED.default_price,
|
|
status = EXCLUDED.status,
|
|
updated_at = now();
|
|
""",
|
|
),
|
|
(
|
|
"dim_product_from_sales",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_product (goods_id, goods_name)
|
|
SELECT DISTINCT goods_id, goods_name
|
|
FROM billiards_ods.ods_store_sale_item
|
|
WHERE goods_id IS NOT NULL
|
|
ON CONFLICT (goods_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"dim_member_card_type",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_member_card_type (card_type_id, card_type_name, discount_rate)
|
|
SELECT DISTINCT card_type_id, card_type_name, discount_rate
|
|
FROM billiards_ods.ods_member_card
|
|
WHERE card_type_id IS NOT NULL
|
|
ON CONFLICT (card_type_id) DO UPDATE SET
|
|
card_type_name = EXCLUDED.card_type_name,
|
|
discount_rate = EXCLUDED.discount_rate;
|
|
""",
|
|
),
|
|
(
|
|
"dim_member",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_member (
|
|
site_id, member_id, tenant_id, member_name, nickname, gender, birthday, mobile,
|
|
member_type_id, member_type_name, status, register_time, last_visit_time,
|
|
balance, total_recharge_amount, total_consumed_amount, wechat_id, alipay_id, remark
|
|
)
|
|
SELECT DISTINCT
|
|
prof.site_id,
|
|
prof.member_id,
|
|
prof.tenant_id,
|
|
prof.member_name,
|
|
prof.nickname,
|
|
prof.gender,
|
|
prof.birthday,
|
|
prof.mobile,
|
|
card.member_type_id,
|
|
card.member_type_name,
|
|
prof.status,
|
|
prof.register_time,
|
|
prof.last_visit_time,
|
|
prof.balance,
|
|
NULL::NUMERIC AS total_recharge_amount,
|
|
NULL::NUMERIC AS total_consumed_amount,
|
|
prof.wechat_id,
|
|
prof.alipay_id,
|
|
prof.remarks
|
|
FROM billiards_ods.ods_member_profile prof
|
|
LEFT JOIN (
|
|
SELECT DISTINCT site_id, member_id, card_type_id AS member_type_id, card_type_name AS member_type_name
|
|
FROM billiards_ods.ods_member_card
|
|
) card
|
|
ON prof.site_id = card.site_id AND prof.member_id = card.member_id
|
|
WHERE prof.member_id IS NOT NULL
|
|
ON CONFLICT (site_id, member_id) DO UPDATE SET
|
|
member_name = EXCLUDED.member_name,
|
|
nickname = EXCLUDED.nickname,
|
|
gender = EXCLUDED.gender,
|
|
birthday = EXCLUDED.birthday,
|
|
mobile = EXCLUDED.mobile,
|
|
member_type_id = EXCLUDED.member_type_id,
|
|
member_type_name = EXCLUDED.member_type_name,
|
|
status = EXCLUDED.status,
|
|
register_time = EXCLUDED.register_time,
|
|
last_visit_time = EXCLUDED.last_visit_time,
|
|
balance = EXCLUDED.balance,
|
|
wechat_id = EXCLUDED.wechat_id,
|
|
alipay_id = EXCLUDED.alipay_id,
|
|
remark = EXCLUDED.remark,
|
|
updated_at = now();
|
|
""",
|
|
),
|
|
(
|
|
"dim_table",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_table (table_id, site_id, table_code, table_name, table_type, area_name, status, created_time, updated_time)
|
|
SELECT DISTINCT table_id, site_id, table_code, table_name, table_type, area_name, status, created_time, updated_time
|
|
FROM billiards_ods.ods_table_info
|
|
WHERE table_id IS NOT NULL
|
|
ON CONFLICT (table_id) DO UPDATE SET
|
|
site_id = EXCLUDED.site_id,
|
|
table_code = EXCLUDED.table_code,
|
|
table_name = EXCLUDED.table_name,
|
|
table_type = EXCLUDED.table_type,
|
|
area_name = EXCLUDED.area_name,
|
|
status = EXCLUDED.status,
|
|
created_time = EXCLUDED.created_time,
|
|
updated_time = EXCLUDED.updated_time;
|
|
""",
|
|
),
|
|
(
|
|
"dim_assistant",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_assistant (assistant_id, assistant_name, mobile, status)
|
|
SELECT DISTINCT assistant_id, assistant_name, mobile, status
|
|
FROM billiards_ods.ods_assistant_account
|
|
WHERE assistant_id IS NOT NULL
|
|
ON CONFLICT (assistant_id) DO UPDATE SET
|
|
assistant_name = EXCLUDED.assistant_name,
|
|
mobile = EXCLUDED.mobile,
|
|
status = EXCLUDED.status,
|
|
updated_at = now();
|
|
""",
|
|
),
|
|
(
|
|
"dim_pay_method",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_pay_method (pay_method_code, pay_method_name, is_stored_value, status)
|
|
SELECT DISTINCT pay_method_code, pay_method_name, FALSE AS is_stored_value, 'active' AS status
|
|
FROM billiards_ods.ods_payment_record
|
|
WHERE pay_method_code IS NOT NULL
|
|
ON CONFLICT (pay_method_code) DO UPDATE SET
|
|
pay_method_name = EXCLUDED.pay_method_name,
|
|
status = EXCLUDED.status,
|
|
updated_at = now();
|
|
""",
|
|
),
|
|
(
|
|
"dim_coupon_platform",
|
|
"""
|
|
INSERT INTO billiards_dwd.dim_coupon_platform (platform_code, platform_name)
|
|
SELECT DISTINCT platform_code, platform_code AS platform_name
|
|
FROM billiards_ods.ods_platform_coupon_log
|
|
WHERE platform_code IS NOT NULL
|
|
ON CONFLICT (platform_code) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_sale_item",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_sale_item (
|
|
site_id, sale_item_id, order_trade_no, order_settle_id, member_id,
|
|
goods_id, category_id, quantity, original_amount, discount_amount,
|
|
final_amount, is_gift, sale_time
|
|
)
|
|
SELECT
|
|
site_id,
|
|
sale_item_id,
|
|
order_trade_no,
|
|
order_settle_id,
|
|
NULL::BIGINT AS member_id,
|
|
goods_id,
|
|
category_id,
|
|
quantity,
|
|
original_amount,
|
|
discount_amount,
|
|
final_amount,
|
|
COALESCE(is_gift, FALSE),
|
|
sale_time
|
|
FROM billiards_ods.ods_store_sale_item
|
|
ON CONFLICT (site_id, sale_item_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_table_usage",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_table_usage (
|
|
site_id, ledger_id, order_trade_no, order_settle_id, table_id,
|
|
member_id, start_time, end_time, duration_minutes,
|
|
original_table_fee, member_discount_amount, manual_discount_amount,
|
|
final_table_fee, is_canceled, cancel_time
|
|
)
|
|
SELECT
|
|
site_id,
|
|
ledger_id,
|
|
order_trade_no,
|
|
order_settle_id,
|
|
table_id,
|
|
member_id,
|
|
start_time,
|
|
end_time,
|
|
duration_minutes,
|
|
original_table_fee,
|
|
0::NUMERIC AS member_discount_amount,
|
|
discount_amount AS manual_discount_amount,
|
|
final_table_fee,
|
|
FALSE AS is_canceled,
|
|
NULL::TIMESTAMPTZ AS cancel_time
|
|
FROM billiards_ods.ods_table_use_log
|
|
ON CONFLICT (site_id, ledger_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_assistant_service",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_assistant_service (
|
|
site_id, ledger_id, order_trade_no, order_settle_id, assistant_id,
|
|
assist_type_code, member_id, start_time, end_time, duration_minutes,
|
|
original_fee, member_discount_amount, manual_discount_amount,
|
|
final_fee, is_canceled, cancel_time
|
|
)
|
|
SELECT
|
|
site_id,
|
|
ledger_id,
|
|
order_trade_no,
|
|
order_settle_id,
|
|
assistant_id,
|
|
NULL::TEXT AS assist_type_code,
|
|
member_id,
|
|
start_time,
|
|
end_time,
|
|
duration_minutes,
|
|
original_fee,
|
|
0::NUMERIC AS member_discount_amount,
|
|
discount_amount AS manual_discount_amount,
|
|
final_fee,
|
|
FALSE AS is_canceled,
|
|
NULL::TIMESTAMPTZ AS cancel_time
|
|
FROM billiards_ods.ods_assistant_service_log
|
|
ON CONFLICT (site_id, ledger_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_coupon_usage",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_coupon_usage (
|
|
site_id, coupon_id, package_id, order_trade_no, order_settle_id,
|
|
member_id, platform_code, status, deduct_amount, settle_price, used_time
|
|
)
|
|
SELECT
|
|
site_id,
|
|
coupon_id,
|
|
NULL::BIGINT AS package_id,
|
|
order_trade_no,
|
|
order_settle_id,
|
|
member_id,
|
|
platform_code,
|
|
status,
|
|
deduct_amount,
|
|
settle_price,
|
|
used_time
|
|
FROM billiards_ods.ods_platform_coupon_log
|
|
ON CONFLICT (site_id, coupon_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_payment",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_payment (
|
|
site_id, pay_id, order_trade_no, order_settle_id, member_id,
|
|
pay_method_code, pay_amount, pay_time, relate_type, relate_id
|
|
)
|
|
SELECT
|
|
site_id,
|
|
pay_id,
|
|
order_trade_no,
|
|
order_settle_id,
|
|
member_id,
|
|
pay_method_code,
|
|
pay_amount,
|
|
pay_time,
|
|
relate_type,
|
|
relate_id
|
|
FROM billiards_ods.ods_payment_record
|
|
ON CONFLICT (site_id, pay_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_refund",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_refund (
|
|
site_id, refund_id, order_trade_no, order_settle_id, member_id,
|
|
pay_method_code, refund_amount, refund_time, status
|
|
)
|
|
SELECT
|
|
site_id,
|
|
refund_id,
|
|
order_trade_no,
|
|
order_settle_id,
|
|
member_id,
|
|
pay_method_code,
|
|
refund_amount,
|
|
refund_time,
|
|
status
|
|
FROM billiards_ods.ods_refund_record
|
|
ON CONFLICT (site_id, refund_id) DO NOTHING;
|
|
""",
|
|
),
|
|
(
|
|
"fact_balance_change",
|
|
"""
|
|
INSERT INTO billiards_dwd.fact_balance_change (
|
|
site_id, change_id, member_id, change_type, relate_type, relate_id,
|
|
pay_method_code, change_amount, balance_before, balance_after, change_time
|
|
)
|
|
SELECT
|
|
site_id,
|
|
change_id,
|
|
member_id,
|
|
change_type,
|
|
NULL::TEXT AS relate_type,
|
|
relate_id,
|
|
NULL::TEXT AS pay_method_code,
|
|
change_amount,
|
|
balance_before,
|
|
balance_after,
|
|
change_time
|
|
FROM billiards_ods.ods_balance_change
|
|
ON CONFLICT (site_id, change_id) DO NOTHING;
|
|
""",
|
|
),
|
|
]
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Build DWD tables from ODS payloads (PRD schema).")
|
|
parser.add_argument(
|
|
"--dsn",
|
|
default=os.environ.get("PG_DSN"),
|
|
help="PostgreSQL DSN (fallback PG_DSN env)",
|
|
)
|
|
parser.add_argument(
|
|
"--timeout",
|
|
type=int,
|
|
default=int(os.environ.get("PG_CONNECT_TIMEOUT", 10) or 10),
|
|
help="connect_timeout seconds (capped at 20, default 10)",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
if not args.dsn:
|
|
print("Missing DSN. Use --dsn or PG_DSN.", file=sys.stderr)
|
|
return 2
|
|
|
|
timeout_val = max(1, min(args.timeout, 20))
|
|
conn = psycopg2.connect(args.dsn, connect_timeout=timeout_val)
|
|
conn.autocommit = False
|
|
try:
|
|
with conn.cursor() as cur:
|
|
for name, sql in SQL_STEPS:
|
|
cur.execute(sql)
|
|
print(f"[OK] {name}")
|
|
conn.commit()
|
|
except Exception as exc: # pragma: no cover - operational script
|
|
conn.rollback()
|
|
print(f"[FAIL] {exc}", file=sys.stderr)
|
|
return 1
|
|
finally:
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
|
|
print("DWD build complete.")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|