Files
Neo-ZQYY/.kiro/specs/ods-dedup-standardize/requirements.md

144 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 需求文档ODS 去重与软删除机制标准化
## 简介
NeoZQYY ETL 系统的 23 个 ODS 任务在去重和软删除机制上存在配置误导、无意义版本膨胀、软删除语义不清等问题。本需求旨在精简 `OdsTaskSpec` 配置、标准化 content_hash 去重策略、优化软删除语义,使 ODS 层真正实现"忠实记录上游数据版本变更"的职责。
## 术语表
- **OdsTaskSpec**`ods_tasks.py` 中定义 ODS 任务配置的 dataclass包含端点、主键、去重开关等字段
- **BaseOdsTask**ODS 任务执行基类,包含写入、去重、软删除等核心逻辑
- **content_hash**:基于记录内容计算的 SHA-256 哈希值,与业务 ID 组成复合主键
- **skip_unchanged**:重命名后的去重开关(原 `enable_content_hash_dedup`),为 True 时跳过内容未变的记录
- **SnapshotMode**新增枚举统一表达软删除快照策略NONE / FULL_TABLE / WINDOW
- **业务主键**ODS 表复合主键中除 content_hash 外的列(通常为 `id`
- **软删除**:将上游已不存在的记录标记为 `is_delete=1`
- **payload**ODS 表中存储的原始 API 响应 JSON
## 需求
### 需求 1删除运行时无效的 conflict_columns_override 字段
**用户故事:** 作为 ETL 开发者,我希望移除不生效的配置字段,以避免对 ODS 写入行为产生误解。
#### 验收标准
1. WHEN `OdsTaskSpec` dataclass 被定义时THE OdsTaskSpec SHALL 不包含 `conflict_columns_override` 字段
2. WHEN `OdsTaskSpec.__post_init__` 执行校验时THE OdsTaskSpec SHALL 不包含任何引用 `conflict_columns_override` 的校验逻辑
3. WHEN 23 个 `ODS_TASK_SPECS` 声明被更新后THE ODS_TASK_SPECS SHALL 不包含任何 `conflict_columns_override` 参数
### 需求 2用 SnapshotMode 枚举替代软删除组合字段
**用户故事:** 作为 ETL 开发者,我希望用单一枚举字段表达快照策略,以消除 `snapshot_full_table``snapshot_window_columns` 的组合歧义。
#### 验收标准
1. THE SnapshotMode 枚举 SHALL 定义三个值NONE、FULL_TABLE、WINDOW
2. WHEN `OdsTaskSpec` dataclass 被定义时THE OdsTaskSpec SHALL 使用 `snapshot_mode: SnapshotMode` 替代 `snapshot_full_table: bool``snapshot_window_columns`
3. WHEN `snapshot_mode` 为 WINDOW 时THE OdsTaskSpec SHALL 要求 `snapshot_time_column` 为非空字符串
4. WHEN `snapshot_mode` 为 FULL_TABLE 或 NONE 时THE OdsTaskSpec SHALL 要求 `snapshot_time_column` 为 None
5. IF `snapshot_mode` 为 WINDOW 且 `snapshot_time_column` 为 NoneTHEN THE OdsTaskSpec SHALL 在 `__post_init__` 中抛出 ValueError
6. IF `snapshot_mode` 为 FULL_TABLE 且 `snapshot_time_column` 不为 NoneTHEN THE OdsTaskSpec SHALL 在 `__post_init__` 中抛出 ValueError
7. WHEN 23 个 `ODS_TASK_SPECS` 声明被迁移后THE ODS_TASK_SPECS SHALL 使用 `snapshot_mode``snapshot_time_column` 替代原有的 `snapshot_full_table``snapshot_window_columns`
### 需求 3删除全局恒定的冗余布尔字段
**用户故事:** 作为 ETL 开发者,我希望移除所有任务中值恒定的配置字段,以减少 OdsTaskSpec 的认知负担。
#### 验收标准
1. WHEN `OdsTaskSpec` dataclass 被定义时THE OdsTaskSpec SHALL 不包含 `include_site_column``include_page_no``include_page_size` 字段
2. WHEN `BaseOdsTask` 执行逻辑中引用上述三个字段时THE BaseOdsTask SHALL 将对应逻辑硬编码为 False 或直接移除
3. WHEN 23 个 `ODS_TASK_SPECS` 声明被更新后THE ODS_TASK_SPECS SHALL 不包含 `include_site_column``include_page_no``include_page_size` 参数
### 需求 4重命名去重开关并默认开启
**用户故事:** 作为 ETL 开发者,我希望去重开关名称更直观且默认开启,以减少无意义的版本膨胀。
#### 验收标准
1. WHEN `OdsTaskSpec` dataclass 被定义时THE OdsTaskSpec SHALL 使用 `skip_unchanged: bool = True` 替代 `enable_content_hash_dedup: bool = False`
2. WHEN `BaseOdsTask._insert_records_schema_aware` 执行去重判断时THE BaseOdsTask SHALL 引用 `self.SPEC.skip_unchanged` 而非 `self.SPEC.enable_content_hash_dedup`
3. WHEN `skip_unchanged` 为 True 且目标表有 content_hash 列和业务主键时THE BaseOdsTask SHALL 跳过 content_hash 与数据库中最新版本相同的记录
### 需求 5改用 payload + is_delete 计算 content_hash
**用户故事:** 作为 ETL 开发者,我希望 content_hash 基于原始 payload 和 is_delete 计算,以获得最干净的语义且不受展平逻辑影响。
#### 验收标准
1. WHEN `_compute_content_hash` 计算哈希时THE BaseOdsTask SHALL 仅基于记录的 payload原始 JSON和 is_delete 字段计算 SHA-256 哈希
2. WHEN `_compute_content_hash` 计算哈希时THE BaseOdsTask SHALL 对 payload 进行 `json.dumps(sort_keys=True, separators=(',',':'), ensure_ascii=False)` 序列化后拼接 is_delete 值再计算哈希
3. WHEN `_sanitize_record_for_hash` 被调用时THE BaseOdsTask SHALL 移除该方法,因为新的 hash 计算不再需要字段排除逻辑
4. THE _compute_content_hash SHALL 对相同的 payload 和 is_delete 组合始终产生相同的哈希值
5. THE _compute_content_hash SHALL 对不同的 payload 或不同的 is_delete 值产生不同的哈希值
### 需求 6为 ODS 表添加"取最新版本"索引
**用户故事:** 作为 ETL 开发者,我希望每张 ODS 表有 `(业务id, fetched_at DESC)` 复合索引,以高效支持 `DISTINCT ON` 取最新版本的查询模式。
#### 验收标准
1. WHEN DDL 迁移脚本执行后THE 数据库 SHALL 为每张含 fetched_at 列的 ODS 表创建 `(业务主键, fetched_at DESC)` 复合索引
2. WHEN 索引创建时THE 迁移脚本 SHALL 使用 `CREATE INDEX IF NOT EXISTS` 以保证幂等性
3. WHEN 索引创建时THE 迁移脚本 SHALL 使用 `CONCURRENTLY` 选项以避免锁表
### 需求 7软删除改为"插入删除版本"
**用户故事:** 作为 ETL 开发者,我希望软删除操作插入一条 `is_delete=1` 的新版本行,而非 UPDATE 所有历史版本,以保持 ODS 追加写入的语义一致性。
**背景:** 软删除有两个触发路径:
- **路径 AAPI 返回)**:上游 API 的 JSON 响应中自带 `is_delete`/`isDelete` 等字段,由 `_normalize_is_delete_flag` 标准化后随记录正常写入。此路径在需求 5is_delete 参与 hash生效后自动产生新版本行无需额外改造。
- **路径 B快照空缺**:通过 `_mark_missing_as_deleted` 在快照范围内FULL_TABLE 模式对比全表WINDOW 模式仅对比时间窗口内的记录)对比本次抓取的业务 ID 集合与数据库已有记录,发现缺失的 ID 需标记为删除。仅当任务配置了 `snapshot_mode` 为 FULL_TABLE 或 WINDOW 时才触发。此路径是本需求的改造重点。
#### 验收标准
1. WHEN `_mark_missing_as_deleted` 检测到某业务 ID 在上游已不存在时(路径 BTHE BaseOdsTask SHALL 读取该业务 ID 的最新版本行内容
2. WHEN 构造删除版本行时THE BaseOdsTask SHALL 将 is_delete 设为 1保留其余字段不变并基于 payload + is_delete=1 重算 content_hash
3. WHEN 删除版本行的 content_hash 与该业务 ID 最新版本的 content_hash 相同时THE BaseOdsTask SHALL 跳过插入(该记录已经是删除状态)
4. WHEN 删除版本行被插入后THE BaseOdsTask SHALL 不修改该业务 ID 的任何历史版本行
5. WHEN 上游 API 返回的记录中 is_delete=1 时(路径 ATHE BaseOdsTask SHALL 通过正常写入流程插入新版本行is_delete 参与 hash 计算hash 变化即为新版本)
6. WHEN 下游查询 ODS 数据时THE 查询规约 SHALL 先按业务 ID 取 `fetched_at DESC` 最新版本,再过滤 `is_delete = 0`
### 需求 8回归验证策略
**用户故事:** 作为 ETL 开发者,我希望有完善的回归测试覆盖本次改造的所有核心逻辑变更,以确保 23 个 ODS 任务在改造后行为正确。
**挑战:** 本次改造涉及 OdsTaskSpec 字段重构、hash 算法变更、软删除语义变更,影响所有 23 个任务的写入和删除路径。需要分层验证:
- 单元级OdsTaskSpec 校验逻辑、SnapshotMode 枚举约束、hash 计算纯函数
- 行为级skip_unchanged 去重、软删除插入版本、记录数闭合
- 属性级:用 hypothesis 对核心不变量进行随机化验证
#### 验收标准
1. WHEN OdsTaskSpec 使用 SnapshotMode.WINDOW 且 snapshot_time_column 为 None 时THE 单元测试 SHALL 验证 __post_init__ 抛出 ValueError
2. WHEN OdsTaskSpec 使用 SnapshotMode.FULL_TABLE 且 snapshot_time_column 不为 None 时THE 单元测试 SHALL 验证 __post_init__ 抛出 ValueError
3. WHEN 任意非空记录列表被写入 ODS 时THE 属性测试 SHALL 验证 fetched == inserted + updated + skipped记录数闭合不变量
4. WHEN 同一条记录的 payload 和 is_delete 不变时THE 属性测试 SHALL 验证 _compute_content_hash 产生相同的哈希值
5. WHEN skip_unchanged=True 且记录内容未变时THE 属性测试 SHALL 验证该记录被跳过而非重复插入
6. WHEN 快照对比发现缺失 ID 时THE 属性测试 SHALL 验证生成的是 INSERT 语句(而非 UPDATE且历史版本行不被修改
7. WHEN 缺失 ID 的最新版本已经是 is_delete=1 时THE 属性测试 SHALL 验证不会重复插入删除版本
8. WHEN 现有的 test_ods_tasks.py 和 test_debug_ods_properties.py 中的测试用例被适配到新接口后THE 测试套件 SHALL 全部通过
### 需求 9同步更新所有相关文档
**用户故事:** 作为 ETL 开发者,我希望所有涉及 ODS 去重、软删除、OdsTaskSpec 配置的文档与代码变更保持同步,以确保文档准确反映当前实现。
**涉及文档清单:**
- `apps/etl/pipelines/feiqiu/docs/etl_tasks/ods_task_params_matrix.md` — 任务参数矩阵
- `apps/etl/pipelines/feiqiu/docs/etl_tasks/ods_tasks.md` — ODS 任务说明
- `apps/etl/pipelines/feiqiu/docs/etl_tasks/base_task_mechanism.md` — 基础任务机制
- `apps/etl/pipelines/feiqiu/docs/architecture/ods_taskspec_refactor_proposal.md` — OdsTaskSpec 重构提案
- `apps/etl/pipelines/feiqiu/docs/database/ODS/` — ODS 数据库文档目录
- `apps/etl/pipelines/feiqiu/docs/database/overview/ods_tables_dictionary.md` — ODS 表字典
- `docs/database/etl_feiqiu_schema_migration.md` — 项目级 schema 迁移文档
#### 验收标准
1. WHEN OdsTaskSpec 字段发生变更后THE ods_task_params_matrix.md SHALL 反映新的字段名称和默认值skip_unchanged、snapshot_mode、snapshot_time_column并移除已删除字段的列
2. WHEN OdsTaskSpec 字段发生变更后THE ods_task_params_matrix.md SHALL 包含所有 23 个任务的完整参数矩阵
3. WHEN 去重和软删除机制发生变更后THE ods_tasks.md 和 base_task_mechanism.md SHALL 更新对应的机制说明
4. WHEN DDL 迁移脚本添加索引后THE ODS 数据库文档和 ods_tables_dictionary.md SHALL 记录新增索引
5. WHEN DDL 迁移脚本添加索引后THE docs/database/etl_feiqiu_schema_migration.md SHALL 记录本次迁移变更
6. WHEN 所有文档更新完成后THE 文档 SHALL 逐个文件检查并确保内容与代码实现一致