# -*- coding: utf-8 -*-
# Provenance: reconstructed from a whitespace-collapsed git patch
#   commit 7c7280917a2d3fc51d164a9805b88658c5df77a0
#   From: Neo, Date: Mon, 19 Jan 2026 22:53:33 +0800
#   Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=8F=97?=
#   new file: etl_billiards/gui/models/schedule_model.py
"""Data model for scheduled tasks.

Contains the schedule rule (``ScheduleConfig``), the task record
(``ScheduledTask``) and a small JSON-file backed registry (``ScheduleStore``).
All datetimes handled here are naive local times, as produced by
``datetime.now()``.
"""

import json
import logging
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, List, Dict, Any
from pathlib import Path

logger = logging.getLogger(__name__)

# How many days ahead the cron matcher looks before giving up. Eight years
# covers expressions that only match Feb 29 across a skipped leap year.
_CRON_SEARCH_DAYS = 366 * 8


class ScheduleType(Enum):
    """Kind of schedule rule."""
    ONCE = "once"          # run a single time
    INTERVAL = "interval"  # fixed interval after the previous run
    DAILY = "daily"        # every day at a fixed time
    WEEKLY = "weekly"      # selected weekdays at a fixed time
    CRON = "cron"          # five-field cron expression


class IntervalUnit(Enum):
    """Unit used by interval schedules."""
    MINUTES = "minutes"
    HOURS = "hours"
    DAYS = "days"


