Files

20 KiB
Raw Permalink Blame History

设计文档ODS 去重与软删除机制标准化

概述

本设计对 ODS 层的 OdsTaskSpec 配置、content_hash 去重策略、软删除语义进行标准化改造。核心原则ODS 是追加写入的版本化存储,每次内容变更(包括删除)都是一个新版本行。

改造分四个阶段:

  1. 配置精简(方案 1删除无效/冗余字段,引入 SnapshotMode 枚举
  2. 去重优化(方案 2默认开启 skip_unchangedhash 改用 payload + is_delete
  3. 索引支持(方案 3为"取最新版本"查询添加复合索引
  4. 软删除语义(方案 4从 UPDATE 改为 INSERT 删除版本行

改造前后对比

配置层对比

维度 改造前 改造后
去重开关 enable_content_hash_dedup=False22/23 任务关闭) skip_unchanged=True(默认开启)
快照策略 snapshot_full_table + snapshot_window_columns 两个字段组合 SnapshotMode 枚举NONE/FULL_TABLE/WINDOW+ snapshot_time_column
冲突列 conflict_columns_override(运行时不生效,仅声明性标注) 删除PK 唯一来源为 DDL
冗余字段 include_site_column/include_page_no/include_page_size(全部 False 删除,硬编码移除

content_hash 计算对比

维度 改造前 改造后
输入 展平后的 merged_rec排除 7 个元数据字段 原始 payload JSON + is_delete
排除逻辑 _sanitize_record_for_hash 递归排除 source_file/source_endpoint/fetched_at/record_index/content_hash/payload/data 无需排除——payload 天然不含元数据字段
is_delete 参与 不参与is_delete 变化不改变 hash 参与is_delete 变化产生新 hash → 新版本行)
默认行为 22/23 任务不算 hash每次抓取都插入新行 所有任务默认算 hash内容不变则跳过

软删除对比

维度 改造前 改造后
操作方式 UPDATE ... SET is_delete=1(修改所有历史版本) INSERT 一条 is_delete=1 的新版本行
历史版本影响 所有历史版本的 is_delete 被改为 1 历史版本完全不变
幂等性 重复执行无副作用UPDATE 幂等) 重复执行无副作用(最新版本已是 is_delete=1 则跳过)
下游取数 WHERE is_delete = 0(但历史版本也被改了) DISTINCT ON (id) ORDER BY fetched_at DESC + WHERE is_delete = 0

新版本数据处理流程

正常写入流程(每次 ETL 运行)

1. API 抓取 → 获得一批记录
2. 对每条记录:
   a. _normalize_is_delete_flag标准化 is_delete 字段API 可能返回 isDelete/is_deleted 等变体)
   b. 取原始 record 作为 payload
   c. _compute_content_hash(payload, is_delete) → 计算 hash
   d. 若 skip_unchanged=True
      - 查询该业务 ID 在数据库中的最新 content_hash
      - 若 hash 相同 → 跳过(内容未变,无需新版本)
      - 若 hash 不同或无历史记录 → 继续插入
   e. INSERT INTO ods.xxx (..., content_hash, payload, is_delete, fetched_at)
      ON CONFLICT (id, content_hash) DO UPDATE ...

软删除流程(快照对比,路径 B

前提:任务配置了 snapshot_mode != NONE且 run.snapshot_missing_delete=True

1. 收集本次抓取到的所有业务 ID → fetched_keys
2. 查询快照范围内数据库中已有的业务 IDis_delete != 1
   - FULL_TABLE 模式:全表范围
   - WINDOW 模式WHERE {snapshot_time_column} >= window_start AND < window_end
3. 差集 = 数据库中的 ID - fetched_keys → 缺失 ID
4. 对每个缺失 ID
   a. SELECT DISTINCT ON (id) * FROM ods.xxx WHERE id = ? ORDER BY fetched_at DESC
      → 读取最新版本行
   b. 若最新版本已是 is_delete=1 → 跳过(幂等)
   c. 否则:
      - 复制最新版本行的所有字段
      - 设 is_delete = 1
      - _compute_content_hash(原payload, is_delete=1) → 新 hash
      - INSERT 新版本行hash 不同,不会与现有行冲突)
5. 历史版本行完全不变

下游取数规约

-- DWD 层从 ODS 取最新有效版本的标准查询
SELECT DISTINCT ON (id) *
FROM ods.{table_name}
WHERE is_delete IS DISTINCT FROM 1   -- 排除已删除
ORDER BY id, fetched_at DESC;         -- 利用 (id, fetched_at DESC) 索引

-- 若需要包含删除状态(如审计场景)
SELECT DISTINCT ON (id) *
FROM ods.{table_name}
ORDER BY id, fetched_at DESC;
-- 然后在应用层判断 is_delete 字段

架构

改造集中在 ODS 写入管线的三个核心环节:

flowchart TD
    A[上游 API / JSON 回放] --> B[BaseOdsTask.execute]
    B --> C{记录处理}
    C --> D[_normalize_is_delete_flag<br/>标准化 is_delete 字段]
    D --> E[_compute_content_hash<br/>基于 payload + is_delete 算 hash]
    E --> F{skip_unchanged?}
    F -->|hash 相同| G[跳过]
    F -->|hash 不同或新记录| H[INSERT 新版本行]
    
    B --> I{快照对比}
    I -->|snapshot_mode != NONE| J[_mark_missing_as_deleted]
    J --> K[读取缺失 ID 的最新版本]
    K --> L[构造 is_delete=1 的新版本]
    L --> M{最新版本已是 is_delete=1?}
    M -->|是| N[跳过]
    M -->|否| O[INSERT 删除版本行]

影响范围:

  • apps/etl/pipelines/feiqiu/tasks/ods/ods_tasks.py — 主要改动文件
  • db/etl_feiqiu/migrations/ — 新增索引迁移脚本
  • db/etl_feiqiu/schemas/ods.sql — DDL 注释更新(索引)
  • 7 个文档文件 — 同步更新

组件与接口

1. SnapshotMode 枚举

from enum import Enum

class SnapshotMode(Enum):
    """ODS 快照软删除策略。"""
    NONE = "none"            # 不做快照对比,不触发软删除
    FULL_TABLE = "full_table"  # 全表快照:对比全表所有记录
    WINDOW = "window"        # 窗口快照:仅对比时间窗口内的记录

定义在 ods_tasks.py 顶部,与 OdsTaskSpec 同文件。

2. OdsTaskSpec改造后

@dataclass(frozen=False)
class OdsTaskSpec:
    code: str
    class_name: str
    table_name: str
    endpoint: str
    data_path: Tuple[str, ...] = ("data",)
    list_key: str | None = None
    pk_columns: Tuple[ColumnSpec, ...] = ()
    extra_columns: Tuple[ColumnSpec, ...] = ()
    # --- 保留字段(语义不变)---
    include_source_file: bool = True
    include_source_endpoint: bool = True
    include_record_index: bool = False
    include_fetched_at: bool = True
    requires_window: bool = True
    time_fields: Tuple[str, str] | None = ("startTime", "endTime")
    include_site_id: bool = True
    description: str = ""
    extra_params: Dict[str, Any] = field(default_factory=dict)
    # --- 改造字段 ---
    skip_unchanged: bool = True                    # 原 enable_content_hash_dedup默认翻转
    snapshot_mode: SnapshotMode = SnapshotMode.NONE  # 替代 snapshot_full_table + snapshot_window_columns
    snapshot_time_column: str | None = None          # WINDOW 模式的时间列

    def __post_init__(self) -> None:
        if self.snapshot_mode == SnapshotMode.WINDOW and not self.snapshot_time_column:
            raise ValueError(
                f"任务 {self.code}: snapshot_mode=WINDOW 时必须指定 snapshot_time_column"
            )
        if self.snapshot_mode != SnapshotMode.WINDOW and self.snapshot_time_column is not None:
            raise ValueError(
                f"任务 {self.code}: snapshot_mode={self.snapshot_mode.value} 时不应指定 snapshot_time_column"
            )

删除的字段:

  • conflict_columns_override — 运行时不生效
  • include_site_column — 全部 False
  • include_page_no — 全部 False
  • include_page_size — 全部 False
  • snapshot_full_table — 被 SnapshotMode 替代
  • snapshot_window_columns — 被 SnapshotMode + snapshot_time_column 替代
  • enable_content_hash_dedup — 被 skip_unchanged 替代

3. 23 个任务的 SnapshotMode 映射

当前配置到新配置的映射规则:

原配置 新配置
snapshot_full_table=True snapshot_mode=SnapshotMode.FULL_TABLE
snapshot_window_columns=("col",) snapshot_mode=SnapshotMode.WINDOW, snapshot_time_column="col"
两者都未设置 snapshot_mode=SnapshotMode.NONE(默认值)

具体任务映射:

任务 原配置 新 snapshot_mode snapshot_time_column
ODS_ASSISTANT_ACCOUNT snapshot_full_table=True FULL_TABLE None
ODS_MEMBER_CARD snapshot_full_table=True FULL_TABLE None
ODS_GROUP_PACKAGE snapshot_full_table=True FULL_TABLE None
ODS_STORE_GOODS snapshot_full_table=True FULL_TABLE None
ODS_TENANT_GOODS snapshot_full_table=True FULL_TABLE None
ODS_TABLE_USE snapshot_window_columns=("create_time",) WINDOW "create_time"
ODS_ASSISTANT_LEDGER snapshot_window_columns=("create_time",) WINDOW "create_time"
ODS_STORE_GOODS_SALES snapshot_window_columns=("create_time",) WINDOW "create_time"
ODS_REFUND snapshot_window_columns=("pay_time",) WINDOW "pay_time"
ODS_PLATFORM_COUPON snapshot_window_columns=("consume_time",) WINDOW "consume_time"
ODS_MEMBER_BALANCE snapshot_window_columns=("create_time",) WINDOW "create_time"
ODS_GROUP_BUY_REDEMPTION snapshot_window_columns=("create_time",) WINDOW "create_time"
ODS_TABLE_FEE_DISCOUNT snapshot_window_columns=("create_time",) WINDOW "create_time"
其余 10 个任务 无快照配置 NONE None

4. _compute_content_hash改造后

@classmethod
def _compute_content_hash(cls, record: dict, payload: Any, is_delete: int) -> str:
    """基于原始 payload 和 is_delete 计算 content_hash。
    
    payload: 原始 API 返回的 JSON 对象(未展平)
    is_delete: 0 或 1
    """
    payload_str = json.dumps(
        payload,
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
        default=cls._hash_default,
    )
    raw = f"{payload_str}|{is_delete}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

关键变更:

  • 输入从"展平后的 merged_rec"改为"原始 payload + is_delete"
  • 删除 _sanitize_record_for_hash 方法(不再需要字段排除逻辑)
  • 删除 include_fetched_at 参数payload 天然不含 fetched_at
  • 分隔符 | 确保 payload 和 is_delete 不会产生歧义

一次性代价: 切换后首次运行,所有记录的 hash 都会变化(因为算法不同),会插入一批新版本行。这是预期行为,后续运行恢复正常去重。

5. _mark_missing_as_deleted改造后

def _mark_missing_as_deleted(self, *, table, business_pk_cols, 
                              snapshot_mode, snapshot_time_column,
                              window_start, window_end,
                              key_values, allow_empty) -> int:
    """快照对比软删除INSERT 删除版本行,而非 UPDATE 历史版本。"""
    # 1. 查询快照范围内、is_delete != 1 的业务 ID
    # 2. 排除本次抓取到的 key_values得到缺失 ID 集合
    # 3. 对每个缺失 ID
    #    a. 读取最新版本行DISTINCT ON ... ORDER BY fetched_at DESC
    #    b. 若最新版本已是 is_delete=1跳过
    #    c. 否则:复制该行,设 is_delete=1重算 content_hashINSERT
    # 4. 返回插入的删除版本行数

接口变更:

  • window_columns 参数改为 snapshot_mode + snapshot_time_column
  • full_table 参数删除(由 snapshot_mode 表达)
  • 内部从 UPDATE 改为 SELECT + INSERT

6. _insert_records_schema_aware 的适配

  • compare_latest 判断条件中 self.SPEC.enable_content_hash_dedup 改为 self.SPEC.skip_unchanged
  • _compute_content_hash 调用签名变更:传入原始 record作为 payload和 is_delete 值
  • 删除对 include_site_columninclude_page_noinclude_page_size 的引用

7. BaseOdsTask.execute 的适配

  • snapshot_full_table / snapshot_window_columns 的读取改为 spec.snapshot_mode / spec.snapshot_time_column
  • _mark_missing_as_deleted 调用参数适配
  • 删除对已移除字段的引用

数据模型

ODS 表结构(不变)

所有 23 个 ODS 表的 DDL 结构不变PK 仍为 (业务id, content_hash)

新增索引(迁移脚本)

每张含 fetched_at 列的 ODS 表新增复合索引:

-- 迁移脚本db/etl_feiqiu/migrations/YYYY-MM-DD__add_ods_latest_version_indexes.sql
-- 为 DISTINCT ON (id) ORDER BY id, fetched_at DESC 查询模式提供索引支持

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ods_member_profiles_latest
    ON ods.member_profiles (id, fetched_at DESC);

CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ods_member_balance_changes_latest
    ON ods.member_balance_changes (id, fetched_at DESC);

-- ... 对每张含 fetched_at 的 ODS 表重复此模式
-- 索引命名规范idx_ods_{table_name}_latest
-- 业务主键列名因表而异(大多数是 id少数是 recharge_order_id、sitegoodsstockid 等)

注意:

  • include_fetched_at=False 的任务(如 ODS_ASSISTANT_ACCOUNT其表中 fetched_at 列有 DEFAULT now(),实际仍有值,也需要索引。但需确认 DDL 中是否所有表都有 fetched_at 列。
  • 索引定义需同步写入 db/etl_feiqiu/schemas/ods.sqlDDL 源文件),确保新环境初始化时自动创建索引。
  • 迁移脚本 db/etl_feiqiu/migrations/YYYY-MM-DD__add_ods_latest_version_indexes.sql 用于已有环境的增量部署。

下游查询规约

DWD 层从 ODS 取数的标准模式:

SELECT DISTINCT ON (id) *
FROM ods.{table_name}
WHERE is_delete = 0  -- 或 is_delete IS DISTINCT FROM 1
ORDER BY id, fetched_at DESC;

此查询利用新增的 (id, fetched_at DESC) 索引,避免全表扫描。

正确性属性

正确性属性是系统在所有合法执行路径上都应保持的特征或行为——本质上是对"系统应该做什么"的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。

Property 1: SnapshotMode 与 snapshot_time_column 一致性

For any OdsTaskSpec 实例,当 snapshot_mode 为 WINDOW 时 snapshot_time_column 必须为非空字符串,当 snapshot_mode 为 FULL_TABLE 或 NONE 时 snapshot_time_column 必须为 None违反此约束应抛出 ValueError。

Validates: Requirements 2.3, 2.4, 2.5, 2.6

Property 2: content_hash 确定性

For any 原始 payload合法 JSON 对象)和 is_delete 值0 或 1对相同的 (payload, is_delete) 输入调用 _compute_content_hash 应始终产生相同的 SHA-256 哈希值。

