Files

18 KiB
Raw Permalink Blame History

飞球 ETL 任务说明文档

本文档是飞球 ETL 系统etl-billiards任务说明的总览入口。 系统从上游 SaaS API 抽取台球门店运营数据,经 ODS → DWD → DWS 三层处理后, 输出助教业绩、财务日报、会员分析、工资计算及自定义指数等业务报表。

目录


数据流向

graph LR
    API["上游 SaaS API"] -->|抓取| ODS["ODS<br/>操作数据存储层"]
    JSON["本地 JSON 文件"] -->|手动入库| ODS
    ODS -->|清洗 / SCD2 / 增量| DWD["DWD<br/>明细数据层"]
    DWD -->|聚合 / 计算| DWS["DWS<br/>数据服务层"]
    DWD -->|指数算法| INDEX["INDEX<br/>指数算法层"]
    DWS --> REPORT["业务报表<br/>助教业绩 · 财务日报<br/>会员分析 · 工资计算"]
    INDEX --> REPORT

层级说明:

Schema 职责
ODS ods 保留 API 原始 payload便于回溯
DWD dwd 清洗后的维度表dim_SCD2和事实表fact_ / dwd_*,增量)
DWS dws 按业务维度聚合的汇总统计表
INDEX dws 基于 DWD/DWS 数据计算的自定义业务指数

文档索引

文档 说明
BaseTask 公共机制 任务基类模板方法、TaskContext、时间窗口、注册表、Flow 执行
ODS 层任务 22 个通用 ODS 任务的架构、配置结构、API 端点、目标表
DWD 层任务 DWD_LOAD_FROM_ODS 核心装载、SCD2 处理、质量校验
DWS 层任务 助教业绩、会员分析、财务统计、库存汇总、运维任务共 17 个 DWS 任务
INDEX 层任务 WBI/NCI/RS/SPI 指数算法 + ML 手动台账导入
工具类任务 Schema 初始化、手动入库、归档、截止检查、完整性校验

任务清单

ODS 层(操作数据存储)

通用 ODS 任务OdsTaskSpec 动态注册)

