# -*- coding: utf-8 -*- """DWD任务基类""" import json from typing import Any, Dict, Iterator, List, Optional, Tuple from datetime import datetime from .base_task import BaseTask from models.parsers import TypeParser class BaseDwdTask(BaseTask): """ DWD 层任务基类 负责从 ODS 表读取数据,供子类清洗和写入事实/维度表 """ def _get_ods_cursor(self, task_code: str) -> datetime: """ 获取上次处理的 ODS 数据的时间点 (fetched_at) 这里简化处理,实际应该从 etl_cursor 表读取 目前先依赖 BaseTask 的时间窗口逻辑,或者子类自己管理 """ # TODO: 对接真正的 CursorManager # 暂时返回一个较早的时间,或者由子类通过 _get_time_window 获取 return None def iter_ods_rows( self, table_name: str, columns: List[str], start_time: datetime, end_time: datetime, time_col: str = "fetched_at", batch_size: int = 1000 ) -> Iterator[List[Dict[str, Any]]]: """ 分批迭代读取 ODS 表数据 Args: table_name: ODS 表名 columns: 需要查询的字段列表 (必须包含 payload) start_time: 开始时间 (包含) end_time: 结束时间 (包含) time_col: 时间过滤字段,默认 fetched_at batch_size: 批次大小 """ offset = 0 cols_str = ", ".join(columns) while True: sql = f""" SELECT {cols_str} FROM {table_name} WHERE {time_col} >= %s AND {time_col} <= %s ORDER BY {time_col} ASC LIMIT %s OFFSET %s """ rows = self.db.query(sql, (start_time, end_time, batch_size, offset)) if not rows: break yield rows if len(rows) < batch_size: break offset += batch_size def parse_payload(self, row: Dict[str, Any]) -> Dict[str, Any]: """ 解析 ODS 行中的 payload JSON """ payload = row.get("payload") if isinstance(payload, str): return json.loads(payload) elif isinstance(payload, dict): return payload return {}