Files

450 lines
20 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档ODS 去重与软删除机制标准化
## 概述
本设计对 ODS 层的 `OdsTaskSpec` 配置、content_hash 去重策略、软删除语义进行标准化改造。核心原则ODS 是追加写入的版本化存储,每次内容变更(包括删除)都是一个新版本行。
改造分四个阶段:
1. **配置精简**(方案 1删除无效/冗余字段,引入 SnapshotMode 枚举
2. **去重优化**(方案 2默认开启 skip_unchangedhash 改用 payload + is_delete
3. **索引支持**(方案 3为"取最新版本"查询添加复合索引
4. **软删除语义**(方案 4从 UPDATE 改为 INSERT 删除版本行
## 改造前后对比
### 配置层对比
| 维度 | 改造前 | 改造后 |
|------|--------|--------|
| 去重开关 | `enable_content_hash_dedup=False`22/23 任务关闭) | `skip_unchanged=True`(默认开启) |
| 快照策略 | `snapshot_full_table` + `snapshot_window_columns` 两个字段组合 | `SnapshotMode` 枚举NONE/FULL_TABLE/WINDOW+ `snapshot_time_column` |
| 冲突列 | `conflict_columns_override`(运行时不生效,仅声明性标注) | 删除PK 唯一来源为 DDL |
| 冗余字段 | `include_site_column`/`include_page_no`/`include_page_size`(全部 False | 删除,硬编码移除 |
### content_hash 计算对比
| 维度 | 改造前 | 改造后 |
|------|--------|--------|
| 输入 | 展平后的 merged_rec排除 7 个元数据字段 | 原始 payload JSON + is_delete |
| 排除逻辑 | `_sanitize_record_for_hash` 递归排除 source_file/source_endpoint/fetched_at/record_index/content_hash/payload/data | 无需排除——payload 天然不含元数据字段 |
| is_delete 参与 | 不参与is_delete 变化不改变 hash | 参与is_delete 变化产生新 hash → 新版本行) |
| 默认行为 | 22/23 任务不算 hash每次抓取都插入新行 | 所有任务默认算 hash内容不变则跳过 |
### 软删除对比
| 维度 | 改造前 | 改造后 |
|------|--------|--------|
| 操作方式 | `UPDATE ... SET is_delete=1`(修改所有历史版本) | INSERT 一条 is_delete=1 的新版本行 |
| 历史版本影响 | 所有历史版本的 is_delete 被改为 1 | 历史版本完全不变 |
| 幂等性 | 重复执行无副作用UPDATE 幂等) | 重复执行无副作用(最新版本已是 is_delete=1 则跳过) |
| 下游取数 | `WHERE is_delete = 0`(但历史版本也被改了) | `DISTINCT ON (id) ORDER BY fetched_at DESC` + `WHERE is_delete = 0` |
### 新版本数据处理流程
#### 正常写入流程(每次 ETL 运行)
```
1. API 抓取 → 获得一批记录
2. 对每条记录:
a. _normalize_is_delete_flag标准化 is_delete 字段API 可能返回 isDelete/is_deleted 等变体)
b. 取原始 record 作为 payload
c. _compute_content_hash(payload, is_delete) → 计算 hash
d. 若 skip_unchanged=True
- 查询该业务 ID 在数据库中的最新 content_hash
- 若 hash 相同 → 跳过(内容未变,无需新版本)
- 若 hash 不同或无历史记录 → 继续插入
e. INSERT INTO ods.xxx (..., content_hash, payload, is_delete, fetched_at)
ON CONFLICT (id, content_hash) DO UPDATE ...
```
#### 软删除流程(快照对比,路径 B
```
前提:任务配置了 snapshot_mode != NONE且 run.snapshot_missing_delete=True
1. 收集本次抓取到的所有业务 ID → fetched_keys
2. 查询快照范围内数据库中已有的业务 IDis_delete != 1
- FULL_TABLE 模式:全表范围
- WINDOW 模式WHERE {snapshot_time_column} >= window_start AND < window_end
3. 差集 = 数据库中的 ID - fetched_keys → 缺失 ID
4. 对每个缺失 ID
a. SELECT DISTINCT ON (id) * FROM ods.xxx WHERE id = ? ORDER BY fetched_at DESC
→ 读取最新版本行
b. 若最新版本已是 is_delete=1 → 跳过(幂等)
c. 否则:
- 复制最新版本行的所有字段
- 设 is_delete = 1
- _compute_content_hash(原payload, is_delete=1) → 新 hash
- INSERT 新版本行hash 不同,不会与现有行冲突)
5. 历史版本行完全不变
```
#### 下游取数规约
```sql
-- DWD 层从 ODS 取最新有效版本的标准查询
SELECT DISTINCT ON (id) *
FROM ods.{table_name}
WHERE is_delete IS DISTINCT FROM 1 -- 排除已删除
ORDER BY id, fetched_at DESC; -- 利用 (id, fetched_at DESC) 索引
-- 若需要包含删除状态(如审计场景)
SELECT DISTINCT ON (id) *
FROM ods.{table_name}
ORDER BY id, fetched_at DESC;
-- 然后在应用层判断 is_delete 字段
```
## 架构
改造集中在 ODS 写入管线的三个核心环节:
```mermaid
flowchart TD
A[上游 API / JSON 回放] --> B[BaseOdsTask.execute]
B --> C{记录处理}
C --> D[_normalize_is_delete_flag<br/>标准化 is_delete 字段]
D --> E[_compute_content_hash<br/>基于 payload + is_delete 算 hash]
E --> F{skip_unchanged?}
F -->|hash 相同| G[跳过]
F -->|hash 不同或新记录| H[INSERT 新版本行]
B --> I{快照对比}
I -->|snapshot_mode != NONE| J[_mark_missing_as_deleted]
J --> K[读取缺失 ID 的最新版本]
K --> L[构造 is_delete=1 的新版本]
L --> M{最新版本已是 is_delete=1?}
M -->|是| N[跳过]
M -->|否| O[INSERT 删除版本行]
```
**影响范围:**
- `apps/etl/pipelines/feiqiu/tasks/ods/ods_tasks.py` — 主要改动文件
- `db/etl_feiqiu/migrations/` — 新增索引迁移脚本
- `db/etl_feiqiu/schemas/ods.sql` — DDL 注释更新(索引)
- 7 个文档文件 — 同步更新
## 组件与接口
### 1. SnapshotMode 枚举
```python
from enum import Enum
class SnapshotMode(Enum):
"""ODS 快照软删除策略。"""
NONE = "none" # 不做快照对比,不触发软删除
FULL_TABLE = "full_table" # 全表快照:对比全表所有记录
WINDOW = "window" # 窗口快照:仅对比时间窗口内的记录
```
定义在 `ods_tasks.py` 顶部,与 OdsTaskSpec 同文件。
### 2. OdsTaskSpec改造后
```python
@dataclass(frozen=False)
class OdsTaskSpec:
code: str
class_name: str
table_name: str
endpoint: str
data_path: Tuple[str, ...] = ("data",)
list_key: str | None = None
pk_columns: Tuple[ColumnSpec, ...] = ()
extra_columns: Tuple[ColumnSpec, ...] = ()
# --- 保留字段(语义不变)---
include_source_file: bool = True
include_source_endpoint: bool = True
include_record_index: bool = False
include_fetched_at: bool = True
requires_window: bool = True
time_fields: Tuple[str, str] | None = ("startTime", "endTime")
include_site_id: bool = True
description: str = ""
extra_params: Dict[str, Any] = field(default_factory=dict)
# --- 改造字段 ---
skip_unchanged: bool = True # 原 enable_content_hash_dedup默认翻转
snapshot_mode: SnapshotMode = SnapshotMode.NONE # 替代 snapshot_full_table + snapshot_window_columns
snapshot_time_column: str | None = None # WINDOW 模式的时间列
def __post_init__(self) -> None:
if self.snapshot_mode == SnapshotMode.WINDOW and not self.snapshot_time_column:
raise ValueError(
f"任务 {self.code}: snapshot_mode=WINDOW 时必须指定 snapshot_time_column"
)
if self.snapshot_mode != SnapshotMode.WINDOW and self.snapshot_time_column is not None:
raise ValueError(
f"任务 {self.code}: snapshot_mode={self.snapshot_mode.value} 时不应指定 snapshot_time_column"
)
```
**删除的字段:**
- `conflict_columns_override` — 运行时不生效
- `include_site_column` — 全部 False
- `include_page_no` — 全部 False
- `include_page_size` — 全部 False
- `snapshot_full_table` — 被 SnapshotMode 替代
- `snapshot_window_columns` — 被 SnapshotMode + snapshot_time_column 替代
- `enable_content_hash_dedup` — 被 skip_unchanged 替代
### 3. 23 个任务的 SnapshotMode 映射
当前配置到新配置的映射规则:
| 原配置 | 新配置 |
|--------|--------|
| `snapshot_full_table=True` | `snapshot_mode=SnapshotMode.FULL_TABLE` |
| `snapshot_window_columns=("col",)` | `snapshot_mode=SnapshotMode.WINDOW, snapshot_time_column="col"` |
| 两者都未设置 | `snapshot_mode=SnapshotMode.NONE`(默认值) |
具体任务映射:
| 任务 | 原配置 | 新 snapshot_mode | snapshot_time_column |
|------|--------|-----------------|---------------------|
| ODS_ASSISTANT_ACCOUNT | snapshot_full_table=True | FULL_TABLE | None |
| ODS_MEMBER_CARD | snapshot_full_table=True | FULL_TABLE | None |
| ODS_GROUP_PACKAGE | snapshot_full_table=True | FULL_TABLE | None |
| ODS_STORE_GOODS | snapshot_full_table=True | FULL_TABLE | None |
| ODS_TENANT_GOODS | snapshot_full_table=True | FULL_TABLE | None |
| ODS_TABLE_USE | snapshot_window_columns=("create_time",) | WINDOW | "create_time" |
| ODS_ASSISTANT_LEDGER | snapshot_window_columns=("create_time",) | WINDOW | "create_time" |
| ODS_STORE_GOODS_SALES | snapshot_window_columns=("create_time",) | WINDOW | "create_time" |
| ODS_REFUND | snapshot_window_columns=("pay_time",) | WINDOW | "pay_time" |
| ODS_PLATFORM_COUPON | snapshot_window_columns=("consume_time",) | WINDOW | "consume_time" |
| ODS_MEMBER_BALANCE | snapshot_window_columns=("create_time",) | WINDOW | "create_time" |
| ODS_GROUP_BUY_REDEMPTION | snapshot_window_columns=("create_time",) | WINDOW | "create_time" |
| ODS_TABLE_FEE_DISCOUNT | snapshot_window_columns=("create_time",) | WINDOW | "create_time" |
| 其余 10 个任务 | 无快照配置 | NONE | None |
### 4. _compute_content_hash改造后
```python
@classmethod
def _compute_content_hash(cls, record: dict, payload: Any, is_delete: int) -> str:
"""基于原始 payload 和 is_delete 计算 content_hash。
payload: 原始 API 返回的 JSON 对象(未展平)
is_delete: 0 或 1
"""
payload_str = json.dumps(
payload,
ensure_ascii=False,
sort_keys=True,
separators=(",", ":"),
default=cls._hash_default,
)
raw = f"{payload_str}|{is_delete}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```
**关键变更:**
- 输入从"展平后的 merged_rec"改为"原始 payload + is_delete"
- 删除 `_sanitize_record_for_hash` 方法(不再需要字段排除逻辑)
- 删除 `include_fetched_at` 参数payload 天然不含 fetched_at
- 分隔符 `|` 确保 payload 和 is_delete 不会产生歧义
**一次性代价:** 切换后首次运行,所有记录的 hash 都会变化(因为算法不同),会插入一批新版本行。这是预期行为,后续运行恢复正常去重。
### 5. _mark_missing_as_deleted改造后
```python
def _mark_missing_as_deleted(self, *, table, business_pk_cols,
snapshot_mode, snapshot_time_column,
window_start, window_end,
key_values, allow_empty) -> int:
"""快照对比软删除INSERT 删除版本行,而非 UPDATE 历史版本。"""
# 1. 查询快照范围内、is_delete != 1 的业务 ID
# 2. 排除本次抓取到的 key_values得到缺失 ID 集合
# 3. 对每个缺失 ID
# a. 读取最新版本行DISTINCT ON ... ORDER BY fetched_at DESC
# b. 若最新版本已是 is_delete=1跳过
# c. 否则:复制该行,设 is_delete=1重算 content_hashINSERT
# 4. 返回插入的删除版本行数
```
**接口变更:**
- `window_columns` 参数改为 `snapshot_mode` + `snapshot_time_column`
- `full_table` 参数删除(由 snapshot_mode 表达)
- 内部从 UPDATE 改为 SELECT + INSERT
### 6. _insert_records_schema_aware 的适配
- `compare_latest` 判断条件中 `self.SPEC.enable_content_hash_dedup` 改为 `self.SPEC.skip_unchanged`
- `_compute_content_hash` 调用签名变更:传入原始 record作为 payload和 is_delete 值
- 删除对 `include_site_column``include_page_no``include_page_size` 的引用
### 7. BaseOdsTask.execute 的适配
- `snapshot_full_table` / `snapshot_window_columns` 的读取改为 `spec.snapshot_mode` / `spec.snapshot_time_column`
- `_mark_missing_as_deleted` 调用参数适配
- 删除对已移除字段的引用
## 数据模型
### ODS 表结构(不变)
所有 23 个 ODS 表的 DDL 结构不变PK 仍为 `(业务id, content_hash)`
### 新增索引(迁移脚本)
每张含 `fetched_at` 列的 ODS 表新增复合索引:
```sql
-- 迁移脚本db/etl_feiqiu/migrations/YYYY-MM-DD__add_ods_latest_version_indexes.sql
-- 为 DISTINCT ON (id) ORDER BY id, fetched_at DESC 查询模式提供索引支持
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ods_member_profiles_latest
ON ods.member_profiles (id, fetched_at DESC);
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_ods_member_balance_changes_latest
ON ods.member_balance_changes (id, fetched_at DESC);
-- ... 对每张含 fetched_at 的 ODS 表重复此模式
-- 索引命名规范idx_ods_{table_name}_latest
-- 业务主键列名因表而异(大多数是 id少数是 recharge_order_id、sitegoodsstockid 等)
```
**注意:**
- `include_fetched_at=False` 的任务(如 ODS_ASSISTANT_ACCOUNT其表中 fetched_at 列有 DEFAULT now(),实际仍有值,也需要索引。但需确认 DDL 中是否所有表都有 fetched_at 列。
- 索引定义需同步写入 `db/etl_feiqiu/schemas/ods.sql`DDL 源文件),确保新环境初始化时自动创建索引。
- 迁移脚本 `db/etl_feiqiu/migrations/YYYY-MM-DD__add_ods_latest_version_indexes.sql` 用于已有环境的增量部署。
### 下游查询规约
DWD 层从 ODS 取数的标准模式:
```sql
SELECT DISTINCT ON (id) *
FROM ods.{table_name}
WHERE is_delete = 0 -- 或 is_delete IS DISTINCT FROM 1
ORDER BY id, fetched_at DESC;
```
此查询利用新增的 `(id, fetched_at DESC)` 索引,避免全表扫描。
## 正确性属性
*正确性属性是系统在所有合法执行路径上都应保持的特征或行为——本质上是对"系统应该做什么"的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。*
### Property 1: SnapshotMode 与 snapshot_time_column 一致性
*For any* OdsTaskSpec 实例,当 snapshot_mode 为 WINDOW 时 snapshot_time_column 必须为非空字符串,当 snapshot_mode 为 FULL_TABLE 或 NONE 时 snapshot_time_column 必须为 None违反此约束应抛出 ValueError。
**Validates: Requirements 2.3, 2.4, 2.5, 2.6**
### Property 2: content_hash 确定性
*For any* 原始 payload合法 JSON 对象)和 is_delete 值0 或 1对相同的 (payload, is_delete) 输入调用 `_compute_content_hash` 应始终产生相同的 SHA-256 哈希值。
**Validates: Requirements 5.1, 5.4**
### Property 3: content_hash 区分性
*For any* 两组不同的 (payload, is_delete) 输入payload 不同或 is_delete 不同),`_compute_content_hash` 应产生不同的哈希值。
**Validates: Requirements 5.5**
### Property 4: skip_unchanged 跳过内容未变的记录
*For any* ODS 任务skip_unchanged=True当一条记录的 content_hash 与数据库中该业务 ID 最新版本的 content_hash 相同时,该记录应被计入 skipped 而非 inserted。
**Validates: Requirements 4.3, 8.5**
### Property 5: 记录数闭合不变量
*For any* 非空记录列表被写入 ODS 时,`fetched == inserted + updated + skipped` 恒成立。
**Validates: Requirements 8.3**
### Property 6: 软删除构造正确性
*For any* 快照对比中发现的缺失业务 ID`_mark_missing_as_deleted` 应读取该 ID 的最新版本行,构造一条 is_delete=1 的新版本行,其 content_hash 基于原始 payload + is_delete=1 重算,并通过 INSERT而非 UPDATE写入。
**Validates: Requirements 7.1, 7.2, 7.4**
### Property 7: 软删除幂等性
*For any* 业务 ID若其最新版本已经是 is_delete=1再次执行 `_mark_missing_as_deleted` 不应插入新的删除版本行。
**Validates: Requirements 7.3, 8.7**
### Property 8: 软删除不修改历史版本
*For any* 软删除操作执行后,数据库中该业务 ID 的所有历史版本行(执行前已存在的行)的内容应保持不变——不应有 UPDATE 语句作用于 ODS 表。
**Validates: Requirements 7.4, 8.6**
## 错误处理
### OdsTaskSpec 校验错误
- `SnapshotMode.WINDOW` + `snapshot_time_column=None``__post_init__` 抛出 `ValueError`
- `SnapshotMode.FULL_TABLE/NONE` + `snapshot_time_column` 不为 None → `__post_init__` 抛出 `ValueError`
- 这些错误在任务注册时(模块加载时)即触发,属于 fail-fast 设计
### hash 算法切换的一次性代价
- 首次运行后所有记录的 content_hash 都会变化,导致全量插入新版本行
- 这是预期行为,不是错误
- 日志中应记录 "hash 算法已变更,本次运行将插入全量新版本" 的提示信息
- 后续运行恢复正常去重
### 软删除的边界情况
- 缺失 ID 在数据库中无任何记录(从未抓取过)→ 跳过,不插入删除版本
- 缺失 ID 的最新版本已是 is_delete=1 → 跳过(幂等性)
- 快照范围内无任何记录且 allow_empty=False → 返回 0不执行任何操作
### 迁移脚本错误
- `CREATE INDEX CONCURRENTLY` 不能在事务块内执行 → 迁移脚本需单独执行
- 索引创建失败不影响数据写入,仅影响查询性能 → 可重试
## 测试策略
### 属性测试hypothesis
使用 `pytest` + `hypothesis` 库,每个属性测试至少运行 100 次迭代。
**测试文件:** `apps/etl/pipelines/feiqiu/tests/unit/test_ods_dedup_properties.py`
| 属性 | 测试方法 | 生成策略 |
|------|---------|---------|
| Property 1 | 生成随机 SnapshotMode + snapshot_time_column 组合,验证校验逻辑 | `st.sampled_from(SnapshotMode)` × `st.one_of(st.none(), st.text(min_size=1))` |
| Property 2 | 生成随机 JSON payload + is_delete验证两次调用结果相同 | `st.dictionaries(st.text(), st.text())` × `st.sampled_from([0, 1])` |
| Property 3 | 生成两组不同的 (payload, is_delete),验证 hash 不同 | 同上,加 `assume(pair1 != pair2)` |
| Property 4 | 用 PkAwareFakeDB 预设最新 hash验证相同记录被跳过 | `_ods_record_with_id` 策略 |
| Property 5 | 生成随机记录列表,验证 fetched == inserted + updated + skipped | `st.lists(_ods_record_with_id)` |
| Property 6 | 用 FakeDB 模拟缺失 ID 场景,验证 INSERT 而非 UPDATE | `st.lists(st.integers())` |
| Property 7 | 预设最新版本 is_delete=1验证不产生新行 | 同上 |
| Property 8 | 执行软删除后检查 FakeDB 中无 UPDATE 语句 | 同上 |
每个测试用注释标注:`# Feature: ods-dedup-standardize, Property N: {title}`
### 单元测试
**测试文件:** 适配现有 `test_ods_tasks.py``test_debug_ods_properties.py`
- 适配 OdsTaskSpec 构造函数变更(删除旧字段,使用新字段)
- 适配 `_compute_content_hash` 签名变更
- 适配 `_mark_missing_as_deleted` 参数变更
- 验证 SnapshotMode 枚举的边界情况edge cases from prework 2.5, 2.6
### 现有测试适配
现有测试中需要适配的关键点:
- `test_debug_ods_properties.py` 中的 Property 4content_hash 确定性)需要适配新的 `_compute_content_hash` 签名
- `test_debug_ods_properties.py` 中的 Property 5快照删除标记需要适配新的 INSERT 语义(检查 INSERT 而非 UPDATE
- `test_ods_tasks.py` 中的所有任务测试需要确保在新的 OdsTaskSpec 下仍能正常运行
### 测试执行命令
```bash
# ETL 单元测试(包含属性测试)
cd apps/etl/pipelines/feiqiu && pytest tests/unit -v
# 仅运行本次改造的属性测试
cd apps/etl/pipelines/feiqiu && pytest tests/unit/test_ods_dedup_properties.py -v
```