Files

35 KiB
Raw Permalink Blame History

技术设计ETL 统一请求编排与线程模型改造

对应需求文档:requirements.md

1. 架构概览

本次改造将现有 21 个 ODS 任务从"同步串行执行"迁移到统一的"串行请求 + 异步处理 + 单线程写库"管道架构。核心设计原则:

  • 请求串行化:所有 API 请求通过全局 RequestScheduler 排队,严格一个接一个发送,遵循 RateLimiter 限流
  • 处理并行化API 响应提交到 ProcessingPool 多线程处理字段提取、hash 计算等),不阻塞请求线程
  • 写入串行化:所有数据库写入由单个 WriteWorker 线程执行,避免并发写入冲突
  • 配置灵活化:通过 PipelineConfig 支持全局默认 + 任务级覆盖
  • 可取消:通过 CancellationToken 支持外部取消信号,优雅中断
┌─────────────────────────────────────────────────────────────────┐
│                      FlowRunner                                  │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                  Unified_Pipeline                         │   │
│  │                                                           │   │
│  │  ┌─────────────┐    ┌──────────────┐    ┌─────────────┐  │   │
│  │  │  Request     │───▶│ Processing   │───▶│  Write      │  │   │
│  │  │  Scheduler   │    │ Pool         │    │  Worker     │  │   │
│  │  │  (串行请求)  │    │ (N 工作线程) │    │  (单线程)   │  │   │
│  │  └──────┬───────┘    └──────────────┘    └─────────────┘  │   │
│  │         │                                                  │   │
│  │  ┌──────▼───────┐    ┌──────────────┐                     │   │
│  │  │  Rate        │    │ Cancellation │                     │   │
│  │  │  Limiter     │    │ Token        │                     │   │
│  │  │  (5-20s)     │    │ (外部取消)   │                     │   │
│  │  └──────────────┘    └──────────────┘                     │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                  │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │  DWD_Loader (多线程 SCD2 调度)                            │   │
│  │  ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐            │   │
│  │  │ Table1 │ │ Table2 │ │ Table3 │ │ Table4 │  ...       │   │
│  │  │ SCD2   │ │ SCD2   │ │ SCD2   │ │ SCD2   │            │   │
│  │  └────────┘ └────────┘ └────────┘ └────────┘            │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

1.1 设计决策与理由

决策 选项 理由
请求串行 vs 并行 串行 上游飞球 API 无并发友好设计,并行请求易触发风控;串行 + 限流是最安全的策略
处理线程数 默认 2 ODS 数据处理是轻量 CPU 操作JSON 解析、hash 计算2 线程足够消化请求间隔产生的积压
写入单线程 单线程 PostgreSQL 单连接写入避免锁竞争和事务冲突,简化错误处理和回滚逻辑
Pipeline 嵌入 vs 独立 嵌入 BaseOdsTask Pipeline 作为 BaseOdsTask 内部执行引擎对外接口TaskExecutor、FlowRunner完全不变
DWD 多线程 调度层并行 仅在调度层并行调用 _merge_dim_scd2(),方法本身不改,每张表独立事务

2. 架构

2.1 整体数据流

graph TD
    A[FlowRunner.run] --> B[TaskExecutor.run_tasks]
    B --> C{ODS 任务}
    C --> D[BaseOdsTask.execute]
    D --> E[UnifiedPipeline.run]
    E --> F[RequestScheduler<br/>串行请求 + RateLimiter]
    F --> G[ProcessingPool<br/>多线程处理]
    G --> H[WriteWorker<br/>单线程写库]
    H --> I[ODS 表写入完成]
    
    I --> J{有 Detail_Mode?}
    J -->|是| K[DetailFetcher<br/>二级详情拉取]
    K --> F
    J -->|否| L[返回结果]
    K --> L
    
    L --> M[DWD_Loader]
    M --> N[多线程 SCD2 调度]
    N --> O[DWD 表写入完成]

2.2 线程模型详细设计

