Files

16 KiB
Raw Permalink Blame History

设计文档ETL 管道全流程调试与 Debug

概述

本设计覆盖 apps/etl/pipelines/feiqiu/ 下 ETL 管道的全流程调试,采用五阶段策略:

  1. 分层单元调试逐层ODS → DWD → DWS → INDEX使用 FakeDB/FakeAPI 和真实数据库验证任务逻辑
  2. 全量数据刷新:执行 2026-01-01 至 2026-02-16 的 api_full Flow内嵌性能计时边执行边 Debug发现问题即修复并重试
  3. 黑盒数据校验:从 API 源数据出发,逐层对比各 Schema 各表的记录数、金额汇总、可疑值检测和抽样逐字段比对
  4. 架构优化分析:分析 ETL 整体结构,在保证稳定和正确的基础上提出精简架构的建议,生成架构报告
  5. 报告生成:基于全量刷新阶段采集的计时数据生成性能分析报告,汇总所有调试结果生成 Debug 报告

性能计时从全量刷新阶段开始就嵌入采集(每任务/每层/API 调用耗时),报告阶段仅做分析和输出,不再单独重跑流程采集数据。

调试使用 .env 中配置的真实 API 和数据库连接test_etl_feiqiu所有发现的问题和修复记录在结构化 Debug 报告中。性能计时在全量刷新阶段实时采集,报告阶段读取中间数据做分析输出。

架构

调试流程架构

flowchart TD
    A[阶段1: 分层单元调试] --> B[阶段2: 全量数据刷新<br/>内嵌性能计时]
    B --> C[阶段3: 黑盒数据校验]
    C --> D[阶段4: 架构优化分析]
    D --> E[阶段5: 报告生成<br/>含性能分析]

    subgraph 阶段1 [分层单元调试]
        A1[ODS 层: API抓取 → 表写入] --> A2[DWD 层: ODS → DWD 清洗装载]
        A2 --> A3[DWS 层: DWD → DWS 汇总]
        A3 --> A4[INDEX 层: 指数算法计算]
        A4 --> A5[编排层: PipelineRunner + TaskExecutor]
        A5 --> A6[CLI 入口: 参数解析 + 模式切换]
        A6 --> A7[配置体系: AppConfig 加载合并]
    end

    subgraph 阶段2 [全量数据刷新 - 内嵌计时 + 边执行边Debug]
        B1[api_full Flow 执行<br/>每任务/每层计时] --> B2{发现Bug?}
        B2 -->|是| B3[修复Bug]
        B3 --> B4[重试失败的层/任务]
        B4 --> B2
        B2 -->|否| B5[increment_verify 校验]
        B5 --> B6[自动补齐缺失数据]
        B6 --> B7[输出计时 JSON 中间文件]
    end

    subgraph 阶段3 [黑盒数据校验]
        C1[API → ODS 记录数对比] --> C2[ODS → DWD 记录数+金额对比]
        C2 --> C3[DWD → DWS 聚合一致性]
        C3 --> C4[可疑值检测: 边缘值/空值/重复]
        C4 --> C5[抽样100条逐字段比对]
    end

    subgraph 阶段4 [架构优化分析]
        D1[代码结构分析] --> D2[冗余识别]
        D2 --> D3[精简建议]
    end

    subgraph 阶段5 [报告生成 - 含性能分析]
        E1[读取计时 JSON] --> E2[瓶颈识别 + 优化建议]
        E2 --> E3[汇总 Debug 报告]
    end

现有系统架构

flowchart LR
    CLI[cli/main.py] --> PR[PipelineRunner]
    CLI --> TE[TaskExecutor]
    PR --> TE
    TE --> TR[TaskRegistry<br/>52个任务]
    TE --> CM[CursorManager]
    TE --> RT[RunTracker]
    
    TR --> ODS[ODS 任务<br/>BaseOdsTask × 23]
    TR --> DWD[DWD 任务<br/>DwdLoadTask]
    TR --> DWS[DWS 任务<br/>BaseDwsTask × 15]
    TR --> IDX[INDEX 任务 × 4]
    TR --> UTL[工具类任务 × 7]
    
    ODS --> API[APIClient<br/>上游 SaaS]
    ODS --> DB[(PostgreSQL<br/>ods.*)]
    DWD --> DB
    DWS --> DB
    IDX --> DB
    
    QC[IntegrityChecker] --> API
    QC --> DB

