Files
Neo-ZQYY/apps/backend/app/routers/xcx_ai_cache.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

62 lines
1.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
小程序 AI 缓存查询路由 —— 查询各 AI 应用的最新缓存结果。
端点清单:
- GET /api/ai/cache/{cache_type}?target_id=xxx — 查询最新缓存
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.ai.cache_service import AICacheService
from app.ai.schemas import CacheTypeEnum
from app.auth.dependencies import CurrentUser, get_current_user
from app.trace.decorators import trace_service
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/ai", tags=["小程序 AI 缓存"])
@router.get("/cache/{cache_type}")
@trace_service("查询 AI 缓存", "Get AI cache")
async def get_ai_cache(
cache_type: str,
target_id: str = Query(..., description="目标 IDmember_id / assistant_id_member_id / 时间维度编码)"),
user: CurrentUser = Depends(get_current_user),
):
"""查询指定类型的最新 AI 缓存结果。
site_id 从 JWT 提取,强制过滤,确保门店隔离。
"""
# 校验 cache_type 合法性
valid_types = {e.value for e in CacheTypeEnum}
if cache_type not in valid_types:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"无效的 cache_type: {cache_type},合法值: {sorted(valid_types)}",
)
cache_svc = AICacheService()
result = cache_svc.get_latest(
cache_type=cache_type,
site_id=user.site_id,
target_id=target_id,
)
if result is None:
return None
return {
"id": result.get("id"),
"cache_type": result.get("cache_type"),
"target_id": result.get("target_id"),
"result_json": result.get("result_json"),
"score": result.get("score"),
"created_at": result.get("created_at"),
}