Files
Neo-ZQYY/apps/backend/app/trace/coverage.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

241 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Trace 覆盖率扫描器
通过文件内容搜索grep 式)检测 trace 覆盖情况,不使用 AST 解析。
扫描维度路由覆盖、Service 覆盖、Job 覆盖、SSE/WS 覆盖。
扫描结果缓存在内存中,支持手动刷新。
"""
from __future__ import annotations
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# 后端 app 目录(相对于项目根目录)
_APP_DIR = Path(__file__).resolve().parent.parent # apps/backend/app/
# ── 模块级缓存 ────────────────────────────────────────────
_cached_result: dict[str, Any] | None = None
def get_cached_coverage() -> dict[str, Any] | None:
"""返回缓存的覆盖率扫描结果,无缓存时返回 None。"""
return _cached_result
def run_coverage_scan() -> dict[str, Any]:
"""执行覆盖率扫描并更新缓存,返回扫描结果。"""
global _cached_result
result: dict[str, Any] = {
"scan_time": datetime.now().isoformat(),
"routes": _scan_routes(),
"services": _scan_services(),
"jobs": _scan_jobs(),
"sse_endpoints": _scan_sse(),
"ws_endpoints": _scan_ws(),
}
_cached_result = result
return result
# ── 路由覆盖扫描 ──────────────────────────────────────────
# 匹配 @router.get/post/put/delete/patch 装饰器
_ROUTE_DECORATOR_RE = re.compile(r"@router\.(get|post|put|delete|patch)\(")
# 匹配紧跟装饰器后的 async def / def 函数名
_FUNC_DEF_RE = re.compile(r"(?:async\s+)?def\s+(\w+)\s*\(")
def _scan_routes() -> dict[str, Any]:
"""扫描 xcx_*.py 路由文件,统计路由函数数量。
覆盖判定xcx_* 路由文件中的路由函数默认被 TraceMiddleware 覆盖
(因为中间件拦截 /api/xcx/ 前缀)。
"""
routers_dir = _APP_DIR / "routers"
total_routes: list[str] = []
covered_routes: list[str] = []
uncovered_routes: list[str] = []
if not routers_dir.exists():
return {"total": 0, "covered": 0, "uncovered": []}
for py_file in sorted(routers_dir.glob("xcx_*.py")):
module_name = py_file.stem
content = py_file.read_text(encoding="utf-8")
lines = content.splitlines()
# 查找路由装饰器后面的函数名
i = 0
while i < len(lines):
if _ROUTE_DECORATOR_RE.search(lines[i]):
# 向下查找函数定义(可能隔几行装饰器)
for j in range(i, min(i + 5, len(lines))):
m = _FUNC_DEF_RE.search(lines[j])
if m:
func_name = m.group(1)
route_id = f"{module_name}.{func_name}"
total_routes.append(route_id)
# xcx_* 路由默认被 TraceMiddleware 覆盖
covered_routes.append(route_id)
break
i += 1
return {
"total": len(total_routes),
"covered": len(covered_routes),
"uncovered": uncovered_routes,
}
# ── Service 覆盖扫描 ─────────────────────────────────────
def _scan_services() -> dict[str, Any]:
"""扫描 services/*.py 中的公开函数,检查是否有 @trace_service 装饰器。"""
services_dir = _APP_DIR / "services"
total_funcs: list[str] = []
covered_funcs: list[str] = []
uncovered_funcs: list[str] = []
if not services_dir.exists():
return {"total": 0, "covered": 0, "uncovered": []}
for py_file in sorted(services_dir.rglob("*.py")):
if py_file.name.startswith("_"):
continue
module_name = py_file.stem
content = py_file.read_text(encoding="utf-8")
lines = content.splitlines()
has_trace_import = "trace_service" in content
i = 0
while i < len(lines):
line = lines[i].rstrip()
# 匹配模块级公开函数(不以空格开头、非 _ 开头)
m = re.match(r"^(?:async\s+)?def\s+([a-zA-Z]\w*)\s*\(", line)
if m:
func_name = m.group(1)
func_id = f"{module_name}.{func_name}"
total_funcs.append(func_id)
# 检查前面几行是否有 @trace_service
decorated = False
if has_trace_import:
for k in range(max(0, i - 5), i):
if "@trace_service" in lines[k]:
decorated = True
break
if decorated:
covered_funcs.append(func_id)
else:
uncovered_funcs.append(func_id)
i += 1
return {
"total": len(total_funcs),
"covered": len(covered_funcs),
"uncovered": uncovered_funcs,
}
# ── Job 覆盖扫描 ─────────────────────────────────────────
# 4 个已知 job handler
_KNOWN_JOBS = [
"task_generator",
"task_expiry_check",
"recall_completion_check",
"note_reclassify_backfill",
]
def _scan_jobs() -> dict[str, Any]:
"""检查 main.py 中 4 个 job handler 是否被 trace_job 包装。"""
main_py = _APP_DIR / "main.py"
covered: list[str] = []
uncovered: list[str] = []
if not main_py.exists():
return {"total": len(_KNOWN_JOBS), "covered": 0, "uncovered": _KNOWN_JOBS[:]}
content = main_py.read_text(encoding="utf-8")
for job_name in _KNOWN_JOBS:
# 检查是否有 trace_job("job_name") 包装
if f'trace_job("{job_name}")' in content:
covered.append(job_name)
else:
uncovered.append(job_name)
return {
"total": len(_KNOWN_JOBS),
"covered": len(covered),
"uncovered": uncovered,
}
# ── SSE 覆盖扫描 ─────────────────────────────────────────
def _scan_sse() -> dict[str, Any]:
"""检查 SSE 端点xcx_chat.py是否集成了 sse_wrapper。"""
chat_file = _APP_DIR / "routers" / "xcx_chat.py"
total = 1 # 目前只有 xcx_chat 一个 SSE 端点
covered: list[str] = []
uncovered: list[str] = []
if not chat_file.exists():
return {"total": total, "covered": 0, "uncovered": ["xcx_chat.chat_stream"]}
content = chat_file.read_text(encoding="utf-8")
if "sse_wrapper" in content or "from app.trace.sse_wrapper" in content:
covered.append("xcx_chat.chat_stream")
else:
uncovered.append("xcx_chat.chat_stream")
return {
"total": total,
"covered": len(covered),
"uncovered": uncovered,
}
# ── WebSocket 覆盖扫描 ───────────────────────────────────
def _scan_ws() -> dict[str, Any]:
"""检查 WebSocket 端点ws/logs.py是否集成了 ws_wrapper。"""
ws_file = _APP_DIR / "ws" / "logs.py"
total = 1 # 目前只有 ws_logs 一个 WS 端点
covered: list[str] = []
uncovered: list[str] = []
if not ws_file.exists():
return {"total": total, "covered": 0, "uncovered": ["ws_logs.ws_logs"]}
content = ws_file.read_text(encoding="utf-8")
if "ws_wrapper" in content or "from app.trace.ws_wrapper" in content:
covered.append("ws_logs.ws_logs")
else:
uncovered.append("ws_logs.ws_logs")
return {
"total": total,
"covered": len(covered),
"uncovered": uncovered,
}