12 KiB
需求文档:ETL DWS/Flow 重构
简介
对 NeoZQYY Monorepo 的飞球 ETL 连接器进行大型重构,涵盖四个主要方向:
- BaseDwsTask 模板方法重构——消除 DWS 子类中的样板代码
--layersCLI 参数替代固定 pipeline 名称——提升用户体验- 任务依赖声明与拓扑排序——消除隐式依赖风险
- 关键词重命名(pipeline → flow、pipelines → connectors)——统一术语与路径
执行顺序严格按 1→2→3→4→收尾,每一步完成后进行回归测试。
术语表
- BaseDwsTask:DWS 层任务基类,位于
tasks/dws/base_dws_task.py,提供 DWD 数据读取、幂等写入、配置缓存等通用能力 - BaseIndexTask:INDEX 层指数算法基类,继承 BaseDwsTask,位于
tasks/dws/index/base_index_task.py - MemberIndexBaseTask:会员指数共享基类,继承 BaseIndexTask,位于
tasks/dws/index/member_index_base.py - TaskRegistry:任务注册表,维护 task_code → TaskMeta 映射,位于
orchestration/task_registry.py - TaskMeta:任务元数据数据类,包含 task_class、requires_db_config、layer、task_type 字段
- PipelineRunner:Flow 编排器,根据 Flow 定义执行多层 ETL 任务,位于
orchestration/pipeline_runner.py - TaskExecutor:单任务执行器,管理游标、运行记录和任务生命周期,位于
orchestration/task_executor.py - Flow:ETL 编排单元,定义一组按层顺序执行的任务集合(原名 pipeline)
- Layer:ETL 数据处理层级,包括 ODS、DWD、DWS、INDEX
- Connector:ETL 连接器,对接特定上游 SaaS 的数据抽取模块(原名 pipeline 目录)
- DATE_COL:DWS 子类声明的日期列名,用于 extract 和 delete_existing_data 的时间过滤
- TaskContext:运行期上下文数据类,包含 store_id、window_start/end、window_minutes、cursor
- 拓扑排序:根据任务间依赖关系确定执行顺序的算法,确保被依赖任务先于依赖方执行
- 幂等:同一操作执行多次与执行一次效果相同,本系统通过 delete-before-insert 实现
需求
需求 1:BaseDwsTask 默认 extract/load 模板方法
用户故事: 作为 ETL 开发者,我希望 BaseDwsTask 提供默认的 extract() 和 load() 实现,以便 DWS 子类只需声明 DATE_COL 并实现 _do_extract() 和 transform(),从而减少每个子类 20-30 行样板代码。
验收标准
- WHEN 一个 DWS 子类声明了 DATE_COL 类属性且未覆盖 extract(),THE BaseDwsTask SHALL 使用 DATE_COL 从 DWD 层按时间窗口提取数据并传递给 transform()
- WHEN 一个 DWS 子类声明了 DATE_COL 类属性且未覆盖 load(),THE BaseDwsTask SHALL 执行 delete_existing_data(date_col=DATE_COL) 后调用 bulk_insert(),并返回标准统计字典
- WHEN 一个 DWS 子类覆盖了 extract() 或 load(),THE BaseDwsTask SHALL 使用子类的覆盖实现而非默认实现
- WHEN 默认 extract() 执行时,THE BaseDwsTask SHALL 调用子类实现的 _do_extract(context) 方法获取原始数据
- THE BaseDwsTask 默认 load() SHALL 返回包含 fetched、inserted、updated、skipped、errors 键的统计字典
需求 2:DWS 公共辅助方法提取
用户故事: 作为 ETL 开发者,我希望将散落在多个 DWS 子类中的重复辅助方法提取到公共位置,以便消除代码重复并统一行为。
验收标准
- THE dws_helpers 模块 SHALL 提供 _mask_mobile()、_calc_days_since()、_parse_id_list() 等公共辅助函数
- WHEN 多个 DWS 子类使用相同的辅助逻辑时,THE 子类 SHALL 调用 dws_helpers 中的公共实现而非各自维护副本
- WHEN dws_helpers 中的辅助函数被调用时,THE 函数 SHALL 产生与原子类内联实现完全相同的输出结果
需求 3:财务任务共享提取层
用户故事: 作为 ETL 开发者,我希望财务类 DWS 任务(FinanceDailyTask、FinanceRechargeTask、FinanceIncomeStructureTask、FinanceDiscountDetailTask)共享数据提取逻辑,以便减少重复的 SQL 查询和数据获取代码。
验收标准
- THE FinanceExtractMixin 或 FinanceBaseTask SHALL 提供财务任务共用的数据提取方法(结算汇总、充值汇总、团购汇总等)
- WHEN 财务类 DWS 子类执行 extract() 时,THE 子类 SHALL 通过共享提取层获取公共数据,仅补充各自特有的提取逻辑
- WHEN 共享提取层返回数据时,THE 数据 SHALL 与原各子类独立提取的结果在数值精度和字段结构上完全一致
需求 4:MV 刷新与数据清理任务合并
用户故事: 作为 ETL 运维人员,我希望将 DWS_MV_REFRESH_FINANCE_DAILY、DWS_MV_REFRESH_ASSISTANT_DAILY 和 DWS_RETENTION_CLEANUP 三个任务合并为一个统一的维护任务,以便简化调度配置和减少任务数量。
验收标准
- THE 合并后的 DWS_MAINTENANCE 任务 SHALL 在单次执行中完成物化视图刷新和历史数据清理
- WHEN DWS_MAINTENANCE 任务执行时,THE 任务 SHALL 先执行物化视图刷新,再执行数据清理
- WHEN 物化视图刷新或数据清理功能被配置为禁用时,THE DWS_MAINTENANCE 任务 SHALL 跳过对应步骤并记录日志
- WHEN DWS_MAINTENANCE 任务完成时,THE 任务 SHALL 返回包含刷新视图数和清理行数的统计信息
- THE TaskRegistry SHALL 移除原 DWS_MV_REFRESH_FINANCE_DAILY、DWS_MV_REFRESH_ASSISTANT_DAILY、DWS_RETENTION_CLEANUP 三个注册项,替换为 DWS_MAINTENANCE
需求 5:MemberIndexBaseTask 模板方法
用户故事: 作为 ETL 开发者,我希望 MemberIndexBaseTask 提供模板方法 execute(),以便子类(WinbackIndexTask、NewconvIndexTask)只需实现 _calculate_scores() 和 _save_results(),减少重复的编排代码。
验收标准
- THE MemberIndexBaseTask SHALL 提供 execute() 模板方法,按顺序执行:获取站点信息 → 加载参数 → 构建会员活动数据 → 调用 _calculate_scores() → 归一化 → 调用 _save_results()
- WHEN 子类实现 _calculate_scores(member_activities, params) 时,THE 方法 SHALL 接收会员活动数据和参数字典,返回原始评分字典
- WHEN 子类实现 _save_results(normalized_scores, context) 时,THE 方法 SHALL 接收归一化后的评分和上下文,完成数据持久化
- WHEN MemberIndexBaseTask 的 execute() 执行完成时,THE 方法 SHALL 返回与原子类 execute() 相同结构的结果字典
需求 6:--layers CLI 参数
用户故事: 作为 ETL 运维人员,我希望使用 --layers ODS,DWD,DWS,INDEX 的自由组合方式替代固定的 pipeline 名称,以便更灵活地控制 ETL 执行范围。
验收标准
- WHEN 用户指定
--layers ODS,DWD时,THE CLI SHALL 解析为 ["ODS", "DWD"] 层列表并按顺序执行对应任务 - WHEN 用户指定
--layers参数时,THE CLI SHALL 接受 ODS、DWD、DWS、INDEX 四个层的任意组合 - THE CLI SHALL 保留
--pipeline参数作为快捷别名(如--pipeline api_full等价于--layers ODS,DWD,DWS,INDEX) - WHEN 用户同时指定
--layers和--pipeline时,THE CLI SHALL 报错并提示两者互斥 - WHEN
--layers包含 DWS 或 INDEX 层时,THE PipelineRunner SHALL 跳过完整性校验或仅执行轻量级行数校验
需求 7:统一层→任务解析
用户故事: 作为 ETL 开发者,我希望去掉 _resolve_tasks() 中的硬编码回退列表,统一走 TaskRegistry.get_tasks_by_layer() 获取任务,以便新增任务时只需在 TaskRegistry 注册即可自动纳入 Flow。
验收标准
- THE PipelineRunner._resolve_tasks() SHALL 仅通过 TaskRegistry.get_tasks_by_layer() 获取各层任务列表,移除所有硬编码回退列表
- WHEN 配置中指定了 run.ods_tasks / run.dws_tasks / run.index_tasks 时,THE _resolve_tasks() SHALL 优先使用配置值
- WHEN TaskRegistry.get_tasks_by_layer() 返回空列表且无配置覆盖时,THE _resolve_tasks() SHALL 记录警告日志并返回空列表
需求 8:任务依赖声明
用户故事: 作为 ETL 开发者,我希望在 TaskMeta 中声明任务间的依赖关系,以便系统自动进行拓扑排序,消除隐式依赖导致的执行顺序错误。
验收标准
- THE TaskMeta 数据类 SHALL 包含 depends_on: list[str] 字段,默认为空列表
- WHEN 注册任务时指定 depends_on 时,THE TaskRegistry SHALL 存储依赖关系
- WHEN _resolve_tasks() 生成任务列表时,THE PipelineRunner SHALL 对任务列表执行拓扑排序,确保被依赖任务排在依赖方之前
- WHEN 任务依赖关系中存在循环依赖时,THE 拓扑排序 SHALL 抛出明确的错误信息,指出循环涉及的任务
- WHEN 任务 A 声明 depends_on 包含任务 B,且任务 B 不在当前执行列表中时,THE 拓扑排序 SHALL 记录警告日志但继续执行
需求 9:关键词重命名 pipeline → flow
用户故事: 作为 ETL 开发者,我希望将代码中所有 "pipeline" 相关术语统一为 "flow",以便术语一致性和代码可读性。
验收标准
- THE PipelineRunner 类 SHALL 重命名为 FlowRunner
- THE PIPELINE_LAYERS 常量 SHALL 重命名为 FLOW_LAYERS
- THE CLI 参数
--pipelineSHALL 重命名为--flow,同时保留--pipeline作为已弃用别名 - WHEN 用户使用已弃用的
--pipeline参数时,THE CLI SHALL 输出弃用警告并正常执行 - THE 代码中所有 pipeline_runner 模块名 SHALL 重命名为 flow_runner
- THE 所有日志消息、注释和文档中的 "pipeline" 术语 SHALL 替换为 "flow"
需求 10:路径重命名 pipelines → connectors
用户故事: 作为 ETL 开发者,我希望将 apps/etl/pipelines 目录重命名为 apps/etl/connectors,以便目录名准确反映其"连接器"语义。
验收标准
- THE 目录
apps/etl/pipelines/SHALL 重命名为apps/etl/connectors/ - WHEN 路径重命名完成后,THE 所有 Python 导入路径 SHALL 更新为使用新路径
- WHEN 路径重命名完成后,THE 所有配置文件、脚本和文档中的旧路径引用 SHALL 更新为新路径
- WHEN 路径重命名完成后,THE pyproject.toml 中的 workspace 成员声明 SHALL 更新为新路径
- WHEN 路径重命名完成后,THE 所有测试 SHALL 通过且无导入错误
需求 11:回归测试与数据验证
用户故事: 作为 ETL 运维人员,我希望重构后的系统通过完整的回归测试,以便确保数据处理无错误和偏移。
验收标准
- WHEN BaseDwsTask 模板方法重构完成后,THE 所有现有 DWS 单元测试 SHALL 通过且无失败
- WHEN --layers 参数实现完成后,THE CLI 参数解析测试 SHALL 覆盖所有合法和非法的层组合
- WHEN 任务依赖声明实现完成后,THE 拓扑排序测试 SHALL 覆盖正常依赖、循环依赖和缺失依赖场景
- WHEN 关键词和路径重命名完成后,THE 所有现有测试 SHALL 通过且无导入错误
- WHEN 整体重构完成后,THE 系统 SHALL 通过端到端 dry-run 测试,验证 ODS→DWD→DWS→INDEX 全链路无异常
需求 12:文档同步更新
用户故事: 作为 ETL 开发者,我希望所有相关文档在重构后同步更新,以便文档与代码保持一致。
验收标准
- WHEN 重构完成后,THE
docs/etl-feiqiu-architecture.mdSHALL 反映所有类名、方法名和术语变更 - WHEN 重构完成后,THE
apps/etl/pipelines/feiqiu/docs/下的所有文档 SHALL 更新路径引用和术语 - WHEN 重构完成后,THE CLI 帮助文本和示例 SHALL 反映新的
--layers和--flow参数 - WHEN 重构完成后,THE
tasks/README.mdSHALL 更新任务列表和继承关系说明