430 lines
15 KiB
Python
430 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""ODS ingestion tasks."""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type
|
|
|
|
from loaders.ods import GenericODSLoader
|
|
from models.parsers import TypeParser
|
|
from .base_task import BaseTask
|
|
|
|
|
|
# Signature of a per-column value converter: raw extracted value -> stored value.
ColumnTransform = Callable[[Any], Any]


@dataclass(frozen=True)
class ColumnSpec:
    """Describe how one database column is filled from a source JSON record.

    Instances are immutable (``frozen=True``).
    """

    # Name of the target column in the ODS table.
    column: str
    # Candidate dot-separated JSON paths, tried in order; the first one that
    # yields a non-None value wins.
    sources: Tuple[str, ...] = field(default=())
    # When True, a record for which this column resolves to None is skipped.
    required: bool = field(default=False)
    # Fallback used when none of ``sources`` yields a value.
    default: Any = field(default=None)
    # Optional converter applied to a non-None extracted value.
    transform: ColumnTransform | None = field(default=None)
|
|
|
|
|
|
@dataclass(frozen=True)
class OdsTaskSpec:
    """Declarative definition of a single ODS ingestion task.

    ``BaseOdsTask`` is driven entirely by these fields, and
    ``_build_task_class`` turns one spec into a task class named
    ``class_name``.
    """

    # Task code; returned by ``get_task_code`` and used as the registry key.
    code: str
    # Name given to the generated task class.
    class_name: str
    # Fully qualified target table, e.g. "billiards_ods.ods_order_settle".
    table_name: str
    # API endpoint path; also stored in ``source_endpoint`` when enabled.
    endpoint: str
    # Passed through to ``api.iter_paginated`` (``data_path`` / ``list_key``).
    data_path: Tuple[str, ...] = ("data",)
    list_key: str | None = None
    # Columns forming the upsert conflict key together with ``site_id``.
    pk_columns: Tuple[ColumnSpec, ...] = ()
    # Additional extracted columns (not part of the conflict key).
    extra_columns: Tuple[ColumnSpec, ...] = ()
    # Store the number of records on the current page in ``page_size``.
    include_page_size: bool = False
    # Store the page number in ``page_no``.
    include_page_no: bool = False
    # Store the API client's source hint (if it offers one) in ``source_file``.
    include_source_file: bool = True
    # Store ``endpoint`` in ``source_endpoint``.
    include_source_endpoint: bool = True
    # Add the task time window to the request (only if ``time_fields`` is set).
    requires_window: bool = True
    # Request parameter names used for the window start and end.
    time_fields: Tuple[str, str] | None = ("startTime", "endTime")
    # Send the store id as the ``siteId`` request parameter.
    include_site_id: bool = True
    # Human-readable description; used as the generated class docstring.
    description: str = ""
    # Extra request parameters, applied last so they override the others.
    # hash=False: a dict is unhashable, so including it in the generated
    # __hash__ made hash(spec) raise TypeError despite frozen=True. The field
    # still participates in __eq__, so equal specs keep equal hashes.
    extra_params: Dict[str, Any] = field(default_factory=dict, hash=False)
|
|
|
|
|
|
class BaseOdsTask(BaseTask):
    """Shared functionality for ODS ingestion tasks.

    Concrete subclasses only set ``SPEC``; request parameters, the column
    list, row construction, paging and upserting are all driven by that
    ``OdsTaskSpec``.

    NOTE(review): ``self.config``, ``self.api``, ``self.db``, ``self.logger``,
    ``self.tz``, ``_get_time_window``, ``_merge_common_params`` and
    ``_build_result`` are expected to be provided by ``BaseTask`` (not
    visible here).
    """

    SPEC: OdsTaskSpec

    def get_task_code(self) -> str:
        """Return the task code declared by ``SPEC``."""
        return self.SPEC.code

    def execute(self) -> dict:
        """Fetch every page from the spec's endpoint and upsert the rows.

        ``self.db.commit()`` is called once after the last page; on any error
        ``self.db.rollback()`` is called and the exception is re-raised.

        Returns:
            The result of ``self._build_result("SUCCESS", counts)``, where
            ``counts`` holds fetched/inserted/updated/skipped/errors totals.

        Raises:
            ValueError: if ``app.store_id`` is missing or parses to a falsy
                value.
            Exception: any error raised while paging, upserting or committing
                is re-raised after the rollback.
        """
        spec = self.SPEC
        self.logger.info("开始执行 %s (ODS)", spec.code)

        store_id = TypeParser.parse_int(self.config.get("app.store_id"))
        if not store_id:
            raise ValueError("app.store_id 未配置,无法执行 ODS 任务")

        page_size = self.config.get("api.page_size", 200)
        params = self._build_params(spec, store_id)
        columns = self._resolve_columns(spec)
        # Upsert key: the store id plus the spec's primary-key columns.
        conflict_columns = ["site_id"] + [col.column for col in spec.pk_columns]
        loader = GenericODSLoader(
            self.db,
            spec.table_name,
            columns,
            conflict_columns,
        )

        counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
        source_file = self._resolve_source_file_hint(spec)

        try:
            for page_no, page_records, _, _ in self.api.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                rows, skipped = self._build_page_rows(
                    spec, store_id, page_no, page_records, source_file
                )
                counts["skipped"] += skipped
                # Nothing to write when the page was empty or every record was
                # rejected; skip the loader round-trip entirely.
                if rows:
                    inserted, updated, _ = loader.upsert_rows(rows)
                    counts["inserted"] += inserted
                    counts["updated"] += updated
                counts["fetched"] += len(page_records)

            self.db.commit()
            self.logger.info("%s ODS 任务完成: %s", spec.code, counts)
            return self._build_result("SUCCESS", counts)

        except Exception:
            self.db.rollback()
            counts["errors"] += 1
            # Report the partial counters so the log shows how far the run got.
            self.logger.error(
                "%s ODS 任务失败: %s", spec.code, counts, exc_info=True
            )
            raise

    def _build_page_rows(
        self,
        spec: OdsTaskSpec,
        store_id: int,
        page_no: int | None,
        page_records: Sequence[Any],
        source_file: str | None,
    ) -> Tuple[List[dict], int]:
        """Convert one page of raw records into rows; return ``(rows, skipped)``."""
        # Both values are identical for every record on the page.
        page_no_value = page_no if spec.include_page_no else None
        page_size_value = len(page_records) if spec.include_page_size else None

        rows: List[dict] = []
        skipped = 0
        for raw in page_records:
            row = self._build_row(
                spec=spec,
                store_id=store_id,
                record=raw,
                page_no=page_no_value,
                page_size_value=page_size_value,
                source_file=source_file,
            )
            if row is None:
                skipped += 1
            else:
                rows.append(row)
        return rows, skipped

    def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict:
        """Build the request parameters for ``spec``.

        Adds ``siteId`` and the time-window bounds when the spec asks for
        them, merges the common parameters via ``_merge_common_params`` and
        finally applies ``spec.extra_params``, which therefore win on
        conflicting keys.
        """
        base: dict[str, Any] = {}
        if spec.include_site_id:
            base["siteId"] = store_id
        if spec.requires_window and spec.time_fields:
            window_start, window_end, _ = self._get_time_window()
            start_key, end_key = spec.time_fields
            base[start_key] = TypeParser.format_timestamp(window_start, self.tz)
            base[end_key] = TypeParser.format_timestamp(window_end, self.tz)

        params = self._merge_common_params(base)
        params.update(spec.extra_params)
        return params

    def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]:
        """Return the ordered, de-duplicated column list given to the loader.

        Order: ``site_id``, the pk and extra columns, the optional page/source
        columns enabled by ``spec``, then ``fetched_at`` and ``payload``. The
        keys produced by ``_build_row`` match this list.
        """
        columns: List[str] = ["site_id"]
        seen = set(columns)

        def _add(name: str) -> None:
            # Keep first occurrence only, preserving order.
            if name not in seen:
                columns.append(name)
                seen.add(name)

        for col_spec in (*spec.pk_columns, *spec.extra_columns):
            _add(col_spec.column)

        optional_columns = (
            (spec.include_page_no, "page_no"),
            (spec.include_page_size, "page_size"),
            (spec.include_source_file, "source_file"),
            (spec.include_source_endpoint, "source_endpoint"),
            (True, "fetched_at"),
            (True, "payload"),
        )
        for enabled, name in optional_columns:
            if enabled:
                _add(name)

        return columns

    def _build_row(
        self,
        spec: OdsTaskSpec,
        store_id: int,
        record: dict,
        page_no: int | None,
        page_size_value: int | None,
        source_file: str | None,
    ) -> dict | None:
        """Build one DB row from ``record``.

        Returns None (after logging a warning with the raw record) when a
        required column resolves to None. Otherwise returns a dict with
        ``site_id``, every spec column, the page/source columns enabled by
        ``spec``, ``fetched_at`` (current time in ``self.tz``) and the raw
        record as ``payload``.
        """
        row: dict[str, Any] = {"site_id": store_id}

        for col_spec in spec.pk_columns + spec.extra_columns:
            value = self._extract_value(record, col_spec)
            if value is None and col_spec.required:
                self.logger.warning(
                    "%s 缺少必填字段 %s,原始记录: %s",
                    spec.code,
                    col_spec.column,
                    record,
                )
                return None
            row[col_spec.column] = value

        if spec.include_page_no:
            row["page_no"] = page_no
        if spec.include_page_size:
            row["page_size"] = page_size_value
        if spec.include_source_file:
            row["source_file"] = source_file
        if spec.include_source_endpoint:
            row["source_endpoint"] = spec.endpoint

        row["fetched_at"] = datetime.now(self.tz)
        # Keep the complete raw record alongside the extracted columns.
        row["payload"] = record
        return row

    def _extract_value(self, record: dict, spec: ColumnSpec):
        """Resolve one column value from ``record``.

        Tries each path in ``spec.sources`` in order and keeps the first
        non-None hit, falls back to ``spec.default``, then applies
        ``spec.transform`` to a non-None result.
        """
        value = None
        for key in spec.sources:
            value = self._dig(record, key)
            if value is not None:
                break
        if value is None:
            # spec.default is None unless configured, so this is a no-op then.
            value = spec.default
        if value is not None and spec.transform:
            value = spec.transform(value)
        return value

    @staticmethod
    def _dig(record: Any, path: str | None):
        """Follow a dot-separated ``path`` through nested dicts.

        Returns None for an empty path, when a key is missing, or when an
        intermediate value is not a dict.
        """
        if not path:
            return None
        current = record
        for part in path.split("."):
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None
        return current

    def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
        """Return ``self.api.get_source_hint(spec.endpoint)`` if the API client
        exposes such a callable, else None."""
        resolver = getattr(self.api, "get_source_hint", None)
        if callable(resolver):
            return resolver(spec.endpoint)
        return None
|
|
|
|
|
|
def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
    """Shorthand for a ``ColumnSpec`` whose value is parsed as an integer.

    ``sources`` are the candidate JSON paths for column ``name``, tried in
    order; ``required`` marks the column as mandatory for the record.
    """
    int_parser: ColumnTransform = TypeParser.parse_int
    return ColumnSpec(name, sources, required, transform=int_parser)
|
|
|
|
|
|
# One entry per ODS endpoint, in registration order. Only arguments that
# differ from the OdsTaskSpec defaults are spelled out; in particular every
# spec reads its records from the default ``data_path`` of ("data",).
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
    OdsTaskSpec(
        code="ODS_ORDER_SETTLE",
        class_name="OdsOrderSettleTask",
        table_name="billiards_ods.ods_order_settle",
        endpoint="/Site/GetAllOrderSettleList",
        list_key="settleList",
        pk_columns=(
            _int_col(
                "order_settle_id",
                "orderSettleId",
                "order_settle_id",
                "settleList.id",
                "id",
                required=True,
            ),
        ),
        extra_columns=(
            _int_col(
                "order_trade_no",
                "orderTradeNo",
                "order_trade_no",
                "settleList.orderTradeNo",
            ),
        ),
        time_fields=("rangeStartTime", "rangeEndTime"),
        description="订单/结算 ODS 原始记录",
    ),
    OdsTaskSpec(
        code="ODS_TABLE_USE",
        class_name="OdsTableUseTask",
        table_name="billiards_ods.ods_table_use_log",
        endpoint="/Site/GetSiteTableOrderDetails",
        list_key="siteTableUseDetailsList",
        pk_columns=(_int_col("ledger_id", "id", required=True),),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="台费/开台流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_ASSISTANT_LEDGER",
        class_name="OdsAssistantLedgerTask",
        table_name="billiards_ods.ods_assistant_service_log",
        endpoint="/AssistantPerformance/GetOrderAssistantDetails",
        list_key="orderAssistantDetails",
        pk_columns=(_int_col("ledger_id", "id", required=True),),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="助教流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_ASSISTANT_ABOLISH",
        class_name="OdsAssistantAbolishTask",
        table_name="billiards_ods.ods_assistant_cancel_log",
        endpoint="/AssistantPerformance/GetAbolitionAssistant",
        list_key="abolitionAssistants",
        pk_columns=(_int_col("abolish_id", "id", required=True),),
        description="助教作废记录 ODS",
    ),
    OdsTaskSpec(
        code="ODS_GOODS_LEDGER",
        class_name="OdsGoodsLedgerTask",
        table_name="billiards_ods.ods_store_sale_item",
        endpoint="/TenantGoods/GetGoodsSalesList",
        list_key="orderGoodsLedgers",
        pk_columns=(_int_col("order_goods_id", "orderGoodsId", "id", required=True),),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="商品销售流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_PAYMENT",
        class_name="OdsPaymentTask",
        table_name="billiards_ods.ods_payment_record",
        endpoint="/PayLog/GetPayLogListPage",
        pk_columns=(_int_col("pay_id", "payId", "id", required=True),),
        extra_columns=(
            ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
            _int_col("relate_id", "relate_id", "relateId"),
        ),
        time_fields=("StartPayTime", "EndPayTime"),
        description="支付流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_REFUND",
        class_name="OdsRefundTask",
        table_name="billiards_ods.ods_refund_record",
        endpoint="/Order/GetRefundPayLogList",
        pk_columns=(_int_col("refund_id", "id", required=True),),
        extra_columns=(
            ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
            _int_col("relate_id", "relate_id", "relateId"),
        ),
        description="退款流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_COUPON_VERIFY",
        class_name="OdsCouponVerifyTask",
        table_name="billiards_ods.ods_platform_coupon_log",
        endpoint="/Promotion/GetOfflineCouponConsumePageList",
        pk_columns=(_int_col("coupon_id", "id", "couponId", required=True),),
        description="平台验券/团购流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_MEMBER",
        class_name="OdsMemberTask",
        table_name="billiards_ods.ods_member_profile",
        endpoint="/MemberProfile/GetTenantMemberList",
        list_key="tenantMemberInfos",
        pk_columns=(_int_col("member_id", "memberId", required=True),),
        requires_window=False,
        description="会员档案 ODS",
    ),
    OdsTaskSpec(
        code="ODS_MEMBER_CARD",
        class_name="OdsMemberCardTask",
        table_name="billiards_ods.ods_member_card",
        endpoint="/MemberProfile/GetTenantMemberCardList",
        list_key="tenantMemberCards",
        pk_columns=(_int_col("card_id", "tenantMemberCardId", "cardId", required=True),),
        requires_window=False,
        description="会员卡/储值卡 ODS",
    ),
    OdsTaskSpec(
        code="ODS_PACKAGE",
        class_name="OdsPackageTask",
        table_name="billiards_ods.ods_group_package",
        endpoint="/PackageCoupon/QueryPackageCouponList",
        list_key="packageCouponList",
        pk_columns=(_int_col("package_id", "id", "packageId", required=True),),
        requires_window=False,
        description="团购/套餐定义 ODS",
    ),
    OdsTaskSpec(
        code="ODS_INVENTORY_STOCK",
        class_name="OdsInventoryStockTask",
        table_name="billiards_ods.ods_inventory_stock",
        endpoint="/TenantGoods/GetGoodsStockReport",
        pk_columns=(
            _int_col("site_goods_id", "siteGoodsId", required=True),
            # No source paths: always filled from the default, so this key
            # component is the constant "default".
            ColumnSpec(column="snapshot_key", default="default", required=True),
        ),
        requires_window=False,
        description="库存汇总 ODS",
    ),
    OdsTaskSpec(
        code="ODS_INVENTORY_CHANGE",
        class_name="OdsInventoryChangeTask",
        table_name="billiards_ods.ods_inventory_change",
        endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt",
        list_key="queryDeliveryRecordsList",
        # NOTE(review): "siteGoodsStockId" is tried before "id"; if several
        # change records share one siteGoodsStockId they would upsert onto
        # the same row. Confirm this key choice against the API payload.
        pk_columns=(_int_col("change_id", "siteGoodsStockId", "id", required=True),),
        description="库存变动 ODS",
    ),
)
|
|
|
|
|
|
def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
    """Create a ``BaseOdsTask`` subclass named ``spec.class_name`` bound to ``spec``.

    The class docstring is ``spec.description``, or a generic
    "ODS ingestion task <code>" text when the description is empty, and
    ``__module__`` is set explicitly to this module's name.
    """
    docstring = spec.description if spec.description else f"ODS ingestion task {spec.code}"
    namespace = dict(SPEC=spec, __doc__=docstring, __module__=__name__)
    return type(spec.class_name, (BaseOdsTask,), namespace)
|
|
|
|
|
|
# A repeated task code would make the registry below silently keep only the
# last class for that code; fail at import time with the offending codes.
_spec_codes = [spec.code for spec in ODS_TASK_SPECS]
_duplicate_codes = sorted({code for code in _spec_codes if _spec_codes.count(code) > 1})
if _duplicate_codes:
    raise ValueError(f"Duplicate ODS task codes in ODS_TASK_SPECS: {_duplicate_codes}")
del _spec_codes, _duplicate_codes

# Registry of generated task classes, keyed by task code, in spec order.
ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
    spec.code: _build_task_class(spec) for spec in ODS_TASK_SPECS
}


__all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask"]
|