"""AI event dispatcher and call-chain orchestrator.

Orchestrates chains of AI application calls in response to business events
(consumption, note created, task assigned, DWS completed) so that execution
order and data dependencies between the applications are respected.

Key design points:
- Every entry point is ``async def``; there is no asyncio.run() or
  new_event_loop().
- Background work is started with asyncio.create_task().
- Timeouts are enforced with asyncio.wait_for().
- Before each step the checks run in order: circuit breaker -> rate limit
  -> budget.
- A failing step does not abort the remaining steps of its chain.

Call chains:
- Consumption event (no assistant): App3 -> App8 -> App7
- Consumption event (with assistant): App3 -> App8 -> App7 + App4 -> App5
- Note event: App6 -> App8
- Task-assigned event: App4 -> App5
- DWS-completed event: App2 pre-generation
"""

from __future__ import annotations

import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import date
from typing import Any, Callable

from app.ai.budget_tracker import BudgetTracker
from app.ai.cache_service import AICacheService
from app.ai.circuit_breaker import CircuitBreaker, CircuitState
from app.ai.config import AIConfig
from app.ai.conversation_service import ConversationService
from app.ai.dashscope_client import DashScopeClient
from app.ai.exceptions import (
    BudgetExceededError,
    CircuitOpenError,
    DashScopeError,
    RateLimitExceededError,
)
from app.ai.rate_limiter import RateLimiter
from app.ai.run_log_service import AIRunLogService

logger = logging.getLogger(__name__)

# Default timeout (seconds) for a single application call.
_STEP_TIMEOUT = 120

# A whole chain is always granted at least this many step timeouts.
_MIN_CHAIN_STEPS = 5

# Time dimensions pre-generated by App2 after a DWS run (one call each).
_DWS_TIME_DIMENSIONS: tuple[str, ...] = (
    "today",
    "yesterday",
    "this_week",
    "last_week",
    "this_month",
    "last_month",
    "this_quarter",
    "this_year",
)


def _update_trigger_job_status(
    job_id: int,
    status: str,
    error_message: str | None = None,
    set_started: bool = False,
    set_finished: bool = False,
) -> None:
    """Best-effort update of one row in ``biz.ai_trigger_jobs``.

    The UPDATE is keyed by ``id = job_id``; it only has an effect when such a
    row exists. Failures to connect, execute, commit or roll back are logged
    at DEBUG level and swallowed, so status bookkeeping cannot prevent or
    interrupt the call chain that invokes it.

    Args:
        job_id: Value matched against the ``id`` column.
        status: New value for the ``status`` column.
        error_message: When not None, stored in ``error_message``.
        set_started: When true, ``started_at`` is set to ``NOW()``.
        set_finished: When true, ``finished_at`` is set to ``NOW()``.
    """
    from app.database import get_connection

    # Only fixed column fragments are interpolated into the SQL text; every
    # caller-supplied value goes through a bound parameter.
    parts = ["status = %s"]
    params: list[Any] = [status]
    if set_started:
        parts.append("started_at = NOW()")
    if set_finished:
        parts.append("finished_at = NOW()")
    if error_message is not None:
        parts.append("error_message = %s")
        params.append(error_message)
    params.append(job_id)

    try:
        conn = get_connection()
    except Exception:
        # A database outage must not stop the chain from running.
        logger.debug("更新 trigger_job %d 状态失败(可能为内存 job)", job_id, exc_info=True)
        return

    try:
        with conn.cursor() as cur:
            cur.execute(
                f"UPDATE biz.ai_trigger_jobs SET {', '.join(parts)} WHERE id = %s",
                params,
            )
        conn.commit()
    except Exception:
        logger.debug("更新 trigger_job %d 状态失败(可能为内存 job)", job_id, exc_info=True)
        try:
            conn.rollback()
        except Exception:
            logger.debug("回滚 trigger_job %d 状态更新失败", job_id, exc_info=True)
    finally:
        conn.close()


@dataclass
class TriggerEvent:
    """Uniform carrier for an event that triggers an AI call chain."""

    event_type: str  # consumption / note_created / task_assigned / dws_completed
    site_id: int
    member_id: int | None = None
    connector_type: str = "feiqiu"
    payload: dict = field(default_factory=dict)
    is_forced: bool = False  # when true, the dedup check is bypassed


