Files
ZQYY.FQ-ETL/tasks/ods/ods_tasks.py

1770 lines
72 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""ODS ingestion tasks."""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type
from psycopg2.extras import Json, execute_values
from models.parsers import TypeParser
from tasks.base_task import BaseTask
from utils.windowing import build_window_segments, calc_window_minutes, calc_window_days, format_window_days
# Signature of a per-column converter: takes one raw value and returns the converted value.
ColumnTransform = Callable[[Any], Any]
@dataclass(frozen=True)
class ColumnSpec:
    """Mapping between DB column and source JSON field."""

    # Name of the destination database column.
    column: str
    # Candidate source JSON field names (presumably tried in order; confirm against the consumer).
    sources: Tuple[str, ...] = ()
    # Whether a value is mandatory; enforcement is not visible in this part of the file.
    required: bool = False
    # Fallback value for the column.
    default: Any = None
    # Optional converter applied to the raw value (see ColumnTransform).
    transform: ColumnTransform | None = None
@dataclass(frozen=True)
class OdsTaskSpec:
    """Definition of a single ODS ingestion task."""

    # Task code; returned by BaseOdsTask.get_task_code() and used as the log prefix.
    code: str
    # Name of the task class for this spec (not read by the code visible here).
    class_name: str
    # Target table; may be schema-qualified ("schema.table").
    table_name: str
    # API endpoint passed to the paginated fetch.
    endpoint: str
    # Forwarded to the API client's iter_paginated() as data_path / list_key.
    data_path: Tuple[str, ...] = ("data",)
    list_key: str | None = None
    # Column specs; NOTE(review): not consumed by the schema-aware writer visible here.
    pk_columns: Tuple[ColumnSpec, ...] = ()
    extra_columns: Tuple[ColumnSpec, ...] = ()
    # include_* flags: only include_source_endpoint and include_site_id are read
    # by the code visible in this part of the file.
    include_page_size: bool = False
    include_page_no: bool = False
    include_source_file: bool = True
    # When False, rows are written with source_endpoint = None.
    include_source_endpoint: bool = True
    include_record_index: bool = False
    include_site_column: bool = True
    include_fetched_at: bool = True
    # When requires_window is true and time_fields is set, the window bounds are
    # sent as request parameters named by time_fields (start key, end key).
    requires_window: bool = True
    time_fields: Tuple[str, str] | None = ("startTime", "endTime")
    # When True, the store id is sent as the "siteId" request parameter.
    include_site_id: bool = True
    # Table columns used to scope "mark missing rows as deleted" to the fetch window.
    snapshot_window_columns: Tuple[str, ...] | None = None
    # When True, missing-row deletion considers the whole table instead of a window.
    snapshot_full_table: bool = False
    description: str = ""
    # Extra request parameters, merged last so they override the computed ones.
    extra_params: Dict[str, Any] = field(default_factory=dict)
    # NOTE(review): not read by the code visible here.
    conflict_columns_override: Tuple[str, ...] | None = None
class BaseOdsTask(BaseTask):
    """Shared functionality for ODS ingestion tasks.

    The task fetches paginated API records for a time window and writes them
    into the table named by ``SPEC.table_name``, using the table's actual
    column layout to build the INSERT statement.
    """

    # Task definition; only annotated here, so a concrete task must supply the value.
    SPEC: OdsTaskSpec
def get_task_code(self) -> str:
return self.SPEC.code
def execute(self, cursor_data: dict | None = None) -> dict:
    """Fetch the task's endpoint window by window and write the rows to the ODS table.

    The resolved window is split into segments; each segment is fetched page by
    page, written through ``_insert_records_schema_aware`` and committed on its
    own. When snapshot deletion is enabled, rows missing from a segment's
    response are flagged ``is_delete = 1``.

    Args:
        cursor_data: Previous cursor state, forwarded to ``_resolve_window``.

    Returns:
        The result built by ``_build_result`` with ``window``, ``request_params``
        and, for multi-segment runs, ``segments`` added.

    Raises:
        ValueError: If ``app.store_id`` is not configured.
        Exception: Any failure during a segment is re-raised after a rollback.
    """
    spec = self.SPEC
    self.logger.info("开始执行%s (ODS)", spec.code)

    window_start, window_end, _ = self._resolve_window(cursor_data)
    segments = build_window_segments(
        self.config,
        window_start,
        window_end,
        tz=self.tz,
        override_only=True,
    ) or [(window_start, window_end)]

    total_segments = len(segments)
    multi_segment = total_segments > 1
    total_days = sum(calc_window_days(s, e) for s, e in segments) if segments else 0.0
    processed_days = 0.0
    if multi_segment:
        self.logger.info(
            "%s: 窗口拆分为 %s 段(共 %s 天)",
            spec.code,
            total_segments,
            format_window_days(total_days),
        )

    store_id = TypeParser.parse_int(self.config.get("app.store_id"))
    if not store_id:
        raise ValueError("app.store_id 未配置,无法执行 ODS 任务")

    page_size = self.config.get("api.page_size", 200)
    count_keys = ("fetched", "inserted", "updated", "skipped", "errors", "deleted")
    total_counts = dict.fromkeys(count_keys, 0)
    segment_results: list[dict] = []
    params_list: list[dict] = []
    source_file = self._resolve_source_file_hint(spec)

    snapshot_missing_delete = bool(self.config.get("run.snapshot_missing_delete", False))
    snapshot_allow_empty = bool(self.config.get("run.snapshot_allow_empty_delete", False))
    snapshot_full_table = bool(spec.snapshot_full_table)
    snapshot_window_columns = self._resolve_snapshot_window_columns(
        spec.table_name, spec.snapshot_window_columns
    )
    business_pk_cols = [
        c for c in self._get_table_pk_columns(spec.table_name) if str(c).lower() != "content_hash"
    ]
    has_is_delete = self._table_has_column(spec.table_name, "is_delete")

    # All inputs are fixed for the whole run, so the decision is made once.
    snapshot_active = bool(
        snapshot_missing_delete
        and has_is_delete
        and business_pk_cols
        and (snapshot_full_table or snapshot_window_columns)
    )
    row_source_endpoint = spec.endpoint if spec.include_source_endpoint else None

    try:
        for idx, (seg_start, seg_end) in enumerate(segments, start=1):
            params = self._build_params(
                spec,
                store_id,
                window_start=seg_start,
                window_end=seg_end,
            )
            params_list.append(params)
            segment_counts = dict.fromkeys(count_keys, 0)
            segment_keys: set[tuple] = set()
            self.logger.info(
                "%s: 开始执行(%s/%s),窗口[%s ~ %s]",
                spec.code,
                idx,
                total_segments,
                seg_start,
                seg_end,
            )

            pages = self.api.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            )
            for _, page_records, _, response_payload in pages:
                if snapshot_active:
                    # Remember which business keys the API still returns.
                    segment_keys.update(
                        self._collect_business_keys(page_records, business_pk_cols)
                    )
                inserted, updated, skipped = self._insert_records_schema_aware(
                    table=spec.table_name,
                    records=page_records,
                    response_payload=response_payload,
                    source_file=source_file,
                    source_endpoint=row_source_endpoint,
                )
                segment_counts["fetched"] += len(page_records)
                segment_counts["inserted"] += inserted
                segment_counts["updated"] += updated
                segment_counts["skipped"] += skipped

            # An empty response only triggers deletion when explicitly allowed.
            if snapshot_active and (segment_counts["fetched"] > 0 or snapshot_allow_empty):
                deleted = self._mark_missing_as_deleted(
                    table=spec.table_name,
                    business_pk_cols=business_pk_cols,
                    window_columns=snapshot_window_columns,
                    window_start=seg_start,
                    window_end=seg_end,
                    key_values=segment_keys,
                    allow_empty=snapshot_allow_empty,
                    full_table=snapshot_full_table,
                )
                if deleted:
                    segment_counts["updated"] += deleted
                    segment_counts["deleted"] += deleted

            self.db.commit()
            self._accumulate_counts(total_counts, segment_counts)
            processed_days += calc_window_days(seg_start, seg_end)
            if multi_segment:
                self.logger.info(
                    "%s: 完成(%s/%s),已处理 %s/%s",
                    spec.code,
                    idx,
                    total_segments,
                    format_window_days(processed_days),
                    format_window_days(total_days),
                )
                segment_results.append(
                    {
                        "window": {
                            "start": seg_start,
                            "end": seg_end,
                            "minutes": calc_window_minutes(seg_start, seg_end),
                        },
                        "counts": segment_counts,
                    }
                )

        self.logger.info("%s ODS 任务完成: %s", spec.code, total_counts)
        allow_empty_advance = bool(self.config.get("run.allow_empty_result_advance", False))
        nothing_fetched = total_counts["fetched"] == 0
        status = "PARTIAL" if nothing_fetched and not allow_empty_advance else "SUCCESS"

        result = self._build_result(status, total_counts)
        overall_start = segments[0][0]
        overall_end = segments[-1][1]
        result["window"] = {
            "start": overall_start,
            "end": overall_end,
            "minutes": calc_window_minutes(overall_start, overall_end),
        }
        if multi_segment:
            result["segments"] = segment_results
        # A single request is reported as a dict, several as a list.
        result["request_params"] = params_list[0] if len(params_list) == 1 else params_list
        return result
    except Exception:
        self.db.rollback()
        total_counts["errors"] += 1
        self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
        raise
def _resolve_window(self, cursor_data: dict | None) -> tuple[datetime, datetime, int]:
base_start, base_end, base_minutes = self._get_time_window(cursor_data)
# 如果用户显式指定了窗口(window_override.start/end),则直接使用,不走 MAX(fetched_at) 兜底
override_start = self.config.get("run.window_override.start")
override_end = self.config.get("run.window_override.end")
if override_start and override_end:
# 用户明确指定了窗口,尊重用户选择
return base_start, base_end, base_minutes
# 以 ODS 表 MAX(fetched_at) 兜底:避免“窗口游标推进但未实际入库”导致漏数。
last_fetched = self._get_max_fetched_at(self.SPEC.table_name)
if last_fetched:
overlap_seconds = int(self.config.get("run.overlap_seconds", 600) or 600)
cursor_end = cursor_data.get("last_end") if isinstance(cursor_data, dict) else None
anchor = cursor_end or last_fetched
# 如果 cursor_end 比真实入库时间(last_fetched)更靠后,说明游标被推进但表未跟上:改用 last_fetched 作为起点
if isinstance(cursor_end, datetime) and cursor_end.tzinfo is None:
cursor_end = cursor_end.replace(tzinfo=self.tz)
if isinstance(cursor_end, datetime) and cursor_end > last_fetched:
anchor = last_fetched
start = anchor - timedelta(seconds=max(0, overlap_seconds))
if start.tzinfo is None:
start = start.replace(tzinfo=self.tz)
else:
start = start.astimezone(self.tz)
end = datetime.now(self.tz)
minutes = max(1, int((end - start).total_seconds() // 60))
return start, end, minutes
return base_start, base_end, base_minutes
def _get_max_fetched_at(self, table_name: str) -> datetime | None:
try:
rows = self.db.query(f"SELECT MAX(fetched_at) AS mx FROM {table_name}")
except Exception:
return None
if not rows or not rows[0].get("mx"):
return None
mx = rows[0]["mx"]
if not isinstance(mx, datetime):
return None
if mx.tzinfo is None:
return mx.replace(tzinfo=self.tz)
return mx.astimezone(self.tz)
def _build_params(
self,
spec: OdsTaskSpec,
store_id: int,
*,
window_start: datetime,
window_end: datetime,
) -> dict:
base: dict[str, Any] = {}
if spec.include_site_id:
# /TenantGoods/GetGoodsInventoryList 要求 siteId 为数组(标量会触发服务端异常,返回畸形状态行 HTTP/1.1 1400
if spec.endpoint == "/TenantGoods/GetGoodsInventoryList":
base["siteId"] = [store_id]
else:
base["siteId"] = store_id
if spec.requires_window and spec.time_fields:
start_key, end_key = spec.time_fields
base[start_key] = TypeParser.format_timestamp(window_start, self.tz)
base[end_key] = TypeParser.format_timestamp(window_end, self.tz)
params = self._merge_common_params(base)
params.update(spec.extra_params)
return params
# ------------------------------------------------------------------ Schema-aware writes (following the ODS document schema)
def _get_table_columns(self, table: str) -> list[tuple[str, str, str]]:
cache = getattr(self, "_table_columns_cache", {})
if table in cache:
return cache[table]
if "." in table:
schema, name = table.split(".", 1)
else:
schema, name = "public", table
sql = """
SELECT column_name, data_type, udt_name
FROM information_schema.columns
WHERE table_schema = %s AND table_name = %s
ORDER BY ordinal_position
"""
with self.db.conn.cursor() as cur:
cur.execute(sql, (schema, name))
cols = [(r[0], (r[1] or "").lower(), (r[2] or "").lower()) for r in cur.fetchall()]
cache[table] = cols
self._table_columns_cache = cache
return cols
def _get_table_pk_columns(self, table: str) -> list[str]:
cache = getattr(self, "_table_pk_cache", {})
if table in cache:
return cache[table]
if "." in table:
schema, name = table.split(".", 1)
else:
schema, name = "public", table
sql = """
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_schema = %s
AND tc.table_name = %s
ORDER BY kcu.ordinal_position
"""
with self.db.conn.cursor() as cur:
cur.execute(sql, (schema, name))
cols = [r[0] for r in cur.fetchall()]
cache[table] = cols
self._table_pk_cache = cache
return cols
def _table_has_column(self, table: str, column: str) -> bool:
col_lower = str(column or "").lower()
return any(c[0].lower() == col_lower for c in self._get_table_columns(table))
def _resolve_snapshot_window_columns(
self, table: str, columns: Sequence[str] | None
) -> list[str]:
if not columns:
return []
col_map = {c[0].lower(): c[0] for c in self._get_table_columns(table)}
resolved: list[str] = []
for col in columns:
if not col:
continue
actual = col_map.get(str(col).lower())
if actual:
resolved.append(actual)
return resolved
@staticmethod
def _coerce_delete_flag(value) -> int | None:
if value is None:
return None
if isinstance(value, bool):
return 1 if value else 0
if isinstance(value, (int, float)):
try:
return 1 if int(value) != 0 else 0
except Exception:
return 1 if value else 0
if isinstance(value, str):
s = value.strip().lower()
if not s:
return None
if s in {"1", "true", "t", "yes", "y"}:
return 1
if s in {"0", "false", "f", "no", "n"}:
return 0
try:
return 1 if int(s) != 0 else 0
except Exception:
return 1 if s else 0
return 1 if value else 0
def _normalize_is_delete_flag(self, record: dict, *, default_if_missing: int | None) -> None:
if not isinstance(record, dict):
return
raw = None
for key in ("is_delete", "is_deleted", "isDelete", "isDeleted"):
if key in record:
raw = record.get(key)
break
candidate = self._get_value_case_insensitive(record, key)
if candidate is not None:
raw = candidate
break
normalized = self._coerce_delete_flag(raw)
if normalized is None:
if default_if_missing is not None:
record["is_delete"] = int(default_if_missing)
return
record["is_delete"] = normalized
@staticmethod
def _normalize_pk_value(value):
if value is None or value == "":
return None
if isinstance(value, str):
parsed = TypeParser.parse_int(value)
if parsed is not None:
return parsed
return value
def _collect_business_keys(
self, records: list, business_pk_cols: Sequence[str]
) -> set[tuple]:
if not records or not business_pk_cols:
return set()
keys: set[tuple] = set()
for rec in records:
if not isinstance(rec, dict):
continue
merged_rec = self._merge_record_layers(rec)
key = tuple(
self._normalize_pk_value(self._get_value_case_insensitive(merged_rec, col))
for col in business_pk_cols
)
if any(v is None or v == "" for v in key):
continue
keys.add(key)
return keys
def _mark_missing_as_deleted(
self,
*,
table: str,
business_pk_cols: Sequence[str],
window_columns: Sequence[str],
window_start: datetime,
window_end: datetime,
key_values: Sequence[tuple],
allow_empty: bool,
full_table: bool,
) -> int:
if not business_pk_cols:
return 0
if not window_columns and not full_table:
return 0
if not self._table_has_column(table, "is_delete"):
return 0
resolved_window_cols = self._resolve_snapshot_window_columns(table, window_columns)
if not full_table and not resolved_window_cols:
return 0
with self.db.conn.cursor() as cur:
if full_table:
base_filter = 't."is_delete" IS DISTINCT FROM 1'
else:
window_clause = " OR ".join(
f'(t."{col}" >= %s AND t."{col}" < %s)' for col in resolved_window_cols
)
window_params: list = []
for _ in resolved_window_cols:
window_params.extend([window_start, window_end])
window_clause_sql = cur.mogrify(window_clause, window_params).decode()
base_filter = f"({window_clause_sql}) AND t.\"is_delete\" IS DISTINCT FROM 1"
if not key_values:
if not allow_empty:
return 0
sql = f"UPDATE {table} t SET is_delete=1 WHERE {base_filter}"
cur.execute(sql)
return int(cur.rowcount or 0)
keys_sql = ", ".join(f'\"{c}\"' for c in business_pk_cols)
join_clause = " AND ".join(f'k.\"{c}\" = t.\"{c}\"' for c in business_pk_cols)
sql = (
f"WITH keys({keys_sql}) AS (VALUES %s) "
f"UPDATE {table} t SET is_delete=1 "
f"WHERE {base_filter} AND NOT EXISTS (SELECT 1 FROM keys k WHERE {join_clause})"
)
key_list = list(key_values)
execute_values(cur, sql, key_list, page_size=len(key_list))
return int(cur.rowcount or 0)
def _insert_records_schema_aware(
    self,
    *,
    table: str,
    records: list,
    response_payload: dict | list | None,
    source_file: str | None,
    source_endpoint: str | None,
) -> tuple[int, int, int]:
    """
    Write records into an ODS table, driven by the table's actual columns.

    - New records are inserted.
    - Existing records are handled according to the conflict mode
      (``run.ods_conflict_mode``: "nothing", "backfill" or "update").

    Args:
        table: Target table, optionally schema-qualified.
        records: One page of API records; non-dict entries are counted as skipped.
        response_payload: Full response of the page; only used to find a
            root-level ``siteProfile`` for the settlement tables.
        source_file: Value written to the ``source_file`` column.
        source_endpoint: Value written to the ``source_endpoint`` column.

    Returns:
        ``(inserted, updated, skipped)``.

    Raises:
        ValueError: If the table's columns cannot be resolved.
    """
    if not records:
        return 0, 0, 0
    cols_info = self._get_table_columns(table)
    if not cols_info:
        raise ValueError(f"Cannot resolve columns for table={table}")
    pk_cols = self._get_table_pk_columns(table)
    # Columns whose declared type or underlying type is json/jsonb.
    db_json_cols_lower = {
        c[0].lower() for c in cols_info if c[1] in ("json", "jsonb") or c[2] in ("json", "jsonb")
    }
    needs_content_hash = any(c[0].lower() == "content_hash" for c in cols_info)
    has_is_delete = any(c[0].lower() == "is_delete" for c in cols_info)
    # With snapshot deletion enabled, records lacking a delete flag default to 0.
    default_is_delete = (
        0 if has_is_delete and bool(self.config.get("run.snapshot_missing_delete", False)) else None
    )
    col_names = [c[0] for c in cols_info]
    quoted_cols = ", ".join(f'\"{c}\"' for c in col_names)
    sql = f"INSERT INTO {table} ({quoted_cols}) VALUES %s"
    # Conflict handling modes:
    #   "nothing"  - skip rows that already exist (DO NOTHING)
    #   "backfill" - only fill columns that are NULL (COALESCE)
    #   "update"   - compare all fields and overwrite whatever changed
    conflict_mode = str(self.config.get("run.ods_conflict_mode", "update")).lower()
    # Backwards compatibility with the older boolean setting.
    if self.config.get("run.ods_backfill_null_columns") is False:
        conflict_mode = "nothing"
    if pk_cols:
        pk_clause = ", ".join(f'\"{c}\"' for c in pk_cols)
        if conflict_mode in ("backfill", "update"):
            # Primary-key columns are excluded; fetched_at keeps its insert-time
            # value and never takes part in an update.
            pk_cols_lower = {c.lower() for c in pk_cols}
            immutable_update_cols = {"fetched_at"}
            update_cols = [
                c for c in col_names
                if c.lower() not in pk_cols_lower and c.lower() not in immutable_update_cols
            ]
            # Only business fields decide whether an update is needed, so that a
            # metadata change alone does not rewrite every row. payload takes part
            # in the comparison; the other metadata columns do not.
            meta_cols = {"source_file", "source_endpoint", "fetched_at", "content_hash"}
            compare_cols = [c for c in update_cols if c.lower() not in meta_cols]
            if update_cols:
                if conflict_mode == "backfill":
                    # Backfill mode: only fill columns that are currently NULL.
                    set_clause = ", ".join(
                        f'"{c}" = COALESCE({table}."{c}", EXCLUDED."{c}")'
                        for c in update_cols
                    )
                    where_clause = " OR ".join(f'{table}."{c}" IS NULL' for c in update_cols)
                    sql += f" ON CONFLICT ({pk_clause}) DO UPDATE SET {set_clause} WHERE {where_clause}"
                else:
                    # Update mode: overwrite all updatable columns.
                    set_clause = ", ".join(
                        f'"{c}" = EXCLUDED."{c}"'
                        for c in update_cols
                    )
                    # Only update when at least one compared field differs.
                    if compare_cols:
                        where_clause = " OR ".join(
                            f'{table}."{c}" IS DISTINCT FROM EXCLUDED."{c}"'
                            for c in compare_cols
                        )
                        sql += f" ON CONFLICT ({pk_clause}) DO UPDATE SET {set_clause} WHERE {where_clause}"
                    else:
                        sql += f" ON CONFLICT ({pk_clause}) DO UPDATE SET {set_clause}"
            else:
                sql += f" ON CONFLICT ({pk_clause}) DO NOTHING"
        else:
            sql += f" ON CONFLICT ({pk_clause}) DO NOTHING"
    use_returning = bool(pk_cols)
    if use_returning:
        # xmax = 0 is used here to tell freshly inserted rows from updated ones.
        sql += " RETURNING (xmax = 0) AS inserted"
    now = datetime.now(self.tz)
    json_dump = lambda v: json.dumps(v, ensure_ascii=False)  # noqa: E731
    params: list[tuple] = []
    skipped = 0
    merged_records: list[dict] = []
    # A siteProfile at the root of the response serves as a fallback below.
    root_site_profile = None
    if isinstance(response_payload, dict):
        data_part = response_payload.get("data")
        if isinstance(data_part, dict):
            sp = data_part.get("siteProfile") or data_part.get("site_profile")
            if isinstance(sp, dict):
                root_site_profile = sp
    # Pass 1: flatten every record and normalise its delete flag.
    for rec in records:
        if not isinstance(rec, dict):
            skipped += 1
            continue
        # NOTE(review): _merge_record_layers returns the input dict itself when it
        # has no nested "data"/"settleList" dict, so the in-place changes below can
        # also alter `rec`, which is later stored as the payload.
        merged_rec = self._merge_record_layers(rec)
        self._normalize_is_delete_flag(merged_rec, default_if_missing=default_is_delete)
        merged_records.append({"raw": rec, "merged": merged_rec})
        if table in {"billiards_ods.recharge_settlements", "billiards_ods.settlement_records"}:
            site_profile = merged_rec.get("siteProfile") or merged_rec.get("site_profile") or root_site_profile
            if isinstance(site_profile, dict):
                # Avoid writing None over camelCase fields that already hold a
                # value (for example tenantId/siteId/siteName).
                def _fill_missing(target_col: str, candidates: list[Any]):
                    existing = self._get_value_case_insensitive(merged_rec, target_col)
                    if existing not in (None, ""):
                        return
                    for cand in candidates:
                        if cand in (None, "", 0):
                            continue
                        merged_rec[target_col] = cand
                        return
                _fill_missing("tenantid", [site_profile.get("tenant_id"), site_profile.get("tenantId")])
                _fill_missing("siteid", [site_profile.get("siteId"), site_profile.get("id")])
                _fill_missing("sitename", [site_profile.get("shop_name"), site_profile.get("siteName")])
    has_fetched_at = any(c[0].lower() == "fetched_at" for c in cols_info)
    business_keys = [c for c in pk_cols if str(c).lower() != "content_hash"]
    # Look up each key's most recently stored hash so unchanged records can be skipped.
    compare_latest = bool(needs_content_hash and has_fetched_at and business_keys)
    latest_compare_hash: dict[tuple[Any, ...], str | None] = {}
    if compare_latest:
        key_values: list[tuple[Any, ...]] = []
        for item in merged_records:
            merged_rec = item["merged"]
            key = tuple(self._get_value_case_insensitive(merged_rec, k) for k in business_keys)
            if any(v is None or v == "" for v in key):
                continue
            key_values.append(key)
        if key_values:
            with self.db.conn.cursor() as cur:
                latest_hashes = self._fetch_latest_content_hashes(cur, table, business_keys, key_values)
            for key, value in latest_hashes.items():
                latest_compare_hash[key] = value
    # Pass 2: build one parameter tuple per record that should be written.
    for item in merged_records:
        rec = item["raw"]
        merged_rec = item["merged"]
        content_hash = None
        compare_hash = None
        if needs_content_hash:
            # content_hash leaves fetched_at out so it does not change with the
            # ingestion time.
            compare_hash = self._compute_content_hash(merged_rec, include_fetched_at=False)
            content_hash = compare_hash
        if pk_cols:
            # Records missing any business key part cannot be upserted.
            missing_pk = False
            for pk in pk_cols:
                if str(pk).lower() == "content_hash":
                    continue
                pk_val = self._get_value_case_insensitive(merged_rec, pk)
                if pk_val is None or pk_val == "":
                    missing_pk = True
                    break
            if missing_pk:
                skipped += 1
                continue
        if compare_latest and compare_hash is not None:
            key = tuple(self._get_value_case_insensitive(merged_rec, k) for k in business_keys)
            if any(v is None or v == "" for v in key):
                skipped += 1
                continue
            last_hash = latest_compare_hash.get(key)
            # Same content as the latest stored version: nothing to write.
            if last_hash is not None and last_hash == compare_hash:
                skipped += 1
                continue
        row_vals: list[Any] = []
        for (col_name, data_type, _udt) in cols_info:
            col_lower = col_name.lower()
            # Metadata columns are filled by the loader, not from the record.
            if col_lower == "payload":
                row_vals.append(Json(rec, dumps=json_dump))
                continue
            if col_lower == "source_file":
                row_vals.append(source_file)
                continue
            if col_lower == "source_endpoint":
                row_vals.append(source_endpoint)
                continue
            if col_lower == "fetched_at":
                row_vals.append(now)
                continue
            if col_lower == "content_hash":
                row_vals.append(content_hash)
                continue
            value = self._normalize_scalar(self._get_value_case_insensitive(merged_rec, col_name))
            if col_lower in db_json_cols_lower:
                row_vals.append(Json(value, dumps=json_dump) if value is not None else None)
                continue
            row_vals.append(self._cast_value(value, data_type))
        params.append(tuple(row_vals))
    if not params:
        return 0, 0, skipped
    inserted = 0
    updated = 0
    # Chunk size is configurable and clamped to 1..2000.
    chunk_size = int(self.config.get("run.ods_execute_values_page_size", 200) or 200)
    chunk_size = max(1, min(chunk_size, 2000))
    with self.db.conn.cursor() as cur:
        for i in range(0, len(params), chunk_size):
            chunk = params[i : i + chunk_size]
            if use_returning:
                rows = execute_values(cur, sql, chunk, page_size=len(chunk), fetch=True)
                ins, upd = self._count_returning_flags(rows or [])
                inserted += ins
                updated += upd
                # ON CONFLICT ... DO UPDATE ... WHERE only returns rows that were
                # actually affected. Unchanged or conflict-skipped rows are counted
                # as skipped so that fetched matches the sum of the other counters.
                affected = len(rows or [])
                if affected < len(chunk):
                    skipped += (len(chunk) - affected)
            else:
                # Without a primary key there is no RETURNING; rowcount is all we have.
                execute_values(cur, sql, chunk, page_size=len(chunk))
                if cur.rowcount is not None and cur.rowcount > 0:
                    inserted += int(cur.rowcount)
                    if cur.rowcount < len(chunk):
                        skipped += (len(chunk) - int(cur.rowcount))
                elif cur.rowcount == 0:
                    skipped += len(chunk)
    return inserted, updated, skipped
@staticmethod
def _count_returning_flags(rows: Iterable[Any]) -> tuple[int, int]:
"""Count inserted vs updated from RETURNING (xmax = 0) rows."""
inserted = 0
updated = 0
for row in rows or []:
if isinstance(row, dict):
flag = row.get("inserted")
else:
flag = row[0] if row else None
if flag:
inserted += 1
else:
updated += 1
return inserted, updated
@staticmethod
def _merge_record_layers(record: dict) -> dict:
merged = record
data_part = merged.get("data")
while isinstance(data_part, dict):
merged = {**data_part, **merged}
data_part = data_part.get("data")
settle_inner = merged.get("settleList")
if isinstance(settle_inner, dict):
merged = {**settle_inner, **merged}
return merged
@staticmethod
def _get_value_case_insensitive(record: dict | None, col: str | None):
if record is None or col is None:
return None
if col in record:
return record.get(col)
col_lower = col.lower()
for k, v in record.items():
if isinstance(k, str) and k.lower() == col_lower:
return v
return None
@staticmethod
def _normalize_scalar(value):
if value == "" or value == "{}" or value == "[]":
return None
return value
@staticmethod
def _cast_value(value, data_type: str):
if value is None:
return None
dt = (data_type or "").lower()
if dt == "boolean":
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.lower() in ("true", "1", "yes", "t")
return bool(value)
if dt in ("integer", "bigint", "smallint"):
if isinstance(value, bool):
return int(value)
try:
return int(value)
except Exception:
return None
if dt in ("numeric", "double precision", "real", "decimal"):
if isinstance(value, bool):
return int(value)
try:
return float(value)
except Exception:
return None
if dt.startswith("timestamp") or dt in ("date", "time", "interval"):
return value if isinstance(value, (str, datetime)) else None
return value
def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
resolver = getattr(self.api, "get_source_hint", None)
if callable(resolver):
return resolver(spec.endpoint)
return None
@staticmethod
def _hash_default(value):
if isinstance(value, datetime):
return value.isoformat()
return str(value)
@classmethod
def _sanitize_record_for_hash(cls, record: dict, *, include_fetched_at: bool) -> dict:
exclude = {
"data",
"payload",
"source_file",
"source_endpoint",
"content_hash",
"record_index",
}
if not include_fetched_at:
exclude.add("fetched_at")
def _strip(value):
if isinstance(value, dict):
cleaned = {}
for k, v in value.items():
if isinstance(k, str) and k.lower() in exclude:
continue
cleaned[k] = _strip(v)
return cleaned
if isinstance(value, list):
return [_strip(v) for v in value]
return value
return _strip(record or {})
@classmethod
def _compute_content_hash(cls, record: dict, *, include_fetched_at: bool) -> str:
cleaned = cls._sanitize_record_for_hash(record, include_fetched_at=include_fetched_at)
payload = json.dumps(
cleaned,
ensure_ascii=False,
sort_keys=True,
separators=(",", ":"),
default=cls._hash_default,
)
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
@staticmethod
def _compute_compare_hash_from_payload(payload: Any) -> str | None:
if payload is None:
return None
if isinstance(payload, str):
try:
payload = json.loads(payload)
except Exception:
return None
if not isinstance(payload, dict):
return None
merged = BaseOdsTask._merge_record_layers(payload)
return BaseOdsTask._compute_content_hash(merged, include_fetched_at=False)
@staticmethod
def _fetch_latest_content_hashes(
cur, table: str, business_keys: Sequence[str], key_values: Sequence[tuple]
) -> dict:
if not business_keys or not key_values:
return {}
keys_sql = ", ".join(f'"{k}"' for k in business_keys)
sql = (
f"WITH keys({keys_sql}) AS (VALUES %s) "
f"SELECT DISTINCT ON ({keys_sql}) {keys_sql}, content_hash "
f"FROM {table} t JOIN keys k USING ({keys_sql}) "
f"ORDER BY {keys_sql}, fetched_at DESC NULLS LAST"
)
unique_keys = list({tuple(k) for k in key_values})
execute_values(cur, sql, unique_keys, page_size=500)
rows = cur.fetchall() or []
result = {}
if rows and isinstance(rows[0], dict):
for r in rows:
key = tuple(r[k] for k in business_keys)
result[key] = r.get("content_hash")
return result
key_len = len(business_keys)
for r in rows:
key = tuple(r[:key_len])
value = r[key_len] if len(r) > key_len else None
result[key] = value
return result
def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
    """Build a ColumnSpec whose values are converted with ``TypeParser.parse_int``."""
    spec_kwargs = {
        "column": name,
        "sources": sources,
        "required": required,
        "transform": TypeParser.parse_int,
    }
    return ColumnSpec(**spec_kwargs)
def _decimal_col(name: str, *sources: str) -> ColumnSpec:
    """Build a ColumnSpec whose values go through ``TypeParser.parse_decimal(value, 2)``."""

    def _to_decimal(raw):
        return TypeParser.parse_decimal(raw, 2)

    return ColumnSpec(column=name, sources=sources, transform=_to_decimal)
def _bool_col(name: str, *sources: str) -> ColumnSpec:
    """Build a ColumnSpec that converts 0/1 and true/false style values to bool."""
    truthy = {"1", "true", "t", "yes", "y"}
    falsy = {"0", "false", "f", "no", "n"}

    def _to_bool(value):
        # None stays None so that a missing value is not stored as False.
        if value is None or isinstance(value, bool):
            return value
        token = str(value).strip().lower()
        if token in truthy:
            return True
        if token in falsy:
            return False
        # Unknown spellings fall back to plain truthiness.
        return bool(value)

    return ColumnSpec(column=name, sources=sources, transform=_to_bool)
def _raw_json_spec(*, pk: Tuple[str, str] = ("id", "id"), **fields: Any) -> OdsTaskSpec:
    """Build an OdsTaskSpec from the settings shared by most specs below.

    The shared baseline is: ``data_path=("data",)``, a single required integer
    primary-key column, site column / source endpoint / page number / page
    size / fetched-at columns disabled, record index enabled, and
    ``("source_file", "record_index")`` as the conflict columns.

    Args:
        pk: ``(db_column, source_field)`` of the required integer key column.
        **fields: Remaining ``OdsTaskSpec`` fields; any entry here overrides
            the baseline value of the same name.
    """
    pk_column, pk_source = pk
    settings: Dict[str, Any] = {
        "data_path": ("data",),
        "pk_columns": (_int_col(pk_column, pk_source, required=True),),
        "include_site_column": False,
        "include_source_endpoint": False,
        "include_page_no": False,
        "include_page_size": False,
        "include_fetched_at": False,
        "include_record_index": True,
        "conflict_columns_override": ("source_file", "record_index"),
    }
    settings.update(fields)
    return OdsTaskSpec(**settings)


# Declaration order is preserved: it determines the order of ODS_TASK_CLASSES.
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
    _raw_json_spec(
        code="ODS_ASSISTANT_ACCOUNT",
        class_name="OdsAssistantAccountsTask",
        table_name="billiards_ods.assistant_accounts_master",
        endpoint="/PersonnelManagement/SearchAssistantInfo",
        list_key="assistantInfos",
        snapshot_full_table=True,
        description="助教账号档案 ODSSearchAssistantInfo -> assistantInfos 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_SETTLEMENT_RECORDS",
        class_name="OdsOrderSettleTask",
        table_name="billiards_ods.settlement_records",
        endpoint="/Site/GetAllOrderSettleList",
        list_key="settleList",
        time_fields=("rangeStartTime", "rangeEndTime"),
        include_source_endpoint=True,
        requires_window=True,
        description="结账记录 ODSGetAllOrderSettleList -> settleList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TABLE_USE",
        class_name="OdsTableUseTask",
        table_name="billiards_ods.table_fee_transactions",
        endpoint="/Site/GetSiteTableOrderDetails",
        list_key="siteTableUseDetailsList",
        requires_window=False,
        snapshot_window_columns=("create_time",),
        description="台费计费流水 ODSGetSiteTableOrderDetails -> siteTableUseDetailsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_ASSISTANT_LEDGER",
        class_name="OdsAssistantLedgerTask",
        table_name="billiards_ods.assistant_service_records",
        endpoint="/AssistantPerformance/GetOrderAssistantDetails",
        list_key="orderAssistantDetails",
        snapshot_window_columns=("create_time",),
        description="助教服务流水 ODSGetOrderAssistantDetails -> orderAssistantDetails 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_ASSISTANT_ABOLISH",
        class_name="OdsAssistantAbolishTask",
        table_name="billiards_ods.assistant_cancellation_records",
        endpoint="/AssistantPerformance/GetAbolitionAssistant",
        list_key="abolitionAssistants",
        description="助教废除记录 ODSGetAbolitionAssistant -> abolitionAssistants 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_STORE_GOODS_SALES",
        class_name="OdsGoodsLedgerTask",
        table_name="billiards_ods.store_goods_sales_records",
        endpoint="/TenantGoods/GetGoodsSalesList",
        list_key="orderGoodsLedgers",
        requires_window=False,
        snapshot_window_columns=("create_time",),
        description="门店商品销售流水 ODSGetGoodsSalesList -> orderGoodsLedgers 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_PAYMENT",
        class_name="OdsPaymentTask",
        table_name="billiards_ods.payment_transactions",
        endpoint="/PayLog/GetPayLogListPage",
        requires_window=False,
        description="支付流水 ODSGetPayLogListPage 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_REFUND",
        class_name="OdsRefundTask",
        table_name="billiards_ods.refund_transactions",
        endpoint="/Order/GetRefundPayLogList",
        requires_window=False,
        snapshot_window_columns=("pay_time",),
        description="退款流水 ODSGetRefundPayLogList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_PLATFORM_COUPON",
        class_name="OdsCouponVerifyTask",
        table_name="billiards_ods.platform_coupon_redemption_records",
        endpoint="/Promotion/GetOfflineCouponConsumePageList",
        requires_window=False,
        snapshot_window_columns=("consume_time",),
        description="平台/团购券核销 ODSGetOfflineCouponConsumePageList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_MEMBER",
        class_name="OdsMemberTask",
        table_name="billiards_ods.member_profiles",
        endpoint="/MemberProfile/GetTenantMemberList",
        list_key="tenantMemberInfos",
        requires_window=False,
        description="会员档案 ODSGetTenantMemberList -> tenantMemberInfos 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_MEMBER_CARD",
        class_name="OdsMemberCardTask",
        table_name="billiards_ods.member_stored_value_cards",
        endpoint="/MemberProfile/GetTenantMemberCardList",
        list_key="tenantMemberCards",
        requires_window=False,
        snapshot_full_table=True,
        description="会员储值卡 ODSGetTenantMemberCardList -> tenantMemberCards 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_MEMBER_BALANCE",
        class_name="OdsMemberBalanceTask",
        table_name="billiards_ods.member_balance_changes",
        endpoint="/MemberProfile/GetMemberCardBalanceChange",
        list_key="tenantMemberCardLogs",
        requires_window=False,
        snapshot_window_columns=("create_time",),
        description="会员余额变动 ODSGetMemberCardBalanceChange -> tenantMemberCardLogs 原始 JSON",
    ),
    # This spec maps individual columns, so it is declared in full rather than
    # through the shared baseline.
    OdsTaskSpec(
        code="ODS_RECHARGE_SETTLE",
        class_name="OdsRechargeSettleTask",
        table_name="billiards_ods.recharge_settlements",
        endpoint="/Site/GetRechargeSettleList",
        data_path=("data",),
        list_key="settleList",
        time_fields=("rangeStartTime", "rangeEndTime"),
        pk_columns=(_int_col("recharge_order_id", "settleList.id", "id", required=True),),
        extra_columns=(
            _int_col("tenant_id", "settleList.tenantId", "tenantId"),
            _int_col("site_id", "settleList.siteId", "siteId", "siteProfile.id"),
            ColumnSpec("site_name_snapshot", sources=("siteProfile.shop_name", "settleList.siteName")),
            _int_col("member_id", "settleList.memberId", "memberId"),
            ColumnSpec("member_name_snapshot", sources=("settleList.memberName", "memberName")),
            ColumnSpec("member_phone_snapshot", sources=("settleList.memberPhone", "memberPhone")),
            _int_col("tenant_member_card_id", "settleList.tenantMemberCardId", "tenantMemberCardId"),
            ColumnSpec("member_card_type_name", sources=("settleList.memberCardTypeName", "memberCardTypeName")),
            _int_col("settle_relate_id", "settleList.settleRelateId", "settleRelateId"),
            _int_col("settle_type", "settleList.settleType", "settleType"),
            ColumnSpec("settle_name", sources=("settleList.settleName", "settleName")),
            _int_col("is_first", "settleList.isFirst", "isFirst"),
            _int_col("settle_status", "settleList.settleStatus", "settleStatus"),
            _decimal_col("pay_amount", "settleList.payAmount", "payAmount"),
            _decimal_col("refund_amount", "settleList.refundAmount", "refundAmount"),
            _decimal_col("point_amount", "settleList.pointAmount", "pointAmount"),
            _decimal_col("cash_amount", "settleList.cashAmount", "cashAmount"),
            _decimal_col("online_amount", "settleList.onlineAmount", "onlineAmount"),
            _decimal_col("balance_amount", "settleList.balanceAmount", "balanceAmount"),
            _decimal_col("card_amount", "settleList.cardAmount", "cardAmount"),
            _decimal_col("coupon_amount", "settleList.couponAmount", "couponAmount"),
            _decimal_col("recharge_card_amount", "settleList.rechargeCardAmount", "rechargeCardAmount"),
            _decimal_col("gift_card_amount", "settleList.giftCardAmount", "giftCardAmount"),
            _decimal_col("prepay_money", "settleList.prepayMoney", "prepayMoney"),
            _decimal_col("consume_money", "settleList.consumeMoney", "consumeMoney"),
            _decimal_col("goods_money", "settleList.goodsMoney", "goodsMoney"),
            _decimal_col("real_goods_money", "settleList.realGoodsMoney", "realGoodsMoney"),
            _decimal_col("table_charge_money", "settleList.tableChargeMoney", "tableChargeMoney"),
            _decimal_col("service_money", "settleList.serviceMoney", "serviceMoney"),
            _decimal_col("activity_discount", "settleList.activityDiscount", "activityDiscount"),
            _decimal_col("all_coupon_discount", "settleList.allCouponDiscount", "allCouponDiscount"),
            _decimal_col("goods_promotion_money", "settleList.goodsPromotionMoney", "goodsPromotionMoney"),
            _decimal_col("assistant_promotion_money", "settleList.assistantPromotionMoney", "assistantPromotionMoney"),
            _decimal_col("assistant_pd_money", "settleList.assistantPdMoney", "assistantPdMoney"),
            _decimal_col("assistant_cx_money", "settleList.assistantCxMoney", "assistantCxMoney"),
            _decimal_col("assistant_manual_discount", "settleList.assistantManualDiscount", "assistantManualDiscount"),
            _decimal_col("coupon_sale_amount", "settleList.couponSaleAmount", "couponSaleAmount"),
            _decimal_col("member_discount_amount", "settleList.memberDiscountAmount", "memberDiscountAmount"),
            _decimal_col("point_discount_price", "settleList.pointDiscountPrice", "pointDiscountPrice"),
            _decimal_col("point_discount_cost", "settleList.pointDiscountCost", "pointDiscountCost"),
            _decimal_col("adjust_amount", "settleList.adjustAmount", "adjustAmount"),
            _decimal_col("rounding_amount", "settleList.roundingAmount", "roundingAmount"),
            _int_col("payment_method", "settleList.paymentMethod", "paymentMethod"),
            _bool_col("can_be_revoked", "settleList.canBeRevoked", "canBeRevoked"),
            _bool_col("is_bind_member", "settleList.isBindMember", "isBindMember"),
            _bool_col("is_activity", "settleList.isActivity", "isActivity"),
            _bool_col("is_use_coupon", "settleList.isUseCoupon", "isUseCoupon"),
            _bool_col("is_use_discount", "settleList.isUseDiscount", "isUseDiscount"),
            _int_col("operator_id", "settleList.operatorId", "operatorId"),
            ColumnSpec("operator_name_snapshot", sources=("settleList.operatorName", "operatorName")),
            _int_col("salesman_user_id", "settleList.salesManUserId", "salesmanUserId", "salesManUserId"),
            ColumnSpec("salesman_name", sources=("settleList.salesManName", "salesmanName", "settleList.salesmanName")),
            ColumnSpec("order_remark", sources=("settleList.orderRemark", "orderRemark")),
            _int_col("table_id", "settleList.tableId", "tableId"),
            _int_col("serial_number", "settleList.serialNumber", "serialNumber"),
            _int_col("revoke_order_id", "settleList.revokeOrderId", "revokeOrderId"),
            ColumnSpec("revoke_order_name", sources=("settleList.revokeOrderName", "revokeOrderName")),
            ColumnSpec("revoke_time", sources=("settleList.revokeTime", "revokeTime")),
            ColumnSpec("create_time", sources=("settleList.createTime", "createTime")),
            ColumnSpec("pay_time", sources=("settleList.payTime", "payTime")),
            ColumnSpec("site_profile", sources=("siteProfile",)),
        ),
        include_site_column=False,
        include_source_endpoint=True,
        include_page_no=False,
        include_page_size=False,
        include_fetched_at=True,
        include_record_index=False,
        conflict_columns_override=None,
        requires_window=True,
        # NOTE(review): this description looks mis-encoded ("?" placeholders);
        # restore the intended text from version control. Left untouched here
        # because it is a runtime string.
        description="?????? ODS?GetRechargeSettleList -> data.settleList ????",
    ),
    _raw_json_spec(
        code="ODS_GROUP_PACKAGE",
        class_name="OdsPackageTask",
        table_name="billiards_ods.group_buy_packages",
        endpoint="/PackageCoupon/QueryPackageCouponList",
        list_key="packageCouponList",
        requires_window=False,
        snapshot_full_table=True,
        description="团购套餐定义 ODSQueryPackageCouponList -> packageCouponList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_GROUP_BUY_REDEMPTION",
        class_name="OdsGroupBuyRedemptionTask",
        table_name="billiards_ods.group_buy_redemption_records",
        endpoint="/Site/GetSiteTableUseDetails",
        list_key="siteTableUseDetailsList",
        requires_window=False,
        snapshot_window_columns=("create_time",),
        description="团购套餐核销 ODSGetSiteTableUseDetails -> siteTableUseDetailsList 原始 JSON",
    ),
    _raw_json_spec(
        pk=("sitegoodsid", "siteGoodsId"),
        code="ODS_INVENTORY_STOCK",
        class_name="OdsInventoryStockTask",
        table_name="billiards_ods.goods_stock_summary",
        endpoint="/TenantGoods/GetGoodsStockReport",
        requires_window=False,
        description="库存汇总 ODSGetGoodsStockReport 原始 JSON",
    ),
    _raw_json_spec(
        pk=("sitegoodsstockid", "siteGoodsStockId"),
        code="ODS_INVENTORY_CHANGE",
        class_name="OdsInventoryChangeTask",
        table_name="billiards_ods.goods_stock_movements",
        endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt",
        list_key="queryDeliveryRecordsList",
        description="库存变化记录 ODSQueryGoodsOutboundReceipt -> queryDeliveryRecordsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TABLES",
        class_name="OdsTablesTask",
        table_name="billiards_ods.site_tables_master",
        endpoint="/Table/GetSiteTables",
        list_key="siteTables",
        requires_window=False,
        description="台桌维表 ODSGetSiteTables -> siteTables 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_GOODS_CATEGORY",
        class_name="OdsGoodsCategoryTask",
        table_name="billiards_ods.stock_goods_category_tree",
        endpoint="/TenantGoodsCategory/QueryPrimarySecondaryCategory",
        list_key="goodsCategoryList",
        requires_window=False,
        # NOTE(review): the description below contains mis-encoded characters;
        # kept byte-for-byte because it is a runtime string.
        description="库存商品分类鏍?ODSQueryPrimarySecondaryCategory -> goodsCategoryList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_STORE_GOODS",
        class_name="OdsStoreGoodsTask",
        table_name="billiards_ods.store_goods_master",
        endpoint="/TenantGoods/GetGoodsInventoryList",
        list_key="orderGoodsList",
        requires_window=False,
        snapshot_full_table=True,
        description="门店商品档案 ODSGetGoodsInventoryList -> orderGoodsList 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TABLE_FEE_DISCOUNT",
        class_name="OdsTableDiscountTask",
        table_name="billiards_ods.table_fee_discount_records",
        endpoint="/Site/GetTaiFeeAdjustList",
        list_key="taiFeeAdjustInfos",
        requires_window=False,
        snapshot_window_columns=("create_time",),
        description="台费折扣/调账 ODSGetTaiFeeAdjustList -> taiFeeAdjustInfos 原始 JSON",
    ),
    _raw_json_spec(
        code="ODS_TENANT_GOODS",
        class_name="OdsTenantGoodsTask",
        table_name="billiards_ods.tenant_goods_master",
        endpoint="/TenantGoods/QueryTenantGoods",
        list_key="tenantGoodsList",
        requires_window=False,
        snapshot_full_table=True,
        description="租户商品档案 ODSQueryTenantGoods -> tenantGoodsList 原始 JSON",
    ),
    _raw_json_spec(
        pk=("ordersettleid", "orderSettleId"),
        code="ODS_SETTLEMENT_TICKET",
        class_name="OdsSettlementTicketTask",
        table_name="billiards_ods.settlement_ticket_details",
        endpoint="/Order/GetOrderSettleTicketNew",
        data_path=(),
        list_key=None,
        include_fetched_at=True,
        requires_window=False,
        include_site_id=False,
        description="结账小票详情 ODSGetOrderSettleTicketNew 原始 JSON",
    ),
)
def _get_spec(code: str) -> OdsTaskSpec:
    """Return the first spec in ``ODS_TASK_SPECS`` whose ``code`` matches.

    Raises:
        KeyError: If no spec carries the requested code.
    """
    found = next((item for item in ODS_TASK_SPECS if item.code == code), None)
    if found is None:
        raise KeyError(f"Spec not found for code {code}")
    return found
# Resolved once at import time; _get_spec raises KeyError if the
# "ODS_SETTLEMENT_TICKET" entry is missing from ODS_TASK_SPECS.
_SETTLEMENT_TICKET_SPEC = _get_spec("ODS_SETTLEMENT_TICKET")
class OdsSettlementTicketTask(BaseOdsTask):
    """Special handling: fetch ticket details per payment relate_id/orderSettleId."""

    SPEC = _SETTLEMENT_TICKET_SPEC

    def extract(self, context) -> dict:
        """Fetch ticket payloads only (used by fetch-only pipeline).

        Returns:
            Dict with ``records`` (fetched payloads), ``skipped`` (candidates
            that yielded no payload) and ``fetched`` (number of candidates).
        """
        existing_ids = self._fetch_existing_ticket_ids()
        candidates = self._collect_settlement_ids(
            context.store_id or 0, existing_ids, context.window_start, context.window_end
        )
        # Drops falsy ids (e.g. 0) on top of the filtering done by the collector.
        candidates = [cid for cid in candidates if cid and cid not in existing_ids]
        payloads, skipped = self._fetch_ticket_payloads(candidates)
        return {"records": payloads, "skipped": skipped, "fetched": len(candidates)}

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Fetch and store the tickets that are not yet in the ticket table.

        The window is split into segments; each segment collects candidate
        settlement ids, fetches their tickets, inserts them and commits.

        Args:
            cursor_data: Passed through to the context builders.

        Returns:
            Result dict built by ``_build_result("SUCCESS", ...)``, extended
            with ``window``, ``request_params`` and, when more than one segment
            was processed, ``segments``.

        Raises:
            Exception: Any failure is logged, the transaction is rolled back
                and the original exception is re-raised.
        """
        spec = self.SPEC
        base_context = self._build_context(cursor_data)
        segments = build_window_segments(
            self.config,
            base_context.window_start,
            base_context.window_end,
            tz=self.tz,
            override_only=True,
        )
        if not segments:
            # No split produced: process the whole window as one segment.
            segments = [(base_context.window_start, base_context.window_end)]
        total_segments = len(segments)
        if total_segments > 1:
            self.logger.info("%s: 窗口拆分为 %s", spec.code, total_segments)
        store_id = TypeParser.parse_int(self.config.get("app.store_id")) or 0
        counts_total = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
        segment_results: list[dict] = []
        source_file = self._resolve_source_file_hint(spec)
        try:
            existing_ids = self._fetch_existing_ticket_ids()
            for idx, (seg_start, seg_end) in enumerate(segments, start=1):
                context = self._build_context_for_window(seg_start, seg_end, cursor_data)
                self.logger.info(
                    "%s: 开始执行(%s/%s),窗口[%s ~ %s]",
                    spec.code,
                    idx,
                    total_segments,
                    context.window_start,
                    context.window_end,
                )
                candidates = self._collect_settlement_ids(
                    store_id, existing_ids, context.window_start, context.window_end
                )
                candidates = [cid for cid in candidates if cid and cid not in existing_ids]
                segment_counts = {
                    "fetched": len(candidates),
                    "inserted": 0,
                    "updated": 0,
                    "skipped": 0,
                    "errors": 0,
                }
                if candidates:
                    payloads, skipped = self._fetch_ticket_payloads(candidates)
                    segment_counts["skipped"] += skipped
                    inserted, updated, skipped2 = self._insert_records_schema_aware(
                        table=spec.table_name,
                        records=payloads,
                        response_payload=None,
                        source_file=source_file,
                        source_endpoint=spec.endpoint,
                    )
                    segment_counts["inserted"] += inserted
                    segment_counts["updated"] += updated
                    segment_counts["skipped"] += skipped2
                    self.db.commit()
                    # Remember every attempted id so later segments of this run
                    # do not request the same tickets again.
                    existing_ids.update(candidates)
                else:
                    self.logger.info(
                        "%s: 窗口[%s ~ %s] 未发现需要抓取的小票",
                        spec.code,
                        context.window_start,
                        context.window_end,
                    )
                self._accumulate_counts(counts_total, segment_counts)
                if total_segments > 1:
                    segment_results.append(
                        {
                            "window": {
                                "start": context.window_start,
                                "end": context.window_end,
                                "minutes": context.window_minutes,
                            },
                            "counts": segment_counts,
                        }
                    )
            self.logger.info(
                "%s: 小票抓取完成,抓取=%s 插入=%s 更新=%s 跳过=%s",
                spec.code,
                counts_total["fetched"],
                counts_total["inserted"],
                counts_total["updated"],
                counts_total["skipped"],
            )
            result = self._build_result("SUCCESS", counts_total)
            overall_start = segments[0][0]
            overall_end = segments[-1][1]
            result["window"] = {
                "start": overall_start,
                "end": overall_end,
                "minutes": calc_window_minutes(overall_start, overall_end),
            }
            if segment_results:
                result["segments"] = segment_results
            result["request_params"] = {"candidates": counts_total["fetched"]}
            return result
        except Exception:
            counts_total["errors"] += 1
            self.db.rollback()
            self.logger.error("%s: 小票抓取失败", spec.code, exc_info=True)
            raise

    def _fetch_existing_ticket_ids(self) -> set[int]:
        """Return the numeric ``orderSettleId`` values already stored as tickets.

        Best effort: a failing query is logged and treated as "nothing stored".
        """
        sql = """
            SELECT DISTINCT
                CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$'
                     THEN (payload ->> 'orderSettleId')::bigint
                END AS order_settle_id
            FROM billiards_ods.settlement_ticket_details
        """
        try:
            rows = self.db.query(sql)
        except Exception:
            self.logger.warning("查询已有小票失败,按空集处理", exc_info=True)
            return set()
        return {
            TypeParser.parse_int(row.get("order_settle_id"))
            for row in rows
            if row.get("order_settle_id") is not None
        }

    def _collect_settlement_ids(
        self, store_id: int, existing_ids: set[int], window_start, window_end
    ) -> list[int]:
        """Return sorted candidate settlement ids that are not in ``existing_ids``.

        Ids come from the payment table; the payment API is only consulted
        when the table yields nothing.
        """
        ids = self._fetch_from_payment_table(store_id)
        if not ids:
            ids = self._fetch_from_payment_api(store_id, window_start, window_end)
        return sorted(i for i in ids if i is not None and i not in existing_ids)

    def _fetch_from_payment_table(self, store_id: int) -> set[int]:
        """Read settlement ids from ``billiards_ods.payment_transactions``.

        Prefers a numeric ``orderSettleId`` and falls back to a numeric
        ``relateId``. When ``store_id`` is truthy, rows whose payload names a
        different ``siteId`` are excluded. A failing query is logged and
        yields an empty set so the caller can fall back to the API.
        """
        # The OR group is parenthesised on purpose: the optional site filter is
        # appended with AND, which binds tighter than OR. Without the
        # parentheses the filter would apply to the relateId branch only and
        # rows of other sites with a numeric orderSettleId would slip through.
        sql = """
            SELECT DISTINCT COALESCE(
                CASE WHEN (payload ->> 'orderSettleId') ~ '^[0-9]+$'
                     THEN (payload ->> 'orderSettleId')::bigint END,
                CASE WHEN (payload ->> 'relateId') ~ '^[0-9]+$'
                     THEN (payload ->> 'relateId')::bigint END
            ) AS order_settle_id
            FROM billiards_ods.payment_transactions
            WHERE (
                (payload ->> 'orderSettleId') ~ '^[0-9]+$'
                OR (payload ->> 'relateId') ~ '^[0-9]+$'
            )
        """
        params = None
        if store_id:
            # Rows without a siteId in the payload are accepted for any store.
            sql += " AND COALESCE((payload ->> 'siteId')::bigint, %s) = %s"
            params = (store_id, store_id)
        try:
            rows = self.db.query(sql, params)
        except Exception:
            self.logger.warning("读取支付流水以获取结算单ID失败将尝试调用支付接口回退", exc_info=True)
            return set()
        return {
            TypeParser.parse_int(row.get("order_settle_id"))
            for row in rows
            if row.get("order_settle_id") is not None
        }

    def _fetch_from_payment_api(self, store_id: int, window_start, window_end) -> set[int]:
        """Fallback: page through the pay-log API and collect settlement ids.

        Errors are logged; ids gathered before the failure are still returned.
        """
        params = self._merge_common_params(
            {
                "siteId": store_id,
                "StartPayTime": TypeParser.format_timestamp(window_start, self.tz),
                "EndPayTime": TypeParser.format_timestamp(window_end, self.tz),
            }
        )
        candidate_ids: set[int] = set()
        try:
            for _, records, _, _ in self.api.iter_paginated(
                endpoint="/PayLog/GetPayLogListPage",
                params=params,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data",),
            ):
                for rec in records:
                    # NOTE(review): relateId is preferred here, whereas the
                    # table lookup prefers orderSettleId - confirm which
                    # order is intended.
                    relate_id = TypeParser.parse_int(
                        (rec or {}).get("relateId")
                        or (rec or {}).get("orderSettleId")
                        or (rec or {}).get("order_settle_id")
                    )
                    if relate_id:
                        candidate_ids.add(relate_id)
        except Exception:
            self.logger.warning("调用支付接口获取结算单ID失败当前批次将跳过回退来源", exc_info=True)
        return candidate_ids

    def _fetch_ticket_payload(self, order_settle_id: int):
        """Fetch the ticket response for one settlement id.

        Returns:
            The last response yielded by the API iterator, or ``None`` when
            nothing was yielded. API errors are logged, not raised.
        """
        payload = None
        try:
            for _, _, _, response in self.api.iter_paginated(
                endpoint=self.SPEC.endpoint,
                params={"orderSettleId": order_settle_id},
                page_size=None,
                data_path=self.SPEC.data_path,
                list_key=self.SPEC.list_key,
            ):
                payload = response
        except Exception:
            self.logger.warning(
                "调用小票接口失败 orderSettleId=%s", order_settle_id, exc_info=True
            )
        if isinstance(payload, dict) and isinstance(payload.get("data"), list) and len(payload["data"]) == 1:
            # Local stub replay may wrap the response in a single-element list;
            # unwrap it so the result matches the real API shape.
            payload = payload["data"][0]
        return payload

    def _fetch_ticket_payloads(self, candidates: list[int]) -> tuple[list, int]:
        """Fetch ticket payloads for a set of orderSettleIds; returns (payloads, skipped)."""
        payloads: list = []
        skipped = 0
        for order_settle_id in candidates:
            payload = self._fetch_ticket_payload(order_settle_id)
            if payload:
                payloads.append(payload)
            else:
                # Empty or failed fetches are only counted, never stored.
                skipped += 1
        return payloads, skipped
