Files
2026-01-27 22:47:05 +08:00

193 lines
6.8 KiB
Python

# -*- coding: utf-8 -*-
"""数据库查询工作线程"""
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from PySide6.QtCore import QThread, Signal
# Put the project root (two directories above this file's folder) on the
# import path so that project packages resolve when this module is loaded.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if sys.path.count(str(PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(PROJECT_ROOT))
class DBWorker(QThread):
    """Worker thread that runs PostgreSQL operations and reports via signals.

    Each request method (``connect_db``, ``disconnect_db``, ``execute_query``,
    ``load_tables``) records one pending task and starts the thread; ``run``
    then dispatches to the matching ``_do_*`` method, which emits one of the
    signals declared below.

    NOTE(review): only a single pending task is stored. Per the Qt
    documentation ``QThread.start()`` does nothing while the thread is
    already running, so a request issued during a running task appears to be
    dropped -- confirm that callers serialize their requests.
    """

    # Query finished: (column names, rows as a list of dicts).
    query_finished = Signal(list, list)
    # A query or table listing failed: (message).
    query_error = Signal(str)
    # Connection state changed: (connected, message).
    connection_status = Signal(bool, str)
    # Table listing finished: {schema: [(table, row_count, updated_at), ...]}.
    tables_loaded = Signal(dict)

    # Schemas listed when load_tables() is called without an explicit list.
    DEFAULT_SCHEMAS = ("billiards_ods", "billiards_dwd", "billiards_dws", "etl_admin")

    # Columns probed, in order, for a table's most recent timestamp.
    _TIMESTAMP_COLUMNS = ("fetched_at", "updated_at")

    # Base tables of one schema with the row estimate from pg_stat_user_tables.
    _TABLES_SQL = """
        SELECT
            t.table_name,
            COALESCE(s.n_live_tup, 0) as row_count
        FROM information_schema.tables t
        LEFT JOIN pg_stat_user_tables s
            ON t.table_name = s.relname
            AND t.table_schema = s.schemaname
        WHERE t.table_schema = %s
            AND t.table_type = 'BASE TABLE'
        ORDER BY t.table_name
    """

    def __init__(self, parent=None):
        """Create an idle worker with no connection and no pending task."""
        super().__init__(parent)
        self.conn = None
        self._task = None
        self._task_args = None

    # ------------------------------------------------------------------
    # Request API (called from the owning thread)
    # ------------------------------------------------------------------

    def _submit(self, task: str, args: Optional[tuple]) -> None:
        """Record *task* with its arguments as the pending task and start the thread."""
        self._task = task
        self._task_args = args
        self.start()

    def connect_db(self, dsn: str):
        """Request a connection to the database described by *dsn*."""
        self._submit("connect", (dsn,))

    def disconnect_db(self):
        """Request that the current connection be closed."""
        self._submit("disconnect", None)

    def execute_query(self, sql: str, params: Optional[tuple] = None):
        """Request execution of *sql* with optional bound *params*."""
        self._submit("query", (sql, params))

    def load_tables(self, schemas: Optional[List[str]] = None):
        """Request the table listing for *schemas* (``DEFAULT_SCHEMAS`` if None)."""
        self._submit("load_tables", (schemas,))

    # ------------------------------------------------------------------
    # Thread body
    # ------------------------------------------------------------------

    def run(self):
        """Dispatch the pending task to its ``_do_*`` implementation."""
        # Snapshot once so the task name and its arguments are read together
        # rather than re-reading the attributes across the branch chain.
        task, args = self._task, self._task_args
        if task == "connect":
            self._do_connect(*args)
        elif task == "disconnect":
            self._do_disconnect()
        elif task == "query":
            self._do_query(*args)
        elif task == "load_tables":
            self._do_load_tables(*args)

    @staticmethod
    def _close_quietly(conn) -> None:
        """Close *conn* if it is set, ignoring any error raised by ``close()``."""
        if not conn:
            return
        try:
            conn.close()
        except Exception:
            # Best effort: the connection is being discarded either way.
            pass

    def _do_connect(self, dsn: str):
        """Open an autocommit connection and emit ``connection_status``.

        Emits ``(True, message)`` containing the first 50 characters of the
        server version on success, and ``(False, message)`` when psycopg2 is
        missing or the connection attempt fails. ``self.conn`` is only set
        once the connection has answered the version probe.
        """
        new_conn = None
        try:
            import psycopg2

            # Release any previous connection explicitly instead of leaving
            # it to be reclaimed when the reference is overwritten.
            self._close_quietly(self.conn)
            self.conn = None

            new_conn = psycopg2.connect(dsn, connect_timeout=10)
            new_conn.set_session(autocommit=True)
            # Probe the connection before publishing it.
            with new_conn.cursor() as cur:
                cur.execute("SELECT version()")
                version = cur.fetchone()[0]
            self.conn = new_conn
            self.connection_status.emit(True, f"已连接: {version[:50]}...")
        except ImportError:
            self.connection_status.emit(False, "缺少 psycopg2 模块,请安装: pip install psycopg2-binary")
        except Exception as e:
            # Close a half-initialised connection rather than dropping it.
            self._close_quietly(new_conn)
            self.conn = None
            self.connection_status.emit(False, f"连接失败: {e}")

    def _do_disconnect(self):
        """Close the connection (if any) and emit ``connection_status(False, ...)``."""
        self._close_quietly(self.conn)
        self.conn = None
        self.connection_status.emit(False, "已断开连接")

    def _do_query(self, sql: str, params: Optional[tuple]):
        """Run *sql* and emit ``query_finished`` or ``query_error``.

        Statements that produce a result set emit the column names and the
        rows as dicts; statements without one emit two empty lists.
        """
        if not self.conn:
            self.query_error.emit("未连接到数据库")
            return
        try:
            from psycopg2.extras import RealDictCursor

            with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(sql, params)
                # description is unset for statements without a result set.
                if cur.description:
                    columns = [desc[0] for desc in cur.description]
                    rows = [dict(row) for row in cur.fetchall()]
                    self.query_finished.emit(columns, rows)
                else:
                    self.query_finished.emit([], [])
        except Exception as e:
            self.query_error.emit(f"查询失败: {e}")

    def _latest_timestamp(self, cur, schema: str, table: str) -> Optional[str]:
        """Return the newest timestamp of *schema*.*table*, or None.

        Tries ``MAX()`` over each column in ``_TIMESTAMP_COLUMNS`` in order
        and returns the first non-empty value, truncated to 19 characters.
        Identifiers are composed with ``psycopg2.sql`` so that names
        containing quotes cannot break out of the statement.
        """
        from psycopg2 import sql as pgsql

        target = pgsql.SQL("{}.{}").format(
            pgsql.Identifier(schema), pgsql.Identifier(table)
        )
        for column in self._TIMESTAMP_COLUMNS:
            statement = pgsql.SQL("SELECT MAX({}) FROM {}").format(
                pgsql.Identifier(column), target
            )
            try:
                cur.execute(statement)
                row = cur.fetchone()
            except Exception:
                # Best effort: the table may simply lack this column. The
                # connection is opened in autocommit mode by _do_connect, so
                # a failed probe should not affect the following statements.
                continue
            if row and row[0]:
                return str(row[0])[:19]
        return None

    def _do_load_tables(self, schemas: Optional[List[str]]):
        """List the base tables of each schema and emit ``tables_loaded``.

        The payload maps each schema to a list of
        ``(table_name, row_count, updated_at)`` tuples, where ``updated_at``
        is ``"-"`` when no timestamp could be determined. Any failure emits
        ``query_error`` instead.
        """
        if not self.conn:
            self.query_error.emit("未连接到数据库")
            return
        try:
            if schemas is None:
                schemas = list(self.DEFAULT_SCHEMAS)
            result: Dict[str, List[Tuple[str, Any, str]]] = {}
            for schema in schemas:
                tables = []
                with self.conn.cursor() as cur:
                    cur.execute(self._TABLES_SQL, (schema,))
                    # fetchall() materialises the listing, so the same cursor
                    # can be reused for the per-table timestamp probes.
                    for table_name, row_count in cur.fetchall():
                        updated_at = self._latest_timestamp(cur, schema, table_name)
                        tables.append((table_name, row_count or 0, updated_at or "-"))
                result[schema] = tables
            self.tables_loaded.emit(result)
        except Exception as e:
            self.query_error.emit(f"加载表列表失败: {e}")

    def is_connected(self) -> bool:
        """Return True if a connection exists and answers ``SELECT 1``."""
        if not self.conn:
            return False
        try:
            with self.conn.cursor() as cur:
                cur.execute("SELECT 1")
            return True
        except Exception:
            return False