2026-01-27 22:47:24 +08:00
2026-01-27 22:47:05 +08:00
2026-01-27 22:47:05 +08:00
2026-01-27 22:47:05 +08:00
2026-01-27 22:47:05 +08:00
2025-11-18 02:28:47 +08:00
2026-01-27 22:47:05 +08:00
2026-01-27 22:47:05 +08:00
2025-11-18 02:28:47 +08:00
2025-11-18 02:28:47 +08:00
2026-01-27 22:47:05 +08:00

飞球 ETL 系统ODS → DWD

面向门店业务的 ETL从上游 API 或离线 JSON 采集订单、支付、会员、库存等数据,先落地 ODS,再清洗装载 DWD(含 SCD2 维度、事实增量),并输出质量校验报表。

快速运行(离线示例 JSON

以下命令默认在 etl_billiards/ 目录执行(项目会从 etl_billiards/.env 读取配置;也可直接设置环境变量)。

  1. 环境Python 3.10+、PostgreSQL。
  2. 配置:编辑 etl_billiards/.env(或设环境变量),至少包含:
    STORE_ID=123
    PG_DSN=postgresql://<user>:<password>@<host>:<port>/<db>
    # 示例:使用仓库内置的最小样例(仅 1 个 JSON
    INGEST_SOURCE_DIR=../tmp/single_ingest
    
  3. 安装依赖:
    cd etl_billiards
    pip install -r requirements.txt
    
  4. 回放入库ODS→ 装载 DWD → 质检(可用 --ingest-source 覆盖 INGEST_SOURCE_DIR
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks MANUAL_INGEST --ingest-source "../tmp/single_ingest"
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_QUALITY_CHECK
    # 报表reports/dwd_quality_report.json
    

可按需单独运行:

  • 仅建表:python -m cli.main --tasks INIT_ODS_SCHEMA
  • 仅 ODS 灌入:python -m cli.main --tasks MANUAL_INGEST
  • 仅 DWD 装载:python -m cli.main --tasks INIT_DWD_SCHEMA,DWD_LOAD_FROM_ODS

Windows可用 etl_billiards/run_ods.bat 一键执行 ODS 建表 + 灌入示例 JSONINIT_ODS_SCHEMA + MANUAL_INGEST)。

正式环境(在线抓取 → 更新 ODS → 更新 DWD

核心入口 CLI推荐在 etl_billiards/ 目录执行)

  • python -m cli.main

必备配置(建议通过环境变量或 .env

  • 数据库:PG_DSNSTORE_ID
  • 在线抓取:API_TOKEN(可选 API_BASEAPI_TIMEOUTAPI_PAGE_SIZEAPI_RETRY_MAX
  • 输出目录(可选):EXPORT_ROOTLOG_ROOTFETCH_ROOT/JSON_FETCH_ROOT

安全提示:建议将数据库/Token 等凭证保存在 .env 或受控秘钥管理中,生产环境使用最小权限账号。

推荐定时方式 A两段定时更清晰

  1. 更新 ODS在线抓取 + 入库FULL
    python -m cli.main \
      --pipeline-flow FULL \
      --tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER \
      --pg-dsn "$PG_DSN" --store-id "$STORE_ID" \
      --api-token "$API_TOKEN"
    
  2. ODS → DWD将新增/变更同步到 DWD
    python -m cli.main \
      --pipeline-flow INGEST_ONLY \
      --tasks DWD_LOAD_FROM_ODS \
      --pg-dsn "$PG_DSN" --store-id "$STORE_ID"
    

推荐定时方式 B一条命令串起来

同一条命令先跑在线抓取/入库任务,再跑 DWD 装载任务:

python -m cli.main \
  --pipeline-flow FULL \
  --tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER,DWD_LOAD_FROM_ODS \
  --pg-dsn "$PG_DSN" --store-id "$STORE_ID" \
  --api-token "$API_TOKEN"

pipeline-flow 说明

  • FULL:在线抓取落盘 + 本地清洗入库ODS 任务会走抓取;DWD_LOAD_FROM_ODS 仅走入库阶段)
  • FETCH_ONLY:仅在线抓取落盘,不入库
  • INGEST_ONLY:仅从本地 JSON 回放入库(适合离线回放/补跑)

目录结构与关键文件

  • 仓库根目录:etl_billiards/ 主代码;app/ 示例 runner开发笔记/ 项目笔记;tmp/ 草稿/调试归档;requirements.txt(仓库根)依赖;run_etl.sh/.bat 启动脚本。
    • 注意:根目录的 run_etl.sh/.bat 运行时要求当前目录为 etl_billiards/(因为入口是 python -m cli.main)。
  • etl_billiards/(主代码目录)
    • .env:本地配置文件(可选,用环境变量也可)
    • cli/CLI 入口(cli/main.py
    • config/defaults.py 默认值;env_parser.py 解析 .env/环境变量;settings.py AppConfig 加载/校验
    • api/client.py HTTP 请求、重试、分页与落盘
    • database/connection.py 连接封装;operations.py 批量 upsertDDLschema_ODS_doc.sqlschema_dwd_doc.sql
    • tasks/业务任务ODS 抓取/回放、DWD 装载、质检等)
    • loaders/ODS/DWD/SCD2 Loader 实现
    • scd/scd2_handler.py(维度 SCD2 历史)
    • quality/:质量检查器(行数/金额对照)
    • orchestration/scheduler.py 调度;task_registry.py 注册;cursor_manager.py 水位管理;run_tracker.py 运行记录
    • scripts/:重建/测试/探活工具
    • docs/:映射/样例/质检说明文档
    • fetch-test/:接口联调/规则验证的一次性脚本与报告(不影响主流程)
    • reports/:质检输出(如 dwd_quality_report.json
    • tests/:单元/集成测试

项目文件索引(维护/AI 快速定位)

说明:用于维护/AI 快速定位文件路径与用途;默认不列出 .git/__pycache__/.pytest_cache/*.pyc 等自动生成内容。

/

  • .gitignoreGit 忽略规则。
  • .gitkeep:占位文件(用于保留空目录)。
  • README.md:项目总览与使用说明(本文档)。
  • requirements.txt:仓库根依赖清单(不含版本约束,建议优先用 etl_billiards/requirements.txt)。
  • run_etl.batWindows 启动脚本(需先 cd etl_billiards;入口为 python -m cli.main)。
  • run_etl.shLinux/macOS 启动脚本(需先 cd etl_billiards;会加载当前目录 .env)。

app/

etl_billiards/

  • etl_billiards/.env:本地运行环境变量(含敏感信息,勿提交/勿外传)。
  • etl_billiards/__init__.pyPython 包标记文件。
  • etl_billiards/ods_row_report.jsonODS 行数对照报告source json vs ODS 表)。
  • etl_billiards/requirements.txtETL 运行依赖(带最低版本约束)。
  • etl_billiards/run_ods.batWindows 一键脚本:重建 ODS 并灌入示例 JSON。
  • etl_billiards/setup.py:打包/安装脚本(当前项目主要按“cd etl_billiards; python -m cli.main”方式运行)。

etl_billiards/api/

  • etl_billiards/api/__init__.pyPython 包标记文件。
  • etl_billiards/api/client.pyAPI客户端统一封装 POST/重试/分页与列表提取逻辑。
  • etl_billiards/api/endpoint_routing.py:“近期记录 / 历史记录(Former)”接口路由规则。
  • etl_billiards/api/local_json_client.py:本地 JSON 客户端,模拟 APIClient 的分页接口,从落盘的 JSON 回放数据。
  • etl_billiards/api/recording_client.py:包装 APIClient将分页响应落盘便于后续本地清洗。

etl_billiards/cli/

  • etl_billiards/cli/__init__.pyPython 包标记文件。
  • etl_billiards/cli/main.pyCLI主入口

etl_billiards/config/

  • etl_billiards/config/__init__.pyPython 包标记文件。
  • etl_billiards/config/defaults.py:配置默认值定义
  • etl_billiards/config/env_parser.py:环境变量解析
  • etl_billiards/config/settings.py:配置管理主类

etl_billiards/database/

  • etl_billiards/database/__init__.pyPython 包标记文件。
  • etl_billiards/database/base.py数据库操作批量、RETURNING支持
  • etl_billiards/database/connection.pyDatabase connection manager with capped connect_timeout.
  • etl_billiards/database/operations.py:数据库批量操作
  • etl_billiards/database/schema_dwd_doc.sqlDWD Schema DDL含字段/注释/口径说明)。
  • etl_billiards/database/schema_etl_admin.sqletl_admin 元数据 Schema DDL任务/水位/运行记录等)。
  • etl_billiards/database/schema_ODS_doc.sqlODS Schema DDL含字段/注释/口径说明)。
  • etl_billiards/database/seed_ods_tasks.sqlSQL 种子脚本:初始化/注册 ODS 任务。
  • etl_billiards/database/seed_scheduler_tasks.sqlSQL 种子脚本:初始化调度任务配置。

etl_billiards/database/Deleded & backup/

  • (本目录无直接文件)

etl_billiards/docs/

  • etl_billiards/docs/dwd_main_tables_dictionary.mdDWD 主表(非 Ex表格说明书
  • etl_billiards/docs/在线抓取更新ODS 然后将更新的ODS内容对应到DWD的更新。.md在线抓取更新ODS 然后将更新的ODS内容对应到DWD的更新。

etl_billiards/fetch-test/

  • etl_billiards/fetch-test/compare_recent_former_endpoints.py:对比“近期记录”与“历史记录(Former)”接口:
  • etl_billiards/fetch-test/README.mdfetch-test
  • etl_billiards/fetch-test/recent_vs_former_report.json:报告/比对输出JSON
  • etl_billiards/fetch-test/recent_vs_former_report.md:近期记录 vs 历史记录(Former) 接口对比报告

etl_billiards/loaders/

  • etl_billiards/loaders/__init__.pyPython 包标记文件。
  • etl_billiards/loaders/base_loader.py:数据加载器基类

etl_billiards/loaders/dimensions/

  • etl_billiards/loaders/dimensions/__init__.pyPython 包标记文件。
  • etl_billiards/loaders/dimensions/assistant.py:助教维度加载器
  • etl_billiards/loaders/dimensions/member.py:会员维度表加载器
  • etl_billiards/loaders/dimensions/package.py:团购/套餐定义加载器
  • etl_billiards/loaders/dimensions/product.py:商品维度 + 价格SCD2 加载器
  • etl_billiards/loaders/dimensions/table.py:台桌维度加载器

etl_billiards/loaders/facts/

  • etl_billiards/loaders/facts/__init__.pyPython 包标记文件。
  • etl_billiards/loaders/facts/assistant_abolish.py:助教作废事实表
  • etl_billiards/loaders/facts/assistant_ledger.py:助教流水事实表
  • etl_billiards/loaders/facts/coupon_usage.py:券核销事实表
  • etl_billiards/loaders/facts/inventory_change.py:库存变动事实表
  • etl_billiards/loaders/facts/order.py:订单事实表加载器
  • etl_billiards/loaders/facts/payment.py:支付事实表加载器
  • etl_billiards/loaders/facts/refund.py:退款事实表加载器
  • etl_billiards/loaders/facts/table_discount.py:台费打折事实表
  • etl_billiards/loaders/facts/ticket.py:小票详情加载器
  • etl_billiards/loaders/facts/topup.py:充值记录事实表

etl_billiards/loaders/ods/

  • etl_billiards/loaders/ods/__init__.pyPython 包标记文件。
  • etl_billiards/loaders/ods/generic.pyGeneric ODS loader that keeps raw payload + primary keys.

etl_billiards/models/

  • etl_billiards/models/__init__.pyPython 包标记文件。
  • etl_billiards/models/parsers.py:数据类型解析器
  • etl_billiards/models/validators.py:数据验证器

etl_billiards/orchestration/

  • etl_billiards/orchestration/__init__.pyPython 包标记文件。
  • etl_billiards/orchestration/cursor_manager.py:游标管理器
  • etl_billiards/orchestration/run_tracker.py:运行记录追踪器
  • etl_billiards/orchestration/scheduler.pyETL 调度:支持在线抓取、离线清洗入库、全流程三种模式。
  • etl_billiards/orchestration/task_registry.py:任务注册表

etl_billiards/quality/

  • etl_billiards/quality/__init__.pyPython 包标记文件。
  • etl_billiards/quality/balance_checker.py:余额一致性检查器
  • etl_billiards/quality/base_checker.py:数据质量检查器基类

etl_billiards/reports/

  • etl_billiards/reports/dwd_quality_report.jsonDWD 质量核对输出(行数/金额对照)。

etl_billiards/scd/

  • etl_billiards/scd/__init__.pyPython 包标记文件。
  • etl_billiards/scd/scd2_handler.pySCD2 (Slowly Changing Dimension Type 2) 处理逻辑

etl_billiards/scripts/

  • etl_billiards/scripts/bootstrap_schema.pyApply the PRD-aligned warehouse schema (ODS/DWD/DWS) to PostgreSQL.
  • etl_billiards/scripts/build_dwd_from_ods.pyPopulate PRD DWD tables from ODS payload snapshots.
  • etl_billiards/scripts/build_dws_order_summary.pyRecompute billiards_dws.dws_order_summary from DWD fact tables.
  • etl_billiards/scripts/check_ods_json_vs_table.pyODS JSON 字段核对脚本:对照当前数据库中的 ODS 表字段,检查示例 JSON默认目录 export/test-json-doc
  • etl_billiards/scripts/check_ods_gaps.pyODS 缺失校验脚本API 主键 vs ODS 主键逐条比对,输出缺失明细样例。
  • etl_billiards/scripts/reload_ods_windowed.pyODS 窗口化补跑脚本:按时间切片重跑 ODS 任务,并可配置窗口粒度与延时。
  • etl_billiards/scripts/rebuild_db_and_run_ods_to_dwd.py:一键重建 ETL 相关 Schema并执行 ODS → DWD。
  • etl_billiards/scripts/rebuild_ods_from_json.py:从本地 JSON 示例目录重建 billiards_ods.* 表,并导入样例数据。
  • etl_billiards/scripts/run_tests.py:灵活的测试执行脚本,可像搭积木一样组合不同参数或预置命令(模式/数据库/归档路径等),
  • etl_billiards/scripts/test_db_connection.pyQuick utility for validating PostgreSQL connectivity (ASCII-only output).
  • etl_billiards/scripts/test_presets.py:测试命令仓库:集中维护 run_tests.py 的常用组合,支持一键执行。

etl_billiards/scripts/Deleded & backup/

  • (本目录无直接文件)

etl_billiards/tasks/

  • etl_billiards/tasks/__init__.pyPython 包标记文件。
  • etl_billiards/tasks/assistant_abolish_task.py:助教作废任务
  • etl_billiards/tasks/assistants_task.py:助教账号任务
  • etl_billiards/tasks/base_dwd_task.pyDWD任务基类
  • etl_billiards/tasks/base_task.pyETL任务基类引入 Extract/Transform/Load 模板方法)
  • etl_billiards/tasks/coupon_usage_task.py:平台券核销任务
  • etl_billiards/tasks/dwd_load_task.pyDWD 装载任务:从 ODS 增量写入 DWD维度 SCD2事实按时间增量
  • etl_billiards/tasks/dwd_quality_task.pyDWD 质量核对任务:按 dwd_quality_check.md 输出行数/金额对照报表。
  • etl_billiards/tasks/init_dwd_schema_task.py:初始化 DWD Schema执行 schema_dwd_doc.sql可选先 DROP SCHEMA。
  • etl_billiards/tasks/init_schema_task.py:任务:初始化运行环境,执行 ODS 与 etl_admin 的 DDL并准备日志/导出目录。
  • etl_billiards/tasks/inventory_change_task.py:库存变更任务
  • etl_billiards/tasks/ledger_task.py:助教流水任务
  • etl_billiards/tasks/manual_ingest_task.py:手工示例数据灌入:按 schema_ODS_doc.sql 的表结构写入 ODS。
  • etl_billiards/tasks/members_dwd_task.pyDWD TaskProcess Member Records from ODS to Dimension Table.
  • etl_billiards/tasks/members_task.py会员ETL任务
  • etl_billiards/tasks/ods_json_archive_task.py:在线抓取 ODS 相关接口并落盘为 JSON用于后续离线回放/入库)。
  • etl_billiards/tasks/ods_tasks.pyODS ingestion tasks.
  • etl_billiards/tasks/orders_task.py订单ETL任务
  • etl_billiards/tasks/packages_task.py:团购/套餐定义任务
  • etl_billiards/tasks/payments_dwd_task.pyDWD TaskProcess Payment Records from ODS to Fact Table.
  • etl_billiards/tasks/payments_task.py支付记录ETL任务
  • etl_billiards/tasks/products_task.py商品档案PRODUCTSETL任务
  • etl_billiards/tasks/refunds_task.py:退款记录任务
  • etl_billiards/tasks/table_discount_task.py:台费折扣任务
  • etl_billiards/tasks/tables_task.py:台桌档案任务
  • etl_billiards/tasks/ticket_dwd_task.pyDWD TaskProcess Ticket Details from ODS to DWD fact tables.
  • etl_billiards/tasks/topups_task.py:充值记录任务

etl_billiards/tasks/dwd/

  • (本目录无直接文件)

etl_billiards/tests/

  • etl_billiards/tests/__init__.pyPython 包标记文件。

etl_billiards/tests/integration/

  • etl_billiards/tests/integration/__init__.pyPython 包标记文件。
  • etl_billiards/tests/integration/test_database.py:数据库集成测试

etl_billiards/tests/unit/

  • etl_billiards/tests/unit/__init__.pyPython 包标记文件。
  • etl_billiards/tests/unit/task_test_utils.pyETL 任务测试的共用辅助模块,涵盖在线/离线模式所需的伪造数据、客户端与配置等工具函数。
  • etl_billiards/tests/unit/test_config.py:配置管理测试
  • etl_billiards/tests/unit/test_endpoint_routing.pyUnit tests for recent/former endpoint routing.
  • etl_billiards/tests/unit/test_etl_tasks_offline.py:离线模式任务测试,通过回放归档 JSON 来验证 T+L 链路可用。
  • etl_billiards/tests/unit/test_etl_tasks_online.py:在线模式下的端到端任务测试,验证所有任务在模拟 API 下能顺利执行。
  • etl_billiards/tests/unit/test_etl_tasks_stages.py:验证 14 个任务的 E/T/L 分阶段调用FakeDB/FakeAPI不访问真实接口或数据库
  • etl_billiards/tests/unit/test_ods_tasks.pyUnit tests for the new ODS ingestion tasks.
  • etl_billiards/tests/unit/test_parsers.py:解析器测试
  • etl_billiards/tests/unit/test_reporting.py:汇总与报告工具的单测。

etl_billiards/utils/

  • etl_billiards/utils/__init__.pyPython 包标记文件。
  • etl_billiards/utils/helpers.py:通用工具函数
  • etl_billiards/utils/json_store.pyJSON 归档/读取的通用工具。
  • etl_billiards/utils/reporting.py:简单的任务结果汇总与格式化工具。

tmp/

  • tmp/20251121-task.txt:历史任务/计划记录(可能存在编码问题)。
  • tmp/doc_extracted.txt:从 DWD 文档抽取的正文(大文本)。
  • tmp/doc_lines.txtDWD 文档按行抽取/对照(文本)。
  • tmp/dwd_tables.jsonDWD 表清单JSON
  • tmp/dwd_tables_full.jsonDWD 表清单(完整版 JSON
  • tmp/hebing.py:临时脚本:按“同名 key”合并目录内 md+json 输出 merged_output.txt。
  • tmp/README_FULL.md:历史/草稿README 详细版(已合并进根 README
  • tmp/rebuild_run_20251214-042115.log:运行日志/调试输出(临时文件)。
  • tmp/rewrite_schema_dwd_doc_comments.py:临时脚本:批量重写 DWD DDL 注释(归档/草稿)。
  • tmp/rewrite_schema_ods_doc_comments.py:临时脚本:批量重写 ODS DDL 注释(归档/草稿)。
  • tmp/schema_dwd.sqlDWD schema 草稿/导出(归档)。
  • tmp/schema_dwd_doc.sqlDWD schema doc 版本(归档)。
  • tmp/schema_ODS_doc copy.sqlODS schema doc 备份(归档)。
  • tmp/schema_ODS_doc.sqlODS schema doc 版本(归档)。
  • tmp/temp_chinese.txt:编码/文本对照测试。
  • tmp/tmp_debug_sql.py:临时脚本:调试 SQL/映射(归档)。
  • tmp/tmp_drop_dwd.py临时脚本DROP SCHEMA billiards_dwd危险勿在生产执行
  • tmp/tmp_dwd_tasks.py:临时脚本:调试 DWD 相关任务(归档)。
  • tmp/tmp_problems.py:临时脚本:问题排查记录/复现(归档)。
  • tmp/tmp_run_sql.py:临时脚本:拼接/执行一条 INSERT...SELECT 验证映射(需 PG_DSN
  • tmp/非球接口API.md:上游接口笔记/汇总(草稿/归档)。

tmp/a/

  • (本目录无直接文件)

tmp/b/

  • (本目录无直接文件)

tmp/etl_billiards_misc/

  • tmp/etl_billiards_misc/0.pySimple PostgreSQL connectivity smoke-checker.
  • tmp/etl_billiards_misc/feiqiu-ETL.code-workspaceVS Code workspace 文件(归档)。
  • tmp/etl_billiards_misc/草稿.txt:草稿/说明(归档)。

tmp/etl_billiards_misc/backups/

  • tmp/etl_billiards_misc/backups/manual_ingest_task.py:历史版本备份(归档)。
  • tmp/etl_billiards_misc/backups/manual_ingest_task.py.bak_20251209:历史版本备份(归档)。
  • tmp/etl_billiards_misc/backups/schema_ODS_doc.sql:历史版本备份(归档)。
  • tmp/etl_billiards_misc/backups/schema_ODS_doc.sql.bak_20251209:历史版本备份(归档)。

tmp/etl_billiards_misc/tmp & Delete/

  • tmp/etl_billiards_misc/tmp & Delete/.env.example:旧示例配置(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/dwd_schema_columns.txtDWD 字段提取/对照文本(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/DWD层设计建议.docxDWD 设计建议文档(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/DWD层设计草稿.mdDWD 设计草稿(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/schema_dwd_doc.sql.bakschema 备份(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/schema_ODS_doc.sql.bakschema 备份(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/schema_ODS_doc.sql.rewrite2.bakschema 重写过程备份(归档)。
  • tmp/etl_billiards_misc/tmp & Delete/schema_v2.sqlschema v2 草稿(归档)。

tmp/recharge_only/

  • tmp/recharge_only/recharge_settlements.json:离线样例 JSON仅充值结算

tmp/single_ingest/

  • tmp/single_ingest/goods_stock_movements.json:离线最小样例 JSON单文件

开发笔记/

  • 开发笔记/记录.md:开发/迁移过程的备忘与待办(归档)。

架构与流程

执行链路(控制流):

  1. CLIcli/main.py)解析参数 → 生成 AppConfig → 初始化日志/DB/API
  2. 调度层(orchestration/scheduler.py)按 task_registry.py 实例化任务,设置 run_uuid、cursor水位、上下文
  3. 任务执行模板:获取时间窗口/水位(cursor_manager.py)→ ExtractAPI 分页/重试或离线读 JSON→ Transform解析/校验)→ LoadLoader 批量 upsert/SCD2/增量写入,底层 database/operations.py)→(可选)质量检查 → 更新水位与运行记录(run_tracker.py),提交/回滚事务。

数据流与依赖:

  • 配置:config/defaults.py + .env/环境变量 + CLI 参数叠加
  • 在线:api/client.py 支撑分页/重试;可落盘 JSONpipeline.fetch_root
  • 离线:manual_ingest_task.pyINGEST_SOURCE_DIR 回放入库
  • DWDdwd_load_task.py 依据 TABLE_MAP/FACT_MAPPINGS 映射装载,维度走 SCD2事实走增量
  • 质检:dwd_quality_task.py 输出 reports/dwd_quality_report.json

ODS → DWD 策略与建模要点

  1. ODS 留底保留源主键、payload、时间/来源信息,便于回溯。
  2. DWD 清洗:维度走 SCD2事实按时间/水位增量;字段类型、单位、枚举标准化,同时保留溯源字段。
  3. 颗粒一致:一张 DWD 表只承载一种业务事件/颗粒,避免混颗粒。
  4. 业务键统一site_id、member_id、table_id、order_settle_id、order_trade_no 等统一命名。
  5. 不过度汇总DWD 只做明细/轻度清洗,聚合留到 DWS/报表。
  6. 去嵌套:数组展开为子表/子行,重复 profile 提炼为维度。
  7. 长期演进:优先加列/加表,减少对已有表结构的破坏。

常用 CLI

cd etl_billiards

# 运行 defaults.py 中的默认任务列表(在线 FULL 流程)
python -m cli.main --pg-dsn "$PG_DSN" --store-id "$STORE_ID" --api-token "$API_TOKEN"

# 运行指定任务
python -m cli.main --tasks INIT_ODS_SCHEMA,MANUAL_INGEST --pipeline-flow INGEST_ONLY --ingest-source "../tmp/single_ingest"

# 覆盖 DSN / API / 输出目录
python -m cli.main --pg-dsn "postgresql://user:pwd@host:5432/db" --store-id 123 --api-token "..." --fetch-root "./json_fetch"

# 试运行(不写库)
python -m cli.main --dry-run --tasks DWD_LOAD_FROM_ODS

窗口切分与补偿

用于 ETL 任务、ODS 缺失校验、数据一致性检查等“带时间窗口”的执行场景。逻辑如下:

  • 仅当传入窗口参数(如 CLI --window-start/--window-end 或脚本 --start/--end)时启用切分。
  • 先对整体窗口前后补偿 N 小时,再按月切分(month 为最大单位)。不需要切分时设为 none
  • 分段窗口将依次执行并汇总结果。

配置项(默认值见 config/defaults.py

  • run.window_split.unitmonth / none(默认 month
  • run.window_split.compensation_hours:整数小时(默认 2

环境变量:

  • WINDOW_SPLIT_UNIT
  • WINDOW_COMPENSATION_HOURS

CLI 参数(覆盖配置):

  • python -m cli.main--window-split-unit--window-compensation-hours
  • scripts/check_ods_gaps.py--window-split-unit--window-compensation-hours
  • scripts/check_data_integrity.py--window-split-unit--window-compensation-hours
  • scripts/reload_ods_windowed.py--window-split-unit--window-compensation-hours

示例(unit=monthcompensation_hours=2

  • 传入窗口:2025/11/10 10:00 - 2026/1/19 10:15
  • 实际处理窗口切分:
    • 2025/11/10 08:00 - 2025/12/01 00:00
    • 2025/12/01 00:00 - 2026/01/01 00:00
    • 2026/01/01 00:00 - 2026/01/19 12:15

测试

说明:仓库未固定 pytest 版本(运行测试需自行安装 pytest)。

cd etl_billiards
pip install pytest

# 单元测试(模拟 API + FakeDB
pytest tests/unit

# 集成测试(需要设置 TEST_DB_DSN
TEST_DB_DSN="postgresql://user:pwd@host:5432/db" pytest tests/integration/test_database.py

# 便捷测试执行器(可选)
python scripts/run_tests.py --suite online -k ORDERS
python scripts/test_db_connection.py --dsn "postgresql://user:pwd@host:5432/db" --query "SELECT 1"

开发与扩展

  • 新任务:在 tasks/ 继承 BaseTask实现 get_task_code/execute,并在 orchestration/task_registry.py 注册。
  • 新 Loader/Checker参考 loaders/quality/,复用批量 upsert/质检接口。
  • 新配置项:在 config/defaults.py 增加默认值,并在 config/env_parser.py 增加环境变量映射(如需要)。

ODS 任务上线指引

  • 元数据/任务注册脚本:
    • etl_billiards/database/seed_ods_tasks.sql
    • etl_billiards/database/seed_scheduler_tasks.sql
  • 确认 etl_admin.etl_task 中已启用所需任务(不同环境需替换 store_id / schema
  • 离线回放/重建 ODS开发/运维):
    cd etl_billiards
    python scripts/rebuild_ods_from_json.py --dsn "$PG_DSN" --json-dir "export/test-json-doc"
    

ODS 表概览(数据路径)

ODS 表名 接口 Path 数据列表路径
assistant_accounts_master /PersonnelManagement/SearchAssistantInfo data.assistantInfos
assistant_service_records /AssistantPerformance/GetOrderAssistantDetails data.orderAssistantDetails
assistant_cancellation_records /AssistantPerformance/GetAbolitionAssistant data.abolitionAssistants
goods_stock_movements /GoodsStockManage/QueryGoodsOutboundReceipt data.queryDeliveryRecordsList
goods_stock_summary /TenantGoods/GetGoodsStockReport data
group_buy_packages /PackageCoupon/QueryPackageCouponList data.packageCouponList
group_buy_redemption_records /Site/GetSiteTableUseDetails data.siteTableUseDetailsList
member_profiles /MemberProfile/GetTenantMemberList data.tenantMemberInfos
member_balance_changes /MemberProfile/GetMemberCardBalanceChange data.tenantMemberCardLogs
member_stored_value_cards /MemberProfile/GetTenantMemberCardList data.tenantMemberCards
payment_transactions /PayLog/GetPayLogListPage data
platform_coupon_redemption_records /Promotion/GetOfflineCouponConsumePageList data
recharge_settlements /Site/GetRechargeSettleList data.settleList
refund_transactions /Order/GetRefundPayLogList data
settlement_records /Site/GetAllOrderSettleList data.settleList
settlement_ticket_details /Order/GetOrderSettleTicketNew 完整 JSON
site_tables_master /Table/GetSiteTables data.siteTables
stock_goods_category_tree /TenantGoodsCategory/QueryPrimarySecondaryCategory data.goodsCategoryList
store_goods_master /TenantGoods/GetGoodsInventoryList data.orderGoodsList
store_goods_sales_records /TenantGoods/GetGoodsSalesList data.orderGoodsLedgers
table_fee_discount_records /Site/GetTaiFeeAdjustList data.taiFeeAdjustInfos
table_fee_transactions /Site/GetSiteTableOrderDetails data.siteTableUseDetailsList
tenant_goods_master /TenantGoods/QueryTenantGoods data.tenantGoodsList

完整字段级映射见 etl_billiards/docs/ 与 ODS/DWD DDL。

当前状态2025-12-09

  • 示例 JSON 已全量灌入DWD 行数与 ODS 对齐。
  • 分类维度已展平大类+子类:dim_goods_category 26 行category_level/leaf 已赋值)。
  • 部分空字段源数据即为空,如需补值请先确认上游。

可精简/归档

  • tmp/tmp/etl_billiards_misc/ 中草稿、旧备份、调试脚本仅供参考,不影响运行。
  • 根级保留必要文件README、requirements、run_etl.*),其余临时文件按需归档至 tmp/

一键更新(推荐)

日常需要把数据从 ODS 更新到最新,并同步刷新 DWD/DWS 时,直接运行一键脚本:

cd etl_billiards
python run_update.py

常用参数:

  • --overlap-seconds 3600:冗余抓取窗口(默认 3600 秒)
  • --dws-rebuild-days 1DWS 回算冗余天数(默认 1 天)
  • --dws-start YYYY-MM-DD --dws-end YYYY-MM-DD:手工指定 DWS 回算日期范围
  • --skip-ods:跳过 ODS 在线抓取(仅跑 DWD/DWS
  • --ods-tasks ODS_PAYMENT,ODS_TABLE_USE,...:只跑指定 ODS 任务
  • --check-ods-gaps:在 ODS 更新完成后执行缺失校验API 主键 vs ODS 主键)
  • --check-ods-overlap-hours 24:缺失校验时,从 ODS 最新截止时间回溯的小时数(默认 24
  • --check-ods-window-days 1:缺失校验 API 窗口粒度(默认 1 天)
  • --check-ods-page-size 200:缺失校验 API 每页大小(默认 200
  • --check-ods-timeout-sec 1800:缺失校验步骤超时秒数(默认 1800
  • --check-ods-task-codes ODS_PAYMENT,ODS_TABLE_USE,...:仅校验指定 ODS 任务

ODS 缺失校验API vs ODS

说明:

  • 校验口径为 ODS 表 MAX(fetched_at) 的最小值,视为“最新一致截止时间”。
  • --from-cutoff 会从该截止时间回溯 N 小时(默认 24 小时)到当前,便于日常增量校验。

全量校验(从 2025-07 至今):

cd etl_billiards
python scripts/check_ods_gaps.py --start 2025-07-01

更新时校验(从 ODS 最新截止时间回溯 24h

cd etl_billiards
python run_update.py --check-ods-gaps

FAQ

  • 字段空值:若映射已存在且源列非空仍为空,再检查上游 JSON维度 SCD2 按全量合并。
  • DSN/路径:确认 PG_DSNSTORE_IDINGEST_SOURCE_DIR 与本地一致。
  • 新增任务:在 tasks/ 实现并注册到 task_registry.py,必要时同步更新 DDL 与映射。
  • 权限/运行:检查网络、账号权限;脚本需执行权限(如 chmod +x run_etl.sh)。

Cutoff截止时间检查

当你需要“上次数据截止到什么时候”“现在应该从哪里开始补跑”时,使用任务 CHECK_CUTOFF

cd etl_billiards
python -m cli.main --pipeline-flow INGEST_ONLY --tasks CHECK_CUTOFF

它会输出:

  • etl_admin.etl_cursor:每个任务的 last_start/last_end/last_run_id(调度游标)
  • ODSDWD_LOAD_FROM_ODS 依赖的各个 billiards_ods.* 表做 MAX(fetched_at)(真实已入库 ODS 的截止)
  • DWD/DWS输出若干关键表的最大业务时间/最大更新时刻,便于快速核对

如果 etl_cursor.last_end 很新,但 ODS 的 MAX(fetched_at) 很旧,通常表示在线抓取没跑通(最常见是 API_TOKEN 过期导致 401

冗余抓取方案(推荐)

为避免边界时间丢数(上游延迟写入、接口分页抖动、窗口切换等),建议在 cutoff 基础上向前追加 1 小时 冗余量:

  • 配置:将 OVERLAP_SECONDS 设为 3600默认 120 秒)
# etl_billiards/.env
OVERLAP_SECONDS=3600

冗余方案的关键点是“重抓不重落”,依靠各层的去重/幂等机制只落新数据:

  • ODS 层:主键/冲突列 UPSERT重复抓取只会 upsert不会重复插入
  • DWD 层:事实表增量插入 + 主键冲突不重复落(重复范围会被跳过),维度表按 SCD2 合并
  • DWS 层:对指定日期窗口先 delete 再 upsert窗口内重算幂等

如果你希望“冗余窗口内的数据发生变更也要覆盖更新”,需要把对应层的冲突策略从 DO NOTHING 调整为 DO UPDATE(当前实现以“只落新数据”为主)。

DWS汇总层入库

本项目已包含 billiards_dws 汇总层(当前提供 dws_order_summary

  1. 初始化 DWS 表结构:
python -m cli.main --pipeline-flow INGEST_ONLY --tasks INIT_DWS_SCHEMA
  1. 生成/刷新汇总表(按窗口重算,建议配合 --window-start/--window-end
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWS_BUILD_ORDER_SUMMARY \
  --window-start "2025-10-01 00:00:00" \
  --window-end "2025-12-26 23:59:59"
  1. 推荐串联ODS -> DWD -> DWS
# 先跑在线 ODS 抓取(需要有效 API_TOKEN如果出现 401 请更新 token
python -m cli.main --pipeline-flow FULL --tasks ODS_MEMBER,ODS_PAYMENT,ODS_REFUND,ODS_SETTLEMENT_RECORDS

# 再把 ODS 增量同步到 DWD
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS

# 最后重算 DWS
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWS_BUILD_ORDER_SUMMARY

日志 (UTF-8)

  • 默认日志目录:etl_billiards/logs/

  • 每次运行都会生成一个带有时间戳的 .log 文件,以便于使用外部工具查看。 常用选项:

  • --log-file 自定义日志路径(覆盖默认值)。

  • --log-dir 自定义日志目录。

  • --log-level 日志级别(INFO/DEBUG)。

  • --no-log-console 禁用控制台日志记录(仅写入文件)。

示例(按月切分 + 前后补偿 2h

cd etl_billiards
python scripts/check_ods_gaps.py --start 2025-07-01 --end 2025-09-30 --window-split-unit month --window-compensation-hours 2 --task-codes ODS_PAYMENT --sleep-per-window-seconds 0.5
python scripts/reload_ods_windowed.py --tasks ODS_PAYMENT,ODS_TABLE_USE --start 2025-07-01 --end 2025-09-30 --window-split-unit month --window-compensation-hours 2 --sleep-seconds 1
python run_update.py --check-ods-gaps --check-ods-window-days 1 --check-ods-sleep-per-window-seconds 0.5
Description
从非球 接口获取数据,处理到入库
Readme 228 MiB
Languages
Python 99.8%
Batchfile 0.1%
PowerShell 0.1%