Files

30 KiB
Raw Permalink Blame History

设计文档P5 AI 集成层miniapp-ai-integration

概述

本设计文档描述 P5-A 阶段 AI 集成层的技术架构与实现方案。系统在现有 FastAPI 后端(apps/backend/)中新增 AI 模块,通过阿里云百炼 API通义千问为 8 个 AI 应用提供统一的调用能力。

核心交付物:

  • 3 张新表(biz.ai_conversationsbiz.ai_messagesbiz.ai_cache
  • 百炼 API 统一封装层(流式 + 非流式)
  • 应用 1 SSE 流式对话端点
  • 应用 2 财务洞察Prompt 完整)+ 应用 8 维客线索整理Prompt 完整)
  • 应用 3/4/5/6/7 触发机制与调用骨架
  • 事件调度与调用链编排
  • AI 缓存读写 API

设计原则:

  • 统一封装:所有 AI 调用经 BailianClient 统一出口,便于重试、计量、日志
  • 事件驱动:复用现有 trigger_scheduler.fire_event() 机制,扩展支持串行调用链
  • 骨架优先P5-A 只实现管道和框架Prompt 细化留给 P5-B 阶段
  • site_id 隔离:所有表和查询强制 site_id 过滤

架构

系统架构图

graph TB
    subgraph 微信小程序
        MP_CHAT[对话页面]
        MP_PAGES[其他页面<br/>财务看板/任务详情/客户详情]
    end

    subgraph FastAPI 后端
        subgraph AI 模块 - apps/backend/app/ai/
            SSE[SSE 端点<br/>/api/ai/chat/stream]
            CACHE_API[缓存 API<br/>/api/ai/cache]
            HISTORY_API[历史对话 API<br/>/api/ai/conversations]
            DISPATCHER[AI Event Dispatcher<br/>调用链编排]
            BAILIAN[BailianClient<br/>百炼 API 封装]
        end

        subgraph 现有服务
            NOTE_SVC[note_service<br/>备注服务]
            TRIGGER[trigger_scheduler<br/>触发器调度]
            TASK_GEN[task_generator<br/>任务生成]
        end
    end

    subgraph 外部服务
        BAILIAN_API[阿里云百炼 API<br/>通义千问]
    end

    subgraph PostgreSQL - zqyy_app
        AI_CONV[biz.ai_conversations]
        AI_MSG[biz.ai_messages]
        AI_CACHE_T[biz.ai_cache]
        CLUE_T[member_retention_clue]
    end

    MP_CHAT -->|SSE| SSE
    MP_PAGES -->|REST| CACHE_API
    MP_CHAT -->|REST| HISTORY_API

    SSE --> BAILIAN
    DISPATCHER --> BAILIAN
    BAILIAN -->|HTTP/SSE| BAILIAN_API

    NOTE_SVC -->|备注提交事件| DISPATCHER
    TRIGGER -->|消费/任务分配事件| DISPATCHER

    DISPATCHER -->|写入| AI_CONV
    DISPATCHER -->|写入| AI_MSG
    DISPATCHER -->|写入| AI_CACHE_T
    DISPATCHER -->|全量替换 AI 线索| CLUE_T

    SSE -->|写入| AI_CONV
    SSE -->|写入| AI_MSG

事件调用链

