# -*- coding: utf-8 -*-
"""Time window helpers for ETL and validation tasks."""
from __future__ import annotations

from datetime import datetime, timedelta, time
from typing import List, Tuple
from zoneinfo import ZoneInfo


def _ensure_tz(dt: datetime, tz: ZoneInfo | None) -> datetime:
    """Return *dt* expressed in *tz*.

    Args:
        dt: Datetime to normalize; may be naive or aware.
        tz: Target zone. When ``None`` the input is returned unchanged.

    Returns:
        ``dt`` itself when ``tz`` is ``None``; ``dt`` with ``tz`` attached
        (wall-clock fields untouched) when ``dt`` is naive; otherwise ``dt``
        converted to ``tz``.
    """
    if tz is None:
        return dt
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)


def _next_month_start(dt: datetime, tz: ZoneInfo | None) -> datetime:
    """Return midnight on the first day of the month following *dt*.

    Args:
        dt: Reference datetime; only its year, month and tzinfo are read.
        tz: Zone to attach to the result. When ``None`` the result inherits
            ``dt.tzinfo``, so the boundary has the same awareness as ``dt``
            and can be compared against it.

    Returns:
        The first instant (00:00) of the next calendar month.
    """
    if dt.month == 12:
        year, month = dt.year + 1, 1
    else:
        year, month = dt.year, dt.month + 1
    # Falling back to dt.tzinfo avoids producing a naive boundary for an aware
    # input, which would raise TypeError on the first comparison.
    tzinfo = tz if tz is not None else dt.tzinfo
    return datetime(year, month, 1, tzinfo=tzinfo)


def calc_window_minutes(start: datetime, end: datetime) -> int:
    """Return the length of ``[start, end)`` in whole minutes.

    Args:
        start: Window start.
        end: Window end.

    Returns:
        ``0`` when ``end <= start``; otherwise the floored minute count,
        with a minimum of ``1`` so a non-empty sub-minute window is not
        reported as empty.
    """
    if end <= start:
        return 0
    return max(1, int((end - start).total_seconds() // 60))


def split_window(
    start: datetime,
    end: datetime,
    *,
    tz: ZoneInfo | None,
    split_unit: str | None,
    compensation_hours: int | float | None,
) -> List[Tuple[datetime, datetime]]:
    """Split ``[start, end)`` into consecutive segments.

    Args:
        start: Window start.
        end: Window end.
        tz: Zone both bounds are normalized to; ``None`` leaves them as given.
        split_unit: ``"month"`` or ``"monthly"`` (case-insensitive, surrounding
            whitespace ignored) splits at calendar-month starts. Any other
            value, including ``None`` and the empty string, disables
            splitting.
        compensation_hours: Hours by which the window is widened on both
            sides before splitting. The value is passed through ``int()``,
            so fractional hours are truncated; ``None`` means no widening.

    Returns:
        A list of ``(segment_start, segment_end)`` tuples covering the
        (widened) window in order, or an empty list when the window is empty.
    """
    start = _ensure_tz(start, tz)
    end = _ensure_tz(end, tz)

    padding = int(compensation_hours or 0)
    if padding:
        delta = timedelta(hours=padding)
        start -= delta
        end += delta

    if end <= start:
        return []

    unit = (split_unit or "").strip().lower()
    # Every unit other than month/monthly (e.g. "", "none", "off") yields the
    # whole window as a single segment.
    if unit not in ("month", "monthly"):
        return [(start, end)]

    segments: List[Tuple[datetime, datetime]] = []
    cursor = start
    while cursor < end:
        stop = min(_next_month_start(cursor, tz), end)
        if stop <= cursor:
            # Defensive guard: never loop without making progress.
            break
        segments.append((cursor, stop))
        cursor = stop
    return segments


def build_window_segments(
    cfg,
    start: datetime,
    end: datetime,
    *,
    tz: ZoneInfo | None,
    override_only: bool,
) -> List[Tuple[datetime, datetime]]:
    """Split a window according to the ``run.window_split.*`` settings in *cfg*.

    Args:
        cfg: Configuration object exposing ``get(key[, default])``.
        start: Window start.
        end: Window end.
        tz: Zone passed through to :func:`split_window`.
        override_only: When true, splitting and compensation are applied only
            if both ``run.window_override.start`` and
            ``run.window_override.end`` are set (truthy) in ``cfg``;
            otherwise the window is returned as a single, unwidened segment.
            The override values themselves are not used as bounds here.

    Returns:
        The segments produced by :func:`split_window`.
    """
    split_unit = cfg.get("run.window_split.unit", "month")
    compensation_hours = cfg.get("run.window_split.compensation_hours", 0)

    if override_only:
        has_override = bool(
            cfg.get("run.window_override.start")
            and cfg.get("run.window_override.end")
        )
        if not has_override:
            split_unit = "none"
            compensation_hours = 0

    return split_window(
        start,
        end,
        tz=tz,
        split_unit=split_unit,
        compensation_hours=compensation_hours,
    )