Files

8.7 KiB
Raw Permalink Blame History

实现计划ETL 管道全流程调试与 Debug

概述

按五阶段策略实现 ETL 管道全流程调试:分层单元调试 → 全量数据刷新(内嵌计时,边执行边 Debug→ 黑盒数据校验 → 架构优化分析 → Debug 报告生成(含性能分析)。性能计时从全量刷新阶段开始就嵌入采集,报告阶段仅做分析和输出。调试脚本放置在 apps/etl/pipelines/feiqiu/scripts/debug/ 目录下。

任务

  • 1. 阶段1分层单元调试 - ODS 层

    • 1.1 编写 scripts/debug/debug_ods.py 调试脚本

      • 连接真实 API 和数据库(从 .env 加载配置)
      • 逐个执行 23 个 ODS 任务(小窗口,如最近 2 小时)
      • 验证每个任务的返回结果status、countsfetched/inserted/updated/skipped/errors
      • 检查 ODS 表实际写入行数是否与 counts 一致
      • 记录每个任务的执行结果到 DebugResult 列表
      • Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 9.1, 9.2, 9.3
    • 1.2 编写 ODS 层属性测试 tests/unit/test_debug_ods_properties.py

      • Property 1: ODS 任务提取记录数一致性
      • Validates: Requirements 1.1, 1.2
      • Property 2: ODS 冲突处理策略正确性
      • Validates: Requirements 1.3
      • Property 3: ODS 跳过缺失主键记录
      • Validates: Requirements 1.4
      • Property 4: ODS content_hash 去重
      • Validates: Requirements 1.5
      • Property 5: ODS 快照删除标记
      • Validates: Requirements 1.7
  • 2. 阶段1分层单元调试 - DWD 层

    • 2.1 编写 scripts/debug/debug_dwd.py 调试脚本

      • 执行 DWD_LOAD_FROM_ODS 任务
      • 验证 TABLE_MAP 中每对 DWD→ODS 映射的处理结果
      • 检查维度表 SCD2 版本链完整性
      • 检查事实表时间窗口增量写入正确性
      • 验证 FACT_MAPPINGS 列映射和类型转换
      • 记录每张表的处理结果inserted/updated/errors
      • Requirements: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 9.1, 9.2
    • 2.2 编写 DWD 层属性测试 tests/unit/test_debug_dwd_properties.py

      • Property 6: DWD FACT_MAPPINGS 列映射完整性
      • Validates: Requirements 2.4
      • Property 7: DWD only_tables 过滤
      • Validates: Requirements 2.6
      • Property 8: DWS 分段累加一致性
      • Validates: Requirements 3.3
  • 3. 阶段1分层单元调试 - DWS 和 INDEX 层

    • 3.1 编写 scripts/debug/debug_dws.py 调试脚本

      • 逐个执行 15 个 DWS 汇总任务
      • 验证每个任务的返回结果和 DWS 表写入情况
      • 检查汇总数据与 DWD 明细数据的一致性(抽样验证)
      • Requirements: 3.1, 3.2, 3.3, 9.1, 9.2
    • 3.2 编写 scripts/debug/debug_index.py 调试脚本

      • 执行 4 个 INDEX 层任务WBI/NCI/RS/ML
      • 验证指数计算结果的合理性(非空、范围检查)
      • Requirements: 4.1, 4.2, 9.1, 9.2
  • 4. 阶段1分层单元调试 - 编排层和配置层

    • 4.1 编写 scripts/debug/debug_orchestration.py 调试脚本

      • 验证 PipelineRunner 的 Flow 解析逻辑(所有 7 种 Flow
      • 验证 TaskExecutor 的任务分发逻辑ODS/DWD/DWS/工具类)
      • 验证 CursorManager 的游标推进逻辑
      • 验证 CLI 参数解析(传统模式 vs Flow 模式)
      • Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 6.1, 6.2, 6.3, 6.4, 6.5
    • 4.2 编写编排层属性测试 tests/unit/test_debug_orchestration_properties.py

      • Property 9: PipelineRunner Flow 层解析
      • Validates: Requirements 5.1, 10.2
      • Property 10: PipelineRunner 无效 Flow 拒绝
      • Validates: Requirements 5.2
      • Property 11: TaskExecutor 工具类任务跳过游标
      • Validates: Requirements 5.5
      • Property 12: CLI data_source 解析
      • Validates: Requirements 6.3, 6.4
    • 4.3 编写配置层属性测试 tests/unit/test_debug_config_properties.py

      • Property 13: AppConfig 优先级合并
      • Validates: Requirements 7.1
      • Property 14: AppConfig store_id 验证
      • Validates: Requirements 7.2
      • Property 15: AppConfig DSN 组装
      • Validates: Requirements 7.3
      • Property 16: AppConfig 点号路径 get
      • Validates: Requirements 7.4
  • 5. 检查点 - 阶段1 完成

    • 确保所有单元调试脚本可运行,所有属性测试通过,询问用户是否有问题。
  • 6. 阶段2全量数据刷新内嵌计时边执行边 Debug

    • 6.1 编写 scripts/debug/run_full_refresh.py 全量刷新脚本

      • 使用 api_full Flow 执行 2026-01-01 00:00 至 2026-02-16 00:00 的全量更新
      • 按层逐步执行:先 ODS再 DWD再 DWS最后 INDEX
      • 内嵌性能计时:记录每个任务的开始/结束时间、每层的总耗时、API 调用响应时间,计时的项目颗粒度尽可能精细。
      • 每层执行后检查结果,发现错误时记录并尝试修复
      • 支持从失败的层/任务重试(断点续跑)
      • 全量更新完成后执行 increment_verify 校验
      • 校验发现不一致时自动补齐
      • 将计时数据和执行统计(记录数、耗时)写入 JSON 中间文件供后续性能分析使用
      • Requirements: 10.1, 10.2, 10.3, 10.4, 10.5, 13.1, 13.4, 9.1, 9.2, 9.3
    • 6.2 执行全量刷新并修复发现的 Bug

      • 运行 run_full_refresh.py
      • 对执行过程中发现的每个 Bug定位原因、修复代码、重试验证
      • 将所有发现的问题和修复记录到 DebugResult 列表
      • Requirements: 10.1, 10.2, 10.3, 10.4
  • 7. 检查点 - 阶段2 完成

    • 确保全量刷新成功完成,所有测试通过,询问用户是否有问题。
  • 8. 阶段3黑盒数据校验

    • 8.1 编写 scripts/debug/debug_blackbox.py 黑盒校验脚本

      • API → ODS逐端点从 API 拉取数据,与 ODS 表记录数对比
      • ODS → DWD按 TABLE_MAP 逐对比较记录数和金额列汇总
      • DWD → DWS验证汇总表聚合结果与明细数据一致性
      • 可疑值检测:扫描各表中的边缘值、空值、重复记录,分析可能的流程问题
      • 抽样比对:从新数据中随机抽样 100 条记录,逐字段与上游 API 源数据比对
      • 输出缺失记录主键列表和差异金额详情
      • 生成逐层校验报告
      • Requirements: 12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7, 12.8, 12.9
    • 8.2 执行黑盒校验并记录结果

      • 运行 debug_blackbox.py
      • 分析校验结果,对发现的不一致追溯原因
      • 将校验结果记录到 BlackboxCheckResult 列表
      • Requirements: 12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7, 12.8, 12.9
  • 9. 检查点 - 阶段3 完成

    • 确保黑盒校验完成,所有测试通过,询问用户是否有问题。
  • 10. 阶段4架构优化分析

    • 10.1 编写 scripts/debug/analyze_architecture.py 架构分析脚本
      • 分析代码结构:模块依赖关系、文件大小、函数复杂度
      • 识别重复代码和冗余模块
      • 评估各层之间的耦合度
      • 分析 52 个任务的分类合理性
      • 生成架构优化报告Markdown存放在 docs/reports/
      • Requirements: 14.1, 14.2, 14.3, 14.4
  • 11. 阶段5Debug 报告生成(含性能分析)

    • 11.1 编写 scripts/debug/analyze_performance.py 性能分析脚本

      • 读取全量刷新阶段采集的计时 JSON 中间文件
      • 统计各层ODS/DWD/DWS/INDEX和各任务的执行耗时
      • 识别耗时最长的前 5 个任务作为性能瓶颈
      • 分析关键 SQL 查询的执行计划EXPLAIN ANALYZE
      • 分析 API 调用的响应时间和分页效率
      • 生成性能优化报告Markdown存放在 docs/reports/
      • Requirements: 13.1, 13.2, 13.3, 13.4, 13.5
    • 11.2 编写 scripts/debug/generate_report.py 报告生成脚本

      • 汇总所有阶段的调试结果
      • 生成结构化 Markdown 报告,包含:概述、发现的问题列表、修复措施、验证结果、全量更新统计、黑盒校验结果、性能分析摘要、架构优化摘要、遗留问题
      • 输出到 apps/etl/pipelines/feiqiu/docs/reports/debug_report_YYYYMMDD.md
      • Requirements: 11.1, 11.2, 11.3, 11.4
  • 12. 最终检查点 - 全部完成

    • 确保所有测试通过,所有报告已生成,询问用户是否有问题。

备注

  • 标记 * 的任务为可选任务,可跳过以加快 MVP 进度
  • 每个任务引用具体需求以确保可追溯性
  • 检查点确保增量验证
  • 属性测试验证通用正确性属性
  • 单元测试验证具体示例和边界情况
  • 调试脚本使用真实 API 和数据库连接(.env 配置)
  • 全量刷新采用迭代式:发现 Bug 即修复并重试