# -*- coding: utf-8 -*-
"""One-click rebuild of the ETL-related schemas, then run ODS → DWD.

This script targets the "offline sample-JSON replay" dev/ops scenario and uses
the task implementations from the current project:

1) (optional) DROP and recreate the schemas: ``etl_admin`` / ``billiards_ods``
   / ``billiards_dwd``
2) run ``INIT_ODS_SCHEMA``: create the ``etl_admin`` metadata tables and
   execute ``schema_ODS_doc.sql`` (which performs light cleanup internally)
3) run ``INIT_DWD_SCHEMA``: execute ``schema_dwd_doc.sql``
4) run ``MANUAL_INGEST``: load ODS from a local JSON directory
5) run ``DWD_LOAD_FROM_ODS``: load DWD from ODS

Usage (recommended)::

    python -m scripts.rebuild.rebuild_db_and_run_ods_to_dwd ^
        --dsn "postgresql://user:pwd@host:5432/db" ^
        --store-id 1 ^
        --json-dir "export/test-json-doc" ^
        --drop-schemas

Environment variables (optional):
    ``PG_DSN``, ``STORE_ID``, ``INGEST_SOURCE_DIR``

Logging:
    Output goes to both console and file by default; the file path is
    ``io.log_root/rebuild_db_<timestamp>.log``.
"""
from __future__ import annotations

import argparse
import logging
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import psycopg2

from config.settings import AppConfig
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations
from tasks.dwd.dwd_load_task import DwdLoadTask
from tasks.utility.init_dwd_schema_task import InitDwdSchemaTask
from tasks.utility.init_schema_task import InitOdsSchemaTask
from tasks.utility.manual_ingest_task import ManualIngestTask

# Default directory of the offline sample-JSON export used for replay.
DEFAULT_JSON_DIR = "export/test-json-doc"


@dataclass(frozen=True)
class RunArgs:
    """Script argument object (reduces scattered parameter passing)."""

    # PostgreSQL DSN the script connects with.
    dsn: str
    # Store/tenant id written into the app config.
    store_id: int
    # Directory containing the sample JSON files to ingest.
    json_dir: str
    # Whether to DROP and recreate the three ETL schemas first.
    drop_schemas: bool
    # Whether to terminate our own idle-in-transaction sessions before DROP.
    terminate_own_sessions: bool
    # Minimal demo mode: only member_profiles → dim_member/dim_member_ex.
    demo: bool
    # Restrict ingestion to these JSON files (lowercased, without ".json").
    only_files: list[str]
    # Restrict DWD loading to these tables (lowercased).
    only_dwd_tables: list[str]
    # Stage name after which to stop, or None to run everything.
    stop_after: str | None


def _attach_file_logger(log_root: str | Path, filename: str, logger: logging.Logger) -> logging.Handler | None:
    """Attach a UTF-8 file handler to the root logger.

    Notes:
        - The root logger is used so the file captures output from every
          differently-named logger in the project (third-party/submodules too).
        - On failure only a warning is logged; the main flow is not aborted.

    Returns:
        The handler on success (caller is responsible for
        removeHandler/close), or None on failure.
    """
    log_dir = Path(log_root)
    try:
        log_dir.mkdir(parents=True, exist_ok=True)
    except Exception as exc:  # noqa: BLE001
        logger.warning("创建日志目录失败:%s(%s)", log_dir, exc)
        return None
    log_path = log_dir / filename
    try:
        handler: logging.Handler = logging.FileHandler(log_path, encoding="utf-8")
    except Exception as exc:  # noqa: BLE001
        logger.warning("创建文件日志失败:%s(%s)", log_path, exc)
        return None
    handler.setLevel(logging.INFO)
    handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logging.getLogger().addHandler(handler)
    logger.info("文件日志已启用:%s", log_path)
    return handler


