# -*- coding: utf-8 -*- """CLI 命令构建器 支持两种模式: 1. 传统模式:--tasks 参数指定任务列表 2. 管道模式:--pipeline 参数指定管道类型,支持后置校验 """ from typing import List, Dict, Any, Optional from ..models.task_model import TaskConfig # CLI 支持的命令行参数(来自 cli/main.py) CLI_SUPPORTED_ARGS = { # 值类型参数 "store_id", "tasks", "pg_dsn", "pg_host", "pg_port", "pg_name", "pg_user", "pg_password", "api_base", "api_token", "api_timeout", "api_page_size", "api_retry_max", "window_start", "window_end", "export_root", "log_root", "pipeline_flow", "fetch_root", "ingest_source", "idle_start", "idle_end", # 新增:管道模式参数 "pipeline", "processing_mode", "window_split", "window_split_unit", "window_split_days", "lookback_hours", "overlap_seconds", # 布尔类型参数 "dry_run", "force_window_override", "write_pretty_json", "allow_empty_advance", } class CLIBuilder: """构建 CLI 命令行参数""" def __init__(self, python_executable: str = "python"): self.python_executable = python_executable def build_command(self, config: TaskConfig) -> List[str]: """ 根据任务配置构建命令行参数列表 支持两种模式: 1. 管道模式(优先):使用 --pipeline 参数 2. 传统模式:使用 --tasks 参数 Args: config: 任务配置对象 Returns: 命令行参数列表 """ cmd = [self.python_executable, "-m", "cli.main"] # 判断使用管道模式还是传统模式 use_pipeline_mode = bool(config.pipeline and config.pipeline != "legacy") if use_pipeline_mode: # 管道模式 cmd.extend(["--pipeline", config.pipeline]) # 处理模式 if config.processing_mode: cmd.extend(["--processing-mode", config.processing_mode]) # 校验前从 API 获取数据(仅 verify_only 模式有效) if config.fetch_before_verify and config.processing_mode == "verify_only": cmd.append("--fetch-before-verify") # 时间窗口模式 if config.window_mode == "lookback": # 回溯模式:使用 lookback_hours 和 overlap_seconds if config.lookback_hours: cmd.extend(["--lookback-hours", str(config.lookback_hours)]) if config.overlap_seconds: cmd.extend(["--overlap-seconds", str(config.overlap_seconds)]) else: # 自定义时间窗口 if config.window_start: cmd.extend(["--window-start", config.window_start]) if config.window_end: cmd.extend(["--window-end", config.window_end]) # 时间窗口切分(管道层拆分 + 任务层拆分) if config.window_split and config.window_split != "none": cmd.extend(["--window-split", config.window_split]) cmd.extend(["--window-split-unit", config.window_split]) if config.window_split_days: cmd.extend(["--window-split-days", str(config.window_split_days)]) # 如果同时指定了任务列表,也传递(用于过滤) if config.tasks: cmd.extend(["--tasks", ",".join(config.tasks)]) else: # 传统模式 if config.tasks: cmd.extend(["--tasks", ",".join(config.tasks)]) # Pipeline 流程 if config.pipeline_flow: cmd.extend(["--pipeline-flow", config.pipeline_flow]) # 时间窗口 if config.window_start: cmd.extend(["--window-start", config.window_start]) if config.window_end: cmd.extend(["--window-end", config.window_end]) # 时间窗口切分(任务层拆分) if config.window_split and config.window_split != "none": cmd.extend(["--window-split-unit", config.window_split]) if config.window_split_days: cmd.extend(["--window-split-days", str(config.window_split_days)]) # Dry-run 模式 if config.dry_run: cmd.append("--dry-run") # 数据源目录(传统模式) if config.ingest_source: cmd.extend(["--ingest-source", config.ingest_source]) # 门店 ID if config.store_id is not None: cmd.extend(["--store-id", str(config.store_id)]) # 数据库 DSN if config.pg_dsn: cmd.extend(["--pg-dsn", config.pg_dsn]) # API Token if config.api_token: cmd.extend(["--api-token", config.api_token]) # 额外参数(只传递 CLI 支持的参数) for key, value in config.extra_args.items(): if value is not None and key in CLI_SUPPORTED_ARGS: arg_name = f"--{key.replace('_', '-')}" if isinstance(value, bool): if value: cmd.append(arg_name) else: cmd.extend([arg_name, str(value)]) return cmd def build_command_string(self, config: TaskConfig) -> str: """ 构建命令行字符串(用于显示) Args: config: 任务配置对象 Returns: 命令行字符串 """ cmd = self.build_command(config) # 对包含空格的参数添加引号 quoted_cmd = [] for arg in cmd: if ' ' in arg or '"' in arg: quoted_cmd.append(f'"{arg}"') else: quoted_cmd.append(arg) return " ".join(quoted_cmd) def build_from_dict(self, params: Dict[str, Any]) -> List[str]: """ 从字典构建命令行参数 Args: params: 参数字典 Returns: 命令行参数列表 """ config = TaskConfig( tasks=params.get("tasks", []), pipeline_flow=params.get("pipeline_flow", "FULL"), dry_run=params.get("dry_run", False), window_start=params.get("window_start"), window_end=params.get("window_end"), window_split=params.get("window_split"), window_split_days=params.get("window_split_days"), ingest_source=params.get("ingest_source"), store_id=params.get("store_id"), pg_dsn=params.get("pg_dsn"), api_token=params.get("api_token"), extra_args=params.get("extra_args", {}), # 新增管道参数 pipeline=params.get("pipeline", ""), processing_mode=params.get("processing_mode", "increment_only"), fetch_before_verify=params.get("fetch_before_verify", False), window_mode=params.get("window_mode", "lookback"), lookback_hours=params.get("lookback_hours", 24), overlap_seconds=params.get("overlap_seconds", 600), ) return self.build_command(config) # 全局实例 cli_builder = CLIBuilder()