Files
Neo-ZQYY/apps/etl/connectors/feiqiu/tasks/ods/ods_tasks.py

1620 lines
67 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""ODS ingestion tasks."""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type
from dateutil import parser as dtparser
from psycopg2.extras import Json, execute_values
from models.parsers import TypeParser
from tasks.base_task import BaseTask
from utils.windowing import build_window_segments, calc_window_minutes, calc_window_days, format_window_days
ColumnTransform = Callable[[Any], Any]
class SnapshotMode(Enum):
    """Soft-delete strategy used when comparing an ODS snapshot."""

    NONE = "none"  # no snapshot comparison; soft delete is never triggered
    FULL_TABLE = "full_table"  # full-table snapshot: compare against every record in the table
    WINDOW = "window"  # window snapshot: compare only records inside the time window
@dataclass(frozen=True)
class ColumnSpec:
    """Mapping between DB column and source JSON field."""

    # Name of the target database column.
    column: str
    # Candidate source field names; presumably tried in order by the consumer
    # (the lookup code is not part of this section -- confirm).
    sources: Tuple[str, ...] = ()
    # Whether the column must be present; enforcement is not visible here.
    required: bool = False
    # Fallback value for the column.
    default: Any = None
    # Optional callable applied to the raw source value.
    transform: ColumnTransform | None = None
@dataclass(frozen=True)
class OdsTaskSpec:
    """Declarative description of one ODS ingestion task.

    Instances are immutable. ``__post_init__`` rejects combinations of
    ``snapshot_mode`` and ``snapshot_time_column`` that contradict each other.
    """

    code: str
    class_name: str
    table_name: str
    endpoint: str
    data_path: Tuple[str, ...] = ("data",)
    list_key: str | None = None
    pk_columns: Tuple[ColumnSpec, ...] = ()
    extra_columns: Tuple[ColumnSpec, ...] = ()
    include_source_file: bool = True
    include_source_endpoint: bool = True
    include_record_index: bool = False
    # Whether the fetched_at column is written. Per the original note this only
    # governs metadata writing and does not affect de-duplication, which is
    # controlled by skip_unchanged.
    include_fetched_at: bool = True
    requires_window: bool = True
    time_fields: Tuple[str, str] | None = ("startTime", "endTime")
    include_site_id: bool = True
    description: str = ""
    extra_params: Dict[str, Any] = field(default_factory=dict)
    # De-duplication switch: when True, records whose content is unchanged are
    # skipped, provided the target table has a content_hash column and a
    # business primary key.
    skip_unchanged: bool = True
    # Snapshot soft-delete strategy (replaces the former
    # snapshot_full_table + snapshot_window_columns settings).
    snapshot_mode: SnapshotMode = SnapshotMode.NONE
    # Name of the time column used by WINDOW mode.
    snapshot_time_column: str | None = None

    def __post_init__(self) -> None:
        """Validate that the snapshot mode and time column agree.

        Raises:
            ValueError: WINDOW mode without a time column, or a time column
                supplied for any other mode.
        """
        time_column = self.snapshot_time_column
        if self.snapshot_mode == SnapshotMode.WINDOW:
            if not time_column:
                raise ValueError(
                    f"任务 {self.code}: snapshot_mode=WINDOW 时必须指定 snapshot_time_column"
                )
        elif time_column is not None:
            raise ValueError(
                f"任务 {self.code}: snapshot_mode={self.snapshot_mode.value} 时不应指定 snapshot_time_column"
            )
class BaseOdsTask(BaseTask):
"""Shared functionality for ODS ingestion tasks."""
SPEC: OdsTaskSpec
def get_task_code(self) -> str:
    """Return the task code declared on this task's ``SPEC``."""
    spec = self.SPEC
    return spec.code
