Neo-ZQYY/apps/backend/app/routers/admin_ai.py
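Neo 421e193041 fix(ai): F1-5a wire sandbox batch-run into runtime_context (W1 / Phase A main body)
Neo F1-5 feedback: "Make the sandbox do its real job: a truly simulated date,
visibility limited to data dated on or before the sandbox's configured date,
and the ability to run each of the AI business flows."

Investigation findings (4 parallel sub-agents): the batch-run endpoint's
_run_batch was an empty stub (only logger.info, no AI actually run); the GUC
helper apply_runtime_session_vars had 0 call sites (dead code); of 7 business
tables, 6 had the runtime composite index and only ai_run_logs was missing it;
and 3 lines in App2/2a called _calc_date_range without passing ref_date.

This commit (F1-5a Phase A main body; F1-5b follows with the full zqyy_app RLS view layer):

Backend core:
- admin_service.py: make _run_batch real (Semaphore(5) + asyncio.gather +
  return_exceptions=True + ctx_snapshot to prevent drift); the estimate entry
  point captures a RuntimeContext snapshot, and confirm retrieves it and passes
  it to the workers
- admin_ai.py: confirm_batch_run lazily injects the dispatcher
- admin_service.retry_trigger_job: the INSERT now writes the runtime_mode +
  sandbox_instance_id columns (via the runtime_insert_columns helper)
- runtime_context.py: get_runtime_context gains a bind_to_session parameter
  that activates the GUCs app.current_business_date / app.current_runtime_mode
- run_log_service.create_log: enables bind_to_session=True as a pilot

App2/2a ref_date fixes (3 lines):
- app2_finance_prompt.py:817 stored-value card balance change section
- app2_finance_prompt.py:841 daily-granularity series + anomaly detection window
- app2a_finance_area_prompt.py:466 area daily-granularity series

DB:
- migrations/20260505__ai_run_logs_runtime_index.sql:
  adds the composite index (site_id, runtime_mode, sandbox_instance_id, created_at DESC)

Frontend:
- AIOperations.tsx: adds a sandbox-mode banner at the top (an Alert showing
  sandbox_date + sandbox_instance_id + scope of impact + a link to switch back to live)

Not done (deferred to F1-5b, together with the full zqyy_app RLS view layer):
- B1: 6 places in admin_service, CURRENT_DATE -> business_date
- B2: fdw_queries fallback for the exception branch
- Full GUC propagation (fdw_queries / page_context, etc.)
- 3 test suites (excluded by .gitignore:71; to be committed when F2-2 lands in the repo)
- P20 SPEC §6/§10/§11/§15 (more accurate to sync once F1-5b is fully wrapped up)

Neo decisions: docs/_overview/wave1-findings/F1-5-impl-decisions.md

For details see docs/audit/changes/2026-05-05__wave1_f1_5a_sandbox_batch_run.md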
2026-05-05 03:01:48 +08:00
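
The _run_batch change named in the commit message (Semaphore(5) + asyncio.gather + return_exceptions=True + ctx_snapshot) lives in admin_service.py, which is not shown on this page. A minimal sketch of that general pattern follows; the names run_batch and run_one and the plain dict standing in for the RuntimeContext snapshot are hypothetical and are not taken from the repository.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_batch(
    items: list[str],
    run_one: Callable[[str, dict[str, Any]], Awaitable[str]],  # hypothetical worker
    ctx_snapshot: dict[str, Any],  # stand-in for a RuntimeContext snapshot
    max_concurrency: int = 5,
) -> list[Any]:
    """Run every item with bounded concurrency; failures come back as exception objects."""
    sem = asyncio.Semaphore(max_concurrency)
    # Copy once so later changes to the caller's dict cannot drift into the workers.
    frozen = dict(ctx_snapshot)

    async def guarded(item: str) -> str:
        async with sem:  # at most max_concurrency workers hold the semaphore at once
            return await run_one(item, frozen)

    # return_exceptions=True: a failure is returned in the result list
    # (in input order) instead of propagating out of gather().
    return await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)


async def _demo_worker(item: str, ctx: dict[str, Any]) -> str:
    """Toy worker used only for the demo below."""
    if item == "bad":
        raise ValueError("boom")
    await asyncio.sleep(0)
    return f"{item}@{ctx['business_date']}"


def demo() -> list[Any]:
    return asyncio.run(
        run_batch(["a", "bad", "b"], _demo_worker, {"business_date": "2026-05-01"})
    )
```

Because the snapshot is taken once before any worker starts, every worker sees the same business date even if the caller's context changes while the batch is in flight, which is the kind of drift the commit message mentions.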

# -*- coding: utf-8 -*-
"""
Admin: routes for the AI monitoring console.

Endpoint list (13 endpoints, all requiring JWT + an admin role):
- GET /api/admin/ai/dashboard: overview statistics
- GET /api/admin/ai/trigger-jobs: paginated list of trigger jobs
- GET /api/admin/ai/trigger-jobs/{job_id}: trigger job detail
- POST /api/admin/ai/trigger-jobs/{job_id}/retry: manual rerun
- GET /api/admin/ai/run-logs: paginated list of run logs
- GET /api/admin/ai/run-logs/{log_id}: run log detail
- POST /api/admin/ai/cache/invalidate: cache invalidation
- GET /api/admin/ai/budget: token budget
- POST /api/admin/ai/batch-run: create a batch run (returns an estimate)
- POST /api/admin/ai/batch-run/confirm: confirm a batch run
- GET /api/admin/ai/alerts: alert list
- POST /api/admin/ai/alerts/{log_id}/ack: acknowledge an alert
- POST /api/admin/ai/alerts/{log_id}/ignore: ignore an alert

Requirements: A1.1, A2.1, A2.4, A3.1, A4.1, A4.3, A5.1, A6.1, A7.1, A7.3, A8.1, A8.2, A8.3, A9.1, A9.2, A9.3
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.auth.dependencies import CurrentUser, get_current_user
from app.middleware.permission import require_permission  # kept for other possible dependents
from app.schemas.admin_ai import (
    AlertActionResponse,
    AlertListResponse,
    BatchRunConfirm,
    BatchRunConfirmResponse,
    BatchRunEstimate,
    BatchRunRequest,
    BudgetResponse,
    CacheInvalidateRequest,
    CacheInvalidateResponse,
    DashboardResponse,
    ManualTriggerRequest,
    ManualTriggerResponse,
    PrewarmProgressResponse,
    RetryResponse,
    RunAppRequest,
    RunAppResponse,
    RunLogDetailResponse,
    RunLogListResponse,
    TriggerItem,
    TriggerJobDetailResponse,
    TriggerJobListResponse,
    TriggerUpdateRequest,
)
from app.services.ai.admin_service import AdminAIService
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/admin/ai", tags=["admin-ai"])

# ── Module-level service instance ────────────────────────
_admin_svc = AdminAIService()


# ── Permission dependency ────────────────────────────────
def _require_admin():
    """
    Admin dependency: reads roles straight from the JWT to decide whether the
    caller is an admin (site_admin / tenant_admin / super_admin).

    2026-04-21: no longer depends on querying auth.users.status. admin-web logs
    in against the admin_users table, which is not the same table as the
    auth.users table used by require_permission. The live status check goes
    through admin_users.is_active.
    """
    async def _dependency(
        user: CurrentUser = Depends(get_current_user),
    ) -> CurrentUser:
        admin_roles = {"site_admin", "tenant_admin", "super_admin"}
        if not admin_roles.intersection(user.roles):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin role required (site_admin / tenant_admin / super_admin)",
            )
        # Live check of is_active in admin_users (if user_id is in that table)
        from app.database import get_connection
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT is_active FROM admin_users WHERE id = %s",
                    (user.user_id,),
                )
                row = cur.fetchone()
            conn.commit()
        finally:
            conn.close()
        # Found in admin_users and inactive → reject
        if row is not None and not row[0]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Admin account is disabled",
            )
        # Not in admin_users but the JWT carries an admin role
        # (e.g. an xcx user with temporarily elevated rights): also allowed through
        return user
    return _dependency


# ── Dashboard ─────────────────────────────────────────────
@router.get("/dashboard", response_model=DashboardResponse)
async def get_dashboard(
    site_id: Optional[int] = Query(None, description="Filter by store ID"),
    range_days: Optional[int] = Query(None, ge=1, le=365, description="Lookback days (1 = today / 3 / 7 / 10)"),
    date_from: Optional[str] = Query(None, description="Start date YYYY-MM-DD (used together with date_to)"),
    date_to: Optional[str] = Query(None, description="End date YYYY-MM-DD"),
    user: CurrentUser = Depends(_require_admin()),
) -> DashboardResponse:
    """Overview statistics (supports filtering by site_id and time range)."""
    data = await _admin_svc.get_dashboard(
        site_id=site_id,
        range_days=range_days,
        date_from=date_from,
        date_to=date_to,
    )
    return DashboardResponse(**data)


# ── Trigger jobs ──────────────────────────────────────────
@router.get("/trigger-jobs", response_model=TriggerJobListResponse)
async def list_trigger_jobs(
    event_type: Optional[str] = Query(None),
    status_filter: Optional[str] = Query(None, alias="status"),
    site_id: Optional[int] = Query(None),
    date_from: Optional[str] = Query(None, description="Start date YYYY-MM-DD"),
    date_to: Optional[str] = Query(None, description="End date YYYY-MM-DD"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    user: CurrentUser = Depends(_require_admin()),
) -> TriggerJobListResponse:
    """Paginated list of trigger jobs."""
    # Only pass along the filters the caller actually supplied.
    filters: dict = {}
    if event_type is not None:
        filters["event_type"] = event_type
    if status_filter is not None:
        filters["status"] = status_filter
    if site_id is not None:
        filters["site_id"] = site_id
    if date_from is not None:
        filters["date_from"] = date_from
    if date_to is not None:
        filters["date_to"] = date_to
    data = await _admin_svc.list_trigger_jobs(filters, page=page, page_size=page_size)
    return TriggerJobListResponse(**data)


@router.get("/trigger-jobs/{job_id}", response_model=TriggerJobDetailResponse)
async def get_trigger_job(
    job_id: int,
    user: CurrentUser = Depends(_require_admin()),
) -> TriggerJobDetailResponse:
    """Trigger job detail."""
    data = await _admin_svc.get_trigger_job(job_id)
    if data is None:
        raise HTTPException(status_code=404, detail="Trigger job not found")
    return TriggerJobDetailResponse(**data)


@router.post("/trigger-jobs/{job_id}/retry", response_model=RetryResponse)
async def retry_trigger_job(
    job_id: int,
    user: CurrentUser = Depends(_require_admin()),
) -> RetryResponse:
    """Manual rerun: creates a new trigger_job (is_forced=true) and executes it asynchronously."""
    try:
        new_job_id = await _admin_svc.retry_trigger_job(job_id)
    except ValueError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc
    return RetryResponse(trigger_job_id=new_job_id, status="pending")


# ── Run logs ──────────────────────────────────────────────
@router.get("/run-logs", response_model=RunLogListResponse)
async def list_run_logs(
    app_type: Optional[str] = Query(None),
    status_filter: Optional[str] = Query(None, alias="status"),
    trigger_type: Optional[str] = Query(None),
    site_id: Optional[int] = Query(None),
    date_from: Optional[str] = Query(None, description="Start date YYYY-MM-DD"),
    date_to: Optional[str] = Query(None, description="End date YYYY-MM-DD"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    user: CurrentUser = Depends(_require_admin()),
) -> RunLogListResponse:
    """Paginated list of run logs."""
    # Only pass along the filters the caller actually supplied.
    filters: dict = {}
    if app_type is not None:
        filters["app_type"] = app_type
    if status_filter is not None:
        filters["status"] = status_filter
    if trigger_type is not None:
        filters["trigger_type"] = trigger_type
    if site_id is not None:
        filters["site_id"] = site_id
    if date_from is not None:
        filters["date_from"] = date_from
    if date_to is not None:
        filters["date_to"] = date_to
    data = await _admin_svc.list_run_logs(filters, page=page, page_size=page_size)
    return RunLogListResponse(**data)


@router.get("/run-logs/{log_id}", response_model=RunLogDetailResponse)
async def get_run_log(
    log_id: int,
    user: CurrentUser = Depends(_require_admin()),
) -> RunLogDetailResponse:
    """Run log detail (includes the full prompt/response/error, unredacted)."""
    data = await _admin_svc.get_run_log(log_id)
    if data is None:
        raise HTTPException(status_code=404, detail="Run log not found")
    return RunLogDetailResponse(**data)


# ── Cache management ──────────────────────────────────────
@router.post("/cache/invalidate", response_model=CacheInvalidateResponse)
async def invalidate_cache(
    body: CacheInvalidateRequest,
    user: CurrentUser = Depends(_require_admin()),
) -> CacheInvalidateResponse:
    """Bulk cache invalidation: sets ai_cache.status to invalidated for rows matching the criteria."""
    affected = await _admin_svc.invalidate_cache(
        site_id=body.site_id,
        app_type=body.app_type,
        member_id=body.member_id,
    )
    return CacheInvalidateResponse(affected_count=affected)


# ── Token budget ──────────────────────────────────────────
@router.get("/budget", response_model=BudgetResponse)
async def get_budget(
    user: CurrentUser = Depends(_require_admin()),
) -> BudgetResponse:
    """Token budget usage: daily/monthly consumption, limits, and percentages."""
    data = await _admin_svc.get_budget()
    return BudgetResponse(**data)


# ── Batch run ─────────────────────────────────────────────
@router.post("/batch-run", response_model=BatchRunEstimate)
async def create_batch_run(
    body: BatchRunRequest,
    user: CurrentUser = Depends(_require_admin()),
) -> BatchRunEstimate:
    """Creates a batch-run request and returns an estimate (does not execute immediately)."""
    data = await _admin_svc.estimate_batch(
        app_types=body.app_types,
        member_ids=body.member_ids,
        site_id=body.site_id,
    )
    return BatchRunEstimate(**data)


@router.post("/batch-run/confirm", response_model=BatchRunConfirmResponse)
async def confirm_batch_run(
    body: BatchRunConfirm,
    user: CurrentUser = Depends(_require_admin()),
) -> BatchRunConfirmResponse:
    """Confirms the batch run; execution happens asynchronously in the background.

    F1-5a: lazily injects the dispatcher into _admin_svc (bound on first call)
    to avoid module load-order problems (the dispatcher is usually initialized
    only during lifespan startup).
    """
    if _admin_svc._dispatcher is None:
        try:
            from app.ai.dispatcher import get_dispatcher
            _admin_svc.set_dispatcher(get_dispatcher())
        except RuntimeError as exc:
            raise HTTPException(status_code=503, detail=f"Dispatcher not initialized: {exc}") from exc
    try:
        await _admin_svc.confirm_batch(batch_id=body.batch_id)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return BatchRunConfirmResponse(status="started")


# ── Alert management ──────────────────────────────────────
@router.get("/alerts", response_model=AlertListResponse)
async def list_alerts(
    alert_status: Optional[str] = Query(None, description="pending / acknowledged / ignored"),
    site_id: Optional[int] = Query(None),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    user: CurrentUser = Depends(_require_admin()),
) -> AlertListResponse:
    """Alert list (ai_run_logs WHERE status IN ('failed','timeout','circuit_open'))."""
    data = await _admin_svc.list_alerts(
        alert_status=alert_status,
        site_id=site_id,
        page=page,
        page_size=page_size,
    )
    return AlertListResponse(**data)


@router.post("/alerts/{log_id}/ack", response_model=AlertActionResponse)
async def ack_alert(
    log_id: int,
    user: CurrentUser = Depends(_require_admin()),
) -> AlertActionResponse:
    """Acknowledges an alert: alert_status → acknowledged."""
    new_status = await _admin_svc.ack_alert(log_id)
    return AlertActionResponse(id=log_id, alert_status=new_status)


@router.post("/alerts/{log_id}/ignore", response_model=AlertActionResponse)
async def ignore_alert(
    log_id: int,
    user: CurrentUser = Depends(_require_admin()),
) -> AlertActionResponse:
    """Ignores an alert: alert_status → ignored."""
    new_status = await _admin_svc.ignore_alert(log_id)
    return AlertActionResponse(id=log_id, alert_status=new_status)


# ── On-demand run of a single App (used by the admin-web regenerate button) ──
_SUPPORTED_APP_TYPES = {
    "app2_finance",
    "app2a_finance_area",  # added 2026-04-23: area finance insights
    "app3_clue",
    "app4_analysis",
    "app5_tactics",
    "app6_note",
    "app7_customer",
    "app8_consolidation",
}


@router.post("/run/{app_type}", response_model=RunAppResponse)
async def run_single_app(
    app_type: str,
    body: RunAppRequest,
    user: CurrentUser = Depends(_require_admin()),
) -> RunAppResponse:
    """Runs a single App on demand (skipping chain orchestration).

    Use case: the "Regenerate" button on the admin-web cache detail page / alerts page.
    Circuit-breaker / rate-limit / budget checks are performed automatically by dispatcher._run_step.
    The result is written to ai_cache; failures do not raise and are reported via success=False.
    """
    if app_type not in _SUPPORTED_APP_TYPES:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported app_type: {app_type}; supported: {sorted(_SUPPORTED_APP_TYPES)}",
        )
    from app.ai.dispatcher import get_dispatcher
    try:
        dispatcher = get_dispatcher()
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    context = body.model_dump(exclude_none=True)
    try:
        result = await dispatcher.run_single_app(
            app_type=app_type,
            context=context,
            triggered_by=f"admin:{user.user_id}",
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    if result is None:
        return RunAppResponse(
            app_type=app_type,
            success=False,
            error="AI call failed (see ai_run_logs for details; possible causes: circuit breaker / rate limit / budget / timeout)",
        )
    return RunAppResponse(app_type=app_type, success=True, result=result)


# ── Trigger management (biz.trigger_jobs) ─────────────────
@router.get("/triggers", response_model=list[TriggerItem])
async def list_triggers(
    _user: CurrentUser = Depends(_require_admin()),
) -> list[TriggerItem]:
    """Lists all AI-related triggers (job_type=ai_* or task_generator)."""
    rows = await _admin_svc.list_triggers()
    return [TriggerItem(**r) for r in rows]


@router.patch("/triggers/{trigger_id}", response_model=TriggerItem)
async def update_trigger(
    trigger_id: int,
    body: TriggerUpdateRequest,
    _user: CurrentUser = Depends(_require_admin()),
) -> TriggerItem:
    """Updates a trigger: enable/disable, change the cron expression, change the description."""
    try:
        row = await _admin_svc.update_trigger(
            trigger_id,
            status_new=body.status,
            cron_expression=body.cron_expression,
            description=body.description,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return TriggerItem(**row)


# ── Prewarm progress query ────────────────────────────────
@router.get("/prewarm/progress", response_model=PrewarmProgressResponse)
async def get_prewarm_progress(
    site_id: int = Query(..., description="Store ID"),
    _user: CurrentUser = Depends(_require_admin()),
) -> PrewarmProgressResponse:
    """Queries prewarm progress for the 72 app2_finance combinations (done / missing)."""
    data = await _admin_svc.get_prewarm_progress(site_id)
    return PrewarmProgressResponse(**data)


# ── Manual event trigger (bypasses dedup) ─────────────────
@router.post("/trigger-event", response_model=ManualTriggerResponse)
async def manual_trigger_event(
    body: ManualTriggerRequest,
    user: CurrentUser = Depends(_require_admin()),
) -> ManualTriggerResponse:
    """Manually triggers an AI event chain; is_forced=True by default to skip dedup.

    Event types: consumption / dws_completed / note_created / task_assigned
    """
    from app.ai.dispatcher import TriggerEvent, get_dispatcher
    valid_events = {"consumption", "dws_completed", "note_created", "task_assigned"}
    if body.event_type not in valid_events:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid event_type: {body.event_type}; supported: {sorted(valid_events)}",
        )
    try:
        dispatcher = get_dispatcher()
    except RuntimeError as exc:
        raise HTTPException(status_code=503, detail=str(exc)) from exc
    # Copy the payload so the request body is not mutated.
    payload = dict(body.payload or {})
    if body.assistant_id is not None:
        payload.setdefault("assistant_id", body.assistant_id)
    event = TriggerEvent(
        event_type=body.event_type,
        site_id=body.site_id,
        member_id=body.member_id,
        payload=payload,
        is_forced=body.is_forced,
    )
    logger.info(
        "admin manually triggered event: user=%s event=%s site_id=%s member_id=%s forced=%s",
        user.user_id, body.event_type, body.site_id, body.member_id, body.is_forced,
    )
    job_id = await dispatcher.handle_trigger(event)
    return ManualTriggerResponse(trigger_job_id=job_id)
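
The list_trigger_jobs and list_run_logs handlers in this file each build their filters dict with one "is not None" check per query parameter. The same behavior could be sketched with a small helper; compact_filters is a hypothetical name and is not part of this module.

```python
from typing import Any


def compact_filters(**params: Any) -> dict[str, Any]:
    """Keep only the parameters that were actually supplied (i.e. are not None)."""
    # Compare against None explicitly so falsy but valid values (0, "") survive.
    return {key: value for key, value in params.items() if value is not None}


example = compact_filters(event_type="consumption", status=None, site_id=0)
```

Comparing against None rather than relying on truthiness matters here, since a value such as 0 or an empty string is a supplied filter and should be kept.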