def _parse_args() -> RunArgs:
    """Parse command-line / environment-variable arguments.

    Raises:
        SystemExit: when no DSN is supplied via --dsn or PG_DSN.
    """
    parser = argparse.ArgumentParser(description="重建 Schema 并执行 ODS→DWD(离线 JSON 回放)")
    parser.add_argument("--dsn", default=os.environ.get("PG_DSN"), help="PostgreSQL DSN(默认读取 PG_DSN)")
    parser.add_argument(
        "--store-id",
        type=int,
        default=int(os.environ.get("STORE_ID") or 1),
        help="门店/租户 store_id(默认读取 STORE_ID,否则为 1)",
    )
    parser.add_argument(
        "--json-dir",
        default=os.environ.get("INGEST_SOURCE_DIR") or DEFAULT_JSON_DIR,
        help=f"示例 JSON 目录(默认 {DEFAULT_JSON_DIR},也可读 INGEST_SOURCE_DIR)",
    )
    parser.add_argument(
        "--drop-schemas",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="是否先 DROP 并重建 etl_admin/billiards_ods/billiards_dwd(默认:是)",
    )
    parser.add_argument(
        "--terminate-own-sessions",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="执行 DROP 前是否终止当前用户的 idle-in-transaction 会话(默认:是)",
    )
    parser.add_argument(
        "--demo",
        action=argparse.BooleanOptionalAction,
        default=False,
        help="运行最小 Demo(仅导入 member_profiles 并生成 dim_member/dim_member_ex)",
    )
    parser.add_argument(
        "--only-files",
        default="",
        help="仅处理指定 JSON 文件(逗号分隔,不含 .json,例如:member_profiles,settlement_records)",
    )
    parser.add_argument(
        "--only-dwd-tables",
        default="",
        help="仅处理指定 DWD 表(逗号分隔,支持完整名或表名,例如:billiards_dwd.dim_member,dim_member_ex)",
    )
    parser.add_argument(
        "--stop-after",
        default="",
        help="在指定阶段后停止(可选:DROP_SCHEMAS/INIT_ODS_SCHEMA/INIT_DWD_SCHEMA/MANUAL_INGEST/DWD_LOAD_FROM_ODS/BASIC_VALIDATE)",
    )
    args = parser.parse_args()
    if not args.dsn:
        raise SystemExit("缺少 DSN:请传入 --dsn 或设置环境变量 PG_DSN")
    # Normalize comma-separated lists: trim, lowercase, drop empties.
    only_files = [x.strip().lower() for x in str(args.only_files or "").split(",") if x.strip()]
    only_dwd_tables = [x.strip().lower() for x in str(args.only_dwd_tables or "").split(",") if x.strip()]
    # Stage names are compared in uppercase; empty string becomes None.
    stop_after = str(args.stop_after or "").strip().upper() or None
    return RunArgs(
        dsn=args.dsn,
        store_id=args.store_id,
        json_dir=str(args.json_dir),
        drop_schemas=bool(args.drop_schemas),
        terminate_own_sessions=bool(args.terminate_own_sessions),
        demo=bool(args.demo),
        only_files=only_files,
        only_dwd_tables=only_dwd_tables,
        stop_after=stop_after,
    )


def _build_config(args: RunArgs) -> AppConfig:
    """Build the minimal configuration overrides needed for this run."""
    manual_cfg: dict[str, Any] = {}
    dwd_cfg: dict[str, Any] = {}
    if args.demo:
        # Demo mode pins both the input file and the output tables …
        manual_cfg["include_files"] = ["member_profiles"]
        dwd_cfg["only_tables"] = ["billiards_dwd.dim_member", "billiards_dwd.dim_member_ex"]
    if args.only_files:
        # … but explicit --only-files / --only-dwd-tables take precedence.
        manual_cfg["include_files"] = args.only_files
    if args.only_dwd_tables:
        dwd_cfg["only_tables"] = args.only_dwd_tables
    overrides: dict[str, Any] = {
        "app": {"store_id": args.store_id},
        "pipeline": {"flow": "INGEST_ONLY", "ingest_source_dir": args.json_dir},
        "manual": manual_cfg,
        "dwd": dwd_cfg,
        # Offline replay / warehouse rebuild can take long; disable
        # statement_timeout to avoid being cut off by the default 30s.
        # Also disable lock_timeout so DROP/DDL does not fail just because a
        # lock takes slightly longer to acquire.
        "db": {"dsn": args.dsn, "session": {"statement_timeout_ms": 0, "lock_timeout_ms": 0}},
    }
    return AppConfig.load(overrides)


def _drop_schemas(db: DatabaseOperations, logger: logging.Logger) -> None:
    """Drop and recreate the ETL-related schemas (DESTRUCTIVE — use with care)."""
    with db.conn.cursor() as cur:
        # Avoid waiting forever on locks held by other sessions; if a schema
        # really is in use, prompt the user to release/terminate the blocker.
        cur.execute("SET lock_timeout TO '5s'")
        for schema in ("billiards_dwd", "billiards_ods", "etl_admin"):
            logger.info("DROP SCHEMA IF EXISTS %s CASCADE ...", schema)
            cur.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE;')


