934 lines
34 KiB
Python
934 lines
34 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""ODS ingestion tasks."""
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type
|
||
|
||
from loaders.ods import GenericODSLoader
|
||
from models.parsers import TypeParser
|
||
from .base_task import BaseTask
|
||
|
||
|
||
ColumnTransform = Callable[[Any], Any]
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class ColumnSpec:
|
||
"""Mapping between DB column and source JSON field."""
|
||
|
||
column: str
|
||
sources: Tuple[str, ...] = ()
|
||
required: bool = False
|
||
default: Any = None
|
||
transform: ColumnTransform | None = None
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class OdsTaskSpec:
|
||
"""Definition of a single ODS ingestion task."""
|
||
|
||
code: str
|
||
class_name: str
|
||
table_name: str
|
||
endpoint: str
|
||
data_path: Tuple[str, ...] = ("data",)
|
||
list_key: str | None = None
|
||
pk_columns: Tuple[ColumnSpec, ...] = ()
|
||
extra_columns: Tuple[ColumnSpec, ...] = ()
|
||
include_page_size: bool = False
|
||
include_page_no: bool = False
|
||
include_source_file: bool = True
|
||
include_source_endpoint: bool = True
|
||
include_record_index: bool = False
|
||
include_site_column: bool = True
|
||
include_fetched_at: bool = True
|
||
requires_window: bool = True
|
||
time_fields: Tuple[str, str] | None = ("startTime", "endTime")
|
||
include_site_id: bool = True
|
||
description: str = ""
|
||
extra_params: Dict[str, Any] = field(default_factory=dict)
|
||
conflict_columns_override: Tuple[str, ...] | None = None
|
||
|
||
|
||
class BaseOdsTask(BaseTask):
|
||
"""Shared functionality for ODS ingestion tasks."""
|
||
|
||
SPEC: OdsTaskSpec
|
||
|
||
def get_task_code(self) -> str:
|
||
return self.SPEC.code
|
||
|
||
def execute(self) -> dict:
|
||
spec = self.SPEC
|
||
self.logger.info("开始执行 %s (ODS)", spec.code)
|
||
|
||
store_id = TypeParser.parse_int(self.config.get("app.store_id"))
|
||
if not store_id:
|
||
raise ValueError("app.store_id 未配置,无法执行 ODS 任务")
|
||
|
||
page_size = self.config.get("api.page_size", 200)
|
||
params = self._build_params(spec, store_id)
|
||
columns = self._resolve_columns(spec)
|
||
if spec.conflict_columns_override:
|
||
conflict_columns = list(spec.conflict_columns_override)
|
||
else:
|
||
conflict_columns = []
|
||
if spec.include_site_column:
|
||
conflict_columns.append("site_id")
|
||
conflict_columns += [col.column for col in spec.pk_columns]
|
||
loader = GenericODSLoader(
|
||
self.db,
|
||
spec.table_name,
|
||
columns,
|
||
conflict_columns,
|
||
)
|
||
|
||
counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
|
||
source_file = self._resolve_source_file_hint(spec)
|
||
|
||
try:
|
||
global_index = 0
|
||
for page_no, page_records, _, _ in self.api.iter_paginated(
|
||
endpoint=spec.endpoint,
|
||
params=params,
|
||
page_size=page_size,
|
||
data_path=spec.data_path,
|
||
list_key=spec.list_key,
|
||
):
|
||
rows: List[dict] = []
|
||
for raw in page_records:
|
||
row = self._build_row(
|
||
spec=spec,
|
||
store_id=store_id,
|
||
record=raw,
|
||
page_no=page_no if spec.include_page_no else None,
|
||
page_size_value=len(page_records)
|
||
if spec.include_page_size
|
||
else None,
|
||
source_file=source_file,
|
||
record_index=global_index if spec.include_record_index else None,
|
||
)
|
||
if row is None:
|
||
counts["skipped"] += 1
|
||
continue
|
||
rows.append(row)
|
||
global_index += 1
|
||
|
||
inserted, updated, _ = loader.upsert_rows(rows)
|
||
counts["inserted"] += inserted
|
||
counts["updated"] += updated
|
||
counts["fetched"] += len(page_records)
|
||
|
||
self.db.commit()
|
||
self.logger.info("%s ODS 任务完成: %s", spec.code, counts)
|
||
return self._build_result("SUCCESS", counts)
|
||
|
||
except Exception:
|
||
self.db.rollback()
|
||
counts["errors"] += 1
|
||
self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
|
||
raise
|
||
|
||
def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict:
|
||
base: dict[str, Any] = {}
|
||
if spec.include_site_id:
|
||
base["siteId"] = store_id
|
||
if spec.requires_window and spec.time_fields:
|
||
window_start, window_end, _ = self._get_time_window()
|
||
start_key, end_key = spec.time_fields
|
||
base[start_key] = TypeParser.format_timestamp(window_start, self.tz)
|
||
base[end_key] = TypeParser.format_timestamp(window_end, self.tz)
|
||
|
||
params = self._merge_common_params(base)
|
||
params.update(spec.extra_params)
|
||
return params
|
||
|
||
def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]:
|
||
columns: List[str] = []
|
||
if spec.include_site_column:
|
||
columns.append("site_id")
|
||
seen = set(columns)
|
||
for col_spec in list(spec.pk_columns) + list(spec.extra_columns):
|
||
if col_spec.column not in seen:
|
||
columns.append(col_spec.column)
|
||
seen.add(col_spec.column)
|
||
|
||
if spec.include_record_index and "record_index" not in seen:
|
||
columns.append("record_index")
|
||
seen.add("record_index")
|
||
|
||
if spec.include_page_no and "page_no" not in seen:
|
||
columns.append("page_no")
|
||
seen.add("page_no")
|
||
|
||
if spec.include_page_size and "page_size" not in seen:
|
||
columns.append("page_size")
|
||
seen.add("page_size")
|
||
|
||
if spec.include_source_file and "source_file" not in seen:
|
||
columns.append("source_file")
|
||
seen.add("source_file")
|
||
|
||
if spec.include_source_endpoint and "source_endpoint" not in seen:
|
||
columns.append("source_endpoint")
|
||
seen.add("source_endpoint")
|
||
|
||
if spec.include_fetched_at and "fetched_at" not in seen:
|
||
columns.append("fetched_at")
|
||
seen.add("fetched_at")
|
||
if "payload" not in seen:
|
||
columns.append("payload")
|
||
|
||
return columns
|
||
|
||
def _build_row(
|
||
self,
|
||
spec: OdsTaskSpec,
|
||
store_id: int,
|
||
record: dict,
|
||
page_no: int | None,
|
||
page_size_value: int | None,
|
||
source_file: str | None,
|
||
record_index: int | None = None,
|
||
) -> dict | None:
|
||
row: dict[str, Any] = {}
|
||
if spec.include_site_column:
|
||
row["site_id"] = store_id
|
||
|
||
for col_spec in spec.pk_columns + spec.extra_columns:
|
||
value = self._extract_value(record, col_spec)
|
||
if value is None and col_spec.required:
|
||
self.logger.warning(
|
||
"%s 缺少必填字段 %s,原始记录: %s",
|
||
spec.code,
|
||
col_spec.column,
|
||
record,
|
||
)
|
||
return None
|
||
row[col_spec.column] = value
|
||
|
||
if spec.include_page_no:
|
||
row["page_no"] = page_no
|
||
if spec.include_page_size:
|
||
row["page_size"] = page_size_value
|
||
if spec.include_record_index:
|
||
row["record_index"] = record_index
|
||
if spec.include_source_file:
|
||
row["source_file"] = source_file
|
||
if spec.include_source_endpoint:
|
||
row["source_endpoint"] = spec.endpoint
|
||
|
||
if spec.include_fetched_at:
|
||
row["fetched_at"] = datetime.now(self.tz)
|
||
row["payload"] = record
|
||
return row
|
||
|
||
def _extract_value(self, record: dict, spec: ColumnSpec):
|
||
value = None
|
||
for key in spec.sources:
|
||
value = self._dig(record, key)
|
||
if value is not None:
|
||
break
|
||
if value is None and spec.default is not None:
|
||
value = spec.default
|
||
if value is not None and spec.transform:
|
||
value = spec.transform(value)
|
||
return value
|
||
|
||
@staticmethod
|
||
def _dig(record: Any, path: str | None):
|
||
if not path:
|
||
return None
|
||
current = record
|
||
for part in path.split("."):
|
||
if isinstance(current, dict):
|
||
current = current.get(part)
|
||
else:
|
||
return None
|
||
return current
|
||
|
||
def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
|
||
resolver = getattr(self.api, "get_source_hint", None)
|
||
if callable(resolver):
|
||
return resolver(spec.endpoint)
|
||
return None
|
||
|
||
|
||
def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
|
||
return ColumnSpec(
|
||
column=name,
|
||
sources=sources,
|
||
required=required,
|
||
transform=TypeParser.parse_int,
|
||
)
|
||
|
||
|
||
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
|
||
OdsTaskSpec(
|
||
code="ODS_ASSISTANT_ACCOUNTS",
|
||
class_name="OdsAssistantAccountsTask",
|
||
table_name="billiards_ods.assistant_accounts_master",
|
||
endpoint="/PersonnelManagement/SearchAssistantInfo",
|
||
data_path=("data",),
|
||
list_key="assistantInfos",
|
||
pk_columns=(_int_col("id", "id", required=True),),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
description="助教账号档案 ODS:SearchAssistantInfo -> assistantInfos 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_ORDER_SETTLE",
|
||
class_name="OdsOrderSettleTask",
|
||
table_name="billiards_ods.settlement_records",
|
||
endpoint="/Site/GetAllOrderSettleList",
|
||
data_path=("data",),
|
||
list_key="settleList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="结账记录 ODS:GetAllOrderSettleList -> settleList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_TABLE_USE",
|
||
class_name="OdsTableUseTask",
|
||
table_name="billiards_ods.table_fee_transactions",
|
||
endpoint="/Site/GetSiteTableOrderDetails",
|
||
data_path=("data",),
|
||
list_key="siteTableUseDetailsList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="台费计费流水 ODS:GetSiteTableOrderDetails -> siteTableUseDetailsList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_ASSISTANT_LEDGER",
|
||
class_name="OdsAssistantLedgerTask",
|
||
table_name="billiards_ods.assistant_service_records",
|
||
endpoint="/AssistantPerformance/GetOrderAssistantDetails",
|
||
data_path=("data",),
|
||
list_key="orderAssistantDetails",
|
||
pk_columns=(_int_col("id", "id", required=True),),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
description="助教服务流水 ODS:GetOrderAssistantDetails -> orderAssistantDetails 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_ASSISTANT_ABOLISH",
|
||
class_name="OdsAssistantAbolishTask",
|
||
table_name="billiards_ods.assistant_cancellation_records",
|
||
endpoint="/AssistantPerformance/GetAbolitionAssistant",
|
||
data_path=("data",),
|
||
list_key="abolitionAssistants",
|
||
pk_columns=(_int_col("id", "id", required=True),),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
description="助教废除记录 ODS:GetAbolitionAssistant -> abolitionAssistants 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_GOODS_LEDGER",
|
||
class_name="OdsGoodsLedgerTask",
|
||
table_name="billiards_ods.store_goods_sales_records",
|
||
endpoint="/TenantGoods/GetGoodsSalesList",
|
||
data_path=("data",),
|
||
list_key="orderGoodsLedgers",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="门店商品销售流水 ODS:GetGoodsSalesList -> orderGoodsLedgers 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_PAYMENT",
|
||
class_name="OdsPaymentTask",
|
||
table_name="billiards_ods.payment_transactions",
|
||
endpoint="/PayLog/GetPayLogListPage",
|
||
data_path=("data",),
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="支付流水 ODS:GetPayLogListPage 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_REFUND",
|
||
class_name="OdsRefundTask",
|
||
table_name="billiards_ods.refund_transactions",
|
||
endpoint="/Order/GetRefundPayLogList",
|
||
data_path=("data",),
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="退款流水 ODS:GetRefundPayLogList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_COUPON_VERIFY",
|
||
class_name="OdsCouponVerifyTask",
|
||
table_name="billiards_ods.platform_coupon_redemption_records",
|
||
endpoint="/Promotion/GetOfflineCouponConsumePageList",
|
||
data_path=("data",),
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="平台/团购券核销 ODS:GetOfflineCouponConsumePageList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_MEMBER",
|
||
class_name="OdsMemberTask",
|
||
table_name="billiards_ods.member_profiles",
|
||
endpoint="/MemberProfile/GetTenantMemberList",
|
||
data_path=("data",),
|
||
list_key="tenantMemberInfos",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="会员档案 ODS:GetTenantMemberList -> tenantMemberInfos 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_MEMBER_CARD",
|
||
class_name="OdsMemberCardTask",
|
||
table_name="billiards_ods.member_stored_value_cards",
|
||
endpoint="/MemberProfile/GetTenantMemberCardList",
|
||
data_path=("data",),
|
||
list_key="tenantMemberCards",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="会员储值卡 ODS:GetTenantMemberCardList -> tenantMemberCards 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_MEMBER_BALANCE",
|
||
class_name="OdsMemberBalanceTask",
|
||
table_name="billiards_ods.member_balance_changes",
|
||
endpoint="/MemberProfile/GetMemberCardBalanceChange",
|
||
data_path=("data",),
|
||
list_key="tenantMemberCardLogs",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="会员余额变动 ODS:GetMemberCardBalanceChange -> tenantMemberCardLogs 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_RECHARGE_SETTLE",
|
||
class_name="OdsRechargeSettleTask",
|
||
table_name="billiards_ods.recharge_settlements",
|
||
endpoint="/Site/GetRechargeSettleList",
|
||
data_path=("data",),
|
||
list_key="settleList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="会员充值结算 ODS:GetRechargeSettleList -> settleList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_PACKAGE",
|
||
class_name="OdsPackageTask",
|
||
table_name="billiards_ods.group_buy_packages",
|
||
endpoint="/PackageCoupon/QueryPackageCouponList",
|
||
data_path=("data",),
|
||
list_key="packageCouponList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="团购套餐定义 ODS:QueryPackageCouponList -> packageCouponList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_GROUP_BUY_REDEMPTION",
|
||
class_name="OdsGroupBuyRedemptionTask",
|
||
table_name="billiards_ods.group_buy_redemption_records",
|
||
endpoint="/Site/GetSiteTableUseDetails",
|
||
data_path=("data",),
|
||
list_key="siteTableUseDetailsList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="团购套餐核销 ODS:GetSiteTableUseDetails -> siteTableUseDetailsList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_INVENTORY_STOCK",
|
||
class_name="OdsInventoryStockTask",
|
||
table_name="billiards_ods.goods_stock_summary",
|
||
endpoint="/TenantGoods/GetGoodsStockReport",
|
||
data_path=("data",),
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="库存汇总 ODS:GetGoodsStockReport 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_INVENTORY_CHANGE",
|
||
class_name="OdsInventoryChangeTask",
|
||
table_name="billiards_ods.goods_stock_movements",
|
||
endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt",
|
||
data_path=("data",),
|
||
list_key="queryDeliveryRecordsList",
|
||
pk_columns=(_int_col("sitegoodsstockid", "siteGoodsStockId", required=True),),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
description="库存变化记录 ODS:QueryGoodsOutboundReceipt -> queryDeliveryRecordsList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_TABLES",
|
||
class_name="OdsTablesTask",
|
||
table_name="billiards_ods.site_tables_master",
|
||
endpoint="/Table/GetSiteTables",
|
||
data_path=("data",),
|
||
list_key="siteTables",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="台桌维表 ODS:GetSiteTables -> siteTables 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_GOODS_CATEGORY",
|
||
class_name="OdsGoodsCategoryTask",
|
||
table_name="billiards_ods.stock_goods_category_tree",
|
||
endpoint="/TenantGoodsCategory/QueryPrimarySecondaryCategory",
|
||
data_path=("data",),
|
||
list_key="goodsCategoryList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="库存商品分类树 ODS:QueryPrimarySecondaryCategory -> goodsCategoryList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_STORE_GOODS",
|
||
class_name="OdsStoreGoodsTask",
|
||
table_name="billiards_ods.store_goods_master",
|
||
endpoint="/TenantGoods/GetGoodsInventoryList",
|
||
data_path=("data",),
|
||
list_key="orderGoodsList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="门店商品档案 ODS:GetGoodsInventoryList -> orderGoodsList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_TABLE_DISCOUNT",
|
||
class_name="OdsTableDiscountTask",
|
||
table_name="billiards_ods.table_fee_discount_records",
|
||
endpoint="/Site/GetTaiFeeAdjustList",
|
||
data_path=("data",),
|
||
list_key="taiFeeAdjustInfos",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="台费折扣/调账 ODS:GetTaiFeeAdjustList -> taiFeeAdjustInfos 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_TENANT_GOODS",
|
||
class_name="OdsTenantGoodsTask",
|
||
table_name="billiards_ods.tenant_goods_master",
|
||
endpoint="/TenantGoods/QueryTenantGoods",
|
||
data_path=("data",),
|
||
list_key="tenantGoodsList",
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=False,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
description="租户商品档案 ODS:QueryTenantGoods -> tenantGoodsList 原始 JSON",
|
||
),
|
||
OdsTaskSpec(
|
||
code="ODS_SETTLEMENT_TICKET",
|
||
class_name="OdsSettlementTicketTask",
|
||
table_name="billiards_ods.settlement_ticket_details",
|
||
endpoint="/Order/GetOrderSettleTicketNew",
|
||
data_path=(),
|
||
list_key=None,
|
||
pk_columns=(),
|
||
include_site_column=False,
|
||
include_source_endpoint=False,
|
||
include_page_no=False,
|
||
include_page_size=False,
|
||
include_fetched_at=True,
|
||
include_record_index=True,
|
||
conflict_columns_override=("source_file", "record_index"),
|
||
requires_window=False,
|
||
include_site_id=False,
|
||
description="结账小票详情 ODS:GetOrderSettleTicketNew 原始 JSON",
|
||
),
|
||
)
|
||
|
||
|
||
def _get_spec(code: str) -> OdsTaskSpec:
|
||
for spec in ODS_TASK_SPECS:
|
||
if spec.code == code:
|
||
return spec
|
||
raise KeyError(f"Spec not found for code {code}")
|
||
|
||
|
||
_SETTLEMENT_TICKET_SPEC = _get_spec("ODS_SETTLEMENT_TICKET")
|
||
|
||
|
||
class OdsSettlementTicketTask(BaseOdsTask):
|
||
"""Special handling: fetch ticket details per payment relate_id/orderSettleId."""
|
||
|
||
SPEC = _SETTLEMENT_TICKET_SPEC
|
||
|
||
def extract(self, context) -> dict:
|
||
"""Fetch ticket payloads only (used by fetch-only pipeline)."""
|
||
existing_ids = self._fetch_existing_ticket_ids()
|
||
candidates = self._collect_settlement_ids(
|
||
context.store_id or 0, existing_ids, context.window_start, context.window_end
|
||
)
|
||
candidates = [cid for cid in candidates if cid and cid not in existing_ids]
|
||
payloads, skipped = self._fetch_ticket_payloads(candidates)
|
||
return {"records": payloads, "skipped": skipped, "fetched": len(candidates)}
|
||
|
||
def execute(self, cursor_data: dict | None = None) -> dict:
|
||
spec = self.SPEC
|
||
context = self._build_context(cursor_data)
|
||
store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0
|
||
|
||
counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
|
||
loader = GenericODSLoader(
|
||
self.db,
|
||
spec.table_name,
|
||
self._resolve_columns(spec),
|
||
list(spec.conflict_columns_override or ("source_file", "record_index")),
|
||
)
|
||
source_file = self._resolve_source_file_hint(spec)
|
||
|
||
try:
|
||
existing_ids = self._fetch_existing_ticket_ids()
|
||
candidates = self._collect_settlement_ids(
|
||
store_id, existing_ids, context.window_start, context.window_end
|
||
)
|
||
candidates = [cid for cid in candidates if cid and cid not in existing_ids]
|
||
counts["fetched"] = len(candidates)
|
||
|
||
if not candidates:
|
||
self.logger.info(
|
||
"%s: 窗口[%s ~ %s] 未发现需要抓取的小票",
|
||
spec.code,
|
||
context.window_start,
|
||
context.window_end,
|
||
)
|
||
return self._build_result("SUCCESS", counts)
|
||
|
||
payloads, skipped = self._fetch_ticket_payloads(candidates)
|
||
counts["skipped"] += skipped
|
||
rows: list[dict] = []
|
||
for idx, payload in enumerate(payloads):
|
||
row = self._build_row(
|
||
spec=spec,
|
||
store_id=store_id,
|
||
record=payload,
|
||
page_no=None,
|
||
page_size_value=None,
|
||
source_file=source_file,
|
||
record_index=idx if spec.include_record_index else None,
|
||
)
|
||
if row is None:
|
||
counts["skipped"] += 1
|
||
continue
|
||
rows.append(row)
|
||
|
||
inserted, updated, _ = loader.upsert_rows(rows)
|
||
counts["inserted"] += inserted
|
||
counts["updated"] += updated
|
||
self.db.commit()
|
||
self.logger.info(
|
||
"%s: 小票抓取完成,候选=%s 插入=%s 更新=%s 跳过=%s",
|
||
spec.code,
|
||
len(candidates),
|
||
inserted,
|
||
updated,
|
||
counts["skipped"],
|
||
)
|
||
return self._build_result("SUCCESS", counts)
|
||
|
||
except Exception:
|
||
counts["errors"] += 1
|
||
self.db.rollback()
|
||
self.logger.error("%s: 小票抓取失败", spec.code, exc_info=True)
|
||
raise
|
||
|
||
# ------------------------------------------------------------------ helpers
|
||
def _fetch_existing_ticket_ids(self) -> set[int]:
|
||
sql = """
|
||
SELECT DISTINCT
|
||
CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$'
|
||
THEN (payload ->> 'orderSettleId')::bigint
|
||
END AS order_settle_id
|
||
FROM billiards_ods.settlement_ticket_details
|
||
"""
|
||
try:
|
||
rows = self.db.query(sql)
|
||
except Exception:
|
||
self.logger.warning("查询已有小票失败,按空集处理", exc_info=True)
|
||
return set()
|
||
|
||
return {
|
||
TypeParser.parse_int(row.get("order_settle_id"))
|
||
for row in rows
|
||
if row.get("order_settle_id") is not None
|
||
}
|
||
|
||
def _collect_settlement_ids(
|
||
self, store_id: int, existing_ids: set[int], window_start, window_end
|
||
) -> list[int]:
|
||
ids = self._fetch_from_payment_table(store_id)
|
||
if not ids:
|
||
ids = self._fetch_from_payment_api(store_id, window_start, window_end)
|
||
return sorted(i for i in ids if i is not None and i not in existing_ids)
|
||
|
||
def _fetch_from_payment_table(self, store_id: int) -> set[int]:
|
||
sql = """
|
||
SELECT DISTINCT COALESCE(
|
||
CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$'
|
||
THEN (payload ->> 'orderSettleId')::bigint END,
|
||
CASE WHEN (payload ->> 'relateId') ~ '^[0-9]+$'
|
||
THEN (payload ->> 'relateId')::bigint END
|
||
) AS order_settle_id
|
||
FROM billiards_ods.payment_transactions
|
||
WHERE (payload ->> 'orderSettleId') ~ '^[0-9]+$'
|
||
OR (payload ->> 'relateId') ~ '^[0-9]+$'
|
||
"""
|
||
params = None
|
||
if store_id:
|
||
sql += " AND COALESCE((payload ->> 'siteId')::bigint, %s) = %s"
|
||
params = (store_id, store_id)
|
||
|
||
try:
|
||
rows = self.db.query(sql, params)
|
||
except Exception:
|
||
self.logger.warning("读取支付流水以获取结算单ID失败,将尝试调用支付接口回退", exc_info=True)
|
||
return set()
|
||
|
||
return {
|
||
TypeParser.parse_int(row.get("order_settle_id"))
|
||
for row in rows
|
||
if row.get("order_settle_id") is not None
|
||
}
|
||
|
||
def _fetch_from_payment_api(self, store_id: int, window_start, window_end) -> set[int]:
|
||
params = self._merge_common_params(
|
||
{
|
||
"siteId": store_id,
|
||
"StartPayTime": TypeParser.format_timestamp(window_start, self.tz),
|
||
"EndPayTime": TypeParser.format_timestamp(window_end, self.tz),
|
||
}
|
||
)
|
||
candidate_ids: set[int] = set()
|
||
try:
|
||
for _, records, _, _ in self.api.iter_paginated(
|
||
endpoint="/PayLog/GetPayLogListPage",
|
||
params=params,
|
||
page_size=self.config.get("api.page_size", 200),
|
||
data_path=("data",),
|
||
):
|
||
for rec in records:
|
||
relate_id = TypeParser.parse_int(
|
||
(rec or {}).get("relateId")
|
||
or (rec or {}).get("orderSettleId")
|
||
or (rec or {}).get("order_settle_id")
|
||
)
|
||
if relate_id:
|
||
candidate_ids.add(relate_id)
|
||
except Exception:
|
||
self.logger.warning("调用支付接口获取结算单ID失败,当前批次将跳过回退来源", exc_info=True)
|
||
return candidate_ids
|
||
|
||
def _fetch_ticket_payload(self, order_settle_id: int):
|
||
payload = None
|
||
try:
|
||
for _, _, _, response in self.api.iter_paginated(
|
||
endpoint=self.SPEC.endpoint,
|
||
params={"orderSettleId": order_settle_id},
|
||
page_size=None,
|
||
data_path=self.SPEC.data_path,
|
||
list_key=self.SPEC.list_key,
|
||
):
|
||
payload = response
|
||
except Exception:
|
||
self.logger.warning(
|
||
"调用小票接口失败 orderSettleId=%s", order_settle_id, exc_info=True
|
||
)
|
||
if isinstance(payload, dict) and isinstance(payload.get("data"), list) and len(payload["data"]) == 1:
|
||
# 本地桩/回放可能把响应包装成单元素 list,这里展开以贴近真实结构
|
||
payload = payload["data"][0]
|
||
return payload
|
||
|
||
def _fetch_ticket_payloads(self, candidates: list[int]) -> tuple[list, int]:
|
||
"""Fetch ticket payloads for a set of orderSettleIds; returns (payloads, skipped)."""
|
||
payloads: list = []
|
||
skipped = 0
|
||
for order_settle_id in candidates:
|
||
payload = self._fetch_ticket_payload(order_settle_id)
|
||
if payload:
|
||
payloads.append(payload)
|
||
else:
|
||
skipped += 1
|
||
return payloads, skipped
|
||
|
||
|
||
def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
|
||
attrs = {
|
||
"SPEC": spec,
|
||
"__doc__": spec.description or f"ODS ingestion task {spec.code}",
|
||
"__module__": __name__,
|
||
}
|
||
return type(spec.class_name, (BaseOdsTask,), attrs)
|
||
|
||
|
||
ENABLED_ODS_CODES = {
|
||
"ODS_ASSISTANT_ACCOUNTS",
|
||
"ODS_ASSISTANT_LEDGER",
|
||
"ODS_ASSISTANT_ABOLISH",
|
||
"ODS_INVENTORY_CHANGE",
|
||
"ODS_INVENTORY_STOCK",
|
||
"ODS_PACKAGE",
|
||
"ODS_GROUP_BUY_REDEMPTION",
|
||
"ODS_MEMBER",
|
||
"ODS_MEMBER_BALANCE",
|
||
"ODS_MEMBER_CARD",
|
||
"ODS_PAYMENT",
|
||
"ODS_REFUND",
|
||
"ODS_COUPON_VERIFY",
|
||
"ODS_RECHARGE_SETTLE",
|
||
"ODS_TABLES",
|
||
"ODS_GOODS_CATEGORY",
|
||
"ODS_STORE_GOODS",
|
||
"ODS_TABLE_DISCOUNT",
|
||
"ODS_TENANT_GOODS",
|
||
"ODS_SETTLEMENT_TICKET",
|
||
"ODS_ORDER_SETTLE",
|
||
}
|
||
|
||
ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
|
||
spec.code: _build_task_class(spec)
|
||
for spec in ODS_TASK_SPECS
|
||
if spec.code in ENABLED_ODS_CODES
|
||
}
|
||
# Override with specialized settlement ticket implementation
|
||
ODS_TASK_CLASSES["ODS_SETTLEMENT_TICKET"] = OdsSettlementTicketTask
|
||
|
||
__all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask", "ENABLED_ODS_CODES"]
|