Files

tasks/ — ETL 任务

目录结构

tasks/
├── base_task.py        # BaseTask 基类Extract → Transform → Load 模板方法)
├── ods/                # ODS 层:从 API 抓取或离线 JSON 回放,写入 ODS 表
├── dwd/                # DWD 层:从 ODS 清洗装载到 DWD维度 SCD2 + 事实增量)
├── dws/                # DWS 层:汇总统计(助教业绩、财务日报、会员分析等)
│   └── index/          # 指数计算(亲密度、新客转化、召回、关系、赢回)
├── utility/            # 工具类任务Schema 初始化、手动入库、完整性检查等)
└── verification/       # 校验任务ODS/DWD/DWS/指数层的数据一致性校验)

新增任务流程

  1. 在对应子目录创建任务文件,继承 BaseTask
  2. 实现 get_task_code() 返回大写蛇形任务代码(如 DWS_MEMBER_VISIT
  3. 实现 execute(context) 方法,包含 Extract → Transform → Load 逻辑
  4. orchestration/task_registry.py 中注册任务,指定元数据:
    • layerODS / DWD / DWS / UTILITY / VERIFICATION
    • task_typeETL / UTILITY / VERIFICATION
    • requires_db_config:是否需要数据库连接
# 示例:注册一个新的 DWS 任务
registry.register(
    task_code="DWS_NEW_REPORT",
    task_class=NewReportTask,
    layer="DWS",
    task_type="ETL",
    requires_db_config=True,
)

任务命名约定

  • 任务代码:大写蛇形(DWD_LOAD_FROM_ODSDWS_ASSISTANT_DAILY
  • 文件名:小写蛇形 + _task.py 后缀(assistant_daily_task.py
  • 类名:驼峰 + Task 后缀(AssistantDailyTask

ODS 任务特殊说明

ODS 任务通过 ods/ods_tasks.py 中的 ODS_TASK_SPECS 声明式定义,无需为每个实体单独写 execute 逻辑。新增 ODS 实体只需在 ODS_TASK_SPECS 列表中添加一条 spec 记录。