# -*- coding: utf-8 -*-
"""
DWD phase-1 refactor — property-based tests.

Feature: dwd-phase1-refactor
Location: monorepo root, tests/

Uses hypothesis + FakeCursor to verify the core correctness properties of the
refactored DwdLoadTask.
"""

from __future__ import annotations

import logging
import re
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict, List, Sequence, Tuple
from unittest.mock import MagicMock, patch

import hypothesis.strategies as st
from hypothesis import assume, given, settings

# ── Make the feiqiu ETL connector importable ──
# This file lives in <monorepo>/tests/, so the connector root is resolved from
# the file's grandparent directory and prepended to sys.path (only once), which
# lets `tasks.dwd...` resolve to the connector's package.
_ETL_ROOT = Path(__file__).resolve().parent.parent.joinpath(
    "apps", "etl", "connectors", "feiqiu"
)
_etl_root_str = str(_ETL_ROOT)
if _etl_root_str not in sys.path:
    sys.path.insert(0, _etl_root_str)

from tasks.dwd.dwd_load_task import DwdLoadTask  # noqa: E402  (needs the sys.path entry above)


# ── Test FakeCursor (the task_test_utils one, extended with fetchmany support) ──

class PropertyTestCursor:
    """Minimal cursor stand-in that records every statement it receives.

    Property tests inspect ``executed`` to verify the structure of the SQL
    generated by DwdLoadTask; no database is involved.

    Responses:
      * ``information_schema.columns`` lookups return the fixed rows from
        :meth:`_fake_columns`;
      * ``information_schema.table_constraints`` lookups return no rows;
      * any other statement returns nothing and resets ``rowcount`` to 0.
    """

    def __init__(self):
        # One dict per execute() call: {"sql": stripped text, "params": params}.
        self.executed: List[Dict[str, Any]] = []
        self._fetchall_rows: List[Tuple] = []
        self.rowcount = 0
        self.connection = SimpleNamespace(encoding="UTF8")

    def execute(self, sql: str, params=None):
        """Record ``sql``/``params`` and stage the rows fetchall() will return."""
        if isinstance(sql, (bytes, bytearray)):
            text = sql.decode("utf-8", errors="ignore")
        else:
            text = str(sql)
        self.executed.append({"sql": text.strip(), "params": params})
        self._fetchall_rows = []

        query = text.lower()
        if "from information_schema.columns" in query:
            # The 2nd positional param, when present, is treated as the table name.
            wanted = params[1] if params and len(params) >= 2 else None
            self._fetchall_rows = self._fake_columns(wanted)
        elif "from information_schema.table_constraints" in query:
            # This cursor knows of no constraints.
            self._fetchall_rows = []
        else:
            self.rowcount = 0

    def fetchone(self):
        """Always ``None``: no single-row result is ever available."""
        return None

    def fetchall(self):
        """Return a copy of the rows staged by the last execute()."""
        return self._fetchall_rows[:]

    def fetchmany(self, size=None):
        """Always empty; _merge_fact_increment reads RETURNING rows this way,
        so an empty batch ends its read loop immediately."""
        return []

    def mogrify(self, template, args):
        """Return a fixed placeholder instead of rendering ``template``."""
        return b"(?)"

    @staticmethod
    def _fake_columns(table_name: str | None) -> List[Tuple[str, str, str]]:
        """Fake (column_name, data_type, udt_name) rows, regardless of table.

        Always includes ``fetched_at`` plus a few common columns.
        """
        id_like = [
            ("id", "bigint", "int8"),
            ("site_id", "bigint", "int8"),
            ("record_index", "integer", "int4"),
        ]
        rest = [
            ("content_hash", "text", "text"),
            ("fetched_at", "timestamp with time zone", "timestamptz"),
            ("payload", "jsonb", "jsonb"),
        ]
        return id_like + rest

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Never suppress exceptions raised inside the with-block.
        return False


# ── Helper: build a minimal, usable DwdLoadTask instance ──

