# -*- coding: utf-8 -*- """ JSON Lines 日志写入器 负责将 TraceContext 序列化为 JSON 并追加写入 .jsonl 文件。 按日期分目录(YYYY-MM-DD/),按小时分文件(trace_YYYY-MM-DD_HH.jsonl), 单文件超过 10MB 自动轮转(trace_YYYY-MM-DD_HH_NNN.jsonl)。 维护 _index.json 索引文件(文件列表、记录数、文件大小)。 """ from __future__ import annotations import asyncio import json import logging from datetime import datetime from pathlib import Path from typing import Any from app.trace.config import get_trace_config from app.trace.context import TraceContext, TraceSpan, SpanType logger = logging.getLogger(__name__) # 单文件大小上限(10MB) MAX_FILE_SIZE = 10 * 1024 * 1024 # ────────────────────────────────────────────── # 序列化函数 # ────────────────────────────────────────────── def serialize_span(span: TraceSpan) -> dict[str, Any]: """将 TraceSpan 转为可 JSON 序列化的 dict。 包含所有必需字段:span_type, module, function, description_zh, description_en, params, result_summary, duration_ms, timestamp, extra。 """ return { "span_type": span.span_type, "module": span.module, "function": span.function, "description_zh": span.description_zh, "description_en": span.description_en, "params": span.params, "result_summary": span.result_summary, "duration_ms": span.duration_ms, "timestamp": span.timestamp, "extra": span.extra, } def serialize_trace(ctx: TraceContext) -> dict[str, Any]: """将 TraceContext 转为可 JSON 序列化的 dict。 包含完整字段:request_id, trace_type, timestamp, method, path, status_code, total_duration_ms, user_id, site_id, db_query_count, db_total_ms, error, spans。 衍生字段从 spans 列表计算: - status_code: 从 HTTP_OUT span 的 extra 或 result_summary 提取 - total_duration_ms: 从 HTTP_OUT span 的 duration_ms 获取 - db_query_count: 统计 DB_QUERY span 数量 - db_total_ms: 累加 DB_QUERY span 的 duration_ms - error: 从 ERROR/DB_ERROR span 提取错误信息 """ spans = ctx.spans # 衍生字段:status_code — 从 HTTP_OUT span 提取 status_code: int | None = None total_duration_ms: float = 0.0 for s in spans: if s.span_type == SpanType.HTTP_OUT: # 优先从 extra 中取 status_code status_code = s.extra.get("status_code") if status_code is None: # 尝试从 result_summary 解析,如 "200 OK, 3.2KB body" summary = s.result_summary or "" parts = summary.split() if parts and parts[0].isdigit(): status_code = int(parts[0]) total_duration_ms = s.duration_ms break # 衍生字段:db_query_count, db_total_ms db_query_count = 0 db_total_ms = 0.0 for s in spans: if s.span_type == SpanType.DB_QUERY: db_query_count += 1 db_total_ms += s.duration_ms # 衍生字段:error — 从 ERROR/DB_ERROR span 提取 error: str | None = None for s in spans: if s.span_type in (SpanType.ERROR, SpanType.DB_ERROR): # 拼接错误信息:优先 result_summary,其次 description_zh error = s.result_summary or s.description_zh or s.description_en break return { "request_id": ctx.request_id, "trace_type": ctx.trace_type, "timestamp": ctx.start_time.isoformat() if isinstance(ctx.start_time, datetime) else str(ctx.start_time), "method": ctx.method, "path": ctx.path, "status_code": status_code, "total_duration_ms": total_duration_ms, "user_id": ctx.user_id, "site_id": ctx.site_id, "db_query_count": db_query_count, "db_total_ms": round(db_total_ms, 2), "error": error, "spans": [serialize_span(s) for s in spans], } # ────────────────────────────────────────────── # 路径生成函数 # ────────────────────────────────────────────── def get_log_dir(dt: datetime, base_dir: str | None = None) -> Path: """返回日期目录路径:{base_dir}/{YYYY-MM-DD}/。 Args: dt: 时间戳,用于确定日期目录名 base_dir: 日志根目录,默认从 TraceConfig 读取 """ if base_dir is None: base_dir = get_trace_config().log_dir date_str = dt.strftime("%Y-%m-%d") return Path(base_dir) / date_str def get_log_file(dt: datetime, base_dir: str | None = None) -> Path: """返回当前小时的日志文件路径:{base_dir}/{YYYY-MM-DD}/trace_YYYY-MM-DD_HH.jsonl。 Args: dt: 时间戳,用于确定日期目录和小时文件名 base_dir: 日志根目录,默认从 TraceConfig 读取 """ date_dir = get_log_dir(dt, base_dir=base_dir) filename = f"trace_{dt.strftime('%Y-%m-%d_%H')}.jsonl" return date_dir / filename # ────────────────────────────────────────────── # TraceWriter 类 # ────────────────────────────────────────────── class TraceWriter: """JSON Lines 日志写入器。 负责将 TraceContext 序列化为 JSON 并追加写入 .jsonl 文件。 支持按日期分目录、按小时分文件、超 10MB 自动轮转、索引维护。 """ def __init__(self, base_dir: str | None = None): """初始化写入器。 Args: base_dir: 日志根目录,默认从 TraceConfig 读取 """ self._base_dir = base_dir @property def base_dir(self) -> str: """获取日志根目录。""" if self._base_dir is not None: return self._base_dir return get_trace_config().log_dir def _rotate_if_needed(self, filepath: Path) -> Path: """检查文件大小,需要时返回轮转后的新路径。 如果文件不存在或未超过 10MB,返回原路径。 如果超过 10MB,返回带序号后缀的新路径(trace_YYYY-MM-DD_HH_001.jsonl)。 """ if not filepath.exists(): return filepath if filepath.stat().st_size < MAX_FILE_SIZE: return filepath # 需要轮转:查找下一个可用序号 stem = filepath.stem # trace_YYYY-MM-DD_HH parent = filepath.parent # 如果当前文件名已经带序号(如 trace_2026-03-22_14_001), # 提取基础名称 parts = stem.rsplit("_", 1) if len(parts) == 2 and parts[1].isdigit() and len(parts[1]) == 3: base_stem = parts[0] else: base_stem = stem # 查找已有的轮转文件,确定下一个序号 seq = 1 while True: candidate = parent / f"{base_stem}_{seq:03d}.jsonl" if not candidate.exists() or candidate.stat().st_size < MAX_FILE_SIZE: return candidate seq += 1 def _update_index(self, date_dir: Path, filename: str, record_count_delta: int) -> None: """更新索引文件 _index.json。 索引结构: { "files": { "trace_2026-03-22_14.jsonl": { "record_count": 42, "file_size": 12345 } } } """ index_path = date_dir / "_index.json" # 读取现有索引 index: dict[str, Any] = {"files": {}} if index_path.exists(): try: index = json.loads(index_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): index = {"files": {}} if "files" not in index: index["files"] = {} # 更新文件条目 file_entry = index["files"].get(filename, {"record_count": 0, "file_size": 0}) file_entry["record_count"] = file_entry.get("record_count", 0) + record_count_delta # 更新实际文件大小 file_path = date_dir / filename if file_path.exists(): file_entry["file_size"] = file_path.stat().st_size else: file_entry["file_size"] = 0 index["files"][filename] = file_entry # 写回索引 index_path.write_text( json.dumps(index, ensure_ascii=False, indent=2), encoding="utf-8", ) async def write_trace(self, ctx: TraceContext) -> None: """异步写入 TraceContext 到 JSON Lines 文件。 写入失败时静默处理(仅记录日志),不影响请求处理。 """ try: # 序列化 data = serialize_trace(ctx) line = json.dumps(data, ensure_ascii=False, default=str) # 确定写入路径 dt = ctx.start_time if isinstance(ctx.start_time, datetime) else datetime.now() filepath = get_log_file(dt, base_dir=self.base_dir) # 确保目录存在 filepath.parent.mkdir(parents=True, exist_ok=True) # 检查是否需要轮转 filepath = self._rotate_if_needed(filepath) # 异步写入(在线程池中执行 IO 操作) await asyncio.to_thread(self._sync_write, filepath, line) except Exception: # 写入失败不影响请求处理,仅记录日志 logger.warning("Trace 日志写入失败", exc_info=True) def _sync_write(self, filepath: Path, line: str) -> None: """同步写入一行 JSON 到文件,并更新索引。""" # 追加写入 with open(filepath, "a", encoding="utf-8") as f: f.write(line + "\n") # 更新索引 date_dir = filepath.parent filename = filepath.name self._update_index(date_dir, filename, record_count_delta=1) # ────────────────────────────────────────────── # 单例访问 # ────────────────────────────────────────────── _writer_instance: TraceWriter | None = None def get_trace_writer() -> TraceWriter: """获取全局 TraceWriter 单例。""" global _writer_instance if _writer_instance is None: _writer_instance = TraceWriter() return _writer_instance