任务代码 Python 类 目标表 简要说明 详情
ODS_ASSISTANT_ACCOUNT OdsAssistantAccountsTask ods.assistant_accounts_master 助教账号档案 查看
ODS_ASSISTANT_LEDGER OdsAssistantLedgerTask ods.assistant_service_records 助教服务流水 查看
ODS_INVENTORY_CHANGE OdsInventoryChangeTask ods.goods_stock_movements 库存变化记录 查看
ODS_INVENTORY_STOCK OdsInventoryStockTask ods.goods_stock_summary 库存汇总 查看
ODS_GROUP_PACKAGE OdsPackageTask ods.group_buy_packages 团购套餐定义 + 详情子流程(通过 detail_endpoint 串行调用 QueryPackageCouponInfo 获取每个团购的详情数据,写入 ods.group_buy_package_details 查看
ODS_GROUP_BUY_REDEMPTION OdsGroupBuyRedemptionTask ods.group_buy_redemption_records 团购套餐核销 查看
ODS_MEMBER OdsMemberTask ods.member_profiles 会员档案 查看
ODS_MEMBER_BALANCE OdsMemberBalanceTask ods.member_balance_changes 会员余额变动 查看
ODS_MEMBER_CARD OdsMemberCardTask ods.member_stored_value_cards 会员储值卡 查看
ODS_PAYMENT OdsPaymentTask ods.payment_transactions 支付流水 查看
ODS_REFUND OdsRefundTask ods.refund_transactions 退款流水 查看
ODS_PLATFORM_COUPON OdsCouponVerifyTask ods.platform_coupon_redemption_records 平台/团购券核销 查看
ODS_RECHARGE_SETTLE OdsRechargeSettleTask ods.recharge_settlements 充值结算 查看
ODS_TABLE_USE OdsTableUseTask ods.table_fee_transactions 台费计费流水 查看
ODS_TABLES OdsTablesTask ods.site_tables_master 台桌维表 查看
ODS_GOODS_CATEGORY OdsGoodsCategoryTask ods.stock_goods_category_tree 库存商品分类 查看
ODS_STORE_GOODS OdsStoreGoodsTask ods.store_goods_master 门店商品档案 查看
ODS_TABLE_FEE_DISCOUNT OdsTableDiscountTask ods.table_fee_discount_records 台费折扣/调账 查看
ODS_STORE_GOODS_SALES OdsGoodsLedgerTask ods.store_goods_sales_records 门店商品销售流水 查看
ODS_TENANT_GOODS OdsTenantGoodsTask ods.tenant_goods_master 租户商品档案 查看
ODS_SETTLEMENT_RECORDS OdsOrderSettleTask ods.settlement_records 结账记录 查看
ODS_STAFF_INFO OdsStaffInfoTask ods.staff_info_master 员工档案(含在职/离职) 查看

DWD 层(明细数据)

任务代码 Python 类 简要说明 详情
DWD_LOAD_FROM_ODS DwdLoadTask 核心装载:遍历 TABLE_MAP维度走 SCD2事实走增量 查看
DWD_QUALITY_CHECK DwdQualityTask ODS 与 DWD 行数/金额核对,输出 JSON 报表 查看

DWS 层(数据服务)

助教业绩域

任务代码 Python 类 目标表 粒度 详情
DWS_ASSISTANT_DAILY AssistantDailyTask dws_assistant_daily_detail 日期+助教 查看
DWS_ASSISTANT_MONTHLY AssistantMonthlyTask dws_assistant_monthly_summary 月份+助教 查看
DWS_ASSISTANT_CUSTOMER AssistantCustomerTask dws_assistant_customer_stats 日期+助教+会员 查看
DWS_ASSISTANT_SALARY AssistantSalaryTask dws_assistant_salary_calc 月份+助教 查看
DWS_ASSISTANT_FINANCE AssistantFinanceTask dws_assistant_finance_analysis 日期+助教 查看
DWS_ASSISTANT_ORDER_CONTRIBUTION AssistantOrderContributionTask dws_assistant_order_contribution 日期+助教 查看

会员分析域

任务代码 Python 类 目标表 粒度 详情
DWS_MEMBER_CONSUMPTION MemberConsumptionTask dws_member_consumption_summary 日期+会员 查看
DWS_MEMBER_VISIT MemberVisitTask dws_member_visit_detail 日期+会员+结账单 查看

财务统计域

任务代码 Python 类 目标表 粒度 详情
DWS_FINANCE_DAILY FinanceDailyTask dws_finance_daily_summary 日期 查看
DWS_FINANCE_RECHARGE FinanceRechargeTask dws_finance_recharge_summary 日期 查看
DWS_FINANCE_INCOME_STRUCTURE FinanceIncomeStructureTask dws_finance_income_structure 日期+收入类型 查看
DWS_FINANCE_DISCOUNT_DETAIL FinanceDiscountDetailTask dws_finance_discount_detail 日期+折扣类型 查看

库存汇总域

任务代码 Python 类 目标表 粒度 详情
DWS_GOODS_STOCK_DAILY GoodsStockDailyTask dws_goods_stock_daily_summary 日期+商品 查看
DWS_GOODS_STOCK_WEEKLY GoodsStockWeeklyTask dws_goods_stock_weekly_summary ISO周+商品 查看
DWS_GOODS_STOCK_MONTHLY GoodsStockMonthlyTask dws_goods_stock_monthly_summary 月份+商品 查看

运维任务

任务代码 Python 类 简要说明 详情
DWS_BUILD_ORDER_SUMMARY DwsBuildOrderSummaryTask 构建订单汇总中间表 查看
DWS_MAINTENANCE DwsMaintenanceTask 统一维护:物化视图刷新 + 历史数据清理 查看

INDEX 层(指数算法)

任务代码 Python 类 目标表 指数类型 详情
DWS_WINBACK_INDEX WinbackIndexTask dws_member_winback_index WBI回流指数 查看
DWS_NEWCONV_INDEX NewconvIndexTask dws_member_newconv_index NCI新客转化指数 查看
DWS_RELATION_INDEX RelationIndexTask dws_relation_index RS关系指数 查看
DWS_ML_MANUAL_IMPORT MlManualImportTask dws_ml_manual_ledger ML手动台账导入 查看
DWS_SPENDING_POWER_INDEX SpendingPowerIndexTask dws_member_spending_power_index SPI消费力指数 查看

工具类 / 校验类

任务代码 Python 类 类型 简要说明 详情
INIT_ODS_SCHEMA InitOdsSchemaTask utility 执行 ODS + meta DDL创建必要目录 查看
INIT_DWD_SCHEMA InitDwdSchemaTask utility 执行 DWD DDL 查看
INIT_DWS_SCHEMA InitDwsSchemaTask utility 执行 DWS DDL 查看
MANUAL_INGEST ManualIngestTask utility 从本地 JSON 文件手动入库到 ODS 查看
ODS_JSON_ARCHIVE OdsJsonArchiveTask utility 归档 ODS JSON 文件 查看
CHECK_CUTOFF CheckCutoffTask utility 检查数据截止时间 查看
SEED_DWS_CONFIG SeedDwsConfigTask utility 初始化 DWS 配置种子数据 查看
DATA_INTEGRITY_CHECK DataIntegrityTask verification 数据完整性校验 查看

Flow执行流程类型

Flow执行流程定义了多层任务的执行顺序。通过 --flow 参数指定 Flow ID--pipeline 作为已弃用别名保留),或通过 --layers 参数自由组合层级,系统自动解析对应层并按拓扑排序后的顺序执行该层的所有已注册任务。