def _make_task(fact_upsert: bool = True) -> DwdLoadTask:
    """Return a DwdLoadTask wired to mocked config/db/api and a quiet logger.

    Args:
        fact_upsert: value the mocked config serves for ``dwd.fact_upsert``.
    """
    settings_map = {
        "app.store_id": 1,
        "app.timezone": "Asia/Shanghai",
        "dwd.fact_upsert": fact_upsert,
    }

    config = MagicMock()
    # Only these three keys are known; anything else falls back to `default`.
    config.get = lambda key, default=None: settings_map.get(key, default)

    quiet_logger = logging.getLogger("test_dwd_property")
    quiet_logger.setLevel(logging.WARNING)  # keep test output quiet

    db, api = MagicMock(), MagicMock()
    return DwdLoadTask(config, db, api, quiet_logger)


# ── Hypothesis strategies ──

@st.composite
def window_pair(draw):
    """Draw a UTC-aware ``(window_start, window_end)`` pair with start < end.

    ``window_start`` falls between 2020-01-01 and 2030-12-31; the window
    length is a whole number of minutes from 1 minute up to 7 days (10080).
    """
    utc_only = st.just(timezone.utc)
    start = draw(
        st.datetimes(
            min_value=datetime(2020, 1, 1),
            max_value=datetime(2030, 12, 31),
            timezones=utc_only,
        )
    )
    span = timedelta(minutes=draw(st.integers(min_value=1, max_value=10080)))
    return start, start + span


# Every fact table in TABLE_MAP, i.e. entries whose target name (last
# dot-separated component) starts with "dwd_", as (dwd_table, ods_table)
# pairs in TABLE_MAP iteration order.
_FACT_TABLES = [
    (dwd_name, ods_name)
    for dwd_name, ods_name in DwdLoadTask.TABLE_MAP.items()
    if dwd_name.rsplit(".", 1)[-1].startswith("dwd_")
]


# ══════════════════════════════════════════════════════════════
# Property 1: fact-table incremental SQL always uses a window range condition
# Feature: dwd-phase1-refactor, Property 1: fact-table incremental SQL always uses a window range condition
# ══════════════════════════════════════════════════════════════

# A reference to the fetched_at column, quoted ("fetched_at") or bare
# (fetched_at). As in _INSERT_INTO_PATTERN, the quotes are optional, so a
# change in identifier quoting cannot silently defeat the checks below. The
# word boundaries keep other columns such as last_fetched_at from matching.
_FETCHED_AT_COL = r'"?\bfetched_at\b"?'

# Two-sided window condition: fetched_at >= %s AND fetched_at < %s
_WINDOW_PATTERN = re.compile(
    _FETCHED_AT_COL + r"\s*>=\s*%s\s+AND\s+" + _FETCHED_AT_COL + r"\s*<\s*%s",
    re.IGNORECASE,
)

# One-sided watermark condition: fetched_at > %s NOT followed by
# AND fetched_at < %s. (">=" never matches, because "%s" must follow ">"
# with only whitespace in between.)
_SINGLE_WATERMARK_PATTERN = re.compile(
    _FETCHED_AT_COL
    + r"\s*>\s*%s(?!\s+AND\s+"
    + _FETCHED_AT_COL
    + r"\s*<\s*%s)",
    re.IGNORECASE,
)


