# -*- coding: utf-8 -*- """Data integrity task that checks API -> ODS -> DWD completeness.""" from __future__ import annotations from datetime import datetime from zoneinfo import ZoneInfo from dateutil import parser as dtparser from utils.windowing import build_window_segments, calc_window_minutes from .base_task import BaseTask from quality.integrity_service import run_history_flow, run_window_flow, write_report class DataIntegrityTask(BaseTask): """Check data completeness across API -> ODS -> DWD.""" def get_task_code(self) -> str: return "DATA_INTEGRITY_CHECK" def execute(self, cursor_data: dict | None = None) -> dict: tz = ZoneInfo(self.config.get("app.timezone", "Asia/Taipei")) mode = str(self.config.get("integrity.mode", "history") or "history").lower() include_dimensions = bool(self.config.get("integrity.include_dimensions", False)) task_codes = str(self.config.get("integrity.ods_task_codes", "") or "").strip() auto_backfill = bool(self.config.get("integrity.auto_backfill", False)) compare_content = self.config.get("integrity.compare_content") if compare_content is None: compare_content = True content_sample_limit = self.config.get("integrity.content_sample_limit") backfill_mismatch = self.config.get("integrity.backfill_mismatch") if backfill_mismatch is None: backfill_mismatch = True recheck_after_backfill = self.config.get("integrity.recheck_after_backfill") if recheck_after_backfill is None: recheck_after_backfill = True # Switch to window mode when CLI override is provided. window_override_start = self.config.get("run.window_override.start") window_override_end = self.config.get("run.window_override.end") if window_override_start or window_override_end: self.logger.info( "Detected CLI window override. Switching to window mode: %s ~ %s", window_override_start, window_override_end, ) mode = "window" if mode == "window": base_start, base_end, _ = self._get_time_window(cursor_data) segments = build_window_segments( self.config, base_start, base_end, tz=tz, override_only=True, ) if not segments: segments = [(base_start, base_end)] total_segments = len(segments) if total_segments > 1: self.logger.info("Data integrity check split into %s segments.", total_segments) report, counts = run_window_flow( cfg=self.config, windows=segments, include_dimensions=include_dimensions, task_codes=task_codes, logger=self.logger, compare_content=bool(compare_content), content_sample_limit=content_sample_limit, do_backfill=bool(auto_backfill), include_mismatch=bool(backfill_mismatch), recheck_after_backfill=bool(recheck_after_backfill), page_size=int(self.config.get("api.page_size") or 200), chunk_size=500, ) overall_start = segments[0][0] overall_end = segments[-1][1] report_path = write_report(report, prefix="data_integrity_window", tz=tz) report["report_path"] = report_path return { "status": "SUCCESS", "counts": counts, "window": { "start": overall_start, "end": overall_end, "minutes": calc_window_minutes(overall_start, overall_end), }, "report_path": report_path, "backfill_result": report.get("backfill_result"), } history_start = str(self.config.get("integrity.history_start", "2025-07-01") or "2025-07-01") history_end = str(self.config.get("integrity.history_end", "") or "").strip() start_dt = dtparser.parse(history_start) if start_dt.tzinfo is None: start_dt = start_dt.replace(tzinfo=tz) else: start_dt = start_dt.astimezone(tz) end_dt = None if history_end: end_dt = dtparser.parse(history_end) if end_dt.tzinfo is None: end_dt = end_dt.replace(tzinfo=tz) else: end_dt = end_dt.astimezone(tz) report, counts = run_history_flow( cfg=self.config, start_dt=start_dt, end_dt=end_dt, include_dimensions=include_dimensions, task_codes=task_codes, logger=self.logger, compare_content=bool(compare_content), content_sample_limit=content_sample_limit, do_backfill=bool(auto_backfill), include_mismatch=bool(backfill_mismatch), recheck_after_backfill=bool(recheck_after_backfill), page_size=int(self.config.get("api.page_size") or 200), chunk_size=500, ) report_path = write_report(report, prefix="data_integrity_history", tz=tz) report["report_path"] = report_path end_dt_used = end_dt if end_dt_used is None: end_str = report.get("end") if end_str: parsed = dtparser.parse(end_str) if parsed.tzinfo is None: end_dt_used = parsed.replace(tzinfo=tz) else: end_dt_used = parsed.astimezone(tz) if end_dt_used is None: end_dt_used = start_dt return { "status": "SUCCESS", "counts": counts, "window": { "start": start_dt, "end": end_dt_used, "minutes": int((end_dt_used - start_dt).total_seconds() // 60) if end_dt_used > start_dt else 0, }, "report_path": report_path, "backfill_result": report.get("backfill_result"), }