Files

18 KiB
Raw Permalink Blame History

技术设计:团购详情接口整合 ETL 数据流

对应需求文档:#file:.kiro/specs/etl-coupon-detail/requirements.md

1. 架构概览

ODS_GROUP_PACKAGE 任务BaseOdsTask + OdsTaskSpec
        │
        │  阶段 1列表拉取UnifiedPipeline #1
        │  QueryPackageCouponList → ods.group_buy_packages
        │
        ▼
  ods.group_buy_packages  ──写入完成──▶  阶段 2详情拉取自动启动
        │                                      │
        │                              ┌───────┴───────┐
        │                              │ UnifiedPipeline│
        │                              │     #2         │
        │                              │ (detail_mode)  │
        │                              └───────┬───────┘
        │                                      │
        │                              ┌───────▼───────┐
        │                              │  串行请求线程   │
        │                              │  (主线程)      │
        │                              │  RateLimiter   │
        │                              └───────┬───────┘
        │                                      │ 响应提交
        │                              ┌───────▼───────┐
        │                              │  处理队列      │
        │                              │  N 个工作线程  │
        │                              └───────┬───────┘
        │                                      │ 处理完成
        │                              ┌───────▼───────┐
        │                              │  写入队列      │
        │                              │  单线程写库    │
        │                              └───────┬───────┘
        │                                      │
        ▼                                      ▼
  DWD dim_groupbuy_package    ods.group_buy_package_details
  DWD dim_groupbuy_package_ex ◀── SCD2 合并 ──┘

不再新建独立的 DetailFetcher 类。详情拉取完全复用 BaseOdsTask 已有的 detail_endpoint 二级拉取模式,通过 OdsTaskSpec 声明式配置即可驱动。

2. 调研结论与设计决策

2.1 ODS 层表方案 → 选项 A新建独立表

决策依据:

  • 现有 ods.group_buy_packages 使用 OdsTaskSpec 驱动payload 存储列表接口的原始 JSONPK 为 id + content_hash
  • 详情接口返回嵌套结构(子数组 packageCouponAssistantsgrouponSiteInfos 等),与列表接口的扁平结构差异大
  • 两个接口的写入时机不同(列表先写完,详情后写),混在同一表会导致 SnapshotMode.FULL_TABLE 的软删除逻辑冲突
  • 独立表可以独立演进字段,不影响现有列表数据的稳定性

新表:ods.group_buy_package_detailsPK = coupon_idBIGINT全量快照覆盖

2.2 DWD 层表方案 → 选项 A在现有扩展表新增字段

决策依据:

  • dim_groupbuy_package_ex 当前 21 个业务字段(不含 SCD2密度适中
  • 详情接口的增量价值字段仅 4 个 JSONB 列(table_area_idstable_area_namesassistant_servicesgroupon_site_infos
  • 新建独立表会增加 SCD2 版本同步复杂度,且下游查询需要额外 JOIN
  • 扩展表的 SCD2 合并已与主表同步,新增字段自动纳入变更检测

2.3 取消信号 → 复用现有 CancellationToken

CancellationToken(封装 threading.Event)已在 etl-unified-pipeline 中实现。详情阶段的 UnifiedPipeline 实例共享同一个 cancel_token,在请求循环和 RateLimiter.wait() 中检查取消状态。

3. 组件设计

3.1 OdsTaskSpec 详情配置(声明式,无需新建类)

RateLimiterapi/rate_limiter.py)、CancellationTokenutils/cancellation.py)和 UnifiedPipelinepipeline/unified_pipeline.py)已在 etl-unified-pipeline 中实现并上线。BaseOdsTask.execute() 内置了 detail_endpoint 二级详情拉取模式,本 spec 通过 OdsTaskSpec 声明式配置驱动,不新建独立类。

tasks/ods/ods_tasks.pyODS_GROUP_PACKAGE 任务 spec 中添加以下配置:

OdsTaskSpec(
    code="ODS_GROUP_PACKAGE",
    # ... 现有列表拉取配置 ...
    
    # ── Detail_Mode 配置 ──
    detail_endpoint="/PackageCoupon/QueryPackageCouponInfo",
    detail_param_builder=lambda rec: {"couponId": rec["id"]},
    detail_target_table="ods.group_buy_package_details",
    detail_data_path=("data",),
    detail_id_column="id",
)

配置字段说明:

