Files

20 KiB
Raw Permalink Blame History

设计文档ETL DWS/Flow 重构

概述

本设计覆盖飞球 ETL 连接器的四阶段重构:

  1. BaseDwsTask 模板方法重构:在基类中提供默认 extract()/load(),子类通过声明 DATE_COL + 实现 _do_extract() 即可完成大部分工作;提取公共辅助方法到 dws_helpers.py;财务任务共享提取层;合并 MV 刷新 + 数据清理任务MemberIndexBaseTask 模板方法。
  2. --layers CLI 参数:新增 --layers ODS,DWD,DWS,INDEX 自由组合参数,保留 --pipeline 作为快捷别名;去掉硬编码回退,统一走 TaskRegistry.get_tasks_by_layer()
  3. 任务依赖声明:在 TaskMeta 中增加 depends_on 字段,_resolve_tasks() 执行拓扑排序。
  4. 关键词重命名pipeline → flow类名、变量名、CLI 参数、日志);pipelines → connectors(目录路径)。

执行顺序严格按 1→2→3→4→收尾每阶段完成后运行回归测试。

架构

当前架构

BaseTask (tasks/base_task.py)
  └── BaseDwsTask (tasks/dws/base_dws_task.py)
        ├── 15 个 DWS 子类(各自实现 extract/transform/load
        └── BaseIndexTask (tasks/dws/index/base_index_task.py)
              └── MemberIndexBaseTask (tasks/dws/index/member_index_base.py)
                    ├── WinbackIndexTask
                    └── NewconvIndexTask

PipelineRunner (orchestration/pipeline_runner.py)
  ├── PIPELINE_LAYERS: 7 种固定 Flow 定义
  └── _resolve_tasks(): 层→任务映射(含硬编码回退)

TaskRegistry (orchestration/task_registry.py)
  └── TaskMeta: task_class, requires_db_config, layer, task_type

目标架构

BaseTask (tasks/base_task.py)  [不变]
  └── BaseDwsTask (tasks/dws/base_dws_task.py)  [新增默认 extract/load]
        ├── DWS 子类(仅声明 DATE_COL + 实现 _do_extract/transform
        ├── FinanceBaseTask (tasks/dws/finance_base_task.py)  [新增]
        │     ├── FinanceDailyTask
        │     ├── FinanceRechargeTask
        │     ├── FinanceIncomeStructureTask
        │     └── FinanceDiscountDetailTask
        ├── DwsMaintenanceTask (tasks/dws/maintenance_task.py)  [合并]
        └── BaseIndexTask
              └── MemberIndexBaseTask  [新增模板 execute]
                    ├── WinbackIndexTask仅实现 _calculate_scores/_save_results
                    └── NewconvIndexTask

FlowRunner (orchestration/flow_runner.py)  [重命名]
  ├── FLOW_LAYERS: 保留快捷别名
  └── _resolve_tasks(): 拓扑排序 + 纯 Registry 解析

TaskRegistry (orchestration/task_registry.py)
  └── TaskMeta: + depends_on: list[str]

dws_helpers.py (tasks/dws/dws_helpers.py)  [新增]
  └── mask_mobile(), calc_days_since(), parse_id_list() 等

目录: apps/etl/connectors/feiqiu/  [重命名]

变更影响范围

graph TD
    A[BaseDwsTask 重构] --> B[15 个 DWS 子类简化]
    A --> C[dws_helpers.py 提取]
    A --> D[FinanceBaseTask 提取]
    A --> E[DwsMaintenanceTask 合并]
    A --> F[MemberIndexBaseTask 模板]
    G[--layers 参数] --> H[CLI main.py]
    G --> I[PipelineRunner._resolve_tasks]
    G --> J[去掉硬编码回退]
    K[任务依赖] --> L[TaskMeta.depends_on]
    K --> M[拓扑排序]
    N[关键词重命名] --> O[PipelineRunner → FlowRunner]
    N --> P[pipelines → connectors 路径]
    N --> Q[所有文档更新]

组件与接口

组件 1BaseDwsTask 默认模板方法

class BaseDwsTask(BaseTask):
    DATE_COL: str | None = None  # 子类声明日期列名

    def _do_extract(self, context: TaskContext) -> list[dict]:
        """子类实现:返回从 DWD 提取的原始行列表。"""
        raise NotImplementedError

    def extract(self, context: TaskContext) -> dict:
        """默认实现:调用 _do_extract 并包装为标准字典。
        子类可覆盖以自定义提取逻辑。
        """
        rows = self._do_extract(context)
        return {
            "rows": rows,
            "start_date": context.window_start.date(),
            "end_date": context.window_end.date(),
            "site_id": context.store_id,
        }

    def load(self, transformed, context: TaskContext) -> dict:
        """默认实现delete-before-insert 幂等写入。
        子类可覆盖以自定义加载逻辑。
        """
        if not transformed:
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}
        date_col = self.DATE_COL or "stat_date"
        deleted = self.delete_existing_data(context, date_col=date_col)
        inserted = self.bulk_insert(transformed)
        return {
            "counts": {"fetched": len(transformed), "inserted": inserted, "updated": 0, "skipped": 0, "errors": 0},
            "extra": {"deleted": deleted},
        }

设计决策

  • _do_extract() 而非直接修改 extract() 签名,是为了保持向后兼容——已覆盖 extract() 的子类无需改动。
  • DATE_COL = None 作为哨兵值,未声明时 load() 回退到 "stat_date" 默认值。
  • 子类迁移是渐进式的:先在基类添加默认实现,再逐个子类迁移。
  • 营业日切点:所有 stat_date / stat_month 等日期列的值为营业日,以 BUSINESS_DAY_START_HOUR(默认 08:00为分割点。08:00 前的记录归属前一天/前一月。月统计 = 当月1日 08:00 ~ 次月1日 08:00周统计 = 周一 08:00 ~ 次周一 08:00。

组件 2dws_helpers.py 公共辅助模块

# tasks/dws/dws_helpers.py

def mask_mobile(phone: str | None) -> str | None:
    """手机号脱敏138****1234"""

def calc_days_since(target_date: date | None, base_date: date | None = None) -> int | None:
    """计算距今天数"""

def parse_id_list(value: Any) -> set[int]:
    """解析逗号分隔的 ID 列表字符串为 int 集合"""

def safe_division(numerator, denominator, default=Decimal("0")) -> Decimal:
    """安全除法,分母为零时返回默认值"""

设计决策:使用独立模块而非 Mixin因为这些是纯函数不依赖实例状态。

组件 3FinanceBaseTask 共享提取层

class FinanceBaseTask(BaseDwsTask):
    """财务任务共享基类,提供公共数据提取方法。"""

    def _extract_settlement_summary(self, site_id, start_date, end_date) -> list[dict]:
        """结算汇总提取(共享 SQL"""

    def _extract_recharge_summary(self, site_id, start_date, end_date) -> list[dict]:
        """充值汇总提取"""

    def _extract_groupbuy_summary(self, site_id, start_date, end_date) -> list[dict]:
        """团购汇总提取"""

    def _extract_platform_summary(self, site_id, start_date, end_date) -> list[dict]:
        """平台结算提取"""

设计决策使用继承FinanceBaseTask而非 Mixin因为财务任务的提取方法需要访问 self.dbself.config,且财务任务形成清晰的子类族。

组件 4DwsMaintenanceTask 合并任务

class DwsMaintenanceTask(BaseDwsTask):
    """合并 MV 刷新 + 数据清理为单一维护任务。"""

    def get_task_code(self) -> str:
        return "DWS_MAINTENANCE"

    def extract(self, context): ...
    def transform(self, extracted, context): ...

    def load(self, transformed, context) -> dict:
        stats = {"refreshed": 0, "cleaned": 0}
        if self._is_mv_enabled():
            stats["refreshed"] = self._refresh_all_views()
        if self._is_retention_enabled():
            stats["cleaned"] = self._cleanup_all_tables(context)
        return {"counts": stats}

设计决策:合并后的任务内部复用原 BaseMvRefreshTask 和 DwsRetentionCleanupTask 的核心逻辑,但作为单一任务注册和调度。

组件 5MemberIndexBaseTask 模板方法

class MemberIndexBaseTask(BaseIndexTask):
    def execute(self, cursor_data=None) -> dict:
        context = self._build_context(cursor_data)
        site_id = self._get_site_id(context)
        tenant_id = self._get_tenant_id()
        params = self._load_params()
        activities = self._build_member_activity(site_id, tenant_id, params)
        raw_scores = self._calculate_scores(activities, params, site_id, tenant_id)
        normalized = self.batch_normalize_to_display(raw_scores, ...)
        result = self._save_results(normalized, site_id, tenant_id, context)
        return result

    def _calculate_scores(self, activities, params, site_id, tenant_id) -> dict:
        raise NotImplementedError

    def _save_results(self, normalized, site_id, tenant_id, context) -> dict:
        raise NotImplementedError

组件 6TaskMeta 依赖声明与拓扑排序

@dataclass
class TaskMeta:
    task_class: type
    requires_db_config: bool = True
    layer: str | None = None
    task_type: str = "etl"
    depends_on: list[str] = field(default_factory=list)  # 新增

拓扑排序算法Kahn's algorithm

def topological_sort(task_codes: list[str], registry: TaskRegistry) -> list[str]:
    """对任务列表执行拓扑排序。
    
    - 仅对当前执行列表内的任务排序
    - depends_on 中引用的任务不在列表内时记录警告
    - 检测循环依赖并抛出 ValueError
    """
    in_degree = {code: 0 for code in task_codes}
    graph = {code: [] for code in task_codes}
    task_set = set(task_codes)

    for code in task_codes:
        meta = registry.get_metadata(code)
        if meta and meta.depends_on:
            for dep in meta.depends_on:
                if dep in task_set:
                    graph[dep].append(code)
                    in_degree[code] += 1
                else:
                    logger.warning("任务 %s 依赖 %s,但后者不在当前执行列表中", code, dep)

    queue = deque(code for code in task_codes if in_degree[code] == 0)
    result = []
    while queue:
        node = queue.popleft()
        result.append(node)
        for neighbor in graph[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    if len(result) != len(task_codes):
        cycle_tasks = [c for c in task_codes if c not in result]
        raise ValueError(f"检测到循环依赖: {cycle_tasks}")

    return result

组件 7--layers CLI 参数

# cli/main.py 新增参数
parser.add_argument(
    "--layers",
    help="ETL 层自由组合逗号分隔ODS,DWD,DWS,INDEX",
)

# 互斥校验
if args.layers and args.pipeline:
    parser.error("--layers 和 --pipeline/--flow 互斥,请只指定其中一个")

# 层解析
VALID_LAYERS = {"ODS", "DWD", "DWS", "INDEX"}
def parse_layers(raw: str) -> list[str]:
    layers = [l.strip().upper() for l in raw.split(",")]
    invalid = set(layers) - VALID_LAYERS
    if invalid:
        raise ValueError(f"无效的层名: {invalid}")
    return layers

组件 8FlowRunner 重命名

重命名映射:

原名 新名 文件
PipelineRunner FlowRunner orchestration/flow_runner.py
PIPELINE_LAYERS FLOW_LAYERS 同上
pipeline_runner.py flow_runner.py 文件名
--pipeline --flow(保留 --pipeline 弃用别名) cli/main.py
日志中 "Pipeline" / "Flow" 统一为 "Flow" 全局

组件 9路径重命名 pipelines → connectors

apps/etl/pipelines/feiqiu/  →  apps/etl/connectors/feiqiu/

影响范围:

  • pyproject.toml workspace 成员声明
  • 所有 from pipelines.feiqiu... 导入ETL 内部使用相对导入,影响较小)
  • 脚本中的路径引用(scripts/run_etl.batrun_etl.sh
  • 文档中的路径引用
  • .env 中的路径配置
  • CI/CD 配置(如有)

数据模型

TaskMeta 扩展

@dataclass
class TaskMeta:
    task_class: type
    requires_db_config: bool = True
    layer: str | None = None
    task_type: str = "etl"
    depends_on: list[str] = field(default_factory=list)  # 新增:依赖的任务代码列表

已知任务依赖关系

任务 依赖 说明
DWS_ASSISTANT_FINANCE DWS_ASSISTANT_SALARY 财务分析需要工资计算结果
DWS_ASSISTANT_MONTHLY DWS_ASSISTANT_DAILY 月度汇总基于日度明细
DWS_MAINTENANCE 所有其他 DWS 任务 MV 刷新和清理应在数据写入后执行
DWS_WINBACK_INDEX DWS_MEMBER_VISIT, DWS_MEMBER_CONSUMPTION 指数计算依赖会员行为数据
DWS_NEWCONV_INDEX DWS_MEMBER_VISIT, DWS_MEMBER_CONSUMPTION 同上
DWS_RELATION_INDEX DWS_ASSISTANT_DAILY 关系指数依赖助教服务记录

DWS 子类 DATE_COL 映射

任务类 DATE_COL 目标表
AssistantDailyTask stat_date dws_assistant_daily_detail
AssistantMonthlyTask stat_month dws_assistant_monthly_summary
AssistantCustomerTask stat_date dws_assistant_customer_stats
AssistantSalaryTask salary_month dws_assistant_salary_calc
AssistantFinanceTask stat_date dws_assistant_finance_analysis
MemberConsumptionTask stat_date dws_member_consumption_summary
MemberVisitTask visit_date dws_member_visit_detail
FinanceDailyTask stat_date dws_finance_daily_summary
FinanceRechargeTask stat_date dws_finance_recharge_summary
FinanceIncomeStructureTask stat_date dws_finance_income_structure
FinanceDiscountDetailTask stat_date dws_finance_discount_detail

正确性属性

正确性属性是系统在所有合法执行中都应保持为真的特征或行为——本质上是关于系统应该做什么的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。

Property 1默认 extract() 返回标准结构

对于任意 声明了 DATE_COL 且未覆盖 extract() 的 BaseDwsTask 子类,以及任意合法的 TaskContext调用 extract(context) 应返回包含 "rows"、"start_date"、"end_date"、"site_id" 键的字典,且 "rows" 的值等于 _do_extract(context) 的返回值。

Validates: Requirements 1.1, 1.4

Property 2默认 load() 幂等写入与标准统计

对于任意 非空的 transformed 列表和任意合法的 TaskContextBaseDwsTask 默认 load() 应返回包含 "counts" 键的字典,其中 "counts" 包含 "fetched"、"inserted"、"updated"、"skipped"、"errors" 五个整数键,且 "fetched" 等于 len(transformed)。对于空的 transformed 列表,所有计数应为 0。

Validates: Requirements 1.2, 1.5

Property 3dws_helpers 函数等价性

对于任意 合法输入dws_helpers 模块中的 mask_mobile()、calc_days_since()、parse_id_list() 函数应产生与原子类内联实现完全相同的输出。具体地:

  • 对于任意 11 位数字字符串mask_mobile() 应返回中间 4 位被 **** 替换的字符串
  • 对于任意 两个 date 对象target_date, base_datecalc_days_since() 应返回 (base_date - target_date).days
  • 对于任意 包含逗号分隔整数的字符串parse_id_list() 应返回对应的 int 集合

Validates: Requirements 2.3

Property 4DwsMaintenanceTask 配置控制

对于任意 mv_enabled 和 retention_enabled 的布尔组合DwsMaintenanceTask.load() 应:

  • 当 mv_enabled=True 时执行物化视图刷新,否则跳过
  • 当 retention_enabled=True 时执行数据清理,否则跳过
  • 返回的统计字典始终包含 "refreshed" 和 "cleaned" 键

Validates: Requirements 4.3, 4.4

Property 5--layers 解析正确性

对于任意 {ODS, DWD, DWS, INDEX} 的非空子集以逗号分隔拼接为字符串后parse_layers() 应返回包含且仅包含该子集元素的列表且元素均为大写。对于包含无效层名的字符串parse_layers() 应抛出 ValueError。

Validates: Requirements 6.1, 6.2

Property 6配置优先级——配置值优先于 Registry

对于任意 层名和任意非空的配置任务列表_resolve_tasks() 应返回配置中指定的任务列表,而非 TaskRegistry.get_tasks_by_layer() 的结果。当配置为空时,应返回 Registry 的结果。

Validates: Requirements 7.2

Property 7拓扑排序正确性

对于任意 有向无环图DAG表示的任务依赖关系topological_sort() 返回的列表中,每个任务的所有依赖(在当前列表内的)都应排在该任务之前。

Validates: Requirements 8.3

Property 8循环依赖检测

对于任意 包含至少一个环的有向图表示的任务依赖关系topological_sort() 应抛出 ValueError且错误信息中包含环中涉及的任务代码。

Validates: Requirements 8.4

错误处理

BaseDwsTask 模板方法

场景 处理方式
子类未实现 _do_extract() 且未覆盖 extract() 抛出 NotImplementedError
子类未声明 DATE_COL load() 回退到 "stat_date" 默认值
_do_extract() 返回 None extract() 将 rows 设为空列表
bulk_insert() 失败 异常向上传播,由 BaseTask.execute() 的 try/except 捕获并 rollback

拓扑排序

场景 处理方式
循环依赖 抛出 ValueError包含循环涉及的任务列表
依赖任务不在执行列表中 记录 WARNING 日志,继续执行
空任务列表 返回空列表

CLI 参数

场景 处理方式
--layers 和 --pipeline/--flow 同时指定 argparse 报错退出
--layers 包含无效层名 抛出 ValueError提示有效层名
--pipeline 使用已弃用参数 输出 DeprecationWarning正常执行

路径重命名

场景 处理方式
导入路径未更新 ImportError需在重命名脚本中全量扫描
配置文件中的旧路径 启动时检查并输出警告

测试策略

测试框架

  • 单元测试:pytest
  • 属性测试:hypothesisPython 的属性测试库)
  • 测试工具:apps/etl/pipelines/feiqiu/tests/unit/task_test_utils.py 提供 FakeDB/FakeAPI

属性测试配置

  • 每个属性测试最少运行 100 次迭代
  • 使用 @settings(max_examples=100) 配置
  • 每个属性测试用注释标注对应的设计属性编号
  • 标注格式:# Feature: etl-dws-flow-refactor, Property N: <属性标题>

双轨测试方法

属性测试(验证普遍性质):

  • Property 1-2BaseDwsTask 默认模板方法的返回值结构和行为
  • Property 3dws_helpers 函数等价性
  • Property 4DwsMaintenanceTask 配置控制
  • Property 5--layers 解析正确性
  • Property 6配置优先级
  • Property 7拓扑排序正确性
  • Property 8循环依赖检测

单元测试(验证具体示例和边界条件):

  • DwsMaintenanceTask 执行顺序(先刷新后清理)
  • TaskRegistry 注册项替换(旧任务移除、新任务添加)
  • --pipeline 快捷别名映射
  • --layers 和 --pipeline 互斥报错
  • --pipeline 弃用警告
  • 空 Registry 返回空列表(边界条件)
  • 依赖任务不在执行列表中的警告(边界条件)
  • 路径重命名后的导入正确性

测试文件组织

测试文件 内容
tests/unit/test_base_dws_template.py Property 1-2 + BaseDwsTask 模板方法单元测试
tests/unit/test_dws_helpers.py Property 3 + dws_helpers 函数单元测试
tests/unit/test_maintenance_task.py Property 4 + DwsMaintenanceTask 单元测试
tests/unit/test_layers_cli.py Property 5 + --layers CLI 参数单元测试
tests/unit/test_resolve_tasks.py Property 6 + _resolve_tasks 配置优先级单元测试
tests/unit/test_topological_sort.py Property 7-8 + 拓扑排序单元测试
tests/unit/test_flow_rename.py FlowRunner 重命名相关单元测试
tests/test_etl_refactor_properties.py Monorepo 级属性测试(根目录)