Files
Neo-ZQYY/docs/prd/specs/P14-ai-dashscope-migration.md
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

512 lines
18 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# P14AI 模块改造 — DashScope 迁移 + 调度器完善
> 状态Draft | 创建日期2026-03-21 | 依赖P5AI 集成层、RNS1.4CHAT 对齐)
> 后续P15监控后台 + 测试重建 + 回填)
---
## 一、执行摘要
当前 AI 模块使用 `openai` SDK 的通用模型 API`chat.completions.create`),但项目的 8 个 App 均为百炼控制台创建的智能体应用(各有独立 `app_id`)。通用模型 API 不接受 `app_id`,等于绕过了百炼控制台配置的 System Prompt、MCP 工具等全部能力。
本 PRD 将 SDK 从 `openai` 切换到 `dashscope`Application API一步到位完成迁移同时修复调度器、事件触发、熔断限流等核心问题。
### 问题来源
- `docs/reports/2026-03-21__ai_module_issues.md`18 个问题4 P0 / 6 P1 / 5 P2 / 3 P3
- AI 全链路测试报告 + 86 项 Gap 审查
### 改动范围
| 模块 | 改动内容 |
|------|---------|
| `apps/backend/app/ai/` | SDK 替换、客户端重写、调度器修复 |
| `apps/backend/app/services/ai/` | 缓存服务、对话服务适配 |
| `apps/backend/app/routers/` | 内部触发 API、SSE 端点适配 |
| `apps/etl/connectors/feiqiu/` | DWS 完成后 HTTP 触发 |
| `db/zqyy_app/` | DDL 迁移(新增表、字段) |
| `.env` / `.env.template` | 环境变量统一 |
---
## 二、技术方案
### 2.1 SDK 替换openai → dashscope Application API
**当前**`openai.AsyncOpenAI` + `chat.completions.create`
**目标**`dashscope.Application.call` + `asyncio.to_thread()` 包装
#### 调用方式对比
```python
# ===== 当前(错误)=====
from openai import AsyncOpenAI
client = AsyncOpenAI(api_key=key, base_url=bailian_url)
response = await client.chat.completions.create(
model=model, messages=messages, response_format={"type": "json_object"}
)
# ===== 目标(正确)=====
import dashscope
from dashscope import Application
# App1 流式调用
response = Application.call(
app_id=app_id,
prompt=user_message,
session_id=session_id, # 云端对话管理
biz_params={"user_prompt_params": {"User_ID": uid, "Role": role, "Nickname": name}},
stream=True,
incremental_output=True,
)
# App2~8 单轮调用
response = Application.call(
app_id=app_id,
prompt=data_json, # 后端拼好的完整数据 JSON
)
```
#### async 包装
`dashscope.Application.call()` 是同步方法,当前后端全 async。采用 `asyncio.to_thread()` 包装:
```python
import asyncio
async def call_application(app_id: str, prompt: str, **kwargs) -> dict:
return await asyncio.to_thread(
Application.call,
app_id=app_id,
prompt=prompt,
**kwargs
)
```
流式调用App1需特殊处理`Application.call(stream=True)` 返回同步迭代器,需在线程中消费后通过 `asyncio.Queue` 桥接到 async generator。
### 2.2 环境变量统一
**废弃**(本轮直接删除,不保留兼容):
- `BAILIAN_API_KEY`
- `BAILIAN_BASE_URL`
- `BAILIAN_MODEL`
**新增**
- `DASHSCOPE_API_KEY` — DashScope API Key
- `DASHSCOPE_WORKSPACE_ID` — 百炼工作空间 ID可选
**保留不变**(仅改前缀注释):
| 旧变量 | 新变量 | 说明 |
|--------|--------|------|
| `BAILIAN_APP_ID_1_CHAT` | `DASHSCOPE_APP_ID_1_CHAT` | App1 通用对话 |
| `BAILIAN_APP_ID_2_FINANCE` | `DASHSCOPE_APP_ID_2_FINANCE` | App2 财务洞察 |
| `BAILIAN_APP_ID_3_CLUE` | `DASHSCOPE_APP_ID_3_CLUE` | App3 维客线索 |
| `BAILIAN_APP_ID_4_ANALYSIS` | `DASHSCOPE_APP_ID_4_ANALYSIS` | App4 关系分析 |
| `BAILIAN_APP_ID_5_TACTICS` | `DASHSCOPE_APP_ID_5_TACTICS` | App5 话术参考 |
| `BAILIAN_APP_ID_6_NOTE` | `DASHSCOPE_APP_ID_6_NOTE` | App6 备注分析 |
| `BAILIAN_APP_ID_7_CUSTOMER` | `DASHSCOPE_APP_ID_7_CUSTOMER` | App7 客户分析 |
| `BAILIAN_APP_ID_8_CONSOLIDATE` | `DASHSCOPE_APP_ID_8_CONSOLIDATE` | App8 维客线索整理 |
### 2.3 App1 对话管理session_id 云端 + 本地双轨
**策略**
1. 每次 App1 调用携带 `session_id`(百炼云端管理上下文)
2. 同时将消息写入本地 `ai_messages` 表(持久化)
3. `session_id` 过期1 小时无请求)时,从本地 `ai_messages` 重建 `messages` 数组传给百炼
4. 对话复用规则不变task 入口无时限复用、customer/coach 入口 3 天时限、general 入口始终新建
**session_id 生成规则**
- 格式:`conv_{conversation_id}_{created_timestamp}`
- 存储在 `ai_conversations.session_id` 字段(新增)
**过期重建流程**
```
用户发消息 → 查 ai_conversations 获取 session_id
→ 尝试用 session_id 调用百炼
→ 成功 → 正常返回
→ 失败session 过期)→ 从 ai_messages 加载历史
→ 用 messages 数组(不带 session_id调用百炼
→ 百炼返回新 session_id → 更新本地
```
### 2.4 App2~8 单轮调用
所有非对话类应用统一为单轮 `prompt` 调用:
- 后端用 `build_prompt()` 拼好完整数据 JSON
- 通过 `prompt` 参数传入 `Application.call`
- 百炼侧 System Prompt 已在控制台配置,代码不再维护
**JSON 兜底策略**:百炼返回非合法 JSON 时,纯重试,最大 3 次。不做本地解析修复。
### 2.5 App1 参数传递
App1 使用 `biz_params.user_prompt_params` 传模板变量:
- `User_ID`:用户 ID
- `Role`角色member / assistant / admin
- `Nickname`:昵称
同时用 `prompt` 传页面上下文source_page、page_context、screen_content
App2~8 不使用 `biz_params`,数据 JSON 直接作为 `prompt` 传入。
---
## 三、熔断 / 限流 / 降级
### 3.1 熔断器
```
连续 5 次失败 → 熔断 60 秒(所有请求直接返回降级响应)
→ 60 秒后进入半开状态 → 放行 1 个请求
→ 成功 → 关闭熔断
→ 失败 → 重新熔断 60 秒
```
熔断粒度:按 `app_id` 独立计数。App1 熔断不影响 App2~8。
### 3.2 Token 预算控制
| 维度 | 默认上限 | 说明 |
|------|---------|------|
| 日预算 | 100,000 tokens | 所有 App 合计 |
| 月预算 | 2,000,000 tokens | 所有 App 合计 |
超预算时:
- App1用户对话返回友好提示"AI 服务今日额度已用完,请明天再试"
- App2~8后台任务跳过执行记录 `budget_exceeded` 状态
预算数据来源:`ai_run_logs.tokens_used` 按日/月聚合。
### 3.3 限流
- App1每用户每分钟 10 次
- App2~8每门店每小时 100 次(合计)
---
## 四、调度器修复
### 4.1 dispatcher.py asyncio 修复
当前问题:`dispatcher.py` 中存在 `asyncio.run()` 嵌套调用,在已有事件循环的 FastAPI 环境中会报错。
修复方案:
- 移除所有 `asyncio.run()` 调用
- 所有调度入口改为 `async def`
- 使用 `asyncio.create_task()` 发起异步任务
- 超时控制用 `asyncio.wait_for()`
### 4.2 事件触发打通
#### 消费事件ETL 侧发射)
```
ETL DWS 任务完成
→ HTTP POST /api/internal/ai/trigger
→ payload: { event_type: "consumption", connector_type: "feiqiu", site_id, member_id, settlement_id }
→ 后端写 ai_trigger_jobs 记录
→ 异步执行调用链App3 → App8 → App7+ App4 → App5 如有助教)
```
内部 API 设计为连接器无关接口,`connector_type` 字段标识来源,为多平台扩展预留。
#### 备注事件(后端 API 路由发射)
```
小程序助教提交备注
→ 后端 API 路由 fire_event
→ event_type: "note_created"
→ 调用链App6 → App8
```
#### 任务分配事件task_manager 自动触发)
```
task_manager 自动分配任务
→ fire_event: "task_assigned"
→ 调用链App4 → App5
```
### 4.3 App2 预生成
- 触发时机DWS 完成后ETL 通过内部 API 触发
- 门店范围:当前写死 `2790685415443269`(多门店支持记入 BACKLOG
- 时间维度8 个(今日/昨日/本周/上周/本月/上月/本季/上季)
- 每日调用量1 门店 × 8 维度 = 8 次
### 4.4 幂等与去重
| 场景 | 策略 |
|------|------|
| 自动触发 | 按 `(event_type, member_id, site_id, date)` 去重,重复事件跳过 |
| 手动重跑 | 允许强制执行,`ai_trigger_jobs.is_forced = true`,后台明显标记 "forced" |
| App8 | 强幂等DELETE + INSERT `member_retention_clue`,同一 member 同一天只执行一次 |
---
## 五、缓存策略
### 5.1 过期时间(按 App 分开)
| App | cache_type | expires_at 策略 |
|-----|-----------|----------------|
| App2 | app2_finance | 当日 23:59:59每日刷新 |
| App3 | app3_clue | 7 天 |
| App4 | app4_analysis | 7 天 |
| App5 | app5_tactics | 7 天 |
| App6 | app6_note_analysis | 30 天 |
| App7 | app7_customer_analysis | 7 天 |
| App8 | app8_clue_consolidated | 7 天 |
### 5.2 ai_cache 新增字段
```sql
ALTER TABLE biz.ai_cache ADD COLUMN status VARCHAR(20) DEFAULT 'valid'
CHECK (status IN ('valid', 'expired', 'invalidated', 'generating'));
```
- `valid`:有效缓存
- `expired`:已过期(定时任务标记)
- `invalidated`手动失效admin-web 操作)
- `generating`:正在生成中(防并发)
### 5.3 数据保留上限
| App | 保留策略 |
|-----|---------|
| App1 | 不自动删除(用户对话记录) |
| App2~8 | 每个 App 保留最新 20,000 条 `ai_cache` 记录 |
---
## 六、数据库变更
所有新表放在 `biz` schema。
### 6.1 新增表ai_run_logsAI 运行记录)
```sql
CREATE TABLE biz.ai_run_logs (
id BIGSERIAL PRIMARY KEY,
site_id BIGINT NOT NULL,
app_type VARCHAR(30) NOT NULL, -- app1_chat / app2_finance / ...
trigger_type VARCHAR(20) NOT NULL, -- user / scheduled / event / forced
member_id BIGINT, -- 关联会员(可空)
request_prompt TEXT, -- 输入 prompt截断前 2000 字符)
response_text TEXT, -- 输出全文
tokens_used INTEGER DEFAULT 0,
latency_ms INTEGER, -- 响应耗时(毫秒)
status VARCHAR(20) NOT NULL DEFAULT 'pending',
-- pending / running / success / failed / timeout / budget_exceeded
error_message TEXT,
session_id VARCHAR(100), -- App1 百炼 session_id
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
finished_at TIMESTAMPTZ
);
CREATE INDEX idx_ai_run_logs_site_app ON biz.ai_run_logs(site_id, app_type);
CREATE INDEX idx_ai_run_logs_created ON biz.ai_run_logs(created_at);
CREATE INDEX idx_ai_run_logs_status ON biz.ai_run_logs(status);
```
### 6.2 新增表ai_trigger_jobs调度运行记录
```sql
CREATE TABLE biz.ai_trigger_jobs (
id BIGSERIAL PRIMARY KEY,
site_id BIGINT NOT NULL,
event_type VARCHAR(30) NOT NULL, -- consumption / note_created / task_assigned / scheduled / manual
connector_type VARCHAR(30) DEFAULT 'feiqiu', -- 连接器类型(多平台预留)
member_id BIGINT,
payload JSONB, -- 事件原始数据
status VARCHAR(20) NOT NULL DEFAULT 'pending',
-- pending / running / completed / failed / skipped_duplicate / budget_exceeded
is_forced BOOLEAN DEFAULT false, -- 手动强制执行标记
app_chain VARCHAR(100), -- 执行链:如 "app3→app8→app7"
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_ai_trigger_jobs_site ON biz.ai_trigger_jobs(site_id, event_type);
CREATE INDEX idx_ai_trigger_jobs_dedup ON biz.ai_trigger_jobs(event_type, member_id, site_id, (created_at::date))
WHERE status NOT IN ('skipped_duplicate');
CREATE INDEX idx_ai_trigger_jobs_status ON biz.ai_trigger_jobs(status);
```
### 6.3 ai_conversations 新增字段
```sql
ALTER TABLE biz.ai_conversations ADD COLUMN session_id VARCHAR(100);
```
### 6.4 ai_cache 新增字段
```sql
ALTER TABLE biz.ai_cache ADD COLUMN status VARCHAR(20) DEFAULT 'valid'
CHECK (status IN ('valid', 'expired', 'invalidated', 'generating'));
```
---
## 七、内部 API 设计
### 7.1 ETL → 后端触发接口
```
POST /api/internal/ai/trigger
Authorization: Internal-Token {INTERNAL_API_TOKEN}
Content-Type: application/json
{
"event_type": "consumption", // consumption / dws_completed
"connector_type": "feiqiu",
"site_id": 2790685415443269,
"member_id": 12345, // 可选
"payload": { // 事件附加数据
"settlement_id": "xxx",
"dws_task": "DWS_MEMBER_CONSUMPTION"
}
}
Response 200:
{
"trigger_job_id": 1001,
"status": "pending"
}
```
### 7.2 认证方式
内部 API 使用独立的 `INTERNAL_API_TOKEN` 环境变量,不走 JWT。ETL 进程通过 HTTP Header 传递。
---
## 八、文件变更清单
### 需要重写的文件
| 文件 | 说明 |
|------|------|
| `apps/backend/app/ai/bailian_client.py` | 完全重写为 `dashscope_client.py` |
| `apps/backend/app/ai/dispatcher.py` | asyncio 修复 + 事件触发链 |
| `apps/backend/app/ai/config.py` | 环境变量 BAILIAN_* → DASHSCOPE_* |
### 需要新增的文件
| 文件 | 说明 |
|------|------|
| `apps/backend/app/ai/circuit_breaker.py` | 熔断器实现 |
| `apps/backend/app/ai/rate_limiter.py` | 限流器实现 |
| `apps/backend/app/ai/budget_tracker.py` | Token 预算追踪 |
| `apps/backend/app/routers/internal_ai.py` | 内部触发 API 路由 |
| `db/zqyy_app/migrations/YYYYMMDD_ai_run_logs.sql` | DDL 迁移脚本 |
### 需要修改的文件
| 文件 | 说明 |
|------|------|
| `apps/backend/app/services/ai/cache_service.py` | 新增 status 字段处理 |
| `apps/backend/app/services/ai/chat_service.py` | session_id 双轨逻辑 |
| `apps/etl/connectors/feiqiu/tasks/` | DWS 完成后 HTTP 触发 |
| `.env` / `.env.template` | 环境变量更新 |
| `pyproject.toml` | 依赖:移除 openai新增 dashscope |
---
## 九、问题修复映射
本 PRD 覆盖的问题(来自 `docs/reports/2026-03-21__ai_module_issues.md`
| 问题编号 | 优先级 | 描述 | 本 PRD 对应章节 |
|---------|--------|------|----------------|
| P0-1 | P0 | App3~8 JSON 格式化失败 | §2.4Application API 原生支持) |
| P0-2 | P0 | AI 事件未触发 | §4.2(事件触发打通) |
| P0-3 | P0 | App2 无定时机制 | §4.3App2 预生成) |
| P0-4 | P0 | ETL→AI 联动断裂 | §4.2 + §7.1(内部 API |
| P1-1 | P1 | 缓存过期未检查 | §5缓存策略 |
| P1-2 | P1 | 无限流/熔断 | §3熔断/限流/降级) |
| P1-3 | P1 | 无 Token 预算控制 | §3.2Token 预算) |
| P1-4 | P1 | dispatcher asyncio 问题 | §4.1asyncio 修复) |
| P1-5 | P1 | 前端无刷新机制 | 不在本 PRD 范围(前端改动) |
| P1-6 | P1 | FDW 数据获取不一致 | 不在本 PRD 范围(数据层) |
| P2-3 | P2 | 缓存清理过宽松 | §5.3(保留上限) |
| P2-5 | P2 | 对话记录无清理 | §5.3App1 不删,其他 2 万条) |
| P3-1 | P3 | 迁移 DashScope SDK | §2.1(核心改动) |
| P3-2 | P3 | 调度器独立化 | §4调度器修复 |
---
## 十、收尾标准流程
> 参考历史 Spec 收尾模板P5、P13、RNS1.4
### 10.1 DDL 迁移
1. 编写迁移脚本 `db/zqyy_app/migrations/YYYYMMDD_p14_ai_module.sql`
2. 在测试库 `test_zqyy_app` 执行并验证
3. 编写回滚脚本(逆序 DROP
4. 合并到 DDL 基线 `db/zqyy_app/ddl/`
### 10.2 BD 手册更新
- 更新 `docs/database/BD_Manual_ai_tables.md`:新增 `ai_run_logs``ai_trigger_jobs` 表结构
- 更新 `ai_cache``status` 字段说明
- 更新 `ai_conversations``session_id` 字段说明
### 10.3 文档同步
| 文档 | 更新内容 |
|------|---------|
| `docs/prd/ai-app-prompts.md` | 环境变量映射更新BAILIAN_* → DASHSCOPE_* |
| `apps/backend/README.md` | AI 模块架构说明更新 |
| `docs/DOCUMENTATION-MAP.md` | 新增文档条目 |
| `.env.template` | 环境变量模板更新 |
| `docs/deployment/EXPORT-PATHS.md` | 如有新输出路径则更新 |
### 10.4 属性测试
- 更新 `tests/` 下 AI 相关属性测试,适配新的 `dashscope` 调用方式
- 新增属性测试覆盖熔断器状态转换、Token 预算计算、去重逻辑
### 10.5 最终检查点
- [ ] 所有 8 个 App 通过 Application API 调用成功
- [ ] App1 流式输出正常session_id 双轨工作
- [ ] App2~8 返回合法 JSON
- [ ] 事件触发链完整消费→App3→App8→App7
- [ ] ETL → 后端内部 API 联通
- [ ] 熔断器在连续失败后正确触发
- [ ] Token 预算超限后正确降级
- [ ] 环境变量全部切换到 DASHSCOPE_*
- [ ] DDL 迁移脚本在测试库执行通过
- [ ] BD 手册已更新
- [ ] 属性测试全部通过
---
## 十一、风险与缓解
| 风险 | 影响 | 缓解措施 |
|------|------|---------|
| DashScope SDK 同步调用阻塞事件循环 | App1 响应延迟 | `asyncio.to_thread()` + 线程池大小限制 |
| session_id 过期重建增加 token 消耗 | 成本上升 | 限制重建时的历史消息条数(最近 20 条) |
| 百炼控制台 System Prompt 与代码不一致 | 输出质量下降 | 代码不再维护 System Prompt以控制台为准 |
| 环境变量一步切换导致部署遗漏 | 服务不可用 | 部署检查清单 + 启动时校验必需变量 |
| App8 写业务表member_retention_clue幂等失败 | 数据不一致 | 事务包裹 DELETE + INSERT失败自动回滚 |
---
## 十二、不在本 PRD 范围
以下内容由 P15 或后续 PRD 处理:
- admin-web AI 监控后台P15
- 全链路测试重建P15
- 历史回填P15
- 旧测试脚本归档P15
- 多门店支持BACKLOG
- 消息队列(单独 PRD
- Prompt 版本管理BACKLOG
- 前端刷新机制P1-5前端改动
- FDW 数据一致性P1-6数据层改动