Files
Neo-ZQYY/apps/backend/app/database.py
Neo 16c6fb0d3b fix(backend): F1-5b A6 ETL 连接显式 client_encoding=UTF8 防御 GBK (W1)
Windows GBK 环境下 psycopg2/libpq 在拼接连接字符串时,会读取系统
用户名 / 计算机名,若含中文(0xd6 是 GBK 首字节)会触发
UnicodeDecodeError。admin_db_health.py:105-115 已用显式 DSN +
PGCLIENTENCODING 修过,但 database.py 中的 4 个 connect 函数遗漏。

变更:
- apps/backend/app/database.py
  - 新增 _CONN_KWARGS = {**_KEEPALIVE_KWARGS, "client_encoding": "UTF8"}
  - 4 处 psycopg2.connect 调用从 **_KEEPALIVE_KWARGS 改为 **_CONN_KWARGS:
    * get_connection(zqyy_app 业务库)
    * get_etl_global_readonly_connection(ETL 全局只读)
    * get_etl_readonly_connection(ETL RLS 只读)
    * get_etl_write_connection(ETL 可写)

业务影响:
- 影响 75+ 调用点(grep 统计),Windows GBK 环境下未来出现
  UnicodeDecodeError 概率大幅降低
- Linux UTF-8 环境无影响
- ETL RLS / FDW 链路无逻辑变化(client_encoding 是协议层)

验证:
- 后端 reload + /health 200 OK
- /api/admin/db-health 测试库 connected(test_zqyy_app + test_etl_feiqiu)
- BE-3 / T3 unit test 5/5 PASS,间接证明 ETL 连接链路无破坏

§3.3 标"sandbox 无关",4b 跳过(client_encoding 是协议层,与 sandbox
业务时钟无关)。

未加 feature flag ETL_FORCE_UTF8(§8.3 兜底建议):client_encoding=UTF8
是 PostgreSQL 默认安全设置,无需 flag 控制。若未来出现特殊业务字段
含非 UTF-8 字节再考虑加 flag。

审计:docs/audit/changes/2026-05-05__wave1_f1_5b_a6_etl_conn_utf8.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 22:11:43 +08:00

