涵盖(每条对应已存的审计记录): - AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生) audit: 2026-04-20__ai-module-complete.md - admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager audit: 2026-04-21__admin-web-ai-management-suite.md - App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance) audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md - App2 prewarm 全过滤器 + AI 触发器 cron reschedule audit: 2026-04-21__app2-finance-prewarm-all-filters.md migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql - AppType 联合类型对齐 + adminAiAppTypes.test.ts audit: 2026-04-30__admin_web_ai_app_type_alignment.md - DashScope tokens_used 提取修复 audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md - App3 线索完整详情 prompt audit: 2026-05-01__backend_app3_full_detail_prompt.md - Runtime Context 沙箱(5-1~5-2 主线): - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts - migration: 20260501__runtime_context_sandbox.sql - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py - database/changes: 7 份 sandbox_* 验证报告 - 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整 + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py) 合规: - .gitignore 启用 tmp/ 排除 - 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留) 待验证清单: - docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md 每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
366 lines
14 KiB
Python
366 lines
14 KiB
Python
"""DashScope Application API 统一封装层。
|
||
|
||
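Calls Bailian agent applications through dashscope.Application.call(),
replacing the generic model API of the openai SDK that was used before.

- call_app_stream(): streaming call for App1; an asyncio.Queue bridges the
  synchronous SDK iterator to an async generator
- call_app(): single-turn call for App2~8, wrapped in asyncio.to_thread()
- _call_with_retry(): exponential back-off retry (at most 3 attempts,
  waiting 1s and then 2s between them)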
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
import json
import logging
import threading
from typing import Any, AsyncGenerator, Callable
|
||
|
||
import dashscope
|
||
from dashscope import Application
|
||
|
||
from app.ai.exceptions import (
|
||
DashScopeApiError,
|
||
DashScopeAuthError,
|
||
DashScopeJsonParseError,
|
||
DashScopeTimeoutError,
|
||
)
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _field_value(source: Any, key: str, default: Any = None) -> Any:
|
||
"""兼容 dict、DashScope DictMixin 和普通对象取字段。"""
|
||
if isinstance(source, dict):
|
||
return source.get(key, default)
|
||
return getattr(source, key, default)
|
||
|
||
|
||
def _safe_int(value: Any) -> int:
|
||
"""把 token 字段安全转换为 int,异常值按 0 处理。"""
|
||
try:
|
||
return int(value or 0)
|
||
except (TypeError, ValueError):
|
||
return 0
|
||
|
||
|
||
def _extract_tokens_used(usage: Any) -> int:
|
||
"""从 DashScope usage 多种结构中提取 tokens_used。"""
|
||
if not usage:
|
||
return 0
|
||
|
||
models = _field_value(usage, "models")
|
||
if models:
|
||
total = 0
|
||
for model_usage in models:
|
||
total += _safe_int(_field_value(model_usage, "input_tokens"))
|
||
total += _safe_int(_field_value(model_usage, "output_tokens"))
|
||
return total
|
||
|
||
total_tokens = _field_value(usage, "total_tokens")
|
||
if total_tokens is not None:
|
||
return _safe_int(total_tokens)
|
||
|
||
return (
|
||
_safe_int(_field_value(usage, "input_tokens"))
|
||
+ _safe_int(_field_value(usage, "output_tokens"))
|
||
)
|
||
|
||
|
||
class DashScopeClient:
    """Unified wrapper around the DashScope Application API.

    Agent applications configured in the Bailian console are invoked by
    ``app_id``, so the system prompt and MCP tools configured there are used
    rather than anything defined in this module.
    """

    # Upper bound on attempts, used by both the transport-level retry loop in
    # ``_call_with_retry`` and the JSON-validity retry loop in ``call_app``.
    MAX_RETRIES = 3
    # Base back-off in seconds; 0-based attempt ``n`` waits BASE_INTERVAL * 2**n.
    BASE_INTERVAL = 1  # seconds

    def __init__(self, api_key: str, workspace_id: str | None = None):
        """Store the credentials used for subsequent calls.

        The key is written to the module-level ``dashscope.api_key``, which is
        a process-wide setting rather than per-instance state.
        NOTE(review): constructing several clients with different keys would
        presumably make the last one win. Confirm before relying on that.

        Args:
            api_key: DashScope API key.
            workspace_id: Optional Bailian workspace ID, forwarded as the
                ``workspace`` keyword on every call when not ``None``.
        """
        dashscope.api_key = api_key
        self._workspace_id = workspace_id

    async def call_app_stream(
        self,
        app_id: str,
        prompt: str,
        session_id: str | None = None,
        biz_params: dict | None = None,
    ) -> AsyncGenerator[tuple[str, str | None], None]:
        """Stream an application response (App1), passing through session_id.

        The synchronous SDK iterator is consumed in an executor thread, and
        each chunk is handed to this coroutine through an ``asyncio.Queue``.

        Per the original notes for this method: when ``session_id`` is
        ``None``, the service returns a new session ID inside the stream and
        the caller is expected to persist it once the stream ends. When an
        existing ID is passed, the returned ID is usually the same.

        Args:
            app_id: Bailian application ID.
            prompt: User input.
            session_id: Existing session ID, or ``None`` for a first turn.
            biz_params: Optional business parameters.

        Yields:
            ``(text_chunk, session_id_or_none)`` tuples. ``text_chunk`` may be
            an empty string when a chunk carries only a session ID; callers
            should ignore the text but keep the session ID.

        Raises:
            DashScopeAuthError: A chunk reported status code 401.
            DashScopeApiError: A chunk reported any other non-200 status.
            Exception: Any exception raised inside the worker thread is
                re-raised here unchanged.
        """
        queue: asyncio.Queue[tuple[str, str | None] | BaseException | None] = (
            asyncio.Queue()
        )
        loop = asyncio.get_running_loop()
        # Set once the consumer stops reading (finished, failed or abandoned)
        # so the worker thread stops pulling chunks nobody will receive.
        stop_requested = threading.Event()

        def _publish(item: Any) -> None:
            """Hand one item to the event-loop thread."""
            # The queue is unbounded, so put_nowait cannot raise QueueFull.
            loop.call_soon_threadsafe(queue.put_nowait, item)

        def _consume_in_thread() -> None:
            """Drain the synchronous SDK iterator, publishing each chunk."""
            try:
                call_kwargs: dict[str, Any] = {
                    "app_id": app_id,
                    "prompt": prompt,
                    "stream": True,
                    "incremental_output": True,
                }
                if session_id is not None:
                    call_kwargs["session_id"] = session_id
                if biz_params is not None:
                    call_kwargs["biz_params"] = biz_params
                if self._workspace_id is not None:
                    call_kwargs["workspace"] = self._workspace_id

                for chunk in Application.call(**call_kwargs):
                    if stop_requested.is_set():
                        # The consumer is gone. The check runs only between
                        # chunks, so the thread exits at the next chunk.
                        return

                    status = chunk.status_code
                    if status != 200:
                        # Turn the failed chunk into an exception for the caller.
                        msg = getattr(chunk, "message", "") or f"状态码 {status}"
                        if status == 401:
                            err: Exception = DashScopeAuthError(msg)
                        else:
                            err = DashScopeApiError(msg, status_code=status)
                        _publish(err)
                        return

                    output = getattr(chunk, "output", None)
                    text = _field_value(output, "text") or ""
                    new_sid = _field_value(output, "session_id")
                    # Forward the chunk if it carries text or a session ID.
                    if text or new_sid:
                        _publish((text, new_sid))

                # ``None`` is the normal end-of-stream sentinel.
                _publish(None)
            except Exception as exc:
                # Unexpected failure inside the thread: surface it to the caller.
                _publish(exc)

        loop.run_in_executor(None, _consume_in_thread)

        try:
            while True:
                item = await queue.get()
                if item is None:
                    break
                if isinstance(item, BaseException):
                    raise item
                yield item
        finally:
            stop_requested.set()

    async def call_app(
        self,
        app_id: str,
        prompt: str,
        session_id: str | None = None,
        biz_params: dict | None = None,
    ) -> tuple[dict, int, str | None]:
        """Run a single-turn application call (App2~8) and parse its JSON.

        ``Application.call`` is executed through ``_call_with_retry``, then
        ``response.output.text`` is parsed as JSON. Text that is not valid
        JSON, or that parses to something other than a dict or list, triggers
        another call, up to ``MAX_RETRIES`` in total. No local repair of the
        text is attempted. A top-level list is wrapped as
        ``{"insights": list}`` instead of being retried.

        Each JSON attempt runs its own transport-level retry loop, so the
        worst case is ``MAX_RETRIES * MAX_RETRIES`` upstream calls. The
        returned token count covers only the attempt that finally succeeded.

        Args:
            app_id: Bailian application ID.
            prompt: Complete data JSON string assembled by the backend.
            session_id: Optional Bailian session ID.
            biz_params: Optional business parameters.

        Returns:
            ``(parsed_json, tokens_used, new_session_id)``.

        Raises:
            DashScopeAuthError: The API answered with status code 401.
            DashScopeApiError: The API call failed (4xx, or retries exhausted).
            DashScopeJsonParseError: No attempt produced a usable JSON object.
        """
        call_kwargs: dict[str, Any] = {
            "app_id": app_id,
            "prompt": prompt,
        }
        if session_id is not None:
            call_kwargs["session_id"] = session_id
        if biz_params is not None:
            call_kwargs["biz_params"] = biz_params
        if self._workspace_id is not None:
            call_kwargs["workspace"] = self._workspace_id

        # Invalid JSON is retried as-is, at most MAX_RETRIES times.
        last_json_error: DashScopeJsonParseError | None = None
        for json_attempt in range(self.MAX_RETRIES):
            response = await self._call_with_retry(
                Application.call, **call_kwargs
            )

            output = getattr(response, "output", None)

            # ``or ""`` also covers a present-but-None text field. Without it
            # the warning below would fail on ``raw_text[:500]`` with a
            # TypeError that escapes the retry handling.
            raw_text: str = _field_value(output, "text") or ""

            # The observed usage structure (verified 2026-04, per the
            # original note) is ApplicationUsage(models=[
            # ApplicationModelUsage(model_id, input_tokens, output_tokens)]).
            # _extract_tokens_used handles that nested shape and the flat ones.
            tokens_used = _extract_tokens_used(getattr(response, "usage", None))

            # Read session_id the same way call_app_stream does, so both dict
            # and attribute-style outputs are supported.
            new_session_id: str | None = _field_value(output, "session_id")

            try:
                parsed = json.loads(raw_text)
                if isinstance(parsed, list):
                    # CHANGE 2026-03-23: the App2 LLM sometimes returns the
                    # bare insights array instead of the wrapping dict. Wrap
                    # it here to avoid a pointless retry.
                    logger.info(
                        "LLM 返回 list(长度 %d),自动包装为 {\"insights\": [...]}",
                        len(parsed),
                    )
                    parsed = {"insights": parsed}
                if not isinstance(parsed, dict):
                    raise TypeError(f"期望 dict,实际 {type(parsed).__name__}")
                return parsed, tokens_used, new_session_id
            except (json.JSONDecodeError, TypeError) as e:
                last_json_error = DashScopeJsonParseError(
                    f"JSON 解析失败(第 {json_attempt + 1}/{self.MAX_RETRIES} 次): {e}",
                    raw_content=raw_text,
                )
                logger.warning(
                    "Application API 返回非法 JSON(第 %d/%d 次): %s",
                    json_attempt + 1,
                    self.MAX_RETRIES,
                    raw_text[:500],
                )
                # Plain retry; the invalid text is not repaired locally.
                continue

        # Every JSON attempt failed.
        raise last_json_error  # type: ignore[misc]

    async def _call_with_retry(self, func: Callable, **kwargs: Any) -> Any:
        """Call ``func`` in a worker thread with exponential back-off.

        Retry policy:
            - At most ``MAX_RETRIES`` attempts in total (3 by default).
            - After failed 0-based attempt ``n`` that is not the last one,
              sleep ``BASE_INTERVAL * 2**n`` seconds. With the defaults that
              is 1s and then 2s; there is no sleep after the final attempt.
            - Status 4xx: no retry, raise immediately (401 raises
              ``DashScopeAuthError``).
            - Any other non-200 status (5xx, or a missing status code) and any
              exception raised by ``func``: retry.

        Args:
            func: Synchronous callable, e.g. ``Application.call``.
            **kwargs: Keyword arguments forwarded to ``func``.

        Returns:
            The response object whose ``status_code`` is 200.

        Raises:
            DashScopeAuthError: The response status code was 401.
            DashScopeApiError: Another 4xx status, or retries exhausted.
                Exceptions raised by ``func`` itself, timeouts included, are
                wrapped in this type; no separate timeout error is raised.
        """
        last_error: Exception | None = None

        for attempt in range(self.MAX_RETRIES):
            has_retry_left = attempt < self.MAX_RETRIES - 1
            wait_time = self.BASE_INTERVAL * (2**attempt)

            try:
                response = await asyncio.to_thread(func, **kwargs)
            except Exception as exc:
                # Network, connection or timeout failures below the API layer
                # are treated as retryable.
                if not has_retry_left:
                    logger.error(
                        "DashScope API 底层异常,已达最大重试次数 %d: %s",
                        self.MAX_RETRIES,
                        exc,
                    )
                    raise DashScopeApiError(
                        f"DashScope API 调用失败(重试 {self.MAX_RETRIES} 次后): {exc}",
                    ) from exc
                logger.warning(
                    "DashScope API 底层异常(第 %d/%d 次),%ds 后重试: %s",
                    attempt + 1,
                    self.MAX_RETRIES,
                    wait_time,
                    exc,
                )
                await asyncio.sleep(wait_time)
                continue

            # Application.call() reports failure through the response's
            # status_code rather than by raising.
            status_code = getattr(response, "status_code", None)
            if status_code == 200:
                return response

            message = getattr(response, "message", "") or f"状态码 {status_code}"

            if status_code is not None and 400 <= status_code < 500:
                # Client errors are not retried.
                if status_code == 401:
                    raise DashScopeAuthError(message)
                raise DashScopeApiError(message, status_code=status_code)

            # 5xx or an unknown status code: retry.
            last_error = DashScopeApiError(message, status_code=status_code)
            if has_retry_left:
                logger.warning(
                    "DashScope API 调用失败(第 %d/%d 次,状态码 %s),%ds 后重试: %s",
                    attempt + 1,
                    self.MAX_RETRIES,
                    status_code,
                    wait_time,
                    message,
                )
                await asyncio.sleep(wait_time)
            else:
                logger.error(
                    "DashScope API 调用失败,已达最大重试次数 %d(状态码 %s): %s",
                    self.MAX_RETRIES,
                    status_code,
                    message,
                )

        # Retries exhausted on non-200 responses.
        raise last_error  # type: ignore[misc]
|