主线程RequestScheduler
    │
    │  for request in request_queue:
    │      if cancel_token.is_cancelled: break
    │      resp = api_client.post(endpoint, params)
    │      processing_queue.put((request_id, resp))
    │      rate_limiter.wait(cancel_token.event)
    │
    │  processing_queue.put(SENTINEL × worker_count)
    │  等待所有 worker 完成
    │  write_queue.put(SENTINEL)
    │  等待 writer 完成
    │
    ├──▶ Worker Thread 1 ──┐
    ├──▶ Worker Thread 2 ──┤
    │                      │
    │   processing_queue   │
    │   ┌─────────────┐    │
    │   │ (id, resp)   │───▶ 字段提取 / content_hash 计算
    │   │ (id, resp)   │     write_queue.put(processed_rows)
    │   │ SENTINEL     │
    │   └─────────────┘    │
    │                      │
    │                      ▼
    │              Write Thread (单线程)
    │              ┌─────────────┐
    │              │ write_queue  │
    │              │ batch=100    │──▶ UPSERT / INSERT
    │              │ timeout=5s   │
    │              │ SENTINEL     │
    │              └─────────────┘
    │
    ▼
PipelineResult统计信息

关键设计点:

  • processing_queuequeue.Queue(maxsize=queue_size),默认 100满时 RequestScheduler 阻塞(背压机制)
  • write_queuequeue.Queue(maxsize=queue_size * 2),默认 200
  • SENTINELNone 对象,通知线程退出
  • 取消信号:主线程检查 cancel_tokenworker/writer 通过 SENTINEL 正常退出
  • 批量写入:累积到 batch_size(默认 100或等待 batch_timeout(默认 5 秒)后执行一次

2.3 取消信号传递链

外部触发Admin-web / CLI / 超时)
    │ cancel_token.cancel()
    ▼
RequestScheduler
    │ rate_limiter.wait() 提前返回 False
    │ 主循环 break不再发新请求
    ▼
ProcessingPool
    │ 通过 SENTINEL 正常退出
    │ 已入队数据全部处理完成
    ▼
WriteWorker
    │ 通过 SENTINEL 正常退出
    │ 已处理数据全部写入 DB
    ▼
返回 PipelineResult(cancelled=True, ...)

3. 组件与接口

3.1 PipelineConfig配置数据类

文件:apps/etl/connectors/feiqiu/config/pipeline_config.py

@dataclass(frozen=True)
class PipelineConfig:
    """统一管道配置,支持全局默认 + 任务级覆盖。"""
    workers: int = 2              # ProcessingPool 工作线程数
    queue_size: int = 100         # 处理队列容量
    batch_size: int = 100         # WriteWorker 批量写入阈值
    batch_timeout: float = 5.0    # WriteWorker 等待超时(秒)
    rate_min: float = 5.0         # RateLimiter 最小间隔(秒)
    rate_max: float = 20.0        # RateLimiter 最大间隔(秒)
    max_consecutive_failures: int = 10  # 连续失败中断阈值

    def __post_init__(self):
        if self.workers < 1:
            raise ValueError(f"workers 必须 >= 1当前值: {self.workers}")
        if self.queue_size < 1:
            raise ValueError(f"queue_size 必须 >= 1当前值: {self.queue_size}")
        if self.batch_size < 1:
            raise ValueError(f"batch_size 必须 >= 1当前值: {self.batch_size}")
        if self.rate_min > self.rate_max:
            raise ValueError(
                f"rate_min({self.rate_min}) 不能大于 rate_max({self.rate_max})"
            )

    @classmethod
    def from_app_config(cls, config: AppConfig, task_code: str | None = None) -> "PipelineConfig":
        """从 AppConfig 加载,支持 pipeline.<task_code>.* 任务级覆盖。"""
        def _get(key: str, default):
            # 优先任务级 → 全局级 → 默认值
            if task_code:
                val = config.get(f"pipeline.{task_code.lower()}.{key}")
                if val is not None:
                    return type(default)(val)
            val = config.get(f"pipeline.{key}")
            if val is not None:
                return type(default)(val)
            return default

        return cls(
            workers=_get("workers", 2),
            queue_size=_get("queue_size", 100),
            batch_size=_get("batch_size", 100),
            batch_timeout=_get("batch_timeout", 5.0),
            rate_min=_get("rate_min", 5.0),
            rate_max=_get("rate_max", 20.0),
            max_consecutive_failures=_get("max_consecutive_failures", 10),
        )

