7.5 KiB
7.5 KiB
实施计划:团购详情接口整合 ETL 数据流
概述
将飞球团购详情接口(QueryPackageCouponInfo)整合到现有 ETL 的 API → ODS → DWD 三层数据流。利用 BaseOdsTask 已有的 detail_endpoint 二级详情拉取模式(通过 UnifiedPipeline + RateLimiter + CancellationToken),在 ODS_GROUP_PACKAGE 任务的 OdsTaskSpec 中配置详情拉取参数,将详情数据写入新建的 ods.group_buy_package_details 表,并向下传导至 dwd.dim_groupbuy_package_ex。
任务
-
1. 新建 ODS 详情表 DDL
- 1.1 创建
db/etl_feiqiu/ods/group_buy_package_details.sql- 按设计文档 §4.1 定义表结构:
coupon_idBIGINT PK、结构化字段、JSONB 数组字段、ETL 元数据字段 - 添加表和列的 COMMENT
- 需求覆盖:需求 3 验收标准 1-4
- 按设计文档 §4.1 定义表结构:
- 1.2 在测试库
test_etl_feiqiu执行 DDL 验证表创建成功- 需求覆盖:需求 3 验收标准 1
- 1.1 创建
-
2. 扩展 ODS_GROUP_PACKAGE 任务 — 配置详情拉取
- 2.1 在
tasks/ods/ods_tasks.py的ODS_GROUP_PACKAGEOdsTaskSpec 中添加 detail_endpoint 配置- 设置
detail_endpoint="/PackageCoupon/QueryPackageCouponInfo" - 设置
detail_param_builder=lambda rec: {"couponId": rec["id"]}(将列表表的id映射为详情接口的couponId参数) - 设置
detail_target_table="ods.group_buy_package_details" - 设置
detail_data_path=("data",) - 设置
detail_id_column="id"(从ods.group_buy_packages提取 couponId 列表) - 复用
BaseOdsTask.execute()已有的详情拉取流程(UnifiedPipeline+RateLimiter+CancellationToken) - 需求覆盖:需求 1 验收标准 1-8,需求 2 验收标准 1-6,需求 5 验收标准 1-4
- 设置
- 2.2 实现详情响应的
_build_detail_process_fn字段提取逻辑- 从
data.groupPurchasePackage提取结构化字段(package_name、duration、start_time、end_time、add_start_clock、add_end_clock、is_enabled、is_delete、site_id、tenant_id、create_time、creator_name) - 从
data.groupPurchasePackage.tableAreaId/tableAreaNameList提取台区数组为 JSONB - 从
data.packageCouponAssistants提取助教服务关联数组为 JSONB - 从
data.grouponSiteInfos提取关联门店数组为 JSONB - 从
data.packagePackageService和data.packageCouponDetailsList提取为 JSONB - 计算
content_hash,保留完整原始响应为payload - 需求覆盖:需求 2 验收标准 4,需求 3 验收标准 2-4
- 从
- 2.3 实现详情数据的
_build_detail_write_fn写入逻辑- 采用全量快照模式(
SnapshotMode.FULL_TABLE)写入ods.group_buy_package_details - UPSERT on
coupon_id,每次运行覆盖全部记录 - 需求覆盖:需求 3 验收标准 5
- 采用全量快照模式(
- 2.4 编写 ODS 详情拉取的单元测试
- 测试
detail_param_builder参数构造 - 测试字段提取逻辑(正常响应、空数组、缺失字段)
- 测试
content_hash计算一致性 - 需求覆盖:需求 2 验收标准 4-5
- 测试
- 2.1 在
-
3. 检查点 — ODS 层验证
- 确保所有测试通过,ask the user if questions arise。
- 用
--dry-run --tasks ODS_GROUP_PACKAGE验证任务注册和配置正确
-
4. DWD 扩展表 ALTER + 加载逻辑
- 4.1 创建迁移脚本
db/etl_feiqiu/migrations/2026-03-05__add_detail_fields_to_dim_groupbuy_package_ex.sql- ALTER TABLE 添加 4 个 JSONB 列:
table_area_ids、table_area_names、assistant_services、groupon_site_infos - 添加列 COMMENT
- 在测试库
test_etl_feiqiu执行迁移验证 - 需求覆盖:需求 4 验收标准 1
- ALTER TABLE 添加 4 个 JSONB 列:
- 4.2 修改
tasks/dwd/dwd_load_task.py的TABLE_MAPPING和COLUMN_OVERRIDES- 在
COLUMN_OVERRIDES["dwd.dim_groupbuy_package_ex"]中新增 4 个详情字段的映射 - 需求覆盖:需求 4 验收标准 2
- 在
- 4.3 修改
_fetch_source_rows或_merge_dim_scd2流程,在加载dim_groupbuy_package_ex时 LEFT JOINods.group_buy_package_details- 通过
ods.group_buy_packages.id = ods.group_buy_package_details.coupon_id关联 - 将详情表的
table_area_ids、table_area_names、assistant_services、groupon_site_infos合并到 ODS 快照行 - 当详情表中
coupon_id在列表表中不存在时,记录警告日志并跳过 - 新增字段自动纳入 SCD2 变更检测(现有
_is_row_changed逻辑已支持 JSONB 比较) - 需求覆盖:需求 4 验收标准 2-5
- 通过
- 4.4 编写 DWD 加载扩展的单元测试
- 测试 LEFT JOIN 逻辑:详情存在 / 详情缺失 / 详情多余(应跳过并警告)
- 测试 SCD2 变更检测包含新增 JSONB 字段
- 需求覆盖:需求 4 验收标准 3-5
- 4.1 创建迁移脚本
-
5. 检查点 — DWD 层验证
- 确保所有测试通过,ask the user if questions arise。
- 用
--dry-run --tasks DWD_LOAD_FROM_ODS验证 DWD 加载配置正确
-
6. 数据调研 — 获取全部团购详情并分析未标注字段
- 6.1 编写一次性调研脚本
apps/etl/connectors/feiqiu/scripts/research_coupon_details.py- 使用
load_dotenv加载根.env,通过AppConfig.load()获取配置 - 连接测试库
test_etl_feiqiu(TEST_DB_DSN) - 从
ods.group_buy_packages读取所有coupon_id - 串行调用详情接口(复用
RateLimiter(5, 20)),将原始响应存入ods.group_buy_package_details.payload - 需求覆盖:附录 B 调研 3、4
- 使用
- 6.2 分析未标注字段的值分布
- 对附录 A 中标记为 ❌ 的字段,统计所有记录的值分布
- 所有记录值完全相同的字段 → 标记为忽略
- 值有变化的字段 → 推测用途,输出分析报告到
docs/reports/ - 确认
packagePackageService和packageCouponDetailsList是否有非空数据 - 根据分析结果调整 ODS 表字段定义和 DWD 映射(如需要)
- 需求覆盖:需求 3 验收标准 6,附录 B 调研 3、4
- 6.1 编写一次性调研脚本
-
7. 文档同步更新
- 7.1 更新 ODS 层文档
- 更新 ODS DDL 文档(新增
ods.group_buy_package_details表定义) - 更新 ODS 字段映射文档(新增
QueryPackageCouponInfo→ods.group_buy_package_details映射) - 更新 BD Manual(
docs/database/BD_Manual_*.md)新增详情表说明 - 需求覆盖:需求 6 验收标准 1
- 更新 ODS DDL 文档(新增
- 7.2 更新 DWD 层文档
- 更新 DWD 全景文档(
docs/reports/dwd-panorama/dwd-dimension-panorama.md中dim_groupbuy_package_ex章节) - 更新 DWD 表结构概览文档,反映新增的 4 个 JSONB 字段
- 需求覆盖:需求 6 验收标准 2
- 更新 DWD 全景文档(
- 7.3 更新 ETL README 和架构文档
- 更新 README 任务清单,反映
ODS_GROUP_PACKAGE任务新增的详情拉取子流程 - 需求覆盖:需求 6 验收标准 3
- 更新 README 任务清单,反映
- 7.1 更新 ODS 层文档
-
8. 最终检查点 — 全量验证
- 37 个单元测试全部通过(ODS 16 + DWD 12 + column ref 9)
备注
- 标记
*的子任务为可选,可跳过以加速 MVP 交付 RateLimiter(api/rate_limiter.py)和CancellationToken(utils/cancellation.py)已在 etl-unified-pipeline 中实现,本 spec 直接复用,不重复创建BaseOdsTask.execute()已内置detail_endpoint二级详情拉取模式(通过UnifiedPipeline),Task 2 利用此现有机制而非新建独立的DetailFetcher类- 每个任务引用具体需求条款以确保可追溯性
- 检查点确保增量验证,避免问题累积