# Event type constants.
EVENT_CONSUMPTION = "consumption"
EVENT_NOTE_CREATED = "note_created"
EVENT_TASK_ASSIGNED = "task_assigned"
EVENT_DWS_COMPLETED = "dws_completed"

# Event type -> human-readable chain description (logged by handle_trigger;
# intended for ai_trigger_jobs.app_chain).
_EVENT_CHAIN_MAP: dict[str, str] = {
    EVENT_CONSUMPTION: "app3→app8→app7",
    EVENT_NOTE_CREATED: "app6→app8",
    EVENT_TASK_ASSIGNED: "app4→app5",
    EVENT_DWS_COMPLETED: "app2",
}

# Event type -> maximum number of sequential steps its chain can run. Used to
# size the chain-level timeout so that a chain with more than
# _MIN_CHAIN_STEPS steps is not cut off while each step is within its budget.
_CHAIN_STEP_COUNTS: dict[str, int] = {
    EVENT_CONSUMPTION: 5,
    EVENT_NOTE_CREATED: 2,
    EVENT_TASK_ASSIGNED: 2,
    EVENT_DWS_COMPLETED: len(_DWS_TIME_DIMENSIONS),
}


class AIDispatcher:
    """AI event dispatcher and call-chain orchestrator.

    Combines the circuit breaker, rate limiter and token budget tracker:
    each step runs circuit check -> rate-limit check -> budget check ->
    call -> run-log update.
    """

    def __init__(
        self,
        client: DashScopeClient,
        cache_svc: AICacheService,
        conv_svc: ConversationService,
        circuit_breaker: CircuitBreaker,
        rate_limiter: RateLimiter,
        budget_tracker: BudgetTracker,
        run_log_svc: AIRunLogService,
        config: AIConfig,
    ) -> None:
        self.client = client
        self.cache_svc = cache_svc
        self.conv_svc = conv_svc
        self.circuit_breaker = circuit_breaker
        self.rate_limiter = rate_limiter
        self.budget_tracker = budget_tracker
        self.run_log_svc = run_log_svc
        self.config = config

        # In-memory dedup keys: (event_type, member_id, site_id, date_str).
        # To be replaced by a query on ai_trigger_jobs after the DB migration.
        self._dedup_set: set[tuple[str, int | None, int, str]] = set()
        # Day (ISO string) the dedup set was last pruned for.
        self._dedup_day: str | None = None

        # In-memory trigger_job counter (to become INSERT ... RETURNING id).
        # NOTE(review): these ids are also passed to
        # _update_trigger_job_status, which updates by id -- confirm they
        # cannot coincide with ids of unrelated rows in biz.ai_trigger_jobs.
        self._next_job_id = 1

        # Strong references to running chains. The event loop only keeps weak
        # references to tasks, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    # ── Unified event entry point ────────────────────────

    async def handle_trigger(self, event: TriggerEvent) -> int:
        """Accept an event and start its call chain in the background.

        Unless ``event.is_forced`` is set, an event whose dedup key has
        already been recorded is skipped. Otherwise the key is recorded and
        the chain is scheduled as a background task without being awaited.

        Returns:
            The trigger job id (currently an in-memory counter). An id is
            also returned for events skipped by dedup.
        """
        job_id = self._next_job_id
        self._next_job_id += 1

        app_chain = _EVENT_CHAIN_MAP.get(event.event_type, "unknown")
        logger.info(
            "收到触发事件: job_id=%d event_type=%s site_id=%d chain=%s",
            job_id,
            event.event_type,
            event.site_id,
            app_chain,
        )

        # Dedup check (bypassed by is_forced).
        if not event.is_forced and await self._check_dedup(event):
            logger.info(
                "事件去重跳过: job_id=%d event_type=%s site_id=%d member_id=%s",
                job_id,
                event.event_type,
                event.site_id,
                event.member_id,
            )
            return job_id

        self._remember_dedup_key(self._dedup_key(event))

        # Run the chain in the background; keep a reference until it is done.
        task = asyncio.create_task(self._execute_chain(job_id, event))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

        return job_id

    # ── Chain dispatch ───────────────────────────────────

    async def _execute_chain(self, job_id: int, event: TriggerEvent) -> None:
        """Run the handler for ``event.event_type`` under a chain timeout.

        The trigger job status is moved to ``running`` before the handler
        starts and to ``completed`` or ``failed`` afterwards. Timeouts and
        handler exceptions are logged and recorded, not re-raised.
        """
        handler_map: dict[str, Callable] = {
            EVENT_CONSUMPTION: self._handle_consumption,
            EVENT_NOTE_CREATED: self._handle_note,
            EVENT_TASK_ASSIGNED: self._handle_task_assigned,
            EVENT_DWS_COMPLETED: self._handle_dws_completed,
        }

        handler = handler_map.get(event.event_type)
        if handler is None:
            logger.error("未知事件类型: %s (job_id=%d)", event.event_type, job_id)
            _update_trigger_job_status(
                job_id, "failed", error_message=f"未知事件类型: {event.event_type}"
            )
            return

        # Mark the job as started.
        _update_trigger_job_status(job_id, "running", set_started=True)

        # One step timeout per step of the chain, never fewer than the
        # historical budget of _MIN_CHAIN_STEPS steps.
        chain_timeout = _STEP_TIMEOUT * max(
            _MIN_CHAIN_STEPS, _CHAIN_STEP_COUNTS.get(event.event_type, 0)
        )

        try:
            await asyncio.wait_for(handler(event), timeout=chain_timeout)
            logger.info("调用链完成: job_id=%d event_type=%s", job_id, event.event_type)
            _update_trigger_job_status(job_id, "completed", set_finished=True)
        except asyncio.TimeoutError:
            logger.error("调用链超时: job_id=%d event_type=%s", job_id, event.event_type)
            _update_trigger_job_status(
                job_id, "failed", error_message="调用链超时", set_finished=True
            )
        except Exception as exc:
            logger.exception("调用链异常: job_id=%d event_type=%s", job_id, event.event_type)
            _update_trigger_job_status(
                job_id,
                "failed",
                error_message=str(exc)[:500],
                set_finished=True,
            )

    # ── Dedup ────────────────────────────────────────────

    @staticmethod
    def _dedup_key(event: TriggerEvent) -> tuple[str, int | None, int, str]:
        """Build the dedup key (event_type, member_id, site_id, today)."""
        return (
            event.event_type,
            event.member_id,
            event.site_id,
            date.today().isoformat(),
        )

    def _remember_dedup_key(self, key: tuple[str, int | None, int, str]) -> None:
        """Record ``key``, dropping keys of earlier days on a day change."""
        day = key[3]
        if day != self._dedup_day:
            # Keys carry their date, so entries from another day can never
            # match again; removing them keeps the set from growing forever.
            stale = [k for k in self._dedup_set if k[3] != day]
            self._dedup_set.difference_update(stale)
            self._dedup_day = day
        self._dedup_set.add(key)

    async def _check_dedup(self, event: TriggerEvent) -> bool:
        """Tell whether today's dedup key for ``event`` is already recorded.

        Returns True for a duplicate (the caller should skip the event) and
        False otherwise. Currently in-memory; to be replaced by a query on
        ai_trigger_jobs after the DB migration.
        """
        return self._dedup_key(event) in self._dedup_set

    # ── Single step ──────────────────────────────────────

    def _record_skipped(
        self,
        app_name: str,
        site_id: int,
        member_id: int | None,
        prompt: str,
        reason: str,
        label: str,
        start_time: float,
    ) -> None:
        """Write a failed run-log entry for a step skipped by a pre-check.

        ``reason`` is stored on the log entry; ``label`` only appears in the
        message logged when writing the entry itself fails.
        """
        try:
            log_id = self.run_log_svc.create_log(
                site_id=site_id,
                app_type=app_name,
                trigger_type="event",
                member_id=member_id,
                request_prompt=prompt,
            )
            elapsed_ms = int((time.monotonic() - start_time) * 1000)
            self.run_log_svc.update_failed(log_id, reason, elapsed_ms)
        except Exception:
            logger.exception("记录 %s 日志失败: app=%s", label, app_name)

    async def _run_step(
        self,
        app_name: str,
        app_id: str,
        prompt: str,
        context: dict,
    ) -> dict | None:
        """Run one step: circuit check, rate check, budget check, call, log.

        Any failure yields None so that the caller can continue its chain.

        Args:
            app_name: Application name (e.g. "app3_clue"), used for logging
                and as the run-log ``app_type``.
            app_id: Application id passed to the client and circuit breaker.
            prompt: Prompt sent to the application.
            context: Context dict; ``site_id`` (default 0) and ``member_id``
                are read from it.

        Returns:
            The result returned by the client call, or None when the step was
            skipped, timed out or failed.
        """
        site_id = context.get("site_id", 0)
        member_id = context.get("member_id")
        start_time = time.monotonic()

        # ── 1. Circuit breaker ──
        state = self.circuit_breaker.check(app_id)
        if state == CircuitState.OPEN:
            logger.warning("熔断器 OPEN,跳过: app=%s app_id=%s", app_name, app_id)
            self._record_skipped(
                app_name, site_id, member_id, prompt,
                "circuit_open", "circuit_open", start_time,
            )
            return None

        # ── 2. Rate limit ──
        if not self.rate_limiter.check_store_rate(site_id):
            logger.warning("限流超限,跳过: app=%s site_id=%d", app_name, site_id)
            self._record_skipped(
                app_name, site_id, member_id, prompt,
                "rate_limited", "rate_limited", start_time,
            )
            return None

        # ── 3. Budget ──
        budget_status = self.budget_tracker.check_budget()
        if not budget_status.allowed:
            logger.warning(
                "预算超限,跳过: app=%s reason=%s",
                app_name,
                budget_status.reason,
            )
            self._record_skipped(
                app_name, site_id, member_id, prompt,
                f"budget_exceeded:{budget_status.reason}", "budget_exceeded", start_time,
            )
            return None

        # ── 4. Create the run-log entry and mark it running ──
        # A logging failure must not prevent the call; log_id stays None then.
        log_id: int | None = None
        try:
            log_id = self.run_log_svc.create_log(
                site_id=site_id,
                app_type=app_name,
                trigger_type="event",
                member_id=member_id,
                request_prompt=prompt,
            )
            self.run_log_svc.update_running(log_id)
        except Exception:
            logger.exception("创建/更新运行日志失败: app=%s", app_name)

        # ── 5. Call the application ──
        try:
            result, tokens_used, _session_id = await asyncio.wait_for(
                self.client.call_app(app_id, prompt),
                timeout=_STEP_TIMEOUT,
            )

            # Success: update the run log and the circuit breaker.
            elapsed_ms = int((time.monotonic() - start_time) * 1000)
            if log_id is not None:
                try:
                    self.run_log_svc.update_success(
                        log_id,
                        response_text=json.dumps(result, ensure_ascii=False),
                        tokens_used=tokens_used,
                        latency_ms=elapsed_ms,
                    )
                except Exception:
                    logger.exception(
                        "更新 success 日志失败: app=%s log_id=%d", app_name, log_id
                    )

            self.circuit_breaker.record_success(app_id)
            logger.info(
                "调用成功: app=%s tokens=%d latency=%dms",
                app_name, tokens_used, elapsed_ms,
            )
            return result

        except asyncio.TimeoutError:
            # NOTE(review): a timeout is not reported to the circuit breaker,
            # unlike other failures below -- confirm this is intended.
            elapsed_ms = int((time.monotonic() - start_time) * 1000)
            logger.error("调用超时: app=%s latency=%dms", app_name, elapsed_ms)
            if log_id is not None:
                try:
                    self.run_log_svc.update_timeout(log_id, elapsed_ms)
                except Exception:
                    logger.exception("更新 timeout 日志失败: app=%s", app_name)
            return None

        except Exception as exc:
            # Any other failure: log it and count it against the breaker.
            elapsed_ms = int((time.monotonic() - start_time) * 1000)
            error_msg = f"{type(exc).__name__}: {exc}"
            logger.exception("调用失败: app=%s error=%s", app_name, error_msg)
            if log_id is not None:
                try:
                    self.run_log_svc.update_failed(log_id, error_msg, elapsed_ms)
                except Exception:
                    logger.exception("更新 failed 日志失败: app=%s", app_name)
            self.circuit_breaker.record_failure(app_id)
            return None

    # ── App8: idempotent write to member_retention_clue ──

    def _write_retention_clue(
        self,
        member_id: int,
        site_id: int,
        consolidate_result: dict,
        source: str,
    ) -> None:
        """Replace today's clues of one member/site/source in one transaction.

        Deletes the rows of ``member_retention_clue`` for the same member,
        site and source whose ``recorded_at`` date is the current date, then
        inserts one row per entry of ``consolidate_result["clues"]``. Nothing
        is deleted or written when the clue list is missing or empty.

        Args:
            member_id: Member id.
            site_id: Site (store) id.
            consolidate_result: App8 result; its ``clues`` list is written.
            source: Clue source (ai_consumption / ai_note).

        Raises:
            Exception: Any error raised while executing or committing is
                re-raised after the transaction has been rolled back.
        """
        from app.database import get_connection

        clues = consolidate_result.get("clues", [])
        if not clues:
            logger.info(
                "App8 无线索数据,跳过写入: member_id=%d site_id=%d",
                member_id,
                site_id,
            )
            return

        conn = get_connection()
        try:
            with conn.cursor() as cur:
                # Remove today's rows of this member from the same source.
                cur.execute(
                    """
                    DELETE FROM member_retention_clue
                    WHERE member_id = %s
                      AND site_id = %s
                      AND source = %s
                      AND recorded_at::date = CURRENT_DATE
                    """,
                    (member_id, site_id, source),
                )
                deleted = cur.rowcount

                # Insert the new batch.
                for clue in clues:
                    cur.execute(
                        """
                        INSERT INTO member_retention_clue
                            (member_id, category, summary, detail, site_id, source)
                        VALUES (%s, %s, %s, %s, %s, %s)
                        """,
                        (
                            member_id,
                            clue.get("category", "客户基础"),
                            clue.get("summary", ""),
                            clue.get("detail"),
                            site_id,
                            source,
                        ),
                    )
            conn.commit()
            logger.info(
                "维客线索幂等写入完成: member_id=%d site_id=%d source=%s "
                "deleted=%d inserted=%d",
                member_id,
                site_id,
                source,
                deleted,
                len(clues),
            )
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _persist_consolidation(
        self,
        site_id: int,
        member_id: int | None,
        consolidate_result: dict,
        triggered_by: str,
        source: str,
    ) -> None:
        """Store an App8 result in the cache and in member_retention_clue.

        Both writes are best-effort: a failure is logged and does not
        propagate. The clue table is only written when ``member_id`` is set.
        """
        try:
            self.cache_svc.write_cache(
                cache_type="app8_clue_consolidated",
                site_id=site_id,
                target_id=str(member_id),
                result_json=consolidate_result,
                triggered_by=triggered_by,
            )
        except Exception:
            logger.exception(
                "App8 缓存写入失败: site_id=%d member_id=%s",
                site_id,
                member_id,
            )

        # Idempotent DELETE + INSERT transaction.
        if member_id is not None:
            try:
                self._write_retention_clue(
                    member_id=member_id,
                    site_id=site_id,
                    consolidate_result=consolidate_result,
                    source=source,
                )
            except Exception:
                logger.exception(
                    "App8 维客线索写入失败: site_id=%d member_id=%s",
                    site_id,
                    member_id,
                )

    async def _run_assistant_chain(
        self,
        event_type: str,
        event: TriggerEvent,
        context: dict,
    ) -> None:
        """Run App4 (relationship analysis) followed by App5 (tactics).

        App5 runs even when App4 failed; its prompt then carries a null
        ``analysis_result``.
        """
        site_id = event.site_id
        member_id = event.member_id

        # App4: assistant relationship analysis.
        analysis_prompt = json.dumps(
            {
                "event_type": event_type,
                "site_id": site_id,
                "member_id": member_id,
                "payload": event.payload,
            },
            ensure_ascii=False,
        )
        analysis_result = await self._run_step(
            "app4_analysis",
            self.config.app_id_4_analysis,
            analysis_prompt,
            context,
        )
        if analysis_result is None:
            logger.warning("App4 助教分析失败,使用已有缓存继续: member_id=%s", member_id)

        # App5: assistant talking-point suggestions.
        tactics_prompt = json.dumps(
            {
                "event_type": event_type,
                "site_id": site_id,
                "member_id": member_id,
                "analysis_result": analysis_result,
            },
            ensure_ascii=False,
        )
        tactics_result = await self._run_step(
            "app5_tactics",
            self.config.app_id_5_tactics,
            tactics_prompt,
            context,
        )
        if tactics_result is None:
            logger.warning("App5 话术参考失败: member_id=%s", member_id)

    # ── Event handlers ───────────────────────────────────

    async def _handle_consumption(self, event: TriggerEvent) -> None:
        """Consumption chain: App3 -> App8 -> App7, plus App4 -> App5.

        Steps:
        1. App3 (clue) — member clue analysis.
        2. App8 (consolidate) — consolidation; the result is cached and
           written to member_retention_clue.
        3. App7 (customer) — customer analysis.
        When ``payload["has_assistant"]`` is truthy, additionally:
        4. App4 (analysis) — assistant relationship analysis.
        5. App5 (tactics) — assistant talking-point suggestions.
        """
        site_id = event.site_id
        member_id = event.member_id
        has_assistant = event.payload.get("has_assistant", False)
        context = {"site_id": site_id, "member_id": member_id}

        # ── Step 1: App3 clue analysis ──
        clue_prompt = json.dumps(
            {
                "event_type": "consumption",
                "site_id": site_id,
                "member_id": member_id,
                "payload": event.payload,
            },
            ensure_ascii=False,
        )
        clue_result = await self._run_step(
            "app3_clue",
            self.config.app_id_3_clue,
            clue_prompt,
            context,
        )
        if clue_result is None:
            logger.warning("App3 线索分析失败,使用已有缓存继续: member_id=%s", member_id)

        # ── Step 2: App8 clue consolidation ──
        consolidate_prompt = json.dumps(
            {
                "event_type": "consumption",
                "site_id": site_id,
                "member_id": member_id,
                "clue_result": clue_result,
            },
            ensure_ascii=False,
        )
        consolidate_result = await self._run_step(
            "app8_consolidate",
            self.config.app_id_8_consolidate,
            consolidate_prompt,
            context,
        )
        if consolidate_result is not None:
            self._persist_consolidation(
                site_id,
                member_id,
                consolidate_result,
                triggered_by="consumption",
                source="ai_consumption",
            )
        else:
            logger.warning("App8 线索整合失败,使用已有缓存继续: member_id=%s", member_id)

        # ── Step 3: App7 customer analysis ──
        customer_prompt = json.dumps(
            {
                "event_type": "consumption",
                "site_id": site_id,
                "member_id": member_id,
                "clue_result": clue_result,
                "consolidate_result": consolidate_result,
            },
            ensure_ascii=False,
        )
        customer_result = await self._run_step(
            "app7_customer",
            self.config.app_id_7_customer,
            customer_prompt,
            context,
        )
        if customer_result is None:
            logger.warning("App7 客户分析失败: member_id=%s", member_id)

        # ── With an assistant: App4 -> App5 ──
        if has_assistant:
            await self._run_assistant_chain(EVENT_CONSUMPTION, event, context)

    async def _handle_note(self, event: TriggerEvent) -> None:
        """Note chain: App6 -> App8.

        Steps:
        1. App6 (note) — note analysis.
        2. App8 (consolidate) — consolidation; the result is cached and
           written to member_retention_clue.
        """
        site_id = event.site_id
        member_id = event.member_id
        context = {"site_id": site_id, "member_id": member_id}

        # ── Step 1: App6 note analysis ──
        note_prompt = json.dumps(
            {
                "event_type": "note_created",
                "site_id": site_id,
                "member_id": member_id,
                "payload": event.payload,
            },
            ensure_ascii=False,
        )
        note_result = await self._run_step(
            "app6_note",
            self.config.app_id_6_note,
            note_prompt,
            context,
        )
        if note_result is None:
            logger.warning("App6 备注分析失败,使用已有缓存继续: member_id=%s", member_id)

        # ── Step 2: App8 clue consolidation ──
        consolidate_prompt = json.dumps(
            {
                "event_type": "note_created",
                "site_id": site_id,
                "member_id": member_id,
                "note_result": note_result,
            },
            ensure_ascii=False,
        )
        consolidate_result = await self._run_step(
            "app8_consolidate",
            self.config.app_id_8_consolidate,
            consolidate_prompt,
            context,
        )
        if consolidate_result is not None:
            self._persist_consolidation(
                site_id,
                member_id,
                consolidate_result,
                triggered_by="note_created",
                source="ai_note",
            )
        else:
            logger.warning("App8 线索整合失败: member_id=%s", member_id)

    async def _handle_task_assigned(self, event: TriggerEvent) -> None:
        """Task-assigned chain: App4 -> App5.

        Steps:
        1. App4 (analysis) — assistant relationship analysis.
        2. App5 (tactics) — assistant talking-point suggestions.
        """
        context = {"site_id": event.site_id, "member_id": event.member_id}
        await self._run_assistant_chain(EVENT_TASK_ASSIGNED, event, context)

    async def _handle_dws_completed(self, event: TriggerEvent) -> None:
        """DWS-completed event: pre-generate App2 output per time dimension.

        App2 is called once for each entry of ``_DWS_TIME_DIMENSIONS``; every
        successful result is written to the cache with the dimension as
        ``target_id``. A failed dimension is logged and the loop continues.
        """
        site_id = event.site_id
        member_id = event.member_id
        context = {"site_id": site_id, "member_id": member_id}

        for dimension in _DWS_TIME_DIMENSIONS:
            prompt = json.dumps(
                {
                    "event_type": "dws_completed",
                    "site_id": site_id,
                    "time_dimension": dimension,
                    "payload": event.payload,
                },
                ensure_ascii=False,
            )
            result = await self._run_step(
                "app2_finance",
                self.config.app_id_2_finance,
                prompt,
                context,
            )
            if result is not None:
                try:
                    self.cache_svc.write_cache(
                        cache_type="app2_finance",
                        site_id=site_id,
                        target_id=dimension,
                        result_json=result,
                        triggered_by="dws_completed",
                    )
                except Exception:
                    logger.exception(
                        "App2 缓存写入失败: site_id=%d dimension=%s",
                        site_id,
                        dimension,
                    )
            else:
                logger.warning(
                    "App2 预生成失败: site_id=%d dimension=%s",
                    site_id,
                    dimension,
                )


