Files
Neo-ZQYY/apps/backend/app/ai/prompts/app4_analysis_prompt.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

178 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""应用 4 关系分析 / 任务建议 Prompt 拼装。
助教被分配召回任务或参与新结算时触发。
- 数据源:fetch_assistant_info + fetch_service_history + fetch_member_consumption_data + fetch_member_notes
- 输出字段:task_description / action_suggestions / one_line_summary
- system prompt 在百炼控制台配置
返回:单个 prompt 字符串。
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import Any
from app.ai.cache_service import AICacheService
from app.ai.data_fetchers import (
fetch_assistant_info,
fetch_member_consumption_data,
fetch_member_notes,
fetch_service_history,
)
from app.ai.schemas import CacheTypeEnum
from app.services.runtime_context import as_runtime_business_now_str
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Soft upper bound, in characters, on the serialized prompt. _truncate_payload
# trims list fields when the JSON text is longer than this; it is not a hard cap.
_MAX_PROMPT_LEN = 8000
async def build_prompt(
    context: dict,
    cache_svc: AICacheService | None = None,
) -> str:
    """Build the App4 (relationship analysis / task suggestion) prompt string.

    Runs the four data fetchers concurrently, assembles their results plus the
    cached App8 reference data into one payload, and serializes that payload to
    JSON. A failed fetch never aborts the build: the affected section falls
    back to an empty value and a warning is recorded under ``_data_warnings``.

    Args:
        context: Mapping that must contain ``site_id``, ``assistant_id`` and
            ``member_id``.
        cache_svc: Cache service used to read reference history. When ``None``
            the ``reference`` section is an empty dict.

    Returns:
        The JSON-serialized prompt, passed through ``_truncate_payload``.

    Raises:
        KeyError: If ``context`` lacks one of the required keys.
    """
    site_id = context["site_id"]
    assistant_id = context["assistant_id"]
    member_id = context["member_id"]

    # return_exceptions=True keeps one failing fetcher from cancelling the rest;
    # each slot of ``outcomes`` is either the fetched value or the raised error.
    outcomes = await asyncio.gather(
        fetch_assistant_info(site_id, assistant_id),
        fetch_service_history(site_id, assistant_id, member_id),
        fetch_member_consumption_data(site_id, member_id),
        fetch_member_notes(site_id, member_id),
        return_exceptions=True,
    )
    info_result, history_result, consumption_result, notes_result = outcomes

    # Failures are detected with BaseException rather than Exception: a child
    # fetch that gets cancelled is reported by gather() as an
    # asyncio.CancelledError instance, which does not derive from Exception and
    # would otherwise be mistaken for real data.
    warnings: list[str] = []

    if isinstance(info_result, BaseException):
        assistant_info = {}
        warnings.append("助教信息获取失败")
        logger.warning("App4 助教信息获取失败: %s", info_result)
    else:
        assistant_info = info_result

    if isinstance(history_result, BaseException):
        service_history = []
        warnings.append("服务历史获取失败")
        logger.warning("App4 服务历史获取失败: %s", history_result)
    else:
        service_history = history_result

    if isinstance(consumption_result, BaseException):
        member_data = _default_member_data()
        warnings.append("消费数据获取失败")
        logger.warning("App4 消费数据获取失败: %s", consumption_result)
    else:
        member_data = consumption_result

    if isinstance(notes_result, BaseException):
        notes = []
        warnings.append("备注获取失败")
        logger.warning("App4 备注获取失败: %s", notes_result)
    else:
        notes = notes_result

    # Empty sections are replaced by short placeholder texts so the model sees
    # an explicit "no data" marker instead of an empty container.
    payload: dict[str, Any] = {
        "current_time": as_runtime_business_now_str(site_id, fmt="%Y-%m-%d %H:%M"),
        "assistant_id": assistant_id,
        "member_id": member_id,
        "assistant_info": assistant_info or "⚠ 助教信息获取失败",
        "service_history": service_history or "暂无服务记录",
        "task_assignment_basis": {
            "consumption_records": member_data.get("consumption_records", []) or "该客户暂无消费记录",
            "member_cards": member_data.get("member_cards", []),
            "card_balance_total": member_data.get("card_balance_total", 0),
            "stored_value_balance_total": member_data.get("stored_value_balance_total", 0),
            "expected_visit_date": member_data.get("expected_visit_date"),
            "days_since_last_visit": member_data.get("days_since_last_visit"),
        },
        "customer_data": {
            "member_nickname": member_data.get("member_nickname", ""),
            "notes": notes or "暂无备注",
        },
        "reference": _build_reference(site_id, member_id, cache_svc),
    }
    if warnings:
        payload["_data_warnings"] = warnings
    return _truncate_payload(payload)
