98 lines
2.4 KiB
Python
98 lines
2.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Time window helpers for ETL and validation tasks."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, time
|
|
from typing import List, Tuple
|
|
from zoneinfo import ZoneInfo
|
|
|
|
|
|
def _ensure_tz(dt: datetime, tz: ZoneInfo | None) -> datetime:
|
|
if tz is None:
|
|
return dt
|
|
if dt.tzinfo is None:
|
|
return dt.replace(tzinfo=tz)
|
|
return dt.astimezone(tz)
|
|
|
|
|
|
def _next_month_start(dt: datetime, tz: ZoneInfo | None) -> datetime:
|
|
year = dt.year
|
|
month = dt.month
|
|
if month == 12:
|
|
year += 1
|
|
month = 1
|
|
else:
|
|
month += 1
|
|
return datetime(year, month, 1, tzinfo=tz)
|
|
|
|
|
|
def calc_window_minutes(start: datetime, end: datetime) -> int:
    """Return the length of the window ``[start, end)`` in whole minutes.

    The elapsed time is floored to whole minutes, except that any non-empty
    window reports at least ``1``. An empty or inverted window
    (``end <= start``) reports ``0``.
    """
    is_empty = end <= start
    if is_empty:
        return 0
    minutes, _remainder = divmod((end - start).total_seconds(), 60)
    # A non-empty window shorter than one minute still counts as one minute.
    return int(minutes) or 1
|
|
|
|
|
|
def split_window(
    start: datetime,
    end: datetime,
    *,
    tz: ZoneInfo | None,
    split_unit: str | None,
    compensation_hours: int | float | None,
) -> List[Tuple[datetime, datetime]]:
    """Widen ``[start, end)`` by a compensation margin and optionally split it by month.

    Args:
        start: Window start. Normalized with ``_ensure_tz``: a naive value
            gets *tz* attached, an aware value is converted to *tz*.
        end: Window end, normalized the same way.
        tz: Target time zone, or ``None`` to use the datetimes as given.
        split_unit: ``"month"`` / ``"monthly"`` (case-insensitive, surrounding
            whitespace ignored) splits the window at calendar-month
            boundaries. Any other value, including ``None``, ``""``,
            ``"none"``, ``"off"``, ``"false"`` and ``"0"``, returns the
            window unsplit.
        compensation_hours: Hours subtracted from *start* and added to *end*
            before splitting. Fractional values are honoured; ``None`` or
            ``0`` means no widening.

    Returns:
        Consecutive ``(segment_start, segment_end)`` pairs covering the
        widened window, or ``[]`` when the widened window is empty.
    """
    start = _ensure_tz(start, tz)
    end = _ensure_tz(end, tz)

    # float(), not int(), so fractional margins such as 0.5 h are honoured
    # instead of being truncated (0.5 -> 0 would disable compensation).
    margin = timedelta(hours=float(compensation_hours or 0))
    if margin:
        # NOTE(review): arithmetic on aware datetimes sharing a tzinfo is
        # wall-clock based, so across a DST change the elapsed margin differs
        # by the DST shift — confirm this is acceptable.
        start = start - margin
        end = end + margin

    if end <= start:
        return []

    unit = (split_unit or "").strip().lower()
    if unit not in ("month", "monthly"):
        # Disabled markers ("", "none", "off", "false", "0") and any
        # unrecognized unit both fall back to one unsplit window.
        return [(start, end)]

    windows: List[Tuple[datetime, datetime]] = []
    cur = start
    while cur < end:
        # Use cur's own tzinfo (which equals tz whenever tz is given) so the
        # boundary is comparable with cur/end; with tz=None and aware inputs
        # a naive boundary would make the comparisons below raise TypeError.
        boundary = _next_month_start(cur, cur.tzinfo)
        nxt = boundary if boundary < end else end
        if nxt <= cur:
            break  # defensive: never emit an empty or backwards segment
        windows.append((cur, nxt))
        cur = nxt
    return windows
|
|
|
|
|
|
def build_window_segments(
    cfg,
    start: datetime,
    end: datetime,
    *,
    tz: ZoneInfo | None,
    override_only: bool,
) -> List[Tuple[datetime, datetime]]:
    """Segment ``[start, end)`` according to the run configuration.

    Reads ``run.window_split.unit`` (default ``"month"``) and
    ``run.window_split.compensation_hours`` (default ``0``) from *cfg* and
    forwards them to ``split_window``. When *override_only* is true and
    ``run.window_override.start`` / ``run.window_override.end`` are not both
    truthy, ``split_window`` is instead called with unit ``"none"`` and zero
    compensation.

    Args:
        cfg: Configuration object exposing ``get(key[, default])`` with dotted
            keys (assumed from usage here — confirm against callers).
        start: Window start, passed through to ``split_window``.
        end: Window end, passed through to ``split_window``.
        tz: Target time zone forwarded to ``split_window``.
        override_only: Whether this run is restricted to an override window.

    Returns:
        Whatever ``split_window`` returns for the resolved settings.
    """
    unit = cfg.get("run.window_split.unit", "month")
    comp_hours = cfg.get("run.window_split.compensation_hours", 0)

    if override_only:
        # Both keys are always read, in this order, before being tested.
        ov_start = cfg.get("run.window_override.start")
        ov_end = cfg.get("run.window_override.end")
        has_full_override = bool(ov_start and ov_end)
        if not has_full_override:
            # NOTE(review): presumably an override-only run without an
            # explicit override window should be processed as-is — confirm.
            unit, comp_hours = "none", 0

    return split_window(
        start,
        end,
        tz=tz,
        split_unit=unit,
        compensation_hours=comp_hours,
    )
|