Files
feiqiu-ETL/etl_billiards/README.md
2025-11-19 05:35:22 +08:00

646 lines
22 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 台球场 ETL 系统(模块化版本)合并文档
本文为原多份文档(如 `INDEX.md``QUICK_START.md``ARCHITECTURE.md``MIGRATION_GUIDE.md``PROJECT_STRUCTURE.md``README.md` 等)的合并版,只保留与**当前项目本身**相关的内容:项目说明、目录结构、架构设计、数据与控制流程、迁移与扩展指南等,不包含修改历史和重构过程描述。
---
## 1. 项目概述
台球场 ETL 系统是一个面向门店业务的专业 ETL 工程项目,用于从外部业务 API 拉取订单、支付、会员等数据经过解析、校验、SCD2 处理、质量检查后写入 PostgreSQL 数据库,并支持增量同步和任务运行追踪。
系统采用模块化、分层架构设计,核心特性包括:
- 模块化目录结构配置、数据库、API、模型、加载器、SCD2、质量检查、编排、任务、CLI、工具、测试等分层清晰
- 完整的配置管理:默认值 + 环境变量 + CLI 参数多层覆盖。
- 可复用的数据库访问层(连接管理、批量 Upsert 封装)。
- 支持重试与分页的 API 客户端。
- 类型安全的数据解析与校验模块。
- SCD2 维度历史管理。
- 数据质量检查(例如余额一致性检查)。
- 任务编排层统一调度、游标管理与运行追踪。
- 命令行入口统一管理任务执行支持筛选任务、Dry-run 等模式。
---
## 2. 快速开始
### 2.1 环境准备
- Python 版本:建议 3.10+
- 数据库PostgreSQL
- 操作系统Windows / Linux / macOS 均可
```bash
# 克隆/下载代码后进入项目目录
cd etl_billiards/
ls -la
```
你会看到下述目录结构的顶层部分(详细见第 4 章):
- `config/` - 配置管理
- `database/` - 数据库访问
- `api/` - API 客户端
- `tasks/` - ETL 任务实现
- `cli/` - 命令行入口
- `docs/` - 技术文档
### 2.2 安装依赖
```bash
pip install -r requirements.txt
```
主要依赖示例(按实际 `requirements.txt` 为准):
- `psycopg2-binary`PostgreSQL 驱动
- `requests`HTTP 客户端
- `python-dateutil`:时间处理
- `tzdata`:时区数据
### 2.3 配置环境变量
复制并修改环境变量模板:
```bash
cp .env.example .env
# 使用你习惯的编辑器修改 .env
```
`.env` 示例(最小配置):
```bash
# 数据库
PG_DSN=postgresql://user:password@localhost:5432/LLZQ
# API
API_BASE=https://api.example.com
API_TOKEN=your_token_here
# 门店/应用
STORE_ID=2790685415443269
TIMEZONE=Asia/Taipei
# 目录
EXPORT_ROOT=/path/to/export
LOG_ROOT=/path/to/logs
```
> 所有配置项的默认值见 `config/defaults.py`,最终生效配置由「默认值 + 环境变量 + CLI 参数」三层叠加。
### 2.4 运行第一个任务
通过 CLI 入口运行:
```bash
# 运行所有任务
python -m cli.main
# 仅运行订单任务
python -m cli.main --tasks ORDERS
# 运行订单 + 支付
python -m cli.main --tasks ORDERS,PAYMENTS
# Windows 使用脚本
run_etl.bat --tasks ORDERS
# Linux / macOS 使用脚本
./run_etl.sh --tasks ORDERS
```
### 2.5 查看结果
- 日志目录:使用 `LOG_ROOT` 指定,例如
```bash
ls -la D:\LLZQ\DB\logs/
```
- 导出目录:使用 `EXPORT_ROOT` 指定,例如
```bash
ls -la D:\LLZQ\DB\export/
```
---
## 3. 常用命令与开发工具
### 3.1 CLI 常用命令
```bash
# 运行所有任务
python -m cli.main
# 运行指定任务
python -m cli.main --tasks ORDERS,PAYMENTS,MEMBERS
# 使用自定义数据库
python -m cli.main --pg-dsn "postgresql://user:password@host:5432/db"
# 使用自定义 API 端点
python -m cli.main --api-base "https://api.example.com" --api-token "..."
# 试运行(不写入数据库)
python -m cli.main --dry-run --tasks ORDERS
```
### 3.2 IDE / 代码质量工具示例VSCode
`.vscode/settings.json` 示例:
```json
{
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.formatting.provider": "black",
"python.testing.pytestEnabled": true
}
```
代码格式化与检查:
```bash
pip install black isort pylint
black .
isort .
pylint etl_billiards/
```
### 3.3 测试
```bash
# 安装测试依赖(按需)
pip install pytest pytest-cov
# 运行全部测试
pytest
# 仅运行单元测试
pytest tests/unit/
# 生成覆盖率报告
pytest --cov=. --cov-report=html
```
测试示例(按实际项目为准):
- `tests/unit/test_config.py` 配置管理单元测试
- `tests/unit/test_parsers.py` 解析器单元测试
- `tests/integration/test_database.py` 数据库集成测试
#### 3.3.1 测试模式ONLINE / OFFLINE
- `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API完整执行 E/T/L。
- `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load适合验证本地归档数据是否仍可回放。
- `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/testdata_json` 或 CI 产出的快照)。
- `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。
- `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN实打实执行写库留空时测试使用内存伪库避免依赖数据库。
示例命令:
```bash
# 在线模式覆盖所有任务
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
# 离线模式使用归档 JSON 覆盖所有任务
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/testdata_json pytest tests/unit/test_etl_tasks_offline.py
# 使用脚本按需组合参数(示例:在线 + 仅订单用例)
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
# 使用脚本连接真实测试库并回放离线模式
python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb
# 使用“指令仓库”中的预置命令
python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py
```
---
## 4. 项目结构与文件说明
### 4.1 总体目录结构(树状图)
```text
etl_billiards/
├── README.md # 项目总览和使用说明
├── MIGRATION_GUIDE.md # 从旧版本迁移指南
├── requirements.txt # Python 依赖列表
├── setup.py # 项目安装配置
├── .env.example # 环境变量配置模板
├── .gitignore # Git 忽略文件配置
├── run_etl.sh # Linux/Mac 运行脚本
├── run_etl.bat # Windows 运行脚本
├── config/ # 配置管理模块
│ ├── __init__.py
│ ├── defaults.py # 默认配置值定义
│ ├── env_parser.py # 环境变量解析器
│ └── settings.py # 配置管理主类
├── database/ # 数据库访问层
│ ├── __init__.py
│ ├── connection.py # 数据库连接管理
│ └── operations.py # 批量操作封装
├── api/ # HTTP API 客户端
│ ├── __init__.py
│ └── client.py # API 客户端(重试 + 分页)
├── models/ # 数据模型层
│ ├── __init__.py
│ ├── parsers.py # 类型解析器
│ └── validators.py # 数据验证器
├── loaders/ # 数据加载器层
│ ├── __init__.py
│ ├── base_loader.py # 加载器基类
│ ├── dimensions/ # 维度表加载器
│ │ ├── __init__.py
│ │ └── member.py # 会员维度加载器
│ └── facts/ # 事实表加载器
│ ├── __init__.py
│ ├── order.py # 订单事实表加载器
│ └── payment.py # 支付记录加载器
├── scd/ # SCD2 处理层
│ ├── __init__.py
│ └── scd2_handler.py # SCD2 历史记录处理器
├── quality/ # 数据质量检查层
│ ├── __init__.py
│ ├── base_checker.py # 质量检查器基类
│ └── balance_checker.py # 余额一致性检查器
├── orchestration/ # ETL 编排层
│ ├── __init__.py
│ ├── scheduler.py # ETL 调度器
│ ├── task_registry.py # 任务注册表(工厂模式)
│ ├── cursor_manager.py # 游标管理器
│ └── run_tracker.py # 运行记录追踪器
├── tasks/ # ETL 任务层
│ ├── __init__.py
│ ├── base_task.py # 任务基类(模板方法)
│ ├── orders_task.py # 订单 ETL 任务
│ ├── payments_task.py # 支付 ETL 任务
│ └── members_task.py # 会员 ETL 任务
├── cli/ # 命令行接口层
│ ├── __init__.py
│ └── main.py # CLI 主入口
├── utils/ # 工具函数
│ ├── __init__.py
│ └── helpers.py # 通用工具函数
├── tests/ # 测试代码
│ ├── __init__.py
│ ├── unit/ # 单元测试
│ │ ├── __init__.py
│ │ ├── test_config.py
│ │ └── test_parsers.py
│ ├── testdata_json/ # 清洗入库用的测试Json文件
│ │ └── XX.json
│ └── integration/ # 集成测试
│ ├── __init__.py
│ └── test_database.py
└── docs/ # 文档
└── ARCHITECTURE.md # 架构设计文档
```
### 4.2 各模块职责概览
- **config/**
- 统一配置入口,支持默认值、环境变量、命令行参数三层覆盖。
- **database/**
- 封装 PostgreSQL 连接与批量操作插入、更新、Upsert 等)。
- **api/**
- 对上游业务 API 的 HTTP 调用进行统一封装,支持重试、分页与超时控制。
- **models/**
- 提供类型解析器(时间戳、金额、整数等)与业务级数据校验器。
- **loaders/**
- 提供事实表与维度表的加载逻辑(包含批量 Upsert、统计写入结果等
- **scd/**
- 维度型数据的 SCD2 历史管理(有效期、版本标记等)。
- **quality/**
- 质量检查策略,例如余额一致性、记录数量对齐等。
- **orchestration/**
- 任务调度、任务注册、游标管理(增量窗口)、运行记录追踪。
- **tasks/**
- 具体业务任务(订单、支付、会员等),封装了从“取数 → 处理 → 写库 → 记录结果”的完整流程。
- **cli/**
- 命令行入口,解析参数并启动调度流程。
- **utils/**
- 杂项工具函数。
- **tests/**
- 单元测试与集成测试代码。
---
## 5. 架构设计与流程说明
### 5.1 分层架构图
```text
┌─────────────────────────────────────┐
│ CLI 命令行接口 │ <- cli/main.py
└─────────────┬───────────────────────┘
┌─────────────▼───────────────────────┐
│ Orchestration 编排层 │ <- orchestration/
│ (Scheduler, TaskRegistry, ...) │
└─────────────┬───────────────────────┘
┌─────────────▼───────────────────────┐
│ Tasks 任务层 │ <- tasks/
│ (OrdersTask, PaymentsTask, ...) │
└───┬─────────┬─────────┬─────────────┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────┐ ┌──────────┐
│Loaders │ │ SCD │ │ Quality │ <- loaders/, scd/, quality/
└────────┘ └─────┘ └──────────┘
┌───────▼────────┐
│ Models 模型 │ <- models/
└───────┬────────┘
┌───────▼────────┐
│ API 客户端 │ <- api/
└───────┬────────┘
┌───────▼────────┐
│ Database 访问 │ <- database/
└───────┬────────┘
┌───────▼────────┐
│ Config 配置 │ <- config/
└────────────────┘
```
### 5.2 各层职责(当前设计)
- **CLI 层 (`cli/`)**
- 解析命令行参数指定任务列表、Dry-run、覆盖配置项等
- 初始化配置与日志后交由编排层执行。
- **编排层 (`orchestration/`)**
- `scheduler.py`:根据配置与 CLI 参数选择需要执行的任务,控制执行顺序和并行策略。
- `task_registry.py`:提供任务注册表,按任务代码创建任务实例(工厂模式)。
- `cursor_manager.py`:管理增量游标(时间窗口 / ID 游标)。
- `run_tracker.py`:记录每次任务运行的状态、统计信息和错误信息。
- **任务层 (`tasks/`)**
- `base_task.py`:定义任务执行模板流程(模板方法模式),包括获取窗口、调用上游、解析 / 校验、写库、更新游标等。
- `orders_task.py` / `payments_task.py` / `members_task.py`:实现具体任务逻辑(订单、支付、会员)。
- **加载器 / SCD / 质量层**
- `loaders/`:根据目标表封装 Upsert / Insert / Update 逻辑。
- `scd/scd2_handler.py`:为维度表提供 SCD2 历史管理能力。
- `quality/`:执行数据质量检查,如余额对账。
- **模型层 (`models/`)**
- `parsers.py`:负责数据类型转换(字符串 → 时间戳、Decimal、int 等)。
- `validators.py`:执行字段级和记录级的数据校验。
- **API 层 (`api/client.py`)**
- 封装 HTTP 调用,处理重试、超时及分页。
- **数据库层 (`database/`)**
- 管理数据库连接及上下文。
- 提供批量插入 / 更新 / Upsert 操作接口。
- **配置层 (`config/`)**
- 定义配置项默认值。
- 解析环境变量并进行类型转换。
- 对外提供统一配置对象。
### 5.3 设计模式(当前使用)
- 工厂模式:任务注册 / 创建(`TaskRegistry`)。
- 模板方法模式:任务执行流程(`BaseTask`)。
- 策略模式:不同 Loader / Checker 实现不同策略。
- 依赖注入:通过构造函数向任务传入 `db`、`api`、`config` 等依赖。
### 5.4 数据与控制流程
整体流程:
1. CLI 解析参数并加载配置。
2. Scheduler 构建数据库连接、API 客户端等依赖。
3. Scheduler 遍历任务配置,从 `TaskRegistry` 获取任务类并实例化。
4. 每个任务按统一模板执行:
- 读取游标 / 时间窗口。
- 调用 API 拉取数据(可分页)。
- 解析、验证数据。
- 通过 Loader 写入数据库(事实表 / 维度表 / SCD2
- 执行质量检查。
- 更新游标与运行记录。
5. 所有任务执行完成后,释放连接并退出进程。
### 5.5 错误处理策略
- 单个任务失败不影响其他任务执行。
- 数据库操作异常自动回滚当前事务。
- API 请求失败时按配置进行重试,超过重试次数记录错误并终止该任务。
- 所有错误被记录到日志和运行追踪表,便于事后排查。
---
## 6. 迁移指南(从旧脚本到当前项目)
本节用于说明如何从旧的单文件脚本(如 `task_merged.py`)迁移到当前模块化项目,属于当前项目的使用说明,不涉及历史对比细节。
### 6.1 核心功能映射示意
| 旧版本函数 / 类 | 新版本位置 | 说明 |
|---------------------------|--------------------------------------------------------|----------------|
| `DEFAULTS` 字典 | `config/defaults.py` | 配置默认值 |
| `build_config()` | `config/settings.py::AppConfig.load()` | 配置加载 |
| `Pg` 类 | `database/connection.py::DatabaseConnection` | 数据库连接 |
| `http_get_json()` | `api/client.py::APIClient.get()` | API 请求 |
| `paged_get()` | `api/client.py::APIClient.get_paginated()` | 分页请求 |
| `parse_ts()` | `models/parsers.py::TypeParser.parse_timestamp()` | 时间解析 |
| `upsert_fact_order()` | `loaders/facts/order.py::OrderLoader.upsert_orders()` | 订单加载 |
| `scd2_upsert()` | `scd/scd2_handler.py::SCD2Handler.upsert()` | SCD2 处理 |
| `run_task_orders()` | `tasks/orders_task.py::OrdersTask.execute()` | 订单任务 |
| `main()` | `cli/main.py::main()` | 主入口 |
### 6.2 典型迁移步骤
1. **配置迁移**
- 原来在 `DEFAULTS` 或脚本内硬编码的配置,迁移到 `.env` 与 `config/defaults.py`。
- 使用 `AppConfig.load()` 统一获取配置。
2. **并行运行验证**
```bash
# 旧脚本
python task_merged.py --tasks ORDERS
# 新项目
python -m cli.main --tasks ORDERS
```
对比新旧版本导出的数据表和日志,确认一致性。
3. **自定义逻辑迁移**
- 原脚本中的自定义清洗逻辑 → 放入相应 `loaders/` 或任务类中。
- 自定义任务 → 在 `tasks/` 中实现并在 `task_registry` 中注册。
- 自定义 API 调用 → 扩展 `api/client.py` 或单独封装服务类。
4. **逐步切换**
- 先在测试环境并行运行。
- 再逐步切换生产任务到新版本。
---
## 7. 开发与扩展指南(当前项目)
### 7.1 添加新任务
1. 在 `tasks/` 目录创建任务类:
```python
from .base_task import BaseTask
class MyTask(BaseTask):
def get_task_code(self) -> str:
return "MY_TASK"
def execute(self) -> dict:
# 1. 获取时间窗口
window_start, window_end, _ = self._get_time_window()
# 2. 调用 API 获取数据
records, _ = self.api.get_paginated(...)
# 3. 解析 / 校验
parsed = [self._parse(r) for r in records]
# 4. 加载数据
loader = MyLoader(self.db)
inserted, updated, _ = loader.upsert(parsed)
# 5. 提交并返回结果
self.db.commit()
return self._build_result("SUCCESS", {
"inserted": inserted,
"updated": updated,
})
```
2. 在 `orchestration/task_registry.py` 中注册:
```python
from tasks.my_task import MyTask
default_registry.register("MY_TASK", MyTask)
```
3. 在任务配置表中启用(示例):
```sql
INSERT INTO etl_admin.etl_task (task_code, store_id, enabled)
VALUES ('MY_TASK', 123456, TRUE);
```
### 7.2 添加新加载器
```python
from loaders.base_loader import BaseLoader
class MyLoader(BaseLoader):
def upsert(self, records: list) -> tuple:
sql = "INSERT INTO table_name (...) VALUES (...) ON CONFLICT (...) DO UPDATE SET ... RETURNING (xmax = 0) AS inserted"
inserted, updated = self.db.batch_upsert_with_returning(
sql, records, page_size=self._batch_size()
)
return (inserted, updated, 0)
```
### 7.3 添加新质量检查器
1. 在 `quality/` 中实现检查器,继承 `base_checker.py`。
2. 在任务或调度流程中调用该检查器,在写库后进行验证。
### 7.4 类型解析与校验扩展
- 在 `models/parsers.py` 中添加新类型解析方法。
- 在 `models/validators.py` 中添加新规则(如枚举校验、跨字段校验等)。
---
## 8. 常见问题排查
### 8.1 数据库连接失败
```text
错误: could not connect to server
```
排查要点:
- 检查 `PG_DSN` 或相关数据库配置是否正确。
- 确认数据库服务是否启动、网络是否可达。
### 8.2 API 请求超时
```text
错误: requests.exceptions.Timeout
```
排查要点:
- 检查 `API_BASE` 地址与网络连通性。
- 适当提高超时与重试次数(在配置中调整)。
### 8.3 模块导入错误
```text
错误: ModuleNotFoundError
```
排查要点:
- 确认在项目根目录下运行(包含 `etl_billiards/` 包)。
- 或通过 `pip install -e .` 以可编辑模式安装项目。
### 8.4 权限相关问题
```text
错误: Permission denied
```
排查要点:
- 脚本无执行权限:`chmod +x run_etl.sh`。
- Windows 需要以管理员身份运行,或修改日志 / 导出目录权限。
---
## 9. 使用前检查清单
在正式运行前建议确认:
- [ ] 已安装 Python 3.10+。
- [ ] 已执行 `pip install -r requirements.txt`。
- [ ] `.env` 已配置正确数据库、API、门店 ID、路径等
- [ ] PostgreSQL 数据库可连接。
- [ ] API 服务可访问且凭证有效。
- [ ] `LOG_ROOT`、`EXPORT_ROOT` 目录存在且拥有写权限。
---
## 10. 参考说明
- 本文已合并原有的快速开始、项目结构、架构说明、迁移指南等内容,可作为当前项目的统一说明文档。
- 如需在此基础上拆分多份文档,可按章节拆出,例如「快速开始」「架构设计」「迁移指南」「开发扩展」等。