# -*- coding: utf-8 -*-
"""Trace decorator module.

Provides the ``trace_service`` decorator, which automatically records a
SERVICE span for service-layer functions. When there is no active
TraceContext the wrapped function is called directly, without building
parameters or spans.
"""

from __future__ import annotations

import asyncio
import functools
import inspect
import time
from datetime import datetime
from typing import Any, Callable

from app.trace.config import get_trace_config
from app.trace.context import SpanType, TraceSpan, get_current_trace

# Placeholder stored instead of a parameter value when values are hidden.
_REDACTED = "[redacted]"

# Maximum length of a return-value summary.
_RESULT_SUMMARY_MAX_LEN = 120

# Maximum length of the repr() recorded for a non-primitive parameter value.
_PARAM_REPR_MAX_LEN = 100

# Containers nested at least this deep are summarised instead of expanded, so
# self-referential or very deep structures cannot exhaust the recursion limit.
_MAX_REPR_DEPTH = 5

# Number of leading characters kept by truncate_token().
_TOKEN_PREFIX_LEN = 16


def _truncated_repr(value: Any, limit: int) -> str:
    """Return ``repr(value)`` cut to at most *limit* characters.

    A ``__repr__`` that raises is reported with a placeholder instead of
    propagating, because building trace data must not break the traced call.
    """
    try:
        text = repr(value)
    except Exception:
        return f"<unrepresentable {type(value).__name__}>"
    if len(text) > limit:
        return text[: limit - 3] + "..."
    return text


def _build_params_dict(
    func: Callable, args: tuple, kwargs: dict, *, redact: bool
) -> dict[str, Any]:
    """Build a ``{parameter name: value}`` dict for one call of *func*.

    The names come from binding *args*/*kwargs* to the signature of *func*
    (defaults applied). When the signature cannot be obtained or the call
    does not match it, positional arguments are recorded as ``arg0``,
    ``arg1``, ... and keyword arguments under their own names, so that the
    wrapped function itself is the one to report a bad call.

    Args:
        func: The callable being traced.
        args: Positional arguments of the call.
        kwargs: Keyword arguments of the call.
        redact: When true only the names are recorded and every value is
            replaced by ``"[redacted]"``.

    Returns:
        A dict mapping parameter names to serialisable representations.
    """
    try:
        bound = inspect.signature(func).bind(*args, **kwargs)
        bound.apply_defaults()
        arguments: dict[str, Any] = dict(bound.arguments)
    except (TypeError, ValueError):
        # signature() raises ValueError/TypeError for unsupported callables;
        # bind() raises TypeError when the arguments do not fit.
        arguments = {f"arg{index}": value for index, value in enumerate(args)}
        arguments.update(kwargs)

    if redact:
        return {name: _REDACTED for name in arguments}
    return {name: _safe_repr(value) for name, value in arguments.items()}


def _safe_repr(value: Any, _depth: int = 0) -> Any:
    """Convert a parameter value into a serialisable representation.

    Primitives are returned unchanged. Lists/tuples and dicts with at most
    10 entries are converted element by element; larger ones, and any
    container nested ``_MAX_REPR_DEPTH`` levels deep, are replaced by a size
    summary. Every other type is represented by its truncated ``repr``.

    Args:
        value: The value to convert.
        _depth: Current container nesting level (internal, used by the
            recursion only).
    """
    if isinstance(value, (str, int, float, bool, type(None))):
        return value

    expandable = _depth < _MAX_REPR_DEPTH
    if isinstance(value, (list, tuple)):
        if expandable and len(value) <= 10:
            return [_safe_repr(item, _depth + 1) for item in value]
        return f"[{len(value)} items]"
    if isinstance(value, dict):
        if expandable and len(value) <= 10:
            return {
                str(key): _safe_repr(item, _depth + 1)
                for key, item in value.items()
            }
        return f"{{{len(value)} keys}}"

    return _truncated_repr(value, _PARAM_REPR_MAX_LEN)


