Files
Neo-ZQYY/apps/backend/app/services/task_generator.py
2026-04-10 06:24:13 +08:00

1269 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# AI_CHANGELOG
# - 2026-03-20 | Prompt: H2 FDW→直连ETL统一改造 | _process_assistant() 中 3 处 fdw_etl.v_dws_member_*
# 改为直连 ETL 库查询 app.v_dws_member_*。使用 fdw_queries._fdw_context()。
# 这是风险最高的改造点(WBI/NCI 全表扫描无 WHERE、RLS 是唯一门店过滤手段)。
# - 2026-03-24 07:00:13 | Prompt: P17 助教客户归属与任务生成引擎 | 完全重写 run() 和 _process_assistant()。
# 入口从 auth.user_assistant_binding 改为 OS 归属对fdw_queries.get_ownership_pairs
# 新增四级漏斗任务判定(含 os_label 约束)、客户转移子流程(三重保护)、
# 参数化配置cfg_task_generator_params。纯函数 determine_task_type/should_replace_task/
# compute_heart_icon 保持不变。
# 审计记录: docs/audit/changes/2026-03-24__p17-assistant-ownership-task-engine.md
# - 2026-03-25 | Prompt: 保底 relationship_building 任务 | _run_for_site() 新增 Step 3b
# 调用 _generate_baseline_relationship_tasks()。新增函数查询 session_count > 0 的全量
# 服务关系对 → 排除已有 active 任务的对 → 批量 upsert relationship_building 任务。
# 依赖 fdw_queries.get_all_service_pairs() 和 idx_coach_tasks_rb_unique_active 索引。
# - 2026-03-25 | Prompt: 保底任务独立连接修复 | 将 _generate_baseline_relationship_tasks()
# 从 _run_for_site() Step 3b 提升到 run_task_generation() 主循环,使用独立业务库连接,
# 避免与四级漏斗共享事务状态导致异常被静默吞掉。签名从 (conn, site_id, stats) 改为
# (site_id, stats),内部自行管理连接生命周期。
# -*- coding: utf-8 -*-
"""
任务生成器Task Generator— P17 重写版
每日 07:00 运行,基于 OS 归属 + WBI/NCI/RS 指数为助教生成/更新任务。
本模块包含:
- TaskPriority 枚举:任务类型优先级定义
- TASK_TYPE_PRIORITY 映射task_type 字符串 → 优先级
- IndexData 数据类:客户-助教对的指数数据
- determine_task_type():根据指数确定任务类型(纯函数)
- should_replace_task():判断是否应替换现有任务(纯函数)
- compute_heart_icon():根据 RS 指数计算爱心 icon 档位(纯函数)
- load_params():从 cfg_task_generator_params 加载参数(支持门店级覆盖)
- run()主流程P17 重写版,以 OS 归属为入口)
- _run_for_site():单门店处理
- _process_pair():单个 (assistant_id, member_id) 对的任务生成
- _run_transfer_check():客户转移子流程(三重保护)
"""
from decimal import Decimal
from dataclasses import dataclass
from enum import IntEnum
from app.trace.decorators import trace_service
class TaskPriority(IntEnum):
    """Task-type priority; a smaller value means a higher priority.

    NOTE(review): HIGH_PRIORITY_RECALL and PRIORITY_RECALL both carry the
    value 0, so IntEnum makes PRIORITY_RECALL an *alias* of
    HIGH_PRIORITY_RECALL (they compare equal and iterate as one member) —
    confirm this equal ranking is intentional.
    """
    HIGH_PRIORITY_RECALL = 0
    PRIORITY_RECALL = 0
    FOLLOW_UP_VISIT = 1
    RELATIONSHIP_BUILDING = 2


# Maps a coach_tasks.task_type string to its priority rank.
TASK_TYPE_PRIORITY: dict[str, TaskPriority] = {
    "high_priority_recall": TaskPriority.HIGH_PRIORITY_RECALL,
    "priority_recall": TaskPriority.PRIORITY_RECALL,
    "follow_up_visit": TaskPriority.FOLLOW_UP_VISIT,
    "relationship_building": TaskPriority.RELATIONSHIP_BUILDING,
}
@dataclass
class IndexData:
    """Index data for one (member, assistant) pair."""
    site_id: int
    assistant_id: int
    member_id: int
    wbi: Decimal  # win-back index (churn recall)
    nci: Decimal  # new-customer conversion index
    rs: Decimal  # relationship-strength index
    has_active_recall: bool  # whether an active recall task exists
    has_follow_up_note: bool  # whether a follow-up note exists after recall completion
@trace_service(description_zh="determine_task_type", description_en="Determine Task Type")
def determine_task_type(index_data: IndexData) -> str | None:
    """Pick the task type to generate for one pair of indexes.

    Rules, highest priority first:
      1. max(WBI, NCI) > 7  -> "high_priority_recall"
      2. max(WBI, NCI) > 5  -> "priority_recall"
      3. 1 < RS < 6         -> "relationship_building"
         (RS <= 1 means no meaningful interaction data: nothing is created)
      4. otherwise          -> None (no task)

    Returns the task_type string, or None.
    """
    recall_score = max(index_data.wbi, index_data.nci)
    if recall_score > 7:
        return "high_priority_recall"
    if recall_score > 5:
        return "priority_recall"
    # Only the open interval (1, 6) qualifies for relationship building.
    return "relationship_building" if 1 < index_data.rs < 6 else None
@trace_service(description_zh="should_replace_task", description_en="Should Replace Task")
def should_replace_task(existing_type: str, new_type: str) -> bool:
    """Return True when the new task type should replace the existing one.

    Rule: any differing type replaces; an identical type never does.
    """
    return existing_type != new_type
