Files

160 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 需求文档ETL 统一请求编排与线程模型改造
## 简介
对飞球 Connector ETL 系统(`apps/etl/connectors/feiqiu/`)的请求编排和线程模型进行全局统一化改造。当前系统所有 ODS 任务在 `BaseOdsTask.execute()` 中同步串行执行 API 请求、数据处理和数据库写入,无限流机制、无取消信号、无并行处理能力。本次改造建立统一的请求编排框架,所有 21 个 ODS 任务迁移到"串行请求 + 异步处理 + 单线程写库"架构支持全局限流5-20 秒随机间隔)、外部取消信号、可选的"列表→详情"二级拉取模式,并对 DWD 层加载进行多线程优化。
## 术语表
- **ETL_System**:飞球 Connector ETL 系统(`apps/etl/connectors/feiqiu/`),负责从飞球 SaaS API 拉取数据并加载到 PostgreSQL 的 ODS → DWD → DWS 各层
- **Unified_Pipeline**:统一请求编排框架,所有 ODS 任务共用的"串行请求 + 异步处理 + 单线程写库"执行引擎
- **Request_Scheduler**:全局请求调度器,负责将所有 API 请求统一排队、串行发送、遵循限流规则
- **Rate_Limiter**:请求间隔控制器,控制相邻两次 API 请求之间的随机等待时间(默认 5-20 秒均匀分布),防止触发上游风控
- **Processing_Pool**:异步处理线程池,多个工作线程并行消费 API 响应数据执行字段提取、数据清洗、content_hash 计算等 CPU 密集操作
- **Write_Worker**:单线程写入工作器,汇总所有处理完成的结果,统一执行数据库写入操作,保证写入串行化
- **CancellationToken**:取消令牌,外部组件(如 Admin-web、CLI通过设置该令牌通知正在执行的任务中断
- **ODS_Task**ODS 层数据拉取任务的统称,当前共 21 个,通过 `OdsTaskSpec` 数据类定义、`_build_task_class()` 动态生成任务类
- **Detail_Mode**:二级详情拉取模式,在列表接口拉取完成后逐条调用详情接口获取更丰富的数据,属于可选能力
- **Pipeline_Config**:管道配置,包含 worker 数、队列大小、批量写入阈值、限流间隔等参数,不同任务可独立配置
- **BaseOdsTask**:当前 ODS 任务基类(`tasks/ods/ods_tasks.py`封装时间窗口解析、API 分页拉取、结构感知写入、快照软删除等核心逻辑
- **TaskExecutor**:任务执行器(`orchestration/task_executor.py`),封装单个任务的执行生命周期(游标管理、运行记录、数据源路由)
- **FlowRunner**:流程编排器(`orchestration/flow_runner.py`编排多层任务ODS → DWD → DWS → INDEX的执行顺序
- **DWD_Loader**DWD 层加载任务(`DwdLoadTask`),通过 `_merge_dim_scd2()` 执行 SCD2 合并,将 ODS 原始数据转换为维度/事实表
## 需求
### 需求 1统一请求调度器
**用户故事:** 作为 ETL 运维人员,我希望所有 API 请求通过统一的调度器串行发送并遵循限流规则,避免触发上游风控导致 IP 封禁或数据拉取失败。
#### 验收标准
1. THE Request_Scheduler SHALL 维护一个全局请求队列,所有 ODS 任务的 API 请求统一进入该队列排队等待发送
2. THE Request_Scheduler SHALL 严格串行发送请求:等待上一个请求的 HTTP 响应完整返回后,再等待限流间隔,然后发送队列中的下一个请求
3. THE Rate_Limiter SHALL 在每两个相邻请求之间插入 5 至 20 秒之间的随机等待时间(均匀分布)
4. THE Rate_Limiter SHALL 支持通过 Pipeline_Config 调整最小间隔(默认 5 秒)和最大间隔(默认 20 秒),不同任务可配置不同的间隔范围
5. IF Rate_Limiter 初始化时最小间隔大于最大间隔THEN THE Rate_Limiter SHALL 抛出 `ValueError` 并包含描述性错误信息
6. WHEN 同一次 FlowRunner 执行中包含多个 ODS 任务时THE Request_Scheduler SHALL 按任务注册顺序依次处理每个任务的请求,同一时刻仅有一个 HTTP 请求在途
7. THE Request_Scheduler SHALL 在每个请求完成后记录请求耗时、响应状态码和目标 endpoint 到日志
### 需求 2异步处理与单线程写库架构
**用户故事:** 作为 ETL 运维人员,我希望 API 响应数据的处理字段提取、清洗、hash 计算)能与请求发送并行执行,同时保证数据库写入的串行化,在不增加 API 压力的前提下提升整体吞吐。
#### 验收标准
1. THE Unified_Pipeline SHALL 在每个 API 请求的响应返回后,立即将响应数据提交到 Processing_Pool 的任务队列,不阻塞 Request_Scheduler 的限流等待计时
2. THE Processing_Pool SHALL 支持多个工作线程并行消费处理队列中的响应数据执行字段提取、数据清洗、content_hash 计算、record 层合并等操作
3. THE Processing_Pool 的工作线程数量 SHALL 通过 Pipeline_Config 配置(默认值 2不同任务可独立配置
4. THE Write_Worker SHALL 作为单独的线程运行,从处理完成队列中消费数据,统一执行数据库 INSERT/UPSERT 操作
5. THE Write_Worker SHALL 支持批量写入:累积到配置的阈值(默认 100 条记录)或等待超时(默认 5 秒)后执行一次批量写入
6. THE Write_Worker 的批量写入阈值和等待超时 SHALL 通过 Pipeline_Config 配置,不同任务可独立配置
7. WHEN 所有请求发送完毕后THE Unified_Pipeline SHALL 等待 Processing_Pool 和 Write_Worker 全部完成后再返回最终结果
8. THE Unified_Pipeline SHALL 保证多线程读库操作的安全性Processing_Pool 中的工作线程可并行读取数据库(如查询最新 content_hash使用独立的只读数据库连接
### 需求 3外部取消信号支持
**用户故事:** 作为 ETL 运维人员,我希望能通过 Admin-web 或 CLI 发送取消信号中断正在执行的 ODS 任务,避免长时间运行的任务无法停止。
#### 验收标准
1. THE CancellationToken SHALL 提供线程安全的 `cancel()` 方法和 `is_cancelled` 属性,供外部组件触发取消
2. WHEN CancellationToken 被触发时THE Request_Scheduler SHALL 在当前请求的限流等待期或响应等待期中断,不再发起后续请求
3. WHEN CancellationToken 被触发时THE Processing_Pool SHALL 完成当前已提交到队列中的所有数据处理任务,不丢弃已入队的数据
4. WHEN CancellationToken 被触发时THE Write_Worker SHALL 将所有已处理完成的数据写入数据库后再退出,保证已处理数据的持久化
5. WHEN 任务因取消信号中断时THE Unified_Pipeline SHALL 返回部分完成的统计结果(已完成的请求数、已处理的记录数、已写入的记录数),任务状态标记为 `CANCELLED`
6. THE CancellationToken SHALL 支持超时自动取消:可在创建时指定最大执行时间(秒),超时后自动触发取消信号
7. IF CancellationToken 在任务启动前已处于取消状态THEN THE Unified_Pipeline SHALL 立即返回空结果,不发送任何请求
### 需求 4Pipeline 配置体系
**用户故事:** 作为 ETL 运维人员我希望线程模型的各项参数worker 数、队列大小、批量写入阈值、限流间隔)足够灵活,不同接口可以有不同的配置,以适应不同 API 的特性。
#### 验收标准
1. THE Pipeline_Config SHALL 支持以下可配置参数Processing_Pool 工作线程数(`workers`,默认 2、处理队列容量`queue_size`,默认 100、Write_Worker 批量写入阈值(`batch_size`,默认 100、Write_Worker 等待超时秒数(`batch_timeout`,默认 5.0、Rate_Limiter 最小间隔秒数(`rate_min`,默认 5.0、Rate_Limiter 最大间隔秒数(`rate_max`,默认 20.0
2. THE Pipeline_Config SHALL 遵循现有配置分层体系(根 `.env` < `.env.local` < 环境变量 < CLI 参数),通过 `AppConfig``pipeline.*` 命名空间读取
3. THE Pipeline_Config SHALL 支持任务级覆盖:通过 `pipeline.<task_code>.*` 命名空间为特定任务指定独立配置,未指定时回退到 `pipeline.*` 全局默认值
4. IF Pipeline_Config 中 `workers` 小于 1 或 `queue_size` 小于 1THEN THE Unified_Pipeline SHALL 抛出 `ValueError` 并包含描述性错误信息
5. IF Pipeline_Config 中 `batch_size` 小于 1THEN THE Unified_Pipeline SHALL 抛出 `ValueError` 并包含描述性错误信息
6. THE Pipeline_Config SHALL 支持运行时通过 CLI 参数 `--pipeline-workers``--pipeline-batch-size``--pipeline-rate-min``--pipeline-rate-max` 覆盖全局默认值
### 需求 5现有 ODS 任务迁移
**用户故事:** 作为 ETL 开发者,我希望现有 21 个 ODS 任务全部迁移到统一管道框架上,保持功能完全等价,不丢失任何现有能力。
#### 验收标准
1. THE Unified_Pipeline SHALL 完整保留 BaseOdsTask 的所有现有功能:时间窗口解析(`_resolve_window`)、窗口分段(`build_window_segments`、API 分页拉取(`iter_paginated`)、结构感知写入(`_insert_records_schema_aware`)、快照软删除(`_mark_missing_as_deleted`、content_hash 去重(`skip_unchanged`
2. THE Unified_Pipeline SHALL 保留 OdsTaskSpec 数据类的所有现有字段定义,迁移后的任务通过相同的 `OdsTaskSpec` 实例配置
3. WHEN 迁移完成后THE ETL_System 对每个 ODS 任务执行相同输入数据时 SHALL 产生与迁移前完全相同的数据库写入结果(相同的 inserted/updated/skipped 计数和相同的记录内容)
4. THE Unified_Pipeline SHALL 保留现有的 `endpoint_routing` 逻辑recent/former 路由拆分),迁移后的请求路由行为与现有系统一致
5. THE Unified_Pipeline SHALL 保留现有的 `source_file``source_endpoint``fetched_at` 等元数据写入逻辑
6. THE Unified_Pipeline SHALL 兼容现有的 `TaskExecutor` 执行生命周期(游标管理、运行记录、数据源路由),迁移后 TaskExecutor 无需修改调用方式
7. WHEN 迁移完成后THE TaskRegistry 中所有 21 个 ODS 任务的注册代码和元数据 SHALL 保持不变
### 需求 6二级详情拉取模式
**用户故事:** 作为 ETL 开发者,我希望统一管道框架支持"列表接口拉完后逐条调详情"的二级拉取模式,以便团购详情等需要二次请求的业务能在框架内实现。
#### 验收标准
1. THE Unified_Pipeline SHALL 支持可选的 Detail_Mode在列表接口的所有分页数据拉取并写入 ODS 完成后,从已写入的记录中提取 ID 列表,逐条调用详情接口
2. THE OdsTaskSpec SHALL 新增可选字段支持 Detail_Mode 配置:详情接口 endpoint、详情请求参数构造函数、详情数据目标表名、详情数据的 data_path 和 list_key
3. WHEN OdsTaskSpec 未配置 Detail_Mode 相关字段时THE Unified_Pipeline SHALL 跳过详情拉取阶段,行为与纯列表拉取模式完全一致
4. THE Detail_Mode 的详情请求 SHALL 通过 Request_Scheduler 统一排队,遵循与列表请求相同的限流规则
5. IF 详情接口对某个 ID 返回错误或超时THEN THE Unified_Pipeline SHALL 记录错误日志(含 ID 和错误信息)并继续处理下一个 ID不中断整体流程
6. WHEN 详情拉取完成后THE Unified_Pipeline SHALL 在任务执行结果中包含详情拉取的统计信息(详情成功数、详情失败数、详情跳过数),与列表拉取统计分开记录
### 需求 7DWD 层多线程优化
**用户故事:** 作为 ETL 运维人员,我希望 DWD 层加载任务能利用多线程并行处理多张表的 SCD2 合并,缩短 DWD 层的整体执行时间。
#### 验收标准
1. THE DWD_Loader SHALL 支持多线程并行执行多张 DWD 表的 SCD2 合并操作,每张表的合并在独立线程中运行
2. THE DWD_Loader 的并行线程数 SHALL 通过配置参数控制(默认值 4通过 `AppConfig``dwd.parallel_workers` 读取
3. THE DWD_Loader SHALL 保证每张表的 SCD2 合并操作在独立的数据库事务中执行,单张表失败不影响其他表的处理
4. WHEN 某张 DWD 表的 SCD2 合并失败时THE DWD_Loader SHALL 记录错误日志(含表名和错误信息),将该表标记为失败,继续处理其他表
5. THE DWD_Loader SHALL 在所有表处理完成后返回汇总结果:成功表数、失败表数、每张表的 inserted/updated 计数
6. THE DWD_Loader 的现有 `_merge_dim_scd2()` 方法 SHALL 保持不变,多线程优化仅在调度层面并行调用该方法
### 需求 8可观测性与监控
**用户故事:** 作为 ETL 运维人员,我希望统一管道框架提供充分的运行时可观测性,便于监控执行状态和排查问题。
#### 验收标准
1. THE Unified_Pipeline SHALL 在任务执行过程中记录以下关键指标到日志当前请求队列深度、Processing_Pool 活跃线程数、Write_Worker 待写入队列深度、已完成请求数/总请求数
2. THE Unified_Pipeline SHALL 在任务完成后输出执行摘要:总耗时、请求阶段耗时、处理阶段耗时、写入阶段耗时、各阶段的记录数统计
3. WHEN Processing_Pool 的任务队列达到容量上限时THE Unified_Pipeline SHALL 记录警告日志Request_Scheduler 暂停发送新请求直到队列有空位(背压机制)
4. WHEN Write_Worker 的待写入队列积压超过 `queue_size * 2`THE Unified_Pipeline SHALL 记录警告日志
5. THE Unified_Pipeline SHALL 与现有的 `EtlTimer` 集成,在 FlowRunner 的计时报告中体现各 ODS 任务的请求/处理/写入阶段耗时
### 需求 9错误处理与容错
**用户故事:** 作为 ETL 运维人员,我希望统一管道框架具备完善的错误处理机制,单个请求或记录的失败不影响整体任务的执行。
#### 验收标准
1. IF 单个 API 请求失败HTTP 错误、超时、API 返回错误码THEN THE Request_Scheduler SHALL 按现有 `APIClient` 的重试策略(最多 3 次,指数退避)重试,重试耗尽后记录错误并继续处理下一个请求
2. IF Processing_Pool 中某条记录的处理抛出异常THEN THE Processing_Pool SHALL 记录错误日志(含记录标识和异常信息),将该记录标记为处理失败,继续处理队列中的其他记录
3. IF Write_Worker 执行批量写入时发生数据库错误THEN THE Write_Worker SHALL 回滚当前批次的事务,记录错误日志(含批次大小和错误信息),将该批次的记录标记为写入失败
4. WHEN 任务执行完成后THE Unified_Pipeline SHALL 在执行结果中汇总所有错误:请求失败数、处理失败数、写入失败数,以及每个失败项的错误摘要
5. IF 任务执行过程中连续失败次数超过配置阈值(默认 10 次THEN THE Unified_Pipeline SHALL 主动中断任务执行,将任务状态标记为 `FAILED`,避免无效重试浪费时间
6. THE Unified_Pipeline SHALL 保留现有 BaseOdsTask 的事务管理语义每个窗口分段segment的数据在该分段全部处理完成后统一 commit分段失败时 rollback 该分段
### 需求 10Admin-web 日志输出优化
**用户故事:** 作为 ETL 运维人员,我希望在 Admin-web 管理后台查看 ETL 执行日志时,各个任务的日志按任务分组、有序展示,避免多任务并行执行时日志行交叉混乱导致难以阅读和排查问题。
#### 验收标准
1. THE ETL_System SHALL 为每个 ODS 任务的执行日志添加任务标识前缀(任务代码),使日志行可按任务归属区分
2. THE Admin-web SHALL 支持按任务代码过滤和分组展示 ETL 执行日志,用户可选择查看单个任务的日志或全部日志
3. THE Unified_Pipeline SHALL 在多线程环境下保证日志写入的原子性:每条日志消息作为完整的一行输出,不会被其他线程的日志截断或插入
4. THE ETL_System SHALL 为每个任务维护独立的日志缓冲区,任务完成后将该任务的完整日志按时间顺序一次性输出到 Admin-web避免执行过程中不同任务的日志行交叉
5. THE Admin-web SHALL 在 ETL 执行结果页面中按任务分段展示日志:每个任务的日志折叠为独立区块,展开后显示该任务的完整执行日志(含时间戳、日志级别、消息内容)
6. WHEN 多个 ODS 任务在同一次 FlowRunner 执行中运行时THE Admin-web SHALL 在顶部展示任务执行时间线概览(每个任务的开始时间、结束时间、状态),用户可点击跳转到对应任务的日志区块