Files
Neo-ZQYY/docs/specs/P14-ai-dashscope-migration/design.md
Neo 70324d8542 chore: 文档与 IDE 配置整理
- .kiro/specs/ → docs/specs/(41 个历史需求 spec 迁移,移除 .config.kiro)
- CLAUDE.md 三层拆分:根文件精简 + apps/backend/CLAUDE.md + .claude/commands/
- 新增 /spec-close、/pre-change 两个工作流命令
- DDL 基线刷新(从测试库重新导出 11 个文件,dws 35→38 表,biz 18→21 表)
- BD_Manual → BD_manual 命名统一(48 个文件)
- 修复 3 处文档与数据库不一致(auth.users.status 默认值、scheduled_tasks 字段、RLS 视图数)
- 新增 BD_manual_public_rbac_tables.md(public schema 8 张 RBAC/工作流表)
- 合并 biz.trigger_jobs 文档(10→12 字段,归档独立文档)
- docs/database/README.md 索引更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:02:37 +08:00

29 KiB
Raw Blame History

设计文档 — P14AI 模块改造 — DashScope 迁移 + 调度器完善

概述

本设计将 AI 模块从 openai SDK通用模型 API迁移到 dashscope SDKApplication API使 8 个百炼智能体应用能通过各自的 app_id 调用,充分利用百炼控制台配置的 System Prompt 和 MCP 工具。同时修复调度器 asyncio 嵌套问题、打通事件触发链、新增熔断/限流/Token 预算控制,并完成相关数据库变更。

设计决策与理由

