飞球 ETL 系统(ODS → DWD → DWS)
面向台球门店业务的数据仓库 ETL 管线:从上游 SaaS API 或离线 JSON 采集订单、支付、会员、库存等数据,先落地 ODS,再清洗装载 DWD(含 SCD2 维度、事实增量),汇总至 DWS(助教业绩、财务日报、会员分析、工资计算、自定义指数算法),并输出质量校验报表。
快速开始
工作区根目录:
C:\ZQYY\FQ-ETL,所有命令在此目录执行。
- 环境:Python 3.10+、PostgreSQL。
- 配置:编辑
.env(或设环境变量),至少包含:STORE_ID=123 PG_DSN=postgresql://<user>:<password>@<host>:<port>/<db> - 安装依赖:
pip install -r requirements.txt - 离线回放入库(ODS → DWD → 质检):
python -m cli.main --pipeline-flow INGEST_ONLY --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA python -m cli.main --pipeline-flow INGEST_ONLY --tasks MANUAL_INGEST --ingest-source "./export/test-json-doc" python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_QUALITY_CHECK
Windows 可用
scripts/run_ods.bat一键执行 ODS 建表 + 灌入示例 JSON。
正式环境(在线抓取 → ODS → DWD)
核心入口 CLI:python -m cli.main
必备配置(.env 或环境变量)
- 数据库:
PG_DSN、STORE_ID - 在线抓取:
API_TOKEN(可选API_BASE、API_TIMEOUT、API_PAGE_SIZE) - 输出目录(可选):
EXPORT_ROOT、LOG_ROOT、FETCH_ROOT
推荐定时方式
方式 A(两段定时)
- 更新 ODS(在线抓取 + 入库):
python -m cli.main --pipeline-flow FULL \ --tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER - ODS → DWD:
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS
方式 B(一条命令)
python -m cli.main --pipeline-flow FULL \
--tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER,DWD_LOAD_FROM_ODS
--data-source 参数
online:仅在线抓取(等价于旧FETCH_ONLY)offline:仅离线入库(等价于旧INGEST_ONLY)hybrid:在线抓取 + 离线入库(等价于旧FULL,默认值)
--pipeline 管道模式
通过 --pipeline 指定预定义管道,配合 --processing-mode 控制流程:
increment_only:仅增量 ETL(默认)verify_only:仅校验increment_verify:增量 + 校验
DWS 层(汇总/财务)
建表与初始化
- 建表:
INIT_DWS_SCHEMA - 配置:
SEED_DWS_CONFIG - 指数参数:
database/seed_index_parameters.sql(WBI/NCI/RS/OS/MS/ML)
任务调度建议
- 每小时:
DWS_ASSISTANT_DAILY、DWS_FINANCE_DAILY、DWS_FINANCE_INCOME_STRUCTURE - 每日:
DWS_ASSISTANT_MONTHLY、DWS_ASSISTANT_CUSTOMER、DWS_MEMBER_CONSUMPTION、DWS_MEMBER_VISIT、DWS_FINANCE_DISCOUNT_DETAIL、DWS_FINANCE_RECHARGE、DWS_ASSISTANT_FINANCE - 每2小时:
DWS_WINBACK_INDEX、DWS_NEWCONV_INDEX - 每4小时:
DWS_RELATION_INDEX(RS/OS/MS/ML) - 按需:
DWS_ML_MANUAL_IMPORT(ML人工台账导入) - 每月(月初):
DWS_ASSISTANT_SALARY - 维护(按需):
DWS_RETENTION_CLEANUP - 物化刷新(可选):
DWS_MV_REFRESH_FINANCE_DAILY、DWS_MV_REFRESH_ASSISTANT_DAILY
调度配置保存在 config/scheduled_tasks.json,GUI 调度器会读取该文件。
指数算法参数(cfg_index_parameters)
- 参数表:
billiards_dws.cfg_index_parameters - 初始化脚本:
database/seed_index_parameters.sql(WBI/NCI/RS/OS/MS/ML) - 公共参数:
percentile_lower/upper(分位截断锚点),ewma_alpha(平滑系数)
ML人工台账导入
- 模板文件:
docs/templates/ml_manual_ledger_template.xlsx - GUI 路径:
任务配置 -> 数据建设 -> ML人工台账导入 - 导入环境变量:
ML_MANUAL_LEDGER_FILE=<xlsx路径>
Excel 导入(支出/平台回款/充值提成)
脚本:scripts/db_admin/import_dws_excel.py
- 支出结构:
--type expense,按月导入 - 平台回款:
--type platform,按回款日期导入 - 充值提成:
--type commission,按月份导入
时间口径
- 周起始日:周一
- 月/季度起始:第一天 0 点
- 环比:对比上一个等长区间
DWS 口径要点
- 财务/消费类统计统一按
pay_time;来店开始时间保留create_time - 团购实付/优惠按结账日对齐(
order_settle_id+pay_time) - 来店时长按
dwd_table_fee_log.real_table_use_seconds计算 - 有效业绩统一过滤
is_delete = 1的作废记录
物化汇总层(可选)
l1=近2天,l2=近1月,l3=近3月,l4=近6月(不含本月)- 刷新任务:
DWS_MV_REFRESH_FINANCE_DAILY、DWS_MV_REFRESH_ASSISTANT_DAILY - 配置:
DWS_MV_ENABLED、DWS_MV_LAYERS、DWS_MV_TABLES等
目录结构
详见 .kiro/steering/structure.md,核心目录:
FQ-ETL/
├── cli/ # CLI 入口
├── config/ # 配置(默认值、环境变量解析、AppConfig)
├── api/ # API 客户端(HTTP、本地 JSON 回放、录制)
├── database/ # 数据库连接、DDL、种子脚本、迁移
├── tasks/ # ETL 任务(ods/ dwd/ dws/ utility/ verification/)
├── loaders/ # 数据加载器(ods/ dimensions/ facts/)
├── scd/ # SCD2 处理器
├── orchestration/ # 调度器、任务注册表、游标管理、运行记录
├── quality/ # 数据质量检查器
├── models/ # 解析器与验证器
├── utils/ # 工具函数
├── gui/ # PySide6 桌面 GUI
├── scripts/ # 运维脚本(audit/ check/ rebuild/ repair/ export/)
├── tests/ # 测试(unit/ integration/)
├── docs/ # 文档(audit/ bd_manual/ dictionary/ index/ templates/)
├── reports/ # 质检输出(gitignore)
├── export/ # JSON 落盘(gitignore)
└── logs/ # 运行日志(gitignore)
架构与流程
执行链路(三层架构):
- CLI 层(
cli/main.py):解析参数 → 生成 AppConfig → 依赖注入 - 编排层(
orchestration/pipeline_runner.py):管道名称→层→任务列表解析,processing_mode控制增量/校验 - 执行层(
orchestration/task_executor.py):DataSource枚举决定 fetch/ingest 路径,含游标管理、运行记录、失败标记
任务模板:Extract(API 分页/重试或离线 JSON)→ Transform(解析/校验)→ Load(批量 upsert/SCD2/增量写入)→(可选)质量检查 → 更新水位
窗口切分与补偿
配置项(默认值见 config/defaults.py):
run.window_split.unit:day/week/month/none(默认day)run.window_split.days:默认10run.window_split.compensation_hours:默认2
测试
pip install pytest hypothesis
# 全部单元测试
pytest tests/unit
# 集成测试(需要数据库)
TEST_DB_DSN="postgresql://..." pytest tests/integration
开发与扩展
- 新任务:在
tasks/继承BaseTask,实现get_task_code/execute,在orchestration/task_registry.py注册 - 新 Loader:参考
loaders/,复用批量 upsert 接口 - 新配置项:在
config/defaults.py增加默认值,config/env_parser.py增加环境变量映射
ODS 表概览
| ODS 表名 | 接口 Path | 数据路径 |
|---|---|---|
| assistant_accounts_master | /PersonnelManagement/SearchAssistantInfo | data.assistantInfos |
| assistant_service_records | /AssistantPerformance/GetOrderAssistantDetails | data.orderAssistantDetails |
| assistant_cancellation_records | /AssistantPerformance/GetAbolitionAssistant | data.abolitionAssistants |
| goods_stock_movements | /GoodsStockManage/QueryGoodsOutboundReceipt | data.queryDeliveryRecordsList |
| goods_stock_summary | /TenantGoods/GetGoodsStockReport | data |
| group_buy_packages | /PackageCoupon/QueryPackageCouponList | data.packageCouponList |
| group_buy_redemption_records | /Site/GetSiteTableUseDetails | data.siteTableUseDetailsList |
| member_profiles | /MemberProfile/GetTenantMemberList | data.tenantMemberInfos |
| member_balance_changes | /MemberProfile/GetMemberCardBalanceChange | data.tenantMemberCardLogs |
| member_stored_value_cards | /MemberProfile/GetTenantMemberCardList | data.tenantMemberCards |
| payment_transactions | /PayLog/GetPayLogListPage | data |
| platform_coupon_redemption_records | /Promotion/GetOfflineCouponConsumePageList | data |
| recharge_settlements | /Site/GetRechargeSettleList | data.settleList |
| refund_transactions | /Order/GetRefundPayLogList | data |
| settlement_records | /Site/GetAllOrderSettleList | data.settleList |
| settlement_ticket_details | /Site/GetSiteTableUseDetails | data.siteTableUseDetailsList |
| site_tables_master | /Table/GetSiteTables | data.siteTables |
| store_goods_master | /TenantGoods/QuerySiteGoods | data.siteGoodsList |
| store_goods_sales_records | /TenantGoods/QuerySiteGoodsSaleRecord | data.siteGoodsSaleRecords |
| table_fee_discount_records | /Site/GetTaiFeeAdjustList | data.taiFeeAdjustInfos |
| table_fee_transactions | /Site/GetTaiFeeList | data.taiFeeList |
| tenant_goods_master | /TenantGoods/QueryTenantGoods | data.tenantGoodsList |
| stock_goods_category_tree | /TenantGoods/QueryGoodsCategoryTree | data.goodsCategoryTree |
Description
Languages
Python
99.6%
PLpgSQL
0.3%