# -*- coding: utf-8 -*-
"""ODS ingestion tasks.

Each task pulls raw JSON records from one API endpoint and upserts them into
an ODS table through ``GenericODSLoader``.  Tasks are described declaratively
by ``OdsTaskSpec`` instances and turned into ``BaseOdsTask`` subclasses at
import time.
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type

from loaders.ods import GenericODSLoader
from models.parsers import TypeParser

from .base_task import BaseTask

ColumnTransform = Callable[[Any], Any]


@dataclass(frozen=True)
class ColumnSpec:
    """Mapping between DB column and source JSON field."""

    # Target column name in the ODS table.
    column: str
    # Candidate dotted paths into the source record, tried in order.
    sources: Tuple[str, ...] = ()
    # When True, a record without a value for this column is skipped.
    required: bool = False
    # Fallback used when no source path yields a value.
    default: Any = None
    # Optional converter applied to a non-None value.
    transform: ColumnTransform | None = None


@dataclass(frozen=True)
class OdsTaskSpec:
    """Definition of a single ODS ingestion task."""

    code: str
    class_name: str
    table_name: str
    endpoint: str
    data_path: Tuple[str, ...] = ("data",)
    list_key: str | None = None
    pk_columns: Tuple[ColumnSpec, ...] = ()
    extra_columns: Tuple[ColumnSpec, ...] = ()
    include_page_size: bool = False
    include_page_no: bool = False
    include_source_file: bool = True
    include_source_endpoint: bool = True
    include_record_index: bool = False
    include_site_column: bool = True
    include_fetched_at: bool = True
    requires_window: bool = True
    time_fields: Tuple[str, str] | None = ("startTime", "endTime")
    include_site_id: bool = True
    description: str = ""
    extra_params: Dict[str, Any] = field(default_factory=dict)
    conflict_columns_override: Tuple[str, ...] | None = None


class BaseOdsTask(BaseTask):
    """Shared functionality for ODS ingestion tasks."""

    SPEC: OdsTaskSpec

    def get_task_code(self) -> str:
        """Return the task code declared by ``SPEC``."""
        return self.SPEC.code

    def execute(self) -> dict:
        """Fetch every page of the endpoint and upsert the rows.

        Returns:
            The result of ``self._build_result("SUCCESS", counts)``.

        Raises:
            ValueError: if ``app.store_id`` is missing or parses to a falsy value.
            Exception: anything raised while fetching/loading is re-raised
                after the DB transaction has been rolled back.
        """
        spec = self.SPEC
        self.logger.info("开始执行 %s (ODS)", spec.code)

        store_id = TypeParser.parse_int(self.config.get("app.store_id"))
        if not store_id:
            raise ValueError("app.store_id 未配置,无法执行 ODS 任务")

        page_size = self.config.get("api.page_size", 200)
        params = self._build_params(spec, store_id)
        columns = self._resolve_columns(spec)

        # An explicit override wins; otherwise conflicts are detected on
        # (site_id +) the primary-key columns.
        if spec.conflict_columns_override:
            conflict_columns = list(spec.conflict_columns_override)
        else:
            conflict_columns = []
            if spec.include_site_column:
                conflict_columns.append("site_id")
            conflict_columns += [col.column for col in spec.pk_columns]

        loader = GenericODSLoader(
            self.db,
            spec.table_name,
            columns,
            conflict_columns,
        )
        counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
        source_file = self._resolve_source_file_hint(spec)

        try:
            # Running index across all pages; only advanced for kept rows.
            # NOTE(review): placement reconstructed from whitespace-collapsed
            # source - confirm it belongs inside the per-record loop.
            global_index = 0
            for page_no, page_records, _, _ in self.api.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                rows: List[dict] = []
                for raw in page_records:
                    row = self._build_row(
                        spec=spec,
                        store_id=store_id,
                        record=raw,
                        page_no=page_no if spec.include_page_no else None,
                        page_size_value=len(page_records) if spec.include_page_size else None,
                        source_file=source_file,
                        record_index=global_index if spec.include_record_index else None,
                    )
                    if row is None:
                        counts["skipped"] += 1
                        continue
                    rows.append(row)
                    global_index += 1

                inserted, updated, _ = loader.upsert_rows(rows)
                counts["inserted"] += inserted
                counts["updated"] += updated
                counts["fetched"] += len(page_records)

            # NOTE(review): commit placed after the page loop (single
            # transaction, matching the rollback below) - confirm.
            self.db.commit()
            self.logger.info("%s ODS 任务完成: %s", spec.code, counts)
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            counts["errors"] += 1
            self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
            raise

    def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict:
        """Assemble the request parameters for ``spec``.

        Adds ``siteId`` and the time-window keys when the spec asks for them,
        merges the common parameters, then applies ``spec.extra_params`` last
        so they take precedence.
        """
        base: dict[str, Any] = {}
        if spec.include_site_id:
            base["siteId"] = store_id
        if spec.requires_window and spec.time_fields:
            window_start, window_end, _ = self._get_time_window()
            start_key, end_key = spec.time_fields
            base[start_key] = TypeParser.format_timestamp(window_start, self.tz)
            base[end_key] = TypeParser.format_timestamp(window_end, self.tz)
        params = self._merge_common_params(base)
        params.update(spec.extra_params)
        return params

    def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]:
        """Return the ordered, de-duplicated column list for the loader.

        Order: ``site_id`` (optional), pk/extra columns, the optional metadata
        columns enabled on the spec, and finally ``payload``.
        """
        columns: List[str] = []
        seen: set = set()

        def add(name: str) -> None:
            # Append ``name`` once, preserving first-seen order.
            if name not in seen:
                columns.append(name)
                seen.add(name)

        if spec.include_site_column:
            add("site_id")
        for col_spec in spec.pk_columns + spec.extra_columns:
            add(col_spec.column)

        metadata_columns = (
            (spec.include_record_index, "record_index"),
            (spec.include_page_no, "page_no"),
            (spec.include_page_size, "page_size"),
            (spec.include_source_file, "source_file"),
            (spec.include_source_endpoint, "source_endpoint"),
            (spec.include_fetched_at, "fetched_at"),
            (True, "payload"),  # the raw record is always stored
        )
        for enabled, name in metadata_columns:
            if enabled:
                add(name)
        return columns

    def _build_row(
        self,
        spec: OdsTaskSpec,
        store_id: int,
        record: dict,
        page_no: int | None,
        page_size_value: int | None,
        source_file: str | None,
        record_index: int | None = None,
    ) -> dict | None:
        """Convert one source record into a row dict.

        Returns:
            The row, or ``None`` when a required column has no value (a
            warning is logged and the caller counts the record as skipped).
        """
        row: dict[str, Any] = {}
        if spec.include_site_column:
            row["site_id"] = store_id

        for col_spec in spec.pk_columns + spec.extra_columns:
            value = self._extract_value(record, col_spec)
            if value is None and col_spec.required:
                self.logger.warning(
                    "%s 缺少必填字段 %s,原始记录: %s",
                    spec.code,
                    col_spec.column,
                    record,
                )
                return None
            row[col_spec.column] = value

        if spec.include_page_no:
            row["page_no"] = page_no
        if spec.include_page_size:
            row["page_size"] = page_size_value
        if spec.include_record_index:
            row["record_index"] = record_index
        if spec.include_source_file:
            row["source_file"] = source_file
        if spec.include_source_endpoint:
            row["source_endpoint"] = spec.endpoint
        if spec.include_fetched_at:
            row["fetched_at"] = datetime.now(self.tz)
        row["payload"] = record
        return row

    def _extract_value(self, record: dict, spec: ColumnSpec):
        """Return the first non-None source value, defaulted and transformed."""
        value = None
        for key in spec.sources:
            value = self._dig(record, key)
            if value is not None:
                break
        if value is None and spec.default is not None:
            value = spec.default
        if value is not None and spec.transform:
            value = spec.transform(value)
        return value

    @staticmethod
    def _dig(record: Any, path: str | None):
        """Follow a dotted ``path`` through nested dicts.

        Returns ``None`` for an empty path, a missing key, or when a
        non-dict value is reached before the path is exhausted.
        """
        if not path:
            return None
        current = record
        for part in path.split("."):
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None
        return current

    def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
        """Ask the API client for a source hint, if it offers ``get_source_hint``."""
        resolver = getattr(self.api, "get_source_hint", None)
        if callable(resolver):
            return resolver(spec.endpoint)
        return None


def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
    """Build a ``ColumnSpec`` whose value is parsed with ``TypeParser.parse_int``."""
    return ColumnSpec(
        column=name,
        sources=sources,
        required=required,
        transform=TypeParser.parse_int,
    )


# Settings shared by every spec below; individual specs may override any key.
_RAW_JSON_DEFAULTS: Dict[str, Any] = {
    "data_path": ("data",),
    "pk_columns": (),
    "include_site_column": False,
    "include_source_endpoint": False,
    "include_page_no": False,
    "include_page_size": False,
    "include_fetched_at": False,
    "include_record_index": True,
    "conflict_columns_override": ("source_file", "record_index"),
}


def _raw_json_spec(**overrides: Any) -> OdsTaskSpec:
    """Build an ``OdsTaskSpec`` from ``_RAW_JSON_DEFAULTS`` plus ``overrides``."""
    return OdsTaskSpec(**{**_RAW_JSON_DEFAULTS, **overrides})


ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
    _raw_json_spec(
        code="ODS_ASSISTANT_ACCOUNTS",
        class_name="OdsAssistantAccountsTask",
        table_name="billiards_ods.assistant_accounts_master",
        endpoint="/PersonnelManagement/SearchAssistantInfo",
        list_key="assistantInfos",
        pk_columns=(_int_col("id", "id", required=True),),
        description="助教账号档案 ODS:SearchAssistantInfo -> assistantInfos 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_ORDER_SETTLE",
        class_name="OdsOrderSettleTask",
        table_name="billiards_ods.settlement_records",
        endpoint="/Site/GetAllOrderSettleList",
        list_key="settleList",
        requires_window=False,
        description="结账记录 ODS:GetAllOrderSettleList -> settleList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TABLE_USE",
        class_name="OdsTableUseTask",
        table_name="billiards_ods.table_fee_transactions",
        endpoint="/Site/GetSiteTableOrderDetails",
        list_key="siteTableUseDetailsList",
        requires_window=False,
        description="台费计费流水 ODS:GetSiteTableOrderDetails -> siteTableUseDetailsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_ASSISTANT_LEDGER",
        class_name="OdsAssistantLedgerTask",
        table_name="billiards_ods.assistant_service_records",
        endpoint="/AssistantPerformance/GetOrderAssistantDetails",
        list_key="orderAssistantDetails",
        pk_columns=(_int_col("id", "id", required=True),),
        description="助教服务流水 ODS:GetOrderAssistantDetails -> orderAssistantDetails 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_ASSISTANT_ABOLISH",
        class_name="OdsAssistantAbolishTask",
        table_name="billiards_ods.assistant_cancellation_records",
        endpoint="/AssistantPerformance/GetAbolitionAssistant",
        list_key="abolitionAssistants",
        pk_columns=(_int_col("id", "id", required=True),),
        description="助教废除记录 ODS:GetAbolitionAssistant -> abolitionAssistants 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_GOODS_LEDGER",
        class_name="OdsGoodsLedgerTask",
        table_name="billiards_ods.store_goods_sales_records",
        endpoint="/TenantGoods/GetGoodsSalesList",
        list_key="orderGoodsLedgers",
        requires_window=False,
        description="门店商品销售流水 ODS:GetGoodsSalesList -> orderGoodsLedgers 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_PAYMENT",
        class_name="OdsPaymentTask",
        table_name="billiards_ods.payment_transactions",
        endpoint="/PayLog/GetPayLogListPage",
        requires_window=False,
        description="支付流水 ODS:GetPayLogListPage 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_REFUND",
        class_name="OdsRefundTask",
        table_name="billiards_ods.refund_transactions",
        endpoint="/Order/GetRefundPayLogList",
        requires_window=False,
        description="退款流水 ODS:GetRefundPayLogList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_COUPON_VERIFY",
        class_name="OdsCouponVerifyTask",
        table_name="billiards_ods.platform_coupon_redemption_records",
        endpoint="/Promotion/GetOfflineCouponConsumePageList",
        requires_window=False,
        description="平台/团购券核销 ODS:GetOfflineCouponConsumePageList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_MEMBER",
        class_name="OdsMemberTask",
        table_name="billiards_ods.member_profiles",
        endpoint="/MemberProfile/GetTenantMemberList",
        list_key="tenantMemberInfos",
        requires_window=False,
        description="会员档案 ODS:GetTenantMemberList -> tenantMemberInfos 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_MEMBER_CARD",
        class_name="OdsMemberCardTask",
        table_name="billiards_ods.member_stored_value_cards",
        endpoint="/MemberProfile/GetTenantMemberCardList",
        list_key="tenantMemberCards",
        requires_window=False,
        description="会员储值卡 ODS:GetTenantMemberCardList -> tenantMemberCards 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_MEMBER_BALANCE",
        class_name="OdsMemberBalanceTask",
        table_name="billiards_ods.member_balance_changes",
        endpoint="/MemberProfile/GetMemberCardBalanceChange",
        list_key="tenantMemberCardLogs",
        requires_window=False,
        description="会员余额变动 ODS:GetMemberCardBalanceChange -> tenantMemberCardLogs 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_RECHARGE_SETTLE",
        class_name="OdsRechargeSettleTask",
        table_name="billiards_ods.recharge_settlements",
        endpoint="/Site/GetRechargeSettleList",
        list_key="settleList",
        requires_window=False,
        description="会员充值结算 ODS:GetRechargeSettleList -> settleList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_PACKAGE",
        class_name="OdsPackageTask",
        table_name="billiards_ods.group_buy_packages",
        endpoint="/PackageCoupon/QueryPackageCouponList",
        list_key="packageCouponList",
        requires_window=False,
        description="团购套餐定义 ODS:QueryPackageCouponList -> packageCouponList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_GROUP_BUY_REDEMPTION",
        class_name="OdsGroupBuyRedemptionTask",
        table_name="billiards_ods.group_buy_redemption_records",
        endpoint="/Site/GetSiteTableUseDetails",
        list_key="siteTableUseDetailsList",
        requires_window=False,
        description="团购套餐核销 ODS:GetSiteTableUseDetails -> siteTableUseDetailsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_INVENTORY_STOCK",
        class_name="OdsInventoryStockTask",
        table_name="billiards_ods.goods_stock_summary",
        endpoint="/TenantGoods/GetGoodsStockReport",
        requires_window=False,
        description="库存汇总 ODS:GetGoodsStockReport 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_INVENTORY_CHANGE",
        class_name="OdsInventoryChangeTask",
        table_name="billiards_ods.goods_stock_movements",
        endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt",
        list_key="queryDeliveryRecordsList",
        pk_columns=(_int_col("sitegoodsstockid", "siteGoodsStockId", required=True),),
        description="库存变化记录 ODS:QueryGoodsOutboundReceipt -> queryDeliveryRecordsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TABLES",
        class_name="OdsTablesTask",
        table_name="billiards_ods.site_tables_master",
        endpoint="/Table/GetSiteTables",
        list_key="siteTables",
        requires_window=False,
        description="台桌维表 ODS:GetSiteTables -> siteTables 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_GOODS_CATEGORY",
        class_name="OdsGoodsCategoryTask",
        table_name="billiards_ods.stock_goods_category_tree",
        endpoint="/TenantGoodsCategory/QueryPrimarySecondaryCategory",
        list_key="goodsCategoryList",
        requires_window=False,
        description="库存商品分类树 ODS:QueryPrimarySecondaryCategory -> goodsCategoryList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_STORE_GOODS",
        class_name="OdsStoreGoodsTask",
        table_name="billiards_ods.store_goods_master",
        endpoint="/TenantGoods/GetGoodsInventoryList",
        list_key="orderGoodsList",
        requires_window=False,
        description="门店商品档案 ODS:GetGoodsInventoryList -> orderGoodsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TABLE_DISCOUNT",
        class_name="OdsTableDiscountTask",
        table_name="billiards_ods.table_fee_discount_records",
        endpoint="/Site/GetTaiFeeAdjustList",
        list_key="taiFeeAdjustInfos",
        requires_window=False,
        description="台费折扣/调账 ODS:GetTaiFeeAdjustList -> taiFeeAdjustInfos 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TENANT_GOODS",
        class_name="OdsTenantGoodsTask",
        table_name="billiards_ods.tenant_goods_master",
        endpoint="/TenantGoods/QueryTenantGoods",
        list_key="tenantGoodsList",
        requires_window=False,
        description="租户商品档案 ODS:QueryTenantGoods -> tenantGoodsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_SETTLEMENT_TICKET",
        class_name="OdsSettlementTicketTask",
        table_name="billiards_ods.settlement_ticket_details",
        endpoint="/Order/GetOrderSettleTicketNew",
        data_path=(),
        list_key=None,
        include_fetched_at=True,
        requires_window=False,
        include_site_id=False,
        description="结账小票详情 ODS:GetOrderSettleTicketNew 原始 JSON",
    ),
)


def _get_spec(code: str) -> OdsTaskSpec:
    """Return the spec registered under ``code``.

    Raises:
        KeyError: if no spec in ``ODS_TASK_SPECS`` has that code.
    """
    for spec in ODS_TASK_SPECS:
        if spec.code == code:
            return spec
    raise KeyError(f"Spec not found for code {code}")


_SETTLEMENT_TICKET_SPEC = _get_spec("ODS_SETTLEMENT_TICKET")


class OdsSettlementTicketTask(BaseOdsTask):
    """Special handling: fetch ticket details per payment relate_id/orderSettleId."""

    SPEC = _SETTLEMENT_TICKET_SPEC

    def extract(self, context) -> dict:
        """Fetch ticket payloads only (used by fetch-only pipeline)."""
        existing_ids = self._fetch_existing_ticket_ids()
        candidates = self._collect_settlement_ids(
            context.store_id or 0, existing_ids, context.window_start, context.window_end
        )
        candidates = [cid for cid in candidates if cid and cid not in existing_ids]
        payloads, skipped = self._fetch_ticket_payloads(candidates)
        return {"records": payloads, "skipped": skipped, "fetched": len(candidates)}

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Fetch tickets for settlement ids not yet stored and upsert them.

        Returns:
            The result of ``self._build_result("SUCCESS", counts)``.

        Raises:
            Exception: any failure is re-raised after rolling back.
        """
        spec = self.SPEC
        context = self._build_context(cursor_data)
        store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0
        counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
        loader = GenericODSLoader(
            self.db,
            spec.table_name,
            self._resolve_columns(spec),
            list(spec.conflict_columns_override or ("source_file", "record_index")),
        )
        source_file = self._resolve_source_file_hint(spec)

        try:
            existing_ids = self._fetch_existing_ticket_ids()
            candidates = self._collect_settlement_ids(
                store_id, existing_ids, context.window_start, context.window_end
            )
            candidates = [cid for cid in candidates if cid and cid not in existing_ids]
            counts["fetched"] = len(candidates)
            if not candidates:
                self.logger.info(
                    "%s: 窗口[%s ~ %s] 未发现需要抓取的小票",
                    spec.code,
                    context.window_start,
                    context.window_end,
                )
                return self._build_result("SUCCESS", counts)

            payloads, skipped = self._fetch_ticket_payloads(candidates)
            counts["skipped"] += skipped

            rows: list[dict] = []
            # NOTE(review): record_index restarts at 0 on every run while the
            # conflict key is (source_file, record_index); verify that
            # source_file differs between runs so earlier tickets are not
            # overwritten.
            for idx, payload in enumerate(payloads):
                row = self._build_row(
                    spec=spec,
                    store_id=store_id,
                    record=payload,
                    page_no=None,
                    page_size_value=None,
                    source_file=source_file,
                    record_index=idx if spec.include_record_index else None,
                )
                if row is None:
                    counts["skipped"] += 1
                    continue
                rows.append(row)

            inserted, updated, _ = loader.upsert_rows(rows)
            counts["inserted"] += inserted
            counts["updated"] += updated
            self.db.commit()
            self.logger.info(
                "%s: 小票抓取完成,候选=%s 插入=%s 更新=%s 跳过=%s",
                spec.code,
                len(candidates),
                inserted,
                updated,
                counts["skipped"],
            )
            return self._build_result("SUCCESS", counts)
        except Exception:
            counts["errors"] += 1
            self.db.rollback()
            self.logger.error("%s: 小票抓取失败", spec.code, exc_info=True)
            raise

    # ------------------------------------------------------------------ helpers
    def _fetch_existing_ticket_ids(self) -> set[int]:
        """Return the numeric ``orderSettleId`` values already stored.

        A query failure is logged and treated as "nothing stored yet".
        """
        sql = """
            SELECT DISTINCT
                CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$'
                     THEN (payload ->> 'orderSettleId')::bigint
                END AS order_settle_id
            FROM billiards_ods.settlement_ticket_details
        """
        try:
            rows = self.db.query(sql)
        except Exception:
            self.logger.warning("查询已有小票失败,按空集处理", exc_info=True)
            return set()
        return {
            TypeParser.parse_int(row.get("order_settle_id"))
            for row in rows
            if row.get("order_settle_id") is not None
        }

    def _collect_settlement_ids(
        self, store_id: int, existing_ids: set[int], window_start, window_end
    ) -> list[int]:
        """Return sorted candidate settlement ids not in ``existing_ids``.

        The payment ODS table is consulted first; the payment API is used
        only when the table yields no ids.
        """
        ids = self._fetch_from_payment_table(store_id)
        if not ids:
            ids = self._fetch_from_payment_api(store_id, window_start, window_end)
        return sorted(i for i in ids if i is not None and i not in existing_ids)

    def _fetch_from_payment_table(self, store_id: int) -> set[int]:
        """Read settlement ids from ``billiards_ods.payment_transactions``.

        ``orderSettleId`` is preferred over ``relateId``.  When ``store_id``
        is truthy, rows are additionally restricted to that site (rows with
        no ``siteId`` in the payload are kept).  A query failure is logged
        and an empty set returned so the caller can fall back to the API.
        """
        # The OR group is parenthesised on purpose: AND binds tighter than OR,
        # so without the parentheses the site filter appended below would
        # apply to the relateId branch only.
        sql = """
            SELECT DISTINCT COALESCE(
                CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$'
                     THEN (payload ->> 'orderSettleId')::bigint END,
                CASE WHEN (payload ->> 'relateId') ~ '^[0-9]+$'
                     THEN (payload ->> 'relateId')::bigint END
            ) AS order_settle_id
            FROM billiards_ods.payment_transactions
            WHERE (
                (payload ->> 'orderSettleId') ~ '^[0-9]+$'
                OR (payload ->> 'relateId') ~ '^[0-9]+$'
            )
        """
        params = None
        if store_id:
            sql += " AND COALESCE((payload ->> 'siteId')::bigint, %s) = %s"
            params = (store_id, store_id)

        try:
            rows = self.db.query(sql, params)
        except Exception:
            self.logger.warning("读取支付流水以获取结算单ID失败,将尝试调用支付接口回退", exc_info=True)
            return set()

        return {
            TypeParser.parse_int(row.get("order_settle_id"))
            for row in rows
            if row.get("order_settle_id") is not None
        }

    def _fetch_from_payment_api(self, store_id: int, window_start, window_end) -> set[int]:
        """Collect settlement ids from the pay-log API for the given window.

        Best-effort: an API failure is logged and whatever ids were gathered
        before the failure are returned.
        """
        params = self._merge_common_params(
            {
                "siteId": store_id,
                "StartPayTime": TypeParser.format_timestamp(window_start, self.tz),
                "EndPayTime": TypeParser.format_timestamp(window_end, self.tz),
            }
        )
        candidate_ids: set[int] = set()
        try:
            for _, records, _, _ in self.api.iter_paginated(
                endpoint="/PayLog/GetPayLogListPage",
                params=params,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data",),
            ):
                for rec in records:
                    relate_id = TypeParser.parse_int(
                        (rec or {}).get("relateId")
                        or (rec or {}).get("orderSettleId")
                        or (rec or {}).get("order_settle_id")
                    )
                    if relate_id:
                        candidate_ids.add(relate_id)
        except Exception:
            self.logger.warning("调用支付接口获取结算单ID失败,当前批次将跳过回退来源", exc_info=True)
        return candidate_ids

    def _fetch_ticket_payload(self, order_settle_id: int):
        """Fetch the ticket response for one settlement id.

        Returns the last response yielded by the paginator, or ``None`` when
        nothing was yielded or the call failed (the failure is logged).
        """
        payload = None
        try:
            for _, _, _, response in self.api.iter_paginated(
                endpoint=self.SPEC.endpoint,
                params={"orderSettleId": order_settle_id},
                page_size=None,
                data_path=self.SPEC.data_path,
                list_key=self.SPEC.list_key,
            ):
                payload = response
        except Exception:
            self.logger.warning(
                "调用小票接口失败 orderSettleId=%s", order_settle_id, exc_info=True
            )
        if (
            isinstance(payload, dict)
            and isinstance(payload.get("data"), list)
            and len(payload["data"]) == 1
        ):
            # Local stubs / replays may wrap the response in a single-element
            # list; unwrap it so the stored shape matches the real API.
            payload = payload["data"][0]
        return payload

    def _fetch_ticket_payloads(self, candidates: list[int]) -> tuple[list, int]:
        """Fetch ticket payloads for a set of orderSettleIds; returns (payloads, skipped)."""
        payloads: list = []
        skipped = 0
        for order_settle_id in candidates:
            payload = self._fetch_ticket_payload(order_settle_id)
            if payload:
                payloads.append(payload)
            else:
                skipped += 1
        return payloads, skipped


