tasks/ — ETL 任务
目录结构
tasks/
├── base_task.py # BaseTask 基类(Extract → Transform → Load 模板方法)
├── ods/ # ODS 层:从 API 抓取或离线 JSON 回放,写入 ODS 表
├── dwd/ # DWD 层:从 ODS 清洗装载到 DWD(维度 SCD2 + 事实增量)
├── dws/ # DWS 层:汇总统计(助教业绩、财务日报、会员分析等)
│ └── index/ # 指数计算(亲密度、新客转化、召回、关系、赢回)
├── utility/ # 工具类任务(Schema 初始化、手动入库、完整性检查等)
└── verification/ # 校验任务(ODS/DWD/DWS/指数层的数据一致性校验)
新增任务流程
- 在对应子目录创建任务文件,继承
BaseTask - 实现
get_task_code()返回大写蛇形任务代码(如DWS_MEMBER_VISIT) - 实现
execute(context)方法,包含 Extract → Transform → Load 逻辑 - 在
orchestration/task_registry.py中注册任务,指定元数据:layer:ODS/DWD/DWS/UTILITY/VERIFICATIONtask_type:ETL/UTILITY/VERIFICATIONrequires_db_config:是否需要数据库连接
# 示例:注册一个新的 DWS 任务
registry.register(
task_code="DWS_NEW_REPORT",
task_class=NewReportTask,
layer="DWS",
task_type="ETL",
requires_db_config=True,
)
任务命名约定
- 任务代码:大写蛇形(
DWD_LOAD_FROM_ODS、DWS_ASSISTANT_DAILY) - 文件名:小写蛇形 +
_task.py后缀(assistant_daily_task.py) - 类名:驼峰 +
Task后缀(AssistantDailyTask)
ODS 任务特殊说明
ODS 任务通过 ods/ods_tasks.py 中的 ODS_TASK_SPECS 声明式定义,无需为每个实体单独写 execute 逻辑。新增 ODS 实体只需在 ODS_TASK_SPECS 列表中添加一条 spec 记录。