3.2 CancellationToken取消令牌

文件:apps/etl/connectors/feiqiu/utils/cancellation.py

class CancellationToken:
    """线程安全的取消令牌,封装 threading.Event。"""
    def __init__(self, timeout: float | None = None):
        self._event = threading.Event()
        self._timer: threading.Timer | None = None
        if timeout is not None and timeout > 0:
            self._timer = threading.Timer(timeout, self.cancel)
            self._timer.daemon = True
            self._timer.start()

    def cancel(self):
        """发出取消信号。"""
        self._event.set()

    @property
    def is_cancelled(self) -> bool:
        return self._event.is_set()

    @property
    def event(self) -> threading.Event:
        return self._event

    def dispose(self):
        """清理超时定时器。"""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None

3.3 RateLimiter限流器

文件:apps/etl/connectors/feiqiu/api/rate_limiter.py

class RateLimiter:
    """请求间隔控制器,支持取消信号中断等待。"""
    def __init__(self, min_interval: float = 5.0, max_interval: float = 20.0):
        if min_interval > max_interval:
            raise ValueError(
                f"min_interval({min_interval}) 不能大于 max_interval({max_interval})"
            )
        self._min = min_interval
        self._max = max_interval
        self._last_interval: float = 0.0

    def wait(self, cancel_event: threading.Event | None = None) -> bool:
        """等待随机间隔。返回 False 表示被取消信号中断。
        将等待时间拆分为 0.5s 小段,每段检查 cancel_event。"""
        interval = random.uniform(self._min, self._max)
        self._last_interval = interval
        remaining = interval
        while remaining > 0:
            if cancel_event and cancel_event.is_set():
                return False
            sleep_time = min(0.5, remaining)
            time.sleep(sleep_time)
            remaining -= sleep_time
        return True

    @property
    def last_interval(self) -> float:
        return self._last_interval

3.4 UnifiedPipeline统一管道引擎

文件:apps/etl/connectors/feiqiu/pipeline/unified_pipeline.py

这是核心组件,封装"串行请求 + 异步处理 + 单线程写库"的完整执行引擎。

@dataclass
class PipelineResult:
    """管道执行结果统计。"""
    status: str = "SUCCESS"           # SUCCESS / PARTIAL / CANCELLED / FAILED
    total_requests: int = 0
    completed_requests: int = 0
    total_fetched: int = 0
    total_inserted: int = 0
    total_updated: int = 0
    total_skipped: int = 0
    request_failures: int = 0
    processing_failures: int = 0
    write_failures: int = 0
    cancelled: bool = False
    errors: list[dict] = field(default_factory=list)
    timing: dict[str, float] = field(default_factory=dict)  # 各阶段耗时

class UnifiedPipeline:
    """统一管道引擎:串行请求 + 异步处理 + 单线程写库。"""

    def __init__(
        self,
        api_client: APIClient,
        db_connection,
        logger: logging.Logger,
        config: PipelineConfig,
        cancel_token: CancellationToken | None = None,
    ):
        self.api = api_client
        self.db = db_connection
        self.logger = logger
        self.config = config
        self.cancel_token = cancel_token or CancellationToken()
        self._rate_limiter = RateLimiter(config.rate_min, config.rate_max)

    def run(
        self,
        requests: Iterable[PipelineRequest],
        process_fn: Callable[[Any], list[dict]],
        write_fn: Callable[[list[dict]], WriteResult],
    ) -> PipelineResult:
        """执行管道。
        
        Args:
            requests: 请求迭代器(由 BaseOdsTask 生成,包含 endpoint、params 等)
            process_fn: 处理函数,将 API 响应转换为待写入记录列表
            write_fn: 写入函数,将记录批量写入数据库
        """
        if self.cancel_token.is_cancelled:
            return PipelineResult(status="CANCELLED", cancelled=True)

        processing_queue = queue.Queue(maxsize=self.config.queue_size)
        write_queue = queue.Queue(maxsize=self.config.queue_size * 2)
        result = PipelineResult()

        # 启动处理线程池
        workers = []
        for i in range(self.config.workers):
            t = threading.Thread(
                target=self._process_worker,
                args=(processing_queue, write_queue, process_fn, result),
                name=f"pipeline-worker-{i}",
                daemon=True,
            )
            t.start()
            workers.append(t)

        # 启动写入线程
        writer = threading.Thread(
            target=self._write_worker,
            args=(write_queue, write_fn, result),
            name="pipeline-writer",
            daemon=True,
        )
        writer.start()

        # 主线程:串行请求
        self._request_loop(requests, processing_queue, result)

        # 发送 SENTINEL 到处理队列
        for _ in workers:
            processing_queue.put(None)
        for w in workers:
            w.join()

        # 发送 SENTINEL 到写入队列
        write_queue.put(None)
        writer.join()

        # 确定最终状态
        if result.cancelled:
            result.status = "CANCELLED"
        elif result.request_failures + result.processing_failures + result.write_failures > 0:
            result.status = "PARTIAL"

        return result

