feat: batch update - gift card breakdown spec, backend APIs, miniprogram pages, ETL finance recharge, docs & migrations

This commit is contained in:
Neo
2026-03-20 01:43:48 +08:00
parent 075caf067f
commit 79f9a0e1da
437 changed files with 118603 additions and 976 deletions

View File

@@ -0,0 +1,663 @@
# AI_CHANGELOG
# - 2026-03-20 | Prompt: RNS1.3 E2E 修复 | _build_recharge 环比比较逻辑修正compare 参数传递),
# _empty_revenue 新增确保包含所有必需字段skills 暂返回空列表
"""
看板业务逻辑服务层。
提供 BOARD-1助教看板、BOARD-2客户看板、BOARD-3财务看板
日期范围计算、环比计算等通用工具函数,以及各看板的编排函数。
"""
from __future__ import annotations
import calendar
from datetime import date, timedelta
from decimal import Decimal, ROUND_HALF_UP
# ---------------------------------------------------------------------------
# 通用工具函数
# ---------------------------------------------------------------------------
def _calc_date_range(
time_enum: str, ref_date: date | None = None
) -> tuple[date, date]:
"""
根据时间枚举计算当期日期范围。
支持 BOARD-1 的 6 种枚举snake_case和 BOARD-3 的 8 种枚举camelCase
同义枚举(如 last_month / lastMonth映射到相同逻辑。
返回 (start_date, end_date),均为 date 类型end_date 为区间末日(含)。
"""
today = ref_date or date.today()
# --- 当月 ---
if time_enum == "month":
start = today.replace(day=1)
end = today.replace(day=calendar.monthrange(today.year, today.month)[1])
return start, end
# --- 上月 ---
if time_enum in ("last_month", "lastMonth"):
first_of_this_month = today.replace(day=1)
last_month_end = first_of_this_month - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
return last_month_start, last_month_end
# --- 本周(周一 ~ 周日)---
if time_enum == "week":
monday = today - timedelta(days=today.weekday())
sunday = monday + timedelta(days=6)
return monday, sunday
# --- 上周 ---
if time_enum == "lastWeek":
this_monday = today - timedelta(days=today.weekday())
last_sunday = this_monday - timedelta(days=1)
last_monday = last_sunday - timedelta(days=6)
return last_monday, last_sunday
# --- 本季度 ---
if time_enum == "quarter":
q_start_month = (today.month - 1) // 3 * 3 + 1
start = date(today.year, q_start_month, 1)
q_end_month = q_start_month + 2
end = date(today.year, q_end_month, calendar.monthrange(today.year, q_end_month)[1])
return start, end
# --- 上季度 ---
if time_enum in ("last_quarter", "lastQuarter"):
q_start_month = (today.month - 1) // 3 * 3 + 1
# 上季度末日 = 本季度首日前一天
this_q_start = date(today.year, q_start_month, 1)
prev_q_end = this_q_start - timedelta(days=1)
prev_q_start_month = (prev_q_end.month - 1) // 3 * 3 + 1
prev_q_start = date(prev_q_end.year, prev_q_start_month, 1)
return prev_q_start, prev_q_end
# --- 前 3 个月(不含本月)---
if time_enum in ("last_3m", "quarter3"):
# end = 上月末日
first_of_this_month = today.replace(day=1)
end = first_of_this_month - timedelta(days=1)
# start = 往前推 3 个月的首日
start = _month_offset(first_of_this_month, -3)
return start, end
# --- 前 6 个月(不含本月)---
if time_enum in ("last_6m", "half6"):
first_of_this_month = today.replace(day=1)
end = first_of_this_month - timedelta(days=1)
start = _month_offset(first_of_this_month, -6)
return start, end
raise ValueError(f"不支持的时间枚举: {time_enum}")
def _month_offset(d: date, months: int) -> date:
"""将日期 d 偏移 months 个月,保持 day=1。仅用于内部月份偏移计算。"""
# d 应为某月 1 日
total_months = d.year * 12 + (d.month - 1) + months
y, m = divmod(total_months, 12)
m += 1
return date(y, m, 1)
def _calc_prev_range(start_date: date, end_date: date) -> tuple[date, date]:
"""
根据当期范围计算上期日期范围。
上期长度等于当期长度prev_end = start_date - 1 天。
"""
period_length = (end_date - start_date).days + 1
prev_end = start_date - timedelta(days=1)
prev_start = prev_end - timedelta(days=period_length - 1)
return prev_start, prev_end
def calc_compare(current: Decimal, previous: Decimal) -> dict:
"""
统一环比计算。
返回:
- compare: str — "12.5%" / "新增" / "持平"
- is_down: bool — 是否下降
- is_flat: bool — 是否持平
规则:
- previous=0, current≠0 → "新增", is_down=False, is_flat=False
- previous=0, current=0 → "持平", is_down=False, is_flat=True
- 正常计算: (current - previous) / previous × 100%
- 正值 → is_down=False; 负值 → is_down=True; 零 → is_flat=True
"""
if previous == 0:
if current != 0:
return {"compare": "新增", "is_down": False, "is_flat": False}
return {"compare": "持平", "is_down": False, "is_flat": True}
diff = current - previous
pct = (diff / previous * 100).quantize(Decimal("0.1"), rounding=ROUND_HALF_UP)
if pct > 0:
return {"compare": f"{pct}%", "is_down": False, "is_flat": False}
if pct < 0:
return {"compare": f"{abs(pct)}%", "is_down": True, "is_flat": False}
# pct == 0当期 == 上期)
return {"compare": "持平", "is_down": False, "is_flat": True}
import logging
from typing import Any
from fastapi import HTTPException
from app.services import fdw_queries
logger = logging.getLogger(__name__)
def _get_connection():
"""延迟导入 get_connection避免纯函数测试时触发模块级导入失败。"""
from app.database import get_connection
return get_connection()
# ---------------------------------------------------------------------------
# 排序映射
# ---------------------------------------------------------------------------
_SORT_KEY_MAP = {
"perf_desc": ("perf_hours", True),
"perf_asc": ("perf_hours", False),
"salary_desc": ("salary", True),
"salary_asc": ("salary", False),
"sv_desc": ("sv_amount", True),
"task_desc": ("task_total", True),
}
_SORT_DIM_MAP = {
"perf_desc": "perf", "perf_asc": "perf",
"salary_desc": "salary", "salary_asc": "salary",
"sv_desc": "sv", "task_desc": "task",
}
# ---------------------------------------------------------------------------
# BOARD-1 助教看板
# ---------------------------------------------------------------------------
async def get_coach_board(
sort: str, skill: str, time: str, site_id: int
) -> dict:
"""
BOARD-1助教看板。扁平返回所有维度字段。
参数互斥time=last_6m + sort=sv_desc → HTTP 400。
"""
# 参数互斥校验
if time == "last_6m" and sort == "sv_desc":
raise HTTPException(
status_code=400,
detail="最近6个月不支持客源储值排序",
)
start_date, end_date = _calc_date_range(time)
start_str = str(start_date)
end_str = str(end_date)
conn = _get_connection()
try:
# 1. 助教列表
assistants = fdw_queries.get_all_assistants(conn, site_id, skill)
if not assistants:
return {"items": [], "dim_type": _SORT_DIM_MAP.get(sort, "perf")}
aid_list = [a["assistant_id"] for a in assistants]
# 2. 批量查询绩效
salary_map = fdw_queries.get_salary_calc_batch(
conn, site_id, aid_list, start_str, end_str
)
# 3. Top 客户(降级为空)
top_map: dict[int, list[str]] = {}
try:
top_map = fdw_queries.get_top_customers_for_coaches(
conn, site_id, aid_list
)
except Exception:
logger.warning("BOARD-1 topCustomers 查询失败,降级为空", exc_info=True)
# 4. 储值数据
sv_map: dict[int, dict] = {}
try:
sv_map = fdw_queries.get_coach_sv_data(
conn, site_id, aid_list, start_str, end_str
)
except Exception:
logger.warning("BOARD-1 sv 数据查询失败,降级为空", exc_info=True)
# 5. 任务数据
task_map = _query_coach_tasks(conn, site_id, aid_list, start_str, end_str)
# 6. 组装扁平响应
items = []
for a in assistants:
aid = a["assistant_id"]
sal = salary_map.get(aid, {})
sv = sv_map.get(aid, {})
tasks = task_map.get(aid, {})
top_custs = top_map.get(aid, [])
name = a["name"]
initial = name[0] if name else ""
perf_hours = sal.get("effective_hours", 0.0)
salary_val = sal.get("gross_salary", 0.0)
task_recall = tasks.get("recall", 0)
task_callback = tasks.get("callback", 0)
items.append({
"id": aid,
"name": name,
"initial": initial,
"avatar_gradient": "",
"level": sal.get("level_name", a.get("level", "")),
"skills": [], # CHANGE 2026-03-20 | v_dim_assistant 无 skill 列,暂返回空
"top_customers": top_custs,
"perf_hours": perf_hours,
"perf_hours_before": None,
"perf_gap": None,
"perf_reached": False,
"salary": salary_val,
"salary_perf_hours": perf_hours,
"salary_perf_before": None,
"sv_amount": sv.get("sv_amount", 0.0),
"sv_customer_count": sv.get("sv_customer_count", 0),
"sv_consume": sv.get("sv_consume", 0.0),
"task_recall": task_recall,
"task_callback": task_callback,
"task_total": task_recall + task_callback,
})
# 7. 排序
sort_key, sort_desc = _SORT_KEY_MAP.get(sort, ("perf_hours", True))
items.sort(key=lambda x: x.get(sort_key, 0), reverse=sort_desc)
# 移除内部排序字段
for item in items:
item.pop("task_total", None)
return {
"items": items,
"dim_type": _SORT_DIM_MAP.get(sort, "perf"),
}
finally:
conn.close()
def _query_coach_tasks(
conn: Any, site_id: int, assistant_ids: list[int],
start_date: str, end_date: str,
) -> dict[int, dict]:
"""
查询助教任务完成数BOARD-1 task 维度)。
来源: biz.coach_tasks按 task_type 分类统计 recall/callback。
"""
if not assistant_ids:
return {}
result: dict[int, dict] = {}
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT assistant_id,
COUNT(*) FILTER (WHERE task_type = 'recall') AS recall_count,
COUNT(*) FILTER (WHERE task_type = 'callback') AS callback_count
FROM biz.coach_tasks
WHERE assistant_id = ANY(%s)
AND site_id = %s
AND completed_at >= %s::date
AND completed_at <= %s::date
AND status = 'completed'
GROUP BY assistant_id
""",
(assistant_ids, site_id, start_date, end_date),
)
for row in cur.fetchall():
result[row[0]] = {
"recall": row[1] or 0,
"callback": row[2] or 0,
}
conn.commit()
except Exception:
logger.warning("BOARD-1 任务查询失败,降级为空", exc_info=True)
return result
# ---------------------------------------------------------------------------
# BOARD-2 客户看板
# ---------------------------------------------------------------------------
# 维度 → FDW 查询函数映射
_DIMENSION_QUERY_MAP = {
"recall": "get_customer_board_recall",
"potential": "get_customer_board_potential",
"balance": "get_customer_board_balance",
"recharge": "get_customer_board_recharge",
"recent": "get_customer_board_recent",
"spend60": "get_customer_board_spend60",
"freq60": "get_customer_board_freq60",
"loyal": "get_customer_board_loyal",
}
async def get_customer_board(
dimension: str, project: str, page: int, page_size: int, site_id: int
) -> dict:
"""
BOARD-2客户看板。按维度返回专属字段 + 分页。
"""
query_fn_name = _DIMENSION_QUERY_MAP.get(dimension)
if not query_fn_name:
raise HTTPException(status_code=400, detail=f"不支持的维度: {dimension}")
query_fn = getattr(fdw_queries, query_fn_name)
conn = _get_connection()
try:
# 1. 按维度查询分页数据
result = query_fn(conn, site_id, project, page, page_size)
items = result["items"]
# 2. 批量查询客户关联助教
member_ids = [item["member_id"] for item in items if item.get("member_id")]
assistants_map: dict[int, list[dict]] = {}
if member_ids:
try:
assistants_map = fdw_queries.get_customer_assistants(
conn, site_id, member_ids
)
except Exception:
logger.warning("BOARD-2 客户助教查询失败,降级为空", exc_info=True)
# 3. 组装响应(添加基础字段 + assistants
for item in items:
mid = item.get("member_id", 0)
name = item.get("name", "")
item["id"] = mid
item["initial"] = name[0] if name else ""
item["avatar_cls"] = ""
item["assistants"] = assistants_map.get(mid, [])
return {
"items": items,
"total": result["total"],
"page": result["page"],
"page_size": result["page_size"],
}
finally:
conn.close()
# ---------------------------------------------------------------------------
# BOARD-3 财务看板
# ---------------------------------------------------------------------------
async def get_finance_board(
time: str, area: str, compare: int, site_id: int
) -> dict:
"""
BOARD-3财务看板。6 板块独立查询、独立降级。
area≠all 时 recharge 返回 null。
compare=1 时计算上期范围并调用 calc_compare。
compare=0 时环比字段为 None序列化时排除
"""
start_date, end_date = _calc_date_range(time)
start_str = str(start_date)
end_str = str(end_date)
prev_start_str = None
prev_end_str = None
if compare == 1:
prev_start, prev_end = _calc_prev_range(start_date, end_date)
prev_start_str = str(prev_start)
prev_end_str = str(prev_end)
conn = _get_connection()
try:
# 各板块独立 try/except
overview = _build_overview(conn, site_id, start_str, end_str,
prev_start_str, prev_end_str, compare)
recharge = None
if area == "all":
recharge = _build_recharge(conn, site_id, start_str, end_str,
prev_start_str, prev_end_str, compare)
revenue = _build_revenue(conn, site_id, start_str, end_str, area)
cashflow = _build_cashflow(conn, site_id, start_str, end_str,
prev_start_str, prev_end_str, compare)
expense = _build_expense(conn, site_id, start_str, end_str,
prev_start_str, prev_end_str, compare)
coach_analysis = _build_coach_analysis(conn, site_id, start_str, end_str,
prev_start_str, prev_end_str, compare)
return {
"overview": overview,
"recharge": recharge,
"revenue": revenue,
"cashflow": cashflow,
"expense": expense,
"coach_analysis": coach_analysis,
}
finally:
conn.close()
def _build_overview(
conn: Any, site_id: int, start: str, end: str,
prev_start: str | None, prev_end: str | None, compare: int,
) -> dict:
"""经营一览板块。"""
try:
data = fdw_queries.get_finance_overview(conn, site_id, start, end)
except Exception:
logger.warning("overview 查询失败,降级为空", exc_info=True)
return _empty_overview()
result = {**data}
if compare == 1 and prev_start and prev_end:
try:
prev = fdw_queries.get_finance_overview(conn, site_id, prev_start, prev_end)
_attach_compare(result, data, prev, [
"occurrence", "discount", "discount_rate", "confirmed_revenue",
"cash_in", "cash_out", "cash_balance", "balance_rate",
])
except Exception:
logger.warning("overview 环比查询失败", exc_info=True)
return result
def _build_recharge(
conn: Any, site_id: int, start: str, end: str,
prev_start: str | None, prev_end: str | None, compare: int,
) -> dict | None:
"""预收资产板块。"""
try:
data = fdw_queries.get_finance_recharge(conn, site_id, start, end)
except Exception:
logger.warning("recharge 查询失败,降级为 null", exc_info=True)
return None
if compare == 1 and prev_start and prev_end:
try:
prev = fdw_queries.get_finance_recharge(conn, site_id, prev_start, prev_end)
_attach_compare(data, data, prev, [
"actual_income", "first_charge", "renew_charge",
"consumed", "card_balance",
])
# 赠送卡矩阵环比
for i, row in enumerate(data.get("gift_rows", [])):
prev_row = prev.get("gift_rows", [{}] * 3)[i] if i < len(prev.get("gift_rows", [])) else {}
for key in ["total", "liquor", "table_fee", "voucher"]:
# gift_rows 的 cell 是 GiftCell dict{"value": float}
cur_cell = row.get(key, {})
prev_cell = prev_row.get(key, {})
cur_val = Decimal(str(cur_cell.get("value", 0) if isinstance(cur_cell, dict) else cur_cell))
prev_val = Decimal(str(prev_cell.get("value", 0) if isinstance(prev_cell, dict) else prev_cell))
cmp = calc_compare(cur_val, prev_val)
if isinstance(cur_cell, dict):
cur_cell["compare"] = cmp["compare"]
cur_cell["down"] = cmp["is_down"]
cur_cell["flat"] = cmp["is_flat"]
else:
row[key] = {"value": float(cur_val), "compare": cmp["compare"], "down": cmp["is_down"], "flat": cmp["is_flat"]}
except Exception:
logger.warning("recharge 环比查询失败", exc_info=True)
return data
def _build_revenue(
conn: Any, site_id: int, start: str, end: str, area: str,
) -> dict:
"""应计收入板块。"""
try:
return fdw_queries.get_finance_revenue(conn, site_id, start, end, area)
except Exception:
logger.warning("revenue 查询失败,降级为空", exc_info=True)
return _empty_revenue()
def _build_cashflow(
conn: Any, site_id: int, start: str, end: str,
prev_start: str | None, prev_end: str | None, compare: int,
) -> dict:
"""现金流入板块。"""
try:
data = fdw_queries.get_finance_cashflow(conn, site_id, start, end)
except Exception:
logger.warning("cashflow 查询失败,降级为空", exc_info=True)
return {"consume_items": [], "recharge_items": [], "total": 0.0}
return data
def _build_expense(
conn: Any, site_id: int, start: str, end: str,
prev_start: str | None, prev_end: str | None, compare: int,
) -> dict:
"""现金流出板块。"""
try:
data = fdw_queries.get_finance_expense(conn, site_id, start, end)
except Exception:
logger.warning("expense 查询失败,降级为空", exc_info=True)
return {
"operation_items": [], "fixed_items": [],
"coach_items": [], "platform_items": [], "total": 0.0,
}
if compare == 1 and prev_start and prev_end:
try:
prev = fdw_queries.get_finance_expense(conn, site_id, prev_start, prev_end)
total_cmp = calc_compare(
Decimal(str(data["total"])), Decimal(str(prev["total"]))
)
data["total_compare"] = total_cmp["compare"]
data["total_down"] = total_cmp["is_down"]
data["total_flat"] = total_cmp["is_flat"]
except Exception:
logger.warning("expense 环比查询失败", exc_info=True)
return data
def _build_coach_analysis(
conn: Any, site_id: int, start: str, end: str,
prev_start: str | None, prev_end: str | None, compare: int,
) -> dict:
"""助教分析板块。"""
try:
data = fdw_queries.get_finance_coach_analysis(conn, site_id, start, end)
except Exception:
logger.warning("coachAnalysis 查询失败,降级为空", exc_info=True)
empty_table = {"total_pay": 0.0, "total_share": 0.0, "avg_hourly": 0.0, "rows": []}
return {"basic": empty_table, "incentive": {**empty_table}}
if compare == 1 and prev_start and prev_end:
try:
prev = fdw_queries.get_finance_coach_analysis(
conn, site_id, prev_start, prev_end
)
for key in ("basic", "incentive"):
cur_t = data[key]
prev_t = prev[key]
_attach_compare(cur_t, cur_t, prev_t, [
"total_pay", "total_share", "avg_hourly",
])
except Exception:
logger.warning("coachAnalysis 环比查询失败", exc_info=True)
return data
# ---------------------------------------------------------------------------
# 环比辅助
# ---------------------------------------------------------------------------
def _attach_compare(
target: dict, current: dict, previous: dict, fields: list[str]
) -> None:
"""为 target dict 中的指定字段附加环比三元组。"""
for field in fields:
cur_val = Decimal(str(current.get(field, 0)))
prev_val = Decimal(str(previous.get(field, 0)))
cmp = calc_compare(cur_val, prev_val)
target[f"{field}_compare"] = cmp["compare"]
target[f"{field}_down"] = cmp["is_down"]
target[f"{field}_flat"] = cmp["is_flat"]
# ---------------------------------------------------------------------------
# 空默认值工厂(优雅降级用)
# ---------------------------------------------------------------------------
def _empty_overview() -> dict:
return {
"occurrence": 0.0, "discount": 0.0, "discount_rate": 0.0,
"confirmed_revenue": 0.0, "cash_in": 0.0, "cash_out": 0.0,
"cash_balance": 0.0, "balance_rate": 0.0,
}
def _empty_revenue() -> dict:
"""应计收入空默认值(优雅降级用)。
CHANGE 2026-03-20 | 新增,确保包含所有必需字段:
price_items, total_occurrence, confirmed_total, channel_items
"""
return {
"structure_rows": [],
"price_items": [],
"total_occurrence": 0.0,
"discount_items": [],
"confirmed_total": 0.0,
"channel_items": [],
}

View File

@@ -0,0 +1,708 @@
# -*- coding: utf-8 -*-
"""
助教服务 —— COACH-1 助教详情。
数据来源:
- ETL 直连fdw_queries助教信息、绩效、TOP 客户、服务记录、历史月份
- 业务库biz.*):助教任务、备注
⚠️ DWD-DOC 强制规则:
- 规则 1: 金额使用 items_sum 口径ledger_amount禁止 consume_money
- 规则 2: 助教费用使用 assistant_pd_money + assistant_cx_money禁止 service_fee
- DQ-6: 会员信息通过 member_id JOIN v_dim_member (scd2_is_current=1)
- DQ-7: 余额通过 member_id JOIN v_dim_member_card_account (scd2_is_current=1)
- 废单排除: is_delete = 0
"""
from __future__ import annotations
import datetime
import logging
from fastapi import HTTPException
from decimal import Decimal
from app.services import fdw_queries
from app.services.task_generator import compute_heart_icon
logger = logging.getLogger(__name__)
# ── 颜色/样式映射 ──────────────────────────────────────────
LEVEL_COLOR_MAP = {
"星级": "#FF6B6B",
"高级": "#FFA726",
"中级": "#42A5F5",
"初级": "#66BB6A",
}
TASK_TYPE_MAP = {
"follow_up_visit": {"label": "回访", "class": "tag-callback"},
"high_priority_recall": {"label": "紧急召回", "class": "tag-recall"},
"priority_recall": {"label": "优先召回", "class": "tag-recall"},
}
# 头像渐变色池(循环使用)
AVATAR_GRADIENTS = [
"from-blue-400 to-blue-600",
"from-green-400 to-green-600",
"from-purple-400 to-purple-600",
"from-orange-400 to-orange-600",
"from-pink-400 to-pink-600",
]
# CHANGE 2026-03-19 | feiqiu-data-rules 规则 6 修复 | 删除硬编码 DEFAULT_TIER_NODES
# 档位节点改为从 cfg_performance_tier 配置表动态读取。
# 旧值 [0, 100, 130, 160, 190, 220] 与配置表实际值 [0, 120, 150, 180, 210] 不一致。
_FALLBACK_TIER_NODES: list[float] = [0, 120, 150, 180, 210] # 仅在配置表查询失败时使用
def _get_biz_connection():
"""延迟导入业务库连接。"""
from app.database import get_connection
return get_connection()
def _get_initial(name: str) -> str:
"""取姓氏首字作为头像文字。"""
return name[0] if name else "?"
def _get_avatar_gradient(index: int) -> str:
"""根据索引循环分配头像渐变色。"""
return AVATAR_GRADIENTS[index % len(AVATAR_GRADIENTS)]
def _format_currency(amount: float) -> str:
"""格式化金额¥6,950。"""
if amount >= 10000:
return f"¥{amount:,.0f}"
return f"¥{amount:,.0f}"
# ── 6.1 核心函数 ──────────────────────────────────────────
async def get_coach_detail(coach_id: int, site_id: int) -> dict:
"""
助教详情COACH-1
核心字段查询失败 → 500扩展模块查询失败 → 空默认值(优雅降级)。
"""
conn = _get_biz_connection()
try:
# ── 核心字段(失败直接抛 500──
assistant_info = fdw_queries.get_assistant_info(conn, site_id, coach_id)
if not assistant_info:
raise HTTPException(status_code=404, detail="助教不存在")
now = datetime.date.today()
# 绩效数据(当月)
salary_this = fdw_queries.get_salary_calc(
conn, site_id, coach_id, now.year, now.month
)
if not salary_this:
salary_this = {}
# customerBalance该助教所有客户余额合计
customer_balance = 0.0
try:
top_custs = fdw_queries.get_coach_top_customers(conn, site_id, coach_id, limit=1000)
member_ids = [c["member_id"] for c in top_custs if c.get("member_id")]
if member_ids:
balance_map = fdw_queries.get_member_balance(conn, site_id, member_ids)
customer_balance = sum(float(v) for v in balance_map.values())
except Exception:
logger.warning("查询 customerBalance 失败,降级为 0", exc_info=True)
# tasksCompleted当月已完成任务数
tasks_completed = 0
try:
month_start = now.replace(day=1)
with conn.cursor() as cur:
cur.execute(
"""
SELECT COUNT(*)
FROM biz.coach_tasks
WHERE assistant_id = %s
AND status = 'completed'
AND updated_at >= %s
""",
(coach_id, month_start),
)
row = cur.fetchone()
tasks_completed = row[0] if row else 0
except Exception:
logger.warning("查询 tasksCompleted 失败,降级为 0", exc_info=True)
# customerCount不重复客户数从 top_customers 获取)
customer_count = 0
try:
cc_map = fdw_queries.get_monthly_customer_count(
conn, site_id, coach_id, [now.strftime("%Y-%m-01")]
)
customer_count = sum(cc_map.values())
except Exception:
logger.warning("查询 customerCount 失败,降级为 0", exc_info=True)
performance = {
"monthly_hours": salary_this.get("total_hours", 0.0),
"monthly_salary": salary_this.get("total_income", 0.0),
"customer_balance": customer_balance,
"tasks_completed": tasks_completed,
"perf_current": salary_this.get("total_hours", 0.0),
# CHANGE 2026-03-19 | perf_target 从 tier_nodes 推算,不再依赖 salary_calc 的硬编码 0
"perf_target": 0.0, # 占位,下方用 tier_nodes 覆盖
}
# ── 扩展模块(独立 try/except 优雅降级)──
# 收入明细 + 档位
try:
income = _build_income(conn, site_id, coach_id, now)
except Exception:
logger.warning("构建 income 失败,降级为空", exc_info=True)
income = {"this_month": [], "last_month": []}
try:
tier_nodes = _build_tier_nodes(conn, site_id)
except Exception:
logger.warning("构建 tierNodes 失败,降级为 fallback", exc_info=True)
tier_nodes = list(_FALLBACK_TIER_NODES)
# CHANGE 2026-03-19 | 用 tier_nodes 推算 perf_target下一档 min_hours
current_hours = performance["perf_current"]
perf_target = tier_nodes[-1] if tier_nodes else 0.0 # 默认最高档
for node in tier_nodes:
if node > current_hours:
perf_target = node
break
performance["perf_target"] = perf_target
# TOP 客户
try:
top_customers = _build_top_customers(conn, site_id, coach_id)
except Exception:
logger.warning("构建 topCustomers 失败,降级为空列表", exc_info=True)
top_customers = []
# 近期服务记录
try:
service_records = _build_service_records(conn, site_id, coach_id)
except Exception:
logger.warning("构建 serviceRecords 失败,降级为空列表", exc_info=True)
service_records = []
# 任务分组
try:
task_groups = _build_task_groups(coach_id, site_id, conn)
except Exception:
logger.warning("构建 taskGroups 失败,降级为空", exc_info=True)
task_groups = {"visible_tasks": [], "hidden_tasks": [], "abandoned_tasks": []}
# 备注
try:
notes = _build_notes(coach_id, site_id, conn)
except Exception:
logger.warning("构建 notes 失败,降级为空列表", exc_info=True)
notes = []
# 历史月份
try:
history_months = _build_history_months(coach_id, site_id, conn)
except Exception:
logger.warning("构建 historyMonths 失败,降级为空列表", exc_info=True)
history_months = []
return {
# 基础信息
"id": coach_id,
"name": assistant_info.get("name", ""),
"avatar": assistant_info.get("avatar", ""),
"level": salary_this.get("coach_level", assistant_info.get("level", "")),
"skills": assistant_info.get("skills", []),
"work_years": assistant_info.get("work_years", 0.0),
"customer_count": customer_count,
"hire_date": assistant_info.get("hire_date"),
# 绩效
"performance": performance,
# 收入
"income": income,
# 档位
"tier_nodes": tier_nodes,
# 任务分组
"visible_tasks": task_groups["visible_tasks"],
"hidden_tasks": task_groups["hidden_tasks"],
"abandoned_tasks": task_groups["abandoned_tasks"],
# TOP 客户
"top_customers": top_customers,
# 近期服务记录
"service_records": service_records,
# 历史月份
"history_months": history_months,
# 备注
"notes": notes,
}
finally:
conn.close()
# ── 6.2 收入明细 + 档位 ──────────────────────────────────
def _build_income(
conn, site_id: int, coach_id: int, now: datetime.date
) -> dict:
"""
构建 income 模块。
thisMonth/lastMonth 各含 4 项:
- 基础课时费base_income / assistant_pd_money
- 激励课时费bonus_income / assistant_cx_money
- 充值提成
- 酒水提成
⚠️ DWD-DOC 规则 2: 使用 assistant_pd_money + assistant_cx_money 拆分。
"""
# 当月
salary_this = fdw_queries.get_salary_calc(
conn, site_id, coach_id, now.year, now.month
) or {}
# 上月
if now.month == 1:
last_year, last_month = now.year - 1, 12
else:
last_year, last_month = now.year, now.month - 1
salary_last = fdw_queries.get_salary_calc(
conn, site_id, coach_id, last_year, last_month
) or {}
def _make_items(salary: dict) -> list[dict]:
return [
{
"label": "基础课时费",
"amount": f"¥{salary.get('assistant_pd_money_total', 0.0):,.0f}",
"color": "#42A5F5",
},
{
"label": "激励课时费",
"amount": f"¥{salary.get('assistant_cx_money_total', 0.0):,.0f}",
"color": "#FFA726",
},
{
"label": "充值提成",
"amount": f"¥{salary.get('bonus_money', 0.0):,.0f}",
"color": "#66BB6A",
},
{
"label": "酒水提成",
"amount": f"¥{salary.get('room_income', 0.0):,.0f}",
"color": "#AB47BC",
},
]
return {
"this_month": _make_items(salary_this),
"last_month": _make_items(salary_last),
}
def _build_tier_nodes(conn: Any, site_id: int) -> list[float]:
"""
从 cfg_performance_tier 配置表构建 tierNodes 档位节点数组。
⚠️ feiqiu-data-rules 规则 6: 绩效档位必须从配置表读取,禁止硬编码。
"""
# CHANGE 2026-03-19 | feiqiu-data-rules 规则 6 修复 | 从配置表动态读取档位节点
try:
tiers = fdw_queries.get_performance_tiers(conn, site_id)
if tiers:
return [t["min_hours"] for t in tiers]
except Exception:
logger.warning("查询 cfg_performance_tier 失败,使用 fallback", exc_info=True)
return list(_FALLBACK_TIER_NODES)
# ── 6.3 TOP 客户 + 近期服务记录 ──────────────────────────
def _build_top_customers(
conn, site_id: int, coach_id: int
) -> list[dict]:
"""
构建 topCustomers 模块(最多 20 条)。
⚠️ DQ-6: 客户姓名通过 v_dim_member 获取。
⚠️ DQ-7: 余额通过 v_dim_member_card_account 获取。
⚠️ DWD-DOC 规则 1: consume 使用 ledger_amountitems_sum 口径)。
heartEmoji 四级映射P6 AC3rs_display 0-10 刻度):
- score > 8.5 → "💖"
- score > 7 → "🧡"
- score > 5 → "💛"
- score ≤ 5 → "💙"
"""
raw = fdw_queries.get_coach_top_customers(conn, site_id, coach_id, limit=20)
if not raw:
return []
# 获取关系指数(用于 heartEmoji
# 批量获取所有客户的关系指数
member_ids = [c["member_id"] for c in raw if c.get("member_id")]
relation_map: dict[int, float] = {}
for mid in member_ids:
try:
rels = fdw_queries.get_relation_index(conn, site_id, mid)
for r in rels:
if r.get("assistant_id") == coach_id:
relation_map[mid] = r.get("relation_index", 0.0)
break
except Exception:
pass
result = []
for i, cust in enumerate(raw):
mid = cust.get("member_id")
name = cust.get("customer_name", "")
score = relation_map.get(mid, 0.0)
# 四级 heart icon 映射P6 AC3rs_display 0-10 刻度)
heart_emoji = compute_heart_icon(Decimal(str(score)))
if score > 8.5:
score_color = "#FF6B6B"
elif score > 7:
score_color = "#FF8C00"
elif score > 5:
score_color = "#FFA726"
else:
score_color = "#5B9BD5"
balance = cust.get("customer_balance", 0.0)
consume = cust.get("total_consume", 0.0)
result.append({
"id": mid or 0,
"name": name,
"initial": _get_initial(name),
"avatar_gradient": _get_avatar_gradient(i),
"heart_emoji": heart_emoji,
"relation_score": f"{score:.2f}",
"score_color": score_color,
"service_count": cust.get("service_count", 0),
"balance": _format_currency(balance),
"consume": _format_currency(consume),
})
return result
def _build_service_records(
conn, site_id: int, coach_id: int
) -> list[dict]:
"""
构建 serviceRecords 模块。
⚠️ DWD-DOC 规则 1: income 使用 ledger_amount。
⚠️ 废单排除: is_delete = 0。
"""
raw = fdw_queries.get_coach_service_records(
conn, site_id, coach_id, limit=20, offset=0
)
if not raw:
return []
result = []
for i, rec in enumerate(raw):
name = rec.get("customer_name", "")
course_type = rec.get("course_type", "")
# type_class 映射
if "激励" in course_type or "超休" in course_type:
type_class = "tag-bonus"
else:
type_class = "tag-base"
create_time = rec.get("create_time")
date_str = create_time.strftime("%Y-%m-%d %H:%M") if create_time else ""
hours = rec.get("service_hours", 0.0)
income = rec.get("income", 0.0)
result.append({
"customer_id": rec.get("member_id"),
"customer_name": name,
"initial": _get_initial(name),
"avatar_gradient": _get_avatar_gradient(i),
"type": course_type or "课程",
"type_class": type_class,
"table": str(rec.get("table_id")) if rec.get("table_id") else None,
"duration": f"{hours:.1f}h",
"income": _format_currency(income),
"date": date_str,
"perf_hours": None,
})
return result
# ── 6.4 任务分组 + 备注 ──────────────────────────────────
def _build_task_groups(
coach_id: int, site_id: int, conn
) -> dict:
"""
构建任务分组。
1. 查询 biz.coach_tasks WHERE assistant_id=coach_id
2. 按 status 分组active→visibleTasks, inactive→hiddenTasks, abandoned→abandonedTasks
3. visible/hidden关联 biz.notes 获取备注列表
4. abandoned取 abandon_reason
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, member_id, task_type, status, abandon_reason
FROM biz.coach_tasks
WHERE assistant_id = %s
AND status IN ('active', 'inactive', 'abandoned')
ORDER BY created_at DESC
""",
(coach_id,),
)
rows = cur.fetchall()
if not rows:
return {"visible_tasks": [], "hidden_tasks": [], "abandoned_tasks": []}
# 收集客户 ID 批量查询姓名
member_ids = list({r[1] for r in rows if r[1]})
member_name_map: dict[int, str] = {}
if member_ids:
try:
info_map = fdw_queries.get_member_info(conn, site_id, member_ids)
for mid, info in info_map.items():
member_name_map[mid] = info.get("nickname", "")
except Exception:
logger.warning("批量查询客户姓名失败", exc_info=True)
# 收集任务 ID 批量查询备注
task_ids = [r[0] for r in rows if r[3] in ("active", "inactive")]
task_notes_map: dict[int, list[dict]] = {}
if task_ids:
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT task_id, is_pinned, content, created_at
FROM biz.notes
WHERE task_id = ANY(%s)
ORDER BY created_at DESC
""",
(task_ids,),
)
for nr in cur.fetchall():
tid = nr[0]
if tid not in task_notes_map:
task_notes_map[tid] = []
task_notes_map[tid].append({
"pinned": bool(nr[1]),
"text": nr[2] or "",
"date": nr[3].isoformat() if nr[3] else "",
})
except Exception:
logger.warning("批量查询任务备注失败", exc_info=True)
visible_tasks = []
hidden_tasks = []
abandoned_tasks = []
for row in rows:
task_id, member_id, task_type, status, abandon_reason = row
customer_name = member_name_map.get(member_id, "")
task_meta = TASK_TYPE_MAP.get(task_type, {"label": task_type or "", "class": "tag-default"})
if status == "abandoned":
abandoned_tasks.append({
"customer_name": customer_name,
"reason": abandon_reason or "",
})
else:
notes_list = task_notes_map.get(task_id, [])
item = {
"type_label": task_meta["label"],
"type_class": task_meta["class"],
"customer_name": customer_name,
"customer_id": member_id,
"note_count": len(notes_list),
"pinned": any(n.get("pinned") for n in notes_list),
"notes": notes_list if notes_list else None,
}
if status == "active":
visible_tasks.append(item)
else:
hidden_tasks.append(item)
return {
"visible_tasks": visible_tasks,
"hidden_tasks": hidden_tasks,
"abandoned_tasks": abandoned_tasks,
}
def _build_notes(coach_id: int, site_id: int, conn) -> list[dict]:
"""
构建助教相关备注列表(最多 20 条)。
查询 biz.notes 中与该助教任务关联的备注,按 created_at 倒序。
⚠️ DQ-6: 客户姓名通过 member_id JOIN v_dim_member。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT n.id, n.content, n.created_at, n.ai_score,
n.type AS tag_label,
ct.member_id
FROM biz.notes n
LEFT JOIN biz.coach_tasks ct ON n.task_id = ct.id
WHERE ct.assistant_id = %s
ORDER BY n.created_at DESC
LIMIT 20
""",
(coach_id,),
)
rows = cur.fetchall()
if not rows:
return []
# 批量获取客户姓名DQ-6
member_ids = list({r[5] for r in rows if r[5]})
member_name_map: dict[int, str] = {}
if member_ids:
try:
info_map = fdw_queries.get_member_info(conn, site_id, member_ids)
for mid, info in info_map.items():
member_name_map[mid] = info.get("nickname", "")
except Exception:
logger.warning("查询备注客户姓名失败", exc_info=True)
result = []
for r in rows:
result.append({
"id": r[0],
"content": r[1] or "",
"timestamp": r[2].isoformat() if r[2] else "",
"ai_score": r[3],
"customer_name": member_name_map.get(r[5], ""),
"tag_label": r[4] or "",
"created_at": r[2].isoformat() if r[2] else "",
})
return result
# ── 6.5 历史月份统计T2-6──────────────────────────────
def _build_history_months(
coach_id: int, site_id: int, conn
) -> list[dict]:
"""
构建 historyMonths 模块。
1. fdw_queries.get_salary_calc_multi_months() → 最近 6 个月工时/工资
2. fdw_queries.get_monthly_customer_count() → 各月客户数
3. biz.coach_tasks → 各月回访/召回完成数
4. 本月 estimated=True历史月份 estimated=False
5. 格式化customers→"22人"hours→"87.5h"salary→"¥6,950"
"""
now = datetime.date.today()
# 生成最近 6 个月的月份列表(含本月)
months: list[str] = []
for i in range(6):
y = now.year
m = now.month - i
while m <= 0:
m += 12
y -= 1
months.append(f"{y}-{m:02d}-01")
# 批量查询绩效数据
salary_map = fdw_queries.get_salary_calc_multi_months(
conn, site_id, coach_id, months
)
# 批量查询各月客户数
customer_count_map = fdw_queries.get_monthly_customer_count(
conn, site_id, coach_id, months
)
# 查询各月回访/召回完成数
callback_map: dict[str, int] = {}
recall_map: dict[str, int] = {}
try:
six_months_ago = months[-1] # 最早的月份
with conn.cursor() as cur:
cur.execute(
"""
SELECT DATE_TRUNC('month', updated_at)::date AS month,
task_type,
COUNT(*) AS cnt
FROM biz.coach_tasks
WHERE assistant_id = %s
AND status = 'completed'
AND updated_at >= %s::date
GROUP BY DATE_TRUNC('month', updated_at)::date, task_type
""",
(coach_id, six_months_ago),
)
for row in cur.fetchall():
month_key = str(row[0])
task_type = row[1]
cnt = row[2] or 0
if task_type == "follow_up_visit":
callback_map[month_key] = callback_map.get(month_key, 0) + cnt
elif task_type in ("high_priority_recall", "priority_recall"):
recall_map[month_key] = recall_map.get(month_key, 0) + cnt
except Exception:
logger.warning("查询回访/召回完成数失败", exc_info=True)
# 构建结果
current_month_str = now.strftime("%Y-%m-01")
result = []
for i, month_str in enumerate(months):
salary = salary_map.get(month_str, {})
customers = customer_count_map.get(month_str, 0)
hours = salary.get("effective_hours", 0.0)
salary_amount = salary.get("gross_salary", 0.0)
callback_done = callback_map.get(month_str, 0)
recall_done = recall_map.get(month_str, 0)
# 月份标签
if i == 0:
month_label = "本月"
elif i == 1:
month_label = "上月"
else:
# 提取月份数字
m_num = int(month_str.split("-")[1])
month_label = f"{m_num}"
result.append({
"month": month_label,
"estimated": month_str == current_month_str,
"customers": f"{customers}",
"hours": f"{hours:.1f}h",
"salary": _format_currency(salary_amount),
"callback_done": callback_done,
"recall_done": recall_done,
})
return result

