Files
Neo-ZQYY/apps/backend/app/routers/schedules.py

395 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""调度任务 CRUD API
提供 7 个端点:
- GET /api/schedules — 列表（按 site_id 过滤）
- POST /api/schedules — 创建（支持 run_immediately）
- PUT /api/schedules/{id} — 更新
- DELETE /api/schedules/{id} — 删除
- PATCH /api/schedules/{id}/toggle — 启用/禁用
- GET /api/schedules/{id}/history — 调度任务执行历史
- POST /api/schedules/{id}/run — 手动执行一次（不更新调度间隔）
所有端点需要 JWT 认证，site_id 从 JWT 提取。
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.auth.dependencies import CurrentUser, get_current_user
from app.database import get_connection
from app.schemas.schedules import (
CreateScheduleRequest,
ScheduleResponse,
UpdateScheduleRequest,
)
from app.schemas.execution import ExecutionHistoryItem
from app.schemas.tasks import TaskConfigSchema
from app.services.scheduler import calculate_next_run
from app.services.task_queue import task_queue
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/schedules", tags=["调度管理"])
def _row_to_response(row) -> ScheduleResponse:
    """Build a ``ScheduleResponse`` from one positional database row.

    The row is read by index: 0=id, 1=site_id, 2=name, 3=task_codes,
    4=task_config, 5=schedule_config, 6=enabled, 7=last_run_at,
    8=next_run_at, 9=run_count, 10=last_status, 11=created_at,
    12=updated_at.
    """

    def _as_dict(value):
        # The two config columns may already be a dict; anything else is
        # treated as JSON text and decoded.
        if isinstance(value, dict):
            return value
        return json.loads(value)

    decoded_task_config = _as_dict(row[4])
    decoded_schedule_config = _as_dict(row[5])

    fields = {
        "id": str(row[0]),
        "site_id": row[1],
        "name": row[2],
        # A falsy task_codes value (e.g. NULL) is reported as an empty list.
        "task_codes": row[3] or [],
        "task_config": decoded_task_config,
        "schedule_config": decoded_schedule_config,
        "enabled": row[6],
        "last_run_at": row[7],
        "next_run_at": row[8],
        "run_count": row[9],
        "last_status": row[10],
        "created_at": row[11],
        "updated_at": row[12],
    }
    return ScheduleResponse(**fields)
# Column list for scheduled_tasks queries, reused by several endpoints.
# The column order matches the positional indexes read in _row_to_response
# (0=id ... 12=updated_at), so the two must be changed together.
_SELECT_COLS = """
id, site_id, name, task_codes, task_config, schedule_config,
enabled, last_run_at, next_run_at, run_count, last_status,
created_at, updated_at
"""
# ── GET /api/schedules — 列表 ────────────────────────────────
@router.get("", response_model=list[ScheduleResponse])
async def list_schedules(
    user: CurrentUser = Depends(get_current_user),
) -> list[ScheduleResponse]:
    """Return all schedules belonging to the caller's site, newest first.

    Rows are filtered by ``user.site_id`` and ordered by ``created_at``
    descending.
    """
    sql = (
        f"SELECT {_SELECT_COLS} FROM scheduled_tasks"
        " WHERE site_id = %s ORDER BY created_at DESC"
    )
    db = get_connection()
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, (user.site_id,))
            records = cursor.fetchall()
        db.commit()
    finally:
        # Always release the connection, even if the query failed.
        db.close()
    return list(map(_row_to_response, records))
# ── POST /api/schedules — 创建 ──────────────────────────────
@router.post("", response_model=ScheduleResponse, status_code=status.HTTP_201_CREATED)
async def create_schedule(
    body: CreateScheduleRequest,
    user: CurrentUser = Depends(get_current_user),
) -> ScheduleResponse:
    """Insert a new schedule for the caller's site and return it.

    ``next_run_at`` is obtained from ``calculate_next_run`` using the
    request's schedule config and the current UTC time. When
    ``body.run_immediately`` is truthy the task is additionally enqueued
    once; a failure to enqueue is logged as a warning and does not fail
    the request.
    """
    reference_time = datetime.now(timezone.utc)
    first_run = calculate_next_run(body.schedule_config, reference_time)

    db = get_connection()
    try:
        with db.cursor() as cursor:
            values = (
                user.site_id,
                body.name,
                body.task_codes,
                json.dumps(body.task_config),
                body.schedule_config.model_dump_json(),
                body.schedule_config.enabled,
                first_run,
            )
            cursor.execute(
                f"""
                INSERT INTO scheduled_tasks
                    (site_id, name, task_codes, task_config, schedule_config, enabled, next_run_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                RETURNING {_SELECT_COLS}
                """,
                values,
            )
            inserted = cursor.fetchone()
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

    created = _row_to_response(inserted)
    if not body.run_immediately:
        return created

    # One-off immediate run: enqueue only, the stored schedule is untouched.
    try:
        run_config = TaskConfigSchema(**body.task_config).model_copy(
            update={"store_id": user.site_id}
        )
        task_queue.enqueue(run_config, user.site_id, schedule_id=created.id)
    except Exception:
        # Best effort: the schedule itself was created successfully.
        logger.warning("创建调度后立即执行入队失败 schedule_id=%s", created.id, exc_info=True)
    return created
