213 lines
8.1 KiB
Python
213 lines
8.1 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""Data integrity task that checks API -> ODS -> DWD completeness."""
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from dateutil import parser as dtparser
|
||
|
||
import json
|
||
from pathlib import Path
|
||
from utils.windowing import build_window_segments, calc_window_minutes
|
||
from .base_task import BaseTask
|
||
from quality.integrity_checker import (
|
||
IntegrityWindow,
|
||
compute_last_etl_end,
|
||
run_integrity_history,
|
||
run_integrity_window,
|
||
)
|
||
|
||
|
||
class DataIntegrityTask(BaseTask):
    """Check data completeness across API -> ODS -> DWD.

    The mode is selected by the ``integrity.mode`` config key (default
    ``"history"``):

    * ``"window"`` checks the task's time window, optionally split into
      segments. This mode is forced whenever ``run.window_override.start``
      or ``run.window_override.end`` is set.
    * Any other value runs the history check from
      ``integrity.history_start`` to ``integrity.history_end``.

    When ``integrity.auto_backfill`` is truthy and the check reports missing
    records, a backfill is attempted afterwards.
    """

    def get_task_code(self) -> str:
        """Return the code identifying this task."""
        return "DATA_INTEGRITY_CHECK"

    def execute(self, cursor_data: dict | None = None) -> dict:
        """Run the integrity check and return a summary of the run.

        Args:
            cursor_data: Forwarded to ``_get_time_window`` in window mode;
                not used in history mode.

        Returns:
            A dict with the keys ``status`` (always ``"SUCCESS"``),
            ``counts`` (``missing``, ``errors`` and, when a backfill ran,
            ``backfilled``), ``window`` (``start``, ``end``, ``minutes``),
            ``report_path`` and ``backfill_result`` (``None`` when no
            backfill ran).
        """
        cfg = self.config
        tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
        mode = str(cfg.get("integrity.mode", "history") or "history").lower()
        # NOTE(review): bool() treats any non-empty string (e.g. "false") as
        # True; this assumes the config layer already yields real booleans.
        include_dimensions = bool(cfg.get("integrity.include_dimensions", False))
        task_codes = str(cfg.get("integrity.ods_task_codes", "") or "").strip()
        auto_backfill = bool(cfg.get("integrity.auto_backfill", False))

        # Detect whether a time window was passed in via the CLI
        # (window_override); if so, switch to window mode automatically.
        override_start = cfg.get("run.window_override.start")
        override_end = cfg.get("run.window_override.end")
        if override_start or override_end:
            self.logger.info(
                "检测到 CLI 时间窗口参数,自动切换到 window 模式: %s ~ %s",
                override_start,
                override_end,
            )
            mode = "window"

        if mode == "window":
            return self._execute_window(
                cursor_data,
                tz=tz,
                include_dimensions=include_dimensions,
                task_codes=task_codes,
                auto_backfill=auto_backfill,
            )
        return self._execute_history(
            tz=tz,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            auto_backfill=auto_backfill,
        )

    def _execute_window(
        self,
        cursor_data: dict | None,
        *,
        tz: ZoneInfo,
        include_dimensions: bool,
        task_codes: str,
        auto_backfill: bool,
    ) -> dict:
        """Check the task window segment by segment and write one combined report."""
        base_start, base_end, _ = self._get_time_window(cursor_data)
        segments = build_window_segments(
            self.config,
            base_start,
            base_end,
            tz=tz,
            override_only=True,
        )
        if not segments:
            # No segmentation produced: check the base window as one segment.
            segments = [(base_start, base_end)]

        total_segments = len(segments)
        if total_segments > 1:
            self.logger.info("数据完整性检查: 分段执行 共%s段", total_segments)

        window_reports = []
        total_missing = 0
        total_errors = 0
        for idx, (seg_start, seg_end) in enumerate(segments, start=1):
            window = IntegrityWindow(
                start=seg_start,
                end=seg_end,
                label=f"segment_{idx}",
                granularity="window",
            )
            # Per-segment reports are not written; a single combined report
            # is written once all segments have been checked.
            payload = run_integrity_window(
                cfg=self.config,
                window=window,
                include_dimensions=include_dimensions,
                task_codes=task_codes,
                logger=self.logger,
                write_report=False,
                window_split_unit="none",
                window_compensation_hours=0,
            )
            window_reports.append(payload)
            total_missing += self._api_to_ods_count(payload, "total_missing")
            total_errors += self._api_to_ods_count(payload, "total_errors")

        overall_start = segments[0][0]
        overall_end = segments[-1][1]
        report = {
            "mode": "window",
            "window": {
                "start": overall_start.isoformat(),
                "end": overall_end.isoformat(),
                "segments": total_segments,
            },
            "windows": window_reports,
            "api_to_ods": {
                "total_missing": total_missing,
                "total_errors": total_errors,
            },
            "total_missing": total_missing,
            "total_errors": total_errors,
            "generated_at": datetime.now(tz).isoformat(),
        }
        report_path = self._write_report(report, "data_integrity_window")

        # Auto backfill. It covers the base window returned by
        # _get_time_window, not the first/last segment bounds.
        counts, backfill_result = self._summarize(
            missing=total_missing,
            errors=total_errors,
            auto_backfill=auto_backfill,
            backfill_start=base_start,
            backfill_end=base_end,
            task_codes=task_codes,
        )
        return {
            "status": "SUCCESS",
            "counts": counts,
            "window": {
                "start": overall_start,
                "end": overall_end,
                "minutes": calc_window_minutes(overall_start, overall_end),
            },
            "report_path": report_path,
            "backfill_result": backfill_result,
        }

    def _execute_history(
        self,
        *,
        tz: ZoneInfo,
        include_dimensions: bool,
        task_codes: str,
        auto_backfill: bool,
    ) -> dict:
        """Check the configured history range; the checker writes the report itself."""
        history_start = str(
            self.config.get("integrity.history_start", "2025-07-01") or "2025-07-01"
        )
        history_end = str(self.config.get("integrity.history_end", "") or "").strip()

        start_dt = self._parse_in_tz(history_start, tz)
        if history_end:
            end_dt = self._parse_in_tz(history_end, tz)
        else:
            # No explicit end configured: use the last ETL end, or "now" when
            # compute_last_etl_end returns nothing.
            end_dt = compute_last_etl_end(self.config) or datetime.now(tz)

        report = run_integrity_history(
            cfg=self.config,
            start_dt=start_dt,
            end_dt=end_dt,
            include_dimensions=include_dimensions,
            task_codes=task_codes,
            logger=self.logger,
            write_report=True,
        )

        # Auto backfill over the same range that was checked.
        counts, backfill_result = self._summarize(
            missing=int(report.get("total_missing") or 0),
            errors=int(report.get("total_errors") or 0),
            auto_backfill=auto_backfill,
            backfill_start=start_dt,
            backfill_end=end_dt,
            task_codes=task_codes,
        )
        # An inverted or empty range is reported as zero minutes.
        if end_dt > start_dt:
            minutes = int((end_dt - start_dt).total_seconds() // 60)
        else:
            minutes = 0
        return {
            "status": "SUCCESS",
            "counts": counts,
            "window": {
                "start": start_dt,
                "end": end_dt,
                "minutes": minutes,
            },
            "report_path": report.get("report_path"),
            "backfill_result": backfill_result,
        }

    def _summarize(
        self,
        *,
        missing: int,
        errors: int,
        auto_backfill: bool,
        backfill_start: datetime,
        backfill_end: datetime,
        task_codes: str,
    ) -> tuple[dict, dict | None]:
        """Build the counts dict and run the auto backfill when it is needed.

        Returns:
            ``(counts, backfill_result)``; ``backfill_result`` is ``None``
            unless ``auto_backfill`` is set and ``missing`` is positive.
        """
        counts = {"missing": missing, "errors": errors}
        if not (auto_backfill and missing > 0):
            return counts, None
        backfill_result = self._run_backfill(backfill_start, backfill_end, task_codes)
        counts["backfilled"] = backfill_result.get("backfilled", 0)
        return counts, backfill_result

    @staticmethod
    def _api_to_ods_count(payload: dict, key: str) -> int:
        """Read an integer counter from ``payload["api_to_ods"]``, defaulting to 0."""
        # "or {}" also covers a section that is present but None.
        section = payload.get("api_to_ods") or {}
        return int(section.get(key) or 0)

    @staticmethod
    def _parse_in_tz(text: str, tz: ZoneInfo) -> datetime:
        """Parse ``text`` and express the result in ``tz``.

        A naive value is taken to already be in ``tz``; an aware value is
        converted to ``tz``.
        """
        parsed = dtparser.parse(text)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=tz)
        return parsed.astimezone(tz)

    def _write_report(self, report: dict, prefix: str) -> str:
        """Write ``report`` as JSON under ``<package root>/reports``.

        Args:
            report: JSON-serializable report content.
            prefix: File name prefix; a ``YYYYmmdd_HHMMSS`` stamp and
                ``.json`` are appended.

        Returns:
            The path of the written file, as a string.
        """
        # self.tz is not set in this class (presumably provided by BaseTask);
        # fall back to the configured app timezone, as execute() does.
        tz = getattr(self, "tz", None) or ZoneInfo(
            self.config.get("app.timezone", "Asia/Taipei")
        )
        stamp = datetime.now(tz).strftime("%Y%m%d_%H%M%S")
        reports_dir = Path(__file__).resolve().parents[1] / "reports"
        reports_dir.mkdir(parents=True, exist_ok=True)
        target = reports_dir / f"{prefix}_{stamp}.json"
        target.write_text(
            json.dumps(report, ensure_ascii=False, indent=2) + "\n",
            encoding="utf-8",
        )
        return str(target)

    def _run_backfill(self, start_dt: datetime, end_dt: datetime, task_codes: str) -> dict:
        """Run the data backfill for the given range.

        Best-effort: any failure is logged and reported in the returned dict
        instead of being raised, so a failed backfill does not fail the check.

        Returns:
            The dict returned by ``run_backfill``, or
            ``{"backfilled": 0, "errors": 1, "error": <message>}`` on failure.
        """
        self.logger.info("自动补全开始 起始=%s 结束=%s", start_dt, end_dt)
        try:
            # Imported lazily; a failing import is handled like any other
            # backfill failure.
            from scripts.backfill_missing_data import run_backfill

            result = run_backfill(
                cfg=self.config,
                start=start_dt,
                end=end_dt,
                task_codes=task_codes or None,
                dry_run=False,
                page_size=200,
                chunk_size=500,
                logger=self.logger,
            )
            self.logger.info(
                "自动补全完成 已补全=%s 错误数=%s",
                result.get("backfilled", 0),
                result.get("errors", 0),
            )
            return result
        except Exception as exc:
            self.logger.exception("自动补全失败")
            return {"backfilled": 0, "errors": 1, "error": str(exc)}
|