13 KiB
飞球 ETL 系统(ODS → DWD)合并版
本文在不改变现有用法的前提下,整合了两份历史说明的有效信息;所有冲突以当前仓库中的
README.md为准,补充了架构、测试、迁移、ODS 任务指引、表映射与 DWD 建模原则等内容。
1. 项目概述
面向门店业务的数据工程项目:从上游业务 API 或本地离线 JSON 拉取原始数据,先落地 ODS,再清洗装载 DWD(含 SCD2 维度历史与事实增量),并提供数据质量校验报表。项目采用模块化、分层架构(配置、API、数据库、加载器/SCD/质量、编排、任务、CLI、测试),统一通过 CLI 调度执行。
2. 快速运行(离线示例 JSON)
环境要求: Python 3.10+、PostgreSQL 可用;.env 中关键项:
PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-testINGEST_SOURCE_DIR=C:\dev\LLTQ\export\test-json-doc
安装依赖:
cd etl_billiards
pip install -r requirements.txt
一键 ODS→DWD→质量校验(离线回放):
# 初始化 ODS + DWD
python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA --pipeline-flow INGEST_ONLY
# 灌入示例 JSON 到 ODS(可用 .env 中的 INGEST_SOURCE_DIR 覆盖)
python -m etl_billiards.cli.main --tasks MANUAL_INGEST --pipeline-flow INGEST_ONLY --ingest-source "C:\dev\LLTQ\export\test-json-doc"
# 从 ODS 装载到 DWD
python -m etl_billiards.cli.main --tasks DWD_LOAD_FROM_ODS --pipeline-flow INGEST_ONLY
# 质量校验报表
python -m etl_billiards.cli.main --tasks DWD_QUALITY_CHECK --pipeline-flow INGEST_ONLY
# 报表输出:etl_billiards/reports/dwd_quality_report.json
可按需单独运行:
- 仅建表:
python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA- 仅 ODS 灌入:
python -m etl_billiards.cli.main --tasks MANUAL_INGEST- 仅 DWD 装载:
python -m etl_billiards.cli.main --tasks INIT_DWD_SCHEMA,DWD_LOAD_FROM_ODS
3. 配置与目录约定
- 示例数据目录:
C:\dev\LLTQ\export\test-json-doc(可通过.env的INGEST_SOURCE_DIR覆盖)。 - 日志/导出目录:
LOG_ROOT、EXPORT_ROOT见.env。 - 质量报表:
etl_billiards/reports/dwd_quality_report.json。 - ODS/DWD DDL:
etl_billiards/database/schema_ODS_doc.sql、etl_billiards/database/schema_dwd_doc.sql。 - 任务注册:
etl_billiards/orchestration/task_registry.py(默认启用 INIT_ODS_SCHEMA、MANUAL_INGEST、INIT_DWD_SCHEMA、DWD_LOAD_FROM_ODS、DWD_QUALITY_CHECK)。
安全提示: 建议将数据库口令仅保存在 .env 或密钥管理平台,避免在示例或脚本中硬编码;生产环境请使用具有最小权限的数据库账号。
4. 目录结构与主要文件(精要)
- 根目录:
etl_billiards/主代码;requirements.txt依赖;run_etl.sh/.bat启动脚本;.env/.env.example配置;tmp/放置草稿/备份。 config/:defaults.py默认值,env_parser.py解析.env,settings.py统一配置加载。api/:client.pyHTTP 请求、重试与分页。database/:connection.py连接封装,operations.py批量 upsert,DDL SQL。tasks/:业务任务(见第 9 章与第 10 章)。loaders/:ODS/DWD/SCD2 Loader 实现。scd/:scd2_handler.py处理维度 SCD2 历史。quality/:质量检查器(行数/金额对照等)。orchestration/:scheduler.py调度;task_registry.py注册;run_tracker.py运行记录;cursor_manager.py增量游标。docs/:映射与示例文档。reports/:质检输出(如dwd_quality_report.json)。tests/:单元/集成测试;utils/:通用工具。
5. 架构与流程
CLI (etl_billiards/cli/main.py)
↓
Orchestration (scheduler / task_registry / run_tracker / cursor_manager)
↓
Tasks (orders/payments/members/…)
↙ ↓ ↘
Loaders SCD2 Quality
↓
Models
↓
API
↓
Database
↓
Config
执行模板(任务层): 统一模板方法包含:读取增量窗口 → 拉取数据(支持分页/重试)→ 解析与校验 → Loader 写库(Upsert/SCD2)→ 质量检查 → 更新游标与运行记录。
6. ODS → DWD 双阶段策略
为支持回溯/重放与稳定建模,项目采用双阶段:
- ODS 落地:将原始记录(含分页/抓取时间/来源)按表落地到
billiards_ods.*,保留源主键与 payload 以保证可追溯。 - DWD 清洗:从 ODS 读取 payload,进行解析、标准化与 SCD2 处理,写入
billiards.*的维度/事实表;事实表走时间/水位增量。
7. 常用命令(CLI)
# 运行全部已注册任务(按 task_registry)
python -m etl_billiards.cli.main
# 仅运行指定任务
python -m etl_billiards.cli.main --tasks ORDERS,PAYMENTS,MEMBERS
# 覆盖数据库 DSN
python -m etl_billiards.cli.main --pg-dsn "postgresql://user:pwd@host:5432/db"
# 覆盖上游 API 端点
python -m etl_billiards.cli.main --api-base "https://api.example.com" --api-token "..."
# 试运行(不写库)
python -m etl_billiards.cli.main --dry-run --tasks ORDERS
实际启用的任务代码与管道以当前
task_registry.py与数据库任务配置为准。
8. 测试(ONLINE / OFFLINE)
TEST_MODE=ONLINE:模拟实时 API,完整 E/T/L。TEST_MODE=OFFLINE:从TEST_JSON_ARCHIVE_DIR指定的归档 JSON 读取,仅做 Transform + Load(适合回放)。TEST_DB_DSN:如设置,则集成测试连接真实测试库执行写库;未设置则用内存伪库。
示例:
# 在线模式覆盖任务
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
# 离线模式回放归档 JSON
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/source-data-doc pytest tests/unit/test_etl_tasks_offline.py
# 脚本化组合
python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb
python scripts/run_tests.py --preset offline_realdb
数据库连通性检查:
python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/db --query "SELECT 1"
9. 开发与扩展指南
添加新任务:
from tasks.base_task import BaseTask
class MyTask(BaseTask):
def get_task_code(self) -> str:
return "MY_TASK"
def execute(self) -> dict:
window_start, window_end, _ = self._get_time_window()
records, _ = self.api.get_paginated(...)
parsed = [self._parse(r) for r in records]
loader = MyLoader(self.db)
inserted, updated, _ = loader.upsert(parsed)
self.db.commit()
return self._build_result("SUCCESS", {"inserted": inserted, "updated": updated})
在 orchestration/task_registry.py 注册:
from tasks.my_task import MyTask
default_registry.register("MY_TASK", MyTask)
添加新 Loader / Checker: 参见 loaders/、quality/ 模块,优先复用批量 upsert 与返回写入统计的统一接口。
10. ODS 任务上线指引(摘要)
- 任务注册脚本:
etl_billiards/database/seed_ods_tasks.sql,将其中store_id替换为实际值后执行:
psql "$PG_DSN" -f etl_billiards/database/seed_ods_tasks.sql
- 调度:确认
etl_admin.etl_task中已启用所需 ODS 任务(任务代码见 seed 脚本)。 - 离线回灌:开发环境可用
rebuild_ods_from_json初始化 ODS(生产慎用)。 - 测试:
pytest etl_billiards/tests/unit/test_ods_tasks.py覆盖核心 ODS 任务。
11. ODS 表映射总览(节选)
| ODS 表名 | 接口 Path | 数据列表路径 |
|---|---|---|
assistant_accounts_master |
/PersonnelManagement/SearchAssistantInfo |
data.assistantInfos |
assistant_service_records |
/AssistantPerformance/GetOrderAssistantDetails |
data.orderAssistantDetails |
assistant_cancellation_records |
/AssistantPerformance/GetAbolitionAssistant |
data.abolitionAssistants |
goods_stock_movements |
/GoodsStockManage/QueryGoodsOutboundReceipt |
data.queryDeliveryRecordsList |
goods_stock_summary |
/TenantGoods/GetGoodsStockReport |
data |
group_buy_packages |
/PackageCoupon/QueryPackageCouponList |
data.packageCouponList |
group_buy_redemption_records |
/Site/GetSiteTableUseDetails |
data.siteTableUseDetailsList |
member_profiles |
/MemberProfile/GetTenantMemberList |
data.tenantMemberInfos |
member_balance_changes |
/MemberProfile/GetMemberCardBalanceChange |
data.tenantMemberCardLogs |
member_stored_value_cards |
/MemberProfile/GetTenantMemberCardList |
data.tenantMemberCards |
payment_transactions |
/PayLog/GetPayLogListPage |
data |
platform_coupon_redemption_records |
/Promotion/GetOfflineCouponConsumePageList |
data |
recharge_settlements |
/Site/GetRechargeSettleList |
data.settleList |
refund_transactions |
/Order/GetRefundPayLogList |
data |
settlement_records |
/Site/GetAllOrderSettleList |
data.settleList |
settlement_ticket_details |
/Order/GetOrderSettleTicketNew |
(整包原始 JSON) |
site_tables_master |
/Table/GetSiteTables |
data.siteTables |
stock_goods_category_tree |
/TenantGoodsCategory/QueryPrimarySecondaryCategory |
data.goodsCategoryList |
store_goods_master |
/TenantGoods/GetGoodsInventoryList |
data.orderGoodsList |
store_goods_sales_records |
/TenantGoods/GetGoodsSalesList |
data.orderGoodsLedgers |
table_fee_discount_records |
/Site/GetTaiFeeAdjustList |
data.taiFeeAdjustInfos |
table_fee_transactions |
/Site/GetSiteTableOrderDetails |
data.siteTableUseDetailsList |
tenant_goods_master |
/TenantGoods/QueryTenantGoods |
data.tenantGoodsList |
完整清单与字段级映射请结合
docs/下映射文档与各 ODS/DWD DDL 参照执行。
12. DWD 维度与建模原则(核心要点)
- 粒度唯一、原子:一张 DWD 表只承载一种业务事件粒度(结账、台费流水、助教服务、余额变动等),禁止混粒度与混汇总。
- 以业务过程建模:先梳理真实业务链路,再拆表/拆维度,不以 JSON 列表结构生搬硬套。
- 主键明确、外键统一:统一命名(如
site_id、member_id、table_id、order_settle_id、order_trade_no),避免后续主题层拼接歧义。 - 保留明细,不做过度汇总:DWD 仅存明细,聚合到 DWS 完成。
- 统一清洗且可追溯:类型/单位/枚举标准化,并保留源主键与原始字段确保可追溯。
- 扁平化、去嵌套:数组拆子表,重复 profile 抽维度,去除分页壳。
- 长期稳定、可扩展:优先加字段/增表/在 DWS 派生,避免频繁重构 DWD 表。
13. 当前状态(2025-12-09)
- 示例 JSON 已全量灌入;DWD 行数与 ODS 对齐。
- 分类维度已展平大类+子类:
dim_goods_category26 行(category_level/leaf已赋值)。 - 部分空值来自上游源数据为空;补值需先确认上游是否提供。
14. 可精简/归档
tmp/、tmp/etl_billiards_misc/、etl_billiards/tmp & Delete/下草稿/旧备份/一次性脚本不参与运行,可按需归档或清理。- 顶层仅保留必要文件(README、requirements、run_etl.*、.env/.env.example);其余临时文件已移至
tmp/。
15. 常见问题(FAQ)
- 字段空值:如映射存在且源列非空仍为空,先检查上游 JSON 是否缺值;维度 SCD2 按全量合并。
- DSN/目录缺失:确认
.env中PG_DSN、INGEST_SOURCE_DIR与本地一致。 - 新增任务:在
tasks/实现并注册到task_registry.py,必要时补充 DDL 与映射。 - 连接/权限:网络/账号/权限检查;脚本执行权限(如
chmod +x run_etl.sh)。
以上为合并后的单点说明文档。若需拆分为「快速开始 / 架构设计 / 迁移指南 / 开发扩展」等多份文档,可按章节直接拆分。