Files
Neo-ZQYY/apps/backend/app/services/task_manager.py

1041 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
任务管理服务
负责任务 CRUD、置顶、放弃、取消放弃等操作。
通过 FDW 读取客户信息和 RS 指数,计算爱心 icon 档位。
RNS1.1 扩展get_task_list_v2TASK-1、get_task_detailTASK-2
"""
import json
import logging
from datetime import datetime
from decimal import Decimal
from fastapi import HTTPException
from app.services import fdw_queries
from app.services.task_generator import compute_heart_icon
logger = logging.getLogger(__name__)
def _get_connection():
"""延迟导入 get_connection避免纯函数测试时触发模块级导入失败。"""
from app.database import get_connection
return get_connection()
def _record_history(
cur,
task_id: int,
action: str,
old_status: str | None = None,
new_status: str | None = None,
old_task_type: str | None = None,
new_task_type: str | None = None,
detail: dict | None = None,
) -> None:
"""在 coach_task_history 中记录变更。"""
cur.execute(
"""
INSERT INTO biz.coach_task_history
(task_id, action, old_status, new_status,
old_task_type, new_task_type, detail)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(
task_id,
action,
old_status,
new_status,
old_task_type,
new_task_type,
json.dumps(detail) if detail else None,
),
)
def _get_assistant_id(conn, user_id: int, site_id: int) -> int:
"""
通过 user_assistant_binding 获取 assistant_id。
找不到绑定关系时抛出 403。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT assistant_id
FROM auth.user_assistant_binding
WHERE user_id = %s AND site_id = %s AND assistant_id IS NOT NULL
LIMIT 1
""",
(user_id, site_id),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=403, detail="权限不足")
return row[0]
def _verify_task_ownership(
conn, task_id: int, assistant_id: int, site_id: int, required_status: str | None = None
) -> dict:
"""
验证任务归属并返回任务信息。
- 任务不存在 → 404
- 不属于当前助教 → 403
- required_status 不匹配 → 409
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, task_type, status, is_pinned, abandon_reason,
assistant_id, site_id
FROM biz.coach_tasks
WHERE id = %s
""",
(task_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="资源不存在")
task = {
"id": row[0],
"task_type": row[1],
"status": row[2],
"is_pinned": row[3],
"abandon_reason": row[4],
"assistant_id": row[5],
"site_id": row[6],
}
if task["site_id"] != site_id or task["assistant_id"] != assistant_id:
raise HTTPException(status_code=403, detail="权限不足")
if required_status and task["status"] != required_status:
raise HTTPException(status_code=409, detail="任务状态不允许此操作")
return task
async def get_task_list(user_id: int, site_id: int) -> list[dict]:
"""
获取助教的任务列表(含有效 + 已放弃)。
1. 通过 auth.user_assistant_binding 获取 assistant_id
2. 查询 biz.coach_tasks WHERE status IN ('active', 'abandoned')
3. 通过 FDW 读取客户基本信息dim_member和 RS 指数
4. 计算爱心 icon 档位
5. 排序abandoned 排最后 → is_pinned DESC → priority_score DESC → created_at ASC
FDW 查询需要 SET LOCAL app.current_site_id。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# 查询有效 + 已放弃任务abandoned 排最后)
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, task_type, status, priority_score, is_pinned,
expires_at, created_at, member_id, abandon_reason
FROM biz.coach_tasks
WHERE site_id = %s
AND assistant_id = %s
AND status IN ('active', 'abandoned')
ORDER BY
CASE WHEN status = 'abandoned' THEN 1 ELSE 0 END ASC,
is_pinned DESC,
priority_score DESC NULLS LAST,
created_at ASC
""",
(site_id, assistant_id),
)
tasks = cur.fetchall()
conn.commit()
if not tasks:
return []
member_ids = list({t[7] for t in tasks})
# 通过 FDW 读取客户信息和 RS 指数(需要 SET LOCAL app.current_site_id
member_info_map: dict[int, dict] = {}
rs_map: dict[int, Decimal] = {}
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"SET LOCAL app.current_site_id = %s", (str(site_id),)
)
# 读取客户基本信息
cur.execute(
"""
SELECT member_id, member_name, member_phone
FROM fdw_etl.v_dim_member
WHERE member_id = ANY(%s)
""",
(member_ids,),
)
for row in cur.fetchall():
member_info_map[row[0]] = {
"member_name": row[1],
"member_phone": row[2],
}
# 读取 RS 指数
cur.execute(
"""
SELECT member_id, COALESCE(rs_display, 0)
FROM fdw_etl.v_dws_member_assistant_relation_index
WHERE assistant_id = %s
AND member_id = ANY(%s)
""",
(assistant_id, member_ids),
)
for row in cur.fetchall():
rs_map[row[0]] = Decimal(str(row[1]))
conn.commit()
# 组装结果
result = []
for task_row in tasks:
(task_id, task_type, status, priority_score,
is_pinned, expires_at, created_at, member_id, abandon_reason) = task_row
info = member_info_map.get(member_id, {})
rs_score = rs_map.get(member_id, Decimal("0"))
heart_icon = compute_heart_icon(rs_score)
result.append({
"id": task_id,
"task_type": task_type,
"status": status,
"priority_score": float(priority_score) if priority_score else None,
"is_pinned": is_pinned,
"expires_at": expires_at.isoformat() if expires_at else None,
"created_at": created_at.isoformat() if created_at else None,
"member_id": member_id,
"member_name": info.get("member_name"),
"member_phone": info.get("member_phone"),
"rs_score": float(rs_score),
"heart_icon": heart_icon,
"abandon_reason": abandon_reason,
})
return result
finally:
conn.close()
async def pin_task(task_id: int, user_id: int, site_id: int) -> dict:
"""
置顶任务。
验证任务归属后设置 is_pinned=TRUE记录 history。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
task = _verify_task_ownership(
conn, task_id, assistant_id, site_id, required_status="active"
)
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"""
UPDATE biz.coach_tasks
SET is_pinned = TRUE, updated_at = NOW()
WHERE id = %s
""",
(task_id,),
)
_record_history(
cur,
task_id,
action="pin",
old_status="active",
new_status="active",
old_task_type=task["task_type"],
new_task_type=task["task_type"],
detail={"is_pinned": True},
)
conn.commit()
return {"id": task_id, "is_pinned": True}
finally:
conn.close()
async def unpin_task(task_id: int, user_id: int, site_id: int) -> dict:
"""
取消置顶。
验证任务归属后设置 is_pinned=FALSE。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
task = _verify_task_ownership(
conn, task_id, assistant_id, site_id, required_status="active"
)
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"""
UPDATE biz.coach_tasks
SET is_pinned = FALSE, updated_at = NOW()
WHERE id = %s
""",
(task_id,),
)
conn.commit()
return {"id": task_id, "is_pinned": False}
finally:
conn.close()
async def abandon_task(
task_id: int, user_id: int, site_id: int, reason: str
) -> dict:
"""
放弃任务。
1. 验证 reason 非空(空或纯空白 → 422
2. 验证任务归属和 status='active'
3. 设置 status='abandoned', abandon_reason=reason
4. 记录 coach_task_history
"""
if not reason or not reason.strip():
raise HTTPException(status_code=422, detail="放弃原因不能为空")
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
task = _verify_task_ownership(
conn, task_id, assistant_id, site_id, required_status="active"
)
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"""
UPDATE biz.coach_tasks
SET status = 'abandoned',
abandon_reason = %s,
updated_at = NOW()
WHERE id = %s
""",
(reason, task_id),
)
_record_history(
cur,
task_id,
action="abandon",
old_status="active",
new_status="abandoned",
old_task_type=task["task_type"],
new_task_type=task["task_type"],
detail={"abandon_reason": reason},
)
conn.commit()
return {"id": task_id, "status": "abandoned"}
finally:
conn.close()
async def cancel_abandon(task_id: int, user_id: int, site_id: int) -> dict:
"""
取消放弃。
1. 验证任务归属和 status='abandoned'
2. 恢复 status='active', 清空 abandon_reason
3. 记录 coach_task_history
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
task = _verify_task_ownership(
conn, task_id, assistant_id, site_id, required_status="abandoned"
)
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"""
UPDATE biz.coach_tasks
SET status = 'active',
is_pinned = FALSE,
abandon_reason = NULL,
updated_at = NOW()
WHERE id = %s
""",
(task_id,),
)
_record_history(
cur,
task_id,
action="cancel_abandon",
old_status="abandoned",
new_status="active",
old_task_type=task["task_type"],
new_task_type=task["task_type"],
)
conn.commit()
return {"id": task_id, "status": "active", "is_pinned": False}
finally:
conn.close()
# ---------------------------------------------------------------------------
# RNS1.1 扩展:辅助常量与工具函数
# ---------------------------------------------------------------------------
# 任务类型 → 中文标签
_TASK_TYPE_LABEL_MAP: dict[str, str] = {
"high_priority_recall": "高优先召回",
"priority_recall": "优先召回",
"follow_up_visit": "客户回访",
"relationship_building": "关系构建",
}
# 课程类型 → courseTypeClass 枚举映射design.md 定义)
_COURSE_TYPE_CLASS_MAP: dict[str, str] = {
"basic": "basic",
"陪打": "basic",
"基础课": "basic",
"vip": "vip",
"包厢": "vip",
"包厢课": "vip",
"tip": "tip",
"超休": "tip",
"激励课": "tip",
"recharge": "recharge",
"充值": "recharge",
"incentive": "incentive",
"激励": "incentive",
}
# 维客线索 category → tag_color 映射
_CATEGORY_COLOR_MAP: dict[str, str] = {
"客户基础": "#0052d9",
"客户基础信息": "#0052d9",
"消费习惯": "#e34d59",
"玩法偏好": "#00a870",
"促销偏好": "#ed7b2f",
"促销接受": "#ed7b2f",
"社交关系": "#0594fa",
"重要反馈": "#a25eb5",
}
def map_course_type_class(raw_course_type: str | None) -> str:
"""将原始课程类型映射为统一枚举值(不带 tag- 前缀)。"""
if not raw_course_type:
return "basic"
return _COURSE_TYPE_CLASS_MAP.get(raw_course_type.strip(), "basic")
def compute_income_trend(current_income: float, prev_income: float) -> tuple[str, str]:
"""
计算收入趋势。
返回 (income_trend, income_trend_dir)。
如 (1000, 800) → ("↑200", "up")
"""
diff = current_income - prev_income
direction = "up" if diff >= 0 else "down"
arrow = "" if diff >= 0 else ""
trend = f"{arrow}{abs(diff):.0f}"
return trend, direction
def sanitize_tag(raw_tag: str | None) -> str:
"""去除 tag 中的换行符,多行标签使用空格分隔。"""
if not raw_tag:
return ""
return raw_tag.replace("\n", " ").strip()
def _extract_emoji_and_text(summary: str | None) -> tuple[str, str]:
"""
从 summary 中提取 emoji 前缀和正文。
AI 写入格式: "📅 偏好周末下午时段消费" → ("📅", "偏好周末下午时段消费")
手动写入无 emoji: "喜欢打中式" → ("", "喜欢打中式")
"""
if not summary:
return "", ""
# 检查第一个字符是否为 emoji非 ASCII 且非中文常用范围)
first_char = summary[0]
if ord(first_char) > 0x2600 and summary[1:2] == " ":
return first_char, summary[2:].strip()
return "", summary.strip()
def _format_time(dt: datetime | None) -> str | None:
"""格式化时间为 ISO 字符串。"""
if dt is None:
return None
return dt.isoformat() if hasattr(dt, "isoformat") else str(dt)
# ---------------------------------------------------------------------------
# RNS1.1get_task_list_v2TASK-1 扩展版任务列表)
# ---------------------------------------------------------------------------
async def get_task_list_v2(
user_id: int,
site_id: int,
status: str,
page: int,
page_size: int,
) -> dict:
"""
扩展版任务列表TASK-1
返回 { items, total, page, pageSize, performance }。
逻辑:
1. _get_assistant_id() 获取 assistant_id
2. 查询 coach_tasks 带分页LIMIT/OFFSET + COUNT(*)
3. fdw_queries 批量获取会员信息、余额、lastVisitDays
4. fdw_queries.get_salary_calc() 获取绩效概览
5. 查询 ai_cache 获取 aiSuggestion
6. 组装 TaskListResponse
扩展字段lastVisitDays/balance/aiSuggestion采用优雅降级。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# ── 1. 查询任务列表(带分页 + 总数) ──
# 状态映射:前端 pending → active
db_status = "active" if status == "pending" else status
with conn.cursor() as cur:
# 总数
cur.execute(
"""
SELECT COUNT(*)
FROM biz.coach_tasks
WHERE site_id = %s AND assistant_id = %s AND status = %s
""",
(site_id, assistant_id, db_status),
)
total = cur.fetchone()[0]
# 分页查询
offset = (page - 1) * page_size
cur.execute(
"""
SELECT id, task_type, status, priority_score, is_pinned,
expires_at, created_at, member_id, abandon_reason
FROM biz.coach_tasks
WHERE site_id = %s AND assistant_id = %s AND status = %s
ORDER BY is_pinned DESC,
priority_score DESC NULLS LAST,
created_at ASC
LIMIT %s OFFSET %s
""",
(site_id, assistant_id, db_status, page_size, offset),
)
tasks = cur.fetchall()
conn.commit()
if not tasks:
# 即使无任务也需要返回绩效概览
performance = _build_performance_summary(conn, site_id, assistant_id)
return {
"items": [],
"total": 0,
"page": page,
"page_size": page_size,
"performance": performance,
}
member_ids = list({t[7] for t in tasks})
# ── 2. FDW 批量查询会员信息 ──
member_info_map: dict[int, dict] = {}
try:
member_info_map = fdw_queries.get_member_info(conn, site_id, member_ids)
except Exception:
logger.warning("FDW 查询会员信息失败", exc_info=True)
# ── 3. FDW 批量查询余额(优雅降级) ──
balance_map: dict[int, Decimal] = {}
try:
balance_map = fdw_queries.get_member_balance(conn, site_id, member_ids)
except Exception:
logger.warning("FDW 查询余额失败", exc_info=True)
# ── 4. FDW 批量查询 lastVisitDays优雅降级 ──
last_visit_map: dict[int, int | None] = {}
try:
last_visit_map = fdw_queries.get_last_visit_days(conn, site_id, member_ids)
except Exception:
logger.warning("FDW 查询 lastVisitDays 失败", exc_info=True)
# ── 5. RS 指数(用于 heart_score ──
rs_map: dict[int, Decimal] = {}
try:
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"SET LOCAL app.current_site_id = %s", (str(site_id),)
)
cur.execute(
"""
SELECT member_id, COALESCE(rs_display, 0)
FROM fdw_etl.v_dws_member_assistant_relation_index
WHERE assistant_id = %s AND member_id = ANY(%s)
""",
(assistant_id, member_ids),
)
for row in cur.fetchall():
rs_map[row[0]] = Decimal(str(row[1]))
conn.commit()
except Exception:
logger.warning("FDW 查询 RS 指数失败", exc_info=True)
try:
conn.rollback()
except Exception:
pass
# ── 6. 查询 ai_cache 获取 aiSuggestion优雅降级 ──
ai_suggestion_map: dict[int, str] = {}
try:
member_id_strs = [str(mid) for mid in member_ids]
with conn.cursor() as cur:
cur.execute(
"""
SELECT target_id, result_json
FROM biz.ai_cache
WHERE site_id = %s
AND target_id = ANY(%s)
AND cache_type = 'app4_analysis'
ORDER BY created_at DESC
""",
(site_id, member_id_strs),
)
seen: set[str] = set()
for row in cur.fetchall():
target_id_str = str(row[0])
if target_id_str not in seen:
seen.add(target_id_str)
result = row[1] if isinstance(row[1], dict) else {}
summary = result.get("summary", "")
if summary:
ai_suggestion_map[int(target_id_str)] = summary
conn.commit()
except Exception:
logger.warning("查询 ai_cache aiSuggestion 失败", exc_info=True)
# ── 7. 查询备注存在性has_note ──
task_ids = [t[0] for t in tasks]
has_note_set: set[int] = set()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT DISTINCT task_id
FROM biz.notes
WHERE task_id = ANY(%s)
""",
(task_ids,),
)
for row in cur.fetchall():
has_note_set.add(row[0])
conn.commit()
except Exception:
logger.warning("查询备注存在性失败", exc_info=True)
# ── 8. 绩效概览 ──
performance = _build_performance_summary(conn, site_id, assistant_id)
# ── 9. 组装 items ──
items = []
for task_row in tasks:
(task_id, task_type, task_status, priority_score,
is_pinned, expires_at, created_at, member_id, abandon_reason) = task_row
info = member_info_map.get(member_id, {})
customer_name = info.get("nickname") or info.get("member_name") or "未知客户"
rs_score = rs_map.get(member_id, Decimal("0"))
balance = balance_map.get(member_id)
items.append({
"id": task_id,
"customer_name": customer_name,
"customer_avatar": "/assets/images/avatar-default.png",
"task_type": task_type,
"task_type_label": _TASK_TYPE_LABEL_MAP.get(task_type, task_type),
"deadline": _format_time(expires_at),
"heart_score": float(rs_score),
"hobbies": [], # 暂无数据源,返回空数组
"is_pinned": bool(is_pinned),
"has_note": task_id in has_note_set,
"status": task_status,
"last_visit_days": last_visit_map.get(member_id),
"balance": float(balance) if balance is not None else None,
"ai_suggestion": ai_suggestion_map.get(member_id),
})
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size,
"performance": performance,
}
finally:
conn.close()
def _build_performance_summary(conn, site_id: int, assistant_id: int) -> dict:
"""
构建绩效概览PerformanceSummary
从 fdw_queries.get_salary_calc 获取当月和上月数据,
计算收入趋势。
"""
now = datetime.now()
year, month = now.year, now.month
# 当月绩效
salary = None
try:
salary = fdw_queries.get_salary_calc(conn, site_id, assistant_id, year, month)
except Exception:
logger.warning("FDW 查询当月绩效失败", exc_info=True)
# 上月绩效(用于收入趋势)
prev_year, prev_month = (year, month - 1) if month > 1 else (year - 1, 12)
prev_salary = None
try:
prev_salary = fdw_queries.get_salary_calc(conn, site_id, assistant_id, prev_year, prev_month)
except Exception:
logger.warning("FDW 查询上月绩效失败", exc_info=True)
current_income = salary["total_income"] if salary else 0.0
prev_income = prev_salary["total_income"] if prev_salary else 0.0
income_trend, income_trend_dir = compute_income_trend(current_income, prev_income)
tier_nodes = salary["tier_nodes"] if salary and salary.get("tier_nodes") else [0]
# tier_nodes 可能是 JSON 字符串或列表
if isinstance(tier_nodes, str):
try:
tier_nodes = json.loads(tier_nodes)
except (json.JSONDecodeError, TypeError):
tier_nodes = [0]
return {
"total_hours": salary["total_hours"] if salary else 0.0,
"total_income": current_income,
"total_customers": salary["total_customers"] if salary else 0,
"month_label": f"{month}",
"tier_nodes": [float(n) for n in tier_nodes] if tier_nodes else [0],
"basic_hours": salary["basic_hours"] if salary else 0.0,
"bonus_hours": salary["bonus_hours"] if salary else 0.0,
"current_tier": salary["tier_index"] if salary else 0,
"next_tier_hours": salary["next_tier_hours"] if salary else 0.0,
"tier_completed": salary["tier_completed"] if salary else False,
"bonus_money": 0.0 if (salary and salary.get("tier_completed")) else (salary["bonus_money"] if salary else 0.0),
"income_trend": income_trend,
"income_trend_dir": income_trend_dir,
"prev_month": f"{prev_month}",
"current_tier_label": salary["coach_level"] if salary else "",
}
# ---------------------------------------------------------------------------
# RNS1.1get_task_detailTASK-2 任务详情完整版)
# ---------------------------------------------------------------------------
async def get_task_detail(
task_id: int,
user_id: int,
site_id: int,
) -> dict:
"""
任务详情完整版TASK-2
返回基础信息 + retentionClues + talkingPoints + serviceSummary
+ serviceRecords + aiAnalysis + notes + customerId。
权限校验:任务不存在 → 404不属于当前助教 → 403。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# ── 1. 查询任务基础信息 ──
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, task_type, status, priority_score, is_pinned,
expires_at, created_at, member_id, abandon_reason,
assistant_id, site_id
FROM biz.coach_tasks
WHERE id = %s
""",
(task_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="任务不存在")
task_assistant_id = row[9]
task_site_id = row[10]
if task_site_id != site_id or task_assistant_id != assistant_id:
raise HTTPException(status_code=403, detail="无权访问该任务")
member_id = row[7]
task_type = row[1]
task_status = row[2]
is_pinned = row[4]
expires_at = row[5]
# ── 2. FDW 查询会员信息 ──
member_info_map: dict[int, dict] = {}
try:
member_info_map = fdw_queries.get_member_info(conn, site_id, [member_id])
except Exception:
logger.warning("FDW 查询会员信息失败", exc_info=True)
info = member_info_map.get(member_id, {})
customer_name = info.get("nickname") or "未知客户"
# RS 指数
rs_score = Decimal("0")
try:
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"SET LOCAL app.current_site_id = %s", (str(site_id),)
)
cur.execute(
"""
SELECT COALESCE(rs_display, 0)
FROM fdw_etl.v_dws_member_assistant_relation_index
WHERE assistant_id = %s AND member_id = %s
""",
(assistant_id, member_id),
)
rs_row = cur.fetchone()
if rs_row:
rs_score = Decimal(str(rs_row[0]))
conn.commit()
except Exception:
logger.warning("FDW 查询 RS 指数失败", exc_info=True)
try:
conn.rollback()
except Exception:
pass
# ── 3. 查询维客线索 ──
retention_clues = []
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, category, summary, detail, source
FROM public.member_retention_clue
WHERE member_id = %s AND site_id = %s
ORDER BY recorded_at DESC
""",
(member_id, site_id),
)
for clue_row in cur.fetchall():
category = clue_row[1] or ""
summary_raw = clue_row[2] or ""
detail = clue_row[3]
source = clue_row[4] or "manual"
emoji, text = _extract_emoji_and_text(summary_raw)
tag = sanitize_tag(category)
tag_color = _CATEGORY_COLOR_MAP.get(tag, "#999999")
retention_clues.append({
"tag": tag,
"tag_color": tag_color,
"emoji": emoji,
"text": text,
"source": source,
"desc": detail,
})
conn.commit()
except Exception:
logger.warning("查询维客线索失败", exc_info=True)
# ── 4. 查询 AI 缓存talkingPoints + aiAnalysis ──
talking_points: list[str] = []
ai_analysis = {"summary": "", "suggestions": []}
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT cache_type, result_json
FROM biz.ai_cache
WHERE target_id = %s AND site_id = %s
AND cache_type IN ('app4_analysis', 'app5_talking_points')
ORDER BY created_at DESC
""",
(str(member_id), site_id),
)
seen_types: set[str] = set()
for cache_row in cur.fetchall():
cache_type = cache_row[0]
if cache_type in seen_types:
continue
seen_types.add(cache_type)
result = cache_row[1] if isinstance(cache_row[1], dict) else {}
if cache_type == "app5_talking_points":
# talkingPoints: 话术列表
points = result.get("talking_points", [])
if isinstance(points, list):
talking_points = [str(p) for p in points]
elif cache_type == "app4_analysis":
# aiAnalysis: summary + suggestions
ai_analysis = {
"summary": result.get("summary", ""),
"suggestions": result.get("suggestions", []),
}
conn.commit()
except Exception:
logger.warning("查询 AI 缓存失败", exc_info=True)
# ── 5. FDW 查询服务记录(最多 20 条) ──
service_records_raw: list[dict] = []
try:
service_records_raw = fdw_queries.get_service_records_for_task(
conn, site_id, assistant_id, member_id, limit=20
)
except Exception:
logger.warning("FDW 查询服务记录失败", exc_info=True)
service_records = []
total_hours = 0.0
total_income = 0.0
for rec in service_records_raw:
hours = rec.get("service_hours", 0.0)
income = rec.get("income", 0.0)
total_hours += hours
total_income += income
# 时间格式化
settle_time = rec.get("settle_time")
date_str = ""
if settle_time:
if hasattr(settle_time, "strftime"):
date_str = settle_time.strftime("%Y-%m-%d")
else:
date_str = str(settle_time)[:10]
raw_course_type = rec.get("course_type", "")
type_class = map_course_type_class(raw_course_type)
service_records.append({
"table": rec.get("table_name"),
"type": raw_course_type or "基础课",
"type_class": type_class,
"record_type": "recharge" if type_class == "recharge" else "course",
"duration": hours,
"duration_raw": rec.get("service_hours_raw"),
"income": income,
"is_estimate": rec.get("is_estimate"),
"drinks": None,
"date": date_str,
})
avg_income = total_income / len(service_records) if service_records else 0.0
service_summary = {
"total_hours": round(total_hours, 2),
"total_income": round(total_income, 2),
"avg_income": round(avg_income, 2),
}
# ── 6. 查询备注(最多 20 条) ──
notes: list[dict] = []
has_note = False
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, content, type, ai_score, created_at
FROM biz.notes
WHERE task_id = %s
ORDER BY created_at DESC
LIMIT 20
""",
(task_id,),
)
for note_row in cur.fetchall():
has_note = True
note_type = note_row[2] or "normal"
# type → tag_type/tag_label 映射
tag_label = "回访" if note_type == "follow_up" else "普通"
notes.append({
"id": note_row[0],
"content": note_row[1] or "",
"tag_type": note_type,
"tag_label": tag_label,
"created_at": _format_time(note_row[4]) or "",
"score": note_row[3],
})
conn.commit()
except Exception:
logger.warning("查询备注失败", exc_info=True)
# ── 7. 组装 TaskDetailResponse ──
return {
"id": task_id,
"customer_name": customer_name,
"customer_avatar": "/assets/images/avatar-default.png",
"task_type": task_type,
"task_type_label": _TASK_TYPE_LABEL_MAP.get(task_type, task_type),
"deadline": _format_time(expires_at),
"heart_score": float(rs_score),
"hobbies": [],
"is_pinned": bool(is_pinned),
"has_note": has_note,
"status": task_status,
"customer_id": member_id,
"retention_clues": retention_clues,
"talking_points": talking_points,
"service_summary": service_summary,
"service_records": service_records,
"ai_analysis": ai_analysis,
"notes": notes,
}
finally:
conn.close()