组件与接口

调试脚本组件

调试通过一组 Python 脚本实现,放置在 apps/etl/pipelines/feiqiu/scripts/debug/ 目录下:

脚本 职责 对应需求
debug_ods.py ODS 层逐任务调试:连接真实 API 和 DB执行单个 ODS 任务并验证写入结果 需求 1, 9
debug_dwd.py DWD 层调试:执行 DWD_LOAD_FROM_ODS 并验证 TABLE_MAP 中每对映射的记录数 需求 2, 9
debug_dws.py DWS 层调试:逐个执行 DWS 任务并验证汇总结果 需求 3, 9
debug_index.py INDEX 层调试:执行 4 个指数任务并验证计算结果 需求 4, 9
debug_orchestration.py 编排层调试:验证 PipelineRunner 和 TaskExecutor 的流程控制 需求 5
run_full_refresh.py 全量刷新:执行 2026-01-01 ~ 2026-02-16 的 api_full内嵌性能计时边执行边 Debug发现问题即修复重试输出计时 JSON 需求 10, 13
debug_blackbox.py 黑盒校验:从 API 源数据逐层对比各表数据完整性 + 可疑值检测 + 抽样逐字段比对 需求 12
analyze_performance.py 性能分析:读取全量刷新采集的计时 JSON统计耗时识别瓶颈生成性能优化报告 需求 13
analyze_architecture.py 架构分析:分析代码结构,识别冗余和可精简点,生成架构优化报告 需求 14
generate_report.py 报告生成:汇总所有调试结果生成 Markdown 报告 需求 11

关键接口

# 调试脚本的统一入口接口
class DebugResult:
    """单个调试步骤的结果"""
    layer: str              # "ODS" / "DWD" / "DWS" / "INDEX" / "ORCHESTRATION"
    task_code: str          # 任务代码
    status: str             # "PASS" / "FAIL" / "WARN" / "ERROR"
    message: str            # 结果描述
    details: dict           # 详细信息(记录数、错误堆栈等)
    fix_applied: str | None # 已应用的修复措施

class BlackboxCheckResult:
    """黑盒校验单表结果"""
    layer: str              # "API_ODS" / "ODS_DWD" / "DWD_DWS"
    source_table: str       # 源表名
    target_table: str       # 目标表名
    source_count: int       # 源记录数
    target_count: int       # 目标记录数
    count_diff: int         # 差异数
    amount_diffs: list      # 金额差异列表
    missing_keys: list      # 缺失记录主键
    mismatch_count: int     # 内容不一致数

数据模型

Debug 报告结构

@dataclass
class DebugReport:
    """Debug 报告数据模型"""
    title: str                          # 报告标题
    generated_at: datetime              # 生成时间
    environment: dict                   # 环境信息DB DSN, API base, store_id
    
    # 分层调试结果
    ods_results: list[DebugResult]      # ODS 层调试结果
    dwd_results: list[DebugResult]      # DWD 层调试结果
    dws_results: list[DebugResult]      # DWS 层调试结果
    index_results: list[DebugResult]    # INDEX 层调试结果
    orchestration_results: list[DebugResult]  # 编排层调试结果
    
    # 黑盒校验结果
    blackbox_results: list[BlackboxCheckResult]
    
    # 全量刷新结果
    full_refresh: dict                  # {status, layers, counts, duration}
    verification: dict                  # {status, total_tables, consistent, backfilled}
    
    # 性能分析
    performance_report: dict            # {layer_timings, bottlenecks, recommendations}
    
    # 架构优化
    architecture_report: dict           # {structure_analysis, redundancies, simplification_suggestions}
    
    # 汇总
    total_issues: int                   # 发现的问题总数
    fixed_issues: int                   # 已修复的问题数
    remaining_issues: int               # 遗留问题数

黑盒校验数据流