sequenceDiagram
    participant E as 业务事件
    participant D as AI Dispatcher
    participant A3 as App3 线索
    participant A8 as App8 整理
    participant A7 as App7 客户分析
    participant A4 as App4 关系分析
    participant A5 as App5 话术

    Note over E,A5: 消费事件链(无助教)
    E->>D: consumption_event(member_id, site_id)
    D->>A3: 调用(串行)
    A3-->>D: 线索结果 → ai_cache
    D->>A8: 调用(串行)
    A8-->>D: 整合线索 → ai_cache + member_retention_clue
    D->>A7: 调用(串行)
    A7-->>D: 客户分析 → ai_cache

    Note over E,A5: 消费事件链(有助教)
    E->>D: consumption_event(member_id, assistant_id, site_id)
    D->>A3: 调用
    A3-->>D: 线索结果
    D->>A8: 调用
    A8-->>D: 整合线索
    D->>A7: 调用
    D->>A4: 调用A8 完成后)
    A4-->>D: 关系分析
    D->>A5: 调用A4 完成后)
    A5-->>D: 话术参考

    Note over E,A5: 备注事件链
    E->>D: note_event(member_id, note_id, site_id)
    D->>+A6: 调用
    Note right of A6: App6 备注分析
    A6-->>-D: 线索 + 评分
    D->>A8: 调用
    A8-->>D: 整合线索

    Note over E,A5: 任务分配事件链
    E->>D: task_assign_event(assistant_id, member_id, site_id)
    D->>A4: 调用(读已有 A8 缓存)
    A4-->>D: 关系分析
    D->>A5: 调用
    A5-->>D: 话术参考

模块目录结构

apps/backend/app/ai/
├── __init__.py
├── bailian_client.py      # 百炼 API 统一封装
├── dispatcher.py           # AI 事件调度与调用链编排
├── cache_service.py        # AI 缓存读写服务
├── conversation_service.py # 对话记录持久化服务
├── apps/
│   ├── __init__.py
│   ├── app1_chat.py        # 应用 1 通用对话
│   ├── app2_finance.py     # 应用 2 财务洞察
│   ├── app3_clue.py        # 应用 3 客户数据维客线索
│   ├── app4_analysis.py    # 应用 4 关系分析
│   ├── app5_tactics.py     # 应用 5 话术参考
│   ├── app6_note.py        # 应用 6 备注分析
│   ├── app7_customer.py    # 应用 7 客户分析
│   └── app8_consolidation.py # 应用 8 维客线索整理
├── prompts/
│   ├── __init__.py
│   ├── app2_finance_prompt.py  # 应用 2 完整 Prompt
│   └── app8_consolidation_prompt.py # 应用 8 完整 Prompt
└── schemas.py              # Pydantic 模型

apps/backend/app/routers/
├── xcx_ai_chat.py          # SSE 对话路由
└── xcx_ai_cache.py         # 缓存查询路由

组件与接口

1. BailianClient百炼 API 统一封装)

文件:apps/backend/app/ai/bailian_client.py

技术方案(基于百炼官方文档):

  • 流式调用:使用 OpenAI 兼容接口(百炼支持 OpenAI SDK 协议),stream=True 返回 SSE 事件流
  • 非流式调用stream=False,返回完整 JSON 响应
  • JSON 输出模式:通过 System Prompt 约束 + response_format={"type": "json_object"} 参数(百炼兼容 OpenAI 的 JSON mode
  • 重试策略:指数退避,最多 3 次,基础间隔 1s → 2s → 4s
  • SDK 选择:使用 openai Python SDK百炼兼容 OpenAI 协议),base_url 指向百炼端点
class BailianClient:
    """百炼 API 统一封装层。"""

    def __init__(self, api_key: str, base_url: str, model: str):
        """
        Args:
            api_key: 百炼 API Key从 BAILIAN_API_KEY 环境变量读取)
            base_url: 百炼 API 端点(从 BAILIAN_BASE_URL 环境变量读取)
            model: 模型标识(如 qwen-plus
        """

    async def chat_stream(
        self,
        messages: list[dict],
        *,
        temperature: float = 0.7,
        max_tokens: int = 2000,
    ) -> AsyncGenerator[str, None]:
        """流式调用,逐 chunk 返回文本。用于应用 1 SSE。"""

    async def chat_json(
        self,
        messages: list[dict],
        *,
        temperature: float = 0.3,
        max_tokens: int = 4000,
    ) -> tuple[dict, int]:
        """非流式调用,返回解析后的 JSON dict 和 tokens_used。
        用于应用 2-8。

        Raises:
            BailianJsonParseError: JSON 解析失败时抛出
            BailianApiError: API 调用失败(重试耗尽后)
        """

    def _inject_current_time(self, messages: list[dict]) -> list[dict]:
        """在首条消息的 content JSON 中注入 current_time 字段。"""

    async def _call_with_retry(self, **kwargs) -> Any:
        """带指数退避的重试封装。"""

