Files
Neo-ZQYY/apps/backend/app/services/recall_detector.py
Neo caf179a5da feat: 2026-04-15~05-02 累积变更基线 — AI 重构 + Runtime Context + DWS 修复
涵盖(每条对应已存的审计记录):
- AI 模块拆分:apps/backend/app/ai/apps -> prompts/(8 个 APP + app2a 派生)
  audit: 2026-04-20__ai-module-complete.md
- admin-web AI 管理套件:AIDashboard / AIOperations / AIRunLogs / AITriggers / TriggerManager
  audit: 2026-04-21__admin-web-ai-management-suite.md
- App2 财务洞察 prompt v3 -> v5.1 + 小程序 AI 接入(chat / board-finance)
  audit: 2026-04-22__app2_prompt_v5_1_and_miniprogram_ai_insight.md
- App2 prewarm 全过滤器 + AI 触发器 cron reschedule
  audit: 2026-04-21__app2-finance-prewarm-all-filters.md
  migration: 20260420_ai_trigger_jobs_and_app2_prewarm.sql / 20260421_app2_prewarm_cron_reschedule.sql
- AppType 联合类型对齐 + adminAiAppTypes.test.ts
  audit: 2026-04-30__admin_web_ai_app_type_alignment.md
- DashScope tokens_used 提取修复
  audit: 2026-04-30__backend_dashscope_tokens_used_extraction.md
- App3 线索完整详情 prompt
  audit: 2026-05-01__backend_app3_full_detail_prompt.md
- Runtime Context 沙箱(5-1~5-2 主线):
  - 后端 schema/service + admin_runtime_context / xcx_runtime_clock 两个 router
  - admin-web RuntimeContext.tsx + miniprogram runtime-clock.ts
  - migration: 20260501__runtime_context_sandbox.sql
  - tools/db/verify_admin_web_sandbox.py + verify_sandbox_end_to_end.py
  - database/changes: 7 份 sandbox_* 验证报告
- 飞球 DWS 修复:finance_area_daily 区域汇总 + task_engine 调整
  + RLS 视图业务日上界(migration 20260502 + scripts/ops/gen_rls_business_date_migration.py)

合规:
- .gitignore 启用 tmp/ 排除
- 不入仓:apps/etl/connectors/feiqiu/.env(API_TOKEN secret,本地修改保留)

待验证清单:
- docs/audit/changes/2026-05-04__cumulative_baseline_pending_verification.md
  每个主题的功能完整性 / 上线验证几乎都未收口,按优先级 P0~P3 逐一处理
2026-05-04 02:30:19 +08:00

