# -*- coding: utf-8 -*- """ 任务生成器(Task Generator) 每日 4:00 运行,基于 WBI/NCI/RS 指数为每个助教生成/更新任务。 本模块包含: - TaskPriority 枚举:任务类型优先级定义 - TASK_TYPE_PRIORITY 映射:task_type 字符串 → 优先级 - IndexData 数据类:客户-助教对的指数数据 - determine_task_type():根据指数确定任务类型(纯函数) - should_replace_task():判断是否应替换现有任务(纯函数) - compute_heart_icon():根据 RS 指数计算爱心 icon 档位(纯函数) """ from decimal import Decimal from dataclasses import dataclass from enum import IntEnum class TaskPriority(IntEnum): """任务类型优先级,数值越小优先级越高。""" HIGH_PRIORITY_RECALL = 0 PRIORITY_RECALL = 0 FOLLOW_UP_VISIT = 1 RELATIONSHIP_BUILDING = 2 TASK_TYPE_PRIORITY: dict[str, TaskPriority] = { "high_priority_recall": TaskPriority.HIGH_PRIORITY_RECALL, "priority_recall": TaskPriority.PRIORITY_RECALL, "follow_up_visit": TaskPriority.FOLLOW_UP_VISIT, "relationship_building": TaskPriority.RELATIONSHIP_BUILDING, } @dataclass class IndexData: """某客户-助教对的指数数据。""" site_id: int assistant_id: int member_id: int wbi: Decimal # 流失回赢指数 nci: Decimal # 新客转化指数 rs: Decimal # 关系强度指数 has_active_recall: bool # 是否有活跃召回任务 has_follow_up_note: bool # 召回完成后是否有回访备注 def determine_task_type(index_data: IndexData) -> str | None: """ 根据指数数据确定应生成的任务类型。 优先级规则(高 → 低): 1. max(WBI, NCI) > 7 → high_priority_recall 2. max(WBI, NCI) > 5 → priority_recall 3. RS < 6 → relationship_building 4. 不满足任何条件 → None(不生成任务) 返回: task_type 字符串或 None """ priority_score = max(index_data.wbi, index_data.nci) if priority_score > 7: return "high_priority_recall" if priority_score > 5: return "priority_recall" if index_data.rs < 6: return "relationship_building" return None def should_replace_task(existing_type: str, new_type: str) -> bool: """ 判断新任务类型是否应替换现有任务类型。 规则:类型不同即替换,相同类型不替换。 """ if existing_type == new_type: return False return True def compute_heart_icon(rs_score: Decimal) -> str: """ 根据 RS 指数计算爱心 icon 档位。 档位规则: - RS > 8.5 → 💖 - 7 < RS ≤ 8.5 → 🧡 - 5 < RS ≤ 7 → 💛 - RS ≤ 5 → 💙 """ if rs_score > Decimal("8.5"): return "💖" if rs_score > Decimal("7"): return "🧡" if rs_score > Decimal("5"): return "💛" return "💙" # ── run() 主流程 ────────────────────────────────────────────── import logging logger = logging.getLogger(__name__) def _get_connection(): """延迟导入 get_connection,避免纯函数测试时触发模块级导入失败。""" from app.database import get_connection return get_connection() def run() -> dict: """ 任务生成器主流程。 1. 通过 auth.user_assistant_binding 获取所有已绑定助教 2. 对每个助教-客户对,通过 FDW 读取 WBI/NCI/RS 指数 3. 调用 determine_task_type() 确定任务类型 4. 检查已存在的 active 任务:相同 task_type → 跳过; 不同 task_type → 关闭旧任务 + 创建新任务 + 记录 history 5. 处理 follow_up_visit 的 48 小时滞留机制(expires_at 填充) 6. 更新 trigger_jobs 时间戳 返回: {"created": int, "replaced": int, "skipped": int} """ stats = {"created": 0, "replaced": 0, "skipped": 0} conn = _get_connection() try: # ── 1. 获取所有已绑定助教 ── with conn.cursor() as cur: cur.execute( """ SELECT DISTINCT site_id, assistant_id FROM auth.user_assistant_binding WHERE assistant_id IS NOT NULL """ ) bindings = cur.fetchall() conn.commit() # ── 2. 逐助教处理 ── for site_id, assistant_id in bindings: try: _process_assistant(conn, site_id, assistant_id, stats) except Exception: logger.exception( "处理助教失败: site_id=%s, assistant_id=%s", site_id, assistant_id, ) conn.rollback() # ── 6. 更新 trigger_jobs 时间戳 ── with conn.cursor() as cur: cur.execute( """ UPDATE biz.trigger_jobs SET last_run_at = NOW() WHERE job_name = 'task_generator' """ ) conn.commit() finally: conn.close() logger.info( "任务生成器完成: created=%d, replaced=%d, skipped=%d", stats["created"], stats["replaced"], stats["skipped"], ) return stats def _process_assistant( conn, site_id: int, assistant_id: int, stats: dict ) -> None: """处理单个助教下所有客户-助教对的任务生成。""" # 通过 FDW 读取该助教关联的客户指数数据 # 需要 SET LOCAL app.current_site_id 以启用 RLS with conn.cursor() as cur: cur.execute("BEGIN") cur.execute( "SET LOCAL app.current_site_id = %s", (str(site_id),) ) # 读取 WBI(流失回赢指数) cur.execute( """ SELECT member_id, COALESCE(display_score, 0) FROM fdw_etl.v_dws_member_winback_index """ ) wbi_map = {row[0]: Decimal(str(row[1])) for row in cur.fetchall()} # 读取 NCI(新客转化指数) cur.execute( """ SELECT member_id, COALESCE(display_score, 0) FROM fdw_etl.v_dws_member_newconv_index """ ) nci_map = {row[0]: Decimal(str(row[1])) for row in cur.fetchall()} # 读取 RS(关系强度指数)— 按 assistant_id 过滤 cur.execute( """ SELECT member_id, COALESCE(rs_display, 0) FROM fdw_etl.v_dws_member_assistant_relation_index WHERE assistant_id = %s """, (assistant_id,), ) rs_map = {row[0]: Decimal(str(row[1])) for row in cur.fetchall()} conn.commit() # 合并所有涉及的 member_id all_member_ids = set(wbi_map.keys()) | set(nci_map.keys()) | set(rs_map.keys()) # 逐客户处理,每对独立事务 for member_id in all_member_ids: try: _process_member_task( conn, site_id, assistant_id, member_id, wbi_map.get(member_id, Decimal("0")), nci_map.get(member_id, Decimal("0")), rs_map.get(member_id, Decimal("0")), stats, ) except Exception: logger.exception( "处理客户任务失败: site_id=%s, assistant_id=%s, member_id=%s", site_id, assistant_id, member_id, ) conn.rollback() def _process_member_task( conn, site_id: int, assistant_id: int, member_id: int, wbi: Decimal, nci: Decimal, rs: Decimal, stats: dict, ) -> None: """ 处理单个客户-助教对的任务生成/更新。 每对独立事务,失败不影响其他。 """ index_data = IndexData( site_id=site_id, assistant_id=assistant_id, member_id=member_id, wbi=wbi, nci=nci, rs=rs, # follow_up_visit 条件由外部传入;当前简化:不自动生成 follow_up_visit # (follow_up_visit 由召回完成检测器触发,不在 task_generator 主动生成) has_active_recall=True, has_follow_up_note=True, ) new_task_type = determine_task_type(index_data) if new_task_type is None: # 不满足任何条件 → 检查是否有需要填充 expires_at 的 follow_up_visit _handle_no_task_condition(conn, site_id, assistant_id, member_id) stats["skipped"] += 1 return priority_score = max(wbi, nci) with conn.cursor() as cur: cur.execute("BEGIN") # ── 4. 检查已存在的 active 任务 ── cur.execute( """ SELECT id, task_type, expires_at, created_at FROM biz.coach_tasks WHERE site_id = %s AND assistant_id = %s AND member_id = %s AND status = 'active' ORDER BY created_at DESC """, (site_id, assistant_id, member_id), ) existing_tasks = cur.fetchall() # 检查是否已有相同 task_type 的 active 任务 same_type_exists = any(row[1] == new_task_type for row in existing_tasks) if same_type_exists: # 相同 task_type → 跳过 conn.commit() stats["skipped"] += 1 return # 不同 task_type 的 active 任务 → 关闭旧任务 + 创建新任务 for task_id, old_type, old_expires_at, old_created_at in existing_tasks: if should_replace_task(old_type, new_task_type): # 特殊处理:旧任务是 follow_up_visit → 填充 expires_at 而非直接 inactive if old_type == "follow_up_visit" and old_expires_at is None: # 需求 5.2: follow_up_visit 被高优先级任务顶替时, # 填充 expires_at = created_at + 48h,保持 active cur.execute( """ UPDATE biz.coach_tasks SET expires_at = created_at + INTERVAL '48 hours', updated_at = NOW() WHERE id = %s """, (task_id,), ) _insert_history( cur, task_id, action="expires_at_filled", old_status="active", new_status="active", old_task_type=old_type, new_task_type=old_type, detail={"reason": "higher_priority_task_created"}, ) else: # 关闭旧任务 cur.execute( """ UPDATE biz.coach_tasks SET status = 'inactive', updated_at = NOW() WHERE id = %s """, (task_id,), ) _insert_history( cur, task_id, action="type_change_close", old_status="active", new_status="inactive", old_task_type=old_type, new_task_type=new_task_type, ) stats["replaced"] += 1 # ── 创建新任务 ── # follow_up_visit 生成时 expires_at = NULL(需求 4.1) expires_at_val = None # 需求 4.4: 若新任务是 follow_up_visit 且已存在有 expires_at 的旧 follow_up_visit # → 旧任务已在上面被标记为 inactive,新任务 expires_at = NULL cur.execute( """ INSERT INTO biz.coach_tasks (site_id, assistant_id, member_id, task_type, status, priority_score, expires_at, parent_task_id) VALUES (%s, %s, %s, %s, 'active', %s, %s, %s) RETURNING id """, ( site_id, assistant_id, member_id, new_task_type, float(priority_score), expires_at_val, # parent_task_id: 关联最近被关闭的旧任务(如有) existing_tasks[0][0] if existing_tasks else None, ), ) new_task_id = cur.fetchone()[0] _insert_history( cur, new_task_id, action="created", old_status=None, new_status="active", old_task_type=existing_tasks[0][1] if existing_tasks else None, new_task_type=new_task_type, ) stats["created"] += 1 conn.commit() def _handle_no_task_condition( conn, site_id: int, assistant_id: int, member_id: int ) -> None: """ 当不满足任何任务生成条件时,检查是否有 follow_up_visit 需要填充 expires_at。 需求 4.2: 当 follow_up_visit 的触发条件不再满足时, 填充 expires_at = created_at + 48h。 """ with conn.cursor() as cur: cur.execute("BEGIN") cur.execute( """ SELECT id, expires_at FROM biz.coach_tasks WHERE site_id = %s AND assistant_id = %s AND member_id = %s AND task_type = 'follow_up_visit' AND status = 'active' AND expires_at IS NULL """, (site_id, assistant_id, member_id), ) rows = cur.fetchall() for task_id, _ in rows: cur.execute( """ UPDATE biz.coach_tasks SET expires_at = created_at + INTERVAL '48 hours', updated_at = NOW() WHERE id = %s """, (task_id,), ) _insert_history( cur, task_id, action="expires_at_filled", old_status="active", new_status="active", detail={"reason": "condition_no_longer_met"}, ) conn.commit() def _insert_history( cur, task_id: int, action: str, old_status: str | None = None, new_status: str | None = None, old_task_type: str | None = None, new_task_type: str | None = None, detail: dict | None = None, ) -> None: """在 coach_task_history 中记录变更。""" import json cur.execute( """ INSERT INTO biz.coach_task_history (task_id, action, old_status, new_status, old_task_type, new_task_type, detail) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( task_id, action, old_status, new_status, old_task_type, new_task_type, json.dumps(detail) if detail else None, ), )