涵盖(每条对应已存的审计记录): - AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生) audit: 2026-04-20__ai-module-complete.md - admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager audit: 2026-04-21__admin-web-ai-management-suite.md - App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance) audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md - App2 prewarm 全过滤器 + AI 触发器 cron reschedule audit: 2026-04-21__app2-finance-prewarm-all-filters.md migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql - AppType 联合类型对齐 + adminAiAppTypes.test.ts audit: 2026-04-30__admin_web_ai_app_type_alignment.md - DashScope tokens_used 提取修复 audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md - App3 线索完整详情 prompt audit: 2026-05-01__backend_app3_full_detail_prompt.md - Runtime Context 沙箱(5-1~5-2 主线): - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts - migration: 20260501__runtime_context_sandbox.sql - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py - database/changes: 7 份 sandbox_* 验证报告 - 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整 + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py) 合规: - .gitignore 启用 tmp/ 排除 - 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留) 待验证清单: - docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md 每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
383 lines
13 KiB
Python
383 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
触发器调度框架(Trigger Scheduler)
|
||
|
||
统一管理 cron/interval/event 三种触发方式,驱动后台任务执行。
|
||
|
||
- cron/interval 类型通过 check_scheduled_jobs() 轮询 next_run_at 触发
|
||
- event 类型通过 fire_event() 方法直接触发
|
||
- 每个 job 独立事务,失败不影响其他触发器
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import inspect
|
||
import logging
|
||
import threading
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any, Callable
|
||
|
||
from app.trace.decorators import trace_service
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _invoke_handler(handler: Callable, **kwargs: Any) -> Any:
|
||
"""统一调用 handler,自动识别 sync / async。
|
||
|
||
- sync handler:直接返回结果
|
||
- async handler:
|
||
- 当前线程有 running loop → loop.create_task(coro),后台异步执行
|
||
- 当前线程无 running loop → 新起 daemon 线程跑 asyncio.run(coro),不阻塞调用方
|
||
|
||
说明:fire_event / check_scheduled_jobs 是 sync 函数,但部分 handler
|
||
(如 dispatcher 注册的 AI 事件 handler)是 async def,本包装器保证正确调度。
|
||
"""
|
||
result = handler(**kwargs)
|
||
if not inspect.iscoroutine(result):
|
||
return result
|
||
|
||
try:
|
||
loop = asyncio.get_running_loop()
|
||
loop.create_task(result)
|
||
return None
|
||
except RuntimeError:
|
||
# 同步线程(无 running loop):用后台线程异步执行 coroutine,不阻塞调用方
|
||
threading.Thread(
|
||
target=lambda coro=result: asyncio.run(coro),
|
||
daemon=True,
|
||
).start()
|
||
return None
|
||
|
||
|
||
def _get_connection():
|
||
"""延迟导入 get_connection,避免纯函数测试时触发模块级导入失败。"""
|
||
from app.database import get_connection
|
||
return get_connection()
|
||
|
||
# job_type → 执行函数的注册表
|
||
_JOB_REGISTRY: dict[str, Callable] = {}
|
||
|
||
|
||
@trace_service(description_zh="register_job", description_en="Register Job")
|
||
def register_job(job_type: str, handler: Callable) -> None:
|
||
"""注册 job_type 对应的执行函数。"""
|
||
_JOB_REGISTRY[job_type] = handler
|
||
|
||
|
||
@trace_service(description_zh="update_job_last_run_at", description_en="Update Job Last Run At")
|
||
def update_job_last_run_at(cur, job_id: int) -> None:
|
||
"""
|
||
在 handler 的事务内更新 last_run_at。
|
||
|
||
handler 在最终 commit 前调用此函数,将 last_run_at 更新纳入同一事务。
|
||
handler 成功 → last_run_at 随事务一起 commit。
|
||
handler 失败 → last_run_at 随事务一起 rollback。
|
||
"""
|
||
cur.execute(
|
||
"UPDATE biz.trigger_jobs SET last_run_at = NOW() WHERE id = %s",
|
||
(job_id,),
|
||
)
|
||
|
||
|
||
@trace_service(description_zh="触发调度事件", description_en="Fire scheduler event")
|
||
def fire_event(event_name: str, payload: dict[str, Any] | None = None) -> int:
|
||
"""
|
||
触发事件驱动型任务。
|
||
|
||
查找 trigger_condition='event' 且 trigger_config.event_name 匹配的 enabled job,
|
||
立即执行对应的 handler。
|
||
|
||
事务安全:将 job_id 传入 handler,由 handler 在最终 commit 前
|
||
更新 last_run_at,保证 handler 数据变更与 last_run_at 在同一事务中。
|
||
handler 失败时整个事务回滚,last_run_at 不更新。
|
||
|
||
返回: 执行的 job 数量
|
||
"""
|
||
conn = _get_connection()
|
||
executed = 0
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT id, job_type, job_name
|
||
FROM biz.trigger_jobs
|
||
WHERE status = 'enabled'
|
||
AND trigger_condition = 'event'
|
||
AND trigger_config->>'event_name' = %s
|
||
""",
|
||
(event_name,),
|
||
)
|
||
rows = cur.fetchall()
|
||
conn.commit()
|
||
|
||
for job_id, job_type, job_name in rows:
|
||
handler = _JOB_REGISTRY.get(job_type)
|
||
if not handler:
|
||
logger.warning(
|
||
"未注册的 job_type: %s (job_name=%s)", job_type, job_name
|
||
)
|
||
continue
|
||
try:
|
||
# 将 job_id 传入 handler,handler 在最终 commit 前更新 last_run_at
|
||
# async handler 经 _invoke_handler 自动调度
|
||
_invoke_handler(handler, payload=payload, job_id=job_id)
|
||
executed += 1
|
||
except Exception:
|
||
logger.exception("触发器 %s 执行失败", job_name)
|
||
|
||
finally:
|
||
conn.close()
|
||
return executed
|
||
|
||
|
||
@trace_service(description_zh="检查定时任务", description_en="Check scheduled jobs")
|
||
def check_scheduled_jobs() -> int:
|
||
"""
|
||
检查 cron/interval 类型的到期 job 并执行。
|
||
|
||
由 Scheduler 后台循环调用。
|
||
|
||
事务安全:将 conn 和 job_id 传入 handler,由 handler 在最终 commit 前
|
||
更新 last_run_at 和 next_run_at,保证 handler 数据变更与时间戳在同一事务中。
|
||
handler 失败时整个事务回滚。
|
||
|
||
返回: 执行的 job 数量
|
||
"""
|
||
conn = _get_connection()
|
||
executed = 0
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT id, job_type, job_name, trigger_condition, trigger_config
|
||
FROM biz.trigger_jobs
|
||
WHERE status = 'enabled'
|
||
AND trigger_condition IN ('cron', 'interval')
|
||
AND (next_run_at IS NULL OR next_run_at <= NOW())
|
||
ORDER BY next_run_at ASC NULLS FIRST
|
||
""",
|
||
)
|
||
rows = cur.fetchall()
|
||
conn.commit()
|
||
|
||
for job_id, job_type, job_name, trigger_condition, trigger_config in rows:
|
||
handler = _JOB_REGISTRY.get(job_type)
|
||
if not handler:
|
||
logger.warning("未注册的 job_type: %s", job_type)
|
||
continue
|
||
try:
|
||
# cron/interval handler 接受 conn + job_id,在最终 commit 前更新时间戳
|
||
# async handler 经 _invoke_handler 自动调度
|
||
_invoke_handler(handler, conn=conn, job_id=job_id)
|
||
# 计算 next_run_at 并更新(在 handler commit 后的新事务中)
|
||
next_run = _calculate_next_run(trigger_condition, trigger_config)
|
||
with conn.cursor() as cur:
|
||
cur.execute("BEGIN")
|
||
cur.execute(
|
||
"""
|
||
UPDATE biz.trigger_jobs
|
||
SET last_run_at = NOW(), next_run_at = %s, last_error = NULL
|
||
WHERE id = %s
|
||
""",
|
||
(next_run, job_id),
|
||
)
|
||
conn.commit()
|
||
executed += 1
|
||
except Exception as exc:
|
||
logger.exception("触发器 %s 执行失败", job_name)
|
||
conn.rollback()
|
||
# 记录错误到 last_error 字段
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"UPDATE biz.trigger_jobs SET last_error = %s WHERE id = %s",
|
||
(str(exc)[:500], job_id),
|
||
)
|
||
conn.commit()
|
||
except Exception:
|
||
logger.debug("记录 last_error 失败", exc_info=True)
|
||
try:
|
||
conn.rollback()
|
||
except Exception:
|
||
pass
|
||
|
||
finally:
|
||
conn.close()
|
||
return executed
|
||
|
||
|
||
def _calculate_next_run(
|
||
trigger_condition: str, trigger_config: dict
|
||
) -> datetime | None:
|
||
"""
|
||
根据触发条件和配置计算下次运行时间。
|
||
|
||
- interval: now + interval_seconds
|
||
- cron: 复用 scheduler._parse_simple_cron
|
||
- event: 返回 None(无 next_run_at)
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
if trigger_condition == "interval":
|
||
seconds = trigger_config.get("interval_seconds", 3600)
|
||
return now + timedelta(seconds=seconds)
|
||
elif trigger_condition == "cron":
|
||
# 延迟导入:支持从 monorepo 根目录和 apps/backend/ 两种路径导入
|
||
try:
|
||
from app.services.scheduler import _parse_simple_cron
|
||
except ModuleNotFoundError:
|
||
from apps.backend.app.services.scheduler import _parse_simple_cron
|
||
|
||
return _parse_simple_cron(
|
||
trigger_config.get("cron_expression", "0 7 * * *"), now
|
||
)
|
||
return None # event 类型无 next_run_at
|
||
|
||
|
||
def check_startup_jobs() -> list[dict]:
|
||
"""
|
||
启动时检查 cron/interval 类型任务今天是否执行过。
|
||
|
||
返回未执行的任务列表,供启动横幅提示。
|
||
不自动执行,由用户通过管理页面手动确认。
|
||
"""
|
||
from datetime import date
|
||
|
||
conn = _get_connection()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT id, job_name, trigger_condition, trigger_config,
|
||
last_run_at, description
|
||
FROM biz.trigger_jobs
|
||
WHERE status = 'enabled'
|
||
AND trigger_condition IN ('cron', 'interval')
|
||
ORDER BY id
|
||
"""
|
||
)
|
||
rows = cur.fetchall()
|
||
conn.commit()
|
||
|
||
today = date.today()
|
||
pending = []
|
||
for row in rows:
|
||
job_id, job_name, trigger_condition, trigger_config, last_run_at, description = row
|
||
ran_today = False
|
||
if last_run_at is not None:
|
||
# last_run_at 可能带时区,取 date 部分比较
|
||
ran_today = last_run_at.date() == today if hasattr(last_run_at, 'date') else False
|
||
if not ran_today:
|
||
pending.append({
|
||
"id": job_id,
|
||
"job_name": job_name,
|
||
"trigger_condition": trigger_condition,
|
||
"description": description or job_name,
|
||
"last_run_at": str(last_run_at) if last_run_at else "从未执行",
|
||
})
|
||
return pending
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def run_job_by_id(job_id: int) -> dict:
|
||
"""
|
||
手动触发指定 job(通过管理页面调用)。
|
||
|
||
返回 {"success": bool, "message": str}。
|
||
"""
|
||
conn = _get_connection()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT id, job_type, job_name, trigger_condition, trigger_config
|
||
FROM biz.trigger_jobs
|
||
WHERE id = %s
|
||
""",
|
||
(job_id,),
|
||
)
|
||
row = cur.fetchone()
|
||
conn.commit()
|
||
|
||
if not row:
|
||
return {"success": False, "message": f"任务 {job_id} 不存在"}
|
||
|
||
_, job_type, job_name, trigger_condition, trigger_config = row
|
||
handler = _JOB_REGISTRY.get(job_type)
|
||
if not handler:
|
||
return {"success": False, "message": f"任务 {job_name} 未注册处理器"}
|
||
|
||
try:
|
||
_invoke_handler(handler)
|
||
# 更新 last_run_at 和 next_run_at
|
||
next_run = _calculate_next_run(trigger_condition, trigger_config)
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
UPDATE biz.trigger_jobs
|
||
SET last_run_at = NOW(), next_run_at = %s, last_error = NULL
|
||
WHERE id = %s
|
||
""",
|
||
(next_run, job_id),
|
||
)
|
||
conn.commit()
|
||
return {"success": True, "message": f"任务 {job_name} 执行成功"}
|
||
except Exception as exc:
|
||
logger.exception("手动触发 %s 失败", job_name)
|
||
conn.rollback()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"UPDATE biz.trigger_jobs SET last_error = %s WHERE id = %s",
|
||
(str(exc)[:500], job_id),
|
||
)
|
||
conn.commit()
|
||
except Exception:
|
||
try:
|
||
conn.rollback()
|
||
except Exception:
|
||
pass
|
||
return {"success": False, "message": f"任务 {job_name} 执行失败: {str(exc)[:200]}"}
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def list_trigger_jobs() -> list[dict]:
|
||
"""
|
||
获取所有 trigger_jobs 列表(管理页面用)。
|
||
"""
|
||
conn = _get_connection()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"""
|
||
SELECT id, job_type, job_name, trigger_condition, trigger_config,
|
||
last_run_at, next_run_at, status, description, last_error,
|
||
created_at
|
||
FROM biz.trigger_jobs
|
||
ORDER BY id
|
||
"""
|
||
)
|
||
rows = cur.fetchall()
|
||
conn.commit()
|
||
|
||
result = []
|
||
for row in rows:
|
||
result.append({
|
||
"id": row[0],
|
||
"job_type": row[1],
|
||
"job_name": row[2],
|
||
"trigger_condition": row[3],
|
||
"trigger_config": row[4],
|
||
"last_run_at": row[5].isoformat() if row[5] else None,
|
||
"next_run_at": row[6].isoformat() if row[6] else None,
|
||
"status": row[7],
|
||
"description": row[8],
|
||
"last_error": row[9],
|
||
"created_at": row[10].isoformat() if row[10] else None,
|
||
})
|
||
return result
|
||
finally:
|
||
conn.close()
|