# -*- coding: utf-8 -*- """ODS ingestion tasks.""" from __future__ import annotations import json from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type from psycopg2.extras import Json, execute_values from models.parsers import TypeParser from .base_task import BaseTask ColumnTransform = Callable[[Any], Any] @dataclass(frozen=True) class ColumnSpec: """Mapping between DB column and source JSON field.""" column: str sources: Tuple[str, ...] = () required: bool = False default: Any = None transform: ColumnTransform | None = None @dataclass(frozen=True) class OdsTaskSpec: """Definition of a single ODS ingestion task.""" code: str class_name: str table_name: str endpoint: str data_path: Tuple[str, ...] = ("data",) list_key: str | None = None pk_columns: Tuple[ColumnSpec, ...] = () extra_columns: Tuple[ColumnSpec, ...] = () include_page_size: bool = False include_page_no: bool = False include_source_file: bool = True include_source_endpoint: bool = True include_record_index: bool = False include_site_column: bool = True include_fetched_at: bool = True requires_window: bool = True time_fields: Tuple[str, str] | None = ("startTime", "endTime") include_site_id: bool = True description: str = "" extra_params: Dict[str, Any] = field(default_factory=dict) conflict_columns_override: Tuple[str, ...] | None = None class BaseOdsTask(BaseTask): """Shared functionality for ODS ingestion tasks.""" SPEC: OdsTaskSpec def get_task_code(self) -> str: return self.SPEC.code def execute(self, cursor_data: dict | None = None) -> dict: spec = self.SPEC self.logger.info("寮€濮嬫墽琛?%s (ODS)", spec.code) window_start, window_end, window_minutes = self._resolve_window(cursor_data) store_id = TypeParser.parse_int(self.config.get("app.store_id")) if not store_id: raise ValueError("app.store_id 鏈厤缃紝鏃犳硶鎵ц ODS 浠诲姟") page_size = self.config.get("api.page_size", 200) params = self._build_params( spec, store_id, window_start=window_start, window_end=window_end, ) counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0} source_file = self._resolve_source_file_hint(spec) try: for _, page_records, _, response_payload in self.api.iter_paginated( endpoint=spec.endpoint, params=params, page_size=page_size, data_path=spec.data_path, list_key=spec.list_key, ): inserted, skipped = self._insert_records_schema_aware( table=spec.table_name, records=page_records, response_payload=response_payload, source_file=source_file, source_endpoint=spec.endpoint if spec.include_source_endpoint else None, ) counts["fetched"] += len(page_records) counts["inserted"] += inserted counts["skipped"] += skipped self.db.commit() self.logger.info("%s ODS 浠诲姟瀹屾垚: %s", spec.code, counts) allow_empty_advance = bool(self.config.get("run.allow_empty_result_advance", False)) status = "SUCCESS" if counts["fetched"] == 0 and not allow_empty_advance: status = "PARTIAL" result = self._build_result(status, counts) result["window"] = { "start": window_start, "end": window_end, "minutes": window_minutes, } result["request_params"] = params return result except Exception: self.db.rollback() counts["errors"] += 1 self.logger.error("%s ODS 浠诲姟澶辫触", spec.code, exc_info=True) raise def _resolve_window(self, cursor_data: dict | None) -> tuple[datetime, datetime, int]: base_start, base_end, base_minutes = self._get_time_window(cursor_data) if self.config.get("run.force_window_override"): override_start = self.config.get("run.window_override.start") override_end = self.config.get("run.window_override.end") if override_start and override_end: return base_start, base_end, base_minutes # 以 ODS 表 MAX(fetched_at) 兜底:避免“窗口游标推进但未实际入库”导致漏数。 last_fetched = self._get_max_fetched_at(self.SPEC.table_name) if last_fetched: overlap_seconds = int(self.config.get("run.overlap_seconds", 120) or 120) cursor_end = cursor_data.get("last_end") if isinstance(cursor_data, dict) else None anchor = cursor_end or last_fetched # 如果 cursor_end 比真实入库时间(last_fetched)更靠后,说明游标被推进但表未跟上:改用 last_fetched 作为起点 if isinstance(cursor_end, datetime) and cursor_end.tzinfo is None: cursor_end = cursor_end.replace(tzinfo=self.tz) if isinstance(cursor_end, datetime) and cursor_end > last_fetched: anchor = last_fetched start = anchor - timedelta(seconds=max(0, overlap_seconds)) if start.tzinfo is None: start = start.replace(tzinfo=self.tz) else: start = start.astimezone(self.tz) end = datetime.now(self.tz) minutes = max(1, int((end - start).total_seconds() // 60)) return start, end, minutes return base_start, base_end, base_minutes def _get_max_fetched_at(self, table_name: str) -> datetime | None: try: rows = self.db.query(f"SELECT MAX(fetched_at) AS mx FROM {table_name}") except Exception: return None if not rows or not rows[0].get("mx"): return None mx = rows[0]["mx"] if not isinstance(mx, datetime): return None if mx.tzinfo is None: return mx.replace(tzinfo=self.tz) return mx.astimezone(self.tz) def _build_params( self, spec: OdsTaskSpec, store_id: int, *, window_start: datetime, window_end: datetime, ) -> dict: base: dict[str, Any] = {} if spec.include_site_id: # /TenantGoods/GetGoodsInventoryList 要求 siteId 为数组(标量会触发服务端异常,返回畸形状态行 HTTP/1.1 1400) if spec.endpoint == "/TenantGoods/GetGoodsInventoryList": base["siteId"] = [store_id] else: base["siteId"] = store_id if spec.requires_window and spec.time_fields: start_key, end_key = spec.time_fields base[start_key] = TypeParser.format_timestamp(window_start, self.tz) base[end_key] = TypeParser.format_timestamp(window_end, self.tz) params = self._merge_common_params(base) params.update(spec.extra_params) return params # ------------------------------------------------------------------ schema-aware ingest (ODS doc schema) def _get_table_columns(self, table: str) -> list[tuple[str, str, str]]: cache = getattr(self, "_table_columns_cache", {}) if table in cache: return cache[table] if "." in table: schema, name = table.split(".", 1) else: schema, name = "public", table sql = """ SELECT column_name, data_type, udt_name FROM information_schema.columns WHERE table_schema = %s AND table_name = %s ORDER BY ordinal_position """ with self.db.conn.cursor() as cur: cur.execute(sql, (schema, name)) cols = [(r[0], (r[1] or "").lower(), (r[2] or "").lower()) for r in cur.fetchall()] cache[table] = cols self._table_columns_cache = cache return cols def _get_table_pk_columns(self, table: str) -> list[str]: cache = getattr(self, "_table_pk_cache", {}) if table in cache: return cache[table] if "." in table: schema, name = table.split(".", 1) else: schema, name = "public", table sql = """ SELECT kcu.column_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema WHERE tc.constraint_type = 'PRIMARY KEY' AND tc.table_schema = %s AND tc.table_name = %s ORDER BY kcu.ordinal_position """ with self.db.conn.cursor() as cur: cur.execute(sql, (schema, name)) cols = [r[0] for r in cur.fetchall()] cache[table] = cols self._table_pk_cache = cache return cols def _insert_records_schema_aware( self, *, table: str, records: list, response_payload: dict | list | None, source_file: str | None, source_endpoint: str | None, ) -> tuple[int, int]: """ 按 DB 表结构动态写入 ODS(只插新数据:ON CONFLICT DO NOTHING)。 返回 (inserted, skipped)。 """ if not records: return 0, 0 cols_info = self._get_table_columns(table) if not cols_info: raise ValueError(f"Cannot resolve columns for table={table}") pk_cols = self._get_table_pk_columns(table) db_json_cols_lower = { c[0].lower() for c in cols_info if c[1] in ("json", "jsonb") or c[2] in ("json", "jsonb") } col_names = [c[0] for c in cols_info] quoted_cols = ", ".join(f'\"{c}\"' for c in col_names) sql = f"INSERT INTO {table} ({quoted_cols}) VALUES %s" if pk_cols: pk_clause = ", ".join(f'\"{c}\"' for c in pk_cols) sql += f" ON CONFLICT ({pk_clause}) DO NOTHING" now = datetime.now(self.tz) json_dump = lambda v: json.dumps(v, ensure_ascii=False) # noqa: E731 params: list[tuple] = [] skipped = 0 root_site_profile = None if isinstance(response_payload, dict): data_part = response_payload.get("data") if isinstance(data_part, dict): sp = data_part.get("siteProfile") or data_part.get("site_profile") if isinstance(sp, dict): root_site_profile = sp for rec in records: if not isinstance(rec, dict): skipped += 1 continue merged_rec = self._merge_record_layers(rec) if table in {"billiards_ods.recharge_settlements", "billiards_ods.settlement_records"}: site_profile = merged_rec.get("siteProfile") or merged_rec.get("site_profile") or root_site_profile if isinstance(site_profile, dict): # 避免写入 None 覆盖原本存在的 camelCase 字段(例如 tenantId/siteId/siteName) def _fill_missing(target_col: str, candidates: list[Any]): existing = self._get_value_case_insensitive(merged_rec, target_col) if existing not in (None, ""): return for cand in candidates: if cand in (None, "", 0): continue merged_rec[target_col] = cand return _fill_missing("tenantid", [site_profile.get("tenant_id"), site_profile.get("tenantId")]) _fill_missing("siteid", [site_profile.get("siteId"), site_profile.get("id")]) _fill_missing("sitename", [site_profile.get("shop_name"), site_profile.get("siteName")]) if pk_cols: missing_pk = False for pk in pk_cols: pk_val = self._get_value_case_insensitive(merged_rec, pk) if pk_val is None or pk_val == "": missing_pk = True break if missing_pk: skipped += 1 continue row_vals: list[Any] = [] for (col_name, data_type, _udt) in cols_info: col_lower = col_name.lower() if col_lower == "payload": row_vals.append(Json(rec, dumps=json_dump)) continue if col_lower == "source_file": row_vals.append(source_file) continue if col_lower == "source_endpoint": row_vals.append(source_endpoint) continue if col_lower == "fetched_at": row_vals.append(now) continue value = self._normalize_scalar(self._get_value_case_insensitive(merged_rec, col_name)) if col_lower in db_json_cols_lower: row_vals.append(Json(value, dumps=json_dump) if value is not None else None) continue row_vals.append(self._cast_value(value, data_type)) params.append(tuple(row_vals)) if not params: return 0, skipped inserted = 0 chunk_size = int(self.config.get("run.ods_execute_values_page_size", 200) or 200) chunk_size = max(1, min(chunk_size, 2000)) with self.db.conn.cursor() as cur: for i in range(0, len(params), chunk_size): chunk = params[i : i + chunk_size] execute_values(cur, sql, chunk, page_size=len(chunk)) if cur.rowcount is not None and cur.rowcount > 0: inserted += int(cur.rowcount) return inserted, skipped @staticmethod def _merge_record_layers(record: dict) -> dict: merged = record data_part = merged.get("data") while isinstance(data_part, dict): merged = {**data_part, **merged} data_part = data_part.get("data") settle_inner = merged.get("settleList") if isinstance(settle_inner, dict): merged = {**settle_inner, **merged} return merged @staticmethod def _get_value_case_insensitive(record: dict | None, col: str | None): if record is None or col is None: return None if col in record: return record.get(col) col_lower = col.lower() for k, v in record.items(): if isinstance(k, str) and k.lower() == col_lower: return v return None @staticmethod def _normalize_scalar(value): if value == "" or value == "{}" or value == "[]": return None return value @staticmethod def _cast_value(value, data_type: str): if value is None: return None dt = (data_type or "").lower() if dt in ("integer", "bigint", "smallint"): if isinstance(value, bool): return int(value) try: return int(value) except Exception: return None if dt in ("numeric", "double precision", "real", "decimal"): if isinstance(value, bool): return int(value) try: return float(value) except Exception: return None if dt.startswith("timestamp") or dt in ("date", "time", "interval"): return value if isinstance(value, (str, datetime)) else None return value def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None: resolver = getattr(self.api, "get_source_hint", None) if callable(resolver): return resolver(spec.endpoint) return None def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec: return ColumnSpec( column=name, sources=sources, required=required, transform=TypeParser.parse_int, ) def _decimal_col(name: str, *sources: str) -> ColumnSpec: """??????????????""" return ColumnSpec( column=name, sources=sources, transform=lambda v: TypeParser.parse_decimal(v, 2), ) def _bool_col(name: str, *sources: str) -> ColumnSpec: """??????????????0/1?true/false ???""" def _to_bool(value): if value is None: return None if isinstance(value, bool): return value s = str(value).strip().lower() if s in {"1", "true", "t", "yes", "y"}: return True if s in {"0", "false", "f", "no", "n"}: return False return bool(value) return ColumnSpec(column=name, sources=sources, transform=_to_bool) ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = ( OdsTaskSpec( code="ODS_ASSISTANT_ACCOUNT", class_name="OdsAssistantAccountsTask", table_name="billiards_ods.assistant_accounts_master", endpoint="/PersonnelManagement/SearchAssistantInfo", data_path=("data",), list_key="assistantInfos", pk_columns=(_int_col("id", "id", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="鍔╂暀璐﹀彿妗f ODS锛歋earchAssistantInfo -> assistantInfos 鍘熷 JSON", ), OdsTaskSpec( code="ODS_SETTLEMENT_RECORDS", class_name="OdsOrderSettleTask", table_name="billiards_ods.settlement_records", endpoint="/Site/GetAllOrderSettleList", data_path=("data",), list_key="settleList", time_fields=("rangeStartTime", "rangeEndTime"), pk_columns=(), include_site_column=False, include_source_endpoint=True, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=True, description="缁撹处璁板綍 ODS锛欸etAllOrderSettleList -> settleList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TABLE_USE", class_name="OdsTableUseTask", table_name="billiards_ods.table_fee_transactions", endpoint="/Site/GetSiteTableOrderDetails", data_path=("data",), list_key="siteTableUseDetailsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍙拌垂璁¤垂娴佹按 ODS锛欸etSiteTableOrderDetails -> siteTableUseDetailsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_ASSISTANT_LEDGER", class_name="OdsAssistantLedgerTask", table_name="billiards_ods.assistant_service_records", endpoint="/AssistantPerformance/GetOrderAssistantDetails", data_path=("data",), list_key="orderAssistantDetails", pk_columns=(_int_col("id", "id", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="鍔╂暀鏈嶅姟娴佹按 ODS锛欸etOrderAssistantDetails -> orderAssistantDetails 鍘熷 JSON", ), OdsTaskSpec( code="ODS_ASSISTANT_ABOLISH", class_name="OdsAssistantAbolishTask", table_name="billiards_ods.assistant_cancellation_records", endpoint="/AssistantPerformance/GetAbolitionAssistant", data_path=("data",), list_key="abolitionAssistants", pk_columns=(_int_col("id", "id", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="鍔╂暀搴熼櫎璁板綍 ODS锛欸etAbolitionAssistant -> abolitionAssistants 鍘熷 JSON", ), OdsTaskSpec( code="ODS_STORE_GOODS_SALES", class_name="OdsGoodsLedgerTask", table_name="billiards_ods.store_goods_sales_records", endpoint="/TenantGoods/GetGoodsSalesList", data_path=("data",), list_key="orderGoodsLedgers", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="闂ㄥ簵鍟嗗搧閿€鍞祦姘?ODS锛欸etGoodsSalesList -> orderGoodsLedgers 鍘熷 JSON", ), OdsTaskSpec( code="ODS_PAYMENT", class_name="OdsPaymentTask", table_name="billiards_ods.payment_transactions", endpoint="/PayLog/GetPayLogListPage", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鏀粯娴佹按 ODS锛欸etPayLogListPage 鍘熷 JSON", ), OdsTaskSpec( code="ODS_REFUND", class_name="OdsRefundTask", table_name="billiards_ods.refund_transactions", endpoint="/Order/GetRefundPayLogList", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="閫€娆炬祦姘?ODS锛欸etRefundPayLogList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_PLATFORM_COUPON", class_name="OdsCouponVerifyTask", table_name="billiards_ods.platform_coupon_redemption_records", endpoint="/Promotion/GetOfflineCouponConsumePageList", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="骞冲彴/鍥㈣喘鍒告牳閿€ ODS锛欸etOfflineCouponConsumePageList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_MEMBER", class_name="OdsMemberTask", table_name="billiards_ods.member_profiles", endpoint="/MemberProfile/GetTenantMemberList", data_path=("data",), list_key="tenantMemberInfos", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="浼氬憳妗f ODS锛欸etTenantMemberList -> tenantMemberInfos 鍘熷 JSON", ), OdsTaskSpec( code="ODS_MEMBER_CARD", class_name="OdsMemberCardTask", table_name="billiards_ods.member_stored_value_cards", endpoint="/MemberProfile/GetTenantMemberCardList", data_path=("data",), list_key="tenantMemberCards", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="浼氬憳鍌ㄥ€煎崱 ODS锛欸etTenantMemberCardList -> tenantMemberCards 鍘熷 JSON", ), OdsTaskSpec( code="ODS_MEMBER_BALANCE", class_name="OdsMemberBalanceTask", table_name="billiards_ods.member_balance_changes", endpoint="/MemberProfile/GetMemberCardBalanceChange", data_path=("data",), list_key="tenantMemberCardLogs", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="浼氬憳浣欓鍙樺姩 ODS锛欸etMemberCardBalanceChange -> tenantMemberCardLogs 鍘熷 JSON", ), OdsTaskSpec( code="ODS_RECHARGE_SETTLE", class_name="OdsRechargeSettleTask", table_name="billiards_ods.recharge_settlements", endpoint="/Site/GetRechargeSettleList", data_path=("data",), list_key="settleList", time_fields=("rangeStartTime", "rangeEndTime"), pk_columns=(_int_col("recharge_order_id", "settleList.id", "id", required=True),), extra_columns=( _int_col("tenant_id", "settleList.tenantId", "tenantId"), _int_col("site_id", "settleList.siteId", "siteId", "siteProfile.id"), ColumnSpec("site_name_snapshot", sources=("siteProfile.shop_name", "settleList.siteName")), _int_col("member_id", "settleList.memberId", "memberId"), ColumnSpec("member_name_snapshot", sources=("settleList.memberName", "memberName")), ColumnSpec("member_phone_snapshot", sources=("settleList.memberPhone", "memberPhone")), _int_col("tenant_member_card_id", "settleList.tenantMemberCardId", "tenantMemberCardId"), ColumnSpec("member_card_type_name", sources=("settleList.memberCardTypeName", "memberCardTypeName")), _int_col("settle_relate_id", "settleList.settleRelateId", "settleRelateId"), _int_col("settle_type", "settleList.settleType", "settleType"), ColumnSpec("settle_name", sources=("settleList.settleName", "settleName")), _int_col("is_first", "settleList.isFirst", "isFirst"), _int_col("settle_status", "settleList.settleStatus", "settleStatus"), _decimal_col("pay_amount", "settleList.payAmount", "payAmount"), _decimal_col("refund_amount", "settleList.refundAmount", "refundAmount"), _decimal_col("point_amount", "settleList.pointAmount", "pointAmount"), _decimal_col("cash_amount", "settleList.cashAmount", "cashAmount"), _decimal_col("online_amount", "settleList.onlineAmount", "onlineAmount"), _decimal_col("balance_amount", "settleList.balanceAmount", "balanceAmount"), _decimal_col("card_amount", "settleList.cardAmount", "cardAmount"), _decimal_col("coupon_amount", "settleList.couponAmount", "couponAmount"), _decimal_col("recharge_card_amount", "settleList.rechargeCardAmount", "rechargeCardAmount"), _decimal_col("gift_card_amount", "settleList.giftCardAmount", "giftCardAmount"), _decimal_col("prepay_money", "settleList.prepayMoney", "prepayMoney"), _decimal_col("consume_money", "settleList.consumeMoney", "consumeMoney"), _decimal_col("goods_money", "settleList.goodsMoney", "goodsMoney"), _decimal_col("real_goods_money", "settleList.realGoodsMoney", "realGoodsMoney"), _decimal_col("table_charge_money", "settleList.tableChargeMoney", "tableChargeMoney"), _decimal_col("service_money", "settleList.serviceMoney", "serviceMoney"), _decimal_col("activity_discount", "settleList.activityDiscount", "activityDiscount"), _decimal_col("all_coupon_discount", "settleList.allCouponDiscount", "allCouponDiscount"), _decimal_col("goods_promotion_money", "settleList.goodsPromotionMoney", "goodsPromotionMoney"), _decimal_col("assistant_promotion_money", "settleList.assistantPromotionMoney", "assistantPromotionMoney"), _decimal_col("assistant_pd_money", "settleList.assistantPdMoney", "assistantPdMoney"), _decimal_col("assistant_cx_money", "settleList.assistantCxMoney", "assistantCxMoney"), _decimal_col("assistant_manual_discount", "settleList.assistantManualDiscount", "assistantManualDiscount"), _decimal_col("coupon_sale_amount", "settleList.couponSaleAmount", "couponSaleAmount"), _decimal_col("member_discount_amount", "settleList.memberDiscountAmount", "memberDiscountAmount"), _decimal_col("point_discount_price", "settleList.pointDiscountPrice", "pointDiscountPrice"), _decimal_col("point_discount_cost", "settleList.pointDiscountCost", "pointDiscountCost"), _decimal_col("adjust_amount", "settleList.adjustAmount", "adjustAmount"), _decimal_col("rounding_amount", "settleList.roundingAmount", "roundingAmount"), _int_col("payment_method", "settleList.paymentMethod", "paymentMethod"), _bool_col("can_be_revoked", "settleList.canBeRevoked", "canBeRevoked"), _bool_col("is_bind_member", "settleList.isBindMember", "isBindMember"), _bool_col("is_activity", "settleList.isActivity", "isActivity"), _bool_col("is_use_coupon", "settleList.isUseCoupon", "isUseCoupon"), _bool_col("is_use_discount", "settleList.isUseDiscount", "isUseDiscount"), _int_col("operator_id", "settleList.operatorId", "operatorId"), ColumnSpec("operator_name_snapshot", sources=("settleList.operatorName", "operatorName")), _int_col("salesman_user_id", "settleList.salesManUserId", "salesmanUserId", "salesManUserId"), ColumnSpec("salesman_name", sources=("settleList.salesManName", "salesmanName", "settleList.salesmanName")), ColumnSpec("order_remark", sources=("settleList.orderRemark", "orderRemark")), _int_col("table_id", "settleList.tableId", "tableId"), _int_col("serial_number", "settleList.serialNumber", "serialNumber"), _int_col("revoke_order_id", "settleList.revokeOrderId", "revokeOrderId"), ColumnSpec("revoke_order_name", sources=("settleList.revokeOrderName", "revokeOrderName")), ColumnSpec("revoke_time", sources=("settleList.revokeTime", "revokeTime")), ColumnSpec("create_time", sources=("settleList.createTime", "createTime")), ColumnSpec("pay_time", sources=("settleList.payTime", "payTime")), ColumnSpec("site_profile", sources=("siteProfile",)), ), include_site_column=False, include_source_endpoint=True, include_page_no=False, include_page_size=False, include_fetched_at=True, include_record_index=False, conflict_columns_override=None, requires_window=True, description="?????? ODS?GetRechargeSettleList -> data.settleList ????", ), OdsTaskSpec( code="ODS_GROUP_PACKAGE", class_name="OdsPackageTask", table_name="billiards_ods.group_buy_packages", endpoint="/PackageCoupon/QueryPackageCouponList", data_path=("data",), list_key="packageCouponList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍥㈣喘濂楅瀹氫箟 ODS锛歈ueryPackageCouponList -> packageCouponList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_GROUP_BUY_REDEMPTION", class_name="OdsGroupBuyRedemptionTask", table_name="billiards_ods.group_buy_redemption_records", endpoint="/Site/GetSiteTableUseDetails", data_path=("data",), list_key="siteTableUseDetailsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍥㈣喘濂楅鏍搁攢 ODS锛欸etSiteTableUseDetails -> siteTableUseDetailsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_INVENTORY_STOCK", class_name="OdsInventoryStockTask", table_name="billiards_ods.goods_stock_summary", endpoint="/TenantGoods/GetGoodsStockReport", data_path=("data",), pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="搴撳瓨姹囨€?ODS锛欸etGoodsStockReport 鍘熷 JSON", ), OdsTaskSpec( code="ODS_INVENTORY_CHANGE", class_name="OdsInventoryChangeTask", table_name="billiards_ods.goods_stock_movements", endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt", data_path=("data",), list_key="queryDeliveryRecordsList", pk_columns=(_int_col("sitegoodsstockid", "siteGoodsStockId", required=True),), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), description="搴撳瓨鍙樺寲璁板綍 ODS锛歈ueryGoodsOutboundReceipt -> queryDeliveryRecordsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TABLES", class_name="OdsTablesTask", table_name="billiards_ods.site_tables_master", endpoint="/Table/GetSiteTables", data_path=("data",), list_key="siteTables", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍙版缁磋〃 ODS锛欸etSiteTables -> siteTables 鍘熷 JSON", ), OdsTaskSpec( code="ODS_GOODS_CATEGORY", class_name="OdsGoodsCategoryTask", table_name="billiards_ods.stock_goods_category_tree", endpoint="/TenantGoodsCategory/QueryPrimarySecondaryCategory", data_path=("data",), list_key="goodsCategoryList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="搴撳瓨鍟嗗搧鍒嗙被鏍?ODS锛歈ueryPrimarySecondaryCategory -> goodsCategoryList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_STORE_GOODS", class_name="OdsStoreGoodsTask", table_name="billiards_ods.store_goods_master", endpoint="/TenantGoods/GetGoodsInventoryList", data_path=("data",), list_key="orderGoodsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="闂ㄥ簵鍟嗗搧妗f ODS锛欸etGoodsInventoryList -> orderGoodsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TABLE_FEE_DISCOUNT", class_name="OdsTableDiscountTask", table_name="billiards_ods.table_fee_discount_records", endpoint="/Site/GetTaiFeeAdjustList", data_path=("data",), list_key="taiFeeAdjustInfos", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="鍙拌垂鎶樻墸/璋冭处 ODS锛欸etTaiFeeAdjustList -> taiFeeAdjustInfos 鍘熷 JSON", ), OdsTaskSpec( code="ODS_TENANT_GOODS", class_name="OdsTenantGoodsTask", table_name="billiards_ods.tenant_goods_master", endpoint="/TenantGoods/QueryTenantGoods", data_path=("data",), list_key="tenantGoodsList", pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=False, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, description="绉熸埛鍟嗗搧妗f ODS锛歈ueryTenantGoods -> tenantGoodsList 鍘熷 JSON", ), OdsTaskSpec( code="ODS_SETTLEMENT_TICKET", class_name="OdsSettlementTicketTask", table_name="billiards_ods.settlement_ticket_details", endpoint="/Order/GetOrderSettleTicketNew", data_path=(), list_key=None, pk_columns=(), include_site_column=False, include_source_endpoint=False, include_page_no=False, include_page_size=False, include_fetched_at=True, include_record_index=True, conflict_columns_override=("source_file", "record_index"), requires_window=False, include_site_id=False, description="缁撹处灏忕エ璇︽儏 ODS锛欸etOrderSettleTicketNew 鍘熷 JSON", ), ) def _get_spec(code: str) -> OdsTaskSpec: for spec in ODS_TASK_SPECS: if spec.code == code: return spec raise KeyError(f"Spec not found for code {code}") _SETTLEMENT_TICKET_SPEC = _get_spec("ODS_SETTLEMENT_TICKET") class OdsSettlementTicketTask(BaseOdsTask): """Special handling: fetch ticket details per payment relate_id/orderSettleId.""" SPEC = _SETTLEMENT_TICKET_SPEC def extract(self, context) -> dict: """Fetch ticket payloads only (used by fetch-only pipeline).""" existing_ids = self._fetch_existing_ticket_ids() candidates = self._collect_settlement_ids( context.store_id or 0, existing_ids, context.window_start, context.window_end ) candidates = [cid for cid in candidates if cid and cid not in existing_ids] payloads, skipped = self._fetch_ticket_payloads(candidates) return {"records": payloads, "skipped": skipped, "fetched": len(candidates)} def execute(self, cursor_data: dict | None = None) -> dict: spec = self.SPEC context = self._build_context(cursor_data) store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0 counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0} source_file = self._resolve_source_file_hint(spec) try: existing_ids = self._fetch_existing_ticket_ids() candidates = self._collect_settlement_ids( store_id, existing_ids, context.window_start, context.window_end ) candidates = [cid for cid in candidates if cid and cid not in existing_ids] counts["fetched"] = len(candidates) if not candidates: self.logger.info( "%s: 绐楀彛[%s ~ %s] 鏈彂鐜伴渶瑕佹姄鍙栫殑灏忕エ", spec.code, context.window_start, context.window_end, ) result = self._build_result("SUCCESS", counts) result["window"] = { "start": context.window_start, "end": context.window_end, "minutes": context.window_minutes, } result["request_params"] = {"candidates": 0} return result payloads, skipped = self._fetch_ticket_payloads(candidates) counts["skipped"] += skipped inserted, skipped2 = self._insert_records_schema_aware( table=spec.table_name, records=payloads, response_payload=None, source_file=source_file, source_endpoint=spec.endpoint, ) counts["inserted"] += inserted counts["skipped"] += skipped2 self.db.commit() self.logger.info( "%s: 灏忕エ鎶撳彇瀹屾垚锛屽€欓€?%s 鎻掑叆=%s 鏇存柊=%s 璺宠繃=%s", spec.code, len(candidates), inserted, 0, counts["skipped"], ) result = self._build_result("SUCCESS", counts) result["window"] = { "start": context.window_start, "end": context.window_end, "minutes": context.window_minutes, } result["request_params"] = {"candidates": len(candidates)} return result except Exception: counts["errors"] += 1 self.db.rollback() self.logger.error("%s: 灏忕エ鎶撳彇澶辫触", spec.code, exc_info=True) raise # ------------------------------------------------------------------ helpers def _fetch_existing_ticket_ids(self) -> set[int]: sql = """ SELECT DISTINCT CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$' THEN (payload ->> 'orderSettleId')::bigint END AS order_settle_id FROM billiards_ods.settlement_ticket_details """ try: rows = self.db.query(sql) except Exception: self.logger.warning("鏌ヨ宸叉湁灏忕エ澶辫触锛屾寜绌洪泦澶勭悊", exc_info=True) return set() return { TypeParser.parse_int(row.get("order_settle_id")) for row in rows if row.get("order_settle_id") is not None } def _collect_settlement_ids( self, store_id: int, existing_ids: set[int], window_start, window_end ) -> list[int]: ids = self._fetch_from_payment_table(store_id) if not ids: ids = self._fetch_from_payment_api(store_id, window_start, window_end) return sorted(i for i in ids if i is not None and i not in existing_ids) def _fetch_from_payment_table(self, store_id: int) -> set[int]: sql = """ SELECT DISTINCT COALESCE( CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$' THEN (payload ->> 'orderSettleId')::bigint END, CASE WHEN (payload ->> 'relateId') ~ '^[0-9]+$' THEN (payload ->> 'relateId')::bigint END ) AS order_settle_id FROM billiards_ods.payment_transactions WHERE (payload ->> 'orderSettleId') ~ '^[0-9]+$' OR (payload ->> 'relateId') ~ '^[0-9]+$' """ params = None if store_id: sql += " AND COALESCE((payload ->> 'siteId')::bigint, %s) = %s" params = (store_id, store_id) try: rows = self.db.query(sql, params) except Exception: self.logger.warning("璇诲彇鏀粯娴佹按浠ヨ幏鍙栫粨绠楀崟ID澶辫触锛屽皢灏濊瘯璋冪敤鏀粯鎺ュ彛鍥為€€", exc_info=True) return set() return { TypeParser.parse_int(row.get("order_settle_id")) for row in rows if row.get("order_settle_id") is not None } def _fetch_from_payment_api(self, store_id: int, window_start, window_end) -> set[int]: params = self._merge_common_params( { "siteId": store_id, "StartPayTime": TypeParser.format_timestamp(window_start, self.tz), "EndPayTime": TypeParser.format_timestamp(window_end, self.tz), } ) candidate_ids: set[int] = set() try: for _, records, _, _ in self.api.iter_paginated( endpoint="/PayLog/GetPayLogListPage", params=params, page_size=self.config.get("api.page_size", 200), data_path=("data",), ): for rec in records: relate_id = TypeParser.parse_int( (rec or {}).get("relateId") or (rec or {}).get("orderSettleId") or (rec or {}).get("order_settle_id") ) if relate_id: candidate_ids.add(relate_id) except Exception: self.logger.warning("璋冪敤鏀粯鎺ュ彛鑾峰彇缁撶畻鍗旾D澶辫触锛屽綋鍓嶆壒娆″皢璺宠繃鍥為€€鏉ユ簮", exc_info=True) return candidate_ids def _fetch_ticket_payload(self, order_settle_id: int): payload = None try: for _, _, _, response in self.api.iter_paginated( endpoint=self.SPEC.endpoint, params={"orderSettleId": order_settle_id}, page_size=None, data_path=self.SPEC.data_path, list_key=self.SPEC.list_key, ): payload = response except Exception: self.logger.warning( "璋冪敤灏忕エ鎺ュ彛澶辫触 orderSettleId=%s", order_settle_id, exc_info=True ) if isinstance(payload, dict) and isinstance(payload.get("data"), list) and len(payload["data"]) == 1: # 鏈湴妗?鍥炴斁鍙兘鎶婂搷搴斿寘瑁呮垚鍗曞厓绱?list锛岃繖閲屽睍寮€浠ヨ创杩戠湡瀹炵粨鏋? payload = payload["data"][0] return payload def _fetch_ticket_payloads(self, candidates: list[int]) -> tuple[list, int]: """Fetch ticket payloads for a set of orderSettleIds; returns (payloads, skipped).""" payloads: list = [] skipped = 0 for order_settle_id in candidates: payload = self._fetch_ticket_payload(order_settle_id) if payload: payloads.append(payload) else: skipped += 1 return payloads, skipped def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]: attrs = { "SPEC": spec, "__doc__": spec.description or f"ODS ingestion task {spec.code}", "__module__": __name__, } return type(spec.class_name, (BaseOdsTask,), attrs) ENABLED_ODS_CODES = { "ODS_ASSISTANT_ACCOUNT", "ODS_ASSISTANT_LEDGER", "ODS_ASSISTANT_ABOLISH", "ODS_INVENTORY_CHANGE", "ODS_INVENTORY_STOCK", "ODS_GROUP_PACKAGE", "ODS_GROUP_BUY_REDEMPTION", "ODS_MEMBER", "ODS_MEMBER_BALANCE", "ODS_MEMBER_CARD", "ODS_PAYMENT", "ODS_REFUND", "ODS_PLATFORM_COUPON", "ODS_RECHARGE_SETTLE", "ODS_TABLE_USE", "ODS_TABLES", "ODS_GOODS_CATEGORY", "ODS_STORE_GOODS", "ODS_TABLE_FEE_DISCOUNT", "ODS_STORE_GOODS_SALES", "ODS_TENANT_GOODS", "ODS_SETTLEMENT_TICKET", "ODS_SETTLEMENT_RECORDS", } ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = { spec.code: _build_task_class(spec) for spec in ODS_TASK_SPECS if spec.code in ENABLED_ODS_CODES } # Override with specialized settlement ticket implementation ODS_TASK_CLASSES["ODS_SETTLEMENT_TICKET"] = OdsSettlementTicketTask __all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask", "ENABLED_ODS_CODES"]