飞球 ETL 系统(ODS → DWD)
面向门店业务的 ETL:拉取/或离线灌入上游 JSON,先落地 ODS,再清洗装载 DWD(含 SCD2 维度、事实增量),并提供质量校验报表。
快速运行(离线示例 JSON)
- 环境:Python 3.10+、PostgreSQL;
.env关键项:PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test,INGEST_SOURCE_DIR=C:\dev\LLTQ\export\test-json-doc。 - 安装依赖:
cd etl_billiards pip install -r requirements.txt - 一键 ODS→DWD→质检:
python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA --pipeline-flow INGEST_ONLY python -m etl_billiards.cli.main --tasks MANUAL_INGEST --pipeline-flow INGEST_ONLY --ingest-source "C:\dev\LLTQ\export\test-json-doc" python -m etl_billiards.cli.main --tasks DWD_LOAD_FROM_ODS --pipeline-flow INGEST_ONLY python -m etl_billiards.cli.main --tasks DWD_QUALITY_CHECK --pipeline-flow INGEST_ONLY # 报表:etl_billiards/reports/dwd_quality_report.json
目录与文件作用
- 根目录:
etl_billiards/主代码;requirements.txt依赖;run_etl.sh/.bat启动脚本;.env/.env.example配置;tmp/存放草稿/调试/备份。 - etl_billiards/ 主线目录
config/:defaults.py默认值,env_parser.py解析 .env,settings.py统一配置加载。api/:client.pyHTTP 请求、重试与分页。database/:connection.py连接封装,operations.py批量 upsert,DDL:schema_ODS_doc.sql、schema_dwd_doc.sql。tasks/:业务任务init_schema_task.py:INIT_ODS_SCHEMA / INIT_DWD_SCHEMA。manual_ingest_task.py:示例 JSON → ODS。dwd_load_task.py:ODS → DWD(映射、SCD2/事实增量)。- 其他任务按需扩展。
loaders/:ODS/DWD/SCD2 Loader 实现。scd/:scd2_handler.py处理维度 SCD2 历史。quality/:质量检查器(行数/金额对照)。orchestration/:scheduler.py调度;task_registry.py任务注册;run_tracker.py运行记录。scripts/:重建/测试/探活工具。docs/:ods_to_dwd_mapping.md映射说明,ods_sample_json.md示例 JSON 说明,dwd_quality_check.md质检说明。reports/:质检输出(如dwd_quality_report.json)。tests/:单元/集成测试;utils/:通用工具。backups/(若存在):关键文件备份。
业务流程与文件关系
- 调度入口:
cli/main.py解析 CLI →orchestration/scheduler.py依task_registry.py创建任务 → 初始化 DB/API/Config 上下文。 - ODS:
init_schema_task.py执行schema_ODS_doc.sql建表;manual_ingest_task.py从INGEST_SOURCE_DIR读 JSON,批量 upsert ODS。 - DWD:
init_schema_task.py执行schema_dwd_doc.sql建表;dwd_load_task.py依据TABLE_MAP/FACT_MAPPINGS从 ODS 清洗写入 DWD,维度走 SCD2(scd/scd2_handler.py),事实按时间/水位增量。 - 质检:质量任务读取 ODS/DWD,统计行数/金额,输出
reports/dwd_quality_report.json。 - 配置:
config/defaults.py+.env+ CLI 参数叠加;HTTP(如启用在线)走api/client.py;DB 访问走database/connection.py。 - 文档:
docs/ods_to_dwd_mapping.md记录字段映射;docs/ods_sample_json.md描述示例数据结构,便于对照调试。
当前状态(2025-12-09)
- 示例 JSON 全量灌入;DWD 行数与 ODS 对齐。
- 分类维度已展平大类+子类:
dim_goods_category26 行(category_level/leaf 已赋值)。 - 剩余空值多因源数据为空;补值需先确认上游是否提供。
可精简/归档
tmp/、tmp/etl_billiards_misc/中的草稿、旧备份、调试脚本仅供参考,运行不依赖。- 根级保留必要文件(README、requirements、run_etl.*、.env/.env.example),其余临时文件已移至 tmp。
Description
Languages
Python
99.9%