# -*- coding: utf-8 -*-
"""Task configuration panel."""

from datetime import datetime, timedelta

from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QGroupBox,
    QLabel, QLineEdit, QComboBox, QCheckBox, QPushButton,
    QPlainTextEdit, QListWidget, QListWidgetItem, QSplitter,
    QFrame, QFileDialog, QMessageBox, QScrollArea, QSpinBox,
    QTabWidget, QDateTimeEdit
)
from PySide6.QtCore import Qt, Signal, QDateTime
from PySide6.QtGui import QFont

from ..models.task_model import (
    TaskItem, TaskConfig, TaskCategory, TASK_CATEGORIES, get_task_category
)
from ..utils.cli_builder import CLIBuilder
from ..utils.app_settings import app_settings
from ..workers.task_worker import TaskWorker


# ODS tasks offered by the "auto update" quick action (fetched from the API).
AUTO_UPDATE_TASKS = [
    "ODS_PAYMENT",
    "ODS_MEMBER",
    "ODS_MEMBER_CARD",
    "ODS_MEMBER_BALANCE",
    "ODS_SETTLEMENT_RECORDS",
    "ODS_TABLE_USE",
    "ODS_ASSISTANT_ACCOUNT",
    "ODS_ASSISTANT_LEDGER",
    "ODS_ASSISTANT_ABOLISH",
    "ODS_REFUND",
    "ODS_PLATFORM_COUPON",
    "ODS_RECHARGE_SETTLE",
    "ODS_SETTLEMENT_TICKET",
]

# Tasks related to data verification.
INTEGRITY_CHECK_TASKS = [
    "DATA_INTEGRITY_CHECK",
]

# Every task selectable in the panel, as (code, display name, description).
ALL_TASKS = [
    # ODS tasks
    ("ODS_PAYMENT", "支付流水", "抓取支付记录到 ODS"),
    ("ODS_MEMBER", "会员档案", "抓取会员信息到 ODS"),
    ("ODS_MEMBER_CARD", "会员储值卡", "抓取会员卡信息到 ODS"),
    ("ODS_MEMBER_BALANCE", "会员余额变动", "抓取会员余额变动到 ODS"),
    ("ODS_SETTLEMENT_RECORDS", "结账记录", "抓取结账记录到 ODS"),
    ("ODS_TABLE_USE", "台费计费流水", "抓取台费使用记录到 ODS"),
    ("ODS_ASSISTANT_ACCOUNT", "助教账号", "抓取助教信息到 ODS"),
    ("ODS_ASSISTANT_LEDGER", "助教流水", "抓取助教服务流水到 ODS"),
    ("ODS_ASSISTANT_ABOLISH", "助教作废", "抓取助教作废记录到 ODS"),
    ("ODS_REFUND", "退款流水", "抓取退款记录到 ODS"),
    ("ODS_PLATFORM_COUPON", "平台券核销", "抓取券核销记录到 ODS"),
    ("ODS_RECHARGE_SETTLE", "充值结算", "抓取充值结算记录到 ODS"),
    ("ODS_GROUP_PACKAGE", "团购套餐", "抓取团购套餐定义到 ODS"),
    ("ODS_GROUP_BUY_REDEMPTION", "团购核销", "抓取团购核销记录到 ODS"),
    ("ODS_INVENTORY_STOCK", "库存汇总", "抓取库存汇总到 ODS"),
    ("ODS_INVENTORY_CHANGE", "库存变化", "抓取库存变动记录到 ODS"),
    ("ODS_TABLES", "台桌维表", "抓取台桌信息到 ODS"),
    ("ODS_GOODS_CATEGORY", "商品分类", "抓取商品分类到 ODS"),
    ("ODS_STORE_GOODS", "门店商品", "抓取门店商品档案到 ODS"),
    ("ODS_STORE_GOODS_SALES", "商品销售流水", "抓取商品销售流水到 ODS"),
    ("ODS_TABLE_FEE_DISCOUNT", "台费折扣", "抓取台费折扣记录到 ODS"),
    ("ODS_TENANT_GOODS", "租户商品", "抓取租户商品档案到 ODS"),
    ("ODS_SETTLEMENT_TICKET", "结账小票", "抓取结账小票详情到 ODS"),
    # DWD/DWS tasks
    ("DWD_LOAD_FROM_ODS", "ODS→DWD 装载", "从 ODS 增量装载到 DWD"),
    ("DWD_QUALITY_CHECK", "DWD 质量检查", "执行 DWD 数据质量检查"),
    ("DWS_BUILD_ORDER_SUMMARY", "构建订单汇总", "重算 DWS 订单汇总表"),
    # Schema initialisation
    ("INIT_ODS_SCHEMA", "初始化 ODS Schema", "创建/重建 ODS 表结构"),
    ("INIT_DWD_SCHEMA", "初始化 DWD Schema", "创建/重建 DWD 表结构"),
    ("INIT_DWS_SCHEMA", "初始化 DWS Schema", "创建/重建 DWS 表结构"),
    # Other tasks
    ("MANUAL_INGEST", "手工数据灌入", "从本地 JSON 回放入库"),
    ("CHECK_CUTOFF", "检查 Cutoff", "查看各表数据截止时间"),
    ("DATA_INTEGRITY_CHECK", "数据完整性检查", "检查 ODS/DWD 数据完整性"),
]