Validates: Requirements 5.1, 5.4

Property 3: content_hash 区分性

For any 两组不同的 (payload, is_delete) 输入payload 不同或 is_delete 不同),_compute_content_hash 应产生不同的哈希值。

Validates: Requirements 5.5

Property 4: skip_unchanged 跳过内容未变的记录

For any ODS 任务skip_unchanged=True当一条记录的 content_hash 与数据库中该业务 ID 最新版本的 content_hash 相同时,该记录应被计入 skipped 而非 inserted。

Validates: Requirements 4.3, 8.5

Property 5: 记录数闭合不变量

For any 非空记录列表被写入 ODS 时,fetched == inserted + updated + skipped 恒成立。

Validates: Requirements 8.3

Property 6: 软删除构造正确性

For any 快照对比中发现的缺失业务 ID_mark_missing_as_deleted 应读取该 ID 的最新版本行,构造一条 is_delete=1 的新版本行,其 content_hash 基于原始 payload + is_delete=1 重算,并通过 INSERT而非 UPDATE写入。

Validates: Requirements 7.1, 7.2, 7.4

Property 7: 软删除幂等性

For any 业务 ID若其最新版本已经是 is_delete=1再次执行 _mark_missing_as_deleted 不应插入新的删除版本行。

Validates: Requirements 7.3, 8.7

