Files
Neo-ZQYY/apps/backend/app/trace/db_wrapper.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

223 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
数据库连接包装器 — 拦截 cursor.execute() 和连接生命周期
包装 psycopg2 连接对象,自动记录以下 span
- DB_QUERY: 每条 SQL 的语句、参数、行数、耗时、调用来源
- DB_CONN: 连接获取耗时(在 database.py 中记录)
- DB_CONN_RELEASE: 连接释放
- DB_ERROR: 数据库异常PostgreSQL 错误码、消息、触发 SQL
当 DEV_TRACE_LOG_SQL=false 时SQL 语句替换为 "[SQL hidden]"
当无活跃 TraceContext 时零开销直接委托原始对象。
"""
from __future__ import annotations
import inspect
import time
from datetime import datetime
from typing import Any
import psycopg2
from app.trace.config import get_trace_config
from app.trace.context import (
SpanType,
TraceSpan,
get_current_trace,
)
# SQL 被隐藏时的占位符
_SQL_HIDDEN = "[SQL hidden]"
def _get_caller_info() -> tuple[str, str]:
"""获取调用来源(跳过 wrapper 自身的栈帧)。
返回 (module, function) 元组。
"""
frame = inspect.currentframe()
try:
# 向上跳 3 层_get_caller_info → execute → 调用方
caller = frame
for _ in range(3):
if caller is not None:
caller = caller.f_back
if caller is not None:
module = caller.f_globals.get("__name__", "unknown")
func_name = caller.f_code.co_name
return module, func_name
finally:
del frame
return "unknown", "unknown"
def _safe_sql_repr(sql: Any) -> str:
"""安全地将 SQL 转为字符串表示。"""
if sql is None:
return ""
if isinstance(sql, bytes):
return sql.decode("utf-8", errors="replace")
return str(sql)
def _safe_params_repr(params: Any) -> Any:
"""安全地将 SQL 参数转为可序列化的表示。"""
if params is None:
return None
if isinstance(params, (list, tuple)):
return [_safe_param_value(v) for v in params]
if isinstance(params, dict):
return {str(k): _safe_param_value(v) for k, v in params.items()}
return str(params)
def _safe_param_value(value: Any) -> Any:
"""安全地将单个参数值转为可序列化的表示。"""
if isinstance(value, (str, int, float, bool, type(None))):
return value
return str(value)
class TracedCursor:
"""包装 psycopg2 cursor拦截 execute() 记录 DB_QUERY / DB_ERROR span。
未实现的方法通过 __getattr__ 委托给底层 cursor。
"""
def __init__(self, cursor: Any) -> None:
# 使用 object.__setattr__ 避免触发 __getattr__
object.__setattr__(self, "_cursor", cursor)
def execute(self, sql: Any, params: Any = None) -> None:
"""拦截 execute记录 DB_QUERY span异常时记录 DB_ERROR span。"""
ctx = get_current_trace()
if ctx is None:
# 无活跃 trace直接执行
self._cursor.execute(sql, params)
return
config = get_trace_config()
caller_module, caller_func = _get_caller_info()
sql_str = _safe_sql_repr(sql)
display_sql = sql_str if config.log_sql else _SQL_HIDDEN
start = time.perf_counter()
try:
self._cursor.execute(sql, params)
except psycopg2.Error as exc:
elapsed = (time.perf_counter() - start) * 1000
# 记录 DB_ERROR span
ctx.add_span(TraceSpan(
span_type=SpanType.DB_ERROR,
module=caller_module,
function=caller_func,
description_zh=f"数据库异常: {type(exc).__name__}",
description_en=f"Database error: {type(exc).__name__}",
params={},
result_summary=str(exc).strip()[:200],
duration_ms=elapsed,
timestamp=datetime.now().isoformat(),
extra={
"pgcode": getattr(exc, "pgcode", None),
"pgerror": (getattr(exc, "pgerror", None) or "")[:500],
"sql": display_sql,
},
))
raise
elapsed = (time.perf_counter() - start) * 1000
# 获取行数
row_count = self._cursor.rowcount if self._cursor.rowcount >= 0 else 0
ctx.add_span(TraceSpan(
span_type=SpanType.DB_QUERY,
module=caller_module,
function=caller_func,
description_zh=f"执行 SQL 查询,返回 {row_count}",
description_en=f"Executed SQL query, returned {row_count} rows",
params={"params": _safe_params_repr(params)} if config.log_sql else {},
result_summary=f"{row_count}",
duration_ms=elapsed,
timestamp=datetime.now().isoformat(),
extra={
"sql": display_sql,
"params": _safe_params_repr(params) if config.log_sql else None,
"row_count": row_count,
"caller": f"{caller_module}.{caller_func}",
},
))
def __getattr__(self, name: str) -> Any:
"""未实现的方法委托给底层 cursor。"""
return getattr(self._cursor, name)
def __iter__(self):
return iter(self._cursor)
def __next__(self):
return next(self._cursor)
def __enter__(self):
return self
def __exit__(self, *args):
self._cursor.close()
class TracedConnection:
"""包装 psycopg2 连接,拦截 cursor() 和 close()。
- cursor() 返回 TracedCursor
- close() 记录 DB_CONN_RELEASE span
- 其他方法通过 __getattr__ 委托给底层连接
"""
def __init__(self, conn: Any) -> None:
object.__setattr__(self, "_conn", conn)
object.__setattr__(self, "_closed_traced", False)
def cursor(self, *args: Any, **kwargs: Any) -> TracedCursor:
"""返回包装后的 TracedCursor。"""
raw_cursor = self._conn.cursor(*args, **kwargs)
return TracedCursor(raw_cursor)
def close(self) -> None:
"""关闭连接并记录 DB_CONN_RELEASE span。"""
if not self._closed_traced:
object.__setattr__(self, "_closed_traced", True)
ctx = get_current_trace()
if ctx is not None:
ctx.add_span(TraceSpan(
span_type=SpanType.DB_CONN_RELEASE,
module="trace.db_wrapper",
function="TracedConnection.close",
description_zh="释放数据库连接",
description_en="Released database connection",
params={},
result_summary="closed",
duration_ms=0.0,
timestamp=datetime.now().isoformat(),
))
self._conn.close()
def __getattr__(self, name: str) -> Any:
"""未实现的方法委托给底层连接。"""
return getattr(self._conn, name)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def traced_connection(conn: Any) -> TracedConnection:
"""将 psycopg2 连接包装为 TracedConnection。
仅在 trace 启用且有活跃 TraceContext 时调用。
"""
return TracedConnection(conn)