# -*- coding: utf-8 -*-
"""
Report the top-N "loyal billiards customers", ranked by the average number of
hours played per visit day.

Definitions (defaults):
- Date range: 2025-10-01 through today (both overridable on the command line).
- Only sessions with a non-zero member_id and a positive duration are counted.
- Only tables whose area name is listed in ``DEFAULT_AREAS`` are included
  (overridable with ``--areas``).
- When one customer has several tables open at the same time on the same day,
  play time is the union of the intervals (overlaps are not double counted).
- "Hours per visit day" = total hours played in the range / number of days
  with any play.
- "Average visit gap (days)" = mean gap between consecutive visit days
  (empty when the customer has fewer than two visit days).
- "Visit count" = number of distinct visit days inside the range.
- "Last visit" = start time of the customer's latest session in the range,
  formatted to the minute.

Output: CSV files encoded as UTF-8 with BOM so that Excel opens them correctly.
"""

from __future__ import annotations

import argparse
import csv
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta, timezone
from pathlib import Path
from typing import Iterable
from zoneinfo import ZoneInfo

from config.settings import AppConfig
from database.connection import DatabaseConnection

DEFAULT_AREAS = ("A区", "B区", "C区", "VIP包厢", "斯诺克区", "TV台")


@dataclass(frozen=True)
class SessionRow:
    """One table-usage session of a member, with timezone-aware bounds."""

    member_id: int
    start: datetime
    end: datetime
    area_name: str | None


@dataclass(frozen=True)
class MemberAgg:
    """Per-member aggregate computed over the report's date range."""

    member_id: int
    total_hours: float
    visit_days: int
    avg_daily_hours: float
    avg_gap_days: float | None
    last_visit_at: datetime | None


def _as_tz(dt: datetime, tz: ZoneInfo) -> datetime:
    """Return *dt* as an aware datetime in *tz*.

    A naive value is treated as wall-clock time in *tz* and only tagged with
    it; an aware value is converted to *tz*.
    """
    if dt.tzinfo is None:
        return dt.replace(tzinfo=tz)
    return dt.astimezone(tz)


def _parse_date(s: str) -> date:
    """Parse the leading ``YYYY-MM-DD`` part of *s*.

    Surrounding whitespace is stripped and everything after the first ten
    characters (for example a time part) is ignored.

    Raises:
        ValueError: if the remaining text is not a valid ISO date.
    """
    # Slicing a shorter string is a no-op, so no length check is needed.
    return date.fromisoformat((s or "").strip()[:10])


def _date_floor(dt: datetime, tz: ZoneInfo) -> datetime:
    """Return local midnight (in *tz*) of the day that contains *dt*."""
    local = _as_tz(dt, tz)
    return datetime.combine(local.date(), time.min).replace(tzinfo=tz)


def _split_by_day(start: datetime, end: datetime, tz: ZoneInfo) -> Iterable[tuple[date, datetime, datetime]]:
    """Cut the interval ``[start, end)`` at local midnights.

    Returns:
        A list of ``(local_date, segment_start, segment_end)`` tuples in
        chronological order; empty when ``end <= start``.
    """
    start = _as_tz(start, tz)
    end = _as_tz(end, tz)
    if end <= start:
        return []

    segments: list[tuple[date, datetime, datetime]] = []
    cur = start
    while True:
        next_midnight = _date_floor(cur, tz) + timedelta(days=1)
        seg_end = min(end, next_midnight)
        segments.append((cur.date(), cur, seg_end))
        if seg_end >= end:
            break
        cur = seg_end
    return segments