def execute(self, cursor_data: dict | None = None) -> dict:
    """Run the ODS ingestion described by ``self.SPEC``.

    Flow, as implemented below:

    1. Resolve the time window and split it into segments with
       ``build_window_segments``; fall back to a single segment.
    2. For every segment, page through ``self.api.iter_paginated`` and write
       each page with ``_insert_records_schema_aware``.
    3. When snapshot soft delete is enabled, collect the business keys (and,
       in WINDOW mode, the earliest timestamp) seen in the segment, then call
       ``_mark_missing_as_deleted`` for keys that were not returned.
    4. Commit once per segment and accumulate the counters.

    Args:
        cursor_data: Cursor state forwarded to ``_resolve_window``.

    Returns:
        The dict built by ``self._build_result`` extended with ``window``,
        ``request_params`` and, when more than one segment ran, ``segments``.

    Raises:
        ValueError: ``app.store_id`` is not configured.
        Exception: Any error raised while processing is re-raised after the
            database transaction has been rolled back.
    """
    spec = self.SPEC
    self.logger.info("开始执行%s (ODS)", spec.code)
    # NOTE: window_minutes is unpacked but not used further in this method.
    window_start, window_end, window_minutes = self._resolve_window(cursor_data)
    segments = build_window_segments(
        self.config,
        window_start,
        window_end,
        tz=self.tz,
        override_only=True,
    )
    # No split requested/produced: process the whole window as one segment.
    if not segments:
        segments = [(window_start, window_end)]
    total_segments = len(segments)
    total_days = sum(calc_window_days(s, e) for s, e in segments) if segments else 0.0
    processed_days = 0.0
    if total_segments > 1:
        self.logger.info(
            "%s: 窗口拆分为 %s 段(共 %s 天)",
            spec.code,
            total_segments,
            format_window_days(total_days),
        )
    store_id = TypeParser.parse_int(self.config.get("app.store_id"))
    if not store_id:
        raise ValueError("app.store_id 未配置,无法执行 ODS 任务")
    page_size = self.config.get("api.page_size", 200)
    total_counts = {
        "fetched": 0,
        "inserted": 0,
        "updated": 0,
        "skipped": 0,
        "errors": 0,
        "deleted": 0,
    }
    segment_results: list[dict] = []
    params_list: list[dict] = []
    source_file = self._resolve_source_file_hint(spec)
    # Defaults to True: a task that declares snapshot_mode != NONE gets soft
    # delete automatically; set the option to False at runtime as an
    # emergency off-switch.
    snapshot_missing_delete = bool(self.config.get("run.snapshot_missing_delete", True))
    snapshot_allow_empty = bool(self.config.get("run.snapshot_allow_empty_delete", False))
    # CHANGE 2026-02-18 | Guard against wrongly deleting early records when the
    # API response was truncated.
    snapshot_protect_early_cutoff = bool(self.config.get("run.snapshot_protect_early_cutoff", True))
    snapshot_mode = spec.snapshot_mode
    snapshot_time_column = spec.snapshot_time_column
    # Business key = table primary key without the content_hash column.
    business_pk_cols = [
        c for c in self._get_table_pk_columns(spec.table_name) if str(c).lower() != "content_hash"
    ]
    has_is_delete = self._table_has_column(spec.table_name, "is_delete")
    try:
        for idx, (seg_start, seg_end) in enumerate(segments, start=1):
            params = self._build_params(
                spec,
                store_id,
                window_start=seg_start,
                window_end=seg_end,
            )
            params_list.append(params)
            segment_counts = {
                "fetched": 0,
                "inserted": 0,
                "updated": 0,
                "skipped": 0,
                "errors": 0,
                "deleted": 0,
            }
            segment_keys: set[tuple] = set()
            # CHANGE 2026-02-18 | Earliest timestamp actually returned by the
            # API in WINDOW mode.
            segment_earliest_time: datetime | None = None
            self.logger.info(
                "%s: 开始执行(%s/%s),窗口[%s ~ %s]",
                spec.code,
                idx,
                total_segments,
                seg_start,
                seg_end,
            )
            for _, page_records, _, response_payload in self.api.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                # Remember every business key seen so that missing ones can
                # be soft-deleted after the segment has been fully paged.
                if (
                    snapshot_missing_delete
                    and has_is_delete
                    and business_pk_cols
                    and snapshot_mode != SnapshotMode.NONE
                ):
                    segment_keys.update(self._collect_business_keys(page_records, business_pk_cols))
                # CHANGE 2026-02-18 | Track the earliest timestamp actually
                # returned; used by the early-cutoff protection below.
                if (
                    snapshot_protect_early_cutoff
                    and snapshot_mode == SnapshotMode.WINDOW
                    and snapshot_time_column
                ):
                    page_earliest = self._collect_earliest_time(
                        page_records, snapshot_time_column
                    )
                    if page_earliest is not None:
                        if segment_earliest_time is None or page_earliest < segment_earliest_time:
                            segment_earliest_time = page_earliest
                inserted, updated, skipped = self._insert_records_schema_aware(
                    table=spec.table_name,
                    records=page_records,
                    response_payload=response_payload,
                    source_file=source_file,
                    source_endpoint=spec.endpoint if spec.include_source_endpoint else None,
                )
                segment_counts["fetched"] += len(page_records)
                segment_counts["inserted"] += inserted
                segment_counts["updated"] += updated
                segment_counts["skipped"] += skipped
            if (
                snapshot_missing_delete
                and has_is_delete
                and business_pk_cols
                and snapshot_mode != SnapshotMode.NONE
            ):
                # An empty fetch only triggers deletion when explicitly allowed.
                if segment_counts["fetched"] > 0 or snapshot_allow_empty:
                    # CHANGE 2026-02-18 | Early-cutoff protection: narrow the
                    # soft-delete range to start at the earliest timestamp the
                    # API actually returned.
                    effective_window_start = seg_start
                    if (
                        snapshot_protect_early_cutoff
                        and snapshot_mode == SnapshotMode.WINDOW
                        and segment_earliest_time is not None
                        and segment_earliest_time > seg_start
                    ):
                        self.logger.info(
                            "%s: early-cutoff 保护生效,软删除窗口起点从 %s 收窄至 %s",
                            spec.code, seg_start, segment_earliest_time,
                        )
                        effective_window_start = segment_earliest_time
                    deleted = self._mark_missing_as_deleted(
                        table=spec.table_name,
                        business_pk_cols=business_pk_cols,
                        snapshot_mode=snapshot_mode,
                        snapshot_time_column=snapshot_time_column,
                        window_start=effective_window_start,
                        window_end=seg_end,
                        key_values=segment_keys,
                        allow_empty=snapshot_allow_empty,
                    )
                    if deleted:
                        # Soft deletes are written as newly inserted version
                        # rows, so they are counted under both counters.
                        segment_counts["inserted"] += deleted
                        segment_counts["deleted"] += deleted
            # One commit per segment.
            self.db.commit()
            self._accumulate_counts(total_counts, segment_counts)
            segment_days = calc_window_days(seg_start, seg_end)
            processed_days += segment_days
            if total_segments > 1:
                self.logger.info(
                    "%s: 完成(%s/%s),已处理 %s/%s",
                    spec.code,
                    idx,
                    total_segments,
                    format_window_days(processed_days),
                    format_window_days(total_days),
                )
            if total_segments > 1:
                segment_results.append(
                    {
                        "window": {
                            "start": seg_start,
                            "end": seg_end,
                            "minutes": calc_window_minutes(seg_start, seg_end),
                        },
                        "counts": segment_counts,
                    }
                )
        self.logger.info("%s ODS 任务完成: %s", spec.code, total_counts)
        allow_empty_advance = bool(self.config.get("run.allow_empty_result_advance", False))
        status = "SUCCESS"
        # Nothing fetched at all is reported as PARTIAL unless the
        # configuration explicitly allows an empty result.
        if total_counts["fetched"] == 0 and not allow_empty_advance:
            status = "PARTIAL"
        result = self._build_result(status, total_counts)
        overall_start = segments[0][0]
        overall_end = segments[-1][1]
        result["window"] = {
            "start": overall_start,
            "end": overall_end,
            "minutes": calc_window_minutes(overall_start, overall_end),
        }
        if total_segments > 1:
            result["segments"] = segment_results
        # A single request is reported as a dict, several as a list.
        if len(params_list) == 1:
            result["request_params"] = params_list[0]
        else:
            result["request_params"] = params_list
        return result
    except Exception:
        # Discard uncommitted work of the failing segment, log, and re-raise.
        self.db.rollback()
        total_counts["errors"] += 1
        self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
        raise
