# -*- coding: utf-8 -*-
"""Trace coverage scanner.

Detects trace coverage by searching file contents (grep style); no AST
parsing is used.

Scan dimensions: route coverage, service coverage, job coverage and
SSE/WebSocket coverage. The scan result is cached in memory and can be
refreshed manually.
"""

from __future__ import annotations

import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any

logger = logging.getLogger(__name__)

# Backend app directory (relative to the project root).
_APP_DIR = Path(__file__).resolve().parent.parent  # apps/backend/app/


# ── Module-level cache ───────────────────────────────────

_cached_result: dict[str, Any] | None = None


def get_cached_coverage() -> dict[str, Any] | None:
    """Return the cached coverage scan result, or ``None`` if no scan ran yet."""
    return _cached_result


def run_coverage_scan() -> dict[str, Any]:
    """Run every coverage scan, store the result in the cache and return it.

    Returns:
        A dict with the keys ``scan_time`` (ISO-8601 string produced from a
        naive ``datetime.now()``), ``routes``, ``services``, ``jobs``,
        ``sse_endpoints`` and ``ws_endpoints``. Every section other than
        ``scan_time`` is a dict with ``total``, ``covered`` and ``uncovered``.
    """
    global _cached_result
    result: dict[str, Any] = {
        "scan_time": datetime.now().isoformat(),
        "routes": _scan_routes(),
        "services": _scan_services(),
        "jobs": _scan_jobs(),
        "sse_endpoints": _scan_sse(),
        "ws_endpoints": _scan_ws(),
    }
    _cached_result = result
    return result


# ── Route coverage scan ──────────────────────────────────

# Matches the @router.get/post/put/delete/patch decorators.
_ROUTE_DECORATOR_RE = re.compile(r"@router\.(get|post|put|delete|patch)\(")

# Matches the ``async def`` / ``def`` that follows a decorator and captures
# the function name.
_FUNC_DEF_RE = re.compile(r"(?:async\s+)?def\s+(\w+)\s*\(")

# How many lines after a route decorator are searched for the handler's
# ``def``. Route decorators are frequently written with one keyword argument
# per line, so the window has to be larger than a handful of lines.
_ROUTE_LOOKAHEAD_LINES = 40


def _find_route_functions(lines: list[str]) -> list[str]:
    """Return the names of the functions decorated with ``@router.<verb>(``.

    A function carrying several stacked route decorators is reported once.
    Decorators and ``def`` statements are only recognised at the start of a
    line (ignoring indentation), so commented-out decorators and text inside
    decorator arguments are not mistaken for code.
    """
    names: list[str] = []
    i = 0
    while i < len(lines):
        if _ROUTE_DECORATOR_RE.match(lines[i].lstrip()):
            stop = min(i + _ROUTE_LOOKAHEAD_LINES, len(lines))
            for j in range(i, stop):
                found = _FUNC_DEF_RE.match(lines[j].lstrip())
                if found:
                    names.append(found.group(1))
                    # Resume after the def so that further decorators stacked
                    # on the same function do not count it again.
                    i = j
                    break
        i += 1
    return names


def _scan_routes() -> dict[str, Any]:
    """Scan the ``routers/xcx_*.py`` files and count their route functions.

    Coverage rule: every route function in an ``xcx_*`` router file is treated
    as covered by TraceMiddleware (the middleware intercepts the ``/api/xcx/``
    prefix), so ``uncovered`` is always empty.

    Returns:
        ``{"total": int, "covered": int, "uncovered": list[str]}``; all zero /
        empty when the ``routers`` directory does not exist.
    """
    routers_dir = _APP_DIR / "routers"
    if not routers_dir.exists():
        return {"total": 0, "covered": 0, "uncovered": []}

    route_ids: list[str] = []
    for py_file in sorted(routers_dir.glob("xcx_*.py")):
        lines = py_file.read_text(encoding="utf-8").splitlines()
        route_ids.extend(
            f"{py_file.stem}.{func_name}"
            for func_name in _find_route_functions(lines)
        )

    return {
        "total": len(route_ids),
        "covered": len(route_ids),
        "uncovered": [],
    }


# ── Service coverage scan ────────────────────────────────

# Module-level public function: ``def`` at column 0 whose name does not start
# with an underscore.
_PUBLIC_FUNC_RE = re.compile(r"^(?:async\s+)?def\s+([a-zA-Z]\w*)\s*\(")


