Files

498 lines
20 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档ETL DWS/Flow 重构
## 概述
本设计覆盖飞球 ETL 连接器的四阶段重构:
1. **BaseDwsTask 模板方法重构**:在基类中提供默认 extract()/load(),子类通过声明 `DATE_COL` + 实现 `_do_extract()` 即可完成大部分工作;提取公共辅助方法到 `dws_helpers.py`;财务任务共享提取层;合并 MV 刷新 + 数据清理任务MemberIndexBaseTask 模板方法。
2. **--layers CLI 参数**:新增 `--layers ODS,DWD,DWS,INDEX` 自由组合参数,保留 `--pipeline` 作为快捷别名;去掉硬编码回退,统一走 `TaskRegistry.get_tasks_by_layer()`
3. **任务依赖声明**:在 TaskMeta 中增加 `depends_on` 字段,`_resolve_tasks()` 执行拓扑排序。
4. **关键词重命名**`pipeline → flow`类名、变量名、CLI 参数、日志);`pipelines → connectors`(目录路径)。
执行顺序严格按 1→2→3→4→收尾每阶段完成后运行回归测试。
## 架构
### 当前架构
```
BaseTask (tasks/base_task.py)
└── BaseDwsTask (tasks/dws/base_dws_task.py)
├── 15 个 DWS 子类(各自实现 extract/transform/load
└── BaseIndexTask (tasks/dws/index/base_index_task.py)
└── MemberIndexBaseTask (tasks/dws/index/member_index_base.py)
├── WinbackIndexTask
└── NewconvIndexTask
PipelineRunner (orchestration/pipeline_runner.py)
├── PIPELINE_LAYERS: 7 种固定 Flow 定义
└── _resolve_tasks(): 层→任务映射(含硬编码回退)
TaskRegistry (orchestration/task_registry.py)
└── TaskMeta: task_class, requires_db_config, layer, task_type
```
### 目标架构
```
BaseTask (tasks/base_task.py) [不变]
└── BaseDwsTask (tasks/dws/base_dws_task.py) [新增默认 extract/load]
├── DWS 子类(仅声明 DATE_COL + 实现 _do_extract/transform
├── FinanceBaseTask (tasks/dws/finance_base_task.py) [新增]
│ ├── FinanceDailyTask
│ ├── FinanceRechargeTask
│ ├── FinanceIncomeStructureTask
│ └── FinanceDiscountDetailTask
├── DwsMaintenanceTask (tasks/dws/maintenance_task.py) [合并]
└── BaseIndexTask
└── MemberIndexBaseTask [新增模板 execute]
├── WinbackIndexTask仅实现 _calculate_scores/_save_results
└── NewconvIndexTask
FlowRunner (orchestration/flow_runner.py) [重命名]
├── FLOW_LAYERS: 保留快捷别名
└── _resolve_tasks(): 拓扑排序 + 纯 Registry 解析
TaskRegistry (orchestration/task_registry.py)
└── TaskMeta: + depends_on: list[str]
dws_helpers.py (tasks/dws/dws_helpers.py) [新增]
└── mask_mobile(), calc_days_since(), parse_id_list() 等
目录: apps/etl/connectors/feiqiu/ [重命名]
```
### 变更影响范围
```mermaid
graph TD
A[BaseDwsTask 重构] --> B[15 个 DWS 子类简化]
A --> C[dws_helpers.py 提取]
A --> D[FinanceBaseTask 提取]
A --> E[DwsMaintenanceTask 合并]
A --> F[MemberIndexBaseTask 模板]
G[--layers 参数] --> H[CLI main.py]
G --> I[PipelineRunner._resolve_tasks]
G --> J[去掉硬编码回退]
K[任务依赖] --> L[TaskMeta.depends_on]
K --> M[拓扑排序]
N[关键词重命名] --> O[PipelineRunner → FlowRunner]
N --> P[pipelines → connectors 路径]
N --> Q[所有文档更新]
```
## 组件与接口
### 组件 1BaseDwsTask 默认模板方法
```python
class BaseDwsTask(BaseTask):
DATE_COL: str | None = None # 子类声明日期列名
def _do_extract(self, context: TaskContext) -> list[dict]:
"""子类实现:返回从 DWD 提取的原始行列表。"""
raise NotImplementedError
def extract(self, context: TaskContext) -> dict:
"""默认实现:调用 _do_extract 并包装为标准字典。
子类可覆盖以自定义提取逻辑。
"""
rows = self._do_extract(context)
return {
"rows": rows,
"start_date": context.window_start.date(),
"end_date": context.window_end.date(),
"site_id": context.store_id,
}
def load(self, transformed, context: TaskContext) -> dict:
"""默认实现delete-before-insert 幂等写入。
子类可覆盖以自定义加载逻辑。
"""
if not transformed:
return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}
date_col = self.DATE_COL or "stat_date"
deleted = self.delete_existing_data(context, date_col=date_col)
inserted = self.bulk_insert(transformed)
return {
"counts": {"fetched": len(transformed), "inserted": inserted, "updated": 0, "skipped": 0, "errors": 0},
"extra": {"deleted": deleted},
}
```
**设计决策**
- `_do_extract()` 而非直接修改 `extract()` 签名,是为了保持向后兼容——已覆盖 `extract()` 的子类无需改动。
- `DATE_COL = None` 作为哨兵值,未声明时 load() 回退到 `"stat_date"` 默认值。
- 子类迁移是渐进式的:先在基类添加默认实现,再逐个子类迁移。
### 组件 2dws_helpers.py 公共辅助模块
```python
# tasks/dws/dws_helpers.py
def mask_mobile(phone: str | None) -> str | None:
"""手机号脱敏138****1234"""
def calc_days_since(target_date: date | None, base_date: date | None = None) -> int | None:
"""计算距今天数"""
def parse_id_list(value: Any) -> set[int]:
"""解析逗号分隔的 ID 列表字符串为 int 集合"""
def safe_division(numerator, denominator, default=Decimal("0")) -> Decimal:
"""安全除法,分母为零时返回默认值"""
```
**设计决策**:使用独立模块而非 Mixin因为这些是纯函数不依赖实例状态。
### 组件 3FinanceBaseTask 共享提取层
```python
class FinanceBaseTask(BaseDwsTask):
"""财务任务共享基类,提供公共数据提取方法。"""
def _extract_settlement_summary(self, site_id, start_date, end_date) -> list[dict]:
"""结算汇总提取(共享 SQL"""
def _extract_recharge_summary(self, site_id, start_date, end_date) -> list[dict]:
"""充值汇总提取"""
def _extract_groupbuy_summary(self, site_id, start_date, end_date) -> list[dict]:
"""团购汇总提取"""
def _extract_platform_summary(self, site_id, start_date, end_date) -> list[dict]:
"""平台结算提取"""
```
**设计决策**使用继承FinanceBaseTask而非 Mixin因为财务任务的提取方法需要访问 `self.db``self.config`,且财务任务形成清晰的子类族。
### 组件 4DwsMaintenanceTask 合并任务
```python
class DwsMaintenanceTask(BaseDwsTask):
"""合并 MV 刷新 + 数据清理为单一维护任务。"""
def get_task_code(self) -> str:
return "DWS_MAINTENANCE"
def extract(self, context): ...
def transform(self, extracted, context): ...
def load(self, transformed, context) -> dict:
stats = {"refreshed": 0, "cleaned": 0}
if self._is_mv_enabled():
stats["refreshed"] = self._refresh_all_views()
if self._is_retention_enabled():
stats["cleaned"] = self._cleanup_all_tables(context)
return {"counts": stats}
```
**设计决策**:合并后的任务内部复用原 BaseMvRefreshTask 和 DwsRetentionCleanupTask 的核心逻辑,但作为单一任务注册和调度。
### 组件 5MemberIndexBaseTask 模板方法
```python
class MemberIndexBaseTask(BaseIndexTask):
def execute(self, cursor_data=None) -> dict:
context = self._build_context(cursor_data)
site_id = self._get_site_id(context)
tenant_id = self._get_tenant_id()
params = self._load_params()
activities = self._build_member_activity(site_id, tenant_id, params)
raw_scores = self._calculate_scores(activities, params, site_id, tenant_id)
normalized = self.batch_normalize_to_display(raw_scores, ...)
result = self._save_results(normalized, site_id, tenant_id, context)
return result
def _calculate_scores(self, activities, params, site_id, tenant_id) -> dict:
raise NotImplementedError
def _save_results(self, normalized, site_id, tenant_id, context) -> dict:
raise NotImplementedError
```
### 组件 6TaskMeta 依赖声明与拓扑排序
```python
@dataclass
class TaskMeta:
task_class: type
requires_db_config: bool = True
layer: str | None = None
task_type: str = "etl"
depends_on: list[str] = field(default_factory=list) # 新增
```
拓扑排序算法Kahn's algorithm
```python
def topological_sort(task_codes: list[str], registry: TaskRegistry) -> list[str]:
"""对任务列表执行拓扑排序。
- 仅对当前执行列表内的任务排序
- depends_on 中引用的任务不在列表内时记录警告
- 检测循环依赖并抛出 ValueError
"""
in_degree = {code: 0 for code in task_codes}
graph = {code: [] for code in task_codes}
task_set = set(task_codes)
for code in task_codes:
meta = registry.get_metadata(code)
if meta and meta.depends_on:
for dep in meta.depends_on:
if dep in task_set:
graph[dep].append(code)
in_degree[code] += 1
else:
logger.warning("任务 %s 依赖 %s,但后者不在当前执行列表中", code, dep)
queue = deque(code for code in task_codes if in_degree[code] == 0)
result = []
while queue:
node = queue.popleft()
result.append(node)
for neighbor in graph[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(result) != len(task_codes):
cycle_tasks = [c for c in task_codes if c not in result]
raise ValueError(f"检测到循环依赖: {cycle_tasks}")
return result
```
### 组件 7--layers CLI 参数
```python
# cli/main.py 新增参数
parser.add_argument(
"--layers",
help="ETL 层自由组合逗号分隔ODS,DWD,DWS,INDEX",
)
# 互斥校验
if args.layers and args.pipeline:
parser.error("--layers 和 --pipeline/--flow 互斥,请只指定其中一个")
# 层解析
VALID_LAYERS = {"ODS", "DWD", "DWS", "INDEX"}
def parse_layers(raw: str) -> list[str]:
layers = [l.strip().upper() for l in raw.split(",")]
invalid = set(layers) - VALID_LAYERS
if invalid:
raise ValueError(f"无效的层名: {invalid}")
return layers
```
### 组件 8FlowRunner 重命名
重命名映射:
| 原名 | 新名 | 文件 |
|------|------|------|
| `PipelineRunner` | `FlowRunner` | `orchestration/flow_runner.py` |
| `PIPELINE_LAYERS` | `FLOW_LAYERS` | 同上 |
| `pipeline_runner.py` | `flow_runner.py` | 文件名 |
| `--pipeline` | `--flow`(保留 `--pipeline` 弃用别名) | `cli/main.py` |
| 日志中 "Pipeline" / "Flow" | 统一为 "Flow" | 全局 |
### 组件 9路径重命名 pipelines → connectors
```
apps/etl/pipelines/feiqiu/ → apps/etl/connectors/feiqiu/
```
影响范围:
- `pyproject.toml` workspace 成员声明
- 所有 `from pipelines.feiqiu...` 导入ETL 内部使用相对导入,影响较小)
- 脚本中的路径引用(`scripts/``run_etl.bat``run_etl.sh`
- 文档中的路径引用
- `.env` 中的路径配置
- CI/CD 配置(如有)
## 数据模型
### TaskMeta 扩展
```python
@dataclass
class TaskMeta:
task_class: type
requires_db_config: bool = True
layer: str | None = None
task_type: str = "etl"
depends_on: list[str] = field(default_factory=list) # 新增:依赖的任务代码列表
```
### 已知任务依赖关系
| 任务 | 依赖 | 说明 |
|------|------|------|
| `DWS_ASSISTANT_FINANCE` | `DWS_ASSISTANT_SALARY` | 财务分析需要工资计算结果 |
| `DWS_ASSISTANT_MONTHLY` | `DWS_ASSISTANT_DAILY` | 月度汇总基于日度明细 |
| `DWS_MAINTENANCE` | 所有其他 DWS 任务 | MV 刷新和清理应在数据写入后执行 |
| `DWS_WINBACK_INDEX` | `DWS_MEMBER_VISIT`, `DWS_MEMBER_CONSUMPTION` | 指数计算依赖会员行为数据 |
| `DWS_NEWCONV_INDEX` | `DWS_MEMBER_VISIT`, `DWS_MEMBER_CONSUMPTION` | 同上 |
| `DWS_RELATION_INDEX` | `DWS_ASSISTANT_DAILY` | 关系指数依赖助教服务记录 |
### DWS 子类 DATE_COL 映射
| 任务类 | DATE_COL | 目标表 |
|--------|----------|--------|
| AssistantDailyTask | `stat_date` | `dws_assistant_daily_detail` |
| AssistantMonthlyTask | `stat_month` | `dws_assistant_monthly_summary` |
| AssistantCustomerTask | `stat_date` | `dws_assistant_customer_stats` |
| AssistantSalaryTask | `salary_month` | `dws_assistant_salary_calc` |
| AssistantFinanceTask | `stat_date` | `dws_assistant_finance_analysis` |
| MemberConsumptionTask | `stat_date` | `dws_member_consumption_summary` |
| MemberVisitTask | `visit_date` | `dws_member_visit_detail` |
| FinanceDailyTask | `stat_date` | `dws_finance_daily_summary` |
| FinanceRechargeTask | `stat_date` | `dws_finance_recharge_summary` |
| FinanceIncomeStructureTask | `stat_date` | `dws_finance_income_structure` |
| FinanceDiscountDetailTask | `stat_date` | `dws_finance_discount_detail` |
## 正确性属性
*正确性属性是系统在所有合法执行中都应保持为真的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。*
### Property 1默认 extract() 返回标准结构
*对于任意* 声明了 DATE_COL 且未覆盖 extract() 的 BaseDwsTask 子类,以及任意合法的 TaskContext调用 extract(context) 应返回包含 "rows"、"start_date"、"end_date"、"site_id" 键的字典,且 "rows" 的值等于 _do_extract(context) 的返回值。
**Validates: Requirements 1.1, 1.4**
### Property 2默认 load() 幂等写入与标准统计
*对于任意* 非空的 transformed 列表和任意合法的 TaskContextBaseDwsTask 默认 load() 应返回包含 "counts" 键的字典,其中 "counts" 包含 "fetched"、"inserted"、"updated"、"skipped"、"errors" 五个整数键,且 "fetched" 等于 len(transformed)。对于空的 transformed 列表,所有计数应为 0。
**Validates: Requirements 1.2, 1.5**
### Property 3dws_helpers 函数等价性
*对于任意* 合法输入dws_helpers 模块中的 mask_mobile()、calc_days_since()、parse_id_list() 函数应产生与原子类内联实现完全相同的输出。具体地:
- *对于任意* 11 位数字字符串mask_mobile() 应返回中间 4 位被 `****` 替换的字符串
- *对于任意* 两个 date 对象target_date, base_datecalc_days_since() 应返回 (base_date - target_date).days
- *对于任意* 包含逗号分隔整数的字符串parse_id_list() 应返回对应的 int 集合
**Validates: Requirements 2.3**
### Property 4DwsMaintenanceTask 配置控制
*对于任意* mv_enabled 和 retention_enabled 的布尔组合DwsMaintenanceTask.load() 应:
- 当 mv_enabled=True 时执行物化视图刷新,否则跳过
- 当 retention_enabled=True 时执行数据清理,否则跳过
- 返回的统计字典始终包含 "refreshed" 和 "cleaned" 键
**Validates: Requirements 4.3, 4.4**
### Property 5--layers 解析正确性
*对于任意* {ODS, DWD, DWS, INDEX} 的非空子集以逗号分隔拼接为字符串后parse_layers() 应返回包含且仅包含该子集元素的列表且元素均为大写。对于包含无效层名的字符串parse_layers() 应抛出 ValueError。
**Validates: Requirements 6.1, 6.2**
### Property 6配置优先级——配置值优先于 Registry
*对于任意* 层名和任意非空的配置任务列表_resolve_tasks() 应返回配置中指定的任务列表,而非 TaskRegistry.get_tasks_by_layer() 的结果。当配置为空时,应返回 Registry 的结果。
**Validates: Requirements 7.2**
### Property 7拓扑排序正确性
*对于任意* 有向无环图DAG表示的任务依赖关系topological_sort() 返回的列表中,每个任务的所有依赖(在当前列表内的)都应排在该任务之前。
**Validates: Requirements 8.3**
### Property 8循环依赖检测
*对于任意* 包含至少一个环的有向图表示的任务依赖关系topological_sort() 应抛出 ValueError且错误信息中包含环中涉及的任务代码。
**Validates: Requirements 8.4**
## 错误处理
### BaseDwsTask 模板方法
| 场景 | 处理方式 |
|------|----------|
| 子类未实现 _do_extract() 且未覆盖 extract() | 抛出 NotImplementedError |
| 子类未声明 DATE_COL | load() 回退到 "stat_date" 默认值 |
| _do_extract() 返回 None | extract() 将 rows 设为空列表 |
| bulk_insert() 失败 | 异常向上传播,由 BaseTask.execute() 的 try/except 捕获并 rollback |
### 拓扑排序
| 场景 | 处理方式 |
|------|----------|
| 循环依赖 | 抛出 ValueError包含循环涉及的任务列表 |
| 依赖任务不在执行列表中 | 记录 WARNING 日志,继续执行 |
| 空任务列表 | 返回空列表 |
### CLI 参数
| 场景 | 处理方式 |
|------|----------|
| --layers 和 --pipeline/--flow 同时指定 | argparse 报错退出 |
| --layers 包含无效层名 | 抛出 ValueError提示有效层名 |
| --pipeline 使用已弃用参数 | 输出 DeprecationWarning正常执行 |
### 路径重命名
| 场景 | 处理方式 |
|------|----------|
| 导入路径未更新 | ImportError需在重命名脚本中全量扫描 |
| 配置文件中的旧路径 | 启动时检查并输出警告 |
## 测试策略
### 测试框架
- 单元测试:`pytest`
- 属性测试:`hypothesis`Python 的属性测试库)
- 测试工具:`apps/etl/pipelines/feiqiu/tests/unit/task_test_utils.py` 提供 FakeDB/FakeAPI
### 属性测试配置
- 每个属性测试最少运行 100 次迭代
- 使用 `@settings(max_examples=100)` 配置
- 每个属性测试用注释标注对应的设计属性编号
- 标注格式:`# Feature: etl-dws-flow-refactor, Property N: <属性标题>`
### 双轨测试方法
**属性测试**(验证普遍性质):
- Property 1-2BaseDwsTask 默认模板方法的返回值结构和行为
- Property 3dws_helpers 函数等价性
- Property 4DwsMaintenanceTask 配置控制
- Property 5--layers 解析正确性
- Property 6配置优先级
- Property 7拓扑排序正确性
- Property 8循环依赖检测
**单元测试**(验证具体示例和边界条件):
- DwsMaintenanceTask 执行顺序(先刷新后清理)
- TaskRegistry 注册项替换(旧任务移除、新任务添加)
- --pipeline 快捷别名映射
- --layers 和 --pipeline 互斥报错
- --pipeline 弃用警告
- 空 Registry 返回空列表(边界条件)
- 依赖任务不在执行列表中的警告(边界条件)
- 路径重命名后的导入正确性
### 测试文件组织
| 测试文件 | 内容 |
|----------|------|
| `tests/unit/test_base_dws_template.py` | Property 1-2 + BaseDwsTask 模板方法单元测试 |
| `tests/unit/test_dws_helpers.py` | Property 3 + dws_helpers 函数单元测试 |
| `tests/unit/test_maintenance_task.py` | Property 4 + DwsMaintenanceTask 单元测试 |
| `tests/unit/test_layers_cli.py` | Property 5 + --layers CLI 参数单元测试 |
| `tests/unit/test_resolve_tasks.py` | Property 6 + _resolve_tasks 配置优先级单元测试 |
| `tests/unit/test_topological_sort.py` | Property 7-8 + 拓扑排序单元测试 |
| `tests/unit/test_flow_rename.py` | FlowRunner 重命名相关单元测试 |
| `tests/test_etl_refactor_properties.py` | Monorepo 级属性测试(根目录) |