def _terminate_own_idle_in_tx(db: DatabaseOperations, logger: logging.Logger) -> int:
    """Terminate this user's idle-in-transaction sessions in the current DB.

    Such sessions would otherwise block the DROP/DDL statements.

    Returns:
        The number of sessions successfully terminated.
    """
    with db.conn.cursor() as cur:
        cur.execute(
            """
            SELECT pid
            FROM pg_stat_activity
            WHERE datname = current_database()
              AND usename = current_user
              AND pid <> pg_backend_pid()
              AND state = 'idle in transaction'
            """
        )
        pids = [r[0] for r in cur.fetchall()]
        killed = 0
        for pid in pids:
            cur.execute("SELECT pg_terminate_backend(%s)", (pid,))
            ok = bool(cur.fetchone()[0])
            logger.info("终止会话 pid=%s ok=%s", pid, ok)
            killed += 1 if ok else 0
        return killed


def _run_task(task, logger: logging.Logger) -> dict:
    """Run a task uniformly and log its key result fields."""
    result = task.execute(None)
    logger.info("%s: status=%s counts=%s", task.get_task_code(), result.get("status"), result.get("counts"))
    return result


def _basic_validate(db: DatabaseOperations, logger: logging.Logger) -> None:
    """Most basic availability check: schemas exist and key tables are countable."""
    checks = [
        ("billiards_ods", "member_profiles"),
        ("billiards_ods", "settlement_records"),
        ("billiards_dwd", "dim_member"),
        ("billiards_dwd", "dwd_settlement_head"),
    ]
    for schema, table in checks:
        try:
            rows = db.query(f'SELECT COUNT(1) AS cnt FROM "{schema}"."{table}"')
            logger.info("校验行数:%s.%s = %s", schema, table, (rows[0] or {}).get("cnt") if rows else None)
        except Exception as exc:  # noqa: BLE001
            # A failed check is logged but does not abort the validation loop.
            logger.warning("校验失败:%s.%s(%s)", schema, table, exc)


def _connect_db_with_retry(cfg: AppConfig, logger: logging.Logger) -> DatabaseConnection:
    """Create a database connection (with retry) so brief network blips don't kill the script.

    Raises:
        Exception: the last connection error after all retries are exhausted.
    """
    dsn = cfg["db"]["dsn"]
    session = cfg["db"].get("session")
    connect_timeout = cfg["db"].get("connect_timeout_sec")
    # Exponential backoff; the leading 0 means the first attempt is immediate.
    backoffs = [1, 2, 4, 8, 16]
    last_exc: Exception | None = None
    for attempt, wait_sec in enumerate([0] + backoffs, start=1):
        if wait_sec:
            time.sleep(wait_sec)
        try:
            return DatabaseConnection(dsn=dsn, session=session, connect_timeout=connect_timeout)
        except Exception as exc:  # noqa: BLE001
            last_exc = exc
            logger.warning("数据库连接失败(第 %s 次):%s", attempt, exc)
    raise last_exc or RuntimeError("数据库连接失败")


def _is_connection_error(exc: Exception) -> bool:
    """Return True for dropped-connection / server-side errors that are retryable."""
    return isinstance(exc, (psycopg2.OperationalError, psycopg2.InterfaceError))


def _run_stage_with_reconnect(
    cfg: AppConfig,
    logger: logging.Logger,
    stage_name: str,
    fn,
    max_attempts: int = 3,
) -> dict | None:
    """Run a single stage; on failure (notably dropped connections) reconnect and retry.

    fn: (db_ops) -> dict | None

    Raises:
        Exception: non-connection errors immediately, or the last connection
        error once max_attempts is exhausted.
    """
    last_exc: Exception | None = None
    for attempt in range(1, max_attempts + 1):
        # A fresh connection per attempt — the previous one may be broken.
        db_conn = _connect_db_with_retry(cfg, logger)
        db_ops = DatabaseOperations(db_conn)
        try:
            logger.info("阶段开始:%s(第 %s/%s 次)", stage_name, attempt, max_attempts)
            result = fn(db_ops)
            logger.info("阶段完成:%s", stage_name)
            return result
        except Exception as exc:  # noqa: BLE001
            last_exc = exc
            logger.exception("阶段失败:%s(第 %s/%s 次):%s", stage_name, attempt, max_attempts, exc)
            # Connection-class errors may be retried; anything else is
            # re-raised immediately so logic bugs are not masked.
            if not _is_connection_error(exc):
                raise
            time.sleep(min(2**attempt, 10))
        finally:
            # Best-effort cleanup; close() may not exist / may already be closed.
            try:
                db_ops.close()  # type: ignore[attr-defined]
            except Exception:
                pass
            try:
                db_conn.close()
            except Exception:
                pass
    raise last_exc or RuntimeError(f"阶段失败:{stage_name}")