def _union_seconds(intervals: list[tuple[datetime, datetime]], tz: ZoneInfo) -> int:
    """Return the total seconds covered by the union of *intervals*.

    Overlapping or touching intervals are merged so that concurrent sessions
    are not counted twice. Empty or inverted intervals are ignored. Naive
    datetimes are interpreted as wall-clock time in *tz*.
    """
    # Work in UTC: subtracting two aware datetimes that share the same tzinfo
    # ignores their UTC offsets, which would mis-measure an interval that
    # crosses a DST transition.
    cleaned: list[tuple[datetime, datetime]] = []
    for s, e in intervals:
        s = _as_tz(s, tz).astimezone(timezone.utc)
        e = _as_tz(e, tz).astimezone(timezone.utc)
        if e > s:
            cleaned.append((s, e))
    if not cleaned:
        return 0

    cleaned.sort(key=lambda iv: iv[0])
    total = 0
    cur_s, cur_e = cleaned[0]
    for s, e in cleaned[1:]:
        if s <= cur_e:
            # Overlaps (or touches) the running interval: extend it.
            if e > cur_e:
                cur_e = e
        else:
            total += int((cur_e - cur_s).total_seconds())
            cur_s, cur_e = s, e
    total += int((cur_e - cur_s).total_seconds())
    return total


def _avg_gap_days(visit_dates: list[date]) -> float | None:
    """Return the mean gap in days between consecutive distinct visit dates.

    Returns:
        ``None`` when there are fewer than two distinct dates.
    """
    unique_days = sorted(set(visit_dates))
    if len(unique_days) < 2:
        return None
    # Dates are distinct and sorted, so every gap is at least one day.
    gaps = [(later - earlier).days for earlier, later in zip(unique_days, unique_days[1:])]
    return sum(gaps) / len(gaps)


def _load_sessions(
    conn: DatabaseConnection,
    *,
    store_id: int,
    tz: ZoneInfo,
    start_dt: datetime,
    end_dt: datetime,
    areas: tuple[str, ...],
) -> list[SessionRow]:
    """Load member table sessions that start inside ``[start_dt, end_dt)``.

    Rows are read with ``.get`` and are therefore assumed to be mapping-like
    (TODO confirm against ``DatabaseConnection.query``). A row is skipped when
    the member id is missing or not positive, when the start time is not a
    ``datetime``, when no end time can be determined, or when the session
    would be empty. If ``ledger_end_time`` is not a ``datetime``, the end is
    derived from ``real_table_use_seconds``.
    """
    sql = """
        SELECT
            member_id,
            start_use_time,
            ledger_end_time,
            real_table_use_seconds,
            site_table_area_name
        FROM billiards_dwd.dwd_table_fee_log
        WHERE site_id = %s
          AND member_id IS NOT NULL
          AND member_id <> 0
          AND start_use_time >= %s
          AND start_use_time < %s
          AND site_table_area_name = ANY(%s)
    """
    rows = conn.query(sql, (store_id, start_dt, end_dt, list(areas)))

    sessions: list[SessionRow] = []
    for r in rows:
        member_id = int(r.get("member_id") or 0)
        if member_id <= 0:
            continue

        start = r.get("start_use_time")
        if not isinstance(start, datetime):
            continue

        end = r.get("ledger_end_time")
        if not isinstance(end, datetime):
            # Fall back to the recorded duration when there is no end time.
            try:
                secs = int(r.get("real_table_use_seconds") or 0)
            except (TypeError, ValueError, OverflowError):
                secs = 0
            if secs <= 0:
                continue
            end = start + timedelta(seconds=secs)

        start = _as_tz(start, tz)
        end = _as_tz(end, tz)
        if end <= start:
            continue

        sessions.append(
            SessionRow(
                member_id=member_id,
                start=start,
                end=end,
                area_name=r.get("site_table_area_name"),
            )
        )
    return sessions


def _load_member_profiles(conn: DatabaseConnection, member_ids: list[int]) -> dict[int, dict]:
    """Return current ``dim_member`` rows keyed by member id.

    No query is issued when *member_ids* is empty.
    """
    if not member_ids:
        return {}
    sql = """
        SELECT member_id, nickname, mobile
        FROM billiards_dwd.dim_member
        WHERE scd2_is_current = 1
          AND member_id = ANY(%s)
    """
    rows = conn.query(sql, (member_ids,))
    return {int(r["member_id"]): r for r in rows if r.get("member_id") is not None}