3.5 BaseOdsTask 改造

文件:apps/etl/connectors/feiqiu/tasks/ods/ods_tasks.py(修改现有文件)

改造策略:在 BaseOdsTask.execute() 内部用 UnifiedPipeline 替代现有的同步循环但保留所有现有功能时间窗口解析、分页拉取、结构感知写入、快照软删除、content_hash 去重)。

class BaseOdsTask(BaseTask):
    """改造后的 ODS 任务基类。"""

    def execute(self, cursor_data: dict | None = None) -> dict:
        spec = self.SPEC
        # ... 现有的窗口解析、分段逻辑保持不变 ...

        # 构建 PipelineConfig支持任务级覆盖
        pipeline_config = PipelineConfig.from_app_config(self.config, spec.code)
        cancel_token = getattr(self, '_cancel_token', None)

        pipeline = UnifiedPipeline(
            api_client=self.api,
            db_connection=self.db,
            logger=self.logger,
            config=pipeline_config,
            cancel_token=cancel_token,
        )

        # 将现有的分页请求逻辑封装为 PipelineRequest 迭代器
        # 将现有的 _insert_records_schema_aware 封装为 write_fn
        # 将现有的字段提取/hash 计算封装为 process_fn
        result = pipeline.run(
            requests=self._build_requests(spec, segments, store_id, page_size),
            process_fn=self._build_process_fn(spec),
            write_fn=self._build_write_fn(spec, source_file),
        )

        # ... 快照软删除逻辑保持不变 ...
        # ... 结果构建逻辑保持不变 ...

关键约束:

  • OdsTaskSpec 数据类的所有现有字段保持不变
  • _insert_records_schema_aware()_mark_missing_as_deleted() 等方法保持不变
  • TaskExecutor 调用 task.execute(cursor_data) 的接口保持不变
  • TaskRegistry 中的注册代码保持不变

3.6 OdsTaskSpec 扩展Detail_Mode 支持)

在现有 OdsTaskSpec 数据类中新增可选字段:

@dataclass(frozen=True)
class OdsTaskSpec:
    # ... 所有现有字段保持不变 ...

    # Detail_Mode 可选配置(新增)
    detail_endpoint: str | None = None          # 详情接口 endpoint
    detail_param_builder: Callable[[dict], dict] | None = None  # 详情请求参数构造
    detail_target_table: str | None = None      # 详情数据目标表名
    detail_data_path: tuple[str, ...] | None = None  # 详情数据的 data_path
    detail_list_key: str | None = None          # 详情数据的 list_key
    detail_id_column: str | None = None         # 从列表数据中提取 ID 的列名

detail_endpointNonePipeline 跳过详情拉取阶段,行为与纯列表模式完全一致。

3.7 DWD 多线程调度器

文件:apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py(修改现有文件)

改造 DwdLoadTask.load() 方法,将现有的串行 for dwd_table, ods_table in TABLE_MAP 循环改为 concurrent.futures.ThreadPoolExecutor 并行调度:

