# -*- coding: utf-8 -*- """CLI主入口""" import sys import argparse import logging from pathlib import Path from config.settings import AppConfig from orchestration.scheduler import ETLScheduler def setup_logging(): """设置日志""" logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) return logging.getLogger("etl_billiards") def parse_args(): """解析命令行参数""" parser = argparse.ArgumentParser(description="台球场ETL系统") # 基本参数 parser.add_argument("--store-id", type=int, help="门店ID") parser.add_argument("--tasks", help="任务列表,逗号分隔") parser.add_argument("--dry-run", action="store_true", help="试运行(不提交)") # 数据库参数 parser.add_argument("--pg-dsn", help="PostgreSQL DSN") parser.add_argument("--pg-host", help="PostgreSQL主机") parser.add_argument("--pg-port", type=int, help="PostgreSQL端口") parser.add_argument("--pg-name", help="PostgreSQL数据库名") parser.add_argument("--pg-user", help="PostgreSQL用户名") parser.add_argument("--pg-password", help="PostgreSQL密码") # API参数 parser.add_argument("--api-base", help="API基础URL") parser.add_argument("--api-token", "--token", dest="api_token", help="API令牌(Bearer Token)") parser.add_argument("--api-timeout", type=int, help="API超时(秒)") parser.add_argument("--api-page-size", type=int, help="分页大小") parser.add_argument("--api-retry-max", type=int, help="API重试最大次数") # 目录参数 parser.add_argument("--export-root", help="导出根目录") parser.add_argument("--log-root", help="日志根目录") # 抓取/清洗管线 parser.add_argument("--pipeline-flow", choices=["FULL", "FETCH_ONLY", "INGEST_ONLY"], help="流水线模式") parser.add_argument("--fetch-root", help="抓取JSON输出根目录") parser.add_argument("--ingest-source", help="本地清洗入库源目录") parser.add_argument("--write-pretty-json", action="store_true", help="抓取JSON美化输出") # 运行窗口 parser.add_argument("--idle-start", help="闲时窗口开始(HH:MM)") parser.add_argument("--idle-end", help="闲时窗口结束(HH:MM)") parser.add_argument("--allow-empty-advance", action="store_true", help="允许空结果推进窗口") return parser.parse_args() def build_cli_overrides(args) -> dict: """从命令行参数构建配置覆盖""" overrides = {} # 基本信息 if args.store_id is not None: overrides.setdefault("app", {})["store_id"] = args.store_id # 数据库 if args.pg_dsn: overrides.setdefault("db", {})["dsn"] = args.pg_dsn if args.pg_host: overrides.setdefault("db", {})["host"] = args.pg_host if args.pg_port: overrides.setdefault("db", {})["port"] = args.pg_port if args.pg_name: overrides.setdefault("db", {})["name"] = args.pg_name if args.pg_user: overrides.setdefault("db", {})["user"] = args.pg_user if args.pg_password: overrides.setdefault("db", {})["password"] = args.pg_password # API if args.api_base: overrides.setdefault("api", {})["base_url"] = args.api_base if args.api_token: overrides.setdefault("api", {})["token"] = args.api_token if args.api_timeout: overrides.setdefault("api", {})["timeout_sec"] = args.api_timeout if args.api_page_size: overrides.setdefault("api", {})["page_size"] = args.api_page_size if args.api_retry_max: overrides.setdefault("api", {}).setdefault("retries", {})["max_attempts"] = args.api_retry_max # 目录 if args.export_root: overrides.setdefault("io", {})["export_root"] = args.export_root if args.log_root: overrides.setdefault("io", {})["log_root"] = args.log_root # 抓取/清洗管线 if args.pipeline_flow: overrides.setdefault("pipeline", {})["flow"] = args.pipeline_flow.upper() if args.fetch_root: overrides.setdefault("pipeline", {})["fetch_root"] = args.fetch_root if args.ingest_source: overrides.setdefault("pipeline", {})["ingest_source_dir"] = args.ingest_source if args.write_pretty_json: overrides.setdefault("io", {})["write_pretty_json"] = True # 运行窗口 if args.idle_start: overrides.setdefault("run", {}).setdefault("idle_window", {})["start"] = args.idle_start if args.idle_end: overrides.setdefault("run", {}).setdefault("idle_window", {})["end"] = args.idle_end if args.allow_empty_advance: overrides.setdefault("run", {})["allow_empty_result_advance"] = True # 任务 if args.tasks: tasks = [t.strip().upper() for t in args.tasks.split(",") if t.strip()] overrides.setdefault("run", {})["tasks"] = tasks return overrides def main(): """主函数""" logger = setup_logging() args = parse_args() try: # 加载配置 cli_overrides = build_cli_overrides(args) config = AppConfig.load(cli_overrides) logger.info("配置加载完成") logger.info(f"门店ID: {config.get('app.store_id')}") logger.info(f"任务列表: {config.get('run.tasks')}") # 创建调度器 scheduler = ETLScheduler(config, logger) # 运行任务 task_codes = config.get("run.tasks") scheduler.run_tasks(task_codes) # 关闭连接 scheduler.close() logger.info("ETL运行完成") return 0 except Exception as e: logger.error(f"ETL运行失败: {e}", exc_info=True) return 1 if __name__ == "__main__": sys.exit(main())