# -*- coding: utf-8 -*- """ODS ingestion tasks.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type from loaders.ods import GenericODSLoader from models.parsers import TypeParser from .base_task import BaseTask ColumnTransform = Callable[[Any], Any] @dataclass(frozen=True) class ColumnSpec: """Mapping between DB column and source JSON field.""" column: str sources: Tuple[str, ...] = () required: bool = False default: Any = None transform: ColumnTransform | None = None @dataclass(frozen=True) class OdsTaskSpec: """Definition of a single ODS ingestion task.""" code: str class_name: str table_name: str endpoint: str data_path: Tuple[str, ...] = ("data",) list_key: str | None = None pk_columns: Tuple[ColumnSpec, ...] = () extra_columns: Tuple[ColumnSpec, ...] = () include_page_size: bool = False include_page_no: bool = False include_source_file: bool = True include_source_endpoint: bool = True include_record_index: bool = False include_site_column: bool = True include_fetched_at: bool = True requires_window: bool = True time_fields: Tuple[str, str] | None = ("startTime", "endTime") include_site_id: bool = True description: str = "" extra_params: Dict[str, Any] = field(default_factory=dict) conflict_columns_override: Tuple[str, ...] | None = None class BaseOdsTask(BaseTask): """Shared functionality for ODS ingestion tasks.""" SPEC: OdsTaskSpec def get_task_code(self) -> str: return self.SPEC.code def execute(self) -> dict: spec = self.SPEC self.logger.info("寮€濮嬫墽琛?%s (ODS)", spec.code) store_id = TypeParser.parse_int(self.config.get("app.store_id")) if not store_id: raise ValueError("app.store_id 鏈厤缃紝鏃犳硶鎵ц ODS 浠诲姟") page_size = self.config.get("api.page_size", 200) params = self._build_params(spec, store_id) columns = self._resolve_columns(spec) if spec.conflict_columns_override: conflict_columns = list(spec.conflict_columns_override) else: conflict_columns = [] if spec.include_site_column: conflict_columns.append("site_id") conflict_columns += [col.column for col in spec.pk_columns] loader = GenericODSLoader( self.db, spec.table_name, columns, conflict_columns, ) counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0} source_file = self._resolve_source_file_hint(spec) try: global_index = 0 for page_no, page_records, _, _ in self.api.iter_paginated( endpoint=spec.endpoint, params=params, page_size=page_size, data_path=spec.data_path, list_key=spec.list_key, ): rows: List[dict] = [] for raw in page_records: row = self._build_row( spec=spec, store_id=store_id, record=raw, page_no=page_no if spec.include_page_no else None, page_size_value=len(page_records) if spec.include_page_size else None, source_file=source_file, record_index=global_index if spec.include_record_index else None, ) if row is None: counts["skipped"] += 1 continue rows.append(row) global_index += 1 inserted, updated, _ = loader.upsert_rows(rows) counts["inserted"] += inserted counts["updated"] += updated counts["fetched"] += len(page_records) self.db.commit() self.logger.info("%s ODS 浠诲姟瀹屾垚: %s", spec.code, counts) return self._build_result("SUCCESS", counts) except Exception: self.db.rollback() counts["errors"] += 1 self.logger.error("%s ODS 浠诲姟澶辫触", spec.code, exc_info=True) raise def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict: base: dict[str, Any] = {} if spec.include_site_id: base["siteId"] = store_id if spec.requires_window and spec.time_fields: window_start, window_end, _ = self._get_time_window() start_key, end_key = spec.time_fields base[start_key] = TypeParser.format_timestamp(window_start, self.tz) base[end_key] = TypeParser.format_timestamp(window_end, self.tz) params = self._merge_common_params(base) params.update(spec.extra_params) return params def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]: columns: List[str] = [] if spec.include_site_column: columns.append("site_id") seen = set(columns) for col_spec in list(spec.pk_columns) + list(spec.extra_columns): if col_spec.column not in seen: columns.append(col_spec.column) seen.add(col_spec.column) if spec.include_record_index and "record_index" not in seen: columns.append("record_index") seen.add("record_index") if spec.include_page_no and "page_no" not in seen: columns.append("page_no") seen.add("page_no") if spec.include_page_size and "page_size" not in seen: columns.append("page_size") seen.add("page_size") if spec.include_source_file and "source_file" not in seen: columns.append("source_file") seen.add("source_file") if spec.include_source_endpoint and "source_endpoint" not in seen: columns.append("source_endpoint") seen.add("source_endpoint") if spec.include_fetched_at and "fetched_at" not in seen: columns.append("fetched_at") seen.add("fetched_at") if "payload" not in seen: columns.append("payload") return columns def _build_row( self, spec: OdsTaskSpec, store_id: int, record: dict, page_no: int | None, page_size_value: int | None, source_file: str | None, record_index: int | None = None, ) -> dict | None: row: dict[str, Any] = {} if spec.include_site_column: row["site_id"] = store_id for col_spec in spec.pk_columns + spec.extra_columns: value = self._extract_value(record, col_spec) if value is None and col_spec.required: self.logger.warning( "%s 缂哄皯蹇呭~瀛楁 %s锛屽師濮嬭褰? %s", spec.code, col_spec.column, record, ) return None row[col_spec.column] = value if spec.include_page_no: row["page_no"] = page_no if spec.include_page_size: row["page_size"] = page_size_value if spec.include_record_index: row["record_index"] = record_index if spec.include_source_file: row["source_file"] = source_file if spec.include_source_endpoint: row["source_endpoint"] = spec.endpoint if spec.include_fetched_at: row["fetched_at"] = datetime.now(self.tz) row["payload"] = record return row def _extract_value(self, record: dict, spec: ColumnSpec): value = None for key in spec.sources: value = self._dig(record, key) if value is not None: break if value is None and spec.default is not None: value = spec.default if value is not None and spec.transform: value = spec.transform(value) return value @staticmethod def _dig(record: Any, path: str | None): if not path: return None current = record for part in path.split("."): if isinstance(current, dict): current = current.get(part) else: return None return current def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None: resolver = getattr(self.api, "get_source_hint", None) if callable(resolver): return resolver(spec.endpoint) return None def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec: return ColumnSpec( column=name, sources=sources, required=required, transform=TypeParser.parse_int, ) def _decimal_col(name: str, *sources: str) -> ColumnSpec: """??????????????""" return ColumnSpec( column=name, sources=sources, transform=lambda v: TypeParser.parse_decimal(v, 2), ) def _bool_col(name: str, *sources: str) -> ColumnSpec: """??????????????0/1?true/false ???""" def _to_bool(value): if value is None: return None if isinstance(value, bool): return value s = str(value).strip().lower() if s in {"1", "true", "t", "yes", "y"}: return True if s in {"0", "false", "f", "no", "n"}: return False return bool(value) return ColumnSpec(column=name, sources=sources, transform=_to_bool) ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = ( OdsTaskSpec( code="ODS_ASSISTANT_ACCOUNT", class_name="OdsAssistantAccountsTask", table_name="billiards_ods.assistant_accounts_master", endpoint="/PersonnelManagement/SearchAssistantInfo", data_path=("data",), list_key="assistantInfos", pk_columns=(_int_col("id", "id", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="鍔╂暀璐﹀彿妗f ODS锛歋earchAssistantInfo -> assistantInfos 鍘熷 JSON", ), OdsTaskSpec( code="ODS_SETTLEMENT_RECORDS", class_name="OdsOrderSettleTask", table_name="billiards_ods.settlement_records", endpoint="/Site/GetAllOrderSettleList", data_path=("data",), list_key="settleList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="缁撹处璁板綍 ODS锛欸etAllOrderSettleList -> settleList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TABLE_USE", class_name="OdsTableUseTask", table_name="billiards_ods.table_fee_transactions", endpoint="/Site/GetSiteTableOrderDetails", data_path=("data",), list_key="siteTableUseDetailsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍙拌垂璁¤垂娴佹按 ODS锛欸etSiteTableOrderDetails -> siteTableUseDetailsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_ASSISTANT_LEDGER", class_name="OdsAssistantLedgerTask", table_name="billiards_ods.assistant_service_records", endpoint="/AssistantPerformance/GetOrderAssistantDetails", data_path=("data",), list_key="orderAssistantDetails", pk_columns=(_int_col("id", "id", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="鍔╂暀鏈嶅姟娴佹按 ODS锛欸etOrderAssistantDetails -> orderAssistantDetails 鍘熷 JSON", ), OdsTaskSpec( code="ODS_ASSISTANT_ABOLISH", class_name="OdsAssistantAbolishTask", table_name="billiards_ods.assistant_cancellation_records", endpoint="/AssistantPerformance/GetAbolitionAssistant", data_path=("data",), list_key="abolitionAssistants", pk_columns=(_int_col("id", "id", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="鍔╂暀搴熼櫎璁板綍 ODS锛欸etAbolitionAssistant -> abolitionAssistants 鍘熷 JSON", ), OdsTaskSpec( code="ODS_STORE_GOODS_SALES", class_name="OdsGoodsLedgerTask", table_name="billiards_ods.store_goods_sales_records", endpoint="/TenantGoods/GetGoodsSalesList", data_path=("data",), list_key="orderGoodsLedgers", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="闂ㄥ簵鍟嗗搧閿€鍞祦姘?ODS锛欸etGoodsSalesList -> orderGoodsLedgers 鍘熷 JSON", ), OdsTaskSpec( code="ODS_PAYMENT", class_name="OdsPaymentTask", table_name="billiards_ods.payment_transactions", endpoint="/PayLog/GetPayLogListPage", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鏀粯娴佹按 ODS锛欸etPayLogListPage 鍘熷 JSON", ), OdsTaskSpec( code="ODS_REFUND", class_name="OdsRefundTask", table_name="billiards_ods.refund_transactions", endpoint="/Order/GetRefundPayLogList", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="閫€娆炬祦姘?ODS锛欸etRefundPayLogList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_PLATFORM_COUPON", class_name="OdsCouponVerifyTask", table_name="billiards_ods.platform_coupon_redemption_records", endpoint="/Promotion/GetOfflineCouponConsumePageList", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="骞冲彴/鍥㈣喘鍒告牳閿€ ODS锛欸etOfflineCouponConsumePageList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_MEMBER", class_name="OdsMemberTask", table_name="billiards_ods.member_profiles", endpoint="/MemberProfile/GetTenantMemberList", data_path=("data",), list_key="tenantMemberInfos", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="浼氬憳妗f ODS锛欸etTenantMemberList -> tenantMemberInfos 鍘熷 JSON", ), OdsTaskSpec( code="ODS_MEMBER_CARD", class_name="OdsMemberCardTask", table_name="billiards_ods.member_stored_value_cards", endpoint="/MemberProfile/GetTenantMemberCardList", data_path=("data",), list_key="tenantMemberCards", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="浼氬憳鍌ㄥ€煎崱 ODS锛欸etTenantMemberCardList -> tenantMemberCards 鍘熷 JSON", ), OdsTaskSpec( code="ODS_MEMBER_BALANCE", class_name="OdsMemberBalanceTask", table_name="billiards_ods.member_balance_changes", endpoint="/MemberProfile/GetMemberCardBalanceChange", data_path=("data",), list_key="tenantMemberCardLogs", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="浼氬憳浣欓鍙樺姩 ODS锛欸etMemberCardBalanceChange -> tenantMemberCardLogs 鍘熷 JSON", ), OdsTaskSpec( code="ODS_RECHARGE_SETTLE", class_name="OdsRechargeSettleTask", table_name="billiards_ods.recharge_settlements", endpoint="/Site/GetRechargeSettleList", data_path=("data",), list_key="settleList", pk_columns=(_int_col("recharge_order_id", "settleList.id", "id", required=True),), extra_columns=( _int_col("tenant_id", "settleList.tenantId", "tenantId"), _int_col("site_id", "settleList.siteId", "siteId", "siteProfile.id"), ColumnSpec("site_name_snapshot", sources=("siteProfile.shop_name", "settleList.siteName")), _int_col("member_id", "settleList.memberId", "memberId"), ColumnSpec("member_name_snapshot", sources=("settleList.memberName", "memberName")), ColumnSpec("member_phone_snapshot", sources=("settleList.memberPhone", "memberPhone")), _int_col("tenant_member_card_id", "settleList.tenantMemberCardId", "tenantMemberCardId"), ColumnSpec("member_card_type_name", sources=("settleList.memberCardTypeName", "memberCardTypeName")), _int_col("settle_relate_id", "settleList.settleRelateId", "settleRelateId"), _int_col("settle_type", "settleList.settleType", "settleType"), ColumnSpec("settle_name", sources=("settleList.settleName", "settleName")), _int_col("is_first", "settleList.isFirst", "isFirst"), _int_col("settle_status", "settleList.settleStatus", "settleStatus"), _decimal_col("pay_amount", "settleList.payAmount", "payAmount"), _decimal_col("refund_amount", "settleList.refundAmount", "refundAmount"), _decimal_col("point_amount", "settleList.pointAmount", "pointAmount"), _decimal_col("cash_amount", "settleList.cashAmount", "cashAmount"), _decimal_col("online_amount", "settleList.onlineAmount", "onlineAmount"), _decimal_col("balance_amount", "settleList.balanceAmount", "balanceAmount"), _decimal_col("card_amount", "settleList.cardAmount", "cardAmount"), _decimal_col("coupon_amount", "settleList.couponAmount", "couponAmount"), _decimal_col("recharge_card_amount", "settleList.rechargeCardAmount", "rechargeCardAmount"), _decimal_col("gift_card_amount", "settleList.giftCardAmount", "giftCardAmount"), _decimal_col("prepay_money", "settleList.prepayMoney", "prepayMoney"), _decimal_col("consume_money", "settleList.consumeMoney", "consumeMoney"), _decimal_col("goods_money", "settleList.goodsMoney", "goodsMoney"), _decimal_col("real_goods_money", "settleList.realGoodsMoney", "realGoodsMoney"), _decimal_col("table_charge_money", "settleList.tableChargeMoney", "tableChargeMoney"), _decimal_col("service_money", "settleList.serviceMoney", "serviceMoney"), _decimal_col("activity_discount", "settleList.activityDiscount", "activityDiscount"), _decimal_col("all_coupon_discount", "settleList.allCouponDiscount", "allCouponDiscount"), _decimal_col("goods_promotion_money", "settleList.goodsPromotionMoney", "goodsPromotionMoney"), _decimal_col("assistant_promotion_money", "settleList.assistantPromotionMoney", "assistantPromotionMoney"), _decimal_col("assistant_pd_money", "settleList.assistantPdMoney", "assistantPdMoney"), _decimal_col("assistant_cx_money", "settleList.assistantCxMoney", "assistantCxMoney"), _decimal_col("assistant_manual_discount", "settleList.assistantManualDiscount", "assistantManualDiscount"), _decimal_col("coupon_sale_amount", "settleList.couponSaleAmount", "couponSaleAmount"), _decimal_col("member_discount_amount", "settleList.memberDiscountAmount", "memberDiscountAmount"), _decimal_col("point_discount_price", "settleList.pointDiscountPrice", "pointDiscountPrice"), _decimal_col("point_discount_cost", "settleList.pointDiscountCost", "pointDiscountCost"), _decimal_col("adjust_amount", "settleList.adjustAmount", "adjustAmount"), _decimal_col("rounding_amount", "settleList.roundingAmount", "roundingAmount"), _int_col("payment_method", "settleList.paymentMethod", "paymentMethod"), _bool_col("can_be_revoked", "settleList.canBeRevoked", "canBeRevoked"), _bool_col("is_bind_member", "settleList.isBindMember", "isBindMember"), _bool_col("is_activity", "settleList.isActivity", "isActivity"), _bool_col("is_use_coupon", "settleList.isUseCoupon", "isUseCoupon"), _bool_col("is_use_discount", "settleList.isUseDiscount", "isUseDiscount"), _int_col("operator_id", "settleList.operatorId", "operatorId"), ColumnSpec("operator_name_snapshot", sources=("settleList.operatorName", "operatorName")), _int_col("salesman_user_id", "settleList.salesManUserId", "salesmanUserId", "salesManUserId"), ColumnSpec("salesman_name", sources=("settleList.salesManName", "salesmanName", "settleList.salesmanName")), ColumnSpec("order_remark", sources=("settleList.orderRemark", "orderRemark")), _int_col("table_id", "settleList.tableId", "tableId"), _int_col("serial_number", "settleList.serialNumber", "serialNumber"), _int_col("revoke_order_id", "settleList.revokeOrderId", "revokeOrderId"), ColumnSpec("revoke_order_name", sources=("settleList.revokeOrderName", "revokeOrderName")), ColumnSpec("revoke_time", sources=("settleList.revokeTime", "revokeTime")), ColumnSpec("create_time", sources=("settleList.createTime", "createTime")), ColumnSpec("pay_time", sources=("settleList.payTime", "payTime")), ColumnSpec("site_profile", sources=("siteProfile",)), ), include_site_column=False, include_source_endpoint=True, include_page_no=False, include_page_size=False, include_fetched_at=True, include_record_index=False, conflict_columns_override=None, requires_window=False, description="?????? ODS?GetRechargeSettleList -> data.settleList ????", ), OdsTaskSpec( code="ODS_GROUP_PACKAGE", class_name="OdsPackageTask", table_name="billiards_ods.group_buy_packages", endpoint="/PackageCoupon/QueryPackageCouponList", data_path=("data",), list_key="packageCouponList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍥㈣喘濂楅瀹氫箟 ODS锛歈ueryPackageCouponList -> packageCouponList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_GROUP_BUY_REDEMPTION", class_name="OdsGroupBuyRedemptionTask", table_name="billiards_ods.group_buy_redemption_records", endpoint="/Site/GetSiteTableUseDetails", data_path=("data",), list_key="siteTableUseDetailsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍥㈣喘濂楅鏍搁攢 ODS锛欸etSiteTableUseDetails -> siteTableUseDetailsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_INVENTORY_STOCK", class_name="OdsInventoryStockTask", table_name="billiards_ods.goods_stock_summary", endpoint="/TenantGoods/GetGoodsStockReport", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="搴撳瓨姹囨€?ODS锛欸etGoodsStockReport 鍘熷 JSON", ), OdsTaskSpec( code="ODS_INVENTORY_CHANGE", class_name="OdsInventoryChangeTask", table_name="billiards_ods.goods_stock_movements", endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt", data_path=("data",), list_key="queryDeliveryRecordsList", pk_columns=(_int_col("sitegoodsstockid", "siteGoodsStockId", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="搴撳瓨鍙樺寲璁板綍 ODS锛歈ueryGoodsOutboundReceipt -> queryDeliveryRecordsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TABLES", class_name="OdsTablesTask", table_name="billiards_ods.site_tables_master", endpoint="/Table/GetSiteTables", data_path=("data",), list_key="siteTables", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍙版缁磋〃 ODS锛欸etSiteTables -> siteTables 鍘熷 JSON", ), OdsTaskSpec( code="ODS_GOODS_CATEGORY", class_name="OdsGoodsCategoryTask", table_name="billiards_ods.stock_goods_category_tree", endpoint="/TenantGoodsCategory/QueryPrimarySecondaryCategory", data_path=("data",), list_key="goodsCategoryList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="搴撳瓨鍟嗗搧鍒嗙被鏍?ODS锛歈ueryPrimarySecondaryCategory -> goodsCategoryList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_STORE_GOODS", class_name="OdsStoreGoodsTask", table_name="billiards_ods.store_goods_master", endpoint="/TenantGoods/GetGoodsInventoryList", data_path=("data",), list_key="orderGoodsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="闂ㄥ簵鍟嗗搧妗f ODS锛欸etGoodsInventoryList -> orderGoodsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TABLE_FEE_DISCOUNT", class_name="OdsTableDiscountTask", table_name="billiards_ods.table_fee_discount_records", endpoint="/Site/GetTaiFeeAdjustList", data_path=("data",), list_key="taiFeeAdjustInfos", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍙拌垂鎶樻墸/璋冭处 ODS锛欸etTaiFeeAdjustList -> taiFeeAdjustInfos 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TENANT_GOODS", class_name="OdsTenantGoodsTask", table_name="billiards_ods.tenant_goods_master", endpoint="/TenantGoods/QueryTenantGoods", data_path=("data",), list_key="tenantGoodsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="绉熸埛鍟嗗搧妗f ODS锛歈ueryTenantGoods -> tenantGoodsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_SETTLEMENT_TICKET", class_name="OdsSettlementTicketTask", table_name="billiards_ods.settlement_ticket_details", endpoint="/Order/GetOrderSettleTicketNew", data_path=(), list_key=None, pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=True, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, include_site_id=False, description="缁撹处灏忕エ璇︽儏 ODS锛欸etOrderSettleTicketNew 鍘熷 JSON", ), ) def _get_spec(code: str) -> OdsTaskSpec: for spec in ODS_TASK_SPECS: if spec.code == code: return spec raise KeyError(f"Spec not found for code {code}") _SETTLEMENT_TICKET_SPEC = _get_spec("ODS_SETTLEMENT_TICKET") class OdsSettlementTicketTask(BaseOdsTask): """Special handling: fetch ticket details per payment relate_id/orderSettleId.""" SPEC = _SETTLEMENT_TICKET_SPEC def extract(self, context) -> dict: """Fetch ticket payloads only (used by fetch-only pipeline).""" existing_ids = self._fetch_existing_ticket_ids() candidates = self._collect_settlement_ids( context.store_id or 0, existing_ids, context.window_start, context.window_end ) candidates = [cid for cid in candidates if cid and cid not in existing_ids] payloads, skipped = self._fetch_ticket_payloads(candidates) return {"records": payloads, "skipped": skipped, "fetched": len(candidates)} def execute(self, cursor_data: dict | None = None) -> dict: spec = self.SPEC context = self._build_context(cursor_data) store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0 counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0} loader = GenericODSLoader( self.db, spec.table_name, self._resolve_columns(spec), list(spec.conflict_columns_override or ("source_file", "record_index")), ) source_file = self._resolve_source_file_hint(spec) try: existing_ids = self._fetch_existing_ticket_ids() candidates = self._collect_settlement_ids( store_id, existing_ids, context.window_start, context.window_end ) candidates = [cid for cid in candidates if cid and cid not in existing_ids] counts["fetched"] = len(candidates) if not candidates: self.logger.info( "%s: 绐楀彛[%s ~ %s] 鏈彂鐜伴渶瑕佹姄鍙栫殑灏忕エ", spec.code, context.window_start, context.window_end, ) return self._build_result("SUCCESS", counts) payloads, skipped = self._fetch_ticket_payloads(candidates) counts["skipped"] += skipped rows: list[dict] = [] for idx, payload in enumerate(payloads): row = self._build_row( spec=spec, store_id=store_id, record=payload, page_no=None, page_size_value=None, source_file=source_file, record_index=idx if spec.include_record_index else None, ) if row is None: counts["skipped"] += 1 continue rows.append(row) inserted, updated, _ = loader.upsert_rows(rows) counts["inserted"] += inserted counts["updated"] += updated self.db.commit() self.logger.info( "%s: 灏忕エ鎶撳彇瀹屾垚锛屽€欓€?%s 鎻掑叆=%s 鏇存柊=%s 璺宠繃=%s", spec.code, len(candidates), inserted, updated, counts["skipped"], ) return self._build_result("SUCCESS", counts) except Exception: counts["errors"] += 1 self.db.rollback() self.logger.error("%s: 灏忕エ鎶撳彇澶辫触", spec.code, exc_info=True) raise # ------------------------------------------------------------------ helpers def _fetch_existing_ticket_ids(self) -> set[int]: sql = """ SELECT DISTINCT CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$' THEN (payload ->> 'orderSettleId')::bigint END AS order_settle_id FROM billiards_ods.settlement_ticket_details """ try: rows = self.db.query(sql) except Exception: self.logger.warning("鏌ヨ宸叉湁灏忕エ澶辫触锛屾寜绌洪泦澶勭悊", exc_info=True) return set() return { TypeParser.parse_int(row.get("order_settle_id")) for row in rows if row.get("order_settle_id") is not None } def _collect_settlement_ids( self, store_id: int, existing_ids: set[int], window_start, window_end ) -> list[int]: ids = self._fetch_from_payment_table(store_id) if not ids: ids = self._fetch_from_payment_api(store_id, window_start, window_end) return sorted(i for i in ids if i is not None and i not in existing_ids) def _fetch_from_payment_table(self, store_id: int) -> set[int]: sql = """ SELECT DISTINCT COALESCE( CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$' THEN (payload ->> 'orderSettleId')::bigint END, CASE WHEN (payload ->> 'relateId') ~ '^[0-9]+$' THEN (payload ->> 'relateId')::bigint END ) AS order_settle_id FROM billiards_ods.payment_transactions WHERE (payload ->> 'orderSettleId') ~ '^[0-9]+$' OR (payload ->> 'relateId') ~ '^[0-9]+$' """ params = None if store_id: sql += " AND COALESCE((payload ->> 'siteId')::bigint, %s) = %s" params = (store_id, store_id) try: rows = self.db.query(sql, params) except Exception: self.logger.warning("璇诲彇鏀粯娴佹按浠ヨ幏鍙栫粨绠楀崟ID澶辫触锛屽皢灏濊瘯璋冪敤鏀粯鎺ュ彛鍥為€€", exc_info=True) return set() return { TypeParser.parse_int(row.get("order_settle_id")) for row in rows if row.get("order_settle_id") is not None } def _fetch_from_payment_api(self, store_id: int, window_start, window_end) -> set[int]: params = self._merge_common_params( { "siteId": store_id, "StartPayTime": TypeParser.format_timestamp(window_start, self.tz), "EndPayTime": TypeParser.format_timestamp(window_end, self.tz), } ) candidate_ids: set[int] = set() try: for _, records, _, _ in self.api.iter_paginated( endpoint="/PayLog/GetPayLogListPage", params=params, page_size=self.config.get("api.page_size", 200), data_path=("data",), ): for rec in records: relate_id = TypeParser.parse_int( (rec or {}).get("relateId") or (rec or {}).get("orderSettleId") or (rec or {}).get("order_settle_id") ) if relate_id: candidate_ids.add(relate_id) except Exception: self.logger.warning("璋冪敤鏀粯鎺ュ彛鑾峰彇缁撶畻鍗旾D澶辫触锛屽綋鍓嶆壒娆″皢璺宠繃鍥為€€鏉ユ簮", exc_info=True) return candidate_ids def _fetch_ticket_payload(self, order_settle_id: int): payload = None try: for _, _, _, response in self.api.iter_paginated( endpoint=self.SPEC.endpoint, params={"orderSettleId": order_settle_id}, page_size=None, data_path=self.SPEC.data_path, list_key=self.SPEC.list_key, ): payload = response except Exception: self.logger.warning( "璋冪敤灏忕エ鎺ュ彛澶辫触 orderSettleId=%s", order_settle_id, exc_info=True ) if isinstance(payload, dict) and isinstance(payload.get("data"), list) and len(payload["data"]) == 1: # 鏈湴妗?鍥炴斁鍙兘鎶婂搷搴斿寘瑁呮垚鍗曞厓绱?list锛岃繖閲屽睍寮€浠ヨ创杩戠湡瀹炵粨鏋? payload = payload["data"][0] return payload def _fetch_ticket_payloads(self, candidates: list[int]) -> tuple[list, int]: """Fetch ticket payloads for a set of orderSettleIds; returns (payloads, skipped).""" payloads: list = [] skipped = 0 for order_settle_id in candidates: payload = self._fetch_ticket_payload(order_settle_id) if payload: payloads.append(payload) else: skipped += 1 return payloads, skipped def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]: attrs = { "SPEC": spec, "__doc__": spec.description or f"ODS ingestion task {spec.code}", "__module__": __name__, } return type(spec.class_name, (BaseOdsTask,), attrs) ENABLED_ODS_CODES = { "ODS_ASSISTANT_ACCOUNT", "ODS_ASSISTANT_LEDGER", "ODS_ASSISTANT_ABOLISH", "ODS_INVENTORY_CHANGE", "ODS_INVENTORY_STOCK", "ODS_GROUP_PACKAGE", "ODS_GROUP_BUY_REDEMPTION", "ODS_MEMBER", "ODS_MEMBER_BALANCE", "ODS_MEMBER_CARD", "ODS_PAYMENT", "ODS_REFUND", "ODS_PLATFORM_COUPON", "ODS_RECHARGE_SETTLE", "ODS_TABLE_USE", "ODS_TABLES", "ODS_GOODS_CATEGORY", "ODS_STORE_GOODS", "ODS_TABLE_FEE_DISCOUNT", "ODS_STORE_GOODS_SALES", "ODS_TENANT_GOODS", "ODS_SETTLEMENT_TICKET", "ODS_SETTLEMENT_RECORDS", } ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = { spec.code: _build_task_class(spec) for spec in ODS_TASK_SPECS if spec.code in ENABLED_ODS_CODES } # Override with specialized settlement ticket implementation ODS_TASK_CLASSES["ODS_SETTLEMENT_TICKET"] = OdsSettlementTicketTask __all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask", "ENABLED_ODS_CODES"]