def load(self, extracted: dict[str, Any], context: TaskContext) -> dict[str, Any]:
    now = extracted["now"]
    parallel_workers = int(self.config.get("dwd.parallel_workers", 4))

    # 将表分为维度表和事实表两组
    dim_tables = [(d, o) for d, o in self.TABLE_MAP.items() 
                  if self._table_base(d).startswith("dim_")]
    fact_tables = [(d, o) for d, o in self.TABLE_MAP.items() 
                   if not self._table_base(d).startswith("dim_")]

    summary = []
    errors = []

    # 维度表并行 SCD2 合并(每张表独立事务、独立数据库连接)
    with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
        futures = {}
        for dwd_table, ods_table in dim_tables:
            if only_tables and ...:  # 过滤逻辑保持不变
                continue
            future = executor.submit(
                self._process_single_table, dwd_table, ods_table, now, context
            )
            futures[future] = dwd_table

        for future in as_completed(futures):
            dwd_table = futures[future]
            try:
                table_result = future.result()
                summary.append(table_result)
            except Exception as exc:
                errors.append({"table": dwd_table, "error": str(exc)})

    # 事实表同样并行处理
    # ... 类似逻辑 ...

    return {"tables": summary, "errors": len(errors), "error_details": errors}

关键约束:

  • _merge_dim_scd2() 方法本身不改
  • 每张表使用独立的数据库连接和事务
  • 单张表失败不影响其他表

3.8 任务日志管理器

文件:apps/etl/connectors/feiqiu/utils/task_log_buffer.py(新建)

class TaskLogBuffer:
    """任务级日志缓冲区,收集单个任务的所有日志,任务完成后一次性输出。"""

    def __init__(self, task_code: str, parent_logger: logging.Logger):
        self.task_code = task_code
        self._buffer: list[LogEntry] = []
        self._lock = threading.Lock()
        self._parent = parent_logger

    def log(self, level: int, message: str, *args, **kwargs):
        """线程安全地缓冲一条日志。"""
        with self._lock:
            self._buffer.append(LogEntry(
                timestamp=datetime.now(),
                level=level,
                task_code=self.task_code,
                message=message % args if args else message,
            ))

    def flush(self) -> list[LogEntry]:
        """将缓冲区内容按时间顺序一次性输出到父 logger并返回日志列表。"""
        with self._lock:
            entries = sorted(self._buffer, key=lambda e: e.timestamp)
            for entry in entries:
                self._parent.log(
                    entry.level,
                    "[%s] %s",
                    entry.task_code,
                    entry.message,
                )
            self._buffer.clear()
            return entries

4. 数据模型

4.1 PipelineConfig 配置命名空间

AppConfig 中新增 pipeline.* 命名空间:

配置键 类型 默认值 说明
pipeline.workers int 2 ProcessingPool 工作线程数
pipeline.queue_size int 100 处理队列容量
pipeline.batch_size int 100 WriteWorker 批量写入阈值
pipeline.batch_timeout float 5.0 WriteWorker 等待超时(秒)
pipeline.rate_min float 5.0 RateLimiter 最小间隔(秒)
pipeline.rate_max float 20.0 RateLimiter 最大间隔(秒)
pipeline.max_consecutive_failures int 10 连续失败中断阈值
pipeline.<TASK_CODE>.workers int - 任务级覆盖:工作线程数
pipeline.<TASK_CODE>.rate_min float - 任务级覆盖:最小间隔
pipeline.<TASK_CODE>.rate_max float - 任务级覆盖:最大间隔
dwd.parallel_workers int 4 DWD 层并行线程数

任务级覆盖示例(.env

PIPELINE_WORKERS=2
PIPELINE_RATE_MIN=5.0
PIPELINE_ODS_GROUP_PACKAGE.RATE_MIN=8.0
PIPELINE_ODS_GROUP_PACKAGE.RATE_MAX=25.0

CLI 参数覆盖:

--pipeline-workers 4
--pipeline-batch-size 200
--pipeline-rate-min 3.0
--pipeline-rate-max 15.0

4.2 PipelineRequest 数据类

@dataclass
class PipelineRequest:
    """管道请求描述。"""
    endpoint: str
    params: dict
    page_size: int | None = 200
    data_path: tuple[str, ...] = ("data",)
    list_key: str | None = None
    segment_index: int = 0        # 所属窗口分段索引
    is_detail: bool = False       # 是否为详情请求
    detail_id: Any = None         # 详情请求的 ID

4.3 PipelineResult 数据类

@dataclass
class PipelineResult:
    """管道执行结果。"""
    status: str = "SUCCESS"
    total_requests: int = 0
    completed_requests: int = 0
    total_fetched: int = 0
    total_inserted: int = 0
    total_updated: int = 0
    total_skipped: int = 0
    total_deleted: int = 0
    request_failures: int = 0
    processing_failures: int = 0
    write_failures: int = 0
    cancelled: bool = False
    errors: list[dict] = field(default_factory=list)
    timing: dict[str, float] = field(default_factory=dict)
    # Detail_Mode 统计(仅在启用时填充)
    detail_success: int = 0
    detail_failure: int = 0
    detail_skipped: int = 0

4.4 WriteResult 数据类

@dataclass
class WriteResult:
    """单次批量写入结果。"""
    inserted: int = 0
    updated: int = 0
    skipped: int = 0
    errors: int = 0

4.5 LogEntry 数据类

@dataclass
class LogEntry:
    """日志条目。"""
    timestamp: datetime
    level: int
    task_code: str
    message: str

4.6 现有数据模型不变

以下现有数据模型保持不变:

  • OdsTaskSpec:仅新增 Detail_Mode 可选字段,所有现有字段不变
  • TaskContext:不变
  • TaskMeta:不变
  • SnapshotMode:不变
  • ColumnSpec:不变

5. 正确性属性

属性Property是系统在所有有效执行中都应保持为真的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。

以下属性基于需求文档中的验收标准推导,经过冗余消除和合并后得到 16 个独立属性。

Property 1: 请求严格串行

对于任意一组提交到 RequestScheduler 的 API 请求,每个请求的发送时间戳必须严格晚于上一个请求的响应完成时间戳,无论请求来自同一个 ODS 任务还是不同的 ODS 任务。

Validates: Requirements 1.2, 1.6

Property 2: RateLimiter 间隔范围

对于任意有效的 min_interval 和 max_interval 配置min_interval <= max_intervalRateLimiter.wait() 的实际等待时间始终在 [min_interval, max_interval] 范围内(允许 ±0.5s 的系统调度误差)。

Validates: Requirements 1.3

Property 3: PipelineConfig 构造与验证

对于任意一组配置参数,当 workers >= 1、queue_size >= 1、batch_size >= 1 且 rate_min <= rate_max 时PipelineConfig 应成功构造并正确存储所有参数值;当任一条件不满足时,应抛出 ValueError。

Validates: Requirements 1.5, 4.1, 4.4, 4.5

Property 4: 配置分层与任务级覆盖

对于任意任务代码和配置值组合PipelineConfig.from_app_config() 应遵循优先级:任务级配置(pipeline.<task_code>.*> 全局配置(pipeline.*> 默认值。当任务级配置存在时,应覆盖全局配置;当任务级配置不存在时,应回退到全局配置。

Validates: Requirements 1.4, 2.3, 2.6, 4.2, 4.3, 4.6, 7.2

Property 5: 管道完成语义

对于任意一组成功的 API 请求无失败、无取消UnifiedPipeline.run() 返回时PipelineResult 中的 total_fetched 应等于所有请求返回的记录总数,且 total_inserted + total_updated + total_skipped 应等于 total_fetched。

Validates: Requirements 2.7

Property 6: WriteWorker 批量大小约束

对于任意配置的 batch_sizeWriteWorker 每次调用 write_fn 时传入的记录数不超过 batch_size。

Validates: Requirements 2.5

Property 7: CancellationToken 状态转换

对于任意 CancellationToken 实例,初始状态 is_cancelled 为 False调用 cancel() 后 is_cancelled 变为 True 且不可逆转。对于配置了超时的 CancellationToken在超时时间到达后 is_cancelled 自动变为 True。

Validates: Requirements 3.1, 3.6

Property 8: 取消后已入队数据不丢失

对于任意管道执行过程中触发取消信号的时刻,管道返回时:(a) 不再发送新的 API 请求,(b) 所有已提交到 processing_queue 的数据全部被处理完成,(c) 所有已处理完成的数据全部被写入数据库,(d) 返回结果的 status 为 CANCELLED 且 cancelled 为 True。

Validates: Requirements 3.2, 3.3, 3.4, 3.5

Property 9: 迁移前后输出等价

对于任意 ODS 任务和相同的输入数据API 响应序列),通过 UnifiedPipeline 执行后产生的数据库写入结果inserted/updated/skipped 计数和记录内容)应与迁移前的同步串行执行完全一致。

Validates: Requirements 5.1, 5.3, 5.4, 5.5

Property 10: Detail_Mode 可选性

对于任意 OdsTaskSpec当 detail_endpoint 为 None 时,管道执行应跳过详情拉取阶段,结果中 detail_success/detail_failure/detail_skipped 均为 0当 detail_endpoint 已配置时,管道应在列表拉取完成后执行详情拉取,且详情请求遵循与列表请求相同的限流规则。

Validates: Requirements 6.1, 6.3, 6.4

Property 11: 单项失败不中断整体

对于任意管道执行中的单个失败项API 请求失败、处理异常、详情接口错误、DWD 单表合并失败),管道应继续处理后续项目,不中断整体流程,且失败项被正确记录在结果的 errors 列表中。

Validates: Requirements 6.5, 9.1, 9.2, 7.3, 7.4

Property 12: 连续失败触发中断

对于任意管道执行,当连续失败次数超过 max_consecutive_failures 配置值时,管道应主动中断执行,返回结果的 status 为 FAILED。当连续失败次数未超过阈值时管道应继续执行。

Validates: Requirements 9.5

Property 13: 写入失败回滚当前批次

对于任意批量写入操作,当 write_fn 抛出数据库异常时,当前批次的事务应被回滚(不产生部分写入),该批次的记录被标记为写入失败,后续批次不受影响。

Validates: Requirements 9.3

Property 14: 结果统计完整性

对于任意管道执行(包括 Detail_Mode 和 DWD 多线程返回结果中的统计信息应完整且一致request_failures + processing_failures + write_failures 应等于 errors 列表的长度detail_success + detail_failure + detail_skipped 应等于详情请求总数DWD 汇总中成功表数 + 失败表数应等于总表数。

Validates: Requirements 6.6, 8.2, 9.4, 7.5

Property 15: 日志缓冲区按任务隔离

对于任意多个并发任务的日志流,每个 TaskLogBuffer 的 flush() 输出应仅包含该任务的日志条目,且按时间戳升序排列,不包含其他任务的日志。

Validates: Requirements 10.1, 10.4

Property 16: DWD 并行与串行结果一致

对于任意一组 DWD 表的 SCD2 合并操作,多线程并行执行的最终结果(每张表的 inserted/updated 计数)应与串行逐表执行的结果完全一致。

Validates: Requirements 7.1

6. 错误处理

6.1 错误分类与处理策略

错误类型 触发条件 处理方式 影响范围
API 请求失败 HTTP 错误、超时、API 返回错误码 APIClient 内置重试3 次指数退避);耗尽后记录错误,继续下一个请求 单个请求
处理异常 字段提取、hash 计算等抛出异常 捕获异常,记录错误日志(含记录标识),标记为处理失败,继续处理队列 单条记录
写入失败 数据库错误(约束冲突、连接断开等) 回滚当前批次事务,记录错误日志(含批次大小),标记为写入失败 单个批次
连续失败 连续 N 次请求/处理/写入失败 主动中断管道status=FAILED 整个任务
取消信号 外部触发 cancel_token.cancel() 停止新请求,等待已入队数据处理完成后退出 整个任务
配置错误 workers<1, rate_min>rate_max 等 构造时抛出 ValueError任务不启动 整个任务
DWD 单表失败 SCD2 合并过程中异常 回滚该表事务,记录错误,继续处理其他表 单张表

6.2 连续失败计数逻辑

consecutive_failures = 0

for request in requests:
    try:
        response = api_client.post(...)
        consecutive_failures = 0  # 成功则重置
    except Exception:
        consecutive_failures += 1
        if consecutive_failures >= config.max_consecutive_failures:
            result.status = "FAILED"
            break

6.3 事务管理

  • ODS 层每个窗口分段segment的数据在该分段全部处理完成后统一 commit分段失败时 rollback 该分段(保留现有语义)
  • DWD 层:每张表独立事务,单表失败 rollback 不影响其他表
  • WriteWorker每个批次独立事务批次失败 rollback 不影响后续批次

7. 测试策略

7.1 测试框架

  • 单元测试:pytestETL 模块内 tests/unit/
  • 属性测试:hypothesisMonorepo 级 tests/
  • 每个属性测试最少运行 100 次迭代

7.2 属性测试计划

每个正确性属性对应一个属性测试,使用 hypothesis 库实现:

属性 测试文件 生成器策略
P1: 请求严格串行 tests/test_pipeline_properties.py 生成随机请求序列,用 FakeAPI 记录时间戳
P2: RateLimiter 间隔范围 tests/test_rate_limiter_properties.py 生成随机 (min, max) 对,验证 wait() 时间
P3: PipelineConfig 构造 tests/test_pipeline_config_properties.py 生成随机配置参数组合(含无效值)
P4: 配置分层覆盖 tests/test_pipeline_config_properties.py 生成随机的多层配置字典
P5: 管道完成语义 tests/test_pipeline_properties.py 生成随机记录集,验证计数一致
P6: WriteWorker 批量约束 tests/test_pipeline_properties.py 生成随机 batch_size 和记录流
P7: CancellationToken 状态 tests/test_cancellation_properties.py 生成随机超时值
P8: 取消后数据不丢失 tests/test_pipeline_properties.py 生成随机请求序列 + 随机取消时刻
P9: 迁移等价 tests/test_migration_properties.py 生成随机 API 响应,对比新旧实现
P10: Detail_Mode 可选性 tests/test_detail_mode_properties.py 生成有/无 detail_endpoint 的 OdsTaskSpec
P11: 单项失败不中断 tests/test_pipeline_properties.py 生成含随机失败的请求序列
P12: 连续失败中断 tests/test_pipeline_properties.py 生成连续失败序列 + 随机阈值
P13: 写入失败回滚 tests/test_pipeline_properties.py 生成含随机写入失败的批次
P14: 结果统计完整性 tests/test_pipeline_properties.py 生成随机执行结果,验证计数一致性
P15: 日志缓冲区隔离 tests/test_log_buffer_properties.py 生成多任务随机日志流
P16: DWD 并行串行一致 tests/test_dwd_parallel_properties.py 生成随机表集合 + mock SCD2

每个测试必须包含注释标签:

# Feature: etl-unified-pipeline, Property 1: 请求严格串行

7.3 单元测试计划

单元测试聚焦于具体示例、边界条件和集成点:

测试目标 测试文件 覆盖内容
RateLimiter tests/unit/test_rate_limiter.py 边界min=max、取消中断、min>max 抛错
CancellationToken tests/unit/test_cancellation.py 边界:预取消、超时=0、dispose
PipelineConfig tests/unit/test_pipeline_config.py 边界无效参数、CLI 覆盖
UnifiedPipeline tests/unit/test_unified_pipeline.py 集成FakeAPI + FakeDB 端到端
TaskLogBuffer tests/unit/test_task_log_buffer.py 边界:空缓冲区、并发写入
DWD 多线程调度 tests/unit/test_dwd_parallel.py 集成mock SCD2 + 单表失败
Detail_Mode tests/unit/test_detail_mode.py 集成:列表→详情完整流程

7.4 测试环境

  • 单元测试使用 FakeDB/FakeAPI不涉及真实数据库连接
  • 属性测试使用 hypothesis 库,最少 100 次迭代
  • 集成测试(如需)使用 test_etl_feiqiu 测试库,通过 TEST_DB_DSN 连接