# -*- coding: utf-8 -*-
"""生成 2025年10-12月 报表(CSV + MD)。

输出目录:etl_billiards/docs/table_2025-12-19

重要口径(已按需求固化):
- 门店:site_id=2790685415443269
- 月份切割:北京时间(+08)当月1日 00:00:00
- 消费/流水:应付金额(不扣优惠),= 台费(dwd_table_fee_log.ledger_amount) + 助教(dwd_assistant_service_log.ledger_amount) + 商品(dwd_store_goods_sale.ledger_amount)
- 助教时长:income_seconds(计费秒数)换算小时
- 优惠:台费调账 dwd_table_fee_adjust.ledger_amount + 会员折扣 dwd_table_fee_log.member_discount_amount
- 多助教/充值归因:全额复制计入(会导致汇总>门店总额),在表格说明中提示
"""
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import re
|
||
from dataclasses import dataclass
|
||
from decimal import Decimal
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import psycopg2
|
||
import psycopg2.extras
|
||
|
||
|
||
# Target store and reporting timezone: every query below is pinned to this
# site_id, and month boundaries are cut at Beijing time (+08).
SITE_ID = 2790685415443269
TZ = "Asia/Shanghai"

# Per-month reporting windows as half-open [start, end) timestamptz literals.
WIN_OCT = ("2025-10-01 00:00:00+08", "2025-11-01 00:00:00+08")
WIN_NOV = ("2025-11-01 00:00:00+08", "2025-12-01 00:00:00+08")
WIN_DEC = ("2025-12-01 00:00:00+08", "2026-01-01 00:00:00+08")
# Full-quarter window: start of October through end of December.
WIN_ALL = (WIN_OCT[0], WIN_DEC[1])

# (month_key, display label, window) triples; month_key doubles as the
# grouping key produced by month_case() in SQL.
MONTHS = [
    ("2025-10", "10月", WIN_OCT),
    ("2025-11", "11月", WIN_NOV),
    ("2025-12", "12月", WIN_DEC),
]

# Repo root is 3 directory levels above this file; the .env there holds
# PG_DSN; generated CSV/MD files land next to this script.
REPO_ROOT = Path(__file__).resolve().parents[3]
ENV_PATH = REPO_ROOT / "etl_billiards" / ".env"
OUT_DIR = Path(__file__).resolve().parent
||
@dataclass(frozen=True)
class SqlBlock:
    """A titled SQL snippet rendered into the generated Markdown reports."""

    # Section heading shown above the fenced SQL in the .md output.
    title: str
    # Raw SQL text, written verbatim inside a ```sql fence.
    sql: str
|
||
|
||
def read_pg_dsn() -> str:
    """Read the PostgreSQL DSN from the project's ``.env`` file.

    Returns the value of the first ``PG_DSN=...`` line in ``ENV_PATH``,
    stripped of surrounding whitespace and of one matching pair of single
    or double quotes, so both ``PG_DSN=dsn`` and ``PG_DSN="dsn"`` dotenv
    styles work.

    Raises:
        RuntimeError: if no ``PG_DSN`` line is found.
    """
    text = ENV_PATH.read_text(encoding="utf-8")
    m = re.search(r"^PG_DSN=(.+)$", text, re.M)
    if not m:
        raise RuntimeError(f"未在 {ENV_PATH} 中找到 PG_DSN")
    value = m.group(1).strip()
    # Accept dotenv-style quoted values: strip one matching quote pair.
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
        value = value[1:-1]
    return value
||
|
||
|
||
def conn():
    """Open a new PostgreSQL connection using the DSN from the .env file."""
    dsn = read_pg_dsn()
    return psycopg2.connect(dsn, connect_timeout=10)
|
||
|
||
|
||
def sanitize_filename(name: str) -> str:
    """Return *name* made safe for use as a filename.

    Leading/trailing whitespace is trimmed, each run of characters that
    are illegal in filenames (<>:"/\\|?*) collapses to one underscore,
    and each internal whitespace run collapses to one space.
    """
    cleaned = re.sub(r"[<>:\"/\\|?*]+", "_", name.strip())
    return re.sub(r"\s+", " ", cleaned)
|
||
|
||
|
||
def mask_phone(phone: str | None) -> str:
    """Mask the middle digits of a phone number for display.

    Non-digit characters are removed first. Numbers with at least 7
    digits are rendered as the first 3 digits, four asterisks, then any
    remaining digits (e.g. ``138****5678``). Shorter values are returned
    unchanged; ``None``/empty yields ``""``.
    """
    if not phone:
        return ""
    digits = re.sub(r"\D", "", phone)
    if len(digits) < 7:
        return phone
    # \d* (not \d+): with \d+ a 7-digit number passes the length guard
    # but fails the regex and leaks through completely unmasked.
    return re.sub(r"^(\d{3})\d{4}(\d*)$", r"\1****\2", digits)
|
||
|
||
|
||
def d(v: Any) -> Decimal:
    """Coerce *v* to :class:`Decimal`; ``None`` becomes ``Decimal('0')``.

    Non-Decimal values are routed through ``str()`` so floats keep their
    printed value instead of their binary expansion.
    """
    if v is None:
        return Decimal("0")
    return v if isinstance(v, Decimal) else Decimal(str(v))
|
||
|
||
|
||
def fmt_money(v: Any) -> str:
    """Format any numeric-ish value as a money string with 2 decimals."""
    amount = d(v)
    return f"{amount:.2f}"
|
||
|
||
|
||
def fmt_hours(v: Any, digits: int = 1) -> str:
    """Format a number of hours as e.g. ``'12.3h'``, quantized to *digits* places."""
    step = Decimal("1").scaleb(-digits)
    return f"{d(v).quantize(step):f}h"
|
||
|
||
|
||
def write_csv(path: Path, title: str, description: str, header_rows: list[list[str]], rows: list[list[Any]]) -> None:
    """Write a report CSV: title row, description row, blank row, headers, data.

    ``None`` cells are emitted as empty strings. Parent directories are
    created as needed.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.writer(fh)
        writer.writerow([title])
        writer.writerow([description])
        writer.writerow([])  # one blank line between description and headers
        writer.writerows(header_rows)
        for row in rows:
            writer.writerow(["" if cell is None else cell for cell in row])
|
||
|
||
|
||
def write_md(path: Path, title: str, thinking: str, description: str, sql_blocks: list[SqlBlock]) -> None:
    """Write a Markdown companion doc: title, thinking, description, SQL.

    Each SqlBlock becomes its own subsection containing a fenced
    ```sql code block.
    """
    chunks: list[str] = [
        f"# {title}\n",
        "## 思考过程\n",
        thinking.strip() + "\n",
        "\n## 查询说明\n",
        description.strip() + "\n",
        "\n## SQL\n",
    ]
    for block in sql_blocks:
        chunks.append(f"\n### {block.title}\n")
        chunks.append("```sql\n")
        chunks.append(block.sql.strip() + "\n")
        chunks.append("```\n")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("".join(chunks), encoding="utf-8")
