# -*- coding: utf-8 -*- """ 助教业务任务明细报告生成工具 分组结构:指标说明 → 总览 → 按助教分组(内按任务类型分组)→ 趋势分析 用法: python tools/task-analysis/gen_task_report.py python tools/task-analysis/gen_task_report.py --output docs/custom-report.md python tools/task-analysis/gen_task_report.py --status active,completed """ from __future__ import annotations import argparse import os import sys from datetime import date from collections import defaultdict, OrderedDict from pathlib import Path import psycopg2 _TOOL_DIR = Path(__file__).resolve().parent _ROOT = _TOOL_DIR.parents[1] from dotenv import load_dotenv load_dotenv(_ROOT / ".env", override=False) load_dotenv(_ROOT / "apps" / "etl" / "connectors" / "feiqiu" / ".env", override=False) # ── 常量 ── TYPE_ORDER = ["high_priority_recall", "priority_recall", "follow_up_visit", "relationship_building"] TYPE_LABELS = { "high_priority_recall": "高优先召回", "priority_recall": "优先召回", "follow_up_visit": "客户回访", "relationship_building": "关系构建", } STATUS_LABELS = { "active": "进行中", "completed": "已完成", "resolved": "已解除", "expired": "已过期", "inactive": "已关闭", } # 排序权重:已完成 -> 进行中 -> 已解除 -> 已过期 -> 已关闭 STATUS_SORT = {"completed": 0, "active": 1, "resolved": 2, "expired": 3, "inactive": 4} def fmt(v, digits=2): if v is None: return "-" return f"{float(v):.{digits}f}" def fmt_mobile(m): if not m: return "-" return str(m) # ── 数据加载 ── def load_etl_data(dsn: str) -> dict: conn = psycopg2.connect(dsn) conn.set_client_encoding("UTF8") cur = conn.cursor() # 助教(昵称 + 离职) cur.execute( "SELECT assistant_id, nickname, COALESCE(leave_status, 0) " "FROM dwd.dim_assistant WHERE scd2_is_current = 1" ) asst_names = {} asst_left = set() for r in cur.fetchall(): asst_names[r[0]] = r[1] if r[2] == 1: asst_left.add(r[0]) # 客户昵称 + 手机号 cur.execute("SELECT member_id, nickname, mobile FROM dwd.dim_member WHERE scd2_is_current = 1") member_names = {} member_mobiles = {} for r in cur.fetchall(): member_names[r[0]] = r[1] member_mobiles[r[0]] = r[2] # relation_index(最新快照) cur.execute(""" SELECT DISTINCT ON (assistant_id, member_id) assistant_id, member_id, rs_display, os_label, session_count, days_since_last_session, ms_display, ml_display FROM dws.dws_member_assistant_relation_index ORDER BY assistant_id, member_id, stat_date DESC """) ri_map = {} for r in cur.fetchall(): ri_map[(r[0], r[1])] = { "rs": r[2], "os_label": r[3], "sessions": r[4], "days_since": r[5], "ms": r[6], "ml": r[7], } # WBI(含 status 用于判断客户是否已转老客) cur.execute(""" SELECT DISTINCT ON (member_id) member_id, display_score, status FROM dws.dws_member_winback_index ORDER BY member_id, stat_date DESC """) wbi_map = {} wbi_status_map = {} for r in cur.fetchall(): wbi_map[r[0]] = r[1] wbi_status_map[r[0]] = r[2] # NCI cur.execute(""" SELECT DISTINCT ON (member_id) member_id, display_score FROM dws.dws_member_newconv_index ORDER BY member_id, stat_date DESC """) nci_map = {r[0]: r[1] for r in cur.fetchall()} # 客户最后到店时间(最新结算时间) cur.execute(""" SELECT member_id, MAX(pay_time)::date FROM dwd.dwd_settlement_head WHERE settle_type IN (1, 3) AND member_id > 0 GROUP BY member_id """) last_visit_map = {r[0]: r[1] for r in cur.fetchall()} conn.close() return { "asst_names": asst_names, "asst_left": asst_left, "member_names": member_names, "member_mobiles": member_mobiles, "ri": ri_map, "wbi": wbi_map, "wbi_status": wbi_status_map, "nci": nci_map, "last_visit": last_visit_map, } def load_tasks(dsn: str, status_filter: list[str] | None = None) -> list[tuple]: conn = psycopg2.connect(dsn) conn.set_client_encoding("UTF8") cur = conn.cursor() sql = """ SELECT id, assistant_id, member_id, task_type, status, priority_score, created_at::date, updated_at::date, completed_at::date, completion_type FROM biz.coach_tasks """ if status_filter: placeholders = ",".join(["%s"] * len(status_filter)) sql += f" WHERE status IN ({placeholders})" cur.execute(sql, status_filter) else: cur.execute(sql) rows = cur.fetchall() conn.close() return rows # -- 报告生成 -- def generate_report(etl: dict, tasks: list[tuple]) -> str: asst_names = etl["asst_names"] asst_left = etl["asst_left"] member_names = etl["member_names"] member_mobiles = etl["member_mobiles"] ri_map = etl["ri"] wbi_map = etl["wbi"] wbi_status = etl["wbi_status"] nci_map = etl["nci"] last_visit = etl["last_visit"] L = [] # 行缓冲 L.append("# 助教业务任务明细报告") L.append("") L.append(f"> 导出日期:{date.today()}") L.append("") # -- 指标说明 -- L.append("## 指标说明") L.append("") L.append("| 指标 | 全称 | 含义 | 范围 |") L.append("|------|------|------|------|") L.append("| **WBI** | 流失回赢指数 | 老客流失风险,越高越需要召回 | 0-10 |") L.append("| **NCI** | 新客转化指数 | 新客沉默风险,越高越需要跟进 | 0-10 |") L.append("| **RS** | 关系强度指数 | 助教与客户亲密度(频次+时长+近期性) | 0-10 |") L.append("| **OS** | 归属份额标签 | 客户主要由谁服务 | MAIN / COMANAGE / POOL |") L.append("| **MS** | 升温动量 | 近期频率 vs 长期频率 | 0-10 |") L.append("| **ML** | 付费关联 | 人工台账充值关联强度 | 0-10 |") L.append("") L.append("**OS**:MAIN=主服务(>=70%) | COMANAGE=共管(>=30%) | POOL=池内") L.append("") L.append("**任务判定**:`max(WBI,NCI)>7` -> 高优先召回 | `>5` -> 优先召回 | `1 关系构建 | 客户到店 -> 完成召回 -> 生成回访(48h)") L.append("") # -- 一、总览 -- status_count = defaultdict(int) type_status = defaultdict(lambda: defaultdict(int)) for t in tasks: status_count[t[4]] += 1 type_status[t[3]][t[4]] += 1 L.append("## 一、总览") L.append("") L.append("| 状态 | 数量 |") L.append("|------|------|") for s in ["active", "completed", "resolved", "expired", "inactive"]: if status_count[s]: L.append(f"| {STATUS_LABELS[s]} | {status_count[s]} |") L.append("") L.append("| 任务类型 | 进行中 | 已完成 | 已解除 | 已过期 | 已关闭 |") L.append("|----------|--------|--------|--------|--------|--------|") for tt in TYPE_ORDER: d = type_status[tt] L.append(f"| {TYPE_LABELS[tt]} | {d.get('active',0)} | {d.get('completed',0)} | {d.get('resolved',0)} | {d.get('expired',0)} | {d.get('inactive',0)} |") L.append("") # ── 二、助教任务详情(按助教 → 任务类型 → 状态排序) ── # 按助教分组 by_asst: dict[int, list[tuple]] = defaultdict(list) for t in tasks: by_asst[t[1]].append(t) # 助教排序:在职优先,二级按任务数降序 sorted_assts = sorted( by_asst.keys(), key=lambda a: (1 if a in asst_left else 0, -len(by_asst[a])), ) L.append("## 二、助教任务详情") L.append("") for aid in sorted_assts: atasks = by_asst[aid] aname = asst_names.get(aid, str(aid)) left_tag = "【已离职】" if aid in asst_left else "【在职】" active_n = sum(1 for t in atasks if t[4] == "active") completed_n = sum(1 for t in atasks if t[4] == "completed") L.append(f"### {aname}{left_tag}(进行中 {active_n} / 已完成 {completed_n} / 共 {len(atasks)})") L.append("") # 按任务类型分组 by_type: dict[str, list[tuple]] = defaultdict(list) for t in atasks: by_type[t[3]].append(t) for tt in TYPE_ORDER: tt_tasks = by_type.get(tt) if not tt_tasks: continue # 按状态排序:已完成 → 进行中 → 已过期 tt_tasks.sort(key=lambda t: (STATUS_SORT.get(t[4], 9), -(t[5] or 0))) L.append(f"**{TYPE_LABELS[tt]}**({len(tt_tasks)})") L.append("") if tt in ("high_priority_recall", "priority_recall"): L.append("| 状态 | 客户 | 手机号 | 优先级 | WBI | NCI | RS | OS | 服务次数 | 末次距今 | 创建 | 更新 | 最后到店 |") L.append("|------|------|--------|--------|-----|-----|----|-----|----------|----------|------|------|----------|") elif tt == "follow_up_visit": L.append("| 状态 | 客户 | 手机号 | RS | OS | 服务次数 | 创建 | 更新 | 最后到店 | 完成/过期 |") L.append("|------|------|--------|----|-----|----------|------|------|----------|-----------|") else: L.append("| 状态 | 客户 | 手机号 | RS | OS | 服务次数 | 末次距今 | MS | ML | 创建 | 更新 | 最后到店 |") L.append("|------|------|--------|----|-----|----------|----------|-----|-----|------|------|----------|") for t in tt_tasks: tid, _, mid, _, status, prio, created, updated, completed_at, comp_type = t mname = member_names.get(mid, str(mid)) mobile = fmt_mobile(member_mobiles.get(mid)) st = STATUS_LABELS.get(status, status) ri = ri_map.get((aid, mid), {}) wbi = wbi_map.get(mid) nci_raw = nci_map.get(mid) # 客户已转老客时,NCI 显示"老客"而非过时的历史高分 is_old = wbi_status.get(mid) == "OLD" nci_str = "老客" if (is_old and nci_raw is not None) else fmt(nci_raw) lv = last_visit.get(mid) lv_str = str(lv) if lv else "-" if tt in ("high_priority_recall", "priority_recall"): L.append( f"| {st} | {mname} | {mobile} | {fmt(prio)} | {fmt(wbi)} | {nci_str}" f" | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')} | {ri.get('sessions', '-')}" f" | {ri.get('days_since', '-')} | {created} | {updated} | {lv_str} |" ) elif tt == "follow_up_visit": L.append( f"| {st} | {mname} | {mobile} | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')}" f" | {ri.get('sessions', '-')} | {created} | {updated} | {lv_str} | {completed_at or '-'} |" ) else: L.append( f"| {st} | {mname} | {mobile} | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')}" f" | {ri.get('sessions', '-')} | {ri.get('days_since', '-')}" f" | {fmt(ri.get('ms'))} | {fmt(ri.get('ml'))} | {created} | {updated} | {lv_str} |" ) L.append("") # ── 三、趋势分析 ── L.append("## 三、趋势分析") L.append("") # 3.1 月度趋势 L.append("### 3.1 月度创建/完成/过期") L.append("") L.append("| 月份 | 创建 | 完成 | 过期 |") L.append("|------|------|------|------|") monthly = defaultdict(lambda: [0, 0, 0]) for t in tasks: m = t[6].replace(day=1) monthly[m][0] += 1 if t[4] == "completed": monthly[m][1] += 1 if t[4] == "expired": monthly[m][2] += 1 for m in sorted(monthly): c = monthly[m] L.append(f"| {m} | {c[0]} | {c[1]} | {c[2]} |") L.append("") # 3.2 召回完成率 recall_tasks = [t for t in tasks if t[3] in ("high_priority_recall", "priority_recall")] if recall_tasks: total_recall = len(recall_tasks) completed_recall = sum(1 for t in recall_tasks if t[4] == "completed") rate = completed_recall / total_recall * 100 if total_recall else 0 L.append("### 3.2 召回完成率") L.append("") L.append(f"- 召回任务总数:{total_recall}") L.append(f"- 已完成:{completed_recall}") L.append(f"- 完成率:**{rate:.1f}%**") L.append("") # 3.3 助教任务量排行 L.append("### 3.3 助教任务量排行(进行中)") L.append("") L.append("| 排名 | 助教 | 高优先 | 优先 | 回访 | 关系 | 合计 |") L.append("|------|------|--------|------|------|------|------|") asst_active = defaultdict(lambda: defaultdict(int)) for t in tasks: if t[4] == "active": asst_active[t[1]][t[3]] += 1 rank = 1 for aid in sorted(asst_active, key=lambda a: (1 if a in asst_left else 0, -sum(asst_active[a].values()))): d = asst_active[aid] aname = asst_names.get(aid, str(aid)) left_tag = "【离职】" if aid in asst_left else "" total = sum(d.values()) L.append( f"| {rank} | {aname}{left_tag} | {d.get('high_priority_recall',0)}" f" | {d.get('priority_recall',0)} | {d.get('follow_up_visit',0)}" f" | {d.get('relationship_building',0)} | {total} |" ) rank += 1 L.append("") return "\n".join(L) # ── CLI ── def main(): parser = argparse.ArgumentParser(description="助教业务任务明细报告") parser.add_argument("--output", "-o", default=str(_ROOT / "docs" / "assistant-task-detail-report.md")) parser.add_argument("--status", default=None, help="状态过滤,逗号分隔") args = parser.parse_args() pg_dsn = os.environ.get("PG_DSN") app_dsn = os.environ.get("APP_DB_DSN") if not pg_dsn or not app_dsn: print("错误:PG_DSN 和 APP_DB_DSN 必须在 .env 中配置", file=sys.stderr) sys.exit(1) status_filter = [s.strip() for s in args.status.split(",")] if args.status else None etl_data = load_etl_data(pg_dsn) tasks = load_tasks(app_dsn, status_filter) report = generate_report(etl_data, tasks) out_path = Path(args.output) out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(report, encoding="utf-8") print(f"Done. {report.count(chr(10))+1} lines -> {out_path}") if __name__ == "__main__": main()