# ── PUT /api/schedules/{id} — 更新 ──────────────────────────
@router.put("/{schedule_id}", response_model=ScheduleResponse)
async def update_schedule(
    schedule_id: str,
    body: UpdateScheduleRequest,
    user: CurrentUser = Depends(get_current_user),
) -> ScheduleResponse:
    """Update the fields of a schedule that are present in the request.

    Only fields whose value is not ``None`` are written. Supplying
    ``schedule_config`` also recomputes ``next_run_at``. ``updated_at`` is
    always set to ``NOW()``.

    Raises:
        HTTPException: 422 when the request carries no updatable field;
            404 when no row matches ``schedule_id`` for the caller's site.
    """
    # Each entry pairs a SET fragment with its bound value, keeping the
    # placeholder order and the parameter order in lock-step.
    assignments: list[tuple[str, object]] = []

    if body.name is not None:
        assignments.append(("name = %s", body.name))
    if body.task_codes is not None:
        assignments.append(("task_codes = %s", body.task_codes))
    if body.task_config is not None:
        assignments.append(("task_config = %s", json.dumps(body.task_config)))
    if body.schedule_config is not None:
        assignments.append(
            ("schedule_config = %s", body.schedule_config.model_dump_json())
        )
        # A new schedule config invalidates the stored next run time.
        reference_time = datetime.now(timezone.utc)
        assignments.append(
            ("next_run_at = %s", calculate_next_run(body.schedule_config, reference_time))
        )

    if not assignments:
        raise HTTPException(
            status_code=422,
            detail="至少需要提供一个更新字段",
        )

    fragments = [fragment for fragment, _ in assignments]
    fragments.append("updated_at = NOW()")
    set_sql = ", ".join(fragments)
    bound_values = [value for _, value in assignments]
    bound_values += [schedule_id, user.site_id]

    db = get_connection()
    try:
        with db.cursor() as cursor:
            cursor.execute(
                f"""
                UPDATE scheduled_tasks
                SET {set_sql}
                WHERE id = %s AND site_id = %s
                RETURNING {_SELECT_COLS}
                """,
                bound_values,
            )
            updated = cursor.fetchone()
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

    if updated is not None:
        return _row_to_response(updated)
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="调度任务不存在",
    )
# ── DELETE /api/schedules/{id} — 删除 ────────────────────────
@router.delete("/{schedule_id}")
async def delete_schedule(
    schedule_id: str,
    user: CurrentUser = Depends(get_current_user),
) -> dict:
    """Delete one schedule owned by the caller's site.

    Raises:
        HTTPException: 404 when the DELETE affected zero rows.
    """
    db = get_connection()
    try:
        with db.cursor() as cursor:
            cursor.execute(
                "DELETE FROM scheduled_tasks WHERE id = %s AND site_id = %s",
                (schedule_id, user.site_id),
            )
            affected = cursor.rowcount
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

    if affected != 0:
        return {"message": "调度任务已删除"}
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="调度任务不存在",
    )