View File

@@ -0,0 +1,660 @@
# -*- coding: utf-8 -*-
"""
客户服务 —— CUST-1 客户详情、CUST-2 客户服务记录。
数据来源:
- ETL 直连fdw_queries会员信息、余额、消费、关系指数、服务记录
- 业务库biz.*AI 缓存、维客线索、备注、助教任务
⚠️ DWD-DOC 强制规则:
- 规则 1: 金额使用 items_sum 口径ledger_amount禁止 consume_money
- 规则 2: 助教费用使用 assistant_pd_money + assistant_cx_money禁止 service_fee
- DQ-6: 会员信息通过 member_id JOIN v_dim_member (scd2_is_current=1)
- DQ-7: 余额通过 member_id JOIN v_dim_member_card_account (scd2_is_current=1)
- 废单排除: is_delete = 0
"""
from __future__ import annotations
import json
import logging
from fastapi import HTTPException
from decimal import Decimal
from app.services import fdw_queries
from app.services.task_generator import compute_heart_icon
logger = logging.getLogger(__name__)
# ── 颜色/样式映射 ──────────────────────────────────────────
LEVEL_COLOR_MAP = {
"星级": "#FF6B6B",
"高级": "#FFA726",
"中级": "#42A5F5",
"初级": "#66BB6A",
}
TASK_TYPE_MAP = {
"follow_up_visit": {"label": "回访", "color": "#42A5F5", "bg_class": "bg-blue"},
"high_priority_recall": {"label": "紧急召回", "color": "#FF6B6B", "bg_class": "bg-red"},
"priority_recall": {"label": "优先召回", "color": "#FFA726", "bg_class": "bg-orange"},
}
LEVEL_BG_MAP = {
"星级": "bg-red",
"高级": "bg-orange",
"中级": "bg-blue",
"初级": "bg-green",
}
def _mask_phone(phone: str | None) -> str:
"""手机号脱敏139****5678 格式。"""
if not phone or len(phone) < 7:
return phone or ""
return f"{phone[:3]}****{phone[-4:]}"
def _get_biz_connection():
"""延迟导入业务库连接。"""
from app.database import get_connection
return get_connection()
# ── 3.1 核心函数 ──────────────────────────────────────────
async def get_customer_detail(customer_id: int, site_id: int) -> dict:
"""
客户详情CUST-1
核心字段查询失败 → 500扩展模块查询失败 → 空默认值(优雅降级)。
"""
conn = _get_biz_connection()
try:
# ── 核心字段(失败直接抛 500──
member_info_map = fdw_queries.get_member_info(conn, site_id, [customer_id])
if customer_id not in member_info_map:
raise HTTPException(status_code=404, detail="客户不存在")
info = member_info_map[customer_id]
phone_full = info.get("mobile") or ""
phone = _mask_phone(phone_full)
name = info.get("nickname") or ""
# Banner 字段:查询失败返回 null需求 1.7
balance = None
try:
balance_map = fdw_queries.get_member_balance(conn, site_id, [customer_id])
if customer_id in balance_map:
balance = float(balance_map[customer_id])
except Exception:
logger.warning("查询 balance 失败,降级为 null", exc_info=True)
consumption_60d = None
try:
val = fdw_queries.get_consumption_60d(conn, site_id, customer_id)
if val is not None:
consumption_60d = float(val)
except Exception:
logger.warning("查询 consumption_60d 失败,降级为 null", exc_info=True)
days_since_visit = None
try:
visit_map = fdw_queries.get_last_visit_days(conn, site_id, [customer_id])
if customer_id in visit_map:
days_since_visit = visit_map[customer_id]
except Exception:
logger.warning("查询 daysSinceVisit 失败,降级为 null", exc_info=True)
# ── 扩展模块(独立 try/except 优雅降级)──
try:
ai_insight = _build_ai_insight(customer_id, conn)
except Exception:
logger.warning("构建 aiInsight 失败,降级为空", exc_info=True)
ai_insight = {"summary": "", "strategies": []}
try:
retention_clues = _build_retention_clues(customer_id, conn)
except Exception:
logger.warning("构建 retentionClues 失败,降级为空列表", exc_info=True)
retention_clues = []
try:
notes = _build_notes(customer_id, conn)
except Exception:
logger.warning("构建 notes 失败,降级为空列表", exc_info=True)
notes = []
try:
consumption_records = _build_consumption_records(customer_id, site_id, conn)
except Exception:
logger.warning("构建 consumptionRecords 失败,降级为空列表", exc_info=True)
consumption_records = []
try:
coach_tasks = _build_coach_tasks(customer_id, site_id, conn)
except Exception:
logger.warning("构建 coachTasks 失败,降级为空列表", exc_info=True)
coach_tasks = []
try:
favorite_coaches = _build_favorite_coaches(customer_id, site_id, conn)
except Exception:
logger.warning("构建 favoriteCoaches 失败,降级为空列表", exc_info=True)
favorite_coaches = []
return {
"id": customer_id,
"name": name,
"phone": phone,
"phone_full": phone_full,
"avatar": "",
"member_level": "",
"relation_index": "",
"tags": [],
# Banner
"balance": balance,
"consumption_60d": consumption_60d,
"ideal_interval": None,
"days_since_visit": days_since_visit,
# 扩展模块
"ai_insight": ai_insight,
"coach_tasks": coach_tasks,
"favorite_coaches": favorite_coaches,
"retention_clues": retention_clues,
"consumption_records": consumption_records,
"notes": notes,
}
finally:
conn.close()
# ── 3.2 AI 洞察 / 维客线索 / 备注 ──────────────────────────
def _build_ai_insight(customer_id: int, conn) -> dict:
"""
构建 aiInsight 模块。
查询 biz.ai_cache WHERE cache_type='app4_analysis' AND target_id=customerId
解析 result_json JSON。无缓存时返回空默认值。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT result_json
FROM biz.ai_cache
WHERE cache_type = 'app4_analysis'
AND target_id = %s
ORDER BY created_at DESC
LIMIT 1
""",
(str(customer_id),),
)
row = cur.fetchone()
if not row or not row[0]:
return {"summary": "", "strategies": []}
try:
data = json.loads(row[0]) if isinstance(row[0], str) else row[0]
except (json.JSONDecodeError, TypeError):
return {"summary": "", "strategies": []}
summary = data.get("summary", "")
strategies_raw = data.get("strategies", [])
strategies = []
for s in strategies_raw:
if isinstance(s, dict):
strategies.append({
"color": s.get("color", ""),
"text": s.get("text", ""),
})
return {"summary": summary, "strategies": strategies}
def _build_retention_clues(customer_id: int, conn) -> list[dict]:
"""
构建 retentionClues 模块。
查询 public.member_retention_clue按 created_at 倒序。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT clue_type, clue_text
FROM public.member_retention_clue
WHERE member_id = %s
ORDER BY created_at DESC
""",
(customer_id,),
)
rows = cur.fetchall()
return [{"type": r[0] or "", "text": r[1] or ""} for r in rows]
def _build_notes(customer_id: int, conn) -> list[dict]:
"""
构建 notes 模块。
查询 biz.notes WHERE target_type='member',最多 20 条,按 created_at 倒序。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, type, created_at, content
FROM biz.notes
WHERE target_type = 'member'
AND target_id = %s
ORDER BY created_at DESC
LIMIT 20
""",
(customer_id,),
)
rows = cur.fetchall()
return [
{
"id": r[0],
"tag_label": r[1] or "",
"created_at": r[2].isoformat() if r[2] else "",
"content": r[3] or "",
}
for r in rows
]
# ── 3.3 消费记录 ──────────────────────────────────────────
def _build_consumption_records(
customer_id: int, site_id: int, conn
) -> list[dict]:
"""
构建 consumptionRecords 模块。
调用 fdw_queries.get_consumption_records() 获取结算单列表。
⚠️ DWD-DOC 规则 1: totalAmount 使用 ledger_amountitems_sum 口径)。
⚠️ DWD-DOC 规则 2: coaches fee 使用 assistant_pd_money / assistant_cx_money。
⚠️ 废单排除: is_delete = 0正向交易: settle_type IN (1, 3)。
"""
raw_records = fdw_queries.get_consumption_records(
conn, site_id, customer_id, limit=50, offset=0
)
result = []
for rec in raw_records:
# 构建 coaches 子数组
coaches = []
pd_money = rec.get("assistant_pd_money", 0.0)
cx_money = rec.get("assistant_cx_money", 0.0)
if pd_money:
coaches.append({
"name": rec.get("assistant_name", ""),
"level": rec.get("level", ""),
"level_color": LEVEL_COLOR_MAP.get(rec.get("level", ""), ""),
"course_type": "基础课",
"hours": rec.get("service_hours", 0.0),
"perf_hours": None,
"fee": pd_money,
})
if cx_money:
coaches.append({
"name": rec.get("assistant_name", ""),
"level": rec.get("level", ""),
"level_color": LEVEL_COLOR_MAP.get(rec.get("level", ""), ""),
"course_type": "激励课",
"hours": 0.0,
"perf_hours": None,
"fee": cx_money,
})
settle_time = rec.get("settle_time")
date_str = settle_time.strftime("%Y-%m-%d") if settle_time else ""
start_str = rec.get("start_time")
end_str = rec.get("end_time")
result.append({
"id": rec.get("id", ""),
"type": "table",
"date": date_str,
"table_name": str(rec.get("table_id")) if rec.get("table_id") else None,
"start_time": start_str.isoformat() if start_str else None,
"end_time": end_str.isoformat() if end_str else None,
"duration": int(rec.get("service_hours", 0) * 60),
"table_fee": rec.get("table_charge_money", 0.0),
"table_orig_price": None,
"coaches": coaches,
"food_amount": rec.get("goods_money", 0.0),
"food_orig_price": None,
"total_amount": rec.get("total_amount", 0.0),
"total_orig_price": rec.get("total_amount", 0.0),
"pay_method": "",
"recharge_amount": None,
})
return result
# ── 3.4 关联助教任务T2-2──────────────────────────────
def _build_coach_tasks(
customer_id: int, site_id: int, conn
) -> list[dict]:
"""
构建 coachTasks 模块。
1. 查询 biz.coach_tasks WHERE member_id=customer_id
2. 对每位助教fdw_queries 获取等级、近 60 天统计
3. 映射 levelColor/taskColor/bgClass
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, assistant_id, task_type, status, updated_at
FROM biz.coach_tasks
WHERE member_id = %s
AND status IN ('active', 'inactive')
ORDER BY created_at DESC
""",
(customer_id,),
)
rows = cur.fetchall()
if not rows:
return []
# 收集所有助教 ID批量查询信息
assistant_ids = list({r[1] for r in rows if r[1]})
# 获取助教等级(通过 salary_calc
import datetime
now = datetime.date.today()
salary_map = {}
for aid in assistant_ids:
try:
sc = fdw_queries.get_salary_calc(conn, site_id, aid, now.year, now.month)
if sc:
salary_map[aid] = sc
except Exception:
logger.warning("查询助教 %s 绩效失败", aid, exc_info=True)
# 获取助教姓名
assistant_info_map = {}
for aid in assistant_ids:
try:
info = fdw_queries.get_assistant_info(conn, site_id, aid)
if info:
assistant_info_map[aid] = info
except Exception:
logger.warning("查询助教 %s 信息失败", aid, exc_info=True)
result = []
for row in rows:
task_id, assistant_id, task_type, status, updated_at = row
a_info = assistant_info_map.get(assistant_id, {})
sc = salary_map.get(assistant_id, {})
level = sc.get("coach_level", a_info.get("level", ""))
name = a_info.get("name", "")
task_meta = TASK_TYPE_MAP.get(task_type, {
"label": task_type or "",
"color": "#999",
"bg_class": "bg-gray",
})
# 近 60 天统计
try:
stats = fdw_queries.get_coach_60d_stats(
conn, site_id, assistant_id, customer_id
)
except Exception:
stats = {"service_count": 0, "total_hours": 0.0, "avg_hours": 0.0}
metrics = [
{"label": "服务次数", "value": str(stats["service_count"]), "color": None},
{"label": "总时长", "value": f"{stats['total_hours']:.1f}h", "color": None},
{"label": "次均时长", "value": f"{stats['avg_hours']:.1f}h", "color": None},
]
result.append({
"name": name,
"level": level,
"level_color": LEVEL_COLOR_MAP.get(level, ""),
"task_type": task_meta["label"],
"task_color": task_meta["color"],
"bg_class": LEVEL_BG_MAP.get(level, task_meta["bg_class"]),
"status": status,
"last_service": updated_at.isoformat() if updated_at else None,
"metrics": metrics,
})
return result
# ── 3.5 最亲密助教T2-3──────────────────────────────
def _build_favorite_coaches(
customer_id: int, site_id: int, conn
) -> list[dict]:
"""
构建 favoriteCoaches 模块。
1. fdw_queries.get_relation_index() → 关系指数列表已按降序排列rs_display 0-10 刻度)
2. emoji 四级映射P6 AC3>8.5→💖 / >7→🧡 / >5→💛 / ≤5→💙
3. stats 4 项指标
"""
relations = fdw_queries.get_relation_index(conn, site_id, customer_id)
if not relations:
return []
# 获取助教姓名
assistant_ids = [r["assistant_id"] for r in relations if r.get("assistant_id")]
assistant_info_map = {}
for aid in assistant_ids:
try:
info = fdw_queries.get_assistant_info(conn, site_id, aid)
if info:
assistant_info_map[aid] = info
except Exception:
logger.warning("查询助教 %s 信息失败", aid, exc_info=True)
result = []
for rel in relations:
ri = rel.get("relation_index", 0.0)
aid = rel.get("assistant_id")
a_info = assistant_info_map.get(aid, {})
# 4-level heart icon 映射P6 AC3rs_display 0-10 刻度)
emoji = compute_heart_icon(Decimal(str(ri)))
if ri > 8.5:
index_color, bg_class = "#FF6B6B", "bg-red"
elif ri > 7:
index_color, bg_class = "#FF8C00", "bg-orange"
elif ri > 5:
index_color, bg_class = "#FFA726", "bg-yellow"
else:
index_color, bg_class = "#5B9BD5", "bg-blue"
stats = [
{"label": "基础课时", "value": f"¥{rel.get('total_income', 0):.0f}", "color": None},
{"label": "激励课时", "value": "¥0", "color": None},
{"label": "上课次数", "value": str(rel.get("service_count", 0)), "color": None},
{"label": "总时长", "value": f"{rel.get('total_hours', 0):.1f}h", "color": None},
]
result.append({
"emoji": emoji,
"name": a_info.get("name", ""),
"relation_index": f"{ri:.2f}",
"index_color": index_color,
"bg_class": bg_class,
"stats": stats,
})
return result
# ── CUST-2 客户服务记录T2-4──────────────────────────
async def get_customer_records(
customer_id: int,
site_id: int,
year: int,
month: int,
table: str | None,
page: int,
page_size: int,
) -> dict:
"""
客户服务记录CUST-2
1. fdw_queries.get_member_info() → customerName/customerPhoneDQ-6
2. fdw_queries.get_customer_service_records() → 按月分页记录 + total_count
3. 聚合 monthCount/monthHours从 total_count 和记录工时)
4. fdw_queries.get_total_service_count() → totalServiceCount跨月
5. 构建 ServiceRecordItem 列表,含 recordType/isEstimate
6. hasMore = total_count > page * page_size
"""
conn = _get_biz_connection()
try:
# ── 客户基础信息DQ-6──
member_info_map = fdw_queries.get_member_info(conn, site_id, [customer_id])
if customer_id not in member_info_map:
raise HTTPException(status_code=404, detail="客户不存在")
info = member_info_map[customer_id]
phone_full = info.get("mobile") or ""
phone = _mask_phone(phone_full)
customer_name = info.get("nickname") or ""
# ── 按月分页服务记录 ──
offset = (page - 1) * page_size
records_raw, total_count = fdw_queries.get_customer_service_records(
conn, site_id, customer_id,
year, month, table,
limit=page_size, offset=offset,
)
# ── 月度统计汇总(从全量 total_count + 当页记录工时聚合)──
# monthCount = 当月总记录数不是当页monthHours = 当月总工时
# 需要单独查询当月汇总,因为分页记录只是子集
month_count, month_hours = _get_month_aggregation(
conn, site_id, customer_id, year, month, table
)
# ── 累计服务总次数(跨所有月份)──
total_service_count = fdw_queries.get_total_service_count(
conn, site_id, customer_id
)
# ── 构建记录列表 ──
records = []
for rec in records_raw:
create_time = rec.get("create_time")
date_str = create_time.strftime("%Y-%m-%d") if create_time else ""
start_time = rec.get("start_time")
end_time = rec.get("end_time")
# 时间范围格式化
time_range = None
if start_time and end_time:
time_range = f"{start_time.strftime('%H:%M')}-{end_time.strftime('%H:%M')}"
# recordType: 根据 course_type 判断
course_type = rec.get("course_type", "")
record_type = "recharge" if "充值" in course_type else "course"
# type / type_class 映射
if record_type == "recharge":
type_label = "充值"
type_class = "tag-recharge"
else:
type_label = course_type or "课程"
type_class = "tag-course"
records.append({
"id": str(rec.get("id", "")),
"date": date_str,
"time_range": time_range,
"table": str(rec.get("table_id")) if rec.get("table_id") else None,
"type": type_label,
"type_class": type_class,
"record_type": record_type,
"duration": rec.get("service_hours", 0.0),
"duration_raw": rec.get("service_hours_raw"),
"income": rec.get("income", 0.0),
"is_estimate": rec.get("is_estimate", False),
"drinks": None,
})
has_more = total_count > page * page_size
return {
"customer_name": customer_name,
"customer_phone": phone,
"customer_phone_full": phone_full,
"relation_index": "",
"tables": [],
"total_service_count": total_service_count,
"month_count": month_count,
"month_hours": round(month_hours, 2),
"records": records,
"has_more": has_more,
}
finally:
conn.close()
def _get_month_aggregation(
conn, site_id: int, customer_id: int,
year: int, month: int, table: str | None,
) -> tuple[int, float]:
"""
查询当月汇总统计monthCount + monthHours
复用 fdw_queries 的 _fdw_context 直连 ETL 库。
⚠️ 废单排除: is_delete = 0。
"""
start_date = f"{year}-{month:02d}-01"
if month == 12:
end_date = f"{year + 1}-01-01"
else:
end_date = f"{year}-{month + 1:02d}-01"
base_where = """
tenant_member_id = %s
AND is_delete = 0
AND create_time >= %s::timestamptz
AND create_time < %s::timestamptz
"""
params: list = [customer_id, start_date, end_date]
if table:
base_where += " AND site_table_id::text = %s"
params.append(table)
with fdw_queries._fdw_context(conn, site_id) as cur:
cur.execute(
f"""
SELECT COUNT(*) AS month_count,
COALESCE(SUM(income_seconds / 3600.0), 0) AS month_hours
FROM app.v_dwd_assistant_service_log
WHERE {base_where}
""",
params,
)
row = cur.fetchone()
if not row:
return 0, 0.0
return row[0] or 0, float(row[1]) if row[1] is not None else 0.0

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,519 @@
"""
绩效服务
负责绩效概览PERF-1和绩效明细PERF-2的业务逻辑。
所有 FDW 查询通过 fdw_queries 模块执行,本模块不直接操作 SQL。
RNS1.1 组件 5。
"""
from __future__ import annotations
import logging
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from fastapi import HTTPException
from app.services import fdw_queries
from app.services.task_manager import (
_get_assistant_id,
compute_income_trend,
map_course_type_class,
)
logger = logging.getLogger(__name__)
def _get_connection():
"""延迟导入 get_connection避免纯函数测试时触发模块级导入失败。"""
from app.database import get_connection
return get_connection()
# ---------------------------------------------------------------------------
# 纯函数:可被属性测试直接调用
# ---------------------------------------------------------------------------
# 头像颜色预定义集合
_AVATAR_COLORS = [
"#0052d9", "#e34d59", "#00a870", "#ed7b2f",
"#0594fa", "#a25eb5", "#f6c244", "#2ba471",
]
def avatar_char_color(name: str) -> tuple[str, str]:
"""从客户姓名计算 avatarChar 和 avatarColor。"""
if not name:
return ("?", _AVATAR_COLORS[0])
char = name[0]
color = _AVATAR_COLORS[ord(char) % len(_AVATAR_COLORS)]
return (char, color)
def format_income_desc(rate: float, hours: float) -> str:
"""
格式化收入明细描述。
格式: "{rate}元/h × {hours}h"
"""
# 去除不必要的尾零
rate_str = f"{rate:g}"
hours_str = f"{hours:g}"
return f"{rate_str}元/h × {hours_str}h"
def group_records_by_date(
records: list[dict], *, include_avatar: bool = False
) -> list[dict]:
"""
将服务记录按日期分组为 DateGroup 结构。
参数:
records: 服务记录列表(已按 settle_time DESC 排序)
include_avatar: 是否包含 avatarChar/avatarColorPERF-1 需要PERF-2 不需要)
返回按日期倒序排列的 DateGroup 列表。
"""
groups: dict[str, list[dict]] = defaultdict(list)
for rec in records:
settle_time = rec.get("settle_time")
if settle_time is None:
continue
# 提取日期字符串
if hasattr(settle_time, "strftime"):
date_key = settle_time.strftime("%Y-%m-%d")
else:
date_key = str(settle_time)[:10]
# 时间范围
start_time = rec.get("start_time")
end_time = rec.get("end_time")
time_range = _format_time_range(start_time, end_time)
raw_course_type = rec.get("course_type", "")
type_class = map_course_type_class(raw_course_type)
customer_name = rec.get("customer_name") or "未知客户"
record_item: dict = {
"customer_name": customer_name,
"time_range": time_range,
"hours": f"{rec.get('service_hours', 0.0):g}",
"course_type": raw_course_type or "基础课",
"course_type_class": type_class,
"location": rec.get("table_name") or "",
"income": f"{rec.get('income', 0.0):.2f}",
}
if include_avatar:
char, color = avatar_char_color(customer_name)
record_item["avatar_char"] = char
record_item["avatar_color"] = color
groups[date_key].append(record_item)
# 按日期倒序排列
sorted_dates = sorted(groups.keys(), reverse=True)
result = []
for date_key in sorted_dates:
recs = groups[date_key]
total_hours = sum(float(r["hours"]) for r in recs)
total_income = sum(float(r["income"]) for r in recs)
result.append({
"date": date_key,
"total_hours": f"{total_hours:g}",
"total_income": f"{total_income:.2f}",
"records": recs,
})
return result
def paginate_records(
records: list[dict], page: int, page_size: int
) -> tuple[list[dict], bool]:
"""
对记录列表进行分页。
返回 (当前页记录, has_more)。
"""
total = len(records)
start = (page - 1) * page_size
end = start + page_size
page_records = records[start:end]
has_more = total > page * page_size
return page_records, has_more
def compute_summary(records: list[dict]) -> dict:
"""
计算月度汇总。
返回 { total_count, total_hours, total_hours_raw, total_income }。
"""
total_count = len(records)
total_hours = sum(r.get("service_hours", 0.0) for r in records)
total_hours_raw = sum(r.get("service_hours_raw", 0.0) for r in records)
total_income = sum(r.get("income", 0.0) for r in records)
return {
"total_count": total_count,
"total_hours": round(total_hours, 2),
"total_hours_raw": round(total_hours_raw, 2),
"total_income": round(total_income, 2),
}
def _format_time_range(start_time, end_time) -> str:
"""格式化时间范围为 "HH:MM-HH:MM" 格式。"""
parts = []
for t in (start_time, end_time):
if t is None:
parts.append("--:--")
elif hasattr(t, "strftime"):
parts.append(t.strftime("%H:%M"))
else:
s = str(t)
# 尝试提取 HH:MM
if len(s) >= 16:
parts.append(s[11:16])
else:
parts.append(str(t))
return f"{parts[0]}-{parts[1]}"
def _format_date_label(dt) -> str:
"""格式化日期为 "M月D日" 格式。"""
if dt is None:
return ""
if hasattr(dt, "strftime"):
return f"{dt.month}{dt.day}"
s = str(dt)[:10]
try:
d = datetime.strptime(s, "%Y-%m-%d")
return f"{d.month}{d.day}"
except (ValueError, TypeError):
return s
# ---------------------------------------------------------------------------
# PERF-1: 绩效概览
# ---------------------------------------------------------------------------
async def get_overview(
user_id: int, site_id: int, year: int, month: int
) -> dict:
"""
绩效概览PERF-1
1. 获取 assistant_id
2. fdw_queries.get_salary_calc() → 档位/收入/费率
3. fdw_queries.get_service_records() → 按日期分组为 DateGroup含 avatarChar/avatarColor
4. 聚合新客/常客列表
5. 计算 incomeItems含 desc 费率描述)
6. 查询上月收入 lastMonthIncome
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# ── 1. 当月绩效数据 ──
salary = fdw_queries.get_salary_calc(conn, site_id, assistant_id, year, month)
# ── 2. 上月绩效数据(用于 lastMonthIncome ──
prev_year, prev_month = (year, month - 1) if month > 1 else (year - 1, 12)
prev_salary = None
try:
prev_salary = fdw_queries.get_salary_calc(
conn, site_id, assistant_id, prev_year, prev_month
)
except Exception:
logger.warning("FDW 查询上月绩效失败", exc_info=True)
last_month_income = prev_salary["total_income"] if prev_salary else 0.0
# ── 3. 服务记录(全量,用于 DateGroup + 新客/常客) ──
# 获取全部记录(不分页)
all_records = fdw_queries.get_service_records(
conn, site_id, assistant_id, year, month,
limit=10000, offset=0,
)
# 按日期分组(含 avatar
date_groups = group_records_by_date(all_records, include_avatar=True)
# ── 4. 新客/常客列表 ──
new_customers, regular_customers = _build_customer_lists(
conn, site_id, assistant_id, year, month, all_records
)
# ── 5. 构建响应 ──
# 助教信息(从 salary 或默认值)
coach_name = ""
coach_role = ""
store_name = ""
# ⚠️ auth 表结构: users(nickname), user_assistant_binding(binding_type)
# auth.sites 不存在; users 无 display_name; uab 无 role_label
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT u.nickname, uab.binding_type
FROM auth.user_assistant_binding uab
JOIN auth.users u ON uab.user_id = u.id
WHERE uab.assistant_id = %s AND uab.site_id = %s
LIMIT 1
""",
(assistant_id, site_id),
)
row = cur.fetchone()
if row:
coach_name = row[0] or ""
coach_role = row[1] or ""
conn.commit()
except Exception:
logger.warning("查询助教信息失败", exc_info=True)
current_income = salary["total_income"] if salary else 0.0
basic_rate = salary["basic_rate"] if salary else 0.0
incentive_rate = salary["incentive_rate"] if salary else 0.0
basic_hours = salary["basic_hours"] if salary else 0.0
bonus_hours = salary["bonus_hours"] if salary else 0.0
pd_money = salary["assistant_pd_money_total"] if salary else 0.0
cx_money = salary["assistant_cx_money_total"] if salary else 0.0
# 收入明细项
income_items = _build_income_items(
basic_rate, incentive_rate, basic_hours, bonus_hours,
pd_money, cx_money,
)
# 档位信息
next_basic_rate = salary["next_tier_basic_rate"] if salary else 0.0
next_incentive_rate = salary["next_tier_incentive_rate"] if salary else 0.0
upgrade_hours = salary["next_tier_hours"] if salary else 0.0
total_hours = salary["total_hours"] if salary else 0.0
upgrade_hours_needed = max(0.0, upgrade_hours - total_hours)
tier_completed = salary["tier_completed"] if salary else False
upgrade_bonus = 0.0 if tier_completed else (salary["bonus_money"] if salary else 0.0)
return {
"coach_name": coach_name,
"coach_role": coach_role,
"store_name": store_name,
"monthly_income": f"¥{current_income:,.0f}",
"last_month_income": f"¥{last_month_income:,.0f}",
"current_tier": {
"basic_rate": basic_rate,
"incentive_rate": incentive_rate,
},
"next_tier": {
"basic_rate": next_basic_rate,
"incentive_rate": next_incentive_rate,
},
"upgrade_hours_needed": round(upgrade_hours_needed, 2),
"upgrade_bonus": upgrade_bonus,
"income_items": income_items,
"monthly_total": f"¥{current_income:,.2f}",
"this_month_records": date_groups,
"new_customers": new_customers,
"regular_customers": regular_customers,
}
finally:
conn.close()
def _build_income_items(
basic_rate: float,
incentive_rate: float,
basic_hours: float,
bonus_hours: float,
pd_money: float,
cx_money: float,
) -> list[dict]:
"""构建收入明细项列表。"""
items = []
# 基础课收入
if basic_hours > 0 or pd_money > 0:
items.append({
"icon": "💰",
"label": "基础课收入",
"desc": format_income_desc(basic_rate, basic_hours),
"value": f"¥{pd_money:,.2f}",
})
# 激励课收入
if bonus_hours > 0 or cx_money > 0:
items.append({
"icon": "🎯",
"label": "激励课收入",
"desc": format_income_desc(incentive_rate, bonus_hours),
"value": f"¥{cx_money:,.2f}",
})
return items
def _build_customer_lists(
conn,
site_id: int,
assistant_id: int,
year: int,
month: int,
all_records: list[dict],
) -> tuple[list[dict], list[dict]]:
"""
构建新客和常客列表。
新客: 本月有服务记录但本月之前无记录的客户
常客: 本月服务次数 ≥ 2 的客户
"""
if not all_records:
return [], []
# 按 member_id 聚合本月记录
member_stats: dict[int, dict] = {}
for rec in all_records:
mid = rec.get("member_id")
if mid is None:
continue
if mid not in member_stats:
member_stats[mid] = {
"customer_name": rec.get("customer_name") or "未知客户",
"count": 0,
"total_hours": 0.0,
"total_income": 0.0,
"last_service": rec.get("settle_time"),
}
stats = member_stats[mid]
stats["count"] += 1
stats["total_hours"] += rec.get("service_hours", 0.0)
stats["total_income"] += rec.get("income", 0.0)
# 更新最后服务时间(记录已按 settle_time DESC 排序,第一条即最新)
if stats["last_service"] is None:
stats["last_service"] = rec.get("settle_time")
member_ids = list(member_stats.keys())
# 查询历史记录(本月之前是否有服务记录)
# ⚠️ 直连 ETL 库查询 app.v_dwd_assistant_service_log RLS 视图
# 列名映射: assistant_id → site_assistant_id, member_id → tenant_member_id,
# is_trash → is_delete (int, 0=正常), settle_time → create_time
historical_members: set[int] = set()
try:
start_date = f"{year}-{month:02d}-01"
with fdw_queries._fdw_context(conn, site_id) as cur:
cur.execute(
"""
SELECT DISTINCT tenant_member_id
FROM app.v_dwd_assistant_service_log
WHERE site_assistant_id = %s
AND is_delete = 0
AND create_time < %s::timestamptz
AND tenant_member_id = ANY(%s)
""",
(assistant_id, start_date, member_ids),
)
for row in cur.fetchall():
historical_members.add(row[0])
except Exception:
logger.warning("查询历史客户记录失败", exc_info=True)
new_customers = []
regular_customers = []
for mid, stats in member_stats.items():
name = stats["customer_name"]
char, color = avatar_char_color(name)
# 新客:历史无记录
if mid not in historical_members:
last_service_dt = stats["last_service"]
new_customers.append({
"name": name,
"avatar_char": char,
"avatar_color": color,
"last_service": _format_date_label(last_service_dt),
"count": stats["count"],
})
# 常客:本月 ≥ 2 次
if stats["count"] >= 2:
regular_customers.append({
"name": name,
"avatar_char": char,
"avatar_color": color,
"hours": round(stats["total_hours"], 2),
"income": f"¥{stats['total_income']:,.2f}",
"count": stats["count"],
})
# 新客按最后服务时间倒序
new_customers.sort(key=lambda x: x.get("last_service", ""), reverse=True)
# 常客按收入倒序
regular_customers.sort(
key=lambda x: float(x.get("income", "¥0").replace("¥", "").replace(",", "")),
reverse=True,
)
return new_customers, regular_customers
# ---------------------------------------------------------------------------
# PERF-2: 绩效明细
# ---------------------------------------------------------------------------
async def get_records(
user_id: int, site_id: int,
year: int, month: int, page: int, page_size: int,
) -> dict:
"""
绩效明细PERF-2
1. 获取 assistant_id
2. fdw_queries.get_service_records() 带分页
3. 按日期分组为 dateGroups不含 avatarChar/avatarColor
4. 计算 summary 汇总
5. 返回 { summary, dateGroups, hasMore }
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# 先获取全量记录用于 summary 计算
all_records = fdw_queries.get_service_records(
conn, site_id, assistant_id, year, month,
limit=100000, offset=0,
)
# 计算月度汇总
summary = compute_summary(all_records)
# 分页获取记录
offset = (page - 1) * page_size
page_records = fdw_queries.get_service_records(
conn, site_id, assistant_id, year, month,
limit=page_size, offset=offset,
)
# 判断 hasMore
has_more = len(all_records) > page * page_size
# 按日期分组(不含 avatar
date_groups = group_records_by_date(page_records, include_avatar=False)
return {
"summary": summary,
"date_groups": date_groups,
"has_more": has_more,
}
finally:
conn.close()

View File

@@ -3,14 +3,18 @@
负责任务 CRUD、置顶、放弃、取消放弃等操作。
通过 FDW 读取客户信息和 RS 指数,计算爱心 icon 档位。
RNS1.1 扩展get_task_list_v2TASK-1、get_task_detailTASK-2
"""
import json
import logging
from datetime import datetime
from decimal import Decimal
from fastapi import HTTPException
from app.services import fdw_queries
from app.services.task_generator import compute_heart_icon
logger = logging.getLogger(__name__)
@@ -399,3 +403,638 @@ async def cancel_abandon(task_id: int, user_id: int, site_id: int) -> dict:
finally:
conn.close()
# ---------------------------------------------------------------------------
# RNS1.1 扩展:辅助常量与工具函数
# ---------------------------------------------------------------------------
# 任务类型 → 中文标签
_TASK_TYPE_LABEL_MAP: dict[str, str] = {
"high_priority_recall": "高优先召回",
"priority_recall": "优先召回",
"follow_up_visit": "客户回访",
"relationship_building": "关系构建",
}
# 课程类型 → courseTypeClass 枚举映射design.md 定义)
_COURSE_TYPE_CLASS_MAP: dict[str, str] = {
"basic": "basic",
"陪打": "basic",
"基础课": "basic",
"vip": "vip",
"包厢": "vip",
"包厢课": "vip",
"tip": "tip",
"超休": "tip",
"激励课": "tip",
"recharge": "recharge",
"充值": "recharge",
"incentive": "incentive",
"激励": "incentive",
}
# 维客线索 category → tag_color 映射
_CATEGORY_COLOR_MAP: dict[str, str] = {
"客户基础": "#0052d9",
"客户基础信息": "#0052d9",
"消费习惯": "#e34d59",
"玩法偏好": "#00a870",
"促销偏好": "#ed7b2f",
"促销接受": "#ed7b2f",
"社交关系": "#0594fa",
"重要反馈": "#a25eb5",
}
def map_course_type_class(raw_course_type: str | None) -> str:
"""将原始课程类型映射为统一枚举值(不带 tag- 前缀)。"""
if not raw_course_type:
return "basic"
return _COURSE_TYPE_CLASS_MAP.get(raw_course_type.strip(), "basic")
def compute_income_trend(current_income: float, prev_income: float) -> tuple[str, str]:
"""
计算收入趋势。
返回 (income_trend, income_trend_dir)。
如 (1000, 800) → ("↑200", "up")
"""
diff = current_income - prev_income
direction = "up" if diff >= 0 else "down"
arrow = "" if diff >= 0 else ""
trend = f"{arrow}{abs(diff):.0f}"
return trend, direction
def sanitize_tag(raw_tag: str | None) -> str:
"""去除 tag 中的换行符,多行标签使用空格分隔。"""
if not raw_tag:
return ""
return raw_tag.replace("\n", " ").strip()
def _extract_emoji_and_text(summary: str | None) -> tuple[str, str]:
"""
从 summary 中提取 emoji 前缀和正文。
AI 写入格式: "📅 偏好周末下午时段消费" → ("📅", "偏好周末下午时段消费")
手动写入无 emoji: "喜欢打中式" → ("", "喜欢打中式")
"""
if not summary:
return "", ""
# 检查第一个字符是否为 emoji非 ASCII 且非中文常用范围)
first_char = summary[0]
if ord(first_char) > 0x2600 and summary[1:2] == " ":
return first_char, summary[2:].strip()
return "", summary.strip()
def _format_time(dt: datetime | None) -> str | None:
"""格式化时间为 ISO 字符串。"""
if dt is None:
return None
return dt.isoformat() if hasattr(dt, "isoformat") else str(dt)
# ---------------------------------------------------------------------------
# RNS1.1get_task_list_v2TASK-1 扩展版任务列表)
# ---------------------------------------------------------------------------
async def get_task_list_v2(
user_id: int,
site_id: int,
status: str,
page: int,
page_size: int,
) -> dict:
"""
扩展版任务列表TASK-1
返回 { items, total, page, pageSize, performance }。
逻辑:
1. _get_assistant_id() 获取 assistant_id
2. 查询 coach_tasks 带分页LIMIT/OFFSET + COUNT(*)
3. fdw_queries 批量获取会员信息、余额、lastVisitDays
4. fdw_queries.get_salary_calc() 获取绩效概览
5. 查询 ai_cache 获取 aiSuggestion
6. 组装 TaskListResponse
扩展字段lastVisitDays/balance/aiSuggestion采用优雅降级。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# ── 1. 查询任务列表(带分页 + 总数) ──
# 状态映射:前端 pending → active
db_status = "active" if status == "pending" else status
with conn.cursor() as cur:
# 总数
cur.execute(
"""
SELECT COUNT(*)
FROM biz.coach_tasks
WHERE site_id = %s AND assistant_id = %s AND status = %s
""",
(site_id, assistant_id, db_status),
)
total = cur.fetchone()[0]
# 分页查询
offset = (page - 1) * page_size
cur.execute(
"""
SELECT id, task_type, status, priority_score, is_pinned,
expires_at, created_at, member_id, abandon_reason
FROM biz.coach_tasks
WHERE site_id = %s AND assistant_id = %s AND status = %s
ORDER BY is_pinned DESC,
priority_score DESC NULLS LAST,
created_at ASC
LIMIT %s OFFSET %s
""",
(site_id, assistant_id, db_status, page_size, offset),
)
tasks = cur.fetchall()
conn.commit()
if not tasks:
# 即使无任务也需要返回绩效概览
performance = _build_performance_summary(conn, site_id, assistant_id)
return {
"items": [],
"total": 0,
"page": page,
"page_size": page_size,
"performance": performance,
}
member_ids = list({t[7] for t in tasks})
# ── 2. FDW 批量查询会员信息 ──
member_info_map: dict[int, dict] = {}
try:
member_info_map = fdw_queries.get_member_info(conn, site_id, member_ids)
except Exception:
logger.warning("FDW 查询会员信息失败", exc_info=True)
# ── 3. FDW 批量查询余额(优雅降级) ──
balance_map: dict[int, Decimal] = {}
try:
balance_map = fdw_queries.get_member_balance(conn, site_id, member_ids)
except Exception:
logger.warning("FDW 查询余额失败", exc_info=True)
# ── 4. FDW 批量查询 lastVisitDays优雅降级 ──
last_visit_map: dict[int, int | None] = {}
try:
last_visit_map = fdw_queries.get_last_visit_days(conn, site_id, member_ids)
except Exception:
logger.warning("FDW 查询 lastVisitDays 失败", exc_info=True)
# ── 5. RS 指数(用于 heart_score ──
rs_map: dict[int, Decimal] = {}
try:
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"SET LOCAL app.current_site_id = %s", (str(site_id),)
)
cur.execute(
"""
SELECT member_id, COALESCE(rs_display, 0)
FROM fdw_etl.v_dws_member_assistant_relation_index
WHERE assistant_id = %s AND member_id = ANY(%s)
""",
(assistant_id, member_ids),
)
for row in cur.fetchall():
rs_map[row[0]] = Decimal(str(row[1]))
conn.commit()
except Exception:
logger.warning("FDW 查询 RS 指数失败", exc_info=True)
try:
conn.rollback()
except Exception:
pass
# ── 6. 查询 ai_cache 获取 aiSuggestion优雅降级 ──
ai_suggestion_map: dict[int, str] = {}
try:
member_id_strs = [str(mid) for mid in member_ids]
with conn.cursor() as cur:
cur.execute(
"""
SELECT target_id, result_json
FROM biz.ai_cache
WHERE site_id = %s
AND target_id = ANY(%s)
AND cache_type = 'app4_analysis'
ORDER BY created_at DESC
""",
(site_id, member_id_strs),
)
seen: set[str] = set()
for row in cur.fetchall():
target_id_str = str(row[0])
if target_id_str not in seen:
seen.add(target_id_str)
result = row[1] if isinstance(row[1], dict) else {}
summary = result.get("summary", "")
if summary:
ai_suggestion_map[int(target_id_str)] = summary
conn.commit()
except Exception:
logger.warning("查询 ai_cache aiSuggestion 失败", exc_info=True)
# ── 7. 查询备注存在性has_note ──
task_ids = [t[0] for t in tasks]
has_note_set: set[int] = set()
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT DISTINCT task_id
FROM biz.notes
WHERE task_id = ANY(%s)
""",
(task_ids,),
)
for row in cur.fetchall():
has_note_set.add(row[0])
conn.commit()
except Exception:
logger.warning("查询备注存在性失败", exc_info=True)
# ── 8. 绩效概览 ──
performance = _build_performance_summary(conn, site_id, assistant_id)
# ── 9. 组装 items ──
items = []
for task_row in tasks:
(task_id, task_type, task_status, priority_score,
is_pinned, expires_at, created_at, member_id, abandon_reason) = task_row
info = member_info_map.get(member_id, {})
customer_name = info.get("nickname") or info.get("member_name") or "未知客户"
rs_score = rs_map.get(member_id, Decimal("0"))
balance = balance_map.get(member_id)
items.append({
"id": task_id,
"customer_name": customer_name,
"customer_avatar": "/assets/images/avatar-default.png",
"task_type": task_type,
"task_type_label": _TASK_TYPE_LABEL_MAP.get(task_type, task_type),
"deadline": _format_time(expires_at),
"heart_score": float(rs_score),
"hobbies": [], # 暂无数据源,返回空数组
"is_pinned": bool(is_pinned),
"has_note": task_id in has_note_set,
"status": task_status,
"last_visit_days": last_visit_map.get(member_id),
"balance": float(balance) if balance is not None else None,
"ai_suggestion": ai_suggestion_map.get(member_id),
})
return {
"items": items,
"total": total,
"page": page,
"page_size": page_size,
"performance": performance,
}
finally:
conn.close()
def _build_performance_summary(conn, site_id: int, assistant_id: int) -> dict:
"""
构建绩效概览PerformanceSummary
从 fdw_queries.get_salary_calc 获取当月和上月数据,
计算收入趋势。
"""
now = datetime.now()
year, month = now.year, now.month
# 当月绩效
salary = None
try:
salary = fdw_queries.get_salary_calc(conn, site_id, assistant_id, year, month)
except Exception:
logger.warning("FDW 查询当月绩效失败", exc_info=True)
# 上月绩效(用于收入趋势)
prev_year, prev_month = (year, month - 1) if month > 1 else (year - 1, 12)
prev_salary = None
try:
prev_salary = fdw_queries.get_salary_calc(conn, site_id, assistant_id, prev_year, prev_month)
except Exception:
logger.warning("FDW 查询上月绩效失败", exc_info=True)
current_income = salary["total_income"] if salary else 0.0
prev_income = prev_salary["total_income"] if prev_salary else 0.0
income_trend, income_trend_dir = compute_income_trend(current_income, prev_income)
tier_nodes = salary["tier_nodes"] if salary and salary.get("tier_nodes") else [0]
# tier_nodes 可能是 JSON 字符串或列表
if isinstance(tier_nodes, str):
try:
tier_nodes = json.loads(tier_nodes)
except (json.JSONDecodeError, TypeError):
tier_nodes = [0]
return {
"total_hours": salary["total_hours"] if salary else 0.0,
"total_income": current_income,
"total_customers": salary["total_customers"] if salary else 0,
"month_label": f"{month}",
"tier_nodes": [float(n) for n in tier_nodes] if tier_nodes else [0],
"basic_hours": salary["basic_hours"] if salary else 0.0,
"bonus_hours": salary["bonus_hours"] if salary else 0.0,
"current_tier": salary["tier_index"] if salary else 0,
"next_tier_hours": salary["next_tier_hours"] if salary else 0.0,
"tier_completed": salary["tier_completed"] if salary else False,
"bonus_money": 0.0 if (salary and salary.get("tier_completed")) else (salary["bonus_money"] if salary else 0.0),
"income_trend": income_trend,
"income_trend_dir": income_trend_dir,
"prev_month": f"{prev_month}",
"current_tier_label": salary["coach_level"] if salary else "",
}
# ---------------------------------------------------------------------------
# RNS1.1get_task_detailTASK-2 任务详情完整版)
# ---------------------------------------------------------------------------
async def get_task_detail(
task_id: int,
user_id: int,
site_id: int,
) -> dict:
"""
任务详情完整版TASK-2
返回基础信息 + retentionClues + talkingPoints + serviceSummary
+ serviceRecords + aiAnalysis + notes + customerId。
权限校验:任务不存在 → 404不属于当前助教 → 403。
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# ── 1. 查询任务基础信息 ──
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, task_type, status, priority_score, is_pinned,
expires_at, created_at, member_id, abandon_reason,
assistant_id, site_id
FROM biz.coach_tasks
WHERE id = %s
""",
(task_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="任务不存在")
task_assistant_id = row[9]
task_site_id = row[10]
if task_site_id != site_id or task_assistant_id != assistant_id:
raise HTTPException(status_code=403, detail="无权访问该任务")
member_id = row[7]
task_type = row[1]
task_status = row[2]
is_pinned = row[4]
expires_at = row[5]
# ── 2. FDW 查询会员信息 ──
member_info_map: dict[int, dict] = {}
try:
member_info_map = fdw_queries.get_member_info(conn, site_id, [member_id])
except Exception:
logger.warning("FDW 查询会员信息失败", exc_info=True)
info = member_info_map.get(member_id, {})
customer_name = info.get("nickname") or "未知客户"
# RS 指数
rs_score = Decimal("0")
try:
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute(
"SET LOCAL app.current_site_id = %s", (str(site_id),)
)
cur.execute(
"""
SELECT COALESCE(rs_display, 0)
FROM fdw_etl.v_dws_member_assistant_relation_index
WHERE assistant_id = %s AND member_id = %s
""",
(assistant_id, member_id),
)
rs_row = cur.fetchone()
if rs_row:
rs_score = Decimal(str(rs_row[0]))
conn.commit()
except Exception:
logger.warning("FDW 查询 RS 指数失败", exc_info=True)
try:
conn.rollback()
except Exception:
pass
# ── 3. 查询维客线索 ──
retention_clues = []
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, category, summary, detail, source
FROM public.member_retention_clue
WHERE member_id = %s AND site_id = %s
ORDER BY recorded_at DESC
""",
(member_id, site_id),
)
for clue_row in cur.fetchall():
category = clue_row[1] or ""
summary_raw = clue_row[2] or ""
detail = clue_row[3]
source = clue_row[4] or "manual"
emoji, text = _extract_emoji_and_text(summary_raw)
tag = sanitize_tag(category)
tag_color = _CATEGORY_COLOR_MAP.get(tag, "#999999")
retention_clues.append({
"tag": tag,
"tag_color": tag_color,
"emoji": emoji,
"text": text,
"source": source,
"desc": detail,
})
conn.commit()
except Exception:
logger.warning("查询维客线索失败", exc_info=True)
# ── 4. 查询 AI 缓存talkingPoints + aiAnalysis ──
talking_points: list[str] = []
ai_analysis = {"summary": "", "suggestions": []}
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT cache_type, result_json
FROM biz.ai_cache
WHERE target_id = %s AND site_id = %s
AND cache_type IN ('app4_analysis', 'app5_talking_points')
ORDER BY created_at DESC
""",
(str(member_id), site_id),
)
seen_types: set[str] = set()
for cache_row in cur.fetchall():
cache_type = cache_row[0]
if cache_type in seen_types:
continue
seen_types.add(cache_type)
result = cache_row[1] if isinstance(cache_row[1], dict) else {}
if cache_type == "app5_talking_points":
# talkingPoints: 话术列表
points = result.get("talking_points", [])
if isinstance(points, list):
talking_points = [str(p) for p in points]
elif cache_type == "app4_analysis":
# aiAnalysis: summary + suggestions
ai_analysis = {
"summary": result.get("summary", ""),
"suggestions": result.get("suggestions", []),
}
conn.commit()
except Exception:
logger.warning("查询 AI 缓存失败", exc_info=True)
# ── 5. FDW 查询服务记录(最多 20 条) ──
service_records_raw: list[dict] = []
try:
service_records_raw = fdw_queries.get_service_records_for_task(
conn, site_id, assistant_id, member_id, limit=20
)
except Exception:
logger.warning("FDW 查询服务记录失败", exc_info=True)
service_records = []
total_hours = 0.0
total_income = 0.0
for rec in service_records_raw:
hours = rec.get("service_hours", 0.0)
income = rec.get("income", 0.0)
total_hours += hours
total_income += income
# 时间格式化
settle_time = rec.get("settle_time")
date_str = ""
if settle_time:
if hasattr(settle_time, "strftime"):
date_str = settle_time.strftime("%Y-%m-%d")
else:
date_str = str(settle_time)[:10]
raw_course_type = rec.get("course_type", "")
type_class = map_course_type_class(raw_course_type)
service_records.append({
"table": rec.get("table_name"),
"type": raw_course_type or "基础课",
"type_class": type_class,
"record_type": "recharge" if type_class == "recharge" else "course",
"duration": hours,
"duration_raw": rec.get("service_hours_raw"),
"income": income,
"is_estimate": rec.get("is_estimate"),
"drinks": None,
"date": date_str,
})
avg_income = total_income / len(service_records) if service_records else 0.0
service_summary = {
"total_hours": round(total_hours, 2),
"total_income": round(total_income, 2),
"avg_income": round(avg_income, 2),
}
# ── 6. 查询备注(最多 20 条) ──
notes: list[dict] = []
has_note = False
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, content, type, ai_score, created_at
FROM biz.notes
WHERE task_id = %s
ORDER BY created_at DESC
LIMIT 20
""",
(task_id,),
)
for note_row in cur.fetchall():
has_note = True
note_type = note_row[2] or "normal"
# type → tag_type/tag_label 映射
tag_label = "回访" if note_type == "follow_up" else "普通"
notes.append({
"id": note_row[0],
"content": note_row[1] or "",
"tag_type": note_type,
"tag_label": tag_label,
"created_at": _format_time(note_row[4]) or "",
"score": note_row[3],
})
conn.commit()
except Exception:
logger.warning("查询备注失败", exc_info=True)
# ── 7. 组装 TaskDetailResponse ──
return {
"id": task_id,
"customer_name": customer_name,
"customer_avatar": "/assets/images/avatar-default.png",
"task_type": task_type,
"task_type_label": _TASK_TYPE_LABEL_MAP.get(task_type, task_type),
"deadline": _format_time(expires_at),
"heart_score": float(rs_score),
"hobbies": [],
"is_pinned": bool(is_pinned),
"has_note": has_note,
"status": task_status,
"customer_id": member_id,
"retention_clues": retention_clues,
"talking_points": talking_points,
"service_summary": service_summary,
"service_records": service_records,
"ai_analysis": ai_analysis,
"notes": notes,
}
finally:
conn.close()