# -*- coding: utf-8 -*- """ 召回完成检测器(Recall Completion Detector) ETL 数据更新后,通过 FDW 读取助教服务记录, 匹配活跃任务标记为 completed,记录 completed_at 和 completed_task_type 快照, 触发 recall_completed 事件通知备注回溯重分类器。 由 trigger_jobs 中的 recall_completion_check 配置驱动(event: etl_data_updated)。 """ import json import logging logger = logging.getLogger(__name__) def _get_connection(): """延迟导入 get_connection,避免模块级导入失败。""" from app.database import get_connection return get_connection() def _insert_history( cur, task_id: int, action: str, old_status: str | None = None, new_status: str | None = None, old_task_type: str | None = None, new_task_type: str | None = None, detail: dict | None = None, ) -> None: """在 coach_task_history 中记录变更。""" cur.execute( """ INSERT INTO biz.coach_task_history (task_id, action, old_status, new_status, old_task_type, new_task_type, detail) VALUES (%s, %s, %s, %s, %s, %s, %s) """, ( task_id, action, old_status, new_status, old_task_type, new_task_type, json.dumps(detail) if detail else None, ), ) def run(payload: dict | None = None, job_id: int | None = None) -> dict: """ 召回完成检测主流程。 1. 从 trigger_jobs 读取 last_run_at 作为增量过滤基准 2. 获取所有 distinct site_id(从 active 任务中) 3. 对每个 site_id,SET LOCAL app.current_site_id 后 通过 FDW 读取 v_dwd_assistant_service_log 中 service_time > last_run_at 的新增服务记录 4. 对每条服务记录,查找 biz.coach_tasks 中匹配的 (site_id, assistant_id, member_id) 且 status='active' 的任务 5. 将匹配任务标记为 completed: - status = 'completed' - completed_at = 服务时间 - completed_task_type = 当前 task_type(快照) 6. 记录 coach_task_history 7. 触发 fire_event('recall_completed', {site_id, assistant_id, member_id, service_time}) 参数: payload: 事件载荷(event 触发时由 trigger_scheduler 传入) job_id: 触发器 job ID(由 trigger_scheduler 传入),用于在最终事务中 更新 last_run_at,保证 handler 数据变更与 last_run_at 原子提交 返回: {"completed_count": int} """ completed_count = 0 conn = _get_connection() try: # ── 1. 读取 last_run_at ── with conn.cursor() as cur: cur.execute( """ SELECT last_run_at FROM biz.trigger_jobs WHERE job_name = 'recall_completion_check' """ ) row = cur.fetchone() last_run_at = row[0] if row else None conn.commit() # ── 2. 获取所有有 active 任务的 distinct site_id ── with conn.cursor() as cur: cur.execute( """ SELECT DISTINCT site_id FROM biz.coach_tasks WHERE status = 'active' """ ) site_ids = [r[0] for r in cur.fetchall()] conn.commit() # ── 3. 逐 site_id 读取新增服务记录 ── for site_id in site_ids: try: count = _process_site(conn, site_id, last_run_at) completed_count += count except Exception: logger.exception( "处理门店召回检测失败: site_id=%s", site_id ) conn.rollback() # ── 事务安全(T5):handler 成功后更新 last_run_at ── # job_id 由 trigger_scheduler 传入,在 handler 最终事务中更新 # handler 异常时此处不会执行(异常向上传播),last_run_at 不变 if job_id is not None: from app.services.trigger_scheduler import update_job_last_run_at with conn.cursor() as cur: cur.execute("BEGIN") update_job_last_run_at(cur, job_id) conn.commit() finally: conn.close() logger.info("召回完成检测完成: completed_count=%d", completed_count) return {"completed_count": completed_count} def _process_site(conn, site_id: int, last_run_at) -> int: """ 处理单个门店的召回完成检测。 通过 FDW 读取新增服务记录,匹配 active 任务并标记 completed。 返回本门店完成的任务数。 """ completed = 0 # 通过 FDW 读取新增服务记录(需要 SET LOCAL 启用 RLS) with conn.cursor() as cur: cur.execute("BEGIN") cur.execute( "SET LOCAL app.current_site_id = %s", (str(site_id),) ) if last_run_at is not None: cur.execute( """ SELECT DISTINCT assistant_id, member_id, service_time FROM fdw_etl.v_dwd_assistant_service_log WHERE service_time > %s ORDER BY service_time ASC """, (last_run_at,), ) else: # 首次运行,读取所有服务记录 cur.execute( """ SELECT DISTINCT assistant_id, member_id, service_time FROM fdw_etl.v_dwd_assistant_service_log ORDER BY service_time ASC """ ) service_records = cur.fetchall() conn.commit() # ── 4-7. 逐条服务记录匹配并处理 ── for assistant_id, member_id, service_time in service_records: try: count = _process_service_record( conn, site_id, assistant_id, member_id, service_time ) completed += count except Exception: logger.exception( "处理服务记录失败: site_id=%s, assistant_id=%s, member_id=%s", site_id, assistant_id, member_id, ) conn.rollback() return completed def _process_service_record( conn, site_id: int, assistant_id: int, member_id: int, service_time, ) -> int: """ 处理单条服务记录:匹配 active 任务并标记 completed。 每条服务记录独立事务,失败不影响其他。 返回本次完成的任务数。 """ completed = 0 with conn.cursor() as cur: cur.execute("BEGIN") # 查找匹配的 active 召回类任务(仅完成召回任务,回访/关系构建不在此处理) cur.execute( """ SELECT id, task_type FROM biz.coach_tasks WHERE site_id = %s AND assistant_id = %s AND member_id = %s AND status = 'active' AND task_type IN ('high_priority_recall', 'priority_recall') """, (site_id, assistant_id, member_id), ) active_tasks = cur.fetchall() if not active_tasks: conn.commit() return 0 # 将所有匹配的 active 任务标记为 completed for task_id, task_type in active_tasks: cur.execute( """ UPDATE biz.coach_tasks SET status = 'completed', completed_at = %s, completed_task_type = %s, updated_at = NOW() WHERE id = %s AND status = 'active' """, (service_time, task_type, task_id), ) _insert_history( cur, task_id, action="completed", old_status="active", new_status="completed", old_task_type=task_type, new_task_type=task_type, detail={ "service_time": str(service_time), "completed_task_type": task_type, }, ) completed += 1 conn.commit() # ── 7. 触发 recall_completed 事件 ── # 延迟导入 fire_event 避免循环依赖 try: from app.services.trigger_scheduler import fire_event fire_event( "recall_completed", { "site_id": site_id, "assistant_id": assistant_id, "member_id": member_id, "service_time": str(service_time), }, ) except Exception: logger.exception( "触发 recall_completed 事件失败: site_id=%s, assistant_id=%s, member_id=%s", site_id, assistant_id, member_id, ) return completed