决策 理由
asyncio.to_thread() 包装同步 SDK dashscope.Application.call() 是同步方法FastAPI 全 asyncto_thread 是标准库方案,无需引入第三方异步包装
流式调用用 asyncio.Queue 桥接 Application.call(stream=True) 返回同步迭代器,需在线程中消费后桥接到 async generator保持 SSE 端点的 async 特性
熔断器内存实现(非 Redis 单实例部署,无需分布式状态;按 app_id 独立计数,结构简单
限流器内存计数器 同上,单实例部署;滑动窗口计数器足够
Token 预算从 ai_run_logs 聚合 避免引入额外计数表,利用已有日志数据;查询频率低(每次调用前一次)
session_id 云端 + 本地双轨 百炼云端管理上下文减少 token 消耗,本地持久化保证 session 过期后可恢复
内部 API 用 INTERNAL_API_TOKEN 而非 JWT ETL 进程是内部服务,不需要用户身份;简单 token 认证足够且配置简单
App2~8 纯重试不做本地 JSON 修复 百炼控制台已配置 System Prompt 要求 JSON 输出,本地修复容易引入错误数据

架构

整体架构

graph TB
    subgraph "外部触发"
        ETL["ETL DWS 任务"]
        XCX["小程序助教"]
        TM["task_manager"]
    end

    subgraph "入口层"
        SSE["SSE 端点<br/>POST /api/xcx/chat/stream"]
        IAPI["内部触发 API<br/>POST /api/internal/ai/trigger"]
        EVT["事件发射<br/>fire_event()"]
    end

    subgraph "防护层"
        CB["CircuitBreaker<br/>按 app_id 独立"]
        RL["RateLimiter<br/>用户/门店维度"]
        BT["BudgetTracker<br/>日/月 token 预算"]
    end

    subgraph "核心层"
        DSC["DashScopeClient<br/>Application.call() 包装"]
        DISP["Dispatcher<br/>事件调度 + 调用链编排"]
    end

    subgraph "服务层"
        CONV["ConversationService<br/>session_id 双轨"]
        CACHE["AICacheService<br/>status 字段 + 过期策略"]
    end

    subgraph "存储层"
        DB_CONV["ai_conversations<br/>+session_id"]
        DB_MSG["ai_messages"]
        DB_CACHE["ai_cache<br/>+status"]
        DB_LOG["ai_run_logs<br/>新表"]
        DB_JOB["ai_trigger_jobs<br/>新表"]
        DB_CLUE["member_retention_clue"]
    end

    ETL -->|HTTP POST| IAPI
    XCX -->|API 路由| EVT
    TM -->|fire_event| EVT

    SSE --> CB --> RL --> BT --> DSC
    IAPI --> DISP
    EVT --> DISP

    DISP --> CB
    DSC -->|App1 流式| SSE
    DSC -->|App2~8 单轮| DISP

    DISP --> CONV
    DISP --> CACHE
    DSC --> DB_LOG

    CONV --> DB_CONV
    CONV --> DB_MSG
    CACHE --> DB_CACHE
    DISP --> DB_JOB
    DISP --> DB_CLUE

调用链流程

sequenceDiagram
    participant ETL as ETL DWS
    participant API as Internal AI API
    participant DISP as Dispatcher
    participant CB as CircuitBreaker
    participant RL as RateLimiter
    participant BT as BudgetTracker
    participant DSC as DashScopeClient
    participant DB as PostgreSQL

    ETL->>API: POST /api/internal/ai/trigger
    API->>DB: INSERT ai_trigger_jobs (pending)
    API-->>ETL: {trigger_job_id, status: "pending"}

    API->>DISP: asyncio.create_task(handle_event)
    DISP->>DISP: 去重检查 (event_type, member_id, site_id, date)

    loop 调用链每一步 (App3→App8→App7)
        DISP->>CB: 检查 app_id 熔断状态
        CB-->>DISP: closed/open/half_open
        DISP->>RL: 检查门店限流
        RL-->>DISP: allowed/rejected
        DISP->>BT: 检查 token 预算
        BT-->>DISP: within_budget/exceeded

        DISP->>DB: INSERT ai_run_logs (pending→running)
        DISP->>DSC: Application.call(app_id, prompt)
        DSC-->>DISP: response
        DISP->>DB: UPDATE ai_run_logs (success/failed)
        DISP->>DB: UPSERT ai_cache
    end

    DISP->>DB: UPDATE ai_trigger_jobs (completed)

组件与接口

1. DashScopeClient替代 BailianClient

文件:apps/backend/app/ai/dashscope_client.py(完全重写 bailian_client.py

class DashScopeClient:
    """DashScope Application API 统一封装层。"""

    MAX_RETRIES = 3
    BASE_INTERVAL = 1  # 秒

    def __init__(self, api_key: str, workspace_id: str | None = None):
        """初始化。dashscope 通过全局 dashscope.api_key 设置密钥。"""

    async def call_app_stream(
        self,
        app_id: str,
        prompt: str,
        session_id: str | None = None,
        biz_params: dict | None = None,
    ) -> AsyncGenerator[str, None]:
        """App1 流式调用。
        在线程中消费同步迭代器,通过 asyncio.Queue 桥接到 async generator。
        """

    async def call_app(
        self,
        app_id: str,
        prompt: str,
        session_id: str | None = None,
        biz_params: dict | None = None,
    ) -> tuple[dict, int, str | None]:
        """App2~8 单轮调用。
        返回 (parsed_json, tokens_used, new_session_id)。
        """

    async def _call_with_retry(self, func: Callable, **kwargs) -> Any:
        """指数退避重试1s→2s→4sHTTP 4xx 不重试5xx/超时/连接错误重试。"""

流式桥接实现要点:

async def call_app_stream(self, app_id, prompt, session_id=None, biz_params=None):
    queue: asyncio.Queue[str | None] = asyncio.Queue()

    def _consume_in_thread():
        """在线程中消费同步迭代器,逐 chunk 放入 queue。"""
        response = Application.call(
            app_id=app_id, prompt=prompt, session_id=session_id,
            biz_params=biz_params, stream=True, incremental_output=True,
        )
        for chunk in response:
            if chunk.status_code == 200:
                text = chunk.output.get("text", "")
                if text:
                    # 线程安全地放入 queue
                    asyncio.run_coroutine_threadsafe(queue.put(text), loop)
            else:
                raise DashScopeApiError(chunk.message, chunk.status_code)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop)  # 结束信号

    loop = asyncio.get_running_loop()
    loop.run_in_executor(None, _consume_in_thread)

    while True:
        item = await queue.get()
        if item is None:
            break
        yield item

2. CircuitBreaker熔断器

文件:apps/backend/app/ai/circuit_breaker.py(新增)

class CircuitState(enum.Enum):
    CLOSED = "closed"       # 正常
    OPEN = "open"           # 熔断中
    HALF_OPEN = "half_open" # 探测中

class CircuitBreaker:
    """按 app_id 独立的熔断器。"""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self._breakers: dict[str, _BreakerState] = {}

    def check(self, app_id: str) -> CircuitState:
        """检查当前状态OPEN 时直接拒绝。"""

    def record_success(self, app_id: str) -> None:
        """记录成功HALF_OPEN→CLOSED。"""

    def record_failure(self, app_id: str) -> None:
        """记录失败连续达阈值→OPENHALF_OPEN 失败→重新 OPEN。"""

class _BreakerState:
    state: CircuitState
    failure_count: int
    last_failure_time: float
    last_state_change: float

3. RateLimiter限流器

文件:apps/backend/app/ai/rate_limiter.py(新增)

class RateLimiter:
    """滑动窗口内存限流器。"""

    def __init__(self):
        self._user_windows: dict[str, deque[float]] = {}   # App1: user_id → 时间戳队列
        self._store_windows: dict[str, deque[float]] = {}   # App2~8: site_id → 时间戳队列

    def check_user_rate(self, user_id: str, limit: int = 10, window_seconds: int = 60) -> bool:
        """App1 每用户每分钟限流。返回 True 表示允许。"""

    def check_store_rate(self, site_id: int, limit: int = 100, window_seconds: int = 3600) -> bool:
        """App2~8 每门店每小时限流。返回 True 表示允许。"""

4. BudgetTrackerToken 预算追踪)

