Files

10 KiB
Raw Permalink Blame History

设计文档DWD 第一阶段重构

概述

本次重构针对 DwdLoadTasktasks/dwd/dwd_load_task.py),目标是:

  1. 统一事实表增量窗口模式,消除水位线与窗口两套并行的范围过滤
  2. 删除回补机制(_insert_missing_by_pk),简化事实表写入路径
  3. 清理已确认的死代码、未使用常量和方法
  4. 修复 _build_column_mapping() 的参数 bug

所有改动集中在 dwd_load_task.py 一个文件(加上删除 base_dwd_task.py、更新两个外部引用属于低风险重构。重构后代码路径更清晰为第二阶段架构重构E/T/L → process_segment 钩子)做好准备。

架构

当前架构(重构前)

graph TD
    A[DwdLoadTask.load] --> B{事实表?}
    B -->|是| C{use_window?}
    C -->|是| D["_merge_fact_increment(window_start, window_end)"]
    C -->|否| E["_get_fact_watermark() → watermark"]
    E --> F["_merge_fact_increment(watermark)"]
    D --> G["_insert_missing_by_pk回补"]
    F --> G
    B -->|否| H{scd_cols_present?}
    H -->|是| I[_merge_dim_scd2]
    H -->|否| J[_merge_dim_type1_upsert]

目标架构(重构后)

graph TD
    A[DwdLoadTask.load] --> B{事实表?}
    B -->|是| D["_merge_fact_increment(window_start, window_end)"]
    D --> R[返回计数]
    B -->|否| I[_merge_dim_scd2]

关键变化:

  • 事实表路径:消除 use_window 分支判断,始终传 context.window_start/window_end;删除水位线获取和回补步骤
  • 维度表路径:消除 scd_cols_present 条件判断,直接调用 _merge_dim_scd2(所有 17 张维度表都有 SCD2 列)

组件与接口

变更组件清单

组件 变更类型 说明
DwdLoadTask.load() 修改 删除 use_window 判断,始终传 context.window_start/window_end
DwdLoadTask._merge_fact_increment() 修改 删除 watermark 分支和回补调用;window_start/window_end 改为必填参数
DwdLoadTask._merge_dim() 修改 删除 Type1 分支,直接调用 _merge_dim_scd2
DwdLoadTask._build_column_mapping() 修改 ods_tablecur 加入方法签名
DwdLoadTask._get_fact_watermark() 删除 水位线机制不再需要
DwdLoadTask._insert_missing_by_pk() 删除 回补机制不再需要
DwdLoadTask._pick_order_column() 删除 从未被调用的死代码
DwdLoadTask._merge_dim_type1_upsert() 删除 Type1 分支永远不触发
DwdLoadTask._upsert_scd2_row() 删除 已被批量方法替代
DwdLoadTask._close_current_dim() 删除 已被 _close_current_dim_bulk 替代
DwdLoadTask._insert_dim_row() 删除 已被 _insert_dim_rows_bulk 替代
FACT_ORDER_CANDIDATES 常量 删除 配合 _pick_order_column 删除
FACT_MISSING_FILL_TABLES 常量 删除 配合回补机制删除
base_dwd_task.py 删除文件 死代码,从未被使用
debug_dwd.py 修改 内联时间列候选列表,替代对 FACT_ORDER_CANDIDATES 的引用
integrity_checker.py 修改 内联时间列候选列表,替代对 FACT_ORDER_CANDIDATES 的引用

接口变更详情

_merge_fact_increment() 签名变更

# 重构前
def _merge_fact_increment(
    self, cur, dwd_table, ods_table, dwd_cols, ods_cols,
    dwd_types, ods_types,
    window_start: datetime | None = None,  # 可选
    window_end: datetime | None = None,    # 可选
) -> Dict[str, int]:

# 重构后
def _merge_fact_increment(
    self, cur, dwd_table, ods_table, dwd_cols, ods_cols,
    dwd_types, ods_types,
    window_start: datetime,   # 必填
    window_end: datetime,     # 必填
) -> Dict[str, int]:

_build_column_mapping() 签名变更

# 重构前bug引用了外部作用域的 ods_table 和 cur
def _build_column_mapping(
    self, dwd_table: str, pk_cols: Sequence[str], ods_cols: Sequence[str]
) -> Dict[str, tuple[str, str | None]]:

# 重构后
def _build_column_mapping(
    self, cur, dwd_table: str, ods_table: str,
    pk_cols: Sequence[str], ods_cols: Sequence[str]
) -> Dict[str, tuple[str, str | None]]:

_merge_dim() 简化

# 重构前
def _merge_dim(self, cur, dwd_table, ods_table, dwd_cols, ods_cols, now):
    pk_cols = self._get_primary_keys(cur, dwd_table)
    scd_cols_present = any(c.lower() in self.SCD_COLS for c in dwd_cols)
    if scd_cols_present:
        return self._merge_dim_scd2(...)
    return self._merge_dim_type1_upsert(...)

# 重构后
def _merge_dim(self, cur, dwd_table, ods_table, dwd_cols, ods_cols, now):
    return self._merge_dim_scd2(cur, dwd_table, ods_table, dwd_cols, ods_cols, now)

load() 事实表分支简化

# 重构前
use_window = bool(
    self.config.get("run.window_override.start")
    and self.config.get("run.window_override.end")
)
fact_counts = self._merge_fact_increment(
    ...,
    window_start=context.window_start if use_window else None,
    window_end=context.window_end if use_window else None,
)

# 重构后
fact_counts = self._merge_fact_increment(
    ...,
    window_start=context.window_start,
    window_end=context.window_end,
)