492 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
召回完成检测器(Recall Completion Detector)。
ETL 数据更新后,扫描所有 MAIN 关系对的结算记录,
记录广义召回事件(recall_events),匹配活跃任务并标记 completed,
对所有到店的 MAIN 关联客户生成回访任务(follow_up_visit)。
由 trigger_jobs 中的 recall_completion_check 配置驱动(event: etl_data_updated)。
CHANGE 2026-04-08 | Fix-13 改造:
- 扫描范围从"有 active 任务的客户"扩大为"所有 os_label='MAIN' 的关联客户"
- 新增 recall_events 事件表记录广义召回(按天去重)
- 无 active 任务的客户到店也生成 follow_up_visit
CHANGE 2026-04-12 | 召回完成逻辑调整:
- settle_type=3 仅计入有 BONUS 服务的订单(纯商品不算到店)
- 门店级召回解除:客户到店后,未服务助教的召回任务标记 resolved
"""
import json
import logging
from datetime import timedelta
from app.services.runtime_context import (
LIVE_INSTANCE_ID,
MODE_LIVE,
MODE_SANDBOX,
get_runtime_context,
task_runtime_filter,
)
from app.trace.decorators import trace_service
logger = logging.getLogger(__name__)
def _get_connection():
    """Open and return a database connection.

    ``app.database`` is imported at call time rather than at module import
    time, so importing this module does not fail when the database layer
    cannot be imported.
    """
    from app.database import get_connection as open_connection

    return open_connection()
def _insert_history(
cur,
task_id: int,
action: str,
old_status: str | None = None,
new_status: str | None = None,
old_task_type: str | None = None,
new_task_type: str | None = None,
detail: dict | None = None,
) -> None:
"""在 coach_task_history 中记录变更。"""
cur.execute(
"""
INSERT INTO biz.coach_task_history
(task_id, action, old_status, new_status,
old_task_type, new_task_type, detail)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(
task_id,
action,
old_status,
new_status,
old_task_type,
new_task_type,
json.dumps(detail) if detail else None,
),
)
@trace_service(description_zh="执行维客检测", description_en="Run recall detection")
def run(payload: dict | None = None, job_id: int | None = None) -> dict:
    """Entry point of the recall-completion check.

    Flow:
      1. Read the ids of all active sites from ``biz.sites``.
      2. Run ``_process_site`` for each site. A failure in one site is
         logged, the connection is rolled back, and the loop continues
         with the next site.
      3. When ``job_id`` is given, record the run through
         ``update_job_last_run_at``.

    Args:
        payload: Trigger payload; not read by this function.
        job_id: Id of the scheduler job to stamp after the run, or ``None``
            to skip the stamp.

    Returns:
        A dict with ``completed_count``, ``event_count`` and
        ``resolved_count`` summed over all successfully processed sites.
    """
    totals = {"completed": 0, "events": 0, "resolved": 0}

    conn = _get_connection()
    try:
        # -- 1. Active sites from the business database --
        with conn.cursor() as cur:
            cur.execute(
                "SELECT site_id FROM biz.sites WHERE is_active = true"
            )
            rows = cur.fetchall()
        site_ids = [row[0] for row in rows]
        conn.commit()

        # -- 2. One site at a time; errors are isolated per site --
        for site_id in site_ids:
            try:
                site_result = _process_site(conn, site_id)
                for key in ("completed", "events", "resolved"):
                    totals[key] += site_result[key]
            except Exception:
                logger.exception(
                    "处理门店召回检测失败: site_id=%s", site_id
                )
                conn.rollback()

        # -- 3. Stamp last_run_at for the scheduler's bookkeeping --
        if job_id is not None:
            from app.services.trigger_scheduler import update_job_last_run_at

            with conn.cursor() as cur:
                cur.execute("BEGIN")
                update_job_last_run_at(cur, job_id)
            conn.commit()
    finally:
        conn.close()

    logger.info(
        "召回完成检测完成: completed=%d, events=%d, resolved=%d",
        totals["completed"], totals["events"], totals["resolved"],
    )
    return {
        "completed_count": totals["completed"],
        "event_count": totals["events"],
        "resolved_count": totals["resolved"],
    }
