132 lines
4.1 KiB
Python
132 lines
4.1 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""CLI 命令构建器"""
|
||
|
||
from typing import List, Dict, Any, Optional
|
||
from ..models.task_model import TaskConfig
|
||
|
||
|
||
# CLI 支持的命令行参数(来自 cli/main.py)
|
||
CLI_SUPPORTED_ARGS = {
|
||
# 值类型参数
|
||
"store_id", "tasks", "pg_dsn", "pg_host", "pg_port", "pg_name",
|
||
"pg_user", "pg_password", "api_base", "api_token", "api_timeout",
|
||
"api_page_size", "api_retry_max", "window_start", "window_end",
|
||
"export_root", "log_root", "pipeline_flow", "fetch_root",
|
||
"ingest_source", "idle_start", "idle_end",
|
||
# 布尔类型参数
|
||
"dry_run", "force_window_override", "write_pretty_json", "allow_empty_advance",
|
||
}
|
||
|
||
|
||
class CLIBuilder:
|
||
"""构建 CLI 命令行参数"""
|
||
|
||
def __init__(self, python_executable: str = "python"):
|
||
self.python_executable = python_executable
|
||
|
||
def build_command(self, config: TaskConfig) -> List[str]:
|
||
"""
|
||
根据任务配置构建命令行参数列表
|
||
|
||
Args:
|
||
config: 任务配置对象
|
||
|
||
Returns:
|
||
命令行参数列表
|
||
"""
|
||
cmd = [self.python_executable, "-m", "cli.main"]
|
||
|
||
# 任务列表
|
||
if config.tasks:
|
||
cmd.extend(["--tasks", ",".join(config.tasks)])
|
||
|
||
# Pipeline 流程
|
||
if config.pipeline_flow:
|
||
cmd.extend(["--pipeline-flow", config.pipeline_flow])
|
||
|
||
# Dry-run 模式
|
||
if config.dry_run:
|
||
cmd.append("--dry-run")
|
||
|
||
# 时间窗口
|
||
if config.window_start:
|
||
cmd.extend(["--window-start", config.window_start])
|
||
if config.window_end:
|
||
cmd.extend(["--window-end", config.window_end])
|
||
|
||
# 数据源目录
|
||
if config.ingest_source:
|
||
cmd.extend(["--ingest-source", config.ingest_source])
|
||
|
||
# 门店 ID
|
||
if config.store_id is not None:
|
||
cmd.extend(["--store-id", str(config.store_id)])
|
||
|
||
# 数据库 DSN
|
||
if config.pg_dsn:
|
||
cmd.extend(["--pg-dsn", config.pg_dsn])
|
||
|
||
# API Token
|
||
if config.api_token:
|
||
cmd.extend(["--api-token", config.api_token])
|
||
|
||
# 额外参数(只传递 CLI 支持的参数)
|
||
for key, value in config.extra_args.items():
|
||
if value is not None and key in CLI_SUPPORTED_ARGS:
|
||
arg_name = f"--{key.replace('_', '-')}"
|
||
if isinstance(value, bool):
|
||
if value:
|
||
cmd.append(arg_name)
|
||
else:
|
||
cmd.extend([arg_name, str(value)])
|
||
|
||
return cmd
|
||
|
||
def build_command_string(self, config: TaskConfig) -> str:
|
||
"""
|
||
构建命令行字符串(用于显示)
|
||
|
||
Args:
|
||
config: 任务配置对象
|
||
|
||
Returns:
|
||
命令行字符串
|
||
"""
|
||
cmd = self.build_command(config)
|
||
# 对包含空格的参数添加引号
|
||
quoted_cmd = []
|
||
for arg in cmd:
|
||
if ' ' in arg or '"' in arg:
|
||
quoted_cmd.append(f'"{arg}"')
|
||
else:
|
||
quoted_cmd.append(arg)
|
||
return " ".join(quoted_cmd)
|
||
|
||
def build_from_dict(self, params: Dict[str, Any]) -> List[str]:
|
||
"""
|
||
从字典构建命令行参数
|
||
|
||
Args:
|
||
params: 参数字典
|
||
|
||
Returns:
|
||
命令行参数列表
|
||
"""
|
||
config = TaskConfig(
|
||
tasks=params.get("tasks", []),
|
||
pipeline_flow=params.get("pipeline_flow", "FULL"),
|
||
dry_run=params.get("dry_run", False),
|
||
window_start=params.get("window_start"),
|
||
window_end=params.get("window_end"),
|
||
ingest_source=params.get("ingest_source"),
|
||
store_id=params.get("store_id"),
|
||
pg_dsn=params.get("pg_dsn"),
|
||
api_token=params.get("api_token"),
|
||
extra_args=params.get("extra_args", {}),
|
||
)
|
||
return self.build_command(config)
|
||
|
||
|
||
# 全局实例
|
||
cli_builder = CLIBuilder()
|