Files
Neo-ZQYY/scripts/ab_test_app2_prompt.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

224 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""App2 财务洞察 system prompt A/B 测试脚本。
流程:
- 对同一 payload 连续调用百炼 N 次(默认 10 次),绕过 AI cache
- 存档每次原始 JSON 到 export/ai-ab-test/round_<label>/
- 输出稳定性汇总长度分布、12 条齐整率、三色灯分布、加粗使用、关键字段命中率
用法:
# Round A当前百炼上的 system prompt调用前用户已确认未替换
PYTHONIOENCODING=utf-8 .venv/Scripts/python.exe scripts/ab_test_app2_prompt.py --label a --rounds 10
# Round B用户替换为 v4 concise 后执行
PYTHONIOENCODING=utf-8 .venv/Scripts/python.exe scripts/ab_test_app2_prompt.py --label b --rounds 10
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
import time
from pathlib import Path
# Put the backend source tree first on sys.path so the `app.*` imports below resolve.
# NOTE(review): the path is relative, so this presumably requires running from the
# repository root (as the usage examples do) — confirm.
sys.path.insert(0, 'apps/backend')
from dotenv import load_dotenv
# Load `.env` from the current working directory before the backend modules are imported.
# NOTE(review): presumably these variables feed AIConfig.from_env() — verify against app.ai.config.
load_dotenv(dotenv_path=os.path.join(os.getcwd(), '.env'))
from app.ai.config import AIConfig
from app.ai.dashscope_client import DashScopeClient
from app.ai.prompts.app2_finance_prompt import build_prompt
# Fixed payload passed to build_prompt() as `site_id`; identical for every round.
SITE_ID = 2790685415443269
# Passed to build_prompt() as `time_dimension` and recorded in each round's archived meta.
TIME_DIMENSION = 'this_month'
# Passed to build_prompt() as `area` and recorded in each round's archived meta.
AREA = 'all'
# Root output directory (relative to the working directory); each run writes into
# OUT_ROOT / 'round_<label>'.
OUT_ROOT = Path('export/ai-ab-test')
async def run_one(client: DashScopeClient, app_id: str, prompt: str, round_idx: int) -> dict:
    """Invoke the Bailian app once and describe the outcome as a plain dict.

    Any ``Exception`` raised by the call is captured instead of propagated, so
    one failed round does not abort the caller's loop.

    Args:
        client: Object exposing ``await call_app(app_id=..., prompt=...)`` that
            returns a 3-tuple whose first two items are the parsed response
            and the token count.
        app_id: Application id forwarded to ``call_app``.
        prompt: Prompt text forwarded to ``call_app``.
        round_idx: Index of this round, echoed back in the result.

    Returns:
        A dict with keys ``ok``, ``round_idx``, ``duration_s`` (elapsed seconds
        rounded to 2 decimals), ``tokens``, ``parsed`` and ``error``. On
        failure ``tokens`` is 0, ``parsed`` is None and ``error`` is
        ``"<ExceptionType>: <message>"``.
    """
    started = time.monotonic()
    try:
        payload, token_count, _ = await client.call_app(app_id=app_id, prompt=prompt)
    except Exception as exc:
        succeeded, payload, token_count = False, None, 0
        failure = f'{type(exc).__name__}: {exc}'
    else:
        succeeded, failure = True, None
    elapsed = time.monotonic() - started
    return {
        'ok': succeeded,
        'round_idx': round_idx,
        'duration_s': round(elapsed, 2),
        'tokens': token_count,
        'parsed': payload,
        'error': failure,
    }
def classify_light(content: str) -> str:
    """Classify the traffic-light marker mentioned in *content*.

    Markers are tried in the order red, yellow, green, so text that mentions
    several lights is classified by the first of those that matches.

    Returns:
        ``'red'``, ``'yellow'`` or ``'green'``; ``'unknown'`` when no marker
        (emoji or the Chinese word for that light) is present.
    """
    # Order matters: it encodes the red > yellow > green precedence.
    markers = (
        ('red', r'🔴|红灯'),
        ('yellow', r'🟡|黄灯'),
        ('green', r'🟢|绿灯'),
    )
    return next(
        (label for label, pattern in markers if re.search(pattern, content)),
        'unknown',
    )
