# -*- coding: utf-8 -*-
"""ETL status panel.

Provides ``StatusCard`` (a small titled value tile) and ``StatusPanel`` (a
widget that shows table counts, ODS cutoff freshness and the most recent ETL
runs, loaded through ``DBWorker``).
"""

import logging
from datetime import datetime
from typing import Dict, List, Optional, Any

from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QHBoxLayout, QGridLayout, QGroupBox,
    QLabel, QPushButton, QTableWidget, QTableWidgetItem,
    QHeaderView, QFrame, QScrollArea, QMessageBox
)
from PySide6.QtCore import Qt, Signal, QTimer
from PySide6.QtGui import QColor

from ..workers.db_worker import DBWorker
from ..utils.config_helper import ConfigHelper

logger = logging.getLogger(__name__)


def _repolish(widget) -> None:
    """Re-apply the widget's style so a changed dynamic property takes effect."""
    style = widget.style()
    style.unpolish(widget)
    style.polish(widget)


def _now_like(reference: datetime) -> datetime:
    """Return the current time with the same awareness as *reference*.

    Subtracting a timezone-aware datetime from a naive one raises
    ``TypeError``, so "now" is produced in the reference's timezone when the
    reference is aware and as naive local time otherwise.
    """
    if reference.utcoffset() is None:
        return datetime.now()
    return datetime.now(reference.tzinfo)


def _local_date(moment: datetime):
    """Return the calendar date of *moment* in the local timezone.

    Aware values are converted to local time first; naive values are taken
    as already being local time.
    """
    if moment.utcoffset() is not None:
        moment = moment.astimezone()
    return moment.date()