环境变量(新增到 .env / .env.template

BAILIAN_API_KEY=sk-xxx
BAILIAN_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
BAILIAN_MODEL=qwen-plus

2. AI Event Dispatcher事件调度器

文件:apps/backend/app/ai/dispatcher.py

调度器负责根据业务事件编排 AI 应用调用链。与现有 trigger_scheduler 的关系:

  • trigger_scheduler.fire_event() 触发业务事件 → 调用 ai_dispatcher 对应的 handler
  • ai_dispatcher 内部管理串行调用链的执行顺序
class AIDispatcher:
    """AI 应用调用链编排器。"""

    async def handle_consumption_event(
        self,
        member_id: int,
        site_id: int,
        settle_id: int,
        assistant_id: int | None = None,
    ) -> None:
        """消费事件链App3 → App8 → App7+ App4 → App5 如有助教)。"""

    async def handle_note_event(
        self,
        member_id: int,
        site_id: int,
        note_id: int,
        note_content: str,
        noted_by_name: str,
    ) -> None:
        """备注事件链App6 → App8。"""

    async def handle_task_assign_event(
        self,
        assistant_id: int,
        member_id: int,
        site_id: int,
        task_type: str,
    ) -> None:
        """任务分配事件链App4 → App5读已有 App8 缓存)。"""

    async def _run_chain(
        self,
        chain: list[Callable],
        context: dict,
    ) -> None:
        """串行执行调用链,某步失败记录日志后继续。"""

容错策略:

  • 调用链中某个应用失败 → 记录错误日志 + 写入 ai_conversations(标记失败)
  • 后续应用使用已有缓存继续执行,不阻塞整条链
  • 整条链在后台异步执行,不阻塞业务请求

3. AI Cache Service缓存读写服务

文件:apps/backend/app/ai/cache_service.py

class AICacheService:
    """AI 缓存读写服务。"""

    def get_latest(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
    ) -> dict | None:
        """查询最新缓存记录。"""

    def get_history(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        limit: int = 2,
    ) -> list[dict]:
        """查询历史缓存记录(按 created_at DESC用于 Prompt reference。"""

    def write_cache(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        result_json: dict,
        triggered_by: str | None = None,
        score: int | None = None,
        expires_at: datetime | None = None,
    ) -> int:
        """写入缓存记录,返回 id。写入后异步清理超限记录。"""

    def _cleanup_excess(
        self,
        cache_type: str,
        site_id: int,
        target_id: str,
        max_count: int = 500,
    ) -> int:
        """清理超限记录,保留最近 max_count 条,返回删除数量。"""

4. Conversation Service对话记录持久化

文件:apps/backend/app/ai/conversation_service.py

class ConversationService:
    """AI 对话记录持久化服务。"""

    def create_conversation(
        self,
        user_id: int | str,
        nickname: str,
        app_id: str,
        site_id: int,
        source_page: str | None = None,
        source_context: dict | None = None,
    ) -> int:
        """创建对话记录,返回 conversation_id。
        系统自动调用时 user_id 为 'system'。"""

    def add_message(
        self,
        conversation_id: int,
        role: str,
        content: str,
        tokens_used: int | None = None,
    ) -> int:
        """添加消息记录,返回 message_id。"""

    def get_conversations(
        self,
        user_id: int,
        site_id: int,
        page: int = 1,
        page_size: int = 20,
    ) -> list[dict]:
        """查询用户历史对话列表,按时间倒序,懒加载。"""

    def get_messages(
        self,
        conversation_id: int,
    ) -> list[dict]:
        """查询对话的所有消息。"""

5. Clue Writer维客线索写入器

文件:集成在 apps/backend/app/ai/apps/app8_consolidation.py

class ClueWriter:
    """维客线索全量替换写入器。"""

    def replace_ai_clues(
        self,
        member_id: int,
        site_id: int,
        clues: list[dict],
    ) -> int:
        """全量替换该客户的 AI 来源线索。

        1. DELETE FROM member_retention_clue
           WHERE member_id = %s AND site_id = %s
             AND source IN ('ai_consumption', 'ai_note')
        2. INSERT 新线索(人工线索 source='manual' 不受影响)

        字段映射:
        - category → category
        - emoji + summary → summary如 "📅 偏好周末下午时段消费"
        - detail → detail
        - providers → recorded_by_name
        - source: 纯 App3 → ai_consumption纯 App6 → ai_note混合 → ai_consumption
        - recorded_by_assistant_id: NULL系统触发

        返回写入的线索数量。
        """

6. API 端点

6.1 SSE 对话端点

路由文件:apps/backend/app/routers/xcx_ai_chat.py

POST /api/ai/chat/stream
  Content-Type: application/json
  Accept: text/event-stream

  Request Body:
    {
      "message": "string",
      "source_page": "string",
      "page_context": {},
      "screen_content": "string"  // 页面可见内容文本化
    }

  SSE Events:
    data: {"type": "chunk", "content": "..."}
    data: {"type": "done", "conversation_id": 123, "tokens_used": 456}
    data: {"type": "error", "message": "..."}

  认证JWT Token从 xcx_auth 获取)
  隔离:从 JWT 中提取 user_id、site_id、nickname、role

