Files
Neo-ZQYY/tools/task-analysis/gen_task_report.py
Neo 2a7a5d68aa feat: 2026-04-15~04-20 累积变更基线 — 多主线合流
主线 1: rns1-customer-coach-api + 04-miniapp-core-business 后端实施
  - 新增 GET /xcx/coaches/{id}/banner 轻量接口
  - performance/records 加 coach_id 参数 + view_board_coach 权限分流
  - coach/customer/performance/board/task 服务层重构
  - fdw_queries 结算单粒度聚合 + consumption_summary 视图统一
  - task_generator 回访宽限 72h + UPSERT 替代策略 + Step 5 保底清理
  - recall_detector settle_type=3 双重限制 + 门店级 resolved

主线 2: 小程序权限分流 + 新增 coach-service-records 管理者视角业绩明细页
  - perf-progress 共享模块去重 task-list/coach-detail 动画逻辑
  - isScattered 散客标记端到端
  - foodDetail/phoneFull/creator* 字段透传

主线 3: P19 指数回测框架 Phase 1+2
  - 3 个指数表 stat_date 日快照模式
  - 新增 DWS_INDEX_BACKFILL / DWS_TASK_SIMULATION 工具任务
  - task_engine 升级 HTTP 实时 + 推演回测双模式

主线 4: Core 维度层启用
  - 新增 CORE_DIM_SYNC 任务(DWD → core 4 维度表)
  - 修复 app 视图空查询问题

主线 5: member_project_tag 改为 LAST_30_VISITS 消费次数窗口

主线 6: 2 个迁移 SQL 已执行(stat_date + member_project_tag 新窗口)
  - schema 基线与 DDL 快照同步

主线 7: 开发机路径迁移 C:\NeoZQYY → C:\Project\NeoZQYY(约 95% 改动量)

附带: 新建运维脚本(churned_customer_report / simulate_historical_tasks /
      backfill_index_snapshots)+ tools/task-analysis/ 任务分析工具

合计 157 文件。未包含中间产物(tmp/ .playwright-mcp/ inspect-* excel/sheet 分析 txt)。
审计记录见下一个 commit。

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-20 06:32:07 +08:00

391 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Assistant business-task detail report generator.

Report structure: metric glossary -> overview -> grouped by assistant
(sub-grouped by task type) -> trend analysis.

