"""
Member consumption report comparison: database vs. Feiqiu CSV export.

Comparison window: 2025-12-09 ~ 2026-03-10
Feiqiu CSV: tmp/结账记录_朗朗桌球_20251209_20260310.csv
Member information is obtained by looking each ticket number
(order_settle_id) up in the database.
"""
import os
import csv
from decimal import Decimal
from collections import defaultdict
from typing import Dict, Optional, Tuple

from dotenv import load_dotenv

load_dotenv()

PG_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")
if not PG_DSN:
    raise RuntimeError("TEST_DB_DSN / PG_DSN 未配置")

SYSTEM_ANALYZE_ROOT = os.environ.get("SYSTEM_ANALYZE_ROOT")
if not SYSTEM_ANALYZE_ROOT:
    raise RuntimeError("SYSTEM_ANALYZE_ROOT 未配置")

# Imported after the configuration checks so that a missing setting is
# reported before the database driver is loaded.
import psycopg2

FEIQIU_CSV = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
    "tmp", "结账记录_朗朗桌球_20251209_20260310.csv"
)

# Per-order tolerance (2 cents) below which two amounts count as equal.
_TOLERANCE = Decimal("0.02")

# Number of ticket ids sent to the database per query.
_BATCH_SIZE = 500

# (result key, CSV column header) for every monetary column read from the CSV.
_AMOUNT_COLUMNS = (
    ("consume", "消费金额"),
    ("table_fee", "台费"),
    ("goods", "商品费"),
    ("service", "服务加收"),
    ("course", "课程费"),
    ("incentive", "激励费"),
)

# ── SQL: fetch settlement rows + member info for a batch of ticket ids ──
SQL_BY_TICKET = r"""
SELECT
    sh.order_settle_id,
    sh.settle_type,
    sh.pay_time,
    COALESCE(sh.table_charge_money, 0) AS table_charge_money,
    COALESCE(sh.goods_money, 0) AS goods_money,
    COALESCE(sh.assistant_pd_money, 0) AS assistant_pd_money,
    COALESCE(sh.assistant_cx_money, 0) AS assistant_cx_money,
    COALESCE(sh.electricity_money, 0) AS electricity_money,
    COALESCE(sh.table_charge_money, 0) + COALESCE(sh.goods_money, 0)
        + COALESCE(sh.assistant_pd_money, 0) + COALESCE(sh.assistant_cx_money, 0)
        + COALESCE(sh.electricity_money, 0) AS items_sum,
    sh.consume_money,
    sh.member_id,
    COALESCE(NULLIF(dm.mobile, ''), NULLIF(sh.member_phone, '')) AS phone,
    COALESCE(NULLIF(dm.nickname, ''), NULLIF(sh.member_name, '')) AS member_name
FROM dwd.dwd_settlement_head sh
LEFT JOIN dwd.dim_member dm
    ON dm.member_id = sh.member_id AND dm.scd2_is_current = 1
WHERE sh.order_settle_id = ANY(%s)
"""

# ── SQL: database-wide totals for the same period ──
SQL_DB_TOTAL = r"""
SELECT
    COUNT(*) AS total_orders,
    ROUND(SUM(COALESCE(table_charge_money,0) + COALESCE(goods_money,0)
        + COALESCE(assistant_pd_money,0) + COALESCE(assistant_cx_money,0)
        + COALESCE(electricity_money,0)), 2) AS total_items_sum,
    ROUND(SUM(COALESCE(table_charge_money,0)), 2) AS total_table_charge,
    ROUND(SUM(COALESCE(goods_money,0)), 2) AS total_goods,
    ROUND(SUM(COALESCE(assistant_pd_money,0) + COALESCE(assistant_cx_money,0)), 2) AS total_assistant,
    ROUND(SUM(COALESCE(consume_money,0)), 2) AS total_consume_money,
    COUNT(CASE WHEN member_id IS NOT NULL AND member_id != 0 THEN 1 END) AS member_orders,
    COUNT(CASE WHEN member_id IS NULL OR member_id = 0 THEN 1 END) AS non_member_orders
FROM dwd.dwd_settlement_head
WHERE settle_type IN (1, 3)
    AND pay_time >= '2025-12-09'
    AND pay_time < '2026-03-11'
"""