def analyze_insights(parsed: dict | None) -> dict:
    """Measure the quality of the insight list in one parsed model response.

    The response comes from a model and is treated as untrusted: a non-dict
    ``parsed``, a non-list ``insights`` value, non-dict list items and
    non-string ``content`` values are all tolerated instead of raising.

    Args:
        parsed: Decoded response, expected to be a dict holding an
            ``insights`` list of dicts with ``seq`` and ``content`` keys, or
            ``None`` when the call failed.

    Returns:
        A dict with the same keys in the same order on every path (callers
        that derive CSV columns from the first row therefore get a stable
        column order):

        - ``insights_count``: length of the list (non-dict items included).
        - ``has_12``: whether the list has exactly 12 items.
        - ``seq_complete``: whether the integer ``seq`` values are exactly 1..12.
        - ``light``: traffic-light class of the item with ``seq == 11``,
          ``'unknown'`` when there is no such item.
        - ``bold_count``: number of ``**...**`` spans across all items.
    """
    metrics = {
        'insights_count': 0,
        'has_12': False,
        'seq_complete': False,
        'light': 'unknown',
        'bold_count': 0,
    }
    if not isinstance(parsed, dict):
        return metrics
    insights = parsed.get('insights') or []
    if not isinstance(insights, list):
        return metrics

    def text_of(entry: dict) -> str:
        # A non-string content (number, list, ...) would make the regex calls raise TypeError.
        body = entry.get('content')
        return body if isinstance(body, str) else ''

    entries = [ins for ins in insights if isinstance(ins, dict)]
    metrics['insights_count'] = len(insights)
    metrics['has_12'] = len(insights) == 12

    # seq completeness: the integer seq values must be exactly 1..12.
    int_seqs = sorted(s for s in (e.get('seq') for e in entries) if isinstance(s, int))
    metrics['seq_complete'] = int_seqs == list(range(1, 13))

    # The traffic light is read from the item with seq == 11.
    seq11 = next((e for e in entries if e.get('seq') == 11), None)
    if seq11 is not None:
        metrics['light'] = classify_light(text_of(seq11))

    # Total number of bold spans (**...**) across all items.
    metrics['bold_count'] = sum(
        len(re.findall(r'\*\*[^*]+\*\*', text_of(e))) for e in entries
    )
    return metrics
def _next_round_index(out_dir: Path) -> tuple[int, int]:
    """Return ``(archive_count, next_index)`` for the ``round_NN.json`` files in *out_dir*.

    ``next_index`` is the highest existing index plus one (1 when there are
    none), so a gap in the existing files can never cause an archive to be
    overwritten.
    """
    taken = []
    for archive in out_dir.glob('round_*.json'):
        match = re.fullmatch(r'round_(\d+)', archive.stem)
        if match:
            taken.append(int(match.group(1)))
    return len(taken), max(taken, default=0) + 1


def _print_summary(label: str, requested: int, summary: list[dict]) -> None:
    """Print aggregate statistics over the successful rows of *summary*."""
    ok_rows = [r for r in summary if r['ok']]
    print()
    print(f'=== Round {label.upper()} 汇总({len(ok_rows)}/{requested} 成功)===')
    if not ok_rows:
        return
    durations = [r['duration_s'] for r in ok_rows]
    tokens = [r['tokens'] for r in ok_rows]
    has_12_rate = sum(1 for r in ok_rows if r['has_12']) / len(ok_rows)
    seq_complete_rate = sum(1 for r in ok_rows if r['seq_complete']) / len(ok_rows)
    bold_avg = sum(r['bold_count'] for r in ok_rows) / len(ok_rows)
    lights: dict[str, int] = {}
    for r in ok_rows:
        lights[r['light']] = lights.get(r['light'], 0) + 1
    print(f' 时长: min={min(durations):.1f}s / max={max(durations):.1f}s / avg={sum(durations)/len(durations):.1f}s')
    print(f' tokens: min={min(tokens)} / max={max(tokens)} / avg={sum(tokens)/len(tokens):.0f}')
    print(f' 12 条齐整率: {has_12_rate:.0%}')
    print(f' seq 1-12 完整率: {seq_complete_rate:.0%}')
    print(f' 平均加粗数: {bold_avg:.1f} 次/次')
    print(f' 三色灯分布: {lights}')