Property 8: 软删除不修改历史版本

For any 软删除操作执行后,数据库中该业务 ID 的所有历史版本行(执行前已存在的行)的内容应保持不变——不应有 UPDATE 语句作用于 ODS 表。

Validates: Requirements 7.4, 8.6

错误处理

OdsTaskSpec 校验错误

  • SnapshotMode.WINDOW + snapshot_time_column=None__post_init__ 抛出 ValueError
  • SnapshotMode.FULL_TABLE/NONE + snapshot_time_column 不为 None → __post_init__ 抛出 ValueError
  • 这些错误在任务注册时(模块加载时)即触发,属于 fail-fast 设计

hash 算法切换的一次性代价

  • 首次运行后所有记录的 content_hash 都会变化,导致全量插入新版本行
  • 这是预期行为,不是错误
  • 日志中应记录 "hash 算法已变更,本次运行将插入全量新版本" 的提示信息
  • 后续运行恢复正常去重

软删除的边界情况

  • 缺失 ID 在数据库中无任何记录(从未抓取过)→ 跳过,不插入删除版本
  • 缺失 ID 的最新版本已是 is_delete=1 → 跳过(幂等性)
  • 快照范围内无任何记录且 allow_empty=False → 返回 0不执行任何操作

迁移脚本错误

  • CREATE INDEX CONCURRENTLY 不能在事务块内执行 → 迁移脚本需单独执行
  • 索引创建失败不影响数据写入,仅影响查询性能 → 可重试

