189 lines
8.3 KiB
Python
189 lines
8.3 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""小票详情加载器"""
|
||
from ..base_loader import BaseLoader
|
||
import json
|
||
|
||
class TicketLoader(BaseLoader):
|
||
"""
|
||
小票详情 JSON 解析加载器,写入 DWD 事实表。
|
||
处理:
|
||
- fact_order(订单头)
|
||
- fact_order_goods(商品项)
|
||
- fact_table_usage(台桌使用)
|
||
- fact_assistant_service(助教服务)
|
||
"""
|
||
|
||
def process_tickets(self, tickets: list, store_id: int) -> tuple:
|
||
"""
|
||
批量处理小票 JSON。
|
||
返回 (插入数, 错误数)
|
||
"""
|
||
inserted_count = 0
|
||
error_count = 0
|
||
|
||
# 准备批量数据列表
|
||
orders = []
|
||
goods_list = []
|
||
table_usages = []
|
||
assistant_services = []
|
||
|
||
for ticket in tickets:
|
||
try:
|
||
# 1. 解析订单头部 (fact_order)
|
||
root_data = ticket.get("data", {}).get("data", {})
|
||
if not root_data:
|
||
continue
|
||
|
||
order_settle_id = root_data.get("orderSettleId")
|
||
if not order_settle_id:
|
||
continue
|
||
|
||
orders.append({
|
||
"store_id": store_id,
|
||
"order_settle_id": order_settle_id,
|
||
"order_trade_no": 0,
|
||
"order_no": str(root_data.get("orderSettleNumber", "")),
|
||
"member_id": 0,
|
||
"pay_time": root_data.get("payTime"),
|
||
"total_amount": root_data.get("consumeMoney", 0),
|
||
"pay_amount": root_data.get("actualPayment", 0),
|
||
"discount_amount": root_data.get("memberOfferAmount", 0),
|
||
"coupon_amount": root_data.get("couponAmount", 0),
|
||
"status": "PAID",
|
||
"cashier_name": root_data.get("cashierName", ""),
|
||
"remark": root_data.get("orderRemark", ""),
|
||
"raw_data": json.dumps(ticket, ensure_ascii=False)
|
||
})
|
||
|
||
# 2. 解析订单项 (orderItem 列表)
|
||
order_items = root_data.get("orderItem", [])
|
||
for item in order_items:
|
||
order_trade_no = item.get("siteOrderId")
|
||
|
||
# 2.1 台桌流水
|
||
table_ledger = item.get("tableLedger")
|
||
if table_ledger:
|
||
table_usages.append({
|
||
"store_id": store_id,
|
||
"order_ledger_id": table_ledger.get("orderTableLedgerId"),
|
||
"order_settle_id": order_settle_id,
|
||
"table_id": table_ledger.get("siteTableId"),
|
||
"table_name": table_ledger.get("tableName"),
|
||
"start_time": table_ledger.get("chargeStartTime"),
|
||
"end_time": table_ledger.get("chargeEndTime"),
|
||
"duration_minutes": table_ledger.get("useDuration", 0),
|
||
"total_amount": table_ledger.get("consumptionAmount", 0),
|
||
"pay_amount": table_ledger.get("consumptionAmount", 0) - table_ledger.get("memberDiscountAmount", 0)
|
||
})
|
||
|
||
# 2.2 商品流水
|
||
goods_ledgers = item.get("goodsLedgers", [])
|
||
for g in goods_ledgers:
|
||
goods_list.append({
|
||
"store_id": store_id,
|
||
"order_goods_id": g.get("orderGoodsLedgerId"),
|
||
"order_settle_id": order_settle_id,
|
||
"order_trade_no": order_trade_no,
|
||
"goods_id": g.get("siteGoodsId"),
|
||
"goods_name": g.get("goodsName"),
|
||
"quantity": g.get("goodsCount", 0),
|
||
"unit_price": g.get("goodsPrice", 0),
|
||
"total_amount": g.get("ledgerAmount", 0),
|
||
"pay_amount": g.get("realGoodsMoney", 0)
|
||
})
|
||
|
||
# 2.3 助教服务
|
||
assistant_ledgers = item.get("assistantPlayWith", [])
|
||
for a in assistant_ledgers:
|
||
assistant_services.append({
|
||
"store_id": store_id,
|
||
"ledger_id": a.get("orderAssistantLedgerId"),
|
||
"order_settle_id": order_settle_id,
|
||
"assistant_id": a.get("assistantId"),
|
||
"assistant_name": a.get("ledgerName"),
|
||
"service_type": a.get("skillName", "Play"),
|
||
"start_time": a.get("ledgerStartTime"),
|
||
"end_time": a.get("ledgerEndTime"),
|
||
"duration_minutes": int(a.get("ledgerCount", 0) / 60) if a.get("ledgerCount") else 0,
|
||
"total_amount": a.get("ledgerAmount", 0),
|
||
"pay_amount": a.get("ledgerAmount", 0)
|
||
})
|
||
|
||
inserted_count += 1
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Error parsing ticket: {e}", exc_info=True)
|
||
error_count += 1
|
||
|
||
# 3. 批量插入/更新
|
||
if orders:
|
||
self._upsert_orders(orders)
|
||
if goods_list:
|
||
self._upsert_goods(goods_list)
|
||
if table_usages:
|
||
self._upsert_table_usages(table_usages)
|
||
if assistant_services:
|
||
self._upsert_assistant_services(assistant_services)
|
||
|
||
return inserted_count, error_count
|
||
|
||
def _upsert_orders(self, rows):
|
||
sql = """
|
||
INSERT INTO billiards.fact_order (
|
||
store_id, order_settle_id, order_trade_no, order_no, member_id,
|
||
pay_time, total_amount, pay_amount, discount_amount, coupon_amount,
|
||
status, cashier_name, remark, raw_data
|
||
) VALUES (
|
||
%(store_id)s, %(order_settle_id)s, %(order_trade_no)s, %(order_no)s, %(member_id)s,
|
||
%(pay_time)s, %(total_amount)s, %(pay_amount)s, %(discount_amount)s, %(coupon_amount)s,
|
||
%(status)s, %(cashier_name)s, %(remark)s, %(raw_data)s
|
||
)
|
||
ON CONFLICT (store_id, order_settle_id) DO UPDATE SET
|
||
pay_time = EXCLUDED.pay_time,
|
||
pay_amount = EXCLUDED.pay_amount,
|
||
updated_at = now()
|
||
"""
|
||
self.db.batch_execute(sql, rows)
|
||
|
||
def _upsert_goods(self, rows):
|
||
sql = """
|
||
INSERT INTO billiards.fact_order_goods (
|
||
store_id, order_goods_id, order_settle_id, order_trade_no,
|
||
goods_id, goods_name, quantity, unit_price, total_amount, pay_amount
|
||
) VALUES (
|
||
%(store_id)s, %(order_goods_id)s, %(order_settle_id)s, %(order_trade_no)s,
|
||
%(goods_id)s, %(goods_name)s, %(quantity)s, %(unit_price)s, %(total_amount)s, %(pay_amount)s
|
||
)
|
||
ON CONFLICT (store_id, order_goods_id) DO UPDATE SET
|
||
pay_amount = EXCLUDED.pay_amount
|
||
"""
|
||
self.db.batch_execute(sql, rows)
|
||
|
||
def _upsert_table_usages(self, rows):
|
||
sql = """
|
||
INSERT INTO billiards.fact_table_usage (
|
||
store_id, order_ledger_id, order_settle_id, table_id, table_name,
|
||
start_time, end_time, duration_minutes, total_amount, pay_amount
|
||
) VALUES (
|
||
%(store_id)s, %(order_ledger_id)s, %(order_settle_id)s, %(table_id)s, %(table_name)s,
|
||
%(start_time)s, %(end_time)s, %(duration_minutes)s, %(total_amount)s, %(pay_amount)s
|
||
)
|
||
ON CONFLICT (store_id, order_ledger_id) DO UPDATE SET
|
||
pay_amount = EXCLUDED.pay_amount
|
||
"""
|
||
self.db.batch_execute(sql, rows)
|
||
|
||
def _upsert_assistant_services(self, rows):
|
||
sql = """
|
||
INSERT INTO billiards.fact_assistant_service (
|
||
store_id, ledger_id, order_settle_id, assistant_id, assistant_name,
|
||
service_type, start_time, end_time, duration_minutes, total_amount, pay_amount
|
||
) VALUES (
|
||
%(store_id)s, %(ledger_id)s, %(order_settle_id)s, %(assistant_id)s, %(assistant_name)s,
|
||
%(service_type)s, %(start_time)s, %(end_time)s, %(duration_minutes)s, %(total_amount)s, %(pay_amount)s
|
||
)
|
||
ON CONFLICT (store_id, ledger_id) DO UPDATE SET
|
||
pay_amount = EXCLUDED.pay_amount
|
||
"""
|
||
self.db.batch_execute(sql, rows)
|