Files
2025-11-30 07:19:05 +08:00

159 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""CLI主入口"""
import sys
import argparse
import logging
from pathlib import Path
from config.settings import AppConfig
from orchestration.scheduler import ETLScheduler
def setup_logging():
    """Configure process-wide logging and return the application logger.

    Calls ``logging.basicConfig`` with INFO level and a timestamped
    ``time [LEVEL] name: message`` layout, then hands back the logger
    named ``etl_billiards``.
    """
    record_layout = '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    timestamp_layout = '%Y-%m-%d %H:%M:%S'

    logging.basicConfig(
        level=logging.INFO,
        format=record_layout,
        datefmt=timestamp_layout,
    )

    app_logger = logging.getLogger("etl_billiards")
    return app_logger
def parse_args(argv=None):
    """Parse the ETL command-line options.

    Args:
        argv: Optional sequence of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so existing
            zero-argument callers behave as before.

    Returns:
        An ``argparse.Namespace`` with one attribute per option. Options that
        were not supplied are ``None``; ``store_true`` flags default to
        ``False``.
    """
    parser = argparse.ArgumentParser(description="台球场ETL系统")

    # Basic options
    parser.add_argument("--store-id", type=int, help="门店ID")
    parser.add_argument("--tasks", help="任务列表,逗号分隔")
    parser.add_argument("--dry-run", action="store_true", help="试运行(不提交)")

    # Database options
    parser.add_argument("--pg-dsn", help="PostgreSQL DSN")
    parser.add_argument("--pg-host", help="PostgreSQL主机")
    parser.add_argument("--pg-port", type=int, help="PostgreSQL端口")
    parser.add_argument("--pg-name", help="PostgreSQL数据库名")
    parser.add_argument("--pg-user", help="PostgreSQL用户名")
    parser.add_argument("--pg-password", help="PostgreSQL密码")

    # API options
    parser.add_argument("--api-base", help="API基础URL")
    parser.add_argument("--api-token", "--token", dest="api_token", help="API令牌Bearer Token")
    parser.add_argument("--api-timeout", type=int, help="API超时(秒)")
    parser.add_argument("--api-page-size", type=int, help="分页大小")
    parser.add_argument("--api-retry-max", type=int, help="API重试最大次数")

    # Directory options
    parser.add_argument("--export-root", help="导出根目录")
    parser.add_argument("--log-root", help="日志根目录")

    # Fetch / ingest pipeline options
    parser.add_argument(
        "--pipeline-flow",
        # Normalise case before the choices check so "full" is accepted as "FULL".
        type=str.upper,
        choices=["FULL", "FETCH_ONLY", "INGEST_ONLY"],
        help="流水线模式",
    )
    parser.add_argument("--fetch-root", help="抓取JSON输出根目录")
    parser.add_argument("--ingest-source", help="本地清洗入库源目录")
    parser.add_argument("--write-pretty-json", action="store_true", help="抓取JSON美化输出")

    # Run-window options
    parser.add_argument("--idle-start", help="闲时窗口开始(HH:MM)")
    parser.add_argument("--idle-end", help="闲时窗口结束(HH:MM)")
    parser.add_argument("--allow-empty-advance", action="store_true", help="允许空结果推进窗口")

    return parser.parse_args(argv)
# Ordered mapping of argparse attribute -> nested config key path.
# kind: "int"   -> forwarded whenever the option was given (0 is a real value)
#       "text"  -> forwarded when non-empty
#       "upper" -> forwarded upper-cased when non-empty
#       "flag"  -> stored as True when the switch is set
_OVERRIDE_SPECS = (
    # Basic information
    ("store_id", ("app", "store_id"), "int"),
    # Database
    ("pg_dsn", ("db", "dsn"), "text"),
    ("pg_host", ("db", "host"), "text"),
    ("pg_port", ("db", "port"), "int"),
    ("pg_name", ("db", "name"), "text"),
    ("pg_user", ("db", "user"), "text"),
    ("pg_password", ("db", "password"), "text"),
    # API
    ("api_base", ("api", "base_url"), "text"),
    ("api_token", ("api", "token"), "text"),
    ("api_timeout", ("api", "timeout_sec"), "int"),
    ("api_page_size", ("api", "page_size"), "int"),
    ("api_retry_max", ("api", "retries", "max_attempts"), "int"),
    # Directories
    ("export_root", ("io", "export_root"), "text"),
    ("log_root", ("io", "log_root"), "text"),
    # Fetch / ingest pipeline
    ("pipeline_flow", ("pipeline", "flow"), "upper"),
    ("fetch_root", ("pipeline", "fetch_root"), "text"),
    ("ingest_source", ("pipeline", "ingest_source_dir"), "text"),
    ("write_pretty_json", ("io", "write_pretty_json"), "flag"),
    # Run window
    ("idle_start", ("run", "idle_window", "start"), "text"),
    ("idle_end", ("run", "idle_window", "end"), "text"),
    ("allow_empty_advance", ("run", "allow_empty_result_advance"), "flag"),
)


def _set_nested(target: dict, path: tuple, value) -> None:
    """Store ``value`` under the nested key ``path``, creating dicts as needed."""
    node = target
    for key in path[:-1]:
        node = node.setdefault(key, {})
    node[path[-1]] = value


def build_cli_overrides(args) -> dict:
    """Build the nested config-override dict from parsed CLI arguments.

    Only options the user actually supplied appear in the result, so the
    returned dict can be layered over other configuration sources.

    Integer options are compared against ``None`` rather than tested for
    truthiness, so an explicit ``0`` (for example ``--api-retry-max 0``) is
    forwarded instead of being silently dropped. String options are still
    skipped when empty.

    NOTE(review): ``--dry-run`` is parsed by the CLI but is not mapped to any
    override in this function; confirm where it is meant to take effect.

    Args:
        args: Namespace-like object exposing the attributes produced by the
            CLI parser (``store_id``, ``pg_dsn``, ..., ``tasks``).

    Returns:
        A dict keyed by config section (``app``, ``db``, ``api``, ``io``,
        ``pipeline``, ``run``) holding only the overridden values.
    """
    overrides: dict = {}

    for attr, path, kind in _OVERRIDE_SPECS:
        value = getattr(args, attr)
        if kind == "int":
            if value is None:
                continue
        elif not value:
            continue

        if kind == "upper":
            value = value.upper()
        elif kind == "flag":
            value = True
        _set_nested(overrides, path, value)

    # Task list: comma separated, blanks dropped, codes normalised to upper case.
    if args.tasks:
        task_codes = [code.strip().upper() for code in args.tasks.split(",") if code.strip()]
        _set_nested(overrides, ("run", "tasks"), task_codes)

    return overrides
def main():
    """Run the ETL CLI end to end and return a process exit code.

    Sets up logging, parses the command line, loads the configuration with
    the CLI overrides applied, then runs the configured tasks through an
    ``ETLScheduler``.

    Returns:
        ``0`` when every step completed, ``1`` when any exception was raised
        while loading configuration or running tasks (the error is logged
        with its traceback).
    """
    logger = setup_logging()
    args = parse_args()
    try:
        # Load configuration with the CLI overrides applied.
        cli_overrides = build_cli_overrides(args)
        config = AppConfig.load(cli_overrides)
        task_codes = config.get("run.tasks")
        logger.info("配置加载完成")
        logger.info("门店ID: %s", config.get('app.store_id'))
        logger.info("任务列表: %s", task_codes)

        # Create the scheduler and run the tasks.
        scheduler = ETLScheduler(config, logger)
        try:
            scheduler.run_tasks(task_codes)
        finally:
            # Always release the scheduler, even when a task fails.
            scheduler.close()

        logger.info("ETL运行完成")
        return 0
    except Exception as e:
        # Top-level boundary: log with traceback and signal failure via exit code.
        logger.exception("ETL运行失败: %s", e)
        return 1
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())