主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
- 新增 GET /xcx/coaches/{id}/banner 轻量接口
- performance/records 加 coach_id 参数 + view_board_coach 权限分流
- coach/customer/performance/board/task 服务层重构
- fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
- task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
- recall_detector settle_type=3 双重限制 + 门店级 resolved
主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
- perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
- isScattered 散客标记端到端
- foodDetail/phoneFull/creator* 字段透传
主线 3: P19 指数回测框架 Phase 1+2
- 3 个指数表 stat_date 日快照模式
- 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
- task_engine 升级 HTTP 实时 + 推演回测双模式
主线 4: Core 维度层启用
- 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
- 修复 app 视图空查询问题
主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口
主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
- schema 基线与 DDL 快照同步
主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)
附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具
合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
391 lines
15 KiB
Python
391 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
助教业务任务明细报告生成工具
|
||
|
||
分组结构:指标说明 → 总览 → 按助教分组(内按任务类型分组)→ 趋势分析
|
||
|
||
用法:
|
||
python tools/task-analysis/gen_task_report.py
|
||
python tools/task-analysis/gen_task_report.py --output docs/custom-report.md
|
||
python tools/task-analysis/gen_task_report.py --status active,completed
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import os
|
||
import sys
|
||
from datetime import date
|
||
from collections import defaultdict, OrderedDict
|
||
from pathlib import Path
|
||
|
||
import psycopg2
|
||
|
||
_TOOL_DIR = Path(__file__).resolve().parent
|
||
_ROOT = _TOOL_DIR.parents[1]
|
||
|
||
from dotenv import load_dotenv
|
||
load_dotenv(_ROOT / ".env", override=False)
|
||
load_dotenv(_ROOT / "apps" / "etl" / "connectors" / "feiqiu" / ".env", override=False)
|
||
|
||
# ── 常量 ──
|
||
|
||
TYPE_ORDER = ["high_priority_recall", "priority_recall", "follow_up_visit", "relationship_building"]
|
||
TYPE_LABELS = {
|
||
"high_priority_recall": "高优先召回",
|
||
"priority_recall": "优先召回",
|
||
"follow_up_visit": "客户回访",
|
||
"relationship_building": "关系构建",
|
||
}
|
||
STATUS_LABELS = {
|
||
"active": "进行中", "completed": "已完成",
|
||
"resolved": "已解除", "expired": "已过期", "inactive": "已关闭",
|
||
}
|
||
# 排序权重:已完成 -> 进行中 -> 已解除 -> 已过期 -> 已关闭
|
||
STATUS_SORT = {"completed": 0, "active": 1, "resolved": 2, "expired": 3, "inactive": 4}
|
||
|
||
|
||
def fmt(v, digits=2):
|
||
if v is None:
|
||
return "-"
|
||
return f"{float(v):.{digits}f}"
|
||
|
||
|
||
def fmt_mobile(m):
|
||
if not m:
|
||
return "-"
|
||
return str(m)
|
||
|
||
|
||
# ── 数据加载 ──
|
||
|
||
def load_etl_data(dsn: str) -> dict:
|
||
conn = psycopg2.connect(dsn)
|
||
conn.set_client_encoding("UTF8")
|
||
cur = conn.cursor()
|
||
|
||
# 助教(昵称 + 离职)
|
||
cur.execute(
|
||
"SELECT assistant_id, nickname, COALESCE(leave_status, 0) "
|
||
"FROM dwd.dim_assistant WHERE scd2_is_current = 1"
|
||
)
|
||
asst_names = {}
|
||
asst_left = set()
|
||
for r in cur.fetchall():
|
||
asst_names[r[0]] = r[1]
|
||
if r[2] == 1:
|
||
asst_left.add(r[0])
|
||
|
||
# 客户昵称 + 手机号
|
||
cur.execute("SELECT member_id, nickname, mobile FROM dwd.dim_member WHERE scd2_is_current = 1")
|
||
member_names = {}
|
||
member_mobiles = {}
|
||
for r in cur.fetchall():
|
||
member_names[r[0]] = r[1]
|
||
member_mobiles[r[0]] = r[2]
|
||
|
||
# relation_index(最新快照)
|
||
cur.execute("""
|
||
SELECT DISTINCT ON (assistant_id, member_id)
|
||
assistant_id, member_id, rs_display, os_label,
|
||
session_count, days_since_last_session, ms_display, ml_display
|
||
FROM dws.dws_member_assistant_relation_index
|
||
ORDER BY assistant_id, member_id, stat_date DESC
|
||
""")
|
||
ri_map = {}
|
||
for r in cur.fetchall():
|
||
ri_map[(r[0], r[1])] = {
|
||
"rs": r[2], "os_label": r[3],
|
||
"sessions": r[4], "days_since": r[5], "ms": r[6], "ml": r[7],
|
||
}
|
||
|
||
# WBI(含 status 用于判断客户是否已转老客)
|
||
cur.execute("""
|
||
SELECT DISTINCT ON (member_id) member_id, display_score, status
|
||
FROM dws.dws_member_winback_index ORDER BY member_id, stat_date DESC
|
||
""")
|
||
wbi_map = {}
|
||
wbi_status_map = {}
|
||
for r in cur.fetchall():
|
||
wbi_map[r[0]] = r[1]
|
||
wbi_status_map[r[0]] = r[2]
|
||
|
||
# NCI
|
||
cur.execute("""
|
||
SELECT DISTINCT ON (member_id) member_id, display_score
|
||
FROM dws.dws_member_newconv_index ORDER BY member_id, stat_date DESC
|
||
""")
|
||
nci_map = {r[0]: r[1] for r in cur.fetchall()}
|
||
|
||
# 客户最后到店时间(最新结算时间)
|
||
cur.execute("""
|
||
SELECT member_id, MAX(pay_time)::date
|
||
FROM dwd.dwd_settlement_head
|
||
WHERE settle_type IN (1, 3) AND member_id > 0
|
||
GROUP BY member_id
|
||
""")
|
||
last_visit_map = {r[0]: r[1] for r in cur.fetchall()}
|
||
|
||
conn.close()
|
||
return {
|
||
"asst_names": asst_names, "asst_left": asst_left,
|
||
"member_names": member_names, "member_mobiles": member_mobiles,
|
||
"ri": ri_map,
|
||
"wbi": wbi_map, "wbi_status": wbi_status_map,
|
||
"nci": nci_map, "last_visit": last_visit_map,
|
||
}
|
||
|
||
|
||
def load_tasks(dsn: str, status_filter: list[str] | None = None) -> list[tuple]:
|
||
conn = psycopg2.connect(dsn)
|
||
conn.set_client_encoding("UTF8")
|
||
cur = conn.cursor()
|
||
|
||
sql = """
|
||
SELECT id, assistant_id, member_id, task_type, status, priority_score,
|
||
created_at::date, updated_at::date, completed_at::date, completion_type
|
||
FROM biz.coach_tasks
|
||
"""
|
||
if status_filter:
|
||
placeholders = ",".join(["%s"] * len(status_filter))
|
||
sql += f" WHERE status IN ({placeholders})"
|
||
cur.execute(sql, status_filter)
|
||
else:
|
||
cur.execute(sql)
|
||
|
||
rows = cur.fetchall()
|
||
conn.close()
|
||
return rows
|
||
|
||
|
||
# -- 报告生成 --
|
||
|
||
def generate_report(etl: dict, tasks: list[tuple]) -> str:
|
||
asst_names = etl["asst_names"]
|
||
asst_left = etl["asst_left"]
|
||
member_names = etl["member_names"]
|
||
member_mobiles = etl["member_mobiles"]
|
||
ri_map = etl["ri"]
|
||
wbi_map = etl["wbi"]
|
||
wbi_status = etl["wbi_status"]
|
||
nci_map = etl["nci"]
|
||
last_visit = etl["last_visit"]
|
||
|
||
L = [] # 行缓冲
|
||
L.append("# 助教业务任务明细报告")
|
||
L.append("")
|
||
L.append(f"> 导出日期:{date.today()}")
|
||
L.append("")
|
||
|
||
# -- 指标说明 --
|
||
L.append("## 指标说明")
|
||
L.append("")
|
||
L.append("| 指标 | 全称 | 含义 | 范围 |")
|
||
L.append("|------|------|------|------|")
|
||
L.append("| **WBI** | 流失回赢指数 | 老客流失风险,越高越需要召回 | 0-10 |")
|
||
L.append("| **NCI** | 新客转化指数 | 新客沉默风险,越高越需要跟进 | 0-10 |")
|
||
L.append("| **RS** | 关系强度指数 | 助教与客户亲密度(频次+时长+近期性) | 0-10 |")
|
||
L.append("| **OS** | 归属份额标签 | 客户主要由谁服务 | MAIN / COMANAGE / POOL |")
|
||
L.append("| **MS** | 升温动量 | 近期频率 vs 长期频率 | 0-10 |")
|
||
L.append("| **ML** | 付费关联 | 人工台账充值关联强度 | 0-10 |")
|
||
L.append("")
|
||
L.append("**OS**:MAIN=主服务(>=70%) | COMANAGE=共管(>=30%) | POOL=池内")
|
||
L.append("")
|
||
L.append("**任务判定**:`max(WBI,NCI)>7` -> 高优先召回 | `>5` -> 优先召回 | `1<RS<6` -> 关系构建 | 客户到店 -> 完成召回 -> 生成回访(48h)")
|
||
L.append("")
|
||
|
||
# -- 一、总览 --
|
||
status_count = defaultdict(int)
|
||
type_status = defaultdict(lambda: defaultdict(int))
|
||
for t in tasks:
|
||
status_count[t[4]] += 1
|
||
type_status[t[3]][t[4]] += 1
|
||
|
||
L.append("## 一、总览")
|
||
L.append("")
|
||
L.append("| 状态 | 数量 |")
|
||
L.append("|------|------|")
|
||
for s in ["active", "completed", "resolved", "expired", "inactive"]:
|
||
if status_count[s]:
|
||
L.append(f"| {STATUS_LABELS[s]} | {status_count[s]} |")
|
||
L.append("")
|
||
|
||
L.append("| 任务类型 | 进行中 | 已完成 | 已解除 | 已过期 | 已关闭 |")
|
||
L.append("|----------|--------|--------|--------|--------|--------|")
|
||
for tt in TYPE_ORDER:
|
||
d = type_status[tt]
|
||
L.append(f"| {TYPE_LABELS[tt]} | {d.get('active',0)} | {d.get('completed',0)} | {d.get('resolved',0)} | {d.get('expired',0)} | {d.get('inactive',0)} |")
|
||
L.append("")
|
||
|
||
# ── 二、助教任务详情(按助教 → 任务类型 → 状态排序) ──
|
||
# 按助教分组
|
||
by_asst: dict[int, list[tuple]] = defaultdict(list)
|
||
for t in tasks:
|
||
by_asst[t[1]].append(t)
|
||
|
||
# 助教排序:在职优先,二级按任务数降序
|
||
sorted_assts = sorted(
|
||
by_asst.keys(),
|
||
key=lambda a: (1 if a in asst_left else 0, -len(by_asst[a])),
|
||
)
|
||
|
||
L.append("## 二、助教任务详情")
|
||
L.append("")
|
||
|
||
for aid in sorted_assts:
|
||
atasks = by_asst[aid]
|
||
aname = asst_names.get(aid, str(aid))
|
||
left_tag = "【已离职】" if aid in asst_left else "【在职】"
|
||
active_n = sum(1 for t in atasks if t[4] == "active")
|
||
completed_n = sum(1 for t in atasks if t[4] == "completed")
|
||
|
||
L.append(f"### {aname}{left_tag}(进行中 {active_n} / 已完成 {completed_n} / 共 {len(atasks)})")
|
||
L.append("")
|
||
|
||
# 按任务类型分组
|
||
by_type: dict[str, list[tuple]] = defaultdict(list)
|
||
for t in atasks:
|
||
by_type[t[3]].append(t)
|
||
|
||
for tt in TYPE_ORDER:
|
||
tt_tasks = by_type.get(tt)
|
||
if not tt_tasks:
|
||
continue
|
||
|
||
# 按状态排序:已完成 → 进行中 → 已过期
|
||
tt_tasks.sort(key=lambda t: (STATUS_SORT.get(t[4], 9), -(t[5] or 0)))
|
||
|
||
L.append(f"**{TYPE_LABELS[tt]}**({len(tt_tasks)})")
|
||
L.append("")
|
||
|
||
if tt in ("high_priority_recall", "priority_recall"):
|
||
L.append("| 状态 | 客户 | 手机号 | 优先级 | WBI | NCI | RS | OS | 服务次数 | 末次距今 | 创建 | 更新 | 最后到店 |")
|
||
L.append("|------|------|--------|--------|-----|-----|----|-----|----------|----------|------|------|----------|")
|
||
elif tt == "follow_up_visit":
|
||
L.append("| 状态 | 客户 | 手机号 | RS | OS | 服务次数 | 创建 | 更新 | 最后到店 | 完成/过期 |")
|
||
L.append("|------|------|--------|----|-----|----------|------|------|----------|-----------|")
|
||
else:
|
||
L.append("| 状态 | 客户 | 手机号 | RS | OS | 服务次数 | 末次距今 | MS | ML | 创建 | 更新 | 最后到店 |")
|
||
L.append("|------|------|--------|----|-----|----------|----------|-----|-----|------|------|----------|")
|
||
|
||
for t in tt_tasks:
|
||
tid, _, mid, _, status, prio, created, updated, completed_at, comp_type = t
|
||
mname = member_names.get(mid, str(mid))
|
||
mobile = fmt_mobile(member_mobiles.get(mid))
|
||
st = STATUS_LABELS.get(status, status)
|
||
ri = ri_map.get((aid, mid), {})
|
||
wbi = wbi_map.get(mid)
|
||
nci_raw = nci_map.get(mid)
|
||
# 客户已转老客时,NCI 显示"老客"而非过时的历史高分
|
||
is_old = wbi_status.get(mid) == "OLD"
|
||
nci_str = "老客" if (is_old and nci_raw is not None) else fmt(nci_raw)
|
||
lv = last_visit.get(mid)
|
||
lv_str = str(lv) if lv else "-"
|
||
|
||
if tt in ("high_priority_recall", "priority_recall"):
|
||
L.append(
|
||
f"| {st} | {mname} | {mobile} | {fmt(prio)} | {fmt(wbi)} | {nci_str}"
|
||
f" | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')} | {ri.get('sessions', '-')}"
|
||
f" | {ri.get('days_since', '-')} | {created} | {updated} | {lv_str} |"
|
||
)
|
||
elif tt == "follow_up_visit":
|
||
L.append(
|
||
f"| {st} | {mname} | {mobile} | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')}"
|
||
f" | {ri.get('sessions', '-')} | {created} | {updated} | {lv_str} | {completed_at or '-'} |"
|
||
)
|
||
else:
|
||
L.append(
|
||
f"| {st} | {mname} | {mobile} | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')}"
|
||
f" | {ri.get('sessions', '-')} | {ri.get('days_since', '-')}"
|
||
f" | {fmt(ri.get('ms'))} | {fmt(ri.get('ml'))} | {created} | {updated} | {lv_str} |"
|
||
)
|
||
L.append("")
|
||
|
||
# ── 三、趋势分析 ──
|
||
L.append("## 三、趋势分析")
|
||
L.append("")
|
||
|
||
# 3.1 月度趋势
|
||
L.append("### 3.1 月度创建/完成/过期")
|
||
L.append("")
|
||
L.append("| 月份 | 创建 | 完成 | 过期 |")
|
||
L.append("|------|------|------|------|")
|
||
monthly = defaultdict(lambda: [0, 0, 0])
|
||
for t in tasks:
|
||
m = t[6].replace(day=1)
|
||
monthly[m][0] += 1
|
||
if t[4] == "completed":
|
||
monthly[m][1] += 1
|
||
if t[4] == "expired":
|
||
monthly[m][2] += 1
|
||
for m in sorted(monthly):
|
||
c = monthly[m]
|
||
L.append(f"| {m} | {c[0]} | {c[1]} | {c[2]} |")
|
||
L.append("")
|
||
|
||
# 3.2 召回完成率
|
||
recall_tasks = [t for t in tasks if t[3] in ("high_priority_recall", "priority_recall")]
|
||
if recall_tasks:
|
||
total_recall = len(recall_tasks)
|
||
completed_recall = sum(1 for t in recall_tasks if t[4] == "completed")
|
||
rate = completed_recall / total_recall * 100 if total_recall else 0
|
||
L.append("### 3.2 召回完成率")
|
||
L.append("")
|
||
L.append(f"- 召回任务总数:{total_recall}")
|
||
L.append(f"- 已完成:{completed_recall}")
|
||
L.append(f"- 完成率:**{rate:.1f}%**")
|
||
L.append("")
|
||
|
||
# 3.3 助教任务量排行
|
||
L.append("### 3.3 助教任务量排行(进行中)")
|
||
L.append("")
|
||
L.append("| 排名 | 助教 | 高优先 | 优先 | 回访 | 关系 | 合计 |")
|
||
L.append("|------|------|--------|------|------|------|------|")
|
||
asst_active = defaultdict(lambda: defaultdict(int))
|
||
for t in tasks:
|
||
if t[4] == "active":
|
||
asst_active[t[1]][t[3]] += 1
|
||
rank = 1
|
||
for aid in sorted(asst_active, key=lambda a: (1 if a in asst_left else 0, -sum(asst_active[a].values()))):
|
||
d = asst_active[aid]
|
||
aname = asst_names.get(aid, str(aid))
|
||
left_tag = "【离职】" if aid in asst_left else ""
|
||
total = sum(d.values())
|
||
L.append(
|
||
f"| {rank} | {aname}{left_tag} | {d.get('high_priority_recall',0)}"
|
||
f" | {d.get('priority_recall',0)} | {d.get('follow_up_visit',0)}"
|
||
f" | {d.get('relationship_building',0)} | {total} |"
|
||
)
|
||
rank += 1
|
||
L.append("")
|
||
|
||
return "\n".join(L)
|
||
|
||
|
||
# ── CLI ──
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="助教业务任务明细报告")
|
||
parser.add_argument("--output", "-o", default=str(_ROOT / "docs" / "assistant-task-detail-report.md"))
|
||
parser.add_argument("--status", default=None, help="状态过滤,逗号分隔")
|
||
args = parser.parse_args()
|
||
|
||
pg_dsn = os.environ.get("PG_DSN")
|
||
app_dsn = os.environ.get("APP_DB_DSN")
|
||
if not pg_dsn or not app_dsn:
|
||
print("错误:PG_DSN 和 APP_DB_DSN 必须在 .env 中配置", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
status_filter = [s.strip() for s in args.status.split(",")] if args.status else None
|
||
etl_data = load_etl_data(pg_dsn)
|
||
tasks = load_tasks(app_dsn, status_filter)
|
||
report = generate_report(etl_data, tasks)
|
||
|
||
out_path = Path(args.output)
|
||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||
out_path.write_text(report, encoding="utf-8")
|
||
print(f"Done. {report.count(chr(10))+1} lines -> {out_path}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|