@trace_service(description_zh="compute_heart_icon", description_en="Compute Heart Icon")
def compute_heart_icon(rs_score: Decimal) -> str:
    """Map an RS score to a heart-icon tier.

    Tiers (strict lower bounds, checked high to low):
      RS > 8.5      -> 💖
      7 < RS <= 8.5 -> 🧡
      5 < RS <= 7   -> 💛
      RS <= 5       -> 💙
    """
    tiers = (
        (Decimal("8.5"), "💖"),
        (Decimal("7"), "🧡"),
        (Decimal("5"), "💛"),
    )
    for lower_bound, icon in tiers:
        if rs_score > lower_bound:
            return icon
    return "💙"
# ── P17 parameter loading ────────────────────────────────────
# Code-level defaults (kept in sync with the cfg_task_generator_params seeds).
_DEFAULT_PARAMS: dict[str, float] = {
    "high_priority_recall_threshold": 7.5,
    "priority_recall_threshold": 4.0,
    "rs_min_for_relationship": 1.0,
    "rs_max_for_relationship": 6.0,
    "consecutive_recall_fail_cycles": 3,
    "min_wbi_for_transfer": 3.0,
    "guard_assistant_coverage_ratio": 0.0,  # binding-rate guard disabled
    "guard_new_assistant_days": 10,
    "transfer_score_w_rs": 0.5,
    "transfer_score_w_ms": 0.3,
    "transfer_score_w_ml": 0.2,
    "max_transfer_count": 4,
    "follow_up_visit_retention_hours": 48,
    # CHANGE 2026-03-29 | OS tiered assignment: escalation multipliers
    "escalation_comanage_multiplier": 2.5,
    "escalation_pool_multiplier": 4.0,
    "default_ideal_interval_days": 10.0,
}


def load_params(conn, site_id: int) -> dict[str, float]:
    """Load parameters from biz.cfg_task_generator_params.

    Inheritance chain: code defaults -> global defaults (site_id IS NULL)
    -> per-site overrides (site_id = %s). ORDER BY ... NULLS FIRST makes the
    global rows load first so site-specific rows overwrite them.
    """
    merged: dict[str, float] = {**_DEFAULT_PARAMS}
    with conn.cursor() as cur:
        # Global defaults and site overrides fetched in one query.
        cur.execute(
            """
            SELECT param_key, param_value, site_id
            FROM biz.cfg_task_generator_params
            WHERE site_id IS NULL OR site_id = %s
            ORDER BY site_id NULLS FIRST
            """,
            (site_id,),
        )
        rows = cur.fetchall()
    for key, value, _sid in rows:
        merged[key] = float(value)
    conn.commit()
    return merged
# ── run() main flow (P17 rewrite) ─────────────────────────────
import logging
logger = logging.getLogger(__name__)


def _get_connection():
    """Lazily import get_connection so that importing this module for
    pure-function tests does not trigger a module-level import failure."""
    from app.database import get_connection
    return get_connection()
