"""App2a 区域财务洞察 system prompt 含金量评估脚本。
对 8 业态 × 3 轮 = 24 次百炼调用采样,验证 v1.2 system prompt 输出质量是否
达到 V5.1 全域版同等水准(店长视角:准确性/洞察深度/稳定性)。
用法:
PYTHONIOENCODING=utf-8 .venv/Scripts/python.exe scripts/ab_test_app2a_area.py
输出:
export/ai-ab-test/round_v1_app2a_area/{area}_round{N}.json # 24 个原始文件(每业态每轮一个)
export/ai-ab-test/round_v1_app2a_area/summary.json # 汇总报告
"""
from __future__ import annotations
import asyncio
import json
import os
import re
import sys
import time
from pathlib import Path
sys.path.insert(0, 'apps/backend')
from dotenv import load_dotenv
load_dotenv(dotenv_path=os.path.join(os.getcwd(), '.env'))
from app.ai.config import AIConfig
from app.ai.dashscope_client import DashScopeClient
from app.ai.prompts import build_app2a_area_prompt
# Site identifier sent to the prompt builder under the 'site_id' key.
SITE_ID = 2790685415443269
# Reporting window sent to the prompt builder under the 'time_dimension' key.
TIME_DIMENSION = 'this_month'
# Area keys to sample; each one is sent to the prompt builder under 'area'.
AREAS = ('hall', 'hallA', 'hallB', 'hallC', 'vip', 'snooker', 'mahjong', 'ktv')
# Number of calls issued for every entry in AREAS.
ROUNDS_PER_AREA = 3
# Directory that receives one JSON file per call plus summary.json.
OUT_DIR = Path('export/ai-ab-test/round_v1_app2a_area')
def classify_light(content: str) -> str:
    """Map a text to the traffic-light colour it mentions.

    The colours are probed in the fixed order red, yellow, green, so a text
    that mentions several colours is classified by the first one in that
    order.  Returns ``'unknown'`` when no colour marker is present.
    """
    ordered_patterns = (
        ('red', r'🔴|红灯'),
        ('yellow', r'🟡|黄灯'),
        ('green', r'🟢|绿灯'),
    )
    for colour, pattern in ordered_patterns:
        if re.search(pattern, content) is not None:
            return colour
    return 'unknown'
# Patterns used by analyze(); compiled once instead of once per insight.
_ALIGN_DAYS_RE = re.compile(r'当期\s*\d+\s*天')
_TREND_WORD_RE = re.compile(r'(下滑|下降|上升|提升|收缩|萎缩|承压|走弱|走强|加剧|恶化|显著|大幅)')
# Clause delimiters: ASCII ';', ideographic full stop, fullwidth ';' (U+FF1B).
_SENTENCE_SPLIT_RE = re.compile(r'[;。\uff1b]')
_NUMBER_ANCHOR_RE = re.compile(r'[-+]?\d+(?:\.\d+)?\s*(?:%|元)')
_FAKE_INDUSTRY_RE = re.compile(r'行业(均值|警戒线|标准)\s*\d')
_CADENCE_RE = re.compile(r'(每(周|日|月)|双周)')
_ACTION_RE = re.compile(r'(启动|暂停|停用|核查|调整|触发|输出|排查)')


def _normalize_seq(seq):
    """Return *seq* as an int when it is a numeric string, else unchanged."""
    if isinstance(seq, str):
        try:
            return int(seq.strip())
        except ValueError:
            return seq
    return seq