def _default_member_data() -> dict:
    """Return a fresh placeholder for member consumption data.

    Used when the consumption fetch fails: text is empty, lists are empty,
    balances are zero and the visit-related fields are ``None``. A new dict
    (with new lists) is created on every call.
    """
    placeholder = dict(
        member_nickname="",
        consumption_records=[],
        member_cards=[],
        card_balance_total=0,
        stored_value_balance_total=0,
        expected_visit_date=None,
        days_since_last_visit=None,
    )
    return placeholder
def _build_reference(
    site_id: int,
    member_id: int,
    cache_svc: AICacheService | None,
) -> dict:
    """Collect cached App8 results for a member: the latest entry plus up to 2 history entries.

    Args:
        site_id: Site identifier passed through to the cache service.
        member_id: Member identifier; its string form is used as the cache target id.
        cache_svc: Cache service to query, or ``None`` to skip the lookup.

    Returns:
        A dict that may contain ``app8_latest`` and ``app8_history``. Each key
        is present only when the cache returned a truthy value for it. The
        dict is empty when ``cache_svc`` is ``None``.
    """
    if cache_svc is None:
        return {}

    member_key = str(member_id)
    cache_type = CacheTypeEnum.APP8_CLUE_CONSOLIDATED.value

    def _summarize(entry: dict) -> dict:
        # Keep only the result body and its creation time from a cache entry.
        return {
            "result_json": entry.get("result_json"),
            "generated_at": entry.get("created_at"),
        }

    reference: dict = {}

    newest = cache_svc.get_latest(cache_type, site_id, member_key)
    if newest:
        reference["app8_latest"] = _summarize(newest)

    past_entries = cache_svc.get_history(cache_type, site_id, member_key, limit=2)
    if past_entries:
        reference["app8_history"] = [_summarize(item) for item in past_entries]

    return reference
def _truncate_payload(payload: dict, max_len: int | None = None) -> str:
    """Serialize *payload* to JSON, trimming long lists while it exceeds the limit.

    Lists are trimmed in priority order, stopping as soon as the text fits:

    1. ``service_history`` is cut to its first 5 items.
    2. ``task_assignment_basis.consumption_records`` is cut to its first 5 items.
    3. ``customer_data.notes`` is cut to its first 10 items.

    Each trim adds a marker string recording the original item count. Only
    list values are trimmed, so placeholder strings are left alone. The limit
    is best-effort: if the text is still too long after all three steps it is
    returned as is, which keeps the output valid JSON.

    Args:
        payload: Prompt payload. It is modified in place when trimming occurs.
            Missing or non-dict ``task_assignment_basis`` / ``customer_data``
            sections are skipped.
        max_len: Maximum length in characters. ``None`` (the default) uses the
            module constant ``_MAX_PROMPT_LEN``.

    Returns:
        The JSON text (non-ASCII kept verbatim, unknown types stringified).
    """
    limit = _MAX_PROMPT_LEN if max_len is None else max_len

    def _dump() -> str:
        return json.dumps(payload, ensure_ascii=False, default=str)

    text = _dump()
    if len(text) <= limit:
        return text

    # Step 1: service history is the first thing to shrink.
    history = payload.get("service_history")
    if isinstance(history, list) and len(history) > 5:
        payload["service_history"] = history[:5]
        payload["_truncated_service_history"] = f"服务记录已截断,原始 {len(history)}"
        text = _dump()

    # Step 2: consumption records, only if step 1 was not enough.
    if len(text) > limit:
        basis = payload.get("task_assignment_basis")
        records = basis.get("consumption_records") if isinstance(basis, dict) else None
        if isinstance(records, list) and len(records) > 5:
            basis["consumption_records"] = records[:5]
            basis["_truncated"] = f"消费记录已截断,原始 {len(records)}"
            text = _dump()

    # Step 3: notes are trimmed last and keep more items (10).
    if len(text) > limit:
        customer = payload.get("customer_data")
        notes = customer.get("notes") if isinstance(customer, dict) else None
        if isinstance(notes, list) and len(notes) > 10:
            customer["notes"] = notes[:10]
            customer["_truncated_notes"] = f"备注已截断,原始 {len(notes)}"
            text = _dump()

    return text