@given(
    table_pair=st.sampled_from(_FACT_TABLES),
    window=window_pair(),
)
@settings(max_examples=100)
def test_fact_increment_sql_uses_window_range(
    table_pair: Tuple[str, str],
    window: Tuple[datetime, datetime],
):
    """
    Property 1: fact-table incremental SQL always uses a window range condition.

    For any fact table and any window_start/window_end pair, the WHERE clause
    of the SQL generated by _merge_fact_increment():
      - must contain fetched_at >= %s AND fetched_at < %s;
      - must not contain a one-sided watermark fetched_at > %s;
    and both window bounds must be among the INSERT's parameters.

    **Validates: Requirements 1.1, 1.2**
    """
    dwd_table, ods_table = table_pair
    window_start, window_end = window

    task = _make_task()
    cur = PropertyTestCursor()

    # Column metadata mirroring what PropertyTestCursor reports for
    # information_schema.columns.
    fake_rows = PropertyTestCursor._fake_columns(None)
    fake_cols = [row[0] for row in fake_rows]

    # Target columns declared for this table in FACT_MAPPINGS.
    mapping_entries = task.FACT_MAPPINGS.get(dwd_table) or []
    mapping_dest = [dst for dst, _, _ in mapping_entries]

    # dwd columns = fake columns + mapped targets, de-duplicated in first-seen
    # order. dict.fromkeys (unlike set) yields the same order in every
    # interpreter run, so the generated SQL is reproducible across runs.
    dwd_cols = list(dict.fromkeys(fake_cols + mapping_dest))
    ods_cols = list(fake_cols)

    # Types: fake-schema types on both sides; mapped targets the fake schema
    # does not know default to "text".
    fake_types = {row[0]: row[1] for row in fake_rows}
    dwd_types = dict(fake_types)
    ods_types = dict(fake_types)
    for col in mapping_dest:
        dwd_types.setdefault(col, "text")

    task._merge_fact_increment(
        cur=cur,
        dwd_table=dwd_table,
        ods_table=ods_table,
        dwd_cols=dwd_cols,
        ods_cols=ods_cols,
        dwd_types=dwd_types,
        ods_types=ods_types,
        window_start=window_start,
        window_end=window_end,
    )

    # The main INSERT statement(s); metadata lookups never start with INSERT.
    insert_entries = [
        entry
        for entry in cur.executed
        if entry["sql"].upper().startswith("INSERT INTO")
    ]
    assert len(insert_entries) == 1, (
        f"期望恰好 1 条 INSERT SQL,实际 {len(insert_entries)} 条,"
        f"表: {dwd_table}"
    )
    sql = insert_entries[0]["sql"]
    params = insert_entries[0]["params"]

    # The two-sided window condition must be present ...
    assert _WINDOW_PATTERN.search(sql), (
        f"INSERT SQL 缺少双边窗口条件 (fetched_at >= %s AND fetched_at < %s),"
        f"表: {dwd_table}\nSQL: {sql[:500]}"
    )

    # ... and no one-sided watermark condition may appear.
    assert not _SINGLE_WATERMARK_PATTERN.search(sql), (
        f"INSERT SQL 包含单边水位线条件 (fetched_at > %s),"
        f"表: {dwd_table}\nSQL: {sql[:500]}"
    )

    # Both window bounds must be bound as parameters of the INSERT.
    assert params is not None, f"INSERT SQL 参数为 None,表: {dwd_table}"
    assert window_start in params, (
        f"SQL 参数缺少 window_start={window_start},"
        f"表: {dwd_table}, params={params}"
    )
    assert window_end in params, (
        f"SQL 参数缺少 window_end={window_end},"
        f"表: {dwd_table}, params={params}"
    )


# ══════════════════════════════════════════════════════════════
# Property 2: fact-table increments never execute backfill SQL
# Feature: dwd-phase1-refactor, Property 2: fact-table increments never execute backfill SQL
# ══════════════════════════════════════════════════════════════

# Backfill fingerprint: LEFT JOIN ... WHERE ... IS NULL, matched lazily,
# case-insensitively and across line breaks (IS NOT NULL does not match).
_BACKFILL_PATTERN = re.compile(
    r"LEFT\s+JOIN\b.*?\bWHERE\b.*?\bIS\s+NULL\b",
    flags=re.DOTALL | re.IGNORECASE,
)