文件:apps/backend/app/ai/budget_tracker.py(新增)

class BudgetTracker:
    """Token 预算追踪器,从 ai_run_logs 聚合。"""

    def __init__(
        self,
        daily_limit: int = 100_000,
        monthly_limit: int = 2_000_000,
    ):
        self.daily_limit = daily_limit
        self.monthly_limit = monthly_limit

    def check_budget(self) -> BudgetStatus:
        """检查当前预算状态。返回 BudgetStatus(allowed, daily_used, monthly_used)。"""

    def _get_daily_usage(self) -> int:
        """从 ai_run_logs 聚合今日 token 消耗。"""

    def _get_monthly_usage(self) -> int:
        """从 ai_run_logs 聚合本月 token 消耗。"""

@dataclass
class BudgetStatus:
    allowed: bool
    daily_used: int
    monthly_used: int
    reason: str | None = None  # "daily_exceeded" / "monthly_exceeded" / None

5. Dispatcher调度器重写

文件:apps/backend/app/ai/dispatcher.py(重写)

关键变更:

  • 移除所有 asyncio.run()asyncio.new_event_loop()
  • 所有入口改为 async def,用 asyncio.create_task() 发起后台任务
  • 超时用 asyncio.wait_for()
  • 集成 CircuitBreaker、RateLimiter、BudgetTracker
  • 新增 ai_trigger_jobs 记录
  • 新增去重逻辑
