包含多个会话的累积代码变更: - backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔 - admin-web: ETL 状态页、任务管理、调度配置、登录优化 - miniprogram: 看板页面、聊天集成、UI 组件、导航更新 - etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强 - tenant-admin: 项目初始化 - db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8) - packages/shared: 枚举和工具函数更新 - tools: 数据库工具、报表生成、健康检查 - docs: PRD/架构/部署/合约文档更新 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
302 lines
11 KiB
Python
302 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
JSON Lines 日志写入器
|
||
|
||
负责将 TraceContext 序列化为 JSON 并追加写入 .jsonl 文件。
|
||
按日期分目录(YYYY-MM-DD/),按小时分文件(trace_YYYY-MM-DD_HH.jsonl),
|
||
单文件超过 10MB 自动轮转(trace_YYYY-MM-DD_HH_NNN.jsonl)。
|
||
维护 _index.json 索引文件(文件列表、记录数、文件大小)。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from app.trace.config import get_trace_config
|
||
from app.trace.context import TraceContext, TraceSpan, SpanType
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 单文件大小上限(10MB)
|
||
MAX_FILE_SIZE = 10 * 1024 * 1024
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 序列化函数
|
||
# ──────────────────────────────────────────────
|
||
|
||
def serialize_span(span: TraceSpan) -> dict[str, Any]:
|
||
"""将 TraceSpan 转为可 JSON 序列化的 dict。
|
||
|
||
包含所有必需字段:span_type, module, function, description_zh,
|
||
description_en, params, result_summary, duration_ms, timestamp, extra。
|
||
"""
|
||
return {
|
||
"span_type": span.span_type,
|
||
"module": span.module,
|
||
"function": span.function,
|
||
"description_zh": span.description_zh,
|
||
"description_en": span.description_en,
|
||
"params": span.params,
|
||
"result_summary": span.result_summary,
|
||
"duration_ms": span.duration_ms,
|
||
"timestamp": span.timestamp,
|
||
"extra": span.extra,
|
||
}
|
||
|
||
|
||
def serialize_trace(ctx: TraceContext) -> dict[str, Any]:
|
||
"""将 TraceContext 转为可 JSON 序列化的 dict。
|
||
|
||
包含完整字段:request_id, trace_type, timestamp, method, path,
|
||
status_code, total_duration_ms, user_id, site_id,
|
||
db_query_count, db_total_ms, error, spans。
|
||
|
||
衍生字段从 spans 列表计算:
|
||
- status_code: 从 HTTP_OUT span 的 extra 或 result_summary 提取
|
||
- total_duration_ms: 从 HTTP_OUT span 的 duration_ms 获取
|
||
- db_query_count: 统计 DB_QUERY span 数量
|
||
- db_total_ms: 累加 DB_QUERY span 的 duration_ms
|
||
- error: 从 ERROR/DB_ERROR span 提取错误信息
|
||
"""
|
||
spans = ctx.spans
|
||
|
||
# 衍生字段:status_code — 从 HTTP_OUT span 提取
|
||
status_code: int | None = None
|
||
total_duration_ms: float = 0.0
|
||
for s in spans:
|
||
if s.span_type == SpanType.HTTP_OUT:
|
||
# 优先从 extra 中取 status_code
|
||
status_code = s.extra.get("status_code")
|
||
if status_code is None:
|
||
# 尝试从 result_summary 解析,如 "200 OK, 3.2KB body"
|
||
summary = s.result_summary or ""
|
||
parts = summary.split()
|
||
if parts and parts[0].isdigit():
|
||
status_code = int(parts[0])
|
||
total_duration_ms = s.duration_ms
|
||
break
|
||
|
||
# 衍生字段:db_query_count, db_total_ms
|
||
db_query_count = 0
|
||
db_total_ms = 0.0
|
||
for s in spans:
|
||
if s.span_type == SpanType.DB_QUERY:
|
||
db_query_count += 1
|
||
db_total_ms += s.duration_ms
|
||
|
||
# 衍生字段:error — 从 ERROR/DB_ERROR span 提取
|
||
error: str | None = None
|
||
for s in spans:
|
||
if s.span_type in (SpanType.ERROR, SpanType.DB_ERROR):
|
||
# 拼接错误信息:优先 result_summary,其次 description_zh
|
||
error = s.result_summary or s.description_zh or s.description_en
|
||
break
|
||
|
||
return {
|
||
"request_id": ctx.request_id,
|
||
"trace_type": ctx.trace_type,
|
||
"timestamp": ctx.start_time.isoformat() if isinstance(ctx.start_time, datetime) else str(ctx.start_time),
|
||
"method": ctx.method,
|
||
"path": ctx.path,
|
||
"status_code": status_code,
|
||
"total_duration_ms": total_duration_ms,
|
||
"user_id": ctx.user_id,
|
||
"site_id": ctx.site_id,
|
||
"db_query_count": db_query_count,
|
||
"db_total_ms": round(db_total_ms, 2),
|
||
"error": error,
|
||
"spans": [serialize_span(s) for s in spans],
|
||
}
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 路径生成函数
|
||
# ──────────────────────────────────────────────
|
||
|
||
def get_log_dir(dt: datetime, base_dir: str | None = None) -> Path:
|
||
"""返回日期目录路径:{base_dir}/{YYYY-MM-DD}/。
|
||
|
||
Args:
|
||
dt: 时间戳,用于确定日期目录名
|
||
base_dir: 日志根目录,默认从 TraceConfig 读取
|
||
"""
|
||
if base_dir is None:
|
||
base_dir = get_trace_config().log_dir
|
||
date_str = dt.strftime("%Y-%m-%d")
|
||
return Path(base_dir) / date_str
|
||
|
||
|
||
def get_log_file(dt: datetime, base_dir: str | None = None) -> Path:
|
||
"""返回当前小时的日志文件路径:{base_dir}/{YYYY-MM-DD}/trace_YYYY-MM-DD_HH.jsonl。
|
||
|
||
Args:
|
||
dt: 时间戳,用于确定日期目录和小时文件名
|
||
base_dir: 日志根目录,默认从 TraceConfig 读取
|
||
"""
|
||
date_dir = get_log_dir(dt, base_dir=base_dir)
|
||
filename = f"trace_{dt.strftime('%Y-%m-%d_%H')}.jsonl"
|
||
return date_dir / filename
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# TraceWriter 类
|
||
# ──────────────────────────────────────────────
|
||
|
||
class TraceWriter:
|
||
"""JSON Lines 日志写入器。
|
||
|
||
负责将 TraceContext 序列化为 JSON 并追加写入 .jsonl 文件。
|
||
支持按日期分目录、按小时分文件、超 10MB 自动轮转、索引维护。
|
||
"""
|
||
|
||
def __init__(self, base_dir: str | None = None):
|
||
"""初始化写入器。
|
||
|
||
Args:
|
||
base_dir: 日志根目录,默认从 TraceConfig 读取
|
||
"""
|
||
self._base_dir = base_dir
|
||
|
||
@property
|
||
def base_dir(self) -> str:
|
||
"""获取日志根目录。"""
|
||
if self._base_dir is not None:
|
||
return self._base_dir
|
||
return get_trace_config().log_dir
|
||
|
||
def _rotate_if_needed(self, filepath: Path) -> Path:
|
||
"""检查文件大小,需要时返回轮转后的新路径。
|
||
|
||
如果文件不存在或未超过 10MB,返回原路径。
|
||
如果超过 10MB,返回带序号后缀的新路径(trace_YYYY-MM-DD_HH_001.jsonl)。
|
||
"""
|
||
if not filepath.exists():
|
||
return filepath
|
||
|
||
if filepath.stat().st_size < MAX_FILE_SIZE:
|
||
return filepath
|
||
|
||
# 需要轮转:查找下一个可用序号
|
||
stem = filepath.stem # trace_YYYY-MM-DD_HH
|
||
parent = filepath.parent
|
||
|
||
# 如果当前文件名已经带序号(如 trace_2026-03-22_14_001),
|
||
# 提取基础名称
|
||
parts = stem.rsplit("_", 1)
|
||
if len(parts) == 2 and parts[1].isdigit() and len(parts[1]) == 3:
|
||
base_stem = parts[0]
|
||
else:
|
||
base_stem = stem
|
||
|
||
# 查找已有的轮转文件,确定下一个序号
|
||
seq = 1
|
||
while True:
|
||
candidate = parent / f"{base_stem}_{seq:03d}.jsonl"
|
||
if not candidate.exists() or candidate.stat().st_size < MAX_FILE_SIZE:
|
||
return candidate
|
||
seq += 1
|
||
|
||
def _update_index(self, date_dir: Path, filename: str, record_count_delta: int) -> None:
|
||
"""更新索引文件 _index.json。
|
||
|
||
索引结构:
|
||
{
|
||
"files": {
|
||
"trace_2026-03-22_14.jsonl": {
|
||
"record_count": 42,
|
||
"file_size": 12345
|
||
}
|
||
}
|
||
}
|
||
"""
|
||
index_path = date_dir / "_index.json"
|
||
|
||
# 读取现有索引
|
||
index: dict[str, Any] = {"files": {}}
|
||
if index_path.exists():
|
||
try:
|
||
index = json.loads(index_path.read_text(encoding="utf-8"))
|
||
except (json.JSONDecodeError, OSError):
|
||
index = {"files": {}}
|
||
|
||
if "files" not in index:
|
||
index["files"] = {}
|
||
|
||
# 更新文件条目
|
||
file_entry = index["files"].get(filename, {"record_count": 0, "file_size": 0})
|
||
file_entry["record_count"] = file_entry.get("record_count", 0) + record_count_delta
|
||
|
||
# 更新实际文件大小
|
||
file_path = date_dir / filename
|
||
if file_path.exists():
|
||
file_entry["file_size"] = file_path.stat().st_size
|
||
else:
|
||
file_entry["file_size"] = 0
|
||
|
||
index["files"][filename] = file_entry
|
||
|
||
# 写回索引
|
||
index_path.write_text(
|
||
json.dumps(index, ensure_ascii=False, indent=2),
|
||
encoding="utf-8",
|
||
)
|
||
|
||
async def write_trace(self, ctx: TraceContext) -> None:
|
||
"""异步写入 TraceContext 到 JSON Lines 文件。
|
||
|
||
写入失败时静默处理(仅记录日志),不影响请求处理。
|
||
"""
|
||
try:
|
||
# 序列化
|
||
data = serialize_trace(ctx)
|
||
line = json.dumps(data, ensure_ascii=False, default=str)
|
||
|
||
# 确定写入路径
|
||
dt = ctx.start_time if isinstance(ctx.start_time, datetime) else datetime.now()
|
||
filepath = get_log_file(dt, base_dir=self.base_dir)
|
||
|
||
# 确保目录存在
|
||
filepath.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 检查是否需要轮转
|
||
filepath = self._rotate_if_needed(filepath)
|
||
|
||
# 异步写入(在线程池中执行 IO 操作)
|
||
await asyncio.to_thread(self._sync_write, filepath, line)
|
||
|
||
except Exception:
|
||
# 写入失败不影响请求处理,仅记录日志
|
||
logger.warning("Trace 日志写入失败", exc_info=True)
|
||
|
||
def _sync_write(self, filepath: Path, line: str) -> None:
|
||
"""同步写入一行 JSON 到文件,并更新索引。"""
|
||
# 追加写入
|
||
with open(filepath, "a", encoding="utf-8") as f:
|
||
f.write(line + "\n")
|
||
|
||
# 更新索引
|
||
date_dir = filepath.parent
|
||
filename = filepath.name
|
||
self._update_index(date_dir, filename, record_count_delta=1)
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# 单例访问
|
||
# ──────────────────────────────────────────────
|
||
|
||
_writer_instance: TraceWriter | None = None
|
||
|
||
|
||
def get_trace_writer() -> TraceWriter:
|
||
"""获取全局 TraceWriter 单例。"""
|
||
global _writer_instance
|
||
if _writer_instance is None:
|
||
_writer_instance = TraceWriter()
|
||
return _writer_instance
|