159 lines
5.8 KiB
Python
159 lines
5.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""CLI主入口"""
|
||
import sys
|
||
import argparse
|
||
import logging
|
||
from pathlib import Path
|
||
|
||
from config.settings import AppConfig
|
||
from orchestration.scheduler import ETLScheduler
|
||
|
||
def setup_logging():
|
||
"""设置日志"""
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
|
||
datefmt='%Y-%m-%d %H:%M:%S'
|
||
)
|
||
return logging.getLogger("etl_billiards")
|
||
|
||
def parse_args():
|
||
"""解析命令行参数"""
|
||
parser = argparse.ArgumentParser(description="台球场ETL系统")
|
||
|
||
# 基本参数
|
||
parser.add_argument("--store-id", type=int, help="门店ID")
|
||
parser.add_argument("--tasks", help="任务列表,逗号分隔")
|
||
parser.add_argument("--dry-run", action="store_true", help="试运行(不提交)")
|
||
|
||
# 数据库参数
|
||
parser.add_argument("--pg-dsn", help="PostgreSQL DSN")
|
||
parser.add_argument("--pg-host", help="PostgreSQL主机")
|
||
parser.add_argument("--pg-port", type=int, help="PostgreSQL端口")
|
||
parser.add_argument("--pg-name", help="PostgreSQL数据库名")
|
||
parser.add_argument("--pg-user", help="PostgreSQL用户名")
|
||
parser.add_argument("--pg-password", help="PostgreSQL密码")
|
||
|
||
# API参数
|
||
parser.add_argument("--api-base", help="API基础URL")
|
||
parser.add_argument("--api-token", "--token", dest="api_token", help="API令牌(Bearer Token)")
|
||
parser.add_argument("--api-timeout", type=int, help="API超时(秒)")
|
||
parser.add_argument("--api-page-size", type=int, help="分页大小")
|
||
parser.add_argument("--api-retry-max", type=int, help="API重试最大次数")
|
||
|
||
# 目录参数
|
||
parser.add_argument("--export-root", help="导出根目录")
|
||
parser.add_argument("--log-root", help="日志根目录")
|
||
|
||
# 抓取/清洗管线
|
||
parser.add_argument("--pipeline-flow", choices=["FULL", "FETCH_ONLY", "INGEST_ONLY"], help="流水线模式")
|
||
parser.add_argument("--fetch-root", help="抓取JSON输出根目录")
|
||
parser.add_argument("--ingest-source", help="本地清洗入库源目录")
|
||
parser.add_argument("--write-pretty-json", action="store_true", help="抓取JSON美化输出")
|
||
|
||
# 运行窗口
|
||
parser.add_argument("--idle-start", help="闲时窗口开始(HH:MM)")
|
||
parser.add_argument("--idle-end", help="闲时窗口结束(HH:MM)")
|
||
parser.add_argument("--allow-empty-advance", action="store_true", help="允许空结果推进窗口")
|
||
|
||
return parser.parse_args()
|
||
|
||
def build_cli_overrides(args) -> dict:
|
||
"""从命令行参数构建配置覆盖"""
|
||
overrides = {}
|
||
|
||
# 基本信息
|
||
if args.store_id is not None:
|
||
overrides.setdefault("app", {})["store_id"] = args.store_id
|
||
|
||
# 数据库
|
||
if args.pg_dsn:
|
||
overrides.setdefault("db", {})["dsn"] = args.pg_dsn
|
||
if args.pg_host:
|
||
overrides.setdefault("db", {})["host"] = args.pg_host
|
||
if args.pg_port:
|
||
overrides.setdefault("db", {})["port"] = args.pg_port
|
||
if args.pg_name:
|
||
overrides.setdefault("db", {})["name"] = args.pg_name
|
||
if args.pg_user:
|
||
overrides.setdefault("db", {})["user"] = args.pg_user
|
||
if args.pg_password:
|
||
overrides.setdefault("db", {})["password"] = args.pg_password
|
||
|
||
# API
|
||
if args.api_base:
|
||
overrides.setdefault("api", {})["base_url"] = args.api_base
|
||
if args.api_token:
|
||
overrides.setdefault("api", {})["token"] = args.api_token
|
||
if args.api_timeout:
|
||
overrides.setdefault("api", {})["timeout_sec"] = args.api_timeout
|
||
if args.api_page_size:
|
||
overrides.setdefault("api", {})["page_size"] = args.api_page_size
|
||
if args.api_retry_max:
|
||
overrides.setdefault("api", {}).setdefault("retries", {})["max_attempts"] = args.api_retry_max
|
||
|
||
# 目录
|
||
if args.export_root:
|
||
overrides.setdefault("io", {})["export_root"] = args.export_root
|
||
if args.log_root:
|
||
overrides.setdefault("io", {})["log_root"] = args.log_root
|
||
|
||
# 抓取/清洗管线
|
||
if args.pipeline_flow:
|
||
overrides.setdefault("pipeline", {})["flow"] = args.pipeline_flow.upper()
|
||
if args.fetch_root:
|
||
overrides.setdefault("pipeline", {})["fetch_root"] = args.fetch_root
|
||
if args.ingest_source:
|
||
overrides.setdefault("pipeline", {})["ingest_source_dir"] = args.ingest_source
|
||
if args.write_pretty_json:
|
||
overrides.setdefault("io", {})["write_pretty_json"] = True
|
||
|
||
# 运行窗口
|
||
if args.idle_start:
|
||
overrides.setdefault("run", {}).setdefault("idle_window", {})["start"] = args.idle_start
|
||
if args.idle_end:
|
||
overrides.setdefault("run", {}).setdefault("idle_window", {})["end"] = args.idle_end
|
||
if args.allow_empty_advance:
|
||
overrides.setdefault("run", {})["allow_empty_result_advance"] = True
|
||
|
||
# 任务
|
||
if args.tasks:
|
||
tasks = [t.strip().upper() for t in args.tasks.split(",") if t.strip()]
|
||
overrides.setdefault("run", {})["tasks"] = tasks
|
||
|
||
return overrides
|
||
|
||
def main():
|
||
"""主函数"""
|
||
logger = setup_logging()
|
||
args = parse_args()
|
||
|
||
try:
|
||
# 加载配置
|
||
cli_overrides = build_cli_overrides(args)
|
||
config = AppConfig.load(cli_overrides)
|
||
|
||
logger.info("配置加载完成")
|
||
logger.info(f"门店ID: {config.get('app.store_id')}")
|
||
logger.info(f"任务列表: {config.get('run.tasks')}")
|
||
|
||
# 创建调度器
|
||
scheduler = ETLScheduler(config, logger)
|
||
|
||
# 运行任务
|
||
task_codes = config.get("run.tasks")
|
||
scheduler.run_tasks(task_codes)
|
||
|
||
# 关闭连接
|
||
scheduler.close()
|
||
|
||
logger.info("ETL运行完成")
|
||
return 0
|
||
|
||
except Exception as e:
|
||
logger.error(f"ETL运行失败: {e}", exc_info=True)
|
||
return 1
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main())
|