class AIDispatcher:
    """AI 事件调度与调用链编排器。"""

    def __init__(
        self,
        client: DashScopeClient,
        cache_svc: AICacheService,
        conv_svc: ConversationService,
        circuit_breaker: CircuitBreaker,
        rate_limiter: RateLimiter,
        budget_tracker: BudgetTracker,
    ): ...

    async def handle_trigger(self, event: TriggerEvent) -> int:
        """统一事件入口。写 ai_trigger_jobs 后异步执行调用链。返回 trigger_job_id。"""

    async def _execute_chain(self, job_id: int, event: TriggerEvent) -> None:
        """执行调用链,根据 event_type 分发。"""

    async def _check_dedup(self, event: TriggerEvent) -> bool:
        """去重检查:(event_type, member_id, site_id, date)。"""

    async def _run_step(self, app_name: str, app_id: str, prompt: str, context: dict) -> dict | None:
        """执行单步:熔断检查→限流检查→预算检查→调用→记录日志。"""

    # 事件处理器
    async def _handle_consumption(self, event: TriggerEvent) -> None: ...
    async def _handle_note(self, event: TriggerEvent) -> None: ...
    async def _handle_task_assigned(self, event: TriggerEvent) -> None: ...
    async def _handle_dws_completed(self, event: TriggerEvent) -> None: ...

6. AI Config环境变量

文件:apps/backend/app/ai/config.py(重写)

@dataclass(frozen=True)
class AIConfig:
    """AI 模块配置,从环境变量加载。"""
    api_key: str                    # DASHSCOPE_API_KEY
    workspace_id: str | None        # DASHSCOPE_WORKSPACE_ID可选
    app_id_1_chat: str              # DASHSCOPE_APP_ID_1_CHAT
    app_id_2_finance: str           # DASHSCOPE_APP_ID_2_FINANCE
    app_id_3_clue: str              # ...
    app_id_4_analysis: str
    app_id_5_tactics: str
    app_id_6_note: str
    app_id_7_customer: str
    app_id_8_consolidate: str
    internal_api_token: str         # INTERNAL_API_TOKEN

    @classmethod
    def from_env(cls) -> "AIConfig":
        """从环境变量加载,缺失必需变量时立即报错。"""

7. Internal AI API内部触发接口

文件:apps/backend/app/routers/internal_ai.py(新增)

router = APIRouter(prefix="/api/internal/ai", tags=["internal-ai"])

@router.post("/trigger")
async def trigger_ai_event(body: TriggerRequest, token: str = Depends(verify_internal_token)):
    """接收 ETL/内部事件,写 ai_trigger_jobs 后异步执行。"""

class TriggerRequest(BaseModel):
    event_type: str           # consumption / dws_completed / note_created / task_assigned
    connector_type: str = "feiqiu"
    site_id: int
    member_id: int | None = None
    payload: dict | None = None

def verify_internal_token(authorization: str = Header(...)) -> str:
    """校验 Internal-Token {token}。"""

8. SSE 端点适配

文件:apps/backend/app/routers/xcx_chat.py(修改)

变更要点:

  • _get_bailian_client()_get_dashscope_client()
  • bailian.chat_stream(messages)client.call_app_stream(app_id, prompt, session_id, biz_params)
  • _build_ai_messages() 改为构建 prompt + biz_params
  • 流结束后记录 ai_run_logs
  • 保持 SSE 事件格式不变:event: message / event: done / event: error

9. ETL 触发集成

文件:apps/etl/connectors/feiqiu/tasks/ 相关 DWS 任务(修改)

变更要点:

  • DWS 任务完成后,通过 httpx 发送 POST /api/internal/ai/trigger
  • 携带 Authorization: Internal-Token {INTERNAL_API_TOKEN} Header
  • 事件类型:dws_completed(触发 App2 预生成)或 consumption(触发消费事件链)

10. AI 运行日志服务

文件:apps/backend/app/ai/run_log_service.py(新增)