字段 说明
detail_endpoint /PackageCoupon/QueryPackageCouponInfo 详情接口 endpoint
detail_param_builder lambda rec: {"couponId": rec["id"]} 将列表表的 id 映射为详情接口的 couponId 参数
detail_target_table ods.group_buy_package_details 详情数据写入的目标表
detail_data_path ("data",) 详情响应的数据路径
detail_id_column id ods.group_buy_packages 提取 couponId 列表的列名

3.2 执行流程BaseOdsTask.execute() 内置)

BaseOdsTask.execute() 在列表拉取全部完成后,自动检测 spec.detail_endpoint 是否配置,若已配置则启动详情拉取阶段:

# BaseOdsTask.execute() 内置逻辑(已实现,无需修改):
if spec.detail_endpoint:
    # 1. 创建独立的 UnifiedPipeline 实例(共享 cancel_token
    detail_pipeline = UnifiedPipeline(
        api_client=self.api,
        db_connection=self.db,
        logger=self.logger,
        config=pipeline_config,       # PipelineConfig.from_app_config()
        cancel_token=cancel_token,    # 与列表阶段共享
    )
    
    # 2. 从 ODS 目标表查询 ID 列表,生成详情请求序列
    detail_requests = self._build_detail_requests(spec)
    #    → SELECT DISTINCT {detail_id_column} FROM {table_name}
    #    → 对每个 ID 调用 detail_param_builder 构造参数
    #    → yield PipelineRequest(is_detail=True, detail_id=record_id)
    
    # 3. 构建处理和写入函数
    detail_process_fn = self._build_detail_process_fn(spec)
    detail_write_fn = self._build_detail_write_fn(spec, source_file)
    #    → 写入 detail_target_table使用 _insert_records_schema_aware()
    
    # 4. 执行管道
    detail_result = detail_pipeline.run(
        detail_requests, detail_process_fn, detail_write_fn,
    )
    self.db.commit()

