# -*- coding: utf-8 -*- """ Check missing ODS records by comparing API primary keys vs ODS table primary keys. Default range: start = 2025-07-01 00:00:00 end = now For update runs, use --from-cutoff to derive the start time from ODS max(fetched_at), then backtrack by --cutoff-overlap-hours. """ from __future__ import annotations import argparse import json import logging import time as time_mod import sys from datetime import datetime, time, timedelta from pathlib import Path from typing import Iterable, Sequence from zoneinfo import ZoneInfo from dateutil import parser as dtparser from psycopg2.extras import execute_values PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from api.client import APIClient from config.settings import AppConfig from database.connection import DatabaseConnection from models.parsers import TypeParser from tasks.ods_tasks import ENABLED_ODS_CODES, ODS_TASK_SPECS from utils.logging_utils import build_log_path, configure_logging from utils.windowing import split_window DEFAULT_START = "2025-07-01" MIN_COMPLETENESS_WINDOW_DAYS = 30 def _reconfigure_stdout_utf8() -> None: if hasattr(sys.stdout, "reconfigure"): try: sys.stdout.reconfigure(encoding="utf-8") except Exception: pass def _parse_dt(value: str, tz: ZoneInfo, *, is_end: bool) -> datetime: raw = (value or "").strip() if not raw: raise ValueError("empty datetime") has_time = any(ch in raw for ch in (":", "T")) dt = dtparser.parse(raw) if dt.tzinfo is None: dt = dt.replace(tzinfo=tz) else: dt = dt.astimezone(tz) if not has_time: dt = dt.replace(hour=23 if is_end else 0, minute=59 if is_end else 0, second=59 if is_end else 0, microsecond=0) return dt def _iter_windows(start: datetime, end: datetime, window_size: timedelta) -> Iterable[tuple[datetime, datetime]]: if window_size.total_seconds() <= 0: raise ValueError("window_size must be > 0") cur = start while cur < end: nxt = min(cur + window_size, end) yield cur, nxt cur = nxt def _merge_record_layers(record: dict) -> dict: merged = record data_part = merged.get("data") while isinstance(data_part, dict): merged = {**data_part, **merged} data_part = data_part.get("data") settle_inner = merged.get("settleList") if isinstance(settle_inner, dict): merged = {**settle_inner, **merged} return merged def _get_value_case_insensitive(record: dict | None, col: str | None): if record is None or col is None: return None if col in record: return record.get(col) col_lower = col.lower() for k, v in record.items(): if isinstance(k, str) and k.lower() == col_lower: return v return None def _normalize_pk_value(value): if value is None: return None if isinstance(value, str) and value.isdigit(): try: return int(value) except Exception: return value return value def _chunked(seq: Sequence, size: int) -> Iterable[Sequence]: if size <= 0: size = 500 for i in range(0, len(seq), size): yield seq[i : i + size] def _get_table_pk_columns(conn, table: str) -> list[str]: if "." in table: schema, name = table.split(".", 1) else: schema, name = "public", table sql = """ SELECT kcu.column_name FROM information_schema.table_constraints tc JOIN information_schema.key_column_usage kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema WHERE tc.constraint_type = 'PRIMARY KEY' AND tc.table_schema = %s AND tc.table_name = %s ORDER BY kcu.ordinal_position """ with conn.cursor() as cur: cur.execute(sql, (schema, name)) return [r[0] for r in cur.fetchall()] def _fetch_existing_pk_set(conn, table: str, pk_cols: Sequence[str], pk_values: list[tuple], chunk_size: int) -> set[tuple]: if not pk_values: return set() select_cols = ", ".join(f't."{c}"' for c in pk_cols) value_cols = ", ".join(f'"{c}"' for c in pk_cols) join_cond = " AND ".join(f't."{c}" = v."{c}"' for c in pk_cols) sql = ( f"SELECT {select_cols} FROM {table} t " f"JOIN (VALUES %s) AS v({value_cols}) ON {join_cond}" ) existing: set[tuple] = set() with conn.cursor() as cur: for chunk in _chunked(pk_values, chunk_size): execute_values(cur, sql, chunk, page_size=len(chunk)) for row in cur.fetchall(): existing.add(tuple(row)) return existing def _merge_common_params(cfg: AppConfig, task_code: str, base: dict) -> dict: merged: dict = {} common = cfg.get("api.params", {}) or {} if isinstance(common, dict): merged.update(common) scoped = cfg.get(f"api.params.{task_code.lower()}", {}) or {} if isinstance(scoped, dict): merged.update(scoped) merged.update(base) return merged def _build_params(cfg: AppConfig, spec, store_id: int, window_start: datetime | None, window_end: datetime | None) -> dict: base: dict = {} if spec.include_site_id: if spec.endpoint == "/TenantGoods/GetGoodsInventoryList": base["siteId"] = [store_id] else: base["siteId"] = store_id if spec.requires_window and spec.time_fields and window_start and window_end: start_key, end_key = spec.time_fields base[start_key] = TypeParser.format_timestamp(window_start, ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))) base[end_key] = TypeParser.format_timestamp(window_end, ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))) base.update(spec.extra_params or {}) return _merge_common_params(cfg, spec.code, base) def _pk_tuple_from_record(record: dict, pk_cols: Sequence[str]) -> tuple | None: merged = _merge_record_layers(record) values = [] for col in pk_cols: val = _normalize_pk_value(_get_value_case_insensitive(merged, col)) if val is None or val == "": return None values.append(val) return tuple(values) def _pk_tuple_from_ticket_candidate(value) -> tuple | None: val = _normalize_pk_value(value) if val is None or val == "": return None return (val,) def _format_missing_sample(pk_cols: Sequence[str], pk_tuple: tuple) -> dict: return {col: pk_tuple[idx] for idx, col in enumerate(pk_cols)} def _check_spec( *, client: APIClient, db_conn, cfg: AppConfig, tz: ZoneInfo, logger: logging.Logger, spec, store_id: int, start: datetime | None, end: datetime | None, windows: list[tuple[datetime, datetime]] | None, page_size: int, chunk_size: int, sample_limit: int, sleep_per_window: float, sleep_per_page: float, ) -> dict: result = { "task_code": spec.code, "table": spec.table_name, "endpoint": spec.endpoint, "pk_columns": [], "records": 0, "records_with_pk": 0, "missing": 0, "missing_samples": [], "pages": 0, "skipped_missing_pk": 0, "errors": 0, "error_detail": None, } pk_cols = _get_table_pk_columns(db_conn, spec.table_name) result["pk_columns"] = pk_cols if not pk_cols: result["errors"] = 1 result["error_detail"] = "no primary key columns found" return result if spec.requires_window and spec.time_fields: if not start or not end: result["errors"] = 1 result["error_detail"] = "missing start/end for windowed endpoint" return result windows = list(windows or [(start, end)]) else: windows = [(None, None)] logger.info( "CHECK_START task=%s table=%s windows=%s start=%s end=%s", spec.code, spec.table_name, len(windows), start.isoformat() if start else None, end.isoformat() if end else None, ) missing_seen: set[tuple] = set() for window_idx, (window_start, window_end) in enumerate(windows, start=1): window_label = ( f"{window_start.isoformat()}~{window_end.isoformat()}" if window_start and window_end else "FULL" ) logger.info( "WINDOW_START task=%s idx=%s window=%s", spec.code, window_idx, window_label, ) window_pages = 0 window_records = 0 window_missing = 0 window_skipped = 0 params = _build_params(cfg, spec, store_id, window_start, window_end) try: for page_no, records, _, _ in client.iter_paginated( endpoint=spec.endpoint, params=params, page_size=page_size, data_path=spec.data_path, list_key=spec.list_key, ): window_pages += 1 window_records += len(records) result["pages"] += 1 result["records"] += len(records) pk_tuples: list[tuple] = [] for rec in records: if not isinstance(rec, dict): result["skipped_missing_pk"] += 1 window_skipped += 1 continue pk_tuple = _pk_tuple_from_record(rec, pk_cols) if not pk_tuple: result["skipped_missing_pk"] += 1 window_skipped += 1 continue pk_tuples.append(pk_tuple) if not pk_tuples: continue result["records_with_pk"] += len(pk_tuples) pk_unique = list(dict.fromkeys(pk_tuples)) existing = _fetch_existing_pk_set(db_conn, spec.table_name, pk_cols, pk_unique, chunk_size) for pk_tuple in pk_unique: if pk_tuple in existing: continue if pk_tuple in missing_seen: continue missing_seen.add(pk_tuple) result["missing"] += 1 window_missing += 1 if len(result["missing_samples"]) < sample_limit: result["missing_samples"].append(_format_missing_sample(pk_cols, pk_tuple)) if logger.isEnabledFor(logging.DEBUG): logger.debug( "PAGE task=%s idx=%s page=%s records=%s missing=%s skipped=%s", spec.code, window_idx, page_no, len(records), window_missing, window_skipped, ) if sleep_per_page > 0: time_mod.sleep(sleep_per_page) except Exception as exc: result["errors"] += 1 result["error_detail"] = f"{type(exc).__name__}: {exc}" logger.exception( "WINDOW_ERROR task=%s idx=%s window=%s error=%s", spec.code, window_idx, window_label, result["error_detail"], ) break logger.info( "WINDOW_DONE task=%s idx=%s window=%s pages=%s records=%s missing=%s skipped=%s", spec.code, window_idx, window_label, window_pages, window_records, window_missing, window_skipped, ) if sleep_per_window > 0: logger.debug( "SLEEP_WINDOW task=%s idx=%s seconds=%.2f", spec.code, window_idx, sleep_per_window, ) time_mod.sleep(sleep_per_window) return result def _check_settlement_tickets( *, client: APIClient, db_conn, cfg: AppConfig, tz: ZoneInfo, logger: logging.Logger, store_id: int, start: datetime | None, end: datetime | None, windows: list[tuple[datetime, datetime]] | None, page_size: int, chunk_size: int, sample_limit: int, sleep_per_window: float, sleep_per_page: float, ) -> dict: table_name = "billiards_ods.settlement_ticket_details" pk_cols = _get_table_pk_columns(db_conn, table_name) result = { "task_code": "ODS_SETTLEMENT_TICKET", "table": table_name, "endpoint": "/Order/GetOrderSettleTicketNew", "pk_columns": pk_cols, "records": 0, "records_with_pk": 0, "missing": 0, "missing_samples": [], "pages": 0, "skipped_missing_pk": 0, "errors": 0, "error_detail": None, "source_endpoint": "/PayLog/GetPayLogListPage", } if not pk_cols: result["errors"] = 1 result["error_detail"] = "no primary key columns found" return result if not start or not end: result["errors"] = 1 result["error_detail"] = "missing start/end for ticket check" return result missing_seen: set[tuple] = set() pay_endpoint = "/PayLog/GetPayLogListPage" windows = list(windows or [(start, end)]) logger.info( "CHECK_START task=%s table=%s windows=%s start=%s end=%s", result["task_code"], table_name, len(windows), start.isoformat() if start else None, end.isoformat() if end else None, ) for window_idx, (window_start, window_end) in enumerate(windows, start=1): window_label = f"{window_start.isoformat()}~{window_end.isoformat()}" logger.info( "WINDOW_START task=%s idx=%s window=%s", result["task_code"], window_idx, window_label, ) window_pages = 0 window_records = 0 window_missing = 0 window_skipped = 0 base = { "siteId": store_id, "StartPayTime": TypeParser.format_timestamp(window_start, tz), "EndPayTime": TypeParser.format_timestamp(window_end, tz), } params = _merge_common_params(cfg, "ODS_PAYMENT", base) try: for page_no, records, _, _ in client.iter_paginated( endpoint=pay_endpoint, params=params, page_size=page_size, data_path=("data",), list_key=None, ): window_pages += 1 window_records += len(records) result["pages"] += 1 result["records"] += len(records) pk_tuples: list[tuple] = [] for rec in records: if not isinstance(rec, dict): result["skipped_missing_pk"] += 1 window_skipped += 1 continue relate_id = TypeParser.parse_int( (rec or {}).get("relateId") or (rec or {}).get("orderSettleId") or (rec or {}).get("order_settle_id") ) pk_tuple = _pk_tuple_from_ticket_candidate(relate_id) if not pk_tuple: result["skipped_missing_pk"] += 1 window_skipped += 1 continue pk_tuples.append(pk_tuple) if not pk_tuples: continue result["records_with_pk"] += len(pk_tuples) pk_unique = list(dict.fromkeys(pk_tuples)) existing = _fetch_existing_pk_set(db_conn, table_name, pk_cols, pk_unique, chunk_size) for pk_tuple in pk_unique: if pk_tuple in existing: continue if pk_tuple in missing_seen: continue missing_seen.add(pk_tuple) result["missing"] += 1 window_missing += 1 if len(result["missing_samples"]) < sample_limit: result["missing_samples"].append(_format_missing_sample(pk_cols, pk_tuple)) if logger.isEnabledFor(logging.DEBUG): logger.debug( "PAGE task=%s idx=%s page=%s records=%s missing=%s skipped=%s", result["task_code"], window_idx, page_no, len(records), window_missing, window_skipped, ) if sleep_per_page > 0: time_mod.sleep(sleep_per_page) except Exception as exc: result["errors"] += 1 result["error_detail"] = f"{type(exc).__name__}: {exc}" logger.exception( "WINDOW_ERROR task=%s idx=%s window=%s error=%s", result["task_code"], window_idx, window_label, result["error_detail"], ) break logger.info( "WINDOW_DONE task=%s idx=%s window=%s pages=%s records=%s missing=%s skipped=%s", result["task_code"], window_idx, window_label, window_pages, window_records, window_missing, window_skipped, ) if sleep_per_window > 0: logger.debug( "SLEEP_WINDOW task=%s idx=%s seconds=%.2f", result["task_code"], window_idx, sleep_per_window, ) time_mod.sleep(sleep_per_window) return result def _compute_ods_cutoff(conn, ods_tables: Sequence[str]) -> datetime | None: values: list[datetime] = [] with conn.cursor() as cur: for table in ods_tables: try: cur.execute(f"SELECT MAX(fetched_at) FROM {table}") row = cur.fetchone() if row and row[0]: values.append(row[0]) except Exception: continue if not values: return None return min(values) def _resolve_window_from_cutoff( *, conn, ods_tables: Sequence[str], tz: ZoneInfo, overlap_hours: int, ) -> tuple[datetime, datetime, datetime | None]: cutoff = _compute_ods_cutoff(conn, ods_tables) now = datetime.now(tz) if cutoff is None: start = now - timedelta(hours=max(1, overlap_hours)) return start, now, None if cutoff.tzinfo is None: cutoff = cutoff.replace(tzinfo=tz) else: cutoff = cutoff.astimezone(tz) start = cutoff - timedelta(hours=max(0, overlap_hours)) return start, now, cutoff def run_gap_check( *, cfg: AppConfig | None, start: datetime | str | None, end: datetime | str | None, window_days: int, window_hours: int, page_size: int, chunk_size: int, sample_limit: int, sleep_per_window: float, sleep_per_page: float, task_codes: str, from_cutoff: bool, cutoff_overlap_hours: int, allow_small_window: bool, logger: logging.Logger, window_split_unit: str | None = None, window_compensation_hours: int | None = None, ) -> dict: cfg = cfg or AppConfig.load({}) tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei")) store_id = int(cfg.get("app.store_id") or 0) if not cfg.get("api.token"): raise ValueError("missing api.token; please set API_TOKEN in .env") window_days = int(window_days) window_hours = int(window_hours) split_unit = (window_split_unit or cfg.get("run.window_split.unit", "month") or "month").strip() comp_hours = window_compensation_hours if comp_hours is None: comp_hours = cfg.get("run.window_split.compensation_hours", 0) use_split = split_unit.lower() not in ("", "none", "off", "false", "0") if not use_split and not from_cutoff and not allow_small_window: min_hours = MIN_COMPLETENESS_WINDOW_DAYS * 24 if window_hours > 0: if window_hours < min_hours: logger.warning( "window_hours=%s too small for completeness check; adjust to %s", window_hours, min_hours, ) window_hours = min_hours elif window_days < MIN_COMPLETENESS_WINDOW_DAYS: logger.warning( "window_days=%s too small for completeness check; adjust to %s", window_days, MIN_COMPLETENESS_WINDOW_DAYS, ) window_days = MIN_COMPLETENESS_WINDOW_DAYS cutoff = None if from_cutoff: db_tmp = DatabaseConnection(dsn=cfg["db"]["dsn"], session=cfg["db"].get("session")) ods_tables = [s.table_name for s in ODS_TASK_SPECS if s.code in ENABLED_ODS_CODES] start, end, cutoff = _resolve_window_from_cutoff( conn=db_tmp.conn, ods_tables=ods_tables, tz=tz, overlap_hours=cutoff_overlap_hours, ) db_tmp.close() else: if not start: start = DEFAULT_START if not end: end = datetime.now(tz) if isinstance(start, str): start = _parse_dt(start, tz, is_end=False) if isinstance(end, str): end = _parse_dt(end, tz, is_end=True) windows = None if use_split: windows = split_window( start, end, tz=tz, split_unit=split_unit, compensation_hours=comp_hours, ) else: adjusted = split_window( start, end, tz=tz, split_unit="none", compensation_hours=comp_hours, ) if adjusted: start, end = adjusted[0] window_size = timedelta(hours=window_hours) if window_hours > 0 else timedelta(days=window_days) windows = list(_iter_windows(start, end, window_size)) if windows: start, end = windows[0][0], windows[-1][1] logger.info( "START range=%s~%s window_days=%s window_hours=%s split_unit=%s comp_hours=%s page_size=%s chunk_size=%s", start.isoformat() if isinstance(start, datetime) else None, end.isoformat() if isinstance(end, datetime) else None, window_days, window_hours, split_unit, comp_hours, page_size, chunk_size, ) if cutoff: logger.info("CUTOFF=%s overlap_hours=%s", cutoff.isoformat(), cutoff_overlap_hours) client = APIClient( base_url=cfg["api"]["base_url"], token=cfg["api"]["token"], timeout=int(cfg["api"].get("timeout_sec") or 20), retry_max=int(cfg["api"].get("retries", {}).get("max_attempts") or 3), headers_extra=cfg["api"].get("headers_extra") or {}, ) db_conn = DatabaseConnection(dsn=cfg["db"]["dsn"], session=cfg["db"].get("session")) try: db_conn.conn.rollback() except Exception: pass db_conn.conn.autocommit = True try: task_filter = {t.strip().upper() for t in (task_codes or "").split(",") if t.strip()} specs = [s for s in ODS_TASK_SPECS if s.code in ENABLED_ODS_CODES] if task_filter: specs = [s for s in specs if s.code in task_filter] results: list[dict] = [] for spec in specs: if spec.code == "ODS_SETTLEMENT_TICKET": continue result = _check_spec( client=client, db_conn=db_conn.conn, cfg=cfg, tz=tz, logger=logger, spec=spec, store_id=store_id, start=start, end=end, windows=windows, page_size=page_size, chunk_size=chunk_size, sample_limit=sample_limit, sleep_per_window=sleep_per_window, sleep_per_page=sleep_per_page, ) results.append(result) logger.info( "CHECK_DONE task=%s missing=%s records=%s errors=%s", result.get("task_code"), result.get("missing"), result.get("records"), result.get("errors"), ) if (not task_filter) or ("ODS_SETTLEMENT_TICKET" in task_filter): ticket_result = _check_settlement_tickets( client=client, db_conn=db_conn.conn, cfg=cfg, tz=tz, logger=logger, store_id=store_id, start=start, end=end, windows=windows, page_size=page_size, chunk_size=chunk_size, sample_limit=sample_limit, sleep_per_window=sleep_per_window, sleep_per_page=sleep_per_page, ) results.append(ticket_result) logger.info( "CHECK_DONE task=%s missing=%s records=%s errors=%s", ticket_result.get("task_code"), ticket_result.get("missing"), ticket_result.get("records"), ticket_result.get("errors"), ) total_missing = sum(int(r.get("missing") or 0) for r in results) total_errors = sum(int(r.get("errors") or 0) for r in results) payload = { "window_split_unit": split_unit, "window_compensation_hours": comp_hours, "start": start.isoformat() if isinstance(start, datetime) else None, "end": end.isoformat() if isinstance(end, datetime) else None, "cutoff": cutoff.isoformat() if cutoff else None, "window_days": window_days, "window_hours": window_hours, "page_size": page_size, "chunk_size": chunk_size, "sample_limit": sample_limit, "store_id": store_id, "base_url": cfg.get("api.base_url"), "results": results, "total_missing": total_missing, "total_errors": total_errors, "generated_at": datetime.now(tz).isoformat(), } return payload finally: db_conn.close() def main() -> int: _reconfigure_stdout_utf8() ap = argparse.ArgumentParser(description="Check missing ODS records by comparing API vs ODS PKs.") ap.add_argument("--start", default=DEFAULT_START, help="start datetime (default: 2025-07-01)") ap.add_argument("--end", default="", help="end datetime (default: now)") ap.add_argument("--window-days", type=int, default=1, help="days per API window (default: 1)") ap.add_argument("--window-hours", type=int, default=0, help="hours per API window (default: 0)") ap.add_argument("--window-split-unit", default="", help="split unit (month/none), default from config") ap.add_argument("--window-compensation-hours", type=int, default=None, help="window compensation hours, default from config") ap.add_argument("--page-size", type=int, default=200, help="API page size (default: 200)") ap.add_argument("--chunk-size", type=int, default=500, help="DB query chunk size (default: 500)") ap.add_argument("--sample-limit", type=int, default=50, help="max missing PK samples per table") ap.add_argument("--sleep-per-window-seconds", type=float, default=0, help="sleep seconds after each window") ap.add_argument("--sleep-per-page-seconds", type=float, default=0, help="sleep seconds after each page") ap.add_argument("--task-codes", default="", help="comma-separated task codes to check (optional)") ap.add_argument("--out", default="", help="output JSON path (optional)") ap.add_argument("--tag", default="", help="tag suffix for output filename") ap.add_argument("--from-cutoff", action="store_true", help="derive start from ODS cutoff") ap.add_argument( "--cutoff-overlap-hours", type=int, default=24, help="overlap hours when using --from-cutoff (default: 24)", ) ap.add_argument( "--allow-small-window", action="store_true", help="allow windows smaller than default completeness guard", ) ap.add_argument("--log-file", default="", help="log file path (default: logs/check_ods_gaps_YYYYMMDD_HHMMSS.log)") ap.add_argument("--log-dir", default="", help="log directory (default: logs)") ap.add_argument("--log-level", default="INFO", help="log level (default: INFO)") ap.add_argument("--no-log-console", action="store_true", help="disable console logging") args = ap.parse_args() log_dir = Path(args.log_dir) if args.log_dir else (PROJECT_ROOT / "logs") log_file = Path(args.log_file) if args.log_file else build_log_path(log_dir, "check_ods_gaps", args.tag) log_console = not args.no_log_console with configure_logging( "ods_gap_check", log_file, level=args.log_level, console=log_console, tee_std=True, ) as logger: cfg = AppConfig.load({}) payload = run_gap_check( cfg=cfg, start=args.start, end=args.end, window_days=args.window_days, window_hours=args.window_hours, page_size=args.page_size, chunk_size=args.chunk_size, sample_limit=args.sample_limit, sleep_per_window=args.sleep_per_window_seconds, sleep_per_page=args.sleep_per_page_seconds, task_codes=args.task_codes, from_cutoff=args.from_cutoff, cutoff_overlap_hours=args.cutoff_overlap_hours, allow_small_window=args.allow_small_window, logger=logger, window_split_unit=args.window_split_unit or None, window_compensation_hours=args.window_compensation_hours, ) tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei")) if args.out: out_path = Path(args.out) else: tag = f"_{args.tag}" if args.tag else "" stamp = datetime.now(tz).strftime("%Y%m%d_%H%M%S") out_path = PROJECT_ROOT / "reports" / f"ods_gap_check{tag}_{stamp}.json" out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") logger.info("REPORT_WRITTEN path=%s", out_path) logger.info( "SUMMARY missing=%s errors=%s", payload.get("total_missing"), payload.get("total_errors"), ) return 0 if __name__ == "__main__": raise SystemExit(main())