"""应用 3 客户数据维客线索分析 Prompt 拼装。 消费事件触发,从客户消费数据提取维客线索。 - 数据源:fetch_member_consumption_data(DWS) - 金额口径:items_sum(禁止 consume_money) - 线索 category:客户基础 / 消费习惯 / 玩法偏好(3 选 1) - 线索 providers 统一为"系统" - system prompt 在百炼控制台配置,本模块只拼数据上下文 JSON 返回:单个 prompt 字符串(直接传给 Application.call)。 """ from __future__ import annotations import json import logging from typing import Any from app.ai.cache_service import AICacheService from app.ai.data_fetchers import fetch_member_consumption_data from app.ai.schemas import CacheTypeEnum from app.services.runtime_context import as_runtime_business_now_str logger = logging.getLogger(__name__) # prompt 观测阈值:历史上 4000 字会触发裁剪;现保留完整消费明细,仅用于测试/审计参考 _MAX_PROMPT_LEN = 4000 async def build_prompt( context: dict, cache_svc: AICacheService | None = None, ) -> str: """构建 App3 prompt 字符串。 Args: context: site_id, member_id cache_svc: 缓存服务,用于读取 reference 历史数据 Returns: JSON 序列化后的 prompt 字符串 """ site_id = context["site_id"] member_id = context["member_id"] # 数据获取(失败降级) fetch_failed = False try: member_data = await fetch_member_consumption_data(site_id, member_id) except Exception: logger.warning( "App3 消费数据获取失败: site_id=%s member_id=%s", site_id, member_id, exc_info=True, ) member_data = _default_member_data() fetch_failed = True consumption_records = member_data.get("consumption_records") or [] if not consumption_records: consumption_records = ( "⚠ 消费数据获取失败,该客户暂无消费记录可供分析" if fetch_failed else "该客户暂无消费记录" ) payload: dict[str, Any] = { "current_time": as_runtime_business_now_str(site_id, fmt="%Y-%m-%d %H:%M"), "member_id": member_id, "member_nickname": member_data.get("member_nickname", ""), "main_data": { "consumption_records": consumption_records, "member_cards": member_data.get("member_cards", []), "card_balance_total": member_data.get("card_balance_total", 0), "stored_value_balance_total": member_data.get("stored_value_balance_total", 0), "expected_visit_date": member_data.get("expected_visit_date"), "days_since_last_visit": member_data.get("days_since_last_visit"), }, "reference": _build_reference(site_id, member_id, cache_svc), } # 完整明细策略:App3 需要尽量保留消费行为模式,不在本地裁剪消费记录。 # 真实 App3 完整 100 条明细调用已验证可在 180s 单步超时内返回。 text = json.dumps(payload, ensure_ascii=False, default=str) return text def _default_member_data() -> dict: return { "member_nickname": "", "consumption_records": [], "member_cards": [], "card_balance_total": 0, "stored_value_balance_total": 0, "expected_visit_date": None, "days_since_last_visit": None, } def _build_reference( site_id: int, member_id: int, cache_svc: AICacheService | None, ) -> dict: """组装参考字段:App6 备注线索最新 + App8 历史最近 2 条。""" if cache_svc is None: return {} ref: dict = {} target_id = str(member_id) app6_latest = cache_svc.get_latest( CacheTypeEnum.APP6_NOTE.value, site_id, target_id, ) if app6_latest: ref["app6_note_clues"] = { "result_json": app6_latest.get("result_json"), "generated_at": app6_latest.get("created_at"), } app8_history = cache_svc.get_history( CacheTypeEnum.APP8_CONSOLIDATION.value, site_id, target_id, limit=2, ) if app8_history: ref["app8_history"] = [ { "result_json": h.get("result_json"), "generated_at": h.get("created_at"), } for h in app8_history ] return ref