class AIRunLogService:
    """AI 运行日志 CRUD。"""

    def create_log(self, site_id: int, app_type: str, trigger_type: str, **kwargs) -> int:
        """创建日志记录status: pending返回 log_id。"""

    def update_running(self, log_id: int) -> None:
        """更新为 running。"""

    def update_success(self, log_id: int, response_text: str, tokens_used: int, latency_ms: int) -> None:
        """更新为 success。"""

    def update_failed(self, log_id: int, error_message: str, latency_ms: int) -> None:
        """更新为 failed。"""

    def update_timeout(self, log_id: int, latency_ms: int) -> None:
        """更新为 timeout。"""

    def get_daily_token_usage(self) -> int:
        """聚合今日 token 消耗。"""

    def get_monthly_token_usage(self) -> int:
        """聚合本月 token 消耗。"""

数据模型

新增表

biz.ai_run_logsAI 运行记录)

CREATE TABLE biz.ai_run_logs (
    id              BIGSERIAL PRIMARY KEY,
    site_id         BIGINT NOT NULL,
    app_type        VARCHAR(30) NOT NULL,       -- app1_chat / app2_finance / ...
    trigger_type    VARCHAR(20) NOT NULL,        -- user / scheduled / event / forced
    member_id       BIGINT,
    request_prompt  TEXT,                        -- 截断前 2000 字符
    response_text   TEXT,
    tokens_used     INTEGER DEFAULT 0,
    latency_ms      INTEGER,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
                    -- pending / running / success / failed / timeout / budget_exceeded
    error_message   TEXT,
    session_id      VARCHAR(100),
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    finished_at     TIMESTAMPTZ
);

CREATE INDEX idx_ai_run_logs_site_app ON biz.ai_run_logs(site_id, app_type);
CREATE INDEX idx_ai_run_logs_created ON biz.ai_run_logs(created_at);
CREATE INDEX idx_ai_run_logs_status ON biz.ai_run_logs(status);

biz.ai_trigger_jobs调度运行记录

CREATE TABLE biz.ai_trigger_jobs (
    id              BIGSERIAL PRIMARY KEY,
    site_id         BIGINT NOT NULL,
    event_type      VARCHAR(30) NOT NULL,
    connector_type  VARCHAR(30) DEFAULT 'feiqiu',
    member_id       BIGINT,
    payload         JSONB,
    status          VARCHAR(20) NOT NULL DEFAULT 'pending',
                    -- pending / running / completed / failed / skipped_duplicate / budget_exceeded
    is_forced       BOOLEAN DEFAULT false,
    app_chain       VARCHAR(100),               -- 如 "app3→app8→app7"
    started_at      TIMESTAMPTZ,
    finished_at     TIMESTAMPTZ,
    error_message   TEXT,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_ai_trigger_jobs_site ON biz.ai_trigger_jobs(site_id, event_type);
CREATE INDEX idx_ai_trigger_jobs_dedup ON biz.ai_trigger_jobs(event_type, member_id, site_id, (created_at::date))
    WHERE status NOT IN ('skipped_duplicate');
CREATE INDEX idx_ai_trigger_jobs_status ON biz.ai_trigger_jobs(status);

已有表变更

biz.ai_conversations — 新增字段

ALTER TABLE biz.ai_conversations ADD COLUMN session_id VARCHAR(100);
  • 存储百炼 session_id格式 conv_{conversation_id}_{created_timestamp}
  • 仅 App1 使用

biz.ai_cache — 新增字段

ALTER TABLE biz.ai_cache ADD COLUMN status VARCHAR(20) DEFAULT 'valid'
    CHECK (status IN ('valid', 'expired', 'invalidated', 'generating'));
  • valid:有效缓存
  • expired:已过期(定时任务标记)
  • invalidated手动失效admin-web 操作)
  • generating:正在生成中(防并发读取不完整数据)

缓存过期策略

App cache_type expires_at
App2 app2_finance 当日 23:59:59
App3 app3_clue 7 天
App4 app4_analysis 7 天
App5 app5_tactics 7 天
App6 app6_note_analysis 30 天
App7 app7_customer_analysis 7 天
App8 app8_clue_consolidated 7 天

环境变量映射