def analyze(parsed) -> dict:
    """Compute quality indicators for one parsed model response.

    Args:
        parsed: Either a list of insight items, or a dict whose ``'insights'``
            entry is such a list.  Items are expected to be dicts with
            ``'seq'`` and ``'content'`` entries; items of any other type, and
            ``content`` values that are not strings, are treated as empty
            text instead of raising.

    Returns:
        ``{'count': 0, 'valid': False, 'reasons': [...]}`` when no list can be
        found.  Otherwise a dict with:

        * ``count`` / ``valid``: number of items, and whether it is exactly 12.
        * ``h1_align_caliber``: some item states the comparison basis.
        * ``h2_no_bare_trend``: every clause containing a trend word also
          contains a number followed by ``%`` or ``元``.
        * ``h3_no_fake_industry``: no item quotes an industry figure.
        * ``h7_trait_ref``: some item mentions ``业态``.
        * ``seq11_light``: classify_light() of the item with seq 11, or
          ``None`` when there is no such item.
        * ``seq12_has_action``: the item with seq 12 names both a cadence and
          an action verb.
    """
    if not isinstance(parsed, list):
        parsed = parsed.get('insights') if isinstance(parsed, dict) else None
    if not isinstance(parsed, list):
        return {'count': 0, 'valid': False, 'reasons': ['未返回列表']}

    count = len(parsed)
    has_align = False            # H1: comparison basis stated
    has_number_in_trend = True   # H2: every trend word has a numeric anchor
    has_industry_fake = False    # H3: fabricated industry figure present
    has_trait_ref = False        # H7: business-type description referenced
    light = None
    tracking_has_action = False

    for ins in parsed:
        is_item = isinstance(ins, dict)
        raw_content = ins.get('content') if is_item else None
        # Model output is untrusted: a non-string content would make the
        # substring and regex checks below raise TypeError.
        content = raw_content if isinstance(raw_content, str) else ''
        # Accept "11" as well as 11 so a stringly-typed seq is still checked.
        seq = _normalize_seq(ins.get('seq')) if is_item else None

        if '对比口径' in content or '同天数对齐' in content or _ALIGN_DAYS_RE.search(content):
            has_align = True
        # '业态' is a substring of both '业态说明' and '业态特征'.
        if '业态' in content:
            has_trait_ref = True

        # H2: a clause that uses a trend word must carry its own number.
        if has_number_in_trend and any(
            _TREND_WORD_RE.search(clause) and not _NUMBER_ANCHOR_RE.search(clause)
            for clause in _SENTENCE_SPLIT_RE.split(content)
        ):
            has_number_in_trend = False

        # H3: "industry average / warning line / standard" followed by a digit.
        if _FAKE_INDUSTRY_RE.search(content):
            has_industry_fake = True

        if seq == 11:
            light = classify_light(content)
        if seq == 12 and _CADENCE_RE.search(content) and _ACTION_RE.search(content):
            tracking_has_action = True

    return {
        'count': count,
        'valid': count == 12,
        'h1_align_caliber': has_align,
        'h2_no_bare_trend': has_number_in_trend,
        'h3_no_fake_industry': not has_industry_fake,
        'h7_trait_ref': has_trait_ref,
        'seq11_light': light,
        'seq12_has_action': tracking_has_action,
    }
async def run_one(client, cfg, area: str, round_idx: int) -> dict:
    """Issue one app call for *area* and package the outcome as a record.

    The prompt is built before the timer starts, so ``duration_s`` covers only
    the ``client.call_app`` call.  Any exception raised by that call is
    captured in the record's ``error`` field rather than propagated.

    Args:
        client: Object exposing an awaitable ``call_app(app_id=..., prompt=...)``
            whose result unpacks into three values.
        cfg: Object exposing ``app_id_2a_finance_area``.
        area: Area key forwarded to the prompt builder.
        round_idx: Round number, copied into the record unchanged.

    Returns:
        A dict with the keys ``area``, ``round_idx``, ``ok``, ``duration_s``,
        ``tokens``, ``prompt_len``, ``parsed``, ``error`` and ``analysis``.
        ``analysis`` is ``None`` when ``parsed`` is falsy.
    """
    prompt_args = {
        'site_id': SITE_ID,
        'time_dimension': TIME_DIMENSION,
        'area': area,
    }
    prompt = await build_app2a_area_prompt(prompt_args)

    failure = None
    started = time.monotonic()
    try:
        response = await client.call_app(
            app_id=cfg.app_id_2a_finance_area,
            prompt=prompt,
        )
        parsed, tokens, _ = response
    except Exception as exc:
        # Keep the run going: a failed call becomes a record with ok=False.
        parsed, tokens = None, 0
        failure = f'{type(exc).__name__}: {exc}'
    elapsed = time.monotonic() - started

    analysis = analyze(parsed) if parsed else None
    return {
        'area': area,
        'round_idx': round_idx,
        'ok': failure is None,
        'duration_s': round(elapsed, 2),
        'tokens': tokens,
        'prompt_len': len(prompt),
        'parsed': parsed,
        'error': failure,
        'analysis': analysis,
    }
