Files

13 KiB
Raw Permalink Blame History

需求文档ETL 管道全流程调试与 Debug

简介

apps/etl/pipelines/feiqiu/ 下的 ETL 管道进行全流程调试,覆盖 ODS → DWD → DWS → INDEX 四层数据处理,识别并修复代码缺陷,生成 Debug 报告。调试范围包括 CLI 入口、编排层、任务执行层、数据库操作层和数据质量校验层。

术语表

  • ETL_Pipeline:从上游 SaaS API 抽取数据,经 ODS → DWD → DWS 三层处理的完整数据管道
  • ODS_Layer原始数据层Operational Data Store负责从 API 抓取数据并落地到 ods.*
  • DWD_Layer明细数据层Data Warehouse Detail负责从 ODS 清洗装载,维度走 SCD2事实按时间增量
  • DWS_Layer汇总数据层Data Warehouse Summary负责从 DWD 聚合生成业务汇总(助教业绩、财务日报等)
  • INDEX_Layer指数算法层负责计算自定义业务指数WBI/NCI/RS/ML
  • TaskExecutor:任务执行器,封装单个 ETL 任务的完整执行生命周期
  • PipelineRunnerFlow 编排器,根据 Flow 定义执行多层 ETL 任务
  • TaskRegistry:任务注册表,管理 52 个已注册任务的元数据和工厂方法
  • CursorManager:游标管理器,负责记录和推进每个任务的时间水位
  • FakeDB:测试用伪数据库操作对象,拦截并记录 SQL 操作
  • FakeAPI:测试用伪 API 客户端,返回预置内存数据
  • Debug_Report:调试报告,记录发现的问题、修复措施和验证结果

需求

需求 1ODS 层任务调试

用户故事: 作为开发者,我希望验证 ODS 层所有任务能正确从 API 抓取数据并写入 ODS 表,以确保原始数据完整落地。

验收标准

  1. WHEN 使用 FakeAPI 提供样例数据执行 ODS 任务, THE TaskExecutor SHALL 正确调用 API 分页接口并提取记录列表
  2. WHEN ODS 任务接收到 API 返回的记录, THE BaseOdsTask SHALL 按照 DB 表结构动态构建 INSERT 语句并写入 ODS 表
  3. WHEN ODS 记录的主键已存在于目标表, THE BaseOdsTask SHALL 根据 ods_conflict_mode 配置执行对应的冲突处理策略nothing/backfill/update
  4. WHEN ODS 记录缺少必需的主键字段值, THE BaseOdsTask SHALL 跳过该记录并递增 skipped 计数
  5. WHEN content_hash 列存在且新记录的哈希值与已有记录相同, THE BaseOdsTask SHALL 跳过该记录以避免无意义更新
  6. IF ODS 任务执行过程中发生数据库异常, THEN THE BaseOdsTask SHALL 回滚当前事务并在 counts 中递增 errors 计数
  7. WHEN snapshot_missing_delete 配置启用且表包含 is_delete 列, THE BaseOdsTask SHALL 将窗口内未出现在 API 返回中的记录标记为已删除

需求 2DWD 层装载调试

用户故事: 作为开发者,我希望验证 DWD 装载任务能正确从 ODS 清洗数据并写入 DWD 表,以确保维度和事实数据准确。

验收标准

  1. WHEN DWD_LOAD_FROM_ODS 任务执行, THE DwdLoadTask SHALL 遍历 TABLE_MAP 中所有 DWD→ODS 映射关系并逐表处理
  2. WHEN 处理维度表dim_*, THE DwdLoadTask SHALL 使用 SCD2 合并策略写入,保留历史版本
  3. WHEN 处理事实表dwd_*, THE DwdLoadTask SHALL 按时间窗口增量写入,使用 FACT_ORDER_CANDIDATES 中的时间列过滤
  4. WHEN ODS 源列名与 DWD 目标列名不同, THE DwdLoadTask SHALL 使用 FACT_MAPPINGS 中定义的列映射和类型转换正确转换
  5. WHEN 单张 DWD 表处理失败, THE DwdLoadTask SHALL 回滚该表事务并继续处理后续表,在结果中汇总错误信息
  6. WHEN dwd.only_tables 配置或 DWD_ONLY_TABLES 环境变量指定了表名列表, THE DwdLoadTask SHALL 仅处理指定的表

需求 3DWS 层汇总调试

用户故事: 作为开发者,我希望验证 DWS 层汇总任务能正确从 DWD 聚合生成业务报表数据,以确保汇总结果准确。

验收标准

  1. WHEN DWS 汇总任务执行, THE DWS_Task SHALL 从 DWD 层读取明细数据并按业务规则聚合写入 DWS 表
  2. WHEN TaskRegistry 按 "DWS" 层查询任务列表, THE TaskRegistry SHALL 返回所有 15 个已注册的 DWS 层任务代码
  3. WHEN DWS 任务的时间窗口跨越多个分段, THE DWS_Task SHALL 按分段逐段处理并累加计数结果

需求 4INDEX 层指数算法调试

