Files
feiqiu-ETL/README_FULL.md
2025-12-09 05:43:04 +08:00

12 KiB
Raw Permalink Blame History

飞球 ETL 系统ODS → DWD— 详细版

本文为项目的详细说明,保持与当前代码一致,覆盖 ODS 任务、DWD 装载、质检及开发扩展要点。


1. 项目概览

面向门店业务的 ETL从上游 API 或离线 JSON 采集订单、支付、会员、库存等数据,先落地 ODS,再清洗装载 DWD(含 SCD2 维度、事实增量),并输出质量校验报表。项目采用模块化/分层架构配置、API、数据库、Loader/SCD、质量、调度、CLI、测试统一通过 CLI 调度。


2. 快速开始(离线示例 JSON

环境要求Python 3.10+PostgreSQL.env 关键项:

  • PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test
  • INGEST_SOURCE_DIR=C:\dev\LLTQ\export\test-json-doc

安装依赖

cd etl_billiards
pip install -r requirements.txt

一键 ODS → DWD → 质检(离线回放)

# 初始化 ODS + DWD
python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA --pipeline-flow INGEST_ONLY

# 灌入示例 JSON 到 ODS可用 .env 的 INGEST_SOURCE_DIR 覆盖)
python -m etl_billiards.cli.main --tasks MANUAL_INGEST --pipeline-flow INGEST_ONLY --ingest-source "C:\dev\LLTQ\export\test-json-doc"

# 从 ODS 装载 DWD
python -m etl_billiards.cli.main --tasks DWD_LOAD_FROM_ODS --pipeline-flow INGEST_ONLY

# 质量校验报表
python -m etl_billiards.cli.main --tasks DWD_QUALITY_CHECK --pipeline-flow INGEST_ONLY
# 报表输出etl_billiards/reports/dwd_quality_report.json

可按需单独运行:

  • 仅建表:python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA
  • 仅 ODS 灌入:python -m etl_billiards.cli.main --tasks MANUAL_INGEST
  • 仅 DWD 装载:python -m etl_billiards.cli.main --tasks INIT_DWD_SCHEMA,DWD_LOAD_FROM_ODS

3. 配置与路径

  • 示例数据目录:C:\dev\LLTQ\export\test-json-doc(可由 .envINGEST_SOURCE_DIR 覆盖)。
  • 日志/导出目录:LOG_ROOTEXPORT_ROOT.env
  • 报表:etl_billiards/reports/dwd_quality_report.json
  • DDLetl_billiards/database/schema_ODS_doc.sqletl_billiards/database/schema_dwd_doc.sql
  • 任务注册:etl_billiards/orchestration/task_registry.py(默认启用 INIT_ODS_SCHEMA、MANUAL_INGEST、INIT_DWD_SCHEMA、DWD_LOAD_FROM_ODS、DWD_QUALITY_CHECK

安全提示:建议将数据库凭证保存在 .env 或受控秘钥管理中,生产环境使用最小权限账号。


4. 目录结构与关键文件

  • 根目录:etl_billiards/ 主代码;requirements.txt 依赖;run_etl.sh/.bat 启动脚本;.env/.env.example 配置;tmp/ 草稿/调试归档。
  • config/defaults.py 默认值,env_parser.py 解析 .envsettings.py AppConfig 统一加载。
  • api/client.py HTTP 请求、重试、分页。
  • database/connection.py 连接封装;operations.py 批量 upsertDDL SQLODS/DWD
  • tasks/
    • init_schema_task.pyINIT_ODS_SCHEMA/INIT_DWD_SCHEMA
    • manual_ingest_task.py(示例 JSON → ODS
    • dwd_load_task.pyODS → DWD 映射、SCD2/事实增量);
    • 其他任务按需扩展。
  • loaders/ODS/DWD/SCD2 Loader 实现。
  • scd/scd2_handler.py 处理维度 SCD2 历史。
  • quality/:质量检查器(行数/金额对照)。
  • orchestration/scheduler.py 调度;task_registry.py 注册;run_tracker.py 运行记录;cursor_manager.py 水位管理。
  • scripts/:重建/测试/探活工具。
  • docs/ods_to_dwd_mapping.md 映射说明;ods_sample_json.md 示例 JSON 说明;dwd_quality_check.md 质检说明。
  • reports/:质检输出(如 dwd_quality_report.json)。
  • tests/:单元/集成测试;utils/:通用工具;backups/:备份(若存在)。

5. 架构与流程

执行链路(控制流):

  1. CLIcli/main.py)解析参数 → 生成 AppConfig → 初始化日志/DB 连接;
  2. 调度层(scheduler.py)按 task_registry.py 中的注册表实例化任务,设置 run_uuid、cursor水位、上下文
  3. 任务基类模板:
    • 获取时间窗口/水位cursor_manager
    • 拉取数据:在线模式调用 api/client.py 支持分页、重试;离线模式直接读取 JSON 文件;
    • 解析与校验:类型转换、必填校验(如任务内部 parse/validate
    • 加载:调用 Loaderloaders/)执行批量 Upsert/SCD2/增量写入(底层用 database/operations.py
    • 质量检查(如需):质量模块对行数/金额等进行对比;
    • 更新水位与运行记录(run_tracker.py),提交/回滚事务。

数据流与依赖:

  • 配置:config/defaults.py + .env + CLI 参数叠加,形成 AppConfig。
  • API 访问:api/client.py 支撑分页/重试;离线 ingest 直接读文件。
  • DB 访问:database/connection.py 提供连接上下文;operations.py 负责批量 upsert/分页写入。
  • ODSmanual_ingest_task.py 读取 JSON → ODS 表(保留 payload/来源/时间戳)。
  • DWDdwd_load_task.py 依据 TABLE_MAP/FACT_MAPPINGS 从 ODS 选取字段;维度走 SCD2scd/scd2_handler.py事实走增量支持字段表达式JSON->>、CAST
  • 质检:quality 模块或相关任务对 ODS/DWD 行数、金额等进行比对,输出 reports/

6. ODS → DWD 策略

  1. ODS 留底保留源主键、payload、时间/来源信息。
  2. DWD 清洗:维度 SCD2事实按时间/水位增量;字段类型、单位、枚举标准化,保留可溯源字段。
  3. 业务键统一site_id、member_id、table_id、order_settle_id、order_trade_no 等统一命名。
  4. 不过度汇总DWD 只做明细/轻度清洗,汇总留待 DWS/报表。
  5. 去嵌套:数组展开为子表/子行,重复 profile 提炼为维度。
  6. 长期演进:优先加列/加表,避免频繁改已有表结构。

7. 常用 CLI

# 运行所有已注册任务
python -m etl_billiards.cli.main
# 运行指定任务
python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA,MANUAL_INGEST
# 覆盖 DSN
python -m etl_billiards.cli.main --pg-dsn "postgresql://user:pwd@host:5432/db"
# 覆盖 API
python -m etl_billiards.cli.main --api-base "https://api.example.com" --api-token "..."
# 试运行(不写库)
python -m etl_billiards.cli.main --dry-run --tasks DWD_LOAD_FROM_ODS

8. 测试ONLINE / OFFLINE

  • TEST_MODE=ONLINE:调用真实 API全链路 E/T/L。
  • TEST_MODE=OFFLINE:从 TEST_JSON_ARCHIVE_DIR 读取离线 JSON只做 Transform + Load。
  • TEST_DB_DSN:如设置,则集成测试连真库;未设置用内存/临时库。
    示例:
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/source-data-doc pytest tests/unit/test_etl_tasks_offline.py
python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/db --query "SELECT 1"

9. 开发与扩展

  • 新任务:在 tasks/ 继承 BaseTask实现 get_task_code/execute,并在 orchestration/task_registry.py 注册。
  • 新 Loader/Checker参考 loaders/quality/ 复用批量 upsert/质检接口。
  • 配置:config/defaults.py + .env + CLI 叠加,新增配置需在 defaults 与 env_parser 中声明。

10. ODS 任务上线指引

  • 任务注册脚本:etl_billiards/database/seed_ods_tasks.sql(替换 store_id 后执行:psql "$PG_DSN" -f ...)。
  • 确认 etl_admin.etl_task 中已启用所需 ODS 任务。
  • 离线回放:可用 scripts/rebuild_ods_from_json(如有)从本地 JSON 重建 ODS。
  • 单测:pytest etl_billiards/tests/unit/test_ods_tasks.py

11. ODS 表概览(数据路径)

ODS 表名 接口 Path 数据列表路径
assistant_accounts_master /PersonnelManagement/SearchAssistantInfo data.assistantInfos
assistant_service_records /AssistantPerformance/GetOrderAssistantDetails data.orderAssistantDetails
assistant_cancellation_records /AssistantPerformance/GetAbolitionAssistant data.abolitionAssistants
goods_stock_movements /GoodsStockManage/QueryGoodsOutboundReceipt data.queryDeliveryRecordsList
goods_stock_summary /TenantGoods/GetGoodsStockReport data
group_buy_packages /PackageCoupon/QueryPackageCouponList data.packageCouponList
group_buy_redemption_records /Site/GetSiteTableUseDetails data.siteTableUseDetailsList
member_profiles /MemberProfile/GetTenantMemberList data.tenantMemberInfos
member_balance_changes /MemberProfile/GetMemberCardBalanceChange data.tenantMemberCardLogs
member_stored_value_cards /MemberProfile/GetTenantMemberCardList data.tenantMemberCards
payment_transactions /PayLog/GetPayLogListPage data
platform_coupon_redemption_records /Promotion/GetOfflineCouponConsumePageList data
recharge_settlements /Site/GetRechargeSettleList data.settleList
refund_transactions /Order/GetRefundPayLogList data
settlement_records /Site/GetAllOrderSettleList data.settleList
settlement_ticket_details /Order/GetOrderSettleTicketNew 完整 JSON
site_tables_master /Table/GetSiteTables data.siteTables
stock_goods_category_tree /TenantGoodsCategory/QueryPrimarySecondaryCategory data.goodsCategoryList
store_goods_master /TenantGoods/GetGoodsInventoryList data.orderGoodsList
store_goods_sales_records /TenantGoods/GetGoodsSalesList data.orderGoodsLedgers
table_fee_discount_records /Site/GetTaiFeeAdjustList data.taiFeeAdjustInfos
table_fee_transactions /Site/GetSiteTableOrderDetails data.siteTableUseDetailsList
tenant_goods_master /TenantGoods/QueryTenantGoods data.tenantGoodsList

完整字段级映射见 docs/ 与 ODS/DWD DDL。


12. DWD 维度与建模要点

  1. 颗粒一致、单一业务键:一张 DWD 表只承载一种业务事件/颗粒,避免混颗粒。
  2. 先理解业务链路,再建模;不要机械按 JSON 列表建表。
  3. 业务键统一site_id、member_id、table_id、order_settle_id、order_trade_no 等必须一致命名。
  4. 保留明细,不过度汇总;聚合留到 DWS/报表。
  5. 清洗标准化同时保留溯源字段源主键、时间、金额、payload
  6. 去嵌套与解耦:数组展开子行,重复 profile 提炼维度。
  7. 演进优先加列/加表,减少对已有表结构的破坏。

13. 当前状态2025-12-09

  • 示例 JSON 已全量灌入DWD 行数与 ODS 对齐。
  • 分类维度已展平大类+子类:dim_goods_category 26 行category_level/leaf 已赋值)。
  • 部分空字段源数据即为空,如需补值请先确认上游。

14. 可精简/归档

  • tmp/tmp/etl_billiards_misc/ 中草稿、旧备份、调试脚本仅供参考,不影响运行。
  • 根级保留必要文件README、requirements、run_etl.*、.env/.env.example其他临时文件已移至 tmp。

15. FAQ

  • 字段空值:若映射已存在且源列非空仍为空,再检查上游 JSON维度 SCD2 按全量合并。
  • DSN/路径:确认 .envPG_DSNINGEST_SOURCE_DIR 与本地一致。
  • 新增任务:在 tasks/ 实现并注册到 task_registry.py,必要时同步更新 DDL 与映射。
  • 权限/运行:检查网络、账号权限;脚本需执行权限(如 chmod +x run_etl.sh)。