async def main():
    """Run every area/round combination, persist the records, print a summary.

    Each call's record is written to ``OUT_DIR/{area}_round{i}.json`` as soon
    as it completes; the aggregated report goes to ``OUT_DIR/summary.json``
    and is echoed to stdout.
    """
    cfg = AIConfig.from_env()
    client = DashScopeClient(api_key=cfg.api_key, workspace_id=cfg.workspace_id)
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    def analysis_of(record: dict) -> dict:
        # A failed or empty call stores analysis=None; read it as "no indicators".
        return record.get('analysis') or {}

    total = len(AREAS) * ROUNDS_PER_AREA
    schedule = [
        (area, idx)
        for area in AREAS
        for idx in range(1, ROUNDS_PER_AREA + 1)
    ]

    results = []
    for done, (area, idx) in enumerate(schedule, start=1):
        print(f'[{done:>2}/{total}] {area:8s} round {idx} ...', flush=True)
        record = await run_one(client, cfg, area, idx)

        record_path = OUT_DIR / f'{area}_round{idx}.json'
        with record_path.open('w', encoding='utf-8') as fh:
            json.dump(record, fh, ensure_ascii=False, indent=2, default=str)

        if record['ok']:
            status = 'OK'
        else:
            status = f"FAIL ({record['error']})"
        analysis = analysis_of(record)
        print(f' {status} · {record["duration_s"]}s · tokens={record["tokens"]} · 12 条={analysis.get("valid")} · 三色={analysis.get("seq11_light")}')
        results.append(record)

    # Aggregate report.
    success = [rec for rec in results if rec['ok']]
    divisor = max(len(success), 1)  # avoids dividing by zero when every call failed
    total_duration = sum(rec['duration_s'] for rec in success)
    total_tokens = sum(rec['tokens'] for rec in success)
    summary = {
        'total': total,
        'success': len(success),
        'failed': total - len(success),
        'avg_duration_s': round(total_duration / divisor, 2),
        'avg_tokens': int(total_tokens / divisor),
        'by_area': {},
        'hard_constraints_pass_rate': {},
    }

    # Per-area breakdown.
    light_buckets = ('red', 'yellow', 'green', 'unknown')
    for area in AREAS:
        area_results = [rec for rec in results if rec['area'] == area]
        ok_rounds = [rec for rec in area_results if rec['ok']]
        valid_12 = len([rec for rec in ok_rounds if analysis_of(rec).get('valid')])
        # A round without a seq-11 item yields None here and lands in no bucket.
        lights = [analysis_of(rec).get('seq11_light') for rec in ok_rounds]
        summary['by_area'][area] = {
            'rounds': len(area_results),
            'ok': len(ok_rounds),
            'valid_12_pct': round(100 * valid_12 / max(len(area_results), 1), 1),
            'light_distribution': {bucket: lights.count(bucket) for bucket in light_buckets},
        }

    # Hard-constraint pass rates, measured over successful calls that have an analysis.
    scored = [rec for rec in success if rec.get('analysis')]

    def pass_rate(key: str) -> float:
        if not scored:
            return 0.0
        hits = sum(bool(analysis_of(rec).get(key)) for rec in scored)
        return round(100 * hits / len(scored), 1)

    constraint_keys = (
        ('H1 对比口径显式', 'h1_align_caliber'),
        ('H2 无裸趋势词', 'h2_no_bare_trend'),
        ('H3 无编造行业数字', 'h3_no_fake_industry'),
        ('H7 引用业态特征', 'h7_trait_ref'),
        ('seq12 含触发动作', 'seq12_has_action'),
    )
    summary['hard_constraints_pass_rate'] = {
        label: pass_rate(key) for label, key in constraint_keys
    }

    summary_path = OUT_DIR / 'summary.json'
    with summary_path.open('w', encoding='utf-8') as fh:
        json.dump(summary, fh, ensure_ascii=False, indent=2, default=str)

    print()
    print('=' * 60)
    print(f'采样完成:{summary["success"]}/{summary["total"]} 成功')
    print(f'平均耗时:{summary["avg_duration_s"]}s · 平均 tokens:{summary["avg_tokens"]}')
    print()
    print('硬约束通过率:')
    for label, rate in summary['hard_constraints_pass_rate'].items():
        print(f' {label}: {rate}%')
    print()
    print('各业态 valid_12 率:')
    for area, info in summary['by_area'].items():
        print(f' {area:8s}: {info["valid_12_pct"]}% · 灯色 {info["light_distribution"]}')
# Script entry point: run the async main() on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(main())