"""页面上下文文本化模块(应用 1 专用)。 根据 contextType 从数据库获取对应页面数据, 格式化为结构化中文文本(≤ 2000 字符),供 AI 理解当前场景。 不传入 member_phone 等断档敏感字段。 """ from __future__ import annotations import asyncio import logging from datetime import date, datetime from decimal import Decimal from functools import partial from typing import Any from app.database import get_connection, get_etl_readonly_connection logger = logging.getLogger(__name__) MAX_PAGE_CONTEXT_LENGTH = 2000 FDW_QUERY_TIMEOUT_SEC = 5 # 支持的 10 种页面类型 SUPPORTED_PAGE_TYPES = { "task-detail", "customer-detail", "coach-detail", "board-finance", "board-customer", "board-coach", "performance", "my-profile", "task-list", "customer-service-records", } async def build_page_text( source_page: str, context_id: int | str | None, site_id: int, filters: dict | None = None, ) -> str: """将页面数据转换为 AI 可读的结构化中文文本。 Args: source_page: 页面类型(contextType) context_id: 实体 ID(contextId) site_id: 门店 ID filters: 看板类页面的筛选参数 Returns: 结构化中文文本(≤ 2000 字符),失败时返回降级文本 """ if not source_page or source_page not in SUPPORTED_PAGE_TYPES: return "" try: loop = asyncio.get_event_loop() text = await loop.run_in_executor( None, partial(_build_page_text_sync, source_page, context_id, site_id, filters or {}), ) # 截断保护 if len(text) > MAX_PAGE_CONTEXT_LENGTH: text = text[:MAX_PAGE_CONTEXT_LENGTH - 20] + "\n…(上下文已截断)" return text except Exception: logger.exception("页面上下文获取失败: source_page=%s", source_page) return "页面上下文获取失败,请直接描述您的问题" def _build_page_text_sync( source_page: str, context_id: int | str | None, site_id: int, filters: dict, ) -> str: """同步路由到对应页面文本化函数。""" handlers = { "task-detail": _text_task_detail, "customer-detail": _text_customer_detail, "coach-detail": _text_coach_detail, "board-finance": _text_board_finance, "board-customer": _text_board_customer, "board-coach": _text_board_coach, "performance": _text_performance, "my-profile": _text_my_profile, "task-list": _text_task_list, "customer-service-records": _text_customer_service_records, } handler = handlers.get(source_page) if not handler: return "" return handler(context_id, site_id, filters) # ── 详情类页面 ────────────────────────────────────────────────── def _text_task_detail( context_id: int | str | None, site_id: int, filters: dict ) -> str: """任务详情页文本化。""" if not context_id: return "" task_id = int(context_id) conn = get_connection() try: with conn.cursor() as cur: # 任务信息 cur.execute( """ SELECT ct.task_type, ct.status, ct.deadline, ct.member_id, ct.assistant_id, dm.nickname AS member_nickname, da.nickname AS assistant_nickname FROM biz.coach_tasks ct LEFT JOIN biz.coach_tasks_member_view dm ON dm.member_id = ct.member_id AND dm.site_id = ct.site_id LEFT JOIN biz.coach_tasks_assistant_view da ON da.assistant_id = ct.assistant_id AND da.site_id = ct.site_id WHERE ct.id = %s AND ct.site_id = %s """, (task_id, site_id), ) task = cur.fetchone() if not task: return f"任务 {task_id} 不存在" task_type, status, deadline, member_id, assistant_id, member_nick, asst_nick = task # 最近备注(最多 3 条) cur.execute( """ SELECT content, created_at FROM biz.notes WHERE task_id = %s AND site_id = %s ORDER BY created_at DESC LIMIT 3 """, (task_id, site_id), ) notes = cur.fetchall() # AI 缓存(最新分析) cur.execute( """ SELECT result_json, created_at FROM biz.ai_cache WHERE cache_type = 'app4_analysis' AND site_id = %s AND target_id = %s ORDER BY created_at DESC LIMIT 1 """, (site_id, f"{assistant_id}_{member_id}"), ) ai_row = cur.fetchone() lines = [ "【任务详情】", f" 任务类型:{task_type or '未知'}", f" 状态:{status or '未知'}", f" 截止日期:{_fmt_date(deadline)}", f" 客户:{member_nick or f'ID:{member_id}'}", f" 助教:{asst_nick or f'ID:{assistant_id}'}", ] if notes: lines.append("【最近备注】") for content, created_at in notes: short = (content or "")[:100] lines.append(f" {_fmt_date(created_at)} {short}") if ai_row: lines.append(f"【AI 分析】最近更新于 {_fmt_date(ai_row[1])}") return "\n".join(lines) finally: conn.close() def _text_customer_detail( context_id: int | str | None, site_id: int, filters: dict ) -> str: """客户详情页文本化。""" if not context_id: return "" member_id = int(context_id) # 复用 member_data 的同步查询(避免循环导入,直接查询) etl_conn = None biz_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) # CHANGE 2026-03-23 | Prompt: FDW 迁移——fdw_etl.* → app.* 直连 ETL 库 # 会员信息 cur.execute( """ SELECT nickname FROM app.v_dim_member WHERE member_id = %s AND scd2_is_current = 1 """, (member_id,), ) m_row = cur.fetchone() nickname = m_row[0] if m_row else f"ID:{member_id}" # 最近 5 条消费 cur.execute( """ SELECT settle_date, items_sum, room_name FROM app.v_dwd_settlement_head WHERE member_id = %s AND settle_type IN (1, 3) ORDER BY settle_date DESC LIMIT 5 """, (member_id,), ) recent = cur.fetchall() # 余额 cur.execute( """ SELECT balance_amount FROM app.v_dws_member_consumption_summary WHERE member_id = %s LIMIT 1 """, (member_id,), ) bal_row = cur.fetchone() etl_conn.commit() # 维客线索 biz_conn = get_connection() with biz_conn.cursor() as cur: cur.execute( """ SELECT summary FROM member_retention_clue WHERE member_id = %s AND site_id = %s ORDER BY created_at DESC LIMIT 5 """, (member_id, site_id), ) clues = cur.fetchall() lines = [ "【客户详情】", f" 昵称:{nickname}", f" 储值余额:{_fmt_decimal(bal_row[0]) if bal_row else '未知'}", ] if recent: lines.append("【近期消费】") for sd, amt, room in recent: lines.append(f" {_fmt_date(sd)} ¥{_fmt_decimal(amt)} {room or ''}") if clues: lines.append("【维客线索】") for (summary,) in clues: lines.append(f" {summary}") return "\n".join(lines) finally: if etl_conn: etl_conn.close() if biz_conn: biz_conn.close() def _text_coach_detail( context_id: int | str | None, site_id: int, filters: dict ) -> str: """助教详情页文本化。""" if not context_id: return "" assistant_id = int(context_id) etl_conn = None biz_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) cur.execute( """ SELECT nickname, level, hire_date FROM app.v_dim_assistant WHERE assistant_id = %s LIMIT 1 """, (assistant_id,), ) row = cur.fetchone() etl_conn.commit() if not row: return f"助教 {assistant_id} 不存在" nickname, level, hire_date = row biz_conn = get_connection() with biz_conn.cursor() as cur: # 任务统计 cur.execute( """ SELECT status, COUNT(*) FROM biz.coach_tasks WHERE assistant_id = %s AND site_id = %s GROUP BY status """, (assistant_id, site_id), ) task_stats = cur.fetchall() lines = [ "【助教详情】", f" 花名:{nickname or ''}", f" 级别:{level or ''}", f" 入职日期:{_fmt_date(hire_date)}", ] if task_stats: lines.append("【任务统计】") for status, cnt in task_stats: lines.append(f" {status}: {cnt} 个") return "\n".join(lines) finally: if etl_conn: etl_conn.close() if biz_conn: biz_conn.close() # ── 看板类页面 ────────────────────────────────────────────────── def _text_board_finance( context_id: int | str | None, site_id: int, filters: dict ) -> str: """财务看板文本化。""" time_dim = filters.get("timeDimension", "this_month") area = filters.get("areaFilter", "") etl_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) # 简化查询:获取汇总数据 cur.execute( """ SELECT COUNT(*) AS settle_count, COALESCE(SUM(items_sum), 0) AS total_revenue, COALESCE(AVG(items_sum), 0) AS avg_revenue FROM app.v_dwd_settlement_head WHERE settle_type IN (1, 3) AND settle_date >= (CURRENT_DATE - INTERVAL '1 month') """, ) row = cur.fetchone() etl_conn.commit() lines = [ "【财务看板】", f" 时间维度:{time_dim}", ] if area: lines.append(f" 区域筛选:{area}") if row: lines.append(f" 结算笔数:{row[0]}") lines.append(f" 总营收:¥{_fmt_decimal(row[1])}") lines.append(f" 笔均:¥{_fmt_decimal(row[2])}") return "\n".join(lines) finally: if etl_conn: etl_conn.close() def _text_board_customer( context_id: int | str | None, site_id: int, filters: dict ) -> str: """客户看板文本化。""" dimension = filters.get("dimension", "consumption") type_filter = filters.get("typeFilter", "") etl_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) # Top 10 客户 cur.execute( """ SELECT dm.nickname, COALESCE(SUM(sh.items_sum), 0) AS total_consumption FROM app.v_dwd_settlement_head sh JOIN app.v_dim_member dm ON dm.member_id = sh.member_id AND dm.scd2_is_current = 1 WHERE sh.settle_type IN (1, 3) AND sh.member_id > 0 AND sh.settle_date >= (CURRENT_DATE - INTERVAL '1 month') GROUP BY dm.nickname ORDER BY total_consumption DESC LIMIT 10 """, ) rows = cur.fetchall() etl_conn.commit() lines = [ "【客户看板】", f" 排序维度:{dimension}", ] if type_filter: lines.append(f" 类型筛选:{type_filter}") if rows: lines.append(" Top 10 客户:") for i, (nick, amt) in enumerate(rows, 1): lines.append(f" {i}. {nick or '未知'} ¥{_fmt_decimal(amt)}") return "\n".join(lines) finally: if etl_conn: etl_conn.close() def _text_board_coach( context_id: int | str | None, site_id: int, filters: dict ) -> str: """助教看板文本化。""" dimension = filters.get("dimension", "service_count") project = filters.get("projectFilter", "") time_dim = filters.get("timeDimension", "this_month") etl_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) cur.execute( """ SELECT da.nickname, COUNT(*) AS service_count, COALESCE(SUM(sl.ledger_amount), 0) AS total_revenue FROM app.v_dwd_assistant_service_log sl JOIN app.v_dim_assistant da ON da.assistant_id = sl.site_assistant_id WHERE sl.is_delete = 0 AND sl.create_time >= (CURRENT_DATE - INTERVAL '1 month') GROUP BY da.nickname ORDER BY service_count DESC LIMIT 10 """, ) rows = cur.fetchall() etl_conn.commit() lines = [ "【助教看板】", f" 排序维度:{dimension}", f" 时间维度:{time_dim}", ] if project: lines.append(f" 技能筛选:{project}") if rows: lines.append(" Top 10 助教:") for i, (nick, cnt, amt) in enumerate(rows, 1): lines.append(f" {i}. {nick or '未知'} 服务{cnt}次 ¥{_fmt_decimal(amt)}") return "\n".join(lines) finally: if etl_conn: etl_conn.close() # ── 其他页面 ────────────────────────────────────────────────── def _text_performance( context_id: int | str | None, site_id: int, filters: dict ) -> str: """绩效页面文本化。""" time_dim = filters.get("timeDimension", "this_month") etl_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) cur.execute( """ SELECT da.nickname, sc.performance_tier, sc.monthly_customers FROM app.v_dws_assistant_salary_calc sc JOIN app.v_dim_assistant da ON da.assistant_id = sc.assistant_id ORDER BY sc.calc_month DESC, sc.monthly_customers DESC LIMIT 10 """, ) rows = cur.fetchall() etl_conn.commit() lines = [ "【绩效数据】", f" 时间维度:{time_dim}", ] if rows: for nick, tier, customers in rows: lines.append(f" {nick or '未知'} {tier or ''} 服务{customers or 0}人") return "\n".join(lines) finally: if etl_conn: etl_conn.close() def _text_my_profile( context_id: int | str | None, site_id: int, filters: dict ) -> str: """个人信息页文本化。""" return "【个人信息】\n 当前为个人信息页面,可查询个人绩效和任务情况。" def _text_task_list( context_id: int | str | None, site_id: int, filters: dict ) -> str: """任务列表页文本化。""" if not context_id: # 无特定任务,返回概要 conn = get_connection() try: with conn.cursor() as cur: cur.execute( """ SELECT status, COUNT(*) FROM biz.coach_tasks WHERE site_id = %s GROUP BY status """, (site_id,), ) stats = cur.fetchall() lines = ["【任务列表】"] for status, cnt in stats: lines.append(f" {status}: {cnt} 个") return "\n".join(lines) finally: conn.close() # 有特定任务 ID,复用 task-detail return _text_task_detail(context_id, site_id, filters) def _text_customer_service_records( context_id: int | str | None, site_id: int, filters: dict ) -> str: """客户服务记录页文本化。""" if not context_id: return "" member_id = int(context_id) etl_conn = None try: etl_conn = get_etl_readonly_connection(site_id) with etl_conn.cursor() as cur: cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), ) cur.execute( """ SELECT create_time, real_use_seconds / 60 AS duration_minutes, ledger_amount, site_table_id FROM app.v_dwd_assistant_service_log WHERE tenant_member_id = %s AND is_delete = 0 ORDER BY create_time DESC LIMIT 10 """, (member_id,), ) rows = cur.fetchall() etl_conn.commit() lines = ["【服务记录】"] if not rows: lines.append(" 暂无服务记录") else: for sd, dur, amt, room in rows: lines.append( f" {_fmt_date(sd)} {dur or 0}分钟 ¥{_fmt_decimal(amt)} {room or ''}" ) return "\n".join(lines) finally: if etl_conn: etl_conn.close() # ── 工具函数 ────────────────────────────────────────────────── def _fmt_date(val: Any) -> str: """格式化日期值。""" if isinstance(val, datetime): return val.strftime("%Y-%m-%d %H:%M") if isinstance(val, date): return val.isoformat() return str(val) if val else "未知" def _fmt_decimal(val: Any) -> str: """格式化金额值。""" if val is None: return "0.00" if isinstance(val, Decimal): return f"{val:.2f}" if isinstance(val, float): return f"{val:.2f}" return str(val)