Files
Neo-ZQYY/apps/backend/app/ai/run_log_service.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

217 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI 运行日志服务 — biz.ai_run_logs 表的 CRUD 操作。
每次 Application API 调用前创建 pending 记录,调用过程中更新状态,
调用结束后记录结果。同时提供日/月 token 聚合查询,实现 UsageProvider 协议
以便注入 BudgetTracker。
request_prompt 写入前截断为前 2000 字符,避免大 prompt 占用过多存储。
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Callable
import psycopg2.extensions
from app.services.runtime_context import LIVE_INSTANCE_ID, MODE_LIVE, MODE_SANDBOX, get_runtime_context
# prompt 最大存储长度
# 2026-04-222000→8000。app2_finance 真实 prompt 约 4-8KB72 组合财务看板 + 中文 key 膨胀),
# 2000 字符截断会丢掉 optimization-critical 字段(如 discount_items 含团购折扣明细),
# admin-web 调用详情页无法完整审阅 → 提高到 8000 覆盖绝大部分场景
_MAX_PROMPT_LENGTH = 8000
def _truncate_prompt(prompt: str | None) -> str | None:
"""截断 prompt 为 _MAX_PROMPT_LENGTH 字符上限。None 原样返回。"""
if prompt is None:
return None
return prompt[:_MAX_PROMPT_LENGTH]
class AIRunLogService:
"""AI 运行日志 CRUD实现 UsageProvider 协议。
构造函数接受 get_conn callable每次操作时获取数据库连接
避免长期持有连接导致超时或连接池耗尽。
"""
def __init__(self, get_conn: Callable[[], psycopg2.extensions.connection]) -> None:
self._get_conn = get_conn
# ── 创建 ──────────────────────────────────────────────
def create_log(
self,
site_id: int,
app_type: str,
trigger_type: str,
*,
member_id: int | None = None,
request_prompt: str | None = None,
session_id: str | None = None,
) -> int:
"""创建日志记录status: pending返回 log_id。
request_prompt 自动截断为前 2000 字符。
"""
truncated = _truncate_prompt(request_prompt)
conn = self._get_conn()
try:
ctx = get_runtime_context(site_id, conn=conn)
runtime_mode = MODE_SANDBOX if ctx.is_sandbox else MODE_LIVE
sandbox_instance_id = ctx.sandbox_instance_id if ctx.is_sandbox else LIVE_INSTANCE_ID
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO biz.ai_run_logs
(site_id, app_type, trigger_type, member_id,
request_prompt, session_id, status,
runtime_mode, sandbox_instance_id)
VALUES (%s, %s, %s, %s, %s, %s, 'pending', %s, %s)
RETURNING id
""",
(site_id, app_type, trigger_type, member_id,
truncated, session_id, runtime_mode, sandbox_instance_id),
)
row = cur.fetchone()
assert row is not None, "INSERT RETURNING 应返回 id"
log_id: int = row[0]
conn.commit()
return log_id
except Exception:
conn.rollback()
raise
# ── 状态转换 ──────────────────────────────────────────
def update_running(self, log_id: int) -> None:
"""更新为 running。"""
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE biz.ai_run_logs
SET status = 'running'
WHERE id = %s
""",
(log_id,),
)
conn.commit()
except Exception:
conn.rollback()
raise
def update_success(
self,
log_id: int,
response_text: str,
tokens_used: int,
latency_ms: int,
) -> None:
"""更新为 success记录响应、token 消耗和耗时。"""
now = datetime.now(timezone.utc)
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE biz.ai_run_logs
SET status = 'success',
response_text = %s,
tokens_used = %s,
latency_ms = %s,
finished_at = %s
WHERE id = %s
""",
(response_text, tokens_used, latency_ms, now, log_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
def update_failed(
self,
log_id: int,
error_message: str,
latency_ms: int,
) -> None:
"""更新为 failed记录错误信息和耗时。"""
now = datetime.now(timezone.utc)
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE biz.ai_run_logs
SET status = 'failed',
error_message = %s,
latency_ms = %s,
finished_at = %s
WHERE id = %s
""",
(error_message, latency_ms, now, log_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
def update_timeout(self, log_id: int, latency_ms: int) -> None:
"""更新为 timeout。"""
now = datetime.now(timezone.utc)
conn = self._get_conn()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE biz.ai_run_logs
SET status = 'timeout',
latency_ms = %s,
finished_at = %s
WHERE id = %s
""",
(latency_ms, now, log_id),
)
conn.commit()
except Exception:
conn.rollback()
raise
# ── UsageProvider 协议实现 ────────────────────────────
def get_daily_usage(self) -> int:
"""聚合今日 token 消耗status='success'created_at 为今日)。"""
conn = self._get_conn()
with conn.cursor() as cur:
cur.execute(
"""
SELECT COALESCE(SUM(tokens_used), 0)
FROM biz.ai_run_logs
WHERE status = 'success'
AND created_at >= CURRENT_DATE
AND created_at < CURRENT_DATE + INTERVAL '1 day'
"""
)
row = cur.fetchone()
return int(row[0]) if row else 0
def get_monthly_usage(self) -> int:
"""聚合本月 token 消耗status='success'created_at 为本月)。"""
conn = self._get_conn()
with conn.cursor() as cur:
cur.execute(
"""
SELECT COALESCE(SUM(tokens_used), 0)
FROM biz.ai_run_logs
WHERE status = 'success'
AND created_at >= date_trunc('month', CURRENT_DATE)
AND created_at < date_trunc('month', CURRENT_DATE) + INTERVAL '1 month'
"""
)
row = cur.fetchone()
return int(row[0]) if row else 0