Files
Neo-ZQYY/apps/backend/app/ai/cache_service.py

189 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AI 缓存读写服务。
负责 biz.ai_cache 表的 CRUD 和保留策略管理。
所有查询和写入操作强制 site_id 隔离。
"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from app.database import get_connection
logger = logging.getLogger(__name__)
class AICacheService:
    """Read/write service for rows of the ``biz.ai_cache`` table.

    Every query and write is keyed by ``(cache_type, site_id, target_id)``,
    so rows belonging to a different ``site_id`` are never read or deleted.
    Each operation obtains its own connection from ``get_connection()`` and
    closes it before returning.
    """

    def get_latest(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
    ) -> dict | None:
        """Return the most recent cache row for the given key.

        Rows are ordered by ``created_at`` descending and only the first one
        is returned. ``expires_at`` is part of the returned row but is not
        used to filter the query.

        Returns:
            A dict keyed by column name, or ``None`` when no row matches.
        """
        rows = self._fetch_recent(cache_type, site_id, target_id, 1)
        return rows[0] if rows else None

    def get_history(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        limit: int = 2,
    ) -> list[dict]:
        """Return up to ``limit`` recent cache rows, newest first.

        Intended for building prompt references from earlier results.

        Args:
            limit: Maximum number of rows to return. A value of zero or less
                yields an empty list without touching the database.

        Returns:
            A list of dicts keyed by column name; empty when nothing matches.
        """
        # A negative LIMIT is rejected by the database and LIMIT 0 can never
        # return rows, so answer both locally. ``None`` is forwarded as-is to
        # keep the behaviour of callers that may rely on it.
        if limit is not None and limit <= 0:
            return []
        return self._fetch_recent(cache_type, site_id, target_id, limit)

    def _fetch_recent(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        limit: int,
    ) -> list[dict]:
        """Run the shared SELECT behind ``get_latest`` and ``get_history``."""
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                # NOTE(review): rows sharing the same created_at come back in
                # an unspecified relative order; add a tie-breaker column if
                # strict determinism is ever required.
                cur.execute(
                    """
                    SELECT id, cache_type, site_id, target_id,
                           result_json, score, triggered_by,
                           created_at, expires_at
                    FROM biz.ai_cache
                    WHERE cache_type = %s AND site_id = %s AND target_id = %s
                    ORDER BY created_at DESC
                    LIMIT %s
                    """,
                    (cache_type, site_id, target_id, limit),
                )
                columns = [desc[0] for desc in cur.description]
                return [_row_to_dict(columns, row) for row in cur.fetchall()]
        finally:
            conn.close()

    def write_cache(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        result_json: dict,
        triggered_by: str | None = None,
        score: int | None = None,
        expires_at: datetime | None = None,
    ) -> int:
        """Insert a cache row and return its ``id``.

        ``result_json`` is serialised with ``json.dumps(...,
        ensure_ascii=False)`` before being stored. After a successful commit,
        rows beyond the retention limit for the same key are pruned.

        Returns:
            The ``id`` reported by ``INSERT ... RETURNING id``.

        Raises:
            Exception: Any error raised while inserting is re-raised after the
                transaction has been rolled back. Errors during the follow-up
                pruning step are logged and swallowed.
        """
        conn = get_connection()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO biz.ai_cache
                        (cache_type, site_id, target_id, result_json,
                         triggered_by, score, expires_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    RETURNING id
                    """,
                    (
                        cache_type,
                        site_id,
                        target_id,
                        json.dumps(result_json, ensure_ascii=False),
                        triggered_by,
                        score,
                        expires_at,
                    ),
                )
                # Read the generated id before committing so that a missing
                # RETURNING row rolls the insert back instead of failing
                # after the data is already committed.
                cache_id: int = cur.fetchone()[0]
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

        # Pruning is best effort: the row is already committed, so a failure
        # here is only logged and must not affect the returned id.
        try:
            deleted = self._cleanup_excess(cache_type, site_id, target_id)
            if deleted > 0:
                logger.info(
                    "清理超限缓存: cache_type=%s site_id=%s target_id=%s 删除=%d",
                    cache_type, site_id, target_id, deleted,
                )
        except Exception:
            logger.warning(
                "清理超限缓存失败: cache_type=%s site_id=%s target_id=%s",
                cache_type, site_id, target_id,
                exc_info=True,
            )
        return cache_id

    def _cleanup_excess(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        max_count: int = 500,
    ) -> int:
        """Delete all but the newest ``max_count`` rows for the given key.

        Args:
            max_count: Number of most recent rows (by ``created_at``) to keep.

        Returns:
            The number of rows deleted, as reported by ``cursor.rowcount``.

        Raises:
            ValueError: If ``max_count`` is negative.
            Exception: Any database error is re-raised after rollback.
        """
        # A negative OFFSET is rejected by the database; fail early with a
        # clear message instead of opening a connection first.
        if max_count is not None and max_count < 0:
            raise ValueError(f"max_count must be >= 0, got {max_count}")

        conn = get_connection()
        try:
            with conn.cursor() as cur:
                # The subquery skips the newest max_count rows; every row
                # after that offset is beyond the retention limit.
                cur.execute(
                    """
                    DELETE FROM biz.ai_cache
                    WHERE id IN (
                        SELECT id FROM biz.ai_cache
                        WHERE cache_type = %s AND site_id = %s AND target_id = %s
                        ORDER BY created_at DESC
                        OFFSET %s
                    )
                    """,
                    (cache_type, site_id, target_id, max_count),
                )
                deleted = cur.rowcount
            conn.commit()
            return deleted
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
def _row_to_dict(columns: list[str], row: tuple) -> dict:
    """Pair column names with row values, rendering datetimes as ISO strings.

    ``datetime`` values are converted with ``isoformat()``; every other value
    is passed through unchanged. Pairing follows ``zip`` semantics, so surplus
    items in the longer of the two inputs are ignored.
    """
    return {
        name: value.isoformat() if isinstance(value, datetime) else value
        for name, value in zip(columns, row)
    }