Neo-ZQYY/apps/backend/app/services/ai/admin_service.py
Neo 1baa21222b fix(backend): F1-5a walkthrough uncovered 2 production bugs
- xcx_runtime_clock.py: require_approved is a factory, so Depends must invoke
  it with (); otherwise user is the function itself, not a CurrentUser →
  AttributeError 500. This was the root cause of the sandbox failing on every
  Mini Program page (getBusinessClock kept degrading to localFallback).
  A sketch of the failure mode follows.
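
  Minimal sketch of the bug (hypothetical route and model names; the real
  dependency lives in xcx_runtime_clock.py):

      from fastapi import Depends, FastAPI

      class CurrentUser:                   # stand-in for the real model
          id = 1

      def require_approved():              # factory: returns the actual dependency
          async def _dep() -> CurrentUser:
              return CurrentUser()
          return _dep

      app = FastAPI()

      @app.get("/buggy")
      async def buggy(user=Depends(require_approved)):
          # FastAPI treats require_approved itself as the dependency, calls
          # it, and injects its return value: user is the inner _dep
          # *function*, so user.id raises AttributeError → 500
          return {"id": user.id}

      @app.get("/fixed")
      async def fixed(user=Depends(require_approved())):
          # the factory runs at decoration time; FastAPI then resolves the
          # returned _dep and injects a real CurrentUser
          return {"id": user.id}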

- admin_service.py:retry_trigger_job: the payload column is jsonb, which
  psycopg2 reads back as a dict; INSERTing it without a Json() wrap raises
  "can't adapt type 'dict'" → clicking Retry in production was a guaranteed
  500. (The bug was introduced back in 6f8f1231 and was exposed during the
  F1-5a walkthrough via SQL-reproduced end-to-end verification. Sketch below.)
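
  Minimal reproduction sketch (hypothetical DSN and table name; the real
  INSERT is in retry_trigger_job below):

      import psycopg2
      import psycopg2.extras

      conn = psycopg2.connect("dbname=neo")  # hypothetical DSN
      payload = {"foo": "bar", "n": 42}      # dict, as read back from a jsonb column
      with conn.cursor() as cur:
          # Buggy: psycopg2 has no default adapter for dict, so
          #   cur.execute("INSERT INTO t (payload) VALUES (%s)", (payload,))
          # raises ProgrammingError: can't adapt type 'dict'.
          # Fixed: wrap with Json() so the dict is adapted to jsonb:
          cur.execute(
              "INSERT INTO t (payload) VALUES (%s)",
              (psycopg2.extras.Json(payload),),
          )
      conn.commit()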

Walkthrough coverage:
- xcx_runtime_clock: after the fix, Mini Program GET /api/xcx/runtime/clock
  returns 200 with the full sandbox ctx (business_date / sandbox_instance_id)
- retry_trigger_job: SQL reproduction INSERTs a realistic jsonb payload
  ({foo:bar,n:42}); after the fix, runtime_mode=sandbox + sandbox_instance_id
  + fully preserved payload all PASS

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 11:52:40 +08:00

"""AI 监控后台聚合服务层。
提供 Dashboard 总览、调度任务管理、调用记录查询、缓存失效、
Token 预算、批量执行(含成本二次确认)、告警管理等功能。
所有数据库操作使用 psycopg2 同步连接,方法签名为 asyncFastAPI 兼容)。
查询强制 site_id 隔离(当 site_id 参数不为 None 时)。
"""

from __future__ import annotations

import asyncio
import logging
import uuid
from datetime import datetime, timezone, timedelta
from typing import TYPE_CHECKING, Any

import psycopg2.extras

from app.ai.budget_tracker import BudgetTracker
from app.database import get_connection
from app.services.runtime_context import (
    RuntimeContext,
    get_runtime_context,
    runtime_insert_columns,
)

if TYPE_CHECKING:
    from app.ai.dispatcher import AIDispatcher

logger = logging.getLogger(__name__)

# Batch-execution estimate: average token consumption per call
AVG_TOKENS_PER_CALL = 2000

# TTL for the in-memory batch store
_BATCH_TTL_SECONDS = 600  # 10 minutes

# F1-5a batch-execution concurrency cap
# Neo's call: N=5 (combined with the dispatcher's existing circuit_breaker /
# rate_limiter this stays under the DashScope 1000 RPM limit)
_BATCH_CONCURRENCY = 5


class AdminAIService:
    """Aggregated service for the AI monitoring admin console."""

    def __init__(self, budget_tracker: BudgetTracker | None = None) -> None:
        self._budget = budget_tracker
        self._batch_store: dict[str, dict] = {}  # batch_id → {params, ctx_snapshot, expires_at}
        self._dispatcher: AIDispatcher | None = None  # F1-5a: injected at lifespan startup

    def set_dispatcher(self, dispatcher: AIDispatcher) -> None:
        """F1-5a: injected at lifespan startup; used by _run_batch to execute real AI calls."""
        self._dispatcher = dispatcher

    # ── Dashboard ─────────────────────────────────────────

    async def get_dashboard(
        self,
        site_id: int | None = None,
        range_days: int | None = None,
        date_from: str | None = None,
        date_to: str | None = None,
    ) -> dict:
        """Aggregate all dashboard data.

        Time-range precedence:
        1. If both date_from / date_to are given (explicit dates) → closed interval [from, to]
        2. If range_days=N → [CURRENT_DATE - (N-1) days, now]
        3. Default: range_days=1 (today)
        """
        today_stats = await self._get_range_stats(site_id, range_days, date_from, date_to)
        trend_7d = await self._get_7d_trend(site_id)
        app_dist = await self._get_app_distribution(site_id)
        app_health = await self._get_app_health(site_id)
        budget = await self.get_budget()
        recent_alerts = await self._get_recent_alerts(site_id)
        return {
            **today_stats,
            "trend_7d": trend_7d,
            "app_distribution": app_dist,
            "budget": budget,
            "recent_alerts": recent_alerts,
            "app_health": app_health,
        }

    async def _get_range_stats(
        self,
        site_id: int | None,
        range_days: int | None,
        date_from: str | None,
        date_to: str | None,
    ) -> dict:
        """Call count, success rate, token usage and average latency for the period.

        Field names keep the today_* prefix for compatibility with the
        frontend DashboardResponse schema.
        """
        site_clause, site_params = _site_filter(site_id)
        if date_from and date_to:
            time_clause = "created_at >= %s::date AND created_at < (%s::date + INTERVAL '1 day')"
            time_params: tuple = (date_from, date_to)
        else:
            days = range_days if range_days and range_days > 0 else 1
            time_clause = (
                "created_at >= CURRENT_DATE - (%s::int - 1) * INTERVAL '1 day' "
                "AND created_at < CURRENT_DATE + INTERVAL '1 day'"
            )
            time_params = (days,)
        params = time_params + site_params
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT
                        COUNT(*) AS total_calls,
                        COUNT(*) FILTER (WHERE status = 'success') AS success_count,
                        COALESCE(SUM(tokens_used), 0) AS total_tokens,
                        COALESCE(AVG(latency_ms) FILTER (WHERE latency_ms IS NOT NULL), 0)
                            AS avg_latency
                    FROM biz.ai_run_logs
                    WHERE {time_clause}
                    {site_clause}
                    """,
                    params,
                )
                row = cur.fetchone()
            conn.commit()
        finally:
            conn.close()
        total, success, tokens, avg_lat = row if row else (0, 0, 0, 0)
        rate = round(success / total, 4) if total > 0 else 0.0
        return {
            "today_calls": total,
            "today_success_rate": rate,
            "today_tokens": int(tokens),
            "today_avg_latency_ms": round(float(avg_lat), 2),
        }

    async def _get_7d_trend(self, site_id: int | None) -> list[dict]:
        """Per-day aggregation over the last 7 days."""
        site_clause, params = _site_filter(site_id)
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT
                        created_at::date AS day,
                        COUNT(*) AS calls,
                        COUNT(*) FILTER (WHERE status = 'success') AS success_count
                    FROM biz.ai_run_logs
                    WHERE created_at >= CURRENT_DATE - INTERVAL '6 days'
                    {site_clause}
                    GROUP BY day
                    ORDER BY day
                    """,
                    params,
                )
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        return [
            {
                "date": row[0].isoformat(),
                "calls": row[1],
                "success_rate": round(row[2] / row[1], 4) if row[1] > 0 else 0.0,
            }
            for row in rows
        ]

    async def _get_app_distribution(self, site_id: int | None) -> list[dict]:
        """Share of calls per app (last 7 days)."""
        site_clause, params = _site_filter(site_id)
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT app_type, COUNT(*) AS cnt
                    FROM biz.ai_run_logs
                    WHERE created_at >= CURRENT_DATE - INTERVAL '6 days'
                    {site_clause}
                    GROUP BY app_type
                    ORDER BY cnt DESC
                    """,
                    params,
                )
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        total = sum(r[1] for r in rows) or 1  # avoid division by zero
        return [
            {
                "app_type": row[0],
                "count": row[1],
                "percentage": round(row[1] / total, 4),
            }
            for row in rows
        ]

    async def _get_app_health(self, site_id: int | None) -> list[dict]:
        """Most recent call status per app."""
        site_clause, params = _site_filter(site_id)
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT DISTINCT ON (app_type)
                        app_type,
                        status AS last_status,
                        created_at AS last_call_at
                    FROM biz.ai_run_logs
                    WHERE TRUE {site_clause}
                    ORDER BY app_type, created_at DESC
                    """,
                    params,
                )
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        return [
            {
                "app_type": row[0],
                "last_status": row[1],
                "last_call_at": row[2].isoformat() if row[2] else None,
            }
            for row in rows
        ]

    async def _get_recent_alerts(self, site_id: int | None, limit: int = 10) -> list[dict]:
        """Recent alert events (for the dashboard)."""
        site_clause, params = _site_filter(site_id)
        params = (*params, limit)
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    SELECT id, app_type, status, alert_status,
                           error_message, created_at
                    FROM biz.ai_run_logs
                    WHERE status IN ('failed', 'timeout', 'circuit_open')
                    {site_clause}
                    ORDER BY created_at DESC
                    LIMIT %s
                    """,
                    params,
                )
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        return [_row_to_dict(cols, r) for r in rows]

    # ── Trigger jobs ──────────────────────────────────────

    async def list_trigger_jobs(
        self, filters: dict, page: int = 1, page_size: int = 20,
    ) -> dict:
        """Paginated query over ai_trigger_jobs, plus today's dedup-skip count."""
        where_parts: list[str] = []
        params: list = []
        for key in ("event_type", "status", "site_id"):
            if filters.get(key) is not None:
                where_parts.append(f"{key} = %s")
                params.append(filters[key])
        if filters.get("date_from"):
            where_parts.append("created_at >= %s")
            params.append(filters["date_from"])
        if filters.get("date_to"):
            where_parts.append("created_at <= %s")
            params.append(filters["date_to"])
        where_sql = ("WHERE " + " AND ".join(where_parts)) if where_parts else ""
        offset = (page - 1) * page_size
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                # total count
                cur.execute(
                    f"SELECT COUNT(*) FROM biz.ai_trigger_jobs {where_sql}",
                    params,
                )
                total = cur.fetchone()[0]
                # page of rows
                cur.execute(
                    f"""
                    SELECT id, event_type, member_id, status, app_chain,
                           is_forced, site_id, started_at, finished_at, created_at
                    FROM biz.ai_trigger_jobs
                    {where_sql}
                    ORDER BY created_at DESC
                    LIMIT %s OFFSET %s
                    """,
                    (*params, page_size, offset),
                )
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
                # jobs skipped today as duplicates
                cur.execute(
                    """
                    SELECT COUNT(*)
                    FROM biz.ai_trigger_jobs
                    WHERE status = 'skipped_duplicate'
                      AND created_at >= CURRENT_DATE
                      AND created_at < CURRENT_DATE + INTERVAL '1 day'
                    """,
                )
                today_skipped = cur.fetchone()[0]
            conn.commit()
        finally:
            conn.close()
        return {
            "items": [_row_to_dict(cols, r) for r in rows],
            "total": total,
            "page": page,
            "page_size": page_size,
            "today_skipped_duplicates": today_skipped,
        }

    async def get_trigger_job(self, job_id: int) -> dict | None:
        """Detail for a single trigger job."""
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id, event_type, member_id, status, app_chain,
                           is_forced, site_id, started_at, finished_at,
                           created_at, payload, error_message, connector_type
                    FROM biz.ai_trigger_jobs
                    WHERE id = %s
                    """,
                    (job_id,),
                )
                cols = [d[0] for d in cur.description]
                row = cur.fetchone()
            conn.commit()
        finally:
            conn.close()
        if row is None:
            return None
        return _row_to_dict(cols, row)

    async def retry_trigger_job(self, job_id: int) -> int:
        """Create a new trigger_job (is_forced=true); returns the new job_id.

        F1-5a: the INSERT writes runtime_mode + sandbox_instance_id explicitly,
        keeping the runtime context consistent with the original trigger_job
        (relying on column defaults would drop the sandbox marker on retry).
        """
        original = await self.get_trigger_job(job_id)
        if original is None:
            raise ValueError(f"trigger_job {job_id} does not exist")
        site_id = original["site_id"]
        cols, placeholders, runtime_params = runtime_insert_columns(site_id)
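        # Presumably runtime_insert_columns returns the extra column list, its
        # "%s" placeholders, and the bound values, e.g. something like
        # ("runtime_mode, sandbox_instance_id", "%s, %s", (mode, instance_id));
        # the exact shape lives in app.services.runtime_context. The pieces
        # are spliced into the INSERT below as {cols} / {placeholders}.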
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    INSERT INTO biz.ai_trigger_jobs
                        (event_type, member_id, site_id, connector_type,
                         payload, app_chain, is_forced, status, {cols})
                    VALUES (%s, %s, %s, %s, %s, %s, true, 'pending', {placeholders})
                    RETURNING id
                    """,
                    (
                        original["event_type"],
                        original.get("member_id"),
                        site_id,
                        original.get("connector_type", "feiqiu"),
                        # F1-5a walkthrough finding: psycopg2 reads a jsonb
                        # column back as a dict; the INSERT needs a Json()
                        # adapter or it fails with "can't adapt type 'dict'"
                        psycopg2.extras.Json(original["payload"])
                        if original.get("payload") is not None
                        else None,
                        original.get("app_chain"),
                        *runtime_params,
                    ),
                )
                new_id = cur.fetchone()[0]
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
        return new_id

    # ── Run logs ──────────────────────────────────────────

    async def list_run_logs(
        self, filters: dict, page: int = 1, page_size: int = 20,
    ) -> dict:
        """Paginated query over ai_run_logs."""
        where_parts: list[str] = []
        params: list = []
        for key in ("app_type", "status", "trigger_type", "site_id"):
            if filters.get(key) is not None:
                where_parts.append(f"{key} = %s")
                params.append(filters[key])
        if filters.get("date_from"):
            where_parts.append("created_at >= %s")
            params.append(filters["date_from"])
        if filters.get("date_to"):
            where_parts.append("created_at <= %s")
            params.append(filters["date_to"])
        where_sql = ("WHERE " + " AND ".join(where_parts)) if where_parts else ""
        offset = (page - 1) * page_size
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT COUNT(*) FROM biz.ai_run_logs {where_sql}",
                    params,
                )
                total = cur.fetchone()[0]
                cur.execute(
                    f"""
                    SELECT id, app_type, trigger_type, member_id,
                           tokens_used, latency_ms, status, site_id, created_at
                    FROM biz.ai_run_logs
                    {where_sql}
                    ORDER BY created_at DESC
                    LIMIT %s OFFSET %s
                    """,
                    (*params, page_size, offset),
                )
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        return {
            "items": [_row_to_dict(cols, r) for r in rows],
            "total": total,
            "page": page,
            "page_size": page_size,
        }

    async def get_run_log(self, log_id: int) -> dict | None:
        """Detail for a single run log (full prompt/response included, not redacted)."""
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id, app_type, trigger_type, member_id,
                           tokens_used, latency_ms, status, site_id,
                           created_at, request_prompt, response_text,
                           error_message, session_id, finished_at
                    FROM biz.ai_run_logs
                    WHERE id = %s
                    """,
                    (log_id,),
                )
                cols = [d[0] for d in cur.description]
                row = cur.fetchone()
            conn.commit()
        finally:
            conn.close()
        if row is None:
            return None
        return _row_to_dict(cols, row)

    # ── Cache management ──────────────────────────────────

    async def invalidate_cache(
        self, site_id: int, app_type: str | None = None, member_id: int | None = None,
    ) -> int:
        """Bulk cache invalidation; returns the number of affected rows."""
        where_parts = ["site_id = %s"]
        params: list = [site_id]
        if app_type is not None:
            where_parts.append("cache_type = %s")
            params.append(app_type)
        if member_id is not None:
            where_parts.append("target_id = %s")
            params.append(str(member_id))
        where_sql = " AND ".join(where_parts)
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    UPDATE biz.ai_cache
                    SET status = 'invalidated'
                    WHERE {where_sql}
                      AND status != 'invalidated'
                    """,
                    params,
                )
                affected = cur.rowcount
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
        # Phase 1.4: broadcast a cache_invalidated event (admin-web / Mini
        # Program can refresh in real time)
        if affected > 0:
            try:
                from app.ai.event_bus import AIEvent, get_event_bus
                get_event_bus().publish(AIEvent(
                    type="cache_invalidated",
                    site_id=site_id,
                    payload={
                        "cache_type": app_type,
                        "member_id": member_id,
                        "affected": affected,
                    },
                ))
            except Exception:
                logger.debug("cache_invalidated event broadcast failed", exc_info=True)
        return affected

    # ── Token budget ──────────────────────────────────────

    async def get_budget(self) -> dict:
        """Token budget usage."""
        if self._budget is not None:
            status = self._budget.check_budget()
            daily_limit = self._budget.daily_limit
            monthly_limit = self._budget.monthly_limit
            return {
                "daily_used": status.daily_used,
                "daily_limit": daily_limit,
                "daily_pct": round(status.daily_used / daily_limit, 4) if daily_limit > 0 else 0.0,
                "monthly_used": status.monthly_used,
                "monthly_limit": monthly_limit,
                "monthly_pct": round(status.monthly_used / monthly_limit, 4) if monthly_limit > 0 else 0.0,
            }
        # Without a BudgetTracker, query directly
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT
                        COALESCE(SUM(tokens_used) FILTER (
                            WHERE created_at >= CURRENT_DATE
                              AND created_at < CURRENT_DATE + INTERVAL '1 day'
                        ), 0) AS daily_used,
                        COALESCE(SUM(tokens_used) FILTER (
                            WHERE created_at >= date_trunc('month', CURRENT_DATE)
                              AND created_at < date_trunc('month', CURRENT_DATE) + INTERVAL '1 month'
                        ), 0) AS monthly_used
                    FROM biz.ai_run_logs
                    WHERE status = 'success'
                    """,
                )
                row = cur.fetchone()
            conn.commit()
        finally:
            conn.close()
        daily_used, monthly_used = (int(row[0]), int(row[1])) if row else (0, 0)
        daily_limit = 100_000
        monthly_limit = 2_000_000
        return {
            "daily_used": daily_used,
            "daily_limit": daily_limit,
            "daily_pct": round(daily_used / daily_limit, 4) if daily_limit > 0 else 0.0,
            "monthly_used": monthly_used,
            "monthly_limit": monthly_limit,
            "monthly_pct": round(monthly_used / monthly_limit, 4) if monthly_limit > 0 else 0.0,
        }

    # ── Batch execution ───────────────────────────────────

    async def estimate_batch(
        self, app_types: list[str], member_ids: list[int], site_id: int,
    ) -> dict:
        """Generate a batch_id, store it in memory (TTL 10 min), return the estimate.

        F1-5a: the estimate step captures a RuntimeContext snapshot into
        _batch_store; confirm retrieves it and hands it to _run_batch, so Neo
        switching to sandbox mode between estimate and confirm cannot cause
        data drift or pollution.
        """
        self._cleanup_expired_batches()
        batch_id = uuid.uuid4().hex
        estimated_calls = len(app_types) * len(member_ids)
        estimated_tokens = estimated_calls * AVG_TOKENS_PER_CALL
        # F1-5a: capture the ctx snapshot
        ctx_snapshot = get_runtime_context(site_id)
        self._batch_store[batch_id] = {
            "params": {
                "app_types": app_types,
                "member_ids": member_ids,
                "site_id": site_id,
                "batch_id": batch_id,  # used by _run_batch to tag triggered_by
            },
            "ctx_snapshot": ctx_snapshot,
            "expires_at": datetime.now(timezone.utc) + timedelta(seconds=_BATCH_TTL_SECONDS),
        }
        logger.info(
            "batch estimate: batch_id=%s apps=%s members=%d site_id=%s mode=%s sandbox_date=%s",
            batch_id,
            app_types,
            len(member_ids),
            site_id,
            ctx_snapshot.mode,
            ctx_snapshot.sandbox_date.isoformat() if ctx_snapshot.sandbox_date else None,
        )
        return {
            "batch_id": batch_id,
            "estimated_calls": estimated_calls,
            "estimated_tokens": estimated_tokens,
        }

    async def confirm_batch(self, batch_id: str) -> None:
        """Pop the stored params + ctx_snapshot and run the batch asynchronously."""
        self._cleanup_expired_batches()
        entry = self._batch_store.pop(batch_id, None)
        if entry is None:
            raise ValueError(f"batch_id invalid or expired: {batch_id}")
        params = entry["params"]
        ctx_snapshot: RuntimeContext = entry["ctx_snapshot"]
        logger.info(
            "batch confirmed: batch_id=%s apps=%s members=%d site_id=%s mode=%s",
            batch_id,
            params["app_types"],
            len(params["member_ids"]),
            params["site_id"],
            ctx_snapshot.mode,
        )
        # F1-5a: execute in the background; pass ctx_snapshot so Neo switching
        # modes mid-run has no effect
        asyncio.create_task(self._run_batch(params, ctx_snapshot))

    async def _run_batch(
        self, params: dict[str, Any], ctx_snapshot: RuntimeContext,
    ) -> None:
        """F1-5a: the actual batch-execution implementation.

        - Semaphore(_BATCH_CONCURRENCY=5) caps concurrency to stay clear of
          DashScope rate limits
        - asyncio.gather with return_exceptions=True: one failing member does
          not drag down the others
        - the context explicitly carries business_date (from ctx_snapshot);
          the prompt builder reads the sandbox date from it
        - triggered_by=f"batch:{batch_id}" tags ai_run_logs, the basis for
          Wave 2 progress queries
        """
        if self._dispatcher is None:
            logger.error(
                "batch aborted: dispatcher not injected batch_id=%s",
                params.get("batch_id"),
            )
            return
        app_types: list[str] = params["app_types"]
        member_ids: list[int] = params["member_ids"]
        site_id: int = params["site_id"]
        batch_id: str = params["batch_id"]
        sem = asyncio.Semaphore(_BATCH_CONCURRENCY)
        triggered_by = f"batch:{batch_id}"
        business_date_iso = ctx_snapshot.business_date.isoformat()

        async def _run_one(app_type: str, member_id: int) -> None:
            async with sem:
                try:
                    await self._dispatcher.run_single_app(
                        app_type=app_type,
                        context={
                            "site_id": site_id,
                            "member_id": member_id,
                            "business_date": business_date_iso,
                        },
                        triggered_by=triggered_by,
                    )
                except Exception:
                    # a single failure is already recorded in ai_run_logs;
                    # log it here without dragging down the other members
                    logger.exception(
                        "batch step failed batch_id=%s app=%s member=%s",
                        batch_id, app_type, member_id,
                    )

        tasks = [
            asyncio.create_task(_run_one(at, mid))
            for at in app_types
            for mid in member_ids
        ]
        logger.info(
            "batch started: batch_id=%s tasks=%d concurrency=%d mode=%s",
            batch_id, len(tasks), _BATCH_CONCURRENCY, ctx_snapshot.mode,
        )
        await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("batch finished: batch_id=%s tasks=%d", batch_id, len(tasks))

    def _cleanup_expired_batches(self) -> None:
        """Purge expired batches."""
        now = datetime.now(timezone.utc)
        expired = [
            bid for bid, entry in self._batch_store.items()
            if entry["expires_at"] <= now
        ]
        for bid in expired:
            del self._batch_store[bid]
        if expired:
            logger.debug("purged expired batches: %d", len(expired))

    # ── Alert management ──────────────────────────────────

    async def list_alerts(
        self,
        alert_status: str | None = None,
        site_id: int | None = None,
        page: int = 1,
        page_size: int = 20,
    ) -> dict:
        """Alert list (ai_run_logs WHERE status IN ('failed','timeout','circuit_open'))."""
        where_parts = ["status IN ('failed', 'timeout', 'circuit_open')"]
        params: list = []
        if alert_status is not None:
            if alert_status == "pending":
                # pending covers both NULL and 'pending'
                where_parts.append("(alert_status IS NULL OR alert_status = 'pending')")
            else:
                where_parts.append("alert_status = %s")
                params.append(alert_status)
        if site_id is not None:
            where_parts.append("site_id = %s")
            params.append(site_id)
        where_sql = "WHERE " + " AND ".join(where_parts)
        offset = (page - 1) * page_size
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"SELECT COUNT(*) FROM biz.ai_run_logs {where_sql}",
                    params,
                )
                total = cur.fetchone()[0]
                cur.execute(
                    f"""
                    SELECT id, app_type, status, alert_status,
                           error_message, created_at
                    FROM biz.ai_run_logs
                    {where_sql}
                    ORDER BY created_at DESC
                    LIMIT %s OFFSET %s
                    """,
                    (*params, page_size, offset),
                )
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        return {
            "items": [_row_to_dict(cols, r) for r in rows],
            "total": total,
            "page": page,
            "page_size": page_size,
        }

    async def ack_alert(self, log_id: int) -> str:
        """Acknowledge an alert (alert_status → acknowledged)."""
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE biz.ai_run_logs
                    SET alert_status = 'acknowledged'
                    WHERE id = %s
                      AND status IN ('failed', 'timeout', 'circuit_open')
                    """,
                    (log_id,),
                )
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
        return "acknowledged"

    async def ignore_alert(self, log_id: int) -> str:
        """Ignore an alert (alert_status → ignored)."""
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE biz.ai_run_logs
                    SET alert_status = 'ignored'
                    WHERE id = %s
                      AND status IN ('failed', 'timeout', 'circuit_open')
                    """,
                    (log_id,),
                )
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
        return "ignored"

    # ── Trigger management (biz.trigger_jobs) ─────────────

    async def list_triggers(self) -> list[dict]:
        """List all AI-related triggers (job_type starting with ai_, plus task_generator).

        Returned fields: id / job_name / job_type / trigger_condition /
        trigger_config / status / description / last_run_at / next_run_at /
        last_error
        """
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT id, job_name, job_type, trigger_condition,
                           trigger_config, status, description,
                           last_run_at, next_run_at, last_error
                    FROM biz.trigger_jobs
                    WHERE job_type LIKE 'ai_%' OR job_name = 'task_generator'
                    ORDER BY trigger_condition DESC, job_name
                    """
                )
                cols = [d[0] for d in cur.description]
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        return [_row_to_dict(cols, r) for r in rows]

    async def update_trigger(
        self, trigger_id: int,
        status_new: str | None = None,
        cron_expression: str | None = None,
        description: str | None = None,
    ) -> dict:
        """Update a trigger: enable/disable, change its cron, edit its description.

        Only triggers with the ai_ prefix or task_generator may be modified.
        """
        if status_new is not None and status_new not in ("enabled", "disabled"):
            raise ValueError(f"illegal status: {status_new}")
        sets: list[str] = []
        params: list = []
        if status_new is not None:
            sets.append("status = %s")
            params.append(status_new)
        if cron_expression is not None:
            sets.append("trigger_config = jsonb_set(trigger_config, '{cron_expression}', to_jsonb(%s::text))")
            params.append(cron_expression)
        if description is not None:
            sets.append("description = %s")
            params.append(description)
        if not sets:
            raise ValueError("at least one field must be modified")
        params.append(trigger_id)
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"""
                    UPDATE biz.trigger_jobs
                    SET {", ".join(sets)}
                    WHERE id = %s
                      AND (job_type LIKE 'ai_%%' OR job_name = 'task_generator')
                    RETURNING id, job_name, job_type, trigger_condition,
                              trigger_config, status, description,
                              last_run_at, next_run_at, last_error
                    """,
                    params,
                )
                row = cur.fetchone()
                if row is None:
                    conn.rollback()
                    raise ValueError("trigger does not exist or is not modifiable")
                cols = [d[0] for d in cur.description]
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
        return _row_to_dict(cols, row)

    # ── Prewarm progress (app2_finance, 72 combinations) ──

    async def get_prewarm_progress(self, site_id: int) -> dict:
        """Query cache progress for the 72 app2_finance combinations.

        Returns: total=72, done=N, missing=[{time_dimension, area}], last_updated
        """
        time_dims = (
            "this_month", "last_month", "this_week", "last_week",
            "this_quarter", "last_quarter", "last_3_months", "last_6_months",
        )
        areas = (
            "all", "hall", "hallA", "hallB", "hallC",
            "vip", "snooker", "mahjong", "ktv",
        )
        # 8 time dimensions x 9 areas = 72 expected target_ids
        expected = {f"{t}__{a}" for t in time_dims for a in areas}
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT target_id, max(created_at) AS last_updated
                    FROM biz.ai_cache
                    WHERE cache_type = 'app2_finance'
                      AND site_id = %s
                      AND target_id LIKE %s ESCAPE '\\'
                    GROUP BY target_id
                    """,
                    # "_" is a LIKE wildcard, so escape it to match a literal "__"
                    (site_id, r'%\_\_%'),
                )
                rows = cur.fetchall()
            conn.commit()
        finally:
            conn.close()
        done_map = {r[0]: r[1] for r in rows}
        missing = sorted(expected - set(done_map.keys()))
        last = max(done_map.values()) if done_map else None
        return {
            "total": len(expected),
            "done": len(expected & set(done_map.keys())),
            "missing": [
                {"target_id": m, "time_dimension": m.split("__")[0], "area": m.split("__")[1]}
                for m in missing
            ],
            "last_updated": last.isoformat() if last else None,
        }


# ── Helpers ──────────────────────────────────────────────


def _site_filter(site_id: int | None) -> tuple[str, tuple]:
    """Build the site_id filter clause and its parameters."""
    if site_id is None:
        return "", ()
    return "AND site_id = %s", (site_id,)


def _row_to_dict(columns: list[str], row: tuple) -> dict:
    """Convert a database row to a dict, serializing datetime values."""
    result = {}
    for col, val in zip(columns, row):
        if isinstance(val, datetime):
            result[col] = val.isoformat()
        else:
            result[col] = val
    return result