def d(val: Optional[str]) -> Decimal:
    """Convert a CSV cell to Decimal; ``None``, empty or blank text yields 0."""
    text = (val or "").strip()
    return Decimal(text) if text else Decimal("0")


def _to_decimal(value) -> Decimal:
    """Convert a value returned by the database driver to Decimal.

    ``None`` (e.g. ``SUM`` over zero rows) becomes 0. Non-Decimal numbers go
    through ``str`` so that a float does not drag its binary representation
    error into the result.
    """
    if value is None:
        return Decimal("0")
    if isinstance(value, Decimal):
        return value
    return Decimal(str(value))


def parse_feiqiu_csv(path: str) -> Tuple[Dict[str, dict], dict]:
    """Parse the Feiqiu CSV export.

    Rows whose ticket number is missing, blank or "-" are skipped.

    Args:
        path: Path of the CSV file (UTF-8, optional BOM).

    Returns:
        A ``(tickets, totals)`` pair. ``tickets`` maps each ticket number to
        a dict with the row's amounts (Decimal), bill type, table number and
        pay time. ``totals`` holds the order count, the per-column sums and a
        per-bill-type row counter. A later row carrying an already seen
        ticket number replaces the earlier entry in ``tickets``, while
        ``totals`` counts every row.
    """
    tickets = {}
    totals = {
        "orders": 0,
        "consume": Decimal("0"),
        "table_fee": Decimal("0"),
        "goods": Decimal("0"),
        "service": Decimal("0"),
        "course": Decimal("0"),
        "incentive": Decimal("0"),
        "bill_types": defaultdict(int),
    }
    # newline="" lets the csv module handle line endings itself.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        for row in csv.DictReader(f):
            # DictReader fills cells missing from a short row with None, so
            # a plain .get(key, "") default is not enough here.
            ticket = (row.get("小票号") or "").strip()
            if not ticket or ticket == "-":
                continue

            totals["orders"] += 1
            amounts = {key: d(row.get(column, "0")) for key, column in _AMOUNT_COLUMNS}
            for key, amount in amounts.items():
                totals[key] += amount

            bt = row.get("账单类型", "未知")
            if bt is None:
                bt = "未知"
            totals["bill_types"][bt] += 1

            tickets[ticket] = {
                **amounts,
                "bill_type": bt,
                # Kept as str: both values are later rendered with
                # width/alignment format specs, which None does not support.
                "table_no": row.get("台桌号") or "",
                "pay_time": row.get("结账时间") or "",
            }
    return tickets, totals


def _fetch_db_data(id_to_ticket: Dict[int, str]) -> Tuple[Dict[str, dict], tuple]:
    """Load the settlement rows for the given ticket ids plus the period totals.

    Args:
        id_to_ticket: Maps each numeric ticket id to the ticket string as it
            appears in the CSV.

    Returns:
        ``(db_records, db_total)``. ``db_records`` is keyed by the CSV ticket
        string; ``db_total`` is the single row produced by ``SQL_DB_TOTAL``.
    """
    ticket_ids = list(id_to_ticket)
    db_records = {}
    conn = psycopg2.connect(PG_DSN, connect_timeout=15, options="-c statement_timeout=300000")
    try:
        # "with conn" only commits / rolls back the transaction; the
        # connection itself is closed in the finally clause below.
        with conn, conn.cursor() as cur:
            for start in range(0, len(ticket_ids), _BATCH_SIZE):
                batch = ticket_ids[start:start + _BATCH_SIZE]
                cur.execute(SQL_BY_TICKET, (batch,))
                for row in cur.fetchall():
                    # Key by the original CSV string so that a ticket written
                    # with leading zeros still matches its CSV row.
                    key = id_to_ticket.get(row[0], str(row[0]))
                    db_records[key] = {
                        "order_settle_id": row[0],
                        "settle_type": row[1],
                        "pay_time": row[2],
                        "table_charge": _to_decimal(row[3]),
                        "goods": _to_decimal(row[4]),
                        "assistant_pd": _to_decimal(row[5]),
                        "assistant_cx": _to_decimal(row[6]),
                        "electricity": _to_decimal(row[7]),
                        "items_sum": _to_decimal(row[8]),
                        "consume_money": _to_decimal(row[9]),
                        "member_id": row[10],
                        "phone": row[11],
                        "member_name": row[12],
                    }
            # Period-wide totals
            cur.execute(SQL_DB_TOTAL)
            db_total = cur.fetchone()
    finally:
        conn.close()
    return db_records, db_total


