This commit is contained in:
Neo
2026-01-27 22:47:05 +08:00
parent a6ad343092
commit f5f9a7eb66
476 changed files with 381543 additions and 5819 deletions

3
.gitignore vendored
View File

@@ -6,7 +6,6 @@ __pycache__/
.Python .Python
build/ build/
develop-eggs/ develop-eggs/
dist/
downloads/ downloads/
eggs/ eggs/
.eggs/ .eggs/
@@ -39,8 +38,6 @@ export/
logs/ logs/
# 环境变量 # 环境变量
.env
.env.local
# 测试 # 测试
.pytest_cache/ .pytest_cache/

File diff suppressed because it is too large Load Diff

664
README.md
View File

@@ -1,78 +1,616 @@
# 台球场 ETL 系统 # 飞球 ETL 系统ODS → DWD
用于台球门店业务的数据采集与入湖:从上游 API 拉取订单、支付、会员、库存等数据,先落地 ODS再清洗写入事实/维度表,并提供运行追踪、增量游标、数据质量检查与测试脚手架 面向门店业务的 ETL:从上游 API 或离线 JSON 采集订单、支付、会员、库存等数据,先落地 **ODS**,再清洗装载 **DWD**(含 SCD2 维度、事实增量),并输出质量校验报表
## 核心特性 ## 快速运行(离线示例 JSON
- **两阶段链路**ODS 原始留痕 + DWD/事实表清洗,支持回放与重跑 > 以下命令默认在 `etl_billiards/` 目录执行(项目会从 `etl_billiards/.env` 读取配置;也可直接设置环境变量)
- **任务注册与调度**`TaskRegistry` 统一管理任务代码,`ETLScheduler` 负责游标、运行记录和失败隔离。
- **统一底座**:配置(默认值 + `.env` + CLI 覆盖)、分页/重试的 API 客户端、批量 Upsert 的数据库封装、SCD2 维度处理、质量检查。
- **测试与回放**ONLINE/OFFLINE 模式切换,`run_tests.py`/`test_presets.py` 支持参数化测试;`MANUAL_INGEST` 可将归档 JSON 重灌入 ODS。
- **可安装**`setup.py` / `entry_point` 提供 `etl-billiards` 命令,或直接 `python -m cli.main` 运行。
## 仓库结构(摘录) 1) 环境Python 3.10+、PostgreSQL。
- `etl_billiards/config`:默认配置、环境变量解析、配置加载。 2) 配置:编辑 `etl_billiards/.env`(或设环境变量),至少包含:
- `etl_billiards/api`HTTP 客户端,内置重试/分页。 ```env
- `etl_billiards/database`:连接管理、批量 Upsert。 STORE_ID=123
- `etl_billiards/tasks`业务任务ORDERS、PAYMENTS…、ODS 任务、DWD 任务、人工回放;`base_task.py`/`base_dwd_task.py` 提供模板。 PG_DSN=postgresql://<user>:<password>@<host>:<port>/<db>
- `etl_billiards/loaders`:事实/维度/ODS Loader`scd/` 为 SCD2。 # 示例:使用仓库内置的最小样例(仅 1 个 JSON
- `etl_billiards/orchestration`:调度器、任务注册表、游标与运行追踪。 INGEST_SOURCE_DIR=../tmp/single_ingest
- `etl_billiards/scripts`:测试执行器、数据库连通性检测、预置测试指令。 ```
- `etl_billiards/tests`:单元/集成测试与离线 JSON 归档。 3) 安装依赖:
## 支持的任务代码
- **事实/维度**`ORDERS``PAYMENTS``REFUNDS``INVENTORY_CHANGE``COUPON_USAGE``MEMBERS``ASSISTANTS``PRODUCTS``TABLES``PACKAGES_DEF``TOPUPS``TABLE_DISCOUNT``ASSISTANT_ABOLISH``LEDGER``TICKET_DWD``PAYMENTS_DWD``MEMBERS_DWD`
- **ODS 原始采集**`ODS_ORDER_SETTLE``ODS_TABLE_USE``ODS_ASSISTANT_LEDGER``ODS_ASSISTANT_ABOLISH``ODS_GOODS_LEDGER``ODS_PAYMENT``ODS_REFUND``ODS_COUPON_VERIFY``ODS_MEMBER``ODS_MEMBER_CARD``ODS_PACKAGE``ODS_INVENTORY_STOCK``ODS_INVENTORY_CHANGE`
- **辅助**`MANUAL_INGEST`(将归档 JSON 回放到 ODS
## 快速开始
1. **环境要求**Python 3.10+、PostgreSQL。推荐在 `etl_billiards/` 目录下执行命令。
2. **安装依赖**
```bash ```bash
cd etl_billiards cd etl_billiards
pip install -r requirements.txt pip install -r requirements.txt
# 开发模式pip install -e .
``` ```
3. **配置 `.env`** 4) 回放入库ODS→ 装载 DWD → 质检(可用 `--ingest-source` 覆盖 `INGEST_SOURCE_DIR`
```bash ```bash
cp .env.example .env python -m cli.main --pipeline-flow INGEST_ONLY --tasks INIT_ODS_SCHEMA,INIT_DWD_SCHEMA
# 核心项 python -m cli.main --pipeline-flow INGEST_ONLY --tasks MANUAL_INGEST --ingest-source "../tmp/single_ingest"
PG_DSN=postgresql://user:pwd@host:5432/LLZQ python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS
API_BASE=https://api.example.com python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_QUALITY_CHECK
API_TOKEN=your_token # 报表reports/dwd_quality_report.json
STORE_ID=2790685415443269
EXPORT_ROOT=/path/to/export
LOG_ROOT=/path/to/logs
``` ```
配置的生效顺序为 “默认值” < “环境变量/.env” < “CLI 参数”。
4. **运行任务** > 可按需单独运行:
> - 仅建表:`python -m cli.main --tasks INIT_ODS_SCHEMA`
> - 仅 ODS 灌入:`python -m cli.main --tasks MANUAL_INGEST`
> - 仅 DWD 装载:`python -m cli.main --tasks INIT_DWD_SCHEMA,DWD_LOAD_FROM_ODS`
> Windows可用 `etl_billiards/run_ods.bat` 一键执行 ODS 建表 + 灌入示例 JSON`INIT_ODS_SCHEMA` + `MANUAL_INGEST`)。
## 正式环境(在线抓取 → 更新 ODS → 更新 DWD
**核心入口 CLI推荐在 `etl_billiards/` 目录执行)**
- `python -m cli.main`
### 必备配置(建议通过环境变量或 `.env`
- 数据库:`PG_DSN`、`STORE_ID`
- 在线抓取:`API_TOKEN`(可选 `API_BASE`、`API_TIMEOUT`、`API_PAGE_SIZE`、`API_RETRY_MAX`
- 输出目录(可选):`EXPORT_ROOT`、`LOG_ROOT`、`FETCH_ROOT`/`JSON_FETCH_ROOT`
**安全提示**:建议将数据库/Token 等凭证保存在 `.env` 或受控秘钥管理中,生产环境使用最小权限账号。
### 推荐定时方式 A两段定时更清晰
1) **更新 ODS在线抓取 + 入库FULL**
```bash ```bash
# 运行默认任务集 python -m cli.main \
python -m cli.main --pipeline-flow FULL \
--tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER \
# 按需选择任务(逗号分隔) --pg-dsn "$PG_DSN" --store-id "$STORE_ID" \
python -m cli.main --tasks ODS_ORDER_SETTLE,ORDERS,PAYMENTS --api-token "$API_TOKEN"
```
# Dry-run 示例(不提交事务) 2) **ODS → DWD将新增/变更同步到 DWD**
python -m cli.main --tasks ORDERS --dry-run ```bash
python -m cli.main \
# Windows 批处理 --pipeline-flow INGEST_ONLY \
..\\run_etl.bat --tasks PAYMENTS --tasks DWD_LOAD_FROM_ODS \
--pg-dsn "$PG_DSN" --store-id "$STORE_ID"
``` ```
5. **查看输出**:日志目录与导出目录分别由 `LOG_ROOT`、`EXPORT_ROOT` 控制;运行追踪与游标记录写入数据库 `etl_admin.*` 表。
## 数据与运行流转 ### 推荐定时方式 B一条命令串起来
- CLI 解析参数 → `AppConfig.load()` 组装配置 → `ETLScheduler` 创建 DB/API/游标/运行追踪器。 同一条命令先跑在线抓取/入库任务,再跑 DWD 装载任务:
- 调度器按任务代码实例化任务,读取/推进游标,落盘运行记录。 ```bash
- 任务模板:确定时间窗口 → 调用 API/ODS 数据 → 解析校验 → Loader 批量 Upsert/SCD2 → 质量检查 → 提交事务并回写游标。 python -m cli.main \
--pipeline-flow FULL \
--tasks PRODUCTS,TABLES,MEMBERS,ASSISTANTS,PACKAGES_DEF,ORDERS,PAYMENTS,REFUNDS,COUPON_USAGE,INVENTORY_CHANGE,TOPUPS,TABLE_DISCOUNT,ASSISTANT_ABOLISH,LEDGER,DWD_LOAD_FROM_ODS \
--pg-dsn "$PG_DSN" --store-id "$STORE_ID" \
--api-token "$API_TOKEN"
```
## 测试与回放 ### `pipeline-flow` 说明
- 单元/集成测试:`pytest` 或 `python scripts/run_tests.py --suite online`。 - `FULL`:在线抓取落盘 + 本地清洗入库ODS 任务会走抓取;`DWD_LOAD_FROM_ODS` 仅走入库阶段)
- 预置组合:`python scripts/run_tests.py --preset offline_realdb`(见 `scripts/test_presets.py`)。 - `FETCH_ONLY`:仅在线抓取落盘,不入库
- 离线模式:`TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=... pytest tests/unit/test_etl_tasks_offline.py`。 - `INGEST_ONLY`:仅从本地 JSON 回放入库(适合离线回放/补跑)
- 数据库连通性:`python scripts/test_db_connection.py --dsn postgresql://... --query "SELECT 1"`。
## 其他提示 ## 目录结构与关键文件
- `.env.example` 列出了所有常用配置;`config/defaults.py` 记录默认值与任务窗口配置。 - 仓库根目录:`etl_billiards/` 主代码;`app/` 示例 runner`开发笔记/` 项目笔记;`tmp/` 草稿/调试归档;`requirements.txt`(仓库根)依赖;`run_etl.sh/.bat` 启动脚本。
- `loaders/ods/generic.py` 支持定义主键/列名即可落 ODS`tasks/manual_ingest_task.py` 可将归档 JSON 快速灌入对应 ODS 表 - 注意:根目录的 `run_etl.sh/.bat` 运行时要求当前目录为 `etl_billiards/`(因为入口是 `python -m cli.main`
- 需要新增任务时,在 `tasks/` 中实现并在 `orchestration/task_registry.py` 注册即可复用调度能力。 - `etl_billiards/`(主代码目录)
- `.env`:本地配置文件(可选,用环境变量也可)
- `cli/`CLI 入口(`cli/main.py`
- `config/``defaults.py` 默认值;`env_parser.py` 解析 `.env`/环境变量;`settings.py` AppConfig 加载/校验
- `api/``client.py` HTTP 请求、重试、分页与落盘
- `database/``connection.py` 连接封装;`operations.py` 批量 upsertDDL`schema_ODS_doc.sql`、`schema_dwd_doc.sql`
- `tasks/`业务任务ODS 抓取/回放、DWD 装载、质检等)
- `loaders/`ODS/DWD/SCD2 Loader 实现
- `scd/``scd2_handler.py`(维度 SCD2 历史)
- `quality/`:质量检查器(行数/金额对照)
- `orchestration/``scheduler.py` 调度;`task_registry.py` 注册;`cursor_manager.py` 水位管理;`run_tracker.py` 运行记录
- `scripts/`:重建/测试/探活工具
- `docs/`:映射/样例/质检说明文档
- `fetch-test/`:接口联调/规则验证的一次性脚本与报告(不影响主流程)
- `reports/`:质检输出(如 `dwd_quality_report.json`
- `tests/`:单元/集成测试
## 项目文件索引(维护/AI 快速定位)
> 说明:用于维护/AI 快速定位文件路径与用途;默认不列出 `.git/`、`__pycache__/`、`.pytest_cache/`、`*.pyc` 等自动生成内容。
### /
- `.gitignore`Git 忽略规则。
- `.gitkeep`:占位文件(用于保留空目录)。
- `README.md`:项目总览与使用说明(本文档)。
- `requirements.txt`:仓库根依赖清单(不含版本约束,建议优先用 `etl_billiards/requirements.txt`)。
- `run_etl.bat`Windows 启动脚本(需先 `cd etl_billiards`;入口为 `python -m cli.main`)。
- `run_etl.sh`Linux/macOS 启动脚本(需先 `cd etl_billiards`;会加载当前目录 `.env`)。
### app/
### etl_billiards/
- `etl_billiards/.env`:本地运行环境变量(含敏感信息,勿提交/勿外传)。
- `etl_billiards/__init__.py`Python 包标记文件。
- `etl_billiards/ods_row_report.json`ODS 行数对照报告source json vs ODS 表)。
- `etl_billiards/requirements.txt`ETL 运行依赖(带最低版本约束)。
- `etl_billiards/run_ods.bat`Windows 一键脚本:重建 ODS 并灌入示例 JSON。
- `etl_billiards/setup.py`:打包/安装脚本(当前项目主要按“`cd etl_billiards; python -m cli.main`”方式运行)。
### etl_billiards/api/
- `etl_billiards/api/__init__.py`Python 包标记文件。
- `etl_billiards/api/client.py`API客户端统一封装 POST/重试/分页与列表提取逻辑。
- `etl_billiards/api/endpoint_routing.py`:“近期记录 / 历史记录(Former)”接口路由规则。
- `etl_billiards/api/local_json_client.py`:本地 JSON 客户端,模拟 APIClient 的分页接口,从落盘的 JSON 回放数据。
- `etl_billiards/api/recording_client.py`:包装 APIClient将分页响应落盘便于后续本地清洗。
### etl_billiards/cli/
- `etl_billiards/cli/__init__.py`Python 包标记文件。
- `etl_billiards/cli/main.py`CLI主入口
### etl_billiards/config/
- `etl_billiards/config/__init__.py`Python 包标记文件。
- `etl_billiards/config/defaults.py`:配置默认值定义
- `etl_billiards/config/env_parser.py`:环境变量解析
- `etl_billiards/config/settings.py`:配置管理主类
### etl_billiards/database/
- `etl_billiards/database/__init__.py`Python 包标记文件。
- `etl_billiards/database/base.py`数据库操作批量、RETURNING支持
- `etl_billiards/database/connection.py`Database connection manager with capped connect_timeout.
- `etl_billiards/database/operations.py`:数据库批量操作
- `etl_billiards/database/schema_dwd_doc.sql`DWD Schema DDL含字段/注释/口径说明)。
- `etl_billiards/database/schema_etl_admin.sql`etl_admin 元数据 Schema DDL任务/水位/运行记录等)。
- `etl_billiards/database/schema_ODS_doc.sql`ODS Schema DDL含字段/注释/口径说明)。
- `etl_billiards/database/seed_ods_tasks.sql`SQL 种子脚本:初始化/注册 ODS 任务。
- `etl_billiards/database/seed_scheduler_tasks.sql`SQL 种子脚本:初始化调度任务配置。
### etl_billiards/database/Deleded & backup/
- (本目录无直接文件)
### etl_billiards/docs/
- `etl_billiards/docs/dwd_main_tables_dictionary.md`DWD 主表(非 Ex表格说明书
- `etl_billiards/docs/在线抓取更新ODS 然后将更新的ODS内容对应到DWD的更新。.md`在线抓取更新ODS 然后将更新的ODS内容对应到DWD的更新。
### etl_billiards/fetch-test/
- `etl_billiards/fetch-test/compare_recent_former_endpoints.py`:对比“近期记录”与“历史记录(Former)”接口:
- `etl_billiards/fetch-test/README.md`fetch-test
- `etl_billiards/fetch-test/recent_vs_former_report.json`:报告/比对输出JSON
- `etl_billiards/fetch-test/recent_vs_former_report.md`:近期记录 vs 历史记录(Former) 接口对比报告
### etl_billiards/loaders/
- `etl_billiards/loaders/__init__.py`Python 包标记文件。
- `etl_billiards/loaders/base_loader.py`:数据加载器基类
### etl_billiards/loaders/dimensions/
- `etl_billiards/loaders/dimensions/__init__.py`Python 包标记文件。
- `etl_billiards/loaders/dimensions/assistant.py`:助教维度加载器
- `etl_billiards/loaders/dimensions/member.py`:会员维度表加载器
- `etl_billiards/loaders/dimensions/package.py`:团购/套餐定义加载器
- `etl_billiards/loaders/dimensions/product.py`:商品维度 + 价格SCD2 加载器
- `etl_billiards/loaders/dimensions/table.py`:台桌维度加载器
### etl_billiards/loaders/facts/
- `etl_billiards/loaders/facts/__init__.py`Python 包标记文件。
- `etl_billiards/loaders/facts/assistant_abolish.py`:助教作废事实表
- `etl_billiards/loaders/facts/assistant_ledger.py`:助教流水事实表
- `etl_billiards/loaders/facts/coupon_usage.py`:券核销事实表
- `etl_billiards/loaders/facts/inventory_change.py`:库存变动事实表
- `etl_billiards/loaders/facts/order.py`:订单事实表加载器
- `etl_billiards/loaders/facts/payment.py`:支付事实表加载器
- `etl_billiards/loaders/facts/refund.py`:退款事实表加载器
- `etl_billiards/loaders/facts/table_discount.py`:台费打折事实表
- `etl_billiards/loaders/facts/ticket.py`:小票详情加载器
- `etl_billiards/loaders/facts/topup.py`:充值记录事实表
### etl_billiards/loaders/ods/
- `etl_billiards/loaders/ods/__init__.py`Python 包标记文件。
- `etl_billiards/loaders/ods/generic.py`Generic ODS loader that keeps raw payload + primary keys.
### etl_billiards/models/
- `etl_billiards/models/__init__.py`Python 包标记文件。
- `etl_billiards/models/parsers.py`:数据类型解析器
- `etl_billiards/models/validators.py`:数据验证器
### etl_billiards/orchestration/
- `etl_billiards/orchestration/__init__.py`Python 包标记文件。
- `etl_billiards/orchestration/cursor_manager.py`:游标管理器
- `etl_billiards/orchestration/run_tracker.py`:运行记录追踪器
- `etl_billiards/orchestration/scheduler.py`ETL 调度:支持在线抓取、离线清洗入库、全流程三种模式。
- `etl_billiards/orchestration/task_registry.py`:任务注册表
### etl_billiards/quality/
- `etl_billiards/quality/__init__.py`Python 包标记文件。
- `etl_billiards/quality/balance_checker.py`:余额一致性检查器
- `etl_billiards/quality/base_checker.py`:数据质量检查器基类
### etl_billiards/reports/
- `etl_billiards/reports/dwd_quality_report.json`DWD 质量核对输出(行数/金额对照)。
### etl_billiards/scd/
- `etl_billiards/scd/__init__.py`Python 包标记文件。
- `etl_billiards/scd/scd2_handler.py`SCD2 (Slowly Changing Dimension Type 2) 处理逻辑
### etl_billiards/scripts/
- `etl_billiards/scripts/bootstrap_schema.py`Apply the PRD-aligned warehouse schema (ODS/DWD/DWS) to PostgreSQL.
- `etl_billiards/scripts/build_dwd_from_ods.py`Populate PRD DWD tables from ODS payload snapshots.
- `etl_billiards/scripts/build_dws_order_summary.py`Recompute billiards_dws.dws_order_summary from DWD fact tables.
- `etl_billiards/scripts/check_ods_json_vs_table.py`ODS JSON 字段核对脚本:对照当前数据库中的 ODS 表字段,检查示例 JSON默认目录 export/test-json-doc
- `etl_billiards/scripts/check_ods_gaps.py`ODS 缺失校验脚本API 主键 vs ODS 主键逐条比对,输出缺失明细样例。
- `etl_billiards/scripts/reload_ods_windowed.py`ODS 窗口化补跑脚本:按时间切片重跑 ODS 任务,并可配置窗口粒度与延时。
- `etl_billiards/scripts/rebuild_db_and_run_ods_to_dwd.py`:一键重建 ETL 相关 Schema并执行 ODS → DWD。
- `etl_billiards/scripts/rebuild_ods_from_json.py`:从本地 JSON 示例目录重建 billiards_ods.* 表,并导入样例数据。
- `etl_billiards/scripts/run_tests.py`:灵活的测试执行脚本,可像搭积木一样组合不同参数或预置命令(模式/数据库/归档路径等),
- `etl_billiards/scripts/test_db_connection.py`Quick utility for validating PostgreSQL connectivity (ASCII-only output).
- `etl_billiards/scripts/test_presets.py`:测试命令仓库:集中维护 run_tests.py 的常用组合,支持一键执行。
### etl_billiards/scripts/Deleded & backup/
- (本目录无直接文件)
### etl_billiards/tasks/
- `etl_billiards/tasks/__init__.py`Python 包标记文件。
- `etl_billiards/tasks/assistant_abolish_task.py`:助教作废任务
- `etl_billiards/tasks/assistants_task.py`:助教账号任务
- `etl_billiards/tasks/base_dwd_task.py`DWD任务基类
- `etl_billiards/tasks/base_task.py`ETL任务基类引入 Extract/Transform/Load 模板方法)
- `etl_billiards/tasks/coupon_usage_task.py`:平台券核销任务
- `etl_billiards/tasks/dwd_load_task.py`DWD 装载任务:从 ODS 增量写入 DWD维度 SCD2事实按时间增量
- `etl_billiards/tasks/dwd_quality_task.py`DWD 质量核对任务:按 dwd_quality_check.md 输出行数/金额对照报表。
- `etl_billiards/tasks/init_dwd_schema_task.py`:初始化 DWD Schema执行 schema_dwd_doc.sql可选先 DROP SCHEMA。
- `etl_billiards/tasks/init_schema_task.py`:任务:初始化运行环境,执行 ODS 与 etl_admin 的 DDL并准备日志/导出目录。
- `etl_billiards/tasks/inventory_change_task.py`:库存变更任务
- `etl_billiards/tasks/ledger_task.py`:助教流水任务
- `etl_billiards/tasks/manual_ingest_task.py`:手工示例数据灌入:按 schema_ODS_doc.sql 的表结构写入 ODS。
- `etl_billiards/tasks/members_dwd_task.py`DWD TaskProcess Member Records from ODS to Dimension Table.
- `etl_billiards/tasks/members_task.py`会员ETL任务
- `etl_billiards/tasks/ods_json_archive_task.py`:在线抓取 ODS 相关接口并落盘为 JSON用于后续离线回放/入库)。
- `etl_billiards/tasks/ods_tasks.py`ODS ingestion tasks.
- `etl_billiards/tasks/orders_task.py`订单ETL任务
- `etl_billiards/tasks/packages_task.py`:团购/套餐定义任务
- `etl_billiards/tasks/payments_dwd_task.py`DWD TaskProcess Payment Records from ODS to Fact Table.
- `etl_billiards/tasks/payments_task.py`支付记录ETL任务
- `etl_billiards/tasks/products_task.py`商品档案PRODUCTSETL任务
- `etl_billiards/tasks/refunds_task.py`:退款记录任务
- `etl_billiards/tasks/table_discount_task.py`:台费折扣任务
- `etl_billiards/tasks/tables_task.py`:台桌档案任务
- `etl_billiards/tasks/ticket_dwd_task.py`DWD TaskProcess Ticket Details from ODS to DWD fact tables.
- `etl_billiards/tasks/topups_task.py`:充值记录任务
### etl_billiards/tasks/dwd/
- (本目录无直接文件)
### etl_billiards/tests/
- `etl_billiards/tests/__init__.py`Python 包标记文件。
### etl_billiards/tests/integration/
- `etl_billiards/tests/integration/__init__.py`Python 包标记文件。
- `etl_billiards/tests/integration/test_database.py`:数据库集成测试
### etl_billiards/tests/unit/
- `etl_billiards/tests/unit/__init__.py`Python 包标记文件。
- `etl_billiards/tests/unit/task_test_utils.py`ETL 任务测试的共用辅助模块,涵盖在线/离线模式所需的伪造数据、客户端与配置等工具函数。
- `etl_billiards/tests/unit/test_config.py`:配置管理测试
- `etl_billiards/tests/unit/test_endpoint_routing.py`Unit tests for recent/former endpoint routing.
- `etl_billiards/tests/unit/test_etl_tasks_offline.py`:离线模式任务测试,通过回放归档 JSON 来验证 T+L 链路可用。
- `etl_billiards/tests/unit/test_etl_tasks_online.py`:在线模式下的端到端任务测试,验证所有任务在模拟 API 下能顺利执行。
- `etl_billiards/tests/unit/test_etl_tasks_stages.py`:验证 14 个任务的 E/T/L 分阶段调用FakeDB/FakeAPI不访问真实接口或数据库
- `etl_billiards/tests/unit/test_ods_tasks.py`Unit tests for the new ODS ingestion tasks.
- `etl_billiards/tests/unit/test_parsers.py`:解析器测试
- `etl_billiards/tests/unit/test_reporting.py`:汇总与报告工具的单测。
### etl_billiards/utils/
- `etl_billiards/utils/__init__.py`Python 包标记文件。
- `etl_billiards/utils/helpers.py`:通用工具函数
- `etl_billiards/utils/json_store.py`JSON 归档/读取的通用工具。
- `etl_billiards/utils/reporting.py`:简单的任务结果汇总与格式化工具。
### tmp/
- `tmp/20251121-task.txt`:历史任务/计划记录(可能存在编码问题)。
- `tmp/doc_extracted.txt`:从 DWD 文档抽取的正文(大文本)。
- `tmp/doc_lines.txt`DWD 文档按行抽取/对照(文本)。
- `tmp/dwd_tables.json`DWD 表清单JSON
- `tmp/dwd_tables_full.json`DWD 表清单(完整版 JSON
- `tmp/hebing.py`:临时脚本:按“同名 key”合并目录内 md+json 输出 merged_output.txt。
- `tmp/README_FULL.md`:历史/草稿README 详细版(已合并进根 README
- `tmp/rebuild_run_20251214-042115.log`:运行日志/调试输出(临时文件)。
- `tmp/rewrite_schema_dwd_doc_comments.py`:临时脚本:批量重写 DWD DDL 注释(归档/草稿)。
- `tmp/rewrite_schema_ods_doc_comments.py`:临时脚本:批量重写 ODS DDL 注释(归档/草稿)。
- `tmp/schema_dwd.sql`DWD schema 草稿/导出(归档)。
- `tmp/schema_dwd_doc.sql`DWD schema doc 版本(归档)。
- `tmp/schema_ODS_doc copy.sql`ODS schema doc 备份(归档)。
- `tmp/schema_ODS_doc.sql`ODS schema doc 版本(归档)。
- `tmp/temp_chinese.txt`:编码/文本对照测试。
- `tmp/tmp_debug_sql.py`:临时脚本:调试 SQL/映射(归档)。
- `tmp/tmp_drop_dwd.py`临时脚本DROP SCHEMA billiards_dwd危险勿在生产执行
- `tmp/tmp_dwd_tasks.py`:临时脚本:调试 DWD 相关任务(归档)。
- `tmp/tmp_problems.py`:临时脚本:问题排查记录/复现(归档)。
- `tmp/tmp_run_sql.py`:临时脚本:拼接/执行一条 INSERT...SELECT 验证映射(需 PG_DSN
- `tmp/非球接口API.md`:上游接口笔记/汇总(草稿/归档)。
### tmp/a/
- (本目录无直接文件)
### tmp/b/
- (本目录无直接文件)
### tmp/etl_billiards_misc/
- `tmp/etl_billiards_misc/0.py`Simple PostgreSQL connectivity smoke-checker.
- `tmp/etl_billiards_misc/feiqiu-ETL.code-workspace`VS Code workspace 文件(归档)。
- `tmp/etl_billiards_misc/草稿.txt`:草稿/说明(归档)。
### tmp/etl_billiards_misc/backups/
- `tmp/etl_billiards_misc/backups/manual_ingest_task.py`:历史版本备份(归档)。
- `tmp/etl_billiards_misc/backups/manual_ingest_task.py.bak_20251209`:历史版本备份(归档)。
- `tmp/etl_billiards_misc/backups/schema_ODS_doc.sql`:历史版本备份(归档)。
- `tmp/etl_billiards_misc/backups/schema_ODS_doc.sql.bak_20251209`:历史版本备份(归档)。
### tmp/etl_billiards_misc/tmp & Delete/
- `tmp/etl_billiards_misc/tmp & Delete/.env.example`:旧示例配置(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/dwd_schema_columns.txt`DWD 字段提取/对照文本(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/DWD层设计建议.docx`DWD 设计建议文档(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/DWD层设计草稿.md`DWD 设计草稿(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/schema_dwd_doc.sql.bak`schema 备份(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/schema_ODS_doc.sql.bak`schema 备份(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/schema_ODS_doc.sql.rewrite2.bak`schema 重写过程备份(归档)。
- `tmp/etl_billiards_misc/tmp & Delete/schema_v2.sql`schema v2 草稿(归档)。
### tmp/recharge_only/
- `tmp/recharge_only/recharge_settlements.json`:离线样例 JSON仅充值结算
### tmp/single_ingest/
- `tmp/single_ingest/goods_stock_movements.json`:离线最小样例 JSON单文件
### 开发笔记/
- `开发笔记/记录.md`:开发/迁移过程的备忘与待办(归档)。
## 架构与流程
执行链路(控制流):
1) CLI`cli/main.py`)解析参数 → 生成 AppConfig → 初始化日志/DB/API
2) 调度层(`orchestration/scheduler.py`)按 `task_registry.py` 实例化任务,设置 run_uuid、cursor水位、上下文
3) 任务执行模板:获取时间窗口/水位(`cursor_manager.py`)→ ExtractAPI 分页/重试或离线读 JSON→ Transform解析/校验)→ LoadLoader 批量 upsert/SCD2/增量写入,底层 `database/operations.py`)→(可选)质量检查 → 更新水位与运行记录(`run_tracker.py`),提交/回滚事务。
数据流与依赖:
- 配置:`config/defaults.py` + `.env`/环境变量 + CLI 参数叠加
- 在线:`api/client.py` 支撑分页/重试;可落盘 JSON`pipeline.fetch_root`
- 离线:`manual_ingest_task.py` 从 `INGEST_SOURCE_DIR` 回放入库
- DWD`dwd_load_task.py` 依据 `TABLE_MAP/FACT_MAPPINGS` 映射装载,维度走 SCD2事实走增量
- 质检:`dwd_quality_task.py` 输出 `reports/dwd_quality_report.json`
## ODS → DWD 策略与建模要点
1) ODS 留底保留源主键、payload、时间/来源信息,便于回溯。
2) DWD 清洗:维度走 SCD2事实按时间/水位增量;字段类型、单位、枚举标准化,同时保留溯源字段。
3) 颗粒一致:一张 DWD 表只承载一种业务事件/颗粒,避免混颗粒。
4) 业务键统一site_id、member_id、table_id、order_settle_id、order_trade_no 等统一命名。
5) 不过度汇总DWD 只做明细/轻度清洗,聚合留到 DWS/报表。
6) 去嵌套:数组展开为子表/子行,重复 profile 提炼为维度。
7) 长期演进:优先加列/加表,减少对已有表结构的破坏。
## 常用 CLI
```bash
cd etl_billiards
# 运行 defaults.py 中的默认任务列表(在线 FULL 流程)
python -m cli.main --pg-dsn "$PG_DSN" --store-id "$STORE_ID" --api-token "$API_TOKEN"
# 运行指定任务
python -m cli.main --tasks INIT_ODS_SCHEMA,MANUAL_INGEST --pipeline-flow INGEST_ONLY --ingest-source "../tmp/single_ingest"
# 覆盖 DSN / API / 输出目录
python -m cli.main --pg-dsn "postgresql://user:pwd@host:5432/db" --store-id 123 --api-token "..." --fetch-root "./json_fetch"
# 试运行(不写库)
python -m cli.main --dry-run --tasks DWD_LOAD_FROM_ODS
```
## 窗口切分与补偿
用于 ETL 任务、ODS 缺失校验、数据一致性检查等“带时间窗口”的执行场景。逻辑如下:
- 仅当传入窗口参数(如 CLI `--window-start/--window-end` 或脚本 `--start/--end`)时启用切分。
- 先对整体窗口前后补偿 N 小时,再按月切分(`month` 为最大单位)。不需要切分时设为 `none`。
- 分段窗口将依次执行并汇总结果。
配置项(默认值见 `config/defaults.py`
- `run.window_split.unit``month` / `none`(默认 `month`
- `run.window_split.compensation_hours`:整数小时(默认 2
环境变量:
- `WINDOW_SPLIT_UNIT`
- `WINDOW_COMPENSATION_HOURS`
CLI 参数(覆盖配置):
- `python -m cli.main``--window-split-unit``--window-compensation-hours`
- `scripts/check_ods_gaps.py``--window-split-unit``--window-compensation-hours`
- `scripts/check_data_integrity.py``--window-split-unit``--window-compensation-hours`
- `scripts/reload_ods_windowed.py``--window-split-unit``--window-compensation-hours`
示例(`unit=month``compensation_hours=2`
- 传入窗口:`2025/11/10 10:00` - `2026/1/19 10:15`
- 实际处理窗口切分:
- `2025/11/10 08:00` - `2025/12/01 00:00`
- `2025/12/01 00:00` - `2026/01/01 00:00`
- `2026/01/01 00:00` - `2026/01/19 12:15`
## 测试
说明:仓库未固定 pytest 版本(运行测试需自行安装 `pytest`)。
```bash
cd etl_billiards
pip install pytest
# 单元测试(模拟 API + FakeDB
pytest tests/unit
# 集成测试(需要设置 TEST_DB_DSN
TEST_DB_DSN="postgresql://user:pwd@host:5432/db" pytest tests/integration/test_database.py
# 便捷测试执行器(可选)
python scripts/run_tests.py --suite online -k ORDERS
python scripts/test_db_connection.py --dsn "postgresql://user:pwd@host:5432/db" --query "SELECT 1"
```
## 开发与扩展
- 新任务:在 `tasks/` 继承 BaseTask实现 `get_task_code/execute`,并在 `orchestration/task_registry.py` 注册。
- 新 Loader/Checker参考 `loaders/`、`quality/`,复用批量 upsert/质检接口。
- 新配置项:在 `config/defaults.py` 增加默认值,并在 `config/env_parser.py` 增加环境变量映射(如需要)。
## ODS 任务上线指引
- 元数据/任务注册脚本:
- `etl_billiards/database/seed_ods_tasks.sql`
- `etl_billiards/database/seed_scheduler_tasks.sql`
- 确认 `etl_admin.etl_task` 中已启用所需任务(不同环境需替换 store_id / schema
- 离线回放/重建 ODS开发/运维):
```bash
cd etl_billiards
python scripts/rebuild_ods_from_json.py --dsn "$PG_DSN" --json-dir "export/test-json-doc"
```
## ODS 表概览(数据路径)
| ODS 表名 | 接口 Path | 数据列表路径 |
| ---------------------------------- | ------------------------------------------------- | ----------------------------- |
| assistant_accounts_master | /PersonnelManagement/SearchAssistantInfo | data.assistantInfos |
| assistant_service_records | /AssistantPerformance/GetOrderAssistantDetails | data.orderAssistantDetails |
| assistant_cancellation_records | /AssistantPerformance/GetAbolitionAssistant | data.abolitionAssistants |
| goods_stock_movements | /GoodsStockManage/QueryGoodsOutboundReceipt | data.queryDeliveryRecordsList |
| goods_stock_summary | /TenantGoods/GetGoodsStockReport | data |
| group_buy_packages | /PackageCoupon/QueryPackageCouponList | data.packageCouponList |
| group_buy_redemption_records | /Site/GetSiteTableUseDetails | data.siteTableUseDetailsList |
| member_profiles | /MemberProfile/GetTenantMemberList | data.tenantMemberInfos |
| member_balance_changes | /MemberProfile/GetMemberCardBalanceChange | data.tenantMemberCardLogs |
| member_stored_value_cards | /MemberProfile/GetTenantMemberCardList | data.tenantMemberCards |
| payment_transactions | /PayLog/GetPayLogListPage | data |
| platform_coupon_redemption_records | /Promotion/GetOfflineCouponConsumePageList | data |
| recharge_settlements | /Site/GetRechargeSettleList | data.settleList |
| refund_transactions | /Order/GetRefundPayLogList | data |
| settlement_records | /Site/GetAllOrderSettleList | data.settleList |
| settlement_ticket_details | /Order/GetOrderSettleTicketNew | 完整 JSON |
| site_tables_master | /Table/GetSiteTables | data.siteTables |
| stock_goods_category_tree | /TenantGoodsCategory/QueryPrimarySecondaryCategory| data.goodsCategoryList |
| store_goods_master | /TenantGoods/GetGoodsInventoryList | data.orderGoodsList |
| store_goods_sales_records | /TenantGoods/GetGoodsSalesList | data.orderGoodsLedgers |
| table_fee_discount_records | /Site/GetTaiFeeAdjustList | data.taiFeeAdjustInfos |
| table_fee_transactions | /Site/GetSiteTableOrderDetails | data.siteTableUseDetailsList |
| tenant_goods_master | /TenantGoods/QueryTenantGoods | data.tenantGoodsList |
> 完整字段级映射见 `etl_billiards/docs/` 与 ODS/DWD DDL。
## 当前状态2025-12-09
- 示例 JSON 已全量灌入DWD 行数与 ODS 对齐。
- 分类维度已展平大类+子类:`dim_goods_category` 26 行category_level/leaf 已赋值)。
- 部分空字段源数据即为空,如需补值请先确认上游。
## 可精简/归档
- `tmp/`、`tmp/etl_billiards_misc/` 中草稿、旧备份、调试脚本仅供参考,不影响运行。
- 根级保留必要文件README、requirements、run_etl.*),其余临时文件按需归档至 `tmp/`。
## 一键更新(推荐)
日常需要把数据从 ODS 更新到最新,并同步刷新 DWD/DWS 时,直接运行一键脚本:
```bash
cd etl_billiards
python run_update.py
```
常用参数:
- `--overlap-seconds 3600`:冗余抓取窗口(默认 3600 秒)
- `--dws-rebuild-days 1`DWS 回算冗余天数(默认 1 天)
- `--dws-start YYYY-MM-DD --dws-end YYYY-MM-DD`:手工指定 DWS 回算日期范围
- `--skip-ods`:跳过 ODS 在线抓取(仅跑 DWD/DWS
- `--ods-tasks ODS_PAYMENT,ODS_TABLE_USE,...`:只跑指定 ODS 任务
- `--check-ods-gaps`:在 ODS 更新完成后执行缺失校验API 主键 vs ODS 主键)
- `--check-ods-overlap-hours 24`:缺失校验时,从 ODS 最新截止时间回溯的小时数(默认 24
- `--check-ods-window-days 1`:缺失校验 API 窗口粒度(默认 1 天)
- `--check-ods-page-size 200`:缺失校验 API 每页大小(默认 200
- `--check-ods-timeout-sec 1800`:缺失校验步骤超时秒数(默认 1800
- `--check-ods-task-codes ODS_PAYMENT,ODS_TABLE_USE,...`:仅校验指定 ODS 任务
### ODS 缺失校验API vs ODS
说明:
- 校验口径为 ODS 表 `MAX(fetched_at)` 的最小值,视为“最新一致截止时间”。
- `--from-cutoff` 会从该截止时间回溯 N 小时(默认 24 小时)到当前,便于日常增量校验。
全量校验(从 2025-07 至今):
```bash
cd etl_billiards
python scripts/check_ods_gaps.py --start 2025-07-01
```
更新时校验(从 ODS 最新截止时间回溯 24h
```bash
cd etl_billiards
python run_update.py --check-ods-gaps
```
## FAQ
- 字段空值:若映射已存在且源列非空仍为空,再检查上游 JSON维度 SCD2 按全量合并。
- DSN/路径:确认 `PG_DSN`、`STORE_ID`、`INGEST_SOURCE_DIR` 与本地一致。
- 新增任务:在 `tasks/` 实现并注册到 `task_registry.py`,必要时同步更新 DDL 与映射。
- 权限/运行:检查网络、账号权限;脚本需执行权限(如 `chmod +x run_etl.sh`)。
---
## Cutoff截止时间检查
当你需要“上次数据截止到什么时候”“现在应该从哪里开始补跑”时,使用任务 `CHECK_CUTOFF`
```bash
cd etl_billiards
python -m cli.main --pipeline-flow INGEST_ONLY --tasks CHECK_CUTOFF
```
它会输出:
- `etl_admin.etl_cursor`:每个任务的 `last_start/last_end/last_run_id`(调度游标)
- ODS对 `DWD_LOAD_FROM_ODS` 依赖的各个 `billiards_ods.*` 表做 `MAX(fetched_at)`(真实已入库 ODS 的截止)
- DWD/DWS输出若干关键表的最大业务时间/最大更新时刻,便于快速核对
> 如果 `etl_cursor.last_end` 很新,但 ODS 的 `MAX(fetched_at)` 很旧,通常表示在线抓取没跑通(最常见是 `API_TOKEN` 过期导致 401
## 冗余抓取方案(推荐)
为避免边界时间丢数(上游延迟写入、接口分页抖动、窗口切换等),建议在 cutoff 基础上向前追加 **1 小时** 冗余量:
- 配置:将 `OVERLAP_SECONDS` 设为 3600默认 120 秒)
```env
# etl_billiards/.env
OVERLAP_SECONDS=3600
```
冗余方案的关键点是“重抓不重落”,依靠各层的去重/幂等机制只落新数据:
- **ODS 层**:主键/冲突列 UPSERT重复抓取只会 upsert不会重复插入
- **DWD 层**:事实表增量插入 + 主键冲突不重复落(重复范围会被跳过),维度表按 SCD2 合并
- **DWS 层**:对指定日期窗口先 delete 再 upsert窗口内重算幂等
> 如果你希望“冗余窗口内的数据发生变更也要覆盖更新”,需要把对应层的冲突策略从 `DO NOTHING` 调整为 `DO UPDATE`(当前实现以“只落新数据”为主)。
## DWS汇总层入库
本项目已包含 `billiards_dws` 汇总层(当前提供 `dws_order_summary`
1) 初始化 DWS 表结构:
```bash
python -m cli.main --pipeline-flow INGEST_ONLY --tasks INIT_DWS_SCHEMA
```
2) 生成/刷新汇总表(按窗口重算,建议配合 `--window-start/--window-end`
```bash
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWS_BUILD_ORDER_SUMMARY \
--window-start "2025-10-01 00:00:00" \
--window-end "2025-12-26 23:59:59"
```
3) 推荐串联ODS -> DWD -> DWS
```bash
# 先跑在线 ODS 抓取(需要有效 API_TOKEN如果出现 401 请更新 token
python -m cli.main --pipeline-flow FULL --tasks ODS_MEMBER,ODS_PAYMENT,ODS_REFUND,ODS_SETTLEMENT_RECORDS
# 再把 ODS 增量同步到 DWD
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWD_LOAD_FROM_ODS
# 最后重算 DWS
python -m cli.main --pipeline-flow INGEST_ONLY --tasks DWS_BUILD_ORDER_SUMMARY
```
## 日志 (UTF-8)
- 默认日志目录:`etl_billiards/logs/`
- 每次运行都会生成一个带有时间戳的 `.log` 文件,以便于使用外部工具查看。
常用选项:
- `--log-file` 自定义日志路径(覆盖默认值)。
- `--log-dir` 自定义日志目录。
- `--log-level` 日志级别(`INFO`/`DEBUG`)。
- `--no-log-console` 禁用控制台日志记录(仅写入文件)。
示例(按月切分 + 前后补偿 2h
```bash
cd etl_billiards
python scripts/check_ods_gaps.py --start 2025-07-01 --end 2025-09-30 --window-split-unit month --window-compensation-hours 2 --task-codes ODS_PAYMENT --sleep-per-window-seconds 0.5
python scripts/reload_ods_windowed.py --tasks ODS_PAYMENT,ODS_TABLE_USE --start 2025-07-01 --end 2025-09-30 --window-split-unit month --window-compensation-hours 2 --sleep-seconds 1
python run_update.py --check-ods-gaps --check-ods-window-days 1 --check-ods-sleep-per-window-seconds 0.5
```

1
Untitled Normal file
View File

@@ -0,0 +1 @@
README.md

View File

@@ -1,9 +0,0 @@
# app/etl_busy.py
def run():
"""
忙时抓取逻辑。
TODO: 这里写具体抓取流程API 调用 / 网页解析 / 写入 PostgreSQL 等)
"""
print("Running busy-period ETL...")
# 示例:后续在这里接 PostgreSQL 或 HTTP 抓取
# ...

View File

@@ -1,8 +0,0 @@
# app/etl_idle.py
def run():
"""
闲时抓取逻辑。
可以做全量同步、大批量历史修正等。
"""
print("Running idle-period ETL...")
# ...

View File

@@ -1,31 +0,0 @@
# app/runner.py
import argparse
from datetime import datetime
from . import etl_busy, etl_idle
def main():
parser = argparse.ArgumentParser(description="Feiqiu ETL Runner")
parser.add_argument(
"--mode",
choices=["busy", "idle"],
required=True,
help="ETL mode: busy or idle",
)
args = parser.parse_args()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{now}] Start ETL mode={args.mode}")
if args.mode == "busy":
etl_busy.run()
else:
etl_idle.run()
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] ETL finished.")
if __name__ == "__main__":
main()

View File

@@ -1,53 +1,132 @@
# 数据库配置(真实库) # ==============================================================================
# ETL 系统配置文件
# ==============================================================================
# 配置优先级DEFAULTS < .env < CLI 参数
# ------------------------------------------------------------------------------
# 数据库配置
# ------------------------------------------------------------------------------
# 完整 DSN优先使用如果设置了则忽略下面的 host/port/name/user/password
PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test
# 分离式配置(如果不使用 DSN可以单独配置以下参数
# PG_HOST=localhost
# PG_PORT=5432
# PG_NAME=your_database
# PG_USER=your_user
# PG_PASSWORD=your_password
# 连接超时(秒,范围 1-20
PG_CONNECT_TIMEOUT=10 PG_CONNECT_TIMEOUT=10
# 如需拆分配置PG_HOST=... PG_PORT=... PG_NAME=... PG_USER=... PG_PASSWORD=...
# API配置如需走真实接口再填写 # ------------------------------------------------------------------------------
API_BASE=https://api.example.com # 数据库 Schema 配置
API_TOKEN=your_token_here # ------------------------------------------------------------------------------
# API_TIMEOUT=20 # OLTP 业务数据 schema默认 billiards
# API_PAGE_SIZE=200 SCHEMA_OLTP=billiards
# API_RETRY_MAX=3
# 应用配置 # ETL 管理数据 schema默认 etl_admin
SCHEMA_ETL=etl_admin
# ------------------------------------------------------------------------------
# API 配置
# ------------------------------------------------------------------------------
API_BASE=https://pc.ficoo.vip/apiprod/admin/v1/
API_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjbGllbnQtdHlwZSI6IjQiLCJ1c2VyLXR5cGUiOiIxIiwiaHR0cDovL3NjaGVtYXMubWljcm9zb2Z0LmNvbS93cy8yMDA4LzA2L2lkZW50aXR5L2NsYWltcy9yb2xlIjoiMTIiLCJyb2xlLWlkIjoiMTIiLCJ0ZW5hbnQtaWQiOiIyNzkwNjgzMTYwNzA5OTU3Iiwibmlja25hbWUiOiLnp5_miLfnrqHnkIblkZjvvJrmganmgakxIiwic2l0ZS1pZCI6IjAiLCJtb2JpbGUiOiIxMzgxMDUwMjMwNCIsInNpZCI6IjI5NTA0ODk2NTgzOTU4NDUiLCJzdGFmZi1pZCI6IjMwMDk5MTg2OTE1NTkwNDUiLCJvcmctaWQiOiIwIiwicm9sZS10eXBlIjoiMyIsInJlZnJlc2hUb2tlbiI6Iks1ZnBhYlRTNkFsR0FpMmN4WGYrMHdJVkk0L2UvTVQrSVBHM3V5VWRrSjg9IiwicmVmcmVzaEV4cGlyeVRpbWUiOiIyMDI2LzEvMzEg5LiL5Y2IMTA6MTQ6NTEiLCJuZWVkQ2hlY2tUb2tlbiI6ImZhbHNlIiwiZXhwIjoxNzY5ODY4ODkxLCJpc3MiOiJ0ZXN0IiwiYXVkIjoiVXNlciJ9.BH3-iwwrBczb8aFfI__6kwe3AIsEPacN9TruaTrQ3nY
# API 请求超时(秒)
API_TIMEOUT=20
# 分页大小
API_PAGE_SIZE=200
# 最大重试次数
API_RETRY_MAX=3
# 重试退避时间JSON 数组格式,单位秒)
# API_RETRY_BACKOFF=[1, 2, 4]
# ------------------------------------------------------------------------------
# 门店配置
# ------------------------------------------------------------------------------
STORE_ID=2790685415443269 STORE_ID=2790685415443269
# TIMEZONE=Asia/Taipei TIMEZONE=Asia/Taipei
# SCHEMA_OLTP=billiards
# SCHEMA_ETL=etl_admin
# ------------------------------------------------------------------------------
# 路径配置 # 路径配置
EXPORT_ROOT=C:\dev\LLTQ\export\JSON # ------------------------------------------------------------------------------
LOG_ROOT=C:\dev\LLTQ\export\LOG # 导出根目录
FETCH_ROOT= EXPORT_ROOT=export/JSON
INGEST_SOURCE_DIR=
WRITE_PRETTY_JSON=false
PGCLIENTENCODING=utf8
# ETL配置 # 日志根目录
LOG_ROOT=export/LOG
# 在线抓取 JSON 输出目录
FETCH_ROOT=export/JSON
# 本地入库数据源目录INGEST_ONLY 模式使用)
INGEST_SOURCE_DIR=export/test-json-doc
# ------------------------------------------------------------------------------
# 流水线配置
# ------------------------------------------------------------------------------
# 运行模式FULL抓取+入库、FETCH_ONLY仅抓取、INGEST_ONLY仅入库
PIPELINE_FLOW=FULL
# JSON 美化输出(调试用,会增加文件大小)
WRITE_PRETTY_JSON=false
# ------------------------------------------------------------------------------
# 时间窗口配置
# ------------------------------------------------------------------------------
# 冗余窗口(秒),向前多抓取的时间避免边界数据丢失
OVERLAP_SECONDS=120 OVERLAP_SECONDS=120
# 忙时窗口大小(分钟)
WINDOW_BUSY_MIN=30 WINDOW_BUSY_MIN=30
# 闲时窗口大小(分钟)
WINDOW_IDLE_MIN=180 WINDOW_IDLE_MIN=180
# 闲时窗口定义HH:MM 格式)
IDLE_START=04:00 IDLE_START=04:00
IDLE_END=16:00 IDLE_END=16:00
# 窗口切分单位month/none用于长时间回溯任务按月切分
WINDOW_SPLIT_UNIT=month
# 窗口前后补偿小时数,用于捕获边界数据
WINDOW_COMPENSATION_HOURS=2
# 允许空结果推进窗口
ALLOW_EMPTY_RESULT_ADVANCE=true ALLOW_EMPTY_RESULT_ADVANCE=true
# 清洗配置 # ------------------------------------------------------------------------------
LOG_UNKNOWN_FIELDS=true # 数据完整性检查配置
HASH_ALGO=sha1 # ------------------------------------------------------------------------------
STRICT_NUMERIC=true # 检查模式history历史全量、recent最近增量
ROUND_MONEY_SCALE=2 INTEGRITY_MODE=history
# 测试/离线模式(真实库联调建议 ONLINE # 历史检查起始日期history 模式使用
TEST_MODE=ONLINE INTEGRITY_HISTORY_START=2025-07-01
TEST_JSON_ARCHIVE_DIR=tests/source-data-doc
TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp
# 测试数据库 # 历史检查结束日期(留空表示到当前)
TEST_DB_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test INTEGRITY_HISTORY_END=
# ODS <20>ؽ<EFBFBD><D8BD>ű<EFBFBD><C5B1><EFBFBD><EFBFBD>ã<EFBFBD><C3A3><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ã<EFBFBD> # 是否包含维度表校验
JSON_DOC_DIR=C:\dev\LLTQ\export\test-json-doc INTEGRITY_INCLUDE_DIMENSIONS=true
ODS_INCLUDE_FILES=
ODS_DROP_SCHEMA_FIRST=true # 发现丢失数据时是否自动补全
INTEGRITY_AUTO_BACKFILL=true
# 自动执行完整性检查ETL 完成后自动触发)
INTEGRITY_AUTO_CHECK=false
# 指定要校验的 ODS 任务代码(逗号分隔,留空表示全部)
# INTEGRITY_ODS_TASK_CODES=ODS_PAYMENT,ODS_MEMBER
# ------------------------------------------------------------------------------
# 默认任务列表(逗号分隔,可被 CLI --tasks 参数覆盖)
# ------------------------------------------------------------------------------
# RUN_TASKS=ODS_PAYMENT,ODS_MEMBER,ODS_SETTLEMENT_RECORDS,DWD_LOAD_FROM_ODS

Binary file not shown.

View File

@@ -0,0 +1,44 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\gui\\main.py'],
pathex=[],
binaries=[],
datas=[('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\gui\\resources', 'gui/resources'), ('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\database\\schema_dwd_doc.sql', 'database'), ('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\database\\schema_dws.sql', 'database'), ('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\database\\schema_etl_admin.sql', 'database'), ('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\database\\schema_ODS_doc.sql', 'database'), ('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\database\\seed_ods_tasks.sql', 'database'), ('C:\\dev\\LLTQ\\ETL\\feiqiu-ETL\\etl_billiards\\database\\seed_scheduler_tasks.sql', 'database')],
hiddenimports=['PySide6.QtCore', 'PySide6.QtGui', 'PySide6.QtWidgets', 'psycopg2', 'psycopg2.extras', 'psycopg2.extensions', 'gui.models.task_model', 'gui.models.schedule_model', 'gui.utils.cli_builder', 'gui.utils.config_helper', 'gui.utils.app_settings', 'gui.workers.task_worker', 'gui.workers.db_worker', 'gui.widgets.settings_dialog'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=['matplotlib', 'numpy', 'pandas', 'scipy', 'PIL', 'cv2', 'tkinter'],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='ETL_Manager',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
console=False,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
coll = COLLECT(
exe,
a.binaries,
a.datas,
strip=False,
upx=True,
upx_exclude=[],
name='ETL_Manager',
)

View File

@@ -1,837 +0,0 @@
# 台球场 ETL 系统(模块化版本)合并文档
本文为原多份文档(如 `INDEX.md``QUICK_START.md``ARCHITECTURE.md``MIGRATION_GUIDE.md``PROJECT_STRUCTURE.md``README.md` 等)的合并版,只保留与**当前项目本身**相关的内容:项目说明、目录结构、架构设计、数据与控制流程、迁移与扩展指南等,不包含修改历史和重构过程描述。
---
## 1. 项目概述
台球场 ETL 系统是一个面向门店业务的专业 ETL 工程项目,用于从外部业务 API 拉取订单、支付、会员等数据经过解析、校验、SCD2 处理、质量检查后写入 PostgreSQL 数据库,并支持增量同步和任务运行追踪。
系统采用模块化、分层架构设计,核心特性包括:
- 模块化目录结构配置、数据库、API、模型、加载器、SCD2、质量检查、编排、任务、CLI、工具、测试等分层清晰
- 完整的配置管理:默认值 + 环境变量 + CLI 参数多层覆盖。
- 可复用的数据库访问层(连接管理、批量 Upsert 封装)。
- 支持重试与分页的 API 客户端。
- 类型安全的数据解析与校验模块。
- SCD2 维度历史管理。
- 数据质量检查(例如余额一致性检查)。
- 任务编排层统一调度、游标管理与运行追踪。
- 命令行入口统一管理任务执行支持筛选任务、Dry-run 等模式。
---
## 2. 快速开始
### 2.1 环境准备
- Python 版本:建议 3.10+
- 数据库PostgreSQL
- 操作系统Windows / Linux / macOS 均可
```bash
# 克隆/下载代码后进入项目目录
cd etl_billiards/
ls -la
```
你会看到下述目录结构的顶层部分(详细见第 4 章):
- `config/` - 配置管理
- `database/` - 数据库访问
- `api/` - API 客户端
- `tasks/` - ETL 任务实现
- `cli/` - 命令行入口
- `docs/` - 技术文档
### 2.2 安装依赖
```bash
pip install -r requirements.txt
```
主要依赖示例(按实际 `requirements.txt` 为准):
- `psycopg2-binary`PostgreSQL 驱动
- `requests`HTTP 客户端
- `python-dateutil`:时间处理
- `tzdata`:时区数据
### 2.3 配置环境变量
复制并修改环境变量模板:
```bash
cp .env.example .env
# 使用你习惯的编辑器修改 .env
```
`.env` 示例(最小配置):
```bash
# 数据库
PG_DSN=postgresql://user:password@localhost:5432/....
# API
API_BASE=https://api.example.com
API_TOKEN=your_token_here
# 门店/应用
STORE_ID=2790685415443269
TIMEZONE=Asia/Taipei
# 目录
EXPORT_ROOT=/path/to/export
LOG_ROOT=/path/to/logs
```
> 所有配置项的默认值见 `config/defaults.py`,最终生效配置由「默认值 + 环境变量 + CLI 参数」三层叠加。
### 2.4 运行第一个任务
通过 CLI 入口运行:
```bash
# 运行所有任务
python -m cli.main
# 仅运行订单任务
python -m cli.main --tasks ORDERS
# 运行订单 + 支付
python -m cli.main --tasks ORDERS,PAYMENTS
# Windows 使用脚本
run_etl.bat --tasks ORDERS
# Linux / macOS 使用脚本
./run_etl.sh --tasks ORDERS
```
### 2.5 查看结果
- 日志目录:使用 `LOG_ROOT` 指定,例如
```bash
ls -la C:\dev\LLTQ\export\LOG/
```
- 导出目录:使用 `EXPORT_ROOT` 指定,例如
```bash
ls -la C:\dev\LLTQ\export\JSON/
```
---
## 3. 常用命令与开发工具
### 3.1 CLI 常用命令
```bash
# 运行所有任务
python -m cli.main
# 运行指定任务
python -m cli.main --tasks ORDERS,PAYMENTS,MEMBERS
# 使用自定义数据库
python -m cli.main --pg-dsn "postgresql://user:password@host:5432/db"
# 使用自定义 API 端点
python -m cli.main --api-base "https://api.example.com" --api-token "..."
# 试运行(不写入数据库)
python -m cli.main --dry-run --tasks ORDERS
```
### 3.2 IDE / 代码质量工具示例VSCode
`.vscode/settings.json` 示例:
```json
{
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"python.formatting.provider": "black",
"python.testing.pytestEnabled": true
}
```
代码格式化与检查:
```bash
pip install black isort pylint
black .
isort .
pylint etl_billiards/
```
### 3.3 测试
```bash
# 安装测试依赖(按需)
pip install pytest pytest-cov
# 运行全部测试
pytest
# 仅运行单元测试
pytest tests/unit/
# 生成覆盖率报告
pytest --cov=. --cov-report=html
```
测试示例(按实际项目为准):
- `tests/unit/test_config.py` 配置管理单元测试
- `tests/unit/test_parsers.py` 解析器单元测试
- `tests/integration/test_database.py` 数据库集成测试
#### 3.3.1 测试模式ONLINE / OFFLINE
- `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API完整执行 E/T/L。
- `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load适合验证本地归档数据是否仍可回放。
- `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/source-data-doc` 或 CI 产出的快照)。
- `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。
- `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN实打实执行写库留空时测试使用内存伪库避免依赖数据库。
示例命令:
```bash
# 在线模式覆盖所有任务
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
# 离线模式使用归档 JSON 覆盖所有任务
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/source-data-doc pytest tests/unit/test_etl_tasks_offline.py
# 使用脚本按需组合参数(示例:在线 + 仅订单用例)
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
# 使用脚本连接真实测试库并回放离线模式
python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb
# 使用“指令仓库”中的预置命令
python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py
```
#### 3.3.2 脚本化测试组合(`run_tests.py` / `test_presets.py`
- `scripts/run_tests.py` 是 pytest 的统一入口:自动把项目根目录加入 `sys.path`,并提供 `--suite online/offline/integration`、`--tests`(自定义路径)、`--mode`、`--db-dsn`、`--json-archive`、`--json-temp`、`--keyword/-k`、`--pytest-args`、`--env KEY=VALUE` 等参数,可以像搭积木一样自由组合;
- `--preset foo` 会读取 `scripts/test_presets.py` 内 `PRESETS["foo"]` 的配置,并叠加到当前命令;`--list-presets` 与 `--dry-run` 可用来审阅或仅打印命令;
- 直接执行 `python scripts/test_presets.py` 可依次运行 `AUTO_RUN_PRESETS` 中列出的预置;传入 `--preset x --dry-run` 则只打印对应命令。
`test_presets.py` 充当“指令仓库”。每个预置都是一个字典,常用字段解释如下:
| 字段 | 作用 |
| ---------------------------- | ------------------------------------------------------------------ |
| `suite` | 复用 `run_tests.py` 内置套件online/offline/integration可多选 |
| `tests` | 追加任意 pytest 路径,例如 `tests/unit/test_config.py` |
| `mode` | 覆盖 `TEST_MODE`ONLINE / OFFLINE |
| `db_dsn` | 覆盖 `TEST_DB_DSN`,用于连入真实测试库 |
| `json_archive` / `json_temp` | 配置离线 JSON 归档与临时目录 |
| `keyword` | 映射到 `pytest -k`,用于关键字过滤 |
| `pytest_args` | 附加 pytest 参数,例 `-vv --maxfail=1` |
| `env` | 额外环境变量列表,如 `["STORE_ID=123"]` |
| `preset_meta` | 说明性文字,便于描述场景 |
示例:`offline_realdb` 预置会设置 `TEST_MODE=OFFLINE`、指定 `tests/source-data-doc` 为归档目录,并通过 `db_dsn` 连到测试库。执行 `python scripts/run_tests.py --preset offline_realdb` 或 `python scripts/test_presets.py --preset offline_realdb` 即可复用该组合保证本地、CI 与生产回放脚本一致。
#### 3.3.3 数据库连通性快速检查
`python scripts/test_db_connection.py` 提供最轻量的 PostgreSQL 连通性检测:默认使用 `TEST_DB_DSN`(也可传 `--dsn`),尝试连接并执行 `SELECT 1 AS ok`(可通过 `--query` 自定义)。典型用途:
```bash
# 读取 .env/环境变量中的 TEST_DB_DSN
python scripts/test_db_connection.py
# 临时指定 DSN并检查任务配置表
python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/.... --query "SELECT count(*) FROM etl_admin.etl_task"
```
脚本返回 0 代表连接与查询成功;若返回非 0可结合第 8 章“常见问题排查”的数据库章节(网络、防火墙、账号权限等)先定位问题,再运行完整 ETL。
---
## 4. 项目结构与文件说明
### 4.1 总体目录结构(树状图)
```text
etl_billiards/
├── README.md # 项目总览和使用说明
├── MIGRATION_GUIDE.md # 从旧版本迁移指南
├── requirements.txt # Python 依赖列表
├── setup.py # 项目安装配置
├── .env.example # 环境变量配置模板
├── .gitignore # Git 忽略文件配置
├── run_etl.sh # Linux/Mac 运行脚本
├── run_etl.bat # Windows 运行脚本
├── config/ # 配置管理模块
│ ├── __init__.py
│ ├── defaults.py # 默认配置值定义
│ ├── env_parser.py # 环境变量解析器
│ └── settings.py # 配置管理主类
├── database/ # 数据库访问层
│ ├── __init__.py
│ ├── connection.py # 数据库连接管理
│ └── operations.py # 批量操作封装
├── api/ # HTTP API 客户端
│ ├── __init__.py
│ └── client.py # API 客户端(重试 + 分页)
├── models/ # 数据模型层
│ ├── __init__.py
│ ├── parsers.py # 类型解析器
│ └── validators.py # 数据验证器
├── loaders/ # 数据加载器层
│ ├── __init__.py
│ ├── base_loader.py # 加载器基类
│ ├── dimensions/ # 维度表加载器
│ │ ├── __init__.py
│ │ └── member.py # 会员维度加载器
│ └── facts/ # 事实表加载器
│ ├── __init__.py
│ ├── order.py # 订单事实表加载器
│ └── payment.py # 支付记录加载器
├── scd/ # SCD2 处理层
│ ├── __init__.py
│ └── scd2_handler.py # SCD2 历史记录处理器
├── quality/ # 数据质量检查层
│ ├── __init__.py
│ ├── base_checker.py # 质量检查器基类
│ └── balance_checker.py # 余额一致性检查器
├── orchestration/ # ETL 编排层
│ ├── __init__.py
│ ├── scheduler.py # ETL 调度器
│ ├── task_registry.py # 任务注册表(工厂模式)
│ ├── cursor_manager.py # 游标管理器
│ └── run_tracker.py # 运行记录追踪器
├── tasks/ # ETL 任务层
│ ├── __init__.py
│ ├── base_task.py # 任务基类(模板方法)
│ ├── orders_task.py # 订单 ETL 任务
│ ├── payments_task.py # 支付 ETL 任务
│ └── members_task.py # 会员 ETL 任务
├── cli/ # 命令行接口层
│ ├── __init__.py
│ └── main.py # CLI 主入口
├── utils/ # 工具函数
│ ├── __init__.py
│ └── helpers.py # 通用工具函数
├── tests/ # 测试代码
│ ├── __init__.py
│ ├── unit/ # 单元测试
│ │ ├── __init__.py
│ │ ├── test_config.py
│ │ └── test_parsers.py
│ ├── testdata_json/ # 清洗入库用的测试Json文件
│ │ └── XX.json
│ └── integration/ # 集成测试
│ ├── __init__.py
│ └── test_database.py
└── docs/ # 文档
└── ARCHITECTURE.md # 架构设计文档
```
### 4.2 各模块职责概览
- **config/**
- 统一配置入口,支持默认值、环境变量、命令行参数三层覆盖。
- **database/**
- 封装 PostgreSQL 连接与批量操作插入、更新、Upsert 等)。
- **api/**
- 对上游业务 API 的 HTTP 调用进行统一封装,支持重试、分页与超时控制。
- **models/**
- 提供类型解析器(时间戳、金额、整数等)与业务级数据校验器。
- **loaders/**
- 提供事实表与维度表的加载逻辑(包含批量 Upsert、统计写入结果等
- **scd/**
- 维度型数据的 SCD2 历史管理(有效期、版本标记等)。
- **quality/**
- 质量检查策略,例如余额一致性、记录数量对齐等。
- **orchestration/**
- 任务调度、任务注册、游标管理(增量窗口)、运行记录追踪。
- **tasks/**
- 具体业务任务(订单、支付、会员等),封装了从“取数 → 处理 → 写库 → 记录结果”的完整流程。
- **cli/**
- 命令行入口,解析参数并启动调度流程。
- **utils/**
- 杂项工具函数。
- **tests/**
- 单元测试与集成测试代码。
---
## 5. 架构设计与流程说明
### 5.1 分层架构图
```text
┌─────────────────────────────────────┐
│ CLI 命令行接口 │ <- cli/main.py
└─────────────┬───────────────────────┘
┌─────────────▼───────────────────────┐
│ Orchestration 编排层 │ <- orchestration/
│ (Scheduler, TaskRegistry, ...) │
└─────────────┬───────────────────────┘
┌─────────────▼───────────────────────┐
│ Tasks 任务层 │ <- tasks/
│ (OrdersTask, PaymentsTask, ...) │
└───┬─────────┬─────────┬─────────────┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────┐ ┌──────────┐
│Loaders │ │ SCD │ │ Quality │ <- loaders/, scd/, quality/
└────────┘ └─────┘ └──────────┘
┌───────▼────────┐
│ Models 模型 │ <- models/
└───────┬────────┘
┌───────▼────────┐
│ API 客户端 │ <- api/
└───────┬────────┘
┌───────▼────────┐
│ Database 访问 │ <- database/
└───────┬────────┘
┌───────▼────────┐
│ Config 配置 │ <- config/
└────────────────┘
```
### 5.2 各层职责(当前设计)
- **CLI 层 (`cli/`)**
- 解析命令行参数指定任务列表、Dry-run、覆盖配置项等
- 初始化配置与日志后交由编排层执行。
- **编排层 (`orchestration/`)**
- `scheduler.py`:根据配置与 CLI 参数选择需要执行的任务,控制执行顺序和并行策略。
- `task_registry.py`:提供任务注册表,按任务代码创建任务实例(工厂模式)。
- `cursor_manager.py`:管理增量游标(时间窗口 / ID 游标)。
- `run_tracker.py`:记录每次任务运行的状态、统计信息和错误信息。
- **任务层 (`tasks/`)**
- `base_task.py`:定义任务执行模板流程(模板方法模式),包括获取窗口、调用上游、解析 / 校验、写库、更新游标等。
- `orders_task.py` / `payments_task.py` / `members_task.py`:实现具体任务逻辑(订单、支付、会员)。
- **加载器 / SCD / 质量层**
- `loaders/`:根据目标表封装 Upsert / Insert / Update 逻辑。
- `scd/scd2_handler.py`:为维度表提供 SCD2 历史管理能力。
- `quality/`:执行数据质量检查,如余额对账。
- **模型层 (`models/`)**
- `parsers.py`:负责数据类型转换(字符串 → 时间戳、Decimal、int 等)。
- `validators.py`:执行字段级和记录级的数据校验。
- **API 层 (`api/client.py`)**
- 封装 HTTP 调用,处理重试、超时及分页。
- **数据库层 (`database/`)**
- 管理数据库连接及上下文。
- 提供批量插入 / 更新 / Upsert 操作接口。
- **配置层 (`config/`)**
- 定义配置项默认值。
- 解析环境变量并进行类型转换。
- 对外提供统一配置对象。
### 5.3 设计模式(当前使用)
- 工厂模式:任务注册 / 创建(`TaskRegistry`)。
- 模板方法模式:任务执行流程(`BaseTask`)。
- 策略模式:不同 Loader / Checker 实现不同策略。
- 依赖注入:通过构造函数向任务传入 `db`、`api`、`config` 等依赖。
### 5.4 数据与控制流程
整体流程:
1. CLI 解析参数并加载配置。
2. Scheduler 构建数据库连接、API 客户端等依赖。
3. Scheduler 遍历任务配置,从 `TaskRegistry` 获取任务类并实例化。
4. 每个任务按统一模板执行:
- 读取游标 / 时间窗口。
- 调用 API 拉取数据(可分页)。
- 解析、验证数据。
- 通过 Loader 写入数据库(事实表 / 维度表 / SCD2
- 执行质量检查。
- 更新游标与运行记录。
5. 所有任务执行完成后,释放连接并退出进程。
### 5.5 错误处理策略
- 单个任务失败不影响其他任务执行。
- 数据库操作异常自动回滚当前事务。
- API 请求失败时按配置进行重试,超过重试次数记录错误并终止该任务。
- 所有错误被记录到日志和运行追踪表,便于事后排查。
### 5.6 ODS + DWD 双阶段策略(新增)
为了支撑回溯/重放与后续 DWD 宽表构建,项目新增了 `billiards_ods` Schema 以及一组专门的 ODS 任务/Loader
- **ODS 表**`billiards_ods.ods_order_settle`、`ods_table_use_detail`、`ods_assistant_ledger`、`ods_assistant_abolish`、`ods_goods_ledger`、`ods_payment`、`ods_refund`、`ods_coupon_verify`、`ods_member`、`ods_member_card`、`ods_package_coupon`、`ods_inventory_stock`、`ods_inventory_change`。每条记录都会保存 `store_id + 源主键 + payload JSON + fetched_at + source_endpoint` 等信息。
- **通用 Loader**`loaders/ods/generic.py::GenericODSLoader` 统一封装了 `INSERT ... ON CONFLICT ...` 与批量写入逻辑,调用方只需提供列名与主键列即可。
- **ODS 任务**`tasks/ods_tasks.py` 内通过 `OdsTaskSpec` 定义了一组任务(`ODS_ORDER_SETTLE`、`ODS_PAYMENT`、`ODS_ASSISTANT_LEDGER` 等),并在 `TaskRegistry` 中自动注册,可直接通过 `python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT` 执行。
- **双阶段链路**
1. 阶段 1ODS调用 API/离线归档 JSON将原始记录写入 ODS 表,保留分页、抓取时间、来源文件等元数据。
2. 阶段 2DWD/DIM后续订单、支付、券等事实任务将改为从 ODS 读取 payload经过解析/校验后写入 `billiards.fact_*`、`dim_*` 表,避免重复拉取上游接口。
> 新增的单元测试 `tests/unit/test_ods_tasks.py` 覆盖了 `ODS_ORDER_SETTLE`、`ODS_PAYMENT` 的入库路径,可作为扩展其他 ODS 任务的模板。
---
## 6. 迁移指南(从旧脚本到当前项目)
本节用于说明如何从旧的单文件脚本(如 `task_merged.py`)迁移到当前模块化项目,属于当前项目的使用说明,不涉及历史对比细节。
### 6.1 核心功能映射示意
| 旧版本函数 / 类 | 新版本位置 | 说明 |
| --------------------- | ----------------------------------------------------- | ---------- |
| `DEFAULTS` 字典 | `config/defaults.py` | 配置默认值 |
| `build_config()` | `config/settings.py::AppConfig.load()` | 配置加载 |
| `Pg` 类 | `database/connection.py::DatabaseConnection` | 数据库连接 |
| `http_get_json()` | `api/client.py::APIClient.get()` | API 请求 |
| `paged_get()` | `api/client.py::APIClient.get_paginated()` | 分页请求 |
| `parse_ts()` | `models/parsers.py::TypeParser.parse_timestamp()` | 时间解析 |
| `upsert_fact_order()` | `loaders/facts/order.py::OrderLoader.upsert_orders()` | 订单加载 |
| `scd2_upsert()` | `scd/scd2_handler.py::SCD2Handler.upsert()` | SCD2 处理 |
| `run_task_orders()` | `tasks/orders_task.py::OrdersTask.execute()` | 订单任务 |
| `main()` | `cli/main.py::main()` | 主入口 |
### 6.2 典型迁移步骤
1. **配置迁移**
- 原来在 `DEFAULTS` 或脚本内硬编码的配置,迁移到 `.env` 与 `config/defaults.py`。
- 使用 `AppConfig.load()` 统一获取配置。
2. **并行运行验证**
```bash
# 旧脚本
python task_merged.py --tasks ORDERS
# 新项目
python -m cli.main --tasks ORDERS
```
对比新旧版本导出的数据表和日志,确认一致性。
3. **自定义逻辑迁移**
- 原脚本中的自定义清洗逻辑 → 放入相应 `loaders/` 或任务类中。
- 自定义任务 → 在 `tasks/` 中实现并在 `task_registry` 中注册。
- 自定义 API 调用 → 扩展 `api/client.py` 或单独封装服务类。
4. **逐步切换**
- 先在测试环境并行运行。
- 再逐步切换生产任务到新版本。
---
## 7. 开发与扩展指南(当前项目)
### 7.1 添加新任务
1. 在 `tasks/` 目录创建任务类:
```python
from .base_task import BaseTask
class MyTask(BaseTask):
def get_task_code(self) -> str:
return "MY_TASK"
def execute(self) -> dict:
# 1. 获取时间窗口
window_start, window_end, _ = self._get_time_window()
# 2. 调用 API 获取数据
records, _ = self.api.get_paginated(...)
# 3. 解析 / 校验
parsed = [self._parse(r) for r in records]
# 4. 加载数据
loader = MyLoader(self.db)
inserted, updated, _ = loader.upsert(parsed)
# 5. 提交并返回结果
self.db.commit()
return self._build_result("SUCCESS", {
"inserted": inserted,
"updated": updated,
})
```
2. 在 `orchestration/task_registry.py` 中注册:
```python
from tasks.my_task import MyTask
default_registry.register("MY_TASK", MyTask)
```
3. 在任务配置表中启用(示例):
```sql
INSERT INTO etl_admin.etl_task (task_code, store_id, enabled)
VALUES ('MY_TASK', 123456, TRUE);
```
### 7.2 添加新加载器
```python
from loaders.base_loader import BaseLoader
class MyLoader(BaseLoader):
def upsert(self, records: list) -> tuple:
sql = "INSERT INTO table_name (...) VALUES (...) ON CONFLICT (...) DO UPDATE SET ... RETURNING (xmax = 0) AS inserted"
inserted, updated = self.db.batch_upsert_with_returning(
sql, records, page_size=self._batch_size()
)
return (inserted, updated, 0)
```
### 7.3 添加新质量检查器
1. 在 `quality/` 中实现检查器,继承 `base_checker.py`。
2. 在任务或调度流程中调用该检查器,在写库后进行验证。
### 7.4 类型解析与校验扩展
- 在 `models/parsers.py` 中添加新类型解析方法。
- 在 `models/validators.py` 中添加新规则(如枚举校验、跨字段校验等)。
---
## 8. 常见问题排查
### 8.1 数据库连接失败
```text
错误: could not connect to server
```
排查要点:
- 检查 `PG_DSN` 或相关数据库配置是否正确。
- 确认数据库服务是否启动、网络是否可达。
### 8.2 API 请求超时
```text
错误: requests.exceptions.Timeout
```
排查要点:
- 检查 `API_BASE` 地址与网络连通性。
- 适当提高超时与重试次数(在配置中调整)。
### 8.3 模块导入错误
```text
错误: ModuleNotFoundError
```
排查要点:
- 确认在项目根目录下运行(包含 `etl_billiards/` 包)。
- 或通过 `pip install -e .` 以可编辑模式安装项目。
### 8.4 权限相关问题
```text
错误: Permission denied
```
排查要点:
- 脚本无执行权限:`chmod +x run_etl.sh`。
- Windows 需要以管理员身份运行,或修改日志 / 导出目录权限。
---
## 9. 使用前检查清单
在正式运行前建议确认:
- [ ] 已安装 Python 3.10+。
- [ ] 已执行 `pip install -r requirements.txt`。
- [ ] `.env` 已配置正确数据库、API、门店 ID、路径等
- [ ] PostgreSQL 数据库可连接。
- [ ] API 服务可访问且凭证有效。
- [ ] `LOG_ROOT`、`EXPORT_ROOT` 目录存在且拥有写权限。
---
## 10. 参考说明
- 本文已合并原有的快速开始、项目结构、架构说明、迁移指南等内容,可作为当前项目的统一说明文档。
- 如需在此基础上拆分多份文档,可按章节拆出,例如「快速开始」「架构设计」「迁移指南」「开发扩展」等。
## 11. 运行/调试模式说明
- 生产环境仅保留“任务模式”:通过调度/CLI 执行注册的任务ETL/ODS不使用调试脚本。
- 开发/调试可使用的辅助脚本(上线前可删除或禁用):
- `python -m etl_billiards.scripts.rebuild_ods_from_json`:从本地 JSON 目录重建 `billiards_ods`,用于离线初始化/验证。环境变量:`PG_DSN`(必填)、`JSON_DOC_DIR`(可选,默认 `C:\dev\LLTQ\export\test-json-doc`)、`INCLUDE_FILES`(逗号分隔文件名)、`DROP_SCHEMA_FIRST`(默认 true
- 如需在生产环境保留脚本,请在运维手册中明确用途和禁用条件,避免误用。
## 12. ODS 任务上线指引
- 任务注册:`etl_billiards/database/seed_ods_tasks.sql` 列出了当前启用的 ODS 任务。将其中的 `store_id` 替换为实际门店后执行:
```
psql "$PG_DSN" -f etl_billiards/database/seed_ods_tasks.sql
```
`ON CONFLICT` 会保持 enabled=true避免重复。
- 调度:确认 `etl_admin.etl_task` 中已启用所需的 ODS 任务(任务代码见 seed 脚本),调度器或 CLI `--tasks` 即可调用。
- 离线回灌:开发环境可用 `rebuild_ods_from_json` 以样例 JSON 初始化 ODS生产慎用默认按 `(source_file, record_index)` 去重。
- 测试:`pytest etl_billiards/tests/unit/test_ods_tasks.py` 覆盖核心 ODS 任务;测试时可设置 `ETL_SKIP_DOTENV=1` 跳过本地 .env 读取。
## 13. ODS 表映射总览
| ODS 表名 | 接口 Path | 数据列表路径 |
| ------------------------------------ | ---------------------------------------------------- | ----------------------------- |
| `assistant_accounts_master` | `/PersonnelManagement/SearchAssistantInfo` | data.assistantInfos |
| `assistant_service_records` | `/AssistantPerformance/GetOrderAssistantDetails` | data.orderAssistantDetails |
| `assistant_cancellation_records` | `/AssistantPerformance/GetAbolitionAssistant` | data.abolitionAssistants |
| `goods_stock_movements` | `/GoodsStockManage/QueryGoodsOutboundReceipt` | data.queryDeliveryRecordsList |
| `goods_stock_summary` | `/TenantGoods/GetGoodsStockReport` | data |
| `group_buy_packages` | `/PackageCoupon/QueryPackageCouponList` | data.packageCouponList |
| `group_buy_redemption_records` | `/Site/GetSiteTableUseDetails` | data.siteTableUseDetailsList |
| `member_profiles` | `/MemberProfile/GetTenantMemberList` | data.tenantMemberInfos |
| `member_balance_changes` | `/MemberProfile/GetMemberCardBalanceChange` | data.tenantMemberCardLogs |
| `member_stored_value_cards` | `/MemberProfile/GetTenantMemberCardList` | data.tenantMemberCards |
| `payment_transactions` | `/PayLog/GetPayLogListPage` | data |
| `platform_coupon_redemption_records` | `/Promotion/GetOfflineCouponConsumePageList` | data |
| `recharge_settlements` | `/Site/GetRechargeSettleList` | data.settleList |
| `refund_transactions` | `/Order/GetRefundPayLogList` | data |
| `settlement_records` | `/Site/GetAllOrderSettleList` | data.settleList |
| `settlement_ticket_details` | `/Order/GetOrderSettleTicketNew` | (整包原始 JSON |
| `site_tables_master` | `/Table/GetSiteTables` | data.siteTables |
| `stock_goods_category_tree` | `/TenantGoodsCategory/QueryPrimarySecondaryCategory` | data.goodsCategoryList |
| `store_goods_master` | `/TenantGoods/GetGoodsInventoryList` | data.orderGoodsList |
| `store_goods_sales_records` | `/TenantGoods/GetGoodsSalesList` | data.orderGoodsLedgers |
| `table_fee_discount_records` | `/Site/GetTaiFeeAdjustList` | data.taiFeeAdjustInfos |
| `table_fee_transactions` | `/Site/GetSiteTableOrderDetails` | data.siteTableUseDetailsList |
| `tenant_goods_master` | `/TenantGoods/QueryTenantGoods` | data.tenantGoodsList |
## 14. ODS 相关环境变量/默认值
- `.env` / 环境变量:
- `JSON_DOC_DIR`ODS 样例 JSON 目录(开发/回灌用)
- `ODS_INCLUDE_FILES`:限定导入的文件名(逗号分隔,不含 .json
- `ODS_DROP_SCHEMA_FIRST`true/false是否重建 schema
- `ETL_SKIP_DOTENV`:测试/CI 时设为 1 跳过本地 .env 读取
- `config/defaults.py` 中 `ods` 默认值:
- `json_doc_dir`: `C:\dev\LLTQ\export\test-json-doc`
- `include_files`: `""`
- `drop_schema_first`: `True`
---
## 15. DWD 维度 “业务事件”
1. 粒度唯一、原子
- 一张 DWD 表只能有一种业务粒度,比如:
- 一条记录 = 一次结账;
- 一条记录 = 一段台费流水;
- 一条记录 = 一次助教服务;
- 一条记录 = 一次会员余额变动。
- 表里面不能又混“订单头”又混“订单行”,不能一部分是“汇总”,一部分是“明细”。
- 一旦粒度确定,所有字段都要跟这个粒度匹配:
- 比如“结账头表”就不要塞每一行商品明细;
- 商品明细就不要塞整单级别的总金额。
- 这是 DWD 层最重要的一条。
2. 以业务过程建模,不以 JSON 列表建模
- 先画清楚你真实的业务链路:
- 开台 / 换台 / 关台 → 台费流水
- 助教上桌 → 助教服务流水 / 废除事件
- 点单 → 商品销售流水
- 充值 / 消费 → 余额变更 / 充值单
- 结账 → 结账头表 + 支付流水 / 退款流水
- 团购 / 平台券 → 核销流水
3. 主键明确、外键统一
- 每张 DWD 表必须有业务主键(哪怕是接口给的 id不要依赖数据库自增。
- 所有“同一概念”的字段必须统一命名、统一含义:
- 门店:统一叫 site_id都对应 siteProfile.id
- 会员:统一叫 member_id 对应 member_profiles.idsystem_member_id 单独一列;
- 台桌:统一 table_id 对应 site_tables_master.id
- 结账:统一 order_settle_id
- 订单:统一 order_trade_no 等。
- 否则后面 DWS、AI 要把表拼起来会非常痛苦。
4. 保留明细,不做过度汇总
- DWD 层的事实表原则上只做“明细级”的数据:
- 不要在 DWD 就把“日汇总、周汇总、月汇总”算出来,那是 DWS 的事;
- 也不要把多个事件折成一行(例如一张表同时放日汇总+单笔流水)。
- 需要聚合时,再在 DWS 做主题宽表:
- dws_member_day_profile、dws_site_day_summary 等。
- DWD 只负责细颗粒度的真相。
5. 统一清洗、标准化,但保持可追溯
- 在 DWD 层一定要做的清洗:
- 类型转换:字符串时间 → 时间类型,金额统一为 decimal布尔统一为 0/1
- 单位统一:秒 / 分钟、元 / 分都统一;
- 枚举标准化:状态码、类型码在 DWD 里就定死含义,必要时建枚举维表。
- 同时要保证:
- 每条 DWD 记录都能追溯回 ODS
- 保留源系统主键;
- 保留原始时间 / 原始金额字段(不要覆盖掉)。
6. 扁平化、去嵌套
- JSON 里常见结构是:分页壳 + 头 + 明细数组 + 各种嵌套对象siteProfile、tableProfile、goodsLedgers…
- DWD 的原则是:
- 去掉分页壳;
- 把“数组”拆成子表(头表 / 行表);
- 把重复出现的 profile 抽出去做维度表(门店、台、商品、会员……)。
- 目标是DWD 表都是二维表结构,不存复杂嵌套 JSON。
7. 模型长期稳定,可扩展
- DWD 的表结构要尽可能稳定,新增需求尽量通过:
- 加字段;
- 新建事实表 / 维度表;
- 在 DWS 做派生指标;
- 而不是频繁重构已有 DWD 表结构。
- 这点跟你后面要喂给 LLM 也很相关AI 配的 prompt、schema 理解都要尽量少改。

View File

@@ -8,6 +8,8 @@ import requests
from requests.adapters import HTTPAdapter from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
from api.endpoint_routing import plan_calls
DEFAULT_BROWSER_HEADERS = { DEFAULT_BROWSER_HEADERS = {
"Accept": "application/json, text/plain, */*", "Accept": "application/json, text/plain, */*",
"Content-Type": "application/json", "Content-Type": "application/json",
@@ -142,7 +144,7 @@ class APIClient:
raise ValueError(f"API 返回错误 code={code} msg={msg}") raise ValueError(f"API 返回错误 code={code} msg={msg}")
# ------------------------------------------------------------------ 分页 # ------------------------------------------------------------------ 分页
def iter_paginated( def _iter_paginated_single(
self, self,
endpoint: str, endpoint: str,
params: dict | None, params: dict | None,
@@ -155,8 +157,7 @@ class APIClient:
page_end: int | None = None, page_end: int | None = None,
) -> Iterable[tuple[int, list, dict, dict]]: ) -> Iterable[tuple[int, list, dict, dict]]:
""" """
分页迭代器:逐页拉取数据并产出 (page_no, records, request_params, raw_response) 单一 endpoint 的分页迭代器(不包含 recent/former 路由逻辑)
page_size=None 时不附带分页参数,仅拉取一次。
""" """
base_params = dict(params or {}) base_params = dict(params or {})
page = page_start page = page_start
@@ -183,6 +184,42 @@ class APIClient:
page += 1 page += 1
def iter_paginated(
self,
endpoint: str,
params: dict | None,
page_size: int | None = 200,
page_field: str = "page",
size_field: str = "limit",
data_path: tuple = ("data",),
list_key: str | Sequence[str] | None = None,
page_start: int = 1,
page_end: int | None = None,
) -> Iterable[tuple[int, list, dict, dict]]:
"""
分页迭代器:逐页拉取数据并产出 (page_no, records, request_params, raw_response)。
page_size=None 时不附带分页参数,仅拉取一次。
"""
# recent/former 路由:当 params 带时间范围字段时按“3个月自然月”边界决定走哪个 endpoint
# 跨越边界则拆分为两段请求并顺序产出,确保调用方使用 page_no 命名文件时不会被覆盖。
call_plan = plan_calls(endpoint, params)
global_page = 1
for call in call_plan:
for _, records, request_params, payload in self._iter_paginated_single(
endpoint=call.endpoint,
params=call.params,
page_size=page_size,
page_field=page_field,
size_field=size_field,
data_path=data_path,
list_key=list_key,
page_start=page_start,
page_end=page_end,
):
yield global_page, records, request_params, payload
global_page += 1
def get_paginated( def get_paginated(
self, self,
endpoint: str, endpoint: str,

View File

@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
"""
“近期记录 / 历史记录(Former)”接口路由规则。
需求:
- 当请求参数包含可定义时间范围的字段时,根据当前时间(北京时间/上海时区)判断:
- 3个月自然月之前 -> 使用“历史记录”接口
- 3个月以内 -> 使用“近期记录”接口
- 若时间范围跨越边界 -> 拆分为两段分别请求并合并(由上层分页迭代器顺序产出)
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from dateutil import parser as dtparser
from dateutil.relativedelta import relativedelta
from zoneinfo import ZoneInfo
ROUTING_TZ = ZoneInfo("Asia/Shanghai")
RECENT_MONTHS = 3
# 按 `fetch-test/recent_vs_former_report.md` 更新(“无”表示没有历史接口;相同 path 表示同一个接口可查历史)
RECENT_TO_FORMER_OVERRIDES: dict[str, str | None] = {
"/AssistantPerformance/GetAbolitionAssistant": None,
"/Site/GetSiteTableUseDetails": "/Site/GetSiteTableUseDetails",
"/GoodsStockManage/QueryGoodsOutboundReceipt": "/GoodsStockManage/QueryFormerGoodsOutboundReceipt",
"/Promotion/GetOfflineCouponConsumePageList": "/Promotion/GetOfflineCouponConsumePageList",
"/Order/GetRefundPayLogList": None,
# 已知特殊
"/Site/GetAllOrderSettleList": "/Site/GetFormerOrderSettleList",
"/PayLog/GetPayLogListPage": "/PayLog/GetFormerPayLogListPage",
}
TIME_WINDOW_KEYS: tuple[tuple[str, str], ...] = (
("startTime", "endTime"),
("rangeStartTime", "rangeEndTime"),
("StartPayTime", "EndPayTime"),
)
@dataclass(frozen=True)
class WindowSpec:
start_key: str
end_key: str
start: datetime
end: datetime
@dataclass(frozen=True)
class RoutedCall:
endpoint: str
params: dict
def is_former_endpoint(endpoint: str) -> bool:
return "Former" in str(endpoint or "")
def _parse_dt(value: object, tz: ZoneInfo) -> datetime | None:
if value is None:
return None
s = str(value).strip()
if not s:
return None
dt = dtparser.parse(s)
if dt.tzinfo is None:
return dt.replace(tzinfo=tz)
return dt.astimezone(tz)
def _fmt_dt(dt: datetime, tz: ZoneInfo) -> str:
return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")
def extract_window_spec(params: dict | None, tz: ZoneInfo = ROUTING_TZ) -> WindowSpec | None:
if not isinstance(params, dict) or not params:
return None
for start_key, end_key in TIME_WINDOW_KEYS:
if start_key in params or end_key in params:
start = _parse_dt(params.get(start_key), tz)
end = _parse_dt(params.get(end_key), tz)
if start and end:
return WindowSpec(start_key=start_key, end_key=end_key, start=start, end=end)
return None
def derive_former_endpoint(recent_endpoint: str) -> str | None:
endpoint = str(recent_endpoint or "").strip()
if not endpoint:
return None
if endpoint in RECENT_TO_FORMER_OVERRIDES:
return RECENT_TO_FORMER_OVERRIDES[endpoint]
if is_former_endpoint(endpoint):
return endpoint
idx = endpoint.find("Get")
if idx == -1:
return endpoint
return f"{endpoint[:idx]}GetFormer{endpoint[idx + 3:]}"
def recent_boundary(now: datetime, months: int = RECENT_MONTHS) -> datetime:
"""
3个月自然月边界取 (now - months) 所在月份的 1 号 00:00:00。
"""
if now.tzinfo is None:
raise ValueError("now 必须为时区时间")
base = now - relativedelta(months=months)
return base.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
def plan_calls(
endpoint: str,
params: dict | None,
*,
now: datetime | None = None,
tz: ZoneInfo = ROUTING_TZ,
months: int = RECENT_MONTHS,
) -> list[RoutedCall]:
"""
根据 endpoint + params 的时间窗口,返回要调用的 endpoint/params 列表(可能拆分为两段)。
"""
base_params = dict(params or {})
if not base_params:
return [RoutedCall(endpoint=endpoint, params=base_params)]
# 若调用方显式传了 Former 接口,则不二次路由。
if is_former_endpoint(endpoint):
return [RoutedCall(endpoint=endpoint, params=base_params)]
window = extract_window_spec(base_params, tz)
if not window:
return [RoutedCall(endpoint=endpoint, params=base_params)]
former_endpoint = derive_former_endpoint(endpoint)
if former_endpoint is None or former_endpoint == endpoint:
return [RoutedCall(endpoint=endpoint, params=base_params)]
now_dt = (now or datetime.now(tz)).astimezone(tz)
boundary = recent_boundary(now_dt, months=months)
start, end = window.start, window.end
if end <= boundary:
return [RoutedCall(endpoint=former_endpoint, params=base_params)]
if start >= boundary:
return [RoutedCall(endpoint=endpoint, params=base_params)]
# 跨越边界:拆分两段(老数据 -> former新数据 -> recent
p1 = dict(base_params)
p1[window.start_key] = _fmt_dt(start, tz)
p1[window.end_key] = _fmt_dt(boundary, tz)
p2 = dict(base_params)
p2[window.start_key] = _fmt_dt(boundary, tz)
p2[window.end_key] = _fmt_dt(end, tz)
return [RoutedCall(endpoint=former_endpoint, params=p1), RoutedCall(endpoint=endpoint, params=p2)]

View File

@@ -20,6 +20,10 @@ class LocalJsonClient:
if not self.base_dir.exists(): if not self.base_dir.exists():
raise FileNotFoundError(f"JSON 目录不存在: {self.base_dir}") raise FileNotFoundError(f"JSON 目录不存在: {self.base_dir}")
def get_source_hint(self, endpoint: str) -> str:
"""Return the JSON file path for this endpoint (for source_file lineage)."""
return str(self.base_dir / endpoint_to_filename(endpoint))
def iter_paginated( def iter_paginated(
self, self,
endpoint: str, endpoint: str,

View File

@@ -7,6 +7,7 @@ from pathlib import Path
from typing import Any, Iterable, Tuple from typing import Any, Iterable, Tuple
from api.client import APIClient from api.client import APIClient
from api.endpoint_routing import plan_calls
from utils.json_store import dump_json, endpoint_to_filename from utils.json_store import dump_json, endpoint_to_filename
@@ -33,6 +34,10 @@ class RecordingAPIClient:
self.last_dump: dict[str, Any] | None = None self.last_dump: dict[str, Any] | None = None
# ------------------------------------------------------------------ public API # ------------------------------------------------------------------ public API
def get_source_hint(self, endpoint: str) -> str:
"""Return the JSON dump path for this endpoint (for source_file lineage)."""
return str(self.output_dir / endpoint_to_filename(endpoint))
def iter_paginated( def iter_paginated(
self, self,
endpoint: str, endpoint: str,
@@ -99,11 +104,18 @@ class RecordingAPIClient:
): ):
filename = endpoint_to_filename(endpoint) filename = endpoint_to_filename(endpoint)
path = self.output_dir / filename path = self.output_dir / filename
routing_calls = []
try:
for call in plan_calls(endpoint, params):
routing_calls.append({"endpoint": call.endpoint, "params": call.params})
except Exception:
routing_calls = []
payload = { payload = {
"task_code": self.task_code, "task_code": self.task_code,
"run_id": self.run_id, "run_id": self.run_id,
"endpoint": endpoint, "endpoint": endpoint,
"params": params or {}, "params": params or {},
"endpoint_routing": {"calls": routing_calls} if routing_calls else None,
"page_size": page_size, "page_size": page_size,
"pages": pages, "pages": pages,
"total_records": total_records, "total_records": total_records,

181
etl_billiards/build_exe.py Normal file
View File

@@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
"""
ETL GUI 打包脚本
使用 PyInstaller 将 GUI 应用打包为 Windows EXE
用法:
python build_exe.py [--onefile] [--console] [--clean]
参数:
--onefile 打包为单个 EXE 文件(默认为目录模式)
--console 显示控制台窗口(调试用)
--clean 打包前清理旧的构建文件
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
def get_project_root() -> Path:
"""获取项目根目录"""
return Path(__file__).resolve().parent
def clean_build():
"""清理旧的构建文件"""
project_root = get_project_root()
dirs_to_clean = [
project_root / "build",
project_root / "dist",
]
files_to_clean = [
project_root / "etl_gui.spec",
]
for d in dirs_to_clean:
if d.exists():
print(f"清理目录: {d}")
shutil.rmtree(d)
for f in files_to_clean:
if f.exists():
print(f"清理文件: {f}")
f.unlink()
def build_exe(onefile: bool = False, console: bool = False):
"""构建 EXE"""
project_root = get_project_root()
# 主入口
main_script = project_root / "gui" / "main.py"
# 资源文件
resources_dir = project_root / "gui" / "resources"
database_dir = project_root / "database"
# 构建 PyInstaller 命令
# 使用 ASCII 名称避免 Windows 控制台编码问题
cmd = [
sys.executable, "-m", "PyInstaller",
"--name", "ETL_Manager",
"--noconfirm",
]
# 单文件或目录模式
if onefile:
cmd.append("--onefile")
else:
cmd.append("--onedir")
# 窗口模式
if not console:
cmd.append("--windowed")
# 添加数据文件
# 样式表
if resources_dir.exists():
cmd.extend(["--add-data", f"{resources_dir};gui/resources"])
# 数据库 SQL 文件
if database_dir.exists():
for sql_file in database_dir.glob("*.sql"):
cmd.extend(["--add-data", f"{sql_file};database"])
# 隐式导入
hidden_imports = [
"PySide6.QtCore",
"PySide6.QtGui",
"PySide6.QtWidgets",
"psycopg2",
"psycopg2.extras",
"psycopg2.extensions",
# GUI 模块
"gui.models.task_model",
"gui.models.schedule_model",
"gui.utils.cli_builder",
"gui.utils.config_helper",
"gui.utils.app_settings",
"gui.workers.task_worker",
"gui.workers.db_worker",
"gui.widgets.settings_dialog",
]
for imp in hidden_imports:
cmd.extend(["--hidden-import", imp])
# 排除不需要的模块(减小体积)
excludes = [
"matplotlib",
"numpy",
"pandas",
"scipy",
"PIL",
"cv2",
"tkinter",
]
for exc in excludes:
cmd.extend(["--exclude-module", exc])
# 工作目录
cmd.extend(["--workpath", str(project_root / "build")])
cmd.extend(["--distpath", str(project_root / "dist")])
cmd.extend(["--specpath", str(project_root)])
# 主脚本
cmd.append(str(main_script))
print("执行命令:")
print(" ".join(cmd))
print()
# 执行打包
result = subprocess.run(cmd, cwd=str(project_root))
if result.returncode == 0:
print()
print("=" * 50)
print("打包成功!")
print(f"输出目录: {project_root / 'dist'}")
print("=" * 50)
else:
print()
print("打包失败,请检查错误信息")
sys.exit(1)
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description="ETL GUI 打包工具")
parser.add_argument("--onefile", action="store_true", help="打包为单个 EXE")
parser.add_argument("--console", action="store_true", help="显示控制台窗口")
parser.add_argument("--clean", action="store_true", help="打包前清理")
args = parser.parse_args()
# 检查 PyInstaller
try:
import PyInstaller
print(f"PyInstaller 版本: {PyInstaller.__version__}")
except ImportError:
print("错误: 未安装 PyInstaller")
print("请运行: pip install pyinstaller")
sys.exit(1)
# 清理
if args.clean:
clean_build()
# 构建
build_exe(onefile=args.onefile, console=args.console)
if __name__ == "__main__":
main()

View File

@@ -40,6 +40,34 @@ def parse_args():
parser.add_argument("--api-timeout", type=int, help="API超时(秒)") parser.add_argument("--api-timeout", type=int, help="API超时(秒)")
parser.add_argument("--api-page-size", type=int, help="分页大小") parser.add_argument("--api-page-size", type=int, help="分页大小")
parser.add_argument("--api-retry-max", type=int, help="API重试最大次数") parser.add_argument("--api-retry-max", type=int, help="API重试最大次数")
# 回溯/手动窗口
parser.add_argument(
"--window-start",
dest="window_start",
help="固定时间窗口开始优先级高于游标例如2025-07-01 00:00:00",
)
parser.add_argument(
"--window-end",
dest="window_end",
help="固定时间窗口结束(优先级高于游标,推荐用月末+1例如2025-08-01 00:00:00",
)
parser.add_argument(
"--force-window-override",
action="store_true",
help="强制使用 window_start/window_end不走 MAX(fetched_at) 兜底",
)
parser.add_argument(
"--window-split-unit",
dest="window_split_unit",
help="窗口切分单位month/none默认来自配置 run.window_split.unit",
)
parser.add_argument(
"--window-compensation-hours",
dest="window_compensation_hours",
type=int,
help="窗口前后补偿小时数,默认来自配置 run.window_split.compensation_hours",
)
# 目录参数 # 目录参数
parser.add_argument("--export-root", help="导出根目录") parser.add_argument("--export-root", help="导出根目录")
@@ -108,6 +136,22 @@ def build_cli_overrides(args) -> dict:
if args.write_pretty_json: if args.write_pretty_json:
overrides.setdefault("io", {})["write_pretty_json"] = True overrides.setdefault("io", {})["write_pretty_json"] = True
# 回溯/手动窗口
if args.window_start or args.window_end:
overrides.setdefault("run", {}).setdefault("window_override", {})
if args.window_start:
overrides["run"]["window_override"]["start"] = args.window_start
if args.window_end:
overrides["run"]["window_override"]["end"] = args.window_end
if args.force_window_override:
overrides.setdefault("run", {})["force_window_override"] = True
if args.window_split_unit:
overrides.setdefault("run", {}).setdefault("window_split", {})["unit"] = args.window_split_unit
if args.window_compensation_hours is not None:
overrides.setdefault("run", {}).setdefault("window_split", {})[
"compensation_hours"
] = args.window_compensation_hours
# 运行窗口 # 运行窗口
if args.idle_start: if args.idle_start:
overrides.setdefault("run", {}).setdefault("idle_window", {})["start"] = args.idle_start overrides.setdefault("run", {}).setdefault("idle_window", {})["start"] = args.idle_start

View File

@@ -58,6 +58,10 @@ DEFAULTS = {
"default_idle": 180, "default_idle": 180,
}, },
"overlap_seconds": 120, "overlap_seconds": 120,
"window_split": {
"unit": "month",
"compensation_hours": 2,
},
"idle_window": { "idle_window": {
"start": "04:00", "start": "04:00",
"end": "16:00", "end": "16:00",
@@ -65,8 +69,8 @@ DEFAULTS = {
"allow_empty_result_advance": True, "allow_empty_result_advance": True,
}, },
"io": { "io": {
"export_root": r"C:\dev\LLTQ\export\JSON", "export_root": "export/JSON",
"log_root": r"C:\dev\LLTQ\export\LOG", "log_root": "export/LOG",
"manifest_name": "manifest.json", "manifest_name": "manifest.json",
"ingest_report_name": "ingest_report.json", "ingest_report_name": "ingest_report.json",
"write_pretty_json": True, "write_pretty_json": True,
@@ -76,7 +80,7 @@ DEFAULTS = {
# 运行流程FETCH_ONLY仅在线抓取落盘、INGEST_ONLY本地清洗入库、FULL抓取 + 清洗入库) # 运行流程FETCH_ONLY仅在线抓取落盘、INGEST_ONLY本地清洗入库、FULL抓取 + 清洗入库)
"flow": "FULL", "flow": "FULL",
# 在线抓取 JSON 输出根目录按任务、run_id 与时间自动创建子目录) # 在线抓取 JSON 输出根目录按任务、run_id 与时间自动创建子目录)
"fetch_root": r"C:\dev\LLTQ\export\JSON", "fetch_root": "export/JSON",
# 本地清洗入库时的 JSON 输入目录(为空则默认使用本次抓取目录) # 本地清洗入库时的 JSON 输入目录(为空则默认使用本次抓取目录)
"ingest_source_dir": "", "ingest_source_dir": "",
}, },
@@ -97,10 +101,19 @@ DEFAULTS = {
}, },
"ods": { "ods": {
# ODS 离线重建/回放相关(仅开发/运维使用) # ODS 离线重建/回放相关(仅开发/运维使用)
"json_doc_dir": r"C:\dev\LLTQ\export\test-json-doc", "json_doc_dir": "export/test-json-doc",
"include_files": "", "include_files": "",
"drop_schema_first": True, "drop_schema_first": True,
}, },
"integrity": {
"mode": "history",
"history_start": "2025-07-01",
"history_end": "",
"include_dimensions": False,
"auto_check": False,
"ods_task_codes": "",
},
} }
# 任务代码常量 # 任务代码常量

View File

@@ -40,11 +40,22 @@ ENV_MAP = {
"IDLE_WINDOW_END": ("run.idle_window.end",), "IDLE_WINDOW_END": ("run.idle_window.end",),
"ALLOW_EMPTY_RESULT_ADVANCE": ("run.allow_empty_result_advance",), "ALLOW_EMPTY_RESULT_ADVANCE": ("run.allow_empty_result_advance",),
"ALLOW_EMPTY_ADVANCE": ("run.allow_empty_result_advance",), "ALLOW_EMPTY_ADVANCE": ("run.allow_empty_result_advance",),
"WINDOW_START": ("run.window_override.start",),
"WINDOW_END": ("run.window_override.end",),
"WINDOW_SPLIT_UNIT": ("run.window_split.unit",),
"WINDOW_COMPENSATION_HOURS": ("run.window_split.compensation_hours",),
"PIPELINE_FLOW": ("pipeline.flow",), "PIPELINE_FLOW": ("pipeline.flow",),
"JSON_FETCH_ROOT": ("pipeline.fetch_root",), "JSON_FETCH_ROOT": ("pipeline.fetch_root",),
"JSON_SOURCE_DIR": ("pipeline.ingest_source_dir",), "JSON_SOURCE_DIR": ("pipeline.ingest_source_dir",),
"FETCH_ROOT": ("pipeline.fetch_root",), "FETCH_ROOT": ("pipeline.fetch_root",),
"INGEST_SOURCE_DIR": ("pipeline.ingest_source_dir",), "INGEST_SOURCE_DIR": ("pipeline.ingest_source_dir",),
"INTEGRITY_MODE": ("integrity.mode",),
"INTEGRITY_HISTORY_START": ("integrity.history_start",),
"INTEGRITY_HISTORY_END": ("integrity.history_end",),
"INTEGRITY_INCLUDE_DIMENSIONS": ("integrity.include_dimensions",),
"INTEGRITY_AUTO_CHECK": ("integrity.auto_check",),
"INTEGRITY_AUTO_BACKFILL": ("integrity.auto_backfill",),
"INTEGRITY_ODS_TASK_CODES": ("integrity.ods_task_codes",),
} }

View File

@@ -25,46 +25,54 @@ class DatabaseOperations:
use_returning = "RETURNING" in sql.upper() use_returning = "RETURNING" in sql.upper()
with self.conn.cursor() as c: # 不带 RETURNING直接批量执行即可
if not use_returning: if not use_returning:
with self.conn.cursor() as c:
psycopg2.extras.execute_batch(c, sql, rows, page_size=page_size) psycopg2.extras.execute_batch(c, sql, rows, page_size=page_size)
return (0, 0) return (0, 0)
# 尝试向量化执行 # 尝试向量化执行execute_values + fetch returning
vectorized_failed = False
m = re.search(r"VALUES\s*\((.*?)\)", sql, flags=re.IGNORECASE | re.DOTALL)
if m:
tpl = "(" + m.group(1) + ")"
base_sql = sql[:m.start()] + "VALUES %s" + sql[m.end():]
try: try:
m = re.search(r"VALUES\s*\((.*?)\)", sql, flags=re.IGNORECASE | re.DOTALL) with self.conn.cursor() as c:
if m:
tpl = "(" + m.group(1) + ")"
base_sql = sql[:m.start()] + "VALUES %s" + sql[m.end():]
ret = psycopg2.extras.execute_values( ret = psycopg2.extras.execute_values(
c, base_sql, rows, template=tpl, page_size=page_size, fetch=True c, base_sql, rows, template=tpl, page_size=page_size, fetch=True
) )
if not ret:
if not ret: return (0, 0)
return (0, 0) inserted = sum(1 for rec in ret if self._is_inserted(rec))
return (inserted, len(ret) - inserted)
inserted = sum(1 for rec in ret if self._is_inserted(rec)) except Exception:
return (inserted, len(ret) - inserted) # 向量化失败后,事务通常处于 aborted 状态,需要先 rollback 才能继续执行。
vectorized_failed = True
if vectorized_failed:
try:
self.conn.rollback()
except Exception: except Exception:
pass pass
# 回退:逐行执行 # 回退:逐行执行
inserted = 0 inserted = 0
updated = 0 updated = 0
with self.conn.cursor() as c:
for r in rows: for r in rows:
c.execute(sql, r) c.execute(sql, r)
try: try:
rec = c.fetchone() rec = c.fetchone()
except Exception: except Exception:
rec = None rec = None
if self._is_inserted(rec): if self._is_inserted(rec):
inserted += 1 inserted += 1
else: else:
updated += 1 updated += 1
return (inserted, updated) return (inserted, updated)
@staticmethod @staticmethod
def _is_inserted(rec) -> bool: def _is_inserted(rec) -> bool:

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,50 @@
-- DWS schema for aggregated / serving tables.
CREATE SCHEMA IF NOT EXISTS billiards_dws;
CREATE TABLE IF NOT EXISTS billiards_dws.dws_order_summary (
site_id BIGINT NOT NULL,
order_settle_id BIGINT NOT NULL,
order_trade_no TEXT,
order_date DATE,
tenant_id BIGINT,
member_id BIGINT,
member_flag BOOLEAN,
recharge_order_flag BOOLEAN,
item_count INT,
total_item_quantity NUMERIC,
table_fee_amount NUMERIC,
assistant_service_amount NUMERIC,
goods_amount NUMERIC,
group_amount NUMERIC,
total_coupon_deduction NUMERIC,
member_discount_amount NUMERIC,
manual_discount_amount NUMERIC,
order_original_amount NUMERIC,
order_final_amount NUMERIC,
stored_card_deduct NUMERIC,
external_paid_amount NUMERIC,
total_paid_amount NUMERIC,
book_table_flow NUMERIC,
book_assistant_flow NUMERIC,
book_goods_flow NUMERIC,
book_group_flow NUMERIC,
book_order_flow NUMERIC,
order_effective_consume_cash NUMERIC,
order_effective_recharge_cash NUMERIC,
order_effective_flow NUMERIC,
refund_amount NUMERIC,
net_income NUMERIC,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
PRIMARY KEY (site_id, order_settle_id)
);
CREATE INDEX IF NOT EXISTS idx_dws_order_summary_order_date
ON billiards_dws.dws_order_summary (order_date);
CREATE INDEX IF NOT EXISTS idx_dws_order_summary_tenant_date
ON billiards_dws.dws_order_summary (tenant_id, order_date);
CREATE INDEX IF NOT EXISTS idx_dws_order_summary_member_date
ON billiards_dws.dws_order_summary (member_id, order_date);

View File

@@ -0,0 +1,105 @@
-- 文件说明etl_admin 调度元数据 DDL独立文件便于初始化任务单独执行
-- 包含任务注册表、游标表、运行记录表;字段注释使用中文。
CREATE SCHEMA IF NOT EXISTS etl_admin;
CREATE TABLE IF NOT EXISTS etl_admin.etl_task (
task_id BIGSERIAL PRIMARY KEY,
task_code TEXT NOT NULL,
store_id BIGINT NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
cursor_field TEXT,
window_minutes_default INT DEFAULT 30,
overlap_seconds INT DEFAULT 120,
page_size INT DEFAULT 200,
retry_max INT DEFAULT 3,
params JSONB DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (task_code, store_id)
);
COMMENT ON TABLE etl_admin.etl_task IS '任务注册表:调度依据的任务清单(与 task_registry 中的任务码对应)。';
COMMENT ON COLUMN etl_admin.etl_task.task_code IS '任务编码,需与代码中的任务码一致。';
COMMENT ON COLUMN etl_admin.etl_task.store_id IS '门店/租户粒度,区分多门店执行。';
COMMENT ON COLUMN etl_admin.etl_task.enabled IS '是否启用此任务。';
COMMENT ON COLUMN etl_admin.etl_task.cursor_field IS '增量游标字段名(可选)。';
COMMENT ON COLUMN etl_admin.etl_task.window_minutes_default IS '默认时间窗口(分钟)。';
COMMENT ON COLUMN etl_admin.etl_task.overlap_seconds IS '窗口重叠秒数,用于防止遗漏。';
COMMENT ON COLUMN etl_admin.etl_task.page_size IS '默认分页大小。';
COMMENT ON COLUMN etl_admin.etl_task.retry_max IS 'API重试次数上限。';
COMMENT ON COLUMN etl_admin.etl_task.params IS '任务级自定义参数 JSON。';
COMMENT ON COLUMN etl_admin.etl_task.created_at IS '创建时间。';
COMMENT ON COLUMN etl_admin.etl_task.updated_at IS '更新时间。';
CREATE TABLE IF NOT EXISTS etl_admin.etl_cursor (
cursor_id BIGSERIAL PRIMARY KEY,
task_id BIGINT NOT NULL REFERENCES etl_admin.etl_task(task_id) ON DELETE CASCADE,
store_id BIGINT NOT NULL,
last_start TIMESTAMPTZ,
last_end TIMESTAMPTZ,
last_id BIGINT,
last_run_id BIGINT,
extra JSONB DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (task_id, store_id)
);
COMMENT ON TABLE etl_admin.etl_cursor IS '任务游标表:记录每个任务/门店的增量窗口及最后 run。';
COMMENT ON COLUMN etl_admin.etl_cursor.task_id IS '关联 etl_task.task_id。';
COMMENT ON COLUMN etl_admin.etl_cursor.store_id IS '门店/租户粒度。';
COMMENT ON COLUMN etl_admin.etl_cursor.last_start IS '上次窗口开始时间(含重叠偏移)。';
COMMENT ON COLUMN etl_admin.etl_cursor.last_end IS '上次窗口结束时间。';
COMMENT ON COLUMN etl_admin.etl_cursor.last_id IS '上次处理的最大主键/游标值(可选)。';
COMMENT ON COLUMN etl_admin.etl_cursor.last_run_id IS '上次运行ID对应 etl_run.run_id。';
COMMENT ON COLUMN etl_admin.etl_cursor.extra IS '附加游标信息 JSON。';
COMMENT ON COLUMN etl_admin.etl_cursor.created_at IS '创建时间。';
COMMENT ON COLUMN etl_admin.etl_cursor.updated_at IS '更新时间。';
CREATE TABLE IF NOT EXISTS etl_admin.etl_run (
run_id BIGSERIAL PRIMARY KEY,
run_uuid TEXT NOT NULL,
task_id BIGINT NOT NULL REFERENCES etl_admin.etl_task(task_id) ON DELETE CASCADE,
store_id BIGINT NOT NULL,
status TEXT NOT NULL,
started_at TIMESTAMPTZ DEFAULT now(),
ended_at TIMESTAMPTZ,
window_start TIMESTAMPTZ,
window_end TIMESTAMPTZ,
window_minutes INT,
overlap_seconds INT,
fetched_count INT DEFAULT 0,
loaded_count INT DEFAULT 0,
updated_count INT DEFAULT 0,
skipped_count INT DEFAULT 0,
error_count INT DEFAULT 0,
unknown_fields INT DEFAULT 0,
export_dir TEXT,
log_path TEXT,
request_params JSONB DEFAULT '{}'::jsonb,
manifest JSONB DEFAULT '{}'::jsonb,
error_message TEXT,
extra JSONB DEFAULT '{}'::jsonb
);
COMMENT ON TABLE etl_admin.etl_run IS '运行记录表:记录每次任务执行的窗口、状态、计数与日志路径。';
COMMENT ON COLUMN etl_admin.etl_run.run_uuid IS '本次调度的唯一标识。';
COMMENT ON COLUMN etl_admin.etl_run.task_id IS '关联 etl_task.task_id。';
COMMENT ON COLUMN etl_admin.etl_run.store_id IS '门店/租户粒度。';
COMMENT ON COLUMN etl_admin.etl_run.status IS '运行状态SUCC/FAIL/PARTIAL 等)。';
COMMENT ON COLUMN etl_admin.etl_run.started_at IS '开始时间。';
COMMENT ON COLUMN etl_admin.etl_run.ended_at IS '结束时间。';
COMMENT ON COLUMN etl_admin.etl_run.window_start IS '本次窗口开始时间。';
COMMENT ON COLUMN etl_admin.etl_run.window_end IS '本次窗口结束时间。';
COMMENT ON COLUMN etl_admin.etl_run.window_minutes IS '窗口跨度(分钟)。';
COMMENT ON COLUMN etl_admin.etl_run.overlap_seconds IS '窗口重叠秒数。';
COMMENT ON COLUMN etl_admin.etl_run.fetched_count IS '抓取/读取的记录数。';
COMMENT ON COLUMN etl_admin.etl_run.loaded_count IS '插入的记录数。';
COMMENT ON COLUMN etl_admin.etl_run.updated_count IS '更新的记录数。';
COMMENT ON COLUMN etl_admin.etl_run.skipped_count IS '跳过的记录数。';
COMMENT ON COLUMN etl_admin.etl_run.error_count IS '错误记录数。';
COMMENT ON COLUMN etl_admin.etl_run.unknown_fields IS '未知字段计数(清洗阶段)。';
COMMENT ON COLUMN etl_admin.etl_run.export_dir IS '抓取/导出目录。';
COMMENT ON COLUMN etl_admin.etl_run.log_path IS '日志路径。';
COMMENT ON COLUMN etl_admin.etl_run.request_params IS '请求参数 JSON。';
COMMENT ON COLUMN etl_admin.etl_run.manifest IS '运行产出清单/统计 JSON。';
COMMENT ON COLUMN etl_admin.etl_run.error_message IS '错误信息(若失败)。';
COMMENT ON COLUMN etl_admin.etl_run.extra IS '附加字段,保留扩展。';

File diff suppressed because it is too large Load Diff

View File

@@ -1,34 +1,37 @@
-- 将新的 ODS 任务注册到 etl_admin.etl_task根据需要替换 store_id -- 将新的 ODS 任务注册到 etl_admin.etl_task按需替换 store_id
-- 使用方式(示例): -- 使用方式(示例):
-- psql "$PG_DSN" -f etl_billiards/database/seed_ods_tasks.sql -- psql "$PG_DSN" -f etl_billiards/database/seed_ods_tasks.sql
-- 或在 psql 中执行本文件内容。 -- 或在 psql 中直接执行本文件内容。
WITH target_store AS ( WITH target_store AS (
SELECT 2790685415443269::bigint AS store_id -- TODO: 替换为实际 store_id SELECT 2790685415443269::bigint AS store_id -- TODO: 替换为实际 store_id
), ),
task_codes AS ( task_codes AS (
SELECT unnest(ARRAY[ SELECT unnest(ARRAY[
'ODS_ASSISTANT_ACCOUNTS', -- Must match tasks/ods_tasks.py (ENABLED_ODS_CODES)
'ODS_ASSISTANT_ACCOUNT',
'ODS_ASSISTANT_LEDGER', 'ODS_ASSISTANT_LEDGER',
'ODS_ASSISTANT_ABOLISH', 'ODS_ASSISTANT_ABOLISH',
'ODS_INVENTORY_CHANGE', 'ODS_SETTLEMENT_RECORDS',
'ODS_INVENTORY_STOCK', 'ODS_TABLE_USE',
'ODS_PACKAGE',
'ODS_GROUP_BUY_REDEMPTION',
'ODS_MEMBER',
'ODS_MEMBER_BALANCE',
'ODS_MEMBER_CARD',
'ODS_PAYMENT', 'ODS_PAYMENT',
'ODS_REFUND', 'ODS_REFUND',
'ODS_COUPON_VERIFY', 'ODS_PLATFORM_COUPON',
'ODS_MEMBER',
'ODS_MEMBER_CARD',
'ODS_MEMBER_BALANCE',
'ODS_RECHARGE_SETTLE', 'ODS_RECHARGE_SETTLE',
'ODS_GROUP_PACKAGE',
'ODS_GROUP_BUY_REDEMPTION',
'ODS_INVENTORY_STOCK',
'ODS_INVENTORY_CHANGE',
'ODS_TABLES', 'ODS_TABLES',
'ODS_GOODS_CATEGORY', 'ODS_GOODS_CATEGORY',
'ODS_STORE_GOODS', 'ODS_STORE_GOODS',
'ODS_TABLE_DISCOUNT', 'ODS_STORE_GOODS_SALES',
'ODS_TABLE_FEE_DISCOUNT',
'ODS_TENANT_GOODS', 'ODS_TENANT_GOODS',
'ODS_SETTLEMENT_TICKET', 'ODS_SETTLEMENT_TICKET'
'ODS_ORDER_SETTLE'
]) AS task_code ]) AS task_code
) )
INSERT INTO etl_admin.etl_task (task_code, store_id, enabled) INSERT INTO etl_admin.etl_task (task_code, store_id, enabled)
@@ -36,4 +39,3 @@ SELECT t.task_code, s.store_id, TRUE
FROM task_codes t CROSS JOIN target_store s FROM task_codes t CROSS JOIN target_store s
ON CONFLICT (task_code, store_id) DO UPDATE ON CONFLICT (task_code, store_id) DO UPDATE
SET enabled = EXCLUDED.enabled; SET enabled = EXCLUDED.enabled;

View File

@@ -0,0 +1,50 @@
-- Seed scheduler-compatible tasks into etl_admin.etl_task.
--
-- Notes:
-- - These task_code values must match orchestration/task_registry.py.
-- - ODS_* tasks are intentionally excluded here because they don't follow the
-- BaseTask(cursor_data) scheduler interface in this repo version.
--
-- Usage (example):
-- psql "%PG_DSN%" -f etl_billiards/database/seed_scheduler_tasks.sql
--
WITH target_store AS (
SELECT 2790685415443269::bigint AS store_id -- TODO: replace with your store_id
),
task_codes AS (
SELECT unnest(ARRAY[
'ASSISTANT_ABOLISH',
'ASSISTANTS',
'COUPON_USAGE',
'CHECK_CUTOFF',
'DATA_INTEGRITY_CHECK',
'DWD_LOAD_FROM_ODS',
'DWD_QUALITY_CHECK',
'INIT_DWD_SCHEMA',
'INIT_DWS_SCHEMA',
'INIT_ODS_SCHEMA',
'INVENTORY_CHANGE',
'LEDGER',
'MANUAL_INGEST',
'MEMBERS',
'MEMBERS_DWD',
'ODS_JSON_ARCHIVE',
'ORDERS',
'PACKAGES_DEF',
'PAYMENTS',
'PAYMENTS_DWD',
'PRODUCTS',
'REFUNDS',
'TABLE_DISCOUNT',
'TABLES',
'TICKET_DWD',
'TOPUPS',
'DWS_BUILD_ORDER_SUMMARY'
]) AS task_code
)
INSERT INTO etl_admin.etl_task (task_code, store_id, enabled)
SELECT t.task_code, s.store_id, TRUE
FROM task_codes t CROSS JOIN target_store s
ON CONFLICT (task_code, store_id) DO UPDATE
SET enabled = EXCLUDED.enabled,
updated_at = now();

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More