@given(
    table_pair=st.sampled_from(_FACT_TABLES),
    window=window_pair(),
)
@settings(max_examples=100)
def test_fact_increment_no_backfill(
    table_pair: Tuple[str, str],
    window: Tuple[datetime, datetime],
):
    """
    Property 2: fact-table increments never execute backfill SQL.

    For any fact table and any window_start/window_end pair, none of the
    statements executed by _merge_fact_increment() (information_schema
    lookups excluded) may contain the LEFT JOIN ... WHERE ... IS NULL
    backfill pattern.

    **Validates: Requirements 2.3**
    """
    dwd_table, ods_table = table_pair
    window_start, window_end = window

    task = _make_task()
    cur = PropertyTestCursor()

    # Column metadata built the same way as in Property 1.
    fake_rows = PropertyTestCursor._fake_columns(None)
    fake_cols = [row[0] for row in fake_rows]
    mapping_entries = task.FACT_MAPPINGS.get(dwd_table) or []
    mapping_dest = [dst for dst, _, _ in mapping_entries]

    # First-seen-order de-duplication (dict.fromkeys, not set) keeps the
    # column order -- and therefore the generated SQL -- identical across runs.
    dwd_cols = list(dict.fromkeys(fake_cols + mapping_dest))
    ods_cols = list(fake_cols)

    fake_types = {row[0]: row[1] for row in fake_rows}
    dwd_types = dict(fake_types)
    ods_types = dict(fake_types)
    for col in mapping_dest:
        # Mapped targets unknown to the fake schema default to "text".
        dwd_types.setdefault(col, "text")

    task._merge_fact_increment(
        cur=cur,
        dwd_table=dwd_table,
        ods_table=ods_table,
        dwd_cols=dwd_cols,
        ods_cols=ods_cols,
        dwd_types=dwd_types,
        ods_types=ods_types,
        window_start=window_start,
        window_end=window_end,
    )

    # Every executed statement except information_schema metadata lookups.
    all_sqls = [
        entry["sql"]
        for entry in cur.executed
        if "information_schema" not in entry["sql"].lower()
    ]

    for sql in all_sqls:
        assert not _BACKFILL_PATTERN.search(sql), (
            f"检测到回补 SQL(LEFT JOIN ... IS NULL 模式),"
            f"表: {dwd_table}\nSQL: {sql[:500]}"
        )


# ══════════════════════════════════════════════════════════════
# Property 4: dimension tables always take the SCD2 path
# Feature: dwd-phase1-refactor, Property 4: dimension tables always take the SCD2 path
# ══════════════════════════════════════════════════════════════

# Every dimension table in TABLE_MAP, i.e. entries whose target name (last
# dot-separated component) starts with "dim_", as (dwd_table, ods_table)
# pairs in TABLE_MAP iteration order.
_DIM_TABLES = [
    (target, source)
    for target, source in DwdLoadTask.TABLE_MAP.items()
    if target.rpartition(".")[2].startswith("dim_")
]


@given(
    table_pair=st.sampled_from(_DIM_TABLES),
    now=st.datetimes(
        min_value=datetime(2020, 1, 1),
        max_value=datetime(2030, 12, 31),
        timezones=st.just(timezone.utc),
    ),
)
@settings(max_examples=100)
def test_dim_always_scd2(
    table_pair: Tuple[str, str],
    now: datetime,
):
    """
    Property 4: dimension tables always take the SCD2 path.

    For any dimension table (``dim_`` prefix in TABLE_MAP), _merge_dim() must
    call _merge_dim_scd2() exactly once, passing through its own positional
    arguments, and must return that call's result object unchanged.

    **Validates: Requirements 3.8, 5.1**
    """
    dwd_table, ods_table = table_pair

    task = _make_task()
    cur = PropertyTestCursor()

    # Minimal column set: the fake schema's column names on both sides.
    fake_cols = [row[0] for row in PropertyTestCursor._fake_columns(None)]
    dwd_cols = list(fake_cols)
    ods_cols = list(fake_cols)

    # A fresh dict serves as the marker return value: the identity check at
    # the end proves _merge_dim() hands back the delegate's result untouched.
    expected_return = {"processed": 0, "inserted": 0, "updated": 0, "skipped": 0}

    with patch.object(task, "_merge_dim_scd2", return_value=expected_return) as mock_scd2:
        result = task._merge_dim(cur, dwd_table, ods_table, dwd_cols, ods_cols, now)

    # _merge_dim_scd2 must be called exactly once ...
    assert mock_scd2.call_count == 1, (
        f"期望 _merge_dim_scd2 被调用 1 次,实际 {mock_scd2.call_count} 次,"
        f"表: {dwd_table}"
    )

    # ... with the same positional arguments _merge_dim received ...
    call_args = mock_scd2.call_args
    assert call_args[0] == (cur, dwd_table, ods_table, dwd_cols, ods_cols, now), (
        f"_merge_dim_scd2 调用参数不匹配,表: {dwd_table}\n"
        f"期望: (cur, {dwd_table}, {ods_table}, {dwd_cols}, {ods_cols}, {now})\n"
        f"实际: {call_args[0]}"
    )

    # ... and its return value must be passed straight through.
    assert result is expected_return, (
        f"_merge_dim() 返回值未直接透传 _merge_dim_scd2() 的结果,"
        f"表: {dwd_table}"
    )