|
||
|
||
|
||
def fetch_all(cur, sql: str, params: dict[str, Any]) -> list[dict[str, Any]]:
    """Execute *sql* with *params* on cursor *cur*; return all rows as a list."""
    cur.execute(sql, params)
    rows = cur.fetchall()
    return list(rows)
|
||
|
||
|
||
def month_case(ts_expr: str) -> str:
    """Build a SQL CASE expression mapping *ts_expr* to its month_key.

    Each window in MONTHS becomes one WHEN arm; timestamps outside every
    window map to NULL.
    """
    arms = [
        f"when {ts_expr} >= '{ws}'::timestamptz and {ts_expr} < '{we}'::timestamptz then '{month_key}'"
        for month_key, _, (ws, we) in MONTHS
    ]
    return "case " + " ".join(arms) + " else null end"
|
||
|
||
|
||
def sql_order_base(window_start: str, window_end: str) -> str:
    """Return the shared CTE prefix ending in an ``orders`` CTE.

    Builds, keyed by ``order_settle_id``:

    - ``base_orders``: table-fee rows grouped per settle id (member,
      start/end time, table amount, table-use seconds);
    - ``assistant_info``: assistant amounts plus the assistant time span;
    - ``goods_amount``: goods sale amounts;
    - ``orders``: one row per settle id with the overall time span and
      ``order_amount`` = table + assistant + goods (payable amounts,
      discounts NOT deducted).

    The caller appends its own SELECT after this prefix. NOTE:
    ``window_start``/``window_end`` are interpolated directly into the
    SQL text, so they must be trusted literals (the module WIN_*
    constants), never user input; ``%(site_id)s`` stays a bind parameter
    supplied by the caller.
    """
    return f"""
with base_orders as (
    select
        tfl.order_settle_id,
        max(tfl.member_id) as member_id,
        min(tfl.start_use_time) as order_start_time,
        max(tfl.ledger_end_time) as order_end_time,
        sum(tfl.ledger_amount) as table_amount,
        sum(tfl.real_table_use_seconds) as table_use_seconds
    from billiards_dwd.dwd_table_fee_log tfl
    where tfl.site_id = %(site_id)s
      and coalesce(tfl.is_delete,0) = 0
      and tfl.start_use_time >= '{window_start}'::timestamptz
      and tfl.start_use_time < '{window_end}'::timestamptz
    group by tfl.order_settle_id
),
assistant_info as (
    select
        asl.order_settle_id,
        sum(asl.ledger_amount) as assistant_amount,
        min(asl.start_use_time) as assistant_start_time,
        max(asl.last_use_time) as assistant_end_time
    from billiards_dwd.dwd_assistant_service_log asl
    join base_orders bo on bo.order_settle_id = asl.order_settle_id
    where asl.site_id = %(site_id)s
      and coalesce(asl.is_delete,0) = 0
    group by asl.order_settle_id
),
goods_amount as (
    select
        g.order_settle_id,
        sum(g.ledger_amount) as goods_amount
    from billiards_dwd.dwd_store_goods_sale g
    join base_orders bo on bo.order_settle_id = g.order_settle_id
    where g.site_id = %(site_id)s
      and coalesce(g.is_delete,0) = 0
    group by g.order_settle_id
),
orders as (
    select
        bo.order_settle_id,
        bo.member_id,
        least(bo.order_start_time, coalesce(a.assistant_start_time, bo.order_start_time)) as order_start_time,
        greatest(bo.order_end_time, coalesce(a.assistant_end_time, bo.order_end_time)) as order_end_time,
        bo.table_use_seconds,
        coalesce(bo.table_amount,0) + coalesce(a.assistant_amount,0) + coalesce(g.goods_amount,0) as order_amount
    from base_orders bo
    left join assistant_info a on a.order_settle_id = bo.order_settle_id
    left join goods_amount g on g.order_settle_id = bo.order_settle_id
)
"""
|
||
|
||
|
||
def build_finance_discount(cur) -> None:
    """Generate the finance 'discount distribution' report (CSV + MD).

    Discount = member discount (dwd_table_fee_log.member_discount_amount)
    + table-fee adjustment (dwd_table_fee_adjust.ledger_amount), rolled up
    per order, then summed per customer (member_id) per month; orders are
    bucketed by their earliest table start time. Other discount kinds
    (e.g. group-buy) are not included. Writes one CSV and one MD file
    into OUT_DIR.
    """
    title = "2025年10-12月 财务优惠(会员折扣+台费调账)分布"
    desc = (
        "优惠=会员折扣(dwd_table_fee_log.member_discount_amount)+台费调账(dwd_table_fee_adjust.ledger_amount),"
        "按订单归集后汇总到客户(member_id),按订单最早开台时间切月;不含团购抵扣等其它优惠。"
    )
    thinking = "用台费订单为基准关联调账表,再按客户+月份汇总,输出“谁享受了优惠”及金额分布。"

    # Per-customer, per-month sums of member discounts and fee adjustments,
    # anchored on table-fee orders inside the full Oct-Dec window.
    sql = f"""
with base_orders as (
    select
        tfl.order_settle_id,
        max(tfl.member_id) as member_id,
        min(tfl.start_use_time) as order_start_time,
        sum(tfl.member_discount_amount) as member_discount_amount
    from billiards_dwd.dwd_table_fee_log tfl
    where tfl.site_id = %(site_id)s
      and coalesce(tfl.is_delete,0) = 0
      and tfl.start_use_time >= %(window_start)s::timestamptz
      and tfl.start_use_time < %(window_end)s::timestamptz
    group by tfl.order_settle_id
),
adjusts as (
    select
        tfa.order_settle_id,
        sum(tfa.ledger_amount) as adjust_amount
    from billiards_dwd.dwd_table_fee_adjust tfa
    join base_orders bo on bo.order_settle_id = tfa.order_settle_id
    where tfa.site_id = %(site_id)s
      and coalesce(tfa.is_delete,0) = 0
    group by tfa.order_settle_id
)
, x as (
    select
        bo.member_id,
        {month_case('bo.order_start_time')} as month_key,
        coalesce(bo.member_discount_amount,0) as member_discount_amount,
        coalesce(a.adjust_amount,0) as adjust_amount
    from base_orders bo
    left join adjusts a on a.order_settle_id = bo.order_settle_id
)
select
    member_id,
    month_key,
    sum(member_discount_amount) as member_discount_sum,
    sum(adjust_amount) as adjust_sum
from x
where month_key is not null
group by member_id, month_key;
"""

    rows = fetch_all(
        cur,
        sql,
        {"site_id": SITE_ID, "window_start": WIN_ALL[0], "window_end": WIN_ALL[1]},
    )

    # Resolve display info (nickname / mobile / grade) for all real members;
    # member_id 0/None means an anonymous (non-member) order and is skipped.
    member_ids = sorted({r["member_id"] for r in rows if r["member_id"] not in (None, 0)})
    cur.execute(
        """
        select member_id, nickname, mobile, member_card_grade_name
        from billiards_dwd.dim_member
        where scd2_is_current=1 and member_id = any(%(ids)s)
        """,
        {"ids": member_ids},
    )
    member_map = {r["member_id"]: {"name": (r.get("nickname") or ""), "mobile": (r.get("mobile") or ""), "vip": (r.get("member_card_grade_name") or "")} for r in cur.fetchall()}

    # Pivot: per member, keys "<month>_member" / "<month>_adjust" -> Decimal.
    per_member: dict[int, dict[str, Decimal]] = {}
    for r in rows:
        mid = r["member_id"]
        if not mid or mid == 0:
            continue
        per_member.setdefault(mid, {})
        mk = r["month_key"]
        per_member[mid][f"{mk}_member"] = d(r["member_discount_sum"])
        per_member[mid][f"{mk}_adjust"] = d(r["adjust_sum"])

    # Build output rows: identity columns, then per-month discount triples,
    # then the quarter total (missing months default to 0).
    out_rows: list[list[Any]] = []
    for mid, info_d in per_member.items():
        info = member_map.get(mid, {"name": "", "mobile": "", "vip": ""})
        row: list[Any] = [info["name"], mask_phone(info["mobile"]), info["vip"]]
        total = Decimal("0")
        for mk, _, _ in MONTHS:
            md = info_d.get(f"{mk}_member", Decimal("0"))
            ad = info_d.get(f"{mk}_adjust", Decimal("0"))
            row += [fmt_money(md), fmt_money(ad), fmt_money(md + ad)]
            total += md + ad
        row.append(fmt_money(total))
        out_rows.append(row)

    # Sort by quarter total (last column, formatted money string), descending.
    out_rows.sort(key=lambda r: Decimal(r[-1]), reverse=True)

    csv_path = OUT_DIR / "财务_优惠分布_2025年10-12月.csv"
    md_path = OUT_DIR / "财务_优惠分布_2025年10-12月.md"

    # Two header rows: month band on top, metric names underneath.
    write_csv(
        csv_path,
        title,
        desc,
        [
            [
                "客户名称",
                "手机号(脱敏)",
                "VIP卡/等级",
                "10月",
                "10月",
                "10月",
                "11月",
                "11月",
                "11月",
                "12月",
                "12月",
                "12月",
                "10-12月",
            ],
            [
                "客户名称",
                "手机号(脱敏)",
                "VIP卡/等级",
                "会员折扣(元)",
                "台费调账(元)",
                "合计优惠(元)",
                "会员折扣(元)",
                "台费调账(元)",
                "合计优惠(元)",
                "会员折扣(元)",
                "台费调账(元)",
                "合计优惠(元)",
                "合计优惠(元)",
            ],
        ],
        out_rows,
    )

    write_md(md_path, title, thinking, desc, [SqlBlock("优惠分布(客户+月份)", sql)])