@dataclass
class ScheduleConfig:
    """Schedule rule for a task.

    Only the fields belonging to ``schedule_type`` are consulted when the next
    run time is computed; the others keep their defaults.
    """
    schedule_type: ScheduleType = ScheduleType.ONCE

    # Interval schedule
    interval_value: int = 1
    interval_unit: IntervalUnit = IntervalUnit.HOURS

    # Daily schedule
    daily_time: str = "04:00"  # HH:MM

    # Weekly schedule
    weekly_days: List[int] = field(default_factory=lambda: [1])  # 1-7, 1 = Monday
    weekly_time: str = "04:00"

    # Cron expression: minute hour day-of-month month day-of-week
    cron_expression: str = "0 4 * * *"

    # Common settings
    enabled: bool = True
    start_date: Optional[str] = None  # YYYY-MM-DD, first day a run may happen
    end_date: Optional[str] = None    # YYYY-MM-DD, last day a run may happen

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict (enums are stored by value)."""
        return {
            "schedule_type": self.schedule_type.value,
            "interval_value": self.interval_value,
            "interval_unit": self.interval_unit.value,
            "daily_time": self.daily_time,
            "weekly_days": self.weekly_days,
            "weekly_time": self.weekly_time,
            "cron_expression": self.cron_expression,
            "enabled": self.enabled,
            "start_date": self.start_date,
            "end_date": self.end_date,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduleConfig":
        """Build a config from a dict produced by :meth:`to_dict`.

        Missing keys fall back to the field defaults.

        Raises:
            ValueError: if ``schedule_type`` or ``interval_unit`` is not a
                known enum value.
        """
        return cls(
            schedule_type=ScheduleType(data.get("schedule_type", "once")),
            interval_value=data.get("interval_value", 1),
            interval_unit=IntervalUnit(data.get("interval_unit", "hours")),
            daily_time=data.get("daily_time", "04:00"),
            # Copy so the config never aliases the caller's list.
            weekly_days=list(data.get("weekly_days", [1])),
            weekly_time=data.get("weekly_time", "04:00"),
            cron_expression=data.get("cron_expression", "0 4 * * *"),
            enabled=data.get("enabled", True),
            start_date=data.get("start_date"),
            end_date=data.get("end_date"),
        )

    def get_description(self) -> str:
        """Return a short human-readable (Chinese) description of the rule."""
        if self.schedule_type == ScheduleType.ONCE:
            return "一次性执行"
        if self.schedule_type == ScheduleType.INTERVAL:
            unit_names = {"minutes": "分钟", "hours": "小时", "days": "天"}
            return f"每 {self.interval_value} {unit_names[self.interval_unit.value]}"
        if self.schedule_type == ScheduleType.DAILY:
            return f"每天 {self.daily_time}"
        if self.schedule_type == ScheduleType.WEEKLY:
            day_names = {1: "一", 2: "二", 3: "三", 4: "四", 5: "五", 6: "六", 7: "日"}
            # Fall back to the raw number so a bad weekday cannot raise here.
            days = "、".join(
                f"周{day_names.get(d, d)}" for d in sorted(self.weekly_days)
            )
            return f"每周 {days} {self.weekly_time}"
        if self.schedule_type == ScheduleType.CRON:
            return f"Cron: {self.cron_expression}"
        return "未知"

    def get_next_run_time(self, last_run: Optional[datetime] = None) -> Optional[datetime]:
        """Compute the next run time.

        Args:
            last_run: when the task last ran, or ``None`` if it never ran.

        Returns:
            The next run time as a naive local datetime, or ``None`` when no
            further run exists (one-shot already ran, schedule window is over,
            or the rule cannot be evaluated). An interval schedule may return
            a time in the past, meaning the task is overdue.

        Raises:
            ValueError: if ``start_date``/``end_date`` are not ``YYYY-MM-DD``
                or a daily/weekly time is not ``HH:MM``.
        """
        now = datetime.now()
        earliest = now    # first instant a run may be scheduled at
        threshold = now   # calendar candidates must be strictly after this
        start_pending = False

        if self.start_date:
            start = datetime.strptime(self.start_date, "%Y-%m-%d")
            if now < start:
                start_pending = True
                earliest = start
                # Step back one tick so a run at exactly 00:00 of the start
                # day still satisfies the strict "later than" comparisons.
                threshold = start - timedelta(microseconds=1)

        end = None
        if self.end_date:
            # Exclusive upper bound: midnight after the end day.
            end = datetime.strptime(self.end_date, "%Y-%m-%d") + timedelta(days=1)
            if earliest >= end:
                return None

        candidate = self._next_candidate(earliest, threshold, last_run)
        if candidate is None:
            return None
        if start_pending and candidate < earliest:
            # Interval arithmetic may land before the window opens.
            candidate = earliest
        if end is not None and candidate >= end:
            # Never schedule beyond the end of the window.
            return None
        return candidate

    def _next_candidate(
        self,
        earliest: datetime,
        threshold: datetime,
        last_run: Optional[datetime],
    ) -> Optional[datetime]:
        """Return the next run for the rule, ignoring the end-date bound."""
        if self.schedule_type == ScheduleType.ONCE:
            return None if last_run else earliest

        if self.schedule_type == ScheduleType.INTERVAL:
            if not last_run:
                return earliest
            if self.interval_unit == IntervalUnit.MINUTES:
                delta = timedelta(minutes=self.interval_value)
            elif self.interval_unit == IntervalUnit.HOURS:
                delta = timedelta(hours=self.interval_value)
            else:
                delta = timedelta(days=self.interval_value)
            return last_run + delta

        if self.schedule_type == ScheduleType.DAILY:
            hour, minute = map(int, self.daily_time.split(":"))
            candidate = threshold.replace(hour=hour, minute=minute, second=0, microsecond=0)
            if candidate <= threshold:
                candidate += timedelta(days=1)
            return candidate

        if self.schedule_type == ScheduleType.WEEKLY:
            hour, minute = map(int, self.weekly_time.split(":"))
            # Eight days: today's slot may already have passed, in which case
            # the same weekday next week is the answer.
            for offset in range(8):
                day = threshold + timedelta(days=offset)
                if day.isoweekday() in self.weekly_days:  # 1-7, 1 = Monday
                    candidate = day.replace(hour=hour, minute=minute, second=0, microsecond=0)
                    if candidate > threshold:
                        return candidate
            return None

        if self.schedule_type == ScheduleType.CRON:
            try:
                return self._parse_simple_cron(threshold)
            except (ValueError, OverflowError):
                # Date arithmetic near datetime.max; treat as "no next run".
                return None

        return None

    @staticmethod
    def _expand_cron_field(expr: str, low: int, high: int) -> Optional[set]:
        """Expand one cron field into the set of integers it matches.

        Supports ``*``, ``N``, ``A-B``, comma lists and ``/step`` suffixes.
        Returns ``None`` for anything else (names, out-of-range values, ...).
        """
        values = set()
        for part in expr.split(","):
            base, slash, step_text = part.partition("/")
            if slash:
                if not step_text.isdecimal() or int(step_text) == 0:
                    return None
                step = int(step_text)
            else:
                step = 1

            if base == "*":
                first, last = low, high
            elif "-" in base:
                lo_text, _, hi_text = base.partition("-")
                if not (lo_text.isdecimal() and hi_text.isdecimal()):
                    return None
                first, last = int(lo_text), int(hi_text)
            elif base.isdecimal():
                first = int(base)
                # "N/step" means "from N to the maximum, every step".
                last = high if slash else first
            else:
                return None

            if first < low or last > high or first > last:
                return None
            values.update(range(first, last + 1, step))
        return values

    def _parse_simple_cron(self, now: datetime) -> Optional[datetime]:
        """Return the first time strictly after ``now`` matching the cron rule.

        Implements the numeric subset of five-field cron. As in classic cron,
        when both day-of-month and day-of-week are restricted a day matches if
        either field matches; otherwise both must match. Day-of-week uses
        0-7 with 0 and 7 meaning Sunday.

        Returns ``None`` when the expression is malformed, unsupported, or has
        no match within ``_CRON_SEARCH_DAYS`` days.
        """
        parts = self.cron_expression.split()
        if len(parts) != 5:
            return None
        minute_f, hour_f, dom_f, month_f, dow_f = parts

        minutes = self._expand_cron_field(minute_f, 0, 59)
        hours = self._expand_cron_field(hour_f, 0, 23)
        month_days = self._expand_cron_field(dom_f, 1, 31)
        months = self._expand_cron_field(month_f, 1, 12)
        week_days = self._expand_cron_field(dow_f, 0, 7)
        if any(v is None for v in (minutes, hours, month_days, months, week_days)):
            return None

        week_days = {d % 7 for d in week_days}  # fold 7 (Sunday) onto 0
        either_day_field = not (dom_f.startswith("*") or dow_f.startswith("*"))
        ordered_hours = sorted(hours)
        ordered_minutes = sorted(minutes)

        day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        for offset in range(_CRON_SEARCH_DAYS):
            day = day_start + timedelta(days=offset)
            if day.month not in months:
                continue
            dom_ok = day.day in month_days
            dow_ok = day.isoweekday() % 7 in week_days  # Sunday -> 0
            day_ok = (dom_ok or dow_ok) if either_day_field else (dom_ok and dow_ok)
            if not day_ok:
                continue
            for hour in ordered_hours:
                for minute in ordered_minutes:
                    candidate = day.replace(hour=hour, minute=minute)
                    if candidate > now:
                        return candidate
        return None


@dataclass
class ScheduledTask:
    """A named group of task codes together with its schedule and run state."""
    id: str
    name: str
    task_codes: List[str]
    schedule: ScheduleConfig
    task_config: Dict[str, Any] = field(default_factory=dict)

    # Run state
    enabled: bool = True
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    last_status: str = ""

    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict (datetimes as ISO-8601 strings)."""
        return {
            "id": self.id,
            "name": self.name,
            "task_codes": self.task_codes,
            "schedule": self.schedule.to_dict(),
            "task_config": self.task_config,
            "enabled": self.enabled,
            "last_run": self.last_run.isoformat() if self.last_run else None,
            "next_run": self.next_run.isoformat() if self.next_run else None,
            "run_count": self.run_count,
            "last_status": self.last_status,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ScheduledTask":
        """Build a task from a dict produced by :meth:`to_dict`.

        Raises:
            KeyError: if ``id``, ``name`` or ``task_codes`` is missing.
            ValueError: if a timestamp or the nested schedule is invalid.
        """
        def parse_time(key: str) -> Optional[datetime]:
            raw = data.get(key)
            return datetime.fromisoformat(raw) if raw else None

        return cls(
            id=data["id"],
            name=data["name"],
            task_codes=data["task_codes"],
            schedule=ScheduleConfig.from_dict(data.get("schedule", {})),
            task_config=data.get("task_config", {}),
            enabled=data.get("enabled", True),
            last_run=parse_time("last_run"),
            next_run=parse_time("next_run"),
            run_count=data.get("run_count", 0),
            last_status=data.get("last_status", ""),
            created_at=parse_time("created_at") or datetime.now(),
            updated_at=parse_time("updated_at") or datetime.now(),
        )

    def update_next_run(self):
        """Recompute ``next_run`` from the schedule and touch ``updated_at``.

        ``next_run`` is cleared when either the task or its schedule is
        disabled.
        """
        if self.enabled and self.schedule.enabled:
            self.next_run = self.schedule.get_next_run_time(self.last_run)
        else:
            self.next_run = None
        self.updated_at = datetime.now()


class ScheduleStore:
    """Registry of scheduled tasks persisted to a single JSON file."""

    def __init__(self, storage_path: Optional[Path] = None):
        """Open the store and load existing tasks.

        Args:
            storage_path: JSON file to use. Defaults to
                ``scheduled_tasks.json`` two directories above this module.
        """
        if storage_path is None:
            storage_path = Path(__file__).resolve().parents[2] / "scheduled_tasks.json"
        self.storage_path = Path(storage_path)
        self.tasks: Dict[str, ScheduledTask] = {}
        self.load()

    def load(self):
        """Load tasks from disk.

        A missing file leaves the current tasks untouched. An unreadable file
        yields an empty store. A single malformed record is skipped (and
        logged) instead of discarding every other task.
        """
        if not self.storage_path.exists():
            return

        try:
            data = json.loads(self.storage_path.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            logger.warning("Cannot read schedule file %s: %s", self.storage_path, exc)
            self.tasks = {}
            return

        raw_tasks = data.get("tasks", {}) if isinstance(data, dict) else {}
        if not isinstance(raw_tasks, dict):
            raw_tasks = {}

        loaded: Dict[str, ScheduledTask] = {}
        for task_id, task_data in raw_tasks.items():
            try:
                loaded[task_id] = ScheduledTask.from_dict(task_data)
            except (KeyError, TypeError, ValueError, AttributeError) as exc:
                logger.warning("Skipping invalid scheduled task %r: %r", task_id, exc)
        self.tasks = loaded

    def save(self):
        """Write all tasks to disk.

        The payload is written to a sibling temp file and then moved over the
        target, so an interrupted write cannot leave a truncated store.
        """
        data = {
            "tasks": {
                task_id: task.to_dict()
                for task_id, task in self.tasks.items()
            }
        }
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.storage_path.with_name(self.storage_path.name + ".tmp")
        tmp_path.write_text(payload, encoding="utf-8")
        tmp_path.replace(self.storage_path)

    def add_task(self, task: ScheduledTask):
        """Register ``task`` (replacing any task with the same id) and save."""
        task.update_next_run()
        self.tasks[task.id] = task
        self.save()

    def remove_task(self, task_id: str):
        """Delete the task with ``task_id`` and save; unknown ids are ignored."""
        if task_id in self.tasks:
            del self.tasks[task_id]
            self.save()

    def update_task(self, task: ScheduledTask):
        """Recompute the task's next run, store it under its id and save."""
        task.update_next_run()
        task.updated_at = datetime.now()
        self.tasks[task.id] = task
        self.save()

    def get_task(self, task_id: str) -> Optional[ScheduledTask]:
        """Return the task with ``task_id`` or ``None``."""
        return self.tasks.get(task_id)

    def get_all_tasks(self) -> List[ScheduledTask]:
        """Return all tasks as a new list."""
        return list(self.tasks.values())

    def get_due_tasks(self) -> List[ScheduledTask]:
        """Return enabled tasks whose ``next_run`` is now or in the past."""
        now = datetime.now()
        return [
            task
            for task in self.tasks.values()
            if task.enabled and task.next_run and task.next_run <= now
        ]
# -*- coding: utf-8 -*-
# Provenance: reconstructed from a whitespace-collapsed git patch
#   commit 7c7280917a2d3fc51d164a9805b88658c5df77a0
#   new file: etl_billiards/gui/widgets/status_panel.py
# The physical line wrap of the collapsed patch falls inside this file's
# import section, so the complete import list is restated here.
"""ETL status panel: table counts, ODS cutoff freshness and recent runs."""

import logging
from datetime import datetime
from typing import Dict, List, Optional, Any

from PySide6.QtWidgets import (
    QWidget, QVBoxLayout, QHBoxLayout, QGridLayout,
    QGroupBox, QLabel, QPushButton, QTableWidget, QTableWidgetItem,
    QHeaderView, QFrame, QScrollArea, QMessageBox
)
from PySide6.QtCore import Qt, Signal, QTimer
from PySide6.QtGui import QColor

from ..workers.db_worker import DBWorker
from ..utils.config_helper import ConfigHelper

logger = logging.getLogger(__name__)


class StatusCard(QFrame):
    """Small card showing a title, a prominent value and a description."""

    def __init__(self, title: str, parent=None):
        super().__init__(parent)
        self.setProperty("card", True)
        self.setFrameShape(QFrame.StyledPanel)

        layout = QVBoxLayout(self)
        layout.setContentsMargins(16, 12, 16, 12)
        layout.setSpacing(8)

        # Title
        self.title_label = QLabel(title)
        self.title_label.setProperty("subheading", True)
        layout.addWidget(self.title_label)

        # Value
        self.value_label = QLabel("-")
        self.value_label.setStyleSheet("font-size: 24px; font-weight: bold;")
        layout.addWidget(self.value_label)

        # Description
        self.desc_label = QLabel("")
        self.desc_label.setProperty("subheading", True)
        layout.addWidget(self.desc_label)

    def set_value(self, value: str, description: str = "", status: str = ""):
        """Update the card text.

        Args:
            value: text for the large value label.
            description: text for the line below the value.
            status: when non-empty, stored as the ``status`` dynamic property
                of the value label; the style is re-polished so property
                selectors take effect. An empty status leaves the previous
                property untouched.
        """
        self.value_label.setText(value)
        self.desc_label.setText(description)

        if status:
            self.value_label.setProperty("status", status)
            self.value_label.style().unpolish(self.value_label)
            self.value_label.style().polish(self.value_label)


class StatusPanel(QWidget):
    """ETL status panel.

    Data is loaded as a chain of three queries issued through ``DBWorker``
    (table counts -> ODS cutoff -> run history). ``_current_query`` records
    which step is in flight so result and error callbacks can be routed.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.config_helper = ConfigHelper()
        self.db_worker = DBWorker(self)
        self._connected = False
        # Name of the query step currently in flight ("" when idle).
        self._current_query = ""

        self._init_ui()
        self._connect_signals()

        # Periodic refresh; started/stopped by the auto-refresh toggle.
        self.refresh_timer = QTimer(self)
        self.refresh_timer.timeout.connect(self._auto_refresh)

    def _init_ui(self):
        """Build the widget tree."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(16, 16, 16, 16)
        layout.setSpacing(16)

        # Title and buttons
        header_layout = QHBoxLayout()

        title = QLabel("ETL 状态")
        title.setProperty("heading", True)
        header_layout.addWidget(title)

        header_layout.addStretch()

        self.auto_refresh_btn = QPushButton("自动刷新: 关")
        self.auto_refresh_btn.setProperty("secondary", True)
        self.auto_refresh_btn.setCheckable(True)
        header_layout.addWidget(self.auto_refresh_btn)

        self.refresh_btn = QPushButton("刷新")
        self.refresh_btn.clicked.connect(self._refresh_all)
        header_layout.addWidget(self.refresh_btn)

        layout.addLayout(header_layout)

        # Connection status
        self.conn_status_label = QLabel("数据库: 未连接")
        self.conn_status_label.setProperty("status", "warning")
        layout.addWidget(self.conn_status_label)

        # Scroll area
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setFrameShape(QFrame.NoFrame)
        layout.addWidget(scroll_area, 1)

        # Content container
        content_widget = QWidget()
        content_layout = QVBoxLayout(content_widget)
        content_layout.setSpacing(16)

        # Overview cards
        cards_layout = QHBoxLayout()

        self.ods_card = StatusCard("ODS 表数量")
        cards_layout.addWidget(self.ods_card)

        self.dwd_card = StatusCard("DWD 表数量")
        cards_layout.addWidget(self.dwd_card)

        self.last_update_card = StatusCard("最后更新")
        cards_layout.addWidget(self.last_update_card)

        self.task_count_card = StatusCard("今日任务")
        cards_layout.addWidget(self.task_count_card)

        content_layout.addLayout(cards_layout)

        # ODS cutoff status
        cutoff_group = QGroupBox("ODS Cutoff 状态")
        cutoff_layout = QVBoxLayout(cutoff_group)

        self.cutoff_table = QTableWidget()
        self.cutoff_table.setColumnCount(4)
        self.cutoff_table.setHorizontalHeaderLabels(["表名", "最新 fetched_at", "行数", "状态"])
        self.cutoff_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.Stretch)
        self.cutoff_table.setMaximumHeight(250)
        cutoff_layout.addWidget(self.cutoff_table)

        content_layout.addWidget(cutoff_group)

        # Recent run history
        history_group = QGroupBox("最近运行记录")
        history_layout = QVBoxLayout(history_group)

        self.history_table = QTableWidget()
        self.history_table.setColumnCount(6)
        self.history_table.setHorizontalHeaderLabels(["运行ID", "任务", "状态", "开始时间", "耗时", "影响行数"])
        self.history_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.Stretch)
        self.history_table.setMaximumHeight(250)
        history_layout.addWidget(self.history_table)

        content_layout.addWidget(history_group)

        # Stretch so the groups stay at the top
        content_layout.addStretch()

        scroll_area.setWidget(content_widget)

    def _connect_signals(self):
        """Wire button and worker signals to their handlers."""
        self.auto_refresh_btn.toggled.connect(self._toggle_auto_refresh)
        self.db_worker.connection_status.connect(self._on_connection_status)
        self.db_worker.query_finished.connect(self._on_query_finished)
        self.db_worker.query_error.connect(self._on_query_error)

    def _toggle_auto_refresh(self, checked: bool):
        """Start or stop the 30-second refresh timer."""
        if checked:
            self.auto_refresh_btn.setText("自动刷新: 开")
            self.refresh_timer.start(30000)  # refresh every 30 seconds
            self._refresh_all()
        else:
            self.auto_refresh_btn.setText("自动刷新: 关")
            self.refresh_timer.stop()

    def _auto_refresh(self):
        """Timer tick: refresh only while connected."""
        if self._connected:
            self._refresh_all()

    def _refresh_all(self):
        """Refresh everything, connecting to the database first if needed."""
        if self._connected:
            self._load_status_data()
            return

        # Not connected yet: data loading continues from the
        # connection-status callback once the worker reports success.
        env_vars = self.config_helper.load_env()
        dsn = env_vars.get("PG_DSN", "")
        if dsn:
            self.db_worker.connect_db(dsn)
        else:
            self.conn_status_label.setText("数据库: 未配置 DSN")

    def _on_connection_status(self, connected: bool, message: str):
        """Show the connection result and start loading data on success."""
        self._connected = connected

        if connected:
            self.conn_status_label.setText("数据库: 已连接")
            self.conn_status_label.setProperty("status", "success")
            self._load_status_data()
        else:
            self.conn_status_label.setText(f"数据库: {message}")
            self.conn_status_label.setProperty("status", "error")

        # Re-polish so the changed dynamic property is picked up by the style.
        self.conn_status_label.style().unpolish(self.conn_status_label)
        self.conn_status_label.style().polish(self.conn_status_label)

    def _load_status_data(self):
        """Start the query chain with the per-schema table counts."""
        self._current_query = "table_count"
        self.db_worker.execute_query("""
            SELECT
                table_schema,
                COUNT(*) as table_count
            FROM information_schema.tables
            WHERE table_schema IN ('billiards_ods', 'billiards_dwd', 'billiards_dws')
            GROUP BY table_schema
        """)

    def _load_cutoff_data(self):
        """Second step of the chain: latest fetched_at and row count per ODS table."""
        self._current_query = "cutoff"
        self.db_worker.execute_query("""
            SELECT
                'payment_transactions' AS table_name,
                MAX(fetched_at) AS max_fetched_at,
                COUNT(*) AS row_count
            FROM billiards_ods.payment_transactions
            UNION ALL
            SELECT 'member_profiles', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.member_profiles
            UNION ALL
            SELECT 'settlement_records', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.settlement_records
            UNION ALL
            SELECT 'recharge_settlements', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.recharge_settlements
            UNION ALL
            SELECT 'assistant_service_records', MAX(fetched_at), COUNT(*)
            FROM billiards_ods.assistant_service_records
            ORDER BY table_name
        """)

    def _load_history_data(self):
        """Last step of the chain: the 20 most recent ETL runs."""
        self._current_query = "history"
        self.db_worker.execute_query("""
            SELECT
                run_id,
                task_code,
                status,
                started_at,
                finished_at,
                rows_affected
            FROM etl_admin.run_tracker
            ORDER BY started_at DESC
            LIMIT 20
        """)

    def _on_query_finished(self, columns: list, rows: list):
        """Route a query result to its processor and issue the next query.

        NOTE(review): rows are read with ``row.get(...)``, so they are
        presumably dict-like records keyed by column name — confirm against
        ``DBWorker``.
        """
        query_type = self._current_query

        if query_type == "table_count":
            self._process_table_count(rows)
            self._load_cutoff_data()
        elif query_type == "cutoff":
            self._process_cutoff_data(rows)
            self._load_history_data()
        elif query_type == "history":
            self._process_history_data(rows)
            self._current_query = ""

    def _process_table_count(self, rows: list):
        """Fill the ODS/DWD table-count cards."""
        ods_count = 0
        dwd_count = 0

        for row in rows:
            schema = row.get("table_schema", "")
            count = row.get("table_count", 0)

            if schema == "billiards_ods":
                ods_count = count
            elif schema == "billiards_dwd":
                dwd_count = count

        self.ods_card.set_value(str(ods_count), "个表")
        self.dwd_card.set_value(str(dwd_count), "个表")

    def _process_cutoff_data(self, rows: list):
        """Fill the cutoff table and the "last update" card."""
        self.cutoff_table.setRowCount(len(rows))

        latest_time = None

        for row_idx, row in enumerate(rows):
            table_name = row.get("table_name", "")
            max_fetched = row.get("max_fetched_at")
            row_count = row.get("row_count", 0)

            self.cutoff_table.setItem(row_idx, 0, QTableWidgetItem(table_name))

            if max_fetched:
                time_str = str(max_fetched)[:19]
                self.cutoff_table.setItem(row_idx, 1, QTableWidgetItem(time_str))

                # Track the newest timestamp across all tables
                if latest_time is None or max_fetched > latest_time:
                    latest_time = max_fetched

                # Freshness status
                if isinstance(max_fetched, datetime):
                    # Take "now" in the timestamp's own timezone: subtracting
                    # an aware value from a naive one raises TypeError.
                    now = datetime.now(tz=max_fetched.tzinfo)
                    hours_ago = (now - max_fetched).total_seconds() / 3600
                    if hours_ago < 1:
                        status = "正常"
                        status_color = QColor("#1e8e3e")
                    elif hours_ago < 24:
                        status = "较新"
                        status_color = QColor("#1a73e8")
                    else:
                        status = f"落后 {int(hours_ago)}h"
                        status_color = QColor("#f9ab00")
                else:
                    status = "-"
                    status_color = QColor("#9aa0a6")
            else:
                self.cutoff_table.setItem(row_idx, 1, QTableWidgetItem("-"))
                status = "无数据"
                status_color = QColor("#d93025")

            self.cutoff_table.setItem(row_idx, 2, QTableWidgetItem(str(row_count)))

            status_item = QTableWidgetItem(status)
            status_item.setForeground(status_color)
            self.cutoff_table.setItem(row_idx, 3, status_item)

        # Update the "last update" card
        if latest_time:
            time_str = str(latest_time)[:16]
            self.last_update_card.set_value(time_str, "")
        else:
            self.last_update_card.set_value("-", "无数据")

    def _process_history_data(self, rows: list):
        """Fill the run-history table and the "runs today" card."""
        self.history_table.setRowCount(len(rows))

        today_count = 0
        today = datetime.now().date()

        for row_idx, row in enumerate(rows):
            run_id = row.get("run_id", "")
            # NULL columns arrive as None; normalise before building items.
            task_code = row.get("task_code") or ""
            status = row.get("status") or ""
            started_at = row.get("started_at")
            finished_at = row.get("finished_at")
            rows_affected = row.get("rows_affected", 0)

            # Count runs started today
            if isinstance(started_at, datetime) and started_at.date() == today:
                today_count += 1

            self.history_table.setItem(row_idx, 0, QTableWidgetItem(str(run_id)[:8] if run_id else "-"))
            self.history_table.setItem(row_idx, 1, QTableWidgetItem(str(task_code)))

            # Status, coloured by outcome
            status_text = str(status)
            status_item = QTableWidgetItem(status_text)
            lowered = status_text.lower()
            if "success" in lowered:
                status_item.setForeground(QColor("#1e8e3e"))
            elif "fail" in lowered or "error" in lowered:
                status_item.setForeground(QColor("#d93025"))
            self.history_table.setItem(row_idx, 2, status_item)

            # Start time
            time_str = str(started_at)[:19] if started_at else "-"
            self.history_table.setItem(row_idx, 3, QTableWidgetItem(time_str))

            # Duration
            duration_str = "-"
            if started_at and finished_at:
                try:
                    duration = (finished_at - started_at).total_seconds()
                except (TypeError, AttributeError):
                    # Values that cannot be subtracted (e.g. strings or mixed
                    # naive/aware timestamps): leave the placeholder.
                    pass
                else:
                    if duration < 60:
                        duration_str = f"{duration:.1f}秒"
                    else:
                        duration_str = f"{int(duration // 60)}分{int(duration % 60)}秒"
            self.history_table.setItem(row_idx, 4, QTableWidgetItem(duration_str))

            # Rows affected
            self.history_table.setItem(row_idx, 5, QTableWidgetItem(str(rows_affected or 0)))

        # Update the "runs today" card
        self.task_count_card.set_value(str(today_count), "次执行")

    def _on_query_error(self, error: str):
        """Log a failed query and continue with the next step of the chain.

        A failure is most likely a missing table, so it must not prevent the
        remaining sections from loading.
        NOTE(review): assumes ``DBWorker`` accepts a new query right after
        emitting ``query_error`` — confirm.
        """
        failed = self._current_query
        logger.warning("Status query %r failed: %s", failed, error)

        if failed == "table_count":
            self._load_cutoff_data()
        elif failed == "cutoff":
            self._load_history_data()
        else:
            self._current_query = ""