Files
Neo-ZQYY/apps/backend/app/routers/internal_ai.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

140 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
内部 AI 触发 API — ETL/内部服务调用入口。
端点:
- POST /api/internal/ai/trigger — 接收事件触发请求,异步执行 AI 调用链
认证方式Authorization: Internal-Token {token}
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel, Field
from app.ai.config import AIConfig
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/internal/ai", tags=["internal-ai"])
# ── 请求/响应模型 ────────────────────────────────────────────
class TriggerRequest(BaseModel):
"""内部触发请求体。"""
event_type: str = Field(..., description="事件类型: consumption / dws_completed / note_created / task_assigned")
connector_type: str = Field("feiqiu", description="连接器类型")
site_id: int = Field(..., description="门店 ID")
member_id: int | None = Field(None, description="会员 ID可选")
payload: dict | None = Field(None, description="附加数据")
is_forced: bool = Field(False, description="是否强制执行(跳过去重)")
class TriggerResponse(BaseModel):
"""触发响应。"""
trigger_job_id: int
status: str = "pending"
# ── 认证依赖 ─────────────────────────────────────────────────
def verify_internal_token(authorization: str = Header(...)) -> str:
"""校验 Internal-Token 认证。
Header 格式Authorization: Internal-Token {token}
token 不匹配或缺失时返回 HTTP 401。
"""
prefix = "Internal-Token "
if not authorization.startswith(prefix):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证格式,需要 Internal-Token",
)
token = authorization[len(prefix):]
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 不能为空",
)
# 从环境变量加载期望 token
try:
config = AIConfig.from_env()
except ValueError:
logger.error("AIConfig 加载失败,无法校验 internal token")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="AI 配置异常",
)
if token != config.internal_api_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token 不匹配",
)
return token
# ── 端点 ─────────────────────────────────────────────────────
@router.post("/trigger", response_model=TriggerResponse)
async def trigger_ai_event(
body: TriggerRequest,
_token: str = Depends(verify_internal_token),
) -> TriggerResponse:
"""接收 ETL/内部事件,写 ai_trigger_jobs 后异步执行。
立即返回 trigger_job_id调用链在后台异步执行。
"""
from app.ai.dispatcher import AIDispatcher, TriggerEvent
# 构建触发事件
event = TriggerEvent(
event_type=body.event_type,
site_id=body.site_id,
member_id=body.member_id,
connector_type=body.connector_type,
payload=body.payload or {},
is_forced=body.is_forced,
)
# 获取 dispatcher 实例并触发
# 延迟导入避免循环依赖dispatcher 实例由应用启动时创建
dispatcher = _get_dispatcher()
job_id = await dispatcher.handle_trigger(event)
return TriggerResponse(trigger_job_id=job_id, status="pending")
# ── 辅助函数 ─────────────────────────────────────────────────
# 全局 dispatcher 实例(应用启动时初始化)
_dispatcher_instance: AIDispatcher | None = None
def set_dispatcher(dispatcher: "AIDispatcher") -> None:
"""设置全局 dispatcher 实例(应用启动时调用)。"""
global _dispatcher_instance
_dispatcher_instance = dispatcher
def _get_dispatcher() -> "AIDispatcher":
"""获取全局 dispatcher 实例。"""
if _dispatcher_instance is None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="AI Dispatcher 尚未初始化",
)
return _dispatcher_instance