flowchart TD
    API[上游 API] -->|iter_paginated| COUNT_API[API 记录数]
    API -->|抽样100条| SAMPLE[抽样记录集]
    ODS[(ods.* 表)] -->|SELECT COUNT| COUNT_ODS[ODS 记录数]
    ODS -->|可疑值检测| SUSPECT[可疑值: 边缘值/空值/重复]
    DWD[(dwd.* 表)] -->|SELECT COUNT| COUNT_DWD[DWD 记录数]
    DWS[(dws.* 表)] -->|SELECT SUM| SUM_DWS[DWS 汇总值]
    
    COUNT_API --> CMP1{API vs ODS<br/>记录数对比}
    COUNT_ODS --> CMP1
    COUNT_ODS --> CMP2{ODS vs DWD<br/>记录数+金额对比}
    COUNT_DWD --> CMP2
    COUNT_DWD --> CMP3{DWD vs DWS<br/>聚合一致性}
    SUM_DWS --> CMP3
    
    SAMPLE --> CMP4{抽样逐字段比对<br/>API vs ODS 100条}
    ODS --> CMP4
    
    SUSPECT --> CMP5{可疑值分析<br/>追溯上游原因}
    
    CMP1 --> RPT[校验报告]
    CMP2 --> RPT
    CMP3 --> RPT
    CMP4 --> RPT
    CMP5 --> RPT

正确性属性

属性是在系统所有有效执行中都应成立的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性是人类可读规范与机器可验证正确性保证之间的桥梁。

基于前置分析,以下属性覆盖了可自动化测试的验收标准。本项目的许多需求(特别是需求 8-12 的黑盒校验和全量刷新)依赖真实数据库和 API 连接,属于集成测试范畴,不适合属性测试。属性测试聚焦于可用 FakeDB/FakeAPI 验证的核心逻辑。

Property 1: ODS 任务提取记录数一致性

对任意 ODS 任务和 FakeAPI 提供的任意非空记录列表,任务执行后 FakeDB 接收到的记录数应等于 API 提供的记录数减去被跳过的记录数(缺失主键 + 重复哈希)。

Validates: Requirements 1.1, 1.2

Property 2: ODS 冲突处理策略正确性

对任意 ods_conflict_mode 配置值nothing/backfill/updateBaseOdsTask 生成的 SQL 语句应包含对应的冲突处理子句nothing → DO NOTHINGbackfill → COALESCEupdate → IS DISTINCT FROM

Validates: Requirements 1.3

Property 3: ODS 跳过缺失主键记录

对任意包含缺失主键字段的记录集合BaseOdsTask 的 skipped 计数应等于缺失主键的记录数,且这些记录不应出现在写入的行中。

Validates: Requirements 1.4

Property 4: ODS content_hash 去重

对任意两条内容相同的 ODS 记录(仅 fetched_at 不同),计算出的 content_hash 应相同;当已有记录的 content_hash 与新记录相同时,新记录应被跳过。

Validates: Requirements 1.5

Property 5: ODS 快照删除标记

对任意启用 snapshot_missing_delete 的 ODS 任务,当 API 返回的记录集是已有记录集的真子集时,差集中的记录应被标记为 is_delete=1

Validates: Requirements 1.7

Property 6: DWD FACT_MAPPINGS 列映射完整性

对任意 FACT_MAPPINGS 中的映射条目 (dwd_col, ods_expr, cast_type),当 ods_expr 是简单列名时,该列应存在于对应的 ODS 源表中。

Validates: Requirements 2.4

Property 7: DWD only_tables 过滤

对任意非空的 dwd.only_tables 配置列表DwdLoadTask 处理的表集合应是配置列表与 TABLE_MAP 键集合的交集。

Validates: Requirements 2.6

Property 8: DWS 分段累加一致性

对任意时间窗口和分段配置BaseTask 的 _accumulate_counts 方法对各分段计数的累加结果应等于各分段计数的逐键求和。

Validates: Requirements 3.3

Property 9: PipelineRunner Flow 层解析

对任意有效的 Flow 名称PIPELINE_LAYERS 的键PipelineRunner 解析出的层列表应与 PIPELINE_LAYERS 中定义的值完全一致。

Validates: Requirements 5.1, 10.2

Property 10: PipelineRunner 无效 Flow 拒绝

对任意不在 PIPELINE_LAYERS 键集合中的字符串PipelineRunner.run() 应抛出 ValueError。

Validates: Requirements 5.2

Property 11: TaskExecutor 工具类任务跳过游标

对任意被 TaskRegistry 标记为 requires_db_config=False 的任务代码TaskExecutor 应通过 _run_utility_task 路径执行,不调用 CursorManager 和 RunTracker。

Validates: Requirements 5.5

