Files
Neo-ZQYY/apps/backend/app/routers/admin_triggers.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

178 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""管理端 — 触发器统一视图 API
提供 1 个端点:
- GET /api/admin/triggers/unified — 聚合三张表的触发器数据
数据源:
- biz.trigger_jobs业务触发器→ source="biz"
- biz.ai_trigger_jobsAI 事件链,最近 100 条)→ source="ai"
- public.scheduled_tasksETL 调度)→ source="etl"
某数据源查询失败时记录日志,返回其他数据源数据。
需求: 4.1, 4.2, 4.3
"""
from __future__ import annotations
import logging
from fastapi import APIRouter, Depends
from app.auth.dependencies import CurrentUser, get_current_user
from app.database import get_connection
from app.schemas.admin_triggers import UnifiedTriggerItem
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/admin/triggers", tags=["系统管理"])
def _fetch_biz_triggers(conn) -> list[UnifiedTriggerItem]:
"""查询 biz.trigger_jobs映射 source='biz'"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, job_name, trigger_condition, status,
last_run_at, next_run_at, last_error
FROM biz.trigger_jobs
ORDER BY id
"""
)
rows = cur.fetchall()
return [
UnifiedTriggerItem(
id=row[0],
name=row[1],
source="biz",
trigger_condition=row[2] or "",
status=row[3] or "",
last_run_at=str(row[4]) if row[4] is not None else None,
next_run_at=str(row[5]) if row[5] is not None else None,
last_error=row[6],
)
for row in rows
]
def _fetch_ai_triggers(conn) -> list[UnifiedTriggerItem]:
"""查询 biz.ai_trigger_jobs最近 100 条),映射 source='ai'
字段映射DDL 实际列 → UnifiedTriggerItem
- id → id
- event_type → nameai_trigger_jobs 无 job_name 列)
- 'event' → trigger_conditionAI 触发器均为事件驱动)
- status → status
- started_at → last_run_atai_trigger_jobs 无 last_run_at 列)
- None → next_run_at事件驱动无预定下次执行时间
- error_message → last_errorai_trigger_jobs 列名为 error_message
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, event_type, status,
started_at, error_message
FROM biz.ai_trigger_jobs
ORDER BY id DESC
LIMIT 100
"""
)
rows = cur.fetchall()
return [
UnifiedTriggerItem(
id=row[0],
name=row[1] or "",
source="ai",
trigger_condition="event",
status=row[2] or "",
last_run_at=str(row[3]) if row[3] is not None else None,
next_run_at=None,
last_error=row[4],
)
for row in rows
]
def _fetch_etl_triggers(conn) -> list[UnifiedTriggerItem]:
"""查询 public.scheduled_tasks映射 source='etl'
字段映射DDL 实际列 → UnifiedTriggerItem
- id → idUUID转为字符串后取 hashcode 作为 int 不合适,改用 row_number
- name → name
- schedule_config->>'schedule_type' → trigger_condition
- last_status / enabled → status组合判断
- last_run_at → last_run_at
- next_run_at → next_run_at
- None → last_errorscheduled_tasks 无 last_error 列)
注意scheduled_tasks.id 是 UUID 类型UnifiedTriggerItem.id 是 int。
使用 ROW_NUMBER() 生成临时整数 ID加 100000 偏移避免与其他数据源冲突。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT ROW_NUMBER() OVER (ORDER BY created_at) + 100000 AS row_id,
name,
schedule_config->>'schedule_type' AS schedule_type,
CASE
WHEN enabled = FALSE THEN 'disabled'
WHEN last_status IS NOT NULL THEN last_status
ELSE 'idle'
END AS status,
last_run_at,
next_run_at
FROM scheduled_tasks
ORDER BY created_at
"""
)
rows = cur.fetchall()
return [
UnifiedTriggerItem(
id=int(row[0]),
name=row[1] or "",
source="etl",
trigger_condition=row[2] or "unknown",
status=row[3] or "idle",
last_run_at=str(row[4]) if row[4] is not None else None,
next_run_at=str(row[5]) if row[5] is not None else None,
last_error=None,
)
for row in rows
]
@router.get("/unified", response_model=list[UnifiedTriggerItem])
async def get_unified_triggers(
user: CurrentUser = Depends(get_current_user),
) -> list[UnifiedTriggerItem]:
"""聚合三张表的触发器数据。
依次查询 biz.trigger_jobs、biz.ai_trigger_jobs、scheduled_tasks
某数据源查询失败时记录日志并跳过,返回其他数据源的数据。
"""
results: list[UnifiedTriggerItem] = []
conn = get_connection()
try:
# 数据源 1biz.trigger_jobs
try:
results.extend(_fetch_biz_triggers(conn))
except Exception:
logger.warning("查询 biz.trigger_jobs 失败", exc_info=True)
# 数据源 2biz.ai_trigger_jobs
try:
results.extend(_fetch_ai_triggers(conn))
except Exception:
logger.warning("查询 biz.ai_trigger_jobs 失败", exc_info=True)
# 数据源 3public.scheduled_tasks
try:
results.extend(_fetch_etl_triggers(conn))
except Exception:
logger.warning("查询 scheduled_tasks 失败", exc_info=True)
return results
finally:
conn.close()