Files
ZQYY.FQ-ETL/gui/utils/cli_builder.py

199 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""CLI 命令构建器
支持两种模式:
1. 传统模式:--tasks 参数指定任务列表
2. 管道模式:--pipeline 参数指定管道类型,支持后置校验
"""
from typing import List, Dict, Any, Optional
from ..models.task_model import TaskConfig
# CLI 支持的命令行参数(来自 cli/main.py
CLI_SUPPORTED_ARGS = {
# 值类型参数
"store_id", "tasks", "pg_dsn", "pg_host", "pg_port", "pg_name",
"pg_user", "pg_password", "api_base", "api_token", "api_timeout",
"api_page_size", "api_retry_max", "window_start", "window_end",
"export_root", "log_root", "pipeline_flow", "fetch_root",
"ingest_source", "idle_start", "idle_end",
# 新增:管道模式参数
"pipeline", "processing_mode", "window_split", "window_split_unit", "window_split_days",
"lookback_hours", "overlap_seconds",
# 布尔类型参数
"dry_run", "force_window_override", "write_pretty_json", "allow_empty_advance",
}
class CLIBuilder:
"""构建 CLI 命令行参数"""
def __init__(self, python_executable: str = "python"):
self.python_executable = python_executable
def build_command(self, config: TaskConfig) -> List[str]:
"""
根据任务配置构建命令行参数列表
支持两种模式:
1. 管道模式(优先):使用 --pipeline 参数
2. 传统模式:使用 --tasks 参数
Args:
config: 任务配置对象
Returns:
命令行参数列表
"""
cmd = [self.python_executable, "-m", "cli.main"]
# 判断使用管道模式还是传统模式
use_pipeline_mode = bool(config.pipeline and config.pipeline != "legacy")
if use_pipeline_mode:
# 管道模式
cmd.extend(["--pipeline", config.pipeline])
# 处理模式
if config.processing_mode:
cmd.extend(["--processing-mode", config.processing_mode])
# 校验前从 API 获取数据(仅 verify_only 模式有效)
if config.fetch_before_verify and config.processing_mode == "verify_only":
cmd.append("--fetch-before-verify")
# 时间窗口模式
if config.window_mode == "lookback":
# 回溯模式:使用 lookback_hours 和 overlap_seconds
if config.lookback_hours:
cmd.extend(["--lookback-hours", str(config.lookback_hours)])
if config.overlap_seconds:
cmd.extend(["--overlap-seconds", str(config.overlap_seconds)])
else:
# 自定义时间窗口
if config.window_start:
cmd.extend(["--window-start", config.window_start])
if config.window_end:
cmd.extend(["--window-end", config.window_end])
# 时间窗口切分(管道层拆分 + 任务层拆分)
if config.window_split and config.window_split != "none":
cmd.extend(["--window-split", config.window_split])
cmd.extend(["--window-split-unit", config.window_split])
if config.window_split_days:
cmd.extend(["--window-split-days", str(config.window_split_days)])
# 如果同时指定了任务列表,也传递(用于过滤)
if config.tasks:
cmd.extend(["--tasks", ",".join(config.tasks)])
else:
# 传统模式
if config.tasks:
cmd.extend(["--tasks", ",".join(config.tasks)])
# Pipeline 流程
if config.pipeline_flow:
cmd.extend(["--pipeline-flow", config.pipeline_flow])
# 时间窗口
if config.window_start:
cmd.extend(["--window-start", config.window_start])
if config.window_end:
cmd.extend(["--window-end", config.window_end])
# 时间窗口切分(任务层拆分)
if config.window_split and config.window_split != "none":
cmd.extend(["--window-split-unit", config.window_split])
if config.window_split_days:
cmd.extend(["--window-split-days", str(config.window_split_days)])
# Dry-run 模式
if config.dry_run:
cmd.append("--dry-run")
# 数据源目录(传统模式)
if config.ingest_source:
cmd.extend(["--ingest-source", config.ingest_source])
# 门店 ID
if config.store_id is not None:
cmd.extend(["--store-id", str(config.store_id)])
# 数据库 DSN
if config.pg_dsn:
cmd.extend(["--pg-dsn", config.pg_dsn])
# API Token
if config.api_token:
cmd.extend(["--api-token", config.api_token])
# 额外参数(只传递 CLI 支持的参数)
for key, value in config.extra_args.items():
if value is not None and key in CLI_SUPPORTED_ARGS:
arg_name = f"--{key.replace('_', '-')}"
if isinstance(value, bool):
if value:
cmd.append(arg_name)
else:
cmd.extend([arg_name, str(value)])
return cmd
def build_command_string(self, config: TaskConfig) -> str:
"""
构建命令行字符串(用于显示)
Args:
config: 任务配置对象
Returns:
命令行字符串
"""
cmd = self.build_command(config)
# 对包含空格的参数添加引号
quoted_cmd = []
for arg in cmd:
if ' ' in arg or '"' in arg:
quoted_cmd.append(f'"{arg}"')
else:
quoted_cmd.append(arg)
return " ".join(quoted_cmd)
def build_from_dict(self, params: Dict[str, Any]) -> List[str]:
"""
从字典构建命令行参数
Args:
params: 参数字典
Returns:
命令行参数列表
"""
config = TaskConfig(
tasks=params.get("tasks", []),
pipeline_flow=params.get("pipeline_flow", "FULL"),
dry_run=params.get("dry_run", False),
window_start=params.get("window_start"),
window_end=params.get("window_end"),
window_split=params.get("window_split"),
window_split_days=params.get("window_split_days"),
ingest_source=params.get("ingest_source"),
store_id=params.get("store_id"),
pg_dsn=params.get("pg_dsn"),
api_token=params.get("api_token"),
extra_args=params.get("extra_args", {}),
# 新增管道参数
pipeline=params.get("pipeline", ""),
processing_mode=params.get("processing_mode", "increment_only"),
fetch_before_verify=params.get("fetch_before_verify", False),
window_mode=params.get("window_mode", "lookback"),
lookback_hours=params.get("lookback_hours", 24),
overlap_seconds=params.get("overlap_seconds", 600),
)
return self.build_command(config)
# 全局实例
cli_builder = CLIBuilder()