18 KiB
技术设计:团购详情接口整合 ETL 数据流
1. 架构概览
ODS_GROUP_PACKAGE 任务(BaseOdsTask + OdsTaskSpec)
│
│ 阶段 1:列表拉取(UnifiedPipeline #1)
│ QueryPackageCouponList → ods.group_buy_packages
│
▼
ods.group_buy_packages ──写入完成──▶ 阶段 2:详情拉取自动启动
│ │
│ ┌───────┴───────┐
│ │ UnifiedPipeline│
│ │ #2 │
│ │ (detail_mode) │
│ └───────┬───────┘
│ │
│ ┌───────▼───────┐
│ │ 串行请求线程 │
│ │ (主线程) │
│ │ RateLimiter │
│ └───────┬───────┘
│ │ 响应提交
│ ┌───────▼───────┐
│ │ 处理队列 │
│ │ N 个工作线程 │
│ └───────┬───────┘
│ │ 处理完成
│ ┌───────▼───────┐
│ │ 写入队列 │
│ │ 单线程写库 │
│ └───────┬───────┘
│ │
▼ ▼
DWD dim_groupbuy_package ods.group_buy_package_details
DWD dim_groupbuy_package_ex ◀── SCD2 合并 ──┘
不再新建独立的
DetailFetcher类。详情拉取完全复用BaseOdsTask已有的detail_endpoint二级拉取模式,通过OdsTaskSpec声明式配置即可驱动。
2. 调研结论与设计决策
2.1 ODS 层表方案 → 选项 A:新建独立表
决策依据:
- 现有
ods.group_buy_packages使用OdsTaskSpec驱动,payload 存储列表接口的原始 JSON,PK 为id + content_hash - 详情接口返回嵌套结构(子数组
packageCouponAssistants、grouponSiteInfos等),与列表接口的扁平结构差异大 - 两个接口的写入时机不同(列表先写完,详情后写),混在同一表会导致
SnapshotMode.FULL_TABLE的软删除逻辑冲突 - 独立表可以独立演进字段,不影响现有列表数据的稳定性
新表:ods.group_buy_package_details,PK = coupon_id(BIGINT),全量快照覆盖
2.2 DWD 层表方案 → 选项 A:在现有扩展表新增字段
决策依据:
dim_groupbuy_package_ex当前 21 个业务字段(不含 SCD2),密度适中- 详情接口的增量价值字段仅 4 个 JSONB 列(
table_area_ids、table_area_names、assistant_services、groupon_site_infos) - 新建独立表会增加 SCD2 版本同步复杂度,且下游查询需要额外 JOIN
- 扩展表的 SCD2 合并已与主表同步,新增字段自动纳入变更检测
2.3 取消信号 → 复用现有 CancellationToken
CancellationToken(封装 threading.Event)已在 etl-unified-pipeline 中实现。详情阶段的 UnifiedPipeline 实例共享同一个 cancel_token,在请求循环和 RateLimiter.wait() 中检查取消状态。
3. 组件设计
3.1 OdsTaskSpec 详情配置(声明式,无需新建类)
RateLimiter(api/rate_limiter.py)、CancellationToken(utils/cancellation.py)和UnifiedPipeline(pipeline/unified_pipeline.py)已在 etl-unified-pipeline 中实现并上线。BaseOdsTask.execute()内置了detail_endpoint二级详情拉取模式,本 spec 通过OdsTaskSpec声明式配置驱动,不新建独立类。
在 tasks/ods/ods_tasks.py 的 ODS_GROUP_PACKAGE 任务 spec 中添加以下配置:
OdsTaskSpec(
code="ODS_GROUP_PACKAGE",
# ... 现有列表拉取配置 ...
# ── Detail_Mode 配置 ──
detail_endpoint="/PackageCoupon/QueryPackageCouponInfo",
detail_param_builder=lambda rec: {"couponId": rec["id"]},
detail_target_table="ods.group_buy_package_details",
detail_data_path=("data",),
detail_id_column="id",
)
配置字段说明:
| 字段 | 值 | 说明 |
|---|---|---|
detail_endpoint |
/PackageCoupon/QueryPackageCouponInfo |
详情接口 endpoint |
detail_param_builder |
lambda rec: {"couponId": rec["id"]} |
将列表表的 id 映射为详情接口的 couponId 参数 |
detail_target_table |
ods.group_buy_package_details |
详情数据写入的目标表 |
detail_data_path |
("data",) |
详情响应的数据路径 |
detail_id_column |
id |
从 ods.group_buy_packages 提取 couponId 列表的列名 |
3.2 执行流程(BaseOdsTask.execute() 内置)
BaseOdsTask.execute() 在列表拉取全部完成后,自动检测 spec.detail_endpoint 是否配置,若已配置则启动详情拉取阶段:
# BaseOdsTask.execute() 内置逻辑(已实现,无需修改):
if spec.detail_endpoint:
# 1. 创建独立的 UnifiedPipeline 实例(共享 cancel_token)
detail_pipeline = UnifiedPipeline(
api_client=self.api,
db_connection=self.db,
logger=self.logger,
config=pipeline_config, # PipelineConfig.from_app_config()
cancel_token=cancel_token, # 与列表阶段共享
)
# 2. 从 ODS 目标表查询 ID 列表,生成详情请求序列
detail_requests = self._build_detail_requests(spec)
# → SELECT DISTINCT {detail_id_column} FROM {table_name}
# → 对每个 ID 调用 detail_param_builder 构造参数
# → yield PipelineRequest(is_detail=True, detail_id=record_id)
# 3. 构建处理和写入函数
detail_process_fn = self._build_detail_process_fn(spec)
detail_write_fn = self._build_detail_write_fn(spec, source_file)
# → 写入 detail_target_table,使用 _insert_records_schema_aware()
# 4. 执行管道
detail_result = detail_pipeline.run(
detail_requests, detail_process_fn, detail_write_fn,
)
self.db.commit()
3.3 详情响应处理(需自定义 process_fn)
默认的 _build_detail_process_fn 从 response.get("records", []) 提取记录。对于团购详情接口,需要自定义字段提取逻辑:
- 从
data.groupPurchasePackage提取结构化字段(package_name、duration、start_time、end_time等) - 从
data.groupPurchasePackage.tableAreaId/tableAreaNameList提取台区数组为 JSONB - 从
data.packageCouponAssistants提取助教服务关联数组为 JSONB - 从
data.grouponSiteInfos提取关联门店数组为 JSONB - 从
data.packagePackageService和data.packageCouponDetailsList提取为 JSONB - 计算
content_hash,保留完整原始响应为payload
实现方式:在 ODS_GROUP_PACKAGE 任务中覆盖 _build_detail_process_fn,或在 OdsTaskSpec 中扩展 detail_process_fn 回调。
3.4 详情数据写入(复用 _insert_records_schema_aware)
_build_detail_write_fn 已内置调用 _insert_records_schema_aware(),按目标表结构动态写入,支持 ON CONFLICT UPSERT。写入目标为 ods.group_buy_package_details,PK = coupon_id。
3.5 DWD 加载扩展
文件:apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py
在 _merge_dim_scd2() 处理 dim_groupbuy_package_ex 时,需要额外从 ods.group_buy_package_details 读取详情字段并合并到 ODS 快照中:
# 伪代码:在 DWD 加载 dim_groupbuy_package_ex 时
def _load_groupbuy_package_ex(self, cur, now):
# 1. 从 ods.group_buy_packages 读取列表数据(现有逻辑)
# 2. 从 ods.group_buy_package_details 读取详情数据
# 3. 通过 coupon_id = id 关联,将详情字段合并到 ODS 快照
# 4. 执行 SCD2 合并(现有 _merge_dim_scd2 逻辑)
新增字段映射(ods.group_buy_package_details → dwd.dim_groupbuy_package_ex):
| ODS 详情字段 | DWD 扩展表字段 | 类型 |
|---|---|---|
table_area_ids |
table_area_ids |
JSONB |
table_area_names |
table_area_names |
JSONB |
assistant_services |
assistant_services |
JSONB |
groupon_site_infos |
groupon_site_infos |
JSONB |
4. 数据库变更
4.1 新建 ODS 表
-- db/etl_feiqiu/ods/group_buy_package_details.sql
CREATE TABLE IF NOT EXISTS ods.group_buy_package_details (
coupon_id BIGINT NOT NULL,
package_name TEXT,
duration INTEGER, -- 台费计时时长(秒)
start_time TIMESTAMPTZ, -- 可用日期开始
end_time TIMESTAMPTZ, -- 可用日期结束
add_start_clock TEXT, -- 可用时段开始
add_end_clock TEXT, -- 可用时段结束
is_enabled INTEGER,
is_delete INTEGER,
site_id BIGINT,
tenant_id BIGINT,
create_time TIMESTAMPTZ,
creator_name TEXT,
-- JSONB 数组字段
table_area_ids JSONB, -- [2791960001957765, ...]
table_area_names JSONB, -- ["A区", ...]
assistant_services JSONB, -- [{skillId, assistantLevel, assistantDuration}, ...]
groupon_site_infos JSONB, -- [{siteId, siteName}, ...]
package_services JSONB, -- 待调研,可能为空
coupon_details_list JSONB, -- 待调研,可能为空
-- ETL 元数据
content_hash TEXT,
payload JSONB, -- 完整原始响应
fetched_at TIMESTAMPTZ DEFAULT now(),
-- 主键
CONSTRAINT pk_group_buy_package_details PRIMARY KEY (coupon_id)
);
COMMENT ON TABLE ods.group_buy_package_details IS '团购套餐详情 ODS:QueryPackageCouponInfo 原始数据';
4.2 DWD 扩展表 ALTER
-- db/etl_feiqiu/migrations/2026-03-05__add_detail_fields_to_dim_groupbuy_package_ex.sql
ALTER TABLE dwd.dim_groupbuy_package_ex
ADD COLUMN IF NOT EXISTS table_area_ids JSONB,
ADD COLUMN IF NOT EXISTS table_area_names JSONB,
ADD COLUMN IF NOT EXISTS assistant_services JSONB,
ADD COLUMN IF NOT EXISTS groupon_site_infos JSONB;
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.table_area_ids IS '可用台区 ID 列表(来自详情接口 tableAreaId)';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.table_area_names IS '可用台区名称列表(来自详情接口 tableAreaNameList)';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.assistant_services IS '助教服务关联(来自详情接口 packageCouponAssistants)';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.groupon_site_infos IS '关联门店信息(来自详情接口 grouponSiteInfos)';
5. 线程模型详细设计
详情阶段复用 UnifiedPipeline 的三层并发架构,与列表阶段完全一致:
UnifiedPipeline #2(detail_mode)
│
│ 主线程(_request_loop)
│ ┌─────────────────────────────────────────┐
│ │ for req in _build_detail_requests(spec): │
│ │ if cancel_token.is_cancelled: break │
│ │ resp = api.post(req.endpoint, params) │
│ │ processing_queue.put((req, resp)) │
│ │ rate_limiter.wait(cancel_token.event) │
│ └─────────────────┬───────────────────────┘
│ │
│ processing_queue.put(SENTINEL × worker_count)
│ 等待所有 worker 完成
│ write_queue.put(SENTINEL)
│ 等待 writer 完成
│
├──▶ Worker Thread 1 ──┐
├──▶ Worker Thread 2 ──┤
│ │
│ processing_queue │
│ ┌─────────────┐ │
│ │ (req, resp) │───▶ detail_process_fn(resp)
│ │ (req, resp) │ → 提取字段、计算 content_hash
│ │ SENTINEL │ write_queue.put(records)
│ └─────────────┘ │
│ │
│ ▼
│ Write Thread
│ ┌──────────────────┐
│ │ write_queue │
│ │ batch=batch_size │──▶ _insert_records_schema_aware()
│ │ timeout=5s │ → UPSERT ods.group_buy_package_details
│ │ SENTINEL │
│ └──────────────────┘
│
▼
PipelineResult(detail_success / detail_failure / detail_skipped)
关键设计点(均由 PipelineConfig 统一控制,支持任务级覆盖):
processing_queue:queue.Queue(maxsize=queue_size),满时阻塞主线程(背压机制)write_queue:queue.Queue(maxsize=queue_size * 2)- Worker 数量:
PipelineConfig.workers(默认 2) - Writer 批量写入:累积
batch_size条或超时batch_timeout秒后执行 - SENTINEL:
None对象,用于通知线程退出 - 取消信号:主线程检查
cancel_token.is_cancelled,RateLimiter.wait()轮询cancel_token.event
6. 取消信号
详情阶段的 UnifiedPipeline 实例与列表阶段共享同一个 CancellationToken。取消信号的上游传递链(Admin-web → Backend → Orchestration → CancellationToken)属于 etl-unified-pipeline 的架构范畴,本 spec 仅关注详情阶段收到取消信号后的行为:
_request_loop检测到cancel_token.is_cancelled,停止发送新请求RateLimiter.wait()在 0.5s 轮询周期内检测到取消,立即返回False- 主线程发送 SENTINEL 到处理队列,等待已入队数据处理完成
PipelineResult.cancelled = True,BaseOdsTask据此设置任务状态
7. 错误处理策略
错误处理由 UnifiedPipeline 统一管理,各阶段行为如下:
| 场景 | 处理方式 |
|---|---|
| 单个 couponId 请求超时/HTTP 错误 | _request_loop 捕获异常,request_failures++,继续下一个 |
单个 couponId 返回 code != 0 |
同上(API 层异常) |
| 连续失败超过阈值 | _request_loop 中断,PipelineResult.status = "FAILED" |
| Worker 线程处理异常 | _process_worker 捕获异常,processing_failures++,继续消费队列 |
| Writer 线程写入失败 | _write_worker 捕获异常,write_failures++,继续消费队列 |
| 取消信号到达 | 停止新请求,等待已入队数据处理完成,cancelled = True |
BaseOdsTask.execute() 在详情阶段完成后,将 detail_result 的统计信息合并到任务结果中,并记录每个失败项的错误日志。
连续失败阈值:PipelineConfig.max_consecutive_failures(默认 10,支持 pipeline.ods_group_package.max_consecutive_failures 任务级覆盖)。
8. 配置参数
详情阶段复用 PipelineConfig 统一配置体系,支持三级回退:pipeline.ods_group_package.<key> → pipeline.<key> → 硬编码默认值。
| 配置键 | 默认值 | 说明 |
|---|---|---|
pipeline.workers |
2 | 处理线程数 |
pipeline.queue_size |
100 | 处理队列容量 |
pipeline.batch_size |
100 | 写入批量阈值 |
pipeline.batch_timeout |
5.0 | 写入等待超时(秒) |
pipeline.rate_min |
5.0 | RateLimiter 最小间隔(秒) |
pipeline.rate_max |
20.0 | RateLimiter 最大间隔(秒) |
pipeline.max_consecutive_failures |
10 | 连续失败中断阈值 |
如需为详情阶段单独调参,可通过 pipeline.ods_group_package.* 任务级覆盖(列表和详情阶段共享同一 PipelineConfig 实例)。
不再需要独立的
DETAIL_FETCH_*配置参数。
9. 实施任务清单
Task 1:新建 ODS 详情表 DDL
- 创建
db/etl_feiqiu/ods/group_buy_package_details.sql - 执行 DDL 到测试库
- 需求覆盖:需求 3 验收标准 1-4
Task 2:扩展 ODS_GROUP_PACKAGE 任务 — 配置详情拉取
- 在
tasks/ods/ods_tasks.py的ODS_GROUP_PACKAGEOdsTaskSpec 中添加detail_endpoint等配置 - 实现自定义的
_build_detail_process_fn字段提取逻辑 - 实现自定义的
_build_detail_write_fn写入逻辑 - 复用
BaseOdsTask.execute()已有的详情拉取流程(UnifiedPipeline+RateLimiter+CancellationToken) - 需求覆盖:需求 1 验收标准 1-8,需求 2 验收标准 1-6,需求 5 验收标准 1-4
Task 3:DWD 扩展表 ALTER + 加载逻辑
- 执行 ALTER TABLE 到测试库
- 修改 DWD 加载逻辑,从详情 ODS 表读取并合并到扩展表
- 需求覆盖:需求 4 验收标准 1-5
Task 4:数据调研 — 获取全部团购详情并分析未标注字段
- 编写一次性脚本调用详情接口获取全部数据
- 分析未标注字段的值分布
- 确认
packagePackageService和packageCouponDetailsList是否有数据 - 根据分析结果调整 ODS/DWD 字段定义
- 需求覆盖:需求 3 验收标准 6,附录 B 调研 3、4
Task 5:文档同步更新
- 更新 ODS DDL 文档、字段映射文档
- 更新 BD Manual
- 更新 DWD 全景文档
- 更新 README 任务清单
- 需求覆盖:需求 6 验收标准 1-4