Files
Neo-ZQYY/apps/backend/app/services/scheduler.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

385 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""调度器服务
后台 asyncio 循环,每 30 秒检查一次到期的调度任务,
将其 TaskConfig 入队到 TaskQueue。
核心逻辑:
- check_and_enqueue():查询 enabled=true 且 next_run_at <= now 的调度任务
- start() / stop():管理后台循环生命周期
- calculate_next_run():模块级函数,根据 ScheduleConfig 计算下次执行时间
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timedelta, timezone
from ..database import get_connection
from ..schemas.schedules import ScheduleConfigSchema
from ..schemas.tasks import TaskConfigSchema
from .task_queue import task_queue
from app.trace.decorators import trace_service
logger = logging.getLogger(name=__name__)
SCHEDULER_POLL_INTERVAL = 30  # seconds the background loop sleeps between polls
def _parse_time(time_str: str) -> tuple[int, int]:
"""解析 HH:MM 格式的时间字符串,返回 (hour, minute)。"""
parts = time_str.split(":")
return int(parts[0]), int(parts[1])
def _convert_interval_to_seconds(value: int, unit: str) -> int:
"""将间隔值转换为秒数。
Args:
value: 间隔数值0 = 无限制)
unit: 间隔单位,支持 "minutes""hours""days"
Returns:
对应的秒数value <= 0 时返回 0
"""
if value <= 0:
return 0
multipliers = {"minutes": 60, "hours": 3600, "days": 86400}
return value * multipliers.get(unit, 60)
@trace_service(description_zh="calculate_next_run", description_en="Calculate Next Run")
def calculate_next_run(
    schedule_config: ScheduleConfigSchema,
    now: datetime | None = None,
) -> datetime | None:
    """Compute when a schedule should run next.

    For ``daily`` and ``weekly`` schedules the result is the earliest
    matching time strictly after ``now``. A run that fires late therefore
    does not skip the day's slot (e.g. downtime that crosses midnight), and
    a slot still ahead of ``now`` today is used rather than skipped.

    Args:
        schedule_config: Schedule definition. ``schedule_type`` selects which
            of ``interval_value``/``interval_unit``, ``daily_time``,
            ``weekly_time``/``weekly_days`` or ``cron_expression`` is used.
        now: Reference time. Defaults to ``datetime.now(timezone.utc)`` and
            can be injected for tests. ``HH:MM`` fields are applied in
            ``now``'s timezone.

    Returns:
        The next run time. Returns ``None`` for ``once`` schedules and for
        an unknown ``schedule_type`` or ``interval_unit``.

    Raises:
        Any exception raised by ``_parse_time`` for a malformed
        ``daily_time`` / ``weekly_time`` is propagated.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    stype = schedule_config.schedule_type

    if stype == "once":
        # One-shot schedules are not rescheduled after they run.
        return None

    if stype == "interval":
        unit = schedule_config.interval_unit
        if unit not in ("minutes", "hours", "days"):
            logger.warning("未知的 interval_unit: %s", unit)
            return None
        # The unit names are exactly timedelta's keyword arguments.
        return now + timedelta(**{unit: schedule_config.interval_value})

    if stype == "daily":
        hour, minute = _parse_time(schedule_config.daily_time)
        candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if candidate <= now:
            candidate += timedelta(days=1)
        return candidate

    if stype == "weekly":
        hour, minute = _parse_time(schedule_config.weekly_time)
        # ISO weekdays: 1=Monday ... 7=Sunday; Monday when none are configured.
        # Values are folded into 1..7 so that 0 also means Sunday.
        weekdays = {
            (int(day) - 1) % 7 + 1
            for day in (schedule_config.weekly_days or [1])
        }
        for offset in range(8):
            candidate = (now + timedelta(days=offset)).replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )
            if candidate > now and candidate.isoweekday() in weekdays:
                return candidate
        # Not reachable: offsets 1..7 visit every ISO weekday, all after now.
        return None

    if stype == "cron":
        # Only a simple cron subset is supported (see _parse_simple_cron);
        # the croniter library could be adopted for full expressions.
        return _parse_simple_cron(schedule_config.cron_expression, now)

    logger.warning("未知的 schedule_type: %s", stype)
    return None
def _parse_simple_cron(expression: str, now: datetime) -> datetime | None:
"""简单 cron 解析器,支持基本的 5 字段格式。
支持的格式:
- "M H * * *" → 每天 H:M
- "M H * * D" → 每周 D 的 H:MD 为 0-60=Sunday
- 其他格式回退到每天 04:00
不支持范围、列表、步进等高级语法。如需完整 cron 支持,
可在 pyproject.toml 中添加 croniter 依赖。
"""
parts = expression.strip().split()
if len(parts) != 5:
logger.warning("无法解析 cron 表达式: %s,回退到明天 04:00", expression)
tomorrow = now + timedelta(days=1)
return tomorrow.replace(hour=4, minute=0, second=0, microsecond=0)
minute_str, hour_str, dom, month, dow = parts
try:
minute = int(minute_str) if minute_str != "*" else 0
hour = int(hour_str) if hour_str != "*" else 0
except ValueError:
logger.warning("cron 表达式时间字段无法解析: %s,回退到明天 04:00", expression)
tomorrow = now + timedelta(days=1)
return tomorrow.replace(hour=4, minute=0, second=0, microsecond=0)
# 如果指定了 day-of-week非 *
if dow != "*":
try:
cron_dow = int(dow) # 0=Sunday, 1=Monday, ..., 6=Saturday
except ValueError:
tomorrow = now + timedelta(days=1)
return tomorrow.replace(hour=hour, minute=minute, second=0, microsecond=0)
# 转换为 ISO weekday1=Monday, 7=Sunday
iso_dow = 7 if cron_dow == 0 else cron_dow
current_iso = now.isoweekday()
if iso_dow > current_iso:
delta_days = iso_dow - current_iso
elif iso_dow < current_iso:
delta_days = 7 - current_iso + iso_dow
else:
# 同一天,看时间是否已过
target_today = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if now < target_today:
delta_days = 0
else:
delta_days = 7
next_dt = now + timedelta(days=delta_days)
return next_dt.replace(hour=hour, minute=minute, second=0, microsecond=0)
# 每天定时dom=* month=* dow=*
tomorrow = now + timedelta(days=1)
return tomorrow.replace(hour=hour, minute=minute, second=0, microsecond=0)
class Scheduler:
    """PostgreSQL-backed periodic scheduler.

    A background asyncio loop wakes every ``SCHEDULER_POLL_INTERVAL``
    seconds. Each iteration does two things:

    - enqueues the ``TaskConfig`` of every due row in ``scheduled_tasks``
      into ``task_queue``;
    - runs ``app.services.trigger_scheduler.check_scheduled_jobs``.
    """

    def __init__(self) -> None:
        # The loop keeps iterating while this is True; stop() clears it.
        self._running = False
        # Background loop task created by start(); None when not started.
        self._loop_task: asyncio.Task | None = None

    # ------------------------------------------------------------------
    # Core: find due schedules and enqueue them
    # ------------------------------------------------------------------

    def check_and_enqueue(self) -> int:
        """Enqueue every enabled schedule whose ``next_run_at`` has passed.

        Rows are visited in ``next_run_at`` order. Each row is handled
        independently: an error on one row is logged and rolled back without
        aborting the rest of the batch.

        Returns:
            Number of tasks enqueued by this call.
        """
        conn = get_connection()
        enqueued = 0
        try:
            with conn.cursor() as cur:
                # NOW() is returned with each row. The "is it due?" filter,
                # the min-interval check and the next-run computation then
                # all use the same (database) clock.
                cur.execute(
                    """
                    SELECT id, site_id, task_config, schedule_config,
                           min_run_interval_value, min_run_interval_unit,
                           last_run_at, last_status, min_run_intervals,
                           NOW()
                    FROM scheduled_tasks
                    WHERE enabled = TRUE
                      AND next_run_at IS NOT NULL
                      AND next_run_at <= NOW()
                    ORDER BY next_run_at ASC
                    """
                )
                rows = cur.fetchall()
            for row in rows:
                try:
                    if self._process_due_row(conn, row):
                        enqueued += 1
                except Exception:
                    logger.exception("调度任务 [%s] 处理异常,跳过", row[0])
                    self._rollback_quietly(conn)
        except Exception:
            logger.exception("check_and_enqueue 执行异常")
            self._rollback_quietly(conn)
        finally:
            conn.close()
        if enqueued > 0:
            logger.info("本轮调度检查:%d 个任务入队", enqueued)
        return enqueued

    def _process_due_row(self, conn, row) -> bool:
        """Apply the concurrency and min-interval gates to one due row, then enqueue it.

        Args:
            conn: Open DB connection used for the bookkeeping UPDATEs.
            row: One row of the SELECT in ``check_and_enqueue``.

        Returns:
            True if the task was enqueued, False if it was skipped.
        """
        task_id = str(row[0])
        site_id = row[1]
        last_run_at = self._to_utc(row[6])
        last_status = row[7]
        # calculate_next_run applies HH:MM times in now's timezone. Normalize
        # the DB clock reading to UTC, matching the previous
        # datetime.now(timezone.utc).
        now = self._to_utc(row[9])
        effective_interval_seconds = self._effective_interval_seconds(
            row[4], row[5], row[8]
        )

        try:
            config = TaskConfigSchema(**self._as_dict(row[2]))
            schedule_cfg = ScheduleConfigSchema(**self._as_dict(row[3]))
        except Exception:
            logger.exception("调度任务 [%s] 配置反序列化失败,跳过", task_id)
            return False

        # 1. Concurrency: the previous run is still in progress. Skip and
        #    leave next_run_at as is, so the row is re-checked next poll.
        if last_status == "running":
            logger.warning(
                "调度任务 [%s] skipped_concurrent上次执行仍在运行中",
                task_id,
            )
            return False

        # 2. Minimum interval not yet elapsed: skip, and push next_run_at to
        #    last_run_at + interval.
        if effective_interval_seconds > 0 and last_run_at is not None:
            elapsed = (now - last_run_at).total_seconds()
            if elapsed < effective_interval_seconds:
                next_run_at_pushed = last_run_at + timedelta(
                    seconds=effective_interval_seconds
                )
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE scheduled_tasks
                        SET next_run_at = %s,
                            updated_at = NOW()
                        WHERE id = %s
                        """,
                        (next_run_at_pushed, task_id),
                    )
                conn.commit()
                logger.info(
                    "调度任务 [%s] skipped_interval最小间隔未到"
                    "(已过 %.0fs / 需 %dsnext_run_at 推进至 %s",
                    task_id,
                    elapsed,
                    effective_interval_seconds,
                    next_run_at_pushed,
                )
                return False

        # 3. Compute the follow-up time BEFORE enqueueing. If it raised after
        #    a successful enqueue, next_run_at would never advance and the
        #    same task would be enqueued again on every poll.
        try:
            next_run = calculate_next_run(schedule_cfg, now)
        except Exception:
            logger.exception("调度任务 [%s] 计算下次执行时间失败,跳过", task_id)
            return False

        # 4. Enqueue.
        try:
            queue_id = task_queue.enqueue(config, site_id, schedule_id=task_id)
        except Exception:
            logger.exception("调度任务 [%s] 入队失败", task_id)
            return False
        logger.info(
            "调度任务 [%s] 入队成功 → queue_id=%s site_id=%s",
            task_id, queue_id, site_id,
        )

        # 5. Record the run. A None next_run (e.g. "once") stores NULL, and
        #    the SELECT above skips rows whose next_run_at is NULL.
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE scheduled_tasks
                    SET last_run_at = NOW(),
                        run_count = run_count + 1,
                        next_run_at = %s,
                        last_status = 'enqueued',
                        updated_at = NOW()
                    WHERE id = %s
                    """,
                    (next_run, task_id),
                )
            conn.commit()
        except Exception:
            logger.exception("调度任务 [%s] 已入队,但更新调度状态失败", task_id)
            self._rollback_quietly(conn)
        return True

    @staticmethod
    def _as_dict(value):
        """Return ``value`` as a dict, decoding it from JSON text if it is not one already."""
        return value if isinstance(value, dict) else json.loads(value)

    @staticmethod
    def _to_utc(value: datetime | None) -> datetime | None:
        """Normalize a datetime to UTC. ``None`` is passed through.

        NOTE(review): naive values are assumed to already be UTC. Confirm
        this against the column types of scheduled_tasks.
        """
        if value is None:
            return None
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)

    @classmethod
    def _effective_interval_seconds(cls, value, unit, per_task_raw) -> int:
        """Return the largest of the schedule-level and per-task minimum intervals, in seconds."""
        seconds = _convert_interval_to_seconds(value or 0, unit or "minutes")
        per_task = cls._as_dict(per_task_raw) if per_task_raw else {}
        for interval_cfg in per_task.values():
            if isinstance(interval_cfg, dict):
                seconds = max(
                    seconds,
                    _convert_interval_to_seconds(
                        interval_cfg.get("value") or 0,
                        interval_cfg.get("unit") or "minutes",
                    ),
                )
        return seconds

    @staticmethod
    def _rollback_quietly(conn) -> None:
        """Roll back ``conn``. A failing rollback is logged at DEBUG level, not raised."""
        try:
            conn.rollback()
        except Exception:
            logger.debug("rollback 失败", exc_info=True)

    # ------------------------------------------------------------------
    # Background loop
    # ------------------------------------------------------------------

    async def _loop(self) -> None:
        """Run both checks, then sleep ``SCHEDULER_POLL_INTERVAL`` seconds, until stopped."""
        self._running = True
        logger.info("Scheduler 后台循环启动(间隔 %ds", SCHEDULER_POLL_INTERVAL)
        loop = asyncio.get_running_loop()
        while self._running:
            # The synchronous DB work runs in the default executor so it does
            # not block the event loop. The two checks are guarded separately,
            # so a failure in one does not skip the other this iteration
            # (e.g. get_connection() raising in check_and_enqueue).
            try:
                await loop.run_in_executor(None, self.check_and_enqueue)
            except Exception:
                logger.exception("Scheduler 循环迭代异常")
            try:
                # CHANGE 2026-03-23 | also check due cron/interval jobs in trigger_jobs.
                from app.services.trigger_scheduler import check_scheduled_jobs
                await loop.run_in_executor(None, check_scheduled_jobs)
            except Exception:
                logger.exception("Scheduler 循环迭代异常")
            await asyncio.sleep(SCHEDULER_POLL_INTERVAL)
        logger.info("Scheduler 后台循环停止")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Start the background loop (called from the FastAPI lifespan).

        Does nothing if a loop task is already active. It uses
        ``asyncio.create_task``, so an event loop must be running.
        """
        if self._loop_task is None or self._loop_task.done():
            self._loop_task = asyncio.create_task(self._loop())
            logger.info("Scheduler 已启动")

    async def stop(self) -> None:
        """Stop the background loop and wait for its task to finish cancelling."""
        self._running = False
        if self._loop_task and not self._loop_task.done():
            self._loop_task.cancel()
            try:
                await self._loop_task
            except asyncio.CancelledError:
                pass
        self._loop_task = None
        logger.info("Scheduler 已停止")


# Module-level singleton.
scheduler = Scheduler()