用户故事: 作为开发者,我希望验证 INDEX 层指数算法任务能正确计算业务指数,以确保 WBI/NCI/RS/ML 指数结果准确。

验收标准

  1. WHEN INDEX 层任务执行, THE INDEX_Task SHALL 从 DWD/DWS 层读取数据并按指数算法公式计算结果
  2. WHEN TaskRegistry 按 "INDEX" 层查询任务列表, THE TaskRegistry SHALL 返回所有 4 个已注册的 INDEX 层任务代码DWS_WINBACK_INDEX, DWS_NEWCONV_INDEX, DWS_ML_MANUAL_IMPORT, DWS_RELATION_INDEX

需求 5编排层调试

用户故事: 作为开发者,我希望验证 PipelineRunner 和 TaskExecutor 的编排逻辑正确,以确保多层 ETL 任务按正确顺序执行。

验收标准

  1. WHEN PipelineRunner 接收到有效的 Flow 名称, THE PipelineRunner SHALL 按 PIPELINE_LAYERS 定义的层顺序解析并执行任务
  2. IF PipelineRunner 接收到无效的 Flow 名称, THEN THE PipelineRunner SHALL 抛出 ValueError 并包含描述性错误信息
  3. WHEN PipelineRunner 在 verify_only 模式下执行, THE PipelineRunner SHALL 跳过增量 ETL 并直接执行校验逻辑
  4. WHEN PipelineRunner 在 increment_verify 模式下执行, THE PipelineRunner SHALL 先执行增量 ETL 再执行校验逻辑
  5. WHEN TaskExecutor 执行工具类任务, THE TaskExecutor SHALL 跳过游标管理和运行记录,直接执行任务
  6. WHEN TaskExecutor 执行 ODS 任务且 data_source 包含 fetch 阶段, THE TaskExecutor SHALL 使用 RecordingAPIClient 抓取并落盘后入库
  7. WHEN 任务执行成功且返回有效的时间窗口, THE TaskExecutor SHALL 通过 CursorManager 推进游标水位

需求 6CLI 入口调试

用户故事: 作为开发者,我希望验证 CLI 入口能正确解析参数并启动对应的执行模式,以确保命令行操作可靠。

验收标准

  1. WHEN CLI 接收到 --pipeline 参数, THE CLI SHALL 进入 Flow 模式并使用 PipelineRunner 执行
  2. WHEN CLI 接收到 --tasks 参数但无 --pipeline, THE CLI SHALL 进入传统模式并使用 TaskExecutor 直接执行任务列表
  3. WHEN CLI 接收到 --data-source 参数, THE CLI SHALL 使用指定的数据源模式online/offline/hybrid
  4. WHEN CLI 接收到已弃用的 --pipeline-flow 参数, THE CLI SHALL 映射为对应的 data_source 值并发出 DeprecationWarning
  5. WHEN CLI 未指定时间窗口参数, THE CLI SHALL 使用 --lookback-hours(默认 24 小时)计算回溯窗口

需求 7配置体系调试

用户故事: 作为开发者,我希望验证配置加载和合并逻辑正确,以确保 DEFAULTS < ENV < CLI 的优先级链可靠。

验收标准

  1. WHEN AppConfig.load 被调用, THE AppConfig SHALL 按 DEFAULTS → ENV → CLI 的优先级深度合并配置
  2. WHEN 配置中 app.store_id 缺失或非整数, THE AppConfig SHALL 抛出 SystemExit 并包含描述性错误信息
  3. WHEN 配置中 db.dsn 为空, THE AppConfig SHALL 从 host/port/name/user/password 组装 DSN 字符串
  4. WHEN 使用点号路径调用 config.get(), THE AppConfig SHALL 正确遍历嵌套字典并返回对应值

需求 8数据质量校验调试

用户故事: 作为开发者,我希望验证数据完整性校验逻辑能正确检测 ODS→DWD 之间的数据差异,以确保数据一致性。

验收标准

  1. WHEN 执行 DWD vs ODS 校验, THE IntegrityChecker SHALL 对 TABLE_MAP 中每对 DWD→ODS 表比较记录数和金额列汇总
  2. WHEN 校验发现 DWD 与 ODS 记录数不一致, THE IntegrityChecker SHALL 在结果中报告 count diff 值
  3. WHEN compare_content 配置启用, THE IntegrityChecker SHALL 通过哈希比较检测字段级内容差异并报告 mismatch 数量

需求 9真实数据调试

用户故事: 作为开发者,我希望使用真实的数据库和 API 连接进行调试,以确保发现的问题贴近生产环境。

验收标准

  1. THE ETL_Pipeline SHALL 使用 apps/etl/pipelines/feiqiu/.env 中配置的真实 API 地址和数据库 DSN 进行调试
  2. WHEN 执行调试任务, THE ETL_Pipeline SHALL 连接真实 PostgreSQL 实例test_etl_feiqiu 数据库)验证数据读写
  3. WHEN 执行调试任务, THE ETL_Pipeline SHALL 调用真实上游 SaaS API 验证数据抓取和分页逻辑
  4. IF 真实 API 或数据库连接失败, THEN THE ETL_Pipeline SHALL 记录连接错误详情并在 Debug 报告中标注