旧变量 新变量 说明
BAILIAN_API_KEY DASHSCOPE_API_KEY API Key
BAILIAN_BASE_URL (删除) Application API 不需要 base_url
BAILIAN_MODEL (删除) 通过 app_id 指定应用
(新增) DASHSCOPE_WORKSPACE_ID 工作空间 ID可选
BAILIAN_APP_ID_1_CHAT DASHSCOPE_APP_ID_1_CHAT App1
BAILIAN_APP_ID_2_FINANCE DASHSCOPE_APP_ID_2_FINANCE App2
BAILIAN_APP_ID_3_CLUE DASHSCOPE_APP_ID_3_CLUE App3
BAILIAN_APP_ID_4_ANALYSIS DASHSCOPE_APP_ID_4_ANALYSIS App4
BAILIAN_APP_ID_5_TACTICS DASHSCOPE_APP_ID_5_TACTICS App5
BAILIAN_APP_ID_6_NOTE DASHSCOPE_APP_ID_6_NOTE App6
BAILIAN_APP_ID_7_CUSTOMER DASHSCOPE_APP_ID_7_CUSTOMER App7
BAILIAN_APP_ID_8_CONSOLIDATE DASHSCOPE_APP_ID_8_CONSOLIDATE App8
(新增) INTERNAL_API_TOKEN 内部 API 认证 token

正确性属性

属性Property是在系统所有合法执行路径上都应成立的特征或行为——本质上是对"系统应该做什么"的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。

Property 1: 重试策略正确性

对于任意 DashScopeClient 调用和任意错误类型序列HTTP 4xx 错误应立即抛出不重试HTTP 5xx/超时/连接错误应最多重试 3 次(间隔 1s→2s→4s非合法 JSON 响应应触发重试而非本地修复。

Validates: Requirements 1.5, 1.6

Property 2: 环境变量校验完整性

对于任意 必需环境变量(DASHSCOPE_API_KEY 和 8 个 DASHSCOPE_APP_ID_*)的子集缺失,AIConfig.from_env() 应抛出异常,不返回包含空字符串的配置对象。

Validates: Requirements 2.5

Property 3: session_id 格式不变量

对于任意 conversation_id正整数和 created_timestamp生成的 session_id 应匹配正则 ^conv_\d+_\d+$,且从 session_id 可以反解出原始 conversation_id。

Validates: Requirements 3.1

Property 4: 对话复用规则

对于任意 入口类型和时间差组合对话复用决策应满足task 入口始终复用已有对话、customer/coach 入口在 3 天内复用、general 入口始终新建。

Validates: Requirements 3.6

Property 5: 熔断器 app_id 隔离

对于任意 两个不同的 app_id对其中一个记录任意次数的失败不应改变另一个 app_id 的熔断状态。

Validates: Requirements 5.1

Property 6: 熔断器状态机转换

对于任意 app_id 和任意成功/失败事件序列,熔断器状态转换应满足:连续 5 次失败→OPENOPEN 经过 60 秒→HALF_OPENHALF_OPEN + 成功→CLOSEDHALF_OPEN + 失败→OPEN。且 CLOSED 状态下任何成功事件应重置失败计数。

Validates: Requirements 5.2, 5.3, 5.4, 5.5

Property 7: 限流器窗口控制

对于任意 用户 ID 或门店 ID在滑动窗口内App1: 60 秒/10 次App2~8: 3600 秒/100 次)请求次数未超过阈值时应允许,超过阈值时应拒绝;窗口外的历史请求不影响当前判断。

Validates: Requirements 6.1, 6.2

Property 8: Token 预算检查正确性

对于任意 一组 ai_run_logs 记录(含 tokens_usedcreated_at),日聚合应等于当日所有 status='success' 记录的 tokens_used 之和,月聚合应等于当月之和;当日聚合 ≥ 100,000 或月聚合 ≥ 2,000,000 时,check_budget() 应返回 allowed=false

