From e4219afc24c2d3f58db5c087125703f2479e8d98 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 17 Nov 2025 16:32:17 +0000 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=96=B0=E7=9A=84.md?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 616 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 614 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index acaeb38..938ec08 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,615 @@ -# feiqiu-ETL +# 台球场 ETL 系统(模块化版本)合并文档 -从非球 接口获取数据,处理到入库 \ No newline at end of file +本文为原多份文档(如 `INDEX.md`、`QUICK_START.md`、`ARCHITECTURE.md`、`MIGRATION_GUIDE.md`、`PROJECT_STRUCTURE.md`、`README.md` 等)的合并版,只保留与**当前项目本身**相关的内容:项目说明、目录结构、架构设计、数据与控制流程、迁移与扩展指南等,不包含修改历史和重构过程描述。 + +--- + +## 1. 项目概述 + +台球场 ETL 系统是一个面向门店业务的专业 ETL 工程项目,用于从外部业务 API 拉取订单、支付、会员等数据,经过解析、校验、SCD2 处理、质量检查后写入 PostgreSQL 数据库,并支持增量同步和任务运行追踪。 + +系统采用模块化、分层架构设计,核心特性包括: + +- 模块化目录结构(配置、数据库、API、模型、加载器、SCD2、质量检查、编排、任务、CLI、工具、测试等分层清晰)。 +- 完整的配置管理:默认值 + 环境变量 + CLI 参数多层覆盖。 +- 可复用的数据库访问层(连接管理、批量 Upsert 封装)。 +- 支持重试与分页的 API 客户端。 +- 类型安全的数据解析与校验模块。 +- SCD2 维度历史管理。 +- 数据质量检查(例如余额一致性检查)。 +- 任务编排层统一调度、游标管理与运行追踪。 +- 命令行入口统一管理任务执行,支持筛选任务、Dry-run 等模式。 + +--- + +## 2. 快速开始 + +### 2.1 环境准备 + +- Python 版本:建议 3.10+ +- 数据库:PostgreSQL +- 操作系统:Windows / Linux / macOS 均可 + +```bash +# 克隆/下载代码后进入项目目录 +cd etl_billiards/ +ls -la +``` + +你会看到下述目录结构的顶层部分(详细见第 4 章): + +- `config/` - 配置管理 +- `database/` - 数据库访问 +- `api/` - API 客户端 +- `tasks/` - ETL 任务实现 +- `cli/` - 命令行入口 +- `docs/` - 技术文档 + +### 2.2 安装依赖 + +```bash +pip install -r requirements.txt +``` + +主要依赖示例(按实际 `requirements.txt` 为准): + +- `psycopg2-binary`:PostgreSQL 驱动 +- `requests`:HTTP 客户端 +- `python-dateutil`:时间处理 +- `tzdata`:时区数据 + +### 2.3 配置环境变量 + +复制并修改环境变量模板: + +```bash +cp .env.example .env +# 使用你习惯的编辑器修改 .env +``` + +`.env` 示例(最小配置): + +```bash +# 数据库 +PG_DSN=postgresql://user:password@localhost:5432/LLZQ + +# API +API_BASE=https://api.example.com +API_TOKEN=your_token_here + +# 门店/应用 +STORE_ID=2790685415443269 +TIMEZONE=Asia/Taipei + +# 目录 +EXPORT_ROOT=/path/to/export +LOG_ROOT=/path/to/logs +``` + +> 所有配置项的默认值见 `config/defaults.py`,最终生效配置由「默认值 + 环境变量 + CLI 参数」三层叠加。 + +### 2.4 运行第一个任务 + +通过 CLI 入口运行: + +```bash +# 运行所有任务 +python -m cli.main + +# 仅运行订单任务 +python -m cli.main --tasks ORDERS + +# 运行订单 + 支付 +python -m cli.main --tasks ORDERS,PAYMENTS + +# Windows 使用脚本 +run_etl.bat --tasks ORDERS + +# Linux / macOS 使用脚本 +./run_etl.sh --tasks ORDERS +``` + +### 2.5 查看结果 + +- 日志目录:使用 `LOG_ROOT` 指定,例如 + + ```bash + ls -la D:\LLZQ\DB\logs/ + ``` + +- 导出目录:使用 `EXPORT_ROOT` 指定,例如 + + ```bash + ls -la D:\LLZQ\DB\export/ + ``` + +--- + +## 3. 常用命令与开发工具 + +### 3.1 CLI 常用命令 + +```bash +# 运行所有任务 +python -m cli.main + +# 运行指定任务 +python -m cli.main --tasks ORDERS,PAYMENTS,MEMBERS + +# 使用自定义数据库 +python -m cli.main --pg-dsn "postgresql://user:password@host:5432/db" + +# 使用自定义 API 端点 +python -m cli.main --api-base "https://api.example.com" --api-token "..." + +# 试运行(不写入数据库) +python -m cli.main --dry-run --tasks ORDERS +``` + +### 3.2 IDE / 代码质量工具(示例:VSCode) + +`.vscode/settings.json` 示例: + +```json +{ + "python.linting.enabled": true, + "python.linting.pylintEnabled": true, + "python.formatting.provider": "black", + "python.testing.pytestEnabled": true +} +``` + +代码格式化与检查: + +```bash +pip install black isort pylint + +black . +isort . +pylint etl_billiards/ +``` + +### 3.3 测试 + +```bash +# 安装测试依赖(按需) +pip install pytest pytest-cov + +# 运行全部测试 +pytest + +# 仅运行单元测试 +pytest tests/unit/ + +# 生成覆盖率报告 +pytest --cov=. --cov-report=html +``` + +测试示例(按实际项目为准): + +- `tests/unit/test_config.py` – 配置管理单元测试 +- `tests/unit/test_parsers.py` – 解析器单元测试 +- `tests/integration/test_database.py` – 数据库集成测试 + +--- + +## 4. 项目结构与文件说明 + +### 4.1 总体目录结构(树状图) + +```text +etl_billiards/ +│ +├── README.md # 项目总览和使用说明 +├── MIGRATION_GUIDE.md # 从旧版本迁移指南 +├── requirements.txt # Python 依赖列表 +├── setup.py # 项目安装配置 +├── .env.example # 环境变量配置模板 +├── .gitignore # Git 忽略文件配置 +├── run_etl.sh # Linux/Mac 运行脚本 +├── run_etl.bat # Windows 运行脚本 +│ +├── config/ # 配置管理模块 +│ ├── __init__.py +│ ├── defaults.py # 默认配置值定义 +│ ├── env_parser.py # 环境变量解析器 +│ └── settings.py # 配置管理主类 +│ +├── database/ # 数据库访问层 +│ ├── __init__.py +│ ├── connection.py # 数据库连接管理 +│ └── operations.py # 批量操作封装 +│ +├── api/ # HTTP API 客户端 +│ ├── __init__.py +│ └── client.py # API 客户端(重试 + 分页) +│ +├── models/ # 数据模型层 +│ ├── __init__.py +│ ├── parsers.py # 类型解析器 +│ └── validators.py # 数据验证器 +│ +├── loaders/ # 数据加载器层 +│ ├── __init__.py +│ ├── base_loader.py # 加载器基类 +│ ├── dimensions/ # 维度表加载器 +│ │ ├── __init__.py +│ │ └── member.py # 会员维度加载器 +│ └── facts/ # 事实表加载器 +│ ├── __init__.py +│ ├── order.py # 订单事实表加载器 +│ └── payment.py # 支付记录加载器 +│ +├── scd/ # SCD2 处理层 +│ ├── __init__.py +│ └── scd2_handler.py # SCD2 历史记录处理器 +│ +├── quality/ # 数据质量检查层 +│ ├── __init__.py +│ ├── base_checker.py # 质量检查器基类 +│ └── balance_checker.py # 余额一致性检查器 +│ +├── orchestration/ # ETL 编排层 +│ ├── __init__.py +│ ├── scheduler.py # ETL 调度器 +│ ├── task_registry.py # 任务注册表(工厂模式) +│ ├── cursor_manager.py # 游标管理器 +│ └── run_tracker.py # 运行记录追踪器 +│ +├── tasks/ # ETL 任务层 +│ ├── __init__.py +│ ├── base_task.py # 任务基类(模板方法) +│ ├── orders_task.py # 订单 ETL 任务 +│ ├── payments_task.py # 支付 ETL 任务 +│ └── members_task.py # 会员 ETL 任务 +│ +├── cli/ # 命令行接口层 +│ ├── __init__.py +│ └── main.py # CLI 主入口 +│ +├── utils/ # 工具函数 +│ ├── __init__.py +│ └── helpers.py # 通用工具函数 +│ +├── tests/ # 测试代码 +│ ├── __init__.py +│ ├── unit/ # 单元测试 +│ │ ├── __init__.py +│ │ ├── test_config.py +│ │ └── test_parsers.py +│ └── integration/ # 集成测试 +│ ├── __init__.py +│ └── test_database.py +│ +└── docs/ # 文档 + └── ARCHITECTURE.md # 架构设计文档 +``` + +### 4.2 各模块职责概览 + +- **config/** + - 统一配置入口,支持默认值、环境变量、命令行参数三层覆盖。 +- **database/** + - 封装 PostgreSQL 连接与批量操作(插入、更新、Upsert 等)。 +- **api/** + - 对上游业务 API 的 HTTP 调用进行统一封装,支持重试、分页与超时控制。 +- **models/** + - 提供类型解析器(时间戳、金额、整数等)与业务级数据校验器。 +- **loaders/** + - 提供事实表与维度表的加载逻辑(包含批量 Upsert、统计写入结果等)。 +- **scd/** + - 维度型数据的 SCD2 历史管理(有效期、版本标记等)。 +- **quality/** + - 质量检查策略,例如余额一致性、记录数量对齐等。 +- **orchestration/** + - 任务调度、任务注册、游标管理(增量窗口)、运行记录追踪。 +- **tasks/** + - 具体业务任务(订单、支付、会员等),封装了从“取数 → 处理 → 写库 → 记录结果”的完整流程。 +- **cli/** + - 命令行入口,解析参数并启动调度流程。 +- **utils/** + - 杂项工具函数。 +- **tests/** + - 单元测试与集成测试代码。 + +--- + +## 5. 架构设计与流程说明 + +### 5.1 分层架构图 + +```text +┌─────────────────────────────────────┐ +│ CLI 命令行接口 │ <- cli/main.py +└─────────────┬───────────────────────┘ + │ +┌─────────────▼───────────────────────┐ +│ Orchestration 编排层 │ <- orchestration/ +│ (Scheduler, TaskRegistry, ...) │ +└─────────────┬───────────────────────┘ + │ +┌─────────────▼───────────────────────┐ +│ Tasks 任务层 │ <- tasks/ +│ (OrdersTask, PaymentsTask, ...) │ +└───┬─────────┬─────────┬─────────────┘ + │ │ │ + ▼ ▼ ▼ +┌────────┐ ┌─────┐ ┌──────────┐ +│Loaders │ │ SCD │ │ Quality │ <- loaders/, scd/, quality/ +└────────┘ └─────┘ └──────────┘ + │ + ┌───────▼────────┐ + │ Models 模型 │ <- models/ + └───────┬────────┘ + │ + ┌───────▼────────┐ + │ API 客户端 │ <- api/ + └───────┬────────┘ + │ + ┌───────▼────────┐ + │ Database 访问 │ <- database/ + └───────┬────────┘ + │ + ┌───────▼────────┐ + │ Config 配置 │ <- config/ + └────────────────┘ +``` + +### 5.2 各层职责(当前设计) + +- **CLI 层 (`cli/`)** + - 解析命令行参数(指定任务列表、Dry-run、覆盖配置项等)。 + - 初始化配置与日志后交由编排层执行。 + +- **编排层 (`orchestration/`)** + - `scheduler.py`:根据配置与 CLI 参数选择需要执行的任务,控制执行顺序和并行策略。 + - `task_registry.py`:提供任务注册表,按任务代码创建任务实例(工厂模式)。 + - `cursor_manager.py`:管理增量游标(时间窗口 / ID 游标)。 + - `run_tracker.py`:记录每次任务运行的状态、统计信息和错误信息。 + +- **任务层 (`tasks/`)** + - `base_task.py`:定义任务执行模板流程(模板方法模式),包括获取窗口、调用上游、解析 / 校验、写库、更新游标等。 + - `orders_task.py` / `payments_task.py` / `members_task.py`:实现具体任务逻辑(订单、支付、会员)。 + +- **加载器 / SCD / 质量层** + - `loaders/`:根据目标表封装 Upsert / Insert / Update 逻辑。 + - `scd/scd2_handler.py`:为维度表提供 SCD2 历史管理能力。 + - `quality/`:执行数据质量检查,如余额对账。 + +- **模型层 (`models/`)** + - `parsers.py`:负责数据类型转换(字符串 → 时间戳、Decimal、int 等)。 + - `validators.py`:执行字段级和记录级的数据校验。 + +- **API 层 (`api/client.py`)** + - 封装 HTTP 调用,处理重试、超时及分页。 + +- **数据库层 (`database/`)** + - 管理数据库连接及上下文。 + - 提供批量插入 / 更新 / Upsert 操作接口。 + +- **配置层 (`config/`)** + - 定义配置项默认值。 + - 解析环境变量并进行类型转换。 + - 对外提供统一配置对象。 + +### 5.3 设计模式(当前使用) + +- 工厂模式:任务注册 / 创建(`TaskRegistry`)。 +- 模板方法模式:任务执行流程(`BaseTask`)。 +- 策略模式:不同 Loader / Checker 实现不同策略。 +- 依赖注入:通过构造函数向任务传入 `db`、`api`、`config` 等依赖。 + +### 5.4 数据与控制流程 + +整体流程: + +1. CLI 解析参数并加载配置。 +2. Scheduler 构建数据库连接、API 客户端等依赖。 +3. Scheduler 遍历任务配置,从 `TaskRegistry` 获取任务类并实例化。 +4. 每个任务按统一模板执行: + - 读取游标 / 时间窗口。 + - 调用 API 拉取数据(可分页)。 + - 解析、验证数据。 + - 通过 Loader 写入数据库(事实表 / 维度表 / SCD2)。 + - 执行质量检查。 + - 更新游标与运行记录。 +5. 所有任务执行完成后,释放连接并退出进程。 + +### 5.5 错误处理策略 + +- 单个任务失败不影响其他任务执行。 +- 数据库操作异常自动回滚当前事务。 +- API 请求失败时按配置进行重试,超过重试次数记录错误并终止该任务。 +- 所有错误被记录到日志和运行追踪表,便于事后排查。 + +--- + +## 6. 迁移指南(从旧脚本到当前项目) + +本节用于说明如何从旧的单文件脚本(如 `task_merged.py`)迁移到当前模块化项目,属于当前项目的使用说明,不涉及历史对比细节。 + +### 6.1 核心功能映射示意 + +| 旧版本函数 / 类 | 新版本位置 | 说明 | +|---------------------------|--------------------------------------------------------|----------------| +| `DEFAULTS` 字典 | `config/defaults.py` | 配置默认值 | +| `build_config()` | `config/settings.py::AppConfig.load()` | 配置加载 | +| `Pg` 类 | `database/connection.py::DatabaseConnection` | 数据库连接 | +| `http_get_json()` | `api/client.py::APIClient.get()` | API 请求 | +| `paged_get()` | `api/client.py::APIClient.get_paginated()` | 分页请求 | +| `parse_ts()` | `models/parsers.py::TypeParser.parse_timestamp()` | 时间解析 | +| `upsert_fact_order()` | `loaders/facts/order.py::OrderLoader.upsert_orders()` | 订单加载 | +| `scd2_upsert()` | `scd/scd2_handler.py::SCD2Handler.upsert()` | SCD2 处理 | +| `run_task_orders()` | `tasks/orders_task.py::OrdersTask.execute()` | 订单任务 | +| `main()` | `cli/main.py::main()` | 主入口 | + +### 6.2 典型迁移步骤 + +1. **配置迁移** + - 原来在 `DEFAULTS` 或脚本内硬编码的配置,迁移到 `.env` 与 `config/defaults.py`。 + - 使用 `AppConfig.load()` 统一获取配置。 + +2. **并行运行验证** + + ```bash + # 旧脚本 + python task_merged.py --tasks ORDERS + + # 新项目 + python -m cli.main --tasks ORDERS + ``` + + 对比新旧版本导出的数据表和日志,确认一致性。 + +3. **自定义逻辑迁移** + - 原脚本中的自定义清洗逻辑 → 放入相应 `loaders/` 或任务类中。 + - 自定义任务 → 在 `tasks/` 中实现并在 `task_registry` 中注册。 + - 自定义 API 调用 → 扩展 `api/client.py` 或单独封装服务类。 + +4. **逐步切换** + - 先在测试环境并行运行。 + - 再逐步切换生产任务到新版本。 + +--- + +## 7. 开发与扩展指南(当前项目) + +### 7.1 添加新任务 + +1. 在 `tasks/` 目录创建任务类: + +```python +from .base_task import BaseTask + +class MyTask(BaseTask): + def get_task_code(self) -> str: + return "MY_TASK" + + def execute(self) -> dict: + # 1. 获取时间窗口 + window_start, window_end, _ = self._get_time_window() + + # 2. 调用 API 获取数据 + records, _ = self.api.get_paginated(...) + + # 3. 解析 / 校验 + parsed = [self._parse(r) for r in records] + + # 4. 加载数据 + loader = MyLoader(self.db) + inserted, updated, _ = loader.upsert(parsed) + + # 5. 提交并返回结果 + self.db.commit() + return self._build_result("SUCCESS", { + "inserted": inserted, + "updated": updated, + }) +``` + +2. 在 `orchestration/task_registry.py` 中注册: + +```python +from tasks.my_task import MyTask + +default_registry.register("MY_TASK", MyTask) +``` + +3. 在任务配置表中启用(示例): + +```sql +INSERT INTO etl_admin.etl_task (task_code, store_id, enabled) +VALUES ('MY_TASK', 123456, TRUE); +``` + +### 7.2 添加新加载器 + +```python +from loaders.base_loader import BaseLoader + +class MyLoader(BaseLoader): + def upsert(self, records: list) -> tuple: + sql = "INSERT INTO table_name (...) VALUES (...) ON CONFLICT (...) DO UPDATE SET ... RETURNING (xmax = 0) AS inserted" + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) +``` + +### 7.3 添加新质量检查器 + +1. 在 `quality/` 中实现检查器,继承 `base_checker.py`。 +2. 在任务或调度流程中调用该检查器,在写库后进行验证。 + +### 7.4 类型解析与校验扩展 + +- 在 `models/parsers.py` 中添加新类型解析方法。 +- 在 `models/validators.py` 中添加新规则(如枚举校验、跨字段校验等)。 + +--- + +## 8. 常见问题排查 + +### 8.1 数据库连接失败 + +```text +错误: could not connect to server +``` + +排查要点: + +- 检查 `PG_DSN` 或相关数据库配置是否正确。 +- 确认数据库服务是否启动、网络是否可达。 + +### 8.2 API 请求超时 + +```text +错误: requests.exceptions.Timeout +``` + +排查要点: + +- 检查 `API_BASE` 地址与网络连通性。 +- 适当提高超时与重试次数(在配置中调整)。 + +### 8.3 模块导入错误 + +```text +错误: ModuleNotFoundError +``` + +排查要点: + +- 确认在项目根目录下运行(包含 `etl_billiards/` 包)。 +- 或通过 `pip install -e .` 以可编辑模式安装项目。 + +### 8.4 权限相关问题 + +```text +错误: Permission denied +``` + +排查要点: + +- 脚本无执行权限:`chmod +x run_etl.sh`。 +- Windows 需要以管理员身份运行,或修改日志 / 导出目录权限。 + +--- + +## 9. 使用前检查清单 + +在正式运行前建议确认: + +- [ ] 已安装 Python 3.10+。 +- [ ] 已执行 `pip install -r requirements.txt`。 +- [ ] `.env` 已配置正确(数据库、API、门店 ID、路径等)。 +- [ ] PostgreSQL 数据库可连接。 +- [ ] API 服务可访问且凭证有效。 +- [ ] `LOG_ROOT`、`EXPORT_ROOT` 目录存在且拥有写权限。 + +--- + +## 10. 参考说明 + +- 本文已合并原有的快速开始、项目结构、架构说明、迁移指南等内容,可作为当前项目的统一说明文档。 +- 如需在此基础上拆分多份文档,可按章节拆出,例如「快速开始」「架构设计」「迁移指南」「开发扩展」等。