# -*- coding: utf-8 -*- """ 流失客户回访报告生成脚本。 功能: - 从 Excel(2025年1-6月)和测试数据库(2025年7月起)提取客户数据 - 按手机号归并,筛选月消费≥3000 且最后到店<2026-04-01 的流失客户 - 生成打印优化的 HTML 报告(汇总首页 + 客户详情页) 用法: cd c:/Project/NeoZQYY python scripts/ops/churned_customer_report.py """ from __future__ import annotations import os import re import sys from collections import defaultdict from datetime import datetime, date from pathlib import Path # 加载环境变量 sys.path.insert(0, str(Path(__file__).resolve().parent)) from _env_paths import ensure_repo_root ensure_repo_root() import pandas as pd import psycopg2 # ─── 配置 ─────────────────────────────────────────────── EXCEL_PATH = Path("tmp/2025年1-6月.xlsx") OUTPUT_PATH = Path("tmp/churned_customer_report.html") PG_DSN = os.environ["PG_DSN"] # 筛选条件 MIN_MONTHLY_CONSUME = 3000 # 任意月消费 ≥ 此值 LAST_VISIT_BEFORE = date(2026, 4, 1) # 最后到店早于此日期 REPORT_DATE = date(2026, 4, 11) # 报告日期(计算距今天数) # 横轴范围 MONTHS = pd.period_range("2025-01", "2026-03", freq="M") MONTH_LABELS = [p.strftime("%Y-%m") for p in MONTHS] # ─── 助教名归一化 ───────────────────────────────────────── def normalize_assistant_name(name: str) -> str: """去掉 Excel 助教名中的编号前缀,如 '38号橙子' → '橙子', '17小爱' → '小爱'。""" if not name or not isinstance(name, str): return str(name) if name else "" name = name.strip() # 去掉前导数字+可选"号" cleaned = re.sub(r"^\d+号?", "", name).strip() return cleaned if cleaned else name # ═══════════════════════════════════════════════════════════ # 第一部分:Excel 数据提取 # ═══════════════════════════════════════════════════════════ def load_excel_data() -> dict: """从 Excel 提取消费、充值、助教服务数据,按手机号组织。""" print("[Excel] 读取文件...") xls = pd.ExcelFile(EXCEL_PATH) # 用户表:ID → 手机号/姓名 df_user = pd.read_excel(xls, sheet_name="用户表") user_id_to_phone = dict(zip(df_user["用户表.ID"], df_user["用户表.手机号"].astype(str))) user_id_to_name = dict(zip(df_user["用户表.ID"], df_user["用户表.姓名"].astype(str))) # 订单表:订单ID → 用户ID(用于助教服务关联) df_orders = pd.read_excel(xls, sheet_name="订单表") order_to_user = dict(zip(df_orders["订单表.ID"], df_orders["订单表.用户 ID"])) # ── 1. 消费数据(流水构成)── df_flow = pd.read_excel(xls, sheet_name="流水构成") df_flow["支付时间"] = pd.to_datetime(df_flow["支付时间"], errors="coerce") df_flow = df_flow[df_flow["用户ID"] > 0].copy() df_flow["手机号"] = df_flow["用户ID"].map(user_id_to_phone) df_flow["月份"] = df_flow["支付时间"].dt.to_period("M").astype(str) # items_sum 口径 df_flow["items_sum"] = ( df_flow["开台台费小计"].fillna(0) + df_flow["商品总额"].fillna(0) + df_flow["助教在店费"].fillna(0) + df_flow["助教超休费"].fillna(0) ) # 按手机号+月份聚合消费 consume_monthly = ( df_flow.groupby(["手机号", "月份"]) .agg( items_sum=("items_sum", "sum"), table_fee=("开台台费小计", "sum"), goods_amount=("商品总额", "sum"), assistant_amount=("助教在店费", lambda x: x.fillna(0).sum() + 0), # 陪打 cx_amount=("助教超休费", lambda x: x.fillna(0).sum()), ) .reset_index() ) # 助教总额 = 陪打 + 超休 consume_monthly["assistant_total"] = ( consume_monthly["assistant_amount"] + consume_monthly["cx_amount"] ) # 到店记录(每笔订单算一次到店) visit_records = [] for _, row in df_flow.iterrows(): if pd.notna(row["支付时间"]) and row.get("手机号"): visit_records.append({ "手机号": row["手机号"], "visit_date": row["支付时间"].date(), "visit_time": row["支付时间"], "items_sum": row["items_sum"], "table_fee": row.get("开台台费小计", 0) or 0, "goods_amount": row.get("商品总额", 0) or 0, "assistant_amount": (row.get("助教在店费", 0) or 0) + (row.get("助教超休费", 0) or 0), "duration_min": 0, # Excel 无精确时长 "assistant_names": [], }) # ── 2. 充值数据 ── df_rc = pd.read_excel(xls, sheet_name="会员卡充值记录表") df_rc["时间"] = pd.to_datetime(df_rc["会员卡充值记录表.时间"], errors="coerce") df_rc["手机号"] = df_rc["会员卡充值记录表.用户 ID"].map(user_id_to_phone) df_rc["月份"] = df_rc["时间"].dt.to_period("M").astype(str) recharge_monthly = ( df_rc.groupby(["手机号", "月份"])["会员卡充值记录表.充值金额"] .sum() .reset_index() .rename(columns={"会员卡充值记录表.充值金额": "recharge_amount"}) ) # ── 3. 助教陪打服务 ── df_ast = pd.read_excel(xls, sheet_name="助教服务记录表") df_ast["用户ID"] = df_ast["助教服务记录表.订单 ID"].map(order_to_user) df_ast = df_ast[df_ast["用户ID"] > 0].copy() df_ast["手机号"] = df_ast["用户ID"].map(user_id_to_phone) df_ast["支付时间"] = pd.to_datetime(df_ast["助教服务记录表.支付时间"], errors="coerce") df_ast["月份"] = df_ast["支付时间"].dt.to_period("M").astype(str) df_ast["助教名"] = df_ast["助教服务记录表.助教用户名"].apply(normalize_assistant_name) df_ast["时长小时"] = df_ast["助教服务记录表.助教时长秒"].fillna(0) / 3600.0 df_ast["加权时长"] = df_ast["时长小时"] * 1.0 # 陪打权重 1.0 # ── 4. 助教超休服务 ── df_cx = pd.read_excel(xls, sheet_name="助教超休记录表") df_cx["用户ID"] = df_cx["助教超休记录表.订单 ID"].map(order_to_user) df_cx = df_cx[df_cx["用户ID"] > 0].copy() df_cx["手机号"] = df_cx["用户ID"].map(user_id_to_phone) df_cx["支付时间"] = pd.to_datetime(df_cx["助教超休记录表.支付时间"], errors="coerce") df_cx["月份"] = df_cx["支付时间"].dt.to_period("M").astype(str) df_cx["助教名"] = df_cx["助教超休记录表.助教用户名"].apply(normalize_assistant_name) df_cx["时长小时"] = df_cx["助教超休记录表.超休小时数"].fillna(0) df_cx["加权时长"] = df_cx["时长小时"] * 1.5 # 超休权重 1.5 # 合并助教服务 ast_all = pd.concat([ df_ast[["手机号", "月份", "助教名", "时长小时", "加权时长"]], df_cx[["手机号", "月份", "助教名", "时长小时", "加权时长"]], ], ignore_index=True) # 客户姓名映射 phone_to_names: dict[str, set] = defaultdict(set) for uid, phone in user_id_to_phone.items(): name = user_id_to_name.get(uid, "") if name and name != "nan": phone_to_names[phone].add(name) # 到店时间偏好(补充助教名到 visit_records) # 为 visit_records 补充助教名 order_assistants: dict[int, list[str]] = defaultdict(list) for _, row in df_ast.iterrows(): oid = row["助教服务记录表.订单 ID"] order_assistants[oid].append(row["助教名"]) for _, row in df_cx.iterrows(): oid = row["助教超休记录表.订单 ID"] order_assistants[oid].append(row["助教名"]) # 重建 visit_records(按订单去重) visit_by_order: dict[tuple, dict] = {} for _, row in df_flow.iterrows(): if pd.isna(row["支付时间"]) or not row.get("手机号"): continue # 用流水构成中的 订单ID 关联助教 oid = row.get("订单ID", 0) key = (row["手机号"], oid if oid else id(row)) ast_names = order_assistants.get(oid, []) visit_by_order[key] = { "手机号": row["手机号"], "visit_date": row["支付时间"].date(), "visit_time": row["支付时间"], "items_sum": row["items_sum"], "table_fee": row.get("开台台费小计", 0) or 0, "goods_amount": row.get("商品总额", 0) or 0, "assistant_amount": (row.get("助教在店费", 0) or 0) + (row.get("助教超休费", 0) or 0), "duration_min": 0, "assistant_names": ast_names, } visit_records = list(visit_by_order.values()) print(f"[Excel] 消费记录: {len(consume_monthly)} 条, 充值记录: {len(recharge_monthly)} 条, " f"助教服务: {len(ast_all)} 条, 到店记录: {len(visit_records)} 条") return { "consume_monthly": consume_monthly, "recharge_monthly": recharge_monthly, "assistant_services": ast_all, "visit_records": visit_records, "phone_to_names": phone_to_names, } # ═══════════════════════════════════════════════════════════ # 第二部分:数据库数据提取 # ═══════════════════════════════════════════════════════════ def load_db_data() -> dict: """从数据库提取消费、充值、助教服务、余额数据。""" print("[DB] 连接数据库...") conn = psycopg2.connect(PG_DSN) cur = conn.cursor() # ── 1. 月度消费(items_sum 口径)── cur.execute(""" SELECT dm.mobile AS phone, DATE_TRUNC('month', sh.pay_time)::date AS month, SUM(COALESCE(sh.table_charge_money,0) + COALESCE(sh.goods_money,0) + COALESCE(sh.assistant_pd_money,0) + COALESCE(sh.assistant_cx_money,0)) AS items_sum, SUM(COALESCE(sh.table_charge_money,0)) AS table_fee, SUM(COALESCE(sh.goods_money,0)) AS goods_amount, SUM(COALESCE(sh.assistant_pd_money,0)) AS assistant_pd, SUM(COALESCE(sh.assistant_cx_money,0)) AS assistant_cx FROM dwd.dwd_settlement_head sh JOIN dwd.dim_member dm ON sh.member_id = dm.member_id AND dm.scd2_is_current = 1 WHERE sh.settle_type IN (1, 3) AND dm.mobile IS NOT NULL AND dm.mobile != '' GROUP BY dm.mobile, DATE_TRUNC('month', sh.pay_time) """) rows = cur.fetchall() consume_monthly = pd.DataFrame(rows, columns=[ "手机号", "month_date", "items_sum", "table_fee", "goods_amount", "assistant_pd", "assistant_cx", ]) consume_monthly["月份"] = consume_monthly["month_date"].apply( lambda d: d.strftime("%Y-%m")) consume_monthly["assistant_total"] = ( consume_monthly["assistant_pd"] + consume_monthly["assistant_cx"] ) # 转 float for col in ["items_sum", "table_fee", "goods_amount", "assistant_pd", "assistant_cx", "assistant_total"]: consume_monthly[col] = consume_monthly[col].astype(float) # ── 2. 到店记录(每笔结算单 = 一次到店)── cur.execute(""" SELECT dm.mobile AS phone, sh.pay_time::date AS visit_date, sh.pay_time AS visit_time, COALESCE(sh.table_charge_money,0) + COALESCE(sh.goods_money,0) + COALESCE(sh.assistant_pd_money,0) + COALESCE(sh.assistant_cx_money,0) AS items_sum, COALESCE(sh.table_charge_money,0) AS table_fee, COALESCE(sh.goods_money,0) AS goods_amount, COALESCE(sh.assistant_pd_money,0) + COALESCE(sh.assistant_cx_money,0) AS assistant_amount, sh.order_settle_id FROM dwd.dwd_settlement_head sh JOIN dwd.dim_member dm ON sh.member_id = dm.member_id AND dm.scd2_is_current = 1 WHERE sh.settle_type IN (1, 3) AND dm.mobile IS NOT NULL AND dm.mobile != '' ORDER BY dm.mobile, sh.pay_time """) visit_rows = cur.fetchall() # 获取每个结算单的助教名 cur.execute(""" SELECT asl.order_settle_id, asl.nickname FROM dwd.dwd_assistant_service_log asl LEFT JOIN dwd.dwd_assistant_service_log_ex ex ON asl.assistant_service_id = ex.assistant_service_id WHERE asl.is_delete = 0 AND (ex.is_trash IS NULL OR ex.is_trash = 0) """) settle_assistants: dict[int, list[str]] = defaultdict(list) for sid, name in cur.fetchall(): if name: settle_assistants[sid].append(name) visit_records = [] for phone, vdate, vtime, isum, tfee, goods, ast_amt, sid in visit_rows: visit_records.append({ "手机号": phone, "visit_date": vdate, "visit_time": vtime, "items_sum": float(isum), "table_fee": float(tfee), "goods_amount": float(goods), "assistant_amount": float(ast_amt), "duration_min": 0, "assistant_names": settle_assistants.get(sid, []), }) # ── 3. 充值数据 ── cur.execute(""" SELECT dm.mobile AS phone, DATE_TRUNC('month', ro.pay_time)::date AS month, SUM(COALESCE(ro.pay_amount, 0)) AS recharge_amount FROM dwd.dwd_recharge_order ro JOIN dwd.dim_member dm ON ro.member_id = dm.member_id AND dm.scd2_is_current = 1 WHERE dm.mobile IS NOT NULL AND dm.mobile != '' GROUP BY dm.mobile, DATE_TRUNC('month', ro.pay_time) """) recharge_rows = cur.fetchall() recharge_monthly = pd.DataFrame(recharge_rows, columns=["手机号", "month_date", "recharge_amount"]) recharge_monthly["月份"] = recharge_monthly["month_date"].apply(lambda d: d.strftime("%Y-%m")) recharge_monthly["recharge_amount"] = recharge_monthly["recharge_amount"].astype(float) # ── 4. 助教服务(带类型区分)── cur.execute(""" SELECT dm.mobile AS phone, DATE_TRUNC('month', asl.create_time)::date AS month, asl.nickname AS assistant_name, asl.order_assistant_type, asl.income_seconds FROM dwd.dwd_assistant_service_log asl JOIN dwd.dim_member dm ON asl.tenant_member_id = dm.member_id AND dm.scd2_is_current = 1 LEFT JOIN dwd.dwd_assistant_service_log_ex ex ON asl.assistant_service_id = ex.assistant_service_id WHERE asl.is_delete = 0 AND (ex.is_trash IS NULL OR ex.is_trash = 0) AND dm.mobile IS NOT NULL AND dm.mobile != '' """) ast_rows = cur.fetchall() ast_data = [] for phone, mdate, aname, atype, secs in ast_rows: hours = (secs or 0) / 3600.0 weight = 1.5 if atype == 2 else 1.0 ast_data.append({ "手机号": phone, "月份": mdate.strftime("%Y-%m"), "助教名": aname or "", "时长小时": hours, "加权时长": hours * weight, }) assistant_services = pd.DataFrame(ast_data) if ast_data else pd.DataFrame( columns=["手机号", "月份", "助教名", "时长小时", "加权时长"]) # ── 5. 客户姓名(dim_member)── cur.execute(""" SELECT mobile, nickname FROM dwd.dim_member WHERE scd2_is_current = 1 AND mobile IS NOT NULL AND mobile != '' """) phone_to_names: dict[str, set] = defaultdict(set) for phone, name in cur.fetchall(): if name: phone_to_names[phone].add(name) # ── 6. 会员卡余额 ── cur.execute(""" SELECT member_mobile, member_card_type_name, member_card_grade_code_name, balance, principal_balance FROM dwd.dim_member_card_account WHERE scd2_is_current = 1 AND member_mobile IS NOT NULL AND member_mobile != '' """) card_info: dict[str, list[dict]] = defaultdict(list) for phone, ctype, grade, bal, pbal in cur.fetchall(): card_info[phone].append({ "card_type": ctype or "", "grade": grade or "", "balance": float(bal or 0), "principal": float(pbal or 0), }) conn.close() print(f"[DB] 消费记录: {len(consume_monthly)} 条, 充值记录: {len(recharge_monthly)} 条, " f"助教服务: {len(assistant_services)} 条, 到店记录: {len(visit_records)} 条") return { "consume_monthly": consume_monthly[["手机号", "月份", "items_sum", "table_fee", "goods_amount", "assistant_total"]], "recharge_monthly": recharge_monthly[["手机号", "月份", "recharge_amount"]], "assistant_services": assistant_services, "visit_records": visit_records, "phone_to_names": phone_to_names, "card_info": card_info, } # ═══════════════════════════════════════════════════════════ # 第三部分:数据合并与筛选 # ═══════════════════════════════════════════════════════════ def merge_and_filter(excel_data: dict, db_data: dict) -> list[dict]: """合并两个数据源,筛选流失客户,计算所有指标。""" print("[合并] 按手机号合并数据...") # 合并月度消费 consume_all = pd.concat([ excel_data["consume_monthly"][["手机号", "月份", "items_sum", "table_fee", "goods_amount", "assistant_total"]], db_data["consume_monthly"], ], ignore_index=True) consume_all = consume_all.groupby(["手机号", "月份"]).sum(numeric_only=True).reset_index() # 合并充值 recharge_all = pd.concat([ excel_data["recharge_monthly"], db_data["recharge_monthly"], ], ignore_index=True) recharge_all = recharge_all.groupby(["手机号", "月份"]).sum(numeric_only=True).reset_index() # 合并助教服务 ast_all = pd.concat([ excel_data["assistant_services"], db_data["assistant_services"], ], ignore_index=True) # 合并到店记录 visits_all = excel_data["visit_records"] + db_data["visit_records"] # 合并姓名 phone_names: dict[str, set] = defaultdict(set) for src in [excel_data["phone_to_names"], db_data["phone_to_names"]]: for phone, names in src.items(): phone_names[phone].update(names) # 会员卡信息(仅数据库有当前值) card_info = db_data.get("card_info", {}) # ── 筛选:任意月消费 ≥ 3000 ── max_monthly = consume_all.groupby("手机号")["items_sum"].max().reset_index() max_monthly.columns = ["手机号", "max_monthly"] qualified_phones = set( max_monthly[max_monthly["max_monthly"] >= MIN_MONTHLY_CONSUME]["手机号"] ) print(f"[筛选] 任意月消费≥{MIN_MONTHLY_CONSUME}: {len(qualified_phones)} 人") # ── 筛选:最后到店 < LAST_VISIT_BEFORE ── visits_by_phone: dict[str, list[dict]] = defaultdict(list) for v in visits_all: visits_by_phone[v["手机号"]].append(v) last_visit: dict[str, date] = {} for phone, vlist in visits_by_phone.items(): last_visit[phone] = max(v["visit_date"] for v in vlist) churned_phones = set() for phone in qualified_phones: lv = last_visit.get(phone) if lv and lv < LAST_VISIT_BEFORE: churned_phones.add(phone) print(f"[筛选] 最后到店<{LAST_VISIT_BEFORE}: {len(churned_phones)} 人") # ── 构建客户记录 ── customers = [] for phone in churned_phones: # 月度消费序列 c_df = consume_all[consume_all["手机号"] == phone].set_index("月份") monthly_consume = {} for m in MONTH_LABELS: monthly_consume[m] = float(c_df.loc[m, "items_sum"]) if m in c_df.index else 0 # 月度充值序列 r_df = recharge_all[recharge_all["手机号"] == phone].set_index("月份") monthly_recharge = {} for m in MONTH_LABELS: monthly_recharge[m] = float(r_df.loc[m, "recharge_amount"]) if m in r_df.index else 0 # 消费构成 total_table = float(c_df["table_fee"].sum()) if "table_fee" in c_df.columns else 0 total_goods = float(c_df["goods_amount"].sum()) if "goods_amount" in c_df.columns else 0 total_ast = float(c_df["assistant_total"].sum()) if "assistant_total" in c_df.columns else 0 total_consume = sum(monthly_consume.values()) # 评分 max_m = max(monthly_consume.values()) if monthly_consume else 0 active_months = sum(1 for v in monthly_consume.values() if v > 0) avg_m = total_consume / active_months if active_months > 0 else 0 score = max_m * 0.6 + avg_m * 0.4 # 到店统计 vlist = sorted(visits_by_phone.get(phone, []), key=lambda x: x["visit_date"]) total_visits = len(vlist) last_v = last_visit[phone] days_since = (REPORT_DATE - last_v).days avg_ticket = total_consume / total_visits if total_visits > 0 else 0 # 月度到店次数 monthly_visits: dict[str, int] = defaultdict(int) for v in vlist: m = v["visit_date"].strftime("%Y-%m") if m in MONTH_LABELS: monthly_visits[m] += 1 # 到店时段偏好 weekday_count = 0 weekend_count = 0 day_count = 0 night_count = 0 for v in vlist: vt = v["visit_time"] if hasattr(vt, "weekday"): wd = vt.weekday() else: wd = vt.date().weekday() if hasattr(vt, "date") else 0 if wd < 5: weekday_count += 1 else: weekend_count += 1 hour = vt.hour if hasattr(vt, "hour") else 12 if 6 <= hour < 18: day_count += 1 else: night_count += 1 # 最近 3 次到店 recent_3 = vlist[-3:] if len(vlist) >= 3 else vlist[:] recent_3.reverse() # 最近的排前面 # 助教偏好(加权时长) ast_df = ast_all[ast_all["手机号"] == phone].copy() if len(ast_all) > 0 else pd.DataFrame() assistant_monthly: dict[str, dict[str, float]] = defaultdict(lambda: defaultdict(float)) assistant_totals: dict[str, float] = defaultdict(float) if len(ast_df) > 0: for _, row in ast_df.iterrows(): aname = row["助教名"] month = row["月份"] wt = float(row["加权时长"]) assistant_monthly[aname][month] += wt assistant_totals[aname] += wt # 助教按总时长排序 sorted_assistants = sorted(assistant_totals.keys(), key=lambda a: -assistant_totals[a]) # 会员卡 cards = card_info.get(phone, []) total_balance = sum(c["balance"] for c in cards) customers.append({ "phone": phone, "names": " / ".join(sorted(phone_names.get(phone, {"未知"}))), "score": score, "max_monthly": max_m, "avg_monthly": avg_m, "total_consume": total_consume, "total_visits": total_visits, "last_visit": last_v, "days_since": days_since, "avg_ticket": avg_ticket, "monthly_consume": monthly_consume, "monthly_recharge": monthly_recharge, "monthly_visits": {m: monthly_visits.get(m, 0) for m in MONTH_LABELS}, "total_recharge": sum(monthly_recharge.values()), "total_table_fee": total_table, "total_goods": total_goods, "total_assistant": total_ast, "weekday_count": weekday_count, "weekend_count": weekend_count, "day_count": day_count, "night_count": night_count, "recent_3_visits": recent_3, "all_visits": vlist, # 全部到店记录(用于时间段统计) "assistant_monthly": dict(assistant_monthly), "assistant_totals": assistant_totals, "sorted_assistants": sorted_assistants, "cards": cards, "total_balance": total_balance, }) # 按评分降序 customers.sort(key=lambda c: -c["score"]) # 添加排名 for i, c in enumerate(customers): c["rank"] = i + 1 print(f"[结果] 最终流失客户: {len(customers)} 人") return customers # ═══════════════════════════════════════════════════════════ # 第四部分:HTML 报告生成(v5 重新设计排版) # ═══════════════════════════════════════════════════════════ def generate_html(customers: list[dict]) -> str: """生成完整的 HTML 报告。""" def fm(v: float) -> str: return "-" if v == 0 else f"¥{v:,.0f}" def fmn(v: float) -> str: """纯数字格式(无¥前缀,用于非金额场景)""" return "-" if v == 0 else f"{v:,.0f}" def fm2(v: float) -> str: return "-" if v == 0 else f"¥{v:,.2f}" # 横坐标:25/01 02 03 ... 12 26/01 02 03 short_labels = [] for m in MONTH_LABELS: y, mo = m.split("-") if mo == "01": short_labels.append(f"{y[2:]}/{mo}") else: short_labels.append(mo) short_labels_js = str(short_labels) month_nums = [m.split("-")[1].lstrip("0") for m in MONTH_LABELS] # ── 汇总表(去掉评分列)── summary_rows = "" for c in customers: bal = fm(c["total_balance"]) if c["total_balance"] > 0 else "-" summary_rows += ( f"" f"{c['rank']}" f"{c['phone']}" f"{c['names']}" f"{fm(c['max_monthly'])}" f"{fm(c['avg_monthly'])}" f"{fm(c['total_consume'])}" f"{c['total_visits']}" f"{c['last_visit'].strftime('%Y-%m-%d')}" f"{c['days_since']}天" f"{bal}" f"" ) # ── 客户详情页 ── detail_pages = "" for c in customers: consume_data = [round(c["monthly_consume"].get(m, 0), 0) for m in MONTH_LABELS] visit_data = [c["monthly_visits"].get(m, 0) for m in MONTH_LABELS] recharge_data = [round(c["monthly_recharge"].get(m, 0), 0) for m in MONTH_LABELS] # 消费趋势 nz = [(m, v) for m, v in c["monthly_consume"].items() if v > 0] if nz: mx = max(nz, key=lambda x: x[1]) mn = min(nz, key=lambda x: x[1]) h1 = sum(v for m, v in nz if m < "2025-07") h2 = sum(v for m, v in nz if m >= "2025-07") trend = ("上升" if h2 > h1 * 1.2 else ("下降" if h2 < h1 * 0.8 else "平稳")) if h1 > 0 and h2 > 0 else ("仅上半年" if h1 > 0 else "下半年开始") c_note = f"最高月消费 {mx[0]}({fm(mx[1])}元),最低 {mn[0]}({fm(mn[1])}元),整体{trend}" else: c_note = "无消费记录" # 充值说明 nzr = [(m, v) for m, v in c["monthly_recharge"].items() if v > 0] r_note = f"累计充值 {fm(c['total_recharge'])}元,当前卡余额 {fm(c['total_balance'])}元" if nzr else f"无充值记录,当前卡余额 {fm(c['total_balance'])}元" # 时段偏好(2小时时间段,取前3) tv = c["weekday_count"] + c["weekend_count"] if tv > 0: wd_p = c["weekday_count"] / tv * 100 we_p = c["weekend_count"] / tv * 100 day_tag = f"工作日 {c['weekday_count']}次({wd_p:.0f}%) / 周末 {c['weekend_count']}次({we_p:.0f}%)" # 按2小时时间段统计,取前4 slot_counts: dict[str, int] = defaultdict(int) vlist = c.get("all_visits", c["recent_3_visits"]) for v in vlist: vt = v["visit_time"] h = vt.hour if hasattr(vt, "hour") else 12 slot_start = (h // 2) * 2 slot_end = slot_start + 2 slot_key = f"{slot_start}-{slot_end}点" slot_counts[slot_key] += 1 top_slots = sorted(slot_counts.items(), key=lambda x: -x[1])[:4] if top_slots: # 两组一行 pairs = [] for i in range(0, len(top_slots), 2): row_items = [f"{s}:{n}次({n/tv*100:.0f}%)" for s, n in top_slots[i:i+2]] pairs.append(" | ".join(row_items)) time_tag = "
".join(pairs) else: time_tag = "数据不足" else: day_tag = time_tag = "数据不足" # 消费构成 ct = c["total_table_fee"] + c["total_goods"] + c["total_assistant"] pt = c["total_table_fee"] / ct * 100 if ct > 0 else 0 pg = c["total_goods"] / ct * 100 if ct > 0 else 0 pa = c["total_assistant"] / ct * 100 if ct > 0 else 0 # 会员卡 if c["cards"]: ac = [cd for cd in c["cards"] if cd["balance"] > 0] card_rows = "".join( f"{cd['card_type']}{fm2(cd['balance'])}元" for cd in sorted(ac, key=lambda x: -x["balance"]) ) if ac else "余额已清零" else: card_rows = "无会员卡" # 最近3次到店(两列双行:左列=日期时间+金额,右列=构成+助教) recent_html = "" for v in c["recent_3_visits"]: vt = v["visit_time"] d = v["visit_date"].strftime("%Y-%m-%d") t = vt.strftime("%H:%M") if hasattr(vt, "strftime") else "" ast_names = "、".join(v["assistant_names"][:3]) if v["assistant_names"] else "无" parts = [] if v["table_fee"] and v["table_fee"] > 0: parts.append(f"台费{fm(v['table_fee'])}") if v["goods_amount"] and v["goods_amount"] > 0: parts.append(f"商品{fm(v['goods_amount'])}") if v["assistant_amount"] and v["assistant_amount"] > 0: parts.append(f"助教{fm(v['assistant_amount'])}") detail = " · ".join(parts) if parts else "-" recent_html += ( f"" f"
{d} {t}
总消费 {fm(v['items_sum'])}
" f"
{detail}
助教:{ast_names}
" f"" ) if not recent_html: recent_html = "无到店记录" # 助教表格(含合计行) ast_count = len(c["sorted_assistants"]) ast_table_rows = ast_count + 2 # 表头+数据行+合计行 ast_on_page1 = ast_table_rows <= 20 # 含合计行≤20则放第一页 def build_ast_table(assistants): if not assistants: return "

