# -*- coding: utf-8 -*-
"""Run data integrity checks across API -> ODS -> DWD."""

from __future__ import annotations

import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo

from dateutil import parser as dtparser

from config.settings import AppConfig
from quality.integrity_checker import (
    IntegrityWindow,
    compute_last_etl_end,
    run_integrity_history,
    run_integrity_window,
)
from utils.logging_utils import build_log_path, configure_logging
from utils.windowing import split_window


def _parse_dt(value: str, tz: ZoneInfo) -> datetime:
    """Parse a datetime string; attach tz if naive, convert to tz if aware."""
    dt = dtparser.parse(value)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)
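
# Illustrative _parse_dt behavior (assuming tz = ZoneInfo("Asia/Taipei")):
#   _parse_dt("2025-07-01", tz)           -> 2025-07-01 00:00:00+08:00  (naive: tz attached)
#   _parse_dt("2025-07-01T00:00:00Z", tz) -> 2025-07-01 08:00:00+08:00  (aware: converted)
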
def main() -> int:
    # Force UTF-8 stdout where the runtime supports it (e.g. Windows consoles).
    if hasattr(sys.stdout, "reconfigure"):
        try:
            sys.stdout.reconfigure(encoding="utf-8")
        except Exception:
            pass

    ap = argparse.ArgumentParser(description="Data integrity checks (API -> ODS -> DWD)")
    ap.add_argument("--mode", choices=["history", "window"], default="history")
    ap.add_argument("--start", default="2025-07-01", help="history start date (default: 2025-07-01)")
    ap.add_argument("--end", default="", help="history end datetime (default: last ETL end)")
    ap.add_argument("--window-start", default="", help="window start datetime (mode=window)")
    ap.add_argument("--window-end", default="", help="window end datetime (mode=window)")
    ap.add_argument("--window-split-unit", default="", help="split unit (month/none), default from config")
    ap.add_argument("--window-compensation-hours", type=int, default=None, help="window compensation hours, default from config")
    ap.add_argument("--include-dimensions", action="store_true", help="include dimension tables in ODS->DWD checks")
    ap.add_argument("--ods-task-codes", default="", help="comma-separated ODS task codes for API checks")
    ap.add_argument("--out", default="", help="output JSON path")
    ap.add_argument("--log-file", default="", help="log file path")
    ap.add_argument("--log-dir", default="", help="log directory")
    ap.add_argument("--log-level", default="INFO", help="log level")
    ap.add_argument("--no-log-console", action="store_true", help="disable console logging")
    args = ap.parse_args()

    # Default log location: ./logs next to this script, named via build_log_path.
    log_dir = Path(args.log_dir) if args.log_dir else (Path(__file__).resolve().parent / "logs")
    log_file = Path(args.log_file) if args.log_file else build_log_path(log_dir, "data_integrity")
    log_console = not args.no_log_console

    with configure_logging(
        "data_integrity",
        log_file,
        level=args.log_level,
        console=log_console,
        tee_std=True,
    ) as logger:
        cfg = AppConfig.load({})
        tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
        report_path = Path(args.out) if args.out else None
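
        # Two modes: "window" checks an explicit span, optionally split into
        # month-sized segments; "history" (the else branch below) checks from
        # --start up to the last recorded ETL end.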
        if args.mode == "window":
            if not args.window_start or not args.window_end:
                raise SystemExit("window-start and window-end are required for mode=window")
            start_dt = _parse_dt(args.window_start, tz)
            end_dt = _parse_dt(args.window_end, tz)
            # CLI flags win; otherwise fall back to config, then to "month"/0.
            split_unit = (args.window_split_unit or cfg.get("run.window_split.unit", "month") or "month").strip()
            comp_hours = args.window_compensation_hours
            if comp_hours is None:
                comp_hours = cfg.get("run.window_split.compensation_hours", 0)

            windows = split_window(
                start_dt,
                end_dt,
                tz=tz,
                split_unit=split_unit,
                compensation_hours=comp_hours,
            )
            if not windows:
                windows = [(start_dt, end_dt)]
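
            # Each segment is verified independently; the per-segment call
            # below passes window_split_unit="none" so segments are not split
            # again. With split_unit="month", split_window presumably yields
            # month-aligned (start, end) pairs; the exact boundary and
            # compensation_hours semantics live in utils.windowing.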
            window_reports = []
            total_missing = 0
            total_errors = 0
            for idx, (seg_start, seg_end) in enumerate(windows, start=1):
                window = IntegrityWindow(
                    start=seg_start,
                    end=seg_end,
                    label=f"segment_{idx}",
                    granularity="window",
                )
                payload = run_integrity_window(
                    cfg=cfg,
                    window=window,
                    include_dimensions=args.include_dimensions,
                    task_codes=args.ods_task_codes,
                    logger=logger,
                    write_report=False,
                    report_path=None,
                    window_split_unit="none",
                    window_compensation_hours=0,
                )
                window_reports.append(payload)
                total_missing += int(payload.get("api_to_ods", {}).get("total_missing") or 0)
                total_errors += int(payload.get("api_to_ods", {}).get("total_errors") or 0)

            overall_start = windows[0][0]
            overall_end = windows[-1][1]
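
            # Roll up across segments: the overall window runs from the first
            # segment's start to the last segment's end, and the top-level
            # totals sum only the api_to_ods counters; per-segment details
            # (including any ODS->DWD results) stay under "windows".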
            report = {
                "mode": "window",
                "window": {
                    "start": overall_start.isoformat(),
                    "end": overall_end.isoformat(),
                    "segments": len(windows),
                },
                "windows": window_reports,
                "api_to_ods": {
                    "total_missing": total_missing,
                    "total_errors": total_errors,
                },
                "total_missing": total_missing,
                "total_errors": total_errors,
                "generated_at": datetime.now(tz).isoformat(),
            }
            if report_path is None:
                # Default: <root>/reports/data_integrity_window_<stamp>.json,
                # assuming this script lives one directory below the repo root.
                root = Path(__file__).resolve().parents[1]
                stamp = datetime.now(tz).strftime("%Y%m%d_%H%M%S")
                report_path = root / "reports" / f"data_integrity_window_{stamp}.json"
            report_path.parent.mkdir(parents=True, exist_ok=True)
            report_path.write_text(json.dumps(report, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
            report["report_path"] = str(report_path)
            logger.info("REPORT_WRITTEN path=%s", report.get("report_path"))
        else:
            start_dt = _parse_dt(args.start, tz)
            if args.end:
                end_dt = _parse_dt(args.end, tz)
            else:
                # Default history end: the last recorded ETL end, else "now".
                end_dt = compute_last_etl_end(cfg) or datetime.now(tz)
            report = run_integrity_history(
                cfg=cfg,
                start_dt=start_dt,
                end_dt=end_dt,
                include_dimensions=args.include_dimensions,
                task_codes=args.ods_task_codes,
                logger=logger,
                write_report=True,
                report_path=report_path,
            )
            logger.info("REPORT_WRITTEN path=%s", report.get("report_path"))

        logger.info(
            "SUMMARY missing=%s errors=%s",
            report.get("total_missing"),
            report.get("total_errors"),
        )

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
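
# Illustrative invocations (the script name "run_data_integrity.py" is a
# placeholder; the flags are as defined above):
#   python run_data_integrity.py
#       -> history mode: 2025-07-01 through the last ETL end
#   python run_data_integrity.py --mode window --window-start 2025-08-01 \
#       --window-end 2025-09-01 --include-dimensions --out reports/aug.json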