|
||
|
||
|
||
def build_customer_top100(cur) -> list[int]:
    """Generate the Top-100 customer reports (total + per-month split).

    Consumption uses payable amounts (table + assistant + goods, no
    discount deduction) anchored on table-fee orders; recharge uses
    successful recharge payments bucketed by pay time; stored-value is
    the current balance of 储值卡 card accounts. Writes two CSV + two MD
    files into OUT_DIR; the review column is left empty here and
    backfilled later.

    Returns:
        The ranked list of Top-100 member_ids (descending consumption).
    """
    title_total = "2025年10-12月 客户消费能力Top100(总表)"
    title_split = "2025年10-12月 客户消费能力Top100(分表)"

    desc_total = (
        "消费=台费(dwd_table_fee_log.ledger_amount)+助教(dwd_assistant_service_log.ledger_amount)+商品(dwd_store_goods_sale.ledger_amount),均为应付金额(不扣优惠),以台费订单为基准串联;"
        "充值=充值支付流水(dwd_payment.relate_type=5, pay_status=2, pay_amount>0)按支付时间切月;"
        "储值卡未使用金额=当前有效储值卡余额之和(dim_member_card_account.balance, member_card_type_name='储值卡');"
        "喜爱助教=基础课时长+附加课时长*1.5(income_seconds换算小时),总表按10-12月汇总Top5。"
    )
    desc_split = "与总表同口径;分表仅计算12月喜爱助教Top5,并展示10-12月各月消费/充值。"

    thinking = (
        "以台费订单为基准汇总三类明细,满足应付金额口径,并输出Top100客户的消费/充值/助教偏好。"
        "按你的要求,先生成评价为空的版本,再在脚本末尾回填评价。"
    )

    # Top-100 members by total payable consumption across the quarter.
    sql_top100 = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + """
select
    o.member_id,
    sum(o.order_amount) as consume_total,
    count(*) as order_cnt
from orders o
where o.member_id is not null and o.member_id <> 0
group by o.member_id
order by consume_total desc
limit 100;
"""

    # Per-member, per-month consumption (order bucketed by its start time).
    sql_monthly_consume = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
, x as (
    select
        o.member_id,
        {month_case('o.order_start_time')} as month_key,
        o.order_amount
    from orders o
    where o.member_id is not null and o.member_id <> 0
)
select
    member_id,
    month_key,
    sum(order_amount) as consume_sum
from x
where month_key is not null
group by member_id, month_key;
"""

    # Per-member, per-month recharge: successful recharge payments
    # (relate_type=5, pay_status=2, positive amount) by pay_time.
    sql_monthly_recharge = f"""
with pay as (
    select
        p.pay_time,
        r.member_id,
        p.pay_amount
    from billiards_dwd.dwd_payment p
    join billiards_dwd.dwd_recharge_order r on r.recharge_order_id = p.relate_id
    where p.site_id = %(site_id)s
      and p.relate_type = 5
      and p.pay_status = 2
      and p.pay_amount > 0
      and p.pay_time >= %(window_start)s::timestamptz
      and p.pay_time < %(window_end)s::timestamptz
)
, x as (
    select
        member_id,
        {month_case('pay_time')} as month_key,
        pay_amount
    from pay
)
select
    member_id,
    month_key,
    sum(pay_amount) as recharge_sum
from x
where month_key is not null
group by member_id, month_key;
"""

    # Favourite assistants per member: weighted hours (extra lessons x1.5),
    # Top5 joined into one display string.
    sql_fav_all = f"""
