Files
Neo-ZQYY/apps/backend/app/trace/job_wrapper.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

128 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
后台 Job 执行追踪包装器
提供 trace_job() 包装函数,在 lifespan 注册的 job handler 外层包裹,
追踪后台任务执行全过程JOB_START → 内部 span → JOB_END / JOB_ERROR
Job trace 使用独立的 request_idjob_ 前缀),写入同一日志文件。
内部的 SERVICE / DB_QUERY span 通过 contextvars 自动关联到 job trace。
"""
from __future__ import annotations
import functools
import time
import traceback
from datetime import datetime
from typing import Any, Callable
from app.trace.config import get_trace_config
from app.trace.context import (
SpanType,
TraceSpan,
create_job_trace,
set_current_trace,
trace_context_var,
)
from app.trace.writer import TraceWriter
def _write_job_trace(ctx: Any) -> None:
    """Synchronously append the serialized job trace to the trace log file.

    This is best-effort: any failure while serializing or writing is logged
    at debug level and otherwise ignored, so tracing can never break the job
    it observes.
    """
    import logging

    try:
        # Imported lazily, exactly as the original code did.
        import json
        from app.trace.writer import get_log_file, serialize_trace

        writer = TraceWriter()
        line = json.dumps(serialize_trace(ctx), ensure_ascii=False, default=str)
        # Pick the log file from the trace start time when it is a datetime;
        # otherwise fall back to the current time.
        started_at = (
            ctx.start_time if isinstance(ctx.start_time, datetime) else datetime.now()
        )
        filepath = get_log_file(started_at, base_dir=writer.base_dir)
        filepath.parent.mkdir(parents=True, exist_ok=True)
        filepath = writer._rotate_if_needed(filepath)
        writer._sync_write(filepath, line)
    except Exception:
        # A write failure must not affect the business logic of the job.
        logging.getLogger(__name__).debug(
            "Failed to write job trace", exc_info=True
        )


def trace_job(job_name: str) -> Callable:
    """Decorator factory: wrap a job handler and trace its execution.

    When tracing is enabled (``get_trace_config().enabled``), each call of the
    wrapped handler creates a job trace, records a JOB_START span, runs the
    handler, records either a JOB_END span (on success) or a JOB_ERROR span
    (on ``Exception``, which is then re-raised), writes the trace to the log
    file, and finally restores the trace context that was active before the
    call. When tracing is disabled the handler is called directly.

    Usage::

        @trace_job("task_generator")
        def my_handler(**kwargs):
            ...

    Or wrap at ``register_job`` time::

        register_job(
            "task_generator",
            trace_job("task_generator")(lambda **_kw: task_generator.run()),
        )

    Args:
        job_name: Name of the job; recorded in every span emitted here.

    Returns:
        A decorator that wraps a synchronous callable and returns the
        wrapped callable's result unchanged.

    Raises:
        Whatever the wrapped handler raises; exceptions are never swallowed.
    """
    span_function = f"trace_job({job_name})"

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            cfg = get_trace_config()
            if not cfg.enabled:
                return func(*args, **kwargs)

            # Remember the trace context that is active right now so it can be
            # restored afterwards instead of being overwritten with None.
            previous_ctx = trace_context_var.get(None)

            # Create the job trace context and make it current.
            ctx = create_job_trace(job_name)
            set_current_trace(ctx)
            try:
                # JOB_START span
                ctx.add_span(TraceSpan(
                    span_type=SpanType.JOB_START,
                    module="trace.job_wrapper",
                    function=span_function,
                    description_zh=f"后台任务开始: {job_name}",
                    description_en=f"Background job started: {job_name}",
                    params={"job_name": job_name},
                    result_summary="",
                    duration_ms=0.0,
                    timestamp=datetime.now().isoformat(),
                ))
                t0 = time.perf_counter()
                try:
                    result = func(*args, **kwargs)
                    duration_ms = (time.perf_counter() - t0) * 1000
                    # JOB_END span
                    ctx.add_span(TraceSpan(
                        span_type=SpanType.JOB_END,
                        module="trace.job_wrapper",
                        function=span_function,
                        description_zh=f"后台任务完成: {job_name}, 耗时={duration_ms:.0f}ms",
                        description_en=f"Background job completed: {job_name}, duration={duration_ms:.0f}ms",
                        params={"job_name": job_name},
                        result_summary=f"completed in {duration_ms:.0f}ms",
                        duration_ms=duration_ms,
                        timestamp=datetime.now().isoformat(),
                    ))
                    return result
                except Exception as exc:
                    duration_ms = (time.perf_counter() - t0) * 1000
                    tb_lines = traceback.format_exception(
                        type(exc), exc, exc.__traceback__
                    )
                    # Keep only the last few formatted traceback entries.
                    stack_summary = "".join(tb_lines[-5:])
                    # JOB_ERROR span
                    ctx.add_span(TraceSpan(
                        span_type=SpanType.JOB_ERROR,
                        module="trace.job_wrapper",
                        function=span_function,
                        description_zh=f"后台任务异常: {job_name}, {type(exc).__name__}: {exc}",
                        description_en=f"Background job error: {job_name}, {type(exc).__name__}: {exc}",
                        params={"job_name": job_name, "error_type": type(exc).__name__},
                        result_summary=f"{type(exc).__name__}: {exc}",
                        duration_ms=duration_ms,
                        timestamp=datetime.now().isoformat(),
                        extra={"stack_summary": stack_summary},
                    ))
                    raise
            finally:
                # Write the trace synchronously (job handlers run in a
                # synchronous context), then restore the previous context.
                _write_job_trace(ctx)
                trace_context_var.set(previous_ctx)

        return wrapper

    return decorator