Usage:
python tools/task-analysis/gen_task_report.py
python tools/task-analysis/gen_task_report.py --output docs/custom-report.md
python tools/task-analysis/gen_task_report.py --status active,completed
"""
from __future__ import annotations
import argparse
import os
import sys
from datetime import date
from collections import defaultdict, OrderedDict
from pathlib import Path
import psycopg2

# Resolve the repository root relative to this script so the tool works
# no matter what the current working directory is.
_TOOL_DIR = Path(__file__).resolve().parent
_ROOT = _TOOL_DIR.parents[1]

from dotenv import load_dotenv

# override=False: real environment variables win over .env file values,
# and the repo-root .env (loaded first) wins over the connector-local one.
load_dotenv(_ROOT / ".env", override=False)
load_dotenv(_ROOT / "apps" / "etl" / "connectors" / "feiqiu" / ".env", override=False)
# ── Constants ──

# Display order of task types in every section of the report.
TYPE_ORDER = ["high_priority_recall", "priority_recall", "follow_up_visit", "relationship_building"]

# Chinese display labels per task type (rendered verbatim into the Markdown).
TYPE_LABELS = {
    "high_priority_recall": "高优先召回",
    "priority_recall": "优先召回",
    "follow_up_visit": "客户回访",
    "relationship_building": "关系构建",
}

# Chinese display labels per task status.
STATUS_LABELS = {
    "active": "进行中", "completed": "已完成",
    "resolved": "已解除", "expired": "已过期", "inactive": "已关闭",
}

# Sort weight: completed -> active -> resolved -> expired -> inactive
STATUS_SORT = {"completed": 0, "active": 1, "resolved": 2, "expired": 3, "inactive": 4}
def fmt(v, digits=2):
    """Render *v* as a fixed-point decimal string.

    Returns "-" when v is None (missing metric); otherwise formats
    float(v) with *digits* decimal places.
    """
    return "-" if v is None else f"{float(v):.{digits}f}"
def fmt_mobile(m):
    """Return the mobile number as a string, or "-" when empty/missing."""
    if m:
        return str(m)
    return "-"
# ── Data loading ──
def load_etl_data(dsn: str) -> dict:
    """Load all lookup/snapshot data from the ETL (warehouse) database.

    Fix over the previous version: the connection is now closed in a
    ``finally`` block, so a failure in any of the queries can no longer
    leak the connection.

    Returns a dict with keys:
      asst_names     -- assistant_id -> nickname (current SCD2 rows only)
      asst_left      -- set of assistant_ids with leave_status = 1
      member_names   -- member_id -> nickname
      member_mobiles -- member_id -> mobile
      ri             -- (assistant_id, member_id) -> latest relation-index snapshot
      wbi            -- member_id -> latest winback display score
      wbi_status     -- member_id -> latest winback status
      nci            -- member_id -> latest new-conversion display score
      last_visit     -- member_id -> date of the latest settlement (last visit)
    """
    conn = psycopg2.connect(dsn)
    try:
        conn.set_client_encoding("UTF8")
        cur = conn.cursor()
        # Assistants (nickname + leave flag)
        cur.execute(
            "SELECT assistant_id, nickname, COALESCE(leave_status, 0) "
            "FROM dwd.dim_assistant WHERE scd2_is_current = 1"
        )
        asst_names = {}
        asst_left = set()
        for r in cur.fetchall():
            asst_names[r[0]] = r[1]
            if r[2] == 1:
                asst_left.add(r[0])
        # Member nickname + mobile
        cur.execute("SELECT member_id, nickname, mobile FROM dwd.dim_member WHERE scd2_is_current = 1")
        member_names = {}
        member_mobiles = {}
        for r in cur.fetchall():
            member_names[r[0]] = r[1]
            member_mobiles[r[0]] = r[2]
        # Latest relation_index snapshot per (assistant, member)
        cur.execute("""
            SELECT DISTINCT ON (assistant_id, member_id)
                   assistant_id, member_id, rs_display, os_label,
                   session_count, days_since_last_session, ms_display, ml_display
            FROM dws.dws_member_assistant_relation_index
            ORDER BY assistant_id, member_id, stat_date DESC
        """)
        ri_map = {}
        for r in cur.fetchall():
            ri_map[(r[0], r[1])] = {
                "rs": r[2], "os_label": r[3],
                "sessions": r[4], "days_since": r[5], "ms": r[6], "ml": r[7],
            }
        # WBI (status included so callers can tell whether a member has
        # already converted to an "old customer")
        cur.execute("""
            SELECT DISTINCT ON (member_id) member_id, display_score, status
            FROM dws.dws_member_winback_index ORDER BY member_id, stat_date DESC
        """)
        wbi_map = {}
        wbi_status_map = {}
        for r in cur.fetchall():
            wbi_map[r[0]] = r[1]
            wbi_status_map[r[0]] = r[2]
        # NCI (latest snapshot per member)
        cur.execute("""
            SELECT DISTINCT ON (member_id) member_id, display_score
            FROM dws.dws_member_newconv_index ORDER BY member_id, stat_date DESC
        """)
        nci_map = {r[0]: r[1] for r in cur.fetchall()}
        # Last visit date per member (latest settlement pay_time)
        cur.execute("""
            SELECT member_id, MAX(pay_time)::date
            FROM dwd.dwd_settlement_head
            WHERE settle_type IN (1, 3) AND member_id > 0
            GROUP BY member_id
        """)
        last_visit_map = {r[0]: r[1] for r in cur.fetchall()}
        return {
            "asst_names": asst_names, "asst_left": asst_left,
            "member_names": member_names, "member_mobiles": member_mobiles,
            "ri": ri_map,
            "wbi": wbi_map, "wbi_status": wbi_status_map,
            "nci": nci_map, "last_visit": last_visit_map,
        }
    finally:
        # Release the connection even when a query raises.
        conn.close()
def load_tasks(dsn: str, status_filter: list[str] | None = None) -> list[tuple]:
    """Load coach task rows from the application database.

    Each row: (id, assistant_id, member_id, task_type, status, priority_score,
    created_date, updated_date, completed_date, completion_type).

    status_filter, when given, restricts rows to those status values via a
    parameterized ``IN`` clause (placeholders only -- no value interpolation).

    Fix over the previous version: the connection is closed in a ``finally``
    block, so a failed query cannot leak it.
    """
    conn = psycopg2.connect(dsn)
    try:
        conn.set_client_encoding("UTF8")
        cur = conn.cursor()
        sql = """
            SELECT id, assistant_id, member_id, task_type, status, priority_score,
                   created_at::date, updated_at::date, completed_at::date, completion_type
            FROM biz.coach_tasks
        """
        if status_filter:
            placeholders = ",".join(["%s"] * len(status_filter))
            sql += f" WHERE status IN ({placeholders})"
            cur.execute(sql, status_filter)
        else:
            cur.execute(sql)
        return cur.fetchall()
    finally:
        conn.close()
# -- Report generation --
def generate_report(etl: dict, tasks: list[tuple]) -> str:
    """Render the full Markdown report from ETL lookups and task rows.

    etl   -- dict produced by load_etl_data()
    tasks -- rows produced by load_tasks(); tuple layout:
             (id, assistant_id, member_id, task_type, status, priority_score,
              created_date, updated_date, completed_date, completion_type)
    Returns the whole report as one newline-joined string.
    """
    asst_names = etl["asst_names"]
    asst_left = etl["asst_left"]
    member_names = etl["member_names"]
    member_mobiles = etl["member_mobiles"]
    ri_map = etl["ri"]
    wbi_map = etl["wbi"]
    wbi_status = etl["wbi_status"]
    nci_map = etl["nci"]
    last_visit = etl["last_visit"]
    L = []  # line buffer for the Markdown output
    L.append("# 助教业务任务明细报告")
    L.append("")
    L.append(f"> 导出日期:{date.today()}")
    L.append("")
    # -- Metric glossary --
    L.append("## 指标说明")
    L.append("")
    L.append("| 指标 | 全称 | 含义 | 范围 |")
    L.append("|------|------|------|------|")
    L.append("| **WBI** | 流失回赢指数 | 老客流失风险,越高越需要召回 | 0-10 |")
    L.append("| **NCI** | 新客转化指数 | 新客沉默风险,越高越需要跟进 | 0-10 |")
    L.append("| **RS** | 关系强度指数 | 助教与客户亲密度(频次+时长+近期性) | 0-10 |")
    L.append("| **OS** | 归属份额标签 | 客户主要由谁服务 | MAIN / COMANAGE / POOL |")
    L.append("| **MS** | 升温动量 | 近期频率 vs 长期频率 | 0-10 |")
    L.append("| **ML** | 付费关联 | 人工台账充值关联强度 | 0-10 |")
    L.append("")
    L.append("**OS**MAIN=主服务(>=70%) | COMANAGE=共管(>=30%) | POOL=池内")
    L.append("")
    L.append("**任务判定**`max(WBI,NCI)>7` -> 高优先召回 | `>5` -> 优先召回 | `1<RS<6` -> 关系构建 | 客户到店 -> 完成召回 -> 生成回访(48h)")
    L.append("")
    # -- Section 1: overview --
    status_count = defaultdict(int)
    type_status = defaultdict(lambda: defaultdict(int))
    for t in tasks:
        status_count[t[4]] += 1       # t[4] = status
        type_status[t[3]][t[4]] += 1  # t[3] = task_type
    L.append("## 一、总览")
    L.append("")
    L.append("| 状态 | 数量 |")
    L.append("|------|------|")
    for s in ["active", "completed", "resolved", "expired", "inactive"]:
        if status_count[s]:  # skip statuses with zero tasks
            L.append(f"| {STATUS_LABELS[s]} | {status_count[s]} |")
    L.append("")
    L.append("| 任务类型 | 进行中 | 已完成 | 已解除 | 已过期 | 已关闭 |")
    L.append("|----------|--------|--------|--------|--------|--------|")
    for tt in TYPE_ORDER:
        d = type_status[tt]
        L.append(f"| {TYPE_LABELS[tt]} | {d.get('active',0)} | {d.get('completed',0)} | {d.get('resolved',0)} | {d.get('expired',0)} | {d.get('inactive',0)} |")
    L.append("")
    # ── Section 2: per-assistant detail (assistant → task type → status) ──
    # Group tasks by assistant id
    by_asst: dict[int, list[tuple]] = defaultdict(list)
    for t in tasks:
        by_asst[t[1]].append(t)
    # Assistant ordering: active staff first, then by task count descending
    sorted_assts = sorted(
        by_asst.keys(),
        key=lambda a: (1 if a in asst_left else 0, -len(by_asst[a])),
    )
    L.append("## 二、助教任务详情")
    L.append("")
    for aid in sorted_assts:
        atasks = by_asst[aid]
        aname = asst_names.get(aid, str(aid))
        left_tag = "【已离职】" if aid in asst_left else "【在职】"
        active_n = sum(1 for t in atasks if t[4] == "active")
        completed_n = sum(1 for t in atasks if t[4] == "completed")
        L.append(f"### {aname}{left_tag}(进行中 {active_n} / 已完成 {completed_n} / 共 {len(atasks)}")
        L.append("")
        # Group this assistant's tasks by task type
        by_type: dict[str, list[tuple]] = defaultdict(list)
        for t in atasks:
            by_type[t[3]].append(t)
        for tt in TYPE_ORDER:
            tt_tasks = by_type.get(tt)
            if not tt_tasks:
                continue
            # Status order: completed → active → expired; ties broken by
            # priority_score descending (None treated as 0)
            tt_tasks.sort(key=lambda t: (STATUS_SORT.get(t[4], 9), -(t[5] or 0)))
            L.append(f"**{TYPE_LABELS[tt]}**{len(tt_tasks)}")
            L.append("")
            # Table columns differ per task type
            if tt in ("high_priority_recall", "priority_recall"):
                L.append("| 状态 | 客户 | 手机号 | 优先级 | WBI | NCI | RS | OS | 服务次数 | 末次距今 | 创建 | 更新 | 最后到店 |")
                L.append("|------|------|--------|--------|-----|-----|----|-----|----------|----------|------|------|----------|")
            elif tt == "follow_up_visit":
                L.append("| 状态 | 客户 | 手机号 | RS | OS | 服务次数 | 创建 | 更新 | 最后到店 | 完成/过期 |")
                L.append("|------|------|--------|----|-----|----------|------|------|----------|-----------|")
            else:
                L.append("| 状态 | 客户 | 手机号 | RS | OS | 服务次数 | 末次距今 | MS | ML | 创建 | 更新 | 最后到店 |")
                L.append("|------|------|--------|----|-----|----------|----------|-----|-----|------|------|----------|")
            for t in tt_tasks:
                tid, _, mid, _, status, prio, created, updated, completed_at, comp_type = t
                mname = member_names.get(mid, str(mid))
                mobile = fmt_mobile(member_mobiles.get(mid))
                st = STATUS_LABELS.get(status, status)
                ri = ri_map.get((aid, mid), {})
                wbi = wbi_map.get(mid)
                nci_raw = nci_map.get(mid)
                # Once a member has converted to an "old customer", show the
                # 老客 tag instead of a stale historical NCI score
                is_old = wbi_status.get(mid) == "OLD"
                nci_str = "老客" if (is_old and nci_raw is not None) else fmt(nci_raw)
                lv = last_visit.get(mid)
                lv_str = str(lv) if lv else "-"
                if tt in ("high_priority_recall", "priority_recall"):
                    L.append(
                        f"| {st} | {mname} | {mobile} | {fmt(prio)} | {fmt(wbi)} | {nci_str}"
                        f" | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')} | {ri.get('sessions', '-')}"
                        f" | {ri.get('days_since', '-')} | {created} | {updated} | {lv_str} |"
                    )
                elif tt == "follow_up_visit":
                    L.append(
                        f"| {st} | {mname} | {mobile} | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')}"
                        f" | {ri.get('sessions', '-')} | {created} | {updated} | {lv_str} | {completed_at or '-'} |"
                    )
                else:
                    L.append(
                        f"| {st} | {mname} | {mobile} | {fmt(ri.get('rs'))} | {ri.get('os_label', '-')}"
                        f" | {ri.get('sessions', '-')} | {ri.get('days_since', '-')}"
                        f" | {fmt(ri.get('ms'))} | {fmt(ri.get('ml'))} | {created} | {updated} | {lv_str} |"
                    )
            L.append("")
    # ── Section 3: trend analysis ──
    L.append("## 三、趋势分析")
    L.append("")
    # 3.1 monthly trend
    L.append("### 3.1 月度创建/完成/过期")
    L.append("")
    L.append("| 月份 | 创建 | 完成 | 过期 |")
    L.append("|------|------|------|------|")
    monthly = defaultdict(lambda: [0, 0, 0])  # month -> [created, completed, expired]
    for t in tasks:
        # NOTE(review): completed/expired counts are bucketed by the task's
        # *creation* month (t[6]), not its completion month — confirm intended.
        m = t[6].replace(day=1)
        monthly[m][0] += 1
        if t[4] == "completed":
            monthly[m][1] += 1
        if t[4] == "expired":
            monthly[m][2] += 1
    for m in sorted(monthly):
        c = monthly[m]
        L.append(f"| {m} | {c[0]} | {c[1]} | {c[2]} |")
    L.append("")
    # 3.2 recall completion rate
    recall_tasks = [t for t in tasks if t[3] in ("high_priority_recall", "priority_recall")]
    if recall_tasks:
        total_recall = len(recall_tasks)
        completed_recall = sum(1 for t in recall_tasks if t[4] == "completed")
        rate = completed_recall / total_recall * 100 if total_recall else 0
        L.append("### 3.2 召回完成率")
        L.append("")
        L.append(f"- 召回任务总数:{total_recall}")
        L.append(f"- 已完成:{completed_recall}")
        L.append(f"- 完成率:**{rate:.1f}%**")
        L.append("")
    # 3.3 assistant ranking by active task load
    L.append("### 3.3 助教任务量排行(进行中)")
    L.append("")
    L.append("| 排名 | 助教 | 高优先 | 优先 | 回访 | 关系 | 合计 |")
    L.append("|------|------|--------|------|------|------|------|")
    asst_active = defaultdict(lambda: defaultdict(int))
    for t in tasks:
        if t[4] == "active":
            asst_active[t[1]][t[3]] += 1
    rank = 1
    # Same ordering as section 2: active staff first, then by total active tasks
    for aid in sorted(asst_active, key=lambda a: (1 if a in asst_left else 0, -sum(asst_active[a].values()))):
        d = asst_active[aid]
        aname = asst_names.get(aid, str(aid))
        left_tag = "【离职】" if aid in asst_left else ""
        total = sum(d.values())
        L.append(
            f"| {rank} | {aname}{left_tag} | {d.get('high_priority_recall',0)}"
            f" | {d.get('priority_recall',0)} | {d.get('follow_up_visit',0)}"
            f" | {d.get('relationship_building',0)} | {total} |"
        )
        rank += 1
    L.append("")
    return "\n".join(L)
# ── CLI ──
def main():
    """CLI entry point: parse arguments, load data, write the Markdown report."""
    parser = argparse.ArgumentParser(description="助教业务任务明细报告")
    parser.add_argument("--output", "-o", default=str(_ROOT / "docs" / "assistant-task-detail-report.md"))
    parser.add_argument("--status", default=None, help="状态过滤,逗号分隔")
    args = parser.parse_args()

    # Both DSNs are mandatory; they come from the .env files loaded at import time.
    pg_dsn = os.environ.get("PG_DSN")
    app_dsn = os.environ.get("APP_DB_DSN")
    if not pg_dsn or not app_dsn:
        print("错误PG_DSN 和 APP_DB_DSN 必须在 .env 中配置", file=sys.stderr)
        sys.exit(1)

    # Optional comma-separated status filter, e.g. "active,completed".
    if args.status:
        status_filter = [piece.strip() for piece in args.status.split(",")]
    else:
        status_filter = None

    etl = load_etl_data(pg_dsn)
    task_rows = load_tasks(app_dsn, status_filter)
    report_text = generate_report(etl, task_rows)

    target = Path(args.output)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(report_text, encoding="utf-8")
    line_count = report_text.count(chr(10)) + 1
    print(f"Done. {line_count} lines -> {target}")


if __name__ == "__main__":
    main()