Files
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

1064 lines
43 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 设计文档 — P15AI 监控后台 + 测试重建 + 回填
> 权威参考:**实施过程中如遇细节不明确,应优先查阅 PRD 原文 `docs/prd/specs/P15-ai-monitoring-testing.md`**
## 概述
本设计覆盖六个模块:
- **模块 A**:后端 API13 个端点,`/api/admin/ai/*`
- **模块 B**前端页面4 个 admin-web 页面Dashboard、调度状态、调用明细、手动操作
- **模块 C**:测试体系重建(旧脚本归档 + 15 场景全链路测试 + 7 个属性测试更新)
- **模块 D**:历史数据回填(`scripts/ops/ai_backfill.py`,分批 + 断点续跑)
- **模块 E**:数据保留与清理(定时任务,每日 03:00
- **模块 F**查询优化与文档收尾BRIN 索引 + 文档同步)
### 设计决策与理由
| 决策 | 理由 |
|------|------|
| 告警状态用 `ai_run_logs.alert_status` 字段而非独立告警表 | 告警来源就是 `ai_run_logs` 中的失败/超时/熔断记录,独立表需要同步维护且增加 JOIN 开销;单字段方案查询简单、无数据一致性风险 |
| `alert_status` 默认 `NULL`(非告警记录)而非 `pending` | 绝大多数记录是成功的NULL 避免无意义的存储和索引膨胀;仅 status IN ('failed','timeout','circuit_open') 的记录才设置 `alert_status='pending'` |
| 批量执行用 `batch_id` + 内存缓存而非持久化表 | 批量操作是低频管理操作batch_id 有效期短10 分钟),无需持久化;内存 dict 足够,单实例部署无分布式问题 |
| 清理定时任务复用现有 `Scheduler` + `scheduled_tasks` | 后端已有调度器基础设施P14新增一条 `scheduled_tasks` 记录即可,无需引入额外定时框架 |
| BRIN 索引而非 B-tree 索引用于 `ai_run_logs.created_at` | `ai_run_logs` 按时间顺序插入BRIN 索引体积小(约 B-tree 的 1/100适合时间范围扫描的 Dashboard 聚合查询 |
| 全链路测试 Mock + 真实 API 双轨 | Mock 模式保证 CI 稳定性和速度;真实 API 模式用于集成验证,通过环境变量 `AI_TEST_MODE` 切换 |
| 回填脚本独立运行而非通过 admin-web 触发 | 回填是一次性操作脚本可断点续跑、可在终端监控进度admin-web 的 batch-run API 用于日常小批量操作 |
| 前端 4 个页面各自独立文件而非 Tab 切换 | 每个页面功能独立、数据源不同,独立路由便于直接链接和浏览器历史导航 |
## 架构
### 整体架构
```mermaid
graph TB
subgraph "admin-web 前端"
DASH["Dashboard 页面<br/>统计卡片+趋势图+饼图+预算+告警"]
SCHED["调度状态页面<br/>分页表格+筛选+去重统计"]
LOGS["调用明细页面<br/>分页表格+详情抽屉"]
OPS["手动操作页面<br/>重跑+缓存失效+批量+告警"]
end
subgraph "后端 API 层"
ROUTER["admin_ai.py<br/>13 个端点<br/>/api/admin/ai/*"]
end
subgraph "服务层"
ADMIN_SVC["AdminAIService<br/>聚合查询+批量操作+告警管理"]
end
subgraph "P14 已有基础设施(复用)"
DISP["AIDispatcher<br/>事件调度+调用链"]
CB["CircuitBreaker"]
RL["RateLimiter"]
BT["BudgetTracker"]
LOG_SVC["AIRunLogService"]
CACHE_SVC["AICacheService"]
end
subgraph "存储层"
DB_LOG["ai_run_logs<br/>+alert_status 字段"]
DB_JOB["ai_trigger_jobs"]
DB_CACHE["ai_cache"]
DB_CLUE["member_retention_clue"]
end
subgraph "定时任务"
CLEANUP["数据清理任务<br/>每日 03:00"]
end
subgraph "运维脚本"
BACKFILL["ai_backfill.py<br/>历史数据回填"]
end
DASH --> ROUTER
SCHED --> ROUTER
LOGS --> ROUTER
OPS --> ROUTER
ROUTER --> ADMIN_SVC
ADMIN_SVC --> LOG_SVC
ADMIN_SVC --> CACHE_SVC
ADMIN_SVC --> DISP
ADMIN_SVC --> BT
ADMIN_SVC --> DB_LOG
ADMIN_SVC --> DB_JOB
ADMIN_SVC --> DB_CACHE
CLEANUP --> DB_LOG
CLEANUP --> DB_JOB
CLEANUP --> DB_CACHE
BACKFILL --> DISP
BACKFILL --> DB_CLUE
```
### 成本二次确认流程
```mermaid
sequenceDiagram
participant Admin as 管理员
participant FE as admin-web
participant API as /api/admin/ai
participant SVC as AdminAIService
participant MEM as 内存 batch_store
participant DISP as AIDispatcher
Admin->>FE: 选择 App + 会员 + 门店
FE->>API: POST /batch-run {app_types, member_ids, site_id}
API->>SVC: estimate_batch()
SVC-->>API: {batch_id, estimated_calls, estimated_tokens}
API-->>FE: 返回预估信息
SVC->>MEM: 存储 batch_id → paramsTTL 10min
FE->>Admin: 弹窗:"将执行 N 次调用,预估 M tokens"
Admin->>FE: 点击"确认执行"
FE->>API: POST /batch-run/confirm {batch_id}
API->>SVC: confirm_batch(batch_id)
SVC->>MEM: 取出并删除 batch_id
SVC->>DISP: asyncio.create_task(execute_batch)
API-->>FE: {status: "started"}
```
### 数据清理流程
```mermaid
flowchart TD
A[定时任务触发<br/>每日 03:00] --> B[删除 90 天前<br/>ai_run_logs]
B --> C[删除 90 天前<br/>ai_trigger_jobs]
C --> D{遍历每个 App 类型<br/>App2~App8}
D --> E[统计该 App 的<br/>ai_cache 记录数]
E --> F{记录数 > 20000?}
F -->|是| G[删除最旧的超出记录]
F -->|否| H[跳过]
G --> D
H --> D
D -->|全部完成| I[记录清理结果到日志]
```
## 组件与接口
### 模块 A后端 API
#### 1. admin_ai.py — AI 监控路由(新建)
文件:`apps/backend/app/routers/admin_ai.py`
```python
router = APIRouter(prefix="/api/admin/ai", tags=["admin-ai"])
# ---- Dashboard ----
@router.get("/dashboard")
async def get_dashboard(site_id: int | None = None, user=Depends(require_admin)) -> DashboardResponse:
"""总览统计:今日调用/成功率/Token/延迟 + 7天趋势 + App分布 + 预算 + 告警 + App健康。"""
# ---- 调度任务 ----
@router.get("/trigger-jobs")
async def list_trigger_jobs(
event_type: str | None = None, status: str | None = None,
site_id: int | None = None, date_from: str | None = None, date_to: str | None = None,
page: int = 1, page_size: int = 20, user=Depends(require_admin),
) -> TriggerJobListResponse:
"""调度任务分页列表 + 今日去重统计。"""
@router.get("/trigger-jobs/{job_id}")
async def get_trigger_job(job_id: int, user=Depends(require_admin)) -> TriggerJobDetailResponse:
"""调度任务详情(含 payload、error_message"""
@router.post("/trigger-jobs/{job_id}/retry")
async def retry_trigger_job(job_id: int, user=Depends(require_admin)) -> RetryResponse:
"""手动重跑:创建新 trigger_jobis_forced=true异步执行。"""
# ---- 调用记录 ----
@router.get("/run-logs")
async def list_run_logs(
app_type: str | None = None, status: str | None = None, trigger_type: str | None = None,
site_id: int | None = None, date_from: str | None = None, date_to: str | None = None,
page: int = 1, page_size: int = 20, user=Depends(require_admin),
) -> RunLogListResponse:
"""调用记录分页列表。"""
@router.get("/run-logs/{log_id}")
async def get_run_log(log_id: int, user=Depends(require_admin)) -> RunLogDetailResponse:
"""调用记录详情(含完整 prompt/response/error不脱敏"""
# ---- 缓存管理 ----
@router.post("/cache/invalidate")
async def invalidate_cache(body: CacheInvalidateRequest, user=Depends(require_admin)) -> CacheInvalidateResponse:
"""批量缓存失效:将匹配条件的 ai_cache.status 设为 invalidated。"""
# ---- Token 预算 ----
@router.get("/budget")
async def get_budget(user=Depends(require_admin)) -> BudgetResponse:
"""Token 预算使用情况:日/月已用量、上限、百分比。"""
# ---- 批量执行 ----
@router.post("/batch-run")
async def create_batch_run(body: BatchRunRequest, user=Depends(require_admin)) -> BatchRunEstimate:
"""创建批量执行请求,返回预估(不立即执行)。"""
@router.post("/batch-run/confirm")
async def confirm_batch_run(body: BatchRunConfirm, user=Depends(require_admin)) -> BatchRunConfirmResponse:
"""确认批量执行,后台异步执行。"""
# ---- 告警管理 ----
@router.get("/alerts")
async def list_alerts(
alert_status: str | None = None, site_id: int | None = None,
page: int = 1, page_size: int = 20, user=Depends(require_admin),
) -> AlertListResponse:
"""告警列表ai_run_logs WHERE status IN ('failed','timeout','circuit_open'))。"""
@router.post("/alerts/{log_id}/ack")
async def ack_alert(log_id: int, user=Depends(require_admin)) -> AlertActionResponse:
"""确认告警alert_status → acknowledged。"""
@router.post("/alerts/{log_id}/ignore")
async def ignore_alert(log_id: int, user=Depends(require_admin)) -> AlertActionResponse:
"""忽略告警alert_status → ignored。"""
```
#### 2. admin_ai Schema新建
文件:`apps/backend/app/schemas/admin_ai.py`
```python
# ---- Dashboard ----
class DashboardResponse(BaseModel):
today_calls: int
today_success_rate: float # 0.0 ~ 1.0
today_tokens: int
today_avg_latency_ms: float
trend_7d: list[DailyTrend] # 近 7 天按日聚合
app_distribution: list[AppDistItem] # 各 App 调用占比
budget: BudgetInfo # 日/月预算进度
recent_alerts: list[AlertItem] # 最近告警
app_health: list[AppHealthItem] # 各 App 最近调用状态
class DailyTrend(BaseModel):
date: str # YYYY-MM-DD
calls: int
success_rate: float
class AppDistItem(BaseModel):
app_type: str
count: int
percentage: float
class BudgetInfo(BaseModel):
daily_used: int
daily_limit: int
daily_pct: float
monthly_used: int
monthly_limit: int
monthly_pct: float
class AlertItem(BaseModel):
id: int
app_type: str
status: str # failed / timeout / circuit_open
alert_status: str | None # pending / acknowledged / ignored
error_message: str | None
created_at: str
class AppHealthItem(BaseModel):
app_type: str
last_status: str | None
last_call_at: str | None
# ---- 调度任务 ----
class TriggerJobListResponse(BaseModel):
items: list[TriggerJobItem]
total: int
page: int
page_size: int
today_skipped_duplicates: int # 今日去重跳过数
class TriggerJobItem(BaseModel):
id: int
event_type: str
member_id: int | None
status: str
app_chain: str | None
is_forced: bool
site_id: int
started_at: str | None
finished_at: str | None
created_at: str
class TriggerJobDetailResponse(TriggerJobItem):
payload: dict | None
error_message: str | None
connector_type: str
class RetryResponse(BaseModel):
trigger_job_id: int
status: str # "pending"
# ---- 调用记录 ----
class RunLogListResponse(BaseModel):
items: list[RunLogItem]
total: int
page: int
page_size: int
class RunLogItem(BaseModel):
id: int
app_type: str
trigger_type: str
member_id: int | None
tokens_used: int
latency_ms: int | None
status: str
site_id: int
created_at: str
class RunLogDetailResponse(RunLogItem):
request_prompt: str | None # 完整 prompt不截断
response_text: str | None # 完整 response
error_message: str | None
session_id: str | None
finished_at: str | None
# ---- 缓存失效 ----
class CacheInvalidateRequest(BaseModel):
site_id: int # 必填
app_type: str | None = None # 可选
member_id: int | None = None # 可选
class CacheInvalidateResponse(BaseModel):
affected_count: int
# ---- Token 预算 ----
class BudgetResponse(BaseModel):
daily_used: int
daily_limit: int
daily_pct: float
monthly_used: int
monthly_limit: int
monthly_pct: float
# ---- 批量执行 ----
class BatchRunRequest(BaseModel):
app_types: list[str]
member_ids: list[int]
site_id: int
class BatchRunEstimate(BaseModel):
batch_id: str
estimated_calls: int
estimated_tokens: int
class BatchRunConfirm(BaseModel):
batch_id: str
class BatchRunConfirmResponse(BaseModel):
status: str # "started"
# ---- 告警 ----
class AlertListResponse(BaseModel):
items: list[AlertItem]
total: int
page: int
page_size: int
class AlertActionResponse(BaseModel):
id: int
alert_status: str
```
#### 3. AdminAIService新建
文件:`apps/backend/app/services/ai/admin_service.py`
```python
class AdminAIService:
"""AI 监控后台聚合服务。"""
def __init__(self, dispatcher: AIDispatcher, budget_tracker: BudgetTracker):
self._dispatcher = dispatcher
self._budget = budget_tracker
self._batch_store: dict[str, BatchInfo] = {} # batch_id → params + 过期时间
# ---- Dashboard ----
async def get_dashboard(self, site_id: int | None = None) -> dict:
"""聚合 Dashboard 所有数据今日统计、7天趋势、App分布、预算、告警、App健康。"""
async def _get_today_stats(self, site_id: int | None) -> dict:
"""今日调用次数、成功率、Token 消耗、平均延迟。"""
async def _get_7d_trend(self, site_id: int | None) -> list[dict]:
"""近 7 天按日聚合。"""
async def _get_app_distribution(self, site_id: int | None) -> list[dict]:
"""各 App 调用占比。"""
async def _get_app_health(self, site_id: int | None) -> list[dict]:
"""各 App 最近一次调用状态。"""
# ---- 调度任务 ----
async def list_trigger_jobs(self, filters: dict, page: int, page_size: int) -> dict:
"""分页查询 ai_trigger_jobs + 今日去重统计。"""
async def get_trigger_job(self, job_id: int) -> dict:
"""单条调度任务详情。"""
async def retry_trigger_job(self, job_id: int) -> int:
"""手动重跑:创建新 trigger_jobis_forced=true调用 dispatcher 异步执行。返回新 job_id。"""
# ---- 调用记录 ----
async def list_run_logs(self, filters: dict, page: int, page_size: int) -> dict:
"""分页查询 ai_run_logs。"""
async def get_run_log(self, log_id: int) -> dict:
"""单条调用记录详情(含完整 prompt/response"""
# ---- 缓存管理 ----
async def invalidate_cache(self, site_id: int, app_type: str | None, member_id: int | None) -> int:
"""批量缓存失效,返回受影响记录数。"""
# ---- Token 预算 ----
async def get_budget(self) -> dict:
"""Token 预算使用情况。"""
# ---- 批量执行 ----
async def estimate_batch(self, app_types: list[str], member_ids: list[int], site_id: int) -> dict:
"""计算预估,生成 batch_id 存入内存TTL 10min"""
async def confirm_batch(self, batch_id: str) -> None:
"""确认批量执行,取出 batch_id 参数,异步执行。"""
def _cleanup_expired_batches(self) -> None:
"""清理过期的 batch_id。"""
# ---- 告警管理 ----
async def list_alerts(self, alert_status: str | None, site_id: int | None, page: int, page_size: int) -> dict:
"""告警列表ai_run_logs WHERE status IN ('failed','timeout','circuit_open')。"""
async def ack_alert(self, log_id: int) -> str:
"""确认告警UPDATE ai_run_logs SET alert_status='acknowledged'"""
async def ignore_alert(self, log_id: int) -> str:
"""忽略告警UPDATE ai_run_logs SET alert_status='ignored'"""
```
#### 4. 数据清理服务(新建)
文件:`apps/backend/app/services/ai/cleanup_service.py`
```python
class AICleanupService:
"""AI 数据清理服务,由定时任务调用。"""
RETENTION_DAYS = 90
CACHE_LIMIT_PER_APP = 20_000
CACHE_APP_TYPES = [
"app2_finance", "app3_clue", "app4_analysis", "app5_tactics",
"app6_note_analysis", "app7_customer_analysis", "app8_clue_consolidated",
]
async def run_cleanup(self) -> dict:
"""执行全部清理,返回各步骤删除记录数。"""
async def _cleanup_run_logs(self) -> int:
"""DELETE FROM ai_run_logs WHERE created_at < now() - 90 days。"""
async def _cleanup_trigger_jobs(self) -> int:
"""DELETE FROM ai_trigger_jobs WHERE created_at < now() - 90 days。"""
async def _cleanup_cache(self) -> dict[str, int]:
"""每个 App 类型保留最新 20,000 条,删除超出部分。"""
```
### 模块 B前端页面
#### 5. admin-web API 层(新建)
文件:`apps/admin-web/src/api/adminAI.ts`
```typescript
// Dashboard
export const getDashboard = (siteId?: number) =>
client.get('/api/admin/ai/dashboard', { params: { site_id: siteId } });
// 调度任务
export const getTriggerJobs = (params: TriggerJobQuery) =>
client.get('/api/admin/ai/trigger-jobs', { params });
export const getTriggerJobDetail = (id: number) =>
client.get(`/api/admin/ai/trigger-jobs/${id}`);
export const retryTriggerJob = (id: number) =>
client.post(`/api/admin/ai/trigger-jobs/${id}/retry`);
// 调用记录
export const getRunLogs = (params: RunLogQuery) =>
client.get('/api/admin/ai/run-logs', { params });
export const getRunLogDetail = (id: number) =>
client.get(`/api/admin/ai/run-logs/${id}`);
// 缓存管理
export const invalidateCache = (data: CacheInvalidateReq) =>
client.post('/api/admin/ai/cache/invalidate', data);
// Token 预算
export const getBudget = () =>
client.get('/api/admin/ai/budget');
// 批量执行
export const createBatchRun = (data: BatchRunReq) =>
client.post('/api/admin/ai/batch-run', data);
export const confirmBatchRun = (batchId: string) =>
client.post('/api/admin/ai/batch-run/confirm', { batch_id: batchId });
// 告警
export const getAlerts = (params: AlertQuery) =>
client.get('/api/admin/ai/alerts', { params });
export const ackAlert = (id: number) =>
client.post(`/api/admin/ai/alerts/${id}/ack`);
export const ignoreAlert = (id: number) =>
client.post(`/api/admin/ai/alerts/${id}/ignore`);
```
#### 6. 前端页面组件
文件:`apps/admin-web/src/pages/AIDashboard.tsx`(新建)
Dashboard 页面布局:
- 顶部:门店筛选 Select + 刷新按钮
- 第一行4 个 Statistic 卡片今日调用、成功率、Token 消耗、平均延迟)
- 第二行左7 天趋势折线图Ant Design Charts / 内联 SVG
- 第二行右App 调用占比饼图
- 第三行左Token 预算进度条(日/月各一个 Progress
- 第三行右App 健康状态列表Badge 标记状态)
- 第四行:告警列表 Table + App 配置信息(只读 Descriptions
文件:`apps/admin-web/src/pages/AITriggerJobs.tsx`(新建)
调度状态页面布局:
- 顶部筛选器行event_type Select + status Select + site_id Select + DatePicker.RangePicker
- 统计行:今日去重跳过数 Statistic
- 主体Table 分页表格(事件类型、会员、状态 Tag、执行链、耗时、操作列
- 操作列:查看详情 → Modal、手动重跑 → Popconfirm
文件:`apps/admin-web/src/pages/AIRunLogs.tsx`(新建)
调用明细页面布局:
- 顶部筛选器行app_type + status + trigger_type + site_id + DatePicker.RangePicker
- 主体Table 分页表格app_type、trigger_type、member_id、tokens、延迟、状态 Tag
- 点击行Drawer 抽屉展示完整 prompt代码块、完整 response代码块、error_message
文件:`apps/admin-web/src/pages/AIOperations.tsx`(新建)
手动操作页面布局4 个 Card 区域):
- Card 1 — 手动重跑App Select + member_id Input + site_id Select + 执行 Button
- Card 2 — 缓存失效app_type Select + member_id Input + site_id Select + 失效 Button + 结果 message
- Card 3 — 批量执行app_types Checkbox.Group + member_ids TextArea + site_id Select → 预估 → 确认弹窗 Modal
- Card 4 — 告警管理Table告警列表+ 每行"确认"/"忽略" Button
### 模块 D回填脚本
#### 7. ai_backfill.py新建
文件:`scripts/ops/ai_backfill.py`
```python
"""历史数据回填脚本。
用法:
cd C:\\Project\\NeoZQYY
uv run python scripts/ops/ai_backfill.py [--dry-run] [--batch-size 10] [--interval 5]
功能:
1. 查询半年内活跃会员(消费/备注/任务变更三者并集)
2. 分批执行 AI 调用链(每批 10 人,间隔 5 秒)
3. 断点续跑(已完成 member_id 记录到本地文件)
4. App8 写 member_retention_clueDELETE + INSERT 事务)
"""
SITE_ID = 2790685415443269
CHECKPOINT_FILE = "scripts/ops/_ai_backfill_checkpoint.json"
BATCH_SIZE = 10
BATCH_INTERVAL = 5 # 秒
DATE_FROM = "2025-09-21"
DATE_TO = "2026-03-21"
class AIBackfillRunner:
def __init__(self, dry_run: bool = False):
self.dry_run = dry_run
self.completed: set[int] = set()
def load_checkpoint(self) -> None:
"""从本地文件加载已完成的 member_id 集合。"""
def save_checkpoint(self) -> None:
"""保存已完成的 member_id 到本地文件。"""
async def query_active_members(self) -> list[int]:
"""查询半年内活跃会员(消费 备注 任务变更)。"""
async def run_member(self, member_id: int) -> None:
"""对单个会员执行调用链:
App3→App8→App7
如有助教关联→App4→App5
如有备注→App6→App8再次整合
验证 App5 输出包含分类字段P2-1 修复)。
"""
async def run(self) -> None:
"""主入口:加载断点 → 查询会员 → 分批执行 → 保存断点。"""
def print_estimate(self, member_count: int) -> None:
"""输出预估信息:会员数、预估调用次数、预估 Token 消耗。"""
```
## 数据模型
### 已有表变更
#### biz.ai_run_logs — 新增字段P15
```sql
ALTER TABLE biz.ai_run_logs
ADD COLUMN alert_status VARCHAR(20) DEFAULT NULL
CHECK (alert_status IN ('pending', 'acknowledged', 'ignored'));
COMMENT ON COLUMN biz.ai_run_logs.alert_status IS
'P15 新增 — 告警处理状态NULL非告警记录/ pending待处理/ acknowledged已确认/ ignored已忽略'
'仅 status IN (''failed'',''timeout'',''circuit_open'') 的记录才设置此字段。';
-- 告警列表查询索引(筛选失败/超时/熔断 + alert_status
CREATE INDEX idx_ai_run_logs_alert
ON biz.ai_run_logs(alert_status, created_at DESC)
WHERE status IN ('failed', 'timeout', 'circuit_open');
```
设计决策:在 `ai_run_logs` 上新增 `alert_status` 字段而非创建独立告警表。理由:
- 告警来源就是 `ai_run_logs` 中的失败/超时/熔断记录1:1 关系
- 独立表需要维护外键和数据同步,增加复杂度
- 单字段方案查询简单(一次 WHERE 即可),无需 JOIN
- 绝大多数记录(成功调用)的 `alert_status` 为 NULL不占索引空间部分索引
#### biz.ai_run_logs — 新增 BRIN 索引P15
```sql
-- Dashboard 聚合查询优化BRIN 索引适合按时间顺序插入的表
CREATE INDEX idx_ai_run_logs_created_brin
ON biz.ai_run_logs USING BRIN (created_at)
WITH (pages_per_range = 32);
-- 可选:删除原有 B-tree 索引(如果 BRIN 索引足够覆盖查询)
-- DROP INDEX IF EXISTS biz.idx_ai_run_logs_created;
```
注意:保留原有 `idx_ai_run_logs_created` B-tree 索引,因为 Token 预算聚合BudgetTracker需要精确的等值/范围查询BRIN 索引用于 Dashboard 的大范围时间聚合。
### 迁移脚本
文件:`db/zqyy_app/migrations/2026-03-23__p15_ai_monitoring.sql`
```sql
-- P15AI 监控后台 — ai_run_logs 新增 alert_status 字段 + BRIN 索引
-- 依赖P14 迁移2026-03-22__p14_ai_module.sql
BEGIN;
-- 1. 新增 alert_status 字段
ALTER TABLE biz.ai_run_logs
ADD COLUMN IF NOT EXISTS alert_status VARCHAR(20) DEFAULT NULL;
ALTER TABLE biz.ai_run_logs
ADD CONSTRAINT chk_ai_run_logs_alert_status
CHECK (alert_status IS NULL OR alert_status IN ('pending', 'acknowledged', 'ignored'));
COMMENT ON COLUMN biz.ai_run_logs.alert_status IS
'P15 — 告警处理状态NULL/pending/acknowledged/ignored';
-- 2. 告警查询部分索引
CREATE INDEX IF NOT EXISTS idx_ai_run_logs_alert
ON biz.ai_run_logs(alert_status, created_at DESC)
WHERE status IN ('failed', 'timeout', 'circuit_open');
-- 3. BRIN 索引Dashboard 聚合优化)
CREATE INDEX IF NOT EXISTS idx_ai_run_logs_created_brin
ON biz.ai_run_logs USING BRIN (created_at)
WITH (pages_per_range = 32);
-- 4. 回填已有失败记录的 alert_status
UPDATE biz.ai_run_logs
SET alert_status = 'pending'
WHERE status IN ('failed', 'timeout', 'circuit_open')
AND alert_status IS NULL;
COMMIT;
```
### 回滚脚本
```sql
-- P15 回滚
BEGIN;
DROP INDEX IF EXISTS biz.idx_ai_run_logs_created_brin;
DROP INDEX IF EXISTS biz.idx_ai_run_logs_alert;
ALTER TABLE biz.ai_run_logs DROP CONSTRAINT IF EXISTS chk_ai_run_logs_alert_status;
ALTER TABLE biz.ai_run_logs DROP COLUMN IF EXISTS alert_status;
COMMIT;
```
### 数据保留策略汇总
| 数据类型 | 保留策略 | 清理方式 |
|---------|---------|---------|
| App1 对话记录(`ai_conversations` + `ai_messages` | 永久保留 | 不清理 |
| App2~8 缓存(`ai_cache` | 每个 App 保留最新 20,000 条 | 定时任务:按 `created_at` 排序,超出部分 DELETE |
| AI 运行记录(`ai_run_logs` | 保留 90 天 | 定时任务:`DELETE WHERE created_at < now() - interval '90 days'` |
| 调度记录(`ai_trigger_jobs` | 保留 90 天 | 同上 |
清理定时任务:每日凌晨 03:00 执行,通过 `scheduled_tasks` 表注册。
## 正确性属性
*属性Property是在系统所有合法执行路径上都应成立的特征或行为——本质上是对"系统应该做什么"的形式化陈述。属性是人类可读规格说明与机器可验证正确性保证之间的桥梁。*
> **与 P14 属性测试的关系**P14 已实现 20 个属性测试(见 `.kiro/specs/P14-ai-dashscope-migration/design.md`。P15 的 7 个 PRD 属性测试中5 个是对 P14 已有测试的更新/增强(标注 `P14 更新`2 个是 P15 全新属性。此外 P15 新增 7 个监控后台相关属性。
---
### PRD §3.3 指定的 7 个属性测试
### Property 1: 熔断器状态机 — 状态转换合法性P14 Property 6 更新)
*对于任意* app_id 和任意成功/失败事件序列,熔断器状态转换应满足:连续 5 次失败→OPENOPEN 经过 recovery_timeout→HALF_OPENHALF_OPEN + 成功→CLOSEDHALF_OPEN + 失败→OPEN。任意操作序列不产生非法状态即状态值始终为 closed/open/half_open 之一)。
**Validates: Requirements C3.1**
> P15 更新点:增强测试覆盖,确保 P14 迁移后熔断器在新 DashScope 调用链中行为一致。
### Property 2: Token 预算计算 — 日/月聚合不变量P14 Property 8 更新)
*对于任意* 一组 `ai_run_logs` 记录(含 `tokens_used``created_at`),日聚合值应等于当日所有 `status='success'` 记录的 `tokens_used` 之和月聚合值应等于当月之和。Dashboard API 和 Budget API 返回的聚合值应与直接 SQL 聚合结果一致。
**Validates: Requirements C3.2, A6.2**
> P15 更新点:验证 AdminAIService 的聚合查询与 BudgetTracker 的聚合结果一致。
### Property 3: 去重逻辑 — 唯一性不变量P14 Property 12 更新)
*对于任意* 两个具有相同 `(event_type, member_id, site_id, date)` 的自动触发事件,第二个应被跳过(`skipped_duplicate`);但当 `is_forced=true` 时(如 admin-web 手动重跑),即使存在重复也应正常执行。
**Validates: Requirements C3.3, A3.2**
> P15 更新点:验证 admin-web 手动重跑is_forced=true正确绕过去重检查。
### Property 4: 缓存过期 — expires_at 过期状态一致性P15 新增)
*对于任意* `ai_cache` 记录,当 `expires_at < now()` 时,该记录的 `status` 应为 `expired`(由定时标记或查询时懒标记)。查询缓存时,过期记录不应被返回为有效缓存。
**Validates: Requirements C3.4**
### Property 5: App8 幂等 — member_retention_clue 唯一性P14 Property 13 更新)
*对于任意* member_id 和日期,多次执行 App8 写入 `member_retention_clue` 后,该 member 该天的记录数应恒为 1DELETE + INSERT 事务保证)。
**Validates: Requirements C3.5**
> P15 更新点:验证回填脚本中的 App8 写入同样满足幂等性。
### Property 6: session_id 重建 — round-trip 属性P15 新增)
*对于任意* App1 对话(含本地 `ai_messages` 记录),当百炼 session_id 过期后重建会话时,重建后的 messages 数组应与本地 `ai_messages` 表中的记录一致(内容和顺序均匹配)。
**Validates: Requirements C3.6**
### Property 7: 限流计数 — 窗口内请求数上限P14 Property 7 更新)
*对于任意* 用户 ID 或门店 ID在滑动窗口内App1: 60 秒/10 次App2~8: 3600 秒/100 次)请求次数未超过阈值时应允许,超过阈值时应拒绝;窗口外的历史请求不影响当前判断。
**Validates: Requirements C3.7**
> P15 更新点:确保限流器在高并发回填场景下行为正确。
---
### P15 监控后台新增属性
### Property 8: Dashboard 聚合正确性
*对于任意* 一组 `ai_run_logs` 记录和任意 `site_id` 筛选条件Dashboard API 返回的统计数据应满足:
- `today_calls` = 今日记录总数
- `today_success_rate` = 今日 success 记录数 / 今日记录总数
- `today_tokens` = 今日所有记录的 `tokens_used` 之和
- `today_avg_latency_ms` = 今日所有记录的 `latency_ms` 平均值
- `trend_7d` 中每天的 `calls``success_rate` 与按日分组聚合结果一致
- `app_distribution` 中各 App 的 `count` 之和等于总调用数,`percentage` 之和约等于 1.0
**Validates: Requirements A1.1, A1.2, A1.3, A1.6**
### Property 9: 告警筛选与状态转换
*对于任意* 一组 `ai_run_logs` 记录,告警列表应仅包含 `status IN ('failed', 'timeout', 'circuit_open')` 的记录。*对于任意* 告警记录,执行 ack 操作后 `alert_status` 应为 `acknowledged`,执行 ignore 操作后应为 `ignored``alert_status` 的值域始终为 `{NULL, 'pending', 'acknowledged', 'ignored'}`
**Validates: Requirements A8.1, A8.2, A8.3, A8.4**
### Property 10: 缓存失效精确性
*对于任意* `ai_cache` 数据集和任意筛选条件组合(`site_id` 必填,`app_type``member_id` 可选),执行缓存失效后:匹配条件的记录 `status` 应为 `invalidated`,不匹配条件的记录 `status` 应保持不变。返回的 `affected_count` 应等于实际被更新的记录数。
**Validates: Requirements A5.1, A5.2, A5.3**
### Property 11: 批量执行预估公式
*对于任意* `app_types` 列表和 `member_ids` 列表,`estimated_calls` 应等于 `len(app_types) × len(member_ids)``estimated_tokens` 应等于 `estimated_calls × avg_tokens_per_call`(其中 `avg_tokens_per_call` 为系统配置常量,默认 2000
**Validates: Requirements A7.1, A7.2**
### Property 12: 分页与筛选正确性
*对于任意* `ai_run_logs``ai_trigger_jobs` 数据集、任意筛选条件组合和分页参数page, page_size返回的 `items` 应满足:
- 所有 item 均匹配所有指定的筛选条件
- `len(items) <= page_size`
- `total` 等于匹配筛选条件的总记录数
- 当指定 `site_id` 时,所有 item 的 `site_id` 均等于指定值
**Validates: Requirements A2.1, A2.2, A4.1, A4.2, A1.6**
### Property 13: 回填断点续跑 round-trip
*对于任意* 已完成的 member_id 集合,保存到 checkpoint 文件后重新加载,得到的集合应与原始集合完全一致(序列化/反序列化 round-trip
**Validates: Requirements D1.4**
### Property 14: 数据保留不变量
*对于任意* 数据库状态,执行清理任务后应满足:
- `ai_run_logs` 中不存在 `created_at < now() - 90 days` 的记录
- `ai_trigger_jobs` 中不存在 `created_at < now() - 90 days` 的记录
- 每个 App 类型App2~8`ai_cache` 记录数不超过 20,000
- `ai_conversations``ai_messages` 记录数不减少(永久保留)
**Validates: Requirements E1.1, E1.2, E1.3, E1.4**
## 错误处理
### Admin AI API 错误处理
| 场景 | HTTP 状态码 | 响应 |
|------|------------|------|
| JWT Token 缺失或无效 | 401 | `{"code": 401, "message": "未授权"}` |
| 调度任务不存在 | 404 | `{"code": 404, "message": "调度任务不存在"}` |
| 调用记录不存在 | 404 | `{"code": 404, "message": "调用记录不存在"}` |
| 告警记录不存在或非告警状态 | 404 | `{"code": 404, "message": "告警记录不存在"}` |
| batch_id 无效或已过期 | 400 | `{"code": 400, "message": "批量执行请求无效或已过期"}` |
| batch_id 已确认(重复确认) | 409 | `{"code": 409, "message": "批量执行已确认"}` |
| 缓存失效缺少 site_id | 422 | `{"code": 422, "message": "site_id 为必填参数"}` |
| 筛选参数格式错误(日期等) | 422 | `{"code": 422, "message": "参数格式错误: ..."}` |
| Dashboard 聚合查询超时 | 504 | `{"code": 504, "message": "查询超时,请缩小时间范围"}` |
| 数据库连接失败 | 500 | `{"code": 500, "message": "服务内部错误"}` |
### 回填脚本错误处理
| 场景 | 处理方式 |
|------|---------|
| 单个会员调用链失败 | 记录错误日志,跳过该会员,继续下一个;不写入 checkpoint |
| App8 写入 member_retention_clue 失败 | 事务回滚,记录错误到 ai_trigger_jobs |
| 数据库连接中断 | 保存当前 checkpoint退出脚本下次从断点继续 |
| App5 输出缺少分类字段 | 记录警告日志,不中断链路 |
| 预算超限 | 保存 checkpoint输出提示信息退出脚本 |
### 清理任务错误处理
| 场景 | 处理方式 |
|------|---------|
| 单步清理失败(如 ai_run_logs 删除失败) | 记录错误日志,继续执行后续步骤 |
| 清理过程中数据库锁等待超时 | 设置 `statement_timeout = 300000`5 分钟),超时后跳过 |
| 清理任务整体失败 | 记录错误到 scheduled_tasks 的执行历史,下次定时触发重试 |
### 降级策略
- Dashboard 聚合查询超时:前端展示"数据加载中",建议缩小时间范围
- 批量执行中某个会员失败:跳过该会员,继续执行其余会员,最终返回成功/失败统计
- 告警 ack/ignore 失败:前端提示"操作失败,请重试"
## 测试策略
### 属性测试Property-Based Testing
使用 `hypothesis` 库(项目已有依赖),每个属性测试最少运行 100 次迭代。
测试文件位于 `tests/` 目录Monorepo 级属性测试),按组件分文件:
| 测试文件 | 覆盖属性 | P14/P15 关系 |
|---------|---------|-------------|
| `tests/test_circuit_breaker_props.py` | Property 1熔断器状态机 | P14 Property 6 更新 |
| `tests/test_budget_tracker_props.py` | Property 2Token 预算聚合) | P14 Property 8 更新 |
| `tests/test_dispatcher_props.py` | Property 3去重唯一性 | P14 Property 12 更新 |
| `tests/test_cache_service_props.py` | Property 4缓存过期一致性 | P15 新增 |
| `tests/test_app8_idempotent.py` | Property 5App8 幂等) | P14 Property 13 更新 |
| `tests/test_session_props.py` | Property 6session_id 重建 round-trip | P15 新增 |
| `tests/test_rate_limiter_props.py` | Property 7限流窗口 | P14 Property 7 更新 |
| `tests/test_admin_ai_dashboard_props.py` | Property 8Dashboard 聚合) | P15 新增 |
| `tests/test_admin_ai_alert_props.py` | Property 9告警筛选与状态 | P15 新增 |
| `tests/test_admin_ai_cache_props.py` | Property 10缓存失效精确性 | P15 新增 |
| `tests/test_admin_ai_batch_props.py` | Property 11批量预估公式 | P15 新增 |
| `tests/test_admin_ai_pagination_props.py` | Property 12分页与筛选 | P15 新增 |
| `tests/test_backfill_props.py` | Property 13断点续跑 round-trip | P15 新增 |
| `tests/test_ai_cleanup_props.py` | Property 14数据保留不变量 | P15 新增 |
每个属性测试必须包含注释标签:
```python
# Feature: ai-monitoring-testing, Property 1: 熔断器状态机 — 状态转换合法性
@given(st.lists(st.sampled_from(["success", "failure"]), min_size=1, max_size=50))
@settings(max_examples=100)
def test_circuit_breaker_state_machine(events):
...
```
### 单元测试
单元测试覆盖属性测试不适合的场景(具体示例、集成点、边界条件):
| 测试文件 | 覆盖内容 |
|---------|---------|
| `apps/backend/tests/unit/test_admin_ai_router.py` | 13 个 API 端点的请求/响应格式、JWT 认证拒绝、参数校验 |
| `apps/backend/tests/unit/test_admin_ai_service.py` | AdminAIService 各方法的边界条件(空数据集、单条记录等) |
| `apps/backend/tests/unit/test_ai_cleanup_service.py` | 清理服务的边界条件(无数据、刚好 20000 条、跨日边界) |
| `apps/backend/tests/unit/test_backfill_script.py` | 回填脚本的 checkpoint 文件读写、空会员列表、批次划分 |
### 全链路测试(模块 C2
文件:`apps/backend/tests/integration/test_ai_full_chain.py`
15 个场景的全链路测试,支持 Mock 模式和真实 API 模式双轨:
```python
# 通过环境变量切换模式
AI_TEST_MODE = os.environ.get("AI_TEST_MODE", "mock") # mock / real
# Mock 模式mock DashScopeClient.call_app() 返回预设 JSON
# Real 模式:调用真实百炼 API需要 DASHSCOPE_API_KEY
```
| 测试函数 | 场景 | 模式 |
|---------|------|------|
| `test_app1_10_entries` | App1 对话 10 种入口 | Mock + Real |
| `test_app2_pregenerate` | App2 定时预生成 8 维度 | Mock + Real |
| `test_consumption_chain` | 消费事件链 App3→App8→App7 | Mock + Real |
| `test_coach_consumption_chain` | 助教消费链 App3→App8→App7→App4→App5 | Mock + Real |
| `test_note_chain` | 备注事件链 App6→App8 | Mock + Real |
| `test_task_chain` | 任务分配链 App4→App5 | Mock + Real |
| `test_cache_hit` | 缓存命中 | Mock |
| `test_cache_invalidation` | 缓存失效后重新生成 | Mock |
| `test_circuit_breaker_flow` | 熔断触发→半开→恢复 | Mock |
| `test_budget_exceeded` | Token 预算超限降级 | Mock |
| `test_failure_logging` | 失败记录完整性 | Mock |
| `test_admin_visibility` | admin-web API 可见性 | Mock |
| `test_json_fallback` | JSON 兜底重试 3 次 | Mock |
| `test_app8_idempotent` | App8 幂等验证 | Mock |
| `test_content_quality` | 内容质量(必要字段+格式) | Mock + Real |
### 测试配置
```python
# conftest.py 中的 hypothesis 配置
from hypothesis import settings
settings.register_profile("ci", max_examples=200, deadline=10000)
settings.register_profile("dev", max_examples=100, deadline=5000)
```
### 测试依赖
- 属性测试:`hypothesis`(已有)
- Mock`unittest.mock`(标准库)
- DashScope 调用mock `Application.call()`,不依赖真实 API
- 数据库:使用 `test_zqyy_app` 测试库(遵循 `testing-env.md` 规范)
- 全链路测试:`AI_TEST_MODE` 环境变量控制 mock/real 模式
## 涉及文件汇总
### 模块 A后端 API新增
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `apps/backend/app/routers/admin_ai.py` | 新建 | 13 个 admin AI API 端点 |
| `apps/backend/app/schemas/admin_ai.py` | 新建 | 请求/响应 Pydantic Schema |
| `apps/backend/app/services/ai/admin_service.py` | 新建 | AdminAIService 聚合服务 |
| `apps/backend/app/services/ai/cleanup_service.py` | 新建 | AICleanupService 数据清理 |
| `apps/backend/app/main.py` | 修改 | 注册 admin_ai.router |
### 模块 B前端页面新增
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `apps/admin-web/src/api/adminAI.ts` | 新建 | Admin AI API 调用封装 |
| `apps/admin-web/src/pages/AIDashboard.tsx` | 新建 | AI 运行总览页面 |
| `apps/admin-web/src/pages/AITriggerJobs.tsx` | 新建 | 调度状态页面 |
| `apps/admin-web/src/pages/AIRunLogs.tsx` | 新建 | 调用明细页面 |
| `apps/admin-web/src/pages/AIOperations.tsx` | 新建 | 手动操作页面 |
| `apps/admin-web/src/App.tsx` | 修改 | 新增 4 个路由 + 侧边栏菜单项 |
### 模块 C测试体系归档 + 新增)
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `scripts/ops/_archived/ai_full_chain_test.py` | 移入 | 从 `scripts/ops/` 归档 |
| `scripts/ops/_archived/test_chat_e2e.py` | 移入 | 从 `scripts/ops/` 归档 |
| `scripts/ops/_archived/test_chat_ai_quality.py` | 移入 | 从 `scripts/ops/` 归档 |
| `scripts/ops/_archived/test_bailian_apps.py` | 移入 | 从 `scripts/ops/` 归档 |
| `scripts/ops/_archived/test_bailian_single.py` | 移入 | 从 `scripts/ops/` 归档 |
| `scripts/ops/_archived/test_bailian_full.py` | 移入 | 从 `scripts/ops/` 归档 |
| `scripts/ops/_archived/_run_ai_tests_remaining.py` | 移入 | 从 `scripts/ops/` 归档 |
| `apps/backend/tests/_archived/test_ai_bailian.py` | 移入 | 从 `apps/backend/tests/` 归档 |
| `docs/reports/_archived/2026-03-21__ai_full_chain_test.md` | 移入 | 从 `docs/reports/` 归档 |
| `apps/backend/tests/integration/test_ai_full_chain.py` | 新建 | 15 场景全链路测试 |
| `tests/test_circuit_breaker_props.py` | 修改 | Property 1 更新 |
| `tests/test_budget_tracker_props.py` | 修改 | Property 2 更新 |
| `tests/test_dispatcher_props.py` | 修改 | Property 3 更新 |
| `tests/test_cache_service_props.py` | 修改 | Property 4 新增 |
| `tests/test_app8_idempotent.py` | 修改 | Property 5 更新 |
| `tests/test_session_props.py` | 修改 | Property 6 新增 |
| `tests/test_rate_limiter_props.py` | 修改 | Property 7 更新 |
| `tests/test_admin_ai_dashboard_props.py` | 新建 | Property 8 |
| `tests/test_admin_ai_alert_props.py` | 新建 | Property 9 |
| `tests/test_admin_ai_cache_props.py` | 新建 | Property 10 |
| `tests/test_admin_ai_batch_props.py` | 新建 | Property 11 |
| `tests/test_admin_ai_pagination_props.py` | 新建 | Property 12 |
| `tests/test_backfill_props.py` | 新建 | Property 13 |
| `tests/test_ai_cleanup_props.py` | 新建 | Property 14 |
### 模块 D回填脚本新增
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `scripts/ops/ai_backfill.py` | 新建 | 历史数据回填脚本 |
### 模块 E数据保留新增
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `apps/backend/app/services/ai/cleanup_service.py` | 新建 | 同模块 A清理服务 |
### 模块 F查询优化与收尾
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `db/zqyy_app/migrations/2026-03-23__p15_ai_monitoring.sql` | 新建 | DDL 迁移alert_status + BRIN 索引) |
| `docs/database/ddl/zqyy_app__biz.sql` | 修改 | 合并 P15 DDL 到基线 |
| `docs/database/BD_Manual_ai_tables.md` | 修改 | 补充 alert_status 字段 + admin API 查询模式 |
| `apps/admin-web/README.md` | 修改 | 新增 AI 监控页面说明 |
| `apps/backend/README.md` | 修改 | 新增 admin AI API 说明 |
| `docs/DOCUMENTATION-MAP.md` | 修改 | 新增 P15 相关条目 |
### 单元测试(新增)
| 文件路径 | 操作 | 说明 |
|---------|------|------|
| `apps/backend/tests/unit/test_admin_ai_router.py` | 新建 | API 端点单元测试 |
| `apps/backend/tests/unit/test_admin_ai_service.py` | 新建 | AdminAIService 单元测试 |
| `apps/backend/tests/unit/test_ai_cleanup_service.py` | 新建 | 清理服务单元测试 |
| `apps/backend/tests/unit/test_backfill_script.py` | 新建 | 回填脚本单元测试 |