Files

358 lines
18 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 技术设计:团购详情接口整合 ETL 数据流
> 对应需求文档:#[[file:.kiro/specs/etl-coupon-detail/requirements.md]]
## 1. 架构概览
```
ODS_GROUP_PACKAGE 任务BaseOdsTask + OdsTaskSpec
│ 阶段 1列表拉取UnifiedPipeline #1
│ QueryPackageCouponList → ods.group_buy_packages
ods.group_buy_packages ──写入完成──▶ 阶段 2详情拉取自动启动
│ │
│ ┌───────┴───────┐
│ │ UnifiedPipeline│
│ │ #2 │
│ │ (detail_mode) │
│ └───────┬───────┘
│ │
│ ┌───────▼───────┐
│ │ 串行请求线程 │
│ │ (主线程) │
│ │ RateLimiter │
│ └───────┬───────┘
│ │ 响应提交
│ ┌───────▼───────┐
│ │ 处理队列 │
│ │ N 个工作线程 │
│ └───────┬───────┘
│ │ 处理完成
│ ┌───────▼───────┐
│ │ 写入队列 │
│ │ 单线程写库 │
│ └───────┬───────┘
│ │
▼ ▼
DWD dim_groupbuy_package ods.group_buy_package_details
DWD dim_groupbuy_package_ex ◀── SCD2 合并 ──┘
```
> 不再新建独立的 `DetailFetcher` 类。详情拉取完全复用 `BaseOdsTask` 已有的 `detail_endpoint` 二级拉取模式,通过 `OdsTaskSpec` 声明式配置即可驱动。
## 2. 调研结论与设计决策
### 2.1 ODS 层表方案 → 选项 A新建独立表
决策依据:
- 现有 `ods.group_buy_packages` 使用 `OdsTaskSpec` 驱动payload 存储列表接口的原始 JSONPK 为 `id + content_hash`
- 详情接口返回嵌套结构(子数组 `packageCouponAssistants``grouponSiteInfos` 等),与列表接口的扁平结构差异大
- 两个接口的写入时机不同(列表先写完,详情后写),混在同一表会导致 `SnapshotMode.FULL_TABLE` 的软删除逻辑冲突
- 独立表可以独立演进字段,不影响现有列表数据的稳定性
新表:`ods.group_buy_package_details`PK = `coupon_id`BIGINT全量快照覆盖
### 2.2 DWD 层表方案 → 选项 A在现有扩展表新增字段
决策依据:
- `dim_groupbuy_package_ex` 当前 21 个业务字段(不含 SCD2密度适中
- 详情接口的增量价值字段仅 4 个 JSONB 列(`table_area_ids``table_area_names``assistant_services``groupon_site_infos`
- 新建独立表会增加 SCD2 版本同步复杂度,且下游查询需要额外 JOIN
- 扩展表的 SCD2 合并已与主表同步,新增字段自动纳入变更检测
### 2.3 取消信号 → 复用现有 `CancellationToken`
`CancellationToken`(封装 `threading.Event`)已在 etl-unified-pipeline 中实现。详情阶段的 `UnifiedPipeline` 实例共享同一个 `cancel_token`,在请求循环和 `RateLimiter.wait()` 中检查取消状态。
## 3. 组件设计
### 3.1 OdsTaskSpec 详情配置(声明式,无需新建类)
> `RateLimiter``api/rate_limiter.py`)、`CancellationToken``utils/cancellation.py`)和 `UnifiedPipeline``pipeline/unified_pipeline.py`)已在 etl-unified-pipeline 中实现并上线。`BaseOdsTask.execute()` 内置了 `detail_endpoint` 二级详情拉取模式,本 spec 通过 `OdsTaskSpec` 声明式配置驱动,不新建独立类。
`tasks/ods/ods_tasks.py``ODS_GROUP_PACKAGE` 任务 spec 中添加以下配置:
```python
OdsTaskSpec(
code="ODS_GROUP_PACKAGE",
# ... 现有列表拉取配置 ...
# ── Detail_Mode 配置 ──
detail_endpoint="/PackageCoupon/QueryPackageCouponInfo",
detail_param_builder=lambda rec: {"couponId": rec["id"]},
detail_target_table="ods.group_buy_package_details",
detail_data_path=("data",),
detail_id_column="id",
)
```
配置字段说明:
| 字段 | 值 | 说明 |
|------|-----|------|
| `detail_endpoint` | `/PackageCoupon/QueryPackageCouponInfo` | 详情接口 endpoint |
| `detail_param_builder` | `lambda rec: {"couponId": rec["id"]}` | 将列表表的 `id` 映射为详情接口的 `couponId` 参数 |
| `detail_target_table` | `ods.group_buy_package_details` | 详情数据写入的目标表 |
| `detail_data_path` | `("data",)` | 详情响应的数据路径 |
| `detail_id_column` | `id` | 从 `ods.group_buy_packages` 提取 couponId 列表的列名 |
### 3.2 执行流程BaseOdsTask.execute() 内置)
`BaseOdsTask.execute()` 在列表拉取全部完成后,自动检测 `spec.detail_endpoint` 是否配置,若已配置则启动详情拉取阶段:
```python
# BaseOdsTask.execute() 内置逻辑(已实现,无需修改):
if spec.detail_endpoint:
# 1. 创建独立的 UnifiedPipeline 实例(共享 cancel_token
detail_pipeline = UnifiedPipeline(
api_client=self.api,
db_connection=self.db,
logger=self.logger,
config=pipeline_config, # PipelineConfig.from_app_config()
cancel_token=cancel_token, # 与列表阶段共享
)
# 2. 从 ODS 目标表查询 ID 列表,生成详情请求序列
detail_requests = self._build_detail_requests(spec)
# → SELECT DISTINCT {detail_id_column} FROM {table_name}
# → 对每个 ID 调用 detail_param_builder 构造参数
# → yield PipelineRequest(is_detail=True, detail_id=record_id)
# 3. 构建处理和写入函数
detail_process_fn = self._build_detail_process_fn(spec)
detail_write_fn = self._build_detail_write_fn(spec, source_file)
# → 写入 detail_target_table使用 _insert_records_schema_aware()
# 4. 执行管道
detail_result = detail_pipeline.run(
detail_requests, detail_process_fn, detail_write_fn,
)
self.db.commit()
```
### 3.3 详情响应处理(需自定义 process_fn
默认的 `_build_detail_process_fn``response.get("records", [])` 提取记录。对于团购详情接口,需要自定义字段提取逻辑:
-`data.groupPurchasePackage` 提取结构化字段(`package_name``duration``start_time``end_time` 等)
-`data.groupPurchasePackage.tableAreaId` / `tableAreaNameList` 提取台区数组为 JSONB
-`data.packageCouponAssistants` 提取助教服务关联数组为 JSONB
-`data.grouponSiteInfos` 提取关联门店数组为 JSONB
-`data.packagePackageService``data.packageCouponDetailsList` 提取为 JSONB
- 计算 `content_hash`,保留完整原始响应为 `payload`
实现方式:在 `ODS_GROUP_PACKAGE` 任务中覆盖 `_build_detail_process_fn`,或在 `OdsTaskSpec` 中扩展 `detail_process_fn` 回调。
### 3.4 详情数据写入(复用 _insert_records_schema_aware
`_build_detail_write_fn` 已内置调用 `_insert_records_schema_aware()`,按目标表结构动态写入,支持 ON CONFLICT UPSERT。写入目标为 `ods.group_buy_package_details`PK = `coupon_id`
### 3.5 DWD 加载扩展
文件:`apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py`
`_merge_dim_scd2()` 处理 `dim_groupbuy_package_ex` 时,需要额外从 `ods.group_buy_package_details` 读取详情字段并合并到 ODS 快照中:
```python
# 伪代码:在 DWD 加载 dim_groupbuy_package_ex 时
def _load_groupbuy_package_ex(self, cur, now):
# 1. 从 ods.group_buy_packages 读取列表数据(现有逻辑)
# 2. 从 ods.group_buy_package_details 读取详情数据
# 3. 通过 coupon_id = id 关联,将详情字段合并到 ODS 快照
# 4. 执行 SCD2 合并(现有 _merge_dim_scd2 逻辑)
```
新增字段映射(`ods.group_buy_package_details``dwd.dim_groupbuy_package_ex`
| ODS 详情字段 | DWD 扩展表字段 | 类型 |
|-------------|---------------|------|
| `table_area_ids` | `table_area_ids` | JSONB |
| `table_area_names` | `table_area_names` | JSONB |
| `assistant_services` | `assistant_services` | JSONB |
| `groupon_site_infos` | `groupon_site_infos` | JSONB |
## 4. 数据库变更
### 4.1 新建 ODS 表
```sql
-- db/etl_feiqiu/ods/group_buy_package_details.sql
CREATE TABLE IF NOT EXISTS ods.group_buy_package_details (
coupon_id BIGINT NOT NULL,
package_name TEXT,
duration INTEGER, -- 台费计时时长(秒)
start_time TIMESTAMPTZ, -- 可用日期开始
end_time TIMESTAMPTZ, -- 可用日期结束
add_start_clock TEXT, -- 可用时段开始
add_end_clock TEXT, -- 可用时段结束
is_enabled INTEGER,
is_delete INTEGER,
site_id BIGINT,
tenant_id BIGINT,
create_time TIMESTAMPTZ,
creator_name TEXT,
-- JSONB 数组字段
table_area_ids JSONB, -- [2791960001957765, ...]
table_area_names JSONB, -- ["A区", ...]
assistant_services JSONB, -- [{skillId, assistantLevel, assistantDuration}, ...]
groupon_site_infos JSONB, -- [{siteId, siteName}, ...]
package_services JSONB, -- 待调研,可能为空
coupon_details_list JSONB, -- 待调研,可能为空
-- ETL 元数据
content_hash TEXT,
payload JSONB, -- 完整原始响应
fetched_at TIMESTAMPTZ DEFAULT now(),
-- 主键
CONSTRAINT pk_group_buy_package_details PRIMARY KEY (coupon_id)
);
COMMENT ON TABLE ods.group_buy_package_details IS '团购套餐详情 ODSQueryPackageCouponInfo 原始数据';
```
### 4.2 DWD 扩展表 ALTER
```sql
-- db/etl_feiqiu/migrations/2026-03-05__add_detail_fields_to_dim_groupbuy_package_ex.sql
ALTER TABLE dwd.dim_groupbuy_package_ex
ADD COLUMN IF NOT EXISTS table_area_ids JSONB,
ADD COLUMN IF NOT EXISTS table_area_names JSONB,
ADD COLUMN IF NOT EXISTS assistant_services JSONB,
ADD COLUMN IF NOT EXISTS groupon_site_infos JSONB;
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.table_area_ids IS '可用台区 ID 列表(来自详情接口 tableAreaId';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.table_area_names IS '可用台区名称列表(来自详情接口 tableAreaNameList';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.assistant_services IS '助教服务关联(来自详情接口 packageCouponAssistants';
COMMENT ON COLUMN dwd.dim_groupbuy_package_ex.groupon_site_infos IS '关联门店信息(来自详情接口 grouponSiteInfos';
```
## 5. 线程模型详细设计
详情阶段复用 `UnifiedPipeline` 的三层并发架构,与列表阶段完全一致:
```
UnifiedPipeline #2detail_mode
│ 主线程_request_loop
│ ┌─────────────────────────────────────────┐
│ │ for req in _build_detail_requests(spec): │
│ │ if cancel_token.is_cancelled: break │
│ │ resp = api.post(req.endpoint, params) │
│ │ processing_queue.put((req, resp)) │
│ │ rate_limiter.wait(cancel_token.event) │
│ └─────────────────┬───────────────────────┘
│ │
│ processing_queue.put(SENTINEL × worker_count)
│ 等待所有 worker 完成
│ write_queue.put(SENTINEL)
│ 等待 writer 完成
├──▶ Worker Thread 1 ──┐
├──▶ Worker Thread 2 ──┤
│ │
│ processing_queue │
│ ┌─────────────┐ │
│ │ (req, resp) │───▶ detail_process_fn(resp)
│ │ (req, resp) │ → 提取字段、计算 content_hash
│ │ SENTINEL │ write_queue.put(records)
│ └─────────────┘ │
│ │
│ ▼
│ Write Thread
│ ┌──────────────────┐
│ │ write_queue │
│ │ batch=batch_size │──▶ _insert_records_schema_aware()
│ │ timeout=5s │ → UPSERT ods.group_buy_package_details
│ │ SENTINEL │
│ └──────────────────┘
PipelineResultdetail_success / detail_failure / detail_skipped
```
关键设计点(均由 `PipelineConfig` 统一控制,支持任务级覆盖):
- `processing_queue``queue.Queue(maxsize=queue_size)`,满时阻塞主线程(背压机制)
- `write_queue``queue.Queue(maxsize=queue_size * 2)`
- Worker 数量:`PipelineConfig.workers`(默认 2
- Writer 批量写入:累积 `batch_size` 条或超时 `batch_timeout` 秒后执行
- SENTINEL`None` 对象,用于通知线程退出
- 取消信号:主线程检查 `cancel_token.is_cancelled``RateLimiter.wait()` 轮询 `cancel_token.event`
## 6. 取消信号
详情阶段的 `UnifiedPipeline` 实例与列表阶段共享同一个 `CancellationToken`。取消信号的上游传递链Admin-web → Backend → Orchestration → CancellationToken属于 etl-unified-pipeline 的架构范畴,本 spec 仅关注详情阶段收到取消信号后的行为:
1. `_request_loop` 检测到 `cancel_token.is_cancelled`,停止发送新请求
2. `RateLimiter.wait()` 在 0.5s 轮询周期内检测到取消,立即返回 `False`
3. 主线程发送 SENTINEL 到处理队列,等待已入队数据处理完成
4. `PipelineResult.cancelled = True``BaseOdsTask` 据此设置任务状态
## 7. 错误处理策略
错误处理由 `UnifiedPipeline` 统一管理,各阶段行为如下:
| 场景 | 处理方式 |
|------|---------|
| 单个 couponId 请求超时/HTTP 错误 | `_request_loop` 捕获异常,`request_failures++`,继续下一个 |
| 单个 couponId 返回 `code != 0` | 同上API 层异常) |
| 连续失败超过阈值 | `_request_loop` 中断,`PipelineResult.status = "FAILED"` |
| Worker 线程处理异常 | `_process_worker` 捕获异常,`processing_failures++`,继续消费队列 |
| Writer 线程写入失败 | `_write_worker` 捕获异常,`write_failures++`,继续消费队列 |
| 取消信号到达 | 停止新请求,等待已入队数据处理完成,`cancelled = True` |
`BaseOdsTask.execute()` 在详情阶段完成后,将 `detail_result` 的统计信息合并到任务结果中,并记录每个失败项的错误日志。
连续失败阈值:`PipelineConfig.max_consecutive_failures`(默认 10支持 `pipeline.ods_group_package.max_consecutive_failures` 任务级覆盖)。
## 8. 配置参数
详情阶段复用 `PipelineConfig` 统一配置体系,支持三级回退:`pipeline.ods_group_package.<key>``pipeline.<key>` → 硬编码默认值。
| 配置键 | 默认值 | 说明 |
|--------|--------|------|
| `pipeline.workers` | 2 | 处理线程数 |
| `pipeline.queue_size` | 100 | 处理队列容量 |
| `pipeline.batch_size` | 100 | 写入批量阈值 |
| `pipeline.batch_timeout` | 5.0 | 写入等待超时(秒) |
| `pipeline.rate_min` | 5.0 | RateLimiter 最小间隔(秒) |
| `pipeline.rate_max` | 20.0 | RateLimiter 最大间隔(秒) |
| `pipeline.max_consecutive_failures` | 10 | 连续失败中断阈值 |
如需为详情阶段单独调参,可通过 `pipeline.ods_group_package.*` 任务级覆盖(列表和详情阶段共享同一 `PipelineConfig` 实例)。
> 不再需要独立的 `DETAIL_FETCH_*` 配置参数。
## 9. 实施任务清单
### Task 1新建 ODS 详情表 DDL
- 创建 `db/etl_feiqiu/ods/group_buy_package_details.sql`
- 执行 DDL 到测试库
- 需求覆盖:需求 3 验收标准 1-4
### Task 2扩展 ODS_GROUP_PACKAGE 任务 — 配置详情拉取
-`tasks/ods/ods_tasks.py``ODS_GROUP_PACKAGE` OdsTaskSpec 中添加 `detail_endpoint` 等配置
- 实现自定义的 `_build_detail_process_fn` 字段提取逻辑
- 实现自定义的 `_build_detail_write_fn` 写入逻辑
- 复用 `BaseOdsTask.execute()` 已有的详情拉取流程(`UnifiedPipeline` + `RateLimiter` + `CancellationToken`
- 需求覆盖:需求 1 验收标准 1-8需求 2 验收标准 1-6需求 5 验收标准 1-4
### Task 3DWD 扩展表 ALTER + 加载逻辑
- 执行 ALTER TABLE 到测试库
- 修改 DWD 加载逻辑,从详情 ODS 表读取并合并到扩展表
- 需求覆盖:需求 4 验收标准 1-5
### Task 4数据调研 — 获取全部团购详情并分析未标注字段
- 编写一次性脚本调用详情接口获取全部数据
- 分析未标注字段的值分布
- 确认 `packagePackageService``packageCouponDetailsList` 是否有数据
- 根据分析结果调整 ODS/DWD 字段定义
- 需求覆盖:需求 3 验收标准 6附录 B 调研 3、4
### Task 5文档同步更新
- 更新 ODS DDL 文档、字段映射文档
- 更新 BD Manual
- 更新 DWD 全景文档
- 更新 README 任务清单
- 需求覆盖:需求 6 验收标准 1-4