# ══════════════════════════════════════════════════════════════
# Property 3: structural equivalence of the main fact-table incremental SQL
# Feature: dwd-phase1-refactor, Property 3: structural equivalence of the main fact-table incremental SQL
# ══════════════════════════════════════════════════════════════


class _PkAwareCursor(PropertyTestCursor):
    """PropertyTestCursor variant that answers primary-key lookups.

    _merge_fact_increment obtains primary-key columns via _get_primary_keys,
    which queries information_schema. The base cursor answers that query with
    no rows, so no ON CONFLICT clause would be generated. This subclass serves
    the injected ``pk_cols`` instead, so the complete SQL structure can be
    tested.
    """

    def __init__(self, pk_cols: Sequence[str]):
        super().__init__()
        self._pk_cols = list(pk_cols)

    def execute(self, sql: str, params=None):
        if isinstance(sql, (bytes, bytearray)):
            text = sql.decode("utf-8", errors="ignore")
        else:
            text = str(sql)

        if "from information_schema.table_constraints" not in text.lower():
            # Not a primary-key lookup: the base cursor handles it.
            super().execute(sql, params)
            return

        # Primary-key lookup: record it, then serve the injected columns as
        # dict rows keyed by "column_name" (presumably the shape
        # _get_primary_keys reads -- confirm against its implementation).
        self.executed.append({"sql": text.strip(), "params": params})
        self._fetchall_rows = [{"column_name": name} for name in self._pk_cols]


# INSERT INTO "dwd"."<table>" ( -- the real SQL quotes schema and table names;
# the quotes are optional here.
_INSERT_INTO_PATTERN = re.compile(
    r'INSERT\s+INTO\s+"?dwd"?\."?\w+"?\s*\(',
    flags=re.IGNORECASE,
)

# SELECT ... FROM "ods"."<table>" (the select list may span several lines).
_SELECT_FROM_ODS_PATTERN = re.compile(
    r'SELECT\s+.+?\s+FROM\s+"?ods"?\."?\w+"?',
    flags=re.DOTALL | re.IGNORECASE,
)

# ON CONFLICT (...) DO UPDATE SET  or  ON CONFLICT (...) DO NOTHING;
# group(1) captures which action was chosen.
_ON_CONFLICT_PATTERN = re.compile(
    r"ON\s+CONFLICT\s*\([^)]+\)\s+DO\s+(UPDATE\s+SET|NOTHING)",
    flags=re.IGNORECASE,
)

# IS DISTINCT FROM (change detection)
_IS_DISTINCT_FROM_PATTERN = re.compile(r"IS\s+DISTINCT\s+FROM", flags=re.IGNORECASE)

# RETURNING (xmax = 0) AS inserted
_RETURNING_PATTERN = re.compile(
    r"RETURNING\s+\(xmax\s*=\s*0\)\s+AS\s+inserted",
    flags=re.IGNORECASE,
)