def _process_site(conn, site_id: int) -> dict:
    """Run generalized recall detection for a single site.

    Steps:
      1. Load every ``(assistant_id, member_id)`` pair whose
         ``os_label`` is ``'MAIN'`` through the site's FDW context.
      2. For the members of those pairs, query the latest qualifying
         settlement time per (assistant, member) and per member. Both
         queries are bounded by the runtime context's ``business_now``.
         ``settle_type = 1`` always qualifies; ``settle_type = 3`` only
         qualifies when a non-deleted service-log row with
         ``order_assistant_type = 2`` exists for the order.
      3. Load the site's active recall / follow-up tasks in the current
         runtime scope.
      4. For each MAIN pair that has a settlement, delegate to
         ``_process_pair``. A failure in one pair is logged and rolled
         back without stopping the loop.
      5. Store-level release: for each member that visited, mark recall
         tasks that are still active and were created before the visit as
         ``resolved``. Each member is handled in its own transaction.

    Args:
        conn: Open database connection. This function commits and rolls
            back on it.
        site_id: Id of the site to process.

    Returns:
        ``{"completed": int, "events": int, "resolved": int}``. ``resolved``
        only counts tasks whose status change was actually applied and
        committed.
    """
    completed = 0
    events = 0
    resolved = 0

    from app.services.fdw_queries import _fdw_context

    runtime_ctx = get_runtime_context(site_id, conn=conn)
    runtime_now = runtime_ctx.business_now
    runtime_mode = MODE_SANDBOX if runtime_ctx.is_sandbox else MODE_LIVE
    sandbox_instance_id = (
        runtime_ctx.sandbox_instance_id if runtime_ctx.is_sandbox else LIVE_INSTANCE_ID
    )

    # -- 1. All MAIN relation pairs of this site --
    with _fdw_context(conn, site_id) as cur:
        cur.execute(
            """
            SELECT assistant_id, member_id
            FROM app.v_dws_member_assistant_relation_index
            WHERE os_label = 'MAIN'
            """
        )
        main_pairs = [(r[0], r[1]) for r in cur.fetchall()]

    if not main_pairs:
        return {"completed": 0, "events": 0, "resolved": 0}

    # -- 2. Latest settlement times for the members of those pairs --
    member_ids = list({mid for _, mid in main_pairs})

    with _fdw_context(conn, site_id) as cur:
        # Assistant-level settlements (used for the narrow "completed" decision).
        cur.execute(
            """
            SELECT sl.site_assistant_id AS assistant_id,
                   sh.member_id,
                   MAX(sh.pay_time) AS latest_pay_time
            FROM app.v_dwd_settlement_head sh
            JOIN app.v_dwd_assistant_service_log sl
              ON sl.order_settle_id = sh.order_settle_id
             AND sl.is_delete = 0
            WHERE sh.member_id = ANY(%s)
              AND sh.pay_time <= %s
              AND (
                    sh.settle_type = 1
                    OR (sh.settle_type = 3 AND sl.order_assistant_type = 2)
              )
            GROUP BY sl.site_assistant_id, sh.member_id
            """,
            (member_ids, runtime_now),
        )
        # (assistant_id, member_id) -> latest_pay_time
        settlement_map: dict[tuple[int, int], object] = {
            (row[0], row[1]): row[2] for row in cur.fetchall()
        }

        # Store-level visits (includes settle_type = 1 orders without any
        # assistant service); used for the "resolved" decision in step 5.
        cur.execute(
            """
            SELECT sh.member_id, MAX(sh.pay_time) AS latest_pay_time
            FROM app.v_dwd_settlement_head sh
            WHERE sh.member_id = ANY(%s)
              AND sh.pay_time <= %s
              AND (
                    sh.settle_type = 1
                    OR (sh.settle_type = 3 AND EXISTS (
                        SELECT 1 FROM app.v_dwd_assistant_service_log sl
                        WHERE sl.order_settle_id = sh.order_settle_id
                          AND sl.is_delete = 0
                          AND sl.order_assistant_type = 2
                    ))
              )
            GROUP BY sh.member_id
            """,
            (member_ids, runtime_now),
        )
        # member_id -> latest_pay_time
        member_visited_map = {row[0]: row[1] for row in cur.fetchall()}

    # -- 3. Active recall / follow-up tasks of this site (for matching) --
    # (assistant_id, member_id) -> [{"id", "task_type", "created_at"}, ...]
    active_tasks_map: dict[tuple[int, int], list] = {}
    # runtime_clause is a SQL fragment produced by the project's own helper
    # (not user input); its placeholders are filled from runtime_params.
    runtime_clause, runtime_params = task_runtime_filter(site_id, conn=conn)
    with conn.cursor() as cur:
        cur.execute(
            f"""
            SELECT id, assistant_id, member_id, task_type, created_at
            FROM biz.coach_tasks
            WHERE site_id = %s
              {runtime_clause}
              AND status = 'active'
              AND task_type IN ('high_priority_recall', 'priority_recall', 'follow_up_visit')
            """,
            [site_id, *runtime_params],
        )
        for row in cur.fetchall():
            key = (row[1], row[2])
            active_tasks_map.setdefault(key, []).append(
                {"id": row[0], "task_type": row[3], "created_at": row[4]}
            )
    conn.commit()

    # -- 4. Process each MAIN pair that has a settlement --
    for assistant_id, member_id in main_pairs:
        latest_pay = settlement_map.get((assistant_id, member_id))
        if latest_pay is None:
            continue

        active_tasks = active_tasks_map.get((assistant_id, member_id), [])
        try:
            result = _process_pair(
                conn, site_id, assistant_id, member_id,
                latest_pay, active_tasks, runtime_now, runtime_mode, sandbox_instance_id,
            )
            completed += result["completed"]
            events += result["events"]
        except Exception:
            logger.exception(
                "处理关系对失败: site_id=%s, assistant_id=%s, member_id=%s",
                site_id, assistant_id, member_id,
            )
            conn.rollback()

    # -- 5. Store-level release --
    # Recall tasks completed in step 4 are already committed and no longer
    # 'active', so the tasks selected here are the recall tasks still open
    # for this member.
    for member_id, pay_time in member_visited_map.items():
        # Counted locally and only added to the total after a successful
        # commit, so a rolled-back member does not inflate the result.
        newly_resolved = 0
        try:
            with conn.cursor() as cur:
                cur.execute("BEGIN")
                cur.execute(
                    f"""
                    SELECT id, assistant_id, task_type, created_at
                    FROM biz.coach_tasks
                    WHERE site_id = %s AND member_id = %s
                      {runtime_clause}
                      AND status = 'active'
                      AND task_type IN ('high_priority_recall', 'priority_recall')
                      AND created_at < %s
                    """,
                    [site_id, member_id, *runtime_params, pay_time],
                )
                remaining = cur.fetchall()

                for task_id, _assistant_id, task_type, _created_at in remaining:
                    cur.execute(
                        """
                        UPDATE biz.coach_tasks
                        SET status = 'resolved', updated_at = %s
                        WHERE id = %s AND status = 'active'
                        """,
                        (runtime_now, task_id),
                    )
                    if cur.rowcount == 0:
                        # The task left 'active' between the SELECT and the
                        # UPDATE; nothing changed, so record nothing.
                        continue
                    _insert_history(
                        cur, task_id,
                        action="customer_returned",
                        old_status="active",
                        new_status="resolved",
                        old_task_type=task_type,
                        new_task_type=task_type,
                        detail={
                            "reason": "customer_visited_store",
                            "service_time": str(pay_time),
                        },
                    )
                    newly_resolved += 1
            conn.commit()
        except Exception:
            logger.exception(
                "门店级召回解除失败: site_id=%s, member_id=%s",
                site_id, member_id,
            )
            conn.rollback()
        else:
            resolved += newly_resolved

    return {"completed": completed, "events": events, "resolved": resolved}
