# -*- coding: utf-8 -*-
"""Data integrity task that checks API -> ODS -> DWD completeness."""
from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

from dateutil import parser as dtparser

from utils.windowing import build_window_segments, calc_window_minutes
from .base_task import BaseTask
from quality.integrity_checker import (
    IntegrityWindow,
    compute_last_etl_end,
    run_integrity_history,
    run_integrity_window,
)


class DataIntegrityTask(BaseTask):
    """Check data completeness across API -> ODS -> DWD.

    Two modes are supported, selected by the ``integrity.mode`` config key:

    * ``window``  - check one time window, optionally split into segments.
    * ``history`` - check from ``integrity.history_start`` up to
      ``integrity.history_end`` (default mode).

    When ``integrity.auto_backfill`` is truthy and missing records are
    reported, a backfill over the checked range is triggered afterwards.
    """

    def get_task_code(self) -> str:
        """Return the task code identifying this task."""
        return "DATA_INTEGRITY_CHECK"

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Run the integrity check in the configured mode.

        Args:
            cursor_data: Forwarded to ``self._get_time_window`` in window
                mode; unused in history mode.

        Returns:
            A dict with the keys ``status``, ``counts`` (``missing``,
            ``errors`` and, when a backfill ran, ``backfilled``),
            ``window`` (``start``, ``end``, ``minutes``), ``report_path``
            and ``backfill_result``.
        """
        tz = ZoneInfo(self.config.get("app.timezone", "Asia/Taipei"))
        mode = str(self.config.get("integrity.mode", "history") or "history").lower()
        include_dimensions = bool(self.config.get("integrity.include_dimensions", False))
        task_codes = str(self.config.get("integrity.ods_task_codes", "") or "").strip()
        auto_backfill = bool(self.config.get("integrity.auto_backfill", False))

        # Detect whether a time window was passed through the CLI
        # (window_override); if so, switch to window mode automatically.
        window_override_start = self.config.get("run.window_override.start")
        window_override_end = self.config.get("run.window_override.end")
        if window_override_start or window_override_end:
            self.logger.info(
                "检测到 CLI 时间窗口参数,自动切换到 window 模式: %s ~ %s",
                window_override_start,
                window_override_end,
            )
            mode = "window"

        if mode == "window":
            return self._execute_window(
                cursor_data,
                tz=tz,
                include_dimensions=include_dimensions,
                task_codes=task_codes,
                auto_backfill=auto_backfill,
            )
        return self._execute_history(
            tz=tz,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            auto_backfill=auto_backfill,
        )

    def _execute_window(
        self,
        cursor_data: dict | None,
        *,
        tz: ZoneInfo,
        include_dimensions: bool,
        task_codes: str,
        auto_backfill: bool,
    ) -> dict:
        """Check one time window segment by segment and write a combined report."""
        base_start, base_end, _ = self._get_time_window(cursor_data)
        segments = build_window_segments(
            self.config,
            base_start,
            base_end,
            tz=tz,
            override_only=True,
        )
        if not segments:
            # No segmentation produced: check the base window as one segment.
            segments = [(base_start, base_end)]

        total_segments = len(segments)
        if total_segments > 1:
            self.logger.info("数据完整性检查: 分段执行 共%s段", total_segments)

        window_reports = []
        total_missing = 0
        total_errors = 0
        for idx, (seg_start, seg_end) in enumerate(segments, start=1):
            window = IntegrityWindow(
                start=seg_start,
                end=seg_end,
                label=f"segment_{idx}",
                granularity="window",
            )
            payload = run_integrity_window(
                cfg=self.config,
                window=window,
                include_dimensions=include_dimensions,
                task_codes=task_codes,
                logger=self.logger,
                write_report=False,
                window_split_unit="none",
                window_compensation_hours=0,
            )
            window_reports.append(payload)
            # `or {}` also covers an "api_to_ods" key that is present but None.
            api_to_ods = payload.get("api_to_ods") or {}
            total_missing += int(api_to_ods.get("total_missing") or 0)
            total_errors += int(api_to_ods.get("total_errors") or 0)

        overall_start = segments[0][0]
        overall_end = segments[-1][1]
        report = {
            "mode": "window",
            "window": {
                "start": overall_start.isoformat(),
                "end": overall_end.isoformat(),
                "segments": total_segments,
            },
            "windows": window_reports,
            "api_to_ods": {
                "total_missing": total_missing,
                "total_errors": total_errors,
            },
            "total_missing": total_missing,
            "total_errors": total_errors,
            "generated_at": datetime.now(tz).isoformat(),
        }
        report_path = self._write_report(report, "data_integrity_window")
        report["report_path"] = report_path

        counts = {"missing": total_missing, "errors": total_errors}

        # Auto backfill. The backfill range is the base window, not the
        # segment-derived overall window.
        backfill_result = self._backfill_if_needed(
            counts, auto_backfill, base_start, base_end, task_codes
        )

        return {
            "status": "SUCCESS",
            "counts": counts,
            "window": {
                "start": overall_start,
                "end": overall_end,
                "minutes": calc_window_minutes(overall_start, overall_end),
            },
            "report_path": report_path,
            "backfill_result": backfill_result,
        }

    def _execute_history(
        self,
        *,
        tz: ZoneInfo,
        include_dimensions: bool,
        task_codes: str,
        auto_backfill: bool,
    ) -> dict:
        """Check the configured history range via ``run_integrity_history``."""
        history_start = str(
            self.config.get("integrity.history_start", "2025-07-01") or "2025-07-01"
        )
        history_end = str(self.config.get("integrity.history_end", "") or "").strip()

        start_dt = self._parse_in_tz(history_start, tz)
        if history_end:
            end_dt = self._parse_in_tz(history_end, tz)
        else:
            # NOTE(review): assumes compute_last_etl_end returns a
            # timezone-aware datetime (or a falsy value) - confirm, since it
            # is compared with and subtracted from the aware start_dt below.
            end_dt = compute_last_etl_end(self.config) or datetime.now(tz)

        report = run_integrity_history(
            cfg=self.config,
            start_dt=start_dt,
            end_dt=end_dt,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            logger=self.logger,
            write_report=True,
        )

        counts = {
            "missing": int(report.get("total_missing") or 0),
            "errors": int(report.get("total_errors") or 0),
        }

        # Auto backfill.
        backfill_result = self._backfill_if_needed(
            counts, auto_backfill, start_dt, end_dt, task_codes
        )

        # An empty or inverted range is reported as zero minutes.
        if end_dt > start_dt:
            minutes = int((end_dt - start_dt).total_seconds() // 60)
        else:
            minutes = 0

        return {
            "status": "SUCCESS",
            "counts": counts,
            "window": {
                "start": start_dt,
                "end": end_dt,
                "minutes": minutes,
            },
            "report_path": report.get("report_path"),
            "backfill_result": backfill_result,
        }

    @staticmethod
    def _parse_in_tz(value: str, tz: ZoneInfo) -> datetime:
        """Parse *value* and return it as an aware datetime in *tz*.

        A naive result gets *tz* attached as-is; an aware result is
        converted to *tz*.
        """
        parsed = dtparser.parse(value)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=tz)
        return parsed.astimezone(tz)

    def _backfill_if_needed(
        self,
        counts: dict,
        auto_backfill: bool,
        start_dt: datetime,
        end_dt: datetime,
        task_codes: str,
    ) -> dict | None:
        """Run the backfill when enabled and ``counts["missing"]`` is positive.

        On a run, ``counts["backfilled"]`` is set in place from the result.

        Returns:
            The backfill result dict, or ``None`` when no backfill ran.
        """
        if not (auto_backfill and counts["missing"] > 0):
            return None
        result = self._run_backfill(start_dt, end_dt, task_codes)
        counts["backfilled"] = result.get("backfilled", 0)
        return result

    def _write_report(self, report: dict, prefix: str) -> str:
        """Write *report* as JSON under ``reports/`` and return the file path.

        The directory is resolved relative to this file (``parents[1]``) and
        created if missing; the file name is ``<prefix>_<timestamp>.json``.
        """
        root = Path(__file__).resolve().parents[1]
        stamp = datetime.now(self.tz).strftime("%Y%m%d_%H%M%S")
        path = root / "reports" / f"{prefix}_{stamp}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(
            json.dumps(report, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
        return str(path)

    def _run_backfill(self, start_dt: datetime, end_dt: datetime, task_codes: str) -> dict:
        """Run the data backfill for the given range.

        Any exception is logged and converted into an error result, so a
        failed backfill does not fail the integrity task itself.

        Returns:
            The dict returned by ``run_backfill``, or
            ``{"backfilled": 0, "errors": 1, "error": <message>}`` on failure.
        """
        self.logger.info("自动补全开始 起始=%s 结束=%s", start_dt, end_dt)
        try:
            # Imported inside the try so an import failure is also reported
            # as a failed backfill.
            from scripts.backfill_missing_data import run_backfill

            result = run_backfill(
                cfg=self.config,
                start=start_dt,
                end=end_dt,
                task_codes=task_codes or None,
                dry_run=False,
                page_size=200,
                chunk_size=500,
                logger=self.logger,
            )
            self.logger.info(
                "自动补全完成 已补全=%s 错误数=%s",
                result.get("backfilled", 0),
                result.get("errors", 0),
            )
            return result
        except Exception as exc:
            self.logger.exception("自动补全失败")
            return {"backfilled": 0, "errors": 1, "error": str(exc)}