# -*- coding: utf-8 -*- """ 数据库连接包装器 — 拦截 cursor.execute() 和连接生命周期 包装 psycopg2 连接对象,自动记录以下 span: - DB_QUERY: 每条 SQL 的语句、参数、行数、耗时、调用来源 - DB_CONN: 连接获取耗时(在 database.py 中记录) - DB_CONN_RELEASE: 连接释放 - DB_ERROR: 数据库异常(PostgreSQL 错误码、消息、触发 SQL) 当 DEV_TRACE_LOG_SQL=false 时,SQL 语句替换为 "[SQL hidden]"。 当无活跃 TraceContext 时零开销直接委托原始对象。 """ from __future__ import annotations import inspect import time from datetime import datetime from typing import Any import psycopg2 from app.trace.config import get_trace_config from app.trace.context import ( SpanType, TraceSpan, get_current_trace, ) # SQL 被隐藏时的占位符 _SQL_HIDDEN = "[SQL hidden]" def _get_caller_info() -> tuple[str, str]: """获取调用来源(跳过 wrapper 自身的栈帧)。 返回 (module, function) 元组。 """ frame = inspect.currentframe() try: # 向上跳 3 层:_get_caller_info → execute → 调用方 caller = frame for _ in range(3): if caller is not None: caller = caller.f_back if caller is not None: module = caller.f_globals.get("__name__", "unknown") func_name = caller.f_code.co_name return module, func_name finally: del frame return "unknown", "unknown" def _safe_sql_repr(sql: Any) -> str: """安全地将 SQL 转为字符串表示。""" if sql is None: return "" if isinstance(sql, bytes): return sql.decode("utf-8", errors="replace") return str(sql) def _safe_params_repr(params: Any) -> Any: """安全地将 SQL 参数转为可序列化的表示。""" if params is None: return None if isinstance(params, (list, tuple)): return [_safe_param_value(v) for v in params] if isinstance(params, dict): return {str(k): _safe_param_value(v) for k, v in params.items()} return str(params) def _safe_param_value(value: Any) -> Any: """安全地将单个参数值转为可序列化的表示。""" if isinstance(value, (str, int, float, bool, type(None))): return value return str(value) class TracedCursor: """包装 psycopg2 cursor,拦截 execute() 记录 DB_QUERY / DB_ERROR span。 未实现的方法通过 __getattr__ 委托给底层 cursor。 """ def __init__(self, cursor: Any) -> None: # 使用 object.__setattr__ 避免触发 __getattr__ object.__setattr__(self, "_cursor", cursor) def execute(self, sql: Any, params: Any = None) -> None: """拦截 execute,记录 DB_QUERY span;异常时记录 DB_ERROR span。""" ctx = get_current_trace() if ctx is None: # 无活跃 trace,直接执行 self._cursor.execute(sql, params) return config = get_trace_config() caller_module, caller_func = _get_caller_info() sql_str = _safe_sql_repr(sql) display_sql = sql_str if config.log_sql else _SQL_HIDDEN start = time.perf_counter() try: self._cursor.execute(sql, params) except psycopg2.Error as exc: elapsed = (time.perf_counter() - start) * 1000 # 记录 DB_ERROR span ctx.add_span(TraceSpan( span_type=SpanType.DB_ERROR, module=caller_module, function=caller_func, description_zh=f"数据库异常: {type(exc).__name__}", description_en=f"Database error: {type(exc).__name__}", params={}, result_summary=str(exc).strip()[:200], duration_ms=elapsed, timestamp=datetime.now().isoformat(), extra={ "pgcode": getattr(exc, "pgcode", None), "pgerror": (getattr(exc, "pgerror", None) or "")[:500], "sql": display_sql, }, )) raise elapsed = (time.perf_counter() - start) * 1000 # 获取行数 row_count = self._cursor.rowcount if self._cursor.rowcount >= 0 else 0 ctx.add_span(TraceSpan( span_type=SpanType.DB_QUERY, module=caller_module, function=caller_func, description_zh=f"执行 SQL 查询,返回 {row_count} 行", description_en=f"Executed SQL query, returned {row_count} rows", params={"params": _safe_params_repr(params)} if config.log_sql else {}, result_summary=f"{row_count} 行", duration_ms=elapsed, timestamp=datetime.now().isoformat(), extra={ "sql": display_sql, "params": _safe_params_repr(params) if config.log_sql else None, "row_count": row_count, "caller": f"{caller_module}.{caller_func}", }, )) def __getattr__(self, name: str) -> Any: """未实现的方法委托给底层 cursor。""" return getattr(self._cursor, name) def __iter__(self): return iter(self._cursor) def __next__(self): return next(self._cursor) def __enter__(self): return self def __exit__(self, *args): self._cursor.close() class TracedConnection: """包装 psycopg2 连接,拦截 cursor() 和 close()。 - cursor() 返回 TracedCursor - close() 记录 DB_CONN_RELEASE span - 其他方法通过 __getattr__ 委托给底层连接 """ def __init__(self, conn: Any) -> None: object.__setattr__(self, "_conn", conn) object.__setattr__(self, "_closed_traced", False) def cursor(self, *args: Any, **kwargs: Any) -> TracedCursor: """返回包装后的 TracedCursor。""" raw_cursor = self._conn.cursor(*args, **kwargs) return TracedCursor(raw_cursor) def close(self) -> None: """关闭连接并记录 DB_CONN_RELEASE span。""" if not self._closed_traced: object.__setattr__(self, "_closed_traced", True) ctx = get_current_trace() if ctx is not None: ctx.add_span(TraceSpan( span_type=SpanType.DB_CONN_RELEASE, module="trace.db_wrapper", function="TracedConnection.close", description_zh="释放数据库连接", description_en="Released database connection", params={}, result_summary="closed", duration_ms=0.0, timestamp=datetime.now().isoformat(), )) self._conn.close() def __getattr__(self, name: str) -> Any: """未实现的方法委托给底层连接。""" return getattr(self._conn, name) def __enter__(self): return self def __exit__(self, *args): self.close() def traced_connection(conn: Any) -> TracedConnection: """将 psycopg2 连接包装为 TracedConnection。 仅在 trace 启用且有活跃 TraceContext 时调用。 """ return TracedConnection(conn)