Files
feiqiu-ETL/etl_billiards/tasks/ods_tasks.py
2025-11-20 01:27:33 +08:00

401 lines
14 KiB
Python

# -*- coding: utf-8 -*-
"""ODS ingestion tasks."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type
from loaders.ods import GenericODSLoader
from models.parsers import TypeParser
from .base_task import BaseTask
# Signature of a per-column converter: it receives the raw value extracted
# from a source record and returns the value that should be stored.
ColumnTransform = Callable[[Any], Any]
@dataclass(frozen=True)
class ColumnSpec:
    """Describe how one database column is filled from a source JSON record.

    Instances are immutable; they are consumed by
    ``BaseOdsTask._extract_value`` when a row is assembled.
    """

    # Name of the destination column in the ODS table.
    column: str
    # Candidate dotted paths into the source record, tried in order; the
    # first one yielding a non-None value wins.
    sources: Tuple[str, ...] = ()
    # When True, a record whose value resolves to None is skipped entirely.
    required: bool = False
    # Fallback used when no source path yields a value (ignored if None).
    default: Any = None
    # Optional converter applied to any non-None value (including the default).
    transform: ColumnTransform | None = None
@dataclass(frozen=True)
class OdsTaskSpec:
    """Definition of a single ODS ingestion task.

    A spec is pure configuration: ``BaseOdsTask`` reads it to build the API
    request, the destination column list and each row.

    Instances are frozen. ``extra_params`` is a dict and therefore excluded
    from the generated ``__hash__`` (it still takes part in ``==``), so specs
    can be hashed, e.g. used as dict keys or set members.
    """

    # Task code; returned by ``BaseOdsTask.get_task_code`` and used as the
    # key of the ``ODS_TASK_CLASSES`` registry.
    code: str
    # Name given to the task class generated for this spec.
    class_name: str
    # Destination table handed to the loader.
    table_name: str
    # API endpoint handed to the paginated fetch.
    endpoint: str
    # Forwarded as ``data_path`` to ``api.iter_paginated``.
    data_path: Tuple[str, ...] = ("data",)
    # Forwarded as ``list_key`` to ``api.iter_paginated``.
    list_key: str | None = None
    # Columns that, together with ``store_id``, form the upsert conflict key.
    pk_columns: Tuple[ColumnSpec, ...] = ()
    # Additional columns extracted from every record.
    extra_columns: Tuple[ColumnSpec, ...] = ()
    # Flags controlling which bookkeeping columns are written per row.
    include_page_size: bool = False
    include_page_no: bool = True
    include_source_file: bool = True
    include_source_endpoint: bool = True
    # When True, ``startTime``/``endTime`` request parameters are added.
    requires_window: bool = True
    # Human-readable description; becomes the generated class docstring.
    description: str = ""
    # Extra request parameters merged into the API call. ``hash=False`` keeps
    # the unhashable dict out of ``__hash__``.
    extra_params: Dict[str, Any] = field(default_factory=dict, hash=False)
class BaseOdsTask(BaseTask):
    """Shared functionality for ODS ingestion tasks.

    Subclasses only provide ``SPEC``; everything else (request building,
    pagination, row assembly, upsert, commit/rollback) lives here.

    The attributes ``logger``, ``config``, ``db``, ``api`` and ``tz`` as well
    as ``_get_time_window`` and ``_build_result`` are not defined in this
    class; they are presumably supplied by ``BaseTask`` (verify there).
    """

    SPEC: OdsTaskSpec

    def get_task_code(self) -> str:
        """Return the task code declared by ``SPEC``."""
        return self.SPEC.code

    def execute(self) -> dict:
        """Fetch every page of the endpoint and upsert it into the ODS table.

        Returns:
            The value of ``self._build_result("SUCCESS", counts)``.

        Raises:
            ValueError: if ``app.store_id`` is missing or parses to a falsy value.
            Exception: any error raised while fetching or loading is re-raised
                after the database transaction has been rolled back.
        """
        spec = self.SPEC
        self.logger.info("开始执行 %s (ODS)", spec.code)

        store_id = TypeParser.parse_int(self.config.get("app.store_id"))
        if not store_id:
            raise ValueError("app.store_id 未配置,无法执行 ODS 任务")

        page_size = self.config.get("api.page_size", 200)
        params = self._build_params(spec, store_id)
        columns = self._resolve_columns(spec)
        # The upsert conflict key is the store plus the spec's primary-key columns.
        conflict_columns = ["store_id", *(col.column for col in spec.pk_columns)]
        loader = GenericODSLoader(
            self.db,
            spec.table_name,
            columns,
            conflict_columns,
        )
        counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
        source_file = self._resolve_source_file_hint(spec)

        try:
            for page_no, page_records, _, _ in self.api.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                # Both values are constant for the whole page, so compute once.
                row_page_no = page_no if spec.include_page_no else None
                # NOTE(review): the stored page size is the number of records
                # actually returned in this page, not the requested page_size.
                row_page_size = len(page_records) if spec.include_page_size else None

                rows: List[dict] = []
                for raw in page_records:
                    row = self._build_row(
                        spec=spec,
                        store_id=store_id,
                        record=raw,
                        page_no=row_page_no,
                        page_size_value=row_page_size,
                        source_file=source_file,
                    )
                    if row is None:
                        counts["skipped"] += 1
                        continue
                    rows.append(row)

                # Skip the loader when the page produced nothing to write
                # (empty page, or every record was missing a required field).
                if rows:
                    inserted, updated, _ = loader.upsert_rows(rows)
                    counts["inserted"] += inserted
                    counts["updated"] += updated
                counts["fetched"] += len(page_records)

            self.db.commit()
            self.logger.info("%s ODS 任务完成: %s", spec.code, counts)
            return self._build_result("SUCCESS", counts)
        except Exception:
            # Task boundary: undo partial work, log with traceback, propagate.
            self.db.rollback()
            counts["errors"] += 1
            self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
            raise

    def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict:
        """Build the request parameters for the spec's endpoint.

        Starts from ``storeId``, merges ``spec.extra_params`` (which may
        override it) and, when the spec requires a time window, adds
        ``startTime``/``endTime`` formatted via ``TypeParser.format_timestamp``.
        """
        params: dict[str, Any] = {"storeId": store_id}
        params.update(spec.extra_params)
        if spec.requires_window:
            window_start, window_end, _ = self._get_time_window()
            params["startTime"] = TypeParser.format_timestamp(window_start, self.tz)
            params["endTime"] = TypeParser.format_timestamp(window_end, self.tz)
        return params

    def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]:
        """Return the ordered, de-duplicated destination column list.

        Order: ``store_id``, primary-key columns, extra columns, the enabled
        bookkeeping columns, then ``fetched_at`` and ``payload``. A name that
        appears more than once keeps its first position.
        """
        optional_columns = (
            (spec.include_page_no, "page_no"),
            (spec.include_page_size, "page_size"),
            (spec.include_source_file, "source_file"),
            (spec.include_source_endpoint, "source_endpoint"),
        )
        candidates: List[str] = ["store_id"]
        candidates.extend(
            col_spec.column for col_spec in (*spec.pk_columns, *spec.extra_columns)
        )
        candidates.extend(name for enabled, name in optional_columns if enabled)
        candidates.extend(("fetched_at", "payload"))
        # dict.fromkeys keeps the first occurrence of each name, in order.
        return list(dict.fromkeys(candidates))

    def _build_row(
        self,
        spec: OdsTaskSpec,
        store_id: int,
        record: dict,
        page_no: int | None,
        page_size_value: int | None,
        source_file: str | None,
    ) -> dict | None:
        """Turn one source record into a row dict for the loader.

        Returns:
            The row, or ``None`` (after logging a warning) when a required
            column resolves to ``None``.
        """
        row: dict[str, Any] = {"store_id": store_id}
        # Unpacking (rather than ``+``) works for any mix of tuples and lists,
        # matching what _resolve_columns accepts.
        for col_spec in (*spec.pk_columns, *spec.extra_columns):
            value = self._extract_value(record, col_spec)
            if value is None and col_spec.required:
                self.logger.warning(
                    "%s 缺少必填字段 %s,原始记录: %s",
                    spec.code,
                    col_spec.column,
                    record,
                )
                return None
            row[col_spec.column] = value

        if spec.include_page_no:
            row["page_no"] = page_no
        if spec.include_page_size:
            row["page_size"] = page_size_value
        if spec.include_source_file:
            row["source_file"] = source_file
        if spec.include_source_endpoint:
            row["source_endpoint"] = spec.endpoint
        row["fetched_at"] = datetime.now(self.tz)
        # The untouched source record is stored alongside the extracted columns.
        row["payload"] = record
        return row

    def _extract_value(self, record: dict, spec: ColumnSpec):
        """Resolve the value of one column from ``record``.

        The source paths are tried in order and the first non-None value is
        used; otherwise ``spec.default`` applies. ``spec.transform`` runs on
        any non-None result, the default included.
        """
        value = None
        for key in spec.sources:
            value = self._dig(record, key)
            if value is not None:
                break
        if value is None and spec.default is not None:
            value = spec.default
        if value is not None and spec.transform:
            value = spec.transform(value)
        return value

    @staticmethod
    def _dig(record: Any, path: str | None):
        """Follow a dotted ``path`` through nested dicts.

        Returns ``None`` for an empty path, a missing key, or when a
        non-dict value is reached before the path is exhausted.
        """
        if not path:
            return None
        current = record
        for part in path.split("."):
            if not isinstance(current, dict):
                return None
            current = current.get(part)
        return current

    def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
        """Ask the API client for a source hint for the endpoint, if supported.

        Returns ``None`` when the client has no callable ``get_source_hint``.
        """
        resolver = getattr(self.api, "get_source_hint", None)
        if callable(resolver):
            return resolver(spec.endpoint)
        return None
def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
    """Shorthand for a ``ColumnSpec`` converted with ``TypeParser.parse_int``.

    Args:
        name: destination column name.
        *sources: candidate source paths, in lookup order.
        required: whether a record lacking this value must be skipped.
    """
    candidate_paths: Tuple[str, ...] = sources
    return ColumnSpec(
        column=name,
        sources=candidate_paths,
        transform=TypeParser.parse_int,
        required=required,
    )
# Declarative catalogue of every ODS ingestion task. One task class is
# generated per entry (see ODS_TASK_CLASSES).
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
    # --- Order / settlement raw records ---------------------------------
    OdsTaskSpec(
        code="ODS_ORDER_SETTLE",
        class_name="OdsOrderSettleTask",
        table_name="billiards_ods.ods_order_settle",
        endpoint="/order/list",
        data_path=("data",),
        pk_columns=(
            _int_col(
                "order_settle_id",
                "orderSettleId",
                "order_settle_id",
                "id",
                required=True,
            ),
        ),
        extra_columns=(
            _int_col("order_trade_no", "orderTradeNo", "order_trade_no"),
        ),
        include_page_size=True,
        description="订单/结算 ODS 原始记录",
    ),
    # --- Table fee / table usage ledger ---------------------------------
    OdsTaskSpec(
        code="ODS_TABLE_USE",
        class_name="OdsTableUseTask",
        table_name="billiards_ods.ods_table_use_detail",
        endpoint="/Table/UseDetailList",
        data_path=("data", "siteTableUseDetailsList"),
        pk_columns=(
            _int_col("ledger_id", "id", required=True),
        ),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="台费/开台流水 ODS",
    ),
    # --- Assistant ledger -----------------------------------------------
    OdsTaskSpec(
        code="ODS_ASSISTANT_LEDGER",
        class_name="OdsAssistantLedgerTask",
        table_name="billiards_ods.ods_assistant_ledger",
        endpoint="/Assistant/LedgerList",
        data_path=("data", "orderAssistantDetails"),
        pk_columns=(
            _int_col("ledger_id", "id", required=True),
        ),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="助教流水 ODS",
    ),
    # --- Voided assistant records ---------------------------------------
    OdsTaskSpec(
        code="ODS_ASSISTANT_ABOLISH",
        class_name="OdsAssistantAbolishTask",
        table_name="billiards_ods.ods_assistant_abolish",
        endpoint="/Assistant/AbolishList",
        data_path=("data", "abolitionAssistants"),
        pk_columns=(
            _int_col("abolish_id", "id", required=True),
        ),
        description="助教作废记录 ODS",
    ),
    # --- Goods sales ledger ---------------------------------------------
    OdsTaskSpec(
        code="ODS_GOODS_LEDGER",
        class_name="OdsGoodsLedgerTask",
        table_name="billiards_ods.ods_goods_ledger",
        endpoint="/Order/GoodsLedgerList",
        data_path=("data", "orderGoodsLedgers"),
        pk_columns=(
            _int_col("order_goods_id", "orderGoodsId", "id", required=True),
        ),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="商品销售流水 ODS",
    ),
    # --- Payment ledger -------------------------------------------------
    OdsTaskSpec(
        code="ODS_PAYMENT",
        class_name="OdsPaymentTask",
        table_name="billiards_ods.ods_payment",
        endpoint="/pay/records",
        data_path=("data",),
        pk_columns=(
            _int_col("pay_id", "payId", "id", required=True),
        ),
        extra_columns=(
            # relate_type is stored as-is (no transform).
            ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
            _int_col("relate_id", "relate_id", "relateId"),
        ),
        include_page_size=False,
        description="支付流水 ODS",
    ),
    # --- Refund ledger --------------------------------------------------
    OdsTaskSpec(
        code="ODS_REFUND",
        class_name="OdsRefundTask",
        table_name="billiards_ods.ods_refund",
        endpoint="/Pay/RefundList",
        data_path=(),
        pk_columns=(
            _int_col("refund_id", "id", required=True),
        ),
        extra_columns=(
            # relate_type is stored as-is (no transform).
            ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
            _int_col("relate_id", "relate_id", "relateId"),
        ),
        description="退款流水 ODS",
    ),
    # --- Platform coupon verification / group-buy ledger ----------------
    OdsTaskSpec(
        code="ODS_COUPON_VERIFY",
        class_name="OdsCouponVerifyTask",
        table_name="billiards_ods.ods_coupon_verify",
        endpoint="/Coupon/UsageList",
        data_path=(),
        pk_columns=(
            _int_col("coupon_id", "id", "couponId", required=True),
        ),
        description="平台验券/团购流水 ODS",
    ),
    # --- Member profiles (no time window) -------------------------------
    OdsTaskSpec(
        code="ODS_MEMBER",
        class_name="OdsMemberTask",
        table_name="billiards_ods.ods_member",
        endpoint="/MemberProfile/GetTenantMemberList",
        data_path=("data",),
        pk_columns=(
            _int_col("member_id", "memberId", required=True),
        ),
        requires_window=False,
        description="会员档案 ODS",
    ),
    # --- Member cards / stored-value cards (no time window) -------------
    OdsTaskSpec(
        code="ODS_MEMBER_CARD",
        class_name="OdsMemberCardTask",
        table_name="billiards_ods.ods_member_card",
        endpoint="/MemberCard/List",
        data_path=("data", "tenantMemberCards"),
        pk_columns=(
            _int_col("card_id", "tenantMemberCardId", "cardId", required=True),
        ),
        requires_window=False,
        description="会员卡/储值卡 ODS",
    ),
    # --- Group-buy / package definitions (no time window) ---------------
    OdsTaskSpec(
        code="ODS_PACKAGE",
        class_name="OdsPackageTask",
        table_name="billiards_ods.ods_package_coupon",
        endpoint="/Package/List",
        data_path=("data", "packageCouponList"),
        pk_columns=(
            _int_col("package_id", "id", "packageId", required=True),
        ),
        requires_window=False,
        description="团购/套餐定义 ODS",
    ),
    # --- Inventory summary (no time window) -----------------------------
    OdsTaskSpec(
        code="ODS_INVENTORY_STOCK",
        class_name="OdsInventoryStockTask",
        table_name="billiards_ods.ods_inventory_stock",
        endpoint="/Inventory/StockSummary",
        data_path=(),
        pk_columns=(
            _int_col("site_goods_id", "siteGoodsId", required=True),
            # No source paths: the value always comes from the default.
            ColumnSpec(column="snapshot_key", default="default", required=True),
        ),
        requires_window=False,
        description="库存汇总 ODS",
    ),
    # --- Inventory changes ----------------------------------------------
    OdsTaskSpec(
        code="ODS_INVENTORY_CHANGE",
        class_name="OdsInventoryChangeTask",
        table_name="billiards_ods.ods_inventory_change",
        endpoint="/Inventory/ChangeList",
        data_path=("data", "queryDeliveryRecordsList"),
        pk_columns=(
            _int_col("change_id", "siteGoodsStockId", "id", required=True),
        ),
        description="库存变动 ODS",
    ),
)
def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
    """Create a ``BaseOdsTask`` subclass bound to ``spec``.

    The class is named ``spec.class_name``, carries ``spec`` as its ``SPEC``
    attribute and uses the spec description (or a generic fallback naming the
    task code) as its docstring.
    """
    if spec.description:
        docstring = spec.description
    else:
        docstring = f"ODS ingestion task {spec.code}"
    namespace = dict(SPEC=spec, __doc__=docstring, __module__=__name__)
    return type(spec.class_name, (BaseOdsTask,), namespace)
# Registry of generated task classes, keyed by task code. If two specs shared
# a code, the later one would replace the earlier one.
ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
    task_spec.code: _build_task_class(task_spec) for task_spec in ODS_TASK_SPECS
}

__all__ = [
    "ODS_TASK_CLASSES",
    "ODS_TASK_SPECS",
    "BaseOdsTask",
]