# ── PATCH /api/schedules/{id}/toggle — 启用/禁用 ─────────────
@router.patch("/{schedule_id}/toggle", response_model=ScheduleResponse)
async def toggle_schedule(
    schedule_id: str,
    user: CurrentUser = Depends(get_current_user),
) -> ScheduleResponse:
    """Flip a schedule between enabled and disabled.

    When the schedule becomes disabled ``next_run_at`` is set to NULL; when
    it becomes enabled ``next_run_at`` is recomputed from the stored
    schedule config and the current UTC time.

    Raises:
        HTTPException: 404 when no row matches ``schedule_id`` for the
            caller's site, either on the initial read or on the update.
    """
    conn = get_connection()
    try:
        # Read the current state and schedule config. FOR UPDATE locks the
        # row for the rest of the transaction so two concurrent toggles
        # cannot both read the same ``enabled`` value (assumes the
        # connection is not in autocommit mode, which the explicit
        # commit/rollback calls here suggest).
        with conn.cursor() as cur:
            cur.execute(
                "SELECT enabled, schedule_config FROM scheduled_tasks "
                "WHERE id = %s AND site_id = %s FOR UPDATE",
                (schedule_id, user.site_id),
            )
            row = cur.fetchone()
        if row is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="调度任务不存在",
            )

        new_enabled = not row[0]
        if new_enabled:
            # Enabling: recompute next_run_at from the stored config, which
            # may arrive as a dict or as JSON text.
            schedule_config_raw = row[1] if isinstance(row[1], dict) else json.loads(row[1])
            from app.schemas.schedules import ScheduleConfigSchema

            schedule_cfg = ScheduleConfigSchema(**schedule_config_raw)
            next_run = calculate_next_run(schedule_cfg, datetime.now(timezone.utc))
        else:
            # Disabling: clear next_run_at.
            next_run = None

        with conn.cursor() as cur:
            cur.execute(
                f"""
                UPDATE scheduled_tasks
                SET enabled = %s, next_run_at = %s, updated_at = NOW()
                WHERE id = %s AND site_id = %s
                RETURNING {_SELECT_COLS}
                """,
                (new_enabled, next_run, schedule_id, user.site_id),
            )
            updated_row = cur.fetchone()
        conn.commit()
    except HTTPException:
        # Nothing was written yet; closing the connection discards the
        # open transaction.
        raise
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

    # Defensive: if the UPDATE matched nothing (row removed in between),
    # answer 404 instead of failing while indexing into None.
    if updated_row is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="调度任务不存在",
        )
    return _row_to_response(updated_row)
# ── POST /api/schedules/{id}/run — 手动执行一次 ──────────────
@router.post("/{schedule_id}/run")
async def run_schedule_now(
    schedule_id: str,
    user: CurrentUser = Depends(get_current_user),
) -> dict:
    """Enqueue a single manual run of a stored schedule.

    The schedule's ``task_config`` is loaded, turned into a
    ``TaskConfigSchema`` with ``store_id`` overridden by the caller's site,
    and handed to the task queue. This endpoint issues no UPDATE against
    ``scheduled_tasks``.

    Raises:
        HTTPException: 404 when no row matches ``schedule_id`` for the
            caller's site.
    """
    db = get_connection()
    try:
        with db.cursor() as cursor:
            cursor.execute(
                "SELECT task_config, site_id FROM scheduled_tasks WHERE id = %s AND site_id = %s",
                (schedule_id, user.site_id),
            )
            record = cursor.fetchone()
        db.commit()
    finally:
        db.close()

    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="调度任务不存在",
        )

    # The stored config may arrive as a dict or as JSON text.
    stored_config = record[0]
    if not isinstance(stored_config, dict):
        stored_config = json.loads(stored_config)

    run_config = TaskConfigSchema(**stored_config).model_copy(
        update={"store_id": user.site_id}
    )
    queued_id = task_queue.enqueue(run_config, user.site_id, schedule_id=schedule_id)
    return {"message": "已提交到执行队列", "task_id": queued_id}
# ── GET /api/schedules/{id}/history — 执行历史 ────────────────
@router.get("/{schedule_id}/history", response_model=list[ExecutionHistoryItem])
async def get_schedule_history(
    schedule_id: str,
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    user: CurrentUser = Depends(get_current_user),
) -> list[ExecutionHistoryItem]:
    """Return one page of execution-log entries for a schedule.

    Entries are restricted to the caller's site, ordered by ``started_at``
    descending, and paged with ``page`` (1-based) and ``page_size``.
    """

    def _to_item(record):
        # Positional mapping follows the SELECT column order below.
        linked_schedule = record[10]
        return ExecutionHistoryItem(
            id=str(record[0]),
            site_id=record[1],
            task_codes=record[2] or [],
            status=record[3],
            started_at=record[4],
            finished_at=record[5],
            exit_code=record[6],
            duration_ms=record[7],
            command=record[8],
            summary=record[9],
            schedule_id=str(linked_schedule) if linked_schedule else None,
        )

    skip = page_size * (page - 1)
    db = get_connection()
    try:
        with db.cursor() as cursor:
            cursor.execute(
                """
                SELECT id, site_id, task_codes, status, started_at, finished_at,
                       exit_code, duration_ms, command, summary, schedule_id
                FROM task_execution_log
                WHERE schedule_id = %s AND site_id = %s
                ORDER BY started_at DESC
                LIMIT %s OFFSET %s
                """,
                (schedule_id, user.site_id, page_size, skip),
            )
            records = cursor.fetchall()
        db.commit()
    finally:
        db.close()

    return [_to_item(record) for record in records]