def _make_result_summary(result: Any) -> str:
    """Build a short summary string from a return value.

    ``None`` becomes ``"None"``; str/int/float/bool values are stringified
    and truncated to ``_RESULT_SUMMARY_MAX_LEN``; lists, tuples and dicts are
    summarised by size; anything else uses its truncated ``repr``.
    """
    if result is None:
        return "None"
    if isinstance(result, (str, int, float, bool)):
        text = str(result)
        if len(text) > _RESULT_SUMMARY_MAX_LEN:
            return text[: _RESULT_SUMMARY_MAX_LEN - 3] + "..."
        return text
    if isinstance(result, (list, tuple)):
        return f"[{len(result)} items]"
    if isinstance(result, dict):
        return f"{{{len(result)} keys}}"
    return _truncated_repr(result, _RESULT_SUMMARY_MAX_LEN)


def trace_service(description_zh: str, description_en: str) -> Callable:
    """Decorator for service-layer functions that records a SERVICE span.

    - With no active TraceContext the original function is called directly.
    - When the trace config's ``log_params`` is false
      (DEV_TRACE_LOG_PARAMS=false), parameter values are replaced by
      ``"[redacted]"``.
    - Both synchronous and asynchronous functions are supported.
    - A call that raises is recorded with ``result_summary="exception"`` and
      the exception is re-raised unchanged. For coroutines this includes
      cancellation.

    Args:
        description_zh: Chinese description stored on the span.
        description_en: English description stored on the span.

    Returns:
        A decorator that wraps the target function.
    """

    def decorator(func: Callable) -> Callable:
        module_name = func.__module__ or ""
        func_name = func.__qualname__ or func.__name__

        def capture_params(args: tuple, kwargs: dict) -> dict[str, Any]:
            # The config is read per call, as the original wrappers did.
            redact = not get_trace_config().log_params
            return _build_params_dict(func, args, kwargs, redact=redact)

        def elapsed_ms(start: float) -> float:
            return (time.perf_counter() - start) * 1000

        def record_span(
            ctx: Any, params: dict[str, Any], summary: str, duration_ms: float
        ) -> None:
            # Single place where the SERVICE span is assembled.
            ctx.add_span(
                TraceSpan(
                    span_type=SpanType.SERVICE,
                    module=module_name,
                    function=func_name,
                    description_zh=description_zh,
                    description_en=description_en,
                    params=params,
                    result_summary=summary,
                    duration_ms=duration_ms,
                    timestamp=datetime.now().isoformat(),
                )
            )

        if asyncio.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                ctx = get_current_trace()
                if ctx is None:
                    return await func(*args, **kwargs)

                params = capture_params(args, kwargs)
                start = time.perf_counter()
                try:
                    result = await func(*args, **kwargs)
                except (Exception, asyncio.CancelledError):
                    # CancelledError derives from BaseException since
                    # Python 3.8; list it explicitly so cancelled and
                    # timed-out calls still leave a span.
                    record_span(ctx, params, "exception", elapsed_ms(start))
                    raise

                # Measure before summarising so the duration covers only
                # the wrapped call.
                duration = elapsed_ms(start)
                record_span(ctx, params, _make_result_summary(result), duration)
                return result

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            ctx = get_current_trace()
            if ctx is None:
                return func(*args, **kwargs)

            params = capture_params(args, kwargs)
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                record_span(ctx, params, "exception", elapsed_ms(start))
                raise

            duration = elapsed_ms(start)
            record_span(ctx, params, _make_result_summary(result), duration)
            return result

        return sync_wrapper

    return decorator


def truncate_token(token: str) -> str:
    """Truncate a token to its first 16 characters plus a ``'...'`` suffix.

    Tokens of 16 characters or fewer are returned unchanged.
    """
    if len(token) <= _TOKEN_PREFIX_LEN:
        return token
    return token[:_TOKEN_PREFIX_LEN] + "..."