Files

15 KiB
Raw Permalink Blame History

实施计划ETL 统一请求编排与线程模型改造

概述

将飞球 Connector ETL 系统的 ODS 任务从同步串行执行迁移到"串行请求 + 异步处理 + 单线程写库"统一管道架构。按组件依赖顺序逐步实现:基础组件 → 核心引擎 → 任务迁移 → DWD 优化 → 日志优化。

任务

  • 1. 实现基础组件PipelineConfig、CancellationToken、RateLimiter

    • 1.1 创建 apps/etl/connectors/feiqiu/config/pipeline_config.py,实现 PipelineConfig 数据类

      • 定义 workersqueue_sizebatch_sizebatch_timeoutrate_minrate_maxmax_consecutive_failures 字段及默认值
      • 实现 __post_init__ 参数校验workers>=1、queue_size>=1、batch_size>=1、rate_min<=rate_max
      • 实现 from_app_config(config, task_code) 类方法,支持 pipeline.<task_code>.* 任务级覆盖 → 全局 pipeline.* → 默认值的三级回退
      • 需求: 1.3, 1.4, 1.5, 2.3, 2.5, 2.6, 4.1, 4.2, 4.3, 4.4, 4.5, 4.6
    • 1.2 编写 PipelineConfig 属性测试

      • Property 3: PipelineConfig 构造与验证 — 生成随机配置参数组合(含无效值),验证合法参数成功构造、非法参数抛出 ValueError
      • Property 4: 配置分层与任务级覆盖 — 生成随机多层配置字典,验证任务级 > 全局级 > 默认值的优先级
      • 测试文件:tests/test_pipeline_config_properties.py
      • 验证: 需求 1.4, 1.5, 4.1, 4.2, 4.3, 4.4, 4.5, 4.6
    • 1.3 创建 apps/etl/connectors/feiqiu/utils/cancellation.py,实现 CancellationToken

      • 基于 threading.Event 实现线程安全的 cancel() 方法和 is_cancelled 属性
      • 实现超时自动取消(构造时传入 timeout 秒数,通过 threading.Timer 触发)
      • 实现 dispose() 清理定时器
      • 需求: 3.1, 3.6
    • 1.4 编写 CancellationToken 属性测试

      • Property 7: CancellationToken 状态转换 — 生成随机超时值,验证初始 False、cancel() 后 True 且不可逆、超时自动触发
      • 测试文件:tests/test_cancellation_properties.py
      • 验证: 需求 3.1, 3.6
    • 1.5 创建 apps/etl/connectors/feiqiu/api/rate_limiter.py,实现 RateLimiter

      • 构造时校验 min_interval <= max_interval,否则抛出 ValueError
      • 实现 wait(cancel_event) 方法:生成 [min, max] 均匀分布随机间隔,拆分为 0.5s 小段轮询 cancel_event
      • 暴露 last_interval 属性
      • 需求: 1.3, 1.5
    • 1.6 编写 RateLimiter 属性测试

      • Property 2: RateLimiter 间隔范围 — 生成随机 (min, max) 对,验证 wait() 实际等待时间在 [min, max] ± 0.5s 范围内
      • 测试文件:tests/test_rate_limiter_properties.py
      • 验证: 需求 1.3
    • 1.7 编写基础组件单元测试

      • 测试文件:apps/etl/connectors/feiqiu/tests/unit/test_pipeline_config.pytests/unit/test_cancellation.pytests/unit/test_rate_limiter.py
      • 覆盖边界条件RateLimiter min=max、CancellationToken 预取消/timeout=0/dispose、PipelineConfig 无效参数/CLI 覆盖
      • 需求: 1.3, 1.5, 3.1, 3.6, 4.1, 4.4, 4.5
  • 2. 检查点 — 基础组件验证

    • 确保所有测试通过ask the user if questions arise.
  • 3. 实现核心管道引擎UnifiedPipeline

    • 3.1 创建数据类 PipelineRequestPipelineResultWriteResult

      • 文件:apps/etl/connectors/feiqiu/pipeline/models.py
      • PipelineRequestendpoint、params、page_size、data_path、list_key、segment_index、is_detail、detail_id
      • PipelineResultstatus、各阶段计数、errors 列表、timing 字典、Detail_Mode 统计
      • WriteResultinserted、updated、skipped、errors
      • 需求: 2.7, 6.6, 8.2, 9.4
    • 3.2 创建 apps/etl/connectors/feiqiu/pipeline/unified_pipeline.py,实现 UnifiedPipeline 核心引擎

      • 实现 __init__:接收 api_client、db_connection、logger、PipelineConfig、CancellationToken初始化 RateLimiter
      • 实现 run(requests, process_fn, write_fn) -> PipelineResult 主方法:
        • 预取消检查cancel_token 已取消则立即返回空结果)
        • 创建 processing_queuemaxsize=queue_size和 write_queuemaxsize=queue_size*2
        • 启动 N 个 worker 线程(_process_worker)和 1 个 writer 线程(_write_worker
        • 主线程执行 _request_loop:串行发送请求、限流等待、取消检查、背压阻塞
        • 发送 SENTINEL 通知线程退出join 等待完成
        • 计算最终 statusSUCCESS/PARTIAL/CANCELLED/FAILED
      • 需求: 1.1, 1.2, 1.6, 2.1, 2.2, 2.4, 2.7, 2.8, 3.2, 3.7, 8.3
    • 3.3 实现 _request_loop 请求调度逻辑

      • 遍历 requests 迭代器,逐个发送 API 请求
      • 每个请求完成后记录耗时、状态码、endpoint 到日志
      • 将响应数据 put 到 processing_queue满时阻塞 = 背压)
      • 请求间调用 rate_limiter.wait(cancel_event),被取消则 break
      • 实现连续失败计数:成功重置为 0失败 +1超过 max_consecutive_failures 则中断
      • 需求: 1.2, 1.7, 3.2, 8.1, 8.3, 9.1, 9.5
    • 3.4 实现 _process_worker 处理线程逻辑

      • 从 processing_queue 消费数据,调用 process_fn 处理
      • 处理结果 put 到 write_queue
      • 单条记录处理异常时捕获、记录错误、标记失败、继续处理
      • 收到 SENTINEL 时退出
      • 需求: 2.1, 2.2, 9.2
    • 3.5 实现 _write_worker 写入线程逻辑

      • 从 write_queue 消费数据,累积到 batch_size 或等待 batch_timeout 后调用 write_fn 批量写入
      • 写入失败时回滚当前批次事务、记录错误、标记失败、继续处理后续批次
      • 队列积压超过 queue_size*2 时记录警告日志
      • 收到 SENTINEL 时将剩余数据 flush 写入后退出
      • 需求: 2.4, 2.5, 2.6, 8.4, 9.3, 9.6
    • 3.6 编写 UnifiedPipeline 属性测试

      • Property 1: 请求严格串行 — 用 FakeAPI 记录时间戳,验证每个请求发送时间 > 上一个响应完成时间
      • Property 5: 管道完成语义 — 生成随机记录集,验证 total_fetched == total_inserted + total_updated + total_skipped
      • Property 6: WriteWorker 批量大小约束 — 生成随机 batch_size 和记录流,验证每次 write_fn 调用的记录数 <= batch_size
      • Property 8: 取消后已入队数据不丢失 — 生成随机请求序列 + 随机取消时刻,验证已入队数据全部处理和写入
      • Property 11: 单项失败不中断整体 — 生成含随机失败的请求序列,验证后续项目继续处理
      • Property 12: 连续失败触发中断 — 生成连续失败序列 + 随机阈值,验证超过阈值时中断
      • Property 13: 写入失败回滚当前批次 — 生成含随机写入失败的批次,验证回滚且后续批次不受影响
      • Property 14: 结果统计完整性 — 验证各计数字段的一致性关系
      • 测试文件:tests/test_pipeline_properties.py
      • 验证: 需求 1.2, 1.6, 2.5, 2.7, 3.2, 3.3, 3.4, 3.5, 6.6, 8.2, 9.1, 9.2, 9.3, 9.4, 9.5
    • 3.7 编写 UnifiedPipeline 单元测试

      • 测试文件:apps/etl/connectors/feiqiu/tests/unit/test_unified_pipeline.py
      • 使用 FakeAPI + FakeDB 端到端测试:正常流程、空请求、预取消、背压触发
      • 需求: 2.7, 3.7, 8.1, 8.3
  • 4. 检查点 — 核心引擎验证

    • 确保所有测试通过ask the user if questions arise.
  • 5. BaseOdsTask 改造与 ODS 任务迁移

    • 5.1 扩展 OdsTaskSpec 数据类,新增 Detail_Mode 可选字段

      • apps/etl/connectors/feiqiu/tasks/ods/ods_tasks.py 中为 OdsTaskSpec 新增:detail_endpointdetail_param_builderdetail_target_tabledetail_data_pathdetail_list_keydetail_id_column
      • 所有新增字段默认值为 None,不影响现有 21 个任务的 OdsTaskSpec 实例
      • 需求: 6.2, 6.3
    • 5.2 改造 BaseOdsTask.execute() 方法,嵌入 UnifiedPipeline

      • execute() 内部构建 PipelineConfig.from_app_config(self.config, spec.code)
      • 将现有分页请求逻辑封装为 _build_requests()Iterable[PipelineRequest]
      • 将现有字段提取/hash 计算封装为 _build_process_fn()Callable
      • 将现有 _insert_records_schema_aware 封装为 _build_write_fn()Callable
      • 调用 pipeline.run(requests, process_fn, write_fn) 替代现有同步循环
      • 保留快照软删除(_mark_missing_as_deleted、endpoint_routing、元数据写入source_file、source_endpoint、fetched_at
      • 保留 TaskExecutor 调用接口不变(task.execute(cursor_data) 签名不变)
      • 需求: 5.1, 5.2, 5.3, 5.4, 5.5, 5.6, 5.7
    • 5.3 实现 Detail_Mode 详情拉取逻辑

      • BaseOdsTask 中实现 _build_detail_requests() 方法:从已写入 ODS 的记录中提取 ID 列表,生成 PipelineRequest(is_detail=True) 序列
      • 详情请求通过同一个 UnifiedPipeline 的 RequestScheduler 排队,遵循相同限流规则
      • 单个详情请求失败时记录错误日志(含 ID 和错误信息),继续处理下一个
      • 在 PipelineResult 中填充 detail_success/detail_failure/detail_skipped 统计
      • 需求: 6.1, 6.4, 6.5, 6.6
    • 5.4 编写 Detail_Mode 属性测试

      • Property 10: Detail_Mode 可选性 — 生成有/无 detail_endpoint 的 OdsTaskSpec验证无配置时跳过详情阶段、有配置时执行详情拉取且遵循限流
      • 测试文件:tests/test_detail_mode_properties.py
      • 验证: 需求 6.1, 6.3, 6.4
    • 5.5 编写迁移等价属性测试

      • Property 9: 迁移前后输出等价 — 生成随机 API 响应序列,对比 UnifiedPipeline 与原同步串行实现的数据库写入结果inserted/updated/skipped 计数和记录内容)
      • 测试文件:tests/test_migration_properties.py
      • 验证: 需求 5.1, 5.3, 5.4, 5.5
    • 5.6 编写 Detail_Mode 和迁移单元测试

      • 测试文件:apps/etl/connectors/feiqiu/tests/unit/test_detail_mode.py
      • 覆盖:列表→详情完整流程、无 detail_endpoint 跳过、详情单条失败不中断
      • 需求: 6.1, 6.3, 6.5
  • 6. 检查点 — ODS 迁移验证

    • 确保所有测试通过ask the user if questions arise.
  • 7. DWD 层多线程优化

    • 7.1 改造 apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py 中的 DwdLoadTask.load() 方法

      • AppConfig 读取 dwd.parallel_workers(默认 4
      • 将现有串行 for dwd_table, ods_table in TABLE_MAP 循环改为 concurrent.futures.ThreadPoolExecutor 并行调度
      • 每张表调用 _process_single_table() 在独立线程中执行,使用独立数据库连接和事务
      • _merge_dim_scd2() 方法本身不改
      • 单张表失败时捕获异常、记录错误日志(含表名和错误信息)、标记失败、继续处理其他表
      • 所有表处理完成后返回汇总结果:成功表数、失败表数、每张表的 inserted/updated 计数
      • 需求: 7.1, 7.2, 7.3, 7.4, 7.5, 7.6
    • 7.2 编写 DWD 并行属性测试

      • Property 16: DWD 并行与串行结果一致 — 生成随机表集合 + mock SCD2验证多线程并行执行的结果与串行逐表执行完全一致
      • 测试文件:tests/test_dwd_parallel_properties.py
      • 验证: 需求 7.1
    • 7.3 编写 DWD 多线程单元测试

      • 测试文件:apps/etl/connectors/feiqiu/tests/unit/test_dwd_parallel.py
      • 覆盖mock SCD2 正常并行、单表失败不影响其他表、汇总结果正确
      • 需求: 7.3, 7.4, 7.5
  • 8. 可观测性与日志优化

    • 8.1 在 UnifiedPipeline 中集成运行时指标日志

      • _request_loop 中定期记录当前请求队列深度、ProcessingPool 活跃线程数、WriteWorker 待写入队列深度、已完成请求数/总请求数
      • run() 返回前计算并记录执行摘要:总耗时、请求/处理/写入各阶段耗时、各阶段记录数统计
      • 与现有 EtlTimer 集成,在 FlowRunner 计时报告中体现各 ODS 任务的阶段耗时
      • 需求: 8.1, 8.2, 8.5
    • 8.2 创建 apps/etl/connectors/feiqiu/utils/task_log_buffer.py,实现 TaskLogBuffer

      • 实现线程安全的 log(level, message) 方法,将日志条目缓冲到内存列表
      • 实现 flush() 方法:按时间戳升序排列,一次性输出到父 logger添加 [task_code] 前缀
      • 定义 LogEntry 数据类timestamp、level、task_code、message
      • 需求: 10.1, 10.3, 10.4
    • 8.3 编写日志缓冲区属性测试

      • Property 15: 日志缓冲区按任务隔离 — 生成多任务随机日志流,验证每个 TaskLogBuffer 的 flush() 仅包含该任务日志且按时间戳升序
      • 测试文件:tests/test_log_buffer_properties.py
      • 验证: 需求 10.1, 10.4
    • 8.4 编写 TaskLogBuffer 单元测试

      • 测试文件:apps/etl/connectors/feiqiu/tests/unit/test_task_log_buffer.py
      • 覆盖:空缓冲区 flush、并发多线程写入、日志前缀格式
      • 需求: 10.1, 10.3, 10.4
  • 9. 检查点 — DWD 优化与日志验证

    • 确保所有测试通过ask the user if questions arise.
  • 10. Admin-web 日志展示优化

    • 10.1 在 apps/etl/connectors/feiqiu/ 中集成 TaskLogBuffer 到 BaseOdsTask 和 FlowRunner

      • 在 BaseOdsTask.execute() 中创建 TaskLogBuffer 实例,替代直接 logger 调用
      • 在 FlowRunner 中为每个任务分配独立的 TaskLogBuffer任务完成后调用 flush()
      • 保证多线程环境下日志写入原子性(每条日志完整一行)
      • 需求: 10.1, 10.3, 10.4
    • 10.2 在 apps/admin-web/ 中实现按任务分组的日志展示

      • 在 ETL 执行结果页面中按任务分段展示日志:每个任务折叠为独立区块
      • 展开后显示该任务的完整执行日志(时间戳、日志级别、消息内容)
      • 支持按任务代码过滤和分组展示
      • 顶部展示任务执行时间线概览(每个任务的开始/结束时间、状态),可点击跳转
      • 需求: 10.2, 10.5, 10.6
  • 11. CLI 参数扩展

    • 11.1 在 apps/etl/connectors/feiqiu/cli/ 中添加 Pipeline 相关 CLI 参数
      • 新增 --pipeline-workers--pipeline-batch-size--pipeline-rate-min--pipeline-rate-max 参数
      • 将 CLI 参数值注入到 AppConfig使其在 PipelineConfig.from_app_config() 中生效
      • 需求: 4.6
  • 12. 最终检查点 — 全量验证

    • 确保所有测试通过ask the user if questions arise.

备注

  • 标记 * 的子任务为可选,可跳过以加速 MVP 交付
  • 每个任务引用了具体的需求编号,确保可追溯
  • 检查点任务用于增量验证,确保每个阶段的正确性
  • 属性测试验证通用正确性属性,单元测试验证具体示例和边界条件
  • 属性测试位于 Monorepo 级 tests/ 目录,单元测试位于 ETL 模块内 tests/unit/