8.7 KiB
实现计划:ETL 管道全流程调试与 Debug
概述
按五阶段策略实现 ETL 管道全流程调试:分层单元调试 → 全量数据刷新(内嵌计时,边执行边 Debug)→ 黑盒数据校验 → 架构优化分析 → Debug 报告生成(含性能分析)。性能计时从全量刷新阶段开始就嵌入采集,报告阶段仅做分析和输出。调试脚本放置在 apps/etl/pipelines/feiqiu/scripts/debug/ 目录下。
任务
-
1. 阶段1:分层单元调试 - ODS 层
-
1.1 编写
scripts/debug/debug_ods.py调试脚本- 连接真实 API 和数据库(从
.env加载配置) - 逐个执行 23 个 ODS 任务(小窗口,如最近 2 小时)
- 验证每个任务的返回结果:status、counts(fetched/inserted/updated/skipped/errors)
- 检查 ODS 表实际写入行数是否与 counts 一致
- 记录每个任务的执行结果到 DebugResult 列表
- Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 9.1, 9.2, 9.3
- 连接真实 API 和数据库(从
-
1.2 编写 ODS 层属性测试
tests/unit/test_debug_ods_properties.py- Property 1: ODS 任务提取记录数一致性
- Validates: Requirements 1.1, 1.2
- Property 2: ODS 冲突处理策略正确性
- Validates: Requirements 1.3
- Property 3: ODS 跳过缺失主键记录
- Validates: Requirements 1.4
- Property 4: ODS content_hash 去重
- Validates: Requirements 1.5
- Property 5: ODS 快照删除标记
- Validates: Requirements 1.7
-
-
2. 阶段1:分层单元调试 - DWD 层
-
2.1 编写
scripts/debug/debug_dwd.py调试脚本- 执行 DWD_LOAD_FROM_ODS 任务
- 验证 TABLE_MAP 中每对 DWD→ODS 映射的处理结果
- 检查维度表 SCD2 版本链完整性
- 检查事实表时间窗口增量写入正确性
- 验证 FACT_MAPPINGS 列映射和类型转换
- 记录每张表的处理结果(inserted/updated/errors)
- Requirements: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 9.1, 9.2
-
2.2 编写 DWD 层属性测试
tests/unit/test_debug_dwd_properties.py- Property 6: DWD FACT_MAPPINGS 列映射完整性
- Validates: Requirements 2.4
- Property 7: DWD only_tables 过滤
- Validates: Requirements 2.6
- Property 8: DWS 分段累加一致性
- Validates: Requirements 3.3
-
-
3. 阶段1:分层单元调试 - DWS 和 INDEX 层
-
3.1 编写
scripts/debug/debug_dws.py调试脚本- 逐个执行 15 个 DWS 汇总任务
- 验证每个任务的返回结果和 DWS 表写入情况
- 检查汇总数据与 DWD 明细数据的一致性(抽样验证)
- Requirements: 3.1, 3.2, 3.3, 9.1, 9.2
-
3.2 编写
scripts/debug/debug_index.py调试脚本- 执行 4 个 INDEX 层任务(WBI/NCI/RS/ML)
- 验证指数计算结果的合理性(非空、范围检查)
- Requirements: 4.1, 4.2, 9.1, 9.2
-
-
4. 阶段1:分层单元调试 - 编排层和配置层
-
4.1 编写
scripts/debug/debug_orchestration.py调试脚本- 验证 PipelineRunner 的 Flow 解析逻辑(所有 7 种 Flow)
- 验证 TaskExecutor 的任务分发逻辑(ODS/DWD/DWS/工具类)
- 验证 CursorManager 的游标推进逻辑
- 验证 CLI 参数解析(传统模式 vs Flow 模式)
- Requirements: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7, 6.1, 6.2, 6.3, 6.4, 6.5
-
4.2 编写编排层属性测试
tests/unit/test_debug_orchestration_properties.py- Property 9: PipelineRunner Flow 层解析
- Validates: Requirements 5.1, 10.2
- Property 10: PipelineRunner 无效 Flow 拒绝
- Validates: Requirements 5.2
- Property 11: TaskExecutor 工具类任务跳过游标
- Validates: Requirements 5.5
- Property 12: CLI data_source 解析
- Validates: Requirements 6.3, 6.4
-
4.3 编写配置层属性测试
tests/unit/test_debug_config_properties.py- Property 13: AppConfig 优先级合并
- Validates: Requirements 7.1
- Property 14: AppConfig store_id 验证
- Validates: Requirements 7.2
- Property 15: AppConfig DSN 组装
- Validates: Requirements 7.3
- Property 16: AppConfig 点号路径 get
- Validates: Requirements 7.4
-
-
5. 检查点 - 阶段1 完成
- 确保所有单元调试脚本可运行,所有属性测试通过,询问用户是否有问题。
-
6. 阶段2:全量数据刷新(内嵌计时,边执行边 Debug)
-
6.1 编写
scripts/debug/run_full_refresh.py全量刷新脚本- 使用
api_fullFlow 执行 2026-01-01 00:00 至 2026-02-16 00:00 的全量更新 - 按层逐步执行:先 ODS,再 DWD,再 DWS,最后 INDEX
- 内嵌性能计时:记录每个任务的开始/结束时间、每层的总耗时、API 调用响应时间,计时的项目颗粒度尽可能精细。
- 每层执行后检查结果,发现错误时记录并尝试修复
- 支持从失败的层/任务重试(断点续跑)
- 全量更新完成后执行
increment_verify校验 - 校验发现不一致时自动补齐
- 将计时数据和执行统计(记录数、耗时)写入 JSON 中间文件供后续性能分析使用
- Requirements: 10.1, 10.2, 10.3, 10.4, 10.5, 13.1, 13.4, 9.1, 9.2, 9.3
- 使用
-
6.2 执行全量刷新并修复发现的 Bug
- 运行
run_full_refresh.py - 对执行过程中发现的每个 Bug:定位原因、修复代码、重试验证
- 将所有发现的问题和修复记录到 DebugResult 列表
- Requirements: 10.1, 10.2, 10.3, 10.4
- 运行
-
-
7. 检查点 - 阶段2 完成
- 确保全量刷新成功完成,所有测试通过,询问用户是否有问题。
-
8. 阶段3:黑盒数据校验
-
8.1 编写
scripts/debug/debug_blackbox.py黑盒校验脚本- API → ODS:逐端点从 API 拉取数据,与 ODS 表记录数对比
- ODS → DWD:按 TABLE_MAP 逐对比较记录数和金额列汇总
- DWD → DWS:验证汇总表聚合结果与明细数据一致性
- 可疑值检测:扫描各表中的边缘值、空值、重复记录,分析可能的流程问题
- 抽样比对:从新数据中随机抽样 100 条记录,逐字段与上游 API 源数据比对
- 输出缺失记录主键列表和差异金额详情
- 生成逐层校验报告
- Requirements: 12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7, 12.8, 12.9
-
8.2 执行黑盒校验并记录结果
- 运行
debug_blackbox.py - 分析校验结果,对发现的不一致追溯原因
- 将校验结果记录到 BlackboxCheckResult 列表
- Requirements: 12.1, 12.2, 12.3, 12.4, 12.5, 12.6, 12.7, 12.8, 12.9
- 运行
-
-
9. 检查点 - 阶段3 完成
- 确保黑盒校验完成,所有测试通过,询问用户是否有问题。
-
10. 阶段4:架构优化分析
- 10.1 编写
scripts/debug/analyze_architecture.py架构分析脚本- 分析代码结构:模块依赖关系、文件大小、函数复杂度
- 识别重复代码和冗余模块
- 评估各层之间的耦合度
- 分析 52 个任务的分类合理性
- 生成架构优化报告(Markdown),存放在
docs/reports/ - Requirements: 14.1, 14.2, 14.3, 14.4
- 10.1 编写
-
11. 阶段5:Debug 报告生成(含性能分析)
-
11.1 编写
scripts/debug/analyze_performance.py性能分析脚本- 读取全量刷新阶段采集的计时 JSON 中间文件
- 统计各层(ODS/DWD/DWS/INDEX)和各任务的执行耗时
- 识别耗时最长的前 5 个任务作为性能瓶颈
- 分析关键 SQL 查询的执行计划(EXPLAIN ANALYZE)
- 分析 API 调用的响应时间和分页效率
- 生成性能优化报告(Markdown),存放在
docs/reports/ - Requirements: 13.1, 13.2, 13.3, 13.4, 13.5
-
11.2 编写
scripts/debug/generate_report.py报告生成脚本- 汇总所有阶段的调试结果
- 生成结构化 Markdown 报告,包含:概述、发现的问题列表、修复措施、验证结果、全量更新统计、黑盒校验结果、性能分析摘要、架构优化摘要、遗留问题
- 输出到
apps/etl/pipelines/feiqiu/docs/reports/debug_report_YYYYMMDD.md - Requirements: 11.1, 11.2, 11.3, 11.4
-
-
12. 最终检查点 - 全部完成
- 确保所有测试通过,所有报告已生成,询问用户是否有问题。
备注
- 标记
*的任务为可选任务,可跳过以加快 MVP 进度 - 每个任务引用具体需求以确保可追溯性
- 检查点确保增量验证
- 属性测试验证通用正确性属性
- 单元测试验证具体示例和边界情况
- 调试脚本使用真实 API 和数据库连接(
.env配置) - 全量刷新采用迭代式:发现 Bug 即修复并重试