generated from root/feiqiu-ETL
正式起服务打底
This commit is contained in:
42
app/db.py
Normal file
42
app/db.py
Normal file
@@ -0,0 +1,42 @@
|
||||
# app/db.py
|
||||
from contextlib import contextmanager
|
||||
from typing import Iterator
|
||||
|
||||
import psycopg2
|
||||
from psycopg2.extras import RealDictCursor
|
||||
|
||||
from .config import get_settings
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_connection(db_name: str) -> Iterator[psycopg2.extensions.connection]:
|
||||
"""
|
||||
简单连接管理:每次请求开一个连接,用完就关。
|
||||
当前访问量不高,这种写法足够。
|
||||
"""
|
||||
settings = get_settings()
|
||||
conn = psycopg2.connect(
|
||||
host=settings.db_host,
|
||||
port=settings.db_port,
|
||||
user=settings.db_user,
|
||||
password=settings.db_password,
|
||||
dbname=db_name,
|
||||
)
|
||||
try:
|
||||
# 设置 schema = XCX
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SET search_path TO %s;", (settings.db_schema,))
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def check_health(db_name: str) -> bool:
|
||||
"""
|
||||
DB 健康检查:SELECT 1
|
||||
"""
|
||||
with get_connection(db_name) as conn:
|
||||
with conn.cursor(cursor_factory=RealDictCursor) as cur:
|
||||
cur.execute("SELECT 1 AS v;")
|
||||
row = cur.fetchone()
|
||||
return row["v"] == 1
|
||||
Reference in New Issue
Block a user