台球场 ETL 系统
用于台球门店业务的数据采集与入湖:从上游 API 拉取订单、支付、会员、库存等数据,先落地 ODS,再清洗写入事实/维度表,并提供运行追踪、增量游标、数据质量检查与测试脚手架。
核心特性
- 两阶段链路:ODS 原始留痕 + DWD/事实表清洗,支持回放与重跑。
- 任务注册与调度:
TaskRegistry统一管理任务代码,ETLScheduler负责游标、运行记录和失败隔离。 - 统一底座:配置(默认值 +
.env+ CLI 覆盖)、分页/重试的 API 客户端、批量 Upsert 的数据库封装、SCD2 维度处理、质量检查。 - 测试与回放:ONLINE/OFFLINE 模式切换,
run_tests.py/test_presets.py支持参数化测试;MANUAL_INGEST可将归档 JSON 重灌入 ODS。 - 可安装:
setup.py/entry_point提供etl-billiards命令,或直接python -m cli.main运行。
仓库结构(摘录)
etl_billiards/config:默认配置、环境变量解析、配置加载。etl_billiards/api:HTTP 客户端,内置重试/分页。etl_billiards/database:连接管理、批量 Upsert。etl_billiards/tasks:业务任务(ORDERS、PAYMENTS…)、ODS 任务、DWD 任务、人工回放;base_task.py/base_dwd_task.py提供模板。etl_billiards/loaders:事实/维度/ODS Loader;scd/为 SCD2。etl_billiards/orchestration:调度器、任务注册表、游标与运行追踪。etl_billiards/scripts:测试执行器、数据库连通性检测、预置测试指令。etl_billiards/tests:单元/集成测试与离线 JSON 归档。
支持的任务代码
- 事实/维度:
ORDERS、PAYMENTS、REFUNDS、INVENTORY_CHANGE、COUPON_USAGE、MEMBERS、ASSISTANTS、PRODUCTS、TABLES、PACKAGES_DEF、TOPUPS、TABLE_DISCOUNT、ASSISTANT_ABOLISH、LEDGER、TICKET_DWD、PAYMENTS_DWD、MEMBERS_DWD。 - ODS 原始采集:
ODS_ORDER_SETTLE、ODS_TABLE_USE、ODS_ASSISTANT_LEDGER、ODS_ASSISTANT_ABOLISH、ODS_GOODS_LEDGER、ODS_PAYMENT、ODS_REFUND、ODS_COUPON_VERIFY、ODS_MEMBER、ODS_MEMBER_CARD、ODS_PACKAGE、ODS_INVENTORY_STOCK、ODS_INVENTORY_CHANGE。 - 辅助:
MANUAL_INGEST(将归档 JSON 回放到 ODS)。
快速开始
- 环境要求:Python 3.10+、PostgreSQL。推荐在
etl_billiards/目录下执行命令。 - 安装依赖
cd etl_billiards pip install -r requirements.txt # 开发模式:pip install -e . - 配置
.env配置的生效顺序为 “默认值” < “环境变量/.env” < “CLI 参数”。cp .env.example .env # 核心项 PG_DSN=postgresql://user:pwd@host:5432/LLZQ API_BASE=https://api.example.com API_TOKEN=your_token STORE_ID=2790685415443269 EXPORT_ROOT=/path/to/export LOG_ROOT=/path/to/logs - 运行任务
# 运行默认任务集 python -m cli.main # 按需选择任务(逗号分隔) python -m cli.main --tasks ODS_ORDER_SETTLE,ORDERS,PAYMENTS # Dry-run 示例(不提交事务) python -m cli.main --tasks ORDERS --dry-run # Windows 批处理 ..\\run_etl.bat --tasks PAYMENTS - 查看输出:日志目录与导出目录分别由
LOG_ROOT、EXPORT_ROOT控制;运行追踪与游标记录写入数据库etl_admin.*表。
数据与运行流转
- CLI 解析参数 →
AppConfig.load()组装配置 →ETLScheduler创建 DB/API/游标/运行追踪器。 - 调度器按任务代码实例化任务,读取/推进游标,落盘运行记录。
- 任务模板:确定时间窗口 → 调用 API/ODS 数据 → 解析校验 → Loader 批量 Upsert/SCD2 → 质量检查 → 提交事务并回写游标。
测试与回放
- 单元/集成测试:
pytest或python scripts/run_tests.py --suite online。 - 预置组合:
python scripts/run_tests.py --preset offline_realdb(见scripts/test_presets.py)。 - 离线模式:
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=... pytest tests/unit/test_etl_tasks_offline.py。 - 数据库连通性:
python scripts/test_db_connection.py --dsn postgresql://... --query "SELECT 1"。
其他提示
.env.example列出了所有常用配置;config/defaults.py记录默认值与任务窗口配置。loaders/ods/generic.py支持定义主键/列名即可落 ODS;tasks/manual_ingest_task.py可将归档 JSON 快速灌入对应 ODS 表。- 需要新增任务时,在
tasks/中实现并在orchestration/task_registry.py注册即可复用调度能力。
Description
Languages
Python
99.9%