需求 10全量数据更新与校验

用户故事: 作为开发者,我希望在调试完成后执行 2026-01-01 至 2026-02-16 的全量数据更新和校验,以确保历史数据完整一致。

验收标准

  1. WHEN 调试修复完成后, THE ETL_Pipeline SHALL 使用 api_full Flow 执行 2026-01-01 00:00 至 2026-02-16 00:00 的全量数据更新
  2. WHEN 全量更新执行, THE PipelineRunner SHALL 按 ODS → DWD → DWS → INDEX 顺序依次处理所有层
  3. WHEN 全量更新完成后, THE ETL_Pipeline SHALL 执行 increment_verify 模式对同一时间窗口进行数据一致性校验
  4. WHEN 校验发现数据不一致, THE IntegrityChecker SHALL 自动执行补齐操作并记录补齐结果
  5. THE Debug_Report SHALL 包含全量更新的执行统计(各层记录数、耗时)和校验结果摘要

需求 11Debug 报告生成

用户故事: 作为开发者,我希望获得一份结构化的 Debug 报告,记录所有发现的问题和修复措施,以便追溯和复查。

验收标准

  1. THE Debug_Report SHALL 包含以下章节:概述、发现的问题列表、修复措施、验证结果、全量更新统计、遗留问题
  2. WHEN 发现代码缺陷, THE Debug_Report SHALL 记录缺陷位置(文件路径+行号)、缺陷描述、严重程度和修复方案
  3. WHEN 修复已验证通过, THE Debug_Report SHALL 记录验证方式(单元测试/属性测试/手动验证)和验证结果
  4. THE Debug_Report SHALL 输出为 Markdown 文件,存放在 apps/etl/pipelines/feiqiu/docs/reports/ 目录下

需求 12黑盒数据完整性校验

用户故事: 作为检查者,我希望以黑盒视角从 API 源数据出发,逐层对比各 Schema 各表的数据是否完整,以独立验证 ETL 管道的正确性。

验收标准

  1. WHEN 执行黑盒校验, THE IntegrityChecker SHALL 从上游 API 重新拉取指定时间窗口的源数据作为基准
  2. WHEN 获取到 API 源数据后, THE IntegrityChecker SHALL 将 API 记录数与 ODS 表记录数逐端点对比,报告缺失和多余记录
  3. WHEN ODS 层校验完成后, THE IntegrityChecker SHALL 将 ODS 表记录数与 DWD 表记录数逐映射对比,报告数量差异
  4. WHEN DWD 层校验完成后, THE IntegrityChecker SHALL 验证 DWS 汇总表的聚合结果与 DWD 明细数据的一致性
  5. WHEN 校验发现金额类字段汇总不一致, THE IntegrityChecker SHALL 报告差异金额和涉及的具体记录主键
  6. THE IntegrityChecker SHALL 生成逐层校验报告,包含每张表的记录数对比、金额汇总对比和内容哈希差异统计
  7. WHEN 校验发现缺失记录, THE IntegrityChecker SHALL 输出缺失记录的主键列表,便于定位补齐
  8. WHEN 执行黑盒校验, THE IntegrityChecker SHALL 检测可疑值(边缘值、空值、重复记录等),分析可能的流程问题并追溯原因
  9. WHEN 全量更新完成后, THE IntegrityChecker SHALL 从新数据中抽样 100 条记录,逐字段与上游 API 源数据比对,验证字段级一致性

需求 13性能分析

用户故事: 作为开发者,我希望分析 ETL 整体流程的性能瓶颈,在保证稳定和数据处理结果正确的基础上提高数据处理性能。

验收标准

  1. THE Performance_Analyzer SHALL 统计各层ODS/DWD/DWS/INDEX和各任务的执行耗时
  2. THE Performance_Analyzer SHALL 识别耗时最长的前 5 个任务作为性能瓶颈
  3. THE Performance_Analyzer SHALL 分析数据库查询的执行计划,识别缺失索引和全表扫描
  4. THE Performance_Analyzer SHALL 分析 API 调用的响应时间和分页效率
  5. THE Performance_Analyzer SHALL 生成性能优化报告,包含瓶颈分析和具体优化建议,存放在 apps/etl/pipelines/feiqiu/docs/reports/ 目录下

需求 14架构优化分析

用户故事: 作为开发者,我希望分析 ETL 整体结构,在保证稳定和数据处理结果正确的基础上精简架构。

验收标准

  1. THE Architecture_Analyzer SHALL 分析代码结构,识别重复代码和冗余模块
  2. THE Architecture_Analyzer SHALL 评估各层之间的耦合度,识别可解耦的组件
  3. THE Architecture_Analyzer SHALL 分析任务注册表中 52 个任务的分类合理性
  4. THE Architecture_Analyzer SHALL 生成架构优化报告,包含结构分析、冗余识别和精简建议,存放在 apps/etl/pipelines/feiqiu/docs/reports/ 目录下