def _export_diff_csv(path: str, sorted_diffs: list) -> None:
    """Write the per-order difference details to ``path`` (UTF-8 with BOM)."""
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["小票号", "飞球结账时间", "DB结账时间", "账单类型", "台桌", "settle_type",
                         "飞球消费", "DB_items_sum", "DB_consume_money", "差额",
                         "飞球台费", "DB台费", "飞球商品", "DB商品", "飞球助教", "DB助教",
                         "会员昵称", "手机号"])
        for d_row in sorted_diffs:
            writer.writerow([
                d_row["ticket"], d_row["fq_pay_time"], d_row["db_pay_time"],
                d_row["bill_type"], d_row["table_no"], d_row["settle_type"],
                f"{d_row['fq_consume']:.2f}", f"{d_row['db_items_sum']:.2f}",
                f"{d_row['db_consume_money']:.2f}", f"{d_row['diff']:+.2f}",
                f"{d_row['fq_table']:.2f}", f"{d_row['db_table']:.2f}",
                f"{d_row['fq_goods']:.2f}", f"{d_row['db_goods']:.2f}",
                f"{d_row['fq_assistant']:.2f}", f"{d_row['db_assistant']:.2f}",
                d_row["member_name"], d_row["phone"],
            ])


def _export_member_csv(path: str, sorted_members: list) -> None:
    """Write the per-member aggregation to ``path`` (UTF-8 with BOM)."""
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(["会员昵称", "手机号", "飞球消费合计", "DB items_sum合计", "差额",
                         "飞球台费", "飞球商品费", "飞球助教费", "订单数"])
        for phone, agg in sorted_members:
            diff = agg["db_items"] - agg["fq_consume"]
            writer.writerow([
                agg["name"], phone,
                f"{agg['fq_consume']:.2f}", f"{agg['db_items']:.2f}", f"{diff:+.2f}",
                f"{agg['fq_table']:.2f}", f"{agg['fq_goods']:.2f}",
                f"{agg['fq_assistant']:.2f}", agg["orders"],
            ])


