# -*- coding: utf-8 -*-
|
|
"""Run data integrity checks across API -> ODS -> DWD."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from dateutil import parser as dtparser
|
|
|
|
from config.settings import AppConfig
|
|
from quality.integrity_service import run_history_flow, run_window_flow, write_report
|
|
from utils.logging_utils import build_log_path, configure_logging
|
|
from utils.windowing import split_window
|
|
|
|
|
|
def _parse_dt(value: str, tz: ZoneInfo) -> datetime:
    """Parse *value* into a timezone-aware datetime anchored to *tz*.

    A naive timestamp is assumed to already be expressed in *tz* and gets
    the zone attached; an aware timestamp is converted into *tz*.
    """
    parsed = dtparser.parse(value)
    # Naive -> attach the configured zone; aware -> convert into it.
    return parsed.replace(tzinfo=tz) if parsed.tzinfo is None else parsed.astimezone(tz)
|
|
|
|
|
|
def _force_utf8_stdout() -> None:
    """Best-effort switch of stdout to UTF-8; a no-op where unsupported."""
    if hasattr(sys.stdout, "reconfigure"):
        try:
            sys.stdout.reconfigure(encoding="utf-8")
        except Exception:
            # Purely cosmetic; never let encoding setup kill the run.
            pass


def _resolve_flag(enable, disable, enable_name: str, disable_name: str, default):
    """Resolve a paired --x / --no-x flag pair against a fallback default.

    Args:
        enable: truthy when the positive flag was given.
        disable: truthy when the negative flag was given.
        enable_name: positive flag spelling, used in the conflict message.
        disable_name: negative flag spelling, used in the conflict message.
        default: value to use when neither flag was given.

    Returns:
        True / False when a flag was given, otherwise *default* unchanged
        (may be None, meaning "let downstream decide").

    Raises:
        SystemExit: when both flags were given together.
    """
    if enable and disable:
        raise SystemExit(f"cannot set both {enable_name} and {disable_name}")
    if enable:
        return True
    if disable:
        return False
    return default


def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser for the integrity checker."""
    ap = argparse.ArgumentParser(description="Data integrity checks (API -> ODS -> DWD)")
    ap.add_argument("--mode", choices=["history", "window"], default="history")
    ap.add_argument(
        "--flow",
        choices=["verify", "update_and_verify"],
        default="verify",
        help="verify only or update+verify (auto backfill then optional recheck)",
    )
    ap.add_argument("--start", default="2025-07-01", help="history start date (default: 2025-07-01)")
    ap.add_argument("--end", default="", help="history end datetime (default: last ETL end)")
    ap.add_argument("--window-start", default="", help="window start datetime (mode=window)")
    ap.add_argument("--window-end", default="", help="window end datetime (mode=window)")
    ap.add_argument("--window-split-unit", default="", help="split unit (month/none), default from config")
    ap.add_argument("--window-compensation-hours", type=int, default=None, help="window compensation hours, default from config")
    # default=None (not False) so "flag absent" is distinguishable from
    # "flag given" when merging with the config default.
    ap.add_argument(
        "--include-dimensions",
        action="store_true",
        default=None,
        help="include dimension tables in ODS->DWD checks",
    )
    ap.add_argument(
        "--no-include-dimensions",
        action="store_true",
        help="exclude dimension tables in ODS->DWD checks",
    )
    ap.add_argument("--ods-task-codes", default="", help="comma-separated ODS task codes for API checks")
    ap.add_argument("--compare-content", action="store_true", help="compare API vs ODS content hash")
    ap.add_argument("--no-compare-content", action="store_true", help="disable content comparison even if enabled in config")
    ap.add_argument("--include-mismatch", action="store_true", help="backfill mismatch records as well")
    ap.add_argument("--no-include-mismatch", action="store_true", help="disable mismatch backfill")
    ap.add_argument("--recheck", action="store_true", help="re-run checks after backfill")
    ap.add_argument("--no-recheck", action="store_true", help="skip recheck after backfill")
    ap.add_argument("--content-sample-limit", type=int, default=None, help="max mismatch samples per table")
    ap.add_argument("--out", default="", help="output JSON path")
    ap.add_argument("--log-file", default="", help="log file path")
    ap.add_argument("--log-dir", default="", help="log directory")
    ap.add_argument("--log-level", default="INFO", help="log level")
    ap.add_argument("--no-log-console", action="store_true", help="disable console logging")
    return ap