术语说明:Connector(数据源连接器)指对接的上游 SaaS 平台(如飞球),对应 apps/etl/connectors/{connector_name}/Flow(执行流程)指 ETL 任务的处理链路,描述数据从哪一层流到哪一层。

Flow 类型 包含层 说明
api_ods ODS 仅从 API 抓取数据到 ODS
api_ods_dwd ODS → DWD 抓取数据并清洗装载到 DWD
api_full ODS → DWD → DWS → INDEX 全流程:抓取 → 清洗 → 汇总 → 指数
ods_dwd DWD 仅执行 ODS → DWD 清洗装载(不抓取)
dwd_dws DWS 仅执行 DWD → DWS 汇总计算
dwd_dws_index DWS → INDEX 汇总计算 + 指数算法
dwd_index INDEX 仅执行指数算法

Flow 定义位于 orchestration/flow_runner.pyFlowRunner.FLOW_LAYERS--layers 参数接受 ODS、DWD、DWS、INDEX 的任意逗号分隔组合(如 --layers ODS,DWD),与 --flow/--pipeline 互斥。


处理模式

通过 --processing-mode 参数指定,控制 Flow 的执行行为。

模式 说明 适用场景
increment_only 仅增量处理(默认) 日常定时调度,只处理新增/变更数据
verify_only 仅校验并修复,跳过增量 ETL 数据质量巡检、手动修复不一致
increment_verify 先增量处理,再校验并修复 需要确保数据一致性的关键批次

补充参数:

  • --fetch-before-verify:仅在 verify_only 模式下有效,校验前先从 API 获取最新数据
  • --verify-tables:指定仅校验的表名(逗号分隔),用于单表验证

数据源模式

通过 --data-source 参数指定,控制 ODS 层的数据来源。

模式 说明 适用场景
online 仅在线抓取(从 API 获取数据) 正常运行,网络可用
offline 仅本地入库(从 JSON 文件读取) 离线环境、JSON 回放
hybrid 抓取 + 入库(默认) 同时从 API 抓取并处理本地文件

旧参数 --pipeline-flowFULL / FETCH_ONLY / INGEST_ONLY)已弃用,请使用 --data-source


CLI 参数速查表

入口命令:python -m cli.main

基本参数

参数 类型 默认值 说明
--store-id int 门店 ID
--tasks str 任务列表,逗号分隔(传统模式)
--dry-run flag false 试运行,不提交数据库事务

Flow 与模式参数