6.2 历史对话 API

GET /api/ai/conversations?page=1&page_size=20
  → [{ id, app_id, source_page, created_at, first_message_preview }]

GET /api/ai/conversations/{conversation_id}/messages
  → [{ id, role, content, tokens_used, created_at }]

6.3 缓存查询 API

路由文件:apps/backend/app/routers/xcx_ai_cache.py

GET /api/ai/cache/{cache_type}?target_id=xxx
  → { id, cache_type, target_id, result_json, score, created_at }

  认证JWT Token
  隔离site_id 从 JWT 提取,强制过滤

7. 各应用骨架接口

每个应用实现统一的调用接口:

# 应用基类模式(非继承,约定接口)
async def run(
    context: dict,          # 包含 member_id, site_id, 及应用特定参数
    bailian: BailianClient,
    cache_svc: AICacheService,
    conv_svc: ConversationService,
) -> dict:
    """
    执行 AI 应用调用。

    1. 构建 Promptbuild_prompt
    2. 调用百炼 APIbailian.chat_json
    3. 写入 ai_conversations + ai_messages
    4. 写入 ai_cache
    5. 返回结果 dict
    """

骨架应用App3/4/5/6/7build_prompt 函数留接口:

def build_prompt(context: dict) -> list[dict]:
    """构建 Prompt 消息列表。

    P5-A 阶段:返回占位 Prompt标注待细化字段。
    P5-B 阶段:由对应页面 spec 补充完整 Prompt。
    """
    # TODO: P5-B 细化
    return [
        {"role": "system", "content": json.dumps({
            "task": "...",
            "current_time": "",  # BailianClient 自动注入
            # 以下字段待细化
            "data": {},
            "reference": {},
        })},
    ]

数据模型

DDLbiz.ai_conversations

