Files
ZQYY.FQ-ETL/gui/widgets/pipeline_selector.py

604 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""管道选择组件:统一的 ETL 管道配置界面。"""
from typing import Dict, List, Optional, Tuple
from PySide6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QGroupBox,
QRadioButton, QButtonGroup, QLabel, QSpinBox,
QDateTimeEdit, QComboBox, QCheckBox, QPushButton,
QScrollArea, QFrame
)
from PySide6.QtCore import Signal, Qt, QDateTime
from ..models.task_registry import (
TaskRegistry, TaskDefinition, BusinessDomain, DOMAIN_LABELS,
task_registry, get_fact_ods_task_codes, get_dimension_ods_task_codes
)
# Pipeline options: (id, display name, layers covered by the pipeline).
PIPELINE_OPTIONS: List[Tuple[str, str, List[str]]] = [
    ("api_ods", "API → ODS", ["ODS"]),
    ("api_ods_dwd", "API → ODS → DWD", ["ODS", "DWD"]),
    (
        "api_full",
        "API → ODS → DWD → DWS汇总 → DWS指数",
        ["ODS", "DWD", "DWS", "INDEX"],
    ),
    ("ods_dwd", "ODS → DWD", ["DWD"]),
    ("dwd_dws", "DWD → DWS汇总", ["DWS"]),
    ("dwd_dws_index", "DWD → DWS汇总 → DWS指数", ["DWS", "INDEX"]),
    ("dwd_index", "DWD → DWS指数", ["INDEX"]),
]

# Data processing modes: (id, display name, tooltip text).
PROCESSING_MODES: List[Tuple[str, str, str]] = [
    (
        "increment_only",
        "仅增量",
        "仅执行增量数据处理,不进行校验",
    ),
    (
        "verify_only",
        "校验并修复",
        "跳过增量处理,直接校验数据一致性并自动补齐缺失/不一致数据",
    ),
    (
        "increment_verify",
        "增量 + 校验并修复",
        "先执行增量处理,再校验并修复缺失/不一致数据",
    ),
]

# Extra options for the verify mode: option key -> checkbox caption.
VERIFY_MODE_OPTIONS: Dict[str, str] = {
    "fetch_before_verify": "校验前先从 API 获取数据",
    "skip_ods_when_fetch_before_verify": "跳过 ODS 校验(仅在校验前获取时)",
    "ods_use_local_json": "ODS 校验使用本地 JSON不请求 API",
}

# Time-window modes: (id, display name).
WINDOW_MODES: List[Tuple[str, str]] = [
    ("lookback", "回溯 + 冗余"),
    ("custom", "自定义时间范围"),
]

# Time-window split options: (id, display name).
WINDOW_SPLIT_OPTIONS: List[Tuple[str, str]] = [
    ("none", "不切分"),
    ("day", "按天"),
]

# Number of days per split chunk (only used when splitting by day):
# (days, display name).
WINDOW_SPLIT_DAY_OPTIONS: List[Tuple[int, str]] = [
    (1, "1 天"),
    (10, "10 天"),
    (30, "30 天"),
]
def get_pipeline_layers(pipeline_id: str) -> List[str]:
    """Return the layers covered by the pipeline with the given id.

    Args:
        pipeline_id: Identifier of an entry in ``PIPELINE_OPTIONS``.

    Returns:
        A new list with the layer names of the matching pipeline, or an
        empty list when no pipeline has that id.
    """
    for pid, _, layers in PIPELINE_OPTIONS:
        if pid == pipeline_id:
            # Return a copy so callers cannot mutate the shared
            # module-level option table through the result.
            return list(layers)
    return []
def get_pipeline_display_name(pipeline_id: str) -> str:
    """Return the display name of the pipeline with the given id.

    Falls back to returning ``pipeline_id`` itself when no entry of
    ``PIPELINE_OPTIONS`` matches.
    """
    matching_names = (
        label for option_id, label, _ in PIPELINE_OPTIONS
        if option_id == pipeline_id
    )
    return next(matching_names, pipeline_id)
class PipelineSelectorWidget(QWidget):
    """Widget for configuring an ETL pipeline run.

    The widget stacks three group boxes: pipeline selection, data
    processing mode (with verify-related checkboxes) and time-window
    settings. The current selection can be read and restored as a plain
    dict through ``get_config`` / ``set_config``.
    """

    # Signals
    pipeline_changed = Signal(str)  # carries the new pipeline id
    processing_mode_changed = Signal(str)  # carries the new processing mode id
    window_mode_changed = Signal(str)  # carries the new time-window mode id
    config_changed = Signal()  # emitted on any configuration change

    def __init__(self, parent: Optional[QWidget] = None):
        """Create the widget with default selections and build its UI."""
        super().__init__(parent)
        # Current selection (defaults).
        self._pipeline_id = "api_ods_dwd"
        self._processing_mode = "increment_only"
        self._window_mode = "lookback"
        self._fetch_before_verify = False
        self._skip_ods_when_fetch_before_verify = True
        self._ods_use_local_json = True
        self._init_ui()
        self._connect_signals()

    # === UI construction ===

    def _init_ui(self):
        """Build the top-level layout from the three group boxes."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(12)
        # 1. Pipeline selection
        layout.addWidget(self._create_pipeline_group())
        # 2. Data processing mode
        layout.addWidget(self._create_processing_mode_group())
        # 3. Time-window configuration
        layout.addWidget(self._create_window_group())
        layout.addStretch()

    def _create_pipeline_group(self) -> QGroupBox:
        """Create the group box with one radio button per pipeline option."""
        group = QGroupBox("管道选择 (Pipeline)")
        layout = QVBoxLayout(group)
        layout.setSpacing(4)
        self._pipeline_button_group = QButtonGroup(self)
        for i, (pid, name, layers) in enumerate(PIPELINE_OPTIONS):
            radio = QRadioButton(name)
            radio.setProperty("pipeline_id", pid)
            # Separate the layer names so the tooltip stays readable
            # (joining with an empty string produced e.g. "ODSDWD").
            radio.setToolTip(f"包含层: {', '.join(layers)}")
            if pid == self._pipeline_id:
                radio.setChecked(True)
            self._pipeline_button_group.addButton(radio, i)
            layout.addWidget(radio)
        return group

    def _create_processing_mode_group(self) -> QGroupBox:
        """Create the processing-mode group box and its verify checkboxes."""
        group = QGroupBox("数据处理模式")
        layout = QVBoxLayout(group)
        layout.setSpacing(4)
        self._processing_button_group = QButtonGroup(self)
        for i, (mode_id, name, tooltip) in enumerate(PROCESSING_MODES):
            radio = QRadioButton(name)
            radio.setProperty("mode_id", mode_id)
            radio.setToolTip(tooltip)
            if mode_id == self._processing_mode:
                radio.setChecked(True)
            self._processing_button_group.addButton(radio, i)
            layout.addWidget(radio)

        # Verify-mode extra option: fetch data from the API before verifying.
        option_layout = QHBoxLayout()
        # Left margin indents the checkbox to show it belongs to the modes.
        option_layout.setContentsMargins(20, 4, 0, 0)
        self._fetch_before_verify_checkbox = QCheckBox(
            VERIFY_MODE_OPTIONS["fetch_before_verify"]
        )
        self._fetch_before_verify_checkbox.setToolTip(
            "勾选后,在执行校验前会先从 API 获取最新数据到 ODS 层。\n"
            "适用于需要同时获取新数据并校验修复的场景。"
        )
        self._fetch_before_verify_checkbox.setChecked(self._fetch_before_verify)
        # Only usable in verify_only mode.
        self._fetch_before_verify_checkbox.setEnabled(
            self._processing_mode == "verify_only"
        )
        option_layout.addWidget(self._fetch_before_verify_checkbox)
        option_layout.addStretch()
        layout.addLayout(option_layout)

        # Sub-options that only take effect together with fetch_before_verify.
        skip_ods_layout = QHBoxLayout()
        skip_ods_layout.setContentsMargins(40, 2, 0, 0)
        self._skip_ods_when_fetch_before_verify_checkbox = QCheckBox(
            VERIFY_MODE_OPTIONS["skip_ods_when_fetch_before_verify"]
        )
        self._skip_ods_when_fetch_before_verify_checkbox.setToolTip(
            "勾选后,在校验前先抓取数据的场景下跳过 ODS 校验。\n"
            "适用于仅关心 ODS 入库统计或避免重复校验的场景。"
        )
        self._skip_ods_when_fetch_before_verify_checkbox.setChecked(
            self._skip_ods_when_fetch_before_verify
        )
        skip_ods_layout.addWidget(self._skip_ods_when_fetch_before_verify_checkbox)
        skip_ods_layout.addStretch()
        layout.addLayout(skip_ods_layout)

        local_json_layout = QHBoxLayout()
        local_json_layout.setContentsMargins(40, 2, 0, 0)
        self._ods_use_local_json_checkbox = QCheckBox(
            VERIFY_MODE_OPTIONS["ods_use_local_json"]
        )
        # NOTE(review): the first tooltip line and the caption text look like
        # they lost a punctuation mark; kept verbatim — confirm against the
        # original file.
        self._ods_use_local_json_checkbox.setToolTip(
            "勾选后ODS 校验将完全基于落盘 JSON 进行,不再请求 API。\n"
            "需要先执行“校验前先从 API 获取数据”以生成 JSON。"
        )
        self._ods_use_local_json_checkbox.setChecked(self._ods_use_local_json)
        local_json_layout.addWidget(self._ods_use_local_json_checkbox)
        local_json_layout.addStretch()
        layout.addLayout(local_json_layout)

        self._update_verify_option_states()
        return group

    def _create_window_group(self) -> QGroupBox:
        """Create the time-window group box (mode, range and split settings)."""
        group = QGroupBox("时间窗口")
        layout = QVBoxLayout(group)
        layout.setSpacing(8)
        # Time-window mode selection.
        self._window_button_group = QButtonGroup(self)

        # Lookback mode row.
        lookback_layout = QHBoxLayout()
        self._lookback_radio = QRadioButton("回溯 + 冗余:")
        self._lookback_radio.setProperty("mode_id", "lookback")
        self._lookback_radio.setChecked(True)
        self._window_button_group.addButton(self._lookback_radio, 0)
        lookback_layout.addWidget(self._lookback_radio)
        self._lookback_hours_spin = QSpinBox()
        self._lookback_hours_spin.setRange(1, 720)
        self._lookback_hours_spin.setValue(24)
        self._lookback_hours_spin.setSuffix(" 小时")
        self._lookback_hours_spin.setToolTip("回溯时间长度")
        self._lookback_hours_spin.setFixedWidth(100)
        lookback_layout.addWidget(self._lookback_hours_spin)
        lookback_layout.addWidget(QLabel("冗余:"))
        self._overlap_seconds_spin = QSpinBox()
        self._overlap_seconds_spin.setRange(0, 7200)
        self._overlap_seconds_spin.setValue(600)
        # NOTE(review): empty suffix, unlike the hours spin box; a unit
        # suffix may have been lost — confirm against the original file.
        self._overlap_seconds_spin.setSuffix("")
        self._overlap_seconds_spin.setToolTip("时间窗口前后的重叠冗余")
        self._overlap_seconds_spin.setFixedWidth(100)
        lookback_layout.addWidget(self._overlap_seconds_spin)
        lookback_layout.addStretch()
        layout.addLayout(lookback_layout)

        # Custom range mode row.
        custom_layout = QHBoxLayout()
        self._custom_radio = QRadioButton("自定义:")
        self._custom_radio.setProperty("mode_id", "custom")
        self._window_button_group.addButton(self._custom_radio, 1)
        custom_layout.addWidget(self._custom_radio)
        self._start_datetime = QDateTimeEdit()
        self._start_datetime.setCalendarPopup(True)
        self._start_datetime.setDisplayFormat("yyyy-MM-dd HH:mm:ss")
        self._start_datetime.setDateTime(QDateTime.currentDateTime().addDays(-1))
        self._start_datetime.setFixedWidth(160)
        # Range editors start disabled because lookback is the initial mode.
        self._start_datetime.setEnabled(False)
        custom_layout.addWidget(self._start_datetime)
        # NOTE(review): empty label between start and end editors; a
        # separator character may have been lost — confirm.
        custom_layout.addWidget(QLabel(""))
        self._end_datetime = QDateTimeEdit()
        self._end_datetime.setCalendarPopup(True)
        self._end_datetime.setDisplayFormat("yyyy-MM-dd HH:mm:ss")
        self._end_datetime.setDateTime(QDateTime.currentDateTime())
        self._end_datetime.setFixedWidth(160)
        self._end_datetime.setEnabled(False)
        custom_layout.addWidget(self._end_datetime)
        custom_layout.addStretch()
        layout.addLayout(custom_layout)

        # Time-window split row.
        split_layout = QHBoxLayout()
        split_layout.addWidget(QLabel("时间窗口切分:"))
        self._split_combo = QComboBox()
        for split_id, split_name in WINDOW_SPLIT_OPTIONS:
            self._split_combo.addItem(split_name, split_id)
        default_split_index = self._split_combo.findData("day")
        if default_split_index >= 0:
            self._split_combo.setCurrentIndex(default_split_index)
        self._split_combo.setFixedWidth(100)
        split_layout.addWidget(self._split_combo)
        split_layout.addWidget(QLabel("切分天数:"))
        self._split_days_combo = QComboBox()
        for days, label in WINDOW_SPLIT_DAY_OPTIONS:
            self._split_days_combo.addItem(label, days)
        default_days_index = self._split_days_combo.findData(10)
        if default_days_index >= 0:
            self._split_days_combo.setCurrentIndex(default_days_index)
        self._split_days_combo.setFixedWidth(90)
        split_layout.addWidget(self._split_days_combo)
        split_layout.addStretch()
        layout.addLayout(split_layout)

        self._update_split_days_state()
        return group

    # === Signal wiring and handlers ===

    def _connect_signals(self):
        """Connect the child widgets' signals to this widget's handlers."""
        # Pipeline selection changes.
        self._pipeline_button_group.buttonClicked.connect(self._on_pipeline_changed)
        # Processing mode changes.
        self._processing_button_group.buttonClicked.connect(
            self._on_processing_mode_changed
        )
        # Time-window mode changes.
        self._window_button_group.buttonClicked.connect(self._on_window_mode_changed)
        # Other configuration changes.
        self._lookback_hours_spin.valueChanged.connect(self._emit_config_changed)
        self._overlap_seconds_spin.valueChanged.connect(self._emit_config_changed)
        self._start_datetime.dateTimeChanged.connect(self._emit_config_changed)
        self._end_datetime.dateTimeChanged.connect(self._emit_config_changed)
        self._split_combo.currentIndexChanged.connect(self._on_split_changed)
        self._split_days_combo.currentIndexChanged.connect(self._emit_config_changed)
        # Verify-mode extra option changes.
        self._fetch_before_verify_checkbox.stateChanged.connect(
            self._on_fetch_before_verify_changed
        )
        self._skip_ods_when_fetch_before_verify_checkbox.stateChanged.connect(
            self._on_skip_ods_when_fetch_before_verify_changed
        )
        self._ods_use_local_json_checkbox.stateChanged.connect(
            self._on_ods_use_local_json_changed
        )

    def _on_pipeline_changed(self, button: QRadioButton):
        """Store the clicked pipeline and notify listeners if it changed."""
        pipeline_id = button.property("pipeline_id")
        if pipeline_id and pipeline_id != self._pipeline_id:
            self._pipeline_id = pipeline_id
            self.pipeline_changed.emit(pipeline_id)
            self.config_changed.emit()

    def _sync_fetch_option_with_mode(self, mode_id: str):
        """Enable the fetch-before-verify checkbox only in verify_only mode.

        Outside verify_only the checkbox is also unchecked, then the
        dependent sub-option checkboxes are refreshed.
        """
        is_verify_only = mode_id == "verify_only"
        self._fetch_before_verify_checkbox.setEnabled(is_verify_only)
        if not is_verify_only:
            self._fetch_before_verify_checkbox.setChecked(False)
        self._update_verify_option_states()

    def _on_processing_mode_changed(self, button: QRadioButton):
        """Store the clicked processing mode and notify listeners if it changed."""
        mode_id = button.property("mode_id")
        if mode_id and mode_id != self._processing_mode:
            self._processing_mode = mode_id
            self._sync_fetch_option_with_mode(mode_id)
            self.processing_mode_changed.emit(mode_id)
            self.config_changed.emit()

    def _on_fetch_before_verify_changed(self, state: int):
        """Track the fetch-before-verify checkbox and refresh its sub-options.

        ``state`` is the value delivered by ``stateChanged``; the checkbox
        itself is queried so the result does not depend on how the binding
        represents the check-state enum.
        """
        self._fetch_before_verify = self._fetch_before_verify_checkbox.isChecked()
        self._update_verify_option_states()
        self.config_changed.emit()

    def _on_skip_ods_when_fetch_before_verify_changed(self, state: int):
        """Track the skip-ODS-verification checkbox."""
        self._skip_ods_when_fetch_before_verify = (
            self._skip_ods_when_fetch_before_verify_checkbox.isChecked()
        )
        self.config_changed.emit()

    def _on_ods_use_local_json_changed(self, state: int):
        """Track the use-local-JSON checkbox."""
        self._ods_use_local_json = self._ods_use_local_json_checkbox.isChecked()
        self.config_changed.emit()

    def _update_verify_option_states(self):
        """Enable the sub-options only for verify_only with fetch enabled."""
        enable_suboptions = (
            self._processing_mode == "verify_only" and self._fetch_before_verify
        )
        self._skip_ods_when_fetch_before_verify_checkbox.setEnabled(enable_suboptions)
        self._ods_use_local_json_checkbox.setEnabled(enable_suboptions)

    def _on_window_mode_changed(self, button: QRadioButton):
        """Store the time-window mode and toggle the matching editors."""
        mode_id = button.property("mode_id")
        if mode_id and mode_id != self._window_mode:
            self._window_mode = mode_id
            # Lookback spin boxes and custom range editors are mutually
            # exclusive.
            is_lookback = mode_id == "lookback"
            self._lookback_hours_spin.setEnabled(is_lookback)
            self._overlap_seconds_spin.setEnabled(is_lookback)
            self._start_datetime.setEnabled(not is_lookback)
            self._end_datetime.setEnabled(not is_lookback)
            self.window_mode_changed.emit(mode_id)
            self.config_changed.emit()

    def _on_split_changed(self):
        """React to a change of the window split mode."""
        self._update_split_days_state()
        self.config_changed.emit()

    def _update_split_days_state(self):
        """Allow choosing the number of days only when splitting by day."""
        is_day_split = self.get_window_split() == "day"
        self._split_days_combo.setEnabled(is_day_split)

    def _emit_config_changed(self):
        """Emit ``config_changed``, ignoring any arguments of the source signal."""
        self.config_changed.emit()

    # === Public interface ===

    def get_pipeline_id(self) -> str:
        """Return the id of the currently selected pipeline."""
        return self._pipeline_id

    def set_pipeline_id(self, pipeline_id: str):
        """Select the pipeline with the given id; unknown ids are ignored."""
        for button in self._pipeline_button_group.buttons():
            if button.property("pipeline_id") == pipeline_id:
                button.setChecked(True)
                self._pipeline_id = pipeline_id
                break

    def get_pipeline_layers(self) -> List[str]:
        """Return the layers covered by the currently selected pipeline."""
        return get_pipeline_layers(self._pipeline_id)

    def get_processing_mode(self) -> str:
        """Return the id of the current data processing mode."""
        return self._processing_mode

    def set_processing_mode(self, mode: str):
        """Select the given processing mode; unknown modes are ignored."""
        for button in self._processing_button_group.buttons():
            if button.property("mode_id") == mode:
                button.setChecked(True)
                self._processing_mode = mode
                self._sync_fetch_option_with_mode(mode)
                break

    def get_fetch_before_verify(self) -> bool:
        """Return whether data is fetched from the API before verification."""
        return self._fetch_before_verify

    def set_fetch_before_verify(self, enabled: bool):
        """Set whether data is fetched from the API before verification."""
        enabled = bool(enabled)
        self._fetch_before_verify = enabled
        self._fetch_before_verify_checkbox.setChecked(enabled)
        self._update_verify_option_states()

    def get_skip_ods_when_fetch_before_verify(self) -> bool:
        """Return whether ODS verification is skipped when fetching first."""
        return self._skip_ods_when_fetch_before_verify

    def set_skip_ods_when_fetch_before_verify(self, enabled: bool):
        """Set whether ODS verification is skipped when fetching first."""
        enabled = bool(enabled)
        self._skip_ods_when_fetch_before_verify = enabled
        self._skip_ods_when_fetch_before_verify_checkbox.setChecked(enabled)
        self._update_verify_option_states()

    def get_ods_use_local_json(self) -> bool:
        """Return whether ODS verification uses local JSON."""
        return self._ods_use_local_json

    def set_ods_use_local_json(self, enabled: bool):
        """Set whether ODS verification uses local JSON."""
        enabled = bool(enabled)
        self._ods_use_local_json = enabled
        self._ods_use_local_json_checkbox.setChecked(enabled)
        self._update_verify_option_states()

    def get_window_mode(self) -> str:
        """Return the id of the current time-window mode."""
        return self._window_mode

    def set_window_mode(self, mode: str):
        """Select the given time-window mode; unknown modes are ignored."""
        for button in self._window_button_group.buttons():
            if button.property("mode_id") == mode:
                button.setChecked(True)
                # Reuse the click handler so editors and signals stay in sync.
                self._on_window_mode_changed(button)
                break

    def get_lookback_hours(self) -> int:
        """Return the lookback length in hours."""
        return self._lookback_hours_spin.value()

    def set_lookback_hours(self, hours: int):
        """Set the lookback length in hours."""
        self._lookback_hours_spin.setValue(hours)

    def get_overlap_seconds(self) -> int:
        """Return the overlap value of the lookback window."""
        return self._overlap_seconds_spin.value()

    def set_overlap_seconds(self, seconds: int):
        """Set the overlap value of the lookback window."""
        self._overlap_seconds_spin.setValue(seconds)

    def get_window_start(self) -> str:
        """Return the custom start time formatted as ``yyyy-MM-dd HH:mm:ss``."""
        return self._start_datetime.dateTime().toString("yyyy-MM-dd HH:mm:ss")

    def set_window_start(self, dt_str: str):
        """Set the custom start time from a ``yyyy-MM-dd HH:mm:ss`` string.

        Strings that do not parse to a valid date-time are ignored.
        """
        dt = QDateTime.fromString(dt_str, "yyyy-MM-dd HH:mm:ss")
        if dt.isValid():
            self._start_datetime.setDateTime(dt)

    def get_window_end(self) -> str:
        """Return the custom end time formatted as ``yyyy-MM-dd HH:mm:ss``."""
        return self._end_datetime.dateTime().toString("yyyy-MM-dd HH:mm:ss")

    def set_window_end(self, dt_str: str):
        """Set the custom end time from a ``yyyy-MM-dd HH:mm:ss`` string.

        Strings that do not parse to a valid date-time are ignored.
        """
        dt = QDateTime.fromString(dt_str, "yyyy-MM-dd HH:mm:ss")
        if dt.isValid():
            self._end_datetime.setDateTime(dt)

    def get_window_split(self) -> str:
        """Return the id of the selected window split mode."""
        return self._split_combo.currentData()

    def get_window_split_days(self) -> int:
        """Return the selected number of days per split chunk."""
        return int(self._split_days_combo.currentData())

    def set_window_split(self, split: str):
        """Select the given window split mode; unknown values are ignored."""
        index = self._split_combo.findData(split)
        if index >= 0:
            self._split_combo.setCurrentIndex(index)
        self._update_split_days_state()

    def set_window_split_days(self, days: int):
        """Select the given days-per-chunk value; unknown values are ignored."""
        index = self._split_days_combo.findData(days)
        if index >= 0:
            self._split_days_combo.setCurrentIndex(index)

    def get_config(self) -> dict:
        """Return the complete configuration as a dict."""
        split_unit = self.get_window_split()
        split_days = self.get_window_split_days()
        return {
            "pipeline": self._pipeline_id,
            "processing_mode": self._processing_mode,
            "fetch_before_verify": self._fetch_before_verify,
            "skip_ods_when_fetch_before_verify": self._skip_ods_when_fetch_before_verify,
            "ods_use_local_json": self._ods_use_local_json,
            "window_mode": self._window_mode,
            "lookback_hours": self.get_lookback_hours(),
            "overlap_seconds": self.get_overlap_seconds(),
            "window_start": self.get_window_start(),
            "window_end": self.get_window_end(),
            "window_split": split_unit,
            "window_split_days": split_days,
        }

    def set_config(self, config: dict):
        """Restore settings from a dict as produced by ``get_config``.

        Keys missing from ``config`` leave the corresponding setting
        untouched. ``window_split_days`` is applied only when present and
        truthy.
        """
        # Order matters: the processing mode is applied before the verify
        # options because switching mode may uncheck fetch_before_verify.
        setters = (
            ("pipeline", self.set_pipeline_id),
            ("processing_mode", self.set_processing_mode),
            ("fetch_before_verify", self.set_fetch_before_verify),
            (
                "skip_ods_when_fetch_before_verify",
                self.set_skip_ods_when_fetch_before_verify,
            ),
            ("ods_use_local_json", self.set_ods_use_local_json),
            ("window_mode", self.set_window_mode),
            ("lookback_hours", self.set_lookback_hours),
            ("overlap_seconds", self.set_overlap_seconds),
            ("window_start", self.set_window_start),
            ("window_end", self.set_window_end),
            ("window_split", self.set_window_split),
        )
        for key, setter in setters:
            if key in config:
                setter(config[key])
        if config.get("window_split_days"):
            self.set_window_split_days(config["window_split_days"])