Files
feiqiu-ETL/etl_billiards/tasks/base_dwd_task.py
2025-11-30 07:19:05 +08:00

80 lines
2.3 KiB
Python

# -*- coding: utf-8 -*-
"""DWD任务基类"""
import json
from typing import Any, Dict, Iterator, List, Optional, Tuple
from datetime import datetime
from .base_task import BaseTask
from models.parsers import TypeParser
class BaseDwdTask(BaseTask):
"""
DWD 层任务基类
负责从 ODS 表读取数据,供子类清洗和写入事实/维度表
"""
def _get_ods_cursor(self, task_code: str) -> datetime:
"""
获取上次处理的 ODS 数据的时间点 (fetched_at)
这里简化处理,实际应该从 etl_cursor 表读取
目前先依赖 BaseTask 的时间窗口逻辑,或者子类自己管理
"""
# TODO: 对接真正的 CursorManager
# 暂时返回一个较早的时间,或者由子类通过 _get_time_window 获取
return None
def iter_ods_rows(
self,
table_name: str,
columns: List[str],
start_time: datetime,
end_time: datetime,
time_col: str = "fetched_at",
batch_size: int = 1000
) -> Iterator[List[Dict[str, Any]]]:
"""
分批迭代读取 ODS 表数据
Args:
table_name: ODS 表名
columns: 需要查询的字段列表 (必须包含 payload)
start_time: 开始时间 (包含)
end_time: 结束时间 (包含)
time_col: 时间过滤字段,默认 fetched_at
batch_size: 批次大小
"""
offset = 0
cols_str = ", ".join(columns)
while True:
sql = f"""
SELECT {cols_str}
FROM {table_name}
WHERE {time_col} >= %s AND {time_col} <= %s
ORDER BY {time_col} ASC
LIMIT %s OFFSET %s
"""
rows = self.db.query(sql, (start_time, end_time, batch_size, offset))
if not rows:
break
yield rows
if len(rows) < batch_size:
break
offset += batch_size
def parse_payload(self, row: Dict[str, Any]) -> Dict[str, Any]:
"""
解析 ODS 行中的 payload JSON
"""
payload = row.get("payload")
if isinstance(payload, str):
return json.loads(payload)
elif isinstance(payload, dict):
return payload
return {}