CREATE TABLE IF NOT EXISTS biz.ai_conversations (
    id              BIGSERIAL    PRIMARY KEY,
    user_id         VARCHAR(50)  NOT NULL,          -- 用户 ID 或 'system'
    nickname        VARCHAR(100) NOT NULL DEFAULT '',
    app_id          VARCHAR(30)  NOT NULL,           -- app1_chat / app2_finance / ... / app8_consolidation
    site_id         BIGINT       NOT NULL,
    source_page     VARCHAR(100),                    -- 来源页面标识
    source_context  JSONB,                           -- 页面上下文 JSON
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

COMMENT ON TABLE biz.ai_conversations IS 'AI 对话记录:每次 AI 调用(用户主动或系统自动)创建一条';
COMMENT ON COLUMN biz.ai_conversations.app_id IS '应用标识app1_chat / app2_finance / app3_clue / app4_analysis / app5_tactics / app6_note / app7_customer / app8_consolidation';
COMMENT ON COLUMN biz.ai_conversations.user_id IS '用户 ID系统自动调用时为 system';

CREATE INDEX IF NOT EXISTS idx_ai_conv_user_site ON biz.ai_conversations (user_id, site_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_ai_conv_app_site ON biz.ai_conversations (app_id, site_id, created_at DESC);

DDLbiz.ai_messages

CREATE TABLE IF NOT EXISTS biz.ai_messages (
    id              BIGSERIAL    PRIMARY KEY,
    conversation_id BIGINT       NOT NULL REFERENCES biz.ai_conversations(id) ON DELETE CASCADE,
    role            VARCHAR(10)  NOT NULL
        CONSTRAINT chk_ai_msg_role CHECK (role IN ('user', 'assistant', 'system')),
    content         TEXT         NOT NULL,
    tokens_used     INTEGER,                         -- 本条消息消耗的 token 数
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW()
);

COMMENT ON TABLE biz.ai_messages IS 'AI 消息记录:对话中的每条消息(输入/输出/系统)';

CREATE INDEX IF NOT EXISTS idx_ai_msg_conv ON biz.ai_messages (conversation_id, created_at);

DDLbiz.ai_cache

CREATE TABLE IF NOT EXISTS biz.ai_cache (
    id              BIGSERIAL    PRIMARY KEY,
    cache_type      VARCHAR(30)  NOT NULL
        CONSTRAINT chk_ai_cache_type CHECK (
            cache_type IN (
                'app2_finance', 'app3_clue', 'app4_analysis',
                'app5_tactics', 'app6_note_analysis',
                'app7_customer_analysis', 'app8_clue_consolidated'
            )
        ),
    site_id         BIGINT       NOT NULL,
    target_id       VARCHAR(100) NOT NULL,           -- 含义因 cache_type 而异
    result_json     JSONB        NOT NULL,
    score           INTEGER,                         -- 应用 6 专用评分
    triggered_by    VARCHAR(100),                    -- 触发来源标识
    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    expires_at      TIMESTAMPTZ                      -- 可选过期时间
);

COMMENT ON TABLE biz.ai_cache IS 'AI 应用缓存:各应用的结构化输出结果';
COMMENT ON COLUMN biz.ai_cache.target_id IS '目标 IDApp2=时间维度编码 / App3,6,7,8=member_id / App4,5={assistant_id}_{member_id}';
COMMENT ON COLUMN biz.ai_cache.score IS '评分:仅应用 6 使用1-10 分)';

CREATE INDEX IF NOT EXISTS idx_ai_cache_lookup ON biz.ai_cache (cache_type, site_id, target_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_ai_cache_cleanup ON biz.ai_cache (cache_type, site_id, target_id, created_at);

target_id 约定

cache_type target_id 格式 示例
app2_finance 时间维度编码 this_month, last_week
app3_clue member_id 12345
app4_analysis {assistant_id}_{member_id} 100_12345
app5_tactics {assistant_id}_{member_id} 100_12345
app6_note_analysis member_id 12345
app7_customer_analysis member_id 12345
app8_clue_consolidated member_id 12345

应用 2 时间维度编码

编码 含义 计算规则
this_month 本月 当前营业日所在月
last_month 上月 当前月 - 1
this_week 本周 当前营业日所在周(周一起)
last_week 上周 当前周 - 1
last_3_months 前 3 月(不含本月) 当前月 - 3 ~ 当前月 - 1
this_quarter 本季 当前营业日所在季度
last_quarter 上季 当前季度 - 1
last_6_months 近 6 月(不含本月) 当前月 - 6 ~ 当前月 - 1

营业日分界点:每日 08:00BUSINESS_DAY_START_HOUR 环境变量)。

应用输出 JSON Schema

应用 3/6 线索格式(写入 ai_cache.result_json

{
  "clues": [
    {
      "category": "消费习惯",
      "summary": "偏好周末下午时段消费",
      "detail": "近 3 个月 8 次消费中 6 次在周六/日 14:00-18:00",
      "emoji": "📅"
    }
  ]
}

应用 6 额外包含 score 字段1-10写入 ai_cache.score

应用 8 整合线索格式

{
  "clues": [
    {
      "category": "消费习惯",
      "summary": "偏好周末下午时段消费",
      "detail": "近 3 个月 8 次消费中 6 次在周六/日 14:00-18:00",
      "emoji": "📅",
      "providers": "系统,张三"
    }
  ]
}

应用 4 关系分析格式

{
  "task_description": "...",
  "action_suggestions": ["建议1", "建议2"],
  "one_line_summary": "..."
}

应用 5 话术参考格式

{
  "tactics": [
    { "scenario": "...", "script": "..." }
  ]
}

应用 7 客户分析格式

{
  "strategies": [
    { "title": "...", "content": "..." }
  ],
  "summary": "..."
}

应用 2 财务洞察格式

{
  "insights": [
    { "seq": 1, "title": "...", "body": "..." }
  ]
}

与现有表的关系

  • member_retention_clue:应用 8 的 ClueWriter 全量替换 source IN ('ai_consumption', 'ai_note') 的记录,source='manual' 的人工线索不受影响
  • biz.notes:应用 6 触发点,note_service.create_note() 中的 ai_analyze_note() 占位函数将被替换为真实调用
  • biz.trigger_jobs:新增 AI 相关的事件触发器配置(consumption_settlednote_createdtask_assigned
  • biz.coach_tasks:应用 4 触发条件之一(任务分配事件)

正确性属性

正确性属性是系统在所有合法执行路径上都应保持为真的特征或行为——本质上是对系统应做什么的形式化陈述。属性是人类可读规格与机器可验证正确性保证之间的桥梁。

Property 1: BailianClient 双模式调用一致性

For any 合法的消息列表,chat_stream 应返回非空的 chunk 序列(拼接后为完整文本),chat_json 应返回可解析的 JSON dict 和正整数 tokens_used。两种模式对相同输入都应成功返回mock API 正常响应时)。

Validates: Requirements 2.1, 2.3, 2.4

Property 2: 指数退避重试策略

For any 失败次数 n1 ≤ n ≤ max_retriesBailianClient 应在第 n 次失败后等待 base_interval × 2^(n-1) 秒后重试;当失败次数超过 max_retries 时,应抛出 BailianApiError。

Validates: Requirements 2.2

Property 3: JSON 解析失败错误处理

For any 非法 JSON 字符串作为 API 响应,chat_json 应抛出 BailianJsonParseError 而非静默返回空值或崩溃。

Validates: Requirements 2.5

Property 4: current_time 注入不变量

For any 消息列表,经 _inject_current_time 处理后,首条消息的 content解析为 JSON应包含 current_time 字段,且值为 ISO 格式的时间字符串(精确到秒),其余消息不受影响。

Validates: Requirements 2.6

Property 5: AI 调用记录持久化 round-trip

For any AI 应用调用app1-app8调用完成后(a) ai_conversations 应包含一条匹配 app_id、site_id 的记录;(b) ai_messages 应包含至少一条 role='system' 或 role='user' 的输入消息和一条 role='assistant' 的输出消息;(c) 输出消息的 tokens_used 应为正整数。

Validates: Requirements 3.2, 3.4, 3.5, 13.1, 13.2, 13.3

Property 6: 历史对话列表排序与分页

For any 用户和 site_id查询历史对话列表返回的记录应按 created_at 严格降序排列,且每页数量不超过 page_size默认 20

Validates: Requirements 3.7

Property 7: 缓存写入 round-trip

For any AI 应用app2-app8的调用结果写入 ai_cache 后,按 (cache_type, site_id, target_id) 查询最新记录应返回与写入内容一致的 result_json。

Validates: Requirements 4.7, 5.6, 6.6, 7.5, 8.6, 9.5, 10.10

Property 8: AI 应用输出 JSON 结构验证

For any AI 应用调用结果的 result_json

  • App2: 应包含 insights 数组,每项含 seq(正整数)、title(非空字符串)、body(非空字符串)
  • App3: 应包含 clues 数组,每条含 category(∈ {客户基础, 消费习惯, 玩法偏好})、summarydetailemoji
  • App4: 应包含 task_descriptionaction_suggestions(数组)、one_line_summary
  • App5: 应包含 tactics 数组
  • App6: 应包含 score1-10 整数)和 clues 数组,每条 category ∈ 6 个枚举值
  • App7: 应包含 strategies 数组(每项含 titlecontent)和 summary
  • App8: 应包含 clues 数组,每条含 category(∈ 6 个枚举值)、summarydetailemojiproviders

Validates: Requirements 4.4, 5.2, 5.3, 5.4, 6.3, 7.3, 8.2, 8.3, 8.4, 9.2, 10.4, 10.5

Property 9: Prompt reference 历史注入

For any 应用 3/4/5/6/7/8 的 Prompt 构建reference 字段应包含相关应用的缓存结果(如有),且历史记录附带 generated_at 时间戳。当缓存不存在时reference 应为空对象。

Validates: Requirements 5.8, 6.4, 6.5, 7.2, 7.4, 8.8, 9.7

Property 10: 事件调用链顺序正确性

For any 业务事件:

  • 消费事件(无助教):调用顺序严格为 App3 → App8 → App7
  • 消费事件(有助教):调用顺序严格为 App3 → App8 → {App7, App4 → App5}App7 和 App4 均在 App8 之后)
  • 备注事件:调用顺序严格为 App6 → App8
  • 任务分配事件:调用顺序严格为 App4 → App5

Validates: Requirements 11.1, 11.2, 11.3, 11.4, 11.5, 11.6

Property 11: 调用链容错不变量

For any 调用链执行过程中某个应用调用失败,后续应用应继续执行(使用已有缓存),整条链不应因单点失败而中断。失败的应用应有错误日志记录。

Validates: Requirements 11.7

Property 12: ClueWriter 全量替换不变量

For any member_id 和 site_id执行 ClueWriter.replace_ai_clues(member_id, site_id, new_clues) 后:

  • (a) 该客户的 AI 来源线索source IN ('ai_consumption', 'ai_note'))应恰好等于 new_clues 的数量
  • (b) 人工线索source='manual')的数量应与替换前完全一致
  • (c) 写入的记录中 recorded_by_assistant_id 应为 NULL
  • (d) summary 字段应为 emoji + 空格 + 原始 summary 的拼接格式

