Files

465 lines
17 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档
## 概述
本设计覆盖 v8 联调中 4 个"临时止血"修复的深度方案,按优先级排列:
1. **需求 DP0**`DwdLoadTask.load()` 返回值格式规范化
2. **需求 C1P1**:会员生日字段 ETL 链路补齐
3. **需求 BP1**:多门店会员查询支持
4. **需求 AP2**:助教月度聚合按档位分段统计
5. **需求 C2P2**:助教手动补录会员生日
设计原则:
- 每个需求独立可部署,按优先级逐步实施
- DDL 变更通过迁移脚本执行,支持回滚
- 保持现有 ETL 架构BaseTask E/T/L 模板)不变
## 架构
整体架构不变,变更集中在以下层面:
```mermaid
graph TD
subgraph "需求 D: 返回值规范化"
D1[DwdLoadTask.load] -->|errors: int| D2[BaseTask._accumulate_counts]
D2 -->|sum| D3[FlowRunner._safe_int]
end
subgraph "需求 A: 档位分段统计"
A1[dws_assistant_daily_detail] -->|GROUP BY level_code| A2[AssistantMonthlyTask]
A2 -->|多行/档位| A3[dws_assistant_monthly_summary]
A3 --> A4[AssistantSalaryTask 分段计算]
end
subgraph "需求 B: 多门店会员查询"
B1[dwd.事实表] -->|member_id IN| B2[dim_member]
B2 --> B3[DWS 任务]
end
subgraph "需求 C: 生日字段"
C1[ODS payload] -->|birthday 提取| C2[dim_member.birthday]
C3[后端 API] -->|UPSERT| C4[zqyy_app.member_birthday_manual]
C2 --> C5[DWS 任务: COALESCE]
C4 -->|FDW 只读| C5
end
```
## 组件与接口
### 需求 D返回值格式规范化
**变更文件:**
- `apps/etl/connectors/feiqiu/tasks/dwd/dwd_load_task.py`
- `apps/etl/connectors/feiqiu/tasks/base_task.py`
**DwdLoadTask.load() 返回值变更:**
```python
# 变更前
return {"tables": summary, "errors": errors}
# errors: list[dict],如 [{"table": "dim_assistant_ex", "error": "..."}]
# 变更后
return {
"tables": summary,
"errors": len(errors), # int — 与其他任务一致
"error_details": errors, # list[dict] — 保留详情供日志使用
}
```
**BaseTask._accumulate_counts() 防御层增强:**
```python
@staticmethod
def _accumulate_counts(total: dict, current: dict) -> dict:
for key, value in (current or {}).items():
if isinstance(value, (int, float)):
total[key] = (total.get(key) or 0) + value
elif isinstance(value, list):
# 防御层list 类型转为 len() 累加
total[key] = (total.get(key) or 0) + len(value)
else:
total.setdefault(key, value)
return total
```
**FlowRunner._safe_int() 保留不变**,作为最终防御层。
### 需求 A助教月度聚合按档位分段统计
**DDL 变更:**
```sql
-- 迁移脚本:删除旧唯一约束,创建新约束
ALTER TABLE dws.dws_assistant_monthly_summary
DROP CONSTRAINT IF EXISTS uk_dws_assistant_monthly;
ALTER TABLE dws.dws_assistant_monthly_summary
ADD CONSTRAINT uk_dws_assistant_monthly
UNIQUE (site_id, assistant_id, stat_month, assistant_level_code);
```
**AssistantMonthlyTask._extract_daily_aggregates() 变更:**
```sql
-- 变更前GROUP BY assistant_id, DATE_TRUNC('month', stat_date)
-- 变更后:加入 assistant_level_code 分组
SELECT
assistant_id,
assistant_level_code,
assistant_level_name,
-- nickname 取时间最后一条
(ARRAY_AGG(assistant_nickname ORDER BY stat_date DESC))[1] AS assistant_nickname,
DATE_TRUNC('month', stat_date)::DATE AS stat_month,
COUNT(DISTINCT stat_date) AS work_days,
SUM(total_service_count) AS total_service_count,
-- ... 其余聚合字段不变
FROM dws.dws_assistant_daily_detail
WHERE site_id = %s AND ({month_where})
GROUP BY assistant_id, assistant_level_code, assistant_level_name,
DATE_TRUNC('month', stat_date)
```
**AssistantSalaryTask 适配:**
- `_extract_monthly_summary()` 返回多行(同一助教不同档位)
- `transform()` 遍历每行分别计算工资,按档位使用对应的 `level_price``tier`
- 最终每个 `(assistant_id, stat_month, assistant_level_code)` 生成一条工资记录
**AssistantFinanceTask._extract_daily_revenue() nickname 修复:**
```sql
-- 变更前MAX(s.nickname) AS assistant_nickname
-- 变更后:
(ARRAY_AGG(s.nickname ORDER BY s.start_use_time DESC))[1] AS assistant_nickname
```
**AssistantCustomerTask._extract_service_pairs() nickname 修复:**
```sql
-- 变更前MAX(assistant_nickname) AS assistant_nickname
-- 变更后:
(ARRAY_AGG(assistant_nickname ORDER BY service_date DESC))[1] AS assistant_nickname
```
### 需求 B多门店会员查询支持
**变更模式:** 所有 `_extract_member_info(site_id)` 方法的 SQL 从:
```sql
WHERE register_site_id = %s AND scd2_is_current = 1
```
改为通过事实表反查:
```sql
WHERE member_id IN (
SELECT DISTINCT tenant_member_id
FROM dwd.{}
WHERE site_id = %s AND tenant_member_id IS NOT NULL AND tenant_member_id != 0
) AND scd2_is_current = 1
```
**受影响的任务和对应事实表:**
| 任务 | 方法 | 事实表 |
|------|------|--------|
| `member_visit_task.py` | `_extract_member_info` | `dwd_settlement_head` |
| `member_consumption_task.py` | `_extract_member_info` | `dwd_settlement_head` |
| `assistant_customer_task.py` | `_extract_member_info` | `dwd_assistant_service_log` |
**`dim_member_card_account` 的处理:**
- `member_consumption_task.py``finance_recharge_task.py` 中对 `dim_member_card_account` 的查询也使用 `register_site_id`
- 同样改为通过事实表的 `tenant_member_id` 反查:
```sql
WHERE tenant_member_id IN (
SELECT DISTINCT tenant_member_id
FROM dwd.{}
WHERE site_id = %s AND tenant_member_id IS NOT NULL AND tenant_member_id != 0
) AND scd2_is_current = 1
```
### 需求 C1会员生日字段 ETL 链路补齐
**DDL 变更:**
```sql
-- dim_member 加列
ALTER TABLE dwd.dim_member ADD COLUMN IF NOT EXISTS birthday DATE;
COMMENT ON COLUMN dwd.dim_member.birthday IS '会员生日来源ODS member_profiles payload 中的 birthday 字段';
```
**ODS → DWD 装载:**
- `DwdLoadTask` 的列映射是自动的(通过 `_get_columns()` 读取 DWD 表列名,与 ODS 列名匹配)
- ODS `member_profiles` 表没有 `birthday` 列,但 `payload` JSONB 中可能包含
- 需要在 `_build_column_mapping()``_fetch_source_rows()` 中增加从 `payload` 提取 `birthday` 的逻辑
- 方案:在 ODS 表也加 `birthday` 列(保持 ODS 与 API 字段对齐ODS 入库时从 JSON 提取
```sql
-- ODS member_profiles 加列
ALTER TABLE ods.member_profiles ADD COLUMN IF NOT EXISTS birthday DATE;
```
ODS 入库逻辑(`ods_tasks.py`)已有从 JSON 提取字段的机制,新增 `birthday` 字段映射即可。DwdLoadTask 的自动列匹配会自动将 `ods.member_profiles.birthday` 映射到 `dwd.dim_member.birthday`
**SCD2 处理:**
- `birthday` 作为 `dim_member` 的普通维度列SCD2 变化检测会自动包含
- 当 API 返回的 birthday 值变化时,会触发 SCD2 版本更新
**DWS 任务恢复 birthday 引用:**
- `member_visit_task.py``_extract_member_info()` SQL 中加入 `birthday`
- `member_consumption_task.py` 同理
### 需求 C2助教手动补录会员生日
**DDL`zqyy_app` / `test_zqyy_app` 业务库):**
```sql
CREATE TABLE IF NOT EXISTS member_birthday_manual (
id BIGSERIAL PRIMARY KEY,
member_id BIGINT NOT NULL,
birthday_value DATE NOT NULL,
recorded_by_assistant_id BIGINT,
recorded_by_name VARCHAR(50),
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
source VARCHAR(20) DEFAULT 'assistant',
CONSTRAINT uk_member_birthday_manual
UNIQUE (member_id, recorded_by_assistant_id)
);
COMMENT ON TABLE member_birthday_manual IS '助教手动补录的会员生日信息';
CREATE INDEX idx_mbd_member ON member_birthday_manual (member_id);
```
**FDW 映射ETL 库读取业务库数据):**
当前 FDW 方向是 `zqyy_app``etl_feiqiu`(业务库读 ETL 数据)。需求 C2 需要反向ETL DWS 任务读取业务库的手动补录表。
方案:在 `etl_feiqiu` 库中创建指向 `zqyy_app` 的 FDW 外部表:
```sql
-- 在 etl_feiqiu 中执行
CREATE SERVER IF NOT EXISTS zqyy_app_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', dbname 'zqyy_app', port '5432');
CREATE USER MAPPING IF NOT EXISTS FOR etl_user
SERVER zqyy_app_server
OPTIONS (user 'app_reader', password '***');
CREATE SCHEMA IF NOT EXISTS fdw_app;
CREATE FOREIGN TABLE fdw_app.member_birthday_manual (
id BIGINT,
member_id BIGINT,
birthday_value DATE,
recorded_by_assistant_id BIGINT,
recorded_by_name VARCHAR(50),
recorded_at TIMESTAMPTZ,
source VARCHAR(20)
) SERVER zqyy_app_server
OPTIONS (schema_name 'public', table_name 'member_birthday_manual');
```
**DWS 任务生日读取逻辑:**
```sql
-- 优先手动补录值,其次 API 值
COALESCE(
(SELECT birthday_value
FROM fdw_app.member_birthday_manual
WHERE member_id = m.member_id
ORDER BY recorded_at ASC -- 最早提交优先
LIMIT 1),
m.birthday
) AS member_birthday
```
**后端 API**
```python
# apps/backend/app/routers/member_birthday.py
@router.post("/member-birthday")
async def submit_member_birthday(
member_id: int,
birthday_value: date,
assistant_id: int,
assistant_name: str,
db=Depends(get_db),
):
"""助教提交会员生日UPSERT"""
sql = """
INSERT INTO member_birthday_manual
(member_id, birthday_value, recorded_by_assistant_id, recorded_by_name)
VALUES (%s, %s, %s, %s)
ON CONFLICT (member_id, recorded_by_assistant_id)
DO UPDATE SET
birthday_value = EXCLUDED.birthday_value,
recorded_at = NOW()
"""
db.execute(sql, (member_id, birthday_value, assistant_id, assistant_name))
return {"status": "ok"}
```
## 数据模型
### 变更汇总
| 库 | 表 | 变更类型 | 说明 |
|----|-----|---------|------|
| `etl_feiqiu` | `ods.member_profiles` | 加列 | `birthday DATE` |
| `etl_feiqiu` | `dwd.dim_member` | 加列 | `birthday DATE` |
| `etl_feiqiu` | `dws.dws_assistant_monthly_summary` | 改约束 | UK 加入 `assistant_level_code` |
| `zqyy_app` | `member_birthday_manual` | 新建表 | 手动补录生日 |
| `etl_feiqiu` | `fdw_app.member_birthday_manual` | 新建外部表 | FDW 映射 |
### 迁移脚本清单
按优先级排序,每个迁移脚本独立可执行:
1. `2026-02-22__D_dwd_load_return_format.sql` — 无 DDL纯代码变更
2. `2026-02-22__C1_dim_member_add_birthday.sql` — ODS/DWD 加列
3. `2026-02-22__B_no_ddl_code_only.sql` — 无 DDL纯代码变更
4. `2026-02-22__A_monthly_summary_uk_change.sql` — 唯一约束变更
5. `2026-02-22__C2_member_birthday_manual.sql` — 新建表 + FDW
## 正确性属性
*属性Property是系统在所有合法执行中都应保持为真的特征或行为——本质上是对"系统应该做什么"的形式化陈述。属性是连接人类可读规格说明与机器可验证正确性保证之间的桥梁。*
### Property 1: DwdLoadTask 返回值格式一致性
*对于任意* DwdLoadTask.load() 的执行结果,返回字典中 `errors` 键的值应为 `int` 类型,且等于 `error_details` 列表的长度。
**验证: 需求 1.1**
### Property 2: _accumulate_counts 类型安全累加
*对于任意* 包含 `int``float``list` 类型值的计数字典,`_accumulate_counts()` 应将 `int`/`float` 直接累加,将 `list` 转为 `len()` 后累加,且不抛出异常。
**验证: 需求 1.2**
### Property 3: 档位分段聚合正确性
*对于任意* 助教在同一月内存在 N 个不同 `assistant_level_code` 的日度数据,`_extract_daily_aggregates()` 应返回恰好 N 行记录,每行的业绩指标之和应等于该助教该月的总业绩。
**验证: 需求 2.1**
### Property 4: nickname 按时间倒序取值
*对于任意* 助教在聚合周期内有多条不同 nickname 的记录,聚合结果中的 nickname 应等于时间最晚的那条记录的 nickname。此属性适用于 AssistantMonthlyTask、AssistantFinanceTask、AssistantCustomerTask 三个任务。
**验证: 需求 2.3, 2.5, 2.6**
### Property 5: 工资按档位分段计算
*对于任意* 助教在同一月有多个档位的月度汇总记录AssistantSalaryTask 应为每个档位分别计算工资,每个档位使用对应的 `level_price``tier` 配置,且所有档位的工资记录数等于月度汇总的行数。
**验证: 需求 2.4**
### Property 6: 跨店会员可查
*对于任意* 在 A 店注册但在 B 店有消费记录的会员B 店的 DWS 任务通过事实表反查 `dim_member`应能获取到该会员的维度信息nickname、mobile 等)。
**验证: 需求 3.1, 3.2**
### Property 7: birthday ODS→DWD 装载正确性
*对于任意* ODS `member_profiles` 中包含 `birthday` 值的记录DwdLoadTask 装载后 `dwd.dim_member` 中对应记录的 `birthday` 应与 ODS 源值一致。
**验证: 需求 4.2**
### Property 8: birthday SCD2 变化检测
*对于任意* `dim_member` 现有记录,当 ODS 中同一会员的 `birthday` 值发生变化时SCD2 应关闭旧版本并创建新版本,新版本的 `birthday` 等于新值。
**验证: 需求 4.3**
### Property 9: 生日 UPSERT 幂等性
*对于任意* `(member_id, assistant_id)` 组合,连续两次提交不同的 `birthday_value``member_birthday_manual` 表中该组合应只有一条记录,且 `birthday_value` 等于最后一次提交的值。
**验证: 需求 5.2, 5.5**
### Property 10: 手动补录优先于 API 来源
*对于任意* 同时在 `dim_member.birthday``member_birthday_manual` 中有值的会员DWS 任务输出的 `member_birthday` 应等于手动补录表中的值。
**验证: 需求 5.4**
### Property 11: SCD2 更新不影响手动补录表
*对于任意*`member_birthday_manual` 中有记录的会员,执行 DwdLoadTask SCD2 更新 `dim_member.birthday` 后,`member_birthday_manual` 中的记录应保持不变。
**验证: 需求 5.6**
## 错误处理
### 需求 D返回值格式
- `DwdLoadTask.load()` 中单表装载失败时,错误信息追加到 `error_details` 列表,`errors` 计数递增
- `_accumulate_counts()` 遇到未知类型时使用 `setdefault` 保留原值(现有行为不变)
- `_safe_int()` 遇到非 int/list 类型时返回 0现有行为不变
### 需求 A档位分段
- 助教在某月无任何服务记录时,不生成月度汇总行(现有行为不变)
- `assistant_level_code` 为 NULL 时作为独立分组处理NULL 视为一个档位)
- 唯一约束变更后需要清理旧数据DELETE + 重新计算当月数据)
### 需求 B多门店查询
- 事实表中无该门店消费记录时,`_extract_member_info()` 返回空字典(现有行为不变)
- 子查询返回空集时,`WHERE member_id IN (空集)` 等价于 `WHERE FALSE`,不会报错
### 需求 C1生日字段
- ODS 中 `birthday` 为 NULL 或空字符串时DWD 中存为 NULL
- 无效日期格式时DwdLoadTask 的现有类型转换逻辑会将其置为 NULL
### 需求 C2手动补录
- `member_id` 不存在于 `dim_member` 时,仍允许提交(助教可能先于 ETL 发现新客户)
- `birthday_value` 格式校验由后端 API 的 Pydantic schema 处理
- FDW 连接失败时DWS 任务应 catch 异常并降级为仅使用 `dim_member.birthday`
## 测试策略
### 测试框架
- 属性测试:`hypothesis`Python每个属性测试最少 100 次迭代
- 单元测试:`pytest`
- 测试工具:`apps/etl/connectors/feiqiu/tests/unit/task_test_utils.py` 提供 FakeDB/FakeAPI
### 属性测试
每个正确性属性对应一个 hypothesis 属性测试,标注格式:
```python
# Feature: etl-aggregation-fix, Property N: {property_text}
@given(...)
def test_property_N_xxx(data):
...
```
属性测试放置位置:
- 需求 D 相关Property 1-2`apps/etl/connectors/feiqiu/tests/unit/test_return_format_properties.py`
- 需求 A 相关Property 3-5`apps/etl/connectors/feiqiu/tests/unit/test_monthly_aggregation_properties.py`
- 需求 B 相关Property 6`apps/etl/connectors/feiqiu/tests/unit/test_multi_store_properties.py`
- 需求 C 相关Property 7-11`apps/etl/connectors/feiqiu/tests/unit/test_birthday_properties.py`
### 单元测试
单元测试覆盖具体示例和边界情况:
- DDL 结构验证(唯一约束、列存在性)
- 空数据 / NULL 值边界
- 迁移脚本的回滚验证
### 测试环境
- 数据库:`test_etl_feiqiu` / `test_zqyy_app`(通过 `TEST_DB_DSN` 环境变量)
- 纯单元测试使用 FakeDB/FakeAPI不涉及真实数据库连接
- ETL 测试 cwd`apps/etl/connectors/feiqiu/`
- 后端测试 cwd`apps/backend/`