# -*- coding: utf-8 -*-
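"""CLI command builder.

Supports two modes:
1. Traditional mode: the --tasks argument specifies the task list.
2. Pipeline mode: the --pipeline argument specifies the pipeline type and
   supports post-run verification.
"""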
|
||
|
||
from typing import List, Dict, Any, Optional
|
||
from ..models.task_model import TaskConfig
|
||
|
||
|
||
# Command-line arguments accepted by the CLI (taken from cli/main.py).
# Keys use underscores; the builder converts them to dashed flags.
CLI_SUPPORTED_ARGS = {
    # --- value-type arguments ---
    "store_id",
    "tasks",
    "pg_dsn",
    "pg_host",
    "pg_port",
    "pg_name",
    "pg_user",
    "pg_password",
    "api_base",
    "api_token",
    "api_timeout",
    "api_page_size",
    "api_retry_max",
    "window_start",
    "window_end",
    "export_root",
    "log_root",
    "pipeline_flow",
    "fetch_root",
    "ingest_source",
    "idle_start",
    "idle_end",
    # --- pipeline-mode arguments (added later) ---
    "pipeline",
    "processing_mode",
    "window_split",
    "window_split_unit",
    "window_split_days",
    "lookback_hours",
    "overlap_seconds",
    # --- boolean (flag-only) arguments ---
    "dry_run",
    "force_window_override",
    "write_pretty_json",
    "allow_empty_advance",
}
|
||
|
||
|
||
class CLIBuilder:
    """Build argument lists for invoking the ``cli.main`` module.

    Two invocation styles are produced, selected per config:

    1. Pipeline mode (takes precedence): ``--pipeline`` plus processing-mode,
       time-window and window-split options.
    2. Traditional mode: ``--tasks`` plus ``--pipeline-flow``.
    """

    def __init__(self, python_executable: str = "python"):
        """Store the interpreter used as the first element of every command.

        Args:
            python_executable: Interpreter name or path placed before
                ``-m cli.main``.
        """
        self.python_executable = python_executable

    def build_command(self, config: "TaskConfig") -> List[str]:
        """Build the argument list for a task configuration.

        Pipeline mode is used when ``config.pipeline`` is non-empty and is not
        the literal ``"legacy"``; otherwise traditional mode is used.

        Args:
            config: Task configuration object.

        Returns:
            The command as a list of strings, starting with the interpreter,
            ``-m`` and ``cli.main``.
        """
        cmd = [self.python_executable, "-m", "cli.main"]

        if config.pipeline and config.pipeline != "legacy":
            cmd.extend(self._pipeline_args(config))
        else:
            cmd.extend(self._legacy_args(config))

        cmd.extend(self._common_args(config))
        cmd.extend(self._extra_args(config))
        return cmd

    @staticmethod
    def _window_bounds(config: "TaskConfig") -> List[str]:
        """Return ``--window-start`` / ``--window-end`` for whichever is set."""
        args: List[str] = []
        if config.window_start:
            args.extend(["--window-start", config.window_start])
        if config.window_end:
            args.extend(["--window-end", config.window_end])
        return args

    @staticmethod
    def _pipeline_args(config: "TaskConfig") -> List[str]:
        """Return the arguments specific to pipeline mode."""
        args: List[str] = ["--pipeline", config.pipeline]

        if config.processing_mode:
            args.extend(["--processing-mode", config.processing_mode])

        # Fetching from the API before verifying only applies to verify_only.
        if config.fetch_before_verify and config.processing_mode == "verify_only":
            args.append("--fetch-before-verify")

        if config.window_mode == "lookback":
            # Lookback window: falsy values (None, 0) are simply not passed.
            if config.lookback_hours:
                args.extend(["--lookback-hours", str(config.lookback_hours)])
            if config.overlap_seconds:
                args.extend(["--overlap-seconds", str(config.overlap_seconds)])
        else:
            # Any other window mode uses the explicit start/end bounds.
            args.extend(CLIBuilder._window_bounds(config))

        if config.window_split and config.window_split != "none":
            # The same value is passed under both flags; the original comment
            # describes them as pipeline-level and task-level splitting.
            args.extend(["--window-split", config.window_split])
            args.extend(["--window-split-unit", config.window_split])
            if config.window_split_days:
                args.extend(["--window-split-days", str(config.window_split_days)])

        # A task list may accompany a pipeline (the original comment says it
        # is used for filtering).
        if config.tasks:
            args.extend(["--tasks", ",".join(config.tasks)])
        return args

    @staticmethod
    def _legacy_args(config: "TaskConfig") -> List[str]:
        """Return the arguments specific to traditional (``--tasks``) mode."""
        args: List[str] = []
        if config.tasks:
            args.extend(["--tasks", ",".join(config.tasks)])
        if config.pipeline_flow:
            args.extend(["--pipeline-flow", config.pipeline_flow])

        args.extend(CLIBuilder._window_bounds(config))

        # Traditional mode only passes the task-level split flag.
        if config.window_split and config.window_split != "none":
            args.extend(["--window-split-unit", config.window_split])
            if config.window_split_days:
                args.extend(["--window-split-days", str(config.window_split_days)])
        return args

    @staticmethod
    def _common_args(config: "TaskConfig") -> List[str]:
        """Return the arguments emitted in both modes."""
        args: List[str] = []
        if config.dry_run:
            args.append("--dry-run")
        if config.ingest_source:
            args.extend(["--ingest-source", config.ingest_source])
        # store_id is compared against None so that an id of 0 is still passed.
        if config.store_id is not None:
            args.extend(["--store-id", str(config.store_id)])
        if config.pg_dsn:
            args.extend(["--pg-dsn", config.pg_dsn])
        if config.api_token:
            args.extend(["--api-token", config.api_token])
        return args

    @staticmethod
    def _extra_args(config: "TaskConfig") -> List[str]:
        """Return whitelisted entries of ``config.extra_args`` as CLI flags.

        Keys not in ``CLI_SUPPORTED_ARGS`` and entries whose value is None are
        dropped. Booleans become bare flags (emitted only when True); every
        other value is passed as ``--flag str(value)``.
        """
        args: List[str] = []
        # Tolerate extra_args being None instead of raising AttributeError.
        for key, value in (config.extra_args or {}).items():
            if value is None or key not in CLI_SUPPORTED_ARGS:
                continue
            flag = "--" + key.replace("_", "-")
            if isinstance(value, bool):
                if value:
                    args.append(flag)
            else:
                args.extend([flag, str(value)])
        return args

    @staticmethod
    def _quote_for_display(arg: str) -> str:
        """Wrap *arg* in double quotes when it would be ambiguous unquoted.

        Embedded double quotes are backslash-escaped so the token boundaries
        stay readable; empty arguments are rendered as ``""``.
        """
        if arg and '"' not in arg and not any(ch.isspace() for ch in arg):
            return arg
        return '"' + arg.replace('"', '\\"') + '"'

    def build_command_string(self, config: "TaskConfig") -> str:
        """Build the command as a single string, intended for display.

        NOTE(review): ``pg_dsn`` and ``api_token`` are included verbatim, so
        the result may contain credentials; mask before logging if needed.

        Args:
            config: Task configuration object.

        Returns:
            The space-joined command, with arguments that are empty or contain
            whitespace or double quotes wrapped in double quotes.
        """
        return " ".join(
            self._quote_for_display(arg) for arg in self.build_command(config)
        )

    def build_from_dict(self, params: Dict[str, Any]) -> List[str]:
        """Build the argument list from a plain parameter dictionary.

        Missing keys fall back to the defaults visible below (for example
        ``pipeline_flow="FULL"``, ``window_mode="lookback"``,
        ``lookback_hours=24``, ``overlap_seconds=600``).

        Args:
            params: Parameter dictionary keyed by TaskConfig field names.

        Returns:
            The command as a list of strings.
        """
        config = TaskConfig(
            tasks=params.get("tasks", []),
            pipeline_flow=params.get("pipeline_flow", "FULL"),
            dry_run=params.get("dry_run", False),
            window_start=params.get("window_start"),
            window_end=params.get("window_end"),
            window_split=params.get("window_split"),
            window_split_days=params.get("window_split_days"),
            ingest_source=params.get("ingest_source"),
            store_id=params.get("store_id"),
            pg_dsn=params.get("pg_dsn"),
            api_token=params.get("api_token"),
            # An explicit None is normalised to an empty mapping.
            extra_args=params.get("extra_args") or {},
            # Pipeline-mode parameters.
            pipeline=params.get("pipeline", ""),
            processing_mode=params.get("processing_mode", "increment_only"),
            fetch_before_verify=params.get("fetch_before_verify", False),
            window_mode=params.get("window_mode", "lookback"),
            lookback_hours=params.get("lookback_hours", 24),
            overlap_seconds=params.get("overlap_seconds", 600),
        )
        return self.build_command(config)
|
||
|
||
|
||
# 全局实例
|
||
# Shared module-level builder, created with the default "python" executable.
cli_builder = CLIBuilder()
|