Files
Neo-ZQYY/apps/backend/app/trace/writer.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

302 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
JSON Lines 日志写入器
负责将 TraceContext 序列化为 JSON 并追加写入 .jsonl 文件。
按日期分目录YYYY-MM-DD/按小时分文件trace_YYYY-MM-DD_HH.jsonl
单文件超过 10MB 自动轮转trace_YYYY-MM-DD_HH_NNN.jsonl
维护 _index.json 索引文件(文件列表、记录数、文件大小)。
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any
from app.trace.config import get_trace_config
from app.trace.context import TraceContext, TraceSpan, SpanType
logger = logging.getLogger(__name__)
# 单文件大小上限10MB
MAX_FILE_SIZE = 10 * 1024 * 1024
# ──────────────────────────────────────────────
# 序列化函数
# ──────────────────────────────────────────────
def serialize_span(span: TraceSpan) -> dict[str, Any]:
"""将 TraceSpan 转为可 JSON 序列化的 dict。
包含所有必需字段span_type, module, function, description_zh,
description_en, params, result_summary, duration_ms, timestamp, extra。
"""
return {
"span_type": span.span_type,
"module": span.module,
"function": span.function,
"description_zh": span.description_zh,
"description_en": span.description_en,
"params": span.params,
"result_summary": span.result_summary,
"duration_ms": span.duration_ms,
"timestamp": span.timestamp,
"extra": span.extra,
}
def serialize_trace(ctx: TraceContext) -> dict[str, Any]:
"""将 TraceContext 转为可 JSON 序列化的 dict。
包含完整字段request_id, trace_type, timestamp, method, path,
status_code, total_duration_ms, user_id, site_id,
db_query_count, db_total_ms, error, spans。
衍生字段从 spans 列表计算:
- status_code: 从 HTTP_OUT span 的 extra 或 result_summary 提取
- total_duration_ms: 从 HTTP_OUT span 的 duration_ms 获取
- db_query_count: 统计 DB_QUERY span 数量
- db_total_ms: 累加 DB_QUERY span 的 duration_ms
- error: 从 ERROR/DB_ERROR span 提取错误信息
"""
spans = ctx.spans
# 衍生字段status_code — 从 HTTP_OUT span 提取
status_code: int | None = None
total_duration_ms: float = 0.0
for s in spans:
if s.span_type == SpanType.HTTP_OUT:
# 优先从 extra 中取 status_code
status_code = s.extra.get("status_code")
if status_code is None:
# 尝试从 result_summary 解析,如 "200 OK, 3.2KB body"
summary = s.result_summary or ""
parts = summary.split()
if parts and parts[0].isdigit():
status_code = int(parts[0])
total_duration_ms = s.duration_ms
break
# 衍生字段db_query_count, db_total_ms
db_query_count = 0
db_total_ms = 0.0
for s in spans:
if s.span_type == SpanType.DB_QUERY:
db_query_count += 1
db_total_ms += s.duration_ms
# 衍生字段error — 从 ERROR/DB_ERROR span 提取
error: str | None = None
for s in spans:
if s.span_type in (SpanType.ERROR, SpanType.DB_ERROR):
# 拼接错误信息:优先 result_summary其次 description_zh
error = s.result_summary or s.description_zh or s.description_en
break
return {
"request_id": ctx.request_id,
"trace_type": ctx.trace_type,
"timestamp": ctx.start_time.isoformat() if isinstance(ctx.start_time, datetime) else str(ctx.start_time),
"method": ctx.method,
"path": ctx.path,
"status_code": status_code,
"total_duration_ms": total_duration_ms,
"user_id": ctx.user_id,
"site_id": ctx.site_id,
"db_query_count": db_query_count,
"db_total_ms": round(db_total_ms, 2),
"error": error,
"spans": [serialize_span(s) for s in spans],
}
# ──────────────────────────────────────────────
# 路径生成函数
# ──────────────────────────────────────────────
def get_log_dir(dt: datetime, base_dir: str | None = None) -> Path:
"""返回日期目录路径:{base_dir}/{YYYY-MM-DD}/。
Args:
dt: 时间戳,用于确定日期目录名
base_dir: 日志根目录,默认从 TraceConfig 读取
"""
if base_dir is None:
base_dir = get_trace_config().log_dir
date_str = dt.strftime("%Y-%m-%d")
return Path(base_dir) / date_str
def get_log_file(dt: datetime, base_dir: str | None = None) -> Path:
"""返回当前小时的日志文件路径:{base_dir}/{YYYY-MM-DD}/trace_YYYY-MM-DD_HH.jsonl。
Args:
dt: 时间戳,用于确定日期目录和小时文件名
base_dir: 日志根目录,默认从 TraceConfig 读取
"""
date_dir = get_log_dir(dt, base_dir=base_dir)
filename = f"trace_{dt.strftime('%Y-%m-%d_%H')}.jsonl"
return date_dir / filename
# ──────────────────────────────────────────────
# TraceWriter 类
# ──────────────────────────────────────────────
class TraceWriter:
"""JSON Lines 日志写入器。
负责将 TraceContext 序列化为 JSON 并追加写入 .jsonl 文件。
支持按日期分目录、按小时分文件、超 10MB 自动轮转、索引维护。
"""
def __init__(self, base_dir: str | None = None):
"""初始化写入器。
Args:
base_dir: 日志根目录,默认从 TraceConfig 读取
"""
self._base_dir = base_dir
@property
def base_dir(self) -> str:
"""获取日志根目录。"""
if self._base_dir is not None:
return self._base_dir
return get_trace_config().log_dir
def _rotate_if_needed(self, filepath: Path) -> Path:
"""检查文件大小,需要时返回轮转后的新路径。
如果文件不存在或未超过 10MB返回原路径。
如果超过 10MB返回带序号后缀的新路径trace_YYYY-MM-DD_HH_001.jsonl
"""
if not filepath.exists():
return filepath
if filepath.stat().st_size < MAX_FILE_SIZE:
return filepath
# 需要轮转:查找下一个可用序号
stem = filepath.stem # trace_YYYY-MM-DD_HH
parent = filepath.parent
# 如果当前文件名已经带序号(如 trace_2026-03-22_14_001
# 提取基础名称
parts = stem.rsplit("_", 1)
if len(parts) == 2 and parts[1].isdigit() and len(parts[1]) == 3:
base_stem = parts[0]
else:
base_stem = stem
# 查找已有的轮转文件,确定下一个序号
seq = 1
while True:
candidate = parent / f"{base_stem}_{seq:03d}.jsonl"
if not candidate.exists() or candidate.stat().st_size < MAX_FILE_SIZE:
return candidate
seq += 1
def _update_index(self, date_dir: Path, filename: str, record_count_delta: int) -> None:
"""更新索引文件 _index.json。
索引结构:
{
"files": {
"trace_2026-03-22_14.jsonl": {
"record_count": 42,
"file_size": 12345
}
}
}
"""
index_path = date_dir / "_index.json"
# 读取现有索引
index: dict[str, Any] = {"files": {}}
if index_path.exists():
try:
index = json.loads(index_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
index = {"files": {}}
if "files" not in index:
index["files"] = {}
# 更新文件条目
file_entry = index["files"].get(filename, {"record_count": 0, "file_size": 0})
file_entry["record_count"] = file_entry.get("record_count", 0) + record_count_delta
# 更新实际文件大小
file_path = date_dir / filename
if file_path.exists():
file_entry["file_size"] = file_path.stat().st_size
else:
file_entry["file_size"] = 0
index["files"][filename] = file_entry
# 写回索引
index_path.write_text(
json.dumps(index, ensure_ascii=False, indent=2),
encoding="utf-8",
)
async def write_trace(self, ctx: TraceContext) -> None:
"""异步写入 TraceContext 到 JSON Lines 文件。
写入失败时静默处理(仅记录日志),不影响请求处理。
"""
try:
# 序列化
data = serialize_trace(ctx)
line = json.dumps(data, ensure_ascii=False, default=str)
# 确定写入路径
dt = ctx.start_time if isinstance(ctx.start_time, datetime) else datetime.now()
filepath = get_log_file(dt, base_dir=self.base_dir)
# 确保目录存在
filepath.parent.mkdir(parents=True, exist_ok=True)
# 检查是否需要轮转
filepath = self._rotate_if_needed(filepath)
# 异步写入(在线程池中执行 IO 操作)
await asyncio.to_thread(self._sync_write, filepath, line)
except Exception:
# 写入失败不影响请求处理,仅记录日志
logger.warning("Trace 日志写入失败", exc_info=True)
def _sync_write(self, filepath: Path, line: str) -> None:
"""同步写入一行 JSON 到文件,并更新索引。"""
# 追加写入
with open(filepath, "a", encoding="utf-8") as f:
f.write(line + "\n")
# 更新索引
date_dir = filepath.parent
filename = filepath.name
self._update_index(date_dir, filename, record_count_delta=1)
# ──────────────────────────────────────────────
# 单例访问
# ──────────────────────────────────────────────
_writer_instance: TraceWriter | None = None
def get_trace_writer() -> TraceWriter:
"""获取全局 TraceWriter 单例。"""
global _writer_instance
if _writer_instance is None:
_writer_instance = TraceWriter()
return _writer_instance