def _load_latest_settlement_contact(conn: DatabaseConnection, *, store_id: int, member_ids: list[int]) -> dict[int, dict]:
    """Return each member's most recent settlement row, keyed by member id.

    "Most recent" is the row with the latest ``pay_time`` for the store (rows
    with a NULL ``pay_time`` sort last). No query is issued when *member_ids*
    is empty.
    """
    if not member_ids:
        return {}
    sql = """
        SELECT DISTINCT ON (member_id)
            member_id,
            member_name,
            member_phone,
            pay_time
        FROM billiards_dwd.dwd_settlement_head
        WHERE site_id = %s
          AND member_id = ANY(%s)
          AND member_id <> 0
        ORDER BY member_id, pay_time DESC NULLS LAST
    """
    rows = conn.query(sql, (store_id, member_ids))
    return {int(r["member_id"]): r for r in rows if r.get("member_id") is not None}


def _build_member_aggs(sessions: list[SessionRow], tz: ZoneInfo) -> list[MemberAgg]:
    """Aggregate sessions into one ``MemberAgg`` per member.

    Each session is split at local midnights, the segments of one member and
    day are merged as an interval union, and days with no play time are
    dropped. Members left without any day are omitted from the result.
    """
    by_member_day: dict[int, dict[date, list[tuple[datetime, datetime]]]] = {}
    last_visit_at: dict[int, datetime] = {}

    for s in sessions:
        known = last_visit_at.get(s.member_id)
        if known is None or s.start > known:
            last_visit_at[s.member_id] = s.start
        for d, seg_s, seg_e in _split_by_day(s.start, s.end, tz):
            by_member_day.setdefault(s.member_id, {}).setdefault(d, []).append((seg_s, seg_e))

    aggs: list[MemberAgg] = []
    for member_id, day_map in by_member_day.items():
        day_seconds = {d: _union_seconds(iv, tz) for d, iv in day_map.items()}
        day_seconds = {d: sec for d, sec in day_seconds.items() if sec > 0}
        if not day_seconds:
            continue

        visit_dates = sorted(day_seconds)
        total_hours = sum(day_seconds.values()) / 3600.0
        visit_days = len(visit_dates)
        avg_daily_hours = total_hours / visit_days if visit_days else 0.0
        aggs.append(
            MemberAgg(
                member_id=member_id,
                total_hours=float(total_hours),
                visit_days=int(visit_days),
                avg_daily_hours=float(avg_daily_hours),
                avg_gap_days=_avg_gap_days(visit_dates),
                last_visit_at=last_visit_at.get(member_id),
            )
        )
    return aggs


