Files
Neo-ZQYY/apps/backend/app/services/performance_service.py

520 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
绩效服务
负责绩效概览PERF-1和绩效明细PERF-2的业务逻辑。
所有 FDW 查询通过 fdw_queries 模块执行,本模块不直接操作 SQL。
RNS1.1 组件 5。
"""
from __future__ import annotations
import logging
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from fastapi import HTTPException
from app.services import fdw_queries
from app.services.task_manager import (
_get_assistant_id,
compute_income_trend,
map_course_type_class,
)
logger = logging.getLogger(__name__)
def _get_connection():
"""延迟导入 get_connection避免纯函数测试时触发模块级导入失败。"""
from app.database import get_connection
return get_connection()
# ---------------------------------------------------------------------------
# 纯函数:可被属性测试直接调用
# ---------------------------------------------------------------------------
# 头像颜色预定义集合
_AVATAR_COLORS = [
"#0052d9", "#e34d59", "#00a870", "#ed7b2f",
"#0594fa", "#a25eb5", "#f6c244", "#2ba471",
]
def avatar_char_color(name: str) -> tuple[str, str]:
"""从客户姓名计算 avatarChar 和 avatarColor。"""
if not name:
return ("?", _AVATAR_COLORS[0])
char = name[0]
color = _AVATAR_COLORS[ord(char) % len(_AVATAR_COLORS)]
return (char, color)
def format_income_desc(rate: float, hours: float) -> str:
"""
格式化收入明细描述。
格式: "{rate}元/h × {hours}h"
"""
# 去除不必要的尾零
rate_str = f"{rate:g}"
hours_str = f"{hours:g}"
return f"{rate_str}元/h × {hours_str}h"
def group_records_by_date(
records: list[dict], *, include_avatar: bool = False
) -> list[dict]:
"""
将服务记录按日期分组为 DateGroup 结构。
参数:
records: 服务记录列表(已按 settle_time DESC 排序)
include_avatar: 是否包含 avatarChar/avatarColorPERF-1 需要PERF-2 不需要)
返回按日期倒序排列的 DateGroup 列表。
"""
groups: dict[str, list[dict]] = defaultdict(list)
for rec in records:
settle_time = rec.get("settle_time")
if settle_time is None:
continue
# 提取日期字符串
if hasattr(settle_time, "strftime"):
date_key = settle_time.strftime("%Y-%m-%d")
else:
date_key = str(settle_time)[:10]
# 时间范围
start_time = rec.get("start_time")
end_time = rec.get("end_time")
time_range = _format_time_range(start_time, end_time)
raw_course_type = rec.get("course_type", "")
type_class = map_course_type_class(raw_course_type)
customer_name = rec.get("customer_name") or "未知客户"
record_item: dict = {
"customer_name": customer_name,
"time_range": time_range,
"hours": f"{rec.get('service_hours', 0.0):g}",
"course_type": raw_course_type or "基础课",
"course_type_class": type_class,
"location": rec.get("table_name") or "",
"income": f"{rec.get('income', 0.0):.2f}",
}
if include_avatar:
char, color = avatar_char_color(customer_name)
record_item["avatar_char"] = char
record_item["avatar_color"] = color
groups[date_key].append(record_item)
# 按日期倒序排列
sorted_dates = sorted(groups.keys(), reverse=True)
result = []
for date_key in sorted_dates:
recs = groups[date_key]
total_hours = sum(float(r["hours"]) for r in recs)
total_income = sum(float(r["income"]) for r in recs)
result.append({
"date": date_key,
"total_hours": f"{total_hours:g}",
"total_income": f"{total_income:.2f}",
"records": recs,
})
return result
def paginate_records(
records: list[dict], page: int, page_size: int
) -> tuple[list[dict], bool]:
"""
对记录列表进行分页。
返回 (当前页记录, has_more)。
"""
total = len(records)
start = (page - 1) * page_size
end = start + page_size
page_records = records[start:end]
has_more = total > page * page_size
return page_records, has_more
def compute_summary(records: list[dict]) -> dict:
"""
计算月度汇总。
返回 { total_count, total_hours, total_hours_raw, total_income }。
"""
total_count = len(records)
total_hours = sum(r.get("service_hours", 0.0) for r in records)
total_hours_raw = sum(r.get("service_hours_raw", 0.0) for r in records)
total_income = sum(r.get("income", 0.0) for r in records)
return {
"total_count": total_count,
"total_hours": round(total_hours, 2),
"total_hours_raw": round(total_hours_raw, 2),
"total_income": round(total_income, 2),
}
def _format_time_range(start_time, end_time) -> str:
"""格式化时间范围为 "HH:MM-HH:MM" 格式。"""
parts = []
for t in (start_time, end_time):
if t is None:
parts.append("--:--")
elif hasattr(t, "strftime"):
parts.append(t.strftime("%H:%M"))
else:
s = str(t)
# 尝试提取 HH:MM
if len(s) >= 16:
parts.append(s[11:16])
else:
parts.append(str(t))
return f"{parts[0]}-{parts[1]}"
def _format_date_label(dt) -> str:
"""格式化日期为 "M月D日" 格式。"""
if dt is None:
return ""
if hasattr(dt, "strftime"):
return f"{dt.month}{dt.day}"
s = str(dt)[:10]
try:
d = datetime.strptime(s, "%Y-%m-%d")
return f"{d.month}{d.day}"
except (ValueError, TypeError):
return s
# ---------------------------------------------------------------------------
# PERF-1: 绩效概览
# ---------------------------------------------------------------------------
async def get_overview(
user_id: int, site_id: int, year: int, month: int
) -> dict:
"""
绩效概览PERF-1
1. 获取 assistant_id
2. fdw_queries.get_salary_calc() → 档位/收入/费率
3. fdw_queries.get_service_records() → 按日期分组为 DateGroup含 avatarChar/avatarColor
4. 聚合新客/常客列表
5. 计算 incomeItems含 desc 费率描述)
6. 查询上月收入 lastMonthIncome
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# ── 1. 当月绩效数据 ──
salary = fdw_queries.get_salary_calc(conn, site_id, assistant_id, year, month)
# ── 2. 上月绩效数据(用于 lastMonthIncome ──
prev_year, prev_month = (year, month - 1) if month > 1 else (year - 1, 12)
prev_salary = None
try:
prev_salary = fdw_queries.get_salary_calc(
conn, site_id, assistant_id, prev_year, prev_month
)
except Exception:
logger.warning("FDW 查询上月绩效失败", exc_info=True)
last_month_income = prev_salary["total_income"] if prev_salary else 0.0
# ── 3. 服务记录(全量,用于 DateGroup + 新客/常客) ──
# 获取全部记录(不分页)
all_records = fdw_queries.get_service_records(
conn, site_id, assistant_id, year, month,
limit=10000, offset=0,
)
# 按日期分组(含 avatar
date_groups = group_records_by_date(all_records, include_avatar=True)
# ── 4. 新客/常客列表 ──
new_customers, regular_customers = _build_customer_lists(
conn, site_id, assistant_id, year, month, all_records
)
# ── 5. 构建响应 ──
# 助教信息(从 salary 或默认值)
coach_name = ""
coach_role = ""
store_name = ""
# ⚠️ auth 表结构: users(nickname), user_assistant_binding(binding_type)
# auth.sites 不存在; users 无 display_name; uab 无 role_label
try:
with conn.cursor() as cur:
cur.execute(
"""
SELECT u.nickname, uab.binding_type
FROM auth.user_assistant_binding uab
JOIN auth.users u ON uab.user_id = u.id
WHERE uab.assistant_id = %s AND uab.site_id = %s
LIMIT 1
""",
(assistant_id, site_id),
)
row = cur.fetchone()
if row:
coach_name = row[0] or ""
coach_role = row[1] or ""
conn.commit()
except Exception:
logger.warning("查询助教信息失败", exc_info=True)
current_income = salary["total_income"] if salary else 0.0
basic_rate = salary["basic_rate"] if salary else 0.0
incentive_rate = salary["incentive_rate"] if salary else 0.0
basic_hours = salary["basic_hours"] if salary else 0.0
bonus_hours = salary["bonus_hours"] if salary else 0.0
pd_money = salary["assistant_pd_money_total"] if salary else 0.0
cx_money = salary["assistant_cx_money_total"] if salary else 0.0
# 收入明细项
income_items = _build_income_items(
basic_rate, incentive_rate, basic_hours, bonus_hours,
pd_money, cx_money,
)
# 档位信息
next_basic_rate = salary["next_tier_basic_rate"] if salary else 0.0
next_incentive_rate = salary["next_tier_incentive_rate"] if salary else 0.0
upgrade_hours = salary["next_tier_hours"] if salary else 0.0
total_hours = salary["total_hours"] if salary else 0.0
upgrade_hours_needed = max(0.0, upgrade_hours - total_hours)
tier_completed = salary["tier_completed"] if salary else False
upgrade_bonus = 0.0 if tier_completed else (salary["bonus_money"] if salary else 0.0)
return {
"coach_name": coach_name,
"coach_role": coach_role,
"store_name": store_name,
"monthly_income": f"¥{current_income:,.0f}",
"last_month_income": f"¥{last_month_income:,.0f}",
"current_tier": {
"basic_rate": basic_rate,
"incentive_rate": incentive_rate,
},
"next_tier": {
"basic_rate": next_basic_rate,
"incentive_rate": next_incentive_rate,
},
"upgrade_hours_needed": round(upgrade_hours_needed, 2),
"upgrade_bonus": upgrade_bonus,
"income_items": income_items,
"monthly_total": f"¥{current_income:,.2f}",
"this_month_records": date_groups,
"new_customers": new_customers,
"regular_customers": regular_customers,
}
finally:
conn.close()
def _build_income_items(
basic_rate: float,
incentive_rate: float,
basic_hours: float,
bonus_hours: float,
pd_money: float,
cx_money: float,
) -> list[dict]:
"""构建收入明细项列表。"""
items = []
# 基础课收入
if basic_hours > 0 or pd_money > 0:
items.append({
"icon": "💰",
"label": "基础课收入",
"desc": format_income_desc(basic_rate, basic_hours),
"value": f"¥{pd_money:,.2f}",
})
# 激励课收入
if bonus_hours > 0 or cx_money > 0:
items.append({
"icon": "🎯",
"label": "激励课收入",
"desc": format_income_desc(incentive_rate, bonus_hours),
"value": f"¥{cx_money:,.2f}",
})
return items
def _build_customer_lists(
conn,
site_id: int,
assistant_id: int,
year: int,
month: int,
all_records: list[dict],
) -> tuple[list[dict], list[dict]]:
"""
构建新客和常客列表。
新客: 本月有服务记录但本月之前无记录的客户
常客: 本月服务次数 ≥ 2 的客户
"""
if not all_records:
return [], []
# 按 member_id 聚合本月记录
member_stats: dict[int, dict] = {}
for rec in all_records:
mid = rec.get("member_id")
if mid is None:
continue
if mid not in member_stats:
member_stats[mid] = {
"customer_name": rec.get("customer_name") or "未知客户",
"count": 0,
"total_hours": 0.0,
"total_income": 0.0,
"last_service": rec.get("settle_time"),
}
stats = member_stats[mid]
stats["count"] += 1
stats["total_hours"] += rec.get("service_hours", 0.0)
stats["total_income"] += rec.get("income", 0.0)
# 更新最后服务时间(记录已按 settle_time DESC 排序,第一条即最新)
if stats["last_service"] is None:
stats["last_service"] = rec.get("settle_time")
member_ids = list(member_stats.keys())
# 查询历史记录(本月之前是否有服务记录)
# ⚠️ 直连 ETL 库查询 app.v_dwd_assistant_service_log RLS 视图
# 列名映射: assistant_id → site_assistant_id, member_id → tenant_member_id,
# is_trash → is_delete (int, 0=正常), settle_time → create_time
historical_members: set[int] = set()
try:
start_date = f"{year}-{month:02d}-01"
with fdw_queries._fdw_context(conn, site_id) as cur:
cur.execute(
"""
SELECT DISTINCT tenant_member_id
FROM app.v_dwd_assistant_service_log
WHERE site_assistant_id = %s
AND is_delete = 0
AND create_time < %s::timestamptz
AND tenant_member_id = ANY(%s)
""",
(assistant_id, start_date, member_ids),
)
for row in cur.fetchall():
historical_members.add(row[0])
except Exception:
logger.warning("查询历史客户记录失败", exc_info=True)
new_customers = []
regular_customers = []
for mid, stats in member_stats.items():
name = stats["customer_name"]
char, color = avatar_char_color(name)
# 新客:历史无记录
if mid not in historical_members:
last_service_dt = stats["last_service"]
new_customers.append({
"name": name,
"avatar_char": char,
"avatar_color": color,
"last_service": _format_date_label(last_service_dt),
"count": stats["count"],
})
# 常客:本月 ≥ 2 次
if stats["count"] >= 2:
regular_customers.append({
"name": name,
"avatar_char": char,
"avatar_color": color,
"hours": round(stats["total_hours"], 2),
"income": f"¥{stats['total_income']:,.2f}",
"count": stats["count"],
})
# 新客按最后服务时间倒序
new_customers.sort(key=lambda x: x.get("last_service", ""), reverse=True)
# 常客按收入倒序
regular_customers.sort(
key=lambda x: float(x.get("income", "¥0").replace("¥", "").replace(",", "")),
reverse=True,
)
return new_customers, regular_customers
# ---------------------------------------------------------------------------
# PERF-2: 绩效明细
# ---------------------------------------------------------------------------
async def get_records(
user_id: int, site_id: int,
year: int, month: int, page: int, page_size: int,
) -> dict:
"""
绩效明细PERF-2
1. 获取 assistant_id
2. fdw_queries.get_service_records() 带分页
3. 按日期分组为 dateGroups不含 avatarChar/avatarColor
4. 计算 summary 汇总
5. 返回 { summary, dateGroups, hasMore }
"""
conn = _get_connection()
try:
assistant_id = _get_assistant_id(conn, user_id, site_id)
# 先获取全量记录用于 summary 计算
all_records = fdw_queries.get_service_records(
conn, site_id, assistant_id, year, month,
limit=100000, offset=0,
)
# 计算月度汇总
summary = compute_summary(all_records)
# 分页获取记录
offset = (page - 1) * page_size
page_records = fdw_queries.get_service_records(
conn, site_id, assistant_id, year, month,
limit=page_size, offset=offset,
)
# 判断 hasMore
has_more = len(all_records) > page * page_size
# 按日期分组(不含 avatar
date_groups = group_records_by_date(page_records, include_avatar=False)
return {
"summary": summary,
"date_groups": date_groups,
"has_more": has_more,
}
finally:
conn.close()