2025-11-30 07:19:05 +08:00
2025-11-19 05:05:11 +08:00
2025-11-30 07:19:05 +08:00
2025-11-30 07:19:05 +08:00
2025-11-18 02:28:47 +08:00
2025-11-18 02:28:47 +08:00
2025-11-30 07:19:05 +08:00
2025-11-30 07:19:05 +08:00
2025-11-30 07:19:05 +08:00
2025-11-30 07:19:05 +08:00
2025-11-30 07:19:05 +08:00
2025-11-18 02:28:47 +08:00
2025-11-18 02:28:47 +08:00
2025-11-30 07:19:05 +08:00

台球场 ETL 系统

用于台球门店业务的数据采集与入湖:从上游 API 拉取订单、支付、会员、库存等数据,先落地 ODS再清洗写入事实/维度表,并提供运行追踪、增量游标、数据质量检查与测试脚手架。

核心特性

  • 两阶段链路ODS 原始留痕 + DWD/事实表清洗,支持回放与重跑。
  • 任务注册与调度TaskRegistry 统一管理任务代码,ETLScheduler 负责游标、运行记录和失败隔离。
  • 统一底座:配置(默认值 + .env + CLI 覆盖)、分页/重试的 API 客户端、批量 Upsert 的数据库封装、SCD2 维度处理、质量检查。
  • 测试与回放ONLINE/OFFLINE 模式切换,run_tests.py/test_presets.py 支持参数化测试;MANUAL_INGEST 可将归档 JSON 重灌入 ODS。
  • 可安装setup.py / entry_point 提供 etl-billiards 命令,或直接 python -m cli.main 运行。

仓库结构(摘录)

  • etl_billiards/config:默认配置、环境变量解析、配置加载。
  • etl_billiards/apiHTTP 客户端,内置重试/分页。
  • etl_billiards/database:连接管理、批量 Upsert。
  • etl_billiards/tasks业务任务ORDERS、PAYMENTS…、ODS 任务、DWD 任务、人工回放;base_task.py/base_dwd_task.py 提供模板。
  • etl_billiards/loaders:事实/维度/ODS Loaderscd/ 为 SCD2。
  • etl_billiards/orchestration:调度器、任务注册表、游标与运行追踪。
  • etl_billiards/scripts:测试执行器、数据库连通性检测、预置测试指令。
  • etl_billiards/tests:单元/集成测试与离线 JSON 归档。

支持的任务代码

  • 事实/维度ORDERSPAYMENTSREFUNDSINVENTORY_CHANGECOUPON_USAGEMEMBERSASSISTANTSPRODUCTSTABLESPACKAGES_DEFTOPUPSTABLE_DISCOUNTASSISTANT_ABOLISHLEDGERTICKET_DWDPAYMENTS_DWDMEMBERS_DWD
  • ODS 原始采集ODS_ORDER_SETTLEODS_TABLE_USEODS_ASSISTANT_LEDGERODS_ASSISTANT_ABOLISHODS_GOODS_LEDGERODS_PAYMENTODS_REFUNDODS_COUPON_VERIFYODS_MEMBERODS_MEMBER_CARDODS_PACKAGEODS_INVENTORY_STOCKODS_INVENTORY_CHANGE
  • 辅助MANUAL_INGEST(将归档 JSON 回放到 ODS

快速开始

  1. 环境要求Python 3.10+、PostgreSQL。推荐在 etl_billiards/ 目录下执行命令。
  2. 安装依赖
    cd etl_billiards
    pip install -r requirements.txt
    # 开发模式pip install -e .
    
  3. 配置 .env
    cp .env.example .env
    # 核心项
    PG_DSN=postgresql://user:pwd@host:5432/LLZQ
    API_BASE=https://api.example.com
    API_TOKEN=your_token
    STORE_ID=2790685415443269
    EXPORT_ROOT=/path/to/export
    LOG_ROOT=/path/to/logs
    
    配置的生效顺序为 “默认值” < “环境变量/.env” < “CLI 参数”。
  4. 运行任务
    # 运行默认任务集
    python -m cli.main
    
    # 按需选择任务(逗号分隔)
    python -m cli.main --tasks ODS_ORDER_SETTLE,ORDERS,PAYMENTS
    
    # Dry-run 示例(不提交事务)
    python -m cli.main --tasks ORDERS --dry-run
    
    # Windows 批处理
    ..\\run_etl.bat --tasks PAYMENTS
    
  5. 查看输出:日志目录与导出目录分别由 LOG_ROOTEXPORT_ROOT 控制;运行追踪与游标记录写入数据库 etl_admin.* 表。

数据与运行流转

  • CLI 解析参数 → AppConfig.load() 组装配置 → ETLScheduler 创建 DB/API/游标/运行追踪器。
  • 调度器按任务代码实例化任务,读取/推进游标,落盘运行记录。
  • 任务模板:确定时间窗口 → 调用 API/ODS 数据 → 解析校验 → Loader 批量 Upsert/SCD2 → 质量检查 → 提交事务并回写游标。

测试与回放

  • 单元/集成测试:pytestpython scripts/run_tests.py --suite online
  • 预置组合:python scripts/run_tests.py --preset offline_realdb(见 scripts/test_presets.py)。
  • 离线模式:TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=... pytest tests/unit/test_etl_tasks_offline.py
  • 数据库连通性:python scripts/test_db_connection.py --dsn postgresql://... --query "SELECT 1"

其他提示

  • .env.example 列出了所有常用配置;config/defaults.py 记录默认值与任务窗口配置。
  • loaders/ods/generic.py 支持定义主键/列名即可落 ODStasks/manual_ingest_task.py 可将归档 JSON 快速灌入对应 ODS 表。
  • 需要新增任务时,在 tasks/ 中实现并在 orchestration/task_registry.py 注册即可复用调度能力。
Description
从非球 接口获取数据,处理到入库
Readme 1.3 MiB
Languages
Python 99.9%