更新20260201-1
This commit is contained in:
@@ -522,9 +522,9 @@ class DwdLoadTask(BaseTask):
|
||||
continue
|
||||
|
||||
if self._table_base(dwd_table).startswith("dim_"):
|
||||
processed = self._merge_dim(cur, dwd_table, ods_table, dwd_cols, ods_cols, now)
|
||||
dim_counts = self._merge_dim(cur, dwd_table, ods_table, dwd_cols, ods_cols, now)
|
||||
self.db.conn.commit()
|
||||
summary.append({"table": dwd_table, "mode": "SCD2", "processed": processed})
|
||||
summary.append({"table": dwd_table, "mode": "SCD2", **dim_counts})
|
||||
else:
|
||||
dwd_types = self._get_column_types(cur, dwd_table, "billiards_dwd")
|
||||
ods_types = self._get_column_types(cur, ods_table, "billiards_ods")
|
||||
@@ -532,7 +532,7 @@ class DwdLoadTask(BaseTask):
|
||||
self.config.get("run.window_override.start")
|
||||
and self.config.get("run.window_override.end")
|
||||
)
|
||||
inserted = self._merge_fact_increment(
|
||||
fact_counts = self._merge_fact_increment(
|
||||
cur,
|
||||
dwd_table,
|
||||
ods_table,
|
||||
@@ -544,7 +544,7 @@ class DwdLoadTask(BaseTask):
|
||||
window_end=context.window_end if use_window else None,
|
||||
)
|
||||
self.db.conn.commit()
|
||||
summary.append({"table": dwd_table, "mode": "INCREMENT", "inserted": inserted})
|
||||
summary.append({"table": dwd_table, "mode": "INCREMENT", **fact_counts})
|
||||
|
||||
elapsed = time.monotonic() - started
|
||||
self.logger.info("DWD 装载完成:%s,用时 %.2fs", dwd_table, elapsed)
|
||||
@@ -675,7 +675,7 @@ class DwdLoadTask(BaseTask):
|
||||
dwd_cols: Sequence[str],
|
||||
ods_cols: Sequence[str],
|
||||
now: datetime,
|
||||
) -> int:
|
||||
) -> Dict[str, int]:
|
||||
"""
|
||||
维表合并策略:
|
||||
- 若主键包含 scd2 列(如 scd2_start_time/scd2_version),执行真正的 SCD2(关闭旧版+插入新版)。
|
||||
@@ -699,8 +699,8 @@ class DwdLoadTask(BaseTask):
|
||||
ods_cols: Sequence[str],
|
||||
pk_cols: Sequence[str],
|
||||
now: datetime,
|
||||
) -> int:
|
||||
"""维表 Type1 Upsert(主键冲突则更新),兼容带 scd2 字段但主键不支持多版本的表。"""
|
||||
) -> Dict[str, int]:
|
||||
"""维表 Type1 Upsert(主键冲突则更新),返回真实新增/更新计数。"""
|
||||
mapping = self._build_column_mapping(dwd_table, pk_cols, ods_cols)
|
||||
ods_set = {c.lower() for c in ods_cols}
|
||||
ods_table_sql = self._format_table(ods_table, "billiards_ods")
|
||||
@@ -731,7 +731,7 @@ class DwdLoadTask(BaseTask):
|
||||
added.add(lc)
|
||||
|
||||
if not select_exprs:
|
||||
return 0
|
||||
return {"processed": 0, "inserted": 0, "updated": 0, "skipped": 0}
|
||||
|
||||
order_col = self._pick_snapshot_order_column(ods_cols)
|
||||
business_keys = self._strip_scd2_keys(pk_cols)
|
||||
@@ -768,7 +768,7 @@ class DwdLoadTask(BaseTask):
|
||||
src_rows.append(row)
|
||||
|
||||
if not src_rows:
|
||||
return 0
|
||||
return {"processed": 0, "inserted": 0, "updated": 0, "skipped": 0}
|
||||
|
||||
dwd_table_sql = self._format_table(dwd_table, "billiards_dwd")
|
||||
sorted_cols = [c.lower() for c in sorted(dwd_cols)]
|
||||
@@ -802,12 +802,19 @@ class DwdLoadTask(BaseTask):
|
||||
else:
|
||||
set_exprs.append(f'\"{c}\" = EXCLUDED.\"{c}\"')
|
||||
|
||||
compare_cols = [c for c in sorted_cols if c not in pk_lower_set and c not in self.SCD_COLS]
|
||||
diff_exprs = [f'{dwd_table_sql}."{c}" IS DISTINCT FROM EXCLUDED."{c}"' for c in compare_cols]
|
||||
where_clause = f" WHERE {' OR '.join(diff_exprs)}" if diff_exprs else ""
|
||||
upsert_sql = (
|
||||
f"INSERT INTO {dwd_table_sql} ({insert_cols_sql}) VALUES %s "
|
||||
f"ON CONFLICT ({pk_sql}) DO UPDATE SET {', '.join(set_exprs)}"
|
||||
f"ON CONFLICT ({pk_sql}) DO UPDATE SET {', '.join(set_exprs)}{where_clause} "
|
||||
f"RETURNING (xmax = 0) AS inserted"
|
||||
)
|
||||
execute_values(cur, upsert_sql, [build_row(r) for r in src_rows], page_size=500)
|
||||
return len(src_rows)
|
||||
rows = execute_values(cur, upsert_sql, [build_row(r) for r in src_rows], page_size=500, fetch=True)
|
||||
inserted, updated = self._count_returning_flags(rows or [])
|
||||
processed = len(src_rows)
|
||||
skipped = max(0, processed - inserted - updated)
|
||||
return {"processed": processed, "inserted": inserted, "updated": updated, "skipped": skipped}
|
||||
|
||||
def _merge_dim_scd2(
|
||||
self,
|
||||
@@ -817,7 +824,7 @@ class DwdLoadTask(BaseTask):
|
||||
dwd_cols: Sequence[str],
|
||||
ods_cols: Sequence[str],
|
||||
now: datetime,
|
||||
) -> int:
|
||||
) -> Dict[str, int]:
|
||||
"""对维表执行 SCD2 合并:对比变更关闭旧版并插入新版。"""
|
||||
pk_cols = self._get_primary_keys(cur, dwd_table)
|
||||
if not pk_cols:
|
||||
@@ -860,7 +867,7 @@ class DwdLoadTask(BaseTask):
|
||||
added.add(lc)
|
||||
|
||||
if not select_exprs:
|
||||
return 0
|
||||
return {"processed": 0, "inserted": 0, "updated": 0, "skipped": 0}
|
||||
|
||||
order_col = self._pick_snapshot_order_column(ods_cols)
|
||||
key_exprs: list[str] = []
|
||||
@@ -906,7 +913,7 @@ class DwdLoadTask(BaseTask):
|
||||
src_rows_by_pk[pk_key] = mapped_row
|
||||
|
||||
if not src_rows_by_pk:
|
||||
return 0
|
||||
return {"processed": 0, "inserted": 0, "updated": 0, "skipped": 0}
|
||||
|
||||
# 预加载当前版本(scd2_is_current=1),避免逐行 SELECT 造成大量 round-trip
|
||||
table_sql_dwd = self._format_table(dwd_table, "billiards_dwd")
|
||||
@@ -941,7 +948,11 @@ class DwdLoadTask(BaseTask):
|
||||
if to_insert:
|
||||
self._insert_dim_rows_bulk(cur, dwd_table, dwd_cols, to_insert, now)
|
||||
|
||||
return len(src_rows_by_pk)
|
||||
processed = len(src_rows_by_pk)
|
||||
updated = len(to_close)
|
||||
inserted = max(0, len(to_insert) - updated)
|
||||
skipped = max(0, processed - inserted - updated)
|
||||
return {"processed": processed, "inserted": inserted, "updated": updated, "skipped": skipped}
|
||||
|
||||
def _close_current_dim_bulk(
|
||||
self,
|
||||
@@ -1129,9 +1140,13 @@ class DwdLoadTask(BaseTask):
|
||||
value = datetime.combine(value, datetime.min.time())
|
||||
if not isinstance(value, datetime):
|
||||
return value
|
||||
if value.tzinfo is None:
|
||||
return value.replace(tzinfo=self.tz)
|
||||
return value.astimezone(self.tz)
|
||||
try:
|
||||
if value.tzinfo is None:
|
||||
return value.replace(tzinfo=self.tz)
|
||||
return value.astimezone(self.tz)
|
||||
except (OverflowError, OSError):
|
||||
# 极端日期值(如 9999-12-31)无法转换时区,直接返回原值
|
||||
return value
|
||||
|
||||
def _looks_numeric(self, value: Any) -> bool:
|
||||
if isinstance(value, (int, float, Decimal)) and not isinstance(value, bool):
|
||||
@@ -1184,6 +1199,22 @@ class DwdLoadTask(BaseTask):
|
||||
return False
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _count_returning_flags(rows: Iterable[Any]) -> tuple[int, int]:
|
||||
"""Count inserted vs updated from RETURNING (xmax = 0) rows."""
|
||||
inserted = 0
|
||||
updated = 0
|
||||
for row in rows:
|
||||
if isinstance(row, dict):
|
||||
flag = row.get("inserted")
|
||||
else:
|
||||
flag = row[0] if row else None
|
||||
if flag:
|
||||
inserted += 1
|
||||
else:
|
||||
updated += 1
|
||||
return inserted, updated
|
||||
|
||||
def _merge_fact_increment(
|
||||
self,
|
||||
cur,
|
||||
@@ -1195,8 +1226,8 @@ class DwdLoadTask(BaseTask):
|
||||
ods_types: Dict[str, str],
|
||||
window_start: datetime | None = None,
|
||||
window_end: datetime | None = None,
|
||||
) -> int:
|
||||
"""事实表按时间增量插入,默认按列名交集写入。"""
|
||||
) -> Dict[str, int]:
|
||||
"""事实表按时间增量插入,返回真实新增/更新计数。"""
|
||||
mapping_entries = self.FACT_MAPPINGS.get(dwd_table) or []
|
||||
mapping: Dict[str, tuple[str, str | None]] = {
|
||||
dst.lower(): (src, cast_type) for dst, src, cast_type in mapping_entries
|
||||
@@ -1306,18 +1337,31 @@ class DwdLoadTask(BaseTask):
|
||||
set_exprs = [f'"{c}" = EXCLUDED."{c}"' for c in insert_cols if c.lower() not in pk_lower]
|
||||
if snapshot_mode or fact_upsert:
|
||||
if set_exprs:
|
||||
sql += f" ON CONFLICT ({pk_sql}) DO UPDATE SET {', '.join(set_exprs)}"
|
||||
compare_cols = [c for c in insert_cols if c.lower() not in pk_lower]
|
||||
diff_exprs = [f'{dwd_table_sql}."{c}" IS DISTINCT FROM EXCLUDED."{c}"' for c in compare_cols]
|
||||
where_clause = f" WHERE {' OR '.join(diff_exprs)}" if diff_exprs else ""
|
||||
sql += f" ON CONFLICT ({pk_sql}) DO UPDATE SET {', '.join(set_exprs)}{where_clause}"
|
||||
else:
|
||||
sql += f" ON CONFLICT ({pk_sql}) DO NOTHING"
|
||||
else:
|
||||
sql += f" ON CONFLICT ({pk_sql}) DO NOTHING"
|
||||
|
||||
sql += " RETURNING (xmax = 0) AS inserted"
|
||||
cur.execute(sql, params)
|
||||
inserted = cur.rowcount
|
||||
|
||||
inserted = 0
|
||||
updated = 0
|
||||
while True:
|
||||
rows = cur.fetchmany(10000)
|
||||
if not rows:
|
||||
break
|
||||
ins, upd = self._count_returning_flags(rows)
|
||||
inserted += ins
|
||||
updated += upd
|
||||
|
||||
# 回补缺失主键记录(处理历史回补导致的“create_time 水位”遗漏)
|
||||
if dwd_table.lower() in self.FACT_MISSING_FILL_TABLES:
|
||||
inserted += self._insert_missing_by_pk(
|
||||
missing_inserted = self._insert_missing_by_pk(
|
||||
cur,
|
||||
dwd_table,
|
||||
ods_table,
|
||||
@@ -1328,8 +1372,9 @@ class DwdLoadTask(BaseTask):
|
||||
dwd_types,
|
||||
ods_types,
|
||||
)
|
||||
inserted += missing_inserted
|
||||
|
||||
return inserted
|
||||
return {"inserted": inserted, "updated": updated, "processed": inserted + updated}
|
||||
def _pick_order_column(self, dwd_table: str, dwd_cols: Iterable[str], ods_cols: Iterable[str]) -> str | None:
|
||||
"""Pick an incremental order column that exists in both DWD and ODS."""
|
||||
lower_cols = {c.lower() for c in dwd_cols} & {c.lower() for c in ods_cols}
|
||||
|
||||
@@ -177,11 +177,12 @@ class BaseOdsTask(BaseTask):
|
||||
def _resolve_window(self, cursor_data: dict | None) -> tuple[datetime, datetime, int]:
|
||||
base_start, base_end, base_minutes = self._get_time_window(cursor_data)
|
||||
|
||||
if self.config.get("run.force_window_override"):
|
||||
override_start = self.config.get("run.window_override.start")
|
||||
override_end = self.config.get("run.window_override.end")
|
||||
if override_start and override_end:
|
||||
return base_start, base_end, base_minutes
|
||||
# 如果用户显式指定了窗口(window_override.start/end),则直接使用,不走 MAX(fetched_at) 兜底
|
||||
override_start = self.config.get("run.window_override.start")
|
||||
override_end = self.config.get("run.window_override.end")
|
||||
if override_start and override_end:
|
||||
# 用户明确指定了窗口,尊重用户选择
|
||||
return base_start, base_end, base_minutes
|
||||
|
||||
# 以 ODS 表 MAX(fetched_at) 兜底:避免“窗口游标推进但未实际入库”导致漏数。
|
||||
last_fetched = self._get_max_fetched_at(self.SPEC.table_name)
|
||||
|
||||
Reference in New Issue
Block a user