Property 12: CLI data_source 解析

对任意 --data-source 参数值online/offline/hybrid--pipeline-flow 参数值FULL/FETCH_ONLY/INGEST_ONLYresolve_data_source 应返回正确的映射值,且 --data-source 优先于 --pipeline-flow

Validates: Requirements 6.3, 6.4

Property 13: AppConfig 优先级合并

对任意嵌套字典 DEFAULTS 和 CLI 覆盖,_deep_merge 后 CLI 中的键值应覆盖 DEFAULTS 中的同名键值,未被覆盖的键应保持原值。

Validates: Requirements 7.1

Property 14: AppConfig store_id 验证

对任意非整数字符串作为 app.store_idAppConfig._normalize 应抛出 SystemExit。

Validates: Requirements 7.2

Property 15: AppConfig DSN 组装

对任意 host、port、name、user、password 组合db.dsn 为空时AppConfig._normalize 应组装出格式为 postgresql://{user}:{password}@{host}:{port}/{name} 的 DSN 字符串。

Validates: Requirements 7.3

Property 16: AppConfig 点号路径 get

对任意嵌套字典和有效的点号路径,config.get(path) 应返回路径对应的值;对无效路径应返回 default 参数值。

Validates: Requirements 7.4

错误处理

数据库连接错误

  • 调试脚本在启动时验证数据库连接,连接失败时记录错误详情并在报告中标注
  • 单表处理失败时回滚该表事务继续处理后续表DWD 层已实现此逻辑)
  • 全量刷新过程中连接断开时,利用 DatabaseConnection.ensure_open() 尝试重连

API 连接错误

  • APIClient 内置重试机制(retry_max=3,指数退避)
  • API 返回非 0 code 时抛出 ValueError由上层捕获并记录
  • Token 过期时在报告中标注,提示用户更新 .env 中的 API_TOKEN

任务执行错误

  • ODS 任务:数据库异常时回滚并递增 errors 计数
  • DWD 任务:单表失败时回滚该表,继续后续表,最终汇总错误
  • DWS/INDEX 任务:继承 BaseTask 的异常处理,回滚后重新抛出
  • 工具类任务:异常直接向上传播,不影响游标和运行记录

数据质量错误

  • 黑盒校验发现的不一致记录在报告中详细列出主键
  • 金额差异超过阈值时标记为 WARN 级别
  • 校验后自动补齐通过 run_backfill 执行,补齐失败的记录单独记录

测试策略

双轨测试方法

本项目采用单元测试 + 属性测试的双轨方法:

  • 属性测试:使用 hypothesis 库验证上述 16 个正确性属性,每个属性至少运行 100 次迭代
  • 单元测试:使用 pytest 验证具体示例、边界情况和错误条件
  • 集成测试:使用真实数据库(TEST_DB_DSN)验证端到端数据流

属性测试配置

  • 库:hypothesis(已在项目依赖中)
  • 最小迭代次数100
  • 每个属性测试必须引用设计文档中的属性编号
  • 标签格式:Feature: etl-pipeline-debug, Property {number}: {property_text}

测试文件组织

apps/etl/pipelines/feiqiu/tests/unit/
├── test_debug_ods_properties.py      # Property 1-5: ODS 层属性测试
├── test_debug_dwd_properties.py      # Property 6-8: DWD/DWS 层属性测试
├── test_debug_orchestration_properties.py  # Property 9-12: 编排层属性测试
├── test_debug_config_properties.py   # Property 13-16: 配置层属性测试
└── (现有测试文件保持不变)

集成测试

集成测试通过调试脚本实现,连接真实数据库和 API

  • debug_ods.py:逐个执行 ODS 任务,验证写入结果
  • debug_dwd.py:执行 DWD 装载,验证映射正确性
  • debug_blackbox.py:黑盒校验,逐层对比数据完整性
  • run_full_refresh.py:全量刷新 + 校验

现有测试基础设施

项目已有完善的测试工具:

  • FakeDBOperations:拦截并记录 SQL 操作,提供 commit/rollback 计数
  • FakeAPIClient:返回预置内存数据,记录调用参数
  • OfflineAPIClient:从归档 JSON 回放数据
  • RealDBOperationsAdapter:连接真实 PostgreSQL 的适配器
  • create_test_config():构建测试用 AppConfig