Files
feiqiu-ETL/etl_billiards/README.md
2025-11-30 07:19:05 +08:00

838 lines
35 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 台球场 ETL 系统(模块化版本)合并文档
本文为原多份文档(如 `INDEX.md``QUICK_START.md``ARCHITECTURE.md``MIGRATION_GUIDE.md``PROJECT_STRUCTURE.md``README.md` 等)的合并版,只保留与**当前项目本身**相关的内容:项目说明、目录结构、架构设计、数据与控制流程、迁移与扩展指南等,不包含修改历史和重构过程描述。
---
## 1. 项目概述
台球场 ETL 系统是一个面向门店业务的专业 ETL 工程项目,用于从外部业务 API 拉取订单、支付、会员等数据经过解析、校验、SCD2 处理、质量检查后写入 PostgreSQL 数据库,并支持增量同步和任务运行追踪。
系统采用模块化、分层架构设计,核心特性包括:
- 模块化目录结构配置、数据库、API、模型、加载器、SCD2、质量检查、编排、任务、CLI、工具、测试等分层清晰
- 完整的配置管理:默认值 + 环境变量 + CLI 参数多层覆盖。
- 可复用的数据库访问层(连接管理、批量 Upsert 封装)。
- 支持重试与分页的 API 客户端。
- 类型安全的数据解析与校验模块。
- SCD2 维度历史管理。
- 数据质量检查(例如余额一致性检查)。
- 任务编排层统一调度、游标管理与运行追踪。
- 命令行入口统一管理任务执行支持筛选任务、Dry-run 等模式。
---
## 2. 快速开始
### 2.1 环境准备
- Python 版本:建议 3.10+
- 数据库PostgreSQL
- 操作系统Windows / Linux / macOS 均可
```bash
# 克隆/下载代码后进入项目目录
cd etl_billiards/
ls -la
```
你会看到下述目录结构的顶层部分(详细见第 4 章):
- `config/` - 配置管理
- `database/` - 数据库访问
- `api/` - API 客户端
- `tasks/` - ETL 任务实现
- `cli/` - 命令行入口
- `docs/` - 技术文档
### 2.2 安装依赖
```bash
pip install -r requirements.txt
```
主要依赖示例(按实际 `requirements.txt` 为准):
- `psycopg2-binary`PostgreSQL 驱动
- `requests`HTTP 客户端
- `python-dateutil`:时间处理
- `tzdata`:时区数据
### 2.3 配置环境变量
复制并修改环境变量模板:
```bash
cp .env.example .env
# 使用你习惯的编辑器修改 .env
```
`.env` 示例(最小配置):
```bash
# 数据库
PG_DSN=postgresql://user:password@localhost:5432/....
# API
API_BASE=https://api.example.com
API_TOKEN=your_token_here
# 门店/应用
STORE_ID=2790685415443269
TIMEZONE=Asia/Taipei
# 目录
EXPORT_ROOT=/path/to/export
LOG_ROOT=/path/to/logs
```
> 所有配置项的默认值见 `config/defaults.py`,最终生效配置由「默认值 + 环境变量 + CLI 参数」三层叠加。
### 2.4 运行第一个任务
通过 CLI 入口运行:
```bash
# 运行所有任务
python -m cli.main
# 仅运行订单任务
python -m cli.main --tasks ORDERS
# 运行订单 + 支付
python -m cli.main --tasks ORDERS,PAYMENTS
# Windows 使用脚本
run_etl.bat --tasks ORDERS
# Linux / macOS 使用脚本
./run_etl.sh --tasks ORDERS
```
### 2.5 查看结果
- 日志目录:使用 `LOG_ROOT` 指定,例如
```bash
ls -la C:\dev\LLTQ\export\LOG/
```
- 导出目录:使用 `EXPORT_ROOT` 指定,例如
```bash
ls -la C:\dev\LLTQ\export\JSON/
```
---
## 3. 常用命令与开发工具
### 3.1 CLI 常用命令
```bash
# 运行所有任务
python -m cli.main
# 运行指定任务
python -m cli.main --tasks ORDERS,PAYMENTS,MEMBERS
# 使用自定义数据库
python -m cli.main --pg-dsn "postgresql://user:password@host:5432/db"
# 使用自定义 API 端点
python -m cli.main --api-base "https://api.example.com" --api-token "..."
# 试运行(不写入数据库)
python -m cli.main --dry-run --tasks ORDERS
```
### 3.2 IDE / 代码质量工具示例VSCode
`.vscode/settings.json` 示例:
```json
{
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.formatting.provider": "black",
"python.testing.pytestEnabled": true
}
```
代码格式化与检查:
```bash
pip install black isort pylint
black .
isort .
pylint etl_billiards/
```
### 3.3 测试
```bash
# 安装测试依赖(按需)
pip install pytest pytest-cov
# 运行全部测试
pytest
# 仅运行单元测试
pytest tests/unit/
# 生成覆盖率报告
pytest --cov=. --cov-report=html
```
测试示例(按实际项目为准):
- `tests/unit/test_config.py` 配置管理单元测试
- `tests/unit/test_parsers.py` 解析器单元测试
- `tests/integration/test_database.py` 数据库集成测试
#### 3.3.1 测试模式ONLINE / OFFLINE
- `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API完整执行 E/T/L。
- `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load适合验证本地归档数据是否仍可回放。
- `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/source-data-doc` 或 CI 产出的快照)。
- `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。
- `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN实打实执行写库留空时测试使用内存伪库避免依赖数据库。
示例命令:
```bash
# 在线模式覆盖所有任务
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
# 离线模式使用归档 JSON 覆盖所有任务
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/source-data-doc pytest tests/unit/test_etl_tasks_offline.py
# 使用脚本按需组合参数(示例:在线 + 仅订单用例)
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
# 使用脚本连接真实测试库并回放离线模式
python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb
# 使用“指令仓库”中的预置命令
python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py
```
#### 3.3.2 脚本化测试组合(`run_tests.py` / `test_presets.py`
- `scripts/run_tests.py` 是 pytest 的统一入口:自动把项目根目录加入 `sys.path`,并提供 `--suite online/offline/integration`、`--tests`(自定义路径)、`--mode`、`--db-dsn`、`--json-archive`、`--json-temp`、`--keyword/-k`、`--pytest-args`、`--env KEY=VALUE` 等参数,可以像搭积木一样自由组合;
- `--preset foo` 会读取 `scripts/test_presets.py` 内 `PRESETS["foo"]` 的配置,并叠加到当前命令;`--list-presets` 与 `--dry-run` 可用来审阅或仅打印命令;
- 直接执行 `python scripts/test_presets.py` 可依次运行 `AUTO_RUN_PRESETS` 中列出的预置;传入 `--preset x --dry-run` 则只打印对应命令。
`test_presets.py` 充当“指令仓库”。每个预置都是一个字典,常用字段解释如下:
| 字段 | 作用 |
| ---------------------------- | ------------------------------------------------------------------ |
| `suite` | 复用 `run_tests.py` 内置套件online/offline/integration可多选 |
| `tests` | 追加任意 pytest 路径,例如 `tests/unit/test_config.py` |
| `mode` | 覆盖 `TEST_MODE`ONLINE / OFFLINE |
| `db_dsn` | 覆盖 `TEST_DB_DSN`,用于连入真实测试库 |
| `json_archive` / `json_temp` | 配置离线 JSON 归档与临时目录 |
| `keyword` | 映射到 `pytest -k`,用于关键字过滤 |
| `pytest_args` | 附加 pytest 参数,例 `-vv --maxfail=1` |
| `env` | 额外环境变量列表,如 `["STORE_ID=123"]` |
| `preset_meta` | 说明性文字,便于描述场景 |
示例:`offline_realdb` 预置会设置 `TEST_MODE=OFFLINE`、指定 `tests/source-data-doc` 为归档目录,并通过 `db_dsn` 连到测试库。执行 `python scripts/run_tests.py --preset offline_realdb` 或 `python scripts/test_presets.py --preset offline_realdb` 即可复用该组合保证本地、CI 与生产回放脚本一致。
#### 3.3.3 数据库连通性快速检查
`python scripts/test_db_connection.py` 提供最轻量的 PostgreSQL 连通性检测:默认使用 `TEST_DB_DSN`(也可传 `--dsn`),尝试连接并执行 `SELECT 1 AS ok`(可通过 `--query` 自定义)。典型用途:
```bash
# 读取 .env/环境变量中的 TEST_DB_DSN
python scripts/test_db_connection.py
# 临时指定 DSN并检查任务配置表
python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/.... --query "SELECT count(*) FROM etl_admin.etl_task"
```
脚本返回 0 代表连接与查询成功;若返回非 0可结合第 8 章“常见问题排查”的数据库章节(网络、防火墙、账号权限等)先定位问题,再运行完整 ETL。
---
## 4. 项目结构与文件说明
### 4.1 总体目录结构(树状图)
```text
etl_billiards/
├── README.md # 项目总览和使用说明
├── MIGRATION_GUIDE.md # 从旧版本迁移指南
├── requirements.txt # Python 依赖列表
├── setup.py # 项目安装配置
├── .env.example # 环境变量配置模板
├── .gitignore # Git 忽略文件配置
├── run_etl.sh # Linux/Mac 运行脚本
├── run_etl.bat # Windows 运行脚本
├── config/ # 配置管理模块
│ ├── __init__.py
│ ├── defaults.py # 默认配置值定义
│ ├── env_parser.py # 环境变量解析器
│ └── settings.py # 配置管理主类
├── database/ # 数据库访问层
│ ├── __init__.py
│ ├── connection.py # 数据库连接管理
│ └── operations.py # 批量操作封装
├── api/ # HTTP API 客户端
│ ├── __init__.py
│ └── client.py # API 客户端(重试 + 分页)
├── models/ # 数据模型层
│ ├── __init__.py
│ ├── parsers.py # 类型解析器
│ └── validators.py # 数据验证器
├── loaders/ # 数据加载器层
│ ├── __init__.py
│ ├── base_loader.py # 加载器基类
│ ├── dimensions/ # 维度表加载器
│ │ ├── __init__.py
│ │ └── member.py # 会员维度加载器
│ └── facts/ # 事实表加载器
│ ├── __init__.py
│ ├── order.py # 订单事实表加载器
│ └── payment.py # 支付记录加载器
├── scd/ # SCD2 处理层
│ ├── __init__.py
│ └── scd2_handler.py # SCD2 历史记录处理器
├── quality/ # 数据质量检查层
│ ├── __init__.py
│ ├── base_checker.py # 质量检查器基类
│ └── balance_checker.py # 余额一致性检查器
├── orchestration/ # ETL 编排层
│ ├── __init__.py
│ ├── scheduler.py # ETL 调度器
│ ├── task_registry.py # 任务注册表(工厂模式)
│ ├── cursor_manager.py # 游标管理器
│ └── run_tracker.py # 运行记录追踪器
├── tasks/ # ETL 任务层
│ ├── __init__.py
│ ├── base_task.py # 任务基类(模板方法)
│ ├── orders_task.py # 订单 ETL 任务
│ ├── payments_task.py # 支付 ETL 任务
│ └── members_task.py # 会员 ETL 任务
├── cli/ # 命令行接口层
│ ├── __init__.py
│ └── main.py # CLI 主入口
├── utils/ # 工具函数
│ ├── __init__.py
│ └── helpers.py # 通用工具函数
├── tests/ # 测试代码
│ ├── __init__.py
│ ├── unit/ # 单元测试
│ │ ├── __init__.py
│ │ ├── test_config.py
│ │ └── test_parsers.py
│ ├── testdata_json/ # 清洗入库用的测试Json文件
│ │ └── XX.json
│ └── integration/ # 集成测试
│ ├── __init__.py
│ └── test_database.py
└── docs/ # 文档
└── ARCHITECTURE.md # 架构设计文档
```
### 4.2 各模块职责概览
- **config/**
- 统一配置入口,支持默认值、环境变量、命令行参数三层覆盖。
- **database/**
- 封装 PostgreSQL 连接与批量操作插入、更新、Upsert 等)。
- **api/**
- 对上游业务 API 的 HTTP 调用进行统一封装,支持重试、分页与超时控制。
- **models/**
- 提供类型解析器(时间戳、金额、整数等)与业务级数据校验器。
- **loaders/**
- 提供事实表与维度表的加载逻辑(包含批量 Upsert、统计写入结果等
- **scd/**
- 维度型数据的 SCD2 历史管理(有效期、版本标记等)。
- **quality/**
- 质量检查策略,例如余额一致性、记录数量对齐等。
- **orchestration/**
- 任务调度、任务注册、游标管理(增量窗口)、运行记录追踪。
- **tasks/**
- 具体业务任务(订单、支付、会员等),封装了从“取数 → 处理 → 写库 → 记录结果”的完整流程。
- **cli/**
- 命令行入口,解析参数并启动调度流程。
- **utils/**
- 杂项工具函数。
- **tests/**
- 单元测试与集成测试代码。
---
## 5. 架构设计与流程说明
### 5.1 分层架构图
```text
┌─────────────────────────────────────┐
│ CLI 命令行接口 │ <- cli/main.py
└─────────────┬───────────────────────┘
┌─────────────▼───────────────────────┐
│ Orchestration 编排层 │ <- orchestration/
│ (Scheduler, TaskRegistry, ...) │
└─────────────┬───────────────────────┘
┌─────────────▼───────────────────────┐
│ Tasks 任务层 │ <- tasks/
│ (OrdersTask, PaymentsTask, ...) │
└───┬─────────┬─────────┬─────────────┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────┐ ┌──────────┐
│Loaders │ │ SCD │ │ Quality │ <- loaders/, scd/, quality/
└────────┘ └─────┘ └──────────┘
┌───────▼────────┐
│ Models 模型 │ <- models/
└───────┬────────┘
┌───────▼────────┐
│ API 客户端 │ <- api/
└───────┬────────┘
┌───────▼────────┐
│ Database 访问 │ <- database/
└───────┬────────┘
┌───────▼────────┐
│ Config 配置 │ <- config/
└────────────────┘
```
### 5.2 各层职责(当前设计)
- **CLI 层 (`cli/`)**
- 解析命令行参数指定任务列表、Dry-run、覆盖配置项等
- 初始化配置与日志后交由编排层执行。
- **编排层 (`orchestration/`)**
- `scheduler.py`:根据配置与 CLI 参数选择需要执行的任务,控制执行顺序和并行策略。
- `task_registry.py`:提供任务注册表,按任务代码创建任务实例(工厂模式)。
- `cursor_manager.py`:管理增量游标(时间窗口 / ID 游标)。
- `run_tracker.py`:记录每次任务运行的状态、统计信息和错误信息。
- **任务层 (`tasks/`)**
- `base_task.py`:定义任务执行模板流程(模板方法模式),包括获取窗口、调用上游、解析 / 校验、写库、更新游标等。
- `orders_task.py` / `payments_task.py` / `members_task.py`:实现具体任务逻辑(订单、支付、会员)。
- **加载器 / SCD / 质量层**
- `loaders/`:根据目标表封装 Upsert / Insert / Update 逻辑。
- `scd/scd2_handler.py`:为维度表提供 SCD2 历史管理能力。
- `quality/`:执行数据质量检查,如余额对账。
- **模型层 (`models/`)**
- `parsers.py`:负责数据类型转换(字符串 → 时间戳、Decimal、int 等)。
- `validators.py`:执行字段级和记录级的数据校验。
- **API 层 (`api/client.py`)**
- 封装 HTTP 调用,处理重试、超时及分页。
- **数据库层 (`database/`)**
- 管理数据库连接及上下文。
- 提供批量插入 / 更新 / Upsert 操作接口。
- **配置层 (`config/`)**
- 定义配置项默认值。
- 解析环境变量并进行类型转换。
- 对外提供统一配置对象。
### 5.3 设计模式(当前使用)
- 工厂模式:任务注册 / 创建(`TaskRegistry`)。
- 模板方法模式:任务执行流程(`BaseTask`)。
- 策略模式:不同 Loader / Checker 实现不同策略。
- 依赖注入:通过构造函数向任务传入 `db`、`api`、`config` 等依赖。
### 5.4 数据与控制流程
整体流程:
1. CLI 解析参数并加载配置。
2. Scheduler 构建数据库连接、API 客户端等依赖。
3. Scheduler 遍历任务配置,从 `TaskRegistry` 获取任务类并实例化。
4. 每个任务按统一模板执行:
- 读取游标 / 时间窗口。
- 调用 API 拉取数据(可分页)。
- 解析、验证数据。
- 通过 Loader 写入数据库(事实表 / 维度表 / SCD2
- 执行质量检查。
- 更新游标与运行记录。
5. 所有任务执行完成后,释放连接并退出进程。
### 5.5 错误处理策略
- 单个任务失败不影响其他任务执行。
- 数据库操作异常自动回滚当前事务。
- API 请求失败时按配置进行重试,超过重试次数记录错误并终止该任务。
- 所有错误被记录到日志和运行追踪表,便于事后排查。
### 5.6 ODS + DWD 双阶段策略(新增)
为了支撑回溯/重放与后续 DWD 宽表构建,项目新增了 `billiards_ods` Schema 以及一组专门的 ODS 任务/Loader
- **ODS 表**`billiards_ods.ods_order_settle`、`ods_table_use_detail`、`ods_assistant_ledger`、`ods_assistant_abolish`、`ods_goods_ledger`、`ods_payment`、`ods_refund`、`ods_coupon_verify`、`ods_member`、`ods_member_card`、`ods_package_coupon`、`ods_inventory_stock`、`ods_inventory_change`。每条记录都会保存 `store_id + 源主键 + payload JSON + fetched_at + source_endpoint` 等信息。
- **通用 Loader**`loaders/ods/generic.py::GenericODSLoader` 统一封装了 `INSERT ... ON CONFLICT ...` 与批量写入逻辑,调用方只需提供列名与主键列即可。
- **ODS 任务**`tasks/ods_tasks.py` 内通过 `OdsTaskSpec` 定义了一组任务(`ODS_ORDER_SETTLE`、`ODS_PAYMENT`、`ODS_ASSISTANT_LEDGER` 等),并在 `TaskRegistry` 中自动注册,可直接通过 `python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT` 执行。
- **双阶段链路**
1. 阶段 1ODS调用 API/离线归档 JSON将原始记录写入 ODS 表,保留分页、抓取时间、来源文件等元数据。
2. 阶段 2DWD/DIM后续订单、支付、券等事实任务将改为从 ODS 读取 payload经过解析/校验后写入 `billiards.fact_*`、`dim_*` 表,避免重复拉取上游接口。
> 新增的单元测试 `tests/unit/test_ods_tasks.py` 覆盖了 `ODS_ORDER_SETTLE`、`ODS_PAYMENT` 的入库路径,可作为扩展其他 ODS 任务的模板。
---
## 6. 迁移指南(从旧脚本到当前项目)
本节用于说明如何从旧的单文件脚本(如 `task_merged.py`)迁移到当前模块化项目,属于当前项目的使用说明,不涉及历史对比细节。
### 6.1 核心功能映射示意
| 旧版本函数 / 类 | 新版本位置 | 说明 |
| --------------------- | ----------------------------------------------------- | ---------- |
| `DEFAULTS` 字典 | `config/defaults.py` | 配置默认值 |
| `build_config()` | `config/settings.py::AppConfig.load()` | 配置加载 |
| `Pg` 类 | `database/connection.py::DatabaseConnection` | 数据库连接 |
| `http_get_json()` | `api/client.py::APIClient.get()` | API 请求 |
| `paged_get()` | `api/client.py::APIClient.get_paginated()` | 分页请求 |
| `parse_ts()` | `models/parsers.py::TypeParser.parse_timestamp()` | 时间解析 |
| `upsert_fact_order()` | `loaders/facts/order.py::OrderLoader.upsert_orders()` | 订单加载 |
| `scd2_upsert()` | `scd/scd2_handler.py::SCD2Handler.upsert()` | SCD2 处理 |
| `run_task_orders()` | `tasks/orders_task.py::OrdersTask.execute()` | 订单任务 |
| `main()` | `cli/main.py::main()` | 主入口 |
### 6.2 典型迁移步骤
1. **配置迁移**
- 原来在 `DEFAULTS` 或脚本内硬编码的配置,迁移到 `.env` 与 `config/defaults.py`。
- 使用 `AppConfig.load()` 统一获取配置。
2. **并行运行验证**
```bash
# 旧脚本
python task_merged.py --tasks ORDERS
# 新项目
python -m cli.main --tasks ORDERS
```
对比新旧版本导出的数据表和日志,确认一致性。
3. **自定义逻辑迁移**
- 原脚本中的自定义清洗逻辑 → 放入相应 `loaders/` 或任务类中。
- 自定义任务 → 在 `tasks/` 中实现并在 `task_registry` 中注册。
- 自定义 API 调用 → 扩展 `api/client.py` 或单独封装服务类。
4. **逐步切换**
- 先在测试环境并行运行。
- 再逐步切换生产任务到新版本。
---
## 7. 开发与扩展指南(当前项目)
### 7.1 添加新任务
1. 在 `tasks/` 目录创建任务类:
```python
from .base_task import BaseTask
class MyTask(BaseTask):
def get_task_code(self) -> str:
return "MY_TASK"
def execute(self) -> dict:
# 1. 获取时间窗口
window_start, window_end, _ = self._get_time_window()
# 2. 调用 API 获取数据
records, _ = self.api.get_paginated(...)
# 3. 解析 / 校验
parsed = [self._parse(r) for r in records]
# 4. 加载数据
loader = MyLoader(self.db)
inserted, updated, _ = loader.upsert(parsed)
# 5. 提交并返回结果
self.db.commit()
return self._build_result("SUCCESS", {
"inserted": inserted,
"updated": updated,
})
```
2. 在 `orchestration/task_registry.py` 中注册:
```python
from tasks.my_task import MyTask
default_registry.register("MY_TASK", MyTask)
```
3. 在任务配置表中启用(示例):
```sql
INSERT INTO etl_admin.etl_task (task_code, store_id, enabled)
VALUES ('MY_TASK', 123456, TRUE);
```
### 7.2 添加新加载器
```python
from loaders.base_loader import BaseLoader
class MyLoader(BaseLoader):
def upsert(self, records: list) -> tuple:
sql = "INSERT INTO table_name (...) VALUES (...) ON CONFLICT (...) DO UPDATE SET ... RETURNING (xmax = 0) AS inserted"
inserted, updated = self.db.batch_upsert_with_returning(
sql, records, page_size=self._batch_size()
)
return (inserted, updated, 0)
```
### 7.3 添加新质量检查器
1. 在 `quality/` 中实现检查器,继承 `base_checker.py`。
2. 在任务或调度流程中调用该检查器,在写库后进行验证。
### 7.4 类型解析与校验扩展
- 在 `models/parsers.py` 中添加新类型解析方法。
- 在 `models/validators.py` 中添加新规则(如枚举校验、跨字段校验等)。
---
## 8. 常见问题排查
### 8.1 数据库连接失败
```text
错误: could not connect to server
```
排查要点:
- 检查 `PG_DSN` 或相关数据库配置是否正确。
- 确认数据库服务是否启动、网络是否可达。
### 8.2 API 请求超时
```text
错误: requests.exceptions.Timeout
```
排查要点:
- 检查 `API_BASE` 地址与网络连通性。
- 适当提高超时与重试次数(在配置中调整)。
### 8.3 模块导入错误
```text
错误: ModuleNotFoundError
```
排查要点:
- 确认在项目根目录下运行(包含 `etl_billiards/` 包)。
- 或通过 `pip install -e .` 以可编辑模式安装项目。
### 8.4 权限相关问题
```text
错误: Permission denied
```
排查要点:
- 脚本无执行权限:`chmod +x run_etl.sh`。
- Windows 需要以管理员身份运行,或修改日志 / 导出目录权限。
---
## 9. 使用前检查清单
在正式运行前建议确认:
- [ ] 已安装 Python 3.10+。
- [ ] 已执行 `pip install -r requirements.txt`。
- [ ] `.env` 已配置正确数据库、API、门店 ID、路径等
- [ ] PostgreSQL 数据库可连接。
- [ ] API 服务可访问且凭证有效。
- [ ] `LOG_ROOT`、`EXPORT_ROOT` 目录存在且拥有写权限。
---
## 10. 参考说明
- 本文已合并原有的快速开始、项目结构、架构说明、迁移指南等内容,可作为当前项目的统一说明文档。
- 如需在此基础上拆分多份文档,可按章节拆出,例如「快速开始」「架构设计」「迁移指南」「开发扩展」等。
## 11. 运行/调试模式说明
- 生产环境仅保留“任务模式”:通过调度/CLI 执行注册的任务ETL/ODS不使用调试脚本。
- 开发/调试可使用的辅助脚本(上线前可删除或禁用):
- `python -m etl_billiards.scripts.rebuild_ods_from_json`:从本地 JSON 目录重建 `billiards_ods`,用于离线初始化/验证。环境变量:`PG_DSN`(必填)、`JSON_DOC_DIR`(可选,默认 `C:\dev\LLTQ\export\test-json-doc`)、`INCLUDE_FILES`(逗号分隔文件名)、`DROP_SCHEMA_FIRST`(默认 true
- 如需在生产环境保留脚本,请在运维手册中明确用途和禁用条件,避免误用。
## 12. ODS 任务上线指引
- 任务注册:`etl_billiards/database/seed_ods_tasks.sql` 列出了当前启用的 ODS 任务。将其中的 `store_id` 替换为实际门店后执行:
```
psql "$PG_DSN" -f etl_billiards/database/seed_ods_tasks.sql
```
`ON CONFLICT` 会保持 enabled=true避免重复。
- 调度:确认 `etl_admin.etl_task` 中已启用所需的 ODS 任务(任务代码见 seed 脚本),调度器或 CLI `--tasks` 即可调用。
- 离线回灌:开发环境可用 `rebuild_ods_from_json` 以样例 JSON 初始化 ODS生产慎用默认按 `(source_file, record_index)` 去重。
- 测试:`pytest etl_billiards/tests/unit/test_ods_tasks.py` 覆盖核心 ODS 任务;测试时可设置 `ETL_SKIP_DOTENV=1` 跳过本地 .env 读取。
## 13. ODS 表映射总览
| ODS 表名 | 接口 Path | 数据列表路径 |
| ------------------------------------ | ---------------------------------------------------- | ----------------------------- |
| `assistant_accounts_master` | `/PersonnelManagement/SearchAssistantInfo` | data.assistantInfos |
| `assistant_service_records` | `/AssistantPerformance/GetOrderAssistantDetails` | data.orderAssistantDetails |
| `assistant_cancellation_records` | `/AssistantPerformance/GetAbolitionAssistant` | data.abolitionAssistants |
| `goods_stock_movements` | `/GoodsStockManage/QueryGoodsOutboundReceipt` | data.queryDeliveryRecordsList |
| `goods_stock_summary` | `/TenantGoods/GetGoodsStockReport` | data |
| `group_buy_packages` | `/PackageCoupon/QueryPackageCouponList` | data.packageCouponList |
| `group_buy_redemption_records` | `/Site/GetSiteTableUseDetails` | data.siteTableUseDetailsList |
| `member_profiles` | `/MemberProfile/GetTenantMemberList` | data.tenantMemberInfos |
| `member_balance_changes` | `/MemberProfile/GetMemberCardBalanceChange` | data.tenantMemberCardLogs |
| `member_stored_value_cards` | `/MemberProfile/GetTenantMemberCardList` | data.tenantMemberCards |
| `payment_transactions` | `/PayLog/GetPayLogListPage` | data |
| `platform_coupon_redemption_records` | `/Promotion/GetOfflineCouponConsumePageList` | data |
| `recharge_settlements` | `/Site/GetRechargeSettleList` | data.settleList |
| `refund_transactions` | `/Order/GetRefundPayLogList` | data |
| `settlement_records` | `/Site/GetAllOrderSettleList` | data.settleList |
| `settlement_ticket_details` | `/Order/GetOrderSettleTicketNew` | (整包原始 JSON |
| `site_tables_master` | `/Table/GetSiteTables` | data.siteTables |
| `stock_goods_category_tree` | `/TenantGoodsCategory/QueryPrimarySecondaryCategory` | data.goodsCategoryList |
| `store_goods_master` | `/TenantGoods/GetGoodsInventoryList` | data.orderGoodsList |
| `store_goods_sales_records` | `/TenantGoods/GetGoodsSalesList` | data.orderGoodsLedgers |
| `table_fee_discount_records` | `/Site/GetTaiFeeAdjustList` | data.taiFeeAdjustInfos |
| `table_fee_transactions` | `/Site/GetSiteTableOrderDetails` | data.siteTableUseDetailsList |
| `tenant_goods_master` | `/TenantGoods/QueryTenantGoods` | data.tenantGoodsList |
## 14. ODS 相关环境变量/默认值
- `.env` / 环境变量:
- `JSON_DOC_DIR`ODS 样例 JSON 目录(开发/回灌用)
- `ODS_INCLUDE_FILES`:限定导入的文件名(逗号分隔,不含 .json
- `ODS_DROP_SCHEMA_FIRST`true/false是否重建 schema
- `ETL_SKIP_DOTENV`:测试/CI 时设为 1 跳过本地 .env 读取
- `config/defaults.py` 中 `ods` 默认值:
- `json_doc_dir`: `C:\dev\LLTQ\export\test-json-doc`
- `include_files`: `""`
- `drop_schema_first`: `True`
---
## 15. DWD 维度 “业务事件”
1. 粒度唯一、原子
- 一张 DWD 表只能有一种业务粒度,比如:
- 一条记录 = 一次结账;
- 一条记录 = 一段台费流水;
- 一条记录 = 一次助教服务;
- 一条记录 = 一次会员余额变动。
- 表里面不能又混“订单头”又混“订单行”,不能一部分是“汇总”,一部分是“明细”。
- 一旦粒度确定,所有字段都要跟这个粒度匹配:
- 比如“结账头表”就不要塞每一行商品明细;
- 商品明细就不要塞整单级别的总金额。
- 这是 DWD 层最重要的一条。
2. 以业务过程建模,不以 JSON 列表建模
- 先画清楚你真实的业务链路:
- 开台 / 换台 / 关台 → 台费流水
- 助教上桌 → 助教服务流水 / 废除事件
- 点单 → 商品销售流水
- 充值 / 消费 → 余额变更 / 充值单
- 结账 → 结账头表 + 支付流水 / 退款流水
- 团购 / 平台券 → 核销流水
3. 主键明确、外键统一
- 每张 DWD 表必须有业务主键(哪怕是接口给的 id不要依赖数据库自增。
- 所有“同一概念”的字段必须统一命名、统一含义:
- 门店:统一叫 site_id都对应 siteProfile.id
- 会员:统一叫 member_id 对应 member_profiles.idsystem_member_id 单独一列;
- 台桌:统一 table_id 对应 site_tables_master.id
- 结账:统一 order_settle_id
- 订单:统一 order_trade_no 等。
- 否则后面 DWS、AI 要把表拼起来会非常痛苦。
4. 保留明细,不做过度汇总
- DWD 层的事实表原则上只做“明细级”的数据:
- 不要在 DWD 就把“日汇总、周汇总、月汇总”算出来,那是 DWS 的事;
- 也不要把多个事件折成一行(例如一张表同时放日汇总+单笔流水)。
- 需要聚合时,再在 DWS 做主题宽表:
- dws_member_day_profile、dws_site_day_summary 等。
- DWD 只负责细颗粒度的真相。
5. 统一清洗、标准化,但保持可追溯
- 在 DWD 层一定要做的清洗:
- 类型转换:字符串时间 → 时间类型,金额统一为 decimal布尔统一为 0/1
- 单位统一:秒 / 分钟、元 / 分都统一;
- 枚举标准化:状态码、类型码在 DWD 里就定死含义,必要时建枚举维表。
- 同时要保证:
- 每条 DWD 记录都能追溯回 ODS
- 保留源系统主键;
- 保留原始时间 / 原始金额字段(不要覆盖掉)。
6. 扁平化、去嵌套
- JSON 里常见结构是:分页壳 + 头 + 明细数组 + 各种嵌套对象siteProfile、tableProfile、goodsLedgers…
- DWD 的原则是:
- 去掉分页壳;
- 把“数组”拆成子表(头表 / 行表);
- 把重复出现的 profile 抽出去做维度表(门店、台、商品、会员……)。
- 目标是DWD 表都是二维表结构,不存复杂嵌套 JSON。
7. 模型长期稳定,可扩展
- DWD 的表结构要尽可能稳定,新增需求尽量通过:
- 加字段;
- 新建事实表 / 维度表;
- 在 DWS 做派生指标;
- 而不是频繁重构已有 DWD 表结构。
- 这点跟你后面要喂给 LLM 也很相关AI 配的 prompt、schema 理解都要尽量少改。