def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
    """Create a ``BaseOdsTask`` subclass named ``spec.class_name`` for a spec.

    The generated class carries the spec as ``SPEC``, uses the spec's
    description as its docstring (or a generic text naming the code when the
    description is empty) and reports this module as its ``__module__``.
    """
    docstring = spec.description or f"ODS ingestion task {spec.code}"
    namespace = {"SPEC": spec, "__doc__": docstring, "__module__": __name__}
    return type(spec.class_name, (BaseOdsTask,), namespace)
# Task codes for which a task class is registered in ODS_TASK_CLASSES;
# a spec whose code is missing here gets no generated class.
ENABLED_ODS_CODES = {
    "ODS_ASSISTANT_ACCOUNT",
    "ODS_ASSISTANT_LEDGER",
    "ODS_ASSISTANT_ABOLISH",
    "ODS_INVENTORY_CHANGE",
    "ODS_INVENTORY_STOCK",
    "ODS_GROUP_PACKAGE",
    "ODS_GROUP_BUY_REDEMPTION",
    "ODS_MEMBER",
    "ODS_MEMBER_BALANCE",
    "ODS_MEMBER_CARD",
    "ODS_PAYMENT",
    "ODS_REFUND",
    "ODS_PLATFORM_COUPON",
    "ODS_RECHARGE_SETTLE",
    "ODS_TABLE_USE",
    "ODS_TABLES",
    "ODS_GOODS_CATEGORY",
    "ODS_STORE_GOODS",
    "ODS_TABLE_FEE_DISCOUNT",
    "ODS_STORE_GOODS_SALES",
    "ODS_TENANT_GOODS",
    "ODS_SETTLEMENT_TICKET",
    "ODS_SETTLEMENT_RECORDS",
}
# Registry: task code -> generated task class, one per enabled spec, in
# ODS_TASK_SPECS declaration order.
ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
    spec.code: _build_task_class(spec)
    for spec in ODS_TASK_SPECS
    if spec.code in ENABLED_ODS_CODES
}
# Override the default flow with the dedicated settlement-ticket implementation.
# This assignment is unconditional: the entry is set regardless of ENABLED_ODS_CODES.
ODS_TASK_CLASSES["ODS_SETTLEMENT_TICKET"] = OdsSettlementTicketTask
# Public names exported by this module.
__all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask", "ENABLED_ODS_CODES"]