Files
Neo-ZQYY/apps/backend/app/ai/dashscope_client.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

366 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
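"""Unified wrapper around the DashScope Application API.
Calls Bailian agent applications through dashscope.Application.call(),
replacing the generic model API previously used via the openai SDK.
- call_app_stream(): App1 streaming call (an asyncio.Queue bridges the SDK iterator to an async generator)
- call_app(): App2~8 single-turn call (wrapped with asyncio.to_thread())
- _call_with_retry(): exponential-backoff retry (up to 3 attempts, waiting 1s then 2s between them)
"""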
from __future__ import annotations

import asyncio
import json
import logging
import threading
from typing import Any, AsyncGenerator, Callable

import dashscope
from dashscope import Application

from app.ai.exceptions import (
    DashScopeApiError,
    DashScopeAuthError,
    DashScopeJsonParseError,
    DashScopeTimeoutError,
)
# Module-scoped logger, named after this module's import path.
logger: logging.Logger = logging.getLogger(name=__name__)
def _field_value(source: Any, key: str, default: Any = None) -> Any:
    """Return field ``key`` of ``source``, or ``default`` when it is absent.

    Dict instances (including dict subclasses) are read with ``dict.get``;
    every other object is read with ``getattr``. This lets callers treat plain
    dicts, DashScope DictMixin responses and ordinary objects uniformly.
    """
    is_mapping = isinstance(source, dict)
    if not is_mapping:
        return getattr(source, key, default)
    return source.get(key, default)
def _safe_int(value: Any) -> int:
    """Convert a token-count field to ``int``.

    Falsy values (``None``, ``0``, ``""`` ...) and anything ``int()`` rejects
    with ``TypeError`` / ``ValueError`` yield ``0``.
    """
    try:
        return int(value) if value else 0
    except (TypeError, ValueError):
        return 0
def _extract_tokens_used(usage: Any) -> int:
    """Compute total tokens from a DashScope ``usage`` payload of varying shape.

    Resolution order:
    1. ``usage.models`` present and non-empty: sum ``input_tokens`` +
       ``output_tokens`` over every per-model entry.
    2. ``usage.total_tokens`` present (not ``None``): use it.
    3. Otherwise: ``input_tokens`` + ``output_tokens`` on ``usage`` itself.

    A falsy ``usage`` yields 0; unparseable numbers count as 0.
    """
    if not usage:
        return 0

    per_model = _field_value(usage, "models")
    if per_model:
        return sum(
            _safe_int(_field_value(entry, "input_tokens"))
            + _safe_int(_field_value(entry, "output_tokens"))
            for entry in per_model
        )

    reported_total = _field_value(usage, "total_tokens")
    if reported_total is None:
        prompt_side = _safe_int(_field_value(usage, "input_tokens"))
        completion_side = _safe_int(_field_value(usage, "output_tokens"))
        return prompt_side + completion_side
    return _safe_int(reported_total)
class DashScopeClient:
"""DashScope Application API 统一封装层。
通过 app_id 调用百炼控制台配置的智能体应用,
充分利用云端 System Prompt 和 MCP 工具。
"""
MAX_RETRIES = 3
BASE_INTERVAL = 1 # 秒
def __init__(self, api_key: str, workspace_id: str | None = None):
"""初始化。dashscope 通过全局变量设置密钥。
Args:
api_key: DashScope API Key
workspace_id: 百炼工作空间 ID可选
"""
dashscope.api_key = api_key
self._workspace_id = workspace_id
async def call_app_stream(
self,
app_id: str,
prompt: str,
session_id: str | None = None,
biz_params: dict | None = None,
) -> AsyncGenerator[tuple[str, str | None], None]:
"""App1 流式调用,支持 multi-turn session_id 透传。
在线程中消费同步迭代器,通过 asyncio.Queue 桥接到 async generator。
每个 yield 返回 (text_chunk, session_id_or_none) 元组:
- 首次调用(传入 session_id=None百炼在流中会返回新 session_id
应由调用方在流结束后回写 DB。
- 后续调用传入 DB 中的 session_id 后,百炼自动关联历史上下文,
返回的 session_id 通常一致。
Args:
app_id: 百炼应用 ID
prompt: 用户输入
session_id: 百炼 session_id首次对话传 None
biz_params: 业务参数(如 user_prompt_params
Yields:
(text_chunk, session_id_or_none) 元组。
text_chunk 为空字符串时(例如仅承载 session_id 的心跳 chunk
调用方应忽略文本但保留 session_id。
"""
queue: asyncio.Queue[tuple[str, str | None] | BaseException | None] = asyncio.Queue()
loop = asyncio.get_running_loop()
def _consume_in_thread() -> None:
"""在线程中消费同步迭代器,逐 chunk 放入 queue。"""
try:
call_kwargs: dict[str, Any] = {
"app_id": app_id,
"prompt": prompt,
"stream": True,
"incremental_output": True,
}
if session_id is not None:
call_kwargs["session_id"] = session_id
if biz_params is not None:
call_kwargs["biz_params"] = biz_params
if self._workspace_id is not None:
call_kwargs["workspace"] = self._workspace_id
response = Application.call(**call_kwargs)
for chunk in response:
if chunk.status_code == 200:
output = chunk.output if hasattr(chunk, "output") else {}
if isinstance(output, dict):
text = output.get("text", "") or ""
new_sid = output.get("session_id")
else:
text = getattr(output, "text", "") or ""
new_sid = getattr(output, "session_id", None)
# 文本或 session_id 任一非空都推入(心跳 chunk 也传出 session_id
if text or new_sid:
asyncio.run_coroutine_threadsafe(
queue.put((text, new_sid)), loop
)
else:
# 非 200 状态码,构造异常传递给调用方
status = chunk.status_code
msg = getattr(chunk, "message", "") or f"状态码 {status}"
if status == 401:
err = DashScopeAuthError(msg)
else:
err = DashScopeApiError(msg, status_code=status)
asyncio.run_coroutine_threadsafe(
queue.put(err), loop
)
return
# 正常结束信号
asyncio.run_coroutine_threadsafe(queue.put(None), loop)
except Exception as exc:
# 线程内未预期异常,传递给调用方
asyncio.run_coroutine_threadsafe(
queue.put(exc), loop
)
loop.run_in_executor(None, _consume_in_thread)
while True:
item = await queue.get()
if item is None:
break
if isinstance(item, BaseException):
raise item
yield item
async def call_app(
self,
app_id: str,
prompt: str,
session_id: str | None = None,
biz_params: dict | None = None,
) -> tuple[dict, int, str | None]:
"""App2~8 单轮调用。
通过 asyncio.to_thread() 包装同步 Application.call()
解析 response.output.text 获取 JSON 内容。
非合法 JSON 触发重试(最多 3 次),不做本地修复。
Args:
app_id: 百炼应用 ID
prompt: 后端拼好的完整数据 JSON 字符串
session_id: 百炼 session_id可选
biz_params: 业务参数(可选)
Returns:
(parsed_json, tokens_used, new_session_id) 元组
Raises:
DashScopeApiError: API 调用失败(重试耗尽)
DashScopeJsonParseError: JSON 解析失败(重试耗尽)
"""
call_kwargs: dict[str, Any] = {
"app_id": app_id,
"prompt": prompt,
}
if session_id is not None:
call_kwargs["session_id"] = session_id
if biz_params is not None:
call_kwargs["biz_params"] = biz_params
if self._workspace_id is not None:
call_kwargs["workspace"] = self._workspace_id
# 非合法 JSON 纯重试,最多 MAX_RETRIES 次
last_json_error: DashScopeJsonParseError | None = None
for json_attempt in range(self.MAX_RETRIES):
response = await self._call_with_retry(
Application.call, **call_kwargs
)
# 提取 output.text
raw_text: str = ""
if hasattr(response, "output"):
output = response.output
if isinstance(output, dict):
raw_text = output.get("text", "")
elif hasattr(output, "text"):
raw_text = output.text or ""
# 提取 tokens_used
# DashScope Application.call() 返回的 usage 实际结构2026-04 验证):
# ApplicationUsage(models=[ApplicationModelUsage(model_id, input_tokens, output_tokens)])
# 旧代码只处理 dict / total_tokens 两种分支,导致该嵌套结构下 tokens_used 恒为 0
tokens_used = 0
if hasattr(response, "usage") and response.usage:
tokens_used = _extract_tokens_used(response.usage)
# 提取 new_session_id
new_session_id: str | None = None
if hasattr(response, "output") and isinstance(response.output, dict):
new_session_id = response.output.get("session_id")
# 解析 JSON
try:
parsed = json.loads(raw_text)
if isinstance(parsed, list):
# CHANGE 2026-03-23 | Prompt: App2 LLM 返回 list 而非 dict
# 百炼 LLM 有时直接返回 insights 数组而非包裹 dict
# 自动包装为 {"insights": list} 避免无意义重试
logger.info(
"LLM 返回 list长度 %d),自动包装为 {\"insights\": [...]}",
len(parsed),
)
parsed = {"insights": parsed}
if not isinstance(parsed, dict):
raise TypeError(f"期望 dict实际 {type(parsed).__name__}")
return parsed, tokens_used, new_session_id
except (json.JSONDecodeError, TypeError) as e:
last_json_error = DashScopeJsonParseError(
f"JSON 解析失败(第 {json_attempt + 1}/{self.MAX_RETRIES} 次): {e}",
raw_content=raw_text,
)
logger.warning(
"Application API 返回非法 JSON%d/%d 次): %s",
json_attempt + 1,
self.MAX_RETRIES,
raw_text[:500],
)
# 非合法 JSON 纯重试,不做本地修复
continue
# JSON 重试耗尽
raise last_json_error # type: ignore[misc]
async def _call_with_retry(self, func: Callable, **kwargs: Any) -> Any:
"""指数退避重试封装。
重试策略:
- 最多重试 MAX_RETRIES 次(默认 3 次)
- 间隔BASE_INTERVAL × 2^(n-1),即 1s → 2s → 4s
- HTTP 4xx → 不重试立即抛出401 → DashScopeAuthError
- HTTP 5xx / 超时 / 连接错误 → 重试
Args:
func: 同步调用函数(如 Application.call
**kwargs: 传递给 func 的参数
Returns:
API 响应对象status_code == 200
Raises:
DashScopeAuthError: API Key 无效HTTP 401
DashScopeTimeoutError: 调用超时(重试耗尽)
DashScopeApiError: API 调用失败(重试耗尽)
"""
last_error: Exception | None = None
for attempt in range(self.MAX_RETRIES):
try:
response = await asyncio.to_thread(func, **kwargs)
except Exception as exc:
# 网络/连接/超时等底层异常 → 可重试
last_error = exc
if attempt < self.MAX_RETRIES - 1:
wait_time = self.BASE_INTERVAL * (2**attempt)
logger.warning(
"DashScope API 底层异常(第 %d/%d 次),%ds 后重试: %s",
attempt + 1,
self.MAX_RETRIES,
wait_time,
exc,
)
await asyncio.sleep(wait_time)
continue
else:
logger.error(
"DashScope API 底层异常,已达最大重试次数 %d: %s",
self.MAX_RETRIES,
exc,
)
raise DashScopeApiError(
f"DashScope API 调用失败(重试 {self.MAX_RETRIES} 次后): {exc}",
) from exc
# Application.call() 返回 response 对象,通过 status_code 判断成功/失败
status_code = getattr(response, "status_code", None)
if status_code == 200:
return response
# 非 200根据状态码分类处理
message = getattr(response, "message", "") or f"状态码 {status_code}"
if status_code is not None and 400 <= status_code < 500:
# 4xx不重试立即抛出
if status_code == 401:
raise DashScopeAuthError(message)
raise DashScopeApiError(message, status_code=status_code)
# 5xx 或其他未知状态码 → 可重试
last_error = DashScopeApiError(message, status_code=status_code)
if attempt < self.MAX_RETRIES - 1:
wait_time = self.BASE_INTERVAL * (2**attempt)
logger.warning(
"DashScope API 调用失败(第 %d/%d 次,状态码 %s%ds 后重试: %s",
attempt + 1,
self.MAX_RETRIES,
status_code,
wait_time,
message,
)
await asyncio.sleep(wait_time)
else:
logger.error(
"DashScope API 调用失败,已达最大重试次数 %d(状态码 %s: %s",
self.MAX_RETRIES,
status_code,
message,
)
# 重试耗尽
raise last_error # type: ignore[misc]