参数 类型 默认值 说明
--flow choice Flow 类型(见Flow执行流程类型
--pipeline choice [已弃用] --flow 的别名,使用时输出 DeprecationWarning
--layers str ETL 层自由组合逗号分隔ODS,DWD,DWS,INDEX--flow/--pipeline 互斥
--processing-mode choice increment_only 处理模式(见处理模式
--data-source choice hybrid 数据源模式(见数据源模式
--fetch-before-verify flag false 校验前先从 API 获取数据(仅 verify_only
--verify-tables str 仅校验指定表(逗号分隔)

时间窗口参数

参数 类型 默认值 说明
--window-start datetime 固定时间窗口开始(优先级高于游标)
--window-end datetime 固定时间窗口结束
--force-window-override flag false 强制使用 window_start/window_end不走 MAX(fetched_at) 兜底
--window-split choice none 时间窗口切分:none / day / week / month
--window-split-unit str 配置值 窗口切分单位(day/week/month/none
--window-split-days int 配置值 按天切分的天数(1/10/30
--window-compensation-hours int 配置值 窗口前后补偿小时数
--lookback-hours int 24 回溯小时数
--overlap-seconds int 3600 冗余秒数(默认 1 小时)

数据库参数

参数 类型 默认值 说明
--pg-dsn str PostgreSQL DSN 连接串
--pg-host str PostgreSQL 主机
--pg-port int PostgreSQL 端口
--pg-name str PostgreSQL 数据库名
--pg-user str PostgreSQL 用户名
--pg-password str PostgreSQL 密码

API 参数

参数 类型 默认值 说明
--api-base str API 基础 URL
--api-token str API 令牌Bearer Token
--api-timeout int API 超时(秒)
--api-page-size int 分页大小
--api-retry-max int API 重试最大次数

目录与运行参数

参数 类型 默认值 说明
--export-root str 导出根目录
--log-root str 日志根目录
--fetch-root str 抓取 JSON 输出根目录
--ingest-source str 本地清洗入库源目录
--write-pretty-json flag false 抓取 JSON 美化输出
--idle-start str 闲时窗口开始HH:MM
--idle-end str 闲时窗口结束HH:MM
--allow-empty-advance flag false 允许空结果推进窗口
--force-full flag false 强制全量处理:跳过 ODS hash 去重和 DWD 变更对比,无条件写入

已弃用参数

参数 替代方案 说明
--pipeline --flow 功能相同,使用 --pipeline 时输出 DeprecationWarning
--pipeline-flow --data-source FULLhybridFETCH_ONLYonlineINGEST_ONLYoffline

常见命令示例

# 全流程 ETLAPI 抓取 → ODS → DWD → DWS → INDEX
python -m cli.main --flow api_full --pg-dsn "$PG_DSN" --store-id 1 --api-token "$TOKEN"

# 仅抓取 ODS 数据
python -m cli.main --flow api_ods --store-id 1

# ODS → DWD 清洗装载(不抓取 API
python -m cli.main --flow ods_dwd

# 仅执行 DWS 汇总
python -m cli.main --flow dwd_dws

# 仅执行指数算法
python -m cli.main --flow dwd_index

# 使用 --layers 自由组合
python -m cli.main --layers ODS,DWD --store-id 1

# 仅执行 DWS + INDEX 层
python -m cli.main --layers DWS,INDEX

# 指定时间窗口
python -m cli.main --flow api_ods --window-start "2026-02-01" --window-end "2026-02-02"

# 按天切分时间窗口
python -m cli.main --flow api_ods --window-start "2026-01-01" --window-end "2026-02-01" --window-split day

# 传统模式:指定任务列表
python -m cli.main --tasks ODS_PAYMENT,ODS_MEMBER,ODS_SETTLEMENT_RECORDS --store-id 1

# 校验并修复(跳过增量)
python -m cli.main --flow api_full --processing-mode verify_only

# 校验前先从 API 获取数据
python -m cli.main --flow api_full --processing-mode verify_only --fetch-before-verify

# 增量 + 校验
python -m cli.main --flow api_full --processing-mode increment_verify

# 仅校验指定表
python -m cli.main --flow api_full --processing-mode verify_only --verify-tables "dim_member,fact_payment"

# 试运行(不提交)
python -m cli.main --dry-run --tasks DWD_LOAD_FROM_ODS

# Schema 初始化
python -m cli.main --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA,INIT_DWS_SCHEMA

# 手动入库(离线模式)
python -m cli.main --tasks MANUAL_INGEST --data-source offline --ingest-source ./data/json

# DWS 配置种子数据初始化
python -m cli.main --tasks SEED_DWS_CONFIG

# 数据完整性校验
python -m cli.main --tasks DATA_INTEGRITY_CHECK

最后更新日期2026-02-26