无助教服务记录

" html = "" for mn in month_nums: html += f"" html += "" month_totals = {m: 0.0 for m in MONTH_LABELS} grand_total = 0.0 for aname in assistants: html += f"" for m in MONTH_LABELS: val = c["assistant_monthly"].get(aname, {}).get(m, 0) month_totals[m] += val html += f"" if val > 0 else "" t = c['assistant_totals'][aname] grand_total += t html += f"" # 合计行 html += "" for m in MONTH_LABELS: v = month_totals[m] html += f"" if v > 0 else "" html += f"" html += "
助教{mn}月合计
{aname}{val:.1f}-{t:.1f}
合计{v:.1f}-{grand_total:.1f}
" return html # 计算图表高度(A4 可用约 1030px CSS) # 固定部分:头部35 + 三栏120 + 消费标题24 + 充值标题24 + 助教标题24 # + card间距(5×8=40) + card内padding(5×16=80) + 页padding16 = ~363px page_h = 1030 fixed_h = 363 if ast_on_page1: ast_h = ast_table_rows * 15 # 每行约15px else: ast_h = 25 # 提示文字 chart_total = page_h - fixed_h - ast_h consume_h = max(int(chart_total * 0.63), 130) recharge_h = max(chart_total - consume_h, 80) detail_pages += f"""
#{c['rank']} {c['phone']} {c['names']} 最后到店 {c['last_visit'].strftime('%Y-%m-%d')}({c['days_since']}天前) 累计到店 {c['total_visits']}次 客均消费 {fm(c['avg_ticket'])} 累计消费 {fm(c['total_consume'])}
最近到店明细
{recent_html}
消费画像
台费 {fm(c['total_table_fee'])}
商品 {fm(c['total_goods'])}
助教 {fm(c['total_assistant'])}
按日:{day_tag}
按时:{time_tag}
会员卡余额
{card_rows}
消费动态 消费额 到店次数 {c_note}
充值动态 {r_note}
""" if ast_on_page1: detail_pages += f"""
喜爱的助教 加权等效时长(小时),超休×1.5
{build_ast_table(c["sorted_assistants"])}
""" else: detail_pages += f"""
喜爱的助教

