Files
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

8.9 KiB
Raw Permalink Blame History

实施计划ETL DWS/Flow 重构

概述

按 4 个阶段顺序实施BaseDwsTask 模板方法重构 → --layers CLI 参数 → 任务依赖声明 → 关键词/路径重命名。每个阶段完成后运行回归测试。

任务

  • 1. BaseDwsTask 默认模板方法

    • 1.1 在 BaseDwsTask 中添加 DATE_COL 类属性和默认 extract()/load() 实现

      • 添加 DATE_COL: str | None = None 类属性
      • 添加 _do_extract(self, context) -> list[dict] 抽象方法raise NotImplementedError
      • 实现默认 extract():调用 _do_extract() 并包装为标准字典
      • 实现默认 load()delete_existing_data + bulk_insert返回标准统计字典
      • Requirements: 1.1, 1.2, 1.4, 1.5
    • 1.2 编写 BaseDwsTask 默认模板方法的属性测试

      • Property 1: 默认 extract() 返回标准结构
      • Validates: Requirements 1.1, 1.4
      • Property 2: 默认 load() 幂等写入与标准统计
      • Validates: Requirements 1.2, 1.5
    • 1.3 迁移 DWS 子类使用默认模板方法

      • 为每个 DWS 子类声明 DATE_COL
      • 将各子类的 extract() 逻辑迁移到 _do_extract()
      • 移除与默认 load() 行为一致的子类 load() 覆盖
      • 保留有自定义逻辑的子类覆盖(如 AssistantSalaryTask 的月度删除)
      • 涉及文件assistant_daily_task.py, assistant_monthly_task.py, assistant_customer_task.py, assistant_finance_task.py, member_consumption_task.py, member_visit_task.py, finance_daily_task.py, finance_recharge_task.py, finance_income_task.py, finance_discount_task.py
      • Requirements: 1.1, 1.2, 1.3
    • 1.4 运行现有 DWS 单元测试确认无回归

      • cd apps/etl/pipelines/feiqiu && pytest tests/unit/test_dws_tasks.py -v
      • Requirements: 11.1
  • 2. 公共辅助方法提取与财务基类

    • 2.1 创建 dws_helpers.py 公共辅助模块

      • 创建 tasks/dws/dws_helpers.py
      • 提取 mask_mobile()、calc_days_since()、parse_id_list()、safe_division() 等函数
      • 更新各 DWS 子类的导入,替换内联实现为 dws_helpers 调用
      • Requirements: 2.1, 2.2
    • 2.2 编写 dws_helpers 函数等价性属性测试

      • Property 3: dws_helpers 函数等价性
      • Validates: Requirements 2.3
    • 2.3 创建 FinanceBaseTask 共享提取层

      • 创建 tasks/dws/finance_base_task.py
      • 从 FinanceDailyTask 提取共享方法_extract_settlement_summary, _extract_recharge_summary, _extract_groupbuy_summary, _extract_platform_summary
      • 迁移 FinanceDailyTask, FinanceRechargeTask, FinanceIncomeStructureTask, FinanceDiscountDetailTask 继承 FinanceBaseTask
      • Requirements: 3.1, 3.2, 3.3
  • 3. MV 刷新与数据清理合并 + MemberIndexBaseTask 模板

    • 3.1 创建 DwsMaintenanceTask 合并任务

      • 创建 tasks/dws/maintenance_task.py
      • 合并 BaseMvRefreshTask 和 DwsRetentionCleanupTask 的核心逻辑
      • 在 TaskRegistry 中注册 DWS_MAINTENANCE移除原三个任务注册
      • 更新 tasks/dws/__init__.py 导出
      • Requirements: 4.1, 4.2, 4.3, 4.4, 4.5
    • 3.2 编写 DwsMaintenanceTask 属性测试和单元测试

      • Property 4: DwsMaintenanceTask 配置控制
      • Validates: Requirements 4.3, 4.4
      • 单元测试:执行顺序(先刷新后清理)、注册项替换
    • 3.3 重构 MemberIndexBaseTask 模板方法

      • 在 MemberIndexBaseTask 中实现 execute() 模板方法
      • 添加 _calculate_scores() 和 _save_results() 抽象方法
      • 迁移 WinbackIndexTask 和 NewconvIndexTask 使用新模板
      • Requirements: 5.1, 5.2, 5.3, 5.4
  • 4. 检查点 - 阶段 1 回归测试

    • 运行 cd apps/etl/pipelines/feiqiu && pytest tests/unit -v 确保所有测试通过
    • 确保所有测试通过,如有问题请询问用户
  • 5. --layers CLI 参数与统一层解析

    • 5.1 实现 --layers CLI 参数

      • 在 cli/main.py 中添加 --layers 参数
      • 实现 parse_layers() 函数:解析逗号分隔的层名,校验合法性
      • 添加 --layers 和 --pipeline 互斥校验
      • 更新 main() 函数:当指定 --layers 时,构造层列表传递给 PipelineRunner
      • Requirements: 6.1, 6.2, 6.3, 6.4
    • 5.2 编写 --layers 解析属性测试

      • Property 5: --layers 解析正确性
      • Validates: Requirements 6.1, 6.2
    • 5.3 统一 _resolve_tasks() 去掉硬编码回退

      • 移除 _resolve_tasks() 中所有硬编码回退列表
      • 统一走 TaskRegistry.get_tasks_by_layer() 获取任务
      • 保留配置优先级run.ods_tasks / run.dws_tasks / run.index_tasks > Registry
      • 空 Registry + 无配置时记录警告并返回空列表
      • Requirements: 7.1, 7.2, 7.3
    • 5.4 编写配置优先级属性测试

      • Property 6: 配置优先级——配置值优先于 Registry
      • Validates: Requirements 7.2
    • 5.5 实现 DWS/INDEX 层轻量级校验

      • 当 --layers 包含 DWS 或 INDEX 时,跳过完整性校验或仅执行行数校验
      • Requirements: 6.5
  • 6. 任务依赖声明与拓扑排序

    • 6.1 扩展 TaskMeta 添加 depends_on 字段

      • 在 TaskMeta 数据类中添加 depends_on: list[str] = field(default_factory=list)
      • 更新 TaskRegistry.register() 接受 depends_on 参数
      • 为已知依赖关系添加声明DWS_ASSISTANT_FINANCE → DWS_ASSISTANT_SALARY 等)
      • Requirements: 8.1, 8.2
    • 6.2 实现拓扑排序函数

      • 创建 orchestration/topological_sort.py
      • 实现 Kahn's algorithm 拓扑排序
      • 处理循环依赖检测(抛出 ValueError
      • 处理缺失依赖警告(记录日志继续执行)
      • 在 PipelineRunner._resolve_tasks() 中集成拓扑排序
      • Requirements: 8.3, 8.4, 8.5
    • 6.3 编写拓扑排序属性测试

      • Property 7: 拓扑排序正确性
      • Validates: Requirements 8.3
      • Property 8: 循环依赖检测
      • Validates: Requirements 8.4
  • 7. 检查点 - 阶段 2+3 回归测试

    • 运行 cd apps/etl/pipelines/feiqiu && pytest tests/unit -v 确保所有测试通过
    • 运行 cd C:\Project\NeoZQYY && pytest tests/ -v 确保 Monorepo 属性测试通过
    • 确保所有测试通过,如有问题请询问用户
  • 8. 关键词重命名 pipeline → flow

    • 8.1 重命名 PipelineRunner → FlowRunner

      • orchestration/pipeline_runner.py 重命名为 orchestration/flow_runner.py
      • 类名 PipelineRunner → FlowRunner
      • 常量 PIPELINE_LAYERS → FLOW_LAYERS
      • 更新所有导入引用task_executor.py, cli/main.py, scheduler.py 等)
      • Requirements: 9.1, 9.2, 9.5
    • 8.2 更新 CLI 参数 --pipeline → --flow

      • --pipeline 重命名为 --flow
      • 保留 --pipeline 作为已弃用别名(使用 argparse dest 映射)
      • 使用已弃用参数时输出 DeprecationWarning
      • 更新 --layers 互斥校验同时检查 --flow 和 --pipeline
      • Requirements: 9.3, 9.4
    • 8.3 更新所有日志消息和注释中的 pipeline 术语

      • 全局搜索替换日志中的 "Pipeline" / "pipeline" → "Flow" / "flow"
      • 更新代码注释中的术语
      • Requirements: 9.6
  • 9. 路径重命名 pipelines → connectors

    • 9.1 重命名目录 apps/etl/pipelines → apps/etl/connectors

      • 执行目录重命名
      • 更新 pyproject.toml workspace 成员声明
      • 更新所有 Python 导入路径
      • Requirements: 10.1, 10.2, 10.4
    • 9.2 更新所有配置文件、脚本和文档中的路径引用

      • 更新 .env / .env.template 中的路径
      • 更新 run_etl.bat / run_etl.sh 中的路径
      • 更新 scripts/ 目录下引用旧路径的脚本
      • Requirements: 10.3
    • 9.3 运行全量测试确认路径重命名无回归

      • cd apps/etl/connectors/feiqiu && pytest tests/unit -v
      • cd C:\Project\NeoZQYY && pytest tests/ -v
      • Requirements: 10.5
  • 10. 文档同步更新

    • 10.1 更新架构文档和模块文档

      • 更新 docs/etl-feiqiu-architecture.md:类名、方法名、术语、路径
      • 更新 apps/etl/connectors/feiqiu/docs/ 下所有文档
      • 更新 tasks/README.md:任务列表和继承关系
      • Requirements: 12.1, 12.2, 12.4
    • 10.2 更新 CLI 帮助文本和示例

      • 更新 cli/main.py 中的 epilog 示例
      • 更新 argparse 帮助文本反映 --layers 和 --flow 参数
      • Requirements: 12.3
  • 11. 最终检查点 - 全量回归测试

    • 运行 cd apps/etl/connectors/feiqiu && pytest tests/unit -v
    • 运行 cd C:\Project\NeoZQYY && pytest tests/ -v
    • 确保所有测试通过,如有问题请询问用户
    • Requirements: 11.1, 11.2, 11.3, 11.4, 11.5

备注

  • 标记 * 的子任务为可选,可跳过以加速 MVP
  • 每个任务引用具体需求编号以确保可追溯性
  • 检查点确保增量验证
  • 属性测试验证普遍正确性属性,单元测试验证具体示例和边界条件
  • 阶段 4关键词/路径重命名)风险最高,建议在独立 Git 分支上执行