Validates: Requirements 10.7, 10.8, 10.9

Property 13: 缓存查询 site_id 隔离

For any 两个不同的 site_idA 和 B写入 site_id=A 的缓存记录后,以 site_id=B 查询应返回空结果(即使 cache_type 和 target_id 相同)。

Validates: Requirements 12.1, 12.5

Property 14: 缓存保留上限

For any (cache_type, site_id, target_id) 组合,无论写入多少条记录,清理后该组合的记录总数应 ≤ 500。

Validates: Requirements 12.3

错误处理

百炼 API 层

错误场景 处理策略
API 超时 指数退避重试(最多 3 次),超时阈值 30s
API 返回 HTTP 4xx 不重试,立即抛出 BailianApiError
API 返回 HTTP 5xx 指数退避重试
响应非法 JSON 抛出 BailianJsonParseError记录原始响应到日志
API Key 无效 不重试,抛出 BailianAuthError记录告警日志
流式连接中断 已接收的 chunk 拼接为部分回复,标记 incomplete

事件调度层

错误场景 处理策略
调用链中某应用失败 记录错误日志 + 写入失败 conversation 记录,后续应用使用已有缓存继续
数据库连接失败 整条链中止,记录错误日志
缓存查询失败 传空 reference 继续执行,不阻塞