def _process_pair(
    conn,
    site_id: int,
    assistant_id: int,
    member_id: int,
    latest_pay_time,
    active_tasks: list[dict],
    runtime_now,
    runtime_mode: str,
    sandbox_instance_id: str,
) -> dict:
    """Handle recall detection for one MAIN (assistant, member) pair.

    Within a single transaction:
      1. Insert a ``biz.recall_events`` row. The insert is de-duplicated by
         ``ON CONFLICT ... DO NOTHING`` on the pair, the runtime scope and
         the Asia/Shanghai calendar day of ``pay_time``. If the row already
         exists, the transaction is committed and nothing else is done.
      2. Mark matching active recall tasks (created before the settlement)
         as ``completed``.
      3. Set the pair's existing active follow-up tasks to ``inactive``.
      4. Create a new ``follow_up_visit`` task expiring 72 hours after the
         settlement time.
    After the commit, a ``recall_completed`` event is fired on a
    best-effort basis if at least one recall task was actually completed.

    Args:
        conn: Open database connection. This function commits on success;
            on error the exception propagates and the caller rolls back.
        site_id: Site the pair belongs to.
        assistant_id: Assistant of the pair.
        member_id: Member of the pair.
        latest_pay_time: Latest qualifying settlement time for the pair.
        active_tasks: Snapshot of the pair's active tasks, each a dict with
            ``id``, ``task_type`` and ``created_at``.
        runtime_now: Runtime "business now" used for timestamps written.
        runtime_mode: Runtime mode value stored on created rows.
        sandbox_instance_id: Sandbox instance id stored on created rows.

    Returns:
        ``{"completed": int, "events": int}``. ``completed`` counts only
        tasks whose status update was actually applied.
    """
    completed = 0
    events = 0

    with conn.cursor() as cur:
        cur.execute("BEGIN")

        # -- 1. Write recall_events (de-duplicated per pay_time day) --
        # Matching recall tasks are looked up first so the event row can
        # carry task_id / task_type.
        recall_tasks = [
            t for t in active_tasks
            if t["task_type"] in ("high_priority_recall", "priority_recall")
            and latest_pay_time > t["created_at"]
        ]
        event_task_id = recall_tasks[0]["id"] if recall_tasks else None
        event_task_type = recall_tasks[0]["task_type"] if recall_tasks else None

        cur.execute(
            """
            INSERT INTO biz.recall_events
            (site_id, assistant_id, member_id, pay_time, task_id, task_type,
             created_at, runtime_mode, sandbox_instance_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (site_id, assistant_id, member_id, runtime_mode, sandbox_instance_id,
                         (date_trunc('day', pay_time AT TIME ZONE 'Asia/Shanghai')))
            DO NOTHING
            RETURNING id
            """,
            (site_id, assistant_id, member_id, latest_pay_time,
             event_task_id, event_task_type, runtime_now, runtime_mode, sandbox_instance_id),
        )
        inserted = cur.fetchone()
        if inserted is None:
            # An event for this pair and pay_time day already exists; skip
            # the rest so no duplicate follow-up task is generated.
            conn.commit()
            return {"completed": 0, "events": 0}
        events = 1

        # -- 2. Complete the matching recall tasks --
        has_active_recall = len(recall_tasks) > 0
        for task in recall_tasks:
            cur.execute(
                """
                UPDATE biz.coach_tasks
                SET status = 'completed',
                    completed_at = %s,
                    completed_task_type = %s,
                    completion_type = 'auto',
                    updated_at = %s
                WHERE id = %s AND status = 'active'
                """,
                (latest_pay_time, task["task_type"], runtime_now, task["id"]),
            )
            if cur.rowcount == 0:
                # active_tasks is a snapshot taken earlier by the caller;
                # the task is no longer 'active', so nothing was changed
                # and no history / count must be recorded for it.
                continue
            _insert_history(
                cur,
                task["id"],
                action="completed",
                old_status="active",
                new_status="completed",
                old_task_type=task["task_type"],
                new_task_type=task["task_type"],
                detail={
                    "service_time": str(latest_pay_time),
                    "completed_task_type": task["task_type"],
                },
            )
            completed += 1

        # -- 3. Deactivate existing active follow-up tasks of this pair --
        cur.execute(
            """
            SELECT id FROM biz.coach_tasks
            WHERE site_id = %s AND assistant_id = %s AND member_id = %s
              AND task_type = 'follow_up_visit' AND status = 'active'
              AND runtime_mode = %s AND sandbox_instance_id = %s
            """,
            (site_id, assistant_id, member_id, runtime_mode, sandbox_instance_id),
        )
        old_follow_ups = cur.fetchall()
        for (old_id,) in old_follow_ups:
            cur.execute(
                """
                UPDATE biz.coach_tasks
                SET status = 'inactive', updated_at = %s
                WHERE id = %s
                """,
                (runtime_now, old_id),
            )
            _insert_history(
                cur, old_id,
                action="superseded_by_new_visit",
                old_status="active", new_status="inactive",
                old_task_type="follow_up_visit", new_task_type="follow_up_visit",
                detail={"reason": "new_service_record", "service_time": str(latest_pay_time)},
            )

        # -- 4. Create the new follow-up task (expires after 72h) --
        # NOTE(review): the hasattr guard is kept as-is; it only falls back
        # to None for values without __add__ - confirm whether a stricter
        # datetime check is wanted.
        expires_at = (
            latest_pay_time + timedelta(hours=72)
            if hasattr(latest_pay_time, '__add__') else None
        )
        cur.execute(
            """
            INSERT INTO biz.coach_tasks
            (site_id, assistant_id, member_id, task_type, status,
             expires_at, created_at, updated_at, runtime_mode, sandbox_instance_id)
            VALUES (%s, %s, %s, 'follow_up_visit', 'active', %s, %s, %s, %s, %s)
            RETURNING id
            """,
            (
                site_id, assistant_id, member_id, expires_at, runtime_now,
                runtime_now, runtime_mode, sandbox_instance_id,
            ),
        )
        new_follow_up_id = cur.fetchone()[0]
        _insert_history(
            cur, new_follow_up_id,
            action="created",
            old_status=None, new_status="active",
            new_task_type="follow_up_visit",
            detail={
                "reason": "service_record_detected",
                "service_time": str(latest_pay_time),
                "had_recall": has_active_recall,
            },
        )

        conn.commit()

    # -- 5. Fire recall_completed only when a recall task was really completed --
    # Runs after the commit; a failure here is logged and does not undo the
    # committed changes.
    if completed > 0:
        try:
            from app.services.trigger_scheduler import fire_event

            fire_event(
                "recall_completed",
                {
                    "site_id": site_id,
                    "assistant_id": assistant_id,
                    "member_id": member_id,
                    "service_time": str(latest_pay_time),
                },
            )
        except Exception:
            logger.exception(
                "触发 recall_completed 事件失败: site_id=%s, assistant_id=%s, member_id=%s",
                site_id, assistant_id, member_id,
            )

    return {"completed": completed, "events": events}