Validates: Requirements 7.1, 7.3

Property 9: 事件类型到调用链映射

对于任意 事件类型Dispatcher 应执行正确的调用链:consumption(无助教)→ App3→App8→App7consumption(有助教)→ App3→App8→App7 + App4→App5note_created → App6→App8task_assigned → App4→App5dws_completed → App28 个时间维度)。

Validates: Requirements 9.1, 9.2, 9.3, 11.1

Property 10: 调用链容错不中断

对于任意 调用链和任意失败步骤位置,该步骤失败后链中后续步骤仍应继续执行(使用已有缓存),不中断整条链。

Validates: Requirements 9.4

Property 11: 内部 API 认证

对于任意 HTTP 请求到 /api/internal/ai/trigger,当 Authorization Header 缺失或 token 不匹配 INTERNAL_API_TOKEN 时应返回 HTTP 401token 匹配时应正常处理请求。

Validates: Requirements 10.2, 10.3

Property 12: 事件去重与强制执行

对于任意 两个具有相同 (event_type, member_id, site_id, date) 的自动触发事件,第二个应被跳过(skipped_duplicate);但当 is_forced=true 时,即使存在重复也应正常执行。

Validates: Requirements 12.1, 12.2

Property 13: App8 幂等写入

对于任意 member_id 和日期,多次执行 App8 写入 member_retention_clue 后,该 member 该天的记录数应恒为 1DELETE + INSERT 事务保证)。

Validates: Requirements 12.3

Property 14: 缓存过期策略正确性

对于任意 App 类型和写入时间,缓存记录的 expires_at 应匹配该 App 的过期策略App2 为当日 23:59:59、App3/4/5/7/8 为写入时间 + 7 天、App6 为写入时间 + 30 天。

Validates: Requirements 13.1, 11.3

Property 15: 缓存查询过滤

对于任意 缓存数据集(包含各种 statusexpires_at 值),查询结果应仅包含 status='valid'expires_at > now()expires_at IS NULL 的记录;generatingexpiredinvalidated 状态的记录不应出现在查询结果中。

Validates: Requirements 13.4

Property 16: 缓存保留上限

对于任意 App 类型App2~8和任意数量的缓存写入清理后每个 App 的 ai_cache 记录数不超过 20,000 条App1 对话记录不受此限制。

Validates: Requirements 13.5

Property 17: SSE 事件流格式

对于任意 流式对话输出序列包括正常完成和中途错误SSE 事件流应由零或多个 event: message 事件组成,最终以恰好一个 event: done(正常)或 event: error(异常)事件结束。

Validates: Requirements 15.2

Property 18: AI 运行日志状态机

对于任意 AI 调用(成功/失败/超时/预算超限),ai_run_logs 记录的状态转换应满足:正常调用经历 pending→running→success/failed/timeout;预算超限直接创建 budget_exceeded 状态。且 success 状态的记录必须包含 response_texttokens_usedlatency_msfinished_atfailed 状态必须包含 error_message

Validates: Requirements 16.1, 16.2, 16.3, 16.4, 16.5

Property 19: Prompt 截断不变量

对于任意 长度的 prompt 字符串,存储到 ai_run_logs.request_prompt 后的长度不超过 2000 字符;长度 ≤ 2000 的 prompt 应完整保留。

Validates: Requirements 16.6

Property 20: Application API 响应解析

对于任意 合法的 Application.call() 响应对象(response.output.text 为 JSON 字符串),解析应正确提取并返回等价的 Python dictresponse.output.text 为非法 JSON 时应触发重试。

Validates: Requirements 4.4

错误处理

异常层级

DashScopeError (基类)
├── DashScopeApiError          — Application API 调用失败(重试耗尽)
│   ├── DashScopeAuthError     — API Key 无效HTTP 401
│   └── DashScopeTimeoutError  — 调用超时
├── DashScopeJsonParseError    — 响应 JSON 解析失败(重试耗尽)
├── CircuitOpenError           — 熔断器处于 OPEN 状态
├── RateLimitExceededError     — 限流阈值超限
└── BudgetExceededError        — Token 预算超限