with x as (
    select
        asl.tenant_member_id as member_id,
        asl.nickname as assistant_nickname,
        sum(case when asl.order_assistant_type=1 then asl.income_seconds else asl.income_seconds*1.5 end) / 3600.0 as weighted_hours
    from billiards_dwd.dwd_assistant_service_log asl
    where asl.site_id = %(site_id)s
      and coalesce(asl.is_delete,0)=0
      and asl.tenant_member_id is not null and asl.tenant_member_id <> 0
      and asl.start_use_time >= %(window_start)s::timestamptz
      and asl.start_use_time < %(window_end)s::timestamptz
    group by asl.tenant_member_id, asl.nickname
),
ranked as (
    select *, row_number() over(partition by member_id order by weighted_hours desc) as rn
    from x
)
select
    member_id,
    string_agg(assistant_nickname || '(' || to_char(round(weighted_hours::numeric, 1), 'FM999999990.0') || 'h)', '、' order by weighted_hours desc) as fav5
from ranked
where rn <= 5
group by member_id;
"""

    # Current unused stored-value balance per member (储值卡 cards only).
    sql_stored_value = """
select
    tenant_member_id as member_id,
    sum(balance) as stored_value_balance
from billiards_dwd.dim_member_card_account
where scd2_is_current=1
  and coalesce(is_delete,0)=0
  and member_card_type_name='储值卡'
  and tenant_member_id = any(%(ids)s)
group by tenant_member_id;
"""

    # Review column: profiling queries below are only documented in the MD
    # (used later to craft review text); they do not affect the Top-100 cut.
    sql_review_profile = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
select
    o.member_id,
    count(*) as orders,
    avg(o.order_amount) as avg_order,
    sum(o.table_use_seconds)/3600.0 as play_hours,
    avg(o.table_use_seconds)/3600.0 as avg_play_hours,
    min((o.order_start_time at time zone '{TZ}')::date) as first_visit_day,
    max((o.order_start_time at time zone '{TZ}')::date) as last_visit_day,
    count(distinct (o.order_start_time at time zone '{TZ}')::date) as visit_days,
    sum(case when o.order_start_time >= '{WIN_OCT[0]}'::timestamptz and o.order_start_time < '{WIN_OCT[1]}'::timestamptz then 1 else 0 end) as orders_oct,
    sum(case when o.order_start_time >= '{WIN_NOV[0]}'::timestamptz and o.order_start_time < '{WIN_NOV[1]}'::timestamptz then 1 else 0 end) as orders_nov,
    sum(case when o.order_start_time >= '{WIN_DEC[0]}'::timestamptz and o.order_start_time < '{WIN_DEC[1]}'::timestamptz then 1 else 0 end) as orders_dec
from orders o
where o.member_id = any(%(ids)s)
group by o.member_id;
"""

    # Arrival/leave clock times in local hours; leave before arrive means
    # the session crossed midnight, so 24h is added before aggregating.
    sql_review_time = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
, t as (
    select
        o.member_id,
        extract(hour from (o.order_start_time at time zone '{TZ}'))
        + extract(minute from (o.order_start_time at time zone '{TZ}'))/60.0
        + extract(second from (o.order_start_time at time zone '{TZ}'))/3600.0 as arrive_h,
        extract(hour from (o.order_end_time at time zone '{TZ}'))
        + extract(minute from (o.order_end_time at time zone '{TZ}'))/60.0
        + extract(second from (o.order_end_time at time zone '{TZ}'))/3600.0 as leave_h_raw
    from orders o
    where o.member_id = any(%(ids)s)
),
tt as (
    select
        member_id,
        arrive_h,
        case when leave_h_raw < arrive_h then leave_h_raw + 24 else leave_h_raw end as leave_h
    from t
)
select
    member_id,
    avg(arrive_h) as arrive_avg_h,
    percentile_cont(0.5) within group (order by arrive_h) as arrive_med_h,
    avg(leave_h) as leave_avg_h,
    percentile_cont(0.5) within group (order by leave_h) as leave_med_h
from tt
group by member_id;
"""

    # Table-area preference by played hours.
    sql_review_pref = """
select
    tfl.member_id,
    coalesce(tfl.site_table_area_name,'') as site_table_area_name,
    sum(tfl.real_table_use_seconds)/3600.0 as hours
from billiards_dwd.dwd_table_fee_log tfl
where tfl.site_id=%(site_id)s and coalesce(tfl.is_delete,0)=0
  and tfl.start_use_time >= %(window_start)s::timestamptz and tfl.start_use_time < %(window_end)s::timestamptz
  and tfl.member_id = any(%(ids)s)
group by tfl.member_id, site_table_area_name;
"""

    # Goods purchased per member (name + quantity + amount).
    sql_review_goods = """
with base_orders as (
    select order_settle_id, max(member_id) as member_id
    from billiards_dwd.dwd_table_fee_log
    where site_id=%(site_id)s and coalesce(is_delete,0)=0
      and start_use_time >= %(window_start)s::timestamptz and start_use_time < %(window_end)s::timestamptz
    group by order_settle_id
)
select
    bo.member_id,
    g.ledger_name,
    sum(g.ledger_count) as qty,
    sum(g.ledger_amount) as amount
from base_orders bo
join billiards_dwd.dwd_store_goods_sale g on g.order_settle_id = bo.order_settle_id
where g.site_id=%(site_id)s and coalesce(g.is_delete,0)=0
  and bo.member_id = any(%(ids)s)
