193 lines
6.8 KiB
Python
193 lines
6.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""数据库查询工作线程"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
|
|
from PySide6.QtCore import QThread, Signal
|
|
|
|
# Add the project root to the import path.
# PROJECT_ROOT is the third ancestor of this file (parents[2]: two levels
# above the directory that contains it). It is pushed to the front of
# sys.path, once, so it takes precedence over later entries.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
|
|
class DBWorker(QThread):
    """Worker thread that runs PostgreSQL operations off the calling thread.

    The worker executes one task per thread run. The public methods
    (``connect_db``, ``disconnect_db``, ``execute_query``, ``load_tables``)
    record the requested task and start the thread; ``run`` then dispatches
    to the matching ``_do_*`` method, which reports its outcome through the
    signals declared below.

    Because ``QThread.start()`` does nothing while the thread is already
    running, a request made while a task is still in flight cannot be
    executed. Such a request is rejected and reported on ``query_error``
    instead of being dropped silently.
    """

    # Signals
    query_finished = Signal(list, list)    # query done: (columns, rows)
    query_error = Signal(str)              # query / table-loading / busy error message
    connection_status = Signal(bool, str)  # connection state: (connected, message)
    tables_loaded = Signal(dict)           # {schema: [(table, rows, updated_at), ...]}

    # Schemas listed by load_tables() when the caller passes no explicit list.
    _DEFAULT_SCHEMAS = ("billiards_ods", "billiards_dwd", "billiards_dws", "etl_admin")

    # Columns probed, in priority order, for a table's "last updated" value.
    _TIMESTAMP_COLUMNS = ("fetched_at", "updated_at")

    # Lists the base tables of one schema together with the row count taken
    # from pg_stat_user_tables.n_live_tup (0 when no statistics row exists).
    _TABLES_SQL = """
        SELECT
            t.table_name,
            COALESCE(s.n_live_tup, 0) as row_count
        FROM information_schema.tables t
        LEFT JOIN pg_stat_user_tables s
            ON t.table_name = s.relname
            AND t.table_schema = s.schemaname
        WHERE t.table_schema = %s
            AND t.table_type = 'BASE TABLE'
        ORDER BY t.table_name
    """

    def __init__(self, parent=None):
        """Create an idle worker with no connection and no pending task."""
        super().__init__(parent)
        self.conn = None        # psycopg2 connection, or None when disconnected
        self._task = None       # name of the task the next run() will execute
        self._task_args = None  # positional arguments for that task, or None

    # ------------------------------------------------------------------
    # Public API: each method schedules one task and starts the thread.
    # ------------------------------------------------------------------

    def connect_db(self, dsn: str):
        """Schedule a connection attempt using the given DSN."""
        self._submit("connect", (dsn,))

    def disconnect_db(self):
        """Schedule closing the current connection."""
        self._submit("disconnect", None)

    def execute_query(self, sql: str, params: Optional[tuple] = None):
        """Schedule execution of ``sql`` with optional bound ``params``."""
        self._submit("query", (sql, params))

    def load_tables(self, schemas: Optional[List[str]] = None):
        """Schedule loading the table list for ``schemas``.

        When ``schemas`` is None the built-in default schema list is used.
        """
        self._submit("load_tables", (schemas,))

    def is_connected(self) -> bool:
        """Return True if a connection exists and answers ``SELECT 1``.

        Any failure while probing the connection is reported as False.
        """
        if not self.conn:
            return False
        try:
            with self.conn.cursor() as cur:
                cur.execute("SELECT 1")
            return True
        except Exception:
            return False

    # ------------------------------------------------------------------
    # Thread plumbing
    # ------------------------------------------------------------------

    def _submit(self, task: str, args: Optional[tuple]) -> None:
        """Record ``task`` and start the thread, unless a task is in flight.

        start() is a no-op on a running thread, so overwriting the pending
        task here would lose the new request and could corrupt the one that
        is executing. The request is rejected and surfaced on query_error.
        """
        if self.isRunning():
            self.query_error.emit("上一个数据库任务仍在执行,请稍后重试")
            return
        self._task = task
        self._task_args = args
        self.start()

    def run(self):
        """Execute the scheduled task (thread entry point)."""
        # Snapshot the request once so the dispatch works on a consistent pair.
        task, args = self._task, self._task_args
        handlers = {
            "connect": self._do_connect,
            "disconnect": self._do_disconnect,
            "query": self._do_query,
            "load_tables": self._do_load_tables,
        }
        handler = handlers.get(task)
        if handler is not None:
            handler(*(args or ()))

    @staticmethod
    def _close_quietly(conn) -> None:
        """Close ``conn`` if it is set, ignoring any error raised by close()."""
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            # Best effort: the connection is being discarded either way.
            pass

    # ------------------------------------------------------------------
    # Task implementations (run inside the worker thread)
    # ------------------------------------------------------------------

    def _do_connect(self, dsn: str):
        """Open a connection and emit ``connection_status`` with the outcome.

        On success the connection is stored in ``self.conn`` in autocommit
        mode and the first 50 characters of the server version are reported.
        On failure ``self.conn`` is left as None.
        """
        try:
            import psycopg2
        except ImportError:
            self.connection_status.emit(False, "缺少 psycopg2 模块,请安装: pip install psycopg2-binary")
            return

        # A (re)connect always replaces the previous connection, so release
        # it explicitly instead of leaking it when the reference is dropped.
        self._close_quietly(self.conn)
        self.conn = None

        new_conn = None
        try:
            new_conn = psycopg2.connect(dsn, connect_timeout=10)
            new_conn.set_session(autocommit=True)

            # Verify that the connection is usable.
            with new_conn.cursor() as cur:
                cur.execute("SELECT version()")
                version = cur.fetchone()[0]
        except Exception as e:
            # The socket may already be open (e.g. the probe failed): close it.
            self._close_quietly(new_conn)
            self.connection_status.emit(False, f"连接失败: {e}")
            return

        self.conn = new_conn
        self.connection_status.emit(True, f"已连接: {version[:50]}...")

    def _do_disconnect(self):
        """Close the connection (if any) and emit a disconnected status."""
        self._close_quietly(self.conn)
        self.conn = None
        self.connection_status.emit(False, "已断开连接")

    def _do_query(self, sql: str, params: Optional[tuple]):
        """Run ``sql`` and emit ``query_finished`` or ``query_error``.

        Statements that produce a result set emit the column names and the
        rows as plain dicts; statements without a result set emit two empty
        lists.
        """
        if not self.conn:
            self.query_error.emit("未连接到数据库")
            return

        try:
            from psycopg2.extras import RealDictCursor

            with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute(sql, params)

                # cur.description is None when the statement returned no rows set.
                if cur.description:
                    columns = [desc[0] for desc in cur.description]
                    rows = [dict(row) for row in cur.fetchall()]
                    self.query_finished.emit(columns, rows)
                else:
                    self.query_finished.emit([], [])
        except Exception as e:
            self.query_error.emit(f"查询失败: {e}")

    def _latest_timestamp(self, cur, schema: str, table_name: str) -> Optional[str]:
        """Return the newest timestamp of a table as text, or None.

        Probes each column in ``_TIMESTAMP_COLUMNS`` in order and returns the
        first non-empty ``MAX(column)`` truncated to 19 characters. A probe
        that raises (for instance because the column does not exist) is
        skipped.
        """
        from psycopg2 import sql as pgsql

        # Compose identifiers through psycopg2.sql so names containing quotes
        # or other special characters are escaped correctly.
        qualified = pgsql.SQL(".").join(
            [pgsql.Identifier(schema), pgsql.Identifier(table_name)]
        )
        for column in self._TIMESTAMP_COLUMNS:
            statement = pgsql.SQL("SELECT MAX({column}) FROM {table}").format(
                column=pgsql.Identifier(column),
                table=qualified,
            )
            try:
                cur.execute(statement)
                found = cur.fetchone()
            except Exception:
                # Deliberate best effort: try the next candidate column.
                continue
            if found and found[0]:
                return str(found[0])[:19]
        return None

    def _do_load_tables(self, schemas: Optional[List[str]]):
        """Collect table metadata per schema and emit ``tables_loaded``.

        The emitted dict maps each schema name to a list of
        ``(table_name, row_count, updated_at)`` tuples, where ``updated_at``
        is ``"-"`` when no timestamp could be determined. Any failure emits
        ``query_error`` instead.
        """
        if not self.conn:
            self.query_error.emit("未连接到数据库")
            return

        try:
            if schemas is None:
                schemas = list(self._DEFAULT_SCHEMAS)

            result = {}
            for schema in schemas:
                tables = []
                with self.conn.cursor() as cur:
                    cur.execute(self._TABLES_SQL, (schema,))
                    # fetchall() materialises the listing first, so the same
                    # cursor can be reused for the per-table probes below.
                    for listing in cur.fetchall():
                        table_name = listing[0]
                        row_count = listing[1] or 0
                        updated_at = self._latest_timestamp(cur, schema, table_name)
                        tables.append((table_name, row_count, updated_at or "-"))
                result[schema] = tables

            self.tables_loaded.emit(result)
        except Exception as e:
            self.query_error.emit(f"加载表列表失败: {e}")