# -*- coding: utf-8 -*-
"""ETL task base class (introduces an Extract/Transform/Load template method)."""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo


def _hhmm_to_minutes(value: str) -> int:
    """Convert an ``"H:MM"`` / ``"HH:MM"`` string into minutes since midnight.

    Raises:
        ValueError: if *value* is not a valid ``%H:%M`` time.
    """
    parsed = datetime.strptime(str(value).strip(), "%H:%M")
    return parsed.hour * 60 + parsed.minute


@dataclass(frozen=True)
class TaskContext:
    """Run-time information handed unchanged to Extract/Transform/Load."""

    store_id: int  # value of config key "app.store_id"
    window_start: datetime
    window_end: datetime
    window_minutes: int
    cursor: dict | None = None  # the raw cursor_data passed to BaseTask.execute


class BaseTask:
    """Task base class implementing the E/T/L template method.

    Subclasses must implement ``get_task_code``, ``extract`` and ``load``;
    ``transform`` defaults to a pass-through. ``execute`` commits once after a
    successful load and rolls back on any exception.
    """

    def __init__(self, config, db_connection, api_client, logger):
        """
        Args:
            config: object exposing ``get(key, default)`` with dotted keys.
            db_connection: object exposing ``commit()`` and ``rollback()``.
            api_client: client stored as ``self.api`` for subclasses to use.
            logger: object exposing ``info`` and ``error`` (logging-style
                ``%``-args and ``exc_info``).
        """
        self.config = config
        self.db = db_connection
        self.api = api_client
        self.logger = logger
        self.tz = ZoneInfo(config.get("app.timezone", "Asia/Taipei"))

    # ------------------------------------------------------------------ basic info
    def get_task_code(self) -> str:
        """Return the task code (must be implemented by subclasses)."""
        raise NotImplementedError("子类需实现 get_task_code 方法")

    # ------------------------------------------------------------------ E/T/L hooks
    def extract(self, context: TaskContext):
        """Extract raw data for the window in *context* (subclass hook)."""
        raise NotImplementedError("子类需实现 extract 方法")

    def transform(self, extracted, context: TaskContext):
        """Transform extracted data; the default returns it unchanged."""
        return extracted

    def load(self, transformed, context: TaskContext) -> dict:
        """Load transformed data and return statistics (subclass hook)."""
        raise NotImplementedError("子类需实现 load 方法")

    # ------------------------------------------------------------------ main flow
    def execute(self, cursor_data: dict | None = None) -> dict:
        """Orchestrate Extract -> Transform -> Load for one time window.

        Args:
            cursor_data: optional cursor; a truthy ``"last_end"`` entry anchors
                the window start (see ``_get_time_window``).

        Returns:
            ``{"status": "SUCCESS", "counts": <load stats or {}>,
            "window": {"start", "end", "minutes"}}``.

        Raises:
            Exception: whatever extract/transform/load/commit raised, re-raised
                after a rollback attempt. A failing rollback is logged but does
                not replace the original exception.
        """
        context = self._build_context(cursor_data)
        task_code = self.get_task_code()
        self.logger.info(
            "%s: 开始执行,窗口[%s ~ %s]",
            task_code,
            context.window_start,
            context.window_end,
        )
        try:
            extracted = self.extract(context)
            transformed = self.transform(extracted, context)
            # A load() that returns None still produces a dict of counts.
            counts = self.load(transformed, context) or {}
            self.db.commit()
        except Exception:
            self._safe_rollback(task_code)
            self.logger.error("%s: 执行失败", task_code, exc_info=True)
            raise

        result = self._build_result("SUCCESS", counts)
        result["window"] = {
            "start": context.window_start,
            "end": context.window_end,
            "minutes": context.window_minutes,
        }
        self.logger.info("%s: 完成,统计=%s", task_code, result["counts"])
        return result

    # ------------------------------------------------------------------ helpers
    def _safe_rollback(self, task_code: str) -> None:
        """Roll back the transaction; log (do not raise) if rollback fails.

        Keeps the exception that triggered the rollback as the one that
        propagates out of ``execute``.
        """
        try:
            self.db.rollback()
        except Exception:
            self.logger.error("%s: 回滚失败", task_code, exc_info=True)

    def _build_context(self, cursor_data: dict | None) -> TaskContext:
        """Build the TaskContext for this run from config and *cursor_data*."""
        window_start, window_end, window_minutes = self._get_time_window(cursor_data)
        return TaskContext(
            store_id=self.config.get("app.store_id"),
            window_start=window_start,
            window_end=window_end,
            window_minutes=window_minutes,
            cursor=cursor_data,
        )

    def _get_time_window(self, cursor_data: dict | None = None) -> tuple:
        """Compute ``(window_start, window_end, window_minutes)``.

        ``window_end`` is always the current time in ``self.tz``.
        ``window_minutes`` is ``run.window_minutes.default_idle`` (default 180)
        inside the idle window, else ``run.window_minutes.default_busy``
        (default 30). If the cursor has a truthy ``last_end``, the window starts
        ``run.overlap_seconds`` (default 120) before it; otherwise it covers
        the last ``window_minutes`` minutes.
        """
        now = datetime.now(self.tz)
        idle_start = self.config.get("run.idle_window.start", "04:00")
        idle_end = self.config.get("run.idle_window.end", "16:00")
        is_idle = self._is_in_idle_window(now, idle_start, idle_end)

        if is_idle:
            window_minutes = self.config.get("run.window_minutes.default_idle", 180)
        else:
            window_minutes = self.config.get("run.window_minutes.default_busy", 30)

        overlap_seconds = self.config.get("run.overlap_seconds", 120)

        if cursor_data and cursor_data.get("last_end"):
            # Presumably re-reads a small overlap so records near the previous
            # boundary are not missed.
            # NOTE(review): assumes last_end is a datetime comparable with the
            # tz-aware `now`; window_minutes does not reflect this actual span.
            window_start = cursor_data["last_end"] - timedelta(seconds=overlap_seconds)
        else:
            window_start = now - timedelta(minutes=window_minutes)

        window_end = now
        return window_start, window_end, window_minutes

    def _is_in_idle_window(self, dt: datetime, start_time: str, end_time: str) -> bool:
        """Return True if *dt* (minute precision) lies in [start_time, end_time].

        Both bounds are inclusive ``"H:MM"``/``"HH:MM"`` strings. A window whose
        start is later than its end (e.g. ``"22:00"``-``"06:00"``) wraps past
        midnight.

        Raises:
            ValueError: if a bound is not a valid ``%H:%M`` time.
        """
        # Seconds are ignored, matching the original "%H:%M" comparison.
        current = dt.hour * 60 + dt.minute
        start = _hhmm_to_minutes(start_time)
        end = _hhmm_to_minutes(end_time)
        if start <= end:
            return start <= current <= end
        # Window crosses midnight.
        return current >= start or current <= end

    def _merge_common_params(self, base: dict) -> dict:
        """Merge global and task-scoped parameter pools with *base*.

        Lets configuration override or append filter conditions centrally.
        Precedence (later wins):
          1. ``api.params`` - common keys for every task
          2. ``api.params.<task code, lower-cased>`` - task-scoped keys
          3. *base* - parameters supplied by the caller
        Config values that are not dicts are ignored.
        """
        merged: dict = {}
        # NOTE(review): if config is nested, "api.params" may also contain the
        # task-scoped sub-dicts, which would be merged as ordinary keys - confirm.
        common = self.config.get("api.params", {}) or {}
        if isinstance(common, dict):
            merged.update(common)

        task_key = f"api.params.{self.get_task_code().lower()}"
        scoped = self.config.get(task_key, {}) or {}
        if isinstance(scoped, dict):
            merged.update(scoped)

        merged.update(base)
        return merged

    def _build_result(self, status: str, counts: dict) -> dict:
        """Build the result dict ``{"status": status, "counts": counts}``."""
        return {"status": status, "counts": counts}