SSE 端点层

错误场景 处理策略
用户未认证 返回 HTTP 401
消息为空 返回 HTTP 422
流式过程中百炼 API 失败 发送 {"type": "error", "message": "..."} SSE 事件
客户端断开连接 取消百炼 API 调用,清理资源

缓存服务层

错误场景 处理策略
查询无结果 返回 null/None不抛异常
写入失败 抛出异常,由调用方处理
清理超限失败 记录警告日志,不影响写入操作

ClueWriter 层

错误场景 处理策略
全量替换事务失败 回滚整个事务,保留原有线索不变
线索数据不符合 CHECK 约束 回滚事务记录错误日志category 枚举不匹配)

测试策略

属性测试Property-Based Testing

  • 测试库hypothesisPython
  • 最小迭代次数:每个属性测试 100 次
  • 测试文件位置tests/test_p5_ai_integration_properties.pyMonorepo 级)+ apps/backend/tests/test_ai_*.py(模块级)
  • 标签格式# Feature: 05-miniapp-ai-integration, Property {N}: {property_text}

每个正确性属性对应一个属性测试:

Property 测试策略 生成器
P1: 双模式调用 Mock 百炼 API验证两种模式返回格式 随机消息列表
P2: 重试策略 Mock 可控失败次数的 API 随机失败次数 (0-5)
P3: JSON 解析失败 Mock 返回非法 JSON 随机非 JSON 字符串
P4: current_time 注入 纯函数测试 随机消息列表
P5: 记录持久化 Mock 百炼 + 真实 DBtest_zqyy_app 随机 app_id、消息内容
P6: 历史列表排序 真实 DB 随机对话记录(随机时间戳)
P7: 缓存 round-trip 真实 DB 随机 cache_type、target_id、result_json
P8: 输出 JSON 结构 JSON Schema 验证 随机 AI 响应(符合各应用 schema
P9: reference 历史注入 Mock 缓存数据 随机缓存记录(含/不含历史)
P10: 调用链顺序 Mock 所有应用,记录调用序列 随机事件类型和参数
P11: 调用链容错 Mock 随机应用失败 随机失败位置
P12: ClueWriter 替换 真实 DB 随机线索列表 + 预置人工线索
P13: site_id 隔离 真实 DB 随机 site_id 对
P14: 缓存上限 真实 DB 批量写入(>500 条)

单元测试

单元测试聚焦于具体示例和边界条件,与属性测试互补:

测试范围 测试内容
表结构验证 验证 3 张表的列、类型、约束、索引(需求 1.1-1.5
App2 时间维度 验证 8 个时间维度编码的计算逻辑(营业日分界点 08:00
App2 字段映射 验证 Prompt 使用 items_sum 口径而非 consume_money
SSE 协议 验证 Content-Type: text/event-stream 和事件格式
ClueWriter 字段映射 验证 emoji+summary 拼接、source 判断逻辑
缓存 CHECK 约束 验证非法 cache_type 被拒绝
App6 评分范围 验证 score 字段存储在 ai_cache.score

集成测试

测试范围 测试内容
完整消费事件链 Mock 百炼 API验证 App3→App8→App7 全链路
备注事件链 Mock 百炼 API验证 App6→App8 全链路
note_service 集成 验证 ai_analyze_note 占位函数被替换后的调用流程
SSE 端到端 使用 httpx 的 SSE 客户端验证流式响应