Files
Neo-ZQYY/apps/backend/app/ai/dispatcher.py
2026-03-15 10:15:02 +08:00

339 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI 事件调度与调用链编排器。
根据业务事件(消费、备注、任务分配)编排 AI 应用调用链,
确保执行顺序和数据依赖正确。
调用链:
- 消费事件无助教App3 → App8 → App7
- 消费事件有助教App3 → App8 → App7 + App4 → App5
- 备注事件App6 → App8
- 任务分配事件App4 → App5读已有 App8 缓存)
容错策略:
- 某步失败记录错误日志,后续应用使用已有缓存继续
- 失败应用写入失败 conversation 记录
- 整条链后台异步执行,不阻塞业务请求
"""
from __future__ import annotations
import json
import logging
from typing import Any, Callable, Coroutine
from app.ai.bailian_client import BailianClient
from app.ai.cache_service import AICacheService
from app.ai.conversation_service import ConversationService
from app.ai.schemas import CacheTypeEnum
logger = logging.getLogger(__name__)
class AIDispatcher:
"""AI 应用调用链编排器。"""
def __init__(
self,
bailian: BailianClient,
cache_svc: AICacheService,
conv_svc: ConversationService,
) -> None:
self.bailian = bailian
self.cache_svc = cache_svc
self.conv_svc = conv_svc
async def handle_consumption_event(
self,
member_id: int,
site_id: int,
settle_id: int,
assistant_id: int | None = None,
) -> None:
"""消费事件链App3 → App8 → App7+ App4 → App5 如有助教)。"""
from app.ai.apps.app3_clue import run as app3_run
from app.ai.apps.app4_analysis import run as app4_run
from app.ai.apps.app5_tactics import run as app5_run
from app.ai.apps.app7_customer import run as app7_run
from app.ai.apps.app8_consolidation import run as app8_run
context: dict[str, Any] = {
"member_id": member_id,
"site_id": site_id,
"settle_id": settle_id,
"user_id": "system",
"nickname": "",
}
# 步骤 1App3 线索分析
app3_result = await self._run_step("app3_clue", app3_run, context)
# 步骤 2App8 线索整理(需要 App3 的 clues
app8_context = {**context}
# 从 App3 结果提取 clues同时从缓存获取 App6 已有线索
if app3_result:
app8_context["app3_clues"] = app3_result.get("clues", [])
app8_context["app3_generated_at"] = None # 刚生成,无需时间戳
else:
app8_context["app3_clues"] = []
app8_context["app3_generated_at"] = None
# 从缓存获取 App6 已有线索
app6_cache = self.cache_svc.get_latest(
CacheTypeEnum.APP6_NOTE_ANALYSIS.value, site_id, str(member_id),
)
if app6_cache:
app6_result_json = app6_cache.get("result_json", {})
if isinstance(app6_result_json, str):
try:
app6_result_json = json.loads(app6_result_json)
except (json.JSONDecodeError, TypeError):
app6_result_json = {}
app8_context["app6_clues"] = app6_result_json.get("clues", [])
app8_context["app6_generated_at"] = app6_cache.get("created_at")
else:
app8_context["app6_clues"] = []
app8_context["app6_generated_at"] = None
await self._run_step("app8_consolidation", app8_run, app8_context)
# 步骤 3App7 客户分析
await self._run_step("app7_customer", app7_run, context)
# 步骤 4可选如有助教App4 → App5
if assistant_id is not None:
app4_context = {**context, "assistant_id": assistant_id}
app4_result = await self._run_step("app4_analysis", app4_run, app4_context)
app5_context = {
**context,
"assistant_id": assistant_id,
"app4_result": app4_result or {},
}
await self._run_step("app5_tactics", app5_run, app5_context)
async def handle_note_event(
self,
member_id: int,
site_id: int,
note_id: int,
note_content: str,
noted_by_name: str,
) -> None:
"""备注事件链App6 → App8。"""
from app.ai.apps.app6_note import run as app6_run
from app.ai.apps.app8_consolidation import run as app8_run
context: dict[str, Any] = {
"member_id": member_id,
"site_id": site_id,
"note_id": note_id,
"note_content": note_content,
"noted_by_name": noted_by_name,
"user_id": "system",
"nickname": "",
}
# 步骤 1App6 备注分析
app6_result = await self._run_step("app6_note", app6_run, context)
# 步骤 2App8 线索整理(需要 App6 的 clues
app8_context: dict[str, Any] = {
"member_id": member_id,
"site_id": site_id,
"user_id": "system",
"nickname": "",
}
if app6_result:
app8_context["app6_clues"] = app6_result.get("clues", [])
app8_context["app6_generated_at"] = None
else:
app8_context["app6_clues"] = []
app8_context["app6_generated_at"] = None
# 从缓存获取 App3 已有线索
app3_cache = self.cache_svc.get_latest(
CacheTypeEnum.APP3_CLUE.value, site_id, str(member_id),
)
if app3_cache:
app3_result_json = app3_cache.get("result_json", {})
if isinstance(app3_result_json, str):
try:
app3_result_json = json.loads(app3_result_json)
except (json.JSONDecodeError, TypeError):
app3_result_json = {}
app8_context["app3_clues"] = app3_result_json.get("clues", [])
app8_context["app3_generated_at"] = app3_cache.get("created_at")
else:
app8_context["app3_clues"] = []
app8_context["app3_generated_at"] = None
await self._run_step("app8_consolidation", app8_run, app8_context)
async def handle_task_assign_event(
self,
assistant_id: int,
member_id: int,
site_id: int,
task_type: str,
) -> None:
"""任务分配事件链App4 → App5读已有 App8 缓存)。"""
from app.ai.apps.app4_analysis import run as app4_run
from app.ai.apps.app5_tactics import run as app5_run
context: dict[str, Any] = {
"assistant_id": assistant_id,
"member_id": member_id,
"site_id": site_id,
"task_type": task_type,
"user_id": "system",
"nickname": "",
}
# 步骤 1App4 关系分析
app4_result = await self._run_step("app4_analysis", app4_run, context)
# 步骤 2App5 话术参考
app5_context = {
**context,
"app4_result": app4_result or {},
}
await self._run_step("app5_tactics", app5_run, app5_context)
async def _run_chain(
self,
chain: list[tuple[str, Callable[..., Coroutine], dict]],
) -> None:
"""串行执行调用链,某步失败记录日志后继续。
Args:
chain: [(app_name, run_func, context), ...] 的列表
"""
for app_name, run_func, ctx in chain:
await self._run_step(app_name, run_func, ctx)
async def _run_step(
self,
app_name: str,
run_func: Callable[..., Coroutine],
context: dict,
) -> dict | None:
"""执行单个应用步骤,失败时记录日志并写入失败 conversation。
Returns:
应用返回结果,失败时返回 None
"""
try:
result = await run_func(
context,
self.bailian,
self.cache_svc,
self.conv_svc,
)
logger.info("调用链步骤成功: %s", app_name)
return result
except Exception:
logger.exception("调用链步骤失败: %s", app_name)
# 写入失败 conversation 记录
try:
site_id = context.get("site_id", 0)
conv_id = self.conv_svc.create_conversation(
user_id="system",
nickname="",
app_id=app_name,
site_id=site_id,
source_context={"error": True, "chain_step": app_name},
)
self.conv_svc.add_message(
conversation_id=conv_id,
role="system",
content=f"调用链步骤 {app_name} 执行失败",
)
except Exception:
logger.exception("写入失败 conversation 记录也失败: %s", app_name)
return None
def _create_ai_event_handlers(dispatcher: AIDispatcher) -> dict[str, Callable]:
"""创建 AI 事件处理器,用于注册到 trigger_scheduler。
每个处理器从 payload 提取参数,通过 asyncio.create_task 后台执行,
不阻塞同步的 fire_event 调用。
Returns:
{event_job_type: handler_func} 映射
"""
import asyncio
def _get_or_create_loop() -> asyncio.AbstractEventLoop:
"""获取当前事件循环,兼容同步调用场景。"""
try:
return asyncio.get_running_loop()
except RuntimeError:
return asyncio.new_event_loop()
def handle_consumption_settled(payload: dict | None = None, **_kw: Any) -> None:
"""消费结算事件处理器(同步入口,内部异步执行)。"""
if not payload:
logger.warning("consumption_settled 事件缺少 payload")
return
loop = _get_or_create_loop()
loop.create_task(
dispatcher.handle_consumption_event(
member_id=payload["member_id"],
site_id=payload["site_id"],
settle_id=payload["settle_id"],
assistant_id=payload.get("assistant_id"),
)
)
def handle_note_created(payload: dict | None = None, **_kw: Any) -> None:
"""备注创建事件处理器。"""
if not payload:
logger.warning("note_created 事件缺少 payload")
return
loop = _get_or_create_loop()
loop.create_task(
dispatcher.handle_note_event(
member_id=payload["member_id"],
site_id=payload["site_id"],
note_id=payload["note_id"],
note_content=payload.get("note_content", ""),
noted_by_name=payload.get("noted_by_name", ""),
)
)
def handle_task_assigned(payload: dict | None = None, **_kw: Any) -> None:
"""任务分配事件处理器。"""
if not payload:
logger.warning("task_assigned 事件缺少 payload")
return
loop = _get_or_create_loop()
loop.create_task(
dispatcher.handle_task_assign_event(
assistant_id=payload["assistant_id"],
member_id=payload["member_id"],
site_id=payload["site_id"],
task_type=payload.get("task_type", ""),
)
)
return {
"ai_consumption_settled": handle_consumption_settled,
"ai_note_created": handle_note_created,
"ai_task_assigned": handle_task_assigned,
}
def register_ai_handlers(dispatcher: AIDispatcher) -> None:
"""将 AI 事件处理器注册到 trigger_scheduler。
在 FastAPI lifespan 中调用,将三个 AI 事件处理器
注册为 trigger_scheduler 的 job handler。
"""
from app.services.trigger_scheduler import register_job
handlers = _create_ai_event_handlers(dispatcher)
for job_type, handler in handlers.items():
register_job(job_type, handler)
logger.info("已注册 AI 事件处理器: %s", job_type)