# -*- coding: utf-8 -*-
"""Build DWS order summary table from DWD fact tables."""
from __future__ import annotations

from datetime import date, datetime
from typing import Any

from tasks.base_task import BaseTask, TaskContext
from utils.windowing import build_window_segments, calc_window_minutes

# Previously imported from scripts.rebuild.build_dws_order_summary; that script
# has been archived, so the SQL is inlined here.
#
# The statement takes three named parameters (site_id, start_date, end_date).
# Each one may be NULL, in which case the corresponding filter is skipped.
# Per-order detail amounts (table / assistant / goods) fall back to the amounts
# stored on the settlement head when no detail rows are found in the window.
SQL_BUILD_SUMMARY = r"""
WITH base AS (
    SELECT
        sh.site_id,
        sh.order_settle_id,
        sh.order_trade_no,
        COALESCE(sh.pay_time, sh.create_time)::date AS order_date,
        sh.tenant_id,
        sh.member_id,
        COALESCE(sh.is_bind_member, FALSE) AS member_flag,
        (COALESCE(sh.consume_money, 0) = 0 AND COALESCE(sh.pay_amount, 0) > 0) AS recharge_order_flag,
        COALESCE(sh.member_discount_amount, 0) AS member_discount_amount,
        COALESCE(sh.adjust_amount, 0) AS manual_discount_amount,
        COALESCE(sh.pay_amount, 0) AS total_paid_amount,
        COALESCE(sh.balance_amount, 0) + COALESCE(sh.recharge_card_amount, 0) + COALESCE(sh.gift_card_amount, 0) AS stored_card_deduct,
        COALESCE(sh.coupon_amount, 0) AS total_coupon_deduction,
        COALESCE(sh.table_charge_money, 0) AS settle_table_fee_amount,
        COALESCE(sh.assistant_pd_money, 0) + COALESCE(sh.assistant_cx_money, 0) AS settle_assistant_service_amount,
        COALESCE(sh.real_goods_money, 0) AS settle_goods_amount
    FROM billiards_dwd.dwd_settlement_head sh
    WHERE (%(site_id)s IS NULL OR sh.site_id = %(site_id)s)
      AND (%(start_date)s IS NULL OR COALESCE(sh.pay_time, sh.create_time)::date >= %(start_date)s)
      AND (%(end_date)s IS NULL OR COALESCE(sh.pay_time, sh.create_time)::date <= %(end_date)s)
),
table_fee AS (
    SELECT
        site_id,
        order_settle_id,
        SUM(COALESCE(real_table_charge_money, 0)) AS table_fee_amount
    FROM billiards_dwd.dwd_table_fee_log
    WHERE COALESCE(is_delete, 0) = 0
      AND (%(site_id)s IS NULL OR site_id = %(site_id)s)
      AND (%(start_date)s IS NULL OR start_use_time::date >= %(start_date)s)
      AND (%(end_date)s IS NULL OR start_use_time::date <= %(end_date)s)
    GROUP BY site_id, order_settle_id
),
assistant_fee AS (
    SELECT
        site_id,
        order_settle_id,
        SUM(COALESCE(ledger_amount, 0)) AS assistant_service_amount
    FROM billiards_dwd.dwd_assistant_service_log
    WHERE COALESCE(is_delete, 0) = 0
      AND (%(site_id)s IS NULL OR site_id = %(site_id)s)
      AND (%(start_date)s IS NULL OR start_use_time::date >= %(start_date)s)
      AND (%(end_date)s IS NULL OR start_use_time::date <= %(end_date)s)
    GROUP BY site_id, order_settle_id
),
goods_fee AS (
    SELECT
        site_id,
        order_settle_id,
        COUNT(*) AS item_count,
        SUM(COALESCE(ledger_count, 0)) AS total_item_quantity,
        SUM(COALESCE(real_goods_money, 0)) AS goods_amount
    FROM billiards_dwd.dwd_store_goods_sale
    WHERE COALESCE(is_delete, 0) = 0
      AND (%(site_id)s IS NULL OR site_id = %(site_id)s)
      AND (%(start_date)s IS NULL OR create_time::date >= %(start_date)s)
      AND (%(end_date)s IS NULL OR create_time::date <= %(end_date)s)
    GROUP BY site_id, order_settle_id
),
group_fee AS (
    SELECT
        site_id,
        order_settle_id,
        SUM(COALESCE(ledger_amount, 0)) AS group_amount
    FROM billiards_dwd.dwd_groupbuy_redemption
    WHERE COALESCE(is_delete, 0) = 0
      AND (%(site_id)s IS NULL OR site_id = %(site_id)s)
      AND (%(start_date)s IS NULL OR create_time::date >= %(start_date)s)
      AND (%(end_date)s IS NULL OR create_time::date <= %(end_date)s)
    GROUP BY site_id, order_settle_id
),
refunds AS (
    SELECT
        r.site_id,
        r.relate_id AS order_settle_id,
        SUM(COALESCE(rx.refund_amount, 0)) AS refund_amount
    FROM billiards_dwd.dwd_refund r
    LEFT JOIN billiards_dwd.dwd_refund_ex rx ON r.refund_id = rx.refund_id
    WHERE (%(site_id)s IS NULL OR r.site_id = %(site_id)s)
      AND (%(start_date)s IS NULL OR r.pay_time::date >= %(start_date)s)
      AND (%(end_date)s IS NULL OR r.pay_time::date <= %(end_date)s)
    GROUP BY r.site_id, r.relate_id
)
INSERT INTO billiards_dws.dws_order_summary (
    site_id,
    order_settle_id,
    order_trade_no,
    order_date,
    tenant_id,
    member_id,
    member_flag,
    recharge_order_flag,
    item_count,
    total_item_quantity,
    table_fee_amount,
    assistant_service_amount,
    goods_amount,
    group_amount,
    total_coupon_deduction,
    member_discount_amount,
    manual_discount_amount,
    order_original_amount,
    order_final_amount,
    stored_card_deduct,
    external_paid_amount,
    total_paid_amount,
    book_table_flow,
    book_assistant_flow,
    book_goods_flow,
    book_group_flow,
    book_order_flow,
    order_effective_consume_cash,
    order_effective_recharge_cash,
    order_effective_flow,
    refund_amount,
    net_income,
    created_at,
    updated_at
)
SELECT
    b.site_id,
    b.order_settle_id,
    b.order_trade_no::text,
    b.order_date,
    b.tenant_id,
    b.member_id,
    b.member_flag,
    b.recharge_order_flag,
    COALESCE(gf.item_count, 0),
    COALESCE(gf.total_item_quantity, 0),
    COALESCE(tf.table_fee_amount, b.settle_table_fee_amount),
    COALESCE(af.assistant_service_amount, b.settle_assistant_service_amount),
    COALESCE(gf.goods_amount, b.settle_goods_amount),
    COALESCE(gr.group_amount, 0),
    b.total_coupon_deduction,
    b.member_discount_amount,
    b.manual_discount_amount,
    (b.total_paid_amount + b.total_coupon_deduction + b.member_discount_amount + b.manual_discount_amount),
    b.total_paid_amount,
    b.stored_card_deduct,
    GREATEST(b.total_paid_amount - b.stored_card_deduct, 0),
    b.total_paid_amount,
    COALESCE(tf.table_fee_amount, b.settle_table_fee_amount),
    COALESCE(af.assistant_service_amount, b.settle_assistant_service_amount),
    COALESCE(gf.goods_amount, b.settle_goods_amount),
    COALESCE(gr.group_amount, 0),
    COALESCE(tf.table_fee_amount, b.settle_table_fee_amount)
        + COALESCE(af.assistant_service_amount, b.settle_assistant_service_amount)
        + COALESCE(gf.goods_amount, b.settle_goods_amount)
        + COALESCE(gr.group_amount, 0),
    GREATEST(b.total_paid_amount - b.stored_card_deduct, 0),
    0,
    b.total_paid_amount,
    COALESCE(rf.refund_amount, 0),
    b.total_paid_amount - COALESCE(rf.refund_amount, 0),
    now(),
    now()
FROM base b
LEFT JOIN table_fee tf ON b.site_id = tf.site_id AND b.order_settle_id = tf.order_settle_id
LEFT JOIN assistant_fee af ON b.site_id = af.site_id AND b.order_settle_id = af.order_settle_id
LEFT JOIN goods_fee gf ON b.site_id = gf.site_id AND b.order_settle_id = gf.order_settle_id
LEFT JOIN group_fee gr ON b.site_id = gr.site_id AND b.order_settle_id = gr.order_settle_id
LEFT JOIN refunds rf ON b.site_id = rf.site_id AND b.order_settle_id = rf.order_settle_id
ON CONFLICT (site_id, order_settle_id) DO UPDATE SET
    order_trade_no = EXCLUDED.order_trade_no,
    order_date = EXCLUDED.order_date,
    tenant_id = EXCLUDED.tenant_id,
    member_id = EXCLUDED.member_id,
    member_flag = EXCLUDED.member_flag,
    recharge_order_flag = EXCLUDED.recharge_order_flag,
    item_count = EXCLUDED.item_count,
    total_item_quantity = EXCLUDED.total_item_quantity,
    table_fee_amount = EXCLUDED.table_fee_amount,
    assistant_service_amount = EXCLUDED.assistant_service_amount,
    goods_amount = EXCLUDED.goods_amount,
    group_amount = EXCLUDED.group_amount,
    total_coupon_deduction = EXCLUDED.total_coupon_deduction,
    member_discount_amount = EXCLUDED.member_discount_amount,
    manual_discount_amount = EXCLUDED.manual_discount_amount,
    order_original_amount = EXCLUDED.order_original_amount,
    order_final_amount = EXCLUDED.order_final_amount,
    stored_card_deduct = EXCLUDED.stored_card_deduct,
    external_paid_amount = EXCLUDED.external_paid_amount,
    total_paid_amount = EXCLUDED.total_paid_amount,
    book_table_flow = EXCLUDED.book_table_flow,
    book_assistant_flow = EXCLUDED.book_assistant_flow,
    book_goods_flow = EXCLUDED.book_goods_flow,
    book_group_flow = EXCLUDED.book_group_flow,
    book_order_flow = EXCLUDED.book_order_flow,
    order_effective_consume_cash = EXCLUDED.order_effective_consume_cash,
    order_effective_recharge_cash = EXCLUDED.order_effective_recharge_cash,
    order_effective_flow = EXCLUDED.order_effective_flow,
    refund_amount = EXCLUDED.refund_amount,
    net_income = EXCLUDED.net_income,
    updated_at = now();
"""