def _write_report(
    out_path: Path,
    *,
    rows: list[MemberAgg],
    tz: ZoneInfo,
    dim_profiles: dict[int, dict],
    latest_settle: dict[int, dict],
):
    """Write *rows* to *out_path* as a ranked CSV report.

    Rank follows the order of *rows*. Name and phone come from the latest
    settlement record when present, otherwise from the member profile,
    otherwise they are left empty. Parent directories are created as needed
    and an existing file is overwritten.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # utf-8-sig writes a BOM so that Excel detects the encoding.
    with out_path.open("w", encoding="utf-8-sig", newline="") as f:
        w = csv.writer(f)
        w.writerow(["排名", "客户姓名", "联系方式", "单次日打球小时数", "到店平均间隔(天)", "10-12月来店次数", "最后一次到店日期"])
        for idx, row in enumerate(rows, start=1):
            mid = int(row.member_id)
            prof = dim_profiles.get(mid) or {}
            settle = latest_settle.get(mid) or {}
            name = settle.get("member_name") or prof.get("nickname") or ""
            phone = settle.get("member_phone") or prof.get("mobile") or ""

            last_visit_str = ""
            if isinstance(row.last_visit_at, datetime):
                last_visit_str = _as_tz(row.last_visit_at, tz).strftime("%Y-%m-%d %H:%M")

            w.writerow(
                [
                    idx,
                    str(name),
                    str(phone),
                    f"{float(row.avg_daily_hours or 0.0):.2f}",
                    "" if row.avg_gap_days is None else f"{float(row.avg_gap_days):.1f}",
                    int(row.visit_days or 0),
                    last_visit_str,
                ]
            )


def main() -> int:
    """Run the report from the command line and write four CSV files.

    The files are: top-N by average hours per visit day, every member whose
    average visit gap is below 20 days, top-N by visit days, and top-N by
    total hours. The path of each file is printed on its own line.

    Returns:
        0 on success.

    Raises:
        SystemExit: on invalid arguments (bad date, end before start, empty
            area list).
    """
    parser = argparse.ArgumentParser(description="Loyal billiards customers report")
    parser.add_argument("--start-date", default="2025-10-01", help="YYYY-MM-DD")
    parser.add_argument("--end-date", default="", help="YYYY-MM-DD (default: today)")
    parser.add_argument("--top-n", type=int, default=50)
    parser.add_argument("--areas", default=",".join(DEFAULT_AREAS), help="comma separated")
    parser.add_argument("--out", default="", help="output csv path")
    args = parser.parse_args()

    cfg = AppConfig.load({})
    tz = ZoneInfo(cfg.get("app.timezone", "Asia/Taipei"))
    store_id = int(cfg.get("app.store_id"))

    try:
        start_date = _parse_date(args.start_date)
        end_date = _parse_date(args.end_date) if args.end_date else datetime.now(tz).date()
    except ValueError as exc:
        # Report a malformed date as a usage error instead of a traceback.
        parser.error(f"invalid date: {exc}")
    if end_date < start_date:
        raise SystemExit("end_date must be >= start_date")

    # Half-open range [start_dt, end_dt): end_dt is midnight after end_date,
    # so the whole end date is included.
    start_dt = datetime.combine(start_date, time.min).replace(tzinfo=tz)
    end_dt = datetime.combine(end_date + timedelta(days=1), time.min).replace(tzinfo=tz)

    areas = tuple(a.strip() for a in str(args.areas or "").split(",") if a.strip())
    if not areas:
        raise SystemExit("areas is empty")

    conn = DatabaseConnection(dsn=cfg["db"]["dsn"], session=cfg["db"].get("session"))
    try:
        sessions = _load_sessions(conn, store_id=store_id, tz=tz, start_dt=start_dt, end_dt=end_dt, areas=areas)
        aggs = _build_member_aggs(sessions, tz)
        aggs.sort(key=lambda x: x.avg_daily_hours, reverse=True)

        top_n = max(1, int(args.top_n))
        picked_avg = aggs[:top_n]
        # Not capped at top_n: every member with an average gap under 20 days.
        picked_gap_lt_20 = [r for r in aggs if r.avg_gap_days is not None and r.avg_gap_days < 20]
        picked_visit = sorted(aggs, key=lambda x: (x.visit_days, x.avg_daily_hours), reverse=True)[:top_n]
        picked_hours = sorted(aggs, key=lambda x: (x.total_hours, x.avg_daily_hours), reverse=True)[:top_n]

        # Look up contact details once for every member that appears in any report.
        member_ids = sorted({r.member_id for r in (picked_avg + picked_gap_lt_20 + picked_visit + picked_hours)})
        dim_profiles = _load_member_profiles(conn, member_ids)
        latest_settle = _load_latest_settlement_contact(conn, store_id=store_id, member_ids=member_ids)
    finally:
        conn.close()

    today = datetime.now(tz).date()
    # Secondary reports are written next to the main one (or next to this script).
    base_dir = Path(args.out).parent if args.out else Path(__file__).parent
    out_main = Path(args.out) if args.out else base_dir / f"loyal_billiards_customers_{today.isoformat()}.csv"
    out_gap = base_dir / f"loyal_billiards_customers_gap_lt_20d_{today.isoformat()}.csv"
    out_visit = base_dir / f"loyal_billiards_customers_top{top_n}_by_visit_days_{today.isoformat()}.csv"
    out_hours = base_dir / f"loyal_billiards_customers_top{top_n}_by_total_hours_{today.isoformat()}.csv"

    _write_report(out_main, rows=picked_avg, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)
    _write_report(out_gap, rows=picked_gap_lt_20, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)
    _write_report(out_visit, rows=picked_visit, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)
    _write_report(out_hours, rows=picked_hours, tz=tz, dim_profiles=dim_profiles, latest_settle=latest_settle)

    print(str(out_main))
    print(str(out_gap))
    print(str(out_visit))
    print(str(out_hours))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())