服务过 {c['names']} 的助教有 {ast_count} 人,详情记录在下页。

#{c['rank']} {c['phone']} {c['names']} 喜爱的助教(续) 共{ast_count}人
喜爱的助教 加权等效时长(小时),超休×1.5
{build_ast_table(c["sorted_assistants"])}
""" # ── Chart.js 脚本 ── chart_scripts = "" for c in customers: cd = [round(c["monthly_consume"].get(m, 0), 0) for m in MONTH_LABELS] vd = [c["monthly_visits"].get(m, 0) for m in MONTH_LABELS] rd = [round(c["monthly_recharge"].get(m, 0), 0) for m in MONTH_LABELS] chart_scripts += f""" try {{ var _vd{c['rank']} = {vd}; var ec{c['rank']} = echarts.init(document.getElementById('consume-{c['rank']}')); ec{c['rank']}.setOption({{ grid: {{ left:45, right:55, top:30, bottom:25 }}, xAxis: {{ type:'category', data:{short_labels_js}, axisLabel:{{ fontSize:9 }}, boundaryGap:false }}, yAxis: [ {{ type:'value', name:'消费额(元)', nameTextStyle:{{ fontSize:8 }}, axisLabel:{{ fontSize:8 }}, splitLine:{{ lineStyle:{{ type:'dashed', color:'#eee' }} }} }}, {{ type:'value', name:'到店次数', nameTextStyle:{{ fontSize:8 }}, axisLabel:{{ fontSize:8 }}, splitLine:{{ show:false }} }} ], series: [ {{ name:'消费', type:'line', data:{cd}, smooth:0.3, z:20, zlevel:1, symbol:'circle', symbolSize:6, itemStyle:{{ color:'#e74c3c' }}, lineStyle:{{ color:'#e74c3c', width:2 }}, areaStyle:{{ color:'rgba(231,76,60,0.06)' }}, label:{{ show:true, position:'top', distance:6, backgroundColor:'rgba(255,255,255,0.9)', borderRadius:3, padding:[2,4], rich:{{ a:{{ color:'#c0392b', fontSize:9, fontWeight:'bold', lineHeight:14 }}, b:{{ color:'#2980b9', fontSize:8, lineHeight:12 }} }}, formatter: function(p){{ var v = _vd{c['rank']}[p.dataIndex]; var lines = []; if(p.value>0) lines.push('{{a|¥'+p.value.toLocaleString()+'}}'); if(v>0) lines.push('{{b|'+v+'次}}'); return lines.join('\\n'); }} }}, labelLine:{{ show:false }}, }}, {{ name:'到店', type:'line', data:{vd}, smooth:0.3, yAxisIndex:1, z:10, symbol:'circle', symbolSize:5, itemStyle:{{ color:'#3498db' }}, lineStyle:{{ color:'#3498db', width:1.5, type:'dashed' }}, label:{{ show:false }}, }} ], labelLayout:{{ moveOverlap:'shiftY', hideOverlap:false }}, tooltip:{{ trigger:'axis' }}, }}); }} catch(e){{ console.warn('c{c['rank']}',e); }} try {{ new Chart(document.getElementById('pie-{c['rank']}'), {{ type:'doughnut', data:{{ labels:['台费','商品','助教'], datasets:[{{ data:[{c['total_table_fee']:.0f},{c['total_goods']:.0f},{c['total_assistant']:.0f}], backgroundColor:['#3498db','#2ecc71','#e74c3c'], borderWidth:1 }}] }}, options:{{ responsive:true, maintainAspectRatio:true, animation:false, cutout:'30%', plugins:{{ legend:{{ display:false }}, datalabels:{{ display:function(c){{ return c.parsed>0; }}, color:'#fff', font:{{ size:11, weight:'bold' }}, textAlign:'center', formatter:function(v,c){{ var t=c.dataset.data.reduce(function(a,b){{return a+b}},0); var lbl=c.chart.data.labels[c.dataIndex]; return t>0? lbl+'\\n'+(v/t*100).toFixed(0)+'%' : ''; }} }} }} }}, plugins:[ChartDataLabels] }}); }} catch(e){{ console.warn('p{c['rank']}',e); }} try {{ var er{c['rank']} = echarts.init(document.getElementById('recharge-{c['rank']}')); er{c['rank']}.setOption({{ grid: {{ left:45, right:55, top:25, bottom:20 }}, xAxis: {{ type:'category', data:{short_labels_js}, axisLabel:{{ fontSize:8 }}, boundaryGap:false }}, yAxis: {{ type:'value', axisLabel:{{ fontSize:8 }}, splitLine:{{ lineStyle:{{ type:'dashed', color:'#eee' }} }} }}, series: [{{ name:'充值', type:'line', data:{rd}, smooth:0.3, symbol:'circle', symbolSize:6, itemStyle:{{ color:'#27ae60' }}, lineStyle:{{ color:'#27ae60', width:2 }}, areaStyle:{{ color:'rgba(39,174,96,0.06)' }}, label:{{ show:true, position:'top', color:'#1e8449', fontSize:9, fontWeight:'bold', backgroundColor:'rgba(255,255,255,0.85)', borderRadius:3, padding:[2,4], formatter: function(p){{ return p.value>0? '¥'+p.value.toLocaleString():''; }} }}, labelLine:{{ show:false }}, }}], labelLayout:{{ moveOverlap:'shiftY', hideOverlap:false }}, tooltip:{{ trigger:'axis' }}, }}); }} catch(e){{ console.warn('r{c['rank']}',e); }} """ html = f""" 流失客户回访报告 — {REPORT_DATE.strftime('%Y-%m-%d')}

流失客户回访报告

报告日期:{REPORT_DATE.strftime('%Y-%m-%d')} | 筛选条件:任意月消费 ≥ {MIN_MONTHLY_CONSUME:,} 元 且 最后到店早于 {LAST_VISIT_BEFORE.strftime('%Y-%m-%d')} | 共 {len(customers)} 人
{summary_rows}
#手机号姓名 最高月消费月均消费累计消费 到店最后到店距今卡余额
{detail_pages}
""" return html # ═══════════════════════════════════════════════════════════ # 主入口 # ═══════════════════════════════════════════════════════════ def main(): print("=" * 60) print("流失客户回访报告生成") print("=" * 60) excel_data = load_excel_data() db_data = load_db_data() customers = merge_and_filter(excel_data, db_data) if not customers: print("[警告] 无符合条件的流失客户!") return html = generate_html(customers) OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True) OUTPUT_PATH.write_text(html, encoding="utf-8") print(f"\n[完成] 报告已生成: {OUTPUT_PATH.resolve()}") print(f" 共 {len(customers)} 位流失客户") if __name__ == "__main__": main()