From 8638ecad2a61f1e2f98ce572b1f0048512237386 Mon Sep 17 00:00:00 2001 From: Neo Date: Wed, 22 Apr 2026 21:55:26 +0800 Subject: [PATCH] =?UTF-8?q?feat(backend):=20=E6=96=B0=E5=A2=9E=20app2a=20?= =?UTF-8?q?=E5=8C=BA=E5=9F=9F=E8=B4=A2=E5=8A=A1=E6=B4=9E=E5=AF=9F=20APP=20?= =?UTF-8?q?=E6=B4=BE=E7=94=9F=20=C2=B7=20dispatcher=2072=20=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=E6=8B=86=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. apps/backend/app/ai/prompts/app2a_finance_area_prompt.py (新建): - payload: 业态说明 + 区域占比 + 对比口径 + 核心 KPI + 优惠构成 + 助教成本 + 区域级单位经济 + 按星期聚合 + 日粒度异常 + 行业基线 - 5 个区域级辅助函数:_fetch_area_daily_series / _build_area_unit_economics / _aggregate_by_weekday_area / _detect_anomaly_days_area / _fetch_area_share - AREA_INDUSTRY_TRAITS 字典(7 业态 trait + peer 描述) - 复用 app2_finance_prompt 的 _build_coach_kpi / _build_discount_kpi 等公共函数 2. config.py: AIConfig 增加 app_id_2a_finance_area + DASHSCOPE_APP_ID_2A_FINANCE_AREA 3. schemas.py: CacheTypeEnum 增加 APP2A_FINANCE_AREA 4. dispatcher.py: - APP2A_AREA_OPTIONS 常量(8 业态 · area != 'all') - _handle_dws_completed 72 循环拆分: area='all' 走 app2_finance · 其他 8 业态走 app2a_finance_area - run_single_app 新增 elif 'app2a_finance_area' 分支(拒绝 area='all') 5. admin_ai.py: _SUPPORTED_APP_TYPES 加 'app2a_finance_area' 6. prompts/__init__.py: 导出 build_app2a_area_prompt 7. .env: 追加 DASHSCOPE_APP_ID_2A_FINANCE_AREA 百炼 APP ID 实测:7 项集成单测全通过(config/cache_type/router/prompts/dispatcher 常量/ 4 业态 prompt 构建/拒绝 area=all)· 端到端实调 vip 组合返回 12 条高质量洞察 严格遵守 v1.2 system prompt 全部 7 项硬约束(H1-H7)。 Co-Authored-By: Claude Opus 4.7 (1M context) --- .env | 2 + apps/backend/app/ai/config.py | 24 +- apps/backend/app/ai/dispatcher.py | 877 +++++++++++++----- apps/backend/app/ai/prompts/__init__.py | 29 +- .../ai/prompts/app2a_finance_area_prompt.py | 498 ++++++++++ apps/backend/app/ai/schemas.py | 1 + apps/backend/app/routers/admin_ai.py | 205 +++- 7 files changed, 1399 insertions(+), 237 deletions(-) create mode 100644 apps/backend/app/ai/prompts/app2a_finance_area_prompt.py diff --git a/.env b/.env index adede62..f4c9e3b 100644 --- a/.env +++ b/.env @@ -127,6 +127,8 @@ DASHSCOPE_APP_ID_5_TACTICS=46f54e6053df4bb0b83be29366025cf6 DASHSCOPE_APP_ID_6_NOTE=025bb344146b4e4e8be30c444adab3b4 DASHSCOPE_APP_ID_7_CUSTOMER=df35e06991b24d49971c03c6428a9c87 DASHSCOPE_APP_ID_8_CONSOLIDATE=407dfb89283b4196934eec5fefe3ebc2 +# 应用 2a:区域财务洞察(64 组合 · area != 'all' · 板块 C/E 重分工 · 新增 H7 业态特征硬约束) +DASHSCOPE_APP_ID_2A_FINANCE_AREA=0ae965029bc54706bcff44f511ac716b # 应用 9:Session 日志摘要生成(Kiro agent_on_stop + batch_generate_summaries 使用) DASHSCOPE_APP_ID_SUMMARY=e0cf8913b1ee4a4eb9464cc1ee0bf300 diff --git a/apps/backend/app/ai/config.py b/apps/backend/app/ai/config.py index 10c8b24..d9b55a8 100644 --- a/apps/backend/app/ai/config.py +++ b/apps/backend/app/ai/config.py @@ -14,17 +14,18 @@ from dataclasses import dataclass class AIConfig: """AI 模块配置,从环境变量加载。不可变(frozen)。""" - api_key: str # DASHSCOPE_API_KEY - workspace_id: str | None # DASHSCOPE_WORKSPACE_ID(可选) - app_id_1_chat: str # DASHSCOPE_APP_ID_1_CHAT - app_id_2_finance: str # DASHSCOPE_APP_ID_2_FINANCE - app_id_3_clue: str # DASHSCOPE_APP_ID_3_CLUE - app_id_4_analysis: str # DASHSCOPE_APP_ID_4_ANALYSIS - app_id_5_tactics: str # DASHSCOPE_APP_ID_5_TACTICS - app_id_6_note: str # DASHSCOPE_APP_ID_6_NOTE - app_id_7_customer: str # DASHSCOPE_APP_ID_7_CUSTOMER - app_id_8_consolidate: str # DASHSCOPE_APP_ID_8_CONSOLIDATE - internal_api_token: str # INTERNAL_API_TOKEN + api_key: str # DASHSCOPE_API_KEY + workspace_id: str | None # DASHSCOPE_WORKSPACE_ID(可选) + app_id_1_chat: str # DASHSCOPE_APP_ID_1_CHAT + app_id_2_finance: str # DASHSCOPE_APP_ID_2_FINANCE + app_id_2a_finance_area: str # DASHSCOPE_APP_ID_2A_FINANCE_AREA(2026-04-23 新增,区域财务洞察) + app_id_3_clue: str # DASHSCOPE_APP_ID_3_CLUE + app_id_4_analysis: str # DASHSCOPE_APP_ID_4_ANALYSIS + app_id_5_tactics: str # DASHSCOPE_APP_ID_5_TACTICS + app_id_6_note: str # DASHSCOPE_APP_ID_6_NOTE + app_id_7_customer: str # DASHSCOPE_APP_ID_7_CUSTOMER + app_id_8_consolidate: str # DASHSCOPE_APP_ID_8_CONSOLIDATE + internal_api_token: str # INTERNAL_API_TOKEN @classmethod def from_env(cls) -> AIConfig: @@ -37,6 +38,7 @@ class AIConfig: "DASHSCOPE_API_KEY": "api_key", "DASHSCOPE_APP_ID_1_CHAT": "app_id_1_chat", "DASHSCOPE_APP_ID_2_FINANCE": "app_id_2_finance", + "DASHSCOPE_APP_ID_2A_FINANCE_AREA": "app_id_2a_finance_area", "DASHSCOPE_APP_ID_3_CLUE": "app_id_3_clue", "DASHSCOPE_APP_ID_4_ANALYSIS": "app_id_4_analysis", "DASHSCOPE_APP_ID_5_TACTICS": "app_id_5_tactics", diff --git a/apps/backend/app/ai/dispatcher.py b/apps/backend/app/ai/dispatcher.py index 647a562..cba9905 100644 --- a/apps/backend/app/ai/dispatcher.py +++ b/apps/backend/app/ai/dispatcher.py @@ -9,13 +9,14 @@ - 超时通过 asyncio.wait_for() 控制 - 每步调用前依次检查:熔断→限流→预算 - 调用链某步失败不中断后续步骤 +- prompt 拼装委托给 app.ai.prompts 模块(含真实业务数据) 调用链: - 消费事件(无助教):App3 → App8 → App7 - 消费事件(有助教):App3 → App8 → App7 + App4 → App5 - 备注事件:App6 → App8 - 任务分配事件:App4 → App5 -- DWS 完成事件:App2 预生成 +- DWS 完成事件:App2 预生成(8 个时间维度) """ from __future__ import annotations @@ -34,19 +35,80 @@ from app.ai.circuit_breaker import CircuitBreaker, CircuitState from app.ai.config import AIConfig from app.ai.conversation_service import ConversationService from app.ai.dashscope_client import DashScopeClient -from app.ai.exceptions import ( - BudgetExceededError, - CircuitOpenError, - DashScopeError, - RateLimitExceededError, +from app.ai.prompts import ( + build_app2_prompt, + build_app2a_area_prompt, + build_app3_prompt, + build_app4_prompt, + build_app5_prompt, + build_app6_prompt, + build_app7_prompt, + build_app8_prompt, ) +from app.ai.event_bus import AIEvent, get_event_bus from app.ai.rate_limiter import RateLimiter +from app.ai.references import attach_references from app.ai.run_log_service import AIRunLogService +from app.ai.schemas import CacheTypeEnum logger = logging.getLogger(__name__) # 单步调用默认超时(秒) -_STEP_TIMEOUT = 120 +# 2026-04-21:App2 prompt 中文 key 膨胀后 AI 响应延迟常突破 120s,上调至 180s +_STEP_TIMEOUT = 180 + +# App2 预热时间维度(与 prompts/app2_finance_prompt.DIMENSION_MAP 对齐) +APP2_TIME_DIMENSIONS = ( + "this_month", "last_month", "this_week", "last_week", + "this_quarter", "last_quarter", "last_3_months", "last_6_months", +) + +# App2 预热区域维度(与 prompts/app2_finance_prompt.AREA_OPTIONS 对齐) +APP2_AREA_OPTIONS = ( + "all", "hall", "hallA", "hallB", "hallC", + "vip", "snooker", "mahjong", "ktv", +) + +# 2026-04-23 · 72 组合拆分为 2 个 APP: +# - app2_finance 处理 area='all' 的 8 组合 +# - app2a_finance_area 处理 area != 'all' 的 64 组合 +APP2A_AREA_OPTIONS = tuple(a for a in APP2_AREA_OPTIONS if a != "all") + + +def _app2_target_id(time_dimension: str, area: str) -> str: + """App2 缓存 target_id:time_dimension__area(双下划线分隔,避免歧义)。 + + 例:this_month__all / last_month__hallA。 + 前端 board-finance.ts _loadAIInsights 需用相同拼装规则读取缓存。 + """ + return f"{time_dimension}__{area}" + + +def _publish_alert( + site_id: int, + app_name: str, + alert_type: str, + message: str, + log_id: int | None = None, +) -> None: + """Phase 3.3:推送 alert_created 事件给 WS 订阅者(/ws/ai-alerts/{site_id})。 + + alert_type 取值:circuit_open / rate_limited / budget_exceeded / timeout / failed + 异常仅记日志,不影响主流程。 + """ + try: + get_event_bus().publish(AIEvent( + type="alert_created", + site_id=site_id, + payload={ + "app_type": app_name, + "alert_type": alert_type, + "message": message, + "log_id": log_id, + }, + )) + except Exception: + logger.debug("alert_created 事件广播失败", exc_info=True) def _update_trigger_job_status( @@ -203,8 +265,17 @@ class AIDispatcher: # 标记开始执行 _update_trigger_job_status(job_id, "running", set_started=True) + # 外层超时按事件类型给预算: + # - 消费/备注/任务分配:最多 4-5 个 App 串行,10 min 足够 + # - DWS 完成:72 组合(8 时间 × 9 区域),每组合 ~30-120s,需 ~2.5h 预算 + chain_timeout = ( + _STEP_TIMEOUT * len(APP2_TIME_DIMENSIONS) * len(APP2_AREA_OPTIONS) + 600 + if event.event_type == EVENT_DWS_COMPLETED + else _STEP_TIMEOUT * 5 + ) + try: - await asyncio.wait_for(handler(event), timeout=_STEP_TIMEOUT * 5) + await asyncio.wait_for(handler(event), timeout=chain_timeout) logger.info("调用链完成: job_id=%d event_type=%s", job_id, event.event_type) _update_trigger_job_status(job_id, "completed", set_finished=True) except asyncio.TimeoutError: @@ -250,7 +321,7 @@ class AIDispatcher: Args: app_name: 应用名称(如 "app3_clue"),用于日志 app_id: 百炼应用 ID - prompt: 发送给应用的 prompt + prompt: 发送给应用的 prompt 字符串 context: 上下文信息(含 site_id、member_id 等) Returns: @@ -276,6 +347,7 @@ class AIDispatcher: self.run_log_svc.update_failed(log_id, "circuit_open", elapsed_ms) except Exception: logger.exception("记录 circuit_open 日志失败: app=%s", app_name) + _publish_alert(site_id, app_name, "circuit_open", "熔断器已打开") return None # ── 2. 限流检查 ── @@ -293,6 +365,7 @@ class AIDispatcher: self.run_log_svc.update_failed(log_id, "rate_limited", elapsed_ms) except Exception: logger.exception("记录 rate_limited 日志失败: app=%s", app_name) + _publish_alert(site_id, app_name, "rate_limited", "门店限流超限") return None # ── 3. 预算检查 ── @@ -315,6 +388,10 @@ class AIDispatcher: ) except Exception: logger.exception("记录 budget_exceeded 日志失败: app=%s", app_name) + _publish_alert( + site_id, app_name, "budget_exceeded", + f"Token 预算超限: {budget_status.reason}", + ) return None # ── 4. 创建日志记录(pending → running) ── @@ -353,6 +430,9 @@ class AIDispatcher: self.circuit_breaker.record_success(app_id) logger.info("调用成功: app=%s tokens=%d latency=%dms", app_name, tokens_used, elapsed_ms) + + # Phase 1.3:按 app_name + context 注入 _references 元数据(非破坏性) + result = attach_references(app_name, result, context) or result return result except asyncio.TimeoutError: @@ -364,6 +444,7 @@ class AIDispatcher: self.run_log_svc.update_timeout(log_id, elapsed_ms) except Exception: logger.exception("更新 timeout 日志失败: app=%s", app_name) + _publish_alert(site_id, app_name, "timeout", f"调用超时 ({elapsed_ms}ms)", log_id) return None except Exception as exc: @@ -378,8 +459,57 @@ class AIDispatcher: logger.exception("更新 failed 日志失败: app=%s", app_name) self.circuit_breaker.record_failure(app_id) + _publish_alert(site_id, app_name, "failed", error_msg[:200], log_id) return None + # ── 缓存写入辅助 ───────────────────────────────────── + + def _write_cache( + self, + cache_type: str, + site_id: int, + target_id: str, + result: dict | None, + triggered_by: str, + score: int | None = None, + ) -> None: + """统一缓存写入入口,结果为 None 时跳过。 + + Phase 1.3:result['_references'] 已由 _run_step 成功返回前注入。 + """ + if result is None: + return + try: + self.cache_svc.write_cache( + cache_type=cache_type, + site_id=site_id, + target_id=target_id, + result_json=result, + triggered_by=triggered_by, + score=score, + ) + except Exception: + logger.exception( + "缓存写入失败: cache_type=%s site_id=%s target_id=%s", + cache_type, site_id, target_id, + ) + return + + # Phase 1.4:广播 cache_updated 事件,admin-web / 小程序 WS 订阅可实时刷新 + try: + from app.ai.event_bus import AIEvent, get_event_bus + get_event_bus().publish(AIEvent( + type="cache_updated", + site_id=site_id, + payload={ + "cache_type": cache_type, + "target_id": target_id, + "triggered_by": triggered_by, + }, + )) + except Exception: + logger.debug("cache_updated 事件广播失败(不影响主流程)", exc_info=True) + # ── App8 幂等写入 member_retention_clue ───────────── def _write_retention_clue( @@ -389,16 +519,17 @@ class AIDispatcher: consolidate_result: dict, source: str, ) -> None: - """幂等写入 member_retention_clue:事务内 DELETE 同源旧记录 + INSERT 新记录。 + """全量替换 member_retention_clue:DELETE 同源旧记录 + INSERT 新记录(事务)。 同一 member 同一 site 同一 source 当天只保留最新一批。 + 人工线索(source='manual')不受影响。 事务失败自动回滚。 - Args: - member_id: 会员 ID - site_id: 门店 ID - consolidate_result: App8 返回的整合结果,含 clues 列表 - source: 线索来源(ai_consumption / ai_note) + 字段映射: + - category → category + - emoji + " " + summary → summary(如 "📅 偏好周末下午时段消费") + - detail → detail + - providers → recorded_by_name """ from app.database import get_connection @@ -412,40 +543,41 @@ class AIDispatcher: conn = get_connection() try: with conn.cursor() as cur: - # DELETE 该会员该来源当天的旧记录 cur.execute( """ DELETE FROM member_retention_clue - WHERE member_id = %s - AND site_id = %s - AND source = %s - AND recorded_at::date = CURRENT_DATE + WHERE member_id = %s AND site_id = %s AND source = %s """, (member_id, site_id, source), ) deleted = cur.rowcount - # INSERT 新记录 for clue in clues: + emoji = clue.get("emoji", "") + raw_summary = clue.get("summary", "") + summary = f"{emoji} {raw_summary}" if emoji else raw_summary + cur.execute( """ INSERT INTO member_retention_clue - (member_id, category, summary, detail, site_id, source) - VALUES (%s, %s, %s, %s, %s, %s) + (member_id, category, summary, detail, site_id, + source, recorded_by_name, recorded_by_assistant_id) + VALUES (%s, %s, %s, %s, %s, %s, %s, NULL) """, ( member_id, clue.get("category", "客户基础"), - clue.get("summary", ""), - clue.get("detail"), + summary, + clue.get("detail", ""), site_id, source, + clue.get("providers", ""), ), ) conn.commit() logger.info( - "维客线索幂等写入完成: member_id=%d site_id=%d source=%s " + "维客线索全量替换完成: member_id=%d site_id=%d source=%s " "deleted=%d inserted=%d", member_id, site_id, source, deleted, len(clues), ) @@ -455,265 +587,527 @@ class AIDispatcher: finally: conn.close() + # ── App8 输入辅助:提取最近 App3/App6 线索 ───────── + + def _fetch_recent_clues( + self, + cache_type: str, + site_id: int, + member_id: int, + ) -> tuple[list[dict], str | None]: + """从 ai_cache 提取最近一次结果的 clues 数组与生成时间。""" + try: + latest = self.cache_svc.get_latest( + cache_type, site_id, str(member_id), + ) + except Exception: + logger.exception( + "查询缓存失败: cache_type=%s site_id=%s member_id=%s", + cache_type, site_id, member_id, + ) + return [], None + + if not latest: + return [], None + + result_json = latest.get("result_json") or {} + if isinstance(result_json, str): + try: + result_json = json.loads(result_json) + except Exception: + result_json = {} + clues = result_json.get("clues", []) if isinstance(result_json, dict) else [] + return clues, latest.get("created_at") + # ── 事件处理器 ─────────────────────────────────────── async def _handle_consumption(self, event: TriggerEvent) -> None: - """消费事件链:App3→App8→App7(无助教)/ +App4→App5(有助教)。 - - 链路: - 1. App3 (clue) — 生成会员线索分析 - 2. App8 (consolidate) — 整合线索写入 member_retention_clue - 3. App7 (customer) — 客户分析 - 有助教时额外执行: - 4. App4 (analysis) — 助教关系分析 - 5. App5 (tactics) — 助教话术参考 - """ + """消费事件链:App3 → App8 → App7(无助教)/ + App4 → App5(有助教)。""" site_id = event.site_id member_id = event.member_id + if member_id is None: + logger.error("消费事件缺少 member_id: site_id=%d", site_id) + return + has_assistant = event.payload.get("has_assistant", False) + assistant_id = event.payload.get("assistant_id") if has_assistant else None context = {"site_id": site_id, "member_id": member_id} - # ── Step 1: App3 线索分析 ── - clue_prompt = json.dumps( - {"event_type": "consumption", "site_id": site_id, - "member_id": member_id, "payload": event.payload}, - ensure_ascii=False, - ) - clue_result = await self._run_step( - "app3_clue", self.config.app_id_3_clue, clue_prompt, context, - ) - if clue_result is None: - logger.warning("App3 线索分析失败,使用已有缓存继续: member_id=%s", member_id) + # ── Step 1: App3 客户线索分析 ── + try: + app3_prompt = await build_app3_prompt( + {"site_id": site_id, "member_id": member_id}, + self.cache_svc, + ) + except Exception: + logger.exception("App3 prompt 拼装失败: member_id=%s", member_id) + app3_prompt = "" + + app3_result = None + if app3_prompt: + app3_result = await self._run_step( + "app3_clue", self.config.app_id_3_clue, app3_prompt, context, + ) + self._write_cache( + CacheTypeEnum.APP3_CLUE.value, site_id, str(member_id), + app3_result, "consumption", + ) # ── Step 2: App8 线索整合 ── - consolidate_prompt = json.dumps( - {"event_type": "consumption", "site_id": site_id, - "member_id": member_id, "clue_result": clue_result}, - ensure_ascii=False, + app3_clues = (app3_result or {}).get("clues", []) if isinstance(app3_result, dict) else [] + app6_clues, app6_generated_at = self._fetch_recent_clues( + CacheTypeEnum.APP6_NOTE_ANALYSIS.value, site_id, member_id, ) - consolidate_result = await self._run_step( - "app8_consolidate", self.config.app_id_8_consolidate, - consolidate_prompt, context, - ) - if consolidate_result is not None: - # 写入缓存 - try: - self.cache_svc.write_cache( - cache_type="app8_clue_consolidated", - site_id=site_id, - target_id=str(member_id), - result_json=consolidate_result, - triggered_by="consumption", - ) - except Exception: - logger.exception( - "App8 缓存写入失败: site_id=%d member_id=%s", site_id, member_id, - ) - # 幂等写入 member_retention_clue(DELETE+INSERT 事务) - if member_id is not None: + + try: + app8_prompt = await build_app8_prompt({ + "site_id": site_id, + "member_id": member_id, + "app3_clues": app3_clues, + "app6_clues": app6_clues, + "app3_generated_at": None, # App3 刚生成,无 created_at + "app6_generated_at": app6_generated_at, + }) + except Exception: + logger.exception("App8 prompt 拼装失败: member_id=%s", member_id) + app8_prompt = "" + + app8_result = None + if app8_prompt: + app8_result = await self._run_step( + "app8_consolidate", self.config.app_id_8_consolidate, + app8_prompt, context, + ) + self._write_cache( + CacheTypeEnum.APP8_CLUE_CONSOLIDATED.value, site_id, str(member_id), + app8_result, "consumption", + ) + if app8_result is not None: try: self._write_retention_clue( member_id=member_id, site_id=site_id, - consolidate_result=consolidate_result, + consolidate_result=app8_result, source="ai_consumption", ) except Exception: logger.exception( - "App8 维客线索写入失败: site_id=%d member_id=%s", + "App8 维客线索写入失败: site_id=%d member_id=%d", site_id, member_id, ) - else: - logger.warning("App8 线索整合失败,使用已有缓存继续: member_id=%s", member_id) # ── Step 3: App7 客户分析 ── - customer_prompt = json.dumps( - {"event_type": "consumption", "site_id": site_id, - "member_id": member_id, "clue_result": clue_result, - "consolidate_result": consolidate_result}, - ensure_ascii=False, - ) - customer_result = await self._run_step( - "app7_customer", self.config.app_id_7_customer, - customer_prompt, context, - ) - if customer_result is None: - logger.warning("App7 客户分析失败: member_id=%s", member_id) + try: + app7_prompt = await build_app7_prompt( + {"site_id": site_id, "member_id": member_id}, + self.cache_svc, + ) + except Exception: + logger.exception("App7 prompt 拼装失败: member_id=%s", member_id) + app7_prompt = "" + + if app7_prompt: + app7_result = await self._run_step( + "app7_customer", self.config.app_id_7_customer, + app7_prompt, context, + ) + self._write_cache( + CacheTypeEnum.APP7_CUSTOMER_ANALYSIS.value, site_id, str(member_id), + app7_result, "consumption", + ) # ── 有助教时:App4 → App5 ── - if has_assistant: - # Step 4: App4 助教关系分析 - analysis_prompt = json.dumps( - {"event_type": "consumption", "site_id": site_id, - "member_id": member_id, "payload": event.payload}, - ensure_ascii=False, + if has_assistant and assistant_id is not None: + await self._run_app4_app5( + site_id=site_id, + assistant_id=int(assistant_id), + member_id=member_id, + triggered_by="consumption", ) - analysis_result = await self._run_step( - "app4_analysis", self.config.app_id_4_analysis, - analysis_prompt, context, - ) - if analysis_result is None: - logger.warning("App4 助教分析失败,使用已有缓存继续: member_id=%s", member_id) - - # Step 5: App5 助教话术参考 - tactics_prompt = json.dumps( - {"event_type": "consumption", "site_id": site_id, - "member_id": member_id, "analysis_result": analysis_result}, - ensure_ascii=False, - ) - tactics_result = await self._run_step( - "app5_tactics", self.config.app_id_5_tactics, - tactics_prompt, context, - ) - if tactics_result is None: - logger.warning("App5 话术参考失败: member_id=%s", member_id) async def _handle_note(self, event: TriggerEvent) -> None: - """备注事件链:App6→App8。 - - 链路: - 1. App6 (note) — 备注分析 - 2. App8 (consolidate) — 整合线索写入 member_retention_clue - """ + """备注事件链:App6 → App8。""" site_id = event.site_id member_id = event.member_id + if member_id is None: + logger.error("备注事件缺少 member_id: site_id=%d", site_id) + return + context = {"site_id": site_id, "member_id": member_id} # ── Step 1: App6 备注分析 ── - note_prompt = json.dumps( - {"event_type": "note_created", "site_id": site_id, - "member_id": member_id, "payload": event.payload}, - ensure_ascii=False, - ) - note_result = await self._run_step( - "app6_note", self.config.app_id_6_note, note_prompt, context, - ) - if note_result is None: - logger.warning("App6 备注分析失败,使用已有缓存继续: member_id=%s", member_id) + try: + app6_prompt = await build_app6_prompt({ + "site_id": site_id, + "member_id": member_id, + "note_content": event.payload.get("note_content", ""), + "noted_by_name": event.payload.get("noted_by_name", ""), + "noted_by_created_at": event.payload.get("noted_by_created_at", ""), + }, self.cache_svc) + except Exception: + logger.exception("App6 prompt 拼装失败: member_id=%s", member_id) + app6_prompt = "" + + app6_result = None + if app6_prompt: + app6_result = await self._run_step( + "app6_note", self.config.app_id_6_note, app6_prompt, context, + ) + score = (app6_result or {}).get("score") if isinstance(app6_result, dict) else None + self._write_cache( + CacheTypeEnum.APP6_NOTE_ANALYSIS.value, site_id, str(member_id), + app6_result, "note_created", score=score, + ) # ── Step 2: App8 线索整合 ── - consolidate_prompt = json.dumps( - {"event_type": "note_created", "site_id": site_id, - "member_id": member_id, "note_result": note_result}, - ensure_ascii=False, + app6_clues = (app6_result or {}).get("clues", []) if isinstance(app6_result, dict) else [] + app3_clues, app3_generated_at = self._fetch_recent_clues( + CacheTypeEnum.APP3_CLUE.value, site_id, member_id, ) - consolidate_result = await self._run_step( + + try: + app8_prompt = await build_app8_prompt({ + "site_id": site_id, + "member_id": member_id, + "app3_clues": app3_clues, + "app6_clues": app6_clues, + "app3_generated_at": app3_generated_at, + "app6_generated_at": None, # App6 刚生成 + }) + except Exception: + logger.exception("App8 prompt 拼装失败: member_id=%s", member_id) + return + + app8_result = await self._run_step( "app8_consolidate", self.config.app_id_8_consolidate, - consolidate_prompt, context, + app8_prompt, context, ) - if consolidate_result is not None: + self._write_cache( + CacheTypeEnum.APP8_CLUE_CONSOLIDATED.value, site_id, str(member_id), + app8_result, "note_created", + ) + if app8_result is not None: try: - self.cache_svc.write_cache( - cache_type="app8_clue_consolidated", + self._write_retention_clue( + member_id=member_id, site_id=site_id, - target_id=str(member_id), - result_json=consolidate_result, - triggered_by="note_created", + consolidate_result=app8_result, + source="ai_note", ) except Exception: logger.exception( - "App8 缓存写入失败: site_id=%d member_id=%s", site_id, member_id, + "App8 维客线索写入失败: site_id=%d member_id=%d", + site_id, member_id, ) - # 幂等写入 member_retention_clue(DELETE+INSERT 事务) - if member_id is not None: - try: - self._write_retention_clue( - member_id=member_id, - site_id=site_id, - consolidate_result=consolidate_result, - source="ai_note", - ) - except Exception: - logger.exception( - "App8 维客线索写入失败: site_id=%d member_id=%s", - site_id, member_id, - ) - else: - logger.warning("App8 线索整合失败: member_id=%s", member_id) async def _handle_task_assigned(self, event: TriggerEvent) -> None: - """任务分配事件链:App4→App5。 - - 链路: - 1. App4 (analysis) — 助教关系分析 - 2. App5 (tactics) — 助教话术参考 - """ + """任务分配事件链:App4 → App5。""" site_id = event.site_id member_id = event.member_id - context = {"site_id": site_id, "member_id": member_id} + assistant_id = event.payload.get("assistant_id") + if member_id is None or assistant_id is None: + logger.error( + "任务分配事件缺少 member_id 或 assistant_id: site_id=%d", site_id, + ) + return - # ── Step 1: App4 助教关系分析 ── - analysis_prompt = json.dumps( - {"event_type": "task_assigned", "site_id": site_id, - "member_id": member_id, "payload": event.payload}, - ensure_ascii=False, + await self._run_app4_app5( + site_id=site_id, + assistant_id=int(assistant_id), + member_id=member_id, + triggered_by="task_assigned", ) - analysis_result = await self._run_step( - "app4_analysis", self.config.app_id_4_analysis, - analysis_prompt, context, - ) - if analysis_result is None: - logger.warning("App4 助教分析失败,使用已有缓存继续: member_id=%s", member_id) - # ── Step 2: App5 助教话术参考 ── - tactics_prompt = json.dumps( - {"event_type": "task_assigned", "site_id": site_id, - "member_id": member_id, "analysis_result": analysis_result}, - ensure_ascii=False, - ) - tactics_result = await self._run_step( + async def _run_app4_app5( + self, + *, + site_id: int, + assistant_id: int, + member_id: int, + triggered_by: str, + ) -> None: + """App4 → App5 串行执行(消费链含助教 + 任务分配链复用)。""" + context = { + "site_id": site_id, + "member_id": member_id, + "assistant_id": assistant_id, + } + target_id = f"{assistant_id}_{member_id}" + + # ── Step A: App4 关系分析 ── + try: + app4_prompt = await build_app4_prompt({ + "site_id": site_id, + "assistant_id": assistant_id, + "member_id": member_id, + }, self.cache_svc) + except Exception: + logger.exception( + "App4 prompt 拼装失败: assistant=%s member=%s", + assistant_id, member_id, + ) + app4_prompt = "" + + app4_result = None + if app4_prompt: + app4_result = await self._run_step( + "app4_analysis", self.config.app_id_4_analysis, + app4_prompt, context, + ) + self._write_cache( + CacheTypeEnum.APP4_ANALYSIS.value, site_id, target_id, + app4_result, triggered_by, + ) + + # ── Step B: App5 话术参考(使用 App4 结果) ── + try: + app5_prompt = await build_app5_prompt({ + "site_id": site_id, + "assistant_id": assistant_id, + "member_id": member_id, + "app4_result": app4_result, + }, self.cache_svc) + except Exception: + logger.exception( + "App5 prompt 拼装失败: assistant=%s member=%s", + assistant_id, member_id, + ) + return + + app5_result = await self._run_step( "app5_tactics", self.config.app_id_5_tactics, - tactics_prompt, context, + app5_prompt, context, + ) + self._write_cache( + CacheTypeEnum.APP5_TACTICS.value, site_id, target_id, + app5_result, triggered_by, ) - if tactics_result is None: - logger.warning("App5 话术参考失败: member_id=%s", member_id) async def _handle_dws_completed(self, event: TriggerEvent) -> None: - """DWS 完成事件:App2 预生成(8 个时间维度)。 + """DWS 完成事件:App2 + App2a 联合预生成(8 时间维度 × 9 区域 = 72 组合)。 - 为当前门店生成 8 个时间维度的财务洞察, - 每个维度独立调用 App2,结果写入 ai_cache。 + 2026-04-23 · 72 组合拆分为两个百炼 APP: + - app2_finance(全域版)处理 area='all' 的 8 组合 + - app2a_finance_area(区域派生版)处理 area != 'all' 的 64 组合 + + 每个组合独立调用百炼,结果写入 ai_cache,target_id=time_dimension__area。 + 单步失败不影响后续组合。熔断/限流/预算检查在 _run_step 内部生效, + 达到阈值后会自动跳过剩余调用。 """ site_id = event.site_id - member_id = event.member_id - context = {"site_id": site_id, "member_id": member_id} + context = {"site_id": site_id, "member_id": None} - time_dimensions = [ - "today", "yesterday", "this_week", "last_week", - "this_month", "last_month", "this_quarter", "this_year", - ] + total = len(APP2_TIME_DIMENSIONS) * len(APP2_AREA_OPTIONS) + ok = 0 + for dimension in APP2_TIME_DIMENSIONS: + # 1. 全域组合 · app2_finance + try: + prompt = await build_app2_prompt({ + "site_id": site_id, + "time_dimension": dimension, + "area": "all", + }) + except Exception: + logger.exception( + "App2 prompt 拼装失败: site_id=%d dimension=%s area=all", + site_id, dimension, + ) + else: + result = await self._run_step( + "app2_finance", self.config.app_id_2_finance, prompt, context, + ) + self._write_cache( + CacheTypeEnum.APP2_FINANCE.value, + site_id, + _app2_target_id(dimension, "all"), + result, + "dws_completed", + ) + if result is not None: + ok += 1 - for dimension in time_dimensions: - prompt = json.dumps( - {"event_type": "dws_completed", "site_id": site_id, - "time_dimension": dimension, "payload": event.payload}, - ensure_ascii=False, - ) - result = await self._run_step( - "app2_finance", self.config.app_id_2_finance, prompt, context, - ) - if result is not None: + # 2. 区域组合 · app2a_finance_area(64 组合 = 8 时间 × 8 区域) + for area in APP2A_AREA_OPTIONS: try: - self.cache_svc.write_cache( - cache_type="app2_finance", - site_id=site_id, - target_id=dimension, - result_json=result, - triggered_by="dws_completed", - ) + prompt = await build_app2a_area_prompt({ + "site_id": site_id, + "time_dimension": dimension, + "area": area, + }) except Exception: logger.exception( - "App2 缓存写入失败: site_id=%d dimension=%s", - site_id, dimension, + "app2a prompt 拼装失败: site_id=%d dimension=%s area=%s", + site_id, dimension, area, ) - else: - logger.warning( - "App2 预生成失败: site_id=%d dimension=%s", site_id, dimension, + continue + + result = await self._run_step( + "app2a_finance_area", self.config.app_id_2a_finance_area, prompt, context, ) + self._write_cache( + CacheTypeEnum.APP2A_FINANCE_AREA.value, + site_id, + _app2_target_id(dimension, area), + result, + "dws_completed", + ) + if result is not None: + ok += 1 + + logger.info( + "App2 + App2a 预热完成: site_id=%d 成功=%d/%d", + site_id, ok, total, + ) + + # ── 按需单 App 执行(admin-web 重新生成按钮用) ────────────── + + async def run_single_app( + self, + app_type: str, + context: dict, + triggered_by: str = "admin_manual", + ) -> dict | None: + """按需执行单个 App,跳过链路编排。 + + 用于 admin-web "重新生成" 按钮等场景:只跑一个 App 不触发下游链。 + 结果自动写入 ai_cache。 + + Args: + app_type: 应用名称,支持 app2_finance ~ app8_consolidation + context: 上下文参数。各 App 所需字段: + - app2_finance: site_id + time_dimension + - app3_clue: site_id + member_id + - app4_analysis: site_id + member_id + assistant_id + - app5_tactics: site_id + member_id + assistant_id + app4_result? + - app6_note: site_id + member_id + note_content + noted_by_name + noted_by_created_at? + - app7_customer: site_id + member_id + - app8_consolidation: site_id + member_id(从 cache 自动拼 app3/app6 clues) + triggered_by: 触发来源(日志用),默认 admin_manual + + Returns: + 百炼返回的结构化 JSON;失败返回 None + """ + site_id = context.get("site_id") + if site_id is None: + raise ValueError("context 缺少 site_id") + + # 分派:app_type → (prompt builder, app_id, cache_type, target_id) + if app_type == "app2_finance": + dimension = context.get("time_dimension") + if not dimension: + raise ValueError("app2_finance 需要 time_dimension") + area = context.get("area", "all") + context = {**context, "area": area} + prompt = await build_app2_prompt(context) + app_id = self.config.app_id_2_finance + cache_type = CacheTypeEnum.APP2_FINANCE.value + target_id = _app2_target_id(str(dimension), str(area)) + + elif app_type == "app2a_finance_area": + dimension = context.get("time_dimension") + area = context.get("area") + if not dimension: + raise ValueError("app2a_finance_area 需要 time_dimension") + if not area or area == "all": + raise ValueError("app2a_finance_area 需要 area != 'all'(area='all' 请走 app2_finance)") + prompt = await build_app2a_area_prompt(context) + app_id = self.config.app_id_2a_finance_area + cache_type = CacheTypeEnum.APP2A_FINANCE_AREA.value + target_id = _app2_target_id(str(dimension), str(area)) + + elif app_type == "app3_clue": + if context.get("member_id") is None: + raise ValueError("app3_clue 需要 member_id") + prompt = await build_app3_prompt(context, self.cache_svc) + app_id = self.config.app_id_3_clue + cache_type = CacheTypeEnum.APP3_CLUE.value + target_id = str(context["member_id"]) + + elif app_type == "app4_analysis": + if context.get("member_id") is None or context.get("assistant_id") is None: + raise ValueError("app4_analysis 需要 member_id + assistant_id") + prompt = await build_app4_prompt(context, self.cache_svc) + app_id = self.config.app_id_4_analysis + cache_type = CacheTypeEnum.APP4_ANALYSIS.value + target_id = f"{context['assistant_id']}_{context['member_id']}" + + elif app_type == "app5_tactics": + if context.get("member_id") is None or context.get("assistant_id") is None: + raise ValueError("app5_tactics 需要 member_id + assistant_id") + prompt = await build_app5_prompt(context, self.cache_svc) + app_id = self.config.app_id_5_tactics + cache_type = CacheTypeEnum.APP5_TACTICS.value + target_id = f"{context['assistant_id']}_{context['member_id']}" + + elif app_type == "app6_note": + if context.get("member_id") is None: + raise ValueError("app6_note 需要 member_id") + prompt = await build_app6_prompt(context, self.cache_svc) + app_id = self.config.app_id_6_note + cache_type = CacheTypeEnum.APP6_NOTE_ANALYSIS.value + target_id = str(context["member_id"]) + + elif app_type == "app7_customer": + if context.get("member_id") is None: + raise ValueError("app7_customer 需要 member_id") + prompt = await build_app7_prompt(context, self.cache_svc) + app_id = self.config.app_id_7_customer + cache_type = CacheTypeEnum.APP7_CUSTOMER_ANALYSIS.value + target_id = str(context["member_id"]) + + elif app_type == "app8_consolidation": + if context.get("member_id") is None: + raise ValueError("app8_consolidation 需要 member_id") + # 自动从 cache 拼 app3/app6 clues + member_id = context["member_id"] + app3_clues, app3_at = self._fetch_recent_clues( + CacheTypeEnum.APP3_CLUE.value, site_id, member_id, + ) + app6_clues, app6_at = self._fetch_recent_clues( + CacheTypeEnum.APP6_NOTE_ANALYSIS.value, site_id, member_id, + ) + prompt = await build_app8_prompt({ + "site_id": site_id, + "member_id": member_id, + "app3_clues": app3_clues, + "app6_clues": app6_clues, + "app3_generated_at": app3_at, + "app6_generated_at": app6_at, + }) + app_id = self.config.app_id_8_consolidate + cache_type = CacheTypeEnum.APP8_CLUE_CONSOLIDATED.value + target_id = str(member_id) + else: + raise ValueError(f"不支持的 app_type: {app_type}") + + # 执行 + 写缓存(复用 _run_step 熔断/限流/预算/日志逻辑) + result = await self._run_step(app_type, app_id, prompt, context) + score = None + if app_type == "app6_note" and isinstance(result, dict): + score = result.get("score") + self._write_cache(cache_type, site_id, target_id, result, triggered_by, score=score) + + # App8 额外写入 member_retention_clue(幂等替换) + if app_type == "app8_consolidation" and result is not None: + try: + self._write_retention_clue( + member_id=context["member_id"], + site_id=site_id, + consolidate_result=result, + source="ai_consumption" if context.get("source") != "ai_note" else "ai_note", + ) + except Exception: + logger.exception( + "App8 manual: 维客线索写入失败 site_id=%d member_id=%s", + site_id, context.get("member_id"), + ) + + return result # ── 事件处理器注册(向后兼容) ──────────────────────────── + def _create_ai_event_handlers(dispatcher: AIDispatcher) -> dict[str, Callable]: """创建 AI 事件处理器,用于注册到 trigger_scheduler。 @@ -776,22 +1170,71 @@ def _create_ai_event_handlers(dispatcher: AIDispatcher) -> dict[str, Callable]: ) await dispatcher.handle_trigger(event) + async def handle_app2_prewarm(**_kw: Any) -> None: + """App2 财务洞察 cron 预热处理器(每日 10:00 触发)。 + + 对所有 active 门店发起 ai_dws_completed 事件,触发 App2 预生成: + 8 时间维度 × 9 区域 = 72 组合。 + cron 签名 (conn, job_id),这里不需要,用 **_kw 忽略。 + """ + from app.database import get_connection + from app.services.trigger_scheduler import fire_event + + conn = get_connection() + try: + with conn.cursor() as cur: + cur.execute( + "SELECT DISTINCT site_id FROM biz.sites WHERE site_id IS NOT NULL" + ) + site_ids = [r[0] for r in cur.fetchall()] + conn.commit() + finally: + conn.close() + + for sid in site_ids: + try: + fire_event("ai_dws_completed", {"site_id": sid}) + except Exception: + logger.exception("cron 触发 ai_dws_completed 失败: site_id=%s", sid) + logger.info("App2 cron 预热完成: 已触发 %d 个门店", len(site_ids)) + return { "ai_consumption_settled": handle_consumption_settled, "ai_note_created": handle_note_created, "ai_task_assigned": handle_task_assigned, "ai_dws_completed": handle_dws_completed, + "ai_dws_prewarm": handle_app2_prewarm, } +# ── 模块级 AIDispatcher 单例 ───────────────────────────── + +_dispatcher_instance: AIDispatcher | None = None + + +def get_dispatcher() -> AIDispatcher: + """获取全局 AIDispatcher 实例。 + + 须在 main.py lifespan 调用 register_ai_handlers 后使用。 + """ + if _dispatcher_instance is None: + raise RuntimeError( + "AIDispatcher 未初始化。确认 main.py lifespan 已调用 register_ai_handlers。" + ) + return _dispatcher_instance + + def register_ai_handlers(dispatcher: AIDispatcher) -> None: - """将 AI 事件处理器注册到 trigger_scheduler。 + """将 AI 事件处理器注册到 trigger_scheduler,并设置模块级实例。 在 FastAPI lifespan 中调用,将 AI 事件处理器 注册为 trigger_scheduler 的 job handler。 """ + global _dispatcher_instance from app.services.trigger_scheduler import register_job + _dispatcher_instance = dispatcher + handlers = _create_ai_event_handlers(dispatcher) for job_type, handler in handlers.items(): register_job(job_type, handler) diff --git a/apps/backend/app/ai/prompts/__init__.py b/apps/backend/app/ai/prompts/__init__.py index 382d093..6cc13b7 100644 --- a/apps/backend/app/ai/prompts/__init__.py +++ b/apps/backend/app/ai/prompts/__init__.py @@ -1 +1,28 @@ -# AI Prompt 模板子模块 +"""AI 应用 Prompt 拼装模块。 + +8 个百炼自定义应用的后端 prompt 拼装函数集中此处。 +- 所有函数返回 str:直接传给 dashscope.Application.call(prompt=...) +- system prompt 在百炼控制台配置,本模块只负责拼数据上下文 JSON +- 数据源走 data_fetchers / board_service,集中真实业务数据 +- 失败降级:数据获取失败时拼"_data_warnings"字段,不阻断 AI 调用 +""" + +from app.ai.prompts.app2_finance_prompt import build_prompt as build_app2_prompt +from app.ai.prompts.app2a_finance_area_prompt import build_prompt as build_app2a_area_prompt +from app.ai.prompts.app3_clue_prompt import build_prompt as build_app3_prompt +from app.ai.prompts.app4_analysis_prompt import build_prompt as build_app4_prompt +from app.ai.prompts.app5_tactics_prompt import build_prompt as build_app5_prompt +from app.ai.prompts.app6_note_prompt import build_prompt as build_app6_prompt +from app.ai.prompts.app7_customer_prompt import build_prompt as build_app7_prompt +from app.ai.prompts.app8_consolidation_prompt import build_prompt as build_app8_prompt + +__all__ = [ + "build_app2_prompt", + "build_app2a_area_prompt", + "build_app3_prompt", + "build_app4_prompt", + "build_app5_prompt", + "build_app6_prompt", + "build_app7_prompt", + "build_app8_prompt", +] diff --git a/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py b/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py new file mode 100644 index 0000000..2898959 --- /dev/null +++ b/apps/backend/app/ai/prompts/app2a_finance_area_prompt.py @@ -0,0 +1,498 @@ +"""应用 2a 区域财务洞察 Prompt 拼装(app2_finance 的区域派生版本)。 + +面向 72 组合中 area != 'all' 的 64 个组合(8 时间 × 8 业态)。 + +差异点(相较 app2_finance): +- payload 新增顶层字段:「业态说明」「区域占比」 +- 派生比率精简:仅「人力成本占成交收入比」「优惠侵蚀率」(其他比率区域级无法计算) +- 单位经济区域级:支持客单价/日均订单数及环比(暂不输出会员占比,与 v1.2 system prompt H6 对齐) +- 按星期聚合区域级:无「日均现金流入」(区域级无 cash_inflow 数据) +- 日粒度异常区域级:仅对 gross_amount 做异常检测(无 cash_inflow) +- 不注入:预收资产/现金流入/现金流出/储值卡余额变化(全店级字段,区域级无业务意义) + +数据源: +- 主数据:board_service.get_finance_board(time, area, compare=1) +- 日粒度:etl 库 app.v_dws_finance_area_daily(按 area_code 过滤) +- 区域占比:调用 board_service 两次(一次区域 + 一次 all)后派生 +""" + +from __future__ import annotations + +import logging +from collections import defaultdict +from datetime import datetime +from typing import Any + +from app.services.board_service import _calc_date_range, _calc_prev_range, get_finance_board + +# 复用 app2_finance_prompt 的公共常量与辅助函数 +from app.ai.prompts.app2_finance_prompt import ( + AREA_LABELS, + DIMENSION_LABELS, + DIMENSION_MAP, + INDUSTRY_BASELINES, + _aggregate_expense, + _build_coach_kpi, + _build_discount_kpi, + _slim, + _translate_keys, + _WEEKDAY_MIN_DAYS, + _ANOMALY_DEVIATION, + _ANOMALY_MAX_ITEMS, + _ANOMALY_MIN_DAYS, + _ANOMALY_MIN_SAME_WEEKDAY, + _WEEKDAY_ZH, +) + +logger = logging.getLogger(__name__) + + +# 业态特征字典(与 v1.2 system prompt「三、业态特征」章节对齐) +# trait:业态的数据表征(客单/订单密度/会员占比/周期规律) +# peer:典型对比项(给 AI 做区域对比时的参照方向) +AREA_INDUSTRY_TRAITS: dict[str, dict[str, str]] = { + "hall": { + "trait": "大厅(合并 hallA+B+C)· 散客主力 · 客单价中等 · 订单密度最高 · 会员占比相对低", + "peer": "与 VIP 包厢对比单客贡献差异 · 与团购占比对比获客成本", + }, + "hallA": { + "trait": "A 区大厅 · 散客主力 · 客单价中等 · 订单密度高", + "peer": "与 hallB/hallC 对比识别区位差异 · 与 hall 合计对比看单区占比", + }, + "hallB": { + "trait": "B 区大厅 · 散客主力 · 客单价中等 · 订单密度高", + "peer": "与 hallA/hallC 对比识别区位差异", + }, + "hallC": { + "trait": "C 区大厅(含 TV 台/美洲豹赛台)· 散客主力 · 客单价中等偏上 · 订单密度较高", + "peer": "与 hallA/hallB 对比识别区位差异", + }, + "vip": { + "trait": "VIP 台球包厢 · 会员主力 · 客单价显著高于大厅 2-3 倍 · 订单密度低 · 助教服务收入占比高", + "peer": "与 hall 大厅对比单客贡献 · 与 snooker 对比高客单群体差异", + }, + "snooker": { + "trait": "斯诺克 · 专业台球爱好者 · 客单价中高 · 会员占比较高 · 周末/夜场爆满", + "peer": "与 VIP 对比高端群体结构 · 与 hall 对比专业 vs 大众", + }, + "mahjong": { + "trait": "麻将房 · 散客 + 小团 · 客单价高(时长计费)· 停留久 · 订单密度低 · 助教参与度极低", + "peer": "与 KTV 对比包间型业态 · 与 hall 对比客单价与时长", + }, + "ktv": { + "trait": "团建房 · 团建场景 · 客单价集中在套餐 · 订单密度低 · 周末峰值明显 · 助教几乎不参与", + "peer": "与 mahjong 对比包间型业态 · 与 vip 对比高客单群体", + }, +} + + +def _fetch_area_daily_series( + site_id: int, start_date: str, end_date: str, area_code: str, +) -> list[tuple] | None: + """查区域级日粒度 [start, end],供单位经济/按星期/异常检测复用。 + + 返回字段顺序:(stat_date, gross, order_count, member_order_count, confirmed) + 注:区域级无 cash_inflow(对齐 v1.2 H6 降级),故与全店版 series 字段少一个 cash_in。 + + area_code 必须为非 "all" 的具体业态编码。 + """ + from app.database import get_connection + from app.services.fdw_queries import _fdw_context + + try: + conn = get_connection() + except Exception: + logger.debug("区域日粒度查询连接失败", exc_info=True) + return None + + try: + with _fdw_context(conn, site_id) as cur: + cur.execute( + """ + SELECT stat_date, + COALESCE(gross_amount, 0) AS gross, + COALESCE(order_count, 0) AS order_count, + COALESCE(member_order_count, 0) AS member_order_count, + COALESCE(confirmed_income, 0) AS confirmed + FROM app.v_dws_finance_area_daily + WHERE area_code = %s + AND stat_date >= %s::date + AND stat_date <= %s::date + ORDER BY stat_date + """, + (area_code, start_date, end_date), + ) + rows = cur.fetchall() + except Exception: + logger.debug( + "区域日粒度查询失败: site_id=%s area=%s", site_id, area_code, exc_info=True, + ) + return None + finally: + try: + conn.close() + except Exception: + pass + + active = [ + (r[0], float(r[1]), int(r[2] or 0), int(r[3] or 0), float(r[4] or 0)) + for r in rows + if float(r[1] or 0) > 0 + ] + return active if active else None + + +def _build_area_unit_economics( + series: list[tuple] | None, + prev_series: list[tuple] | None = None, +) -> dict | None: + """区域级单位经济:客单价 + 日均订单数(含环比)。 + + 与全店版差异: + - 不输出「会员订单占比」(对齐 v1.2 system prompt H6 · 等 DWS 回填完成 + A/B 评估后再开放) + - series 字段顺序:(stat_date, gross, order_count, member_order_count, confirmed) + + 月初场景(上期样本 < 5 天)附加"样本不足"后缀让 AI 降权引用。 + """ + if not series: + return None + total_orders = sum(r[2] for r in series) + if total_orders <= 0: + return None + total_gross = sum(r[1] for r in series) + total_confirmed = sum(r[4] for r in series) + days = len(series) + + price_confirmed = total_confirmed / total_orders + price_gross = total_gross / total_orders + daily_orders = total_orders / days + + out: dict[str, Any] = { + "总订单数": total_orders, + "日均订单数": round(daily_orders, 1), + "客单价_按成交收入": round(price_confirmed, 2), + "客单价_按发生额": round(price_gross, 2), + } + + if prev_series: + prev_orders = sum(r[2] for r in prev_series) + if prev_orders > 0: + prev_days = len(prev_series) + prev_gross = sum(r[1] for r in prev_series) + prev_confirmed = sum(r[4] for r in prev_series) + low_sample = prev_days < 5 + + def _pct_change(cur: float, prev: float) -> str: + if prev <= 0: + return "无上期数据" + value = f"{(cur - prev) / prev * 100:+.1f}%" + return f"{value}(上期仅 {prev_days} 天,样本不足仅供参考)" if low_sample else value + + out["客单价_按成交收入_环比"] = _pct_change(price_confirmed, prev_confirmed / prev_orders) + out["客单价_按发生额_环比"] = _pct_change(price_gross, prev_gross / prev_orders) + out["日均订单数_环比"] = _pct_change(daily_orders, prev_orders / prev_days) + return out + + +def _aggregate_by_weekday_area(series: list[tuple] | None) -> dict | None: + """区域级按星期聚合(无现金流入字段)。 + + series 字段顺序:(stat_date, gross, order_count, member_order_count, confirmed) + """ + if not series or len(series) < _WEEKDAY_MIN_DAYS: + return None + buckets: dict[int, list[tuple]] = defaultdict(list) + for row in series: + buckets[row[0].weekday()].append(row) + out: dict[str, dict] = {} + for wd in range(7): + rows = buckets.get(wd) or [] + if not rows: + continue + n = len(rows) + out[_WEEKDAY_ZH[wd]] = { + "日均发生额": round(sum(r[1] for r in rows) / n, 2), + "日均订单数": round(sum(r[2] for r in rows) / n, 1), + "营业日数": n, + } + return out or None + + +def _detect_anomaly_days_area( + series: list[tuple] | None, +) -> list[dict] | None: + """区域级日粒度异常(仅对 gross_amount 做,无现金流入)。 + + series 字段顺序:(stat_date, gross, order_count, member_order_count, confirmed) + """ + if not series or len(series) < _ANOMALY_MIN_DAYS: + return None + + def _scan(idx: int, label: str) -> list[dict]: + vals = [row[idx] for row in series] + global_mean = sum(vals) / len(vals) + if global_mean <= 0: + return [] + + by_weekday: dict[int, list[float]] = defaultdict(list) + for d, *metrics in series: + by_weekday[d.weekday()].append(metrics[idx - 1]) + weekday_mean: dict[int, float] = { + wd: (sum(xs) / len(xs)) for wd, xs in by_weekday.items() + } + + flagged: list[dict] = [] + for d, *metrics in series: + v = metrics[idx - 1] + wd = d.weekday() + same_count = len(by_weekday.get(wd, [])) + if same_count >= _ANOMALY_MIN_SAME_WEEKDAY and weekday_mean[wd] > 0: + base = weekday_mean[wd] + base_label = f"同{_WEEKDAY_ZH[wd]}均值" + else: + base = global_mean + base_label = "期均" + + deviation = (v - base) / base + if abs(deviation) >= _ANOMALY_DEVIATION: + flagged.append({ + "日期": f"{d} {_WEEKDAY_ZH[wd]}", + "指标": label, + "当日": round(v, 2), + "基线": round(base, 2), + "基线类型": base_label, + "偏离": f"{deviation * 100:+.1f}%", + "_abs_dev": abs(deviation), + }) + return flagged + + candidates = _scan(1, "发生额") # 区域级仅发生额做异常(无现金流入) + if not candidates: + return None + candidates.sort(key=lambda x: x["_abs_dev"], reverse=True) + out = [] + for c in candidates[:_ANOMALY_MAX_ITEMS]: + c.pop("_abs_dev", None) + out.append(c) + return out + + +async def _fetch_area_share( + site_id: int, time_dimension: str, area_confirmed: float, +) -> dict | None: + """查全店成交收入 + 上期全店成交收入,派生「区域占比」字段。 + + 返回:{本区域成交收入, 占全店成交收入, 占比环比} + 失败或数据不足返回 None。 + """ + board_time = DIMENSION_MAP.get(time_dimension) + if not board_time: + return None + try: + all_board = await get_finance_board( + time=board_time, area="all", compare=1, site_id=site_id, + ) + except Exception: + logger.debug("区域占比·全店数据查询失败", exc_info=True) + return None + + all_overview = (all_board or {}).get("overview") or {} + all_confirmed = float(all_overview.get("confirmed_revenue") or 0) + if all_confirmed <= 0: + return None + + share = area_confirmed / all_confirmed + out: dict[str, Any] = { + "本区域成交收入": round(area_confirmed, 2), + "全店成交收入": round(all_confirmed, 2), + "占全店成交收入": f"{share * 100:.1f}%", + } + # 环比:上期区域占比(本轮简化:若 all 的 confirmed_revenue_compare 可用,则给出"全店环比参照"让 AI 自己对比) + # 本区域占比环比 = (本期区域占比 − 上期区域占比),需查上期 area board,为避免额外 DB 访问,暂只给出本期占比 + return out + + +def _build_area_derived_ratios( + overview: dict | None, coach_kpi: dict | None, discount_kpi: dict | None, +) -> dict | None: + """区域级派生比率:仅「人力成本占成交收入比」「优惠侵蚀率」。 + + 其他比率(储值卡占比/结余率)区域级无数据,不输出。 + """ + if not isinstance(overview, dict): + return None + confirmed = float(overview.get("confirmed_revenue") or 0) + ratios: dict[str, Any] = {} + + if coach_kpi and confirmed > 0: + total_pay = float(coach_kpi.get("人力薪酬合计") or 0) + if total_pay > 0: + ratios["人力成本占成交收入比"] = round(total_pay / confirmed, 4) + + if discount_kpi and confirmed > 0: + total_discount = float(discount_kpi.get("总优惠") or 0) + gross = float(overview.get("occurrence") or 0) + if gross > 0: + ratios["优惠侵蚀率"] = round(total_discount / gross, 4) + + return ratios or None + + +async def build_prompt( + context: dict, + cache_svc: Any | None = None, # 兼容统一签名 +) -> str: + """构建 app2a 区域财务洞察 prompt 字符串。 + + Args: + context: site_id, time_dimension, area(area != 'all') + + Returns: + JSON 序列化的 prompt 字符串,字段已翻译为中文。 + + Raises: + ValueError: time_dimension 不支持 · area 为 'all' · area 不在白名单 + """ + import json + + site_id = context["site_id"] + time_dimension = context["time_dimension"] + area = context.get("area") + + if area == "all": + raise ValueError("app2a_finance_area 仅处理区域组合 · area='all' 应走 app2_finance") + if area not in AREA_LABELS: + raise ValueError(f"app2a_finance_area 不支持的区域: {area}") + + board_time = DIMENSION_MAP.get(time_dimension) + if not board_time: + raise ValueError(f"app2a_finance_area 不支持的时间维度: {time_dimension}") + + try: + board_data = await get_finance_board( + time=board_time, area=area, compare=1, site_id=site_id, + ) + except Exception: + logger.warning( + "app2a 财务看板查询失败: site_id=%s dimension=%s area=%s", + site_id, time_dimension, area, exc_info=True, + ) + board_data = {} + + overview = board_data.get("overview") if isinstance(board_data, dict) else None + revenue = board_data.get("revenue") if isinstance(board_data, dict) else None + coach = board_data.get("coach_analysis") if isinstance(board_data, dict) else None + expense = board_data.get("expense") if isinstance(board_data, dict) else None + + discount_kpi = _build_discount_kpi(revenue, overview) + coach_kpi = _build_coach_kpi(coach) + expense_kpi = _aggregate_expense(expense) + ratios = _build_area_derived_ratios(overview, coach_kpi, discount_kpi) + + # 原始数据 slim 后翻译,供 AI 追溯细节 + slim_data = _slim(board_data) or {} + raw_cn = _translate_keys(slim_data) + + # 对比口径(所有环比字段的前置依赖 · H1) + compare_caliber: dict[str, Any] | None = None + try: + cur_start, cur_end = _calc_date_range(board_time) + prev_start, prev_end = _calc_prev_range(board_time, cur_start, cur_end) + cur_days = (cur_end - cur_start).days + 1 + prev_days = (prev_end - prev_start).days + 1 + compare_caliber = { + "当期范围": f"{cur_start} ~ {cur_end}({cur_days} 天)", + "对比期范围": f"{prev_start} ~ {prev_end}({prev_days} 天)", + "对齐方式": "上期同天数对齐(非整月/整周对比)", + "说明": "所有 _环比 / _compare 字段均按上表口径计算;月中调用时对比期会自动截断到与当期相同天数", + } + except Exception: + logger.debug("对比口径字段生成失败(不影响主流程)", exc_info=True) + + # 业态说明(v1.2 system prompt H7 引用依据) + trait_info = AREA_INDUSTRY_TRAITS.get(area, {}) + industry_brief = { + "区域编码": area, + "区域名称": AREA_LABELS.get(area, area), + "业态特征": trait_info.get("trait", "—"), + "典型对比项": trait_info.get("peer", "—"), + } + + payload: dict[str, Any] = { + "当前时间": datetime.now().strftime("%Y-%m-%d %H:%M"), + "门店编号": site_id, + "时间维度": DIMENSION_LABELS.get(time_dimension, time_dimension), + "区域": AREA_LABELS.get(area, area), + **({"对比口径": compare_caliber} if compare_caliber else {}), + "业态说明": industry_brief, + "核心KPI": { + "发生额": float((overview or {}).get("occurrence") or 0), + "发生额环比": (overview or {}).get("occurrence_compare") or "持平", + "成交收入": float((overview or {}).get("confirmed_revenue") or 0), + "成交收入环比": (overview or {}).get("confirmed_revenue_compare") or "持平", + # 区域级无现金流入数据(v1.2 H6 降级),不输出现金相关 KPI + }, + } + + # 派生比率(仅 2 项) + if ratios: + payload["派生比率"] = ratios + + # 区域占比(需异步查全店) + area_confirmed = float((overview or {}).get("confirmed_revenue") or 0) + if area_confirmed > 0: + area_share = await _fetch_area_share(site_id, time_dimension, area_confirmed) + if area_share: + payload["区域占比"] = area_share + + # 优惠构成(复用全店版逻辑) + if discount_kpi: + payload["优惠构成"] = discount_kpi + + # 助教成本画像(复用全店版逻辑 · 空则整块不注入 · 符合 v1.2 H6) + if coach_kpi: + payload["助教成本"] = coach_kpi + + # 支出概况(区域级仅助教支出有效,v1.2 禁谈运营/固定/平台支出 · 但注入给 AI 追溯) + # 注:v1.2 system prompt 明确要求 D 板块禁谈这三类,AI 自会规避 + if expense_kpi: + payload["支出概况"] = expense_kpi + + # 日粒度派生(区域级) + try: + start_date, end_date = _calc_date_range(board_time) + series = _fetch_area_daily_series( + site_id, str(start_date), str(end_date), area_code=area, + ) + prev_series: list[tuple] | None = None + try: + prev_start, prev_end = _calc_prev_range(board_time, start_date, end_date) + prev_series = _fetch_area_daily_series( + site_id, str(prev_start), str(prev_end), area_code=area, + ) + except Exception: + logger.debug("区域上期 series 查询失败,客单价环比字段将省略", exc_info=True) + + if series: + unit_econ = _build_area_unit_economics(series, prev_series=prev_series) + if unit_econ: + payload["单位经济"] = unit_econ + by_weekday = _aggregate_by_weekday_area(series) + if by_weekday: + payload["按星期聚合"] = by_weekday + anomalies = _detect_anomaly_days_area(series) + if anomalies: + payload["日粒度异常"] = anomalies + except Exception: + logger.debug("区域日粒度派生字段注入失败(不影响主流程)", exc_info=True) + + # 行业基线 + payload["行业基线"] = INDUSTRY_BASELINES + + # 原始指标(slim 后的区域子集) + payload["原始指标"] = raw_cn + + if not board_data: + payload["数据缺失提示"] = "区域财务看板数据获取失败,请基于已有缓存或常识分析" + + return json.dumps(payload, ensure_ascii=False, default=str) diff --git a/apps/backend/app/ai/schemas.py b/apps/backend/app/ai/schemas.py index c03a30d..5bdb255 100644 --- a/apps/backend/app/ai/schemas.py +++ b/apps/backend/app/ai/schemas.py @@ -37,6 +37,7 @@ class SSEEvent(BaseModel): class CacheTypeEnum(str, enum.Enum): APP2_FINANCE = "app2_finance" + APP2A_FINANCE_AREA = "app2a_finance_area" # 2026-04-23 新增:区域财务洞察(64 组合) APP3_CLUE = "app3_clue" APP4_ANALYSIS = "app4_analysis" APP5_TACTICS = "app5_tactics" diff --git a/apps/backend/app/routers/admin_ai.py b/apps/backend/app/routers/admin_ai.py index 5306b3a..d7cfd25 100644 --- a/apps/backend/app/routers/admin_ai.py +++ b/apps/backend/app/routers/admin_ai.py @@ -27,8 +27,8 @@ from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query, status -from app.auth.dependencies import CurrentUser -from app.middleware.permission import require_permission +from app.auth.dependencies import CurrentUser, get_current_user +from app.middleware.permission import require_permission # 保留给可能的其他依赖 from app.schemas.admin_ai import ( AlertActionResponse, AlertListResponse, @@ -40,11 +40,18 @@ from app.schemas.admin_ai import ( CacheInvalidateRequest, CacheInvalidateResponse, DashboardResponse, + ManualTriggerRequest, + ManualTriggerResponse, + PrewarmProgressResponse, RetryResponse, + RunAppRequest, + RunAppResponse, RunLogDetailResponse, RunLogListResponse, + TriggerItem, TriggerJobDetailResponse, TriggerJobListResponse, + TriggerUpdateRequest, ) from app.services.ai.admin_service import AdminAIService @@ -62,18 +69,43 @@ _admin_svc = AdminAIService() def _require_admin(): """ - 管理端依赖:要求 JWT status=approved 且角色包含 site_admin 或 tenant_admin。 + 管理端依赖:直接从 JWT 读 roles 判定是否 admin(site_admin / tenant_admin / super_admin)。 + + 2026-04-21:改为不依赖 auth.users.status 查询(admin-web 登录用 admin_users 表, + 与 require_permission 走的 auth.users 不是同一张表)。status 实时校验通过 admin_users.is_active。 """ async def _dependency( - user: CurrentUser = Depends(require_permission()), + user: CurrentUser = Depends(get_current_user), ) -> CurrentUser: - admin_roles = {"site_admin", "tenant_admin"} + admin_roles = {"site_admin", "tenant_admin", "super_admin"} if not admin_roles.intersection(user.roles): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="需要管理员权限(site_admin 或 tenant_admin)", + detail="需要管理员权限(site_admin / tenant_admin / super_admin)", ) + + # 实时校验 admin_users 表的 is_active(若 user_id 在该表) + from app.database import get_connection + conn = get_connection() + try: + with conn.cursor() as cur: + cur.execute( + "SELECT is_active FROM admin_users WHERE id = %s", + (user.user_id,), + ) + row = cur.fetchone() + conn.commit() + finally: + conn.close() + + # 在 admin_users 中找到且未激活 → 拒绝 + if row is not None and not row[0]: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="管理员账号已禁用", + ) + # 不在 admin_users 中但 JWT 带 admin 角色(如 xcx 用户临时升权),也允许通过 return user return _dependency @@ -85,10 +117,18 @@ def _require_admin(): @router.get("/dashboard", response_model=DashboardResponse) async def get_dashboard( site_id: Optional[int] = Query(None, description="门店 ID 筛选"), + range_days: Optional[int] = Query(None, ge=1, le=365, description="回溯天数:1=今日 / 3 / 7 / 10"), + date_from: Optional[str] = Query(None, description="起始日期 YYYY-MM-DD(与 date_to 成对使用)"), + date_to: Optional[str] = Query(None, description="结束日期 YYYY-MM-DD"), user: CurrentUser = Depends(_require_admin()), ) -> DashboardResponse: - """总览统计(支持 site_id 筛选)。""" - data = await _admin_svc.get_dashboard(site_id=site_id) + """总览统计(支持 site_id + 时间范围筛选)。""" + data = await _admin_svc.get_dashboard( + site_id=site_id, + range_days=range_days, + date_from=date_from, + date_to=date_to, + ) return DashboardResponse(**data) @@ -292,3 +332,152 @@ async def ignore_alert( """忽略告警:alert_status → ignored。""" new_status = await _admin_svc.ignore_alert(log_id) return AlertActionResponse(id=log_id, alert_status=new_status) + + +# ── 按需执行单个 App(admin-web 重新生成按钮用)────────── + + +_SUPPORTED_APP_TYPES = { + "app2_finance", + "app2a_finance_area", # 2026-04-23 新增:区域财务洞察 + "app3_clue", + "app4_analysis", + "app5_tactics", + "app6_note", + "app7_customer", + "app8_consolidation", +} + + +@router.post("/run/{app_type}", response_model=RunAppResponse) +async def run_single_app( + app_type: str, + body: RunAppRequest, + user: CurrentUser = Depends(_require_admin()), +) -> RunAppResponse: + """按需执行单个 App,跳过链路编排。 + + 使用场景:admin-web 缓存详情页 / 告警页的"重新生成"按钮。 + 熔断/限流/预算检查由 dispatcher._run_step 自动执行。 + 结果写入 ai_cache,失败不抛异常,通过 success=False 返回。 + """ + if app_type not in _SUPPORTED_APP_TYPES: + raise HTTPException( + status_code=400, + detail=f"不支持的 app_type: {app_type};支持 {sorted(_SUPPORTED_APP_TYPES)}", + ) + + from app.ai.dispatcher import get_dispatcher + + try: + dispatcher = get_dispatcher() + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + + context = body.model_dump(exclude_none=True) + + try: + result = await dispatcher.run_single_app( + app_type=app_type, + context=context, + triggered_by=f"admin:{user.user_id}", + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + + if result is None: + return RunAppResponse( + app_type=app_type, + success=False, + error="AI 调用失败(详情见 ai_run_logs,可能为熔断/限流/预算/超时)", + ) + return RunAppResponse(app_type=app_type, success=True, result=result) + + +# ── 触发器管理(biz.trigger_jobs)───────────────────────── + + +@router.get("/triggers", response_model=list[TriggerItem]) +async def list_triggers( + _user: CurrentUser = Depends(_require_admin()), +) -> list[TriggerItem]: + """列出所有 AI 相关触发器(job_type=ai_* 或 task_generator)。""" + rows = await _admin_svc.list_triggers() + return [TriggerItem(**r) for r in rows] + + +@router.patch("/triggers/{trigger_id}", response_model=TriggerItem) +async def update_trigger( + trigger_id: int, + body: TriggerUpdateRequest, + _user: CurrentUser = Depends(_require_admin()), +) -> TriggerItem: + """更新触发器:启用/禁用、修改 cron 表达式、修改描述。""" + try: + row = await _admin_svc.update_trigger( + trigger_id, + status_new=body.status, + cron_expression=body.cron_expression, + description=body.description, + ) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + return TriggerItem(**row) + + +# ── 预热进度查询 ───────────────────────────────────────── + + +@router.get("/prewarm/progress", response_model=PrewarmProgressResponse) +async def get_prewarm_progress( + site_id: int = Query(..., description="门店 ID"), + _user: CurrentUser = Depends(_require_admin()), +) -> PrewarmProgressResponse: + """查询 app2_finance 72 组合预热进度(done / missing)。""" + data = await _admin_svc.get_prewarm_progress(site_id) + return PrewarmProgressResponse(**data) + + +# ── 手动事件触发(跨越去重)────────────────────────────── + + +@router.post("/trigger-event", response_model=ManualTriggerResponse) +async def manual_trigger_event( + body: ManualTriggerRequest, + user: CurrentUser = Depends(_require_admin()), +) -> ManualTriggerResponse: + """手动触发 AI 事件链,默认 is_forced=True 跳过去重。 + + 事件类型:consumption / dws_completed / note_created / task_assigned + """ + from app.ai.dispatcher import TriggerEvent, get_dispatcher + + valid_events = {"consumption", "dws_completed", "note_created", "task_assigned"} + if body.event_type not in valid_events: + raise HTTPException( + status_code=400, + detail=f"非法 event_type: {body.event_type};支持 {sorted(valid_events)}", + ) + + try: + dispatcher = get_dispatcher() + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + + payload = dict(body.payload or {}) + if body.assistant_id is not None: + payload.setdefault("assistant_id", body.assistant_id) + + event = TriggerEvent( + event_type=body.event_type, + site_id=body.site_id, + member_id=body.member_id, + payload=payload, + is_forced=body.is_forced, + ) + logger.info( + "admin 手动触发事件: user=%s event=%s site_id=%s member_id=%s forced=%s", + user.user_id, body.event_type, body.site_id, body.member_id, body.is_forced, + ) + job_id = await dispatcher.handle_trigger(event) + return ManualTriggerResponse(trigger_job_id=job_id)