@trace_service(description_zh="执行任务生成", description_en="Run task generation")
def run() -> dict:
    """
    P17 task-generator main flow.

    1. Entry point is OS ownership (replacing the old user_assistant_binding
       entry): query v_dws_member_assistant_relation_index
       WHERE os_label IN (MAIN, COMANAGE).
    2. Batch-read WBI/NCI.
    3. Decide the task type per (assistant_id, member_id) pair and write it.
    4. Customer-transfer check (independent sub-flow).
    5. Update the trigger_jobs timestamp.

    Returns: {"created": int, "replaced": int, "skipped": int, "transferred": int}
    """
    stats = {"created": 0, "replaced": 0, "skipped": 0, "transferred": 0}
    conn = _get_connection()
    try:
        # ── 1. Collect every distinct site_id involved ──
        # CHANGE 2026-03-29 | site_ids now come from the ETL relation-index
        # table instead of user_assistant_binding, so stores without a bound
        # mini-program also get task data generated.
        from app.services import fdw_queries
        site_ids = fdw_queries.get_active_site_ids(conn)
        # ── 2. Per-site processing (four-level funnel over ownership pairs) ──
        # Failures are isolated per site: log, roll back, continue.
        for site_id in site_ids:
            try:
                _run_for_site(conn, site_id, stats)
            except Exception:
                logger.exception(
                    "处理门店失败: site_id=%s", site_id
                )
                conn.rollback()
        # ── 3. Baseline relationship_building tasks (own connection, so the
        # shared transaction state from Step 2 cannot swallow failures) ──
        # For every pair with session_count > 0 that has no active task at
        # all, top up one relationship_building task.
        for site_id in site_ids:
            try:
                _generate_baseline_relationship_tasks(site_id, stats)
            except Exception:
                logger.exception(
                    "保底关系构建任务生成失败: site_id=%s", site_id
                )
        # ── 4. Aggregate task stats into summary tables (B: monthly,
        # C: lifetime totals) ──
        for site_id in site_ids:
            try:
                _update_task_stats(conn, site_id)
            except Exception:
                logger.exception("任务统计写入失败: site_id=%s", site_id)
                conn.rollback()
        # ── 5. Update the trigger_jobs timestamp + run stats ──
        import json as _json
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE biz.trigger_jobs
                SET last_run_at = NOW(),
                    last_stats = %s
                WHERE job_name = 'task_generator'
                """,
                (_json.dumps(stats),),
            )
        conn.commit()
    finally:
        conn.close()
    logger.info(
        "任务生成器完成: created=%d, replaced=%d, skipped=%d, transferred=%d",
        stats["created"],
        stats["replaced"],
        stats["skipped"],
        stats["transferred"],
    )
    return stats
def _run_for_site(conn, site_id: int, stats: dict) -> None:
    """
    Process a single site.

    CHANGE 2026-03-31 | customer-level escalation/transfer:
    the assignment scope is decided by t_v / ideal_interval, independent of
    whether a task already exists.
      - ratio < 2.5  -> MAIN only
      - ratio >= 2.5 -> MAIN + COMANAGE
      - ratio >= 4.0 -> MAIN + COMANAGE + POOL (highest RS)
      - no MAIN      -> fall back to POOL

    Mutates ``stats`` in place via the _process_pair* helpers.
    """
    from app.services import fdw_queries
    from app.services.fdw_queries import _fdw_context
    params = load_params(conn, site_id)
    # ── Step 1: full service-relation pairs + indexes ──
    all_pairs = fdw_queries.get_all_service_pairs(conn, site_id)
    if not all_pairs:
        return
    ownership_pairs = fdw_queries.get_ownership_pairs(conn, site_id)
    wbi_map = fdw_queries.get_wbi_batch(conn, site_id)
    nci_map = fdw_queries.get_nci_batch(conn, site_id)
    ideal_interval_map = fdw_queries.get_ideal_interval_batch(conn, site_id)
    # Batch-read t_v from the WBI view; fall back to the NCI view for members
    # missing there (WBI value wins on conflict).
    tv_map: dict[int, float] = {}
    with _fdw_context(conn, site_id) as cur:
        cur.execute(
            "SELECT member_id, t_v FROM app.v_dws_member_winback_index WHERE t_v IS NOT NULL"
        )
        for row in cur.fetchall():
            tv_map[row[0]] = float(row[1])
    with _fdw_context(conn, site_id) as cur:
        cur.execute(
            "SELECT member_id, t_v FROM app.v_dws_member_newconv_index WHERE t_v IS NOT NULL"
        )
        for row in cur.fetchall():
            if row[0] not in tv_map:
                tv_map[row[0]] = float(row[1])
    default_interval = params["default_ideal_interval_days"]
    comanage_multiplier = params["escalation_comanage_multiplier"]
    pool_multiplier = params["escalation_pool_multiplier"]
    high_threshold = Decimal(str(params["high_priority_recall_threshold"]))
    normal_threshold = Decimal(str(params["priority_recall_threshold"]))
    # ── Step 2: group relation pairs by member_id ──
    pairs_by_member: dict[int, dict[str, list[dict]]] = {}
    for p in ownership_pairs:
        mid = p["member_id"]
        groups = pairs_by_member.setdefault(
            mid, {"MAIN": [], "COMANAGE": [], "POOL": []}
        )
        label = p["os_label"]
        if label in groups:
            groups[label].append(p)
    # FIX: the POOL membership test was an O(n*m) any() scan over
    # ownership_pairs for every pair in all_pairs; build the key set once
    # for O(1) lookups. Same semantics, linear cost.
    ownership_keys = {
        (op["assistant_id"], op["member_id"]) for op in ownership_pairs
    }
    # ownership_pairs only contains MAIN/COMANAGE; anything else is POOL.
    for p in all_pairs:
        mid = p["member_id"]
        groups = pairs_by_member.setdefault(
            mid, {"MAIN": [], "COMANAGE": [], "POOL": []}
        )
        if (p["assistant_id"], mid) not in ownership_keys:
            groups["POOL"].append(p)
    # ── Step 3: per-member recall-task generation (customer-level
    # escalation/transfer) ──
    assigned_set: set[tuple[int, int]] = set()  # dedupe on (member_id, assistant_id)
    for mid, groups in pairs_by_member.items():
        wbi = wbi_map.get(mid, Decimal("0"))
        nci = nci_map.get(mid, Decimal("0"))
        priority_score = max(wbi, nci)
        # Four-level funnel: only scores above the recall thresholds proceed;
        # the concrete type (high vs normal) is re-derived in _process_pair.
        if priority_score <= normal_threshold:
            # Recall conditions not met — still generate relationship
            # building for MAIN/COMANAGE (the baseline step covers the rest).
            for pair in groups["MAIN"]:
                try:
                    _process_pair(conn, site_id, pair["assistant_id"], mid, wbi, nci, pair["rs"], params, stats)
                except Exception:
                    logger.exception("处理 MAIN 关系构建失败: %s/%s", site_id, mid)
                    conn.rollback()
            for pair in groups["COMANAGE"]:
                try:
                    _process_pair_relationship_only(conn, site_id, pair["assistant_id"], mid, pair["rs"], params, stats)
                except Exception:
                    logger.exception("处理 COMANAGE 关系构建失败: %s/%s", site_id, mid)
                    conn.rollback()
            continue
        # ratio = t_v / ideal_interval decides how far assignment escalates.
        t_v = tv_map.get(mid, 60.0)
        ideal = ideal_interval_map.get(mid, default_interval)
        if ideal <= 0:
            ideal = default_interval
        ratio = t_v / ideal
        # Determine the assignment targets: (assistant_id, rs, reason).
        targets: list[tuple[int, Decimal, str]] = []
        # MAIN always receives the task.
        for pair in groups["MAIN"]:
            targets.append((pair["assistant_id"], pair["rs"], "MAIN"))
        # ratio >= comanage_multiplier -> also COMANAGE.
        if ratio >= comanage_multiplier:
            for pair in groups["COMANAGE"]:
                targets.append((pair["assistant_id"], pair["rs"], "COMANAGE_escalation"))
        # ratio >= pool_multiplier -> also the highest-RS POOL assistant.
        if ratio >= pool_multiplier and groups["POOL"]:
            best_pool = max(groups["POOL"], key=lambda p: float(p["rs"]))
            targets.append((best_pool["assistant_id"], best_pool["rs"], "POOL_transfer"))
        # No-MAIN fallback (assigned_set dedupes any overlap with the above).
        if not groups["MAIN"]:
            if ratio >= pool_multiplier and groups["POOL"]:
                best_pool = max(groups["POOL"], key=lambda p: float(p["rs"]))
                targets.append((best_pool["assistant_id"], best_pool["rs"], "POOL_fallback"))
            elif ratio >= comanage_multiplier and groups["COMANAGE"]:
                for pair in groups["COMANAGE"]:
                    targets.append((pair["assistant_id"], pair["rs"], "COMANAGE_fallback"))
            elif groups["POOL"]:
                best_pool = max(groups["POOL"], key=lambda p: float(p["rs"]))
                targets.append((best_pool["assistant_id"], best_pool["rs"], "POOL_fallback"))
        # Dedupe and generate; each pair fails independently.
        for aid, rs, reason in targets:
            if (mid, aid) in assigned_set:
                continue
            assigned_set.add((mid, aid))
            try:
                _process_pair(conn, site_id, aid, mid, wbi, nci, rs, params, stats)
            except Exception:
                logger.exception(
                    "生成召回任务失败: site_id=%s, assistant=%s, member=%s, reason=%s",
                    site_id, aid, mid, reason,
                )
                conn.rollback()
# CHANGE 2026-03-29 | OS tiered assignment: COMANAGE assistants only get
# relationship-building tasks here (no recall tasks until escalation).
def _process_pair_relationship_only(
    conn,
    site_id: int,
    assistant_id: int,
    member_id: int,
    rs: Decimal,
    params: dict[str, float],
    stats: dict,
) -> None:
    """Create only a relationship_building task for a COMANAGE assistant
    (recall tasks wait for the escalation check). Updates ``stats``
    ("created" / "skipped")."""
    rs_min = Decimal(str(params["rs_min_for_relationship"]))
    rs_max = Decimal(str(params["rs_max_for_relationship"]))
    # Only the open interval (rs_min, rs_max) qualifies.
    if not (rs > rs_min and rs < rs_max):
        stats["skipped"] += 1
        return
    with conn.cursor() as cur:
        cur.execute("BEGIN")
        # Skip if an active relationship_building task already exists.
        cur.execute(
            """
            SELECT id FROM biz.coach_tasks
            WHERE site_id = %s AND assistant_id = %s AND member_id = %s
            AND task_type = 'relationship_building' AND status = 'active'
            """,
            (site_id, assistant_id, member_id),
        )
        if cur.fetchone():
            conn.commit()
            stats["skipped"] += 1
            return
        # Insert; the partial unique index backing ON CONFLICT ... WHERE
        # status = 'active' makes this idempotent (DO NOTHING -> no row back).
        cur.execute(
            """
            INSERT INTO biz.coach_tasks
            (site_id, assistant_id, member_id, task_type, status, priority_score)
            VALUES (%s, %s, %s, 'relationship_building', 'active', %s)
            ON CONFLICT (site_id, assistant_id, member_id, task_type)
            WHERE status = 'active'
            DO NOTHING
            RETURNING id
            """,
            (site_id, assistant_id, member_id, float(rs)),
        )
        row = cur.fetchone()
        if row:
            _insert_history(cur, row[0], action="created",
                            old_status=None, new_status="active",
                            new_task_type="relationship_building",
                            detail={"reason": "comanage_relationship_building"})
            stats["created"] += 1
        else:
            stats["skipped"] += 1
        conn.commit()
def _process_pair(
    conn,
    site_id: int,
    assistant_id: int,
    member_id: int,
    wbi: Decimal,
    nci: Decimal,
    rs: Decimal,
    params: dict[str, float],
    stats: dict,
) -> None:
    """
    P17: create/update the task for one (assistant_id, member_id) ownership
    pair.

    Thresholds come from ``params`` (P17 section 5), not hard-coded values.
    Each pair runs in its own transaction so a failure does not affect
    other pairs. Updates ``stats`` ("created" / "replaced" / "skipped").
    """
    # Parameterised task-type decision.
    priority_score = max(wbi, nci)
    high_threshold = Decimal(str(params["high_priority_recall_threshold"]))
    normal_threshold = Decimal(str(params["priority_recall_threshold"]))
    rs_min = Decimal(str(params["rs_min_for_relationship"]))
    rs_max = Decimal(str(params["rs_max_for_relationship"]))
    # Four-level funnel (P17 section 4.2).
    if priority_score > high_threshold:
        new_task_type = "high_priority_recall"
    elif priority_score > normal_threshold:
        new_task_type = "priority_recall"
    elif rs > rs_min and rs < rs_max:
        new_task_type = "relationship_building"
    else:
        new_task_type = None
    if new_task_type is None:
        # Nothing qualifies: expire/close this pair's existing tasks instead.
        _handle_no_task_condition(conn, site_id, assistant_id, member_id)
        stats["skipped"] += 1
        return
    with conn.cursor() as cur:
        cur.execute("BEGIN")
        # ── Look up existing active tasks for this pair ──
        cur.execute(
            """
            SELECT id, task_type, expires_at, created_at
            FROM biz.coach_tasks
            WHERE site_id = %s
            AND assistant_id = %s
            AND member_id = %s
            AND status = 'active'
            ORDER BY created_at DESC
            """,
            (site_id, assistant_id, member_id),
        )
        existing_tasks = cur.fetchall()
        # Case A: an active task of the same type already exists -> skip.
        same_type_exists = any(row[1] == new_task_type for row in existing_tasks)
        if same_type_exists:
            conn.commit()
            stats["skipped"] += 1
            return
        # Case B: active tasks of other types -> close them, then create new.
        for task_id, old_type, old_expires_at, old_created_at in existing_tasks:
            if should_replace_task(old_type, new_task_type):
                # A follow_up_visit displaced by a higher-priority task gets
                # its expires_at filled (48h from creation) instead of being
                # set inactive outright.
                if old_type == "follow_up_visit" and old_expires_at is None:
                    cur.execute(
                        """
                        UPDATE biz.coach_tasks
                        SET expires_at = created_at + INTERVAL '48 hours',
                            updated_at = NOW()
                        WHERE id = %s
                        """,
                        (task_id,),
                    )
                    _insert_history(
                        cur,
                        task_id,
                        action="expires_at_filled",
                        old_status="active",
                        new_status="active",
                        old_task_type=old_type,
                        new_task_type=old_type,
                        detail={"reason": "higher_priority_task_created"},
                    )
                else:
                    cur.execute(
                        """
                        UPDATE biz.coach_tasks
                        SET status = 'inactive', updated_at = NOW()
                        WHERE id = %s
                        """,
                        (task_id,),
                    )
                    _insert_history(
                        cur,
                        task_id,
                        action="type_change_close",
                        old_status="active",
                        new_status="inactive",
                        old_task_type=old_type,
                        new_task_type=new_task_type,
                    )
                stats["replaced"] += 1
        # ── Create the new task (parent_task_id links the newest old task) ──
        expires_at_val = None
        cur.execute(
            """
            INSERT INTO biz.coach_tasks
            (site_id, assistant_id, member_id, task_type, status,
             priority_score, expires_at, parent_task_id)
            VALUES (%s, %s, %s, %s, 'active', %s, %s, %s)
            RETURNING id
            """,
            (
                site_id,
                assistant_id,
                member_id,
                new_task_type,
                float(priority_score),
                expires_at_val,
                existing_tasks[0][0] if existing_tasks else None,
            ),
        )
        new_task_id = cur.fetchone()[0]
        _insert_history(
            cur,
            new_task_id,
            action="created",
            old_status=None,
            new_status="active",
            old_task_type=existing_tasks[0][1] if existing_tasks else None,
            new_task_type=new_task_type,
        )
        stats["created"] += 1
        conn.commit()
# ── Customer-transfer sub-flow (P17 section 3) ────────────────
def _fdw_context_for_transfer(conn, site_id: int):
    """Context manager yielding a read-only ETL cursor, used by the transfer
    check to query dwd.dim_assistant (entry-time guard).

    Commits the ETL connection after the cursor block succeeds and always
    closes it. NOTE(review): if the body raises, the connection is closed
    without an explicit rollback — presumably close() discards the open
    transaction; confirm against the driver's behavior.
    """
    from contextlib import contextmanager
    from app.database import get_etl_readonly_connection

    @contextmanager
    def _ctx():
        etl_conn = get_etl_readonly_connection(site_id)
        try:
            with etl_conn.cursor() as cur:
                yield cur
            etl_conn.commit()
        finally:
            etl_conn.close()
    return _ctx()
def _run_transfer_check(
    conn, site_id: int, params: dict[str, float],
    ideal_interval_map: dict[int, float], stats: dict
) -> None:
    """
    P17 customer-transfer sub-flow (CHANGE 2026-03-29: escalation multiplier
    replaces the fixed day count).

    Triple guard:
      1. Site assistant-scale guard (binding rate < threshold -> disabled).
      2. Entry-time guard (assistants employed < N days are excluded).
      3. Service-relation guard (candidate os_label must be POOL).

    Trigger: task age in days / ideal_interval_days >= escalation_pool_multiplier.
    """
    from app.services import fdw_queries
    # ── Guard 1: site assistant-scale check ──
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT
                COUNT(*) FILTER (WHERE assistant_id IS NOT NULL) AS bound_count,
                COUNT(*) AS total_count
            FROM auth.user_assistant_binding
            WHERE site_id = %s
            AND is_removed = false
            """,
            (site_id,),
        )
        row = cur.fetchone()
    conn.commit()
    if not row or row[1] == 0:
        return
    coverage_ratio = row[0] / row[1]
    if coverage_ratio < params["guard_assistant_coverage_ratio"]:
        logger.info(
            "门店 %s 助教绑定率 %.2f < %.2f,跳过客户转移",
            site_id, coverage_ratio, params["guard_assistant_coverage_ratio"],
        )
        return
    # ── Find active recall-task candidates ──
    # CHANGE 2026-03-29 | no fixed-day SQL filter any more: fetch all active
    # recall tasks and apply the escalation multiplier in Python below.
    pool_multiplier = params["escalation_pool_multiplier"]
    default_interval = params["default_ideal_interval_days"]
    min_wbi = params["min_wbi_for_transfer"]
    max_transfers = int(params["max_transfer_count"])
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT ct.id, ct.assistant_id, ct.member_id, ct.task_type,
                   ct.transfer_count, ct.created_at
            FROM biz.coach_tasks ct
            WHERE ct.site_id = %s
            AND ct.status = 'active'
            AND ct.task_type IN ('high_priority_recall', 'priority_recall')
            AND ct.transfer_count < %s
            """,
            (site_id, max_transfers),
        )
        candidates = cur.fetchall()
    conn.commit()
    if not candidates:
        return
    # Batch-read WBI for the min-WBI filter.
    wbi_map = fdw_queries.get_wbi_batch(conn, site_id)
    guard_days = int(params["guard_new_assistant_days"])
    w_rs = params["transfer_score_w_rs"]
    w_ms = params["transfer_score_w_ms"]
    w_ml = params["transfer_score_w_ml"]
    from datetime import datetime, timezone
    now = datetime.now(timezone.utc)
    for task_id, from_assistant_id, member_id, task_type, transfer_count, created_at in candidates:
        # CHANGE 2026-03-29 | escalation multiplier decides whether to transfer.
        ideal_days = ideal_interval_map.get(member_id, default_interval)
        if ideal_days <= 0:
            ideal_days = default_interval
        task_age_days = (now - created_at).total_seconds() / 86400.0
        escalation_ratio = task_age_days / ideal_days
        if escalation_ratio < pool_multiplier:
            continue
        member_wbi = float(wbi_map.get(member_id, Decimal("0")))
        if member_wbi < min_wbi:
            continue
        # POOL candidate assistants (guard 3: service-relation gate).
        pool = fdw_queries.get_pool_assistants(conn, site_id, member_id)
        if not pool:
            continue
        # ── Guard 2: entry-time guard (CHANGE 2026-03-30: uses
        # dwd.dim_assistant.entry_time, current SCD2 row, not departed) ──
        pool_assistant_ids = [a["assistant_id"] for a in pool]
        with _fdw_context_for_transfer(conn, site_id) as cur:
            cur.execute(
                """
                SELECT assistant_id, entry_time
                FROM dwd.dim_assistant
                WHERE assistant_id = ANY(%s)
                AND scd2_is_current = 1
                AND COALESCE(leave_status, 0) = 0
                """,
                (pool_assistant_ids,),
            )
            entry_dates = {r[0]: r[1] for r in cur.fetchall()}
        # NOTE(review): datetime/timezone were already imported above; this
        # per-iteration re-import and refreshed `now` are redundant.
        from datetime import datetime, timezone, timedelta
        now = datetime.now(timezone.utc)
        eligible = []
        for a in pool:
            aid = a["assistant_id"]
            entry_at = entry_dates.get(aid)
            if entry_at is None:
                continue
            # Entry-time guard: too recently employed -> skip candidate.
            if (now - entry_at).days < guard_days:
                continue
            eligible.append(a)
        if not eligible:
            continue
        # ── Pick the best candidate by weighted transfer score ──
        best = max(
            eligible,
            key=lambda a: (
                w_rs * float(a["rs"])
                + w_ms * float(a["ms"])
                + w_ml * float(a["ml"])
            ),
        )
        transfer_score = (
            w_rs * float(best["rs"])
            + w_ms * float(best["ms"])
            + w_ml * float(best["ml"])
        )
        # ── Execute the transfer ──
        _do_transfer(
            conn,
            site_id=site_id,
            member_id=member_id,
            from_task_id=task_id,
            from_assistant_id=from_assistant_id,
            to_assistant_id=best["assistant_id"],
            task_type=task_type,
            transfer_count=transfer_count,
            transfer_score=transfer_score,
            max_transfers=max_transfers,
            guard_checks={
                "coverage_ratio": {"value": coverage_ratio, "threshold": params["guard_assistant_coverage_ratio"], "pass": True},
                "new_assistant_days": {"value": guard_days, "pass": True},
                "service_relation": {"os_label": "POOL", "pass": True},
            },
            stats=stats,
        )
def _do_transfer(
    conn,
    *,
    site_id: int,
    member_id: int,
    from_task_id: int,
    from_assistant_id: int,
    to_assistant_id: int,
    task_type: str,
    transfer_count: int,
    transfer_score: float,
    max_transfers: int,
    guard_checks: dict,
    stats: dict,
) -> None:
    """
    Execute one customer transfer (single transaction).

    1. Record history on the source task (CHANGE 2026-03-30: the source task
       stays 'active'; only a "transfer_spawned" history row is written).
    2. Create a task for the new assistant inheriting transfer_count + 1.
    3. Write a row to coach_task_transfer_log.
    4. If transfer_count + 1 >= max_transfers, the new task is created as
       'pending_review' instead of 'active'.
    """
    import json
    new_transfer_count = transfer_count + 1
    # At or above the cap -> pending_review, otherwise active.
    new_status = "pending_review" if new_transfer_count >= max_transfers else "active"
    with conn.cursor() as cur:
        cur.execute("BEGIN")
        # CHANGE 2026-03-30 | source task remains active (not hidden); only
        # history is recorded.
        _insert_history(
            cur,
            from_task_id,
            action="transfer_spawned",
            old_status="active",
            new_status="active",
            old_task_type=task_type,
            new_task_type=task_type,
            detail={
                "to_assistant_id": to_assistant_id,
                "transfer_score": transfer_score,
            },
        )
        # 2. Create the task for the new assistant.
        cur.execute(
            """
            INSERT INTO biz.coach_tasks
            (site_id, assistant_id, member_id, task_type, status,
             priority_score, transfer_count, transferred_from)
            VALUES (%s, %s, %s, %s, %s, 0, %s, %s)
            RETURNING id
            """,
            (
                site_id,
                to_assistant_id,
                member_id,
                task_type,
                new_status,
                new_transfer_count,
                from_task_id,
            ),
        )
        new_task_id = cur.fetchone()[0]
        _insert_history(
            cur,
            new_task_id,
            action="transferred_in",
            old_status=None,
            new_status=new_status,
            old_task_type=None,
            new_task_type=task_type,
            detail={
                "from_assistant_id": from_assistant_id,
                "from_task_id": from_task_id,
                "transfer_count": new_transfer_count,
            },
        )
        # 3. Write the transfer log.
        cur.execute(
            """
            INSERT INTO biz.coach_task_transfer_log
            (site_id, member_id, from_assistant_id, to_assistant_id,
             from_task_id, to_task_id, transfer_reason, guard_checks,
             transfer_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (
                site_id,
                member_id,
                from_assistant_id,
                to_assistant_id,
                from_task_id,
                new_task_id,
                f"连续召回未完成WBI 持续高于阈值(第 {new_transfer_count} 次转移)",
                json.dumps(guard_checks),
                transfer_score,
            ),
        )
        conn.commit()
    stats["transferred"] += 1
    logger.info(
        "客户转移: site=%s, member=%s, from_assistant=%s → to_assistant=%s, "
        "score=%.2f, status=%s",
        site_id, member_id, from_assistant_id, to_assistant_id,
        transfer_score, new_status,
    )