# ── Event handler registration (backward compatible) ─────────


def _create_ai_event_handlers(dispatcher: AIDispatcher) -> dict[str, Callable]:
    """Build the AI event handlers to register with trigger_scheduler.

    Each handler builds a TriggerEvent from its payload and hands it to
    ``dispatcher.handle_trigger``, which schedules the chain in the
    background. A missing or empty payload is logged and ignored; a payload
    without ``site_id`` raises KeyError.

    Returns:
        Mapping of event job type to handler coroutine function.
    """

    async def _dispatch(event_type: str, label: str, payload: dict | None) -> None:
        """Shared body of all handlers; ``label`` names the event in logs."""
        if not payload:
            logger.warning("%s 事件缺少 payload", label)
            return
        event = TriggerEvent(
            event_type=event_type,
            site_id=payload["site_id"],
            member_id=payload.get("member_id"),
            payload=payload,
        )
        await dispatcher.handle_trigger(event)

    async def handle_consumption_settled(payload: dict | None = None, **_kw: Any) -> None:
        """Handler for the consumption-settled event."""
        await _dispatch(EVENT_CONSUMPTION, "consumption_settled", payload)

    async def handle_note_created(payload: dict | None = None, **_kw: Any) -> None:
        """Handler for the note-created event."""
        await _dispatch(EVENT_NOTE_CREATED, "note_created", payload)

    async def handle_task_assigned(payload: dict | None = None, **_kw: Any) -> None:
        """Handler for the task-assigned event."""
        await _dispatch(EVENT_TASK_ASSIGNED, "task_assigned", payload)

    async def handle_dws_completed(payload: dict | None = None, **_kw: Any) -> None:
        """Handler for the DWS-completed event."""
        await _dispatch(EVENT_DWS_COMPLETED, "dws_completed", payload)

    return {
        "ai_consumption_settled": handle_consumption_settled,
        "ai_note_created": handle_note_created,
        "ai_task_assigned": handle_task_assigned,
        "ai_dws_completed": handle_dws_completed,
    }


def register_ai_handlers(dispatcher: AIDispatcher) -> None:
    """Register every AI event handler as a trigger_scheduler job handler.

    Meant to be called from the FastAPI lifespan.
    """
    from app.services.trigger_scheduler import register_job

    handlers = _create_ai_event_handlers(dispatcher)
    for job_type, handler in handlers.items():
        register_job(job_type, handler)
        logger.info("已注册 AI 事件处理器: %s", job_type)