Files
Neo-ZQYY/apps/backend/app/services/cli_builder.py

161 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""CLI command builder.

Migrated from gui/utils/cli_builder.py and adapted to the backend
TaskConfigSchema. Converts a TaskConfigSchema into an ETL CLI argument list.

Supports:
- 7 flows: api_ods / api_ods_dwd / api_full / ods_dwd / dwd_dws / dwd_dws_index / dwd_index
- 4 processing modes: increment_only / verify_only / increment_verify / full_window
- Automatic injection of the --store-id argument (when a store ID is configured)
"""
from typing import Any
from ..schemas.tasks import TaskConfigSchema
# Set of valid Flow IDs.
VALID_FLOWS: set[str] = {
    "api_full",
    "api_ods",
    "api_ods_dwd",
    "dwd_dws",
    "dwd_dws_index",
    "dwd_index",
    "ods_dwd",
}

# Set of valid processing modes.
VALID_PROCESSING_MODES: set[str] = {
    "full_window",
    "increment_only",
    "increment_verify",
    "verify_only",
}

# extra_args keys that carry a value.
_CLI_VALUE_ARGS: frozenset[str] = frozenset(
    (
        "pg_dsn", "pg_host", "pg_port", "pg_name", "pg_user", "pg_password",
        "api_base", "api_token", "api_timeout", "api_page_size", "api_retry_max",
        "export_root", "log_root", "fetch_root",
        "ingest_source", "idle_start", "idle_end",
        "data_source", "pipeline_flow",
        "window_split_unit",
    )
)

# extra_args keys that are boolean switches.
_CLI_FLAG_ARGS: frozenset[str] = frozenset(
    ("force_window_override", "write_pretty_json", "allow_empty_advance")
)

# Every extra_args key the builder is allowed to forward to the CLI
# (value-type keys plus boolean keys). Kept as a mutable ``set`` as before.
CLI_SUPPORTED_ARGS: set[str] = set(_CLI_VALUE_ARGS | _CLI_FLAG_ARGS)
class CLIBuilder:
    """Convert a ``TaskConfigSchema`` into an ETL CLI argument list."""

    def build_command(
        self,
        config: TaskConfigSchema,
        etl_project_path: str,
        python_executable: str = "python",
    ) -> list[str]:
        """Build the full CLI argument list for one ETL run.

        Produced shape::

            [python, -m, cli.main, --flow, {flow_id}, --processing-mode, ...,
             --tasks, ..., --store-id, {store_id}, ...]

        Args:
            config: Task configuration object (Pydantic model).
            etl_project_path: ETL project root directory. It is NOT spliced
                into the command; it is intended as the working directory.
            python_executable: Python interpreter to invoke, default "python".

        Returns:
            The argument list (one element per argv entry).
        """
        cmd: list[str] = [python_executable, "-m", "cli.main"]

        # Flow (execution pipeline).
        # CHANGE [2026-02-20] intent: pipeline -> flow (unified naming)
        cmd.extend(["--flow", config.flow])

        if config.processing_mode:
            cmd.extend(["--processing-mode", config.processing_mode])

        if config.tasks:
            cmd.extend(["--tasks", ",".join(config.tasks)])

        # Fetching from the API before verification is only meaningful in
        # verify_only mode; the flag is dropped for every other mode.
        if config.fetch_before_verify and config.processing_mode == "verify_only":
            cmd.append("--fetch-before-verify")

        cmd.extend(self._window_args(config))

        if config.dry_run:
            cmd.append("--dry-run")

        if config.force_full:
            cmd.append("--force-full")

        # Local JSON mode is expressed to the CLI as "--data-source offline".
        if config.ods_use_local_json:
            cmd.extend(["--data-source", "offline"])

        # Store ID is injected automatically whenever the config carries one.
        if config.store_id is not None:
            cmd.extend(["--store-id", str(config.store_id)])

        # Extra arguments go last, so the CLI sees them after the built-ins.
        cmd.extend(self._extra_args(config))
        return cmd

    @staticmethod
    def _window_args(config: TaskConfigSchema) -> list[str]:
        """Return the time-window and window-split arguments for *config*."""
        args: list[str] = []
        if config.window_mode == "lookback":
            # Lookback mode: pass the lookback span and overlap if set.
            if config.lookback_hours is not None:
                args.extend(["--lookback-hours", str(config.lookback_hours)])
            if config.overlap_seconds is not None:
                args.extend(["--overlap-seconds", str(config.overlap_seconds)])
        else:
            # Any other window_mode is treated as an explicit custom window.
            if config.window_start:
                args.extend(["--window-start", config.window_start])
            if config.window_end:
                args.extend(["--window-end", config.window_end])

        # Window splitting: an empty value or "none" disables it, and the
        # split size in days is only emitted when splitting is enabled.
        if config.window_split and config.window_split != "none":
            args.extend(["--window-split", config.window_split])
            if config.window_split_days is not None:
                args.extend(["--window-split-days", str(config.window_split_days)])
        return args

    @staticmethod
    def _extra_args(config: TaskConfigSchema) -> list[str]:
        """Translate ``config.extra_args`` into CLI arguments.

        Only keys listed in ``CLI_SUPPORTED_ARGS`` are forwarded; unsupported
        keys and ``None`` values are dropped. ``snake_case`` keys become
        ``--kebab-case`` flags. Boolean values are presence flags (emitted
        only when True); any other value is passed as ``str(value)``.
        A missing (``None``) ``extra_args`` is treated as empty.
        """
        args: list[str] = []
        for key, value in (config.extra_args or {}).items():
            if value is None or key not in CLI_SUPPORTED_ARGS:
                continue
            flag = f"--{key.replace('_', '-')}"
            if isinstance(value, bool):
                if value:
                    args.append(flag)
            else:
                args.extend([flag, str(value)])
        return args

    def build_command_string(
        self,
        config: TaskConfigSchema,
        etl_project_path: str,
        python_executable: str = "python",
    ) -> str:
        """Build a single command-line string (for display / logging).

        Arguments that are empty or contain whitespace or double quotes are
        wrapped in double quotes, with embedded double quotes escaped as
        ``\\"``, so every argv entry stays visible and unambiguous.

        NOTE(review): extra_args such as pg_password / api_token / pg_dsn are
        rendered verbatim; consider masking them before writing to logs.
        """
        cmd = self.build_command(config, etl_project_path, python_executable)
        return " ".join(self._quote_for_display(arg) for arg in cmd)

    @staticmethod
    def _quote_for_display(arg: str) -> str:
        """Return *arg*, double-quoted and quote-escaped if it needs it."""
        if arg and '"' not in arg and not any(ch.isspace() for ch in arg):
            return arg
        # Escape embedded quotes; otherwise 'a"b' would render as '"a"b"'.
        escaped = arg.replace('"', '\\"')
        return f'"{escaped}"'
# Global singleton: CLIBuilder defines no __init__ and keeps no instance
# attributes, so a single shared instance can be reused by all callers.
cli_builder: CLIBuilder = CLIBuilder()