# 台球场 ETL 系统(模块化版本)合并文档 本文为原多份文档(如 `INDEX.md`、`QUICK_START.md`、`ARCHITECTURE.md`、`MIGRATION_GUIDE.md`、`PROJECT_STRUCTURE.md`、`README.md` 等)的合并版,只保留与**当前项目本身**相关的内容:项目说明、目录结构、架构设计、数据与控制流程、迁移与扩展指南等,不包含修改历史和重构过程描述。 --- ## 1. 项目概述 台球场 ETL 系统是一个面向门店业务的专业 ETL 工程项目,用于从外部业务 API 拉取订单、支付、会员等数据,经过解析、校验、SCD2 处理、质量检查后写入 PostgreSQL 数据库,并支持增量同步和任务运行追踪。 系统采用模块化、分层架构设计,核心特性包括: - 模块化目录结构(配置、数据库、API、模型、加载器、SCD2、质量检查、编排、任务、CLI、工具、测试等分层清晰)。 - 完整的配置管理:默认值 + 环境变量 + CLI 参数多层覆盖。 - 可复用的数据库访问层(连接管理、批量 Upsert 封装)。 - 支持重试与分页的 API 客户端。 - 类型安全的数据解析与校验模块。 - SCD2 维度历史管理。 - 数据质量检查(例如余额一致性检查)。 - 任务编排层统一调度、游标管理与运行追踪。 - 命令行入口统一管理任务执行,支持筛选任务、Dry-run 等模式。 --- ## 2. 快速开始 ### 2.1 环境准备 - Python 版本:建议 3.10+ - 数据库:PostgreSQL - 操作系统:Windows / Linux / macOS 均可 ```bash # 克隆/下载代码后进入项目目录 cd etl_billiards/ ls -la ``` 你会看到下述目录结构的顶层部分(详细见第 4 章): - `config/` - 配置管理 - `database/` - 数据库访问 - `api/` - API 客户端 - `tasks/` - ETL 任务实现 - `cli/` - 命令行入口 - `docs/` - 技术文档 ### 2.2 安装依赖 ```bash pip install -r requirements.txt ``` 主要依赖示例(按实际 `requirements.txt` 为准): - `psycopg2-binary`:PostgreSQL 驱动 - `requests`:HTTP 客户端 - `python-dateutil`:时间处理 - `tzdata`:时区数据 ### 2.3 配置环境变量 复制并修改环境变量模板: ```bash cp .env.example .env # 使用你习惯的编辑器修改 .env ``` `.env` 示例(最小配置): ```bash # 数据库 PG_DSN=postgresql://user:password@localhost:5432/.... # API API_BASE=https://api.example.com API_TOKEN=your_token_here # 门店/应用 STORE_ID=2790685415443269 TIMEZONE=Asia/Taipei # 目录 EXPORT_ROOT=/path/to/export LOG_ROOT=/path/to/logs ``` > 所有配置项的默认值见 `config/defaults.py`,最终生效配置由「默认值 + 环境变量 + CLI 参数」三层叠加。 ### 2.4 运行第一个任务 通过 CLI 入口运行: ```bash # 运行所有任务 python -m cli.main # 仅运行订单任务 python -m cli.main --tasks ORDERS # 运行订单 + 支付 python -m cli.main --tasks ORDERS,PAYMENTS # Windows 使用脚本 run_etl.bat --tasks ORDERS # Linux / macOS 使用脚本 ./run_etl.sh --tasks ORDERS ``` ### 2.5 查看结果 - 日志目录:使用 `LOG_ROOT` 指定,例如 ```bash ls -la C:\dev\LLTQ\export\LOG/ ``` - 导出目录:使用 `EXPORT_ROOT` 指定,例如 ```bash ls -la C:\dev\LLTQ\export\JSON/ ``` --- ## 3. 常用命令与开发工具 ### 3.1 CLI 常用命令 ```bash # 运行所有任务 python -m cli.main # 运行指定任务 python -m cli.main --tasks ORDERS,PAYMENTS,MEMBERS # 使用自定义数据库 python -m cli.main --pg-dsn "postgresql://user:password@host:5432/db" # 使用自定义 API 端点 python -m cli.main --api-base "https://api.example.com" --api-token "..." # 试运行(不写入数据库) python -m cli.main --dry-run --tasks ORDERS ``` ### 3.2 IDE / 代码质量工具(示例:VSCode) `.vscode/settings.json` 示例: ```json { "python.linting.enabled": true, "python.linting.pylintEnabled": true, "python.formatting.provider": "black", "python.testing.pytestEnabled": true } ``` 代码格式化与检查: ```bash pip install black isort pylint black . isort . pylint etl_billiards/ ``` ### 3.3 测试 ```bash # 安装测试依赖(按需) pip install pytest pytest-cov # 运行全部测试 pytest # 仅运行单元测试 pytest tests/unit/ # 生成覆盖率报告 pytest --cov=. --cov-report=html ``` 测试示例(按实际项目为准): - `tests/unit/test_config.py` – 配置管理单元测试 - `tests/unit/test_parsers.py` – 解析器单元测试 - `tests/integration/test_database.py` – 数据库集成测试 #### 3.3.1 测试模式(ONLINE / OFFLINE) - `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API,完整执行 E/T/L。 - `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load,适合验证本地归档数据是否仍可回放。 - `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/source-data-doc` 或 CI 产出的快照)。 - `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。 - `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN,实打实执行写库;留空时测试使用内存伪库,避免依赖数据库。 示例命令: ```bash # 在线模式覆盖所有任务 TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py # 离线模式使用归档 JSON 覆盖所有任务 TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/source-data-doc pytest tests/unit/test_etl_tasks_offline.py # 使用脚本按需组合参数(示例:在线 + 仅订单用例) python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS # 使用脚本连接真实测试库并回放离线模式 python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb # 使用“指令仓库”中的预置命令 python scripts/run_tests.py --preset offline_realdb python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py ``` #### 3.3.2 脚本化测试组合(`run_tests.py` / `test_presets.py`) - `scripts/run_tests.py` 是 pytest 的统一入口:自动把项目根目录加入 `sys.path`,并提供 `--suite online/offline/integration`、`--tests`(自定义路径)、`--mode`、`--db-dsn`、`--json-archive`、`--json-temp`、`--keyword/-k`、`--pytest-args`、`--env KEY=VALUE` 等参数,可以像搭积木一样自由组合; - `--preset foo` 会读取 `scripts/test_presets.py` 内 `PRESETS["foo"]` 的配置,并叠加到当前命令;`--list-presets` 与 `--dry-run` 可用来审阅或仅打印命令; - 直接执行 `python scripts/test_presets.py` 可依次运行 `AUTO_RUN_PRESETS` 中列出的预置;传入 `--preset x --dry-run` 则只打印对应命令。 `test_presets.py` 充当“指令仓库”。每个预置都是一个字典,常用字段解释如下: | 字段 | 作用 | | ---------------------------- | ------------------------------------------------------------------ | | `suite` | 复用 `run_tests.py` 内置套件(online/offline/integration,可多选) | | `tests` | 追加任意 pytest 路径,例如 `tests/unit/test_config.py` | | `mode` | 覆盖 `TEST_MODE`(ONLINE / OFFLINE) | | `db_dsn` | 覆盖 `TEST_DB_DSN`,用于连入真实测试库 | | `json_archive` / `json_temp` | 配置离线 JSON 归档与临时目录 | | `keyword` | 映射到 `pytest -k`,用于关键字过滤 | | `pytest_args` | 附加 pytest 参数,例 `-vv --maxfail=1` | | `env` | 额外环境变量列表,如 `["STORE_ID=123"]` | | `preset_meta` | 说明性文字,便于描述场景 | 示例:`offline_realdb` 预置会设置 `TEST_MODE=OFFLINE`、指定 `tests/source-data-doc` 为归档目录,并通过 `db_dsn` 连到测试库。执行 `python scripts/run_tests.py --preset offline_realdb` 或 `python scripts/test_presets.py --preset offline_realdb` 即可复用该组合,保证本地、CI 与生产回放脚本一致。 #### 3.3.3 数据库连通性快速检查 `python scripts/test_db_connection.py` 提供最轻量的 PostgreSQL 连通性检测:默认使用 `TEST_DB_DSN`(也可传 `--dsn`),尝试连接并执行 `SELECT 1 AS ok`(可通过 `--query` 自定义)。典型用途: ```bash # 读取 .env/环境变量中的 TEST_DB_DSN python scripts/test_db_connection.py # 临时指定 DSN,并检查任务配置表 python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/.... --query "SELECT count(*) FROM etl_admin.etl_task" ``` 脚本返回 0 代表连接与查询成功;若返回非 0,可结合第 8 章“常见问题排查”的数据库章节(网络、防火墙、账号权限等)先定位问题,再运行完整 ETL。 --- ## 4. 项目结构与文件说明 ### 4.1 总体目录结构(树状图) ```text etl_billiards/ │ ├── README.md # 项目总览和使用说明 ├── MIGRATION_GUIDE.md # 从旧版本迁移指南 ├── requirements.txt # Python 依赖列表 ├── setup.py # 项目安装配置 ├── .env.example # 环境变量配置模板 ├── .gitignore # Git 忽略文件配置 ├── run_etl.sh # Linux/Mac 运行脚本 ├── run_etl.bat # Windows 运行脚本 │ ├── config/ # 配置管理模块 │ ├── __init__.py │ ├── defaults.py # 默认配置值定义 │ ├── env_parser.py # 环境变量解析器 │ └── settings.py # 配置管理主类 │ ├── database/ # 数据库访问层 │ ├── __init__.py │ ├── connection.py # 数据库连接管理 │ └── operations.py # 批量操作封装 │ ├── api/ # HTTP API 客户端 │ ├── __init__.py │ └── client.py # API 客户端(重试 + 分页) │ ├── models/ # 数据模型层 │ ├── __init__.py │ ├── parsers.py # 类型解析器 │ └── validators.py # 数据验证器 │ ├── loaders/ # 数据加载器层 │ ├── __init__.py │ ├── base_loader.py # 加载器基类 │ ├── dimensions/ # 维度表加载器 │ │ ├── __init__.py │ │ └── member.py # 会员维度加载器 │ └── facts/ # 事实表加载器 │ ├── __init__.py │ ├── order.py # 订单事实表加载器 │ └── payment.py # 支付记录加载器 │ ├── scd/ # SCD2 处理层 │ ├── __init__.py │ └── scd2_handler.py # SCD2 历史记录处理器 │ ├── quality/ # 数据质量检查层 │ ├── __init__.py │ ├── base_checker.py # 质量检查器基类 │ └── balance_checker.py # 余额一致性检查器 │ ├── orchestration/ # ETL 编排层 │ ├── __init__.py │ ├── scheduler.py # ETL 调度器 │ ├── task_registry.py # 任务注册表(工厂模式) │ ├── cursor_manager.py # 游标管理器 │ └── run_tracker.py # 运行记录追踪器 │ ├── tasks/ # ETL 任务层 │ ├── __init__.py │ ├── base_task.py # 任务基类(模板方法) │ ├── orders_task.py # 订单 ETL 任务 │ ├── payments_task.py # 支付 ETL 任务 │ └── members_task.py # 会员 ETL 任务 │ ├── cli/ # 命令行接口层 │ ├── __init__.py │ └── main.py # CLI 主入口 │ ├── utils/ # 工具函数 │ ├── __init__.py │ └── helpers.py # 通用工具函数 │ ├── tests/ # 测试代码 │ ├── __init__.py │ ├── unit/ # 单元测试 │ │ ├── __init__.py │ │ ├── test_config.py │ │ └── test_parsers.py │ ├── testdata_json/ # 清洗入库用的测试Json文件 │ │ └── XX.json │ └── integration/ # 集成测试 │ ├── __init__.py │ └── test_database.py │ └── docs/ # 文档 └── ARCHITECTURE.md # 架构设计文档 ``` ### 4.2 各模块职责概览 - **config/** - 统一配置入口,支持默认值、环境变量、命令行参数三层覆盖。 - **database/** - 封装 PostgreSQL 连接与批量操作(插入、更新、Upsert 等)。 - **api/** - 对上游业务 API 的 HTTP 调用进行统一封装,支持重试、分页与超时控制。 - **models/** - 提供类型解析器(时间戳、金额、整数等)与业务级数据校验器。 - **loaders/** - 提供事实表与维度表的加载逻辑(包含批量 Upsert、统计写入结果等)。 - **scd/** - 维度型数据的 SCD2 历史管理(有效期、版本标记等)。 - **quality/** - 质量检查策略,例如余额一致性、记录数量对齐等。 - **orchestration/** - 任务调度、任务注册、游标管理(增量窗口)、运行记录追踪。 - **tasks/** - 具体业务任务(订单、支付、会员等),封装了从“取数 → 处理 → 写库 → 记录结果”的完整流程。 - **cli/** - 命令行入口,解析参数并启动调度流程。 - **utils/** - 杂项工具函数。 - **tests/** - 单元测试与集成测试代码。 --- ## 5. 架构设计与流程说明 ### 5.1 分层架构图 ```text ┌─────────────────────────────────────┐ │ CLI 命令行接口 │ <- cli/main.py └─────────────┬───────────────────────┘ │ ┌─────────────▼───────────────────────┐ │ Orchestration 编排层 │ <- orchestration/ │ (Scheduler, TaskRegistry, ...) │ └─────────────┬───────────────────────┘ │ ┌─────────────▼───────────────────────┐ │ Tasks 任务层 │ <- tasks/ │ (OrdersTask, PaymentsTask, ...) │ └───┬─────────┬─────────┬─────────────┘ │ │ │ ▼ ▼ ▼ ┌────────┐ ┌─────┐ ┌──────────┐ │Loaders │ │ SCD │ │ Quality │ <- loaders/, scd/, quality/ └────────┘ └─────┘ └──────────┘ │ ┌───────▼────────┐ │ Models 模型 │ <- models/ └───────┬────────┘ │ ┌───────▼────────┐ │ API 客户端 │ <- api/ └───────┬────────┘ │ ┌───────▼────────┐ │ Database 访问 │ <- database/ └───────┬────────┘ │ ┌───────▼────────┐ │ Config 配置 │ <- config/ └────────────────┘ ``` ### 5.2 各层职责(当前设计) - **CLI 层 (`cli/`)** - 解析命令行参数(指定任务列表、Dry-run、覆盖配置项等)。 - 初始化配置与日志后交由编排层执行。 - **编排层 (`orchestration/`)** - `scheduler.py`:根据配置与 CLI 参数选择需要执行的任务,控制执行顺序和并行策略。 - `task_registry.py`:提供任务注册表,按任务代码创建任务实例(工厂模式)。 - `cursor_manager.py`:管理增量游标(时间窗口 / ID 游标)。 - `run_tracker.py`:记录每次任务运行的状态、统计信息和错误信息。 - **任务层 (`tasks/`)** - `base_task.py`:定义任务执行模板流程(模板方法模式),包括获取窗口、调用上游、解析 / 校验、写库、更新游标等。 - `orders_task.py` / `payments_task.py` / `members_task.py`:实现具体任务逻辑(订单、支付、会员)。 - **加载器 / SCD / 质量层** - `loaders/`:根据目标表封装 Upsert / Insert / Update 逻辑。 - `scd/scd2_handler.py`:为维度表提供 SCD2 历史管理能力。 - `quality/`:执行数据质量检查,如余额对账。 - **模型层 (`models/`)** - `parsers.py`:负责数据类型转换(字符串 → 时间戳、Decimal、int 等)。 - `validators.py`:执行字段级和记录级的数据校验。 - **API 层 (`api/client.py`)** - 封装 HTTP 调用,处理重试、超时及分页。 - **数据库层 (`database/`)** - 管理数据库连接及上下文。 - 提供批量插入 / 更新 / Upsert 操作接口。 - **配置层 (`config/`)** - 定义配置项默认值。 - 解析环境变量并进行类型转换。 - 对外提供统一配置对象。 ### 5.3 设计模式(当前使用) - 工厂模式:任务注册 / 创建(`TaskRegistry`)。 - 模板方法模式:任务执行流程(`BaseTask`)。 - 策略模式:不同 Loader / Checker 实现不同策略。 - 依赖注入:通过构造函数向任务传入 `db`、`api`、`config` 等依赖。 ### 5.4 数据与控制流程 整体流程: 1. CLI 解析参数并加载配置。 2. Scheduler 构建数据库连接、API 客户端等依赖。 3. Scheduler 遍历任务配置,从 `TaskRegistry` 获取任务类并实例化。 4. 每个任务按统一模板执行: - 读取游标 / 时间窗口。 - 调用 API 拉取数据(可分页)。 - 解析、验证数据。 - 通过 Loader 写入数据库(事实表 / 维度表 / SCD2)。 - 执行质量检查。 - 更新游标与运行记录。 5. 所有任务执行完成后,释放连接并退出进程。 ### 5.5 错误处理策略 - 单个任务失败不影响其他任务执行。 - 数据库操作异常自动回滚当前事务。 - API 请求失败时按配置进行重试,超过重试次数记录错误并终止该任务。 - 所有错误被记录到日志和运行追踪表,便于事后排查。 ### 5.6 ODS + DWD 双阶段策略(新增) 为了支撑回溯/重放与后续 DWD 宽表构建,项目新增了 `billiards_ods` Schema 以及一组专门的 ODS 任务/Loader: - **ODS 表**:`billiards_ods.ods_order_settle`、`ods_table_use_detail`、`ods_assistant_ledger`、`ods_assistant_abolish`、`ods_goods_ledger`、`ods_payment`、`ods_refund`、`ods_coupon_verify`、`ods_member`、`ods_member_card`、`ods_package_coupon`、`ods_inventory_stock`、`ods_inventory_change`。每条记录都会保存 `store_id + 源主键 + payload JSON + fetched_at + source_endpoint` 等信息。 - **通用 Loader**:`loaders/ods/generic.py::GenericODSLoader` 统一封装了 `INSERT ... ON CONFLICT ...` 与批量写入逻辑,调用方只需提供列名与主键列即可。 - **ODS 任务**:`tasks/ods_tasks.py` 内通过 `OdsTaskSpec` 定义了一组任务(`ODS_ORDER_SETTLE`、`ODS_PAYMENT`、`ODS_ASSISTANT_LEDGER` 等),并在 `TaskRegistry` 中自动注册,可直接通过 `python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT` 执行。 - **双阶段链路**: 1. 阶段 1(ODS):调用 API/离线归档 JSON,将原始记录写入 ODS 表,保留分页、抓取时间、来源文件等元数据。 2. 阶段 2(DWD/DIM):后续订单、支付、券等事实任务将改为从 ODS 读取 payload,经过解析/校验后写入 `billiards.fact_*`、`dim_*` 表,避免重复拉取上游接口。 > 新增的单元测试 `tests/unit/test_ods_tasks.py` 覆盖了 `ODS_ORDER_SETTLE`、`ODS_PAYMENT` 的入库路径,可作为扩展其他 ODS 任务的模板。 --- ## 6. 迁移指南(从旧脚本到当前项目) 本节用于说明如何从旧的单文件脚本(如 `task_merged.py`)迁移到当前模块化项目,属于当前项目的使用说明,不涉及历史对比细节。 ### 6.1 核心功能映射示意 | 旧版本函数 / 类 | 新版本位置 | 说明 | | --------------------- | ----------------------------------------------------- | ---------- | | `DEFAULTS` 字典 | `config/defaults.py` | 配置默认值 | | `build_config()` | `config/settings.py::AppConfig.load()` | 配置加载 | | `Pg` 类 | `database/connection.py::DatabaseConnection` | 数据库连接 | | `http_get_json()` | `api/client.py::APIClient.get()` | API 请求 | | `paged_get()` | `api/client.py::APIClient.get_paginated()` | 分页请求 | | `parse_ts()` | `models/parsers.py::TypeParser.parse_timestamp()` | 时间解析 | | `upsert_fact_order()` | `loaders/facts/order.py::OrderLoader.upsert_orders()` | 订单加载 | | `scd2_upsert()` | `scd/scd2_handler.py::SCD2Handler.upsert()` | SCD2 处理 | | `run_task_orders()` | `tasks/orders_task.py::OrdersTask.execute()` | 订单任务 | | `main()` | `cli/main.py::main()` | 主入口 | ### 6.2 典型迁移步骤 1. **配置迁移** - 原来在 `DEFAULTS` 或脚本内硬编码的配置,迁移到 `.env` 与 `config/defaults.py`。 - 使用 `AppConfig.load()` 统一获取配置。 2. **并行运行验证** ```bash # 旧脚本 python task_merged.py --tasks ORDERS # 新项目 python -m cli.main --tasks ORDERS ``` 对比新旧版本导出的数据表和日志,确认一致性。 3. **自定义逻辑迁移** - 原脚本中的自定义清洗逻辑 → 放入相应 `loaders/` 或任务类中。 - 自定义任务 → 在 `tasks/` 中实现并在 `task_registry` 中注册。 - 自定义 API 调用 → 扩展 `api/client.py` 或单独封装服务类。 4. **逐步切换** - 先在测试环境并行运行。 - 再逐步切换生产任务到新版本。 --- ## 7. 开发与扩展指南(当前项目) ### 7.1 添加新任务 1. 在 `tasks/` 目录创建任务类: ```python from .base_task import BaseTask class MyTask(BaseTask): def get_task_code(self) -> str: return "MY_TASK" def execute(self) -> dict: # 1. 获取时间窗口 window_start, window_end, _ = self._get_time_window() # 2. 调用 API 获取数据 records, _ = self.api.get_paginated(...) # 3. 解析 / 校验 parsed = [self._parse(r) for r in records] # 4. 加载数据 loader = MyLoader(self.db) inserted, updated, _ = loader.upsert(parsed) # 5. 提交并返回结果 self.db.commit() return self._build_result("SUCCESS", { "inserted": inserted, "updated": updated, }) ``` 2. 在 `orchestration/task_registry.py` 中注册: ```python from tasks.my_task import MyTask default_registry.register("MY_TASK", MyTask) ``` 3. 在任务配置表中启用(示例): ```sql INSERT INTO etl_admin.etl_task (task_code, store_id, enabled) VALUES ('MY_TASK', 123456, TRUE); ``` ### 7.2 添加新加载器 ```python from loaders.base_loader import BaseLoader class MyLoader(BaseLoader): def upsert(self, records: list) -> tuple: sql = "INSERT INTO table_name (...) VALUES (...) ON CONFLICT (...) DO UPDATE SET ... RETURNING (xmax = 0) AS inserted" inserted, updated = self.db.batch_upsert_with_returning( sql, records, page_size=self._batch_size() ) return (inserted, updated, 0) ``` ### 7.3 添加新质量检查器 1. 在 `quality/` 中实现检查器,继承 `base_checker.py`。 2. 在任务或调度流程中调用该检查器,在写库后进行验证。 ### 7.4 类型解析与校验扩展 - 在 `models/parsers.py` 中添加新类型解析方法。 - 在 `models/validators.py` 中添加新规则(如枚举校验、跨字段校验等)。 --- ## 8. 常见问题排查 ### 8.1 数据库连接失败 ```text 错误: could not connect to server ``` 排查要点: - 检查 `PG_DSN` 或相关数据库配置是否正确。 - 确认数据库服务是否启动、网络是否可达。 ### 8.2 API 请求超时 ```text 错误: requests.exceptions.Timeout ``` 排查要点: - 检查 `API_BASE` 地址与网络连通性。 - 适当提高超时与重试次数(在配置中调整)。 ### 8.3 模块导入错误 ```text 错误: ModuleNotFoundError ``` 排查要点: - 确认在项目根目录下运行(包含 `etl_billiards/` 包)。 - 或通过 `pip install -e .` 以可编辑模式安装项目。 ### 8.4 权限相关问题 ```text 错误: Permission denied ``` 排查要点: - 脚本无执行权限:`chmod +x run_etl.sh`。 - Windows 需要以管理员身份运行,或修改日志 / 导出目录权限。 --- ## 9. 使用前检查清单 在正式运行前建议确认: - [ ] 已安装 Python 3.10+。 - [ ] 已执行 `pip install -r requirements.txt`。 - [ ] `.env` 已配置正确(数据库、API、门店 ID、路径等)。 - [ ] PostgreSQL 数据库可连接。 - [ ] API 服务可访问且凭证有效。 - [ ] `LOG_ROOT`、`EXPORT_ROOT` 目录存在且拥有写权限。 --- ## 10. 参考说明 - 本文已合并原有的快速开始、项目结构、架构说明、迁移指南等内容,可作为当前项目的统一说明文档。 - 如需在此基础上拆分多份文档,可按章节拆出,例如「快速开始」「架构设计」「迁移指南」「开发扩展」等。 ## 11. 运行/调试模式说明 - 生产环境仅保留“任务模式”:通过调度/CLI 执行注册的任务(ETL/ODS),不使用调试脚本。 - 开发/调试可使用的辅助脚本(上线前可删除或禁用): - `python -m etl_billiards.scripts.rebuild_ods_from_json`:从本地 JSON 目录重建 `billiards_ods`,用于离线初始化/验证。环境变量:`PG_DSN`(必填)、`JSON_DOC_DIR`(可选,默认 `C:\dev\LLTQ\export\test-json-doc`)、`INCLUDE_FILES`(逗号分隔文件名)、`DROP_SCHEMA_FIRST`(默认 true)。 - 如需在生产环境保留脚本,请在运维手册中明确用途和禁用条件,避免误用。 ## 12. ODS 任务上线指引 - 任务注册:`etl_billiards/database/seed_ods_tasks.sql` 列出了当前启用的 ODS 任务。将其中的 `store_id` 替换为实际门店后执行: ``` psql "$PG_DSN" -f etl_billiards/database/seed_ods_tasks.sql ``` `ON CONFLICT` 会保持 enabled=true,避免重复。 - 调度:确认 `etl_admin.etl_task` 中已启用所需的 ODS 任务(任务代码见 seed 脚本),调度器或 CLI `--tasks` 即可调用。 - 离线回灌:开发环境可用 `rebuild_ods_from_json` 以样例 JSON 初始化 ODS,生产慎用;默认按 `(source_file, record_index)` 去重。 - 测试:`pytest etl_billiards/tests/unit/test_ods_tasks.py` 覆盖核心 ODS 任务;测试时可设置 `ETL_SKIP_DOTENV=1` 跳过本地 .env 读取。 ## 13. ODS 表映射总览 | ODS 表名 | 接口 Path | 数据列表路径 | | ------------------------------------ | ---------------------------------------------------- | ----------------------------- | | `assistant_accounts_master` | `/PersonnelManagement/SearchAssistantInfo` | data.assistantInfos | | `assistant_service_records` | `/AssistantPerformance/GetOrderAssistantDetails` | data.orderAssistantDetails | | `assistant_cancellation_records` | `/AssistantPerformance/GetAbolitionAssistant` | data.abolitionAssistants | | `goods_stock_movements` | `/GoodsStockManage/QueryGoodsOutboundReceipt` | data.queryDeliveryRecordsList | | `goods_stock_summary` | `/TenantGoods/GetGoodsStockReport` | data | | `group_buy_packages` | `/PackageCoupon/QueryPackageCouponList` | data.packageCouponList | | `group_buy_redemption_records` | `/Site/GetSiteTableUseDetails` | data.siteTableUseDetailsList | | `member_profiles` | `/MemberProfile/GetTenantMemberList` | data.tenantMemberInfos | | `member_balance_changes` | `/MemberProfile/GetMemberCardBalanceChange` | data.tenantMemberCardLogs | | `member_stored_value_cards` | `/MemberProfile/GetTenantMemberCardList` | data.tenantMemberCards | | `payment_transactions` | `/PayLog/GetPayLogListPage` | data | | `platform_coupon_redemption_records` | `/Promotion/GetOfflineCouponConsumePageList` | data | | `recharge_settlements` | `/Site/GetRechargeSettleList` | data.settleList | | `refund_transactions` | `/Order/GetRefundPayLogList` | data | | `settlement_records` | `/Site/GetAllOrderSettleList` | data.settleList | | `settlement_ticket_details` | `/Order/GetOrderSettleTicketNew` | (整包原始 JSON) | | `site_tables_master` | `/Table/GetSiteTables` | data.siteTables | | `stock_goods_category_tree` | `/TenantGoodsCategory/QueryPrimarySecondaryCategory` | data.goodsCategoryList | | `store_goods_master` | `/TenantGoods/GetGoodsInventoryList` | data.orderGoodsList | | `store_goods_sales_records` | `/TenantGoods/GetGoodsSalesList` | data.orderGoodsLedgers | | `table_fee_discount_records` | `/Site/GetTaiFeeAdjustList` | data.taiFeeAdjustInfos | | `table_fee_transactions` | `/Site/GetSiteTableOrderDetails` | data.siteTableUseDetailsList | | `tenant_goods_master` | `/TenantGoods/QueryTenantGoods` | data.tenantGoodsList | ## 14. ODS 相关环境变量/默认值 - `.env` / 环境变量: - `JSON_DOC_DIR`:ODS 样例 JSON 目录(开发/回灌用) - `ODS_INCLUDE_FILES`:限定导入的文件名(逗号分隔,不含 .json) - `ODS_DROP_SCHEMA_FIRST`:true/false,是否重建 schema - `ETL_SKIP_DOTENV`:测试/CI 时设为 1 跳过本地 .env 读取 - `config/defaults.py` 中 `ods` 默认值: - `json_doc_dir`: `C:\dev\LLTQ\export\test-json-doc` - `include_files`: `""` - `drop_schema_first`: `True` --- ## 15. DWD 维度 “业务事件” 1. 粒度唯一、原子 - 一张 DWD 表只能有一种业务粒度,比如: - 一条记录 = 一次结账; - 一条记录 = 一段台费流水; - 一条记录 = 一次助教服务; - 一条记录 = 一次会员余额变动。 - 表里面不能又混“订单头”又混“订单行”,不能一部分是“汇总”,一部分是“明细”。 - 一旦粒度确定,所有字段都要跟这个粒度匹配: - 比如“结账头表”就不要塞每一行商品明细; - 商品明细就不要塞整单级别的总金额。 - 这是 DWD 层最重要的一条。 2. 以业务过程建模,不以 JSON 列表建模 - 先画清楚你真实的业务链路: - 开台 / 换台 / 关台 → 台费流水 - 助教上桌 → 助教服务流水 / 废除事件 - 点单 → 商品销售流水 - 充值 / 消费 → 余额变更 / 充值单 - 结账 → 结账头表 + 支付流水 / 退款流水 - 团购 / 平台券 → 核销流水 3. 主键明确、外键统一 - 每张 DWD 表必须有业务主键(哪怕是接口给的 id),不要依赖数据库自增。 - 所有“同一概念”的字段必须统一命名、统一含义: - 门店:统一叫 site_id,都对应 siteProfile.id; - 会员:统一叫 member_id 对应 member_profiles.id,system_member_id 单独一列; - 台桌:统一 table_id 对应 site_tables_master.id; - 结账:统一 order_settle_id; - 订单:统一 order_trade_no 等。 - 否则后面 DWS、AI 要把表拼起来会非常痛苦。 4. 保留明细,不做过度汇总 - DWD 层的事实表原则上只做“明细级”的数据: - 不要在 DWD 就把“日汇总、周汇总、月汇总”算出来,那是 DWS 的事; - 也不要把多个事件折成一行(例如一张表同时放日汇总+单笔流水)。 - 需要聚合时,再在 DWS 做主题宽表: - dws_member_day_profile、dws_site_day_summary 等。 - DWD 只负责细颗粒度的真相。 5. 统一清洗、标准化,但保持可追溯 - 在 DWD 层一定要做的清洗: - 类型转换:字符串时间 → 时间类型,金额统一为 decimal,布尔统一为 0/1; - 单位统一:秒 / 分钟、元 / 分都统一; - 枚举标准化:状态码、类型码在 DWD 里就定死含义,必要时建枚举维表。 - 同时要保证: - 每条 DWD 记录都能追溯回 ODS: - 保留源系统主键; - 保留原始时间 / 原始金额字段(不要覆盖掉)。 6. 扁平化、去嵌套 - JSON 里常见结构是:分页壳 + 头 + 明细数组 + 各种嵌套对象(siteProfile、tableProfile、goodsLedgers…)。 - DWD 的原则是: - 去掉分页壳; - 把“数组”拆成子表(头表 / 行表); - 把重复出现的 profile 抽出去做维度表(门店、台、商品、会员……)。 - 目标是:DWD 表都是二维表结构,不存复杂嵌套 JSON。 7. 模型长期稳定,可扩展 - DWD 的表结构要尽可能稳定,新增需求尽量通过: - 加字段; - 新建事实表 / 维度表; - 在 DWS 做派生指标; - 而不是频繁重构已有 DWD 表结构。 - 这点跟你后面要喂给 LLM 也很相关:AI 配的 prompt、schema 理解都要尽量少改。