def main() -> int:
    """Script entry point: rebuild and run ODS → DWD end to end, in order.

    Returns:
        Process exit code: 0 on success (or early stop), 2 when the sample
        JSON directory does not exist.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    logger = logging.getLogger("fq_etl.rebuild_db")

    args = _parse_args()
    cfg = _build_config(args)

    # File logging is enabled by default for post-hoc tracing (attach early so
    # even a failed run is captured on disk).
    file_handler = _attach_file_logger(
        log_root=cfg["io"]["log_root"],
        filename=time.strftime("rebuild_db_%Y%m%d-%H%M%S.log"),
        logger=logger,
    )
    try:
        json_dir = Path(args.json_dir)
        if not json_dir.exists():
            logger.error("示例 JSON 目录不存在:%s", json_dir)
            return 2

        def stage_drop(db_ops: DatabaseOperations):
            # Respects --no-drop-schemas: the stage still runs but is a no-op.
            if not args.drop_schemas:
                return None
            if args.terminate_own_sessions:
                killed = _terminate_own_idle_in_tx(db_ops, logger)
                if killed:
                    db_ops.commit()
            _drop_schemas(db_ops, logger)
            db_ops.commit()
            return None

        def stage_init_ods(db_ops: DatabaseOperations):
            return _run_task(InitOdsSchemaTask(cfg, db_ops, None, logger), logger)

        def stage_init_dwd(db_ops: DatabaseOperations):
            return _run_task(InitDwdSchemaTask(cfg, db_ops, None, logger), logger)

        def stage_manual_ingest(db_ops: DatabaseOperations):
            logger.info("开始执行:MANUAL_INGEST(json_dir=%s)", json_dir)
            return _run_task(ManualIngestTask(cfg, db_ops, None, logger), logger)

        def stage_dwd_load(db_ops: DatabaseOperations):
            logger.info("开始执行:DWD_LOAD_FROM_ODS")
            return _run_task(DwdLoadTask(cfg, db_ops, None, logger), logger)

        # Stages run in fixed order; --stop-after allows an early, clean exit
        # after any of them. The data-heavy stages get more retry attempts.
        _run_stage_with_reconnect(cfg, logger, "DROP_SCHEMAS", stage_drop, max_attempts=3)
        if args.stop_after == "DROP_SCHEMAS":
            return 0
        _run_stage_with_reconnect(cfg, logger, "INIT_ODS_SCHEMA", stage_init_ods, max_attempts=3)
        if args.stop_after == "INIT_ODS_SCHEMA":
            return 0
        _run_stage_with_reconnect(cfg, logger, "INIT_DWD_SCHEMA", stage_init_dwd, max_attempts=3)
        if args.stop_after == "INIT_DWD_SCHEMA":
            return 0
        _run_stage_with_reconnect(cfg, logger, "MANUAL_INGEST", stage_manual_ingest, max_attempts=5)
        if args.stop_after == "MANUAL_INGEST":
            return 0
        _run_stage_with_reconnect(cfg, logger, "DWD_LOAD_FROM_ODS", stage_dwd_load, max_attempts=5)
        if args.stop_after == "DWD_LOAD_FROM_ODS":
            return 0

        # The validation stage just reuses one fresh connection.
        _run_stage_with_reconnect(
            cfg,
            logger,
            "BASIC_VALIDATE",
            lambda db_ops: _basic_validate(db_ops, logger),
            max_attempts=3,
        )
        if args.stop_after == "BASIC_VALIDATE":
            return 0
        return 0
    finally:
        # Detach/close the file handler even on failure so the log is flushed.
        if file_handler is not None:
            try:
                logging.getLogger().removeHandler(file_handler)
            except Exception:
                pass
            try:
                file_handler.close()
            except Exception:
                pass


if __name__ == "__main__":
    raise SystemExit(main())