Files
feiqiu-ETL/etl_billiards/run_update.py
2026-01-27 22:45:50 +08:00

518 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
一键增量更新脚本 (ODS -> DWD -> DWS)
用法:
cd etl_billiards
python run_update.py
"""
from __future__ import annotations
import argparse
import logging
import multiprocessing as mp
import subprocess
import sys
import time as time_mod
from datetime import date, datetime, time, timedelta
from pathlib import Path
from zoneinfo import ZoneInfo
from api.client import APIClient
from config.settings import AppConfig
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations
from orchestration.scheduler import ETLScheduler
from tasks.check_cutoff_task import CheckCutoffTask
from tasks.dwd_load_task import DwdLoadTask
from tasks.ods_tasks import ENABLED_ODS_CODES
from utils.logging_utils import build_log_path, configure_logging
STEP_TIMEOUT_SEC = 120
def _coerce_date(s: str) -> date:
s = (s or "").strip()
if not s:
raise ValueError("empty date")
if len(s) >= 10:
s = s[:10]
return date.fromisoformat(s)
def _compute_dws_window(
*,
cfg: AppConfig,
tz: ZoneInfo,
rebuild_days: int,
bootstrap_days: int,
dws_start: date | None,
dws_end: date | None,
) -> tuple[datetime, datetime]:
if dws_start and dws_end and dws_end < dws_start:
raise ValueError("dws_end must be >= dws_start")
store_id = int(cfg.get("app.store_id"))
dsn = cfg["db"]["dsn"]
session = cfg["db"].get("session")
conn = DatabaseConnection(dsn=dsn, session=session)
try:
if dws_start is None:
row = conn.query(
"SELECT MAX(order_date) AS mx FROM billiards_dws.dws_order_summary WHERE site_id=%s",
(store_id,),
)
mx = (row[0] or {}).get("mx") if row else None
if isinstance(mx, date):
dws_start = mx - timedelta(days=max(0, int(rebuild_days)))
else:
dws_start = (datetime.now(tz).date()) - timedelta(days=max(1, int(bootstrap_days)))
if dws_end is None:
dws_end = datetime.now(tz).date()
finally:
conn.close()
start_dt = datetime.combine(dws_start, time.min).replace(tzinfo=tz)
# end_dt 取到当天 23:59:59避免只跑到“当前时刻”的 date() 导致少一天
end_dt = datetime.combine(dws_end, time.max).replace(tzinfo=tz)
return start_dt, end_dt
def _run_check_cutoff(cfg: AppConfig, logger: logging.Logger):
    """Execute the CHECK_CUTOFF task once, with a dedicated DB connection and API client."""
    db_cfg = cfg["db"]
    api_cfg = cfg["api"]
    conn = DatabaseConnection(dsn=db_cfg["dsn"], session=db_cfg.get("session"))
    ops = DatabaseOperations(conn)
    client = APIClient(
        base_url=api_cfg["base_url"],
        token=api_cfg["token"],
        timeout=api_cfg["timeout_sec"],
        retry_max=api_cfg["retries"]["max_attempts"],
        headers_extra=api_cfg.get("headers_extra"),
    )
    try:
        CheckCutoffTask(cfg, ops, client, logger).execute(None)
    finally:
        # Always release the DB connection, even if the task fails.
        conn.close()
def _iter_daily_windows(window_start: datetime, window_end: datetime) -> list[tuple[datetime, datetime]]:
if window_start > window_end:
return []
tz = window_start.tzinfo
windows: list[tuple[datetime, datetime]] = []
cur = window_start
while cur <= window_end:
day_start = datetime.combine(cur.date(), time.min).replace(tzinfo=tz)
day_end = datetime.combine(cur.date(), time.max).replace(tzinfo=tz)
if day_start < window_start:
day_start = window_start
if day_end > window_end:
day_end = window_end
windows.append((day_start, day_end))
next_day = cur.date() + timedelta(days=1)
cur = datetime.combine(next_day, time.min).replace(tzinfo=tz)
return windows
def _run_step_worker(result_queue: "mp.Queue[dict[str, str]]", step: dict[str, str]) -> None:
    """Subprocess entry point: execute one *step* and report the outcome.

    Puts {"status": "ok"} or {"status": "error", "error": <message>} on
    *result_queue*.  All step parameters arrive as strings (the parent
    stringifies them) except ``log_console``, which is passed as a bool.
    """
    if hasattr(sys.stdout, "reconfigure"):
        try:
            # Match the parent process: logs may contain non-ASCII (CJK) text.
            sys.stdout.reconfigure(encoding="utf-8")
        except Exception:
            pass
    log_file = step.get("log_file") or ""
    log_level = step.get("log_level") or "INFO"
    log_console = bool(step.get("log_console", True))
    log_path = Path(log_file) if log_file else None
    with configure_logging(
        "etl_update",
        log_path,
        level=log_level,
        console=log_console,
        tee_std=True,
    ) as logger:
        cfg_base = AppConfig.load({})
        step_type = step.get("type", "")
        try:
            if step_type == "check_cutoff":
                _run_check_cutoff(cfg_base, logger)
            elif step_type == "ods_task":
                task_code = step["task_code"]
                _run_with_scheduler(
                    {
                        "pipeline": {"flow": "FULL"},
                        "run": {
                            "tasks": [task_code],
                            "overlap_seconds": int(step.get("overlap_seconds", 0)),
                        },
                    },
                    [task_code],
                    logger,
                )
            elif step_type == "init_dws_schema":
                _run_with_scheduler(
                    {
                        "pipeline": {"flow": "INGEST_ONLY"},
                        "run": {
                            "tasks": ["INIT_DWS_SCHEMA"],
                            "overlap_seconds": int(step.get("overlap_seconds", 0)),
                        },
                    },
                    ["INIT_DWS_SCHEMA"],
                    logger,
                )
            elif step_type == "dwd_table":
                _run_with_scheduler(
                    {
                        "pipeline": {"flow": "INGEST_ONLY"},
                        "run": {
                            "tasks": ["DWD_LOAD_FROM_ODS"],
                            "overlap_seconds": int(step.get("overlap_seconds", 0)),
                        },
                        # Restrict the DWD load to the single table of this step.
                        "dwd": {"only_tables": [step["dwd_table"]]},
                    },
                    ["DWD_LOAD_FROM_ODS"],
                    logger,
                )
            elif step_type == "dws_window":
                _run_with_scheduler(
                    {
                        "pipeline": {"flow": "INGEST_ONLY"},
                        "run": {
                            "tasks": ["DWS_BUILD_ORDER_SUMMARY"],
                            "overlap_seconds": int(step.get("overlap_seconds", 0)),
                            "window_override": {
                                "start": step["window_start"],
                                "end": step["window_end"],
                            },
                        },
                    },
                    ["DWS_BUILD_ORDER_SUMMARY"],
                    logger,
                )
            elif step_type == "ods_gap_check":
                cmd, cwd = _build_gap_check_cmd(step)
                subprocess.run(cmd, check=True, cwd=cwd)
            else:
                raise ValueError(f"Unknown step type: {step_type}")
            result_queue.put({"status": "ok"})
        except Exception as exc:
            # Fix: log the full traceback here -- only the message crosses the
            # queue, so without this the stack trace was lost entirely.
            logger.exception("STEP_ERROR type=%s", step_type)
            result_queue.put({"status": "error", "error": str(exc)})


def _run_with_scheduler(overrides: dict, task_codes: list[str], logger: logging.Logger) -> None:
    """Build an ETLScheduler from config *overrides*, run *task_codes*, always close it."""
    scheduler = ETLScheduler(AppConfig.load(overrides), logger)
    try:
        scheduler.run_tasks(task_codes)
    finally:
        scheduler.close()


def _build_gap_check_cmd(step: dict[str, str]) -> tuple[list[str], str]:
    """Translate an ``ods_gap_check`` step into a scripts/check_ods_gaps.py argv.

    Returns (argv, cwd).  Optional flags are appended only when their value is
    set/positive, mirroring the script's own defaults.
    """
    overlap_hours = int(step.get("overlap_hours", 24))
    window_days = int(step.get("window_days", 1))
    window_hours = int(step.get("window_hours", 0))
    page_size = int(step.get("page_size", 0) or 0)
    sleep_per_window = float(step.get("sleep_per_window", 0) or 0)
    sleep_per_page = float(step.get("sleep_per_page", 0) or 0)
    tag = step.get("tag", "run_update")
    task_codes = (step.get("task_codes") or "").strip()
    script_dir = Path(__file__).resolve().parent
    script_path = script_dir / "scripts" / "check_ods_gaps.py"
    cmd = [
        sys.executable,
        str(script_path),
        "--from-cutoff",
        "--cutoff-overlap-hours",
        str(overlap_hours),
        "--window-days",
        str(window_days),
        "--tag",
        str(tag),
    ]
    if window_hours > 0:
        cmd += ["--window-hours", str(window_hours)]
    if page_size > 0:
        cmd += ["--page-size", str(page_size)]
    if sleep_per_window > 0:
        cmd += ["--sleep-per-window-seconds", str(sleep_per_window)]
    if sleep_per_page > 0:
        cmd += ["--sleep-per-page-seconds", str(sleep_per_page)]
    if task_codes:
        cmd += ["--task-codes", task_codes]
    return cmd, str(script_dir)
def _run_step_with_timeout(
    step: dict[str, str], logger: logging.Logger, timeout_sec: int
) -> dict[str, object]:
    """Run one step in a freshly spawned subprocess, enforcing a wall-clock timeout.

    Returns a dict with at least ``name``, ``status`` ("ok" | "error" |
    "timeout") and ``elapsed`` seconds; on error it also carries ``error``.
    """
    start = time_mod.monotonic()
    # A step may override the default timeout (e.g. the long-running gap check).
    step_timeout = timeout_sec
    raw_timeout = step.get("timeout_sec")
    if raw_timeout:
        try:
            step_timeout = int(raw_timeout)
        except (TypeError, ValueError):
            step_timeout = timeout_sec
    # "spawn" gives each step a clean interpreter: no inherited DB connections,
    # sockets, or logging handlers from the parent (also the only mode on Windows).
    ctx = mp.get_context("spawn")
    result_queue: mp.Queue = ctx.Queue()
    proc = ctx.Process(target=_run_step_worker, args=(result_queue, step))
    proc.start()
    proc.join(timeout=step_timeout)
    elapsed = time_mod.monotonic() - start
    if proc.is_alive():
        logger.error(
            "STEP_TIMEOUT name=%s elapsed=%.2fs limit=%ss", step["name"], elapsed, step_timeout
        )
        proc.terminate()
        proc.join(10)
        if proc.is_alive():
            # Fix: SIGTERM can be ignored/blocked by the child; force-kill so a
            # hung step can never wedge the whole update run.
            proc.kill()
            proc.join(10)
        return {"name": step["name"], "status": "timeout", "elapsed": elapsed}
    result: dict[str, object] = {"name": step["name"], "status": "error", "elapsed": elapsed}
    try:
        payload = result_queue.get_nowait()
    except Exception:
        # Worker died before reporting (crash/kill); keep the default "error".
        payload = {}
    if payload:
        result.update(payload)
    if result.get("status") == "ok":
        logger.info("STEP_OK name=%s elapsed=%.2fs", step["name"], elapsed)
    else:
        logger.error(
            "STEP_FAIL name=%s elapsed=%.2fs error=%s",
            step["name"],
            elapsed,
            result.get("error"),
        )
    return result
def main() -> int:
    """CLI entry point: build the ordered step list (cutoff -> ODS -> gap check
    -> DWD -> DWS -> cutoff), run each step in an isolated subprocess with a
    timeout, then log a per-step summary.

    Always returns 0 -- individual step failures are logged, not fatal, so a
    broken day's DWS rebuild does not abort the remaining windows.
    """
    if hasattr(sys.stdout, "reconfigure"):
        try:
            # Force UTF-8 stdout (Windows consoles may default to a legacy codec
            # and the logs contain CJK text).
            sys.stdout.reconfigure(encoding="utf-8")
        except Exception:
            pass
    parser = argparse.ArgumentParser(description="One-click ETL update (ODS -> DWD -> DWS)")
    parser.add_argument("--overlap-seconds", type=int, default=3600, help="overlap seconds (default: 3600)")
    parser.add_argument(
        "--dws-rebuild-days",
        type=int,
        default=1,
        help="DWS 回算冗余天数 (default: 1)",
    )
    parser.add_argument(
        "--dws-bootstrap-days",
        type=int,
        default=30,
        help="DWS 首次/空表时回算天数 (default: 30)",
    )
    parser.add_argument("--dws-start", type=str, default="", help="DWS 回算开始日期 YYYY-MM-DD (可选)")
    parser.add_argument("--dws-end", type=str, default="", help="DWS 回算结束日期 YYYY-MM-DD (可选)")
    parser.add_argument(
        "--skip-cutoff",
        action="store_true",
        help="跳过 CHECK_CUTOFF (默认会在开始/结束各跑一次)",
    )
    parser.add_argument(
        "--skip-ods",
        action="store_true",
        help="跳过 ODS 在线抓取 (仅跑 DWD/DWS)",
    )
    parser.add_argument(
        "--ods-tasks",
        type=str,
        default="",
        help="指定要跑的 ODS 任务(逗号分隔),默认跑全部 ENABLED_ODS_CODES",
    )
    parser.add_argument(
        "--check-ods-gaps",
        action="store_true",
        help="run ODS gap check after ODS load (default: off)",
    )
    parser.add_argument(
        "--check-ods-overlap-hours",
        type=int,
        default=24,
        help="gap check overlap hours from cutoff (default: 24)",
    )
    parser.add_argument(
        "--check-ods-window-days",
        type=int,
        default=1,
        help="gap check window days (default: 1)",
    )
    parser.add_argument(
        "--check-ods-window-hours",
        type=int,
        default=0,
        help="gap check window hours (default: 0)",
    )
    parser.add_argument(
        "--check-ods-page-size",
        type=int,
        default=200,
        help="gap check API page size (default: 200)",
    )
    parser.add_argument(
        "--check-ods-timeout-sec",
        type=int,
        default=1800,
        help="gap check timeout seconds (default: 1800)",
    )
    parser.add_argument(
        "--check-ods-task-codes",
        type=str,
        default="",
        help="gap check task codes (comma-separated, optional)",
    )
    parser.add_argument(
        "--check-ods-sleep-per-window-seconds",
        type=float,
        default=0,
        help="gap check sleep seconds after each window (default: 0)",
    )
    parser.add_argument(
        "--check-ods-sleep-per-page-seconds",
        type=float,
        default=0,
        help="gap check sleep seconds after each page (default: 0)",
    )
    parser.add_argument("--log-file", type=str, default="", help="log file path (default: logs/run_update_YYYYMMDD_HHMMSS.log)")
    parser.add_argument("--log-dir", type=str, default="", help="log directory (default: logs)")
    parser.add_argument("--log-level", type=str, default="INFO", help="log level (default: INFO)")
    parser.add_argument("--no-log-console", action="store_true", help="disable console logging")
    args = parser.parse_args()
    log_dir = Path(args.log_dir) if args.log_dir else (Path(__file__).resolve().parent / "logs")
    log_file = Path(args.log_file) if args.log_file else build_log_path(log_dir, "run_update")
    log_console = not args.no_log_console
    with configure_logging(
        "etl_update",
        log_file,
        level=args.log_level,
        console=log_console,
        tee_std=True,
    ) as logger:
        cfg_base = AppConfig.load({})
        tz = ZoneInfo(cfg_base.get("app.timezone", "Asia/Taipei"))
        dws_start = _coerce_date(args.dws_start) if args.dws_start else None
        dws_end = _coerce_date(args.dws_end) if args.dws_end else None
        steps: list[dict[str, str]] = []
        if not args.skip_cutoff:
            steps.append({"name": "CHECK_CUTOFF:before", "type": "check_cutoff"})
        # ------------------------------------------------------------------ ODS (online fetch + load)
        if not args.skip_ods:
            if args.ods_tasks:
                ods_tasks = [t.strip().upper() for t in args.ods_tasks.split(",") if t.strip()]
            else:
                ods_tasks = sorted(ENABLED_ODS_CODES)
            for task_code in ods_tasks:
                steps.append(
                    {
                        "name": f"ODS:{task_code}",
                        "type": "ods_task",
                        "task_code": task_code,
                        "overlap_seconds": str(args.overlap_seconds),
                    }
                )
            # Optional consistency pass over what ODS just loaded.
            if args.check_ods_gaps:
                steps.append(
                    {
                        "name": "ODS_GAP_CHECK",
                        "type": "ods_gap_check",
                        "overlap_hours": str(args.check_ods_overlap_hours),
                        "window_days": str(args.check_ods_window_days),
                        "window_hours": str(args.check_ods_window_hours),
                        "page_size": str(args.check_ods_page_size),
                        "sleep_per_window": str(args.check_ods_sleep_per_window_seconds),
                        "sleep_per_page": str(args.check_ods_sleep_per_page_seconds),
                        "timeout_sec": str(args.check_ods_timeout_sec),
                        "task_codes": str(args.check_ods_task_codes or ""),
                        "tag": "run_update",
                    }
                )
        # ------------------------------------------------------------------ DWD (load from ODS tables)
        steps.append(
            {
                "name": "INIT_DWS_SCHEMA",
                "type": "init_dws_schema",
                "overlap_seconds": str(args.overlap_seconds),
            }
        )
        # One step per DWD target table so a single table's failure is isolated.
        for dwd_table in DwdLoadTask.TABLE_MAP.keys():
            steps.append(
                {
                    "name": f"DWD:{dwd_table}",
                    "type": "dwd_table",
                    "dwd_table": dwd_table,
                    "overlap_seconds": str(args.overlap_seconds),
                }
            )
        # ------------------------------------------------------------------ DWS (rebuild by date window)
        window_start, window_end = _compute_dws_window(
            cfg=cfg_base,
            tz=tz,
            rebuild_days=int(args.dws_rebuild_days),
            bootstrap_days=int(args.dws_bootstrap_days),
            dws_start=dws_start,
            dws_end=dws_end,
        )
        for start_dt, end_dt in _iter_daily_windows(window_start, window_end):
            steps.append(
                {
                    "name": f"DWS:{start_dt.date().isoformat()}",
                    "type": "dws_window",
                    "window_start": start_dt.strftime("%Y-%m-%d %H:%M:%S"),
                    "window_end": end_dt.strftime("%Y-%m-%d %H:%M:%S"),
                    "overlap_seconds": str(args.overlap_seconds),
                }
            )
        if not args.skip_cutoff:
            steps.append({"name": "CHECK_CUTOFF:after", "type": "check_cutoff"})
        # Propagate logging settings so every subprocess appends to the same file.
        for step in steps:
            step["log_file"] = str(log_file)
            step["log_level"] = args.log_level
            step["log_console"] = log_console
        step_results: list[dict[str, object]] = []
        for step in steps:
            # NOTE: a step may carry its own timeout_sec (e.g. ODS_GAP_CHECK);
            # the limit logged here is only the default.
            logger.info("STEP_START name=%s timeout=%ss", step["name"], STEP_TIMEOUT_SEC)
            result = _run_step_with_timeout(step, logger, STEP_TIMEOUT_SEC)
            step_results.append(result)
        total = len(step_results)
        ok_count = sum(1 for r in step_results if r.get("status") == "ok")
        timeout_count = sum(1 for r in step_results if r.get("status") == "timeout")
        fail_count = total - ok_count - timeout_count
        logger.info(
            "STEP_SUMMARY total=%s ok=%s failed=%s timeout=%s",
            total,
            ok_count,
            fail_count,
            timeout_count,
        )
        # Slowest first, to surface where the wall-clock time went.
        for item in sorted(step_results, key=lambda r: float(r.get("elapsed", 0.0)), reverse=True):
            logger.info(
                "STEP_RESULT name=%s status=%s elapsed=%.2fs",
                item.get("name"),
                item.get("status"),
                item.get("elapsed", 0.0),
            )
        logger.info("Update done.")
    return 0
if __name__ == "__main__":
raise SystemExit(main())