def _has_trace_decorator(lines: list[str], def_index: int) -> bool:
    """Tell whether the ``def`` at ``def_index`` is decorated with ``@trace_service``.

    Walks upwards from the ``def`` through its decorator stack only. Blank
    lines, comments, indented lines (continuation lines of a multi-line
    decorator call) and closing brackets are skipped; the walk ends at the
    first other column-0 statement, so a decorator that belongs to the
    previous function is never attributed to this one.
    """
    for k in range(def_index - 1, -1, -1):
        line = lines[k]
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if line[0] in " \t" or line[0] in ")]}":
            continue
        if line.startswith("@"):
            if line.startswith("@trace_service"):
                return True
            continue
        # Any other top-level statement ends this function's decorator stack.
        return False
    return False


def _scan_services() -> dict[str, Any]:
    """Scan ``services/**/*.py`` for public functions and their ``@trace_service`` use.

    Files whose name starts with ``_`` are skipped. Only module-level
    functions whose name does not start with ``_`` are counted; each one is
    identified as ``<file stem>.<function name>``.

    Returns:
        ``{"total": int, "covered": int, "uncovered": list[str]}``; all zero /
        empty when the ``services`` directory does not exist.
    """
    services_dir = _APP_DIR / "services"
    if not services_dir.exists():
        return {"total": 0, "covered": 0, "uncovered": []}

    total = 0
    covered = 0
    uncovered_funcs: list[str] = []

    for py_file in sorted(services_dir.rglob("*.py")):
        if py_file.name.startswith("_"):
            continue

        content = py_file.read_text(encoding="utf-8")
        lines = content.splitlines()
        # Cheap pre-check: without this token nothing in the file can be
        # decorated, so the per-function walk is skipped.
        mentions_trace = "trace_service" in content

        for index, line in enumerate(lines):
            found = _PUBLIC_FUNC_RE.match(line)
            if not found:
                continue
            total += 1
            if mentions_trace and _has_trace_decorator(lines, index):
                covered += 1
            else:
                uncovered_funcs.append(f"{py_file.stem}.{found.group(1)}")

    return {
        "total": total,
        "covered": covered,
        "uncovered": uncovered_funcs,
    }


# ── Job coverage scan ────────────────────────────────────

# The 4 known job handlers.
_KNOWN_JOBS = [
    "task_generator",
    "task_expiry_check",
    "recall_completion_check",
    "note_reclassify_backfill",
]


def _scan_jobs() -> dict[str, Any]:
    """Check whether the known job handlers are wrapped by ``trace_job`` in ``main.py``.

    A job counts as covered when ``main.py`` contains ``trace_job("<name>")``;
    single quotes and whitespace inside the parentheses are accepted as well.

    Returns:
        ``{"total": int, "covered": int, "uncovered": list[str]}``. When
        ``main.py`` does not exist every known job is reported as uncovered.
    """
    main_py = _APP_DIR / "main.py"
    if not main_py.exists():
        return {"total": len(_KNOWN_JOBS), "covered": 0, "uncovered": _KNOWN_JOBS[:]}

    content = main_py.read_text(encoding="utf-8")
    uncovered: list[str] = []
    for job_name in _KNOWN_JOBS:
        # The back-reference forces the closing quote to equal the opening one.
        wrapper_re = rf"""trace_job\(\s*(["']){re.escape(job_name)}\1\s*\)"""
        if not re.search(wrapper_re, content):
            uncovered.append(job_name)

    return {
        "total": len(_KNOWN_JOBS),
        "covered": len(_KNOWN_JOBS) - len(uncovered),
        "uncovered": uncovered,
    }


# ── SSE / WebSocket coverage scan ────────────────────────


def _scan_single_endpoint(
    source_file: Path, marker: str, endpoint_id: str
) -> dict[str, Any]:
    """Report coverage of one endpoint based on a marker string in its file.

    The endpoint is covered when ``source_file`` exists and its text contains
    ``marker``; otherwise ``endpoint_id`` is listed as uncovered.
    """
    is_covered = (
        source_file.exists()
        and marker in source_file.read_text(encoding="utf-8")
    )
    return {
        "total": 1,
        "covered": 1 if is_covered else 0,
        "uncovered": [] if is_covered else [endpoint_id],
    }


def _scan_sse() -> dict[str, Any]:
    """Check whether the SSE endpoint (``routers/xcx_chat.py``) uses ``sse_wrapper``.

    ``xcx_chat`` is currently the only SSE endpoint, so ``total`` is always 1.
    """
    return _scan_single_endpoint(
        _APP_DIR / "routers" / "xcx_chat.py",
        "sse_wrapper",
        "xcx_chat.chat_stream",
    )


def _scan_ws() -> dict[str, Any]:
    """Check whether the WebSocket endpoint (``ws/logs.py``) uses ``ws_wrapper``.

    ``ws_logs`` is currently the only WebSocket endpoint, so ``total`` is
    always 1.
    """
    return _scan_single_endpoint(
        _APP_DIR / "ws" / "logs.py",
        "ws_wrapper",
        "ws_logs.ws_logs",
    )