# -*- coding: utf-8 -*- """Recompute billiards_dws.dws_order_summary from DWD fact tables.""" from __future__ import annotations import argparse import os import sys from pathlib import Path PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from database.connection import DatabaseConnection # noqa: E402 SQL_BUILD_SUMMARY = r""" WITH table_fee AS ( SELECT site_id, order_settle_id, order_trade_no, MIN(member_id) AS member_id, SUM(COALESCE(final_table_fee, 0)) AS table_fee_amount, SUM(COALESCE(member_discount_amount, 0)) AS member_discount_amount, SUM(COALESCE(manual_discount_amount, 0)) AS manual_discount_amount, SUM(COALESCE(original_table_fee, 0)) AS original_table_fee, MIN(start_time) AS first_time FROM billiards_dwd.fact_table_usage WHERE (%(site_id)s IS NULL OR site_id = %(site_id)s) AND (%(start_date)s IS NULL OR start_time::date >= %(start_date)s) AND (%(end_date)s IS NULL OR start_time::date <= %(end_date)s) AND COALESCE(is_canceled, FALSE) = FALSE GROUP BY site_id, order_settle_id, order_trade_no ), assistant_fee AS ( SELECT site_id, order_settle_id, order_trade_no, MIN(member_id) AS member_id, SUM(COALESCE(final_fee, 0)) AS assistant_service_amount, SUM(COALESCE(member_discount_amount, 0)) AS member_discount_amount, SUM(COALESCE(manual_discount_amount, 0)) AS manual_discount_amount, SUM(COALESCE(original_fee, 0)) AS original_fee, MIN(start_time) AS first_time FROM billiards_dwd.fact_assistant_service WHERE (%(site_id)s IS NULL OR site_id = %(site_id)s) AND (%(start_date)s IS NULL OR start_time::date >= %(start_date)s) AND (%(end_date)s IS NULL OR start_time::date <= %(end_date)s) AND COALESCE(is_canceled, FALSE) = FALSE GROUP BY site_id, order_settle_id, order_trade_no ), goods_fee AS ( SELECT site_id, order_settle_id, order_trade_no, MIN(member_id) AS member_id, SUM(COALESCE(final_amount, 0)) FILTER (WHERE COALESCE(is_gift, FALSE) = FALSE) AS goods_amount, SUM(COALESCE(discount_amount, 0)) FILTER (WHERE COALESCE(is_gift, FALSE) = FALSE) AS goods_discount_amount, SUM(COALESCE(original_amount, 0)) FILTER (WHERE COALESCE(is_gift, FALSE) = FALSE) AS goods_original_amount, COUNT(*) FILTER (WHERE COALESCE(is_gift, FALSE) = FALSE) AS item_count, SUM(COALESCE(quantity, 0)) FILTER (WHERE COALESCE(is_gift, FALSE) = FALSE) AS total_item_quantity, MIN(sale_time) AS first_time FROM billiards_dwd.fact_sale_item WHERE (%(site_id)s IS NULL OR site_id = %(site_id)s) AND (%(start_date)s IS NULL OR sale_time::date >= %(start_date)s) AND (%(end_date)s IS NULL OR sale_time::date <= %(end_date)s) GROUP BY site_id, order_settle_id, order_trade_no ), coupon_usage AS ( SELECT site_id, order_settle_id, order_trade_no, MIN(member_id) AS member_id, SUM(COALESCE(deduct_amount, 0)) AS coupon_deduction, SUM(COALESCE(settle_price, 0)) AS settle_price, MIN(used_time) AS first_time FROM billiards_dwd.fact_coupon_usage WHERE (%(site_id)s IS NULL OR site_id = %(site_id)s) AND (%(start_date)s IS NULL OR used_time::date >= %(start_date)s) AND (%(end_date)s IS NULL OR used_time::date <= %(end_date)s) GROUP BY site_id, order_settle_id, order_trade_no ), payments AS ( SELECT fp.site_id, fp.order_settle_id, fp.order_trade_no, MIN(fp.member_id) AS member_id, SUM(COALESCE(fp.pay_amount, 0)) AS total_paid_amount, SUM(COALESCE(fp.pay_amount, 0)) FILTER (WHERE COALESCE(pm.is_stored_value, FALSE)) AS stored_card_deduct, SUM(COALESCE(fp.pay_amount, 0)) FILTER (WHERE NOT COALESCE(pm.is_stored_value, FALSE)) AS external_paid_amount, MIN(fp.pay_time) AS first_time FROM billiards_dwd.fact_payment fp LEFT JOIN billiards_dwd.dim_pay_method pm ON fp.pay_method_code = pm.pay_method_code WHERE (%(site_id)s IS NULL OR fp.site_id = %(site_id)s) AND (%(start_date)s IS NULL OR fp.pay_time::date >= %(start_date)s) AND (%(end_date)s IS NULL OR fp.pay_time::date <= %(end_date)s) GROUP BY fp.site_id, fp.order_settle_id, fp.order_trade_no ), refunds AS ( SELECT site_id, order_settle_id, order_trade_no, SUM(COALESCE(refund_amount, 0)) AS refund_amount FROM billiards_dwd.fact_refund WHERE (%(site_id)s IS NULL OR site_id = %(site_id)s) AND (%(start_date)s IS NULL OR refund_time::date >= %(start_date)s) AND (%(end_date)s IS NULL OR refund_time::date <= %(end_date)s) GROUP BY site_id, order_settle_id, order_trade_no ), combined_ids AS ( SELECT site_id, order_settle_id, order_trade_no FROM table_fee UNION SELECT site_id, order_settle_id, order_trade_no FROM assistant_fee UNION SELECT site_id, order_settle_id, order_trade_no FROM goods_fee UNION SELECT site_id, order_settle_id, order_trade_no FROM coupon_usage UNION SELECT site_id, order_settle_id, order_trade_no FROM payments UNION SELECT site_id, order_settle_id, order_trade_no FROM refunds ), site_dim AS ( SELECT site_id, tenant_id FROM billiards_dwd.dim_site ) INSERT INTO billiards_dws.dws_order_summary ( site_id, order_settle_id, order_trade_no, order_date, tenant_id, member_id, member_flag, recharge_order_flag, item_count, total_item_quantity, table_fee_amount, assistant_service_amount, goods_amount, group_amount, total_coupon_deduction, member_discount_amount, manual_discount_amount, order_original_amount, order_final_amount, stored_card_deduct, external_paid_amount, total_paid_amount, book_table_flow, book_assistant_flow, book_goods_flow, book_group_flow, book_order_flow, order_effective_consume_cash, order_effective_recharge_cash, order_effective_flow, refund_amount, net_income, created_at, updated_at ) SELECT c.site_id, c.order_settle_id, c.order_trade_no, COALESCE(tf.first_time, af.first_time, gf.first_time, pay.first_time, cu.first_time)::date AS order_date, sd.tenant_id, COALESCE(tf.member_id, af.member_id, gf.member_id, cu.member_id, pay.member_id) AS member_id, COALESCE(tf.member_id, af.member_id, gf.member_id, cu.member_id, pay.member_id) IS NOT NULL AS member_flag, -- recharge flag: no consumption side but has payments (COALESCE(tf.table_fee_amount, 0) + COALESCE(af.assistant_service_amount, 0) + COALESCE(gf.goods_amount, 0) + COALESCE(cu.settle_price, 0) = 0) AND COALESCE(pay.total_paid_amount, 0) > 0 AS recharge_order_flag, COALESCE(gf.item_count, 0) AS item_count, COALESCE(gf.total_item_quantity, 0) AS total_item_quantity, COALESCE(tf.table_fee_amount, 0) AS table_fee_amount, COALESCE(af.assistant_service_amount, 0) AS assistant_service_amount, COALESCE(gf.goods_amount, 0) AS goods_amount, COALESCE(cu.settle_price, 0) AS group_amount, COALESCE(cu.coupon_deduction, 0) AS total_coupon_deduction, COALESCE(tf.member_discount_amount, 0) + COALESCE(af.member_discount_amount, 0) + COALESCE(gf.goods_discount_amount, 0) AS member_discount_amount, COALESCE(tf.manual_discount_amount, 0) + COALESCE(af.manual_discount_amount, 0) AS manual_discount_amount, COALESCE(tf.original_table_fee, 0) + COALESCE(af.original_fee, 0) + COALESCE(gf.goods_original_amount, 0) AS order_original_amount, COALESCE(tf.table_fee_amount, 0) + COALESCE(af.assistant_service_amount, 0) + COALESCE(gf.goods_amount, 0) + COALESCE(cu.settle_price, 0) - COALESCE(cu.coupon_deduction, 0) AS order_final_amount, COALESCE(pay.stored_card_deduct, 0) AS stored_card_deduct, COALESCE(pay.external_paid_amount, 0) AS external_paid_amount, COALESCE(pay.total_paid_amount, 0) AS total_paid_amount, COALESCE(tf.table_fee_amount, 0) AS book_table_flow, COALESCE(af.assistant_service_amount, 0) AS book_assistant_flow, COALESCE(gf.goods_amount, 0) AS book_goods_flow, COALESCE(cu.settle_price, 0) AS book_group_flow, COALESCE(tf.table_fee_amount, 0) + COALESCE(af.assistant_service_amount, 0) + COALESCE(gf.goods_amount, 0) + COALESCE(cu.settle_price, 0) AS book_order_flow, CASE WHEN (COALESCE(tf.table_fee_amount, 0) + COALESCE(af.assistant_service_amount, 0) + COALESCE(gf.goods_amount, 0) + COALESCE(cu.settle_price, 0) = 0) THEN 0 ELSE COALESCE(pay.external_paid_amount, 0) END AS order_effective_consume_cash, CASE WHEN (COALESCE(tf.table_fee_amount, 0) + COALESCE(af.assistant_service_amount, 0) + COALESCE(gf.goods_amount, 0) + COALESCE(cu.settle_price, 0) = 0) THEN COALESCE(pay.external_paid_amount, 0) ELSE 0 END AS order_effective_recharge_cash, COALESCE(pay.external_paid_amount, 0) + COALESCE(cu.settle_price, 0) AS order_effective_flow, COALESCE(rf.refund_amount, 0) AS refund_amount, (COALESCE(pay.external_paid_amount, 0) + COALESCE(cu.settle_price, 0)) - COALESCE(rf.refund_amount, 0) AS net_income, now() AS created_at, now() AS updated_at FROM combined_ids c LEFT JOIN table_fee tf ON c.site_id = tf.site_id AND c.order_settle_id = tf.order_settle_id LEFT JOIN assistant_fee af ON c.site_id = af.site_id AND c.order_settle_id = af.order_settle_id LEFT JOIN goods_fee gf ON c.site_id = gf.site_id AND c.order_settle_id = gf.order_settle_id LEFT JOIN coupon_usage cu ON c.site_id = cu.site_id AND c.order_settle_id = cu.order_settle_id LEFT JOIN payments pay ON c.site_id = pay.site_id AND c.order_settle_id = pay.order_settle_id LEFT JOIN refunds rf ON c.site_id = rf.site_id AND c.order_settle_id = rf.order_settle_id LEFT JOIN site_dim sd ON c.site_id = sd.site_id ON CONFLICT (site_id, order_settle_id) DO UPDATE SET order_trade_no = EXCLUDED.order_trade_no, order_date = EXCLUDED.order_date, tenant_id = EXCLUDED.tenant_id, member_id = EXCLUDED.member_id, member_flag = EXCLUDED.member_flag, recharge_order_flag = EXCLUDED.recharge_order_flag, item_count = EXCLUDED.item_count, total_item_quantity = EXCLUDED.total_item_quantity, table_fee_amount = EXCLUDED.table_fee_amount, assistant_service_amount = EXCLUDED.assistant_service_amount, goods_amount = EXCLUDED.goods_amount, group_amount = EXCLUDED.group_amount, total_coupon_deduction = EXCLUDED.total_coupon_deduction, member_discount_amount = EXCLUDED.member_discount_amount, manual_discount_amount = EXCLUDED.manual_discount_amount, order_original_amount = EXCLUDED.order_original_amount, order_final_amount = EXCLUDED.order_final_amount, stored_card_deduct = EXCLUDED.stored_card_deduct, external_paid_amount = EXCLUDED.external_paid_amount, total_paid_amount = EXCLUDED.total_paid_amount, book_table_flow = EXCLUDED.book_table_flow, book_assistant_flow = EXCLUDED.book_assistant_flow, book_goods_flow = EXCLUDED.book_goods_flow, book_group_flow = EXCLUDED.book_group_flow, book_order_flow = EXCLUDED.book_order_flow, order_effective_consume_cash = EXCLUDED.order_effective_consume_cash, order_effective_recharge_cash = EXCLUDED.order_effective_recharge_cash, order_effective_flow = EXCLUDED.order_effective_flow, refund_amount = EXCLUDED.refund_amount, net_income = EXCLUDED.net_income, updated_at = now(); """ def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Build/update dws_order_summary from DWD fact tables." ) parser.add_argument( "--dsn", default=os.environ.get("PG_DSN"), help="PostgreSQL DSN (fallback: PG_DSN env)", ) parser.add_argument( "--site-id", type=int, default=None, help="Filter by site_id (optional, default all sites)", ) parser.add_argument( "--start-date", dest="start_date", default=None, help="Filter facts from this date (YYYY-MM-DD, optional)", ) parser.add_argument( "--end-date", dest="end_date", default=None, help="Filter facts until this date (YYYY-MM-DD, optional)", ) parser.add_argument( "--timeout", type=int, default=int(os.environ.get("PG_CONNECT_TIMEOUT", 10) or 10), help="connect_timeout seconds (capped at 20, default 10)", ) return parser.parse_args() def main() -> int: args = parse_args() if not args.dsn: print("Missing DSN. Set PG_DSN or pass --dsn.", file=sys.stderr) return 2 params = { "site_id": args.site_id, "start_date": args.start_date, "end_date": args.end_date, } timeout_val = max(1, min(args.timeout, 20)) conn = DatabaseConnection(args.dsn, connect_timeout=timeout_val) try: with conn.conn.cursor() as cur: cur.execute(SQL_BUILD_SUMMARY, params) conn.commit() except Exception as exc: # pragma: no cover - operational script conn.rollback() print(f"DWS build failed: {exc}", file=sys.stderr) return 1 finally: conn.close() print("dws_order_summary refreshed.") return 0 if __name__ == "__main__": raise SystemExit(main())