飞球 ETL 系统ODS → DWD → DWS

面向台球门店业务的数据仓库 ETL 管线:从上游 SaaS API 或离线 JSON 采集订单、支付、会员、库存等数据,先落地 ODS,再清洗装载 DWD(含 SCD2 维度、事实增量),汇总至 DWS(助教业绩、财务日报、会员分析、工资计算、自定义指数算法),并输出质量校验报表。

快速开始

工作区根目录:C:\ZQYY\FQ-ETL,所有命令在此目录执行。

  1. 环境Python 3.10+、PostgreSQL。
  2. 配置:编辑 .env(或设环境变量),至少包含:
    STORE_ID=123
    PG_DSN=postgresql://<user>:<password>@<host>:<port>/<db>
    
  3. 安装依赖:
    pip install -r requirements.txt
    
  4. 离线回放入库ODS → DWD → 质检):
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks MANUAL_INGEST --ingest-source "./export/test-json-doc"
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_QUALITY_CHECK
    

Windows 可用 scripts/run_ods.bat 一键执行 ODS 建表 + 灌入示例 JSON。

正式环境(在线抓取 → ODS → DWD

核心入口 CLIpython -m cli.main

必备配置(.env 或环境变量)

  • 数据库:PG_DSNSTORE_ID
  • 在线抓取:API_TOKEN(可选 API_BASEAPI_TIMEOUTAPI_PAGE_SIZE
  • 输出目录(可选):EXPORT_ROOTLOG_ROOTFETCH_ROOT

推荐定时方式

方式 A两段定时

  1. 更新 ODS在线抓取 + 入库):
    python -m cli.main --pipeline-flow FULL \
      --tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER
    
  2. ODS → DWD
    python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS
    

方式 B一条命令

python -m cli.main --pipeline-flow FULL \
  --tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER,DWD_LOAD_FROM_ODS

--data-source 参数

  • online:仅在线抓取(等价于旧 FETCH_ONLY
  • offline:仅离线入库(等价于旧 INGEST_ONLY
  • hybrid:在线抓取 + 离线入库(等价于旧 FULL,默认值)

--pipeline 管道模式

通过 --pipeline 指定预定义管道,配合 --processing-mode 控制流程:

  • increment_only:仅增量 ETL默认
  • verify_only:仅校验
  • increment_verify:增量 + 校验

DWS 层(汇总/财务)

建表与初始化

  • 建表:INIT_DWS_SCHEMA
  • 配置:SEED_DWS_CONFIG
  • 指数参数:database/seed_index_parameters.sqlWBI/NCI/RS/OS/MS/ML

任务调度建议

  • 每小时DWS_ASSISTANT_DAILYDWS_FINANCE_DAILYDWS_FINANCE_INCOME_STRUCTURE
  • 每日DWS_ASSISTANT_MONTHLYDWS_ASSISTANT_CUSTOMERDWS_MEMBER_CONSUMPTIONDWS_MEMBER_VISITDWS_FINANCE_DISCOUNT_DETAILDWS_FINANCE_RECHARGEDWS_ASSISTANT_FINANCE
  • 每2小时DWS_WINBACK_INDEXDWS_NEWCONV_INDEX
  • 每4小时DWS_RELATION_INDEXRS/OS/MS/ML
  • 按需DWS_ML_MANUAL_IMPORTML人工台账导入
  • 每月(月初)DWS_ASSISTANT_SALARY
  • 维护(按需)DWS_RETENTION_CLEANUP
  • 物化刷新(可选)DWS_MV_REFRESH_FINANCE_DAILYDWS_MV_REFRESH_ASSISTANT_DAILY

调度配置保存在 config/scheduled_tasks.jsonGUI 调度器会读取该文件。

指数算法参数cfg_index_parameters

  • 参数表:billiards_dws.cfg_index_parameters
  • 初始化脚本:database/seed_index_parameters.sqlWBI/NCI/RS/OS/MS/ML
  • 公共参数:percentile_lower/upper(分位截断锚点),ewma_alpha(平滑系数)

ML人工台账导入

  • 模板文件:docs/templates/ml_manual_ledger_template.xlsx
  • GUI 路径:任务配置 -> 数据建设 -> ML人工台账导入
  • 导入环境变量:ML_MANUAL_LEDGER_FILE=<xlsx路径>

Excel 导入(支出/平台回款/充值提成)

脚本:scripts/db_admin/import_dws_excel.py

  • 支出结构:--type expense,按月导入
  • 平台回款:--type platform,按回款日期导入
  • 充值提成:--type commission,按月份导入

时间口径

  • 周起始日:周一
  • 月/季度起始:第一天 0 点
  • 环比:对比上一个等长区间

DWS 口径要点

  • 财务/消费类统计统一按 pay_time;来店开始时间保留 create_time
  • 团购实付/优惠按结账日对齐(order_settle_id + pay_time
  • 来店时长按 dwd_table_fee_log.real_table_use_seconds 计算
  • 有效业绩统一过滤 is_delete = 1 的作废记录

物化汇总层(可选)

  • l1=近2天l2=近1月l3=近3月l4=近6月不含本月
  • 刷新任务:DWS_MV_REFRESH_FINANCE_DAILYDWS_MV_REFRESH_ASSISTANT_DAILY
  • 配置:DWS_MV_ENABLEDDWS_MV_LAYERSDWS_MV_TABLES

目录结构

详见 .kiro/steering/structure.md,核心目录:

FQ-ETL/
├── cli/                    # CLI 入口
├── config/                 # 配置默认值、环境变量解析、AppConfig
├── api/                    # API 客户端HTTP、本地 JSON 回放、录制)
├── database/               # 数据库连接、DDL、种子脚本、迁移
├── tasks/                  # ETL 任务ods/ dwd/ dws/ utility/ verification/
├── loaders/                # 数据加载器ods/ dimensions/ facts/
├── scd/                    # SCD2 处理器
├── orchestration/          # 调度器、任务注册表、游标管理、运行记录
├── quality/                # 数据质量检查器
├── models/                 # 解析器与验证器
├── utils/                  # 工具函数
├── gui/                    # PySide6 桌面 GUI
├── scripts/                # 运维脚本audit/ check/ rebuild/ repair/ export/
├── tests/                  # 测试unit/ integration/
├── docs/                   # 文档audit/ bd_manual/ dictionary/ index/ templates/
├── reports/                # 质检输出gitignore
├── export/                 # JSON 落盘gitignore
└── logs/                   # 运行日志gitignore

架构与流程

执行链路(三层架构):

  1. CLI 层cli/main.py):解析参数 → 生成 AppConfig → 依赖注入
  2. 编排层orchestration/pipeline_runner.py):管道名称→层→任务列表解析,processing_mode 控制增量/校验
  3. 执行层orchestration/task_executor.pyDataSource 枚举决定 fetch/ingest 路径,含游标管理、运行记录、失败标记

任务模板ExtractAPI 分页/重试或离线 JSON→ Transform解析/校验)→ Load批量 upsert/SCD2/增量写入)→(可选)质量检查 → 更新水位

