generated from root/feiqiu-ETL
42 lines
1.1 KiB
Python
42 lines
1.1 KiB
Python
# app/db.py
|
||
from contextlib import contextmanager
|
||
from typing import Iterator
|
||
|
||
import psycopg2
|
||
from psycopg2.extras import RealDictCursor
|
||
|
||
from .config import get_settings
|
||
|
||
|
||
@contextmanager
def get_connection(db_name: str) -> Iterator[psycopg2.extensions.connection]:
    """
    Yield a fresh psycopg2 connection to ``db_name`` and close it on exit.

    Simple connection management: one connection is opened per use and
    closed when the ``with`` block ends (no pooling). Host, port, user,
    password and schema are read from ``get_settings()``.

    The session's ``search_path`` is set to the configured schema, and that
    change is committed before the connection is handed out, so that:

    * a later ``conn.rollback()`` by the caller does not undo the
      ``search_path`` setting, and
    * the caller receives a connection with no transaction in progress.

    This context manager never commits the caller's own work; the caller
    must call ``conn.commit()`` for anything it wants to persist before the
    connection is closed.

    Args:
        db_name: Name of the database to connect to.

    Yields:
        An open connection whose ``search_path`` is the configured schema.
    """
    settings = get_settings()
    conn = psycopg2.connect(
        host=settings.db_host,
        port=settings.db_port,
        user=settings.db_user,
        password=settings.db_password,
        dbname=db_name,
    )
    try:
        # Point the session at the configured schema.
        with conn.cursor() as cur:
            cur.execute("SET search_path TO %s;", (settings.db_schema,))
        # The SET above ran inside psycopg2's implicit transaction. Commit it
        # so the setting survives a caller-side rollback and the connection
        # is handed out idle rather than mid-transaction.
        conn.commit()
        yield conn
    finally:
        conn.close()
|
||
|
||
|
||
def check_health(db_name: str) -> bool:
    """
    Database health check: run ``SELECT 1`` against ``db_name``.

    Opens a connection through ``get_connection``, executes the probe query
    with a dict-style cursor, and reports whether the returned value is 1.
    Errors raised while connecting or querying are not caught here and
    propagate to the caller.

    Args:
        db_name: Name of the database to probe.

    Returns:
        True when the probe query returns 1, otherwise False.
    """
    with get_connection(db_name) as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("SELECT 1 AS v;")
        probe = cur.fetchone()
    # The row is already fetched, so it can be inspected after the cursor
    # and connection have been released.
    return probe["v"] == 1