def _resolve_window(self, cursor_data: dict | None) -> tuple[datetime, datetime, int]:
base_start, base_end, base_minutes = self._get_time_window(cursor_data)
# 如果用户显式指定了窗口(window_override.start/end),则直接使用,不走 MAX(fetched_at) 兜底
override_start = self.config.get("run.window_override.start")
override_end = self.config.get("run.window_override.end")
if override_start and override_end:
# 用户明确指定了窗口,尊重用户选择
return base_start, base_end, base_minutes
# full_window 模式:直接使用基础窗口,跳过 MAX(fetched_at) 兜底
# 该模式以 API 返回数据的实际时间范围为准,无游标偏移风险
if self.config.get("run.processing_mode") == "full_window":
return base_start, base_end, base_minutes
# 以 ODS 表 MAX(fetched_at) 兜底:避免“窗口游标推进但未实际入库”导致漏数。
last_fetched = self._get_max_fetched_at(self.SPEC.table_name)
if last_fetched:
overlap_seconds = int(self.config.get("run.overlap_seconds", 600) or 600)
cursor_end = cursor_data.get("last_end") if isinstance(cursor_data, dict) else None
anchor = cursor_end or last_fetched
# 如果 cursor_end 比真实入库时间(last_fetched)更靠后,说明游标被推进但表未跟上:改用 last_fetched 作为起点
if isinstance(cursor_end, datetime) and cursor_end.tzinfo is None:
cursor_end = cursor_end.replace(tzinfo=self.tz)
if isinstance(cursor_end, datetime) and cursor_end > last_fetched:
anchor = last_fetched
start = anchor - timedelta(seconds=max(0, overlap_seconds))
if start.tzinfo is None:
start = start.replace(tzinfo=self.tz)
else:
start = start.astimezone(self.tz)
end = datetime.now(self.tz)
minutes = max(1, int((end - start).total_seconds() // 60))
return start, end, minutes
return base_start, base_end, base_minutes
def _get_max_fetched_at(self, table_name: str) -> datetime | None:
try:
rows = self.db.query(f"SELECT MAX(fetched_at) AS mx FROM {table_name}")
except Exception:
return None
if not rows or not rows[0].get("mx"):
return None
mx = rows[0]["mx"]
if not isinstance(mx, datetime):
return None
if mx.tzinfo is None:
return mx.replace(tzinfo=self.tz)
return mx.astimezone(self.tz)
def _build_params(
self,
spec: OdsTaskSpec,
store_id: int,
*,
window_start: datetime,
window_end: datetime,
) -> dict:
base: dict[str, Any] = {}
if spec.include_site_id:
# /TenantGoods/GetGoodsInventoryList 要求 siteId 为数组(标量会触发服务端异常,返回畸形状态行 HTTP/1.1 1400
if spec.endpoint == "/TenantGoods/GetGoodsInventoryList":
base["siteId"] = [store_id]
else:
base["siteId"] = store_id
if spec.requires_window and spec.time_fields:
start_key, end_key = spec.time_fields
base[start_key] = TypeParser.format_timestamp(window_start, self.tz)
base[end_key] = TypeParser.format_timestamp(window_end, self.tz)
params = self._merge_common_params(base)
params.update(spec.extra_params)
return params
# ------------------------------------------------------------------ Schema-aware writes (ODS document schema)
def _get_table_columns(self, table: str) -> list[tuple[str, str, str]]:
cache = getattr(self, "_table_columns_cache", {})
if table in cache:
return cache[table]
if "." in table:
schema, name = table.split(".", 1)
else:
schema, name = "public", table
sql = """
SELECT column_name, data_type, udt_name
FROM information_schema.columns
WHERE table_schema = %s AND table_name = %s
ORDER BY ordinal_position
"""
with self.db.conn.cursor() as cur:
cur.execute(sql, (schema, name))
cols = [(r[0], (r[1] or "").lower(), (r[2] or "").lower()) for r in cur.fetchall()]
cache[table] = cols
self._table_columns_cache = cache
return cols
def _get_table_pk_columns(self, table: str) -> list[str]:
cache = getattr(self, "_table_pk_cache", {})
if table in cache:
return cache[table]
if "." in table:
schema, name = table.split(".", 1)
else:
schema, name = "public", table
sql = """
SELECT kcu.column_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'PRIMARY KEY'
AND tc.table_schema = %s
AND tc.table_name = %s
ORDER BY kcu.ordinal_position
"""
with self.db.conn.cursor() as cur:
cur.execute(sql, (schema, name))
cols = [r[0] for r in cur.fetchall()]
cache[table] = cols
self._table_pk_cache = cache
return cols
def _table_has_column(self, table: str, column: str) -> bool:
col_lower = str(column or "").lower()
return any(c[0].lower() == col_lower for c in self._get_table_columns(table))
def _resolve_snapshot_window_columns(
self, table: str, columns: Sequence[str] | None
) -> list[str]:
if not columns:
return []
col_map = {c[0].lower(): c[0] for c in self._get_table_columns(table)}
resolved: list[str] = []
for col in columns:
if not col:
continue
actual = col_map.get(str(col).lower())
if actual:
resolved.append(actual)
return resolved
@staticmethod
def _coerce_delete_flag(value) -> int | None:
if value is None:
return None
if isinstance(value, bool):
return 1 if value else 0
if isinstance(value, (int, float)):
try:
return 1 if int(value) != 0 else 0
except Exception:
return 1 if value else 0
if isinstance(value, str):
s = value.strip().lower()
if not s:
return None
if s in {"1", "true", "t", "yes", "y"}:
return 1
if s in {"0", "false", "f", "no", "n"}:
return 0
try:
return 1 if int(s) != 0 else 0
except Exception:
return 1 if s else 0
return 1 if value else 0
def _normalize_is_delete_flag(self, record: dict, *, default_if_missing: int | None) -> None:
if not isinstance(record, dict):
return
raw = None
for key in ("is_delete", "is_deleted", "isDelete", "isDeleted"):
if key in record:
raw = record.get(key)
break
candidate = self._get_value_case_insensitive(record, key)
if candidate is not None:
raw = candidate
break
normalized = self._coerce_delete_flag(raw)
if normalized is None:
if default_if_missing is not None:
record["is_delete"] = int(default_if_missing)
return
record["is_delete"] = normalized
@staticmethod
def _normalize_pk_value(value):
if value is None or value == "":
return None
if isinstance(value, str):
parsed = TypeParser.parse_int(value)
if parsed is not None:
return parsed
return value
def _collect_business_keys(
self, records: list, business_pk_cols: Sequence[str]
) -> set[tuple]:
if not records or not business_pk_cols:
return set()
keys: set[tuple] = set()
for rec in records:
if not isinstance(rec, dict):
continue
merged_rec = self._merge_record_layers(rec)
key = tuple(
self._normalize_pk_value(self._get_value_case_insensitive(merged_rec, col))
for col in business_pk_cols
)
if any(v is None or v == "" for v in key):
continue
keys.add(key)
return keys
# CHANGE 2026-02-18 | 新增:从 API 返回记录中提取实际最早时间戳
def _collect_earliest_time(
self, records: list, time_column: str
) -> datetime | None:
"""从一批 API 返回记录中提取 time_column 的最小值。
用于 early-cutoff 保护:当 API 返回数据被截断时,
避免将截断点之前的数据误标为软删除。
"""
if not records or not time_column:
return None
earliest: datetime | None = None
for rec in records:
if not isinstance(rec, dict):
continue
merged = self._merge_record_layers(rec)
raw = self._get_value_case_insensitive(merged, time_column)
if raw is None:
continue
try:
if isinstance(raw, datetime):
ts = raw
elif isinstance(raw, str):
ts = dtparser.parse(raw)
else:
continue
# 确保带时区
if ts.tzinfo is None:
ts = ts.replace(tzinfo=self.tz)
if earliest is None or ts < earliest:
earliest = ts
except (ValueError, TypeError, OverflowError):
continue
return earliest
def _mark_missing_as_deleted(
self,
*,
table: str,
business_pk_cols: Sequence[str],
snapshot_mode: SnapshotMode,
snapshot_time_column: str | None,
window_start: datetime,
window_end: datetime,
key_values: Sequence[tuple],
allow_empty: bool,
) -> int:
"""快照对比软删除INSERT 删除版本行,而非 UPDATE 历史版本。
算法:
1. 查询快照范围内 is_delete != 1 的业务 ID
2. 排除本次抓取到的 key_values → 得到缺失 ID
3. 对每个缺失 ID 读取最新版本行DISTINCT ON + ORDER BY fetched_at DESC
4. 若最新版本已是 is_delete=1 → 跳过(幂等)
5. 否则:复制该行,设 is_delete=1重算 content_hashINSERT 新版本行
"""
if not business_pk_cols:
return 0
if snapshot_mode == SnapshotMode.NONE:
return 0
if not self._table_has_column(table, "is_delete"):
return 0
# WINDOW 模式需要解析时间列
resolved_time_col: str | None = None
if snapshot_mode == SnapshotMode.WINDOW:
if not snapshot_time_column:
return 0
resolved = self._resolve_snapshot_window_columns(table, [snapshot_time_column])
if not resolved:
return 0
resolved_time_col = resolved[0]
pk_cols_sql = ", ".join(f'"{c}"' for c in business_pk_cols)
with self.db.conn.cursor() as cur:
# --- 步骤 1: 查询快照范围内 is_delete != 1 的业务 ID ---
if snapshot_mode == SnapshotMode.FULL_TABLE:
scope_where = '"is_delete" IS DISTINCT FROM 1'
scope_params: list = []
else:
scope_where = (
f'"{resolved_time_col}" >= %s AND "{resolved_time_col}" < %s '
f'AND "is_delete" IS DISTINCT FROM 1'
)
scope_params = [window_start, window_end]
existing_sql = (
f"SELECT DISTINCT {pk_cols_sql} FROM {table} "
f"WHERE {scope_where}"
)
cur.execute(existing_sql, scope_params)
existing_ids: set[tuple] = {tuple(row) for row in cur.fetchall()}
if not existing_ids:
return 0
# --- 步骤 2: 差集 = 数据库中的 ID - 本次抓取到的 key_values ---
fetched_set: set[tuple] = set()
if key_values:
fetched_set = {tuple(k) for k in key_values}
if not fetched_set and not allow_empty:
return 0
missing_ids = existing_ids - fetched_set
if not missing_ids:
return 0
# --- 步骤 3-5: 对每个缺失 ID 读取最新版本、构造删除版本行并 INSERT ---
cols_info = self._get_table_columns(table)
col_names = [c[0] for c in cols_info]
col_index = {c[0].lower(): i for i, c in enumerate(cols_info)}
# 批量查询所有缺失 ID 的最新版本行
pk_where = " AND ".join(f'k."{c}" = t."{c}"' for c in business_pk_cols)
latest_sql = (
f"WITH keys({pk_cols_sql}) AS (VALUES %s) "
f"SELECT DISTINCT ON ({', '.join(f't.\"{c}\"' for c in business_pk_cols)}) "
f"{', '.join(f't.\"{c}\"' for c in col_names)} "
f"FROM {table} t JOIN keys k ON {pk_where} "
f"ORDER BY {', '.join(f't.\"{c}\"' for c in business_pk_cols)}, t.\"fetched_at\" DESC NULLS LAST"
)
missing_list = list(missing_ids)
execute_values(cur, latest_sql, missing_list, page_size=min(len(missing_list), 500))
latest_rows = cur.fetchall() or []
if not latest_rows:
return 0
# 定位关键列索引
is_delete_idx = col_index.get("is_delete")
content_hash_idx = col_index.get("content_hash")
payload_idx = col_index.get("payload")
fetched_at_idx = col_index.get("fetched_at")
if is_delete_idx is None or content_hash_idx is None or payload_idx is None:
return 0
now = datetime.now(self.tz)
insert_rows: list[tuple] = []
# CHANGE [2026-02-20] intent: 识别所有 JSONB 列索引,防止 dict/list 值导致 psycopg2 适配错误
jsonb_col_indices: set[int] = set()
for ci in cols_info:
col_lower = ci[2] # udt_name
if col_lower in ("jsonb", "json"):
idx = col_index.get(ci[0].lower())
if idx is not None:
jsonb_col_indices.add(idx)
for row in latest_rows:
row = list(row)
# 若最新版本已是 is_delete=1 → 跳过(幂等)
if row[is_delete_idx] == 1:
continue
# 复制该行,设 is_delete=1
row[is_delete_idx] = 1
# 重算 content_hash基于原 payload + is_delete=1
original_payload = row[payload_idx]
new_hash = self._compute_content_hash(
record={},
payload=original_payload,
is_delete=1,
)
row[content_hash_idx] = new_hash
# 更新 fetched_at 为当前时间
if fetched_at_idx is not None:
row[fetched_at_idx] = now
# 将所有 JSONB 列的 dict/list 值包装为 Json 以便 psycopg2 正确序列化
for ji in jsonb_col_indices:
val = row[ji]
if isinstance(val, (dict, list)):
row[ji] = Json(
val,
dumps=lambda v: json.dumps(v, ensure_ascii=False),
)
insert_rows.append(tuple(row))
if not insert_rows:
return 0
# INSERT 新版本行ON CONFLICT DO NOTHING 保证幂等
# (若 content_hash 相同说明已存在相同的删除版本行)
quoted_cols = ", ".join(f'"{c}"' for c in col_names)
pk_all = self._get_table_pk_columns(table)
pk_clause = ", ".join(f'"{c}"' for c in pk_all) if pk_all else ""
insert_sql = f"INSERT INTO {table} ({quoted_cols}) VALUES %s"
if pk_clause:
insert_sql += f" ON CONFLICT ({pk_clause}) DO NOTHING"
execute_values(cur, insert_sql, insert_rows, page_size=min(len(insert_rows), 500))
return int(cur.rowcount or 0)
def _insert_records_schema_aware(
    self,
    *,
    table: str,
    records: list,
    response_payload: dict | list | None,
    source_file: str | None,
    source_endpoint: str | None,
) -> tuple[int, int, int]:
    """Write a page of records into an ODS table, driven by the table schema.

    - New records are inserted.
    - Existing records are handled according to ``run.ods_conflict_mode``.

    The column list and primary key are read from the database; each record
    is flattened, its ``is_delete`` flag normalised, and one value per table
    column is produced. Records that are not dicts, lack a primary-key
    value, or (when de-duplication is active) carry the same content hash as
    the latest stored version are counted as skipped.

    Args:
        table: Target table, optionally schema-qualified.
        records: Raw records of one API page.
        response_payload: Full response body; only ``data.siteProfile`` /
            ``data.site_profile`` is read from it.
        source_file: Value for the ``source_file`` column.
        source_endpoint: Value for the ``source_endpoint`` column.

    Returns:
        ``(inserted, updated, skipped)``.

    Raises:
        ValueError: The table's columns cannot be resolved.
    """
    if not records:
        return 0, 0, 0
    force_full_update = bool(self.config.get("run.force_full_update", False))
    cols_info = self._get_table_columns(table)
    if not cols_info:
        raise ValueError(f"Cannot resolve columns for table={table}")
    pk_cols = self._get_table_pk_columns(table)
    db_json_cols_lower = {
        c[0].lower() for c in cols_info if c[1] in ("json", "jsonb") or c[2] in ("json", "jsonb")
    }
    needs_content_hash = any(c[0].lower() == "content_hash" for c in cols_info)
    has_is_delete = any(c[0].lower() == "is_delete" for c in cols_info)
    # Default is_delete to 0 only when the table has the column and snapshot
    # soft delete is enabled; otherwise leave records without a flag alone.
    default_is_delete = (
        0 if has_is_delete and bool(self.config.get("run.snapshot_missing_delete", True)) else None
    )
    col_names = [c[0] for c in cols_info]
    quoted_cols = ", ".join(f'\"{c}\"' for c in col_names)
    sql = f"INSERT INTO {table} ({quoted_cols}) VALUES %s"
    # Conflict handling modes:
    # "nothing"  - skip records that already exist (DO NOTHING)
    # "backfill" - only fill columns that are NULL (COALESCE)
    # "update"   - compare all fields and overwrite those that changed
    conflict_mode = str(self.config.get("run.ods_conflict_mode", "update")).lower()
    if pk_cols:
        pk_clause = ", ".join(f'\"{c}\"' for c in pk_cols)
        if conflict_mode in ("backfill", "update"):
            # Primary-key columns are never updated; in normal mode
            # fetched_at keeps its insertion time and is not updated either.
            pk_cols_lower = {c.lower() for c in pk_cols}
            # CHANGE 2026-02-19 | With force_full_update, fetched_at takes
            # part in the update too, so that hitting ON CONFLICT with an
            # identical hash refreshes fetched_at to the current time.
            immutable_update_cols: set[str] = set() if force_full_update else {"fetched_at"}
            update_cols = [
                c for c in col_names
                if c.lower() not in pk_cols_lower and c.lower() not in immutable_update_cols
            ]
            # Only business fields decide whether an update is needed, so
            # that metadata changes do not trigger a full rewrite. payload
            # takes part in the comparison; the other metadata columns do not.
            meta_cols = {"source_file", "source_endpoint", "fetched_at", "content_hash"}
            compare_cols = [c for c in update_cols if c.lower() not in meta_cols]
            if update_cols:
                if conflict_mode == "backfill":
                    # Backfill mode: only fill NULL columns.
                    set_clause = ", ".join(
                        f'"{c}" = COALESCE({table}."{c}", EXCLUDED."{c}")'
                        for c in update_cols
                    )
                    where_clause = " OR ".join(f'{table}."{c}" IS NULL' for c in update_cols)
                    sql += f" ON CONFLICT ({pk_clause}) DO UPDATE SET {set_clause} WHERE {where_clause}"
                else:
                    # Update mode: compare all fields and overwrite.
                    set_clause = ", ".join(
                        f'"{c}" = EXCLUDED."{c}"'
                        for c in update_cols
                    )
                    # CHANGE 2026-02-18 | With force_full_update the WHERE
                    # condition is dropped and rows are overwritten
                    # unconditionally.
                    if compare_cols and not force_full_update:
                        where_clause = " OR ".join(
                            f'{table}."{c}" IS DISTINCT FROM EXCLUDED."{c}"'
                            for c in compare_cols
                        )
                        sql += f" ON CONFLICT ({pk_clause}) DO UPDATE SET {set_clause} WHERE {where_clause}"
                    else:
                        sql += f" ON CONFLICT ({pk_clause}) DO UPDATE SET {set_clause}"
            else:
                sql += f" ON CONFLICT ({pk_clause}) DO NOTHING"
        else:
            sql += f" ON CONFLICT ({pk_clause}) DO NOTHING"
    # RETURNING (xmax = 0) lets the caller tell inserts from updates; it is
    # only requested when the table has a primary key.
    use_returning = bool(pk_cols)
    if use_returning:
        sql += " RETURNING (xmax = 0) AS inserted"
    now = datetime.now(self.tz)
    json_dump = lambda v: json.dumps(v, ensure_ascii=False)  # noqa: E731
    params: list[tuple] = []
    skipped = 0
    merged_records: list[dict] = []
    # Site profile found at the response root, used as a fallback below.
    root_site_profile = None
    if isinstance(response_payload, dict):
        data_part = response_payload.get("data")
        if isinstance(data_part, dict):
            sp = data_part.get("siteProfile") or data_part.get("site_profile")
            if isinstance(sp, dict):
                root_site_profile = sp
    for rec in records:
        if not isinstance(rec, dict):
            skipped += 1
            continue
        # NOTE(review): _merge_record_layers returns the same dict object
        # when the record has no nested "data"/"settleList" dict, so the
        # in-place changes below then also affect `rec` -- confirm intended.
        merged_rec = self._merge_record_layers(rec)
        self._normalize_is_delete_flag(merged_rec, default_if_missing=default_is_delete)
        merged_records.append({"raw": rec, "merged": merged_rec})
        if table in {"ods.recharge_settlements", "ods.settlement_records"}:
            site_profile = merged_rec.get("siteProfile") or merged_rec.get("site_profile") or root_site_profile
            if isinstance(site_profile, dict):
                # Avoid writing None over an existing camelCase field
                # (for example tenantId / siteId / siteName).
                def _fill_missing(target_col: str, candidates: list[Any]):
                    existing = self._get_value_case_insensitive(merged_rec, target_col)
                    if existing not in (None, ""):
                        return
                    for cand in candidates:
                        if cand in (None, "", 0):
                            continue
                        merged_rec[target_col] = cand
                        return
                _fill_missing("tenantid", [site_profile.get("tenant_id"), site_profile.get("tenantId")])
                _fill_missing("siteid", [site_profile.get("siteId"), site_profile.get("id")])
                _fill_missing("sitename", [site_profile.get("shop_name"), site_profile.get("siteName")])
    business_keys = [c for c in pk_cols if str(c).lower() != "content_hash"]
    # P2(A): de-duplication is controlled by the explicit switch on the spec
    # and no longer depends implicitly on has_fetched_at.
    # CHANGE 2026-02-19 | With force_full_update the latest hash is still
    # looked up (to detect a fall-back to a historical version), but nothing
    # is skipped: every record goes through INSERT so that (id, old_hash)
    # hitting a historical row cannot lose the new version.
    compare_latest = bool(
        self.SPEC.skip_unchanged
        and needs_content_hash
        and business_keys
    )
    latest_compare_hash: dict[tuple[Any, ...], str | None] = {}
    if compare_latest:
        key_values: list[tuple[Any, ...]] = []
        for item in merged_records:
            merged_rec = item["merged"]
            key = tuple(self._get_value_case_insensitive(merged_rec, k) for k in business_keys)
            if any(v is None or v == "" for v in key):
                continue
            key_values.append(key)
        if key_values:
            with self.db.conn.cursor() as cur:
                latest_hashes = self._fetch_latest_content_hashes(cur, table, business_keys, key_values)
            for key, value in latest_hashes.items():
                latest_compare_hash[key] = value
    for item in merged_records:
        rec = item["raw"]
        merged_rec = item["merged"]
        content_hash = None
        compare_hash = None
        if needs_content_hash:
            # Hash the raw payload + is_delete (is_delete was already
            # normalised by _normalize_is_delete_flag).
            compare_hash = self._compute_content_hash(
                merged_rec,
                payload=rec,
                is_delete=merged_rec.get("is_delete", 0),
            )
            content_hash = compare_hash
        if pk_cols:
            # Every primary-key column except content_hash must have a value.
            missing_pk = False
            for pk in pk_cols:
                if str(pk).lower() == "content_hash":
                    continue
                pk_val = self._get_value_case_insensitive(merged_rec, pk)
                if pk_val is None or pk_val == "":
                    missing_pk = True
                    break
            if missing_pk:
                skipped += 1
                continue
        if compare_latest and compare_hash is not None:
            key = tuple(self._get_value_case_insensitive(merged_rec, k) for k in business_keys)
            if any(v is None or v == "" for v in key):
                skipped += 1
                continue
            last_hash = latest_compare_hash.get(key)
            if last_hash is not None and last_hash == compare_hash:
                if force_full_update:
                    # CHANGE 2026-02-19 | Hash equals the latest version:
                    # go through ON CONFLICT UPDATE to refresh fetched_at.
                    pass
                else:
                    skipped += 1
                    continue
        # Build one value per table column, in table column order.
        row_vals: list[Any] = []
        for (col_name, data_type, _udt) in cols_info:
            col_lower = col_name.lower()
            if col_lower == "payload":
                row_vals.append(Json(rec, dumps=json_dump))
                continue
            if col_lower == "source_file":
                row_vals.append(source_file)
                continue
            if col_lower == "source_endpoint":
                row_vals.append(source_endpoint)
                continue
            if col_lower == "fetched_at":
                row_vals.append(now)
                continue
            if col_lower == "content_hash":
                row_vals.append(content_hash)
                continue
            value = self._normalize_scalar(self._get_value_case_insensitive(merged_rec, col_name))
            if col_lower in db_json_cols_lower:
                row_vals.append(Json(value, dumps=json_dump) if value is not None else None)
                continue
            row_vals.append(self._cast_value(value, data_type))
        params.append(tuple(row_vals))
    if not params:
        return 0, 0, skipped
    inserted = 0
    updated = 0
    # Chunk size is configurable and clamped to the range 1..2000.
    chunk_size = int(self.config.get("run.ods_execute_values_page_size", 200) or 200)
    chunk_size = max(1, min(chunk_size, 2000))
    with self.db.conn.cursor() as cur:
        for i in range(0, len(params), chunk_size):
            chunk = params[i : i + chunk_size]
            if use_returning:
                rows = execute_values(cur, sql, chunk, page_size=len(chunk), fetch=True)
                ins, upd = self._count_returning_flags(rows or [])
                inserted += ins
                updated += upd
                # ON CONFLICT ... DO UPDATE ... WHERE only returns rows that
                # were really affected. Unchanged / conflict-skipped rows are
                # added to skipped so that fetched equals the sum of the
                # individual counters.
                affected = len(rows or [])
                if affected < len(chunk):
                    skipped += (len(chunk) - affected)
            else:
                execute_values(cur, sql, chunk, page_size=len(chunk))
                if cur.rowcount is not None and cur.rowcount > 0:
                    inserted += int(cur.rowcount)
                    if cur.rowcount < len(chunk):
                        skipped += (len(chunk) - int(cur.rowcount))
                elif cur.rowcount == 0:
                    skipped += len(chunk)
    return inserted, updated, skipped
@staticmethod
def _count_returning_flags(rows: Iterable[Any]) -> tuple[int, int]:
"""Count inserted vs updated from RETURNING (xmax = 0) rows."""
inserted = 0
updated = 0
for row in rows or []:
if isinstance(row, dict):
flag = row.get("inserted")
else:
flag = row[0] if row else None
if flag:
inserted += 1
else:
updated += 1
return inserted, updated
@staticmethod
def _merge_record_layers(record: dict) -> dict:
merged = record
data_part = merged.get("data")
while isinstance(data_part, dict):
merged = {**data_part, **merged}
data_part = data_part.get("data")
settle_inner = merged.get("settleList")
if isinstance(settle_inner, dict):
merged = {**settle_inner, **merged}
return merged
@staticmethod
def _get_value_case_insensitive(record: dict | None, col: str | None):
if record is None or col is None:
return None
if col in record:
return record.get(col)
col_lower = col.lower()
for k, v in record.items():
if isinstance(k, str) and k.lower() == col_lower:
return v
return None
@staticmethod
def _normalize_scalar(value):
if value == "" or value == "{}" or value == "[]":
return None
return value
@staticmethod
def _cast_value(value, data_type: str):
if value is None:
return None
dt = (data_type or "").lower()
if dt == "boolean":
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return bool(value)
if isinstance(value, str):
return value.lower() in ("true", "1", "yes", "t")
return bool(value)
if dt in ("integer", "bigint", "smallint"):
if isinstance(value, bool):
return int(value)
try:
return int(value)
except Exception:
return None
if dt in ("numeric", "double precision", "real", "decimal"):
if isinstance(value, bool):
return int(value)
try:
return float(value)
except Exception:
return None
if dt.startswith("timestamp") or dt in ("date", "time", "interval"):
return value if isinstance(value, (str, datetime)) else None
return value
def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
resolver = getattr(self.api, "get_source_hint", None)
if callable(resolver):
return resolver(spec.endpoint)
return None
@staticmethod
def _hash_default(value):
if isinstance(value, datetime):
return value.isoformat()
return str(value)
@classmethod
def _compute_content_hash(cls, record: dict, payload: Any, is_delete: int) -> str:
"""基于原始 payload 和 is_delete 计算 content_hash。
payload: 原始 API 返回的 JSON 对象(未展平)
is_delete: 0 或 1
"""
payload_str = json.dumps(
payload,
ensure_ascii=False,
sort_keys=True,
separators=(",", ":"),
default=cls._hash_default,
)
raw = f"{payload_str}|{is_delete}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
@staticmethod
def _compute_compare_hash_from_payload(payload: Any) -> str | None:
if payload is None:
return None
if isinstance(payload, str):
try:
payload = json.loads(payload)
except Exception:
return None
if not isinstance(payload, dict):
return None
merged = BaseOdsTask._merge_record_layers(payload)
return BaseOdsTask._compute_content_hash(
merged,
payload=payload,
is_delete=merged.get("is_delete", 0),
)
@staticmethod
def _fetch_latest_content_hashes(
cur, table: str, business_keys: Sequence[str], key_values: Sequence[tuple]
) -> dict:
if not business_keys or not key_values:
return {}
keys_sql = ", ".join(f'"{k}"' for k in business_keys)
sql = (
f"WITH keys({keys_sql}) AS (VALUES %s) "
f"SELECT DISTINCT ON ({keys_sql}) {keys_sql}, content_hash "
f"FROM {table} t JOIN keys k USING ({keys_sql}) "
f"ORDER BY {keys_sql}, fetched_at DESC NULLS LAST"
)
unique_keys = list({tuple(k) for k in key_values})
execute_values(cur, sql, unique_keys, page_size=500)
rows = cur.fetchall() or []
result = {}
if rows and isinstance(rows[0], dict):
for r in rows:
key = tuple(r[k] for k in business_keys)
result[key] = r.get("content_hash")
return result
key_len = len(business_keys)
for r in rows:
key = tuple(r[:key_len])
value = r[key_len] if len(r) > key_len else None
result[key] = value
return result
def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
    """Build a ``ColumnSpec`` whose transform is ``TypeParser.parse_int``.

    Args:
        name: Target DB column name.
        *sources: Source field paths stored on the spec.
        required: Forwarded to ``ColumnSpec.required``.
    """
    return ColumnSpec(name, sources, required, transform=TypeParser.parse_int)
def _decimal_col(name: str, *sources: str) -> ColumnSpec:
    """Build a ``ColumnSpec`` whose transform calls ``TypeParser.parse_decimal``.

    Args:
        name: Target DB column name.
        *sources: Source field paths stored on the spec.
    """

    def _to_decimal(raw):
        # NOTE(review): the second argument (2) is presumably the number of
        # decimal places -- confirm against TypeParser.parse_decimal.
        return TypeParser.parse_decimal(raw, 2)

    return ColumnSpec(column=name, sources=sources, transform=_to_decimal)
def _bool_col(name: str, *sources: str) -> ColumnSpec:
    """Build a ``ColumnSpec`` whose transform coerces a value to ``bool``.

    The transform passes ``None`` and real booleans through unchanged, maps
    the case-insensitive, whitespace-trimmed strings 1/true/t/yes/y to
    ``True`` and 0/false/f/no/n to ``False``, and falls back to ``bool(value)``
    for anything else.
    """
    truthy = {"1", "true", "t", "yes", "y"}
    falsy = {"0", "false", "f", "no", "n"}

    def _coerce(raw):
        if raw is None or isinstance(raw, bool):
            return raw
        token = str(raw).strip().lower()
        if token in truthy:
            return True
        if token in falsy:
            return False
        return bool(raw)

    return ColumnSpec(column=name, sources=sources, transform=_coerce)
# Declarative registry of every ODS ingestion task, in declaration order.
# Each OdsTaskSpec names the task code, the class name generated for it, the
# target ODS table, the API endpoint, and where the record list sits in the
# response (data_path / list_key). ODS_TASK_CLASSES is built from this tuple.
# NOTE(review): snapshot_mode / snapshot_time_column are presumably declared on
# OdsTaskSpec outside this excerpt -- confirm against the dataclass definition.
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
OdsTaskSpec(
code="ODS_ASSISTANT_ACCOUNT",
class_name="OdsAssistantAccountsTask",
table_name="ods.assistant_accounts_master",
endpoint="/PersonnelManagement/SearchAssistantInfo",
data_path=("data",),
list_key="assistantInfos",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
snapshot_mode=SnapshotMode.FULL_TABLE,
description="助教账号档案 ODSSearchAssistantInfo -> assistantInfos 原始 JSON",
),
OdsTaskSpec(
code="ODS_SETTLEMENT_RECORDS",
class_name="OdsOrderSettleTask",
table_name="ods.settlement_records",
endpoint="/Site/GetAllOrderSettleList",
data_path=("data",),
list_key="settleList",
time_fields=("rangeStartTime", "rangeEndTime"),
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=True,
include_fetched_at=False,
include_record_index=True,
requires_window=True,
description="结账记录 ODSGetAllOrderSettleList -> settleList 原始 JSON",
),
# NOTE(review): ODS_TABLE_USE and ODS_GROUP_BUY_REDEMPTION both read
# list_key "siteTableUseDetailsList" but from different endpoints -- confirm
# that this is intentional.
OdsTaskSpec(
code="ODS_TABLE_USE",
class_name="OdsTableUseTask",
table_name="ods.table_fee_transactions",
endpoint="/Site/GetSiteTableOrderDetails",
data_path=("data",),
list_key="siteTableUseDetailsList",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="create_time",
description="台费计费流水 ODSGetSiteTableOrderDetails -> siteTableUseDetailsList 原始 JSON",
),
OdsTaskSpec(
code="ODS_ASSISTANT_LEDGER",
class_name="OdsAssistantLedgerTask",
table_name="ods.assistant_service_records",
endpoint="/AssistantPerformance/GetOrderAssistantDetails",
data_path=("data",),
list_key="orderAssistantDetails",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="create_time",
description="助教服务流水 ODSGetOrderAssistantDetails -> orderAssistantDetails 原始 JSON",
),
OdsTaskSpec(
code="ODS_STORE_GOODS_SALES",
class_name="OdsGoodsLedgerTask",
table_name="ods.store_goods_sales_records",
endpoint="/TenantGoods/GetGoodsSalesList",
data_path=("data",),
list_key="orderGoodsLedgers",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="create_time",
description="门店商品销售流水 ODSGetGoodsSalesList -> orderGoodsLedgers 原始 JSON",
),
OdsTaskSpec(
code="ODS_PAYMENT",
class_name="OdsPaymentTask",
table_name="ods.payment_transactions",
endpoint="/PayLog/GetPayLogListPage",
data_path=("data",),
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
description="支付流水 ODSGetPayLogListPage 原始 JSON",
),
OdsTaskSpec(
code="ODS_REFUND",
class_name="OdsRefundTask",
table_name="ods.refund_transactions",
endpoint="/Order/GetRefundPayLogList",
data_path=("data",),
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="pay_time",
description="退款流水 ODSGetRefundPayLogList 原始 JSON",
),
OdsTaskSpec(
code="ODS_PLATFORM_COUPON",
class_name="OdsCouponVerifyTask",
table_name="ods.platform_coupon_redemption_records",
endpoint="/Promotion/GetOfflineCouponConsumePageList",
data_path=("data",),
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="consume_time",
description="平台/团购券核销 ODSGetOfflineCouponConsumePageList 原始 JSON",
),
OdsTaskSpec(
code="ODS_MEMBER",
class_name="OdsMemberTask",
table_name="ods.member_profiles",
endpoint="/MemberProfile/GetTenantMemberList",
data_path=("data",),
list_key="tenantMemberInfos",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
description="会员档案 ODSGetTenantMemberList -> tenantMemberInfos 原始 JSON",
),
OdsTaskSpec(
code="ODS_MEMBER_CARD",
class_name="OdsMemberCardTask",
table_name="ods.member_stored_value_cards",
endpoint="/MemberProfile/GetTenantMemberCardList",
data_path=("data",),
list_key="tenantMemberCards",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.FULL_TABLE,
description="会员储值卡 ODSGetTenantMemberCardList -> tenantMemberCards 原始 JSON",
),
OdsTaskSpec(
code="ODS_MEMBER_BALANCE",
class_name="OdsMemberBalanceTask",
table_name="ods.member_balance_changes",
endpoint="/MemberProfile/GetMemberCardBalanceChange",
data_path=("data",),
list_key="tenantMemberCardLogs",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="create_time",
description="会员余额变动 ODSGetMemberCardBalanceChange -> tenantMemberCardLogs 原始 JSON",
),
# The only spec in this tuple that declares extra_columns and sets
# include_fetched_at=True. Most columns list a "settleList."-prefixed source
# followed by the bare field name.
# NOTE(review): sources are presumably tried in order until one yields a
# value -- confirm against the column-resolution code.
OdsTaskSpec(
code="ODS_RECHARGE_SETTLE",
class_name="OdsRechargeSettleTask",
table_name="ods.recharge_settlements",
endpoint="/Site/GetRechargeSettleList",
data_path=("data",),
list_key="settleList",
time_fields=("rangeStartTime", "rangeEndTime"),
pk_columns=(_int_col("recharge_order_id", "settleList.id", "id", required=True),),
extra_columns=(
_int_col("tenant_id", "settleList.tenantId", "tenantId"),
_int_col("site_id", "settleList.siteId", "siteId", "siteProfile.id"),
ColumnSpec("site_name_snapshot", sources=("siteProfile.shop_name", "settleList.siteName")),
_int_col("member_id", "settleList.memberId", "memberId"),
ColumnSpec("member_name_snapshot", sources=("settleList.memberName", "memberName")),
ColumnSpec("member_phone_snapshot", sources=("settleList.memberPhone", "memberPhone")),
_int_col("tenant_member_card_id", "settleList.tenantMemberCardId", "tenantMemberCardId"),
ColumnSpec("member_card_type_name", sources=("settleList.memberCardTypeName", "memberCardTypeName")),
_int_col("settle_relate_id", "settleList.settleRelateId", "settleRelateId"),
_int_col("settle_type", "settleList.settleType", "settleType"),
ColumnSpec("settle_name", sources=("settleList.settleName", "settleName")),
_int_col("is_first", "settleList.isFirst", "isFirst"),
_int_col("settle_status", "settleList.settleStatus", "settleStatus"),
_decimal_col("pay_amount", "settleList.payAmount", "payAmount"),
_decimal_col("refund_amount", "settleList.refundAmount", "refundAmount"),
_decimal_col("point_amount", "settleList.pointAmount", "pointAmount"),
_decimal_col("cash_amount", "settleList.cashAmount", "cashAmount"),
_decimal_col("online_amount", "settleList.onlineAmount", "onlineAmount"),
_decimal_col("balance_amount", "settleList.balanceAmount", "balanceAmount"),
_decimal_col("card_amount", "settleList.cardAmount", "cardAmount"),
_decimal_col("coupon_amount", "settleList.couponAmount", "couponAmount"),
_decimal_col("recharge_card_amount", "settleList.rechargeCardAmount", "rechargeCardAmount"),
_decimal_col("gift_card_amount", "settleList.giftCardAmount", "giftCardAmount"),
_decimal_col("prepay_money", "settleList.prepayMoney", "prepayMoney"),
_decimal_col("consume_money", "settleList.consumeMoney", "consumeMoney"),
_decimal_col("goods_money", "settleList.goodsMoney", "goodsMoney"),
_decimal_col("real_goods_money", "settleList.realGoodsMoney", "realGoodsMoney"),
_decimal_col("table_charge_money", "settleList.tableChargeMoney", "tableChargeMoney"),
_decimal_col("service_money", "settleList.serviceMoney", "serviceMoney"),
_decimal_col("activity_discount", "settleList.activityDiscount", "activityDiscount"),
_decimal_col("all_coupon_discount", "settleList.allCouponDiscount", "allCouponDiscount"),
_decimal_col("goods_promotion_money", "settleList.goodsPromotionMoney", "goodsPromotionMoney"),
_decimal_col("assistant_promotion_money", "settleList.assistantPromotionMoney", "assistantPromotionMoney"),
_decimal_col("assistant_pd_money", "settleList.assistantPdMoney", "assistantPdMoney"),
_decimal_col("assistant_cx_money", "settleList.assistantCxMoney", "assistantCxMoney"),
_decimal_col("assistant_manual_discount", "settleList.assistantManualDiscount", "assistantManualDiscount"),
_decimal_col("coupon_sale_amount", "settleList.couponSaleAmount", "couponSaleAmount"),
_decimal_col("member_discount_amount", "settleList.memberDiscountAmount", "memberDiscountAmount"),
_decimal_col("point_discount_price", "settleList.pointDiscountPrice", "pointDiscountPrice"),
_decimal_col("point_discount_cost", "settleList.pointDiscountCost", "pointDiscountCost"),
_decimal_col("adjust_amount", "settleList.adjustAmount", "adjustAmount"),
_decimal_col("rounding_amount", "settleList.roundingAmount", "roundingAmount"),
_int_col("payment_method", "settleList.paymentMethod", "paymentMethod"),
_bool_col("can_be_revoked", "settleList.canBeRevoked", "canBeRevoked"),
_bool_col("is_bind_member", "settleList.isBindMember", "isBindMember"),
_bool_col("is_activity", "settleList.isActivity", "isActivity"),
_bool_col("is_use_coupon", "settleList.isUseCoupon", "isUseCoupon"),
_bool_col("is_use_discount", "settleList.isUseDiscount", "isUseDiscount"),
_int_col("operator_id", "settleList.operatorId", "operatorId"),
ColumnSpec("operator_name_snapshot", sources=("settleList.operatorName", "operatorName")),
_int_col("salesman_user_id", "settleList.salesManUserId", "salesmanUserId", "salesManUserId"),
ColumnSpec("salesman_name", sources=("settleList.salesManName", "salesmanName", "settleList.salesmanName")),
ColumnSpec("order_remark", sources=("settleList.orderRemark", "orderRemark")),
_int_col("table_id", "settleList.tableId", "tableId"),
_int_col("serial_number", "settleList.serialNumber", "serialNumber"),
_int_col("revoke_order_id", "settleList.revokeOrderId", "revokeOrderId"),
ColumnSpec("revoke_order_name", sources=("settleList.revokeOrderName", "revokeOrderName")),
ColumnSpec("revoke_time", sources=("settleList.revokeTime", "revokeTime")),
ColumnSpec("create_time", sources=("settleList.createTime", "createTime")),
ColumnSpec("pay_time", sources=("settleList.payTime", "payTime")),
ColumnSpec("site_profile", sources=("siteProfile",)),
),
include_source_endpoint=True,
include_fetched_at=True,
include_record_index=False,
requires_window=True,
# NOTE(review): the description below is mis-encoded ("?" placeholders); it
# is a runtime string (used as the generated class __doc__), so restore the
# original text from version control rather than guessing.
description="?????? ODS?GetRechargeSettleList -> data.settleList ????",
),
OdsTaskSpec(
code="ODS_GROUP_PACKAGE",
class_name="OdsPackageTask",
table_name="ods.group_buy_packages",
endpoint="/PackageCoupon/QueryPackageCouponList",
data_path=("data",),
list_key="packageCouponList",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.FULL_TABLE,
description="团购套餐定义 ODSQueryPackageCouponList -> packageCouponList 原始 JSON",
),
OdsTaskSpec(
code="ODS_GROUP_BUY_REDEMPTION",
class_name="OdsGroupBuyRedemptionTask",
table_name="ods.group_buy_redemption_records",
endpoint="/Site/GetSiteTableUseDetails",
data_path=("data",),
list_key="siteTableUseDetailsList",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="create_time",
description="团购套餐核销 ODSGetSiteTableUseDetails -> siteTableUseDetailsList 原始 JSON",
),
OdsTaskSpec(
code="ODS_INVENTORY_STOCK",
class_name="OdsInventoryStockTask",
table_name="ods.goods_stock_summary",
endpoint="/TenantGoods/GetGoodsStockReport",
data_path=("data",),
pk_columns=(_int_col("sitegoodsid", "siteGoodsId", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=True,
time_fields=("startTime", "endTime"),
description="库存汇总 ODSGetGoodsStockReport 原始 JSON",
),
OdsTaskSpec(
code="ODS_INVENTORY_CHANGE",
class_name="OdsInventoryChangeTask",
table_name="ods.goods_stock_movements",
endpoint="/GoodsStockManage/QueryGoodsOutboundReceipt",
data_path=("data",),
list_key="queryDeliveryRecordsList",
pk_columns=(_int_col("sitegoodsstockid", "siteGoodsStockId", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
description="库存变化记录 ODSQueryGoodsOutboundReceipt -> queryDeliveryRecordsList 原始 JSON",
),
OdsTaskSpec(
code="ODS_TABLES",
class_name="OdsTablesTask",
table_name="ods.site_tables_master",
endpoint="/Table/GetSiteTables",
data_path=("data",),
list_key="siteTables",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
description="台桌维表 ODSGetSiteTables -> siteTables 原始 JSON",
),
OdsTaskSpec(
code="ODS_GOODS_CATEGORY",
class_name="OdsGoodsCategoryTask",
table_name="ods.stock_goods_category_tree",
endpoint="/TenantGoodsCategory/QueryPrimarySecondaryCategory",
data_path=("data",),
list_key="goodsCategoryList",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
# NOTE(review): the description below contains mis-encoded characters; it is
# a runtime string, so restore the original text from version control.
description="库存商品分类鏍?ODSQueryPrimarySecondaryCategory -> goodsCategoryList 原始 JSON",
),
OdsTaskSpec(
code="ODS_STORE_GOODS",
class_name="OdsStoreGoodsTask",
table_name="ods.store_goods_master",
endpoint="/TenantGoods/GetGoodsInventoryList",
data_path=("data",),
list_key="orderGoodsList",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.FULL_TABLE,
description="门店商品档案 ODSGetGoodsInventoryList -> orderGoodsList 原始 JSON",
),
OdsTaskSpec(
code="ODS_TABLE_FEE_DISCOUNT",
class_name="OdsTableDiscountTask",
table_name="ods.table_fee_discount_records",
endpoint="/Site/GetTaiFeeAdjustList",
data_path=("data",),
list_key="taiFeeAdjustInfos",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.WINDOW,
snapshot_time_column="create_time",
description="台费折扣/调账 ODSGetTaiFeeAdjustList -> taiFeeAdjustInfos 原始 JSON",
),
OdsTaskSpec(
code="ODS_TENANT_GOODS",
class_name="OdsTenantGoodsTask",
table_name="ods.tenant_goods_master",
endpoint="/TenantGoods/QueryTenantGoods",
data_path=("data",),
list_key="tenantGoodsList",
pk_columns=(_int_col("id", "id", required=True),),
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
snapshot_mode=SnapshotMode.FULL_TABLE,
description="租户商品档案 ODSQueryTenantGoods -> tenantGoodsList 原始 JSON",
),
# The only spec here that sets extra_params and disables time_fields.
# NOTE(review): extra_params are presumably sent as additional request
# parameters -- confirm against the request-building code.
OdsTaskSpec(
code="ODS_STAFF_INFO",
class_name="OdsStaffInfoTask",
table_name="ods.staff_info_master",
endpoint="/PersonnelManagement/SearchSystemStaffInfo",
data_path=("data",),
list_key="staffProfiles",
pk_columns=(_int_col("id", "id", required=True),),
extra_params={
"workStatusEnum": 0,
"dingTalkSynced": 0,
"staffIdentity": 0,
"rankId": 0,
"criticismStatus": 0,
"signStatus": -1,
},
include_source_endpoint=False,
include_fetched_at=False,
include_record_index=True,
requires_window=False,
time_fields=None,
snapshot_mode=SnapshotMode.FULL_TABLE,
description="员工档案 ODSSearchSystemStaffInfo -> staffProfiles 原始 JSON",
),
)
def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
    """Create a ``BaseOdsTask`` subclass bound to ``spec``.

    The class is named ``spec.class_name``, carries the spec as its ``SPEC``
    attribute, and uses ``spec.description`` as its docstring, falling back
    to a generic text containing ``spec.code`` when the description is empty.
    """
    doc = spec.description or f"ODS ingestion task {spec.code}"
    namespace = dict(SPEC=spec, __doc__=doc, __module__=__name__)
    return type(spec.class_name, (BaseOdsTask,), namespace)
# Task codes for which a task class is generated; listed alphabetically.
ENABLED_ODS_CODES = {
    "ODS_ASSISTANT_ACCOUNT",
    "ODS_ASSISTANT_LEDGER",
    "ODS_GOODS_CATEGORY",
    "ODS_GROUP_BUY_REDEMPTION",
    "ODS_GROUP_PACKAGE",
    "ODS_INVENTORY_CHANGE",
    "ODS_INVENTORY_STOCK",
    "ODS_MEMBER",
    "ODS_MEMBER_BALANCE",
    "ODS_MEMBER_CARD",
    "ODS_PAYMENT",
    "ODS_PLATFORM_COUPON",
    "ODS_RECHARGE_SETTLE",
    "ODS_REFUND",
    "ODS_SETTLEMENT_RECORDS",
    "ODS_STAFF_INFO",
    "ODS_STORE_GOODS",
    "ODS_STORE_GOODS_SALES",
    "ODS_TABLES",
    "ODS_TABLE_FEE_DISCOUNT",
    "ODS_TABLE_USE",
    "ODS_TENANT_GOODS",
}
# Task code -> generated task class, for every spec whose code is enabled.
ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = dict(
    (task_spec.code, _build_task_class(task_spec))
    for task_spec in ODS_TASK_SPECS
    if task_spec.code in ENABLED_ODS_CODES
)

# Names exported by ``from <module> import *``.
__all__ = [
    "ODS_TASK_CLASSES",
    "ODS_TASK_SPECS",
    "BaseOdsTask",
    "ENABLED_ODS_CODES",
]