# -*- coding: utf-8 -*-
"""
Compute the Top N "loyal billiards customers", ranked by average hours played per visit day.

Metric definitions (defaults):
- Time range: 2025-10-01 through today (can be overridden via CLI arguments)
- Only customers with member_id != 0 and non-zero play time are counted
- Table areas included: A区, B区, C区, VIP包厢, 斯诺克区, TV台
- If a customer has several tables open at once on the same day, play time is the
  union of the time intervals (overlaps are not double-counted)
- "Hours per visit day" (单次日打球小时数) = (total hours played within the range) /
  (number of days with play)
- "Average visit gap (days)" (到店平均间隔(天)) = average number of days between
  consecutive visit days (empty when there are fewer than 2 visit days)
- "Visits in Oct-Dec" (10-12月来店次数) = number of distinct visit days within the
  range (deduplicated by day)
- "Last visit" (最后一次到店日期) = start_use_time of the customer's most recent table
  session (minute precision)

Output: CSV files (UTF-8-SIG, so they open cleanly in Excel)
"""
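# Example invocation (the script filename and output path are illustrative; the flags
# match the argparse definitions in main() below):
#
#   python loyal_billiards_customers.py --start-date 2025-10-01 --top-n 50 \
#       --areas "A区,B区,C区,VIP包厢,斯诺克区,TV台" --out reports/loyal_customers.csv
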
from __future__ import annotations

import argparse
import csv
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from pathlib import Path
from typing import Iterable
from zoneinfo import ZoneInfo

from config.settings import AppConfig
from database.connection import DatabaseConnection


DEFAULT_AREAS = ("A区", "B区", "C区", "VIP包厢", "斯诺克区", "TV台")


@dataclass(frozen=True)
class SessionRow:
    member_id: int
    start: datetime
    end: datetime
    area_name: str | None


@dataclass(frozen=True)
class MemberAgg:
    member_id: int
    total_hours: float
    visit_days: int
    avg_daily_hours: float
    avg_gap_days: float | None
    last_visit_at: datetime | None


def _as_tz(dt: datetime, tz: ZoneInfo) -> datetime:
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)


def _parse_date(s: str) -> date:
    s = (s or "").strip()
    if len(s) >= 10:
        s = s[:10]
    return date.fromisoformat(s)


def _date_floor(dt: datetime, tz: ZoneInfo) -> datetime:
    dt = _as_tz(dt, tz)
    return datetime.combine(dt.date(), time.min).replace(tzinfo=tz)


def _split_by_day(start: datetime, end: datetime, tz: ZoneInfo) -> Iterable[tuple[date, datetime, datetime]]:
    start = _as_tz(start, tz)
    end = _as_tz(end, tz)
    if end <= start:
        return []
    cur = start
    out: list[tuple[date, datetime, datetime]] = []
    while True:
        day_end = _date_floor(cur, tz) + timedelta(days=1)
        seg_end = end if end <= day_end else day_end
        out.append((cur.date(), cur, seg_end))
        if seg_end >= end:
            break
        cur = seg_end
    return out


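# Example for _split_by_day (illustrative values): a session from 2025-10-03 23:00 to
# 2025-10-04 01:30 is split at midnight into two segments,
#   (2025-10-03, 23:00, 24:00) and (2025-10-04, 00:00, 01:30),
# so each calendar day is credited only with the time actually played on that day.
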
def _union_seconds(intervals: list[tuple[datetime, datetime]], tz: ZoneInfo) -> int:
    cleaned = []
    for s, e in intervals:
        s = _as_tz(s, tz)
        e = _as_tz(e, tz)
        if e > s:
            cleaned.append((s, e))
    if not cleaned:
        return 0
    cleaned.sort(key=lambda x: x[0])
    total = 0
    cur_s, cur_e = cleaned[0]
    for s, e in cleaned[1:]:
        if s <= cur_e:
            if e > cur_e:
                cur_e = e
        else:
            total += int((cur_e - cur_s).total_seconds())
            cur_s, cur_e = s, e
    total += int((cur_e - cur_s).total_seconds())
    return total


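# Example for _union_seconds (illustrative values): two tables open 20:00-22:00 and
# 21:00-23:00 on the same day merge into one block 20:00-23:00, so the union is
# 3 * 3600 = 10800 seconds; the overlapping hour is not counted twice.
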
def _avg_gap_days(visit_dates: list[date]) -> float | None:
    visit_dates = sorted(set(visit_dates))
    if len(visit_dates) < 2:
        return None
    gaps = [(b - a).days for a, b in zip(visit_dates, visit_dates[1:]) if (b - a).days > 0]
    if not gaps:
        return None
    return sum(gaps) / len(gaps)


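# Example for _avg_gap_days (illustrative values): visit days Oct 1, Oct 4 and Oct 10
# give gaps of 3 and 6 days, so the average gap is (3 + 6) / 2 = 4.5 days.
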
def _load_sessions(
    conn: DatabaseConnection,
    *,
    store_id: int,
    tz: ZoneInfo,
    start_dt: datetime,
    end_dt: datetime,
    areas: tuple[str, ...],
) -> list[SessionRow]:
    sql = """
        SELECT
            member_id,
            start_use_time,
            ledger_end_time,
            real_table_use_seconds,
            site_table_area_name
        FROM billiards_dwd.dwd_table_fee_log
        WHERE site_id = %s
          AND member_id IS NOT NULL
          AND member_id <> 0
          AND start_use_time >= %s
          AND start_use_time < %s
          AND site_table_area_name = ANY(%s)
    """
    rows = conn.query(sql, (store_id, start_dt, end_dt, list(areas)))
    sessions: list[SessionRow] = []
    for r in rows:
        member_id = int(r.get("member_id") or 0)
        if member_id <= 0:
            continue
        start = r.get("start_use_time")
        end = r.get("ledger_end_time")
        if not isinstance(start, datetime):
            continue
        if not isinstance(end, datetime):
            # No ledger end time: fall back to start + real_table_use_seconds.
            secs = r.get("real_table_use_seconds")
            try:
                secs = int(secs or 0)
            except Exception:
                secs = 0
            if secs > 0:
                end = start + timedelta(seconds=secs)
            else:
                continue

        start = _as_tz(start, tz)
        end = _as_tz(end, tz)
        if end <= start:
            continue

        sessions.append(SessionRow(member_id=member_id, start=start, end=end, area_name=r.get("site_table_area_name")))
    return sessions


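# Note: the `= ANY(%s)` filters in these queries assume the DB driver adapts Python
# lists to SQL arrays (psycopg does this for PostgreSQL); that is an assumption about
# the DatabaseConnection wrapper, which is not defined in this file.
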
def _load_member_profiles(conn: DatabaseConnection, member_ids: list[int]) -> dict[int, dict]:
    if not member_ids:
        return {}
    sql = """
        SELECT member_id, nickname, mobile
        FROM billiards_dwd.dim_member
        WHERE scd2_is_current = 1
          AND member_id = ANY(%s)
    """
    rows = conn.query(sql, (member_ids,))
    return {int(r["member_id"]): r for r in rows if r.get("member_id") is not None}


def _load_latest_settlement_contact(conn: DatabaseConnection, *, store_id: int, member_ids: list[int]) -> dict[int, dict]:
    if not member_ids:
        return {}
    sql = """
        SELECT DISTINCT ON (member_id)
            member_id, member_name, member_phone, pay_time
        FROM billiards_dwd.dwd_settlement_head
        WHERE site_id = %s
          AND member_id = ANY(%s)
          AND member_id <> 0
        ORDER BY member_id, pay_time DESC NULLS LAST
    """
    rows = conn.query(sql, (store_id, member_ids))
    return {int(r["member_id"]): r for r in rows if r.get("member_id") is not None}


def _build_member_aggs(sessions: list[SessionRow], tz: ZoneInfo) -> list[MemberAgg]:
    by_member_day: dict[int, dict[date, list[tuple[datetime, datetime]]]] = {}
    last_visit_at: dict[int, datetime] = {}

    for s in sessions:
        if s.member_id not in last_visit_at or s.start > last_visit_at[s.member_id]:
            last_visit_at[s.member_id] = s.start
        for d, seg_s, seg_e in _split_by_day(s.start, s.end, tz):
            by_member_day.setdefault(s.member_id, {}).setdefault(d, []).append((seg_s, seg_e))

    aggs: list[MemberAgg] = []
    for member_id, day_map in by_member_day.items():
        day_seconds = {d: _union_seconds(iv, tz) for d, iv in day_map.items()}
        day_seconds = {d: sec for d, sec in day_seconds.items() if sec > 0}
        if not day_seconds:
            continue
        visit_dates = sorted(day_seconds.keys())
        total_hours = sum(day_seconds.values()) / 3600.0
        visit_days = len(visit_dates)
        avg_daily_hours = total_hours / visit_days if visit_days else 0.0
        aggs.append(
            MemberAgg(
                member_id=member_id,
                total_hours=float(total_hours),
                visit_days=int(visit_days),
                avg_daily_hours=float(avg_daily_hours),
                avg_gap_days=_avg_gap_days(visit_dates),
                last_visit_at=last_visit_at.get(member_id),
            )
        )

    return aggs


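# Example for _build_member_aggs (illustrative values): a member who plays 2.0 hours on
# one day and 4.0 hours on another gets total_hours = 6.0, visit_days = 2 and
# avg_daily_hours = 6.0 / 2 = 3.0, which is the "hours per visit day" metric that the
# main report is sorted by.
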
def _write_report(
    out_path: Path,
    *,
    rows: list[MemberAgg],
    tz: ZoneInfo,
    dim_profiles: dict[int, dict],
    latest_settle: dict[int, dict],
):
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8-sig", newline="") as f:
        w = csv.writer(f)
        w.writerow(["排名", "客户姓名", "联系方式", "单次日打球小时数", "到店平均间隔(天)", "10-12月来店次数", "最后一次到店日期"])
        for idx, row in enumerate(rows, start=1):
            mid = int(row.member_id)
            prof = dim_profiles.get(mid) or {}
            settle = latest_settle.get(mid) or {}
            name = settle.get("member_name") or prof.get("nickname") or ""
            phone = settle.get("member_phone") or prof.get("mobile") or ""
            last_visit_str = ""
            if isinstance(row.last_visit_at, datetime):
                last_visit_str = _as_tz(row.last_visit_at, tz).strftime("%Y-%m-%d %H:%M")
            w.writerow(
                [
                    idx,
                    str(name),
                    str(phone),
                    f"{float(row.avg_daily_hours or 0.0):.2f}",
                    "" if row.avg_gap_days is None else f"{float(row.avg_gap_days):.1f}",
                    int(row.visit_days or 0),
                    last_visit_str,
                ]
            )


def main() -> int:
    parser = argparse.ArgumentParser(description="Loyal billiards customers report")
    parser.add_argument("--start-date", default="2025-10-01", help="YYYY-MM-DD")
    parser.add_argument("--end-date", default="", help="YYYY-MM-DD (default: today)")
    parser.add_argument("--top-n", type=int, default=50)
    parser.add_argument("--areas", default=",".join(DEFAULT_AREAS), help="comma separated")
    parser.add_argument("--out", default="", help="output csv path")
    args = parser.parse_args()

    cfg = AppConfig.load({})
    tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
    store_id = int(cfg.get("app.store_id"))

    start_date = _parse_date(args.start_date)
    end_date = _parse_date(args.end_date) if args.end_date else datetime.now(tz).date()
    if end_date < start_date:
        raise SystemExit("end_date must be >= start_date")

    # end_dt is an exclusive upper bound (midnight after end_date), matching the
    # `start_use_time < %s` predicate in _load_sessions.
    start_dt = datetime.combine(start_date, time.min).replace(tzinfo=tz)
    end_dt = datetime.combine(end_date + timedelta(days=1), time.min).replace(tzinfo=tz)

    areas = tuple(a.strip() for a in str(args.areas or "").split(",") if a.strip())
    if not areas:
        raise SystemExit("areas is empty")

    conn = DatabaseConnection(dsn=cfg["db"]["dsn"], session=cfg["db"].get("session"))
    try:
        sessions = _load_sessions(conn, store_id=store_id, tz=tz, start_dt=start_dt, end_dt=end_dt, areas=areas)
        aggs = _build_member_aggs(sessions, tz)
        aggs.sort(key=lambda x: x.avg_daily_hours, reverse=True)
        top_n = max(1, int(args.top_n))

        picked_avg = aggs[:top_n]
        picked_gap_lt_20 = [r for r in aggs if r.avg_gap_days is not None and r.avg_gap_days < 20]

        picked_visit = sorted(aggs, key=lambda x: (x.visit_days, x.avg_daily_hours), reverse=True)[:top_n]
        picked_hours = sorted(aggs, key=lambda x: (x.total_hours, x.avg_daily_hours), reverse=True)[:top_n]

        member_ids = sorted({r.member_id for r in (picked_avg + picked_gap_lt_20 + picked_visit + picked_hours)})
        dim_profiles = _load_member_profiles(conn, member_ids)
        latest_settle = _load_latest_settlement_contact(conn, store_id=store_id, member_ids=member_ids)
    finally:
        conn.close()

    today = datetime.now(tz).date()
    base_dir = Path(args.out).parent if args.out else Path(__file__).parent
    out_main = Path(args.out) if args.out else base_dir / f"loyal_billiards_customers_{today.isoformat()}.csv"
    out_gap = base_dir / f"loyal_billiards_customers_gap_lt_20d_{today.isoformat()}.csv"
    out_visit = base_dir / f"loyal_billiards_customers_top{top_n}_by_visit_days_{today.isoformat()}.csv"
    out_hours = base_dir / f"loyal_billiards_customers_top{top_n}_by_total_hours_{today.isoformat()}.csv"

    _write_report(out_main, rows=picked_avg, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)
    _write_report(out_gap, rows=picked_gap_lt_20, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)
    _write_report(out_visit, rows=picked_visit, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)
    _write_report(out_hours, rows=picked_hours, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)

    print(str(out_main))
    print(str(out_gap))
    print(str(out_visit))
    print(str(out_hours))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())