group by bo.member_id, g.ledger_name;
"""

    # Per-day visit detail (for visit-cycle / recency analysis).
    sql_review_visits = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
select
    o.member_id,
    (o.order_start_time at time zone '{TZ}')::date as visit_date,
    count(*) as orders,
    sum(o.order_amount) as amount
from orders o
where o.member_id = any(%(ids)s)
group by o.member_id, visit_date
order by o.member_id, visit_date;
"""

    # December-only favourite assistants: same query with the window
    # placeholders textually replaced by December literals.
    sql_fav_dec = sql_fav_all.replace("%(window_start)s", "'2025-12-01 00:00:00+08'").replace(
        "%(window_end)s", "'2026-01-01 00:00:00+08'"
    )

    top100 = fetch_all(cur, sql_top100, {"site_id": SITE_ID})
    top_ids = [r["member_id"] for r in top100]

    # Display info for the Top-100 members.
    cur.execute(
        """
        select member_id, nickname, mobile, member_card_grade_name
        from billiards_dwd.dim_member
        where scd2_is_current=1 and member_id = any(%(ids)s)
        """,
        {"ids": top_ids},
    )
    info_map = {r["member_id"]: {"name": (r.get("nickname") or ""), "mobile": (r.get("mobile") or ""), "vip": (r.get("member_card_grade_name") or "")} for r in cur.fetchall()}

    balances = fetch_all(cur, sql_stored_value, {"ids": top_ids})
    balance_map = {r["member_id"]: d(r["stored_value_balance"]) for r in balances}

    monthly_consume = fetch_all(cur, sql_monthly_consume, {"site_id": SITE_ID})
    monthly_recharge = fetch_all(cur, sql_monthly_recharge, {"site_id": SITE_ID, "window_start": WIN_ALL[0], "window_end": WIN_ALL[1]})
    fav_all = fetch_all(cur, sql_fav_all, {"site_id": SITE_ID, "window_start": WIN_ALL[0], "window_end": WIN_ALL[1]})
    fav_dec = fetch_all(cur, sql_fav_dec, {"site_id": SITE_ID})

    # member_id -> {month_key -> Decimal} lookup maps.
    consume_map: dict[int, dict[str, Decimal]] = {}
    for r in monthly_consume:
        consume_map.setdefault(r["member_id"], {})[r["month_key"]] = d(r["consume_sum"])

    recharge_map: dict[int, dict[str, Decimal]] = {}
    for r in monthly_recharge:
        recharge_map.setdefault(r["member_id"], {})[r["month_key"]] = d(r["recharge_sum"])

    fav_all_map = {r["member_id"]: (r["fav5"] or "") for r in fav_all}
    fav_dec_map = {r["member_id"]: (r["fav5"] or "") for r in fav_dec}

    rows_total: list[list[Any]] = []
    rows_split: list[list[Any]] = []

    for idx, r in enumerate(top100, start=1):
        mid = r["member_id"]
        info = info_map.get(mid, {"name": "", "mobile": "", "vip": ""})
        total_rech = sum(recharge_map.get(mid, {}).values()) if recharge_map.get(mid) else Decimal("0")
        rows_total.append([
            idx,
            info["name"],
            mask_phone(info["mobile"]),
            fav_all_map.get(mid, ""),
            fmt_money(r["consume_total"]),
            fmt_money(total_rech),
            fmt_money(balance_map.get(mid, Decimal("0"))),
            "",  # review text is backfilled later
        ])

        c = consume_map.get(mid, {})
        rc = recharge_map.get(mid, {})
        rows_split.append([
            idx,
            info["name"],
            mask_phone(info["mobile"]),
            fav_dec_map.get(mid, ""),
            fmt_money(c.get("2025-12", Decimal("0"))),
            fmt_money(rc.get("2025-12", Decimal("0"))),
            fmt_money(c.get("2025-11", Decimal("0"))),
            fmt_money(rc.get("2025-11", Decimal("0"))),
            fmt_money(c.get("2025-10", Decimal("0"))),
            fmt_money(rc.get("2025-10", Decimal("0"))),
        ])

    csv_total = OUT_DIR / "客户_Top100_2025年10-12月_总表.csv"
    md_total = OUT_DIR / "客户_Top100_2025年10-12月_总表.md"
    write_csv(
        csv_total,
        title_total,
        desc_total,
        [
            ["排名", "客户名称", "电话号码", "10月-12月", "10月-12月", "10月-12月", "当前", "评价"],
            ["排名", "客户名称", "电话号码", "喜爱助教昵称", "总消费(元)", "总充值(元)", "储值卡未使用金额(元)", "评价"],
        ],
        rows_total,
    )
    write_md(
        md_total,
        title_total,
        thinking,
        desc_total,
        [
            SqlBlock("Top100(按消费总额)", sql_top100),
            SqlBlock("按月消费汇总", sql_monthly_consume),
            SqlBlock("按月充值汇总", sql_monthly_recharge),
            SqlBlock("储值卡未使用金额(当前余额汇总)", sql_stored_value),
            SqlBlock("喜爱助教Top5(10-12月)", sql_fav_all),
            SqlBlock("评价画像:订单/时长/到店日期", sql_review_profile),
            SqlBlock("评价画像:到店/离店时间(小时)", sql_review_time),
            SqlBlock("评价画像:球台分区偏好(按时长)", sql_review_pref),
            SqlBlock("评价画像:商品明细(名称+数量)", sql_review_goods),
            SqlBlock("评价画像:到店日期明细(用于周期/近期分析)", sql_review_visits),
        ],
    )

    csv_split = OUT_DIR / "客户_Top100_2025年10-12月_分表.csv"
    md_split = OUT_DIR / "客户_Top100_2025年10-12月_分表.md"
    write_csv(
        csv_split,
        title_split,
        desc_split,
        [
            ["排名", "客户名称", "电话号码", "12月", "12月", "12月", "11月", "11月", "10月", "10月"],
            ["排名", "客户名称", "电话号码", "喜爱助教昵称", "消费(元)", "充值(元)", "消费(元)", "充值(元)", "消费(元)", "充值(元)"],
        ],
        rows_split,
    )
    write_md(
        md_split,
        title_split,
        thinking,
        desc_split,
        [
            SqlBlock("Top100(按消费总额)", sql_top100),
            SqlBlock("按月消费汇总", sql_monthly_consume),
            SqlBlock("按月充值汇总", sql_monthly_recharge),
            SqlBlock("喜爱助教Top5(仅12月)", sql_fav_dec),
        ],
    )

    return top_ids