class StatusCard(QFrame):
    """Card showing a title, a large value and a short description."""

    def __init__(self, title: str, parent=None):
        """Build the three stacked labels.

        Args:
            title: Text shown in the card's title label.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.setProperty("card", True)
        self.setFrameShape(QFrame.StyledPanel)

        layout = QVBoxLayout(self)
        layout.setContentsMargins(16, 12, 16, 12)
        layout.setSpacing(8)

        # Title
        self.title_label = QLabel(title)
        self.title_label.setProperty("subheading", True)
        layout.addWidget(self.title_label)

        # Value
        self.value_label = QLabel("-")
        self.value_label.setStyleSheet("font-size: 24px; font-weight: bold;")
        layout.addWidget(self.value_label)

        # Description
        self.desc_label = QLabel("")
        self.desc_label.setProperty("subheading", True)
        layout.addWidget(self.desc_label)

    def set_value(self, value: str, description: str = "", status: str = "") -> None:
        """Update the displayed value and description.

        Args:
            value: Text for the large value label.
            description: Text for the description label.
            status: When non-empty, stored in the value label's ``status``
                property and the label is re-polished. An empty value leaves
                any previously set ``status`` property untouched.
        """
        self.value_label.setText(value)
        self.desc_label.setText(description)

        if status:
            self.value_label.setProperty("status", status)
            _repolish(self.value_label)


class StatusPanel(QWidget):
    """ETL status panel.

    Status data is loaded as a chain of three queries issued through
    ``DBWorker``: table counts, then ODS cutoff times, then run history.
    ``_current_query`` names the stage whose result is pending and is empty
    when no chain is running.
    """

    # Auto-refresh period in milliseconds (30 seconds).
    _REFRESH_INTERVAL_MS = 30000

    # Stage that follows each stage of the query chain ("" ends the chain).
    _NEXT_STAGE = {
        "table_count": "cutoff",
        "cutoff": "history",
        "history": "",
    }

    # SQL text issued for each stage.
    _STAGE_SQL = {
        "table_count": """
            SELECT table_schema, COUNT(*) as table_count
            FROM information_schema.tables
            WHERE table_schema IN ('billiards_ods', 'billiards_dwd', 'billiards_dws')
            GROUP BY table_schema
        """,
        "cutoff": """
            SELECT 'payment_transactions' AS table_name,
                   MAX(fetched_at) AS max_fetched_at,
                   COUNT(*) AS row_count
            FROM billiards_ods.payment_transactions
            UNION ALL
            SELECT 'member_profiles', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.member_profiles
            UNION ALL
            SELECT 'settlement_records', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.settlement_records
            UNION ALL
            SELECT 'recharge_settlements', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.recharge_settlements
            UNION ALL
            SELECT 'assistant_service_records', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.assistant_service_records
            ORDER BY table_name
        """,
        "history": """
            SELECT run_id, task_code, status, started_at, finished_at, rows_affected
            FROM etl_admin.run_tracker
            ORDER BY started_at DESC
            LIMIT 20
        """,
    }

    def __init__(self, parent=None):
        """Create the helpers, build the UI and wire up the signals.

        Args:
            parent: Optional parent widget.
        """
        super().__init__(parent)

        self.config_helper = ConfigHelper()
        self.db_worker = DBWorker(self)
        self._connected = False
        # Stage of the query chain currently awaiting a result ("" = idle).
        self._current_query = ""

        # Periodic refresh; created before the signals are connected so the
        # toggle handler can never observe a missing timer.
        self.refresh_timer = QTimer(self)
        self.refresh_timer.timeout.connect(self._auto_refresh)

        self._init_ui()
        self._connect_signals()

    def _init_ui(self) -> None:
        """Build the widget tree: header, connection label, cards and tables."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(16, 16, 16, 16)
        layout.setSpacing(16)

        # Title and buttons
        header_layout = QHBoxLayout()

        title = QLabel("ETL 状态")
        title.setProperty("heading", True)
        header_layout.addWidget(title)

        header_layout.addStretch()

        self.auto_refresh_btn = QPushButton("自动刷新: 关")
        self.auto_refresh_btn.setProperty("secondary", True)
        self.auto_refresh_btn.setCheckable(True)
        header_layout.addWidget(self.auto_refresh_btn)

        self.refresh_btn = QPushButton("刷新")
        self.refresh_btn.clicked.connect(self._refresh_all)
        header_layout.addWidget(self.refresh_btn)

        layout.addLayout(header_layout)

        # Connection status
        self.conn_status_label = QLabel("数据库: 未连接")
        self.conn_status_label.setProperty("status", "warning")
        layout.addWidget(self.conn_status_label)

        # Scroll area
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setFrameShape(QFrame.NoFrame)
        layout.addWidget(scroll_area, 1)

        # Content container
        content_widget = QWidget()
        content_layout = QVBoxLayout(content_widget)
        content_layout.setSpacing(16)

        # Overview cards
        cards_layout = QHBoxLayout()

        self.ods_card = StatusCard("ODS 表数量")
        cards_layout.addWidget(self.ods_card)

        self.dwd_card = StatusCard("DWD 表数量")
        cards_layout.addWidget(self.dwd_card)

        self.last_update_card = StatusCard("最后更新")
        cards_layout.addWidget(self.last_update_card)

        self.task_count_card = StatusCard("今日任务")
        cards_layout.addWidget(self.task_count_card)

        content_layout.addLayout(cards_layout)

        # ODS cutoff status
        cutoff_group = QGroupBox("ODS Cutoff 状态")
        cutoff_layout = QVBoxLayout(cutoff_group)

        self.cutoff_table = QTableWidget()
        self.cutoff_table.setColumnCount(4)
        self.cutoff_table.setHorizontalHeaderLabels(["表名", "最新 fetched_at", "行数", "状态"])
        self.cutoff_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
        self.cutoff_table.setMaximumHeight(250)
        cutoff_layout.addWidget(self.cutoff_table)

        content_layout.addWidget(cutoff_group)

        # Recent run history
        history_group = QGroupBox("最近运行记录")
        history_layout = QVBoxLayout(history_group)

        self.history_table = QTableWidget()
        self.history_table.setColumnCount(6)
        self.history_table.setHorizontalHeaderLabels(["运行ID", "任务", "状态", "开始时间", "耗时", "影响行数"])
        self.history_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
        self.history_table.setMaximumHeight(250)
        history_layout.addWidget(self.history_table)

        content_layout.addWidget(history_group)

        # Stretch to absorb remaining vertical space
        content_layout.addStretch()

        scroll_area.setWidget(content_widget)

    def _connect_signals(self) -> None:
        """Connect the auto-refresh toggle and the DB worker signals."""
        self.auto_refresh_btn.toggled.connect(self._toggle_auto_refresh)
        self.db_worker.connection_status.connect(self._on_connection_status)
        self.db_worker.query_finished.connect(self._on_query_finished)
        self.db_worker.query_error.connect(self._on_query_error)

    def _toggle_auto_refresh(self, checked: bool) -> None:
        """Start or stop the periodic refresh.

        Args:
            checked: ``True`` starts the timer and refreshes immediately;
                ``False`` stops the timer.
        """
        if checked:
            self.auto_refresh_btn.setText("自动刷新: 开")
            self.refresh_timer.start(self._REFRESH_INTERVAL_MS)
            self._refresh_all()
        else:
            self.auto_refresh_btn.setText("自动刷新: 关")
            self.refresh_timer.stop()

    def _auto_refresh(self) -> None:
        """Timer callback: refresh only while the database is connected."""
        if self._connected:
            self._refresh_all()

    def _refresh_all(self) -> None:
        """Refresh all data, connecting to the database first if needed.

        When not connected, the ``PG_DSN`` value from the loaded environment
        is used to request a connection; the data load then starts from
        ``_on_connection_status``.
        """
        if self._connected:
            self._load_status_data()
            return

        env_vars = self.config_helper.load_env()
        dsn = env_vars.get("PG_DSN", "")
        if dsn:
            self.db_worker.connect_db(dsn)
        else:
            self.conn_status_label.setText("数据库: 未配置 DSN")

    def _on_connection_status(self, connected: bool, message: str) -> None:
        """Update the connection label and start loading data on success.

        Args:
            connected: Whether the database connection is established.
            message: Text shown after the label prefix when not connected.
        """
        self._connected = connected
        if connected:
            self.conn_status_label.setText("数据库: 已连接")
            self.conn_status_label.setProperty("status", "success")
            self._load_status_data()
        else:
            # Forget any pending stage so the next successful connection can
            # start a fresh chain instead of being blocked by stale state.
            self._current_query = ""
            self.conn_status_label.setText(f"数据库: {message}")
            self.conn_status_label.setProperty("status", "error")

        _repolish(self.conn_status_label)

    def _load_status_data(self) -> None:
        """Start the query chain unless one is already in progress.

        Results are matched to their handler through ``_current_query``, so
        restarting while a result is pending would route that result to the
        wrong handler.
        """
        if self._current_query:
            return
        self._run_stage("table_count")

    def _run_stage(self, stage: str) -> None:
        """Record *stage* as pending and issue its query ("" just ends the chain)."""
        self._current_query = stage
        if stage:
            self.db_worker.execute_query(self._STAGE_SQL[stage])

    def _on_query_finished(self, columns: list, rows: list) -> None:
        """Route a query result to the pending stage's handler, then advance.

        Args:
            columns: Column names of the result (unused here).
            rows: Result rows; the handlers read them with ``.get``, so they
                are presumably mappings keyed by column name.
        """
        stage = self._current_query
        handlers = {
            "table_count": self._process_table_count,
            "cutoff": self._process_cutoff_data,
            "history": self._process_history_data,
        }
        handler = handlers.get(stage)
        if handler is None:
            # No chain in progress; nothing to do with this result.
            return

        try:
            handler(rows)
        except Exception:
            # Slot boundary: log and keep the chain moving so one malformed
            # result cannot leave the panel stuck on a pending stage.
            logger.exception("Failed to process '%s' query result", stage)

        self._run_stage(self._NEXT_STAGE.get(stage, ""))

    def _process_table_count(self, rows: list) -> None:
        """Fill the ODS and DWD table-count cards from per-schema counts."""
        counts = {"billiards_ods": 0, "billiards_dwd": 0}
        for row in rows:
            schema = row.get("table_schema", "")
            if schema in counts:
                counts[schema] = row.get("table_count", 0)

        self.ods_card.set_value(str(counts["billiards_ods"]), "个表")
        self.dwd_card.set_value(str(counts["billiards_dwd"]), "个表")

    @staticmethod
    def _cutoff_status(max_fetched):
        """Return ``(label, colour hex)`` describing how fresh *max_fetched* is."""
        if not max_fetched:
            return "无数据", "#d93025"
        if not isinstance(max_fetched, datetime):
            return "-", "#9aa0a6"

        elapsed = _now_like(max_fetched) - max_fetched
        hours_ago = elapsed.total_seconds() / 3600
        if hours_ago < 1:
            return "正常", "#1e8e3e"
        if hours_ago < 24:
            return "较新", "#1a73e8"
        return f"落后 {int(hours_ago)}h", "#f9ab00"

    def _process_cutoff_data(self, rows: list) -> None:
        """Fill the cutoff table and the "last update" card.

        Each row supplies ``table_name``, ``max_fetched_at`` and
        ``row_count``. The newest ``max_fetched_at`` over all rows is shown
        on the last-update card.
        """
        self.cutoff_table.setRowCount(len(rows))

        latest_time = None

        for row_idx, row in enumerate(rows):
            table_name = row.get("table_name", "")
            max_fetched = row.get("max_fetched_at")
            row_count = row.get("row_count", 0)

            self.cutoff_table.setItem(row_idx, 0, QTableWidgetItem(table_name))

            if max_fetched:
                fetched_text = str(max_fetched)[:19]
                if latest_time is None or max_fetched > latest_time:
                    latest_time = max_fetched
            else:
                fetched_text = "-"
            self.cutoff_table.setItem(row_idx, 1, QTableWidgetItem(fetched_text))

            self.cutoff_table.setItem(row_idx, 2, QTableWidgetItem(str(row_count)))

            status_text, status_hex = self._cutoff_status(max_fetched)
            status_item = QTableWidgetItem(status_text)
            status_item.setForeground(QColor(status_hex))
            self.cutoff_table.setItem(row_idx, 3, status_item)

        # Update the last-update card
        if latest_time:
            self.last_update_card.set_value(str(latest_time)[:16], "")
        else:
            self.last_update_card.set_value("-", "无数据")

    @staticmethod
    def _format_duration(started_at, finished_at) -> str:
        """Return the run duration as display text, or "-" when unavailable."""
        if not (started_at and finished_at):
            return "-"
        try:
            seconds = (finished_at - started_at).total_seconds()
        except (TypeError, AttributeError):
            # Operands that cannot be subtracted (e.g. naive vs. aware) or
            # whose difference is not a timedelta.
            return "-"
        if seconds < 60:
            return f"{seconds:.1f}秒"
        return f"{int(seconds // 60)}分{int(seconds % 60)}秒"

    def _process_history_data(self, rows: list) -> None:
        """Fill the run-history table and the "today's tasks" card.

        NOTE(review): the count of today's runs is taken from the rows passed
        in, and the history query is limited to 20 rows, so the card cannot
        show more than 20.
        """
        self.history_table.setRowCount(len(rows))

        today_count = 0
        today = datetime.now().date()

        for row_idx, row in enumerate(rows):
            run_id = row.get("run_id", "")
            # NULL columns arrive as None; table items need real strings.
            task_code = str(row.get("task_code") or "")
            status = str(row.get("status") or "")
            started_at = row.get("started_at")
            finished_at = row.get("finished_at")
            rows_affected = row.get("rows_affected", 0)

            # Count runs started today (by local calendar date)
            if isinstance(started_at, datetime) and _local_date(started_at) == today:
                today_count += 1

            run_id_text = str(run_id)[:8] if run_id else "-"
            self.history_table.setItem(row_idx, 0, QTableWidgetItem(run_id_text))
            self.history_table.setItem(row_idx, 1, QTableWidgetItem(task_code))

            # Status, coloured by outcome
            status_item = QTableWidgetItem(status)
            status_lower = status.lower()
            if "success" in status_lower:
                status_item.setForeground(QColor("#1e8e3e"))
            elif "fail" in status_lower or "error" in status_lower:
                status_item.setForeground(QColor("#d93025"))
            self.history_table.setItem(row_idx, 2, status_item)

            # Start time
            started_text = str(started_at)[:19] if started_at else "-"
            self.history_table.setItem(row_idx, 3, QTableWidgetItem(started_text))

            # Duration
            duration_text = self._format_duration(started_at, finished_at)
            self.history_table.setItem(row_idx, 4, QTableWidgetItem(duration_text))

            # Rows affected
            self.history_table.setItem(row_idx, 5, QTableWidgetItem(str(rows_affected or 0)))

        # Update the today's-tasks card
        self.task_count_card.set_value(str(today_count), "次执行")

    def _on_query_error(self, error: str) -> None:
        """Log a failed query and continue with the next stage of the chain.

        A stage can fail simply because its tables do not exist, so the
        failure is not treated as fatal: the remaining stages still run.

        Args:
            error: Error text reported by the DB worker.
        """
        stage = self._current_query
        if not stage:
            logger.warning("Database query failed: %s", error)
            return

        logger.warning("Status query '%s' failed: %s", stage, error)
        self._run_stage(self._NEXT_STAGE.get(stage, ""))