@given(
    table_pair=st.sampled_from(_FACT_TABLES),
    window=window_pair(),
    fact_upsert=st.booleans(),
)
@settings(max_examples=100)
def test_fact_increment_sql_structure(
    table_pair: Tuple[str, str],
    window: Tuple[datetime, datetime],
    fact_upsert: bool,
):
    """
    Property 3: structural equivalence of the main fact-table incremental SQL.

    For any fact table, window and fact_upsert setting, the main incremental
    SQL generated by _merge_fact_increment() must contain:
      1. an INSERT INTO <dwd_table> clause;
      2. a SELECT ... FROM <ods_table> clause;
      3. ON CONFLICT (<pk_cols>) DO UPDATE SET or DO NOTHING;
      4. IS DISTINCT FROM change detection (when DO UPDATE SET is used);
      5. RETURNING (xmax = 0) AS inserted.

    **Validates: Requirements 5.2, 5.5**
    """
    dwd_table, ods_table = table_pair
    window_start, window_end = window

    task = _make_task(fact_upsert=fact_upsert)

    # Column metadata built the same way as in Property 1.
    fake_rows = PropertyTestCursor._fake_columns(None)
    fake_cols = [row[0] for row in fake_rows]
    mapping_entries = task.FACT_MAPPINGS.get(dwd_table) or []
    mapping_dest = [dst for dst, _, _ in mapping_entries]

    # First-seen-order de-duplication (dict.fromkeys, not set) keeps the
    # column order -- and therefore the generated SQL -- identical across runs.
    dwd_cols = list(dict.fromkeys(fake_cols + mapping_dest))
    ods_cols = list(fake_cols)

    fake_types = {row[0]: row[1] for row in fake_rows}
    dwd_types = dict(fake_types)
    ods_types = dict(fake_types)
    for col in mapping_dest:
        # Mapped targets unknown to the fake schema default to "text".
        dwd_types.setdefault(col, "text")

    # Inject a primary key so an ON CONFLICT clause is generated; "id" is one
    # of the columns _fake_columns reports.
    cur = _PkAwareCursor(pk_cols=["id"])

    task._merge_fact_increment(
        cur=cur,
        dwd_table=dwd_table,
        ods_table=ods_table,
        dwd_cols=dwd_cols,
        ods_cols=ods_cols,
        dwd_types=dwd_types,
        ods_types=ods_types,
        window_start=window_start,
        window_end=window_end,
    )

    # The main INSERT statement(s); metadata lookups never start with INSERT.
    insert_sqls = [
        entry["sql"]
        for entry in cur.executed
        if entry["sql"].upper().startswith("INSERT INTO")
    ]
    assert len(insert_sqls) == 1, (
        f"期望恰好 1 条 INSERT SQL,实际 {len(insert_sqls)} 条,"
        f"表: {dwd_table}"
    )
    sql = insert_sqls[0]

    # 1. INSERT INTO dwd.<table>
    assert _INSERT_INTO_PATTERN.search(sql), (
        f"SQL 缺少 INSERT INTO dwd.<table> 结构,"
        f"表: {dwd_table}\nSQL: {sql[:500]}"
    )

    # 2. SELECT ... FROM ods.<table>
    assert _SELECT_FROM_ODS_PATTERN.search(sql), (
        f"SQL 缺少 SELECT ... FROM ods.<table> 结构,"
        f"表: {dwd_table}\nSQL: {sql[:500]}"
    )

    # 3. ON CONFLICT (...) DO UPDATE SET / DO NOTHING
    conflict = _ON_CONFLICT_PATTERN.search(sql)
    assert conflict, (
        f"SQL 缺少 ON CONFLICT (...) DO UPDATE SET / DO NOTHING 结构,"
        f"表: {dwd_table}\nSQL: {sql[:500]}"
    )

    # 4. A DO UPDATE SET action must be guarded by IS DISTINCT FROM change
    #    detection; group(1) of the conflict match is the chosen action, so
    #    no second regex over the SQL is needed.
    if conflict.group(1).upper().startswith("UPDATE"):
        assert _IS_DISTINCT_FROM_PATTERN.search(sql), (
            f"SQL 有 DO UPDATE SET 但缺少 IS DISTINCT FROM 变更检测,"
            f"表: {dwd_table}\nSQL: {sql[:500]}"
        )

    # 5. RETURNING (xmax = 0) AS inserted
    assert _RETURNING_PATTERN.search(sql), (
        f"SQL 缺少 RETURNING (xmax = 0) AS inserted 结构,"
        f"表: {dwd_table}\nSQL: {sql[:500]}"
    )