外部引用处理

FACT_ORDER_CANDIDATES 被两个外部模块引用,删除后需要同步处理:

  1. scripts/debug/debug_dwd.py:将候选列表内联为模块级常量 _TIME_COLUMN_CANDIDATES
  2. quality/integrity_checker.py:将候选列表内联为模块级常量 _TIME_COLUMN_CANDIDATES

两处内联的列表内容与原 FACT_ORDER_CANDIDATES 相同:["pay_time", "create_time", "update_time", "occur_time", "settle_time", "start_use_time", "fetched_at"]。这些模块的用途是调试和完整性检查,与 DWD 装载的核心逻辑无关,内联不会引入耦合问题。

数据模型

本次重构不涉及数据库 schema 变更。所有 DWD 表结构、ODS 表结构、FACT_MAPPINGSTABLE_MAP 保持不变。

变更仅影响运行时行为:

  • 事实表增量 SQL 的 WHERE 条件从"水位线或窗口"统一为"窗口"
  • 回补 LEFT JOIN SQL 不再执行
  • 维度表合并路径从"SCD2 或 Type1"统一为"SCD2"

正确性属性

属性Property是一种在系统所有有效执行中都应成立的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。

基于验收标准的前置分析,以下属性覆盖了本次重构的核心正确性要求。静态代码结构检查(方法/常量/文件是否存在)通过单元测试覆盖,不作为属性列出。

Property 1: 事实表增量 SQL 始终使用窗口范围条件

For any 有效的事实表映射(TABLE_MAPdwd_ 前缀的表)和任意的 window_start/window_end 时间对(window_start < window_end_merge_fact_increment() 生成的主增量 SQL 的 WHERE 子句 SHALL 包含 fetched_at >= window_start AND fetched_at < window_end 条件,且不包含单边水位线条件(fetched_at > watermark)。

Validates: Requirements 1.1, 1.2

Property 2: 事实表增量不执行回补 SQL

For any 有效的事实表映射和任意的窗口参数,_merge_fact_increment() 执行的 SQL 语句列表中 SHALL 不包含 LEFT JOIN 回补查询(即不包含 LEFT JOIN ... WHERE ... IS NULL 模式的 SQL

Validates: Requirements 2.3

Property 3: 事实表主增量 SQL 结构等价性

For any 有效的事实表映射、列集合和窗口参数,重构后 _merge_fact_increment() 生成的主增量 INSERT INTO ... SELECT ... ON CONFLICT SQL 的结构列列表、SELECT 表达式、ON CONFLICT 子句SHALL 与重构前窗口模式(use_window=True)生成的 SQL 结构相同。

Validates: Requirements 5.2, 5.5

Property 4: 维度表始终走 SCD2 路径

For any 有效的维度表映射(TABLE_MAPdim_ 前缀的表),调用 _merge_dim() SHALL 始终委托给 _merge_dim_scd2(),不经过任何条件分支判断。

Validates: Requirements 3.8, 5.1

错误处理

本次重构不引入新的错误处理逻辑,保留现有机制:

场景 处理方式 变更
单表事实增量 SQL 执行失败(含类型转换错误) load()try/except 捕获,回滚该表事务,记录错误日志,继续处理下一张表 无变更(需求 2.4:异常自然传播到 load() 层面)
维度表 SCD2 合并失败 同上 无变更
ODS 表缺少 fetched_at _merge_fact_increment 记录 error 日志并返回零计数 无变更
DWD 表无法获取列信息 load() 记录 warning 并跳过该表 无变更

删除回补机制后,原本由回补兜底的"部分行丢失"场景不再存在——类型转换失败会导致整条 SQL 报错、整张表事务回滚,这是正确的行为(需求 2.4)。

测试策略

测试框架

  • 单元测试:pytest,使用 FakeDB/FakeCursortests/unit/task_test_utils.py
  • 属性测试:hypothesis,最少 100 次迭代
  • 测试位置:
    • 单元测试:apps/etl/pipelines/feiqiu/tests/unit/test_dwd_phase1_refactor.py
    • 属性测试:tests/test_dwd_phase1_properties.pymonorepo 根目录)

双轨测试方法

单元测试覆盖:

  • 静态代码结构验证(死代码/常量/方法已删除)
  • _build_column_mapping() bug 修复后的调用正确性
  • _merge_dim() 直接调用 SCD2 的行为
  • 外部模块(debug_dwd.pyintegrity_checker.py)导入不报错
  • 表过滤功能(dwd.only_tables)回归

属性测试覆盖:

  • Property 1: 事实表 SQL 窗口条件(通过 FakeCursor 捕获 SQL验证 WHERE 子句)
  • Property 2: 无回补 SQL通过 FakeCursor 捕获所有执行的 SQL验证无 LEFT JOIN
  • Property 3: SQL 结构等价性(对比重构前后生成的 SQL
  • Property 4: 维度表 SCD2 路径(通过 mock 验证调用链)

属性测试配置

  • 库:hypothesis
  • 每个属性最少 100 次迭代
  • 每个属性测试标注对应的设计属性编号
  • 标注格式:# Feature: dwd-phase1-refactor, Property N: {property_text}

测试数据生成策略

使用 hypothesis@st.composite 策略生成:

  • 随机的 window_start/window_end 时间对(确保 start < end
  • 随机的列名集合(确保包含 fetched_at 和至少一个主键列)
  • 随机的 FACT_MAPPINGS 条目(列名 → 源列 + 可选类型转换)

使用 FakeCursor 捕获执行的 SQL 语句,而非实际连接数据库。