Files

5.9 KiB
Raw Permalink Blame History

实施计划DWD 第一阶段重构

概述

按照 confirmed_changes.md 中 2.1-2.4 的顺序,对 DwdLoadTask 进行低风险重构。改动集中在 dwd_load_task.py,辅以删除 base_dwd_task.py 和更新两个外部引用。每个步骤递增构建,确保无悬空代码。

任务

  • 1. 统一窗口模式,去掉水位线(需求 1

    • 1.1 修改 _merge_fact_increment() 签名:window_startwindow_end 改为必填参数(去掉 | None 和默认值 None);删除方法体内的 watermark 分支(elif order_col: 块及 watermark = None 赋值),统一使用 WHERE fetched_at >= %s AND fetched_at < %s
      • 文件:tasks/dwd/dwd_load_task.py,方法 _merge_fact_increment
      • Requirements: 1.2, 1.4
    • 1.2 修改 load() 中事实表分支:删除 use_window 变量及条件判断,始终传 window_start=context.window_start, window_end=context.window_end
      • 文件:tasks/dwd/dwd_load_task.py,方法 load
      • Requirements: 1.1
    • 1.3 删除 _get_fact_watermark() 方法(约 30 行)
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 1.3
    • 1.4 编写属性测试:事实表增量 SQL 始终使用窗口范围条件
      • Property 1: 事实表增量 SQL 始终使用窗口范围条件
      • Validates: Requirements 1.1, 1.2
      • 使用 hypothesis 生成随机 window_start/window_end,通过 FakeCursor 捕获 SQL验证 WHERE 子句包含 >= %s AND < %s,不包含单边水位线条件
      • 文件:tests/test_dwd_phase1_properties.py
  • 2. 删除回补机制(需求 2

    • 2.1 删除 _merge_fact_increment() 末尾的回补调用块:移除对 _insert_missing_by_pk() 的调用及 missing_inserted 相关代码
      • 文件:tasks/dwd/dwd_load_task.py,方法 _merge_fact_increment
      • Requirements: 2.3
    • 2.2 删除 _insert_missing_by_pk() 方法(约 100 行)
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 2.1
    • 2.3 删除 FACT_MISSING_FILL_TABLES 常量
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 2.2
    • 2.4 编写属性测试:事实表增量不执行回补 SQL
      • Property 2: 事实表增量不执行回补 SQL
      • Validates: Requirements 2.3
      • 通过 FakeCursor 捕获所有执行的 SQL验证无 LEFT JOIN ... IS NULL 模式
      • 文件:tests/test_dwd_phase1_properties.py
  • 3. 检查点 - 确保窗口统一和回补删除后测试通过

    • 运行 cd apps/etl/pipelines/feiqiu && pytest tests/unitcd C:\NeoZQYY && pytest tests/ -v,确保所有测试通过,如有问题请询问用户。
  • 4. 清理死代码和未使用常量(需求 3

    • 4.1 删除 _pick_order_column() 方法和 FACT_ORDER_CANDIDATES 常量
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 3.2, 3.3
    • 4.2 更新外部引用:在 debug_dwd.pyintegrity_checker.py 中将 DwdLoadTask.FACT_ORDER_CANDIDATES 替换为模块级常量 _TIME_COLUMN_CANDIDATES(内联相同列表)
      • 文件:scripts/debug/debug_dwd.pyquality/integrity_checker.py
      • Requirements: 3.9
    • 4.3 删除逐行 SCD2 方法:_upsert_scd2_row()_close_current_dim()_insert_dim_row()
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 3.4, 3.5, 3.6
    • 4.4 删除 _merge_dim_type1_upsert() 方法;简化 _merge_dim() 直接调用 _merge_dim_scd2()
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 3.7, 3.8
    • 4.5 删除 base_dwd_task.py 文件
      • 文件:tasks/dwd/base_dwd_task.py
      • Requirements: 3.1
    • 4.6 编写属性测试:维度表始终走 SCD2 路径
      • Property 4: 维度表始终走 SCD2 路径
      • Validates: Requirements 3.8, 5.1
      • 通过 mock _merge_dim_scd2 验证 _merge_dim() 始终委托给它
      • 文件:tests/test_dwd_phase1_properties.py
  • 5. 修复 _build_column_mapping() 参数 bug需求 4

    • 5.1 修改 _build_column_mapping() 签名:添加 curods_table 参数;更新所有调用点(_merge_dim_type1_upsert 已删除,剩余调用点为 _merge_dim_scd2)传入正确参数
      • 文件:tasks/dwd/dwd_load_task.py
      • Requirements: 4.1, 4.2
    • 5.2 编写单元测试:验证 _build_column_mapping()fetched_at 缺失时正确使用 ods_tablecur 参数记录日志
      • 文件:apps/etl/pipelines/feiqiu/tests/unit/test_dwd_phase1_refactor.py
      • Requirements: 4.1
  • 6. 编写回归单元测试(需求 5

    • 6.1 编写单元测试:验证死代码已清理(hasattr 检查所有已删除的方法和常量)
      • 文件:apps/etl/pipelines/feiqiu/tests/unit/test_dwd_phase1_refactor.py
      • Requirements: 1.3, 2.1, 2.2, 3.1-3.7
    • 6.2 编写单元测试:验证外部模块导入正常(debug_dwd.pyintegrity_checker.py 无 ImportError
      • 文件:apps/etl/pipelines/feiqiu/tests/unit/test_dwd_phase1_refactor.py
      • Requirements: 3.9
    • 6.3 编写属性测试:事实表主增量 SQL 结构等价性
      • Property 3: 事实表主增量 SQL 结构等价性
      • Validates: Requirements 5.2, 5.5
      • 使用 hypothesis 生成随机列集合和窗口参数,通过 FakeCursor 捕获 SQL验证 INSERT INTO ... ON CONFLICT 结构与预期一致
      • 文件:tests/test_dwd_phase1_properties.py
  • 7. 最终检查点 - 确保所有测试通过

    • 运行 cd apps/etl/pipelines/feiqiu && pytest tests/unitcd C:\NeoZQYY && pytest tests/ -v,确保所有测试通过,如有问题请询问用户。

备注

  • 标记 * 的子任务为可选测试任务,可跳过以加速 MVP
  • 每个任务引用了具体的需求编号,确保可追溯
  • 检查点确保增量验证
  • 属性测试验证通用正确性属性,单元测试验证具体示例和边界情况
  • 本次重构涉及 tasks/ 高风险路径,完成后需运行 /audit