# -*- coding: utf-8 -*- """ 有效期轮询器(Task Expiry Checker) 每小时运行一次,检查 expires_at 不为 NULL 且已过期的 active 任务, 将其标记为 inactive 并记录 history。 由 trigger_jobs 中的 task_expiry_check 配置驱动。 """ import json import logging from app.services.runtime_context import as_runtime_now_param, task_runtime_filter from app.trace.decorators import trace_service logger = logging.getLogger(__name__) def _get_connection(): """延迟导入 get_connection,避免模块级导入失败。""" from app.database import get_connection return get_connection() def _insert_history( cur, task_id: int, action: str, old_status: str | None = None, new_status: str | None = None, old_task_type: str | None = None, new_task_type: str | None = None, detail: dict | None = None, ) -> None: """在 coach_task_history 中记录变更。""" cur.execute( """ INSERT INTO biz.coach_task_history (task_id, action, old_status, new_status, old_task_type, new_task_type, detail) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( task_id, action, old_status, new_status, old_task_type, new_task_type, json.dumps(detail) if detail else None, ), ) @trace_service(description_zh="执行任务过期检查", description_en="Run task expiry check") def run() -> dict: """ 有效期轮询主流程。 1. SELECT id, task_type FROM biz.coach_tasks WHERE expires_at IS NOT NULL AND expires_at < NOW() AND status = 'active' 2. 逐条 UPDATE status = 'inactive' 3. INSERT coach_task_history (action='expired') 每条过期任务独立事务,失败不影响其他。 返回: {"expired_count": int} """ expired_count = 0 conn = _get_connection() try: # 查询所有已过期的 active 任务。沙箱模式按业务时间判断,并只处理当前运行实例。 expired_tasks = [] with conn.cursor() as cur: cur.execute("SELECT site_id FROM biz.sites WHERE is_active = true") site_ids = [row[0] for row in cur.fetchall()] for site_id in site_ids: runtime_now = as_runtime_now_param(site_id, conn=conn) runtime_clause, runtime_params = task_runtime_filter(site_id, conn=conn) cur.execute( f""" SELECT id, task_type, site_id FROM biz.coach_tasks WHERE site_id = %s {runtime_clause} AND expires_at IS NOT NULL AND expires_at < %s AND status = 'active' """, [site_id, *runtime_params, runtime_now], ) expired_tasks.extend(cur.fetchall()) conn.commit() # 逐条处理,每条独立事务 for task_id, task_type, site_id in expired_tasks: try: runtime_now = as_runtime_now_param(site_id, conn=conn) with conn.cursor() as cur: cur.execute("BEGIN") cur.execute( """ UPDATE biz.coach_tasks SET status = 'inactive', updated_at = %s WHERE id = %s AND status = 'active' """, (runtime_now, task_id), ) _insert_history( cur, task_id, action="expired", old_status="active", new_status="inactive", old_task_type=task_type, new_task_type=task_type, ) conn.commit() expired_count += 1 except Exception: logger.exception("处理过期任务失败: task_id=%s", task_id) conn.rollback() finally: conn.close() logger.info("有效期轮询完成: expired_count=%d", expired_count) return {"expired_count": expired_count}