def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
    """Create a ``BaseOdsTask`` subclass named ``spec.class_name`` bound to ``spec``."""
    attrs = {
        "SPEC": spec,
        "__doc__": spec.description or f"ODS ingestion task {spec.code}",
        "__module__": __name__,
    }
    return type(spec.class_name, (BaseOdsTask,), attrs)


# Only specs whose code is listed here get a generated task class.
ENABLED_ODS_CODES = {
    "ODS_ASSISTANT_ACCOUNTS",
    "ODS_ASSISTANT_LEDGER",
    "ODS_ASSISTANT_ABOLISH",
    "ODS_INVENTORY_CHANGE",
    "ODS_INVENTORY_STOCK",
    "ODS_PACKAGE",
    "ODS_GROUP_BUY_REDEMPTION",
    "ODS_MEMBER",
    "ODS_MEMBER_BALANCE",
    "ODS_MEMBER_CARD",
    "ODS_PAYMENT",
    "ODS_REFUND",
    "ODS_COUPON_VERIFY",
    "ODS_RECHARGE_SETTLE",
    "ODS_TABLES",
    "ODS_GOODS_CATEGORY",
    "ODS_STORE_GOODS",
    "ODS_TABLE_DISCOUNT",
    "ODS_TENANT_GOODS",
    "ODS_SETTLEMENT_TICKET",
    "ODS_ORDER_SETTLE",
}

ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
    spec.code: _build_task_class(spec)
    for spec in ODS_TASK_SPECS
    if spec.code in ENABLED_ODS_CODES
}
# Override with specialized settlement ticket implementation
ODS_TASK_CLASSES["ODS_SETTLEMENT_TICKET"] = OdsSettlementTicketTask

__all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask", "ENABLED_ODS_CODES"]