Files
Neo-ZQYY/apps/backend/app/services/trigger_scheduler.py
2026-03-15 10:15:02 +08:00

182 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
触发器调度框架Trigger Scheduler
统一管理 cron/interval/event 三种触发方式,驱动后台任务执行。
- cron/interval 类型通过 check_scheduled_jobs() 轮询 next_run_at 触发
- event 类型通过 fire_event() 方法直接触发
- 每个 job 独立事务,失败不影响其他触发器
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Callable
logger = logging.getLogger(__name__)
def _get_connection():
"""延迟导入 get_connection避免纯函数测试时触发模块级导入失败。"""
from app.database import get_connection
return get_connection()
# job_type → 执行函数的注册表
_JOB_REGISTRY: dict[str, Callable] = {}
def register_job(job_type: str, handler: Callable) -> None:
"""注册 job_type 对应的执行函数。"""
_JOB_REGISTRY[job_type] = handler
def update_job_last_run_at(cur, job_id: int) -> None:
"""
在 handler 的事务内更新 last_run_at。
handler 在最终 commit 前调用此函数,将 last_run_at 更新纳入同一事务。
handler 成功 → last_run_at 随事务一起 commit。
handler 失败 → last_run_at 随事务一起 rollback。
"""
cur.execute(
"UPDATE biz.trigger_jobs SET last_run_at = NOW() WHERE id = %s",
(job_id,),
)
def fire_event(event_name: str, payload: dict[str, Any] | None = None) -> int:
"""
触发事件驱动型任务。
查找 trigger_condition='event' 且 trigger_config.event_name 匹配的 enabled job
立即执行对应的 handler。
事务安全:将 job_id 传入 handler由 handler 在最终 commit 前
更新 last_run_at保证 handler 数据变更与 last_run_at 在同一事务中。
handler 失败时整个事务回滚last_run_at 不更新。
返回: 执行的 job 数量
"""
conn = _get_connection()
executed = 0
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, job_type, job_name
FROM biz.trigger_jobs
WHERE status = 'enabled'
AND trigger_condition = 'event'
AND trigger_config->>'event_name' = %s
""",
(event_name,),
)
rows = cur.fetchall()
conn.commit()
for job_id, job_type, job_name in rows:
handler = _JOB_REGISTRY.get(job_type)
if not handler:
logger.warning(
"未注册的 job_type: %s (job_name=%s)", job_type, job_name
)
continue
try:
# 将 job_id 传入 handlerhandler 在最终 commit 前更新 last_run_at
handler(payload=payload, job_id=job_id)
executed += 1
except Exception:
logger.exception("触发器 %s 执行失败", job_name)
finally:
conn.close()
return executed
def check_scheduled_jobs() -> int:
"""
检查 cron/interval 类型的到期 job 并执行。
由 Scheduler 后台循环调用。
事务安全:将 conn 和 job_id 传入 handler由 handler 在最终 commit 前
更新 last_run_at 和 next_run_at保证 handler 数据变更与时间戳在同一事务中。
handler 失败时整个事务回滚。
返回: 执行的 job 数量
"""
conn = _get_connection()
executed = 0
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, job_type, job_name, trigger_condition, trigger_config
FROM biz.trigger_jobs
WHERE status = 'enabled'
AND trigger_condition IN ('cron', 'interval')
AND (next_run_at IS NULL OR next_run_at <= NOW())
ORDER BY next_run_at ASC NULLS FIRST
""",
)
rows = cur.fetchall()
conn.commit()
for job_id, job_type, job_name, trigger_condition, trigger_config in rows:
handler = _JOB_REGISTRY.get(job_type)
if not handler:
logger.warning("未注册的 job_type: %s", job_type)
continue
try:
# cron/interval handler 接受 conn + job_id在最终 commit 前更新时间戳
handler(conn=conn, job_id=job_id)
# 计算 next_run_at 并更新(在 handler commit 后的新事务中)
next_run = _calculate_next_run(trigger_condition, trigger_config)
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"""
UPDATE biz.trigger_jobs
SET last_run_at = NOW(), next_run_at = %s
WHERE id = %s
""",
(next_run, job_id),
)
conn.commit()
executed += 1
except Exception:
logger.exception("触发器 %s 执行失败", job_name)
conn.rollback()
finally:
conn.close()
return executed
def _calculate_next_run(
trigger_condition: str, trigger_config: dict
) -> datetime | None:
"""
根据触发条件和配置计算下次运行时间。
- interval: now + interval_seconds
- cron: 复用 scheduler._parse_simple_cron
- event: 返回 None无 next_run_at
"""
now = datetime.now(timezone.utc)
if trigger_condition == "interval":
seconds = trigger_config.get("interval_seconds", 3600)
return now + timedelta(seconds=seconds)
elif trigger_condition == "cron":
# 延迟导入:支持从 monorepo 根目录和 apps/backend/ 两种路径导入
try:
from app.services.scheduler import _parse_simple_cron
except ModuleNotFoundError:
from apps.backend.app.services.scheduler import _parse_simple_cron
return _parse_simple_cron(
trigger_config.get("cron_expression", "0 7 * * *"), now
)
return None # event 类型无 next_run_at