Files
Neo-ZQYY/apps/backend/app/routers/admin_db_health.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

165 lines
4.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""管理端 — 数据库健康监控 API
提供 1 个端点:
- GET /api/admin/db-health — 返回 4 个数据库的健康状态
遍历 etl_feiqiu / test_etl_feiqiu / zqyy_app / test_zqyy_app
对每个库执行诊断 SQL(连接池、大小、慢查询)。
连接失败时返回 disconnected 状态,不抛出 HTTP 错误。
需求: 6.1, 6.2, 6.3, 6.4
"""
from __future__ import annotations
import logging
import os
import psycopg2
from fastapi import APIRouter, Depends
from app.auth.dependencies import CurrentUser, get_current_user
from app.config import (
DB_HOST,
DB_PASSWORD,
DB_PORT,
DB_USER,
ETL_DB_HOST,
ETL_DB_PASSWORD,
ETL_DB_PORT,
ETL_DB_USER,
)
from app.schemas.admin_db_health import DbHealthItem
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the admin database-health endpoint; every route declared on it
# is served under the /api/admin/db-health prefix.
router = APIRouter(prefix="/api/admin/db-health", tags=["系统管理"])
# Connection parameters for the four monitored databases: the business
# database (production / test) followed by the ETL database (production /
# test). "db_name" is the label reported in the health item and "dbname" is
# the database actually connected to; both carry the same value.
DB_CONFIGS: list[dict] = [
    {
        "db_name": name,
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "dbname": name,
    }
    for name, host, port, user, password in (
        ("zqyy_app", DB_HOST, DB_PORT, DB_USER, DB_PASSWORD),
        ("test_zqyy_app", DB_HOST, DB_PORT, DB_USER, DB_PASSWORD),
        ("etl_feiqiu", ETL_DB_HOST, ETL_DB_PORT, ETL_DB_USER, ETL_DB_PASSWORD),
        ("test_etl_feiqiu", ETL_DB_HOST, ETL_DB_PORT, ETL_DB_USER, ETL_DB_PASSWORD),
    )
]
# Diagnostic SQL: connection counts for the current database.
# Returns one row with two columns, in this order: the number of
# pg_stat_activity sessions in state 'active' and the number in state 'idle',
# restricted to sessions whose datname is current_database().
# NOTE(review): presumably the health-check session itself is included in the
# 'active' count — confirm if the number is used for alerting.
_SQL_CONNECTIONS = """
SELECT
count(*) FILTER (WHERE state = 'active') AS active_connections,
count(*) FILTER (WHERE state = 'idle') AS idle_connections
FROM pg_stat_activity
WHERE datname = current_database();
"""
# Diagnostic SQL: size of the current database in MB.
# pg_database_size() is divided by 1024.0 * 1024.0, so the single returned
# column (db_size_mb) is fractional rather than an integer.
_SQL_DB_SIZE = """
SELECT pg_database_size(current_database()) / (1024.0 * 1024.0) AS db_size_mb;
"""
# Diagnostic SQL: slow-query count.
# Counts sessions on the current database that are in state 'active' and
# whose query_start lies more than 1 second but less than 1 hour before now().
# This is a snapshot of currently running statements taken from
# pg_stat_activity, not a history of completed queries.
# NOTE(review): a statement that has been running for longer than 1 hour falls
# outside the window and is NOT counted — confirm that this upper bound is
# intended.
_SQL_SLOW_QUERIES = """
SELECT count(*) AS slow_query_count
FROM pg_stat_activity
WHERE datname = current_database()
AND state = 'active'
AND query_start < now() - interval '1 second'
AND query_start > now() - interval '1 hour';
"""
def _check_single_db(cfg: dict) -> DbHealthItem:
    """Run the health diagnostics against a single database.

    Opens a short-lived psycopg2 connection, executes the three diagnostic
    statements (_SQL_CONNECTIONS, _SQL_DB_SIZE, _SQL_SLOW_QUERIES) on one
    cursor and always closes the connection afterwards.

    Args:
        cfg: One entry of DB_CONFIGS. The keys ``db_name``, ``host``,
            ``port``, ``dbname``, ``user`` and ``password`` are read.

    Returns:
        A DbHealthItem with status "connected" plus the collected metrics,
        or an item carrying only ``db_name`` and status "disconnected" when
        the connection attempt or any diagnostic statement raises. Such
        exceptions are logged as warnings with their traceback instead of
        being propagated.
    """
    db_name = cfg["db_name"]
    try:
        # CHANGE 2026-03-29 | Per the original author's note: on Windows with
        # a GBK locale, psycopg2/libpq reads the system user name / computer
        # name while assembling the connection string, and Chinese characters
        # there trigger a UnicodeDecodeError (0xd6 is a GBK lead byte).
        # Connecting with an explicit DSN string avoids libpq mixing in
        # system-locale information.
        #
        # The DSN is built with make_dsn() rather than raw string
        # interpolation so every value is quoted and escaped: a password that
        # contains a space, a single quote or a backslash would otherwise
        # corrupt the connection string or smuggle in extra keywords.
        dsn = psycopg2.extensions.make_dsn(
            host=cfg["host"],
            port=cfg["port"],
            dbname=cfg["dbname"],
            user=cfg["user"],
            password=cfg["password"],
            connect_timeout=5,
            client_encoding="UTF8",
            application_name="neozqyy_health",
        )
        # Only sets the variable when it is not already defined, so an
        # explicit operator-provided PGCLIENTENCODING wins.
        os.environ.setdefault("PGCLIENTENCODING", "UTF8")
        conn = psycopg2.connect(dsn)
    except Exception:
        logger.warning("数据库 %s 连接失败", db_name, exc_info=True)
        return DbHealthItem(db_name=db_name, status="disconnected")

    try:
        with conn.cursor() as cur:
            # Connection counts: (active, idle).
            cur.execute(_SQL_CONNECTIONS)
            row = cur.fetchone()
            active_connections = row[0] if row else 0
            idle_connections = row[1] if row else 0

            # Database size in MB, rounded to two decimals.
            cur.execute(_SQL_DB_SIZE)
            row = cur.fetchone()
            db_size_mb = round(float(row[0]), 2) if row else 0.0

            # Number of currently running slow statements.
            cur.execute(_SQL_SLOW_QUERIES)
            row = cur.fetchone()
            slow_query_count = row[0] if row else 0

        return DbHealthItem(
            db_name=db_name,
            status="connected",
            active_connections=active_connections,
            idle_connections=idle_connections,
            db_size_mb=db_size_mb,
            slow_query_count=slow_query_count,
        )
    except Exception:
        # A failing diagnostic statement is reported the same way as a failed
        # connection; the log message tells the two cases apart.
        logger.warning("数据库 %s 诊断 SQL 执行失败", db_name, exc_info=True)
        return DbHealthItem(db_name=db_name, status="disconnected")
    finally:
        # Runs on both the success and the failure path, so the connection
        # is never leaked.
        conn.close()
@router.get("", response_model=list[DbHealthItem])
async def get_db_health(
    user: CurrentUser = Depends(get_current_user),
) -> list[DbHealthItem]:
    """Return the health status of every database listed in DB_CONFIGS.

    Each database is probed with _check_single_db. Because that helper uses
    the blocking psycopg2 driver (with a 5 second connect timeout per
    database), the probes are pushed to worker threads instead of being run
    directly on the event loop; they execute concurrently and the results
    are returned in DB_CONFIGS order.

    Args:
        user: The authenticated caller, resolved by get_current_user. It is
            not read in the body; the dependency is declared so the route
            requires authentication.

    Returns:
        One DbHealthItem per configured database. Connection or query
        failures are reported by _check_single_db as items with status
        "disconnected" rather than raised, so an unreachable database shows
        up in the list instead of as an HTTP error.
    """
    # Imported locally so this fix does not depend on the module-level
    # import block being edited.
    import asyncio

    # asyncio.gather preserves argument order, so the response lines up with
    # DB_CONFIGS exactly as the sequential implementation did.
    results = await asyncio.gather(
        *(asyncio.to_thread(_check_single_db, cfg) for cfg in DB_CONFIGS)
    )
    return list(results)