feiqiu-ETL/etl_billiards/scripts/check_ods_gaps.py

# -*- coding: utf-8 -*-
"""
Check missing ODS records by comparing API primary keys vs ODS table primary keys.
Default range:
start = 2025-07-01 00:00:00
end = now
For update runs, use --from-cutoff to derive the start time from ODS max(fetched_at),
then backtrack by --cutoff-overlap-hours.
"""
from __future__ import annotations
import argparse
import json
import logging
import time as time_mod
import sys
from datetime import datetime, time, timedelta
from pathlib import Path
from typing import Iterable, Sequence
from zoneinfo import ZoneInfo
from dateutil import parser as dtparser
from psycopg2 import InterfaceError, OperationalError
from psycopg2.extras import execute_values
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from api.client import APIClient
from config.settings import AppConfig
from database.connection import DatabaseConnection
from models.parsers import TypeParser
from tasks.ods_tasks import BaseOdsTask, ENABLED_ODS_CODES, ODS_TASK_SPECS
from utils.logging_utils import build_log_path, configure_logging
from utils.ods_record_utils import (
    get_value_case_insensitive,
    merge_record_layers,
    normalize_pk_value,
    pk_tuple_from_record,
)
from utils.windowing import split_window
DEFAULT_START = "2025-07-01"
MIN_COMPLETENESS_WINDOW_DAYS = 30
def _reconfigure_stdout_utf8() -> None:
    if hasattr(sys.stdout, "reconfigure"):
        try:
            sys.stdout.reconfigure(encoding="utf-8")
        except Exception:
            pass
def _parse_dt(value: str, tz: ZoneInfo, *, is_end: bool) -> datetime:
    raw = (value or "").strip()
    if not raw:
        raise ValueError("empty datetime")
    has_time = any(ch in raw for ch in (":", "T"))
    dt = dtparser.parse(raw)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=tz)
    else:
        dt = dt.astimezone(tz)
    if not has_time:
        dt = dt.replace(hour=23 if is_end else 0, minute=59 if is_end else 0, second=59 if is_end else 0, microsecond=0)
    return dt
def _iter_windows(start: datetime, end: datetime, window_size: timedelta) -> Iterable[tuple[datetime, datetime]]:
    if window_size.total_seconds() <= 0:
        raise ValueError("window_size must be > 0")
    cur = start
    while cur < end:
        nxt = min(cur + window_size, end)
        yield cur, nxt
        cur = nxt
def _merge_record_layers(record: dict) -> dict:
    return merge_record_layers(record)
def _chunked(seq: Sequence, size: int) -> Iterable[Sequence]:
    if size <= 0:
        size = 500
    for i in range(0, len(seq), size):
        yield seq[i : i + size]
def _get_table_pk_columns(conn, table: str) -> list[str]:
    if "." in table:
        schema, name = table.split(".", 1)
    else:
        schema, name = "public", table
    sql = """
        SELECT kcu.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
          ON tc.constraint_name = kcu.constraint_name
         AND tc.table_schema = kcu.table_schema
        WHERE tc.constraint_type = 'PRIMARY KEY'
          AND tc.table_schema = %s
          AND tc.table_name = %s
        ORDER BY kcu.ordinal_position
    """
    with conn.cursor() as cur:
        cur.execute(sql, (schema, name))
        cols = [r[0] for r in cur.fetchall()]
    return [c for c in cols if c.lower() != "content_hash"]
def _table_has_column(conn, table: str, column: str) -> bool:
    if "." in table:
        schema, name = table.split(".", 1)
    else:
        schema, name = "public", table
    sql = """
        SELECT 1
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s AND column_name = %s
        LIMIT 1
    """
    with conn.cursor() as cur:
        cur.execute(sql, (schema, name, column))
        return cur.fetchone() is not None
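# PK existence probes: candidate keys are sent to PostgreSQL in chunks as a VALUES list
# and joined against the target table, so only keys that already exist come back and the
# caller treats the rest as missing. The content_hash variant adds the hash to the join
# to surface rows whose payload changed while the PK stayed the same.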
def _fetch_existing_pk_set(conn, table: str, pk_cols: Sequence[str], pk_values: list[tuple], chunk_size: int) -> set[tuple]:
    if not pk_values:
        return set()
    select_cols = ", ".join(f't."{c}"' for c in pk_cols)
    value_cols = ", ".join(f'"{c}"' for c in pk_cols)
    join_cond = " AND ".join(f't."{c}" = v."{c}"' for c in pk_cols)
    sql = (
        f"SELECT {select_cols} FROM {table} t "
        f"JOIN (VALUES %s) AS v({value_cols}) ON {join_cond}"
    )
    existing: set[tuple] = set()
    with conn.cursor() as cur:
        for chunk in _chunked(pk_values, chunk_size):
            execute_values(cur, sql, chunk, page_size=len(chunk))
            for row in cur.fetchall():
                existing.add(tuple(row))
    return existing
def _fetch_existing_pk_hash_set(
    conn, table: str, pk_cols: Sequence[str], pk_hash_values: list[tuple], chunk_size: int
) -> set[tuple]:
    if not pk_hash_values:
        return set()
    select_cols = ", ".join([*(f't."{c}"' for c in pk_cols), 't."content_hash"'])
    value_cols = ", ".join([*(f'"{c}"' for c in pk_cols), '"content_hash"'])
    join_cond = " AND ".join([*(f't."{c}" = v."{c}"' for c in pk_cols), 't."content_hash" = v."content_hash"'])
    sql = (
        f"SELECT {select_cols} FROM {table} t "
        f"JOIN (VALUES %s) AS v({value_cols}) ON {join_cond}"
    )
    existing: set[tuple] = set()
    with conn.cursor() as cur:
        for chunk in _chunked(pk_hash_values, chunk_size):
            execute_values(cur, sql, chunk, page_size=len(chunk))
            for row in cur.fetchall():
                existing.add(tuple(row))
    return existing
def _init_db_state(cfg: AppConfig) -> dict:
    db_conn = DatabaseConnection(dsn=cfg["db"]["dsn"], session=cfg["db"].get("session"))
    try:
        db_conn.conn.rollback()
    except Exception:
        pass
    db_conn.conn.autocommit = True
    return {"db": db_conn, "conn": db_conn.conn}
def _reconnect_db(db_state: dict, cfg: AppConfig, logger: logging.Logger):
    try:
        db_state.get("db").close()
    except Exception:
        pass
    db_state.update(_init_db_state(cfg))
    logger.warning("DB connection reset/reconnected")
    return db_state["conn"]
def _ensure_db_conn(db_state: dict, cfg: AppConfig, logger: logging.Logger):
    conn = db_state.get("conn")
    if conn is None or getattr(conn, "closed", 0):
        return _reconnect_db(db_state, cfg, logger)
    return conn
def _merge_common_params(cfg: AppConfig, task_code: str, base: dict) -> dict:
    merged: dict = {}
    common = cfg.get("api.params", {}) or {}
    if isinstance(common, dict):
        merged.update(common)
    scoped = cfg.get(f"api.params.{task_code.lower()}", {}) or {}
    if isinstance(scoped, dict):
        merged.update(scoped)
    merged.update(base)
    return merged
def _build_params(cfg: AppConfig, spec, store_id: int, window_start: datetime | None, window_end: datetime | None) -> dict:
    base: dict = {}
    if spec.include_site_id:
        if spec.endpoint == "/TenantGoods/GetGoodsInventoryList":
            base["siteId"] = [store_id]
        else:
            base["siteId"] = store_id
    if spec.requires_window and spec.time_fields and window_start and window_end:
        start_key, end_key = spec.time_fields
        base[start_key] = TypeParser.format_timestamp(window_start, ZoneInfo(cfg.get("app.timezone", "Asia/Taipei")))
        base[end_key] = TypeParser.format_timestamp(window_end, ZoneInfo(cfg.get("app.timezone", "Asia/Taipei")))
    base.update(spec.extra_params or {})
    return _merge_common_params(cfg, spec.code, base)
def _pk_tuple_from_merged(merged: dict, pk_cols: Sequence[str]) -> tuple | None:
    values = []
    for col in pk_cols:
        val = normalize_pk_value(get_value_case_insensitive(merged, col))
        if val is None or val == "":
            return None
        values.append(val)
    return tuple(values)
def _pk_tuple_from_record(record: dict, pk_cols: Sequence[str]) -> tuple | None:
    return pk_tuple_from_record(record, pk_cols)
def _pk_tuple_from_ticket_candidate(value) -> tuple | None:
    val = normalize_pk_value(value)
    if val is None or val == "":
        return None
    return (val,)
def _format_missing_sample(pk_cols: Sequence[str], pk_tuple: tuple) -> dict:
    return {col: pk_tuple[idx] for idx, col in enumerate(pk_cols)}
def _format_mismatch_sample(pk_cols: Sequence[str], pk_tuple: tuple, content_hash: str | None) -> dict:
    sample = _format_missing_sample(pk_cols, pk_tuple)
    if content_hash:
        sample["content_hash"] = content_hash
    return sample
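# _check_spec: per-task completeness check. For each time window it pages through the
# spec's API endpoint, derives PK tuples from the merged record layers, and probes the
# ODS table in chunks via the VALUES join above. PKs absent from the table are counted
# as missing; when --compare-content is set and the table has content_hash, existing
# rows whose hash differs are counted as mismatches.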
def _check_spec(
    *,
    client: APIClient,
    db_state: dict,
    cfg: AppConfig,
    tz: ZoneInfo,
    logger: logging.Logger,
    spec,
    store_id: int,
    start: datetime | None,
    end: datetime | None,
    windows: list[tuple[datetime, datetime]] | None,
    page_size: int,
    chunk_size: int,
    sample_limit: int,
    compare_content: bool,
    content_sample_limit: int,
    sleep_per_window: float,
    sleep_per_page: float,
) -> dict:
    result = {
        "task_code": spec.code,
        "table": spec.table_name,
        "endpoint": spec.endpoint,
        "pk_columns": [],
        "records": 0,
        "records_with_pk": 0,
        "missing": 0,
        "missing_samples": [],
        "mismatch": 0,
        "mismatch_samples": [],
        "pages": 0,
        "skipped_missing_pk": 0,
        "errors": 0,
        "error_detail": None,
    }
    db_conn = _ensure_db_conn(db_state, cfg, logger)
    try:
        pk_cols = _get_table_pk_columns(db_conn, spec.table_name)
    except (OperationalError, InterfaceError):
        db_conn = _reconnect_db(db_state, cfg, logger)
        pk_cols = _get_table_pk_columns(db_conn, spec.table_name)
    result["pk_columns"] = pk_cols
    if not pk_cols:
        result["errors"] = 1
        result["error_detail"] = "no primary key columns found"
        return result
    try:
        has_content_hash = bool(compare_content and _table_has_column(db_conn, spec.table_name, "content_hash"))
    except (OperationalError, InterfaceError):
        db_conn = _reconnect_db(db_state, cfg, logger)
        has_content_hash = bool(compare_content and _table_has_column(db_conn, spec.table_name, "content_hash"))
    result["compare_content"] = bool(compare_content)
    result["content_hash_supported"] = has_content_hash
    if spec.requires_window and spec.time_fields:
        if not start or not end:
            result["errors"] = 1
            result["error_detail"] = "missing start/end for windowed endpoint"
            return result
        windows = list(windows or [(start, end)])
    else:
        windows = [(None, None)]
    logger.info(
        "CHECK_START task=%s table=%s windows=%s start=%s end=%s",
        spec.code,
        spec.table_name,
        len(windows),
        start.isoformat() if start else None,
        end.isoformat() if end else None,
    )
    missing_seen: set[tuple] = set()
    for window_idx, (window_start, window_end) in enumerate(windows, start=1):
        window_label = (
            f"{window_start.isoformat()}~{window_end.isoformat()}"
            if window_start and window_end
            else "FULL"
        )
        logger.info(
            "WINDOW_START task=%s idx=%s window=%s",
            spec.code,
            window_idx,
            window_label,
        )
        window_pages = 0
        window_records = 0
        window_missing = 0
        window_skipped = 0
        params = _build_params(cfg, spec, store_id, window_start, window_end)
        try:
            for page_no, records, _, _ in client.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                window_pages += 1
                window_records += len(records)
                result["pages"] += 1
                result["records"] += len(records)
                pk_tuples: list[tuple] = []
                pk_hash_tuples: list[tuple] = []
                for rec in records:
                    if not isinstance(rec, dict):
                        result["skipped_missing_pk"] += 1
                        window_skipped += 1
                        continue
                    merged = _merge_record_layers(rec)
                    pk_tuple = _pk_tuple_from_merged(merged, pk_cols)
                    if not pk_tuple:
                        result["skipped_missing_pk"] += 1
                        window_skipped += 1
                        continue
                    pk_tuples.append(pk_tuple)
                    if has_content_hash:
                        content_hash = BaseOdsTask._compute_content_hash(merged, include_fetched_at=False)
                        pk_hash_tuples.append((*pk_tuple, content_hash))
                if not pk_tuples:
                    continue
                result["records_with_pk"] += len(pk_tuples)
                pk_unique = list(dict.fromkeys(pk_tuples))
                try:
                    existing = _fetch_existing_pk_set(db_conn, spec.table_name, pk_cols, pk_unique, chunk_size)
                except (OperationalError, InterfaceError):
                    db_conn = _reconnect_db(db_state, cfg, logger)
                    existing = _fetch_existing_pk_set(db_conn, spec.table_name, pk_cols, pk_unique, chunk_size)
                for pk_tuple in pk_unique:
                    if pk_tuple in existing:
                        continue
                    if pk_tuple in missing_seen:
                        continue
                    missing_seen.add(pk_tuple)
                    result["missing"] += 1
                    window_missing += 1
                    if len(result["missing_samples"]) < sample_limit:
                        result["missing_samples"].append(_format_missing_sample(pk_cols, pk_tuple))
                if has_content_hash and pk_hash_tuples:
                    pk_hash_unique = list(dict.fromkeys(pk_hash_tuples))
                    try:
                        existing_hash = _fetch_existing_pk_hash_set(
                            db_conn, spec.table_name, pk_cols, pk_hash_unique, chunk_size
                        )
                    except (OperationalError, InterfaceError):
                        db_conn = _reconnect_db(db_state, cfg, logger)
                        existing_hash = _fetch_existing_pk_hash_set(
                            db_conn, spec.table_name, pk_cols, pk_hash_unique, chunk_size
                        )
                    for pk_hash_tuple in pk_hash_unique:
                        pk_tuple = pk_hash_tuple[:-1]
                        if pk_tuple not in existing:
                            continue
                        if pk_hash_tuple in existing_hash:
                            continue
                        result["mismatch"] += 1
                        if len(result["mismatch_samples"]) < content_sample_limit:
                            result["mismatch_samples"].append(
                                _format_mismatch_sample(pk_cols, pk_tuple, pk_hash_tuple[-1])
                            )
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(
                        "PAGE task=%s idx=%s page=%s records=%s missing=%s skipped=%s",
                        spec.code,
                        window_idx,
                        page_no,
                        len(records),
                        window_missing,
                        window_skipped,
                    )
                if sleep_per_page > 0:
                    time_mod.sleep(sleep_per_page)
        except Exception as exc:
            result["errors"] += 1
            result["error_detail"] = f"{type(exc).__name__}: {exc}"
            logger.exception(
                "WINDOW_ERROR task=%s idx=%s window=%s error=%s",
                spec.code,
                window_idx,
                window_label,
                result["error_detail"],
            )
            break
        logger.info(
            "WINDOW_DONE task=%s idx=%s window=%s pages=%s records=%s missing=%s skipped=%s",
            spec.code,
            window_idx,
            window_label,
            window_pages,
            window_records,
            window_missing,
            window_skipped,
        )
        if sleep_per_window > 0:
            logger.debug(
                "SLEEP_WINDOW task=%s idx=%s seconds=%.2f",
                spec.code,
                window_idx,
                sleep_per_window,
            )
            time_mod.sleep(sleep_per_window)
    return result
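# _check_settlement_tickets: expected ticket primary keys are derived from
# /PayLog/GetPayLogListPage records (relateId / orderSettleId / order_settle_id) rather
# than from the ticket endpoint itself, then probed against
# billiards_ods.settlement_ticket_details the same way as in _check_spec.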
def _check_settlement_tickets(
    *,
    client: APIClient,
    db_state: dict,
    cfg: AppConfig,
    tz: ZoneInfo,
    logger: logging.Logger,
    store_id: int,
    start: datetime | None,
    end: datetime | None,
    windows: list[tuple[datetime, datetime]] | None,
    page_size: int,
    chunk_size: int,
    sample_limit: int,
    compare_content: bool,
    content_sample_limit: int,
    sleep_per_window: float,
    sleep_per_page: float,
) -> dict:
    table_name = "billiards_ods.settlement_ticket_details"
    db_conn = _ensure_db_conn(db_state, cfg, logger)
    try:
        pk_cols = _get_table_pk_columns(db_conn, table_name)
    except (OperationalError, InterfaceError):
        db_conn = _reconnect_db(db_state, cfg, logger)
        pk_cols = _get_table_pk_columns(db_conn, table_name)
    result = {
        "task_code": "ODS_SETTLEMENT_TICKET",
        "table": table_name,
        "endpoint": "/Order/GetOrderSettleTicketNew",
        "pk_columns": pk_cols,
        "records": 0,
        "records_with_pk": 0,
        "missing": 0,
        "missing_samples": [],
        "mismatch": 0,
        "mismatch_samples": [],
        "pages": 0,
        "skipped_missing_pk": 0,
        "errors": 0,
        "error_detail": None,
        "source_endpoint": "/PayLog/GetPayLogListPage",
    }
    if not pk_cols:
        result["errors"] = 1
        result["error_detail"] = "no primary key columns found"
        return result
    if not start or not end:
        result["errors"] = 1
        result["error_detail"] = "missing start/end for ticket check"
        return result
    missing_seen: set[tuple] = set()
    pay_endpoint = "/PayLog/GetPayLogListPage"
    windows = list(windows or [(start, end)])
    logger.info(
        "CHECK_START task=%s table=%s windows=%s start=%s end=%s",
        result["task_code"],
        table_name,
        len(windows),
        start.isoformat() if start else None,
        end.isoformat() if end else None,
    )
    for window_idx, (window_start, window_end) in enumerate(windows, start=1):
        window_label = f"{window_start.isoformat()}~{window_end.isoformat()}"
        logger.info(
            "WINDOW_START task=%s idx=%s window=%s",
            result["task_code"],
            window_idx,
            window_label,
        )
        window_pages = 0
        window_records = 0
        window_missing = 0
        window_skipped = 0
        base = {
            "siteId": store_id,
            "StartPayTime": TypeParser.format_timestamp(window_start, tz),
            "EndPayTime": TypeParser.format_timestamp(window_end, tz),
        }
        params = _merge_common_params(cfg, "ODS_PAYMENT", base)
        try:
            for page_no, records, _, _ in client.iter_paginated(
                endpoint=pay_endpoint,
                params=params,
                page_size=page_size,
                data_path=("data",),
                list_key=None,
            ):
                window_pages += 1
                window_records += len(records)
                result["pages"] += 1
                result["records"] += len(records)
                pk_tuples: list[tuple] = []
                for rec in records:
                    if not isinstance(rec, dict):
                        result["skipped_missing_pk"] += 1
                        window_skipped += 1
                        continue
                    relate_id = TypeParser.parse_int(
                        (rec or {}).get("relateId")
                        or (rec or {}).get("orderSettleId")
                        or (rec or {}).get("order_settle_id")
                    )
                    pk_tuple = _pk_tuple_from_ticket_candidate(relate_id)
                    if not pk_tuple:
                        result["skipped_missing_pk"] += 1
                        window_skipped += 1
                        continue
                    pk_tuples.append(pk_tuple)
                if not pk_tuples:
                    continue
                result["records_with_pk"] += len(pk_tuples)
                pk_unique = list(dict.fromkeys(pk_tuples))
                try:
                    existing = _fetch_existing_pk_set(db_conn, table_name, pk_cols, pk_unique, chunk_size)
                except (OperationalError, InterfaceError):
                    db_conn = _reconnect_db(db_state, cfg, logger)
                    existing = _fetch_existing_pk_set(db_conn, table_name, pk_cols, pk_unique, chunk_size)
                for pk_tuple in pk_unique:
                    if pk_tuple in existing:
                        continue
                    if pk_tuple in missing_seen:
                        continue
                    missing_seen.add(pk_tuple)
                    result["missing"] += 1
                    window_missing += 1
                    if len(result["missing_samples"]) < sample_limit:
                        result["missing_samples"].append(_format_missing_sample(pk_cols, pk_tuple))
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug(
                        "PAGE task=%s idx=%s page=%s records=%s missing=%s skipped=%s",
                        result["task_code"],
                        window_idx,
                        page_no,
                        len(records),
                        window_missing,
                        window_skipped,
                    )
                if sleep_per_page > 0:
                    time_mod.sleep(sleep_per_page)
        except Exception as exc:
            result["errors"] += 1
            result["error_detail"] = f"{type(exc).__name__}: {exc}"
            logger.exception(
                "WINDOW_ERROR task=%s idx=%s window=%s error=%s",
                result["task_code"],
                window_idx,
                window_label,
                result["error_detail"],
            )
            break
        logger.info(
            "WINDOW_DONE task=%s idx=%s window=%s pages=%s records=%s missing=%s skipped=%s",
            result["task_code"],
            window_idx,
            window_label,
            window_pages,
            window_records,
            window_missing,
            window_skipped,
        )
        if sleep_per_window > 0:
            logger.debug(
                "SLEEP_WINDOW task=%s idx=%s seconds=%.2f",
                result["task_code"],
                window_idx,
                sleep_per_window,
            )
            time_mod.sleep(sleep_per_window)
    return result
def _compute_ods_cutoff(conn, ods_tables: Sequence[str]) -> datetime | None:
    values: list[datetime] = []
    with conn.cursor() as cur:
        for table in ods_tables:
            try:
                cur.execute(f"SELECT MAX(fetched_at) FROM {table}")
                row = cur.fetchone()
                if row and row[0]:
                    values.append(row[0])
            except Exception:
                continue
    if not values:
        return None
    return min(values)
def _resolve_window_from_cutoff(
    *,
    conn,
    ods_tables: Sequence[str],
    tz: ZoneInfo,
    overlap_hours: int,
) -> tuple[datetime, datetime, datetime | None]:
    cutoff = _compute_ods_cutoff(conn, ods_tables)
    now = datetime.now(tz)
    if cutoff is None:
        start = now - timedelta(hours=max(1, overlap_hours))
        return start, now, None
    if cutoff.tzinfo is None:
        cutoff = cutoff.replace(tzinfo=tz)
    else:
        cutoff = cutoff.astimezone(tz)
    start = cutoff - timedelta(hours=max(0, overlap_hours))
    return start, now, cutoff
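# run_gap_check: orchestrates one full check run. It resolves the time range (explicit
# --start/--end, or ODS max(fetched_at) minus the overlap when --from-cutoff is set),
# splits it into windows (monthly via split_window, or fixed-size via _iter_windows),
# runs _check_spec for every enabled ODS task plus the settlement-ticket check, and
# returns the JSON-serializable report payload.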
def run_gap_check(
    *,
    cfg: AppConfig | None,
    start: datetime | str | None,
    end: datetime | str | None,
    window_days: int,
    window_hours: int,
    page_size: int,
    chunk_size: int,
    sample_limit: int,
    sleep_per_window: float,
    sleep_per_page: float,
    task_codes: str,
    from_cutoff: bool,
    cutoff_overlap_hours: int,
    allow_small_window: bool,
    logger: logging.Logger,
    compare_content: bool = False,
    content_sample_limit: int | None = None,
    window_split_unit: str | None = None,
    window_compensation_hours: int | None = None,
) -> dict:
    cfg = cfg or AppConfig.load({})
    tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
    store_id = int(cfg.get("app.store_id") or 0)
    if not cfg.get("api.token"):
        raise ValueError("missing api.token; please set API_TOKEN in .env")
    window_days = int(window_days)
    window_hours = int(window_hours)
    split_unit = (window_split_unit or cfg.get("run.window_split.unit", "month") or "month").strip()
    comp_hours = window_compensation_hours
    if comp_hours is None:
        comp_hours = cfg.get("run.window_split.compensation_hours", 0)
    use_split = split_unit.lower() not in ("", "none", "off", "false", "0")
    if not use_split and not from_cutoff and not allow_small_window:
        min_hours = MIN_COMPLETENESS_WINDOW_DAYS * 24
        if window_hours > 0:
            if window_hours < min_hours:
                logger.warning(
                    "window_hours=%s too small for completeness check; adjust to %s",
                    window_hours,
                    min_hours,
                )
                window_hours = min_hours
        elif window_days < MIN_COMPLETENESS_WINDOW_DAYS:
            logger.warning(
                "window_days=%s too small for completeness check; adjust to %s",
                window_days,
                MIN_COMPLETENESS_WINDOW_DAYS,
            )
            window_days = MIN_COMPLETENESS_WINDOW_DAYS
    cutoff = None
    if from_cutoff:
        db_tmp = DatabaseConnection(dsn=cfg["db"]["dsn"], session=cfg["db"].get("session"))
        ods_tables = [s.table_name for s in ODS_TASK_SPECS if s.code in ENABLED_ODS_CODES]
        start, end, cutoff = _resolve_window_from_cutoff(
            conn=db_tmp.conn,
            ods_tables=ods_tables,
            tz=tz,
            overlap_hours=cutoff_overlap_hours,
        )
        db_tmp.close()
    else:
        if not start:
            start = DEFAULT_START
        if not end:
            end = datetime.now(tz)
        if isinstance(start, str):
            start = _parse_dt(start, tz, is_end=False)
        if isinstance(end, str):
            end = _parse_dt(end, tz, is_end=True)
    windows = None
    if use_split:
        windows = split_window(
            start,
            end,
            tz=tz,
            split_unit=split_unit,
            compensation_hours=comp_hours,
        )
    else:
        adjusted = split_window(
            start,
            end,
            tz=tz,
            split_unit="none",
            compensation_hours=comp_hours,
        )
        if adjusted:
            start, end = adjusted[0]
        window_size = timedelta(hours=window_hours) if window_hours > 0 else timedelta(days=window_days)
        windows = list(_iter_windows(start, end, window_size))
    if windows:
        start, end = windows[0][0], windows[-1][1]
    if content_sample_limit is None:
        content_sample_limit = sample_limit
    logger.info(
        "START range=%s~%s window_days=%s window_hours=%s split_unit=%s comp_hours=%s page_size=%s chunk_size=%s",
        start.isoformat() if isinstance(start, datetime) else None,
        end.isoformat() if isinstance(end, datetime) else None,
        window_days,
        window_hours,
        split_unit,
        comp_hours,
        page_size,
        chunk_size,
    )
    if cutoff:
        logger.info("CUTOFF=%s overlap_hours=%s", cutoff.isoformat(), cutoff_overlap_hours)
    client = APIClient(
        base_url=cfg["api"]["base_url"],
        token=cfg["api"]["token"],
        timeout=int(cfg["api"].get("timeout_sec") or 20),
        retry_max=int(cfg["api"].get("retries", {}).get("max_attempts") or 3),
        headers_extra=cfg["api"].get("headers_extra") or {},
    )
    db_state = _init_db_state(cfg)
    try:
        task_filter = {t.strip().upper() for t in (task_codes or "").split(",") if t.strip()}
        specs = [s for s in ODS_TASK_SPECS if s.code in ENABLED_ODS_CODES]
        if task_filter:
            specs = [s for s in specs if s.code in task_filter]
        results: list[dict] = []
        for spec in specs:
            if spec.code == "ODS_SETTLEMENT_TICKET":
                continue
            result = _check_spec(
                client=client,
                db_state=db_state,
                cfg=cfg,
                tz=tz,
                logger=logger,
                spec=spec,
                store_id=store_id,
                start=start,
                end=end,
                windows=windows,
                page_size=page_size,
                chunk_size=chunk_size,
                sample_limit=sample_limit,
                compare_content=compare_content,
                content_sample_limit=content_sample_limit,
                sleep_per_window=sleep_per_window,
                sleep_per_page=sleep_per_page,
            )
            results.append(result)
            logger.info(
                "CHECK_DONE task=%s missing=%s records=%s errors=%s",
                result.get("task_code"),
                result.get("missing"),
                result.get("records"),
                result.get("errors"),
            )
        if (not task_filter) or ("ODS_SETTLEMENT_TICKET" in task_filter):
            ticket_result = _check_settlement_tickets(
                client=client,
                db_state=db_state,
                cfg=cfg,
                tz=tz,
                logger=logger,
                store_id=store_id,
                start=start,
                end=end,
                windows=windows,
                page_size=page_size,
                chunk_size=chunk_size,
                sample_limit=sample_limit,
                compare_content=compare_content,
                content_sample_limit=content_sample_limit,
                sleep_per_window=sleep_per_window,
                sleep_per_page=sleep_per_page,
            )
            results.append(ticket_result)
            logger.info(
                "CHECK_DONE task=%s missing=%s records=%s errors=%s",
                ticket_result.get("task_code"),
                ticket_result.get("missing"),
                ticket_result.get("records"),
                ticket_result.get("errors"),
            )
        total_missing = sum(int(r.get("missing") or 0) for r in results)
        total_mismatch = sum(int(r.get("mismatch") or 0) for r in results)
        total_errors = sum(int(r.get("errors") or 0) for r in results)
        payload = {
            "window_split_unit": split_unit,
            "window_compensation_hours": comp_hours,
            "start": start.isoformat() if isinstance(start, datetime) else None,
            "end": end.isoformat() if isinstance(end, datetime) else None,
            "cutoff": cutoff.isoformat() if cutoff else None,
            "window_days": window_days,
            "window_hours": window_hours,
            "page_size": page_size,
            "chunk_size": chunk_size,
            "sample_limit": sample_limit,
            "compare_content": compare_content,
            "content_sample_limit": content_sample_limit,
            "store_id": store_id,
            "base_url": cfg.get("api.base_url"),
            "results": results,
            "total_missing": total_missing,
            "total_mismatch": total_mismatch,
            "total_errors": total_errors,
            "generated_at": datetime.now(tz).isoformat(),
        }
        return payload
    finally:
        try:
            db_state.get("db").close()
        except Exception:
            pass
def main() -> int:
    _reconfigure_stdout_utf8()
    ap = argparse.ArgumentParser(description="Check missing ODS records by comparing API vs ODS PKs.")
    ap.add_argument("--start", default=DEFAULT_START, help="start datetime (default: 2025-07-01)")
    ap.add_argument("--end", default="", help="end datetime (default: now)")
    ap.add_argument("--window-days", type=int, default=1, help="days per API window (default: 1)")
    ap.add_argument("--window-hours", type=int, default=0, help="hours per API window (default: 0)")
    ap.add_argument("--window-split-unit", default="", help="split unit (month/none), default from config")
    ap.add_argument("--window-compensation-hours", type=int, default=None, help="window compensation hours, default from config")
    ap.add_argument("--page-size", type=int, default=200, help="API page size (default: 200)")
    ap.add_argument("--chunk-size", type=int, default=500, help="DB query chunk size (default: 500)")
    ap.add_argument("--sample-limit", type=int, default=50, help="max missing PK samples per table")
    ap.add_argument("--compare-content", action="store_true", help="compare record content hash (mismatch detection)")
    ap.add_argument(
        "--content-sample-limit",
        type=int,
        default=None,
        help="max mismatch samples per table (default: same as --sample-limit)",
    )
    ap.add_argument("--sleep-per-window-seconds", type=float, default=0, help="sleep seconds after each window")
    ap.add_argument("--sleep-per-page-seconds", type=float, default=0, help="sleep seconds after each page")
    ap.add_argument("--task-codes", default="", help="comma-separated task codes to check (optional)")
    ap.add_argument("--out", default="", help="output JSON path (optional)")
    ap.add_argument("--tag", default="", help="tag suffix for output filename")
    ap.add_argument("--from-cutoff", action="store_true", help="derive start from ODS cutoff")
    ap.add_argument(
        "--cutoff-overlap-hours",
        type=int,
        default=24,
        help="overlap hours when using --from-cutoff (default: 24)",
    )
    ap.add_argument(
        "--allow-small-window",
        action="store_true",
        help="allow windows smaller than default completeness guard",
    )
    ap.add_argument("--log-file", default="", help="log file path (default: logs/check_ods_gaps_YYYYMMDD_HHMMSS.log)")
    ap.add_argument("--log-dir", default="", help="log directory (default: logs)")
    ap.add_argument("--log-level", default="INFO", help="log level (default: INFO)")
    ap.add_argument("--no-log-console", action="store_true", help="disable console logging")
    args = ap.parse_args()
    log_dir = Path(args.log_dir) if args.log_dir else (PROJECT_ROOT / "logs")
    log_file = Path(args.log_file) if args.log_file else build_log_path(log_dir, "check_ods_gaps", args.tag)
    log_console = not args.no_log_console
    with configure_logging(
        "ods_gap_check",
        log_file,
        level=args.log_level,
        console=log_console,
        tee_std=True,
    ) as logger:
        cfg = AppConfig.load({})
        payload = run_gap_check(
            cfg=cfg,
            start=args.start,
            end=args.end,
            window_days=args.window_days,
            window_hours=args.window_hours,
            page_size=args.page_size,
            chunk_size=args.chunk_size,
            sample_limit=args.sample_limit,
            sleep_per_window=args.sleep_per_window_seconds,
            sleep_per_page=args.sleep_per_page_seconds,
            task_codes=args.task_codes,
            from_cutoff=args.from_cutoff,
            cutoff_overlap_hours=args.cutoff_overlap_hours,
            allow_small_window=args.allow_small_window,
            logger=logger,
            compare_content=args.compare_content,
            content_sample_limit=args.content_sample_limit,
            window_split_unit=args.window_split_unit or None,
            window_compensation_hours=args.window_compensation_hours,
        )
        tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
        if args.out:
            out_path = Path(args.out)
        else:
            tag = f"_{args.tag}" if args.tag else ""
            stamp = datetime.now(tz).strftime("%Y%m%d_%H%M%S")
            out_path = PROJECT_ROOT / "reports" / f"ods_gap_check{tag}_{stamp}.json"
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        logger.info("REPORT_WRITTEN path=%s", out_path)
        logger.info(
            "SUMMARY missing=%s mismatch=%s errors=%s",
            payload.get("total_missing"),
            payload.get("total_mismatch"),
            payload.get("total_errors"),
        )
    return 0
if __name__ == "__main__":
    raise SystemExit(main())