class TaskPanel(QWidget):
    """Task configuration panel.

    Lets the user pick tasks, tune run parameters, preview the resulting
    command line, and then either run it in a ``TaskWorker``, push it to the
    task queue, or request a scheduled job. Most widget values are mirrored
    into ``app_settings`` so they survive a restart.
    """

    # Signals
    task_started = Signal(str)  # task started (short task description)
    task_finished = Signal(bool, str)  # task finished (success, message)
    log_message = Signal(str)  # one log line for the log view
    add_to_queue = Signal(object)  # enqueue request (TaskConfig)
    create_schedule = Signal(str, list, dict)  # (name, task_codes, task_config)

    def __init__(self, parent=None):
        """Build the UI, wire the signals and restore the saved settings."""
        super().__init__(parent)
        self.cli_builder = CLIBuilder()
        self.worker = None
        # True while _load_settings() is pushing stored values into the
        # widgets; the _save_* slots check it so that the change signals
        # fired by that restore do not write half-restored state back.
        self._loading_settings = False
        self._init_ui()
        self._connect_signals()
        self.refresh_tasks()
        self._load_settings()  # restore the saved settings at start-up

    def _init_ui(self):
        """Assemble the top-level layout: title, splitter, bottom area."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(16, 16, 16, 16)
        layout.setSpacing(16)

        # Title
        title = QLabel("任务配置")
        title.setProperty("heading", True)
        layout.addWidget(title)

        # Splitter between task selection and configuration
        splitter = QSplitter(Qt.Horizontal)
        layout.addWidget(splitter, 1)

        # Left: task selection
        left_widget = self._create_task_selection()
        splitter.addWidget(left_widget)

        # Right: parameter configuration
        right_widget = self._create_config_area()
        splitter.addWidget(right_widget)

        # Initial split ratio
        splitter.setSizes([400, 600])

        # Bottom: CLI preview and run buttons
        bottom_widget = self._create_bottom_area()
        layout.addWidget(bottom_widget)

    def _create_task_selection(self) -> QWidget:
        """Create the task selection area (category filter + task list)."""
        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.setContentsMargins(0, 0, 8, 0)

        # Category filter; item data is the TaskCategory (None = no filter)
        filter_layout = QHBoxLayout()
        filter_layout.addWidget(QLabel("分类:"))
        self.category_combo = QComboBox()
        self.category_combo.addItem("全部", None)
        self.category_combo.addItem("ODS 数据抓取", TaskCategory.ODS)
        self.category_combo.addItem("DWD 装载", TaskCategory.DWD)
        self.category_combo.addItem("DWS 汇总", TaskCategory.DWS)
        self.category_combo.addItem("Schema 初始化", TaskCategory.SCHEMA)
        self.category_combo.addItem("质量检查", TaskCategory.QUALITY)
        self.category_combo.addItem("其他", TaskCategory.OTHER)
        filter_layout.addWidget(self.category_combo, 1)
        layout.addLayout(filter_layout)

        # Shortcut buttons
        btn_layout = QHBoxLayout()
        self.select_all_btn = QPushButton("全选")
        self.select_all_btn.setProperty("secondary", True)
        self.deselect_all_btn = QPushButton("全不选")
        self.deselect_all_btn.setProperty("secondary", True)
        btn_layout.addWidget(self.select_all_btn)
        btn_layout.addWidget(self.deselect_all_btn)
        layout.addLayout(btn_layout)

        # Task list
        self.task_list = QListWidget()
        self.task_list.setSelectionMode(QListWidget.MultiSelection)
        layout.addWidget(self.task_list, 1)

        # Selected-task counter
        self.selected_count_label = QLabel("已选: 0 个任务")
        self.selected_count_label.setProperty("subheading", True)
        layout.addWidget(self.selected_count_label)

        return widget

    def _create_config_area(self) -> QWidget:
        """Create the configuration area as a two-tab widget."""
        # Tabs separate the quick actions from the advanced configuration
        tab_widget = QTabWidget()

        # Quick actions tab
        quick_tab = self._create_quick_actions_tab()
        tab_widget.addTab(quick_tab, "快捷操作")

        # Advanced configuration tab
        advanced_tab = self._create_advanced_config_tab()
        tab_widget.addTab(advanced_tab, "高级配置")

        return tab_widget

    def _create_quick_actions_tab(self) -> QWidget:
        """Create the quick actions tab (auto update + integrity check)."""
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setFrameShape(QFrame.NoFrame)

        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.setContentsMargins(8, 8, 8, 8)
        layout.setSpacing(16)

        # ====== Auto update ======
        auto_update_group = QGroupBox("自动更新(API → ODS)")
        auto_update_layout = QVBoxLayout(auto_update_group)

        # Description
        desc_label = QLabel("从 API 抓取数据并更新 ODS 层,适用于日常增量更新")
        desc_label.setProperty("subheading", True)
        desc_label.setWordWrap(True)
        auto_update_layout.addWidget(desc_label)

        # Time range
        time_layout = QHBoxLayout()
        time_layout.addWidget(QLabel("时间范围:"))
        self.auto_update_hours = QSpinBox()
        self.auto_update_hours.setRange(1, 720)  # 1 hour - 30 days
        self.auto_update_hours.setValue(24)
        self.auto_update_hours.setSuffix(" 小时前")
        time_layout.addWidget(self.auto_update_hours)
        time_layout.addWidget(QLabel("到 当前"))
        time_layout.addStretch()
        auto_update_layout.addLayout(time_layout)

        # Overlap window
        overlap_layout = QHBoxLayout()
        overlap_layout.addWidget(QLabel("冗余窗口:"))
        self.overlap_seconds = QSpinBox()
        self.overlap_seconds.setRange(0, 7200)
        self.overlap_seconds.setValue(3600)
        self.overlap_seconds.setSuffix(" 秒")
        self.overlap_seconds.setToolTip("向前多抓取的时间,避免边界数据丢失")
        overlap_layout.addWidget(self.overlap_seconds)
        overlap_layout.addStretch()
        auto_update_layout.addLayout(overlap_layout)

        # Task selection header
        task_header_layout = QHBoxLayout()
        task_label = QLabel("包含任务:")
        task_header_layout.addWidget(task_label)
        task_header_layout.addStretch()
        self.auto_update_select_all_btn = QPushButton("全选")
        self.auto_update_select_all_btn.setProperty("secondary", True)
        self.auto_update_select_all_btn.setFixedWidth(60)
        task_header_layout.addWidget(self.auto_update_select_all_btn)
        self.auto_update_deselect_all_btn = QPushButton("全不选")
        self.auto_update_deselect_all_btn.setProperty("secondary", True)
        self.auto_update_deselect_all_btn.setFixedWidth(60)
        task_header_layout.addWidget(self.auto_update_deselect_all_btn)
        auto_update_layout.addLayout(task_header_layout)

        self.auto_update_tasks_list = QListWidget()
        self.auto_update_tasks_list.setSelectionMode(QListWidget.MultiSelection)
        self.auto_update_tasks_list.setMaximumHeight(150)
        for task_code in AUTO_UPDATE_TASKS:
            item = QListWidgetItem(task_code)
            # Attach first: selecting an item that does not yet belong to a
            # list widget has no effect, so the default "all selected" state
            # must be applied after addItem().
            self.auto_update_tasks_list.addItem(item)
            item.setSelected(True)  # selected by default
        auto_update_layout.addWidget(self.auto_update_tasks_list)

        # Include DWD load
        self.include_dwd_check = QCheckBox("完成后执行 DWD 装载 (ODS → DWD)")
        self.include_dwd_check.setChecked(True)
        auto_update_layout.addWidget(self.include_dwd_check)

        # Automatic verification
        self.auto_verify_check = QCheckBox("完成后执行数据校验")
        self.auto_verify_check.setToolTip("ETL 导入完成后,自动执行数据完整性校验")
        self.auto_verify_check.setChecked(False)
        auto_update_layout.addWidget(self.auto_verify_check)

        # Action buttons
        auto_btn_layout = QHBoxLayout()
        auto_btn_layout.addStretch()
        self.auto_update_run_btn = QPushButton("立即执行一次")
        self.auto_update_run_btn.setToolTip("添加到任务队列并立即执行")
        auto_btn_layout.addWidget(self.auto_update_run_btn)
        self.auto_update_schedule_btn = QPushButton("创建调度任务")
        self.auto_update_schedule_btn.setToolTip("添加到定时调度中,可设置执行周期")
        auto_btn_layout.addWidget(self.auto_update_schedule_btn)
        auto_update_layout.addLayout(auto_btn_layout)

        layout.addWidget(auto_update_group)

        # ====== Integrity check ======
        integrity_group = QGroupBox("数据校验(API vs ODS)")
        integrity_layout = QVBoxLayout(integrity_group)

        # Description
        int_desc = QLabel("对比 API 数据与 ODS 数据,检查是否有缺失或不一致")
        int_desc.setProperty("subheading", True)
        int_desc.setWordWrap(True)
        integrity_layout.addWidget(int_desc)

        # Check mode; item data is the mode key ("history" / "recent")
        mode_layout = QHBoxLayout()
        mode_layout.addWidget(QLabel("校验模式:"))
        self.integrity_mode_combo = QComboBox()
        self.integrity_mode_combo.addItem("历史全量校验", "history")
        self.integrity_mode_combo.addItem("最近增量校验", "recent")
        mode_layout.addWidget(self.integrity_mode_combo, 1)
        integrity_layout.addLayout(mode_layout)

        # Time range - history mode
        self.history_range_widget = QWidget()
        history_layout = QGridLayout(self.history_range_widget)
        history_layout.setContentsMargins(0, 0, 0, 0)

        history_layout.addWidget(QLabel("开始日期:"), 0, 0)
        self.integrity_start_date = QDateTimeEdit()
        self.integrity_start_date.setDisplayFormat("yyyy-MM-dd")
        self.integrity_start_date.setCalendarPopup(True)
        self.integrity_start_date.setDateTime(QDateTime.currentDateTime().addMonths(-6))
        history_layout.addWidget(self.integrity_start_date, 0, 1)

        history_layout.addWidget(QLabel("结束日期:"), 1, 0)
        self.integrity_end_date = QDateTimeEdit()
        self.integrity_end_date.setDisplayFormat("yyyy-MM-dd")
        self.integrity_end_date.setCalendarPopup(True)
        self.integrity_end_date.setDateTime(QDateTime.currentDateTime())
        history_layout.addWidget(self.integrity_end_date, 1, 1)

        integrity_layout.addWidget(self.history_range_widget)

        # Time range - recent mode (hidden until that mode is chosen)
        self.recent_range_widget = QWidget()
        recent_layout = QHBoxLayout(self.recent_range_widget)
        recent_layout.setContentsMargins(0, 0, 0, 0)
        recent_layout.addWidget(QLabel("回溯时间:"))
        self.integrity_hours = QSpinBox()
        self.integrity_hours.setRange(1, 168)  # 1 hour - 7 days
        self.integrity_hours.setValue(24)
        self.integrity_hours.setSuffix(" 小时")
        recent_layout.addWidget(self.integrity_hours)
        recent_layout.addStretch()
        self.recent_range_widget.setVisible(False)
        integrity_layout.addWidget(self.recent_range_widget)

        # Check options
        self.include_dimensions_check = QCheckBox("包含维度表校验")
        integrity_layout.addWidget(self.include_dimensions_check)

        # Automatic backfill option
        self.auto_backfill_check = QCheckBox("校验后自动补全丢失数据")
        self.auto_backfill_check.setToolTip("如果发现丢失数据,自动从 API 重新获取并补全到 ODS")
        integrity_layout.addWidget(self.auto_backfill_check)

        # Optional explicit ODS task list
        ods_task_layout = QHBoxLayout()
        ods_task_layout.addWidget(QLabel("指定任务 (可选):"))
        self.integrity_ods_tasks = QLineEdit()
        self.integrity_ods_tasks.setPlaceholderText("例: ODS_PAYMENT,ODS_MEMBER (留空=全部)")
        ods_task_layout.addWidget(self.integrity_ods_tasks, 1)
        integrity_layout.addLayout(ods_task_layout)

        # Action buttons
        int_btn_layout = QHBoxLayout()
        int_btn_layout.addStretch()
        self.integrity_run_btn = QPushButton("立即执行一次")
        self.integrity_run_btn.setToolTip("添加到任务队列并立即执行")
        int_btn_layout.addWidget(self.integrity_run_btn)
        self.integrity_schedule_btn = QPushButton("创建调度任务")
        self.integrity_schedule_btn.setToolTip("添加到定时调度中,可设置执行周期")
        int_btn_layout.addWidget(self.integrity_schedule_btn)
        integrity_layout.addLayout(int_btn_layout)

        layout.addWidget(integrity_group)

        # Stretch filler
        layout.addStretch()

        scroll_area.setWidget(widget)
        return scroll_area

    def _create_advanced_config_tab(self) -> QWidget:
        """Create the advanced configuration tab."""
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setFrameShape(QFrame.NoFrame)

        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.setContentsMargins(8, 8, 8, 8)

        # Pipeline flow configuration
        pipeline_group = QGroupBox("流水线配置")
        pipeline_layout = QGridLayout(pipeline_group)

        pipeline_layout.addWidget(QLabel("运行模式:"), 0, 0)
        self.pipeline_flow_combo = QComboBox()
        self.pipeline_flow_combo.addItem("FULL - 在线抓取 + 入库", "FULL")
        self.pipeline_flow_combo.addItem("FETCH_ONLY - 仅在线抓取落盘", "FETCH_ONLY")
        self.pipeline_flow_combo.addItem("INGEST_ONLY - 仅本地 JSON 入库", "INGEST_ONLY")
        pipeline_layout.addWidget(self.pipeline_flow_combo, 0, 1)

        self.dry_run_check = QCheckBox("Dry-run 模式(不提交数据库)")
        pipeline_layout.addWidget(self.dry_run_check, 1, 0, 1, 2)

        layout.addWidget(pipeline_group)

        # Time window configuration
        window_group = QGroupBox("时间窗口(可选)")
        window_layout = QGridLayout(window_group)

        window_layout.addWidget(QLabel("开始时间:"), 0, 0)
        self.window_start_edit = QLineEdit()
        self.window_start_edit.setPlaceholderText("例: 2025-07-01 00:00:00")
        window_layout.addWidget(self.window_start_edit, 0, 1)

        window_layout.addWidget(QLabel("结束时间:"), 1, 0)
        self.window_end_edit = QLineEdit()
        self.window_end_edit.setPlaceholderText("例: 2025-08-01 00:00:00")
        window_layout.addWidget(self.window_end_edit, 1, 1)

        # Window splitting options
        window_layout.addWidget(QLabel("切分模式:"), 2, 0)
        self.window_split_combo = QComboBox()
        self.window_split_combo.addItem("不切分", "none")
        self.window_split_combo.addItem("按月切分", "month")
        self.window_split_combo.setToolTip("长时间窗口按月切分执行,避免单次请求过大")
        window_layout.addWidget(self.window_split_combo, 2, 1)

        window_layout.addWidget(QLabel("补偿小时:"), 3, 0)
        self.window_compensation_spin = QSpinBox()
        self.window_compensation_spin.setRange(0, 168)  # at most 7 days
        self.window_compensation_spin.setValue(0)
        self.window_compensation_spin.setSuffix(" 小时")
        self.window_compensation_spin.setToolTip("窗口前后扩展的小时数,用于捕获边界数据")
        window_layout.addWidget(self.window_compensation_spin, 3, 1)

        layout.addWidget(window_group)

        # Data source configuration
        source_group = QGroupBox("数据源配置(INGEST_ONLY 模式)")
        source_layout = QGridLayout(source_group)

        source_layout.addWidget(QLabel("JSON 目录:"), 0, 0)
        self.ingest_source_edit = QLineEdit()
        self.ingest_source_edit.setPlaceholderText("本地 JSON 文件目录")
        source_layout.addWidget(self.ingest_source_edit, 0, 1)

        self.browse_btn = QPushButton("浏览...")
        self.browse_btn.setProperty("secondary", True)
        source_layout.addWidget(self.browse_btn, 0, 2)

        layout.addWidget(source_group)

        # Override configuration (advanced)
        override_group = QGroupBox("覆盖配置(可选)")
        override_layout = QGridLayout(override_group)

        override_layout.addWidget(QLabel("门店 ID:"), 0, 0)
        self.store_id_edit = QLineEdit()
        self.store_id_edit.setPlaceholderText("使用 .env 中的配置")
        override_layout.addWidget(self.store_id_edit, 0, 1)

        override_layout.addWidget(QLabel("数据库 DSN:"), 1, 0)
        self.pg_dsn_edit = QLineEdit()
        self.pg_dsn_edit.setPlaceholderText("使用 .env 中的配置")
        override_layout.addWidget(self.pg_dsn_edit, 1, 1)

        override_layout.addWidget(QLabel("API Token:"), 2, 0)
        self.api_token_edit = QLineEdit()
        self.api_token_edit.setPlaceholderText("使用 .env 中的配置")
        self.api_token_edit.setEchoMode(QLineEdit.Password)
        override_layout.addWidget(self.api_token_edit, 2, 1)

        layout.addWidget(override_group)

        # Stretch filler
        layout.addStretch()

        scroll_area.setWidget(widget)
        return scroll_area

    def _create_bottom_area(self) -> QWidget:
        """Create the bottom area (command preview + run/stop buttons)."""
        widget = QWidget()
        layout = QVBoxLayout(widget)
        layout.setContentsMargins(0, 0, 0, 0)

        # CLI preview
        preview_group = QGroupBox("命令行预览")
        preview_layout = QVBoxLayout(preview_group)

        self.cli_preview = QPlainTextEdit()
        self.cli_preview.setReadOnly(True)
        self.cli_preview.setMaximumHeight(80)
        self.cli_preview.setFont(QFont("Consolas", 10))
        preview_layout.addWidget(self.cli_preview)

        layout.addWidget(preview_group)

        # Run buttons
        btn_layout = QHBoxLayout()
        btn_layout.addStretch()

        self.add_to_queue_btn = QPushButton("添加到队列")
        self.add_to_queue_btn.setProperty("secondary", True)
        btn_layout.addWidget(self.add_to_queue_btn)

        self.run_btn = QPushButton("立即执行")
        self.run_btn.setFixedWidth(120)
        btn_layout.addWidget(self.run_btn)

        self.stop_btn = QPushButton("停止")
        self.stop_btn.setProperty("danger", True)
        self.stop_btn.setEnabled(False)
        self.stop_btn.setFixedWidth(80)
        btn_layout.addWidget(self.stop_btn)

        layout.addLayout(btn_layout)

        return widget

    def _connect_signals(self):
        """Connect widget signals to their handlers and persistence slots."""
        # Category filter
        self.category_combo.currentIndexChanged.connect(self._filter_tasks)

        # Task selection
        self.task_list.itemSelectionChanged.connect(self._on_selection_changed)
        self.select_all_btn.clicked.connect(self._select_all)
        self.deselect_all_btn.clicked.connect(self._deselect_all)

        # Configuration changes refresh the command preview
        self.pipeline_flow_combo.currentIndexChanged.connect(self._update_preview)
        self.dry_run_check.stateChanged.connect(self._update_preview)
        self.window_start_edit.textChanged.connect(self._update_preview)
        self.window_end_edit.textChanged.connect(self._update_preview)
        self.ingest_source_edit.textChanged.connect(self._update_preview)
        self.store_id_edit.textChanged.connect(self._update_preview)
        self.pg_dsn_edit.textChanged.connect(self._update_preview)
        self.api_token_edit.textChanged.connect(self._update_preview)

        # Browse for the source directory
        self.browse_btn.clicked.connect(self._browse_source_dir)

        # Run buttons
        self.run_btn.clicked.connect(self._run_task)
        self.stop_btn.clicked.connect(self._stop_task)
        self.add_to_queue_btn.clicked.connect(self._add_task_to_queue)

        # Quick actions - auto update
        self.auto_update_run_btn.clicked.connect(self._run_auto_update_now)
        self.auto_update_schedule_btn.clicked.connect(self._create_auto_update_schedule)
        self.auto_update_select_all_btn.clicked.connect(self._auto_update_select_all)
        self.auto_update_deselect_all_btn.clicked.connect(self._auto_update_deselect_all)

        # Quick actions - integrity check
        self.integrity_run_btn.clicked.connect(self._run_integrity_check_now)
        self.integrity_schedule_btn.clicked.connect(self._create_integrity_schedule)
        self.integrity_mode_combo.currentIndexChanged.connect(self._on_integrity_mode_changed)

        # Persistence: auto-update settings
        self.auto_update_hours.valueChanged.connect(self._save_auto_update_settings)
        self.overlap_seconds.valueChanged.connect(self._save_auto_update_settings)
        self.include_dwd_check.stateChanged.connect(self._save_auto_update_settings)
        self.auto_verify_check.stateChanged.connect(self._save_auto_update_settings)
        # The task selection is part of the saved state, so a manual
        # (de)selection has to trigger a save as well.
        self.auto_update_tasks_list.itemSelectionChanged.connect(self._save_auto_update_settings)

        # Persistence: integrity-check settings
        self.integrity_mode_combo.currentIndexChanged.connect(self._save_integrity_settings)
        self.integrity_start_date.dateTimeChanged.connect(self._save_integrity_settings)
        self.integrity_end_date.dateTimeChanged.connect(self._save_integrity_settings)
        self.integrity_hours.valueChanged.connect(self._save_integrity_settings)
        self.include_dimensions_check.stateChanged.connect(self._save_integrity_settings)
        self.auto_backfill_check.stateChanged.connect(self._save_integrity_settings)
        self.integrity_ods_tasks.textChanged.connect(self._save_integrity_settings)

        # Persistence: advanced settings. The window bounds and the ingest
        # source are stored by _save_advanced_settings and restored by
        # _load_settings, so their edits must trigger a save too.
        self.pipeline_flow_combo.currentIndexChanged.connect(self._save_advanced_settings)
        self.dry_run_check.stateChanged.connect(self._save_advanced_settings)
        self.window_start_edit.textChanged.connect(self._save_advanced_settings)
        self.window_end_edit.textChanged.connect(self._save_advanced_settings)
        self.ingest_source_edit.textChanged.connect(self._save_advanced_settings)
        self.window_split_combo.currentIndexChanged.connect(self._save_advanced_settings)
        self.window_compensation_spin.valueChanged.connect(self._save_advanced_settings)

    def refresh_tasks(self):
        """Rebuild the task list, applying the current category filter."""
        self.task_list.clear()

        current_category = self.category_combo.currentData()

        for code, name, desc in ALL_TASKS:
            category = get_task_category(code)

            # Apply the category filter (None means "show everything")
            if current_category is not None and category != current_category:
                continue

            item = QListWidgetItem(f"{name} ({code})")
            item.setData(Qt.UserRole, code)  # the task code travels with the item
            item.setToolTip(desc)
            self.task_list.addItem(item)

        self._on_selection_changed()

    def _filter_tasks(self):
        """Re-filter the task list after the category changed."""
        self.refresh_tasks()

    def _on_selection_changed(self):
        """Update the selected counter and the command preview."""
        selected = self.task_list.selectedItems()
        self.selected_count_label.setText(f"已选: {len(selected)} 个任务")
        self._update_preview()

    def _select_all(self):
        """Select every task in the main task list."""
        self.task_list.selectAll()

    def _deselect_all(self):
        """Clear the selection of the main task list."""
        self.task_list.clearSelection()

    def _browse_source_dir(self):
        """Let the user pick the JSON source directory."""
        dir_path = QFileDialog.getExistingDirectory(
            self, "选择 JSON 数据目录"
        )
        if dir_path:
            self.ingest_source_edit.setText(dir_path)

    def _get_config(self) -> TaskConfig:
        """Build a ``TaskConfig`` from the main task list and advanced tab."""
        # Task codes of the selected items
        selected_tasks = []
        for item in self.task_list.selectedItems():
            task_code = item.data(Qt.UserRole)
            if task_code:
                selected_tasks.append(task_code)

        # Environment variables (window splitting parameters)
        env_vars = {}
        split_unit = self.window_split_combo.currentData() or "none"
        compensation = self.window_compensation_spin.value()

        if split_unit and split_unit != "none":
            env_vars["WINDOW_SPLIT_UNIT"] = split_unit
        if compensation > 0:
            env_vars["WINDOW_COMPENSATION_HOURS"] = str(compensation)

        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscript digits, which int() rejects with ValueError.
        store_id_text = self.store_id_edit.text().strip()
        store_id = int(store_id_text) if store_id_text.isdecimal() else None

        # Assemble the configuration; empty text fields become None
        config = TaskConfig(
            tasks=selected_tasks,
            pipeline_flow=self.pipeline_flow_combo.currentData(),
            dry_run=self.dry_run_check.isChecked(),
            window_start=self.window_start_edit.text().strip() or None,
            window_end=self.window_end_edit.text().strip() or None,
            window_split=split_unit,
            window_compensation=compensation,
            ingest_source=self.ingest_source_edit.text().strip() or None,
            store_id=store_id,
            pg_dsn=self.pg_dsn_edit.text().strip() or None,
            api_token=self.api_token_edit.text().strip() or None,
            env_vars=env_vars,
        )

        return config

    def _update_preview(self):
        """Refresh the command-line preview from the current configuration."""
        config = self._get_config()
        cmd_str = self.cli_builder.build_command_string(config)
        self.cli_preview.setPlainText(cmd_str)

    def _run_task(self):
        """Run the selected tasks immediately in a ``TaskWorker``."""
        config = self._get_config()

        if not config.tasks:
            QMessageBox.warning(self, "提示", "请至少选择一个任务")
            return

        # Create the worker thread
        cmd = self.cli_builder.build_command(config)
        self.worker = TaskWorker(cmd)

        # Connect the worker signals
        self.worker.output_received.connect(self._on_output)
        self.worker.task_finished.connect(self._on_finished)
        self.worker.error_occurred.connect(self._on_error)

        # Update the UI state
        self.run_btn.setEnabled(False)
        self.stop_btn.setEnabled(True)

        # Announce the start; the description lists at most three task codes
        task_info = ",".join(config.tasks[:3])
        if len(config.tasks) > 3:
            task_info += f" 等{len(config.tasks)}个任务"
        self.task_started.emit(task_info)

        # Start
        self.worker.start()

    def _stop_task(self):
        """Ask the running worker, if any, to stop."""
        if self.worker and self.worker.isRunning():
            self.worker.stop()
            self.log_message.emit("[GUI] 正在停止任务...")

    def _on_output(self, line: str):
        """Forward one line of worker output to the log."""
        self.log_message.emit(line)

    def _on_finished(self, exit_code: int, summary: str):
        """Restore the buttons and report the result of a finished task."""
        self.run_btn.setEnabled(True)
        self.stop_btn.setEnabled(False)

        success = exit_code == 0
        # Prefer the worker's summary; otherwise fall back to a generic text
        message = summary if summary else ("任务执行成功" if success else f"任务执行失败 (exit={exit_code})")

        self.task_finished.emit(success, message)

        if success:
            self.log_message.emit(f"[GUI] 任务完成: {message}")
        else:
            self.log_message.emit(f"[GUI] 任务失败: {message}")

    def _on_error(self, error: str):
        """Log a worker error and show it in a message box."""
        self.log_message.emit(f"[GUI] 错误: {error}")
        QMessageBox.critical(self, "执行错误", error)

    def is_running(self) -> bool:
        """Return True while a worker exists and is still running."""
        return self.worker is not None and self.worker.isRunning()

    def _add_task_to_queue(self):
        """Add the tasks selected in the main task list to the queue."""
        config = self._get_config()

        if not config.tasks:
            QMessageBox.warning(self, "提示", "请至少选择一个任务")
            return

        # Emit the enqueue signal
        self.add_to_queue.emit(config)

        task_info = ",".join(config.tasks[:3])
        if len(config.tasks) > 3:
            task_info += f" 等{len(config.tasks)}个"

        self.log_message.emit(f"[GUI] 已添加到任务队列: {task_info}")
        QMessageBox.information(self, "提示", f"已添加到任务队列\n\n任务: {task_info}\n\n请切换到「任务管理」查看和执行")

    def _on_integrity_mode_changed(self, index: int):
        """Show the range widget that matches the selected check mode."""
        mode = self.integrity_mode_combo.currentData()
        self.history_range_widget.setVisible(mode == "history")
        self.recent_range_widget.setVisible(mode == "recent")

    def _auto_update_select_all(self):
        """Select every auto-update task and persist the selection."""
        self.auto_update_tasks_list.selectAll()
        self._save_auto_update_settings()

    def _auto_update_deselect_all(self):
        """Clear the auto-update task selection and persist it."""
        self.auto_update_tasks_list.clearSelection()
        self._save_auto_update_settings()

    def _get_auto_update_tasks(self) -> list:
        """Return the selected auto-update tasks.

        ``DWD_LOAD_FROM_ODS`` is appended when the DWD checkbox is ticked.
        """
        selected_tasks = []
        for i in range(self.auto_update_tasks_list.count()):
            item = self.auto_update_tasks_list.item(i)
            if item.isSelected():
                selected_tasks.append(item.text())

        if self.include_dwd_check.isChecked():
            selected_tasks.append("DWD_LOAD_FROM_ODS")
        return selected_tasks

    def _get_auto_update_config(self) -> dict:
        """Return the auto-update parameters passed to a scheduled job."""
        return {
            "pipeline_flow": "FULL",
            "lookback_hours": self.auto_update_hours.value(),
            "overlap_seconds": self.overlap_seconds.value(),
        }

    def _run_auto_update_now(self):
        """Queue one auto-update run covering the configured look-back."""
        selected_tasks = self._get_auto_update_tasks()
        if not selected_tasks:
            QMessageBox.warning(self, "提示", "请至少选择一个任务")
            return

        hours = self.auto_update_hours.value()
        overlap = self.overlap_seconds.value()
        include_dwd = self.include_dwd_check.isChecked()
        auto_verify = self.auto_verify_check.isChecked()

        # The window starts `hours` plus the overlap before now
        now = datetime.now()
        start_time = now - timedelta(hours=hours, seconds=overlap)

        # Build the complete task list
        all_tasks = selected_tasks.copy()

        # Add the DWD load
        if include_dwd and "DWD_LOAD_FROM_ODS" not in all_tasks:
            all_tasks.append("DWD_LOAD_FROM_ODS")

        # Add the integrity check
        if auto_verify and "DATA_INTEGRITY_CHECK" not in all_tasks:
            all_tasks.append("DATA_INTEGRITY_CHECK")

        # Environment variables (used by the integrity-check task)
        env_vars = {}
        if auto_verify:
            env_vars["INTEGRITY_MODE"] = "window"
            env_vars["INTEGRITY_AUTO_BACKFILL"] = "1"  # backfill missing data

        config = TaskConfig(
            tasks=all_tasks,
            pipeline_flow="FULL",
            window_start=start_time.strftime("%Y-%m-%d %H:%M:%S"),
            window_end=now.strftime("%Y-%m-%d %H:%M:%S"),
            env_vars=env_vars,
        )

        # Update the preview
        cmd_str = self.cli_builder.build_command_string(config)
        self.cli_preview.setPlainText(cmd_str)

        # Build the task description
        desc_parts = [f"自动更新 ({hours}h)"]
        if include_dwd:
            desc_parts.append("+ DWD")
        if auto_verify:
            desc_parts.append("+ 校验")
        task_desc = " ".join(desc_parts)

        # Emit the enqueue signal
        self.add_to_queue.emit(config)
        self.log_message.emit(f"[GUI] 已添加到任务队列: {task_desc}")
        QMessageBox.information(self, "提示", f"已添加到任务队列\n\n任务: {task_desc}\n包含: {len(all_tasks)} 个任务\n\n请切换到「任务管理」查看和执行")

    def _create_auto_update_schedule(self):
        """Request a scheduled job for the auto-update tasks."""
        selected_tasks = self._get_auto_update_tasks()
        if not selected_tasks:
            QMessageBox.warning(self, "提示", "请至少选择一个任务")
            return

        hours = self.auto_update_hours.value()
        task_config = self._get_auto_update_config()

        # Emit the create-schedule signal
        self.create_schedule.emit(
            f"自动更新 ({hours}h)",
            selected_tasks,
            task_config
        )
        self.log_message.emit(f"[GUI] 创建调度任务: 自动更新 ({hours}h)")

    def _get_integrity_config(self) -> dict:
        """Return the integrity-check options as a plain dict."""
        mode = self.integrity_mode_combo.currentData()

        config = {
            "pipeline_flow": "INGEST_ONLY",
            "integrity_mode": mode,
            "integrity_include_dimensions": self.include_dimensions_check.isChecked(),
            "integrity_auto_backfill": self.auto_backfill_check.isChecked(),
        }

        # History mode carries explicit dates; any other mode a look-back
        if mode == "history":
            config["integrity_history_start"] = self.integrity_start_date.dateTime().toString("yyyy-MM-dd")
            config["integrity_history_end"] = self.integrity_end_date.dateTime().toString("yyyy-MM-dd")
        else:
            config["lookback_hours"] = self.integrity_hours.value()

        ods_tasks = self.integrity_ods_tasks.text().strip()
        if ods_tasks:
            config["integrity_ods_task_codes"] = ods_tasks

        return config

    def _run_integrity_check_now(self):
        """Queue one integrity-check run with the current options."""
        mode = self.integrity_mode_combo.currentData()
        int_config = self._get_integrity_config()

        # The integrity options travel as environment variables because the
        # CLI has no parameters for them
        env_vars = {
            "INTEGRITY_MODE": int_config.get("integrity_mode", "history"),
            "INTEGRITY_INCLUDE_DIMENSIONS": "1" if int_config.get("integrity_include_dimensions") else "0",
            "INTEGRITY_AUTO_BACKFILL": "1" if int_config.get("integrity_auto_backfill") else "0",
        }

        if mode == "history":
            env_vars["INTEGRITY_HISTORY_START"] = int_config.get("integrity_history_start")
            env_vars["INTEGRITY_HISTORY_END"] = int_config.get("integrity_history_end")
            desc = f"数据校验 ({int_config.get('integrity_history_start')} ~ {int_config.get('integrity_history_end')})"
        else:
            # Recent mode: turn the look-back into a start/end date pair
            hours = int_config.get("lookback_hours", 24)
            now = datetime.now()
            start_time = now - timedelta(hours=hours)
            env_vars["INTEGRITY_HISTORY_START"] = start_time.strftime("%Y-%m-%d")
            env_vars["INTEGRITY_HISTORY_END"] = now.strftime("%Y-%m-%d")
            desc = f"数据校验 (最近 {hours}h)"

        if int_config.get("integrity_ods_task_codes"):
            env_vars["INTEGRITY_ODS_TASK_CODES"] = int_config.get("integrity_ods_task_codes")

        if int_config.get("integrity_auto_backfill"):
            desc += " + 自动补全"

        config = TaskConfig(
            tasks=["DATA_INTEGRITY_CHECK"],
            pipeline_flow="FULL",  # FULL because a DB connection is needed
            env_vars=env_vars,
        )

        # Update the preview; the environment variables are appended as
        # comment lines below the command
        cmd_str = self.cli_builder.build_command_string(config)
        self.cli_preview.setPlainText(cmd_str + "\n\n# 环境变量:\n" + "\n".join(f"# {k}={v}" for k, v in env_vars.items()))

        # Emit the enqueue signal
        self.add_to_queue.emit(config)
        self.log_message.emit(f"[GUI] 已添加到任务队列: {desc}")
        QMessageBox.information(self, "提示", f"已添加到任务队列\n\n任务: {desc}\n\n请切换到「任务管理」查看和执行")

    def _create_integrity_schedule(self):
        """Request a scheduled job for the integrity check."""
        mode = self.integrity_mode_combo.currentData()
        task_config = self._get_integrity_config()

        if mode == "history":
            desc = f"数据校验 ({task_config.get('integrity_history_start')} ~ {task_config.get('integrity_history_end')})"
        else:
            hours = task_config.get("lookback_hours", 24)
            desc = f"数据校验 (最近 {hours}h)"

        # Emit the create-schedule signal
        self.create_schedule.emit(
            desc,
            ["DATA_INTEGRITY_CHECK"],
            task_config
        )
        self.log_message.emit(f"[GUI] 创建调度任务: {desc}")

    # ==================== Settings persistence ====================

    @staticmethod
    def _restore_date(edit, text):
        """Set *edit* from a ``yyyy-MM-dd`` string; skip empty/invalid values."""
        if not text:
            return
        try:
            parsed = QDateTime.fromString(text, "yyyy-MM-dd")
            if parsed.isValid():
                edit.setDateTime(parsed)
        except Exception:
            # Best effort: an unusable stored date keeps the widget default
            pass

    def _load_settings(self):
        """Restore the widget values from ``app_settings``.

        The widgets' change signals are already connected to the ``_save_*``
        slots when this runs. Without a guard, the first restored value would
        fire a save that copies the still-default values of the other widgets
        into ``app_settings`` before they are read here. ``_loading_settings``
        suppresses those saves for the duration of the restore.
        """
        self._loading_settings = True
        try:
            # Auto-update settings
            self.auto_update_hours.setValue(app_settings.auto_update_hours)
            self.overlap_seconds.setValue(app_settings.auto_update_overlap_seconds)
            self.include_dwd_check.setChecked(app_settings.auto_update_include_dwd)
            self.auto_verify_check.setChecked(app_settings.auto_update_auto_verify)

            # Restore the auto-update task selection (an empty saved list
            # keeps the default selection)
            saved_tasks = app_settings.auto_update_selected_tasks
            if saved_tasks:
                for i in range(self.auto_update_tasks_list.count()):
                    item = self.auto_update_tasks_list.item(i)
                    item.setSelected(item.text() in saved_tasks)

            # Integrity-check settings
            mode = app_settings.integrity_mode
            mode_index = 0 if mode == "history" else 1
            self.integrity_mode_combo.setCurrentIndex(mode_index)

            # History dates
            self._restore_date(self.integrity_start_date, app_settings.integrity_history_start)
            self._restore_date(self.integrity_end_date, app_settings.integrity_history_end)

            self.integrity_hours.setValue(app_settings.integrity_lookback_hours)
            self.include_dimensions_check.setChecked(app_settings.integrity_include_dimensions)
            self.auto_backfill_check.setChecked(app_settings.integrity_auto_backfill)
            self.integrity_ods_tasks.setText(app_settings.integrity_ods_tasks)

            # Advanced settings
            pipeline_flow = app_settings.advanced_pipeline_flow
            flow_map = {"FULL": 0, "FETCH_ONLY": 1, "INGEST_ONLY": 2}
            self.pipeline_flow_combo.setCurrentIndex(flow_map.get(pipeline_flow, 0))

            self.dry_run_check.setChecked(app_settings.advanced_dry_run)
            self.window_start_edit.setText(app_settings.advanced_window_start)
            self.window_end_edit.setText(app_settings.advanced_window_end)
            self.ingest_source_edit.setText(app_settings.advanced_ingest_source)

            # Window splitting settings
            split_map = {"none": 0, "month": 1}
            self.window_split_combo.setCurrentIndex(split_map.get(app_settings.advanced_window_split, 0))
            self.window_compensation_spin.setValue(app_settings.advanced_window_compensation)

            # Sync the visible range widget with the restored mode
            self._on_integrity_mode_changed(mode_index)

        except Exception as e:
            # A failed load must not prevent the application from running
            print(f"加载设置失败: {e}")
        finally:
            self._loading_settings = False

    def _save_auto_update_settings(self):
        """Persist the auto-update options and task selection."""
        if self._loading_settings:
            return  # widgets are being restored; do not write back
        try:
            app_settings.auto_update_hours = self.auto_update_hours.value()
            app_settings.auto_update_overlap_seconds = self.overlap_seconds.value()
            app_settings.auto_update_include_dwd = self.include_dwd_check.isChecked()
            app_settings.auto_update_auto_verify = self.auto_verify_check.isChecked()

            # Save the selected tasks
            selected_tasks = []
            for i in range(self.auto_update_tasks_list.count()):
                item = self.auto_update_tasks_list.item(i)
                if item.isSelected():
                    selected_tasks.append(item.text())
            app_settings.auto_update_selected_tasks = selected_tasks
        except Exception as e:
            print(f"保存自动更新设置失败: {e}")

    def _save_integrity_settings(self):
        """Persist the integrity-check options."""
        if self._loading_settings:
            return  # widgets are being restored; do not write back
        try:
            mode = self.integrity_mode_combo.currentData()
            app_settings.integrity_mode = mode or "history"
            app_settings.integrity_history_start = self.integrity_start_date.dateTime().toString("yyyy-MM-dd")
            app_settings.integrity_history_end = self.integrity_end_date.dateTime().toString("yyyy-MM-dd")
            app_settings.integrity_lookback_hours = self.integrity_hours.value()
            app_settings.integrity_include_dimensions = self.include_dimensions_check.isChecked()
            app_settings.integrity_auto_backfill = self.auto_backfill_check.isChecked()
            app_settings.integrity_ods_tasks = self.integrity_ods_tasks.text().strip()
        except Exception as e:
            print(f"保存数据校验设置失败: {e}")

    def _save_advanced_settings(self):
        """Persist the advanced-tab options."""
        if self._loading_settings:
            return  # widgets are being restored; do not write back
        try:
            app_settings.advanced_pipeline_flow = self.pipeline_flow_combo.currentData() or "FULL"
            app_settings.advanced_dry_run = self.dry_run_check.isChecked()
            app_settings.advanced_window_start = self.window_start_edit.text().strip()
            app_settings.advanced_window_end = self.window_end_edit.text().strip()
            app_settings.advanced_ingest_source = self.ingest_source_edit.text().strip()
            app_settings.advanced_window_split = self.window_split_combo.currentData() or "none"
            app_settings.advanced_window_compensation = self.window_compensation_spin.value()
        except Exception as e:
            print(f"保存高级设置失败: {e}")