|
||
|
||
|
||
def backfill_customer_reviews(cur, top_ids: list[int]) -> None:
|
||
def csv_safe_text(text: str) -> str:
|
||
return (text or "").replace(",", ",").replace("\r", " ").replace("\n", " ").strip()
|
||
|
||
def fmt_clock_time(v: Any, *, prefix_next_day: bool) -> str:
|
||
if v is None:
|
||
return ""
|
||
minutes = int((d(v) * 60).to_integral_value(rounding="ROUND_HALF_UP"))
|
||
is_next_day = minutes >= 24 * 60
|
||
minutes = minutes % (24 * 60)
|
||
hh = minutes // 60
|
||
mm = minutes % 60
|
||
base = f"{hh:02d}:{mm:02d}"
|
||
if is_next_day and prefix_next_day:
|
||
return f"次日{base}"
|
||
return base
|
||
|
||
def norm_area(name: str | None) -> str:
|
||
s = (name or "").strip()
|
||
if not s:
|
||
return "未知"
|
||
m = re.search(r"([A-Z])区", s)
|
||
if m:
|
||
return f"{m.group(1)}区"
|
||
if "斯诺克" in s:
|
||
return "S区/斯诺克"
|
||
if "麻将" in s:
|
||
return "麻将"
|
||
if "K包" in s or "VIP" in s or "包厢" in s:
|
||
return "包厢"
|
||
if "团建" in s:
|
||
return "团建"
|
||
return s
|
||
|
||
# 画像(订单维度:订单、时长、到离店时间、到店日期分布)
|
||
sql_profile = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
|
||
, order_ext as (
|
||
select
|
||
o.*,
|
||
extract(epoch from (o.order_end_time - o.order_start_time)) as stay_seconds
|
||
from orders o
|
||
)
|
||
select
|
||
o.member_id,
|
||
count(*) as orders,
|
||
avg(o.order_amount) as avg_order,
|
||
sum(o.table_use_seconds)/3600.0 as play_hours,
|
||
avg(o.table_use_seconds)/3600.0 as avg_play_hours,
|
||
sum(case when o.order_start_time >= '{WIN_DEC[0]}'::timestamptz and o.order_start_time < '{WIN_DEC[1]}'::timestamptz then coalesce(o.table_use_seconds,0) else 0 end)/3600.0 as play_hours_dec,
|
||
min((o.order_start_time at time zone '{TZ}')::date) as first_visit_day,
|
||
max((o.order_start_time at time zone '{TZ}')::date) as last_visit_day,
|
||
count(distinct (o.order_start_time at time zone '{TZ}')::date) as visit_days,
|
||
count(distinct case when o.order_start_time >= '{WIN_DEC[0]}'::timestamptz and o.order_start_time < '{WIN_DEC[1]}'::timestamptz then (o.order_start_time at time zone '{TZ}')::date else null end) as visit_days_dec,
|
||
sum(case when o.order_start_time >= '{WIN_OCT[0]}'::timestamptz and o.order_start_time < '{WIN_OCT[1]}'::timestamptz then 1 else 0 end) as orders_oct,
|
||
sum(case when o.order_start_time >= '{WIN_NOV[0]}'::timestamptz and o.order_start_time < '{WIN_NOV[1]}'::timestamptz then 1 else 0 end) as orders_nov,
|
||
sum(case when o.order_start_time >= '{WIN_DEC[0]}'::timestamptz and o.order_start_time < '{WIN_DEC[1]}'::timestamptz then 1 else 0 end) as orders_dec,
|
||
sum(coalesce(o.stay_seconds,0))/3600.0 as stay_hours_total,
|
||
sum(case when o.order_start_time >= '{WIN_DEC[0]}'::timestamptz and o.order_start_time < '{WIN_DEC[1]}'::timestamptz then coalesce(o.stay_seconds,0) else 0 end)/3600.0 as stay_hours_dec
|
||
from order_ext o
|
||
where o.member_id = any(%(ids)s)
|
||
group by o.member_id;
|
||
"""
|
||
|
||
sql_time = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
|
||
, t as (
|
||
select
|
||
o.member_id,
|
||
extract(hour from (o.order_start_time at time zone '{TZ}'))
|
||
+ extract(minute from (o.order_start_time at time zone '{TZ}'))/60.0
|
||
+ extract(second from (o.order_start_time at time zone '{TZ}'))/3600.0 as arrive_h,
|
||
extract(hour from (o.order_end_time at time zone '{TZ}'))
|
||
+ extract(minute from (o.order_end_time at time zone '{TZ}'))/60.0
|
||
+ extract(second from (o.order_end_time at time zone '{TZ}'))/3600.0 as leave_h_raw
|
||
from orders o
|
||
where o.member_id = any(%(ids)s)
|
||
),
|
||
tt as (
|
||
select
|
||
member_id,
|
||
arrive_h,
|
||
case when leave_h_raw < arrive_h then leave_h_raw + 24 else leave_h_raw end as leave_h
|
||
from t
|
||
)
|
||
select
|
||
member_id,
|
||
avg(arrive_h) as arrive_avg_h,
|
||
percentile_cont(0.5) within group (order by arrive_h) as arrive_med_h,
|
||
avg(leave_h) as leave_avg_h,
|
||
percentile_cont(0.5) within group (order by leave_h) as leave_med_h
|
||
from tt
|
||
group by member_id;
|
||
"""
|
||
|
||
prof = fetch_all(cur, sql_profile, {"site_id": SITE_ID, "ids": top_ids})
|
||
prof_map = {r["member_id"]: r for r in prof}
|
||
|
||
times = fetch_all(cur, sql_time, {"site_id": SITE_ID, "ids": top_ids})
|
||
time_map = {r["member_id"]: r for r in times}
|
||
|
||
# 偏好(按球台分区小时)
|
||
sql_pref = f"""
|
||
select
|
||
tfl.member_id,
|
||
coalesce(tfl.site_table_area_name,'') as site_table_area_name,
|
||
sum(tfl.real_table_use_seconds)/3600.0 as hours
|
||
from billiards_dwd.dwd_table_fee_log tfl
|
||
where tfl.site_id=%(site_id)s and coalesce(tfl.is_delete,0)=0
|
||
and tfl.start_use_time >= %(window_start)s::timestamptz and tfl.start_use_time < %(window_end)s::timestamptz
|
||
and tfl.member_id = any(%(ids)s)
|
||
group by tfl.member_id, site_table_area_name;
|
||
"""
|
||
|
||
prefs = fetch_all(
|
||
cur,
|
||
sql_pref,
|
||
{"site_id": SITE_ID, "window_start": WIN_ALL[0], "window_end": WIN_ALL[1], "ids": top_ids},
|
||
)
|
||
pref_map: dict[int, list[dict[str, Any]]] = {}
|
||
for r in prefs:
|
||
pref_map.setdefault(r["member_id"], []).append(r)
|
||
|
||
# 商品(名称+数量)
|
||
sql_food = """
|
||
with base_orders as (
|
||
select order_settle_id, max(member_id) as member_id
|
||
from billiards_dwd.dwd_table_fee_log
|
||
where site_id=%(site_id)s and coalesce(is_delete,0)=0
|
||
and start_use_time >= %(window_start)s::timestamptz and start_use_time < %(window_end)s::timestamptz
|
||
group by order_settle_id
|
||
)
|
||
select
|
||
bo.member_id,
|
||
g.ledger_name,
|
||
sum(g.ledger_count) as qty,
|
||
sum(g.ledger_amount) as amount
|
||
from base_orders bo
|
||
join billiards_dwd.dwd_store_goods_sale g on g.order_settle_id = bo.order_settle_id
|
||
where g.site_id=%(site_id)s and coalesce(g.is_delete,0)=0
|
||
and bo.member_id = any(%(ids)s)
|
||
group by bo.member_id, g.ledger_name;
|
||
"""
|
||
|
||
foods = fetch_all(
|
||
cur,
|
||
sql_food,
|
||
{"site_id": SITE_ID, "window_start": WIN_ALL[0], "window_end": WIN_ALL[1], "ids": top_ids},
|
||
)
|
||
food_map: dict[int, list[dict[str, Any]]] = {}
|
||
for r in foods:
|
||
food_map.setdefault(r["member_id"], []).append(r)
|
||
|
||
sql_visits = sql_order_base(WIN_ALL[0], WIN_ALL[1]) + f"""
|
||
select
|
||
o.member_id,
|
||
(o.order_start_time at time zone '{TZ}')::date as visit_date,
|
||
count(*) as orders,
|
||
sum(o.order_amount) as amount
|
||
from orders o
|
||
where o.member_id = any(%(ids)s)
|
||
group by o.member_id, visit_date
|
||
order by o.member_id, visit_date;
|
||
"""
|
||
visits = fetch_all(cur, sql_visits, {"site_id": SITE_ID, "ids": top_ids})
|
||
visit_map: dict[int, list[dict[str, Any]]] = {}
|
||
for r in visits:
|
||
visit_map.setdefault(r["member_id"], []).append(r)
|
||
|
||
# 喜爱助教(10-12月)
|
||
sql_fav = f"""
|
||
with x as (
|
||
select
|
||
asl.tenant_member_id as member_id,
|
||
asl.nickname as assistant_nickname,
|
||
sum(case when asl.order_assistant_type=1 then asl.income_seconds else asl.income_seconds*1.5 end) / 3600.0 as weighted_hours
|
||
from billiards_dwd.dwd_assistant_service_log asl
|
||
where asl.site_id = %(site_id)s
|
||
and coalesce(asl.is_delete,0)=0
|
||
and asl.tenant_member_id = any(%(ids)s)
|
||
and asl.start_use_time >= %(window_start)s::timestamptz
|
||
and asl.start_use_time < %(window_end)s::timestamptz
|
||
group by asl.tenant_member_id, asl.nickname
|
||
),
|
||
ranked as (
|
||
select *, row_number() over(partition by member_id order by weighted_hours desc) as rn
|
||
from x
|
||
)
|
||
select
|
||
member_id,
|
||
string_agg(assistant_nickname || '(' || to_char(round(weighted_hours::numeric, 1), 'FM999999990.0') || 'h)', '、' order by weighted_hours desc) as fav5
|
||
from ranked
|
||
where rn <= 5
|
||
group by member_id;
|
||
"""
|
||
|
||
favs = fetch_all(cur, sql_fav, {"site_id": SITE_ID, "window_start": WIN_ALL[0], "window_end": WIN_ALL[1], "ids": top_ids})
|
||
fav_map = {r["member_id"]: (r["fav5"] or "") for r in favs}
|
||
|
||
# 到店周期中位(天)与排名:以 Top100 为比较集合;值越小表示到店越频繁,排名越靠前
|
||
gap_med_all: dict[int, int] = {}
|
||
gap_med_dec: dict[int, int] = {}
|
||
for mid in top_ids:
|
||
vrows = visit_map.get(mid, [])
|
||
dates_all = [r.get("visit_date") for r in vrows if r.get("visit_date")]
|
||
gaps_all: list[int] = []
|
||
for d1, d2 in zip(dates_all, dates_all[1:]):
|
||
gaps_all.append((d2 - d1).days)
|
||
if gaps_all:
|
||
gaps_sorted = sorted(gaps_all)
|
||
gap_med_all[mid] = gaps_sorted[len(gaps_sorted) // 2]
|
||
|
||
dates_dec = [d for d in dates_all if d.year == 2025 and d.month == 12]
|
||
gaps_dec: list[int] = []
|
||
for d1, d2 in zip(dates_dec, dates_dec[1:]):
|
||
gaps_dec.append((d2 - d1).days)
|
||
if gaps_dec:
|
||
gaps_sorted = sorted(gaps_dec)
|
||
gap_med_dec[mid] = gaps_sorted[len(gaps_sorted) // 2]
|
||
|
||
def dense_rank(values: list[tuple[int, Any]], *, reverse: bool) -> dict[int, int]:
|
||
sorted_vals = sorted(values, key=lambda kv: kv[1], reverse=reverse)
|
||
out: dict[int, int] = {}
|
||
last_v = None
|
||
rank = 0
|
||
for mid, v in sorted_vals:
|
||
if last_v is None or v != last_v:
|
||
rank += 1
|
||
last_v = v
|
||
out[mid] = rank
|
||
return out
|
||
|
||
gap_rank_all = dense_rank([(mid, v) for mid, v in gap_med_all.items()], reverse=False)
|
||
gap_rank_dec = dense_rank([(mid, v) for mid, v in gap_med_dec.items()], reverse=False)
|
||
|
||
# 在店时长:用台费使用时长(real_table_use_seconds 汇总到 orders.table_use_seconds)作为“在店时长”近似
|
||
play_rank_all = dense_rank(
|
||
[(mid, d(prof_map.get(mid, {}).get("play_hours"))) for mid in top_ids if d(prof_map.get(mid, {}).get("play_hours")) > 0],
|
||
reverse=True,
|
||
)
|
||
play_rank_dec = dense_rank(
|
||
[(mid, d(prof_map.get(mid, {}).get("play_hours_dec"))) for mid in top_ids if d(prof_map.get(mid, {}).get("play_hours_dec")) > 0],
|
||
reverse=True,
|
||
)
|
||
|
||
def build_review(
    mid: int,
    rank: int,
    consume_total: Decimal,
    recharge_total: Decimal,
    stored_value_balance: Decimal,
) -> str:
    """Build the free-text review column for one Top100 member.

    Args:
        mid: Member id (key into the profile/preference/time/goods/visit maps).
        rank: 1-based consumption rank within the Top100 set.
        consume_total: Total payable consumption amount for the window.
        recharge_total: Total recharge amount for the window.
        stored_value_balance: Current stored-value balance.

    Returns:
        A CSV-safe string joining order stats, area preference, time-of-day
        habits, goods purchases, and a composite summary with advice;
        empty string when the member has no orders.
    """
    p = prof_map.get(mid) or {}
    orders = int(p.get("orders") or 0)
    if orders <= 0:
        # No orders in the window -> nothing meaningful to say.
        return ""
    avg_order = d(p.get("avg_order"))
    play_h = d(p.get("play_hours"))
    avg_play_h = d(p.get("avg_play_hours"))

    # Preference: top table areas, ranked by hours played.
    areas_raw = pref_map.get(mid, [])
    area_hours: dict[str, Decimal] = {}
    for r in areas_raw:
        label = norm_area(r.get("site_table_area_name"))
        area_hours[label] = area_hours.get(label, Decimal("0")) + d(r.get("hours"))
    area_items = sorted(area_hours.items(), key=lambda kv: kv[1], reverse=True)
    area_total = sum((v for _, v in area_items), Decimal("0"))
    pref_text = ""
    if area_total > 0 and area_items:
        top_parts = []
        for label, hours in area_items[:4]:
            # Percentage of total hours, one decimal place.
            pct = (hours / area_total * 100).quantize(Decimal("1.0"))
            top_parts.append(f"{label}({pct}%)")
        pref_text = "、".join(top_parts)

    # Time habits: arrival/leave clock time (mean and median hours-of-day).
    t = time_map.get(mid) or {}
    time_text = ""
    if t:
        a_avg = fmt_clock_time(t.get("arrive_avg_h"), prefix_next_day=False)
        a_med = fmt_clock_time(t.get("arrive_med_h"), prefix_next_day=False)
        l_avg = fmt_clock_time(t.get("leave_avg_h"), prefix_next_day=True)
        l_med = fmt_clock_time(t.get("leave_med_h"), prefix_next_day=True)
        # Only render when all four values formatted successfully.
        if a_avg and a_med and l_avg and l_med:
            time_text = f"到店均值{a_avg} 中位{a_med};离店均值{l_avg} 中位{l_med}"

    # Goods: name + quantity + amount, top 6 by spend.
    foods_ = food_map.get(mid, [])
    # NOTE(review): sorts the shared list stored in food_map in place;
    # harmless today (idempotent, one call per member) but worth confirming.
    foods_.sort(key=lambda r: d(r["amount"]), reverse=True)
    goods_total = sum((d(r["amount"]) for r in foods_), Decimal("0"))
    goods_text = ""
    if foods_:
        parts = []
        for r in foods_[:6]:
            name = csv_safe_text(str(r.get("ledger_name") or ""))
            qty = d(r.get("qty"))
            amt = d(r.get("amount"))
            if not name:
                continue
            # Render whole-number quantities without a decimal point.
            qty_i = int(qty) if qty == qty.to_integral_value() else qty
            parts.append(f"{name}×{qty_i}(¥{amt:.2f})")
        if parts:
            goods_text = "、".join(parts)
        if consume_total > 0 and goods_total > 0:
            # Append goods share of total consumption.
            ratio = (goods_total / consume_total * 100).quantize(Decimal("1.0"))
            goods_text = f"{goods_text}(商品合计¥{goods_total:.2f} 占比{ratio}%)"

    # Visit cadence / recency (by visit date).
    vrows = visit_map.get(mid, [])
    # NOTE(review): visit_dates is computed but not used below.
    visit_dates = [r.get("visit_date") for r in vrows if r.get("visit_date")]
    last_day = p.get("last_visit_day")
    visit_days = int(p.get("visit_days") or 0)
    visit_days_dec = int(p.get("visit_days_dec") or 0)
    orders_dec = int(p.get("orders_dec") or 0)

    # Trend: rough judgement from monthly order counts (Top100 profiling only).
    orders_oct = int(p.get("orders_oct") or 0)
    orders_nov = int(p.get("orders_nov") or 0)
    orders_dec2 = int(p.get("orders_dec") or 0)
    if orders_dec2 > orders_nov >= orders_oct:
        trend = "10-12月到店频次上升,12月更活跃"
    elif orders_oct > orders_nov > orders_dec2:
        trend = "10-12月到店频次下降,建议重点唤醒"
    elif orders_dec2 >= orders_oct and orders_dec2 >= orders_nov:
        trend = "12月为高峰月,具备加深运营空间"
    else:
        trend = "到店频次波动,建议按常用时段做稳定触达"

    # Operational advice: short, actionable, at most 3 items.
    advice_parts: list[str] = []
    if stored_value_balance <= 0 and recharge_total > 0:
        advice_parts.append("关注是否为临时充值型,建议引导储值梯度与权益")
    if stored_value_balance > 0 and stored_value_balance < Decimal("500"):
        advice_parts.append("储值余额偏低,建议在其常用时段做补能引导")
    if stored_value_balance >= Decimal("10000"):
        advice_parts.append("储值余额充足,可提供包厢/团建档期与专属权益")
    if pref_text and ("包厢" in pref_text or "团建" in pref_text):
        advice_parts.append("重点维护包厢/团建需求,提前锁档与套餐化")
    if goods_total >= Decimal("2000"):
        advice_parts.append("商品贡献高,可做常购商品补货提醒与组合促销")
    if not advice_parts:
        advice_parts.append("保持常用时段的稳定服务供给,提升复购粘性")
    advice = ";".join(advice_parts[:3])

    # Composite summary (all ranks are relative to the Top100 set).
    gap_rank_all_text = f"#{gap_rank_all.get(mid)}" if gap_rank_all.get(mid) else "—"
    gap_rank_dec_text = f"#{gap_rank_dec.get(mid)}" if gap_rank_dec.get(mid) else "—"
    play_rank_all_text = f"#{play_rank_all.get(mid)}" if play_rank_all.get(mid) else "—"
    play_rank_dec_text = f"#{play_rank_dec.get(mid)}" if play_rank_dec.get(mid) else "—"

    composite_lines = []
    composite_lines.append(
        f"10-12月到店消费{visit_days}天/{orders}次,到店周期中位{gap_rank_all_text},消费排名#{rank},在店时长{play_rank_all_text}"
    )
    composite_lines.append(
        f"12月到店消费{visit_days_dec}天/{orders_dec}次,到店周期中位{gap_rank_dec_text},消费排名#{rank},在店时长{play_rank_dec_text}"
    )
    if last_day:
        composite_lines.append(f"最近到店{last_day}")
    composite_lines.append(f"趋势:{trend}")
    composite_lines.append(f"围客与客户运营建议:{advice}")

    parts = [
        f"订单:{orders}单,平均单次¥{avg_order:.2f}",
        f"打球:{fmt_hours(play_h,1)},平均单次{fmt_hours(avg_play_h,1)}",
    ]
    if pref_text:
        parts.append(f"偏好:{pref_text}")
    if time_text:
        parts.append(f"时间:{time_text}")
    if goods_text:
        parts.append(f"商品:{goods_text}")
    parts.append("综合:" + ";".join([x for x in composite_lines if x]))

    return csv_safe_text(";".join([p for p in parts if p]))
# 回写 CSV(保留原有总消费/总充值/储值余额/助教偏好,只更新评价列)
|
||
csv_total = OUT_DIR / "客户_Top100_2025年10-12月_总表.csv"
|
||
if not csv_total.exists():
|
||
return
|
||
|
||
lines = csv_total.read_text(encoding="utf-8").splitlines()
|
||
# CSV:前3行是标题、说明、空行;接着2行表头;后面是数据
|
||
head = lines[:5]
|
||
data_lines = lines[5:]
|
||
|
||
# 解析数据行(简单 split,避免引号复杂情况:本脚本生成的行不含逗号)
|
||
new_data = []
|
||
for idx, line in enumerate(data_lines, start=1):
|
||
cols = line.split(",")
|
||
if len(cols) < 8:
|
||
new_data.append(line)
|
||
continue
|
||
# 按排名映射 member_id
|
||
rank = int(cols[0])
|
||
mid = top_ids[rank - 1] if 0 < rank <= len(top_ids) else None
|
||
if mid:
|
||
consume_total = d(cols[4])
|
||
recharge_total = d(cols[5])
|
||
stored_value_balance = d(cols[6])
|
||
cols[7] = build_review(mid, rank, consume_total, recharge_total, stored_value_balance)
|
||
new_data.append(",".join(cols))
|
||
|
||
csv_total.write_text("\n".join(head + new_data) + "\n", encoding="utf-8")
|
||
|
||
|
||
def main() -> None:
    """Entry point: generate all report files into OUT_DIR in one DB session."""
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    with conn() as connection:
        with connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            build_finance_discount(cur)
            backfill_customer_reviews(cur, build_customer_top100(cur))

    print(f"完成:{OUT_DIR}")
# Run only when executed as a script (not on import).
if __name__ == "__main__":
    main()