2025-12-09 05:43:04 +08:00
2025-11-19 05:05:11 +08:00
2025-12-09 05:43:04 +08:00
2025-12-09 05:43:04 +08:00
2025-11-18 02:28:47 +08:00
2025-11-18 02:28:47 +08:00
2025-12-09 05:43:04 +08:00
2025-12-09 05:43:04 +08:00
2025-11-30 07:19:05 +08:00
2025-11-18 02:28:47 +08:00
2025-11-18 02:28:47 +08:00

飞球 ETL 系统ODS → DWD

面向门店业务的 ETL拉取/或离线灌入上游 JSON先落地 ODS再清洗装载 DWD含 SCD2 维度、事实增量),并提供质量校验报表。

快速运行(离线示例 JSON

  1. 环境Python 3.10+、PostgreSQL.env 关键项:PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-testINGEST_SOURCE_DIR=C:\dev\LLTQ\export\test-json-doc
  2. 安装依赖:
    cd etl_billiards
    pip install -r requirements.txt
    
  3. 一键 ODS→DWD→质检
    python -m etl_billiards.cli.main --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA --pipeline-flow INGEST_ONLY
    python -m etl_billiards.cli.main --tasks MANUAL_INGEST --pipeline-flow INGEST_ONLY --ingest-source "C:\dev\LLTQ\export\test-json-doc"
    python -m etl_billiards.cli.main --tasks DWD_LOAD_FROM_ODS --pipeline-flow INGEST_ONLY
    python -m etl_billiards.cli.main --tasks DWD_QUALITY_CHECK --pipeline-flow INGEST_ONLY
    # 报表etl_billiards/reports/dwd_quality_report.json
    

目录与文件作用

  • 根目录:etl_billiards/ 主代码;requirements.txt 依赖;run_etl.sh/.bat 启动脚本;.env/.env.example 配置;tmp/ 存放草稿/调试/备份。
  • etl_billiards/ 主线目录
    • config/defaults.py 默认值,env_parser.py 解析 .envsettings.py 统一配置加载。
    • api/client.py HTTP 请求、重试与分页。
    • database/connection.py 连接封装,operations.py 批量 upsertDDLschema_ODS_doc.sqlschema_dwd_doc.sql
    • tasks/:业务任务
      • init_schema_task.pyINIT_ODS_SCHEMA / INIT_DWD_SCHEMA。
      • manual_ingest_task.py:示例 JSON → ODS。
      • dwd_load_task.pyODS → DWD映射、SCD2/事实增量)。
      • 其他任务按需扩展。
    • loaders/ODS/DWD/SCD2 Loader 实现。
    • scd/scd2_handler.py 处理维度 SCD2 历史。
    • quality/:质量检查器(行数/金额对照)。
    • orchestration/scheduler.py 调度;task_registry.py 任务注册;run_tracker.py 运行记录。
    • scripts/:重建/测试/探活工具。
    • docs/ods_to_dwd_mapping.md 映射说明,ods_sample_json.md 示例 JSON 说明,dwd_quality_check.md 质检说明。
    • reports/:质检输出(如 dwd_quality_report.json)。
    • tests/:单元/集成测试;utils/:通用工具。
    • backups/(若存在):关键文件备份。

业务流程与文件关系

  1. 调度入口:cli/main.py 解析 CLI → orchestration/scheduler.pytask_registry.py 创建任务 → 初始化 DB/API/Config 上下文。
  2. ODSinit_schema_task.py 执行 schema_ODS_doc.sql 建表;manual_ingest_task.pyINGEST_SOURCE_DIR 读 JSON批量 upsert ODS。
  3. DWDinit_schema_task.py 执行 schema_dwd_doc.sql 建表;dwd_load_task.py 依据 TABLE_MAP/FACT_MAPPINGS 从 ODS 清洗写入 DWD维度走 SCD2scd/scd2_handler.py),事实按时间/水位增量。
  4. 质检:质量任务读取 ODS/DWD统计行数/金额,输出 reports/dwd_quality_report.json
  5. 配置:config/defaults.py + .env + CLI 参数叠加HTTP如启用在线api/client.pyDB 访问走 database/connection.py
  6. 文档:docs/ods_to_dwd_mapping.md 记录字段映射;docs/ods_sample_json.md 描述示例数据结构,便于对照调试。

当前状态2025-12-09

  • 示例 JSON 全量灌入DWD 行数与 ODS 对齐。
  • 分类维度已展平大类+子类:dim_goods_category 26 行category_level/leaf 已赋值)。
  • 剩余空值多因源数据为空;补值需先确认上游是否提供。

可精简/归档

  • tmp/tmp/etl_billiards_misc/ 中的草稿、旧备份、调试脚本仅供参考,运行不依赖。
  • 根级保留必要文件README、requirements、run_etl.*、.env/.env.example其余临时文件已移至 tmp。
Description
从非球 接口获取数据,处理到入库
Readme 1.6 MiB
Languages
Python 99.9%