# ── 保底 relationship_building 任务生成 ─────────────────────────
def _update_task_stats(conn, site_id: int) -> None:
    """
    Aggregate task data into the summary tables.

    CHANGE 2026-03-31 | B: monthly summary + C: lifetime totals
      B: biz.dws_assistant_task_monthly — upsert per assistant + month
      C: dws.dws_member_assistant_relation_index — per assistant-member pair
         lifetime totals written back to the ETL database (best-effort:
         failures there are logged, not raised).
    """
    # ── B: monthly summary (single upsert over the whole site) ──
    with conn.cursor() as cur:
        cur.execute("BEGIN")
        cur.execute(
            """
            INSERT INTO biz.dws_assistant_task_monthly
            (site_id, assistant_id, stat_month,
             recall_created, follow_up_created, relationship_created, total_created,
             recall_completed, follow_up_completed, total_completed,
             abandoned_count, transferred_count, updated_at)
            SELECT
                ct.site_id,
                ct.assistant_id,
                DATE_TRUNC('month', ct.created_at)::date AS stat_month,
                COUNT(*) FILTER (WHERE ct.task_type IN ('high_priority_recall','priority_recall')) AS recall_created,
                COUNT(*) FILTER (WHERE ct.task_type = 'follow_up_visit') AS follow_up_created,
                COUNT(*) FILTER (WHERE ct.task_type = 'relationship_building') AS relationship_created,
                COUNT(*) AS total_created,
                COUNT(*) FILTER (WHERE ct.status = 'completed' AND ct.task_type IN ('high_priority_recall','priority_recall')) AS recall_completed,
                COUNT(*) FILTER (WHERE ct.status = 'completed' AND ct.task_type = 'follow_up_visit') AS follow_up_completed,
                COUNT(*) FILTER (WHERE ct.status = 'completed') AS total_completed,
                COUNT(*) FILTER (WHERE ct.status = 'abandoned') AS abandoned_count,
                COUNT(*) FILTER (WHERE ct.status = 'transferred') AS transferred_count,
                NOW()
            FROM biz.coach_tasks ct
            WHERE ct.site_id = %s
            GROUP BY ct.site_id, ct.assistant_id, DATE_TRUNC('month', ct.created_at)::date
            ON CONFLICT (site_id, assistant_id, stat_month)
            DO UPDATE SET
                recall_created = EXCLUDED.recall_created,
                follow_up_created = EXCLUDED.follow_up_created,
                relationship_created = EXCLUDED.relationship_created,
                total_created = EXCLUDED.total_created,
                recall_completed = EXCLUDED.recall_completed,
                follow_up_completed = EXCLUDED.follow_up_completed,
                total_completed = EXCLUDED.total_completed,
                abandoned_count = EXCLUDED.abandoned_count,
                transferred_count = EXCLUDED.transferred_count,
                updated_at = NOW()
            """,
            (site_id,),
        )
        conn.commit()
    # ── C: lifetime totals (written to the ETL relation-index table) ──
    try:
        from app.database import get_etl_write_connection
        etl_conn = get_etl_write_connection()
        try:
            # First aggregate the lifetime totals from the business DB.
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT assistant_id, member_id,
                           COUNT(*) FILTER (WHERE task_type IN ('high_priority_recall','priority_recall')) AS recall_created_total,
                           COUNT(*) FILTER (WHERE status = 'completed' AND task_type IN ('high_priority_recall','priority_recall')) AS recall_completed_total,
                           COUNT(*) FILTER (WHERE task_type = 'follow_up_visit') AS follow_up_created_total,
                           COUNT(*) FILTER (WHERE status = 'completed' AND task_type = 'follow_up_visit') AS follow_up_completed_total,
                           COUNT(*) AS total_created,
                           COUNT(*) FILTER (WHERE status = 'completed') AS total_completed
                    FROM biz.coach_tasks
                    WHERE site_id = %s
                    GROUP BY assistant_id, member_id
                    """,
                    (site_id,),
                )
                pair_stats = cur.fetchall()
            conn.commit()
            # Then write each pair's totals into the ETL relation-index table.
            with etl_conn.cursor() as cur:
                for row in pair_stats:
                    aid, mid, rc, rcc, fc, fcc, tc, tcc = row
                    cur.execute(
                        """
                        UPDATE dws.dws_member_assistant_relation_index
                        SET recall_created_total = %s,
                            recall_completed_total = %s,
                            follow_up_created_total = %s,
                            follow_up_completed_total = %s,
                            total_created = %s,
                            total_completed = %s
                        WHERE site_id = %s AND assistant_id = %s AND member_id = %s
                        """,
                        (rc, rcc, fc, fcc, tc, tcc, site_id, aid, mid),
                    )
            etl_conn.commit()
        finally:
            etl_conn.close()
    except Exception:
        logger.exception("历史总计写入 ETL 关系指数表失败: site_id=%s", site_id)
def _generate_baseline_relationship_tasks(
    site_id: int, stats: dict
) -> None:
    """
    Create baseline relationship_building tasks for every (assistant, member)
    pair that has actually had a service relationship.

    Uses its own business-DB connection (plus the ETL connection managed
    inside fdw_queries) so it never shares transaction state with the
    four-level funnel.

    Steps:
      1. Query all pairs with session_count > 0 from ETL.
      2. Exclude pairs that already have an active task of any type.
      3. INSERT relationship_building for the rest (the partial unique index
         idx_coach_tasks_rb_unique_active keeps this idempotent).

    Does not alter the funnel's output — it only tops up pairs the funnel
    did not cover.
    """
    # CHANGE 2026-03-25 | Prompt: baseline-task independent connection |
    # Hoisted from _run_for_site into the run_task_generation main loop,
    # using an independent connection so transaction pollution cannot
    # silently swallow failures.
    from app.services import fdw_queries
    # Step 1: full service-relation pairs (independent ETL connection,
    # managed inside _fdw_context).
    biz_conn = _get_connection()
    try:
        all_pairs = fdw_queries.get_all_service_pairs(biz_conn, site_id)
        if not all_pairs:
            return
        # Step 2: all (assistant_id, member_id) pairs that already have an
        # active task of any type at this site.
        with biz_conn.cursor() as cur:
            cur.execute(
                """
                SELECT assistant_id, member_id
                FROM biz.coach_tasks
                WHERE site_id = %s
                AND status = 'active'
                AND task_type IN (
                    'high_priority_recall', 'priority_recall',
                    'follow_up_visit', 'relationship_building'
                )
                """,
                (site_id,),
            )
            existing_active = {(row[0], row[1]) for row in cur.fetchall()}
        biz_conn.commit()
        # Step 3: keep only pairs that need a baseline task.
        pairs_to_insert = [
            p for p in all_pairs
            if (p["assistant_id"], p["member_id"]) not in existing_active
        ]
        if not pairs_to_insert:
            return
        # Step 4: insert one by one, each in its own commit; the partial
        # unique index (idx_coach_tasks_rb_unique_active) makes the
        # ON CONFLICT ... DO NOTHING idempotent.
        for pair in pairs_to_insert:
            try:
                with biz_conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO biz.coach_tasks
                        (site_id, assistant_id, member_id, task_type, status,
                         priority_score)
                        VALUES (%s, %s, %s, 'relationship_building', 'active', %s)
                        ON CONFLICT (site_id, assistant_id, member_id, task_type)
                        WHERE status = 'active'
                        DO NOTHING
                        RETURNING id
                        """,
                        (
                            site_id,
                            pair["assistant_id"],
                            pair["member_id"],
                            float(pair["rs"]),
                        ),
                    )
                    row = cur.fetchone()
                    if row:
                        _insert_history(
                            cur,
                            row[0],
                            action="created",
                            old_status=None,
                            new_status="active",
                            old_task_type=None,
                            new_task_type="relationship_building",
                            detail={"reason": "baseline_relationship_building"},
                        )
                        stats["created"] += 1
                    else:
                        stats["skipped"] += 1
                biz_conn.commit()
            except Exception:
                logger.exception(
                    "保底任务插入失败: site_id=%s, assistant_id=%s, member_id=%s",
                    site_id,
                    pair["assistant_id"],
                    pair["member_id"],
                )
                biz_conn.rollback()
    finally:
        biz_conn.close()
