# -*- coding: utf-8 -*- """任务管理器面板""" import uuid from datetime import datetime, timedelta from typing import Dict, List, Optional from PySide6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QSplitter, QGridLayout, QGroupBox, QLabel, QPushButton, QTableWidget, QTableWidgetItem, QHeaderView, QMessageBox, QMenu, QAbstractItemView, QDialog, QComboBox, QSpinBox, QLineEdit, QCheckBox, QTimeEdit, QDateEdit, QListWidget, QListWidgetItem, QDialogButtonBox, QTabWidget, QFrame, QTextEdit, QPlainTextEdit ) from PySide6.QtCore import Qt, Signal, QTimer, QTime, QDate from PySide6.QtGui import QColor, QAction, QFont from ..models.task_model import QueuedTask, TaskConfig, TaskStatus from ..models.schedule_model import ( ScheduledTask, ScheduleConfig, ScheduleType, IntervalUnit, ScheduleStore, ScheduleExecutionRecord ) from ..utils.cli_builder import CLIBuilder from ..utils.app_settings import app_settings from ..workers.task_worker import TaskWorker # 可调度的任务列表(包含所有 ODS 任务 + DWD/质量检查任务) SCHEDULABLE_TASKS = [ # ODS 数据抓取任务(与 task_panel.AUTO_UPDATE_TASKS 保持一致) ("ODS_PAYMENT", "支付流水"), ("ODS_MEMBER", "会员档案"), ("ODS_MEMBER_CARD", "会员储值卡"), ("ODS_MEMBER_BALANCE", "会员余额变动"), ("ODS_SETTLEMENT_RECORDS", "结账记录"), ("ODS_TABLE_USE", "台费计费流水"), ("ODS_ASSISTANT_ACCOUNT", "助教账号"), ("ODS_ASSISTANT_LEDGER", "助教流水"), ("ODS_ASSISTANT_ABOLISH", "助教作废"), ("ODS_REFUND", "退款流水"), ("ODS_PLATFORM_COUPON", "平台券核销"), ("ODS_RECHARGE_SETTLE", "充值结算"), ("ODS_SETTLEMENT_TICKET", "结账小票"), # DWD 和质量检查任务 ("DWD_LOAD_FROM_ODS", "ODS→DWD 装载"), ("DWD_QUALITY_CHECK", "DWD 质量检查"), ("DATA_INTEGRITY_CHECK", "数据完整性检查"), ("CHECK_CUTOFF", "检查 Cutoff"), ] class TaskLogDialog(QDialog): """任务日志查看对话框""" def __init__(self, task: QueuedTask, parent=None): super().__init__(parent) self.task = task self.setWindowTitle(f"任务日志 - {', '.join(task.config.tasks[:2])}") self.setMinimumSize(800, 600) self._init_ui() def _init_ui(self): layout = QVBoxLayout(self) # 任务信息 info_group = QGroupBox("任务信息") info_layout = QGridLayout(info_group) info_layout.addWidget(QLabel("任务 ID:"), 0, 0) info_layout.addWidget(QLabel(self.task.id), 0, 1) info_layout.addWidget(QLabel("任务列表:"), 0, 2) info_layout.addWidget(QLabel(", ".join(self.task.config.tasks)), 0, 3) info_layout.addWidget(QLabel("状态:"), 1, 0) status_label = QLabel(self._get_status_text(self.task.status)) status_label.setStyleSheet(f"color: {self._get_status_color(self.task.status)};") info_layout.addWidget(status_label, 1, 1) info_layout.addWidget(QLabel("退出码:"), 1, 2) info_layout.addWidget(QLabel(str(self.task.exit_code) if self.task.exit_code is not None else "-"), 1, 3) if self.task.started_at: info_layout.addWidget(QLabel("开始时间:"), 2, 0) info_layout.addWidget(QLabel(self.task.started_at.strftime("%Y-%m-%d %H:%M:%S")), 2, 1) if self.task.finished_at: info_layout.addWidget(QLabel("结束时间:"), 2, 2) info_layout.addWidget(QLabel(self.task.finished_at.strftime("%Y-%m-%d %H:%M:%S")), 2, 3) if self.task.started_at: duration = (self.task.finished_at - self.task.started_at).total_seconds() info_layout.addWidget(QLabel("耗时:"), 3, 0) info_layout.addWidget(QLabel(f"{duration:.1f} 秒"), 3, 1) layout.addWidget(info_group) # 命令行 cmd_group = QGroupBox("执行命令") cmd_layout = QVBoxLayout(cmd_group) cmd_text = QLineEdit() cmd_text.setReadOnly(True) from ..utils.cli_builder import CLIBuilder cli = CLIBuilder() cmd_text.setText(cli.build_command_string(self.task.config)) cmd_layout.addWidget(cmd_text) # 显示环境变量 if hasattr(self.task.config, 'env_vars') and self.task.config.env_vars: env_label = QLabel("环境变量: " + ", ".join(f"{k}={v}" for k, v in self.task.config.env_vars.items())) env_label.setWordWrap(True) cmd_layout.addWidget(env_label) layout.addWidget(cmd_group) # 输出日志 log_group = QGroupBox("执行输出") log_layout = QVBoxLayout(log_group) self.log_text = QPlainTextEdit() self.log_text.setReadOnly(True) self.log_text.setFont(QFont("Consolas", 9)) self.log_text.setPlainText(self.task.output if self.task.output else "(无输出)") log_layout.addWidget(self.log_text) layout.addWidget(log_group) # 错误信息 if self.task.error: error_group = QGroupBox("错误信息") error_layout = QVBoxLayout(error_group) error_text = QPlainTextEdit() error_text.setReadOnly(True) error_text.setPlainText(self.task.error) error_text.setMaximumHeight(100) error_layout.addWidget(error_text) layout.addWidget(error_group) # 按钮 btn_layout = QHBoxLayout() copy_btn = QPushButton("复制日志") copy_btn.clicked.connect(self._copy_log) btn_layout.addWidget(copy_btn) btn_layout.addStretch() close_btn = QPushButton("关闭") close_btn.clicked.connect(self.accept) btn_layout.addWidget(close_btn) layout.addLayout(btn_layout) def _copy_log(self): """复制日志到剪贴板""" from PySide6.QtWidgets import QApplication QApplication.clipboard().setText(self.task.output or "") QMessageBox.information(self, "提示", "日志已复制到剪贴板") @staticmethod def _get_status_text(status: TaskStatus) -> str: return { TaskStatus.PENDING: "待执行", TaskStatus.RUNNING: "执行中", TaskStatus.SUCCESS: "成功", TaskStatus.FAILED: "失败", TaskStatus.CANCELLED: "已取消", }.get(status, "未知") @staticmethod def _get_status_color(status: TaskStatus) -> str: return { TaskStatus.PENDING: "#5f6368", TaskStatus.RUNNING: "#1a73e8", TaskStatus.SUCCESS: "#1e8e3e", TaskStatus.FAILED: "#d93025", TaskStatus.CANCELLED: "#9aa0a6", }.get(status, "#333333") class ScheduleEditDialog(QDialog): """调度任务编辑对话框""" def __init__(self, task: Optional[ScheduledTask] = None, parent=None): super().__init__(parent) self.task = task self.cli_builder = CLIBuilder() self.setWindowTitle("编辑调度任务" if task else "新建调度任务") self.setMinimumWidth(600) self.setMinimumHeight(700) self._init_ui() self._connect_preview_signals() if task: self._load_task(task) self._update_cli_preview() # 初始化预览 def _init_ui(self): layout = QVBoxLayout(self) # 基本信息 basic_group = QGroupBox("基本信息") basic_layout = QGridLayout(basic_group) basic_layout.addWidget(QLabel("任务名称:"), 0, 0) self.name_edit = QLineEdit() self.name_edit.setPlaceholderText("例: 每日数据更新") basic_layout.addWidget(self.name_edit, 0, 1) basic_layout.addWidget(QLabel("启用:"), 1, 0) self.enabled_check = QCheckBox("启用此调度任务") self.enabled_check.setChecked(True) basic_layout.addWidget(self.enabled_check, 1, 1) layout.addWidget(basic_group) # 任务选择 task_group = QGroupBox("执行任务") task_layout = QVBoxLayout(task_group) self.task_list = QListWidget() self.task_list.setSelectionMode(QListWidget.MultiSelection) self.task_list.setMaximumHeight(150) for code, name in SCHEDULABLE_TASKS: item = QListWidgetItem(f"{name} ({code})") item.setData(Qt.UserRole, code) self.task_list.addItem(item) task_layout.addWidget(self.task_list) layout.addWidget(task_group) # 调度设置 schedule_group = QGroupBox("调度设置") schedule_layout = QVBoxLayout(schedule_group) # 调度类型 type_layout = QHBoxLayout() type_layout.addWidget(QLabel("调度类型:")) self.schedule_type_combo = QComboBox() self.schedule_type_combo.addItem("固定间隔", ScheduleType.INTERVAL) self.schedule_type_combo.addItem("每天定时", ScheduleType.DAILY) self.schedule_type_combo.addItem("每周定时", ScheduleType.WEEKLY) self.schedule_type_combo.addItem("Cron 表达式", ScheduleType.CRON) self.schedule_type_combo.currentIndexChanged.connect(self._on_type_changed) type_layout.addWidget(self.schedule_type_combo, 1) schedule_layout.addLayout(type_layout) # 间隔设置 self.interval_widget = QWidget() interval_layout = QHBoxLayout(self.interval_widget) interval_layout.setContentsMargins(0, 0, 0, 0) interval_layout.addWidget(QLabel("执行间隔:")) self.interval_value = QSpinBox() self.interval_value.setRange(1, 999) self.interval_value.setValue(1) interval_layout.addWidget(self.interval_value) self.interval_unit = QComboBox() self.interval_unit.addItem("分钟", IntervalUnit.MINUTES) self.interval_unit.addItem("小时", IntervalUnit.HOURS) self.interval_unit.addItem("天", IntervalUnit.DAYS) self.interval_unit.setCurrentIndex(1) # 默认小时 interval_layout.addWidget(self.interval_unit) interval_layout.addStretch() schedule_layout.addWidget(self.interval_widget) # 每日设置 self.daily_widget = QWidget() daily_layout = QHBoxLayout(self.daily_widget) daily_layout.setContentsMargins(0, 0, 0, 0) daily_layout.addWidget(QLabel("执行时间:")) self.daily_time = QTimeEdit() self.daily_time.setTime(QTime(4, 0)) self.daily_time.setDisplayFormat("HH:mm") daily_layout.addWidget(self.daily_time) daily_layout.addStretch() self.daily_widget.setVisible(False) schedule_layout.addWidget(self.daily_widget) # 每周设置 self.weekly_widget = QWidget() weekly_layout = QVBoxLayout(self.weekly_widget) weekly_layout.setContentsMargins(0, 0, 0, 0) days_layout = QHBoxLayout() days_layout.addWidget(QLabel("执行日:")) self.day_checks = {} for i, day in enumerate(["一", "二", "三", "四", "五", "六", "日"], 1): check = QCheckBox(f"周{day}") check.setChecked(i == 1) # 默认周一 self.day_checks[i] = check days_layout.addWidget(check) weekly_layout.addLayout(days_layout) weekly_time_layout = QHBoxLayout() weekly_time_layout.addWidget(QLabel("执行时间:")) self.weekly_time = QTimeEdit() self.weekly_time.setTime(QTime(4, 0)) self.weekly_time.setDisplayFormat("HH:mm") weekly_time_layout.addWidget(self.weekly_time) weekly_time_layout.addStretch() weekly_layout.addLayout(weekly_time_layout) self.weekly_widget.setVisible(False) schedule_layout.addWidget(self.weekly_widget) # Cron 设置 self.cron_widget = QWidget() cron_layout = QHBoxLayout(self.cron_widget) cron_layout.setContentsMargins(0, 0, 0, 0) cron_layout.addWidget(QLabel("Cron:")) self.cron_edit = QLineEdit() self.cron_edit.setPlaceholderText("分 时 日 月 周 (例: 0 4 * * *)") self.cron_edit.setText("0 4 * * *") cron_layout.addWidget(self.cron_edit, 1) self.cron_widget.setVisible(False) schedule_layout.addWidget(self.cron_widget) layout.addWidget(schedule_group) # 任务配置 config_group = QGroupBox("任务配置") config_layout = QGridLayout(config_group) config_layout.addWidget(QLabel("运行模式:"), 0, 0) self.pipeline_combo = QComboBox() self.pipeline_combo.addItem("FULL - 在线抓取 + 入库", "FULL") self.pipeline_combo.addItem("INGEST_ONLY - 仅入库", "INGEST_ONLY") config_layout.addWidget(self.pipeline_combo, 0, 1) config_layout.addWidget(QLabel("回溯小时:"), 1, 0) self.lookback_hours = QSpinBox() self.lookback_hours.setRange(1, 720) self.lookback_hours.setValue(24) self.lookback_hours.setSuffix(" 小时") self.lookback_hours.setToolTip("每次执行时,抓取最近 N 小时的数据") config_layout.addWidget(self.lookback_hours, 1, 1) layout.addWidget(config_group) # CLI 命令行预览 cli_group = QGroupBox("命令行预览") cli_layout = QVBoxLayout(cli_group) self.cli_preview = QPlainTextEdit() self.cli_preview.setMaximumHeight(100) self.cli_preview.setFont(QFont("Consolas", 9)) self.cli_preview.setPlaceholderText("CLI 命令行将在此显示...") cli_layout.addWidget(self.cli_preview) # CLI 编辑提示和复制按钮 cli_btn_layout = QHBoxLayout() self.cli_editable_check = QCheckBox("允许手动编辑") self.cli_editable_check.setToolTip("勾选后可以手动修改命令行参数") self.cli_editable_check.stateChanged.connect(self._on_cli_editable_changed) cli_btn_layout.addWidget(self.cli_editable_check) cli_btn_layout.addStretch() self.copy_cli_btn = QPushButton("复制命令") self.copy_cli_btn.setProperty("secondary", True) self.copy_cli_btn.clicked.connect(self._copy_cli_to_clipboard) cli_btn_layout.addWidget(self.copy_cli_btn) self.refresh_cli_btn = QPushButton("刷新预览") self.refresh_cli_btn.setProperty("secondary", True) self.refresh_cli_btn.clicked.connect(self._update_cli_preview) cli_btn_layout.addWidget(self.refresh_cli_btn) cli_layout.addLayout(cli_btn_layout) layout.addWidget(cli_group) # 按钮 btn_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) btn_box.accepted.connect(self.accept) btn_box.rejected.connect(self.reject) layout.addWidget(btn_box) def _on_type_changed(self, index: int): schedule_type = self.schedule_type_combo.currentData() self.interval_widget.setVisible(schedule_type == ScheduleType.INTERVAL) self.daily_widget.setVisible(schedule_type == ScheduleType.DAILY) self.weekly_widget.setVisible(schedule_type == ScheduleType.WEEKLY) self.cron_widget.setVisible(schedule_type == ScheduleType.CRON) def _load_task(self, task: ScheduledTask): """加载任务数据""" self.name_edit.setText(task.name) self.enabled_check.setChecked(task.enabled) # 选择任务 for i in range(self.task_list.count()): item = self.task_list.item(i) code = item.data(Qt.UserRole) item.setSelected(code in task.task_codes) # 调度设置 schedule = task.schedule # 设置类型 for i in range(self.schedule_type_combo.count()): if self.schedule_type_combo.itemData(i) == schedule.schedule_type: self.schedule_type_combo.setCurrentIndex(i) break self.interval_value.setValue(schedule.interval_value) for i in range(self.interval_unit.count()): if self.interval_unit.itemData(i) == schedule.interval_unit: self.interval_unit.setCurrentIndex(i) break if schedule.daily_time: h, m = map(int, schedule.daily_time.split(":")) self.daily_time.setTime(QTime(h, m)) for day, check in self.day_checks.items(): check.setChecked(day in schedule.weekly_days) if schedule.weekly_time: h, m = map(int, schedule.weekly_time.split(":")) self.weekly_time.setTime(QTime(h, m)) self.cron_edit.setText(schedule.cron_expression) # 任务配置 if task.task_config.get("pipeline_flow"): for i in range(self.pipeline_combo.count()): if self.pipeline_combo.itemData(i) == task.task_config["pipeline_flow"]: self.pipeline_combo.setCurrentIndex(i) break self.lookback_hours.setValue(task.task_config.get("lookback_hours", 24)) def get_task(self) -> Optional[ScheduledTask]: """获取配置的任务""" name = self.name_edit.text().strip() if not name: QMessageBox.warning(self, "提示", "请输入任务名称") return None # 获取选中的任务 task_codes = [] for i in range(self.task_list.count()): item = self.task_list.item(i) if item.isSelected(): task_codes.append(item.data(Qt.UserRole)) if not task_codes: QMessageBox.warning(self, "提示", "请至少选择一个任务") return None # 构建调度配置 schedule_type = self.schedule_type_combo.currentData() weekly_days = [day for day, check in self.day_checks.items() if check.isChecked()] schedule = ScheduleConfig( schedule_type=schedule_type, interval_value=self.interval_value.value(), interval_unit=self.interval_unit.currentData(), daily_time=self.daily_time.time().toString("HH:mm"), weekly_days=weekly_days or [1], weekly_time=self.weekly_time.time().toString("HH:mm"), cron_expression=self.cron_edit.text().strip(), enabled=True, ) # 构建任务配置 task_config = { "pipeline_flow": self.pipeline_combo.currentData(), "lookback_hours": self.lookback_hours.value(), } # 创建或更新任务 if self.task: task = self.task task.name = name task.task_codes = task_codes task.schedule = schedule task.task_config = task_config task.enabled = self.enabled_check.isChecked() else: task = ScheduledTask( id=str(uuid.uuid4())[:8], name=name, task_codes=task_codes, schedule=schedule, task_config=task_config, enabled=self.enabled_check.isChecked(), ) task.update_next_run() return task def _connect_preview_signals(self): """连接信号以实时更新 CLI 预览""" # 任务选择变化 self.task_list.itemSelectionChanged.connect(self._update_cli_preview) # 调度配置变化 self.schedule_type_combo.currentIndexChanged.connect(self._update_cli_preview) self.interval_value.valueChanged.connect(self._update_cli_preview) self.interval_unit.currentIndexChanged.connect(self._update_cli_preview) # 任务配置变化 self.pipeline_combo.currentIndexChanged.connect(self._update_cli_preview) self.lookback_hours.valueChanged.connect(self._update_cli_preview) def _update_cli_preview(self): """更新 CLI 命令行预览""" # 如果用户正在手动编辑,不自动更新 if self.cli_editable_check.isChecked(): return # 获取选中的任务 task_codes = [] for i in range(self.task_list.count()): item = self.task_list.item(i) if item.isSelected(): task_codes.append(item.data(Qt.UserRole)) if not task_codes: self.cli_preview.setPlainText("# 请选择至少一个任务") return # 获取配置 lookback_hours = self.lookback_hours.value() pipeline_flow = self.pipeline_combo.currentData() # 构建说明注释 lines = [] # 调度规则说明 schedule_type = self.schedule_type_combo.currentData() if schedule_type == ScheduleType.INTERVAL: interval_val = self.interval_value.value() interval_unit = self.interval_unit.currentText() lines.append(f"# 调度:每 {interval_val} {interval_unit} 执行一次") elif schedule_type == ScheduleType.DAILY: daily_time = self.daily_time.time().toString("HH:mm") lines.append(f"# 调度:每天 {daily_time} 执行") elif schedule_type == ScheduleType.WEEKLY: weekly_time = self.weekly_time.time().toString("HH:mm") days = [f"周{['一','二','三','四','五','六','日'][d-1]}" for d, c in self.day_checks.items() if c.isChecked()] lines.append(f"# 调度:每周 {','.join(days)} {weekly_time} 执行") elif schedule_type == ScheduleType.CRON: cron_expr = self.cron_edit.text().strip() lines.append(f"# 调度:Cron 表达式 {cron_expr}") # 动态时间窗口说明 lines.append(f"# 回溯窗口:{lookback_hours} 小时") lines.append("#") lines.append("# ⚠ 时间窗口在每次执行时动态计算:") lines.append(f"# --window-start = <执行时间> - {lookback_hours}h") lines.append(f"# --window-end = <执行时间>") lines.append("") # 生成命令行(使用占位符表示动态时间) tasks_str = ",".join(task_codes) cmd_parts = [ "python -m cli.main", f"--tasks {tasks_str}", f"--pipeline-flow {pipeline_flow}", f'--window-start "<执行时间 - {lookback_hours}h>"', f'--window-end "<执行时间>"', ] lines.append(" \\\n ".join(cmd_parts)) # 添加示例(使用当前时间作为示例) lines.append("") lines.append("# -------- 示例(假设现在执行)--------") now = datetime.now() start_time = now - timedelta(hours=lookback_hours) # 构建示例 TaskConfig config = TaskConfig( tasks=task_codes, pipeline_flow=pipeline_flow, window_start=start_time.strftime("%Y-%m-%d %H:%M:%S"), window_end=now.strftime("%Y-%m-%d %H:%M:%S"), ) example_cmd = self.cli_builder.build_command_string(config) lines.append(f"# {example_cmd}") self.cli_preview.setPlainText("\n".join(lines)) def _on_cli_editable_changed(self, state): """切换 CLI 编辑模式""" editable = state == Qt.Checked self.cli_preview.setReadOnly(not editable) if editable: # 切换到编辑模式,提示用户 current_text = self.cli_preview.toPlainText() if not current_text.startswith("# [手动编辑模式]"): self.cli_preview.setPlainText(f"# [手动编辑模式] 修改后点击「刷新预览」可恢复自动生成\n{current_text}") else: # 切换回只读模式,刷新预览 self._update_cli_preview() def _copy_cli_to_clipboard(self): """复制 CLI 命令到剪贴板""" from PySide6.QtWidgets import QApplication text = self.cli_preview.toPlainText() # 提取实际命令行(跳过注释行) lines = text.split('\n') cmd_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')] cmd_text = '\n'.join(cmd_lines) if cmd_text: QApplication.clipboard().setText(cmd_text) QMessageBox.information(self, "提示", "命令行已复制到剪贴板") else: QMessageBox.warning(self, "提示", "没有可复制的命令") class ScheduleLogDialog(QDialog): """调度任务日志查看对话框""" def __init__(self, scheduled_task: ScheduledTask, task_history: List[QueuedTask], task_queue: List[QueuedTask] = None, parent=None): super().__init__(parent) self.scheduled_task = scheduled_task self.task_history = task_history # 引用任务管理器的执行历史 self.task_queue = task_queue or [] # 引用任务队列(用于获取执行中任务的实时日志) self.setWindowTitle(f"调度日志 - {scheduled_task.name}") self.setMinimumSize(900, 600) self._init_ui() # 定时刷新执行中任务的日志 self._refresh_timer = QTimer(self) self._refresh_timer.timeout.connect(self._refresh_running_log) self._refresh_timer.start(1000) # 每秒刷新 def _init_ui(self): layout = QVBoxLayout(self) # 调度任务基本信息 info_group = QGroupBox("调度任务信息") info_layout = QGridLayout(info_group) info_layout.addWidget(QLabel("任务名称:"), 0, 0) info_layout.addWidget(QLabel(self.scheduled_task.name), 0, 1) info_layout.addWidget(QLabel("执行任务:"), 0, 2) tasks_str = ", ".join(self.scheduled_task.task_codes[:3]) if len(self.scheduled_task.task_codes) > 3: tasks_str += f" (+{len(self.scheduled_task.task_codes) - 3})" info_layout.addWidget(QLabel(tasks_str), 0, 3) info_layout.addWidget(QLabel("调度规则:"), 1, 0) info_layout.addWidget(QLabel(self.scheduled_task.schedule.get_description()), 1, 1) info_layout.addWidget(QLabel("执行次数:"), 1, 2) info_layout.addWidget(QLabel(str(self.scheduled_task.run_count)), 1, 3) layout.addWidget(info_group) # 分割器 splitter = QSplitter(Qt.Vertical) layout.addWidget(splitter, 1) # 执行历史列表 history_group = QGroupBox(f"执行历史 (最近 {len(self.scheduled_task.execution_history)} 次)") history_layout = QVBoxLayout(history_group) self.history_table = QTableWidget() self.history_table.setColumnCount(5) self.history_table.setHorizontalHeaderLabels(["执行时间", "状态", "耗时", "退出码", "摘要"]) self.history_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents) self.history_table.horizontalHeader().setSectionResizeMode(4, QHeaderView.Stretch) self.history_table.setSelectionBehavior(QAbstractItemView.SelectRows) self.history_table.setSelectionMode(QAbstractItemView.SingleSelection) self.history_table.itemSelectionChanged.connect(self._on_selection_changed) history_layout.addWidget(self.history_table) splitter.addWidget(history_group) # 日志详情 log_group = QGroupBox("执行日志") log_layout = QVBoxLayout(log_group) self.log_text = QPlainTextEdit() self.log_text.setReadOnly(True) self.log_text.setFont(QFont("Consolas", 9)) self.log_text.setPlaceholderText("选择上方的执行记录查看详细日志...") log_layout.addWidget(self.log_text) splitter.addWidget(log_group) splitter.setSizes([250, 350]) # 按钮 btn_layout = QHBoxLayout() copy_btn = QPushButton("复制日志") copy_btn.clicked.connect(self._copy_log) btn_layout.addWidget(copy_btn) btn_layout.addStretch() close_btn = QPushButton("关闭") close_btn.clicked.connect(self.accept) btn_layout.addWidget(close_btn) layout.addLayout(btn_layout) # 填充历史表格 self._load_history() def _load_history(self): """加载执行历史""" records = self.scheduled_task.execution_history self.history_table.setRowCount(len(records)) for row, record in enumerate(records): # 执行时间 time_str = record.executed_at.strftime("%Y-%m-%d %H:%M:%S") self.history_table.setItem(row, 0, QTableWidgetItem(time_str)) # 状态 status_item = QTableWidgetItem(self._get_status_text(record.status)) status_item.setForeground(self._get_status_color(record.status)) self.history_table.setItem(row, 1, status_item) # 耗时 if record.duration_seconds > 0: if record.duration_seconds < 60: duration_str = f"{record.duration_seconds:.1f}秒" else: mins = int(record.duration_seconds // 60) secs = int(record.duration_seconds % 60) duration_str = f"{mins}分{secs}秒" else: duration_str = "-" self.history_table.setItem(row, 2, QTableWidgetItem(duration_str)) # 退出码 exit_str = str(record.exit_code) if record.exit_code is not None else "-" self.history_table.setItem(row, 3, QTableWidgetItem(exit_str)) # 摘要 summary_item = QTableWidgetItem(record.summary[:50] if record.summary else "-") summary_item.setData(Qt.UserRole, record.task_id) # 存储关联的任务ID self.history_table.setItem(row, 4, summary_item) # 自动选择第一行 if records: self.history_table.selectRow(0) def _on_selection_changed(self): """选择变化时显示对应的日志""" row = self.history_table.currentRow() if row < 0 or row >= len(self.scheduled_task.execution_history): self.log_text.clear() return record = self.scheduled_task.execution_history[row] # 如果是执行中的任务,从任务队列获取实时日志 if record.status == "pending": self._show_running_task_log(record) return # 优先使用执行记录中保存的日志 if record.output: log_content = record.output # 如果有错误信息,附加到末尾 if record.error: log_content += f"\n\n===== 错误信息 =====\n{record.error}" self.log_text.setPlainText(log_content) else: # 尝试从任务历史中查找(兼容旧记录) queued_task = None for task in self.task_history: if task.id == record.task_id: queued_task = task break if queued_task and queued_task.output: self.log_text.setPlainText(queued_task.output) else: # 显示基本信息 info_lines = [ f"执行时间: {record.executed_at.strftime('%Y-%m-%d %H:%M:%S')}", f"状态: {self._get_status_text(record.status)}", f"耗时: {record.duration_seconds:.1f} 秒", f"退出码: {record.exit_code}", f"", f"执行摘要:", record.summary if record.summary else "(无)", ] if record.error: info_lines.extend(["", "错误信息:", record.error]) self.log_text.setPlainText("\n".join(info_lines)) def _show_running_task_log(self, record: ScheduleExecutionRecord): """显示执行中任务的实时日志""" # 1. 先从任务队列中查找正在执行的任务 found_task = None task_source = "queue" for task in self.task_queue: if task.id == record.task_id: found_task = task break # 2. 如果队列中找不到,可能任务刚完成还在历史中,从历史中查找 if not found_task: for task in self.task_history: if task.id == record.task_id: found_task = task task_source = "history" break if found_task and found_task.output: # 显示日志 if task_source == "queue" and found_task.status == TaskStatus.RUNNING: # 任务还在执行中 header = f"===== 任务执行中 (实时日志) =====\n" header += f"任务 ID: {found_task.id}\n" if found_task.started_at: elapsed = (datetime.now() - found_task.started_at).total_seconds() header += f"已运行: {elapsed:.0f} 秒\n" header += "=" * 40 + "\n\n" else: # 任务已完成(可能是刚完成,记录状态还没更新) status_text = "成功" if found_task.status == TaskStatus.SUCCESS else "已完成" if found_task.status == TaskStatus.FAILED: status_text = "失败" header = f"===== 任务 {status_text} =====\n" header += f"任务 ID: {found_task.id}\n" if found_task.started_at and found_task.finished_at: duration = (found_task.finished_at - found_task.started_at).total_seconds() header += f"执行耗时: {duration:.1f} 秒\n" header += "=" * 40 + "\n\n" self.log_text.setPlainText(header + found_task.output) else: # 任务可能还未开始 info_lines = [ "===== 任务执行中 =====", f"任务 ID: {record.task_id}", f"开始时间: {record.executed_at.strftime('%Y-%m-%d %H:%M:%S')}", "", "日志正在生成中,请稍候...", "(日志将在任务执行过程中实时更新)", ] self.log_text.setPlainText("\n".join(info_lines)) def _refresh_running_log(self): """定时刷新执行中任务的日志""" row = self.history_table.currentRow() if row < 0 or row >= len(self.scheduled_task.execution_history): return record = self.scheduled_task.execution_history[row] # 检查任务是否还在执行中 if record.status == "pending": # 保存当前滚动位置 scrollbar = self.log_text.verticalScrollBar() at_bottom = scrollbar.value() >= scrollbar.maximum() - 10 # 检查任务是否已经完成(可能记录状态还没更新) task_completed = False for task in self.task_history: if task.id == record.task_id: task_completed = True break if task_completed: # 任务已完成,刷新历史表格以更新状态显示 # 重新加载历史数据(从 schedule_store 获取最新状态) # 注意:这里需要从父窗口重新获取调度任务的最新状态 self._show_running_task_log(record) # 触发重新选择以更新显示 self._on_selection_changed() else: self._show_running_task_log(record) # 如果之前在底部,保持在底部 if at_bottom: scrollbar.setValue(scrollbar.maximum()) def _copy_log(self): """复制日志到剪贴板""" from PySide6.QtWidgets import QApplication text = self.log_text.toPlainText() if text: QApplication.clipboard().setText(text) QMessageBox.information(self, "提示", "日志已复制到剪贴板") @staticmethod def _get_status_text(status: str) -> str: return { "pending": "执行中", "success": "成功", "failed": "失败", }.get(status, status or "未知") @staticmethod def _get_status_color(status: str) -> QColor: return { "pending": QColor("#1a73e8"), "success": QColor("#1e8e3e"), "failed": QColor("#d93025"), }.get(status, QColor("#333333")) class TaskManager(QWidget): """任务管理器""" # 信号 task_started = Signal(str) # 任务开始 task_finished = Signal(bool, str) # 任务完成 log_message = Signal(str) # 日志消息 def __init__(self, parent=None): super().__init__(parent) self.cli_builder = CLIBuilder() self.task_queue: List[QueuedTask] = [] self.task_history: List[QueuedTask] = [] self.current_worker: Optional[TaskWorker] = None self.auto_run = False # 调度任务存储 self.schedule_store = ScheduleStore() self.scheduler_enabled = False # 任务ID到调度任务ID的映射,用于在任务完成时更新调度执行记录 self._task_schedule_mapping: Dict[str, str] = {} self._init_ui() self._connect_signals() # 定时刷新 self.refresh_timer = QTimer(self) self.refresh_timer.timeout.connect(self._refresh_display) self.refresh_timer.start(1000) # 每秒刷新 # 调度检查定时器 self.schedule_timer = QTimer(self) self.schedule_timer.timeout.connect(self._check_scheduled_tasks) # 加载调度任务 self._refresh_schedule_table() # 加载任务历史 self._load_task_history() def _init_ui(self): """初始化界面""" layout = QVBoxLayout(self) layout.setContentsMargins(16, 16, 16, 16) layout.setSpacing(16) # 标题 title = QLabel("任务管理") title.setProperty("heading", True) layout.addWidget(title) # 使用选项卡 tab_widget = QTabWidget() layout.addWidget(tab_widget, 1) # ====== 任务队列选项卡 ====== queue_tab = QWidget() queue_tab_layout = QVBoxLayout(queue_tab) queue_tab_layout.setContentsMargins(0, 8, 0, 0) # 控制按钮 header_layout = QHBoxLayout() self.auto_run_btn = QPushButton("自动执行: 关") self.auto_run_btn.setProperty("secondary", True) self.auto_run_btn.setCheckable(True) header_layout.addWidget(self.auto_run_btn) self.clear_queue_btn = QPushButton("清空队列") self.clear_queue_btn.setProperty("secondary", True) header_layout.addWidget(self.clear_queue_btn) header_layout.addStretch() queue_tab_layout.addLayout(header_layout) # 分割器 splitter = QSplitter(Qt.Vertical) queue_tab_layout.addWidget(splitter, 1) # 任务队列 queue_group = QGroupBox("任务队列") queue_layout = QVBoxLayout(queue_group) self.queue_table = QTableWidget() self.queue_table.setColumnCount(5) self.queue_table.setHorizontalHeaderLabels(["ID", "任务", "状态", "创建时间", "操作"]) self.queue_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch) self.queue_table.setSelectionBehavior(QAbstractItemView.SelectRows) self.queue_table.setContextMenuPolicy(Qt.CustomContextMenu) queue_layout.addWidget(self.queue_table) queue_btn_layout = QHBoxLayout() self.run_next_btn = QPushButton("执行下一个") self.run_all_btn = QPushButton("执行全部") self.stop_btn = QPushButton("停止当前") self.stop_btn.setProperty("danger", True) self.stop_btn.setEnabled(False) queue_btn_layout.addWidget(self.run_next_btn) queue_btn_layout.addWidget(self.run_all_btn) queue_btn_layout.addStretch() queue_btn_layout.addWidget(self.stop_btn) queue_layout.addLayout(queue_btn_layout) splitter.addWidget(queue_group) # 实时日志 log_group = QGroupBox("执行日志 (实时)") log_layout = QVBoxLayout(log_group) self.live_log = QPlainTextEdit() self.live_log.setReadOnly(True) self.live_log.setFont(QFont("Consolas", 9)) self.live_log.setMaximumBlockCount(1000) # 限制行数 self.live_log.setPlaceholderText("任务执行时,日志将在此实时显示...") log_layout.addWidget(self.live_log) log_btn_layout = QHBoxLayout() self.clear_log_btn = QPushButton("清空日志") self.clear_log_btn.setProperty("secondary", True) self.clear_log_btn.clicked.connect(self.live_log.clear) self.auto_scroll_check = QCheckBox("自动滚动") self.auto_scroll_check.setChecked(True) log_btn_layout.addWidget(self.clear_log_btn) log_btn_layout.addWidget(self.auto_scroll_check) log_btn_layout.addStretch() log_layout.addLayout(log_btn_layout) splitter.addWidget(log_group) # 执行历史 history_group = QGroupBox("执行历史") history_layout = QVBoxLayout(history_group) self.history_table = QTableWidget() self.history_table.setColumnCount(6) self.history_table.setHorizontalHeaderLabels(["ID", "任务", "状态", "开始时间", "耗时", "结果"]) self.history_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch) self.history_table.horizontalHeader().setSectionResizeMode(5, QHeaderView.Stretch) self.history_table.setSelectionBehavior(QAbstractItemView.SelectRows) self.history_table.setContextMenuPolicy(Qt.CustomContextMenu) history_layout.addWidget(self.history_table) # 提示标签 hint_label = QLabel("提示:双击任务可查看详细日志") hint_label.setProperty("subheading", True) history_layout.addWidget(hint_label) history_btn_layout = QHBoxLayout() self.clear_history_btn = QPushButton("清空历史") self.clear_history_btn.setProperty("secondary", True) history_btn_layout.addStretch() history_btn_layout.addWidget(self.clear_history_btn) history_layout.addLayout(history_btn_layout) splitter.addWidget(history_group) splitter.setSizes([200, 250, 250]) tab_widget.addTab(queue_tab, "任务队列") # ====== 定时调度选项卡 ====== schedule_tab = QWidget() schedule_layout = QVBoxLayout(schedule_tab) schedule_layout.setContentsMargins(0, 8, 0, 0) # 调度控制 schedule_header = QHBoxLayout() self.scheduler_btn = QPushButton("调度器: 关") self.scheduler_btn.setProperty("secondary", True) self.scheduler_btn.setCheckable(True) self.scheduler_btn.setToolTip("开启后将自动执行到期的调度任务") schedule_header.addWidget(self.scheduler_btn) schedule_header.addStretch() self.add_schedule_btn = QPushButton("新建调度") schedule_header.addWidget(self.add_schedule_btn) schedule_layout.addLayout(schedule_header) # 调度任务表格 self.schedule_table = QTableWidget() self.schedule_table.setColumnCount(7) self.schedule_table.setHorizontalHeaderLabels([ "名称", "任务", "调度", "下次执行", "上次执行", "执行次数", "状态" ]) self.schedule_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents) self.schedule_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch) self.schedule_table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents) self.schedule_table.setSelectionBehavior(QAbstractItemView.SelectRows) self.schedule_table.setContextMenuPolicy(Qt.CustomContextMenu) schedule_layout.addWidget(self.schedule_table, 1) # 调度说明 schedule_note = QLabel( "提示: 双击调度任务可查看执行日志。" "调度器运行时会自动检查并执行到期的任务。" "新建调度任务首次执行将延迟 60 秒。" ) schedule_note.setProperty("subheading", True) schedule_note.setWordWrap(True) schedule_layout.addWidget(schedule_note) tab_widget.addTab(schedule_tab, "定时调度") def _connect_signals(self): """连接信号""" self.auto_run_btn.toggled.connect(self._toggle_auto_run) self.clear_queue_btn.clicked.connect(self._clear_queue) self.run_next_btn.clicked.connect(self._run_next) self.run_all_btn.clicked.connect(self._run_all) self.stop_btn.clicked.connect(self._stop_current) self.clear_history_btn.clicked.connect(self._clear_history) self.queue_table.customContextMenuRequested.connect(self._show_queue_menu) # 历史表格 self.history_table.customContextMenuRequested.connect(self._show_history_menu) self.history_table.doubleClicked.connect(self._view_task_log) # 调度相关 self.scheduler_btn.toggled.connect(self._toggle_scheduler) self.add_schedule_btn.clicked.connect(self._add_schedule) self.schedule_table.customContextMenuRequested.connect(self._show_schedule_menu) self.schedule_table.doubleClicked.connect(self._on_schedule_double_click) def add_task(self, config: TaskConfig) -> str: """添加任务到队列""" task_id = str(uuid.uuid4())[:8] task = QueuedTask( id=task_id, config=config, status=TaskStatus.PENDING, ) self.task_queue.append(task) self._refresh_queue_table() # 如果开启了自动执行且当前没有任务在运行 if self.auto_run and not self._is_running(): self._run_next() return task_id def _toggle_auto_run(self, checked: bool): """切换自动执行""" self.auto_run = checked self.auto_run_btn.setText(f"自动执行: {'开' if checked else '关'}") # 如果开启自动执行且有待执行任务 if checked and self.task_queue and not self._is_running(): self._run_next() def _clear_queue(self): """清空队列""" # 只清除未执行的任务 self.task_queue = [t for t in self.task_queue if t.status == TaskStatus.RUNNING] self._refresh_queue_table() def _run_next(self): """执行下一个任务""" if self._is_running(): QMessageBox.information(self, "提示", "当前有任务正在执行") return # 找到下一个待执行的任务 for task in self.task_queue: if task.status == TaskStatus.PENDING: self._execute_task(task) break def _run_all(self): """执行全部任务""" self.auto_run = True self.auto_run_btn.setChecked(True) if not self._is_running(): self._run_next() def _stop_current(self): """停止当前任务""" if self.current_worker and self.current_worker.isRunning(): self.current_worker.stop() def _clear_history(self): """清空历史""" self.task_history.clear() self._refresh_history_table() self._save_task_history() # 保存到文件 def _execute_task(self, task: QueuedTask): """执行任务""" # 构建命令 cmd = self.cli_builder.build_command(task.config) # 获取额外环境变量 extra_env = task.config.env_vars if hasattr(task.config, 'env_vars') else {} # 创建工作线程 self.current_worker = TaskWorker(cmd, extra_env=extra_env) self.current_worker.output_received.connect(lambda line: self._on_output(task, line)) self.current_worker.task_finished.connect(lambda code, summary: self._on_finished(task, code, summary)) self.current_worker.error_occurred.connect(lambda error: self._on_error(task, error)) # 更新任务状态 task.status = TaskStatus.RUNNING task.started_at = datetime.now() # 更新 UI self.stop_btn.setEnabled(True) self._refresh_queue_table() # 发送信号 task_info = ",".join(task.config.tasks[:2]) if len(task.config.tasks) > 2: task_info += f" 等{len(task.config.tasks)}个" self.task_started.emit(task_info) # 在实时日志中显示任务开始信息 self.live_log.appendPlainText("") self.live_log.appendPlainText("=" * 60) self.live_log.appendPlainText(f"▶ 开始执行任务: {task_info}") self.live_log.appendPlainText(f" 任务 ID: {task.id}") self.live_log.appendPlainText(f" 开始时间: {task.started_at.strftime('%Y-%m-%d %H:%M:%S')}") self.live_log.appendPlainText("=" * 60) # 启动 self.current_worker.start() def _on_output(self, task: QueuedTask, line: str): """收到输出""" task.output += line + "\n" self.log_message.emit(line) # 显示到实时日志区域 timestamp = datetime.now().strftime("%H:%M:%S") self.live_log.appendPlainText(f"[{timestamp}] {line}") # 自动滚动 if self.auto_scroll_check.isChecked(): scrollbar = self.live_log.verticalScrollBar() scrollbar.setValue(scrollbar.maximum()) def _on_finished(self, task: QueuedTask, exit_code: int, summary: str): """任务完成""" task.finished_at = datetime.now() task.exit_code = exit_code if exit_code == 0: task.status = TaskStatus.SUCCESS else: task.status = TaskStatus.FAILED task.error = summary # 在实时日志中显示任务完成信息(详细报告) duration = (task.finished_at - task.started_at).total_seconds() if task.started_at else 0 status_text = "✓ 成功" if exit_code == 0 else f"✗ 失败 (退出码: {exit_code})" self.live_log.appendPlainText("") self.live_log.appendPlainText("=" * 60) self.live_log.appendPlainText(f"■ 任务执行报告") self.live_log.appendPlainText("=" * 60) self.live_log.appendPlainText(f" 状态: {status_text}") self.live_log.appendPlainText(f" 任务 ID: {task.id}") self.live_log.appendPlainText(f" 任务列表: {', '.join(task.config.tasks)}") self.live_log.appendPlainText(f" 开始时间: {task.started_at.strftime('%Y-%m-%d %H:%M:%S') if task.started_at else '-'}") self.live_log.appendPlainText(f" 结束时间: {task.finished_at.strftime('%Y-%m-%d %H:%M:%S') if task.finished_at else '-'}") # 格式化耗时 if duration < 60: duration_str = f"{duration:.1f} 秒" elif duration < 3600: mins = int(duration // 60) secs = int(duration % 60) duration_str = f"{mins} 分 {secs} 秒" else: hours = int(duration // 3600) mins = int((duration % 3600) // 60) duration_str = f"{hours} 时 {mins} 分" self.live_log.appendPlainText(f" 总耗时: {duration_str}") # 显示详细摘要 if summary: self.live_log.appendPlainText("") self.live_log.appendPlainText(" -------- 执行摘要 --------") for line in summary.split('\n'): self.live_log.appendPlainText(f" {line}") self.live_log.appendPlainText("=" * 60) self.live_log.appendPlainText("") # 【重要】先更新调度执行记录(如果此任务来自调度),确保日志能正确保存 # 必须在移除任务之前执行,否则 ScheduleLogDialog 无法获取实时日志 self._update_schedule_execution_record(task, duration, summary) # 移动到历史 self.task_queue.remove(task) self.task_history.insert(0, task) # 限制历史记录数量 if len(self.task_history) > 100: self.task_history = self.task_history[:100] # 保存历史到文件 self._save_task_history() # 更新 UI self.stop_btn.setEnabled(False) self._refresh_queue_table() self._refresh_history_table() self._refresh_schedule_table() # 发送信号 self.task_finished.emit(exit_code == 0, summary) # 如果开启自动执行,或者队列中有定时任务待执行,继续下一个 if self.auto_run or self._has_scheduled_tasks_pending(): QTimer.singleShot(500, self._run_next) def _on_error(self, task: QueuedTask, error: str): """发生错误""" task.error = error self.log_message.emit(f"[错误] {error}") def _is_running(self) -> bool: """是否有任务在运行""" return self.current_worker is not None and self.current_worker.isRunning() def _has_scheduled_tasks_pending(self) -> bool: """检查队列中是否有待执行的定时任务""" for task in self.task_queue: if task.status == TaskStatus.PENDING and task.id in self._task_schedule_mapping: return True return False def _refresh_display(self): """刷新显示""" # 更新运行中任务的状态 if self._is_running(): for row in range(self.queue_table.rowCount()): status_item = self.queue_table.item(row, 2) if status_item and status_item.text() == "执行中": # 添加动画效果 dots = "." * (int(datetime.now().timestamp()) % 4) status_item.setText(f"执行中{dots}") def _refresh_queue_table(self): """刷新队列表格""" self.queue_table.setRowCount(len(self.task_queue)) for row, task in enumerate(self.task_queue): # ID self.queue_table.setItem(row, 0, QTableWidgetItem(task.id)) # 任务 tasks_str = ", ".join(task.config.tasks[:3]) if len(task.config.tasks) > 3: tasks_str += f" (+{len(task.config.tasks) - 3})" self.queue_table.setItem(row, 1, QTableWidgetItem(tasks_str)) # 状态 status_item = QTableWidgetItem(self._get_status_text(task.status)) status_item.setForeground(self._get_status_color(task.status)) self.queue_table.setItem(row, 2, status_item) # 创建时间 time_str = task.created_at.strftime("%H:%M:%S") self.queue_table.setItem(row, 3, QTableWidgetItem(time_str)) # 操作按钮 self.queue_table.setItem(row, 4, QTableWidgetItem("")) def _refresh_history_table(self): """刷新历史表格""" self.history_table.setRowCount(len(self.task_history)) for row, task in enumerate(self.task_history): # ID self.history_table.setItem(row, 0, QTableWidgetItem(task.id)) # 任务 tasks_str = ", ".join(task.config.tasks[:3]) if len(task.config.tasks) > 3: tasks_str += f" (+{len(task.config.tasks) - 3})" self.history_table.setItem(row, 1, QTableWidgetItem(tasks_str)) # 状态 status_item = QTableWidgetItem(self._get_status_text(task.status)) status_item.setForeground(self._get_status_color(task.status)) self.history_table.setItem(row, 2, status_item) # 开始时间 time_str = task.started_at.strftime("%Y-%m-%d %H:%M:%S") if task.started_at else "-" self.history_table.setItem(row, 3, QTableWidgetItem(time_str)) # 耗时 if task.started_at and task.finished_at: duration = (task.finished_at - task.started_at).total_seconds() if duration < 60: duration_str = f"{duration:.1f}秒" else: duration_str = f"{int(duration // 60)}分{int(duration % 60)}秒" else: duration_str = "-" self.history_table.setItem(row, 4, QTableWidgetItem(duration_str)) # 结果 - 提取摘要的第一行作为显示 if task.status == TaskStatus.SUCCESS: # 从 output 中提取摘要信息 result = self._extract_result_summary(task) else: result = task.error if task.error else "失败" # 显示结果(取第一行,最多80字符) result_display = result.split('\n')[0][:80] if result else "成功" result_item = QTableWidgetItem(result_display) result_item.setToolTip(result) # 完整内容作为 tooltip self.history_table.setItem(row, 5, result_item) def _show_queue_menu(self, pos): """显示队列右键菜单""" item = self.queue_table.itemAt(pos) if not item: return row = item.row() if row >= len(self.task_queue): return task = self.task_queue[row] menu = QMenu(self) if task.status == TaskStatus.PENDING: run_action = QAction("立即执行", self) run_action.triggered.connect(lambda: self._execute_task(task)) menu.addAction(run_action) remove_action = QAction("移除", self) remove_action.triggered.connect(lambda: self._remove_task(task)) menu.addAction(remove_action) elif task.status == TaskStatus.RUNNING: stop_action = QAction("停止", self) stop_action.triggered.connect(self._stop_current) menu.addAction(stop_action) menu.exec(self.queue_table.mapToGlobal(pos)) def _remove_task(self, task: QueuedTask): """移除任务""" if task in self.task_queue: self.task_queue.remove(task) self._refresh_queue_table() def _show_history_menu(self, pos): """显示历史右键菜单""" item = self.history_table.itemAt(pos) if not item: return row = item.row() if row >= len(self.task_history): return task = self.task_history[row] menu = QMenu(self) view_action = QAction("查看日志", self) view_action.triggered.connect(lambda: self._show_task_log_dialog(task)) menu.addAction(view_action) rerun_action = QAction("重新执行", self) rerun_action.triggered.connect(lambda: self._rerun_task(task)) menu.addAction(rerun_action) menu.addSeparator() remove_action = QAction("从历史删除", self) remove_action.triggered.connect(lambda: self._remove_from_history(task)) menu.addAction(remove_action) menu.exec(self.history_table.mapToGlobal(pos)) def _view_task_log(self, index): """双击查看任务日志""" row = index.row() if row < len(self.task_history): self._show_task_log_dialog(self.task_history[row]) def _show_task_log_dialog(self, task: QueuedTask): """显示任务日志对话框""" dialog = TaskLogDialog(task, self) dialog.exec() def _rerun_task(self, task: QueuedTask): """重新执行任务""" # 创建新任务(使用相同配置) self.add_task(task.config) QMessageBox.information(self, "提示", "任务已添加到队列") def _remove_from_history(self, task: QueuedTask): """从历史中删除任务""" if task in self.task_history: self.task_history.remove(task) self._refresh_history_table() self._save_task_history() # 保存到文件 def _load_task_history(self): """从文件加载任务历史""" try: history_data = app_settings.load_task_history() for item in history_data: try: # 重建 TaskConfig config = TaskConfig( tasks=item.get("tasks", []), pipeline_flow=item.get("pipeline_flow", "FULL"), window_start=item.get("window_start"), window_end=item.get("window_end"), ) # 重建 QueuedTask status_str = item.get("status", "success") status_map = { "pending": TaskStatus.PENDING, "running": TaskStatus.RUNNING, "success": TaskStatus.SUCCESS, "failed": TaskStatus.FAILED, "cancelled": TaskStatus.CANCELLED, } status = status_map.get(status_str, TaskStatus.SUCCESS) task = QueuedTask( id=item.get("id", "unknown"), config=config, status=status, ) # 恢复时间 if item.get("created_at"): task.created_at = datetime.fromisoformat(item["created_at"]) if item.get("started_at"): task.started_at = datetime.fromisoformat(item["started_at"]) if item.get("finished_at"): task.finished_at = datetime.fromisoformat(item["finished_at"]) task.exit_code = item.get("exit_code") task.error = item.get("error", "") task.output = item.get("output_preview", "") self.task_history.append(task) except Exception: continue # 刷新显示 self._refresh_history_table() if self.task_history: self.log_message.emit(f"[系统] 已加载 {len(self.task_history)} 条历史任务记录") except Exception as e: print(f"加载任务历史失败: {e}") def _save_task_history(self): """保存任务历史到文件""" try: app_settings.save_task_history(self.task_history) except Exception as e: print(f"保存任务历史失败: {e}") def _extract_result_summary(self, task: QueuedTask) -> str: """从任务输出中提取结果摘要""" import re if not task.output: return "成功" lines = task.output.split('\n') summary_parts = [] # 统计关键数据 total_inserted = 0 total_missing = 0 total_records = 0 for line in lines: # 解析 DWD 装载统计 if "完成,统计=" in line: try: match = re.search(r"统计=(\{.+\})", line) if match: import json stats_str = match.group(1).replace("'", '"') stats = json.loads(stats_str) if 'tables' in stats: for tbl in stats['tables']: inserted = tbl.get('inserted', 0) processed = tbl.get('processed', 0) total_inserted += inserted + processed except Exception: pass # 解析数据校验结果 if "结果统计:" in line or "结果统计:" in line: try: match = re.search(r"\{.+\}", line) if match: import json stats_str = match.group(0).replace("'", '"') stats = json.loads(stats_str) total_missing = stats.get('missing', 0) except Exception: pass # 解析 CHECK_DONE match = re.search(r'CHECK_DONE.*records=(\d+)', line) if match: total_records += int(match.group(1)) # 构建摘要 if total_inserted > 0: summary_parts.append(f"处理 {total_inserted} 条") if total_records > 0: if total_missing > 0: summary_parts.append(f"校验 {total_records} 条, 缺失 {total_missing}") else: summary_parts.append(f"校验 {total_records} 条, 数据完整") if summary_parts: return " | ".join(summary_parts) return "成功" @staticmethod def _get_status_text(status: TaskStatus) -> str: """获取状态文本""" return { TaskStatus.PENDING: "待执行", TaskStatus.RUNNING: "执行中", TaskStatus.SUCCESS: "成功", TaskStatus.FAILED: "失败", TaskStatus.CANCELLED: "已取消", }.get(status, "未知") @staticmethod def _get_status_color(status: TaskStatus) -> QColor: """获取状态颜色""" return { TaskStatus.PENDING: QColor("#5f6368"), TaskStatus.RUNNING: QColor("#1a73e8"), TaskStatus.SUCCESS: QColor("#1e8e3e"), TaskStatus.FAILED: QColor("#d93025"), TaskStatus.CANCELLED: QColor("#9aa0a6"), }.get(status, QColor("#333333")) # ========== 调度相关方法 ========== def _toggle_scheduler(self, checked: bool): """切换调度器状态""" self.scheduler_enabled = checked self.scheduler_btn.setText(f"调度器: {'开' if checked else '关'}") if checked: # 启动调度检查定时器(每分钟检查一次) self.schedule_timer.start(60000) # 立即检查一次 self._check_scheduled_tasks() self.log_message.emit("[调度器] 已启动") else: self.schedule_timer.stop() self.log_message.emit("[调度器] 已停止") def _check_scheduled_tasks(self): """检查并执行到期的调度任务""" if not self.scheduler_enabled: return due_tasks = self.schedule_store.get_due_tasks() for task in due_tasks: self._execute_scheduled_task(task) def _execute_scheduled_task(self, scheduled_task: ScheduledTask): """执行调度任务""" # 构建任务配置 lookback_hours = scheduled_task.task_config.get("lookback_hours", 24) now = datetime.now() start_time = now - timedelta(hours=lookback_hours) config = TaskConfig( tasks=scheduled_task.task_codes, pipeline_flow=scheduled_task.task_config.get("pipeline_flow", "FULL"), window_start=start_time.strftime("%Y-%m-%d %H:%M:%S"), window_end=now.strftime("%Y-%m-%d %H:%M:%S"), ) # 添加到队列 task_id = self.add_task(config) # 创建执行记录 execution_record = ScheduleExecutionRecord( task_id=task_id, executed_at=now, status="pending", ) scheduled_task.add_execution_record(execution_record) # 保存映射关系,以便任务完成时更新记录 self._task_schedule_mapping[task_id] = scheduled_task.id # 更新调度任务状态 scheduled_task.last_run = now scheduled_task.run_count += 1 scheduled_task.last_status = "执行中" scheduled_task.update_next_run() self.schedule_store.update_task(scheduled_task) self._refresh_schedule_table() self.log_message.emit(f"[调度器] 执行任务: {scheduled_task.name} (ID: {task_id})") # 定时任务必须立即启动执行,不受 auto_run 设置影响 if not self._is_running(): # 从队列中找到刚添加的任务并执行 for queued_task in self.task_queue: if queued_task.id == task_id: self._execute_task(queued_task) break def _add_schedule(self): """添加调度任务""" dialog = ScheduleEditDialog(parent=self) if dialog.exec() == QDialog.Accepted: task = dialog.get_task() if task: self.schedule_store.add_task(task) self._refresh_schedule_table() self.log_message.emit(f"[调度器] 已创建任务: {task.name}") def _edit_schedule(self): """编辑调度任务""" row = self.schedule_table.currentRow() if row < 0: return tasks = self.schedule_store.get_all_tasks() if row >= len(tasks): return task = tasks[row] dialog = ScheduleEditDialog(task=task, parent=self) if dialog.exec() == QDialog.Accepted: updated_task = dialog.get_task() if updated_task: self.schedule_store.update_task(updated_task) self._refresh_schedule_table() self.log_message.emit(f"[调度器] 已更新任务: {updated_task.name}") def _delete_schedule(self, task_id: str): """删除调度任务""" task = self.schedule_store.get_task(task_id) if not task: return reply = QMessageBox.question( self, "确认删除", f"确定要删除调度任务 '{task.name}' 吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.Yes: self.schedule_store.remove_task(task_id) self._refresh_schedule_table() self.log_message.emit(f"[调度器] 已删除任务: {task.name}") def _toggle_schedule_enabled(self, task_id: str): """切换调度任务启用状态""" task = self.schedule_store.get_task(task_id) if task: task.enabled = not task.enabled task.update_next_run() self.schedule_store.update_task(task) self._refresh_schedule_table() status = "启用" if task.enabled else "禁用" self.log_message.emit(f"[调度器] {task.name}: {status}") def _run_schedule_now(self, task_id: str): """立即执行调度任务""" task = self.schedule_store.get_task(task_id) if task: self._execute_scheduled_task(task) def _show_schedule_menu(self, pos): """显示调度任务右键菜单""" item = self.schedule_table.itemAt(pos) if not item: return row = item.row() tasks = self.schedule_store.get_all_tasks() if row >= len(tasks): return task = tasks[row] menu = QMenu(self) # 查看日志 log_action = QAction("查看执行日志", self) log_action.triggered.connect(lambda: self._view_schedule_log(task.id)) menu.addAction(log_action) menu.addSeparator() # 立即执行 run_action = QAction("立即执行", self) run_action.triggered.connect(lambda: self._run_schedule_now(task.id)) menu.addAction(run_action) # 编辑 edit_action = QAction("编辑", self) edit_action.triggered.connect(self._edit_schedule) menu.addAction(edit_action) menu.addSeparator() # 启用/禁用 toggle_text = "禁用" if task.enabled else "启用" toggle_action = QAction(toggle_text, self) toggle_action.triggered.connect(lambda: self._toggle_schedule_enabled(task.id)) menu.addAction(toggle_action) menu.addSeparator() # 删除 delete_action = QAction("删除", self) delete_action.triggered.connect(lambda: self._delete_schedule(task.id)) menu.addAction(delete_action) menu.exec(self.schedule_table.mapToGlobal(pos)) def _refresh_schedule_table(self): """刷新调度任务表格""" tasks = self.schedule_store.get_all_tasks() self.schedule_table.setRowCount(len(tasks)) for row, task in enumerate(tasks): # 名称 name_item = QTableWidgetItem(task.name) name_item.setData(Qt.UserRole, task.id) self.schedule_table.setItem(row, 0, name_item) # 任务 tasks_str = ", ".join(task.task_codes[:2]) if len(task.task_codes) > 2: tasks_str += f" (+{len(task.task_codes) - 2})" self.schedule_table.setItem(row, 1, QTableWidgetItem(tasks_str)) # 调度 self.schedule_table.setItem(row, 2, QTableWidgetItem(task.schedule.get_description())) # 下次执行 if task.next_run: next_str = task.next_run.strftime("%m-%d %H:%M") else: next_str = "-" self.schedule_table.setItem(row, 3, QTableWidgetItem(next_str)) # 上次执行 if task.last_run: last_str = task.last_run.strftime("%m-%d %H:%M") else: last_str = "-" self.schedule_table.setItem(row, 4, QTableWidgetItem(last_str)) # 执行次数 self.schedule_table.setItem(row, 5, QTableWidgetItem(str(task.run_count))) # 状态 if task.enabled: status_text = "启用" status_color = QColor("#1e8e3e") else: status_text = "禁用" status_color = QColor("#9aa0a6") status_item = QTableWidgetItem(status_text) status_item.setForeground(status_color) self.schedule_table.setItem(row, 6, status_item) def _update_schedule_execution_record(self, task: QueuedTask, duration: float, summary: str): """更新调度执行记录(如果此任务来自调度)""" schedule_id = self._task_schedule_mapping.get(task.id) if not schedule_id: return # 从映射中移除(一次性) del self._task_schedule_mapping[task.id] # 获取调度任务 scheduled_task = self.schedule_store.get_task(schedule_id) if not scheduled_task: return # 更新执行记录(包含完整日志) status = "success" if task.status == TaskStatus.SUCCESS else "failed" scheduled_task.update_execution_record( task_id=task.id, status=status, exit_code=task.exit_code or 0, duration=duration, summary=summary or self._extract_result_summary(task), output=task.output or "", error=task.error or "", ) # 更新调度任务状态 scheduled_task.last_status = "成功" if status == "success" else "失败" # 保存 self.schedule_store.update_task(scheduled_task) self.log_message.emit(f"[调度器] 任务 {scheduled_task.name} 执行完成: {scheduled_task.last_status}") def _view_schedule_log(self, task_id: str): """查看调度任务日志""" task = self.schedule_store.get_task(task_id) if not task: QMessageBox.warning(self, "提示", "未找到调度任务") return if not task.execution_history: QMessageBox.information(self, "提示", "该调度任务尚无执行记录") return # 传递 task_queue 以便获取执行中任务的实时日志 dialog = ScheduleLogDialog(task, self.task_history, self.task_queue, self) dialog.exec() def _on_schedule_double_click(self, index): """双击调度任务查看日志""" row = index.row() tasks = self.schedule_store.get_all_tasks() if row < len(tasks): self._view_schedule_log(tasks[row].id)