Files

375 lines
16 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档ETL 管道全流程调试与 Debug
## 概述
本设计覆盖 `apps/etl/pipelines/feiqiu/` 下 ETL 管道的全流程调试,采用五阶段策略:
1. **分层单元调试**逐层ODS → DWD → DWS → INDEX使用 FakeDB/FakeAPI 和真实数据库验证任务逻辑
2. **全量数据刷新**:执行 2026-01-01 至 2026-02-16 的 `api_full` Flow内嵌性能计时边执行边 Debug发现问题即修复并重试
3. **黑盒数据校验**:从 API 源数据出发,逐层对比各 Schema 各表的记录数、金额汇总、可疑值检测和抽样逐字段比对
4. **架构优化分析**:分析 ETL 整体结构,在保证稳定和正确的基础上提出精简架构的建议,生成架构报告
5. **报告生成**:基于全量刷新阶段采集的计时数据生成性能分析报告,汇总所有调试结果生成 Debug 报告
性能计时从全量刷新阶段开始就嵌入采集(每任务/每层/API 调用耗时),报告阶段仅做分析和输出,不再单独重跑流程采集数据。
调试使用 `.env` 中配置的真实 API 和数据库连接test_etl_feiqiu所有发现的问题和修复记录在结构化 Debug 报告中。性能计时在全量刷新阶段实时采集,报告阶段读取中间数据做分析输出。
## 架构
### 调试流程架构
```mermaid
flowchart TD
A[阶段1: 分层单元调试] --> B[阶段2: 全量数据刷新<br/>内嵌性能计时]
B --> C[阶段3: 黑盒数据校验]
C --> D[阶段4: 架构优化分析]
D --> E[阶段5: 报告生成<br/>含性能分析]
subgraph 阶段1 [分层单元调试]
A1[ODS 层: API抓取 → 表写入] --> A2[DWD 层: ODS → DWD 清洗装载]
A2 --> A3[DWS 层: DWD → DWS 汇总]
A3 --> A4[INDEX 层: 指数算法计算]
A4 --> A5[编排层: PipelineRunner + TaskExecutor]
A5 --> A6[CLI 入口: 参数解析 + 模式切换]
A6 --> A7[配置体系: AppConfig 加载合并]
end
subgraph 阶段2 [全量数据刷新 - 内嵌计时 + 边执行边Debug]
B1[api_full Flow 执行<br/>每任务/每层计时] --> B2{发现Bug?}
B2 -->|是| B3[修复Bug]
B3 --> B4[重试失败的层/任务]
B4 --> B2
B2 -->|否| B5[increment_verify 校验]
B5 --> B6[自动补齐缺失数据]
B6 --> B7[输出计时 JSON 中间文件]
end
subgraph 阶段3 [黑盒数据校验]
C1[API → ODS 记录数对比] --> C2[ODS → DWD 记录数+金额对比]
C2 --> C3[DWD → DWS 聚合一致性]
C3 --> C4[可疑值检测: 边缘值/空值/重复]
C4 --> C5[抽样100条逐字段比对]
end
subgraph 阶段4 [架构优化分析]
D1[代码结构分析] --> D2[冗余识别]
D2 --> D3[精简建议]
end
subgraph 阶段5 [报告生成 - 含性能分析]
E1[读取计时 JSON] --> E2[瓶颈识别 + 优化建议]
E2 --> E3[汇总 Debug 报告]
end
```
### 现有系统架构
```mermaid
flowchart LR
CLI[cli/main.py] --> PR[PipelineRunner]
CLI --> TE[TaskExecutor]
PR --> TE
TE --> TR[TaskRegistry<br/>52个任务]
TE --> CM[CursorManager]
TE --> RT[RunTracker]
TR --> ODS[ODS 任务<br/>BaseOdsTask × 23]
TR --> DWD[DWD 任务<br/>DwdLoadTask]
TR --> DWS[DWS 任务<br/>BaseDwsTask × 15]
TR --> IDX[INDEX 任务 × 4]
TR --> UTL[工具类任务 × 7]
ODS --> API[APIClient<br/>上游 SaaS]
ODS --> DB[(PostgreSQL<br/>ods.*)]
DWD --> DB
DWS --> DB
IDX --> DB
QC[IntegrityChecker] --> API
QC --> DB
```
## 组件与接口
### 调试脚本组件
调试通过一组 Python 脚本实现,放置在 `apps/etl/pipelines/feiqiu/scripts/debug/` 目录下:
| 脚本 | 职责 | 对应需求 |
|------|------|----------|
| `debug_ods.py` | ODS 层逐任务调试:连接真实 API 和 DB执行单个 ODS 任务并验证写入结果 | 需求 1, 9 |
| `debug_dwd.py` | DWD 层调试:执行 DWD_LOAD_FROM_ODS 并验证 TABLE_MAP 中每对映射的记录数 | 需求 2, 9 |
| `debug_dws.py` | DWS 层调试:逐个执行 DWS 任务并验证汇总结果 | 需求 3, 9 |
| `debug_index.py` | INDEX 层调试:执行 4 个指数任务并验证计算结果 | 需求 4, 9 |
| `debug_orchestration.py` | 编排层调试:验证 PipelineRunner 和 TaskExecutor 的流程控制 | 需求 5 |
| `run_full_refresh.py` | 全量刷新:执行 2026-01-01 ~ 2026-02-16 的 api_full内嵌性能计时边执行边 Debug发现问题即修复重试输出计时 JSON | 需求 10, 13 |
| `debug_blackbox.py` | 黑盒校验:从 API 源数据逐层对比各表数据完整性 + 可疑值检测 + 抽样逐字段比对 | 需求 12 |
| `analyze_performance.py` | 性能分析:读取全量刷新采集的计时 JSON统计耗时识别瓶颈生成性能优化报告 | 需求 13 |
| `analyze_architecture.py` | 架构分析:分析代码结构,识别冗余和可精简点,生成架构优化报告 | 需求 14 |
| `generate_report.py` | 报告生成:汇总所有调试结果生成 Markdown 报告 | 需求 11 |
### 关键接口
```python
# 调试脚本的统一入口接口
class DebugResult:
"""单个调试步骤的结果"""
layer: str # "ODS" / "DWD" / "DWS" / "INDEX" / "ORCHESTRATION"
task_code: str # 任务代码
status: str # "PASS" / "FAIL" / "WARN" / "ERROR"
message: str # 结果描述
details: dict # 详细信息(记录数、错误堆栈等)
fix_applied: str | None # 已应用的修复措施
class BlackboxCheckResult:
"""黑盒校验单表结果"""
layer: str # "API_ODS" / "ODS_DWD" / "DWD_DWS"
source_table: str # 源表名
target_table: str # 目标表名
source_count: int # 源记录数
target_count: int # 目标记录数
count_diff: int # 差异数
amount_diffs: list # 金额差异列表
missing_keys: list # 缺失记录主键
mismatch_count: int # 内容不一致数
```
## 数据模型
### Debug 报告结构
```python
@dataclass
class DebugReport:
"""Debug 报告数据模型"""
title: str # 报告标题
generated_at: datetime # 生成时间
environment: dict # 环境信息DB DSN, API base, store_id
# 分层调试结果
ods_results: list[DebugResult] # ODS 层调试结果
dwd_results: list[DebugResult] # DWD 层调试结果
dws_results: list[DebugResult] # DWS 层调试结果
index_results: list[DebugResult] # INDEX 层调试结果
orchestration_results: list[DebugResult] # 编排层调试结果
# 黑盒校验结果
blackbox_results: list[BlackboxCheckResult]
# 全量刷新结果
full_refresh: dict # {status, layers, counts, duration}
verification: dict # {status, total_tables, consistent, backfilled}
# 性能分析
performance_report: dict # {layer_timings, bottlenecks, recommendations}
# 架构优化
architecture_report: dict # {structure_analysis, redundancies, simplification_suggestions}
# 汇总
total_issues: int # 发现的问题总数
fixed_issues: int # 已修复的问题数
remaining_issues: int # 遗留问题数
```
### 黑盒校验数据流
```mermaid
flowchart TD
API[上游 API] -->|iter_paginated| COUNT_API[API 记录数]
API -->|抽样100条| SAMPLE[抽样记录集]
ODS[(ods.* 表)] -->|SELECT COUNT| COUNT_ODS[ODS 记录数]
ODS -->|可疑值检测| SUSPECT[可疑值: 边缘值/空值/重复]
DWD[(dwd.* 表)] -->|SELECT COUNT| COUNT_DWD[DWD 记录数]
DWS[(dws.* 表)] -->|SELECT SUM| SUM_DWS[DWS 汇总值]
COUNT_API --> CMP1{API vs ODS<br/>记录数对比}
COUNT_ODS --> CMP1
COUNT_ODS --> CMP2{ODS vs DWD<br/>记录数+金额对比}
COUNT_DWD --> CMP2
COUNT_DWD --> CMP3{DWD vs DWS<br/>聚合一致性}
SUM_DWS --> CMP3
SAMPLE --> CMP4{抽样逐字段比对<br/>API vs ODS 100条}
ODS --> CMP4
SUSPECT --> CMP5{可疑值分析<br/>追溯上游原因}
CMP1 --> RPT[校验报告]
CMP2 --> RPT
CMP3 --> RPT
CMP4 --> RPT
CMP5 --> RPT
```
## 正确性属性
*属性是在系统所有有效执行中都应成立的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性是人类可读规范与机器可验证正确性保证之间的桥梁。*
基于前置分析,以下属性覆盖了可自动化测试的验收标准。本项目的许多需求(特别是需求 8-12 的黑盒校验和全量刷新)依赖真实数据库和 API 连接,属于集成测试范畴,不适合属性测试。属性测试聚焦于可用 FakeDB/FakeAPI 验证的核心逻辑。
### Property 1: ODS 任务提取记录数一致性
*对任意* ODS 任务和 FakeAPI 提供的任意非空记录列表,任务执行后 FakeDB 接收到的记录数应等于 API 提供的记录数减去被跳过的记录数(缺失主键 + 重复哈希)。
**Validates: Requirements 1.1, 1.2**
### Property 2: ODS 冲突处理策略正确性
*对任意* `ods_conflict_mode` 配置值nothing/backfill/updateBaseOdsTask 生成的 SQL 语句应包含对应的冲突处理子句nothing → `DO NOTHING`backfill → `COALESCE`update → `IS DISTINCT FROM`
**Validates: Requirements 1.3**
### Property 3: ODS 跳过缺失主键记录
*对任意*包含缺失主键字段的记录集合BaseOdsTask 的 skipped 计数应等于缺失主键的记录数,且这些记录不应出现在写入的行中。
**Validates: Requirements 1.4**
### Property 4: ODS content_hash 去重
*对任意*两条内容相同的 ODS 记录(仅 fetched_at 不同),计算出的 content_hash 应相同;当已有记录的 content_hash 与新记录相同时,新记录应被跳过。
**Validates: Requirements 1.5**
### Property 5: ODS 快照删除标记
*对任意*启用 `snapshot_missing_delete` 的 ODS 任务,当 API 返回的记录集是已有记录集的真子集时,差集中的记录应被标记为 `is_delete=1`
**Validates: Requirements 1.7**
### Property 6: DWD FACT_MAPPINGS 列映射完整性
*对任意* FACT_MAPPINGS 中的映射条目 `(dwd_col, ods_expr, cast_type)`,当 `ods_expr` 是简单列名时,该列应存在于对应的 ODS 源表中。
**Validates: Requirements 2.4**
### Property 7: DWD only_tables 过滤
*对任意*非空的 `dwd.only_tables` 配置列表DwdLoadTask 处理的表集合应是配置列表与 TABLE_MAP 键集合的交集。
**Validates: Requirements 2.6**
### Property 8: DWS 分段累加一致性
*对任意*时间窗口和分段配置BaseTask 的 `_accumulate_counts` 方法对各分段计数的累加结果应等于各分段计数的逐键求和。
**Validates: Requirements 3.3**
### Property 9: PipelineRunner Flow 层解析
*对任意*有效的 Flow 名称PIPELINE_LAYERS 的键PipelineRunner 解析出的层列表应与 PIPELINE_LAYERS 中定义的值完全一致。
**Validates: Requirements 5.1, 10.2**
### Property 10: PipelineRunner 无效 Flow 拒绝
*对任意*不在 PIPELINE_LAYERS 键集合中的字符串PipelineRunner.run() 应抛出 ValueError。
**Validates: Requirements 5.2**
### Property 11: TaskExecutor 工具类任务跳过游标
*对任意*被 TaskRegistry 标记为 `requires_db_config=False` 的任务代码TaskExecutor 应通过 `_run_utility_task` 路径执行,不调用 CursorManager 和 RunTracker。
**Validates: Requirements 5.5**
### Property 12: CLI data_source 解析
*对任意* `--data-source` 参数值online/offline/hybrid`--pipeline-flow` 参数值FULL/FETCH_ONLY/INGEST_ONLY`resolve_data_source` 应返回正确的映射值,且 `--data-source` 优先于 `--pipeline-flow`
**Validates: Requirements 6.3, 6.4**
### Property 13: AppConfig 优先级合并
*对任意*嵌套字典 DEFAULTS 和 CLI 覆盖,`_deep_merge` 后 CLI 中的键值应覆盖 DEFAULTS 中的同名键值,未被覆盖的键应保持原值。
**Validates: Requirements 7.1**
### Property 14: AppConfig store_id 验证
*对任意*非整数字符串作为 `app.store_id`AppConfig._normalize 应抛出 SystemExit。
**Validates: Requirements 7.2**
### Property 15: AppConfig DSN 组装
*对任意* host、port、name、user、password 组合db.dsn 为空时AppConfig._normalize 应组装出格式为 `postgresql://{user}:{password}@{host}:{port}/{name}` 的 DSN 字符串。
**Validates: Requirements 7.3**
### Property 16: AppConfig 点号路径 get
*对任意*嵌套字典和有效的点号路径,`config.get(path)` 应返回路径对应的值;对无效路径应返回 default 参数值。
**Validates: Requirements 7.4**
## 错误处理
### 数据库连接错误
- 调试脚本在启动时验证数据库连接,连接失败时记录错误详情并在报告中标注
- 单表处理失败时回滚该表事务继续处理后续表DWD 层已实现此逻辑)
- 全量刷新过程中连接断开时,利用 `DatabaseConnection.ensure_open()` 尝试重连
### API 连接错误
- APIClient 内置重试机制(`retry_max=3`,指数退避)
- API 返回非 0 code 时抛出 ValueError由上层捕获并记录
- Token 过期时在报告中标注,提示用户更新 `.env` 中的 `API_TOKEN`
### 任务执行错误
- ODS 任务:数据库异常时回滚并递增 errors 计数
- DWD 任务:单表失败时回滚该表,继续后续表,最终汇总错误
- DWS/INDEX 任务:继承 BaseTask 的异常处理,回滚后重新抛出
- 工具类任务:异常直接向上传播,不影响游标和运行记录
### 数据质量错误
- 黑盒校验发现的不一致记录在报告中详细列出主键
- 金额差异超过阈值时标记为 WARN 级别
- 校验后自动补齐通过 `run_backfill` 执行,补齐失败的记录单独记录
## 测试策略
### 双轨测试方法
本项目采用单元测试 + 属性测试的双轨方法:
- **属性测试**:使用 `hypothesis` 库验证上述 16 个正确性属性,每个属性至少运行 100 次迭代
- **单元测试**:使用 `pytest` 验证具体示例、边界情况和错误条件
- **集成测试**:使用真实数据库(`TEST_DB_DSN`)验证端到端数据流
### 属性测试配置
- 库:`hypothesis`(已在项目依赖中)
- 最小迭代次数100
- 每个属性测试必须引用设计文档中的属性编号
- 标签格式:`Feature: etl-pipeline-debug, Property {number}: {property_text}`
### 测试文件组织
```
apps/etl/pipelines/feiqiu/tests/unit/
├── test_debug_ods_properties.py # Property 1-5: ODS 层属性测试
├── test_debug_dwd_properties.py # Property 6-8: DWD/DWS 层属性测试
├── test_debug_orchestration_properties.py # Property 9-12: 编排层属性测试
├── test_debug_config_properties.py # Property 13-16: 配置层属性测试
└── (现有测试文件保持不变)
```
### 集成测试
集成测试通过调试脚本实现,连接真实数据库和 API
- `debug_ods.py`:逐个执行 ODS 任务,验证写入结果
- `debug_dwd.py`:执行 DWD 装载,验证映射正确性
- `debug_blackbox.py`:黑盒校验,逐层对比数据完整性
- `run_full_refresh.py`:全量刷新 + 校验
### 现有测试基础设施
项目已有完善的测试工具:
- `FakeDBOperations`:拦截并记录 SQL 操作,提供 commit/rollback 计数
- `FakeAPIClient`:返回预置内存数据,记录调用参数
- `OfflineAPIClient`:从归档 JSON 回放数据
- `RealDBOperationsAdapter`:连接真实 PostgreSQL 的适配器
- `create_test_config()`:构建测试用 AppConfig