窗口切分与补偿

配置项(默认值见 config/defaults.py

  • run.window_split.unitday / week / month / none(默认 day
  • run.window_split.days:默认 10
  • run.window_split.compensation_hours:默认 2

测试

pip install pytest hypothesis

# 全部单元测试
pytest tests/unit

# 集成测试(需要数据库)
TEST_DB_DSN="postgresql://..." pytest tests/integration

开发与扩展

  • 新任务:在 tasks/ 继承 BaseTask,实现 get_task_code/execute,在 orchestration/task_registry.py 注册
  • 新 Loader参考 loaders/,复用批量 upsert 接口
  • 新配置项:在 config/defaults.py 增加默认值,config/env_parser.py 增加环境变量映射

ODS 表概览

ODS 表名 接口 Path 数据路径
assistant_accounts_master /PersonnelManagement/SearchAssistantInfo data.assistantInfos
assistant_service_records /AssistantPerformance/GetOrderAssistantDetails data.orderAssistantDetails
assistant_cancellation_records /AssistantPerformance/GetAbolitionAssistant data.abolitionAssistants
goods_stock_movements /GoodsStockManage/QueryGoodsOutboundReceipt data.queryDeliveryRecordsList
goods_stock_summary /TenantGoods/GetGoodsStockReport data
group_buy_packages /PackageCoupon/QueryPackageCouponList data.packageCouponList
group_buy_redemption_records /Site/GetSiteTableUseDetails data.siteTableUseDetailsList
member_profiles /MemberProfile/GetTenantMemberList data.tenantMemberInfos
member_balance_changes /MemberProfile/GetMemberCardBalanceChange data.tenantMemberCardLogs
member_stored_value_cards /MemberProfile/GetTenantMemberCardList data.tenantMemberCards
payment_transactions /PayLog/GetPayLogListPage data
platform_coupon_redemption_records /Promotion/GetOfflineCouponConsumePageList data
recharge_settlements /Site/GetRechargeSettleList data.settleList
refund_transactions /Order/GetRefundPayLogList data
settlement_records /Site/GetAllOrderSettleList data.settleList
settlement_ticket_details /Site/GetSiteTableUseDetails data.siteTableUseDetailsList
site_tables_master /Table/GetSiteTables data.siteTables
store_goods_master /TenantGoods/QuerySiteGoods data.siteGoodsList
store_goods_sales_records /TenantGoods/QuerySiteGoodsSaleRecord data.siteGoodsSaleRecords
table_fee_discount_records /Site/GetTaiFeeAdjustList data.taiFeeAdjustInfos
table_fee_transactions /Site/GetTaiFeeList data.taiFeeList
tenant_goods_master /TenantGoods/QueryTenantGoods data.tenantGoodsList
stock_goods_category_tree /TenantGoods/QueryGoodsCategoryTree data.goodsCategoryTree
Description
桌球运营 非球 ETL
Readme 1.2 MiB
Languages
Python 99.6%
PLpgSQL 0.3%