# ── 辅助函数(保留自旧版)──────────────────────────────────────
def _handle_no_task_condition(
    conn, site_id: int, assistant_id: int, member_id: int
) -> None:
    """
    Clean up when no task-generation condition holds for this pair:
      1. follow_up_visit -> fill expires_at = created_at + 48h
      2. high_priority_recall / priority_recall -> close (inactive)

    CHANGE 2026-03-24 | Prompt: fix recall tasks never auto-closing |
    The old implementation handled only follow_up_visit, so recall tasks
    whose index dropped to 0 were never closed automatically.
    """
    with conn.cursor() as cur:
        cur.execute("BEGIN")
        # ── 1. follow_up_visit: fill expires_at where it is still NULL ──
        cur.execute(
            """
            SELECT id, expires_at
            FROM biz.coach_tasks
            WHERE site_id = %s
            AND assistant_id = %s
            AND member_id = %s
            AND task_type = 'follow_up_visit'
            AND status = 'active'
            AND expires_at IS NULL
            """,
            (site_id, assistant_id, member_id),
        )
        for task_id, _ in cur.fetchall():
            cur.execute(
                """
                UPDATE biz.coach_tasks
                SET expires_at = created_at + INTERVAL '48 hours',
                    updated_at = NOW()
                WHERE id = %s
                """,
                (task_id,),
            )
            _insert_history(
                cur,
                task_id,
                action="expires_at_filled",
                old_status="active",
                new_status="active",
                detail={"reason": "condition_no_longer_met"},
            )
        # ── 2. Recall tasks: index no longer above threshold -> close ──
        cur.execute(
            """
            SELECT id, task_type
            FROM biz.coach_tasks
            WHERE site_id = %s
            AND assistant_id = %s
            AND member_id = %s
            AND task_type IN ('high_priority_recall', 'priority_recall')
            AND status = 'active'
            """,
            (site_id, assistant_id, member_id),
        )
        for task_id, old_type in cur.fetchall():
            cur.execute(
                """
                UPDATE biz.coach_tasks
                SET status = 'inactive', updated_at = NOW()
                WHERE id = %s
                """,
                (task_id,),
            )
            _insert_history(
                cur,
                task_id,
                action="auto_close_index_below_threshold",
                old_status="active",
                new_status="inactive",
                old_task_type=old_type,
                new_task_type=old_type,
                detail={"reason": "index_below_threshold_no_task_generated"},
            )
        conn.commit()
def _insert_history(
    cur,
    task_id: int,
    action: str,
    old_status: str | None = None,
    new_status: str | None = None,
    old_task_type: str | None = None,
    new_task_type: str | None = None,
    detail: dict | None = None,
) -> None:
    """Append one change record to biz.coach_task_history.

    ``detail`` is serialised to JSON; a falsy detail is stored as NULL.
    """
    import json
    detail_json = json.dumps(detail) if detail else None
    record = (
        task_id,
        action,
        old_status,
        new_status,
        old_task_type,
        new_task_type,
        detail_json,
    )
    cur.execute(
        """
        INSERT INTO biz.coach_task_history
        (task_id, action, old_status, new_status,
         old_task_type, new_task_type, detail)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """,
        record,
    )