Files
feiqiu-ETL/etl_billiards/tasks/payments_dwd_task.py
2025-11-30 07:19:05 +08:00

139 lines
5.2 KiB
Python

# -*- coding: utf-8 -*-
import json
from typing import Optional

from loaders.facts.payment import PaymentLoader
from models.parsers import TypeParser

from .base_dwd_task import BaseDwdTask
class PaymentsDwdTask(BaseDwdTask):
    """DWD task: process payment records from the ODS layer into the fact table.

    Source: billiards_ods.ods_payment_record (the table read by ``execute``)
    Target: billiards.fact_payment (written through ``PaymentLoader.upsert_payments``;
        the concrete table name is taken from the original header, not from this file)
    """

    def get_task_code(self) -> str:
        """Return the identifier of this task, ``"PAYMENTS_DWD"``."""
        return "PAYMENTS_DWD"

    def execute(self) -> dict:
        """Parse the ODS payment rows of the current time window and upsert them.

        Rows are read batch by batch via ``iter_ods_rows``. Each row's payload is
        extracted with ``parse_payload`` and mapped with ``_parse_payment``; rows
        with an empty payload or a failed parse are dropped before the batch is
        handed to the loader. One commit is issued after all batches are processed.

        Returns:
            A dict with ``status`` (always ``"SUCCESS"`` when this method returns),
            ``counts`` (the inserted / updated / skipped totals reported by the
            loader) and the ``window_start`` / ``window_end`` bounds processed.
        """
        task_code = self.get_task_code()
        self.logger.info("Starting %s task", task_code)

        window_start, window_end, _ = self._get_time_window()
        self.logger.info("Processing window: %s to %s", window_start, window_end)

        loader = PaymentLoader(self.db, logger=self.logger)
        store_id = self.config.get("app.store_id")

        total_inserted = 0
        total_updated = 0
        total_skipped = 0

        # Iterate ODS data for the window.
        batches = self.iter_ods_rows(
            table_name="billiards_ods.ods_payment_record",
            columns=["site_id", "pay_id", "payload", "fetched_at"],
            start_time=window_start,
            end_time=window_end,
        )

        for batch in batches:
            if not batch:
                continue

            parsed_rows = []
            for row in batch:
                payload = self.parse_payload(row)
                if not payload:
                    continue
                parsed = self._parse_payment(payload, store_id)
                if parsed:
                    parsed_rows.append(parsed)

            # Nothing usable in this batch: do not call the loader with an empty list.
            if not parsed_rows:
                continue

            inserted, updated, skipped = loader.upsert_payments(parsed_rows, store_id)
            total_inserted += inserted
            total_updated += updated
            total_skipped += skipped

        self.db.commit()

        self.logger.info(
            "Task %s completed. inserted=%s updated=%s skipped=%s",
            task_code,
            total_inserted,
            total_updated,
            total_skipped,
        )

        return {
            "status": "SUCCESS",
            "counts": {
                "inserted": total_inserted,
                "updated": total_updated,
                "skipped": total_skipped,
            },
            "window_start": window_start,
            "window_end": window_end,
        }

    @staticmethod
    def _first_non_empty(raw: dict, *keys: str, default=None):
        """Return the first value among ``keys`` in ``raw`` that is not ``None`` or ``""``.

        Unlike an ``a or b`` chain this keeps legitimate falsy values such as ``0``,
        so a zero amount or a status/type code of ``0`` is not mistaken for a
        missing field. ``default`` is returned when no key yields a value.
        """
        for key in keys:
            value = raw.get(key)
            if value is not None and value != "":
                return value
        return default

    def _parse_payment(self, raw: dict, store_id: int) -> Optional[dict]:
        """Map one ODS payment payload onto the fact-row structure.

        Each field is looked up under its camelCase key first and then under its
        snake_case spelling.

        Args:
            raw: The decoded payload of a single ODS row.
            store_id: Store identifier written to the row; also used as the
                fallback for ``site_id`` when the payload carries none.

        Returns:
            The fact-row dict, or ``None`` when the payload has no usable pay id
            or when any error occurs while parsing (the error is logged as a
            warning and the row is dropped instead of failing the whole batch).
        """
        pick = self._first_non_empty
        try:
            # Identifier fields keep ``or`` semantics: a zero/empty id means "missing".
            pay_id = TypeParser.parse_int(raw.get("payId") or raw.get("id"))
            if not pay_id:
                return None

            relate_type = str(pick(raw, "relateType", "relate_type", default=""))
            relate_id = TypeParser.parse_int(raw.get("relateId") or raw.get("relate_id"))

            # Attempt to populate settlement / trade identifiers.
            order_settle_id = TypeParser.parse_int(
                raw.get("orderSettleId") or raw.get("order_settle_id")
            )
            # NOTE(review): the trade number is parsed as an integer; confirm that
            # upstream trade numbers are always purely numeric.
            order_trade_no = TypeParser.parse_int(
                raw.get("orderTradeNo") or raw.get("order_trade_no")
            )
            # For these relation types the related id stands in for a missing
            # settlement id.
            if relate_type in {"1", "SETTLE", "ORDER"}:
                order_settle_id = order_settle_id or relate_id

            return {
                "store_id": store_id,
                "pay_id": pay_id,
                "order_id": TypeParser.parse_int(raw.get("orderId") or raw.get("order_id")),
                "order_settle_id": order_settle_id,
                "order_trade_no": order_trade_no,
                "relate_type": relate_type,
                "relate_id": relate_id,
                "site_id": TypeParser.parse_int(
                    raw.get("siteId") or raw.get("site_id") or store_id
                ),
                "tenant_id": TypeParser.parse_int(raw.get("tenantId") or raw.get("tenant_id")),
                "create_time": TypeParser.parse_timestamp(
                    raw.get("createTime") or raw.get("create_time"), self.tz
                ),
                "pay_time": TypeParser.parse_timestamp(
                    raw.get("payTime") or raw.get("pay_time"), self.tz
                ),
                # Amounts go through ``pick`` so that an explicit 0 is preserved.
                "pay_amount": TypeParser.parse_decimal(pick(raw, "payAmount", "pay_amount")),
                "fee_amount": TypeParser.parse_decimal(
                    pick(raw, "feeAmount", "serviceFee", "channelFee", "fee_amount")
                ),
                "discount_amount": TypeParser.parse_decimal(
                    pick(raw, "discountAmount", "couponAmount", "discount_amount")
                ),
                # Type / channel / status codes may legitimately be 0.
                "payment_method": str(pick(raw, "paymentMethod", "payment_method", default="")),
                "pay_type": pick(raw, "payType", "pay_type"),
                "online_pay_channel": pick(raw, "onlinePayChannel", "online_pay_channel"),
                "pay_terminal": pick(raw, "payTerminal", "pay_terminal"),
                "pay_status": str(pick(raw, "payStatus", "pay_status", default="")),
                "remark": raw.get("remark"),
                "raw_data": json.dumps(raw, ensure_ascii=False),
            }
        except Exception as e:
            # Deliberate best-effort: one malformed payload must not abort the batch.
            self.logger.warning("Error parsing payment: %s", e)
            return None