# -*- coding: utf-8 -*- """ 触发器调度框架(Trigger Scheduler) 统一管理 cron/interval/event 三种触发方式,驱动后台任务执行。 - cron/interval 类型通过 check_scheduled_jobs() 轮询 next_run_at 触发 - event 类型通过 fire_event() 方法直接触发 - 每个 job 独立事务,失败不影响其他触发器 """ from __future__ import annotations import logging from datetime import datetime, timedelta, timezone from typing import Any, Callable logger = logging.getLogger(__name__) def _get_connection(): """延迟导入 get_connection,避免纯函数测试时触发模块级导入失败。""" from app.database import get_connection return get_connection() # job_type → 执行函数的注册表 _JOB_REGISTRY: dict[str, Callable] = {} def register_job(job_type: str, handler: Callable) -> None: """注册 job_type 对应的执行函数。""" _JOB_REGISTRY[job_type] = handler def update_job_last_run_at(cur, job_id: int) -> None: """ 在 handler 的事务内更新 last_run_at。 handler 在最终 commit 前调用此函数,将 last_run_at 更新纳入同一事务。 handler 成功 → last_run_at 随事务一起 commit。 handler 失败 → last_run_at 随事务一起 rollback。 """ cur.execute( "UPDATE biz.trigger_jobs SET last_run_at = NOW() WHERE id = %s", (job_id,), ) def fire_event(event_name: str, payload: dict[str, Any] | None = None) -> int: """ 触发事件驱动型任务。 查找 trigger_condition='event' 且 trigger_config.event_name 匹配的 enabled job, 立即执行对应的 handler。 事务安全:将 job_id 传入 handler,由 handler 在最终 commit 前 更新 last_run_at,保证 handler 数据变更与 last_run_at 在同一事务中。 handler 失败时整个事务回滚,last_run_at 不更新。 返回: 执行的 job 数量 """ conn = _get_connection() executed = 0 try: with conn.cursor() as cur: cur.execute( """ SELECT id, job_type, job_name FROM biz.trigger_jobs WHERE status = 'enabled' AND trigger_condition = 'event' AND trigger_config->>'event_name' = %s """, (event_name,), ) rows = cur.fetchall() conn.commit() for job_id, job_type, job_name in rows: handler = _JOB_REGISTRY.get(job_type) if not handler: logger.warning( "未注册的 job_type: %s (job_name=%s)", job_type, job_name ) continue try: # 将 job_id 传入 handler,handler 在最终 commit 前更新 last_run_at handler(payload=payload, job_id=job_id) executed += 1 except Exception: logger.exception("触发器 %s 执行失败", job_name) finally: conn.close() return executed def check_scheduled_jobs() -> int: """ 检查 cron/interval 类型的到期 job 并执行。 由 Scheduler 后台循环调用。 事务安全:将 conn 和 job_id 传入 handler,由 handler 在最终 commit 前 更新 last_run_at 和 next_run_at,保证 handler 数据变更与时间戳在同一事务中。 handler 失败时整个事务回滚。 返回: 执行的 job 数量 """ conn = _get_connection() executed = 0 try: with conn.cursor() as cur: cur.execute( """ SELECT id, job_type, job_name, trigger_condition, trigger_config FROM biz.trigger_jobs WHERE status = 'enabled' AND trigger_condition IN ('cron', 'interval') AND (next_run_at IS NULL OR next_run_at <= NOW()) ORDER BY next_run_at ASC NULLS FIRST """, ) rows = cur.fetchall() conn.commit() for job_id, job_type, job_name, trigger_condition, trigger_config in rows: handler = _JOB_REGISTRY.get(job_type) if not handler: logger.warning("未注册的 job_type: %s", job_type) continue try: # cron/interval handler 接受 conn + job_id,在最终 commit 前更新时间戳 handler(conn=conn, job_id=job_id) # 计算 next_run_at 并更新(在 handler commit 后的新事务中) next_run = _calculate_next_run(trigger_condition, trigger_config) with conn.cursor() as cur: cur.execute("BEGIN") cur.execute( """ UPDATE biz.trigger_jobs SET last_run_at = NOW(), next_run_at = %s WHERE id = %s """, (next_run, job_id), ) conn.commit() executed += 1 except Exception: logger.exception("触发器 %s 执行失败", job_name) conn.rollback() finally: conn.close() return executed def _calculate_next_run( trigger_condition: str, trigger_config: dict ) -> datetime | None: """ 根据触发条件和配置计算下次运行时间。 - interval: now + interval_seconds - cron: 复用 scheduler._parse_simple_cron - event: 返回 None(无 next_run_at) """ now = datetime.now(timezone.utc) if trigger_condition == "interval": seconds = trigger_config.get("interval_seconds", 3600) return now + timedelta(seconds=seconds) elif trigger_condition == "cron": # 延迟导入:支持从 monorepo 根目录和 apps/backend/ 两种路径导入 try: from app.services.scheduler import _parse_simple_cron except ModuleNotFoundError: from apps.backend.app.services.scheduler import _parse_simple_cron return _parse_simple_cron( trigger_config.get("cron_expression", "0 7 * * *"), now ) return None # event 类型无 next_run_at