Files
Neo-ZQYY/apps/backend/app/database.py

83 lines
2.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库连接
使用 psycopg2 直连 PostgreSQL不引入 ORM。
连接参数从环境变量读取(经 config 模块加载)。
提供两类连接:
- get_connection()zqyy_app 读写连接(用户/队列/调度等业务数据)
- get_etl_readonly_connection(site_id)etl_feiqiu 只读连接(数据库查看器),
自动设置 RLS site_id 隔离
"""
import psycopg2
from psycopg2.extensions import connection as PgConnection
from app.config import (
APP_DB_NAME,
DB_HOST,
DB_PASSWORD,
DB_PORT,
DB_USER,
ETL_DB_HOST,
ETL_DB_NAME,
ETL_DB_PASSWORD,
ETL_DB_PORT,
ETL_DB_USER,
)
def get_connection() -> PgConnection:
"""
获取 zqyy_app 数据库连接。
调用方负责关闭连接(推荐配合 contextmanager 或 try/finally 使用)。
"""
return psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASSWORD,
dbname=APP_DB_NAME,
)
def get_etl_readonly_connection(site_id: int | str) -> PgConnection:
"""
获取 ETL 数据库etl_feiqiu的只读连接。
连接建立后自动执行:
1. SET default_transaction_read_only = on — 禁止写操作
2. SET LOCAL app.current_site_id = '{site_id}' — 启用 RLS 门店隔离
调用方负责关闭连接。典型用法::
conn = get_etl_readonly_connection(site_id)
try:
with conn.cursor() as cur:
cur.execute("SELECT ...")
finally:
conn.close()
"""
conn = psycopg2.connect(
host=ETL_DB_HOST,
port=ETL_DB_PORT,
user=ETL_DB_USER,
password=ETL_DB_PASSWORD,
dbname=ETL_DB_NAME,
)
try:
conn.autocommit = False
with conn.cursor() as cur:
# 会话级只读:防止任何写操作
cur.execute("SET default_transaction_read_only = on")
# 事务级 RLS 隔离:设置当前门店 ID
cur.execute(
"SET LOCAL app.current_site_id = %s", (str(site_id),)
)
conn.commit()
except Exception:
conn.close()
raise
return conn