def _write_summary_csv(csv_path: Path, rows: list[dict], *, append: bool) -> bool:
    """Write *rows* to *csv_path*; return False (file untouched) when *rows* is empty.

    With ``append=True`` and an existing file, rows are appended without a
    header. If the existing header names the same columns, its order is
    reused so appended rows stay aligned with it. An existing but empty file
    is treated like a fresh one and gets a header.
    """
    import csv  # function-local import, as in the original script

    if not rows:
        # Nothing to record: do not truncate a summary left by an earlier run.
        return False
    fieldnames = list(rows[0].keys())
    mode = 'a' if append and csv_path.exists() else 'w'
    if mode == 'a':
        with csv_path.open('r', encoding='utf-8-sig', newline='') as f:
            header = next(csv.reader(f), None)
        if not header:
            mode = 'w'
        elif set(header) == set(fieldnames):
            fieldnames = header
    with csv_path.open(mode, encoding='utf-8-sig', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        if mode == 'w':
            writer.writeheader()
        writer.writerows(rows)
    return True


async def main() -> None:
    """Run one labelled A/B round: call the app repeatedly and archive the results.

    Steps:
      1. Parse ``--label``, ``--rounds``, ``--delay`` and ``--resume``.
      2. Build the prompt once from the module-level SITE_ID / TIME_DIMENSION /
         AREA and save a snapshot of it in the output directory.
      3. For each round, call the app via ``run_one``, analyse the response
         with ``analyze_insights``, write ``round_NN.json`` and print a
         one-line status. Rounds are separated by ``--delay`` seconds.
      4. Print aggregate statistics for this invocation's rounds and write
         (or, with ``--resume``, append to) ``_summary.csv``.

    With ``--resume`` numbering continues after the highest existing
    ``round_NN.json`` index, so earlier archives are never overwritten.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--label', required=True, help='测试轮次标识(a/b/v5 等)')
    parser.add_argument('--rounds', type=int, default=10, help='本次调用次数')
    parser.add_argument('--delay', type=float, default=1.5, help='每次调用间延时(秒),避免限流')
    parser.add_argument('--resume', action='store_true', help='断点续跑:起始 idx = 已有 round_*.json 数量 + 1')
    args = parser.parse_args()

    cfg = AIConfig.from_env()
    client = DashScopeClient(api_key=cfg.api_key, workspace_id=cfg.workspace_id)
    app_id = cfg.app_id_2_finance

    # Build the prompt once; every round reuses the same payload.
    prompt = await build_prompt({
        'site_id': SITE_ID,
        'time_dimension': TIME_DIMENSION,
        'area': AREA,
    })
    print(f'[setup] prompt 长度 = {len(prompt)} 字符')
    print(f'[setup] app_id = {app_id}')
    print(f'[setup] label = {args.label.upper()}, rounds = {args.rounds}, delay = {args.delay}s')
    print()

    out_dir = OUT_ROOT / f'round_{args.label}'
    out_dir.mkdir(parents=True, exist_ok=True)

    # Archive a pretty-printed snapshot of the prompt used for this round.
    # NOTE(review): assumes build_prompt returns a JSON string; json.loads raises otherwise — confirm.
    (out_dir / '_prompt_snapshot.json').write_text(
        json.dumps(json.loads(prompt), ensure_ascii=False, indent=2),
        encoding='utf-8',
    )

    # Resume: continue numbering after the highest archived round index.
    if args.resume:
        existing, start_idx = _next_round_index(out_dir)
        end_idx = start_idx + args.rounds - 1
        print(f'[resume] 已有 {existing} 份,本次追加 idx {start_idx}~{end_idx}')
    else:
        start_idx = 1
        end_idx = args.rounds

    summary: list[dict] = []
    for i in range(start_idx, end_idx + 1):
        print(f'[round {args.label.upper()} · {i:02d}/{end_idx}] 调用中...', end=' ', flush=True)
        result = await run_one(client, app_id, prompt, i)
        analysis = analyze_insights(result['parsed'])
        summary.append({
            'round_idx': i,
            'ok': result['ok'],
            'duration_s': result['duration_s'],
            'tokens': result['tokens'],
            'error': result['error'],
            **analysis,
        })

        # Archive this round: metadata plus analysis, followed by the raw parsed response.
        snapshot = {
            'meta': {
                'label': args.label,
                'round_idx': i,
                'time_dimension': TIME_DIMENSION,
                'area': AREA,
                'duration_s': result['duration_s'],
                'tokens': result['tokens'],
                'ok': result['ok'],
                'error': result['error'],
                **analysis,
            },
            'parsed': result['parsed'],
        }
        path = out_dir / f'round_{i:02d}.json'
        path.write_text(json.dumps(snapshot, ensure_ascii=False, indent=2), encoding='utf-8')

        if result['ok']:
            print(f'ok · {result["duration_s"]}s · tokens={result["tokens"]} · count={analysis["insights_count"]} · light={analysis["light"]} · bold={analysis["bold_count"]}')
        else:
            print(f'FAIL · {result["duration_s"]}s · {result["error"]}')

        # Pause between calls, but not after the last one.
        if i < end_idx:
            await asyncio.sleep(args.delay)

    # Aggregate statistics cover only the rounds executed in this invocation.
    _print_summary(args.label, args.rounds, summary)

    # Summary CSV: appended to in resume mode, otherwise rewritten with a header.
    csv_path = out_dir / '_summary.csv'
    csv_written = _write_summary_csv(csv_path, summary, append=args.resume)
    print(f'[done] 存档到: {out_dir}')
    if csv_written:
        print(f'[done] CSV 汇总: {csv_path}')
# Script entry point: only when executed directly, run the async main() to completion.
if __name__ == '__main__':
    asyncio.run(main())