80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""DWD任务基类"""
|
|
import json
|
|
from typing import Any, Dict, Iterator, List, Optional, Tuple
|
|
from datetime import datetime
|
|
|
|
from .base_task import BaseTask
|
|
from models.parsers import TypeParser
|
|
|
|
class BaseDwdTask(BaseTask):
    """
    Base class for DWD-layer tasks.

    Reads rows from ODS tables so that subclasses can clean them and write
    them into fact/dimension tables.
    """

    def _get_ods_cursor(self, task_code: str) -> Optional[datetime]:
        """
        Return the ``fetched_at`` point of the last processed ODS data.

        Simplified placeholder: a real implementation should read this from
        the ``etl_cursor`` table. For now callers rely on BaseTask's
        time-window logic, or subclasses manage the cursor themselves.

        Args:
            task_code: Task identifier; currently unused.

        Returns:
            Always ``None`` until a real cursor store is wired in.
        """
        # TODO: wire up the real CursorManager.
        # Until then no cursor is known; subclasses are expected to derive
        # their window elsewhere (e.g. via _get_time_window, not defined here).
        return None

    def iter_ods_rows(
        self,
        table_name: str,
        columns: List[str],
        start_time: datetime,
        end_time: datetime,
        time_col: str = "fetched_at",
        batch_size: int = 1000,
        *,
        order_by: Optional[List[str]] = None,
    ) -> Iterator[List[Dict[str, Any]]]:
        """
        Iterate over rows of an ODS table in batches.

        Rows are filtered on ``start_time <= time_col <= end_time`` and paged
        with LIMIT/OFFSET.

        Args:
            table_name: ODS table name. Interpolated into the SQL verbatim, so
                it must be a trusted identifier, never user input.
            columns: Columns to select (must include ``payload``). Also
                interpolated verbatim; must be trusted.
            start_time: Start of the window (inclusive).
            end_time: End of the window (inclusive).
            time_col: Column used for time filtering and primary ordering;
                defaults to ``fetched_at``. Interpolated verbatim.
            batch_size: Maximum number of rows per yielded batch; must be > 0.
            order_by: Optional extra columns appended to ``ORDER BY`` after
                ``time_col`` as tie-breakers. LIMIT/OFFSET paging is only
                stable when the sort order is unique; many rows can share the
                same ``time_col`` value, so pass a unique key (such as the
                table's primary key) to avoid rows being skipped or repeated
                across batches.

        Yields:
            Each non-empty batch exactly as returned by ``self.db.query``
            (presumably a list of dict-like rows — confirm against BaseTask).

        Raises:
            ValueError: If ``columns`` is empty or ``batch_size`` is not
                positive (raised on first iteration, since this is a generator).
        """
        if not columns:
            raise ValueError("columns must not be empty")
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        cols_str = ", ".join(columns)
        sort_cols = [time_col, *(order_by or [])]
        order_str = ", ".join(f"{col} ASC" for col in sort_cols)

        # The query text is identical for every page; only OFFSET changes,
        # so build it once outside the loop.
        sql = f"""
            SELECT {cols_str}
            FROM {table_name}
            WHERE {time_col} >= %s AND {time_col} <= %s
            ORDER BY {order_str}
            LIMIT %s OFFSET %s
        """

        offset = 0
        while True:
            rows = self.db.query(sql, (start_time, end_time, batch_size, offset))
            if not rows:
                break

            yield rows

            # A short page means the result set is exhausted; stop without
            # issuing one more query that would come back empty.
            if len(rows) < batch_size:
                break

            offset += batch_size

    def parse_payload(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """
        Decode the ``payload`` JSON of an ODS row.

        Args:
            row: ODS row mapping; only its ``payload`` key is read.

        Returns:
            - ``str``/``bytes``/``bytearray`` payloads are JSON-decoded; an
              empty/whitespace-only value or JSON ``null`` yields ``{}``.
              Non-object JSON (e.g. a list) is returned as decoded.
            - ``dict`` payloads are returned as-is (not copied).
            - A missing or any other kind of payload yields ``{}``.

        Raises:
            json.JSONDecodeError: If a non-empty text payload is not valid JSON.
        """
        payload = row.get("payload")
        if isinstance(payload, (str, bytes, bytearray)):
            # Treat blank text like a missing payload instead of letting
            # json.loads raise on it.
            if not payload.strip():
                return {}
            decoded = json.loads(payload)
            # JSON "null" would otherwise leak out as None despite the
            # Dict return type; normalize it like a missing payload.
            return {} if decoded is None else decoded
        if isinstance(payload, dict):
            return payload
        return {}
|