154 lines
6.1 KiB
Python
154 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Data integrity task that checks API -> ODS -> DWD completeness."""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from dateutil import parser as dtparser
|
|
|
|
from utils.windowing import build_window_segments, calc_window_minutes
|
|
from .base_task import BaseTask
|
|
from quality.integrity_service import run_history_flow, run_window_flow, write_report
|
|
|
|
|
|
class DataIntegrityTask(BaseTask):
    """Check data completeness across API -> ODS -> DWD.

    The check runs in one of two modes:

    * ``window``  -- checks one or more time segments built from the task's
      time window; selected by ``integrity.mode == "window"`` or by a CLI
      window override (``run.window_override.start`` / ``.end``).
    * ``history`` -- checks the range ``integrity.history_start`` ..
      ``integrity.history_end``; used for any other ``integrity.mode``.

    Both modes delegate the comparison / backfill work to
    ``run_window_flow`` / ``run_history_flow`` and write a report file via
    ``write_report``.
    """

    # Fallbacks used when the corresponding config value is missing or empty.
    _DEFAULT_TIMEZONE = "Asia/Taipei"
    _DEFAULT_HISTORY_START = "2025-07-01"
    _DEFAULT_PAGE_SIZE = 200
    # Passed as ``chunk_size`` to both integrity flows.
    _CHUNK_SIZE = 500
    # String config values treated as "off"; any other non-empty string is "on".
    # Without this, bool("false") would be True and e.g. enable auto-backfill.
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})

    def get_task_code(self) -> str:
        """Return the identifier code of this task."""
        return "DATA_INTEGRITY_CHECK"

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Run the integrity check, write a report and summarise the result.

        Args:
            cursor_data: Forwarded to ``self._get_time_window`` in window mode;
                not used in history mode.

        Returns:
            A dict with keys ``status`` (always ``"SUCCESS"`` here), ``counts``
            (as returned by the flow), ``window`` (``start``/``end``/``minutes``),
            ``report_path`` and ``backfill_result`` (taken from the report).
        """
        # ``or`` guards against an explicitly empty/None timezone, which
        # ZoneInfo() would reject.
        tz = ZoneInfo(self.config.get("app.timezone") or self._DEFAULT_TIMEZONE)
        mode = str(self.config.get("integrity.mode", "history") or "history").strip().lower()

        # Switch to window mode when CLI override is provided.
        window_override_start = self.config.get("run.window_override.start")
        window_override_end = self.config.get("run.window_override.end")
        if window_override_start or window_override_end:
            self.logger.info(
                "Detected CLI window override. Switching to window mode: %s ~ %s",
                window_override_start,
                window_override_end,
            )
            mode = "window"
        elif mode not in ("window", "history"):
            # Any unrecognised mode runs the history check; make that visible.
            self.logger.warning(
                "Unknown integrity.mode %r; falling back to history mode.", mode
            )

        # Keyword arguments shared by run_window_flow and run_history_flow.
        flow_kwargs = {
            "cfg": self.config,
            "include_dimensions": self._config_flag("integrity.include_dimensions", False),
            "task_codes": str(self.config.get("integrity.ods_task_codes", "") or "").strip(),
            "logger": self.logger,
            "compare_content": self._config_flag("integrity.compare_content", True),
            "content_sample_limit": self.config.get("integrity.content_sample_limit"),
            "do_backfill": self._config_flag("integrity.auto_backfill", False),
            "include_mismatch": self._config_flag("integrity.backfill_mismatch", True),
            "recheck_after_backfill": self._config_flag("integrity.recheck_after_backfill", True),
            "page_size": int(self.config.get("api.page_size") or self._DEFAULT_PAGE_SIZE),
            "chunk_size": self._CHUNK_SIZE,
        }

        if mode == "window":
            return self._run_window_mode(cursor_data, tz, flow_kwargs)
        return self._run_history_mode(tz, flow_kwargs)

    def _config_flag(self, key: str, default: bool) -> bool:
        """Read boolean config *key*; a missing/None value yields *default*.

        Strings in ``_FALSE_STRINGS`` (case-insensitive, whitespace-stripped)
        are False; every other value is converted with ``bool()``.
        """
        value = self.config.get(key)
        if value is None:
            return default
        if isinstance(value, str):
            return value.strip().lower() not in self._FALSE_STRINGS
        return bool(value)

    @staticmethod
    def _localize(value: datetime, tz: ZoneInfo) -> datetime:
        """Attach *tz* to a naive datetime, or convert an aware one into *tz*."""
        if value.tzinfo is None:
            return value.replace(tzinfo=tz)
        return value.astimezone(tz)

    def _parse_report_end(self, value: object, tz: ZoneInfo) -> datetime | None:
        """Turn the report's ``end`` entry into an aware datetime, or None if empty.

        Accepts either a datetime instance or anything dateutil can parse
        from its string form.
        """
        if not value:
            return None
        if isinstance(value, datetime):
            return self._localize(value, tz)
        return self._localize(dtparser.parse(str(value)), tz)

    @staticmethod
    def _build_result(
        counts: object,
        start: datetime,
        end: datetime,
        minutes: int,
        report: dict,
        report_path: object,
    ) -> dict:
        """Assemble the task result dict returned by ``execute``."""
        return {
            "status": "SUCCESS",
            "counts": counts,
            "window": {
                "start": start,
                "end": end,
                "minutes": minutes,
            },
            "report_path": report_path,
            "backfill_result": report.get("backfill_result"),
        }

    def _run_window_mode(self, cursor_data: dict | None, tz: ZoneInfo, flow_kwargs: dict) -> dict:
        """Run the integrity check over the task window (possibly split into segments)."""
        base_start, base_end, _ = self._get_time_window(cursor_data)
        segments = build_window_segments(
            self.config,
            base_start,
            base_end,
            tz=tz,
            override_only=True,
        )
        if not segments:
            # No split configured: check the base window as a single segment.
            segments = [(base_start, base_end)]

        total_segments = len(segments)
        if total_segments > 1:
            self.logger.info("Data integrity check split into %s segments.", total_segments)

        report, counts = run_window_flow(windows=segments, **flow_kwargs)

        report_path = write_report(report, prefix="data_integrity_window", tz=tz)
        report["report_path"] = report_path

        # The overall window spans from the first segment's start to the last
        # segment's end.
        overall_start = segments[0][0]
        overall_end = segments[-1][1]
        return self._build_result(
            counts,
            overall_start,
            overall_end,
            calc_window_minutes(overall_start, overall_end),
            report,
            report_path,
        )

    def _run_history_mode(self, tz: ZoneInfo, flow_kwargs: dict) -> dict:
        """Run the integrity check over ``integrity.history_start`` .. ``history_end``."""
        history_start = (
            str(self.config.get("integrity.history_start", self._DEFAULT_HISTORY_START) or "").strip()
            or self._DEFAULT_HISTORY_START
        )
        history_end = str(self.config.get("integrity.history_end", "") or "").strip()

        start_dt = self._localize(dtparser.parse(history_start), tz)
        # An empty history_end is passed to the flow as None.
        end_dt = self._localize(dtparser.parse(history_end), tz) if history_end else None

        report, counts = run_history_flow(start_dt=start_dt, end_dt=end_dt, **flow_kwargs)

        report_path = write_report(report, prefix="data_integrity_history", tz=tz)
        report["report_path"] = report_path

        # For the summary, prefer the configured end, then the report's
        # ``end`` entry, and finally start_dt (giving a zero-length window).
        end_dt_used = end_dt
        if end_dt_used is None:
            end_dt_used = self._parse_report_end(report.get("end"), tz)
        if end_dt_used is None:
            end_dt_used = start_dt

        minutes = int((end_dt_used - start_dt).total_seconds() // 60) if end_dt_used > start_dt else 0
        return self._build_result(counts, start_dt, end_dt_used, minutes, report, report_path)
|