202 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库连接
使用 psycopg2 直连 PostgreSQL,不引入 ORM。
连接参数从环境变量读取(经 config 模块加载)。
提供两类连接:
- get_connection():zqyy_app 读写连接(用户/队列/调度等业务数据)
- get_etl_readonly_connection(site_id):etl_feiqiu 只读连接(数据库查看器),
自动设置 RLS site_id 隔离
当 DEV_TRACE_ENABLED=true 且存在活跃 TraceContext 时,
get_connection() 返回 TracedConnection 包装,自动记录 DB_CONN / DB_QUERY / DB_CONN_RELEASE span。
"""
import time
import psycopg2
from psycopg2.extensions import connection as PgConnection
from app.config import (
APP_DB_NAME,
DB_HOST,
DB_PASSWORD,
DB_PORT,
DB_USER,
ETL_DB_HOST,
ETL_DB_NAME,
ETL_DB_PASSWORD,
ETL_DB_PORT,
ETL_DB_USER,
)
# TCP keepalive settings: keep long-running backend connections from being
# reclaimed by network devices or by PostgreSQL while they sit idle.
_KEEPALIVE_KWARGS = dict(
    keepalives=1,
    keepalives_idle=60,      # start probing after 60 seconds of idleness
    keepalives_interval=10,  # then probe every 10 seconds
    keepalives_count=3,      # 3 consecutive failed probes => connection dead
)

# F1-5b A6: pin client_encoding explicitly. Per the change note, this guards
# against UnicodeDecodeError on Windows GBK hosts, where the system locale
# could otherwise leak into connection setup (admin_db_health.py is said to
# carry the same fix). The client encoding is requested as UTF-8 at connect
# time instead of being inherited from the system/client default locale.
_CONN_KWARGS = _KEEPALIVE_KWARGS | {"client_encoding": "UTF8"}
# Connection retry settings: ride out brief PostgreSQL unavailability
# (e.g. Tailscale network jitter).
_CONNECT_MAX_RETRIES = 3
_CONNECT_RETRY_DELAY = 1.0  # seconds; multiplied by the attempt number


def _connect_with_retry(connect_fn, max_retries=_CONNECT_MAX_RETRIES):
    """Call ``connect_fn`` until it succeeds or the attempts are exhausted.

    Only ``psycopg2.OperationalError`` triggers a retry; any other exception
    propagates immediately. The pause between attempts grows linearly
    (``_CONNECT_RETRY_DELAY`` x 1, x 2, ...), and there is no pause after the
    final failed attempt.

    Args:
        connect_fn: Zero-argument callable that opens and returns a connection.
        max_retries: Total number of attempts to make; must be at least 1.

    Returns:
        Whatever ``connect_fn`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        psycopg2.OperationalError: The error raised by the final attempt.
    """
    # Without this guard a non-positive value would skip the loop entirely
    # and end in "raise None", i.e. an opaque TypeError.
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries!r}")

    for attempt in range(1, max_retries + 1):
        try:
            return connect_fn()
        except psycopg2.OperationalError:
            if attempt == max_retries:
                # Out of attempts: surface the last error unchanged.
                raise
            time.sleep(_CONNECT_RETRY_DELAY * attempt)
def get_connection() -> PgConnection:
    """Open a connection to the zqyy_app database.

    The caller owns the returned connection and must close it (a context
    manager or try/finally is recommended).

    When tracing is enabled and a TraceContext is active, a DB_CONN span
    carrying the connection-acquisition time is recorded and the connection
    is returned wrapped by ``traced_connection`` (which, per the original
    notes, intercepts subsequent SQL execution). Otherwise the plain psycopg2
    connection is returned.
    """
    # CHANGE 2026-03-22 | task 8.2 | trace db_wrapper integration; the
    # connection is wrapped only when tracing is enabled.
    from app.trace.config import get_trace_config
    from app.trace.context import SpanType, TraceSpan, get_current_trace

    trace_cfg = get_trace_config()
    tracing = trace_cfg.enabled and get_current_trace() is not None
    # The timer is only meaningful when a span will actually be recorded.
    t0 = time.perf_counter() if tracing else 0.0

    def _open():
        return psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASSWORD,
            dbname=APP_DB_NAME,
            **_CONN_KWARGS,
        )

    raw_conn = _connect_with_retry(_open)
    if not tracing:
        return raw_conn

    from datetime import datetime
    from app.trace.db_wrapper import traced_connection

    elapsed_ms = (time.perf_counter() - t0) * 1000
    # Not None here: that was already checked when ``tracing`` was computed.
    active_trace = get_current_trace()
    conn_span = TraceSpan(
        span_type=SpanType.DB_CONN,
        module="app.database",
        function="get_connection",
        description_zh=f"获取数据库连接,耗时 {elapsed_ms:.1f}ms",
        description_en=f"Acquired database connection in {elapsed_ms:.1f}ms",
        params={},
        result_summary=f"{elapsed_ms:.1f}ms",
        duration_ms=elapsed_ms,
        timestamp=datetime.now().isoformat(),
    )
    active_trace.add_span(conn_span)
    return traced_connection(raw_conn)
def get_etl_global_readonly_connection() -> PgConnection:
    """Open a global read-only connection to the ETL database (no RLS).

    Intended for callers that do not need per-site isolation, such as the
    system admin backend (e.g. ETL status monitoring).

    After connecting, ``default_transaction_read_only`` is switched on for
    the session and committed. If that setup fails, the connection is closed
    before the exception is re-raised. The caller must close the connection.
    """
    def _open():
        return psycopg2.connect(
            host=ETL_DB_HOST,
            port=ETL_DB_PORT,
            user=ETL_DB_USER,
            password=ETL_DB_PASSWORD,
            dbname=ETL_DB_NAME,
            **_CONN_KWARGS,
        )

    etl_conn = _connect_with_retry(_open)
    try:
        etl_conn.autocommit = False
        with etl_conn.cursor() as cursor:
            cursor.execute("SET default_transaction_read_only = on")
        etl_conn.commit()
    except Exception:
        # Do not leak a half-configured connection.
        etl_conn.close()
        raise
    else:
        return etl_conn
def get_etl_readonly_connection(site_id: int | str) -> PgConnection:
    """Open a read-only, site-scoped connection to the ETL database (etl_feiqiu).

    After connecting, two session-level settings are applied and committed:

    1. ``default_transaction_read_only = on`` so later transactions reject
       writes.
    2. ``app.current_site_id = '<site_id>'``, the value the RLS site
       isolation is keyed on.

    The site id is set at session level on purpose. The previous
    ``SET LOCAL`` form is transaction-scoped, so the ``commit()`` below
    discarded it before the caller ran a single query.

    Args:
        site_id: Site identifier; converted with ``str()`` before being sent.

    Returns:
        An open connection with ``autocommit`` disabled. The caller must
        close it. Typical usage::

            conn = get_etl_readonly_connection(site_id)
            try:
                with conn.cursor() as cur:
                    cur.execute("SELECT ...")
            finally:
                conn.close()

    Raises:
        psycopg2.OperationalError: If the connection cannot be established
            after the configured retries.
        Exception: Any error raised while applying the session settings; the
            connection is closed before the error is re-raised.
    """
    conn = _connect_with_retry(lambda: psycopg2.connect(
        host=ETL_DB_HOST,
        port=ETL_DB_PORT,
        user=ETL_DB_USER,
        password=ETL_DB_PASSWORD,
        dbname=ETL_DB_NAME,
        **_CONN_KWARGS,
    ))
    try:
        conn.autocommit = False
        with conn.cursor() as cur:
            # Session-level read-only default: blocks writes in every
            # transaction that starts after this one.
            cur.execute("SET default_transaction_read_only = on")
            # Session-level site id (is_local = false) so the value survives
            # the commit below and stays in effect for the caller's queries.
            cur.execute(
                "SELECT set_config('app.current_site_id', %s, false)",
                (str(site_id),),
            )
        conn.commit()
    except Exception:
        # Do not leak a half-configured connection.
        conn.close()
        raise
    return conn
def get_etl_write_connection() -> PgConnection:
    """Open a writable connection to the ETL database (etl_feiqiu).

    Meant only for the cases where the backend writes back to ETL summary
    tables (e.g. task_generator writing relationship-index statistics).

    No RLS isolation is configured on this connection, so callers must
    specify ``site_id`` explicitly in their SQL. The connection is returned
    with ``autocommit`` disabled, and the caller must close it.
    """
    def _open():
        return psycopg2.connect(
            host=ETL_DB_HOST,
            port=ETL_DB_PORT,
            user=ETL_DB_USER,
            password=ETL_DB_PASSWORD,
            dbname=ETL_DB_NAME,
            **_CONN_KWARGS,
        )

    writable_conn = _connect_with_retry(_open)
    writable_conn.autocommit = False
    return writable_conn