错误处理策略

错误类型 App1用户对话 App2~8后台任务
DashScopeApiError (5xx) SSE event: error + 友好提示 记录 failedai_run_logs,链继续
DashScopeAuthError (401) SSE event: error + "AI 服务配置异常" 记录 failed,链中断(全局配置问题)
DashScopeTimeoutError SSE event: error + "AI 响应超时" 记录 timeout,链继续
DashScopeJsonParseError 不适用App1 不解析 JSON 重试 3 次后记录 failed,链继续
CircuitOpenError SSE event: error + "AI 服务暂时不可用" 记录 circuit_open,跳过该步骤
RateLimitExceededError HTTP 429 + "请求过于频繁" 记录 rate_limited,跳过执行
BudgetExceededError SSE event: error + "AI 额度已用完" 记录 budget_exceeded,跳过执行
session_id 过期 从本地加载历史重建,对用户透明 不适用App2~8 无 session
App8 幂等写入失败 不适用 事务回滚,记录错误到 ai_trigger_jobs

降级策略

  • 熔断状态下App1 返回友好提示App2~8 跳过执行,前端展示已有缓存
  • 预算超限App1 返回"额度已用完"App2~8 跳过,记录 budget_exceeded
  • 限流超限App1 返回"请求过于频繁"App2~8 跳过,记录 rate_limited
  • 调用链某步失败:记录错误,后续步骤使用已有缓存继续

测试策略

属性测试Property-Based Testing

使用 hypothesis 库(项目已有依赖),每个属性测试最少运行 100 次迭代。

测试文件位于 tests/ 目录Monorepo 级属性测试),按组件分文件:

测试文件 覆盖属性
tests/test_circuit_breaker_props.py Property 5隔离、Property 6状态机
tests/test_rate_limiter_props.py Property 7窗口控制
tests/test_budget_tracker_props.py Property 8预算检查
tests/test_dispatcher_props.py Property 9调用链映射、Property 10容错、Property 12去重
tests/test_cache_service_props.py Property 14过期策略、Property 15查询过滤、Property 16保留上限
tests/test_dashscope_client_props.py Property 1重试、Property 20响应解析
tests/test_ai_config_props.py Property 2环境变量校验
tests/test_session_props.py Property 3session_id 格式、Property 4对话复用
tests/test_run_log_props.py Property 18日志状态机、Property 19Prompt 截断)
tests/test_sse_props.py Property 17SSE 格式)

每个属性测试必须包含注释标签:

# Feature: P14-ai-dashscope-migration, Property 6: 熔断器状态机转换
@given(st.lists(st.sampled_from(["success", "failure"]), min_size=1, max_size=50))
def test_circuit_breaker_state_machine(events):
    ...

单元测试

单元测试覆盖属性测试不适合的场景:

测试文件 覆盖内容
tests/test_internal_ai_api.py Property 11认证、API 端点集成
tests/test_app8_idempotent.py Property 13幂等写入、事务回滚
tests/test_sse_endpoint.py SSE 端点集成、错误事件
tests/test_session_recovery.py session_id 过期恢复流程Requirements 3.5
tests/test_app2_pregenerate.py App2 预生成 8 个时间维度Requirements 11.2

测试配置

# conftest.py 中的 hypothesis 配置
from hypothesis import settings
settings.register_profile("ci", max_examples=200, deadline=10000)
settings.register_profile("dev", max_examples=100, deadline=5000)

测试依赖

  • 属性测试:hypothesis(已有)
  • Mockunittest.mock(标准库)
  • DashScope 调用mock Application.call(),不依赖真实 API
  • 数据库:使用 test_zqyy_app 测试库(遵循 testing-env.md 规范)