def run_compare() -> None:
    """Compare the Feiqiu CSV export with the database and report differences.

    Prints the CSV totals, the database totals, a side-by-side comparison,
    the per-order differences and a per-member aggregation. Two CSV reports
    (order differences, member comparison) are written to the
    ``member_reports`` directory next to ``SYSTEM_ANALYZE_ROOT``. All amounts
    are kept as ``Decimal`` so the 2-cent tolerance is evaluated exactly.
    """
    print("=" * 90, flush=True)
    print("会员消费报表对比:数据库 vs 飞球导出", flush=True)
    print("对比范围:2025-12-09 ~ 2026-03-10", flush=True)
    print("=" * 90, flush=True)

    # ── 1. Parse the Feiqiu CSV ──
    print("\n📄 解析飞球 CSV ...", flush=True)
    if not os.path.exists(FEIQIU_CSV):
        print(f" ❌ 文件不存在: {FEIQIU_CSV}")
        return
    fq_tickets, fq_totals = parse_feiqiu_csv(FEIQIU_CSV)
    print(f" 订单数: {fq_totals['orders']}", flush=True)
    print(f" 消费金额合计: {fq_totals['consume']}", flush=True)
    print(f" 台费: {fq_totals['table_fee']}", flush=True)
    print(f" 商品费: {fq_totals['goods']}", flush=True)
    print(f" 服务加收(陪打): {fq_totals['service']}", flush=True)
    print(f" 课程费(超休): {fq_totals['course']}", flush=True)
    print(f" 激励费: {fq_totals['incentive']}", flush=True)
    print(f" 助教合计: {fq_totals['service'] + fq_totals['course']}", flush=True)
    print(f" 账单类型: {dict(fq_totals['bill_types'])}", flush=True)

    # ── 2. Look the ticket numbers up in the database ──
    print("\n🗄️ 用小票号反查数据库 ...", flush=True)
    # isdecimal() accepts exactly the characters int() can parse.
    id_to_ticket = {int(t): t for t in fq_tickets if t.isdecimal()}
    print(f" 待查小票数: {len(id_to_ticket)}", flush=True)

    db_records, db_total = _fetch_db_data(id_to_ticket)
    print(f" 数据库匹配到: {len(db_records)} 条", flush=True)
    print(f" 飞球有但数据库无: {len(fq_tickets) - len(db_records)} 条", flush=True)

    # ── 3. Database totals ──
    print(f"\n🗄️ 数据库全量(同期 settle_type IN 1,3):", flush=True)
    print(f" 订单数: {db_total[0]} (会员: {db_total[6]}, 非会员: {db_total[7]})", flush=True)
    print(f" items_sum: {db_total[1]}", flush=True)
    print(f" 台费: {db_total[2]}", flush=True)
    print(f" 商品费: {db_total[3]}", flush=True)
    print(f" 助教费: {db_total[4]}", flush=True)
    print(f" consume_money: {db_total[5]}", flush=True)

    # ── 4. Totals comparison ──
    print("\n" + "=" * 90, flush=True)
    print("📊 总量对比(飞球 CSV vs 数据库全量)", flush=True)
    print("=" * 90, flush=True)

    fq_assistant = fq_totals["service"] + fq_totals["course"]
    # _to_decimal maps NULL sums (no rows in the period) to 0.
    comparisons = [
        ("订单数", Decimal(fq_totals["orders"]), _to_decimal(db_total[0])),
        ("消费金额 vs items_sum", fq_totals["consume"], _to_decimal(db_total[1])),
        ("消费金额 vs consume_money", fq_totals["consume"], _to_decimal(db_total[5])),
        ("台费", fq_totals["table_fee"], _to_decimal(db_total[2])),
        ("商品费", fq_totals["goods"], _to_decimal(db_total[3])),
        ("助教费(服务+课程)", fq_assistant, _to_decimal(db_total[4])),
    ]

    print(f"{'指标':<28} {'飞球CSV':>14} {'数据库':>14} {'差额':>14} {'差异%':>9}", flush=True)
    print("-" * 90, flush=True)
    for label, fq_val, db_val in comparisons:
        diff = db_val - fq_val
        pct = (diff / fq_val * 100) if fq_val else Decimal("0")
        print(f"{label:<28} {fq_val:>14,.2f} {db_val:>14,.2f} {diff:>+14,.2f} {pct:>+8.2f}%", flush=True)

    # ── 5. Per-order comparison: find the differing orders ──
    print("\n" + "=" * 90, flush=True)
    print("🔍 逐单对比(飞球消费金额 vs 数据库 items_sum)", flush=True)
    print("=" * 90, flush=True)

    # Output directory, shared by both reports
    report_dir = os.path.join(os.path.dirname(SYSTEM_ANALYZE_ROOT), "member_reports")
    os.makedirs(report_dir, exist_ok=True)

    diffs = []
    only_fq = []

    for ticket, fq_row in fq_tickets.items():
        db_row = db_records.get(ticket)
        if not db_row:
            only_fq.append(ticket)
            continue

        fq_consume = fq_row["consume"]
        db_items = db_row["items_sum"]
        diff = db_items - fq_consume

        if abs(diff) > _TOLERANCE:
            diffs.append({
                "ticket": ticket,
                "fq_pay_time": fq_row["pay_time"],
                "db_pay_time": str(db_row["pay_time"]) if db_row["pay_time"] else "",
                "fq_consume": fq_consume,
                "db_items_sum": db_items,
                "db_consume_money": db_row["consume_money"],
                "diff": diff,
                "fq_table": fq_row["table_fee"],
                "db_table": db_row["table_charge"],
                "fq_goods": fq_row["goods"],
                "db_goods": db_row["goods"],
                "fq_assistant": fq_row["service"] + fq_row["course"],
                "db_assistant": db_row["assistant_pd"] + db_row["assistant_cx"],
                "member_name": db_row["member_name"] or "",
                "phone": db_row["phone"] or "",
                "bill_type": fq_row["bill_type"],
                "table_no": fq_row["table_no"],
                "settle_type": db_row["settle_type"],
            })

    print(f" 总对比: {len(fq_tickets)} 单", flush=True)
    print(f" 完全匹配(差额≤0.02): {len(fq_tickets) - len(only_fq) - len(diffs)} 单", flush=True)
    print(f" 有差异: {len(diffs)} 单", flush=True)
    print(f" 飞球有但数据库无: {len(only_fq)} 单", flush=True)

    if only_fq:
        print(f"\n 飞球有但数据库无的小票号(前20):", flush=True)
        for t in only_fq[:20]:
            fq_row = fq_tickets[t]
            print(f" {t} 消费={fq_row['consume']} 类型={fq_row['bill_type']} 台桌={fq_row['table_no']}", flush=True)

    if diffs:
        # Export the complete difference details
        diff_csv_path = os.path.join(report_dir, "order_diff_detail_20251209.csv")
        sorted_diffs = sorted(diffs, key=lambda x: abs(x["diff"]), reverse=True)
        _export_diff_csv(diff_csv_path, sorted_diffs)
        print(f"\n ✅ 差异明细已导出: {diff_csv_path}", flush=True)

        print(f"\n 差异订单(前30,按差额绝对值排序):", flush=True)
        print(f" {'小票号':<20} {'结账时间':<20} {'飞球消费':>10} {'DB items':>10} {'差额':>10} {'台桌':<10} {'会员':>8}", flush=True)
        print(f" {'-'*100}", flush=True)
        for d_row in sorted_diffs[:30]:
            print(f" {d_row['ticket']:<20} {d_row['fq_pay_time']:<20} {d_row['fq_consume']:>10,.2f} {d_row['db_items_sum']:>10,.2f} "
                  f"{d_row['diff']:>+10,.2f} {d_row['table_no']:<10} {d_row['member_name']:>8}", flush=True)

    # ── 6. Aggregate by member (member resolved through the ticket number) ──
    print("\n" + "=" * 90, flush=True)
    print("👤 按会员归总(飞球订单通过小票号反查数据库会员信息)", flush=True)
    print("=" * 90, flush=True)

    member_agg = defaultdict(lambda: {
        "name": "",
        "fq_consume": Decimal("0"),
        "db_items": Decimal("0"),
        "fq_table": Decimal("0"),
        "fq_goods": Decimal("0"),
        "fq_assistant": Decimal("0"),
        "orders": 0,
    })
    no_member_count = 0
    no_db_match = 0

    for ticket, fq_row in fq_tickets.items():
        db_row = db_records.get(ticket)
        if not db_row:
            no_db_match += 1
            continue

        phone = db_row.get("phone") or ""
        if not phone:
            # No phone on record: fall back to the member id when there is
            # one, otherwise count the order as a non-member order.
            if db_row.get("member_id"):
                phone = f"member_{db_row['member_id']}"
            else:
                no_member_count += 1
                continue

        agg = member_agg[phone]
        agg["name"] = db_row.get("member_name") or agg["name"]
        agg["fq_consume"] += fq_row["consume"]
        agg["db_items"] += db_row["items_sum"]
        agg["fq_table"] += fq_row["table_fee"]
        agg["fq_goods"] += fq_row["goods"]
        agg["fq_assistant"] += fq_row["service"] + fq_row["course"]
        agg["orders"] += 1

    print(f" 有会员的订单: {sum(a['orders'] for a in member_agg.values())} 单", flush=True)
    print(f" 非会员订单: {no_member_count} 单", flush=True)
    print(f" 数据库无匹配: {no_db_match} 单", flush=True)
    print(f" 会员数: {len(member_agg)} 位", flush=True)

    # Export the member report
    csv_path = os.path.join(report_dir, "member_consumption_20251209_compare.csv")
    sorted_members = sorted(member_agg.items(), key=lambda x: x[1]["fq_consume"], reverse=True)
    _export_member_csv(csv_path, sorted_members)
    print(f"\n ✅ 会员对比报表已导出: {csv_path}", flush=True)

    # Show the top 20 on the terminal
    print(f"\n {'会员昵称':<12} {'手机号':<14} {'飞球消费':>10} {'DB items':>10} {'差额':>10} {'单数':>5}", flush=True)
    print(f" {'-'*70}", flush=True)
    for phone, agg in sorted_members[:20]:
        diff = agg["db_items"] - agg["fq_consume"]
        print(f" {agg['name']:<12} {phone:<14} {agg['fq_consume']:>10,.2f} "
              f"{agg['db_items']:>10,.2f} {diff:>+10,.2f} {agg['orders']:>5}", flush=True)
    if len(sorted_members) > 20:
        print(f" ... 还有 {len(sorted_members) - 20} 位(见 CSV)", flush=True)


if __name__ == "__main__":
    run_compare()