Files
Neo-ZQYY/apps/backend/app/services/task_manager.py
2026-03-15 10:15:02 +08:00

402 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
任务管理服务
负责任务 CRUD、置顶、放弃、取消放弃等操作。
通过 FDW 读取客户信息和 RS 指数,计算爱心 icon 档位。
"""
import json
import logging
from decimal import Decimal
from fastapi import HTTPException
from app.services.task_generator import compute_heart_icon
logger = logging.getLogger(__name__)
def _get_connection():
    """Open and return a database connection.

    ``app.database`` is imported at call time rather than at module import
    time, so importing this module (for example to unit-test its pure
    functions) does not fail when the database layer cannot be imported.
    """
    from app.database import get_connection as open_connection

    connection = open_connection()
    return connection
def _record_history(
cur,
task_id: int,
action: str,
old_status: str | None = None,
new_status: str | None = None,
old_task_type: str | None = None,
new_task_type: str | None = None,
detail: dict | None = None,
) -> None:
"""在 coach_task_history 中记录变更。"""
cur.execute(
"""
INSERT INTO biz.coach_task_history
(task_id, action, old_status, new_status,
old_task_type, new_task_type, detail)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(
task_id,
action,
old_status,
new_status,
old_task_type,
new_task_type,
json.dumps(detail) if detail else None,
),
)
def _get_assistant_id(conn, user_id: int, site_id: int) -> int:
"""
通过 user_assistant_binding 获取 assistant_id。
找不到绑定关系时抛出 403。
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT assistant_id
FROM auth.user_assistant_binding
WHERE user_id = %s AND site_id = %s AND assistant_id IS NOT NULL
LIMIT 1
""",
(user_id, site_id),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=403, detail="权限不足")
return row[0]
def _verify_task_ownership(
conn, task_id: int, assistant_id: int, site_id: int, required_status: str | None = None
) -> dict:
"""
验证任务归属并返回任务信息。
- 任务不存在 → 404
- 不属于当前助教 → 403
- required_status 不匹配 → 409
"""
with conn.cursor() as cur:
cur.execute(
"""
SELECT id, task_type, status, is_pinned, abandon_reason,
assistant_id, site_id
FROM biz.coach_tasks
WHERE id = %s
""",
(task_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(status_code=404, detail="资源不存在")
task = {
"id": row[0],
"task_type": row[1],
"status": row[2],
"is_pinned": row[3],
"abandon_reason": row[4],
"assistant_id": row[5],
"site_id": row[6],
}
if task["site_id"] != site_id or task["assistant_id"] != assistant_id:
raise HTTPException(status_code=403, detail="权限不足")
if required_status and task["status"] != required_status:
raise HTTPException(status_code=409, detail="任务状态不允许此操作")
return task
async def get_task_list(user_id: int, site_id: int) -> list[dict]:
    """Return the assistant's task list (active and abandoned tasks).

    Steps:
        1. Resolve ``assistant_id`` through ``auth.user_assistant_binding``.
        2. Load tasks from ``biz.coach_tasks`` whose status is ``active`` or
           ``abandoned``.
        3. Read member basic info and the RS index through the ``fdw_etl``
           views.
        4. Compute the heart-icon tier from the RS score.

    Ordering (done in SQL): abandoned tasks last, then ``is_pinned`` DESC,
    ``priority_score`` DESC (NULLs last), ``created_at`` ASC.

    The FDW queries require ``SET LOCAL app.current_site_id``, so they run
    inside their own explicit transaction.

    Args:
        user_id: ID of the requesting user.
        site_id: ID of the site the request is scoped to.

    Returns:
        One dict per task with the keys ``id``, ``task_type``, ``status``,
        ``priority_score``, ``is_pinned``, ``expires_at``, ``created_at``,
        ``member_id``, ``member_name``, ``member_phone``, ``rs_score``,
        ``heart_icon`` and ``abandon_reason``. An empty list when the
        assistant has no matching tasks.

    Raises:
        HTTPException: 403 when the user has no assistant binding at the site.
    """
    conn = _get_connection()
    try:
        assistant_id = _get_assistant_id(conn, user_id, site_id)

        # Active + abandoned tasks, abandoned ones sorted last.
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT id, task_type, status, priority_score, is_pinned,
                       expires_at, created_at, member_id, abandon_reason
                FROM biz.coach_tasks
                WHERE site_id = %s
                  AND assistant_id = %s
                  AND status IN ('active', 'abandoned')
                ORDER BY
                    CASE WHEN status = 'abandoned' THEN 1 ELSE 0 END ASC,
                    is_pinned DESC,
                    priority_score DESC NULLS LAST,
                    created_at ASC
                """,
                (site_id, assistant_id),
            )
            tasks = cur.fetchall()
        conn.commit()

        if not tasks:
            return []

        # De-duplicate member IDs (column index 7) before querying the views.
        member_ids = list({t[7] for t in tasks})

        # Member info and RS index come from the FDW views, which need
        # SET LOCAL app.current_site_id inside the transaction.
        member_info_map: dict[int, dict] = {}
        rs_map: dict[int, Decimal] = {}
        with conn.cursor() as cur:
            cur.execute("BEGIN")
            cur.execute(
                "SET LOCAL app.current_site_id = %s", (str(site_id),)
            )

            # Member basic info.
            cur.execute(
                """
                SELECT member_id, member_name, member_phone
                FROM fdw_etl.v_dim_member
                WHERE member_id = ANY(%s)
                """,
                (member_ids,),
            )
            for row in cur.fetchall():
                member_info_map[row[0]] = {
                    "member_name": row[1],
                    "member_phone": row[2],
                }

            # RS index for this assistant/member pair.
            cur.execute(
                """
                SELECT member_id, COALESCE(rs_display, 0)
                FROM fdw_etl.v_dws_member_assistant_relation_index
                WHERE assistant_id = %s
                  AND member_id = ANY(%s)
                """,
                (assistant_id, member_ids),
            )
            for row in cur.fetchall():
                rs_map[row[0]] = Decimal(str(row[1]))
        conn.commit()

        # Assemble the response rows, keeping the SQL ordering.
        result = []
        for task_row in tasks:
            (task_id, task_type, status, priority_score,
             is_pinned, expires_at, created_at, member_id, abandon_reason) = task_row

            info = member_info_map.get(member_id, {})
            # Members without an RS row default to a score of 0.
            rs_score = rs_map.get(member_id, Decimal("0"))
            heart_icon = compute_heart_icon(rs_score)

            result.append({
                "id": task_id,
                "task_type": task_type,
                "status": status,
                # Compare against None explicitly: a score of 0 is a real
                # value and must not be reported as missing.
                "priority_score": (
                    float(priority_score) if priority_score is not None else None
                ),
                "is_pinned": is_pinned,
                "expires_at": expires_at.isoformat() if expires_at else None,
                "created_at": created_at.isoformat() if created_at else None,
                "member_id": member_id,
                "member_name": info.get("member_name"),
                "member_phone": info.get("member_phone"),
                "rs_score": float(rs_score),
                "heart_icon": heart_icon,
                "abandon_reason": abandon_reason,
            })
        return result
    finally:
        conn.close()
async def pin_task(task_id: int, user_id: int, site_id: int) -> dict:
    """Pin a task to the top of the assistant's list.

    After the ownership check (the task must be ``active``), sets
    ``is_pinned = TRUE`` and writes a ``pin`` record to the task history.

    Args:
        task_id: ID of the task to pin.
        user_id: ID of the requesting user.
        site_id: ID of the site the request is scoped to.

    Returns:
        ``{"id": task_id, "is_pinned": True}``.

    Raises:
        HTTPException: Propagated from the binding lookup and the ownership
            check (403, 404 or 409).
    """
    pin_sql = """
        UPDATE biz.coach_tasks
        SET is_pinned = TRUE, updated_at = NOW()
        WHERE id = %s
        """
    conn = _get_connection()
    try:
        assistant_id = _get_assistant_id(conn, user_id, site_id)
        task = _verify_task_ownership(
            conn, task_id, assistant_id, site_id, required_status="active"
        )
        # Pinning changes neither status nor task type, so old == new.
        history_fields = {
            "action": "pin",
            "old_status": "active",
            "new_status": "active",
            "old_task_type": task["task_type"],
            "new_task_type": task["task_type"],
            "detail": {"is_pinned": True},
        }
        with conn.cursor() as cursor:
            cursor.execute("BEGIN")
            cursor.execute(pin_sql, (task_id,))
            _record_history(cursor, task_id, **history_fields)
        conn.commit()
        return {"id": task_id, "is_pinned": True}
    finally:
        conn.close()
async def unpin_task(task_id: int, user_id: int, site_id: int) -> dict:
    """Remove the pin from a task.

    After the ownership check (the task must be ``active``), sets
    ``is_pinned = FALSE``. No history record is written by this function.

    Args:
        task_id: ID of the task to unpin.
        user_id: ID of the requesting user.
        site_id: ID of the site the request is scoped to.

    Returns:
        ``{"id": task_id, "is_pinned": False}``.

    Raises:
        HTTPException: Propagated from the binding lookup and the ownership
            check (403, 404 or 409).
    """
    unpin_sql = """
        UPDATE biz.coach_tasks
        SET is_pinned = FALSE, updated_at = NOW()
        WHERE id = %s
        """
    conn = _get_connection()
    try:
        assistant_id = _get_assistant_id(conn, user_id, site_id)
        # Called only for its validation side effect; the task is not needed.
        _verify_task_ownership(
            conn, task_id, assistant_id, site_id, required_status="active"
        )
        with conn.cursor() as cursor:
            cursor.execute("BEGIN")
            cursor.execute(unpin_sql, (task_id,))
        conn.commit()
        return {"id": task_id, "is_pinned": False}
    finally:
        conn.close()
async def abandon_task(
    task_id: int, user_id: int, site_id: int, reason: str
) -> dict:
    """Abandon a task.

    Steps:
        1. Reject an empty or whitespace-only ``reason`` with 422.
        2. Verify ownership and that the task is ``active``.
        3. Set ``status = 'abandoned'`` and store ``abandon_reason``.
        4. Write an ``abandon`` record to the task history.

    Args:
        task_id: ID of the task to abandon.
        user_id: ID of the requesting user.
        site_id: ID of the site the request is scoped to.
        reason: Why the task is being abandoned; stored as given.

    Returns:
        ``{"id": task_id, "status": "abandoned"}``.

    Raises:
        HTTPException: 422 for a blank reason; 403, 404 or 409 propagated
            from the binding lookup and the ownership check.
    """
    has_reason = bool(reason and reason.strip())
    if not has_reason:
        raise HTTPException(status_code=422, detail="放弃原因不能为空")

    abandon_sql = """
        UPDATE biz.coach_tasks
        SET status = 'abandoned',
            abandon_reason = %s,
            updated_at = NOW()
        WHERE id = %s
        """
    conn = _get_connection()
    try:
        assistant_id = _get_assistant_id(conn, user_id, site_id)
        task = _verify_task_ownership(
            conn, task_id, assistant_id, site_id, required_status="active"
        )
        history_fields = {
            "action": "abandon",
            "old_status": "active",
            "new_status": "abandoned",
            "old_task_type": task["task_type"],
            "new_task_type": task["task_type"],
            "detail": {"abandon_reason": reason},
        }
        with conn.cursor() as cursor:
            cursor.execute("BEGIN")
            cursor.execute(abandon_sql, (reason, task_id))
            _record_history(cursor, task_id, **history_fields)
        conn.commit()
        return {"id": task_id, "status": "abandoned"}
    finally:
        conn.close()
async def cancel_abandon(task_id: int, user_id: int, site_id: int) -> dict:
    """Undo the abandonment of a task.

    Steps:
        1. Verify ownership and that the task is ``abandoned``.
        2. Restore ``status = 'active'``, clear ``abandon_reason`` and reset
           ``is_pinned`` to FALSE.
        3. Write a ``cancel_abandon`` record to the task history.

    Args:
        task_id: ID of the task to restore.
        user_id: ID of the requesting user.
        site_id: ID of the site the request is scoped to.

    Returns:
        ``{"id": task_id, "status": "active", "is_pinned": False}``.

    Raises:
        HTTPException: Propagated from the binding lookup and the ownership
            check (403, 404 or 409).
    """
    restore_sql = """
        UPDATE biz.coach_tasks
        SET status = 'active',
            is_pinned = FALSE,
            abandon_reason = NULL,
            updated_at = NOW()
        WHERE id = %s
        """
    conn = _get_connection()
    try:
        assistant_id = _get_assistant_id(conn, user_id, site_id)
        task = _verify_task_ownership(
            conn, task_id, assistant_id, site_id, required_status="abandoned"
        )
        history_fields = {
            "action": "cancel_abandon",
            "old_status": "abandoned",
            "new_status": "active",
            "old_task_type": task["task_type"],
            "new_task_type": task["task_type"],
        }
        with conn.cursor() as cursor:
            cursor.execute("BEGIN")
            cursor.execute(restore_sql, (task_id,))
            _record_history(cursor, task_id, **history_fields)
        conn.commit()
        return {"id": task_id, "status": "active", "is_pinned": False}
    finally:
        conn.close()