class DwsBuildOrderSummaryTask(BaseTask):
    """Recompute/refresh `billiards_dws.dws_order_summary` for a date window."""

    def get_task_code(self) -> str:
        """Return the code that identifies this task."""
        return "DWS_BUILD_ORDER_SUMMARY"

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Run extract/transform/load once per window segment.

        The base window is split with ``build_window_segments``; when that
        yields nothing the base window is processed as a single segment.
        Each segment is committed on its own, so a failure in a later segment
        rolls back only that segment before the exception is re-raised.

        Args:
            cursor_data: Passed through to the context builders inherited
                from the base class.

        Returns:
            A dict with ``status``, the accumulated ``counts`` and the overall
            ``window``. ``segments`` is added when more than one segment ran,
            ``request_params`` when any segment reported them (a single dict
            for one entry, a list otherwise) and ``extra`` when rows were
            deleted.
        """
        base_context = self._build_context(cursor_data)
        task_code = self.get_task_code()
        segments = build_window_segments(
            self.config,
            base_context.window_start,
            base_context.window_end,
            tz=self.tz,
            override_only=True,
        )
        if not segments:
            segments = [(base_context.window_start, base_context.window_end)]

        total_segments = len(segments)
        if total_segments > 1:
            self.logger.info("%s: 分段执行 共%s段", task_code, total_segments)

        total_counts: dict = {}
        segment_results: list[dict] = []
        request_params_list: list[dict] = []
        total_deleted = 0

        for idx, (window_start, window_end) in enumerate(segments, start=1):
            context = self._build_context_for_window(window_start, window_end, cursor_data)
            self.logger.info(
                "%s: 开始执行(%s/%s), 窗口[%s ~ %s]",
                task_code,
                idx,
                total_segments,
                context.window_start,
                context.window_end,
            )

            try:
                extracted = self.extract(context)
                transformed = self.transform(extracted, context)
                load_result = self.load(transformed, context) or {}
                self.db.commit()
            except Exception:
                # Undo the partial segment, log with traceback, and let the
                # caller see the original failure.
                self.db.rollback()
                self.logger.error("%s: 执行失败", task_code, exc_info=True)
                raise

            counts = load_result.get("counts") or {}
            self._accumulate_counts(total_counts, counts)

            extra = load_result.get("extra") or {}
            total_deleted += int(extra.get("deleted") or 0)

            request_params = load_result.get("request_params")
            if request_params:
                request_params_list.append(request_params)

            # Per-segment detail is only reported for multi-segment runs.
            if total_segments > 1:
                segment_results.append(
                    {
                        "window": {
                            "start": context.window_start,
                            "end": context.window_end,
                            "minutes": context.window_minutes,
                        },
                        "counts": counts,
                        "extra": extra,
                    }
                )

        overall_start = segments[0][0]
        overall_end = segments[-1][1]
        result = {"status": "SUCCESS", "counts": total_counts}
        result["window"] = {
            "start": overall_start,
            "end": overall_end,
            "minutes": calc_window_minutes(overall_start, overall_end),
        }
        if segment_results:
            result["segments"] = segment_results
        if request_params_list:
            result["request_params"] = (
                request_params_list[0] if len(request_params_list) == 1 else request_params_list
            )
        if total_deleted:
            result["extra"] = {"deleted": total_deleted}

        self.logger.info("%s: 完成, 统计=%s", task_code, total_counts)
        return result

    def extract(self, context: TaskContext) -> dict[str, Any]:
        """Resolve the load parameters from configuration and the window.

        Reads ``app.store_id`` and the ``dws.order_summary.*`` keys
        (``full_refresh``, ``site_id``, ``start_date``, ``end_date``,
        ``delete_before_insert``). No data is fetched here; the heavy lifting
        happens inside the SQL executed by :meth:`load`.

        Args:
            context: Supplies ``window_start`` / ``window_end``, used as the
                default date bounds when none are configured.

        Returns:
            A dict with ``site_id``, ``start_date``, ``end_date``,
            ``full_refresh`` and ``delete_before_insert``. On a full refresh
            both date bounds are ``None`` (unbounded).
        """
        store_id = int(self.config.get("app.store_id"))

        # Flags may arrive as strings (this method already treats "null" as a
        # missing site_id). Plain bool("false") would be True and could
        # trigger an unintended full refresh, so parse them tolerantly.
        full_refresh = _as_bool(self.config.get("dws.order_summary.full_refresh", False))
        delete_before_insert = _as_bool(
            self.config.get("dws.order_summary.delete_before_insert", True)
        )

        site_id = self.config.get("dws.order_summary.site_id", store_id)
        if site_id in ("", None, "null", "NULL"):
            site_id = None  # None means "all sites" in the SQL filters

        start_date = self.config.get("dws.order_summary.start_date")
        end_date = self.config.get("dws.order_summary.end_date")
        if full_refresh:
            start_date = None
            end_date = None
        else:
            if not start_date:
                start_date = context.window_start.date()
            if not end_date:
                end_date = context.window_end.date()

        return {
            "site_id": site_id,
            "start_date": start_date,
            "end_date": end_date,
            "full_refresh": full_refresh,
            "delete_before_insert": delete_before_insert,
        }

    def load(self, extracted: dict[str, Any], context: TaskContext) -> dict:
        """Optionally clear the target range, then rebuild it with one SQL statement.

        Args:
            extracted: The dict produced by :meth:`extract` (after
                ``transform``); must contain ``site_id``, ``start_date``,
                ``end_date``, ``full_refresh`` and ``delete_before_insert``.
            context: Unused here; accepted for interface compatibility.

        Returns:
            A dict with ``counts`` (``inserted`` is the cursor row count of
            the upsert, so it covers inserted and updated rows),
            ``request_params`` (JSON-friendly copy of the parameters) and
            ``extra`` (``deleted`` row count; stays 0 when the table is
            truncated).

        Raises:
            RuntimeError: If ``billiards_dws.dws_order_summary`` does not exist.
            ValueError: If a configured date bound cannot be parsed.
        """
        site_id = extracted["site_id"]
        # Normalise once so the DELETE and the INSERT...SELECT are driven by
        # exactly the same date bounds.
        start_date = None if extracted["start_date"] is None else _as_date(extracted["start_date"])
        end_date = None if extracted["end_date"] is None else _as_date(extracted["end_date"])

        sql_params = {
            "site_id": site_id,
            "start_date": start_date,
            "end_date": end_date,
        }
        request_params = {
            "site_id": site_id,
            "start_date": _jsonable_date(start_date),
            "end_date": _jsonable_date(end_date),
        }

        with self.db.conn.cursor() as cur:
            cur.execute("SELECT to_regclass('billiards_dws.dws_order_summary') AS reg;")
            row = cur.fetchone()
            reg = row[0] if row else None
            if not reg:
                raise RuntimeError("DWS 表不存在:请先运行任务 INIT_DWS_SCHEMA")

            deleted = 0
            if extracted["delete_before_insert"]:
                if extracted["full_refresh"] and site_id is None:
                    # Unbounded rebuild across all sites: wipe the table.
                    cur.execute("TRUNCATE TABLE billiards_dws.dws_order_summary;")
                    self.logger.info("DWS订单汇总: 已清空 billiards_dws.dws_order_summary")
                else:
                    delete_sql = "DELETE FROM billiards_dws.dws_order_summary WHERE 1=1"
                    delete_args: list[Any] = []
                    if site_id is not None:
                        delete_sql += " AND site_id = %s"
                        delete_args.append(site_id)
                    if start_date is not None:
                        delete_sql += " AND order_date >= %s"
                        delete_args.append(start_date)
                    if end_date is not None:
                        delete_sql += " AND order_date <= %s"
                        delete_args.append(end_date)
                    cur.execute(delete_sql, delete_args)
                    deleted = cur.rowcount
                    self.logger.info("DWS订单汇总: 删除=%s 语句=%s", deleted, delete_sql)

            cur.execute(SQL_BUILD_SUMMARY, sql_params)
            affected = cur.rowcount

        return {
            "counts": {"fetched": 0, "inserted": affected, "updated": 0, "skipped": 0, "errors": 0},
            "request_params": request_params,
            "extra": {"deleted": deleted},
        }


# String spellings treated as "false" when a boolean flag arrives as text.
_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off", "n", "f", "null", "none"})


def _as_bool(v: Any) -> bool:
    """Coerce a config value to bool, treating strings like "false"/"0" as False.

    Non-string values keep plain ``bool(v)`` semantics.
    """
    if isinstance(v, str):
        return v.strip().lower() not in _FALSE_STRINGS
    return bool(v)


def _as_date(v: Any) -> date:
    """Coerce *v* to a plain ``date``.

    Accepts ``date`` objects, ``datetime`` objects (time part dropped) and
    ISO-formatted date or timestamp strings.

    Raises:
        ValueError: If the string form of *v* is not an ISO date/timestamp.
    """
    # datetime is a subclass of date, so it must be narrowed first; otherwise
    # a datetime would be returned unchanged despite the declared return type.
    if isinstance(v, datetime):
        return v.date()
    if isinstance(v, date):
        return v
    text = str(v).strip()
    try:
        return date.fromisoformat(text)
    except ValueError:
        # Fall back for timestamp-style strings such as "2024-01-01 00:00:00".
        return datetime.fromisoformat(text).date()


def _jsonable_date(v: Any):
    """Return a JSON-friendly form of *v*: ``None``, ISO text for dates, else ``str(v)``."""
    if v is None:
        return None
    if isinstance(v, date):
        return v.isoformat()
    return str(v)