测试策略

属性测试hypothesis

使用 pytest + hypothesis 库,每个属性测试至少运行 100 次迭代。

测试文件: apps/etl/pipelines/feiqiu/tests/unit/test_ods_dedup_properties.py

属性 测试方法 生成策略
Property 1 生成随机 SnapshotMode + snapshot_time_column 组合,验证校验逻辑 st.sampled_from(SnapshotMode) × st.one_of(st.none(), st.text(min_size=1))
Property 2 生成随机 JSON payload + is_delete验证两次调用结果相同 st.dictionaries(st.text(), st.text()) × st.sampled_from([0, 1])
Property 3 生成两组不同的 (payload, is_delete),验证 hash 不同 同上,加 assume(pair1 != pair2)
Property 4 用 PkAwareFakeDB 预设最新 hash验证相同记录被跳过 _ods_record_with_id 策略
Property 5 生成随机记录列表,验证 fetched == inserted + updated + skipped st.lists(_ods_record_with_id)
Property 6 用 FakeDB 模拟缺失 ID 场景,验证 INSERT 而非 UPDATE st.lists(st.integers())
Property 7 预设最新版本 is_delete=1验证不产生新行 同上
Property 8 执行软删除后检查 FakeDB 中无 UPDATE 语句 同上

每个测试用注释标注:# Feature: ods-dedup-standardize, Property N: {title}

单元测试

测试文件: 适配现有 test_ods_tasks.pytest_debug_ods_properties.py

  • 适配 OdsTaskSpec 构造函数变更(删除旧字段,使用新字段)
  • 适配 _compute_content_hash 签名变更
  • 适配 _mark_missing_as_deleted 参数变更
  • 验证 SnapshotMode 枚举的边界情况edge cases from prework 2.5, 2.6

现有测试适配

现有测试中需要适配的关键点:

  • test_debug_ods_properties.py 中的 Property 4content_hash 确定性)需要适配新的 _compute_content_hash 签名
  • test_debug_ods_properties.py 中的 Property 5快照删除标记需要适配新的 INSERT 语义(检查 INSERT 而非 UPDATE
  • test_ods_tasks.py 中的所有任务测试需要确保在新的 OdsTaskSpec 下仍能正常运行

测试执行命令

# ETL 单元测试(包含属性测试)
cd apps/etl/pipelines/feiqiu && pytest tests/unit -v

# 仅运行本次改造的属性测试
cd apps/etl/pipelines/feiqiu && pytest tests/unit/test_ods_dedup_properties.py -v