Files

7.5 KiB
Raw Permalink Blame History

实现计划ODS 去重与软删除机制标准化

概述

按方案 1→2→3→4 的顺序递进实现,每个方案完成后有检查点。核心改动集中在 ods_tasks.py,辅以 DDL 迁移和文档同步。

任务

  • 1. 方案 1清理 OdsTaskSpec 无效/冗余配置

    • 1.1 添加 SnapshotMode 枚举,重构 OdsTaskSpec dataclass

      • ods_tasks.py 顶部定义 SnapshotMode 枚举NONE / FULL_TABLE / WINDOW
      • 从 OdsTaskSpec 中删除 conflict_columns_overrideinclude_site_columninclude_page_noinclude_page_sizesnapshot_full_tablesnapshot_window_columnsenable_content_hash_dedup
      • 添加 skip_unchanged: bool = Truesnapshot_mode: SnapshotMode = SnapshotMode.NONEsnapshot_time_column: str | None = None
      • 重写 __post_init__ 校验逻辑WINDOW 必须有 snapshot_time_columnFULL_TABLE/NONE 不能有
      • Requirements: 1.1, 1.2, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.1, 4.1
    • 1.2 迁移 23 个 ODS_TASK_SPECS 声明到新字段

      • 按设计文档中的映射表,将每个任务的 snapshot_full_table/snapshot_window_columns 转换为 snapshot_mode/snapshot_time_column
      • 删除所有任务中的 conflict_columns_override、include_site_column、include_page_no、include_page_size
      • 将 ODS_RECHARGE_SETTLE 的 enable_content_hash_dedup=True 改为 skip_unchanged=True其余任务使用默认值 True
      • Requirements: 1.3, 2.7, 3.3
    • 1.3 适配 BaseOdsTask.execute 和相关方法中对旧字段的引用

      • execute 方法中 snapshot_full_table / snapshot_window_columns 改为读取 spec.snapshot_mode / spec.snapshot_time_column
      • _mark_missing_as_deleted 参数签名适配(暂保持 UPDATE 语义,方案 4 再改)
      • 删除 BaseOdsTask 中对 include_site_columninclude_page_noinclude_page_size 的引用
      • _insert_records_schema_awareenable_content_hash_dedup 改为 skip_unchanged
      • Requirements: 1.2, 3.2, 4.2
    • 1.4 编写 SnapshotMode 校验属性测试

      • Property 1: SnapshotMode 与 snapshot_time_column 一致性
      • Validates: Requirements 2.3, 2.4, 2.5, 2.6
  • 2. 检查点 - 方案 1 完成

    • 确保所有测试通过ask the user if questions arise.
    • 运行 cd apps/etl/pipelines/feiqiu && pytest tests/unit -v
    • 验证现有 test_ods_tasks.py 和 test_debug_ods_properties.py 适配后通过
  • 3. 方案 2默认开启 skip_unchanged + hash 算法改为 payload + is_delete

    • 3.1 重写 _compute_content_hash删除 _sanitize_record_for_hash

      • 新签名:_compute_content_hash(cls, record: dict, payload: Any, is_delete: int) -> str
      • 基于 json.dumps(payload, sort_keys=True, separators=(',',':'), ensure_ascii=False) + | + is_delete 计算 SHA-256
      • 删除 _sanitize_record_for_hash 方法
      • 删除 include_fetched_at 参数
      • Requirements: 5.1, 5.2, 5.3
    • 3.2 适配 _insert_records_schema_aware 中的 hash 计算调用

      • _compute_content_hash(merged_rec, include_fetched_at=False) 改为 _compute_content_hash(merged_rec, payload=rec, is_delete=merged_rec.get("is_delete", 0))
      • 其中 rec 是原始 API 返回的记录(未展平),merged_rec 中的 is_delete 已被 _normalize_is_delete_flag 标准化
      • Requirements: 5.1, 5.2
    • 3.3 编写 content_hash 确定性和区分性属性测试

      • Property 2: content_hash 确定性
      • Validates: Requirements 5.1, 5.4
      • Property 3: content_hash 区分性
      • Validates: Requirements 5.5
    • 3.4 编写 skip_unchanged 和记录数闭合属性测试

      • Property 4: skip_unchanged 跳过内容未变的记录
      • Validates: Requirements 4.3, 8.5
      • Property 5: 记录数闭合不变量
      • Validates: Requirements 8.3
  • 4. 检查点 - 方案 2 完成

    • 确保所有测试通过ask the user if questions arise.
    • 适配 test_debug_ods_properties.py 中 Property 4content_hash 确定性)到新签名
  • 5. 方案 3DDL 迁移 - 添加"取最新版本"索引

    • 5.1 创建迁移脚本并更新 DDL 源文件
      • 创建 db/etl_feiqiu/migrations/YYYY-MM-DD__add_ods_latest_version_indexes.sql
      • 为每张含 fetched_at 列的 ODS 表创建 (业务主键, fetched_at DESC) 复合索引
      • 使用 CREATE INDEX CONCURRENTLY IF NOT EXISTS
      • 索引命名:idx_ods_{table_name}_latest
      • 同步更新 db/etl_feiqiu/schemas/ods.sql 中的索引定义
      • Requirements: 6.1, 6.2, 6.3
  • 6. 方案 4软删除改为"插入删除版本"

    • 6.1 重写 _mark_missing_as_deleted 方法

      • 接口变更:window_columns/full_table 参数改为 snapshot_mode/snapshot_time_column
      • 查询快照范围内 is_delete != 1 的业务 ID排除本次抓取到的 key_values
      • 对每个缺失 ID读取最新版本行DISTINCT ON + ORDER BY fetched_at DESC
      • 若最新版本已是 is_delete=1 → 跳过
      • 否则:复制该行,设 is_delete=1重算 content_hashINSERT 新行
      • Requirements: 7.1, 7.2, 7.3, 7.4
    • 6.2 适配 BaseOdsTask.execute 中的 _mark_missing_as_deleted 调用

      • 传入 snapshot_mode 和 snapshot_time_column 替代旧参数
      • 更新 deleted 计数逻辑(从 UPDATE rowcount 改为 INSERT count
      • Requirements: 7.1
    • 6.3 编写软删除属性测试

      • Property 6: 软删除构造正确性
      • Validates: Requirements 7.1, 7.2, 7.4
      • Property 7: 软删除幂等性
      • Validates: Requirements 7.3, 8.7
      • Property 8: 软删除不修改历史版本
      • Validates: Requirements 7.4, 8.6
  • 7. 检查点 - 方案 4 完成

    • 确保所有测试通过ask the user if questions arise.
    • 适配 test_debug_ods_properties.py 中 Property 5快照删除标记到新的 INSERT 语义
    • 运行完整测试套件:cd apps/etl/pipelines/feiqiu && pytest tests/unit -v
  • 8. 文档同步

    • 8.1 更新 ods_task_params_matrix.md

      • 反映新字段skip_unchanged、snapshot_mode、snapshot_time_column
      • 移除已删除字段列
      • 确保 23 个任务的完整参数矩阵
      • Requirements: 9.1, 9.2
    • 8.2 更新 ods_tasks.md 和 base_task_mechanism.md

      • 更新去重机制说明skip_unchanged 默认开启、hash 基于 payload + is_delete
      • 更新软删除机制说明INSERT 删除版本行、双路径覆盖)
      • Requirements: 9.3
    • 8.3 更新 ODS 数据库文档和 ods_tables_dictionary.md

      • 记录新增的 (业务主键, fetched_at DESC) 索引
      • 更新下游取数规约说明
      • Requirements: 9.4
    • 8.4 更新 docs/database/etl_feiqiu_schema_migration.md

      • 记录本次迁移变更(索引添加)
      • Requirements: 9.5
    • 8.5 更新 ods_taskspec_refactor_proposal.md

      • 标记本次改造已完成的方案1-4
      • 记录方案 5冷数据归档为中长期待办
      • Requirements: 9.6
  • 9. 最终检查点

    • 确保所有测试通过ask the user if questions arise.
    • 运行 cd apps/etl/pipelines/feiqiu && pytest tests/unit -v
    • 运行 cd C:\NeoZQYY && pytest tests/ -vmonorepo 属性测试)

备注

  • 标记 * 的任务为可选测试任务,可跳过以加速 MVP
  • 每个任务引用具体需求编号以确保可追溯
  • 检查点确保增量验证
  • 属性测试验证通用正确性属性,单元测试验证具体示例和边界情况
  • 本次改造涉及高风险路径tasks/),完成后需触发 /audit