Files

448 lines
20 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档:数据流字段补全与前后端联调
## 概述
本设计基于 `dataflow_2026-02-19_190440.md` 数据流分析报告,覆盖两大任务:
1. **字段补全**:对 11 张 ODS/DWD 表执行字段映射补全,包括 DDL 更新、ETL loader/task 代码同步、文档精化
2. **DWS 库存汇总**:在 DWS 层新建日/周/月三个粒度的库存汇总表,基于 DWD goods_stock_summary 数据构建
3. **前后端联调**:确保 admin-web 前端与 FastAPI 后端的 ETL 执行流程完整可用,含计时和黑盒测试
核心设计原则:
- **执行依据**:字段补全部分基于排查结论文档 `export/SYSTEM/REPORTS/field_audit/field_review_for_user.md`(由 `FIELD_AUDIT_ROOT` 环境变量配置路径)
- **先确认再新增**:对每个疑似缺失字段,必须先排查是否已存在(可能是命名差异、已映射到其他列、或已在 FACT_MAPPINGS 中以不同名称配置),确认确实缺失后才执行新增
- 所有字段映射变更通过 `DwdLoadTask.FACT_MAPPINGS` 声明式配置,不修改核心合并逻辑
- 新建 DWD 表遵循现有 main/ex 分表模式(核心字段 → main 表,扩展字段 → ex 表)
- DDL 变更通过迁移脚本(`db/etl_feiqiu/migrations/`)执行,同步更新 schema 文件
- 控制无效字段新增:仅在确认字段确实缺失且有业务价值时才新增
## 架构
### 现有 ETL 数据流架构
```mermaid
graph LR
API[上游 SaaS API] -->|JSON| ODS_Loader[GenericODSLoader]
ODS_Loader -->|UPSERT| ODS[(ODS 表)]
ODS -->|SELECT| DWD_Task[DwdLoadTask]
DWD_Task -->|SCD2 合并| DIM[(DWD 维度表)]
DWD_Task -->|增量插入| FACT[(DWD 事实表)]
```
### 字段映射机制
`DwdLoadTask` 使用两层映射策略:
1. **自动映射**ODS 列名与 DWD 列名相同时自动匹配
2. **显式映射**:通过 `FACT_MAPPINGS` 字典声明 `(dwd_col, ods_expr, cast_type)` 三元组
本次变更主要操作 `FACT_MAPPINGS``TABLE_MAP`,以及对应的 DDL。
### 前后端联调架构
```mermaid
graph LR
AdminWeb[Admin Web<br/>React + Ant Design] -->|HTTP/WS| Backend[FastAPI 后端]
Backend -->|subprocess| ETL[ETL CLI]
ETL -->|SQL| DB[(PostgreSQL)]
Backend -->|WebSocket| AdminWeb
```
## 字段排查结论(已完成)
排查工作已完成,详细结论见 `export/SYSTEM/REPORTS/field_audit/field_review_for_user.md`
排查方法包括:查 DWD 表现有列、查 FACT_MAPPINGS、查 ODS 表现有列、查自动映射、查 API JSON 样本、数据库实际数据验证。排查发现 4 个映射错误、21 个待新增字段、2 张需新建 DWD 表、6 个跳过字段。
## 组件与接口
### 任务 1字段补全涉及的组件
| 组件 | 文件路径 | 变更类型 |
|------|---------|---------|
| DWD 加载任务 | `tasks/dwd/dwd_load_task.py` | 修改 `FACT_MAPPINGS``TABLE_MAP` |
| ODS DDL | `db/etl_feiqiu/schemas/ods.sql` | 新增列store_goods_master 嵌套展开) |
| DWD DDL | `db/etl_feiqiu/schemas/dwd.sql` | 新增列、新建表 |
| 迁移脚本 | `db/etl_feiqiu/migrations/` | 新增 ALTER TABLE / CREATE TABLE |
| ODS 加载器 | `loaders/ods/generic.py` | 可能需要扩展 columns 列表 |
| BD_Manual 文档 | `docs/database/` | 更新字段说明 |
### 任务 2前后端联调涉及的组件
| 组件 | 文件路径 | 变更类型 |
|------|---------|---------|
| 执行 API | `apps/backend/app/routers/` | 调试/修复参数传递 |
| 执行页面 | `apps/admin-web/src/pages/TaskManager.tsx` | 调试/修复前端逻辑 |
| 计时模块 | `apps/etl/connectors/feiqiu/utils/` | 新增计时器工具 |
| 黑盒测试 | `apps/etl/connectors/feiqiu/quality/` | 新增数据一致性检查 |
## 数据模型
### 字段补全分类
根据 `field_review_for_user.md` 排查结论,将变更分为四类:
#### 🔴 映射错误修复(高优先级)
| 表 | 问题 | 修正方案 |
|----|------|---------|
| assistant_service_records | DWD `site_assistant_id` 错误映射自 ODS `order_assistant_id` | 修正映射源 + 新增 `order_assistant_id` 列 |
| store_goods_sales_records | DWD `discount_price` 实际映射自 ODS `discount_money`(列名误导) | 重命名 DWD 列 + 新增真正的 `discount_price` |
| store_goods_master | `batch_stock_qty` 映射自 `stock`(错误),`provisional_total_cost` 映射自 `total_purchase_cost`(错误) | 修正 FACT_MAPPINGS 源列 |
#### A 类:新增 DWD 列 + FACT_MAPPINGS
| 表 | 新增字段数 | DWD 目标 |
|----|----------|---------|
| assistant_accounts_master | 4 | dim_assistant_ex |
| assistant_service_records | 2 | dwd_assistant_service_log_ex |
| assistant_cancellation_records | 0仅更新映射 | dwd_assistant_trash_event |
| member_balance_changes | 1 | dwd_member_balance_change_ex |
| site_tables_master | 14 | dim_table_ex |
#### B 类:仅补 FACT_MAPPINGSDWD 列已存在)
| 表 | 说明 |
|----|------|
| recharge_settlements | 5 个字段DWD 列已存在ODS/DWD 两侧数据全为 0业务未启用 |
#### 跳过(无需变更)
| 表 | 原因 |
|----|------|
| tenant_goods_master | `commoditycode``commodity_code` 100% 冗余(花括号包裹格式),跳过 |
| store_goods_mastertime_slot_sale | ODS 列不存在,跳过 |
#### C 类:需新建 DWD 表
| 表 | ODS 字段数 | DWD 新表 | 备注 |
|----|----------|---------|------|
| goods_stock_summary | 14 | dwd_goods_stock_summary | 需先修改 ODS 配置 `requires_window=True` 并重新采集 |
| goods_stock_movements | 19 | dwd_goods_stock_movement | 事实表,按 createtime 增量加载 |
#### C 类:疑似需新建 DWD 表(需排查是否有替代方案)
| 表 | ODS 字段数 | 疑似新建 DWD 表 | 排查重点 |
|----|----------|---------------|---------|
| goods_stock_summary | 14 | dwd_goods_stock_summary | 确认是否有意不建 DWD 表(如数据直接在 ODS 层使用) |
| goods_stock_movements | 19 | dwd_goods_stock_movement | 同上 |
### 已确认的映射关系(排查结论)
以下映射关系已通过数据库实际数据验证确认:
| 字段 | 排查结论 | 所在表 |
|------|---------|-------|
| discount_price (store_goods_sales) | 🔴 DWD `discount_price` 实际映射自 ODS `discount_money`,需重命名 + 新增 | store_goods_sales_records |
| commoditycode (tenant_goods) | ⏭️ 与 `commodity_code` 100% 冗余,跳过 | tenant_goods_master |
| site_assistant_id (assistant_service) | 🔴 DWD 错误映射自 ODS `order_assistant_id`,需修正 | assistant_service_records |
| recharge 电费/券字段 | ✅ DWD 列已存在,仅需补 FACT_MAPPINGS数据全为 0 | recharge_settlements |
| batch_stock_qty (store_goods) | 🔴 错误映射自 `stock`,应映射自 `batch_stock_quantity` | store_goods_master |
| provisional_total_cost (store_goods) | 🔴 错误映射自 `total_purchase_cost`,应映射自 `provisional_total_cost` | store_goods_master |
### 新建 DWD 表设计
#### dwd_goods_stock_summary
```sql
CREATE TABLE dwd.dwd_goods_stock_summary (
site_goods_id bigint NOT NULL,
goods_name text,
goods_unit text,
goods_category_id bigint,
goods_category_second_id bigint,
category_name text,
range_start_stock numeric,
range_end_stock numeric,
range_in numeric,
range_out numeric,
range_sale numeric,
range_sale_money numeric(12,2),
range_inventory numeric,
current_stock numeric,
site_id bigint,
tenant_id bigint,
fetched_at timestamptz,
PRIMARY KEY (site_goods_id)
);
```
#### dwd_goods_stock_movement
```sql
CREATE TABLE dwd.dwd_goods_stock_movement (
site_goods_stock_id bigint NOT NULL,
tenant_id bigint,
site_id bigint,
site_goods_id bigint,
goods_name text,
goods_category_id bigint,
goods_second_category_id bigint,
unit text,
price numeric(12,2),
stock_type integer,
change_num numeric,
start_num numeric,
end_num numeric,
change_num_a numeric,
start_num_a numeric,
end_num_a numeric,
remark text,
operator_name text,
create_time timestamptz,
fetched_at timestamptz,
PRIMARY KEY (site_goods_stock_id)
);
```
### recharge_settlements 映射关系
ODS 列与 DWD 列的对应关系(命名转换):
| ODS 列(驼峰) | DWD 列(蛇形) |
|---------------|--------------|
| plcouponsaleamount | pl_coupon_sale_amount |
| mervousalesamount | mervou_sales_amount |
| electricitymoney | electricity_money |
| realelectricitymoney | real_electricity_money |
| electricityadjustmoney | electricity_adjust_money |
这 5 个字段在 `dwd_recharge_order` 中已有列定义但缺少 FACT_MAPPINGS 条目,需要补充映射。
### store_goods_master 映射修正
根据排查结论,该表存在两个映射错误(非新增字段):
| DWD 列 | 当前错误映射 ODS 列 | 正确 ODS 列 | 验证结果 |
|--------|-------------------|------------|---------|
| `batch_stock_qty` | `stock`(当前库存) | `batch_stock_quantity`(批次库存) | 仅 7.3% 行相等 |
| `provisional_total_cost` | `total_purchase_cost`(实际采购成本) | `provisional_total_cost`(暂估成本) | 93.5% 行相等但 113 行不同 |
`time_slot_sale` ODS 列不存在,跳过。`goodsStockWarningInfo` 嵌套展开不在本次范围内。
### DWS 库存汇总表设计(日/周/月)
基于 `field_review_for_user.md` 第 10 章发现goods_stock_summary API 支持 `startTime`/`endTime` 参数返回时间范围内的库存汇总数据。在 ODS 任务配置修改(`requires_window=True` + `time_fields=("startTime", "endTime")`并重新采集后DWD 层 `dwd_goods_stock_summary` 将拥有带时间范围的真实数据,可在此基础上构建 DWS 层汇总。
#### 三张 DWS 表
| 表名 | 粒度 | 任务代码 | stat_period |
|------|------|---------|-------------|
| `dws.dws_goods_stock_daily_summary` | 日 | `DWS_GOODS_STOCK_DAILY` | `'daily'` |
| `dws.dws_goods_stock_weekly_summary` | 周 | `DWS_GOODS_STOCK_WEEKLY` | `'weekly'` |
| `dws.dws_goods_stock_monthly_summary` | 月 | `DWS_GOODS_STOCK_MONTHLY` | `'monthly'` |
#### DDL 设计(三张表结构相同)
```sql
CREATE TABLE dws.dws_goods_stock_daily_summary (
site_id bigint NOT NULL,
tenant_id bigint,
stat_date date NOT NULL,
site_goods_id bigint NOT NULL,
goods_name text,
goods_unit text,
goods_category_id bigint,
goods_category_second_id bigint,
category_name text,
range_start_stock numeric,
range_end_stock numeric,
range_in numeric,
range_out numeric,
range_sale numeric,
range_sale_money numeric(12,2),
range_inventory numeric,
current_stock numeric,
stat_period text NOT NULL DEFAULT 'daily',
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (site_id, stat_date, site_goods_id)
);
```
周度表和月度表结构相同,仅表名和 `stat_period` 默认值不同(`'weekly'` / `'monthly'`)。
#### 任务实现模式
继承 `BaseDwsTask`,实现 `extract` / `transform` / `load` 三阶段:
- **extract**:从 `dwd.dwd_goods_stock_summary` 按时间范围查询数据
- **transform**:按粒度(日/周/月)对 `stat_date` 进行分组聚合,计算各库存指标
- 日度:直接取 DWD 数据(`stat_date` = 采集日期)
- 周度:按 ISO 周分组,`stat_date` = 周一日期
- 月度:按自然月分组,`stat_date` = 月首日期
- **load**:使用 `upsert` 写入目标表,主键冲突时更新
#### 前置依赖
- 需求 7goods_stock_summary 新建 DWD 表)必须先完成
- ODS 任务配置修改(`requires_window=True`)必须先完成并重新采集数据
#### 文件位置
- DDL`db/etl_feiqiu/schemas/dws.sql`
- 迁移脚本:`db/etl_feiqiu/migrations/{date}__create_dws_goods_stock_summary.sql`
- 任务代码:`apps/etl/connectors/feiqiu/tasks/dws/goods_stock_daily_task.py``goods_stock_weekly_task.py``goods_stock_monthly_task.py`
### settlement_ticket_details 彻底移除设计
从项目中完整移除 `settlement_ticket_details`结账小票详情相关的所有代码、DDL、配置、文档和数据。
#### 需要移除的文件/代码位置
| 层级 | 文件路径 | 移除内容 |
|------|---------|---------|
| ETL 任务定义 | `tasks/ods/ods_tasks.py` | `OdsTaskSpec("ODS_SETTLEMENT_TICKET", ...)``OdsSettlementTicketTask` 类、`ENABLED_ODS_CODES` 中的条目、`ODS_TASK_CLASSES` 覆盖 |
| ETL 校验 | `tasks/verification/dwd_verifier.py` | `settlement_ticket_details` 主键映射条目 |
| ETL 校验 | `tasks/verification/ods_verifier.py` | 相关注释和特殊处理逻辑 |
| ETL 手动导入 | `tasks/utility/manual_ingest_task.py` | `settlement_ticket_details` 的表映射和配置 |
| JSON 存储 | `utils/json_store.py` | `/order/getordersettleticketnew` 的路径映射 |
| ODS 间隙检查 | `scripts/check/check_ods_gaps.py` | `_check_settlement_tickets` 函数及调用 |
| 黑盒调试 | `scripts/debug/debug_blackbox.py` | `ODS_SETTLEMENT_TICKET` 跳过逻辑 |
| DDL | `db/etl_feiqiu/schemas/ods.sql``schema_ODS_doc.sql` | `settlement_ticket_details` 建表语句和注释 |
| 种子数据 | `db/etl_feiqiu/seeds/seed_ods_tasks.sql` | `ODS_SETTLEMENT_TICKET` 条目 |
| 索引检查 | `scripts/ops/check_ods_latest_indexes.py` | `idx_ods_settlement_ticket_details_latest` |
| 分析脚本 | `scripts/ops/gen_full_dataflow_doc.py` | ODS spec 条目和特殊跳过逻辑 |
| 分析脚本 | `scripts/ops/gen_field_review_doc.py` | 第 12 章 settlement_ticket_details 配置 |
| 分析脚本 | `scripts/ops/gen_api_field_mapping.py` | 表名列表中的条目 |
| 分析脚本 | `scripts/ops/field_audit.py` | 排查配置和特殊处理 |
| 分析脚本 | `scripts/ops/export_dwd_field_review.py` | 字段列表配置 |
| 分析脚本 | `scripts/ops/dataflow_analyzer.py` | ODS spec 条目和跳过逻辑 |
| 文档 | `docs/database/etl_feiqiu_schema_migration.md` | 索引条目 |
| ETL 文档 | `apps/etl/connectors/feiqiu/docs/etl_tasks/` | 任务表格条目 |
| 单元测试 | `tests/unit/test_ods_tasks.py` | `test_ods_settlement_ticket_by_payment_relate_ids` |
#### 迁移脚本
```sql
-- 移除 settlement_ticket_details 表和索引
DROP INDEX IF EXISTS ods.idx_ods_settlement_ticket_details_latest;
DROP TABLE IF EXISTS ods.settlement_ticket_details;
-- 移除 meta.ods_task_registry 中的任务注册
DELETE FROM meta.ods_task_registry WHERE task_code = 'ODS_SETTLEMENT_TICKET';
```
#### 注意事项
- `export/` 下的报告文件(`field_audit_report.md``dataflow_api_ods_dwd.md` 等)为历史产物,不需要手动清理,下次重新生成时自然不再包含
- `docs/audit/` 下的审计日志为历史记录,保留不动
- `tmp/` 下的临时文件不需要处理
## 正确性属性
*正确性属性是一种在系统所有合法执行中都应成立的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。*
### Property 1FACT_MAPPINGS 字段映射正确性
*对于任意* ODS 表行和任意已配置的 `FACT_MAPPINGS` 条目 `(dwd_col, ods_expr, cast_type)`,当 DWD 加载任务执行后DWD 目标行中 `dwd_col` 列的值应等于从 ODS 行中按 `ods_expr` 提取并按 `cast_type` 转换后的值。
**Validates: Requirements 1.1, 1.2, 2.1, 3.1, 4.1, 5.1, 6.1, 6.2, 7.2, 8.2, 9.1, 10.3, 11.1**
### Property 2FACT_MAPPINGS 引用完整性
*对于任意* `FACT_MAPPINGS` 中的映射条目,其 DWD 目标列名必须存在于对应 DWD 表的列定义中,其 ODS 源表达式引用的列名必须存在于对应 ODS 表的列定义中(或为合法的 SQL 表达式)。
**Validates: Requirements 6.3**
### Property 3TABLE_MAP 覆盖完整性
*对于任意*`TABLE_MAP` 中注册的 DWD 表,该表的所有非 SCD2 列要么在 `FACT_MAPPINGS` 中有显式映射,要么在对应 ODS 表中存在同名列(自动映射)。
**Validates: Requirements 7.2, 8.2**
### Property 4映射错误修正后数据一致性
*对于任意* 已修正映射的字段assistant_service_records.site_assistant_id、store_goods_sales_records.discount_price、store_goods_master.batch_stock_qty、store_goods_master.provisional_total_cost修正后 DWD 目标列的值应等于从正确的 ODS 源列提取的值,而非修正前的错误源列。
**Validates: Requirements 2.1, 4.1, 10.3**
### Property 5ETL 参数解析与 CLI 命令构建正确性
*对于任意* 合法的 ETL 执行参数组合门店列表、数据源模式、校验模式、时间范围、窗口切分、force-full 标志、任务选择Backend 构建的 CLI 命令字符串应包含所有指定参数,且参数值与输入一致。
**Validates: Requirements 14.1, 14.2**
### Property 6数据一致性检查正确性
*对于任意* ODS 行和对应的 DWD 行,黑盒测试检查器应能正确识别:(a) ODS 中存在但 DWD 中缺失的字段,(b) ODS 与 DWD 之间值不一致的字段。
**Validates: Requirements 16.2, 16.3**
### Property 7计时器记录完整性
*对于任意* ETL 步骤序列,计时器输出应包含每个步骤的名称、开始时间、结束时间和耗时,且耗时等于结束时间减去开始时间。
**Validates: Requirements 15.2**
### Property 8DWS 库存汇总粒度聚合正确性
*对于任意* DWD 库存汇总数据集和任意汇总粒度(日/周/月DWS 汇总任务的 transform 输出应满足:(a) 每条记录的 `stat_period` 与任务粒度一致,(b) 同一 `(site_id, stat_date, site_goods_id)` 组合不重复,(c) 日度汇总的记录数不少于周度和月度汇总的记录数。
**Validates: Requirements 12.2, 12.3, 12.4, 12.5, 12.6**
## 错误处理
### 字段补全错误处理
| 场景 | 处理方式 |
|------|---------|
| DDL 迁移失败 | 回滚事务,记录错误日志,不影响其他表 |
| ODS 列不存在 | 跳过该映射条目,记录 WARNING 日志 |
| 类型转换失败 | 使用 NULLIF + CAST 兜底,转换失败写入 NULL |
| 新建 DWD 表主键冲突 | 使用 ON CONFLICT DO UPDATE 策略 |
### DWS 库存汇总错误处理
| 场景 | 处理方式 |
|------|---------|
| DWD 源表无数据 | 跳过汇总,记录 WARNING 日志 |
| 跨周/跨月边界数据不完整 | 按已有数据汇总,不补零 |
| upsert 主键冲突 | 使用 ON CONFLICT DO UPDATE 更新已有记录 |
| DWD 表尚未创建(前置依赖未完成) | 抛出明确错误,提示需先完成需求 7 |
### 前后端联调错误处理
| 场景 | 处理方式 |
|------|---------|
| 参数校验失败 | 返回 422 状态码,附带详细错误信息 |
| ETL 子进程超时 | 设置超时阈值,超时后终止进程并返回错误 |
| WebSocket 断连 | 前端自动重连,后端缓存最近日志 |
| 黑盒测试发现不一致 | 记录差异明细到报告,不中断流程 |
## 测试策略
### 属性测试
使用 `hypothesis` 库进行属性测试,每个属性至少运行 100 次迭代。
- **Property 1-3**:通过 FakeDB 模拟 ODS/DWD 表结构,生成随机 ODS 行数据,验证 FACT_MAPPINGS 映射逻辑
- **Property 4**:对修正后的映射字段,验证 DWD 值来自正确的 ODS 源列
- **Property 5**:生成随机参数组合,验证 CLI 命令构建
- **Property 6**:生成随机 ODS/DWD 行对,验证一致性检查逻辑
- **Property 7**:生成随机步骤序列,验证计时器输出
- **Property 8**:生成随机 DWD 库存数据,验证日/周/月三个粒度的聚合逻辑正确性
测试标签格式:`Feature: dataflow-field-completion, Property N: {property_text}`
### 单元测试
- DDL 迁移脚本语法正确性SQL 解析)
- 各表 FACT_MAPPINGS 条目的具体映射值验证
- DWS 库存汇总任务的边界值测试(跨周/跨月数据、空数据集)
- 前端参数表单的边界值测试
- 计时器精度测试
### 集成测试
- 端到端 ETL 执行:从 API JSON 到 DWD 落库的完整流程
- 前后端联调:从 Admin Web 触发到 ETL 完成的完整流程
- 黑盒测试:全量数据一致性验证
### 测试工具
- ETL 单元测试使用 `tests/unit/task_test_utils.py` 提供的 FakeDB/FakeAPI
- 属性测试使用 `hypothesis`
- 后端测试使用 `pytest` + FastAPI TestClient