Files

835 lines
35 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 技术设计ETL 统一请求编排与线程模型改造
> 对应需求文档:[requirements.md](./requirements.md)
## 1. 架构概览
本次改造将现有 21 个 ODS 任务从"同步串行执行"迁移到统一的"串行请求 + 异步处理 + 单线程写库"管道架构。核心设计原则:
- **请求串行化**:所有 API 请求通过全局 `RequestScheduler` 排队,严格一个接一个发送,遵循 `RateLimiter` 限流
- **处理并行化**API 响应提交到 `ProcessingPool` 多线程处理字段提取、hash 计算等),不阻塞请求线程
- **写入串行化**:所有数据库写入由单个 `WriteWorker` 线程执行,避免并发写入冲突
- **配置灵活化**:通过 `PipelineConfig` 支持全局默认 + 任务级覆盖
- **可取消**:通过 `CancellationToken` 支持外部取消信号,优雅中断
```
┌─────────────────────────────────────────────────────────────────┐
│ FlowRunner │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Unified_Pipeline │ │
│ │ │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │ │
│ │ │ Request │───▶│ Processing │───▶│ Write │ │ │
│ │ │ Scheduler │ │ Pool │ │ Worker │ │ │
│ │ │ (串行请求) │ │ (N 工作线程) │ │ (单线程) │ │ │
│ │ └──────┬───────┘ └──────────────┘ └─────────────┘ │ │
│ │ │ │ │
│ │ ┌──────▼───────┐ ┌──────────────┐ │ │
│ │ │ Rate │ │ Cancellation │ │ │
│ │ │ Limiter │ │ Token │ │ │
│ │ │ (5-20s) │ │ (外部取消) │ │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ DWD_Loader (多线程 SCD2 调度) │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ Table1 │ │ Table2 │ │ Table3 │ │ Table4 │ ... │ │
│ │ │ SCD2 │ │ SCD2 │ │ SCD2 │ │ SCD2 │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 1.1 设计决策与理由
| 决策 | 选项 | 理由 |
|------|------|------|
| 请求串行 vs 并行 | 串行 | 上游飞球 API 无并发友好设计,并行请求易触发风控;串行 + 限流是最安全的策略 |
| 处理线程数 | 默认 2 | ODS 数据处理是轻量 CPU 操作JSON 解析、hash 计算2 线程足够消化请求间隔产生的积压 |
| 写入单线程 | 单线程 | PostgreSQL 单连接写入避免锁竞争和事务冲突,简化错误处理和回滚逻辑 |
| Pipeline 嵌入 vs 独立 | 嵌入 BaseOdsTask | Pipeline 作为 BaseOdsTask 内部执行引擎对外接口TaskExecutor、FlowRunner完全不变 |
| DWD 多线程 | 调度层并行 | 仅在调度层并行调用 `_merge_dim_scd2()`,方法本身不改,每张表独立事务 |
## 2. 架构
### 2.1 整体数据流
```mermaid
graph TD
A[FlowRunner.run] --> B[TaskExecutor.run_tasks]
B --> C{ODS 任务}
C --> D[BaseOdsTask.execute]
D --> E[UnifiedPipeline.run]
E --> F[RequestScheduler<br/>串行请求 + RateLimiter]
F --> G[ProcessingPool<br/>多线程处理]
G --> H[WriteWorker<br/>单线程写库]
H --> I[ODS 表写入完成]
I --> J{有 Detail_Mode?}
J -->|是| K[DetailFetcher<br/>二级详情拉取]
K --> F
J -->|否| L[返回结果]
K --> L
L --> M[DWD_Loader]
M --> N[多线程 SCD2 调度]
N --> O[DWD 表写入完成]
```
### 2.2 线程模型详细设计
```
主线程RequestScheduler
│ for request in request_queue:
│ if cancel_token.is_cancelled: break
│ resp = api_client.post(endpoint, params)
│ processing_queue.put((request_id, resp))
│ rate_limiter.wait(cancel_token.event)
│ processing_queue.put(SENTINEL × worker_count)
│ 等待所有 worker 完成
│ write_queue.put(SENTINEL)
│ 等待 writer 完成
├──▶ Worker Thread 1 ──┐
├──▶ Worker Thread 2 ──┤
│ │
│ processing_queue │
│ ┌─────────────┐ │
│ │ (id, resp) │───▶ 字段提取 / content_hash 计算
│ │ (id, resp) │ write_queue.put(processed_rows)
│ │ SENTINEL │
│ └─────────────┘ │
│ │
│ ▼
│ Write Thread (单线程)
│ ┌─────────────┐
│ │ write_queue │
│ │ batch=100 │──▶ UPSERT / INSERT
│ │ timeout=5s │
│ │ SENTINEL │
│ └─────────────┘
PipelineResult统计信息
```
关键设计点:
- `processing_queue``queue.Queue(maxsize=queue_size)`,默认 100满时 `RequestScheduler` 阻塞(背压机制)
- `write_queue``queue.Queue(maxsize=queue_size * 2)`,默认 200
- SENTINEL`None` 对象,通知线程退出
- 取消信号:主线程检查 `cancel_token`worker/writer 通过 SENTINEL 正常退出
- 批量写入:累积到 `batch_size`(默认 100或等待 `batch_timeout`(默认 5 秒)后执行一次
### 2.3 取消信号传递链
```
外部触发Admin-web / CLI / 超时)
│ cancel_token.cancel()
RequestScheduler
│ rate_limiter.wait() 提前返回 False
│ 主循环 break不再发新请求
ProcessingPool
│ 通过 SENTINEL 正常退出
│ 已入队数据全部处理完成
WriteWorker
│ 通过 SENTINEL 正常退出
│ 已处理数据全部写入 DB
返回 PipelineResult(cancelled=True, ...)
```
## 3. 组件与接口
### 3.1 PipelineConfig配置数据类
文件:`apps/etl/connectors/feiqiu/config/pipeline_config.py`
```python
@dataclass(frozen=True)
class PipelineConfig:
"""统一管道配置,支持全局默认 + 任务级覆盖。"""
workers: int = 2 # ProcessingPool 工作线程数
queue_size: int = 100 # 处理队列容量
batch_size: int = 100 # WriteWorker 批量写入阈值
batch_timeout: float = 5.0 # WriteWorker 等待超时(秒)
rate_min: float = 5.0 # RateLimiter 最小间隔(秒)
rate_max: float = 20.0 # RateLimiter 最大间隔(秒)
max_consecutive_failures: int = 10 # 连续失败中断阈值
def __post_init__(self):
if self.workers < 1:
raise ValueError(f"workers 必须 >= 1当前值: {self.workers}")
if self.queue_size < 1:
raise ValueError(f"queue_size 必须 >= 1当前值: {self.queue_size}")
if self.batch_size < 1:
raise ValueError(f"batch_size 必须 >= 1当前值: {self.batch_size}")
if self.rate_min > self.rate_max:
raise ValueError(
f"rate_min({self.rate_min}) 不能大于 rate_max({self.rate_max})"
)
@classmethod
def from_app_config(cls, config: AppConfig, task_code: str | None = None) -> "PipelineConfig":
"""从 AppConfig 加载,支持 pipeline.<task_code>.* 任务级覆盖。"""
def _get(key: str, default):
# 优先任务级 → 全局级 → 默认值
if task_code:
val = config.get(f"pipeline.{task_code.lower()}.{key}")
if val is not None:
return type(default)(val)
val = config.get(f"pipeline.{key}")
if val is not None:
return type(default)(val)
return default
return cls(
workers=_get("workers", 2),
queue_size=_get("queue_size", 100),
batch_size=_get("batch_size", 100),
batch_timeout=_get("batch_timeout", 5.0),
rate_min=_get("rate_min", 5.0),
rate_max=_get("rate_max", 20.0),
max_consecutive_failures=_get("max_consecutive_failures", 10),
)
```
### 3.2 CancellationToken取消令牌
文件:`apps/etl/connectors/feiqiu/utils/cancellation.py`
```python
class CancellationToken:
"""线程安全的取消令牌,封装 threading.Event。"""
def __init__(self, timeout: float | None = None):
self._event = threading.Event()
self._timer: threading.Timer | None = None
if timeout is not None and timeout > 0:
self._timer = threading.Timer(timeout, self.cancel)
self._timer.daemon = True
self._timer.start()
def cancel(self):
"""发出取消信号。"""
self._event.set()
@property
def is_cancelled(self) -> bool:
return self._event.is_set()
@property
def event(self) -> threading.Event:
return self._event
def dispose(self):
"""清理超时定时器。"""
if self._timer is not None:
self._timer.cancel()
self._timer = None
```
### 3.3 RateLimiter限流器
文件:`apps/etl/connectors/feiqiu/api/rate_limiter.py`
```python
class RateLimiter:
"""请求间隔控制器,支持取消信号中断等待。"""
def __init__(self, min_interval: float = 5.0, max_interval: float = 20.0):
if min_interval > max_interval:
raise ValueError(
f"min_interval({min_interval}) 不能大于 max_interval({max_interval})"
)
self._min = min_interval
self._max = max_interval
self._last_interval: float = 0.0
def wait(self, cancel_event: threading.Event | None = None) -> bool:
"""等待随机间隔。返回 False 表示被取消信号中断。
将等待时间拆分为 0.5s 小段,每段检查 cancel_event。"""
interval = random.uniform(self._min, self._max)
self._last_interval = interval
remaining = interval
while remaining > 0:
if cancel_event and cancel_event.is_set():
return False
sleep_time = min(0.5, remaining)
time.sleep(sleep_time)
remaining -= sleep_time
return True
@property
def last_interval(self) -> float:
return self._last_interval
```
### 3.4 UnifiedPipeline统一管道引擎
文件:`apps/etl/connectors/feiqiu/pipeline/unified_pipeline.py`
这是核心组件,封装"串行请求 + 异步处理 + 单线程写库"的完整执行引擎。
```python
@dataclass
class PipelineResult:
"""管道执行结果统计。"""
status: str = "SUCCESS" # SUCCESS / PARTIAL / CANCELLED / FAILED
total_requests: int = 0
completed_requests: int = 0
total_fetched: int = 0
total_inserted: int = 0
total_updated: int = 0
total_skipped: int = 0
request_failures: int = 0
processing_failures: int = 0
write_failures: int = 0
cancelled: bool = False
errors: list[dict] = field(default_factory=list)
timing: dict[str, float] = field(default_factory=dict) # 各阶段耗时
class UnifiedPipeline:
"""统一管道引擎:串行请求 + 异步处理 + 单线程写库。"""
def __init__(
self,
api_client: APIClient,
db_connection,
logger: logging.Logger,
config: PipelineConfig,
cancel_token: CancellationToken | None = None,
):
self.api = api_client
self.db = db_connection
self.logger = logger
self.config = config
self.cancel_token = cancel_token or CancellationToken()
self._rate_limiter = RateLimiter(config.rate_min, config.rate_max)
def run(
self,
requests: Iterable[PipelineRequest],
process_fn: Callable[[Any], list[dict]],
write_fn: Callable[[list[dict]], WriteResult],
) -> PipelineResult:
"""执行管道。
Args:
requests: 请求迭代器(由 BaseOdsTask 生成,包含 endpoint、params 等)
process_fn: 处理函数,将 API 响应转换为待写入记录列表
write_fn: 写入函数,将记录批量写入数据库
"""
if self.cancel_token.is_cancelled:
return PipelineResult(status="CANCELLED", cancelled=True)
processing_queue = queue.Queue(maxsize=self.config.queue_size)
write_queue = queue.Queue(maxsize=self.config.queue_size * 2)
result = PipelineResult()
# 启动处理线程池
workers = []
for i in range(self.config.workers):
t = threading.Thread(
target=self._process_worker,
args=(processing_queue, write_queue, process_fn, result),
name=f"pipeline-worker-{i}",
daemon=True,
)
t.start()
workers.append(t)
# 启动写入线程
writer = threading.Thread(
target=self._write_worker,
args=(write_queue, write_fn, result),
name="pipeline-writer",
daemon=True,
)
writer.start()
# 主线程:串行请求
self._request_loop(requests, processing_queue, result)
# 发送 SENTINEL 到处理队列
for _ in workers:
processing_queue.put(None)
for w in workers:
w.join()
# 发送 SENTINEL 到写入队列
write_queue.put(None)
writer.join()
# 确定最终状态
if result.cancelled:
result.status = "CANCELLED"
elif result.request_failures + result.processing_failures + result.write_failures > 0:
result.status = "PARTIAL"
return result
```
### 3.5 BaseOdsTask 改造
文件:`apps/etl/connectors/feiqiu/tasks/ods/ods_tasks.py`(修改现有文件)
改造策略:在 `BaseOdsTask.execute()` 内部用 `UnifiedPipeline` 替代现有的同步循环但保留所有现有功能时间窗口解析、分页拉取、结构感知写入、快照软删除、content_hash 去重)。
```python
class BaseOdsTask(BaseTask):
"""改造后的 ODS 任务基类。"""
def execute(self, cursor_data: dict | None = None) -> dict:
spec = self.SPEC
# ... 现有的窗口解析、分段逻辑保持不变 ...
# 构建 PipelineConfig支持任务级覆盖
pipeline_config = PipelineConfig.from_app_config(self.config, spec.code)
cancel_token = getattr(self, '_cancel_token', None)
pipeline = UnifiedPipeline(
api_client=self.api,
db_connection=self.db,
logger=self.logger,
config=pipeline_config,
cancel_token=cancel_token,
)
# 将现有的分页请求逻辑封装为 PipelineRequest 迭代器
# 将现有的 _insert_records_schema_aware 封装为 write_fn
# 将现有的字段提取/hash 计算封装为 process_fn
result = pipeline.run(
requests=self._build_requests(spec, segments, store_id, page_size),
process_fn=self._build_process_fn(spec),
write_fn=self._build_write_fn(spec, source_file),
)
# ... 快照软删除逻辑保持不变 ...
# ... 结果构建逻辑保持不变 ...
```
关键约束:
- `OdsTaskSpec` 数据类的所有现有字段保持不变
- `_insert_records_schema_aware()``_mark_missing_as_deleted()` 等方法保持不变
- `TaskExecutor` 调用 `task.execute(cursor_data)` 的接口保持不变
- `TaskRegistry` 中的注册代码保持不变
### 3.6 OdsTaskSpec 扩展Detail_Mode 支持)
在现有 `OdsTaskSpec` 数据类中新增可选字段:
```python
@dataclass(frozen=True)
class OdsTaskSpec:
# ... 所有现有字段保持不变 ...
# Detail_Mode 可选配置(新增)
detail_endpoint: str | None = None # 详情接口 endpoint
detail_param_builder: Callable[[dict], dict] | None = None # 详情请求参数构造
detail_target_table: str | None = None # 详情数据目标表名
detail_data_path: tuple[str, ...] | None = None # 详情数据的 data_path
detail_list_key: str | None = None # 详情数据的 list_key
detail_id_column: str | None = None # 从列表数据中提取 ID 的列名
```
`detail_endpoint``None`Pipeline 跳过详情拉取阶段,行为与纯列表模式完全一致。
### 3.7 DWD 多线程调度器
文件:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py`(修改现有文件)
改造 `DwdLoadTask.load()` 方法,将现有的串行 `for dwd_table, ods_table in TABLE_MAP` 循环改为 `concurrent.futures.ThreadPoolExecutor` 并行调度:
```python
def load(self, extracted: dict[str, Any], context: TaskContext) -> dict[str, Any]:
now = extracted["now"]
parallel_workers = int(self.config.get("dwd.parallel_workers", 4))
# 将表分为维度表和事实表两组
dim_tables = [(d, o) for d, o in self.TABLE_MAP.items()
if self._table_base(d).startswith("dim_")]
fact_tables = [(d, o) for d, o in self.TABLE_MAP.items()
if not self._table_base(d).startswith("dim_")]
summary = []
errors = []
# 维度表并行 SCD2 合并(每张表独立事务、独立数据库连接)
with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
futures = {}
for dwd_table, ods_table in dim_tables:
if only_tables and ...: # 过滤逻辑保持不变
continue
future = executor.submit(
self._process_single_table, dwd_table, ods_table, now, context
)
futures[future] = dwd_table
for future in as_completed(futures):
dwd_table = futures[future]
try:
table_result = future.result()
summary.append(table_result)
except Exception as exc:
errors.append({"table": dwd_table, "error": str(exc)})
# 事实表同样并行处理
# ... 类似逻辑 ...
return {"tables": summary, "errors": len(errors), "error_details": errors}
```
关键约束:
- `_merge_dim_scd2()` 方法本身不改
- 每张表使用独立的数据库连接和事务
- 单张表失败不影响其他表
### 3.8 任务日志管理器
文件:`apps/etl/connectors/feiqiu/utils/task_log_buffer.py`(新建)
```python
class TaskLogBuffer:
"""任务级日志缓冲区,收集单个任务的所有日志,任务完成后一次性输出。"""
def __init__(self, task_code: str, parent_logger: logging.Logger):
self.task_code = task_code
self._buffer: list[LogEntry] = []
self._lock = threading.Lock()
self._parent = parent_logger
def log(self, level: int, message: str, *args, **kwargs):
"""线程安全地缓冲一条日志。"""
with self._lock:
self._buffer.append(LogEntry(
timestamp=datetime.now(),
level=level,
task_code=self.task_code,
message=message % args if args else message,
))
def flush(self) -> list[LogEntry]:
"""将缓冲区内容按时间顺序一次性输出到父 logger并返回日志列表。"""
with self._lock:
entries = sorted(self._buffer, key=lambda e: e.timestamp)
for entry in entries:
self._parent.log(
entry.level,
"[%s] %s",
entry.task_code,
entry.message,
)
self._buffer.clear()
return entries
```
## 4. 数据模型
### 4.1 PipelineConfig 配置命名空间
`AppConfig` 中新增 `pipeline.*` 命名空间:
| 配置键 | 类型 | 默认值 | 说明 |
|--------|------|--------|------|
| `pipeline.workers` | int | 2 | ProcessingPool 工作线程数 |
| `pipeline.queue_size` | int | 100 | 处理队列容量 |
| `pipeline.batch_size` | int | 100 | WriteWorker 批量写入阈值 |
| `pipeline.batch_timeout` | float | 5.0 | WriteWorker 等待超时(秒) |
| `pipeline.rate_min` | float | 5.0 | RateLimiter 最小间隔(秒) |
| `pipeline.rate_max` | float | 20.0 | RateLimiter 最大间隔(秒) |
| `pipeline.max_consecutive_failures` | int | 10 | 连续失败中断阈值 |
| `pipeline.<TASK_CODE>.workers` | int | - | 任务级覆盖:工作线程数 |
| `pipeline.<TASK_CODE>.rate_min` | float | - | 任务级覆盖:最小间隔 |
| `pipeline.<TASK_CODE>.rate_max` | float | - | 任务级覆盖:最大间隔 |
| `dwd.parallel_workers` | int | 4 | DWD 层并行线程数 |
任务级覆盖示例(`.env`
```
PIPELINE_WORKERS=2
PIPELINE_RATE_MIN=5.0
PIPELINE_ODS_GROUP_PACKAGE.RATE_MIN=8.0
PIPELINE_ODS_GROUP_PACKAGE.RATE_MAX=25.0
```
CLI 参数覆盖:
```
--pipeline-workers 4
--pipeline-batch-size 200
--pipeline-rate-min 3.0
--pipeline-rate-max 15.0
```
### 4.2 PipelineRequest 数据类
```python
@dataclass
class PipelineRequest:
"""管道请求描述。"""
endpoint: str
params: dict
page_size: int | None = 200
data_path: tuple[str, ...] = ("data",)
list_key: str | None = None
segment_index: int = 0 # 所属窗口分段索引
is_detail: bool = False # 是否为详情请求
detail_id: Any = None # 详情请求的 ID
```
### 4.3 PipelineResult 数据类
```python
@dataclass
class PipelineResult:
"""管道执行结果。"""
status: str = "SUCCESS"
total_requests: int = 0
completed_requests: int = 0
total_fetched: int = 0
total_inserted: int = 0
total_updated: int = 0
total_skipped: int = 0
total_deleted: int = 0
request_failures: int = 0
processing_failures: int = 0
write_failures: int = 0
cancelled: bool = False
errors: list[dict] = field(default_factory=list)
timing: dict[str, float] = field(default_factory=dict)
# Detail_Mode 统计(仅在启用时填充)
detail_success: int = 0
detail_failure: int = 0
detail_skipped: int = 0
```
### 4.4 WriteResult 数据类
```python
@dataclass
class WriteResult:
"""单次批量写入结果。"""
inserted: int = 0
updated: int = 0
skipped: int = 0
errors: int = 0
```
### 4.5 LogEntry 数据类
```python
@dataclass
class LogEntry:
"""日志条目。"""
timestamp: datetime
level: int
task_code: str
message: str
```
### 4.6 现有数据模型不变
以下现有数据模型保持不变:
- `OdsTaskSpec`:仅新增 Detail_Mode 可选字段,所有现有字段不变
- `TaskContext`:不变
- `TaskMeta`:不变
- `SnapshotMode`:不变
- `ColumnSpec`:不变
## 5. 正确性属性
*属性Property是系统在所有有效执行中都应保持为真的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。*
以下属性基于需求文档中的验收标准推导,经过冗余消除和合并后得到 16 个独立属性。
### Property 1: 请求严格串行
*对于任意*一组提交到 RequestScheduler 的 API 请求,每个请求的发送时间戳必须严格晚于上一个请求的响应完成时间戳,无论请求来自同一个 ODS 任务还是不同的 ODS 任务。
**Validates: Requirements 1.2, 1.6**
### Property 2: RateLimiter 间隔范围
*对于任意*有效的 min_interval 和 max_interval 配置min_interval <= max_intervalRateLimiter.wait() 的实际等待时间始终在 [min_interval, max_interval] 范围内(允许 ±0.5s 的系统调度误差)。
**Validates: Requirements 1.3**
### Property 3: PipelineConfig 构造与验证
*对于任意*一组配置参数,当 workers >= 1、queue_size >= 1、batch_size >= 1 且 rate_min <= rate_max 时PipelineConfig 应成功构造并正确存储所有参数值;当任一条件不满足时,应抛出 ValueError。
**Validates: Requirements 1.5, 4.1, 4.4, 4.5**
### Property 4: 配置分层与任务级覆盖
*对于任意*任务代码和配置值组合PipelineConfig.from_app_config() 应遵循优先级:任务级配置(`pipeline.<task_code>.*`> 全局配置(`pipeline.*`> 默认值。当任务级配置存在时,应覆盖全局配置;当任务级配置不存在时,应回退到全局配置。
**Validates: Requirements 1.4, 2.3, 2.6, 4.2, 4.3, 4.6, 7.2**
### Property 5: 管道完成语义
*对于任意*一组成功的 API 请求无失败、无取消UnifiedPipeline.run() 返回时PipelineResult 中的 total_fetched 应等于所有请求返回的记录总数,且 total_inserted + total_updated + total_skipped 应等于 total_fetched。
**Validates: Requirements 2.7**
### Property 6: WriteWorker 批量大小约束
*对于任意*配置的 batch_sizeWriteWorker 每次调用 write_fn 时传入的记录数不超过 batch_size。
**Validates: Requirements 2.5**
### Property 7: CancellationToken 状态转换
*对于任意* CancellationToken 实例,初始状态 is_cancelled 为 False调用 cancel() 后 is_cancelled 变为 True 且不可逆转。对于配置了超时的 CancellationToken在超时时间到达后 is_cancelled 自动变为 True。
**Validates: Requirements 3.1, 3.6**
### Property 8: 取消后已入队数据不丢失
*对于任意*管道执行过程中触发取消信号的时刻,管道返回时:(a) 不再发送新的 API 请求,(b) 所有已提交到 processing_queue 的数据全部被处理完成,(c) 所有已处理完成的数据全部被写入数据库,(d) 返回结果的 status 为 CANCELLED 且 cancelled 为 True。
**Validates: Requirements 3.2, 3.3, 3.4, 3.5**
### Property 9: 迁移前后输出等价
*对于任意* ODS 任务和相同的输入数据API 响应序列),通过 UnifiedPipeline 执行后产生的数据库写入结果inserted/updated/skipped 计数和记录内容)应与迁移前的同步串行执行完全一致。
**Validates: Requirements 5.1, 5.3, 5.4, 5.5**
### Property 10: Detail_Mode 可选性
*对于任意* OdsTaskSpec当 detail_endpoint 为 None 时,管道执行应跳过详情拉取阶段,结果中 detail_success/detail_failure/detail_skipped 均为 0当 detail_endpoint 已配置时,管道应在列表拉取完成后执行详情拉取,且详情请求遵循与列表请求相同的限流规则。
**Validates: Requirements 6.1, 6.3, 6.4**
### Property 11: 单项失败不中断整体
*对于任意*管道执行中的单个失败项API 请求失败、处理异常、详情接口错误、DWD 单表合并失败),管道应继续处理后续项目,不中断整体流程,且失败项被正确记录在结果的 errors 列表中。
**Validates: Requirements 6.5, 9.1, 9.2, 7.3, 7.4**
### Property 12: 连续失败触发中断
*对于任意*管道执行,当连续失败次数超过 max_consecutive_failures 配置值时,管道应主动中断执行,返回结果的 status 为 FAILED。当连续失败次数未超过阈值时管道应继续执行。
**Validates: Requirements 9.5**
### Property 13: 写入失败回滚当前批次
*对于任意*批量写入操作,当 write_fn 抛出数据库异常时,当前批次的事务应被回滚(不产生部分写入),该批次的记录被标记为写入失败,后续批次不受影响。
**Validates: Requirements 9.3**
### Property 14: 结果统计完整性
*对于任意*管道执行(包括 Detail_Mode 和 DWD 多线程返回结果中的统计信息应完整且一致request_failures + processing_failures + write_failures 应等于 errors 列表的长度detail_success + detail_failure + detail_skipped 应等于详情请求总数DWD 汇总中成功表数 + 失败表数应等于总表数。
**Validates: Requirements 6.6, 8.2, 9.4, 7.5**
### Property 15: 日志缓冲区按任务隔离
*对于任意*多个并发任务的日志流,每个 TaskLogBuffer 的 flush() 输出应仅包含该任务的日志条目,且按时间戳升序排列,不包含其他任务的日志。
**Validates: Requirements 10.1, 10.4**
### Property 16: DWD 并行与串行结果一致
*对于任意*一组 DWD 表的 SCD2 合并操作,多线程并行执行的最终结果(每张表的 inserted/updated 计数)应与串行逐表执行的结果完全一致。
**Validates: Requirements 7.1**
## 6. 错误处理
### 6.1 错误分类与处理策略
| 错误类型 | 触发条件 | 处理方式 | 影响范围 |
|----------|----------|----------|----------|
| API 请求失败 | HTTP 错误、超时、API 返回错误码 | 由 `APIClient` 内置重试3 次指数退避);耗尽后记录错误,继续下一个请求 | 单个请求 |
| 处理异常 | 字段提取、hash 计算等抛出异常 | 捕获异常,记录错误日志(含记录标识),标记为处理失败,继续处理队列 | 单条记录 |
| 写入失败 | 数据库错误(约束冲突、连接断开等) | 回滚当前批次事务,记录错误日志(含批次大小),标记为写入失败 | 单个批次 |
| 连续失败 | 连续 N 次请求/处理/写入失败 | 主动中断管道status=FAILED | 整个任务 |
| 取消信号 | 外部触发 cancel_token.cancel() | 停止新请求,等待已入队数据处理完成后退出 | 整个任务 |
| 配置错误 | workers<1, rate_min>rate_max 等 | 构造时抛出 ValueError任务不启动 | 整个任务 |
| DWD 单表失败 | SCD2 合并过程中异常 | 回滚该表事务,记录错误,继续处理其他表 | 单张表 |
### 6.2 连续失败计数逻辑
```python
consecutive_failures = 0
for request in requests:
try:
response = api_client.post(...)
consecutive_failures = 0 # 成功则重置
except Exception:
consecutive_failures += 1
if consecutive_failures >= config.max_consecutive_failures:
result.status = "FAILED"
break
```
### 6.3 事务管理
- ODS 层每个窗口分段segment的数据在该分段全部处理完成后统一 commit分段失败时 rollback 该分段(保留现有语义)
- DWD 层:每张表独立事务,单表失败 rollback 不影响其他表
- WriteWorker每个批次独立事务批次失败 rollback 不影响后续批次
## 7. 测试策略
### 7.1 测试框架
- 单元测试:`pytest`ETL 模块内 `tests/unit/`
- 属性测试:`hypothesis`Monorepo 级 `tests/`
- 每个属性测试最少运行 100 次迭代
### 7.2 属性测试计划
每个正确性属性对应一个属性测试,使用 `hypothesis` 库实现:
| 属性 | 测试文件 | 生成器策略 |
|------|----------|-----------|
| P1: 请求严格串行 | `tests/test_pipeline_properties.py` | 生成随机请求序列,用 FakeAPI 记录时间戳 |
| P2: RateLimiter 间隔范围 | `tests/test_rate_limiter_properties.py` | 生成随机 (min, max) 对,验证 wait() 时间 |
| P3: PipelineConfig 构造 | `tests/test_pipeline_config_properties.py` | 生成随机配置参数组合(含无效值) |
| P4: 配置分层覆盖 | `tests/test_pipeline_config_properties.py` | 生成随机的多层配置字典 |
| P5: 管道完成语义 | `tests/test_pipeline_properties.py` | 生成随机记录集,验证计数一致 |
| P6: WriteWorker 批量约束 | `tests/test_pipeline_properties.py` | 生成随机 batch_size 和记录流 |
| P7: CancellationToken 状态 | `tests/test_cancellation_properties.py` | 生成随机超时值 |
| P8: 取消后数据不丢失 | `tests/test_pipeline_properties.py` | 生成随机请求序列 + 随机取消时刻 |
| P9: 迁移等价 | `tests/test_migration_properties.py` | 生成随机 API 响应,对比新旧实现 |
| P10: Detail_Mode 可选性 | `tests/test_detail_mode_properties.py` | 生成有/无 detail_endpoint 的 OdsTaskSpec |
| P11: 单项失败不中断 | `tests/test_pipeline_properties.py` | 生成含随机失败的请求序列 |
| P12: 连续失败中断 | `tests/test_pipeline_properties.py` | 生成连续失败序列 + 随机阈值 |
| P13: 写入失败回滚 | `tests/test_pipeline_properties.py` | 生成含随机写入失败的批次 |
| P14: 结果统计完整性 | `tests/test_pipeline_properties.py` | 生成随机执行结果,验证计数一致性 |
| P15: 日志缓冲区隔离 | `tests/test_log_buffer_properties.py` | 生成多任务随机日志流 |
| P16: DWD 并行串行一致 | `tests/test_dwd_parallel_properties.py` | 生成随机表集合 + mock SCD2 |
每个测试必须包含注释标签:
```python
# Feature: etl-unified-pipeline, Property 1: 请求严格串行
```
### 7.3 单元测试计划
单元测试聚焦于具体示例、边界条件和集成点:
| 测试目标 | 测试文件 | 覆盖内容 |
|----------|----------|----------|
| RateLimiter | `tests/unit/test_rate_limiter.py` | 边界min=max、取消中断、min>max 抛错 |
| CancellationToken | `tests/unit/test_cancellation.py` | 边界:预取消、超时=0、dispose |
| PipelineConfig | `tests/unit/test_pipeline_config.py` | 边界无效参数、CLI 覆盖 |
| UnifiedPipeline | `tests/unit/test_unified_pipeline.py` | 集成FakeAPI + FakeDB 端到端 |
| TaskLogBuffer | `tests/unit/test_task_log_buffer.py` | 边界:空缓冲区、并发写入 |
| DWD 多线程调度 | `tests/unit/test_dwd_parallel.py` | 集成mock SCD2 + 单表失败 |
| Detail_Mode | `tests/unit/test_detail_mode.py` | 集成:列表→详情完整流程 |
### 7.4 测试环境
- 单元测试使用 FakeDB/FakeAPI不涉及真实数据库连接
- 属性测试使用 `hypothesis` 库,最少 100 次迭代
- 集成测试(如需)使用 `test_etl_feiqiu` 测试库,通过 `TEST_DB_DSN` 连接