# app/db.py
from contextlib import contextmanager
from typing import Iterator

import psycopg2
from psycopg2.extras import RealDictCursor

from .config import get_settings


@contextmanager
def get_connection(db_name: str) -> Iterator[psycopg2.extensions.connection]:
    """Yield a fresh PostgreSQL connection to *db_name* and close it on exit.

    Simple connection management: one connection is opened per use and
    closed when the ``with`` block ends. Traffic is currently low, so this
    is sufficient.

    Host, port, credentials and the schema for ``search_path`` are read from
    ``get_settings()``. The context manager does not commit the caller's
    work: callers must call ``conn.commit()`` themselves before the block
    ends if they want their changes persisted.

    Args:
        db_name: Name of the database to connect to.

    Yields:
        An open psycopg2 connection whose ``search_path`` has been set to
        the configured schema.
    """
    settings = get_settings()
    conn = psycopg2.connect(
        host=settings.db_host,
        port=settings.db_port,
        user=settings.db_user,
        password=settings.db_password,
        dbname=db_name,
    )
    try:
        # Point the session at the configured schema (settings.db_schema).
        with conn.cursor() as cur:
            cur.execute("SET search_path TO %s;", (settings.db_schema,))
        # Commit the implicit transaction opened by the SET: otherwise a
        # later rollback by the caller would also undo the search_path, and
        # the caller would receive a connection that is already inside a
        # transaction.
        conn.commit()
        yield conn
    finally:
        conn.close()


def check_health(db_name: str) -> bool:
    """Run ``SELECT 1`` against *db_name* as a database health check.

    Args:
        db_name: Name of the database to probe.

    Returns:
        True if the query returned the value 1.

    Raises:
        Any exception raised while connecting or querying propagates to the
        caller; nothing is caught here.
    """
    with get_connection(db_name) as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT 1 AS v;")
            row = cur.fetchone()
            return row["v"] == 1