# 台球场 ETL 系统 用于台球门店业务的数据采集与入湖:从上游 API 拉取订单、支付、会员、库存等数据,先落地 ODS,再清洗写入事实/维度表,并提供运行追踪、增量游标、数据质量检查与测试脚手架。 ## 核心特性 - **两阶段链路**:ODS 原始留痕 + DWD/事实表清洗,支持回放与重跑。 - **任务注册与调度**:`TaskRegistry` 统一管理任务代码,`ETLScheduler` 负责游标、运行记录和失败隔离。 - **统一底座**:配置(默认值 + `.env` + CLI 覆盖)、分页/重试的 API 客户端、批量 Upsert 的数据库封装、SCD2 维度处理、质量检查。 - **测试与回放**:ONLINE/OFFLINE 模式切换,`run_tests.py`/`test_presets.py` 支持参数化测试;`MANUAL_INGEST` 可将归档 JSON 重灌入 ODS。 - **可安装**:`setup.py` / `entry_point` 提供 `etl-billiards` 命令,或直接 `python -m cli.main` 运行。 ## 仓库结构(摘录) - `etl_billiards/config`:默认配置、环境变量解析、配置加载。 - `etl_billiards/api`:HTTP 客户端,内置重试/分页。 - `etl_billiards/database`:连接管理、批量 Upsert。 - `etl_billiards/tasks`:业务任务(ORDERS、PAYMENTS…)、ODS 任务、DWD 任务、人工回放;`base_task.py`/`base_dwd_task.py` 提供模板。 - `etl_billiards/loaders`:事实/维度/ODS Loader;`scd/` 为 SCD2。 - `etl_billiards/orchestration`:调度器、任务注册表、游标与运行追踪。 - `etl_billiards/scripts`:测试执行器、数据库连通性检测、预置测试指令。 - `etl_billiards/tests`:单元/集成测试与离线 JSON 归档。 ## 支持的任务代码 - **事实/维度**:`ORDERS`、`PAYMENTS`、`REFUNDS`、`INVENTORY_CHANGE`、`COUPON_USAGE`、`MEMBERS`、`ASSISTANTS`、`PRODUCTS`、`TABLES`、`PACKAGES_DEF`、`TOPUPS`、`TABLE_DISCOUNT`、`ASSISTANT_ABOLISH`、`LEDGER`、`TICKET_DWD`、`PAYMENTS_DWD`、`MEMBERS_DWD`。 - **ODS 原始采集**:`ODS_ORDER_SETTLE`、`ODS_TABLE_USE`、`ODS_ASSISTANT_LEDGER`、`ODS_ASSISTANT_ABOLISH`、`ODS_GOODS_LEDGER`、`ODS_PAYMENT`、`ODS_REFUND`、`ODS_COUPON_VERIFY`、`ODS_MEMBER`、`ODS_MEMBER_CARD`、`ODS_PACKAGE`、`ODS_INVENTORY_STOCK`、`ODS_INVENTORY_CHANGE`。 - **辅助**:`MANUAL_INGEST`(将归档 JSON 回放到 ODS)。 ## 快速开始 1. **环境要求**:Python 3.10+、PostgreSQL。推荐在 `etl_billiards/` 目录下执行命令。 2. **安装依赖** ```bash cd etl_billiards pip install -r requirements.txt # 开发模式:pip install -e . ``` 3. **配置 `.env`** ```bash cp .env.example .env # 核心项 PG_DSN=postgresql://user:pwd@host:5432/LLZQ API_BASE=https://api.example.com API_TOKEN=your_token STORE_ID=2790685415443269 EXPORT_ROOT=/path/to/export LOG_ROOT=/path/to/logs ``` 配置的生效顺序为 “默认值” < “环境变量/.env” < “CLI 参数”。 4. **运行任务** ```bash # 运行默认任务集 python -m cli.main # 按需选择任务(逗号分隔) python -m cli.main --tasks ODS_ORDER_SETTLE,ORDERS,PAYMENTS # Dry-run 示例(不提交事务) python -m cli.main --tasks ORDERS --dry-run # Windows 批处理 ..\\run_etl.bat --tasks PAYMENTS ``` 5. **查看输出**:日志目录与导出目录分别由 `LOG_ROOT`、`EXPORT_ROOT` 控制;运行追踪与游标记录写入数据库 `etl_admin.*` 表。 ## 数据与运行流转 - CLI 解析参数 → `AppConfig.load()` 组装配置 → `ETLScheduler` 创建 DB/API/游标/运行追踪器。 - 调度器按任务代码实例化任务,读取/推进游标,落盘运行记录。 - 任务模板:确定时间窗口 → 调用 API/ODS 数据 → 解析校验 → Loader 批量 Upsert/SCD2 → 质量检查 → 提交事务并回写游标。 ## 测试与回放 - 单元/集成测试:`pytest` 或 `python scripts/run_tests.py --suite online`。 - 预置组合:`python scripts/run_tests.py --preset offline_realdb`(见 `scripts/test_presets.py`)。 - 离线模式:`TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=... pytest tests/unit/test_etl_tasks_offline.py`。 - 数据库连通性:`python scripts/test_db_connection.py --dsn postgresql://... --query "SELECT 1"`。 ## 其他提示 - `.env.example` 列出了所有常用配置;`config/defaults.py` 记录默认值与任务窗口配置。 - `loaders/ods/generic.py` 支持定义主键/列名即可落 ODS;`tasks/manual_ingest_task.py` 可将归档 JSON 快速灌入对应 ODS 表。 - 需要新增任务时,在 `tasks/` 中实现并在 `orchestration/task_registry.py` 注册即可复用调度能力。