3.3 详情响应处理(需自定义 process_fn

默认的 _build_detail_process_fnresponse.get("records", []) 提取记录。对于团购详情接口,需要自定义字段提取逻辑:

  • data.groupPurchasePackage 提取结构化字段(package_namedurationstart_timeend_time 等)
  • data.groupPurchasePackage.tableAreaId / tableAreaNameList 提取台区数组为 JSONB
  • data.packageCouponAssistants 提取助教服务关联数组为 JSONB
  • data.grouponSiteInfos 提取关联门店数组为 JSONB
  • data.packagePackageServicedata.packageCouponDetailsList 提取为 JSONB
  • 计算 content_hash,保留完整原始响应为 payload

实现方式:在 ODS_GROUP_PACKAGE 任务中覆盖 _build_detail_process_fn,或在 OdsTaskSpec 中扩展 detail_process_fn 回调。

3.4 详情数据写入(复用 _insert_records_schema_aware

_build_detail_write_fn 已内置调用 _insert_records_schema_aware(),按目标表结构动态写入,支持 ON CONFLICT UPSERT。写入目标为 ods.group_buy_package_detailsPK = coupon_id

3.5 DWD 加载扩展

文件:apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py

_merge_dim_scd2() 处理 dim_groupbuy_package_ex 时,需要额外从 ods.group_buy_package_details 读取详情字段并合并到 ODS 快照中:

# 伪代码:在 DWD 加载 dim_groupbuy_package_ex 时
def _load_groupbuy_package_ex(self, cur, now):
    # 1. 从 ods.group_buy_packages 读取列表数据(现有逻辑)
    # 2. 从 ods.group_buy_package_details 读取详情数据
    # 3. 通过 coupon_id = id 关联,将详情字段合并到 ODS 快照
    # 4. 执行 SCD2 合并(现有 _merge_dim_scd2 逻辑)

新增字段映射(ods.group_buy_package_detailsdwd.dim_groupbuy_package_ex

ODS 详情字段 DWD 扩展表字段 类型
table_area_ids table_area_ids JSONB
table_area_names table_area_names JSONB
assistant_services assistant_services JSONB
groupon_site_infos groupon_site_infos JSONB

4. 数据库变更

4.1 新建 ODS 表

-- db/etl_feiqiu/ods/group_buy_package_details.sql
CREATE TABLE IF NOT EXISTS ods.group_buy_package_details (
    coupon_id           BIGINT       NOT NULL,
    package_name        TEXT,
    duration            INTEGER,          -- 台费计时时长(秒)
    start_time          TIMESTAMPTZ,      -- 可用日期开始
    end_time            TIMESTAMPTZ,      -- 可用日期结束
    add_start_clock     TEXT,             -- 可用时段开始
    add_end_clock       TEXT,             -- 可用时段结束
    is_enabled          INTEGER,
    is_delete           INTEGER,
    site_id             BIGINT,
    tenant_id           BIGINT,
    create_time         TIMESTAMPTZ,
    creator_name        TEXT,
    -- JSONB 数组字段
    table_area_ids      JSONB,            -- [2791960001957765, ...]
    table_area_names    JSONB,            -- ["A区", ...]
    assistant_services  JSONB,            -- [{skillId, assistantLevel, assistantDuration}, ...]
    groupon_site_infos  JSONB,            -- [{siteId, siteName}, ...]
    package_services    JSONB,            -- 待调研,可能为空
    coupon_details_list JSONB,            -- 待调研,可能为空
    -- ETL 元数据
    content_hash        TEXT,
    payload             JSONB,            -- 完整原始响应
    fetched_at          TIMESTAMPTZ       DEFAULT now(),
    -- 主键
    CONSTRAINT pk_group_buy_package_details PRIMARY KEY (coupon_id)
);

COMMENT ON TABLE ods.group_buy_package_details IS '团购套餐详情 ODSQueryPackageCouponInfo 原始数据';

4.2 DWD 扩展表 ALTER

-- db/etl_feiqiu/migrations/2026-03-05__add_detail_fields_to_dim_groupbuy_package_ex.sql
ALTER TABLE dwd.dim_groupbuy_package_ex
    ADD COLUMN IF NOT EXISTS table_area_ids      JSONB,
    ADD COLUMN IF NOT EXISTS table_area_names    JSONB,
    ADD COLUMN IF NOT EXISTS assistant_services  JSONB,
    ADD COLUMN IF NOT EXISTS groupon_site_infos  JSONB;

COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.table_area_ids IS '可用台区 ID 列表(来自详情接口 tableAreaId';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.table_area_names IS '可用台区名称列表(来自详情接口 tableAreaNameList';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.assistant_services IS '助教服务关联(来自详情接口 packageCouponAssistants';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.groupon_site_infos IS '关联门店信息(来自详情接口 grouponSiteInfos';

5. 线程模型详细设计

详情阶段复用 UnifiedPipeline 的三层并发架构,与列表阶段完全一致:

UnifiedPipeline #2detail_mode
    │
    │  主线程_request_loop
    │  ┌─────────────────────────────────────────┐
    │  │ for req in _build_detail_requests(spec): │
    │  │     if cancel_token.is_cancelled: break  │
    │  │     resp = api.post(req.endpoint, params) │
    │  │     processing_queue.put((req, resp))     │
    │  │     rate_limiter.wait(cancel_token.event) │
    │  └─────────────────┬───────────────────────┘
    │                    │
    │  processing_queue.put(SENTINEL × worker_count)
    │  等待所有 worker 完成
    │  write_queue.put(SENTINEL)
    │  等待 writer 完成
    │
    ├──▶ Worker Thread 1 ──┐
    ├──▶ Worker Thread 2 ──┤
    │                      │
    │   processing_queue   │
    │   ┌─────────────┐    │
    │   │ (req, resp)  │───▶ detail_process_fn(resp)
    │   │ (req, resp)  │     → 提取字段、计算 content_hash
    │   │ SENTINEL     │     write_queue.put(records)
    │   └─────────────┘    │
    │                      │
    │                      ▼
    │              Write Thread
    │              ┌──────────────────┐
    │              │ write_queue       │
    │              │ batch=batch_size  │──▶ _insert_records_schema_aware()
    │              │ timeout=5s       │     → UPSERT ods.group_buy_package_details
    │              │ SENTINEL         │
    │              └──────────────────┘
    │
    ▼
PipelineResultdetail_success / detail_failure / detail_skipped

关键设计点(均由 PipelineConfig 统一控制,支持任务级覆盖):

  • processing_queuequeue.Queue(maxsize=queue_size),满时阻塞主线程(背压机制)
  • write_queuequeue.Queue(maxsize=queue_size * 2)
  • Worker 数量:PipelineConfig.workers(默认 2
  • Writer 批量写入:累积 batch_size 条或超时 batch_timeout 秒后执行
  • SENTINELNone 对象,用于通知线程退出
  • 取消信号:主线程检查 cancel_token.is_cancelledRateLimiter.wait() 轮询 cancel_token.event

6. 取消信号

详情阶段的 UnifiedPipeline 实例与列表阶段共享同一个 CancellationToken。取消信号的上游传递链Admin-web → Backend → Orchestration → CancellationToken属于 etl-unified-pipeline 的架构范畴,本 spec 仅关注详情阶段收到取消信号后的行为:

  1. _request_loop 检测到 cancel_token.is_cancelled,停止发送新请求
  2. RateLimiter.wait() 在 0.5s 轮询周期内检测到取消,立即返回 False
  3. 主线程发送 SENTINEL 到处理队列,等待已入队数据处理完成
  4. PipelineResult.cancelled = TrueBaseOdsTask 据此设置任务状态

7. 错误处理策略

错误处理由 UnifiedPipeline 统一管理,各阶段行为如下:

场景 处理方式
单个 couponId 请求超时/HTTP 错误 _request_loop 捕获异常,request_failures++,继续下一个
单个 couponId 返回 code != 0 同上API 层异常)
连续失败超过阈值 _request_loop 中断,PipelineResult.status = "FAILED"
Worker 线程处理异常 _process_worker 捕获异常,processing_failures++,继续消费队列
Writer 线程写入失败 _write_worker 捕获异常,write_failures++,继续消费队列
取消信号到达 停止新请求,等待已入队数据处理完成,cancelled = True

BaseOdsTask.execute() 在详情阶段完成后,将 detail_result 的统计信息合并到任务结果中,并记录每个失败项的错误日志。

连续失败阈值:PipelineConfig.max_consecutive_failures(默认 10支持 pipeline.ods_group_package.max_consecutive_failures 任务级覆盖)。

8. 配置参数

详情阶段复用 PipelineConfig 统一配置体系,支持三级回退:pipeline.ods_group_package.<key>pipeline.<key> → 硬编码默认值。

配置键 默认值 说明
pipeline.workers 2 处理线程数
pipeline.queue_size 100 处理队列容量
pipeline.batch_size 100 写入批量阈值
pipeline.batch_timeout 5.0 写入等待超时(秒)
pipeline.rate_min 5.0 RateLimiter 最小间隔(秒)
pipeline.rate_max 20.0 RateLimiter 最大间隔(秒)
pipeline.max_consecutive_failures 10 连续失败中断阈值

如需为详情阶段单独调参,可通过 pipeline.ods_group_package.* 任务级覆盖(列表和详情阶段共享同一 PipelineConfig 实例)。

不再需要独立的 DETAIL_FETCH_* 配置参数。

9. 实施任务清单

Task 1新建 ODS 详情表 DDL

  • 创建 db/etl_feiqiu/ods/group_buy_package_details.sql
  • 执行 DDL 到测试库
  • 需求覆盖:需求 3 验收标准 1-4

Task 2扩展 ODS_GROUP_PACKAGE 任务 — 配置详情拉取

  • tasks/ods/ods_tasks.pyODS_GROUP_PACKAGE OdsTaskSpec 中添加 detail_endpoint 等配置
  • 实现自定义的 _build_detail_process_fn 字段提取逻辑
  • 实现自定义的 _build_detail_write_fn 写入逻辑
  • 复用 BaseOdsTask.execute() 已有的详情拉取流程(UnifiedPipeline + RateLimiter + CancellationToken
  • 需求覆盖:需求 1 验收标准 1-8需求 2 验收标准 1-6需求 5 验收标准 1-4

Task 3DWD 扩展表 ALTER + 加载逻辑

  • 执行 ALTER TABLE 到测试库
  • 修改 DWD 加载逻辑,从详情 ODS 表读取并合并到扩展表
  • 需求覆盖:需求 4 验收标准 1-5

Task 4数据调研 — 获取全部团购详情并分析未标注字段

  • 编写一次性脚本调用详情接口获取全部数据
  • 分析未标注字段的值分布
  • 确认 packagePackageServicepackageCouponDetailsList 是否有数据
  • 根据分析结果调整 ODS/DWD 字段定义
  • 需求覆盖:需求 3 验收标准 6附录 B 调研 3、4

Task 5文档同步更新

  • 更新 ODS DDL 文档、字段映射文档
  • 更新 BD Manual
  • 更新 DWD 全景文档
  • 更新 README 任务清单
  • 需求覆盖:需求 6 验收标准 1-4