def _resolve_windows(args, cfg, tz: ZoneInfo) -> list:
    """Validate and split the explicit window for mode=window.

    Returns a non-empty list of (start, end) datetime pairs.

    Raises:
        SystemExit: when either window bound is missing.
    """
    if not args.window_start or not args.window_end:
        raise SystemExit("window-start and window-end are required for mode=window")
    start_dt = _parse_dt(args.window_start, tz)
    end_dt = _parse_dt(args.window_end, tz)
    # CLI overrides config; the final "month" guards against empty config values.
    split_unit = (args.window_split_unit or cfg.get("run.window_split.unit", "month") or "month").strip()
    comp_hours = args.window_compensation_hours
    if comp_hours is None:
        comp_hours = cfg.get("run.window_split.compensation_hours", 0)

    windows = split_window(
        start_dt,
        end_dt,
        tz=tz,
        split_unit=split_unit,
        compensation_hours=comp_hours,
    )
    # Fall back to the raw window when splitting yields nothing.
    return windows or [(start_dt, end_dt)]


def main() -> int:
    """CLI entry point for the data-integrity checker.

    Parses arguments, configures logging, resolves effective options from
    CLI flags layered over config defaults, then runs either the history
    or the window verification flow and writes a JSON report.

    Returns:
        Process exit code (0 on success).

    Raises:
        SystemExit: when mutually exclusive flags are both given, or when
            mode=window is missing its window bounds.
    """
    _force_utf8_stdout()

    args = _build_parser().parse_args()

    log_dir = Path(args.log_dir) if args.log_dir else (Path(__file__).resolve().parent / "logs")
    log_file = Path(args.log_file) if args.log_file else build_log_path(log_dir, "data_integrity")
    log_console = not args.no_log_console

    with configure_logging(
        "data_integrity",
        log_file,
        level=args.log_level,
        console=log_console,
        tee_std=True,
    ) as logger:
        cfg = AppConfig.load({})
        tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
        report_path = Path(args.out) if args.out else None

        # Resolve each tri-state option: an explicit CLI flag wins, else the
        # config default applies. Conflict checks run in this fixed order.
        recheck_after_backfill = _resolve_flag(
            args.recheck, args.no_recheck,
            "--recheck", "--no-recheck",
            cfg.get("integrity.recheck_after_backfill", True),
        )
        include_mismatch = _resolve_flag(
            args.include_mismatch, args.no_include_mismatch,
            "--include-mismatch", "--no-include-mismatch",
            cfg.get("integrity.backfill_mismatch", True),
        )
        include_dimensions = _resolve_flag(
            args.include_dimensions, args.no_include_dimensions,
            "--include-dimensions", "--no-include-dimensions",
            cfg.get("integrity.include_dimensions", True),
        )
        # None = no flag given; the flow decides from its own config.
        compare_content = _resolve_flag(
            args.compare_content, args.no_compare_content,
            "--compare-content", "--no-compare-content",
            None,
        )

        # Keyword arguments shared by both flows (previously duplicated).
        flow_kwargs = dict(
            cfg=cfg,
            include_dimensions=bool(include_dimensions),
            task_codes=args.ods_task_codes,
            logger=logger,
            compare_content=compare_content,
            content_sample_limit=args.content_sample_limit,
            do_backfill=args.flow == "update_and_verify",
            include_mismatch=bool(include_mismatch),
            recheck_after_backfill=bool(recheck_after_backfill),
            page_size=int(cfg.get("api.page_size") or 200),
            chunk_size=500,
        )

        if args.mode == "window":
            windows = _resolve_windows(args, cfg, tz)
            report, counts = run_window_flow(windows=windows, **flow_kwargs)
            prefix = "data_integrity_window"
        else:
            start_dt = _parse_dt(args.start, tz)
            # No --end means "up to the last ETL end" (resolved downstream).
            end_dt = _parse_dt(args.end, tz) if args.end else None
            report, counts = run_history_flow(start_dt=start_dt, end_dt=end_dt, **flow_kwargs)
            prefix = "data_integrity_history"

        report_path = write_report(report, prefix=prefix, tz=tz, report_path=report_path)
        # Recorded after writing, so the file itself never contains this key;
        # it is only surfaced via the log line below.
        report["report_path"] = report_path
        logger.info("REPORT_WRITTEN path=%s", report.get("report_path"))
        logger.info(
            "SUMMARY missing=%s mismatch=%s errors=%s",
            counts.get("missing"),
            counts.get("mismatch"),
            counts.get("errors"),
        )

    return 0
|
|
|
|
|
|
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|