Files
Neo-ZQYY/scripts/ops/member_report_compare.py
2026-03-15 10:15:02 +08:00

378 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
会员消费报表对比:数据库 vs 飞球导出 CSV
对比范围2025-12-09 ~ 2026-03-10
飞球 CSVtmp/结账记录_朗朗桌球_20251209_20260310.csv
通过小票号order_settle_id反查数据库获取会员信息
"""
import os
import csv
from decimal import Decimal
from collections import defaultdict
from dotenv import load_dotenv
load_dotenv()
PG_DSN = os.environ.get("TEST_DB_DSN") or os.environ.get("PG_DSN")
if not PG_DSN:
raise RuntimeError("TEST_DB_DSN / PG_DSN 未配置")
SYSTEM_ANALYZE_ROOT = os.environ.get("SYSTEM_ANALYZE_ROOT")
if not SYSTEM_ANALYZE_ROOT:
raise RuntimeError("SYSTEM_ANALYZE_ROOT 未配置")
import psycopg2
FEIQIU_CSV = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"tmp", "结账记录_朗朗桌球_20251209_20260310.csv"
)
# ── SQL: 按小票号批量查数据库记录 + 会员信息 ──
SQL_BY_TICKET = r"""
SELECT
sh.order_settle_id,
sh.settle_type,
sh.pay_time,
COALESCE(sh.table_charge_money, 0) AS table_charge_money,
COALESCE(sh.goods_money, 0) AS goods_money,
COALESCE(sh.assistant_pd_money, 0) AS assistant_pd_money,
COALESCE(sh.assistant_cx_money, 0) AS assistant_cx_money,
COALESCE(sh.electricity_money, 0) AS electricity_money,
COALESCE(sh.table_charge_money, 0)
+ COALESCE(sh.goods_money, 0)
+ COALESCE(sh.assistant_pd_money, 0)
+ COALESCE(sh.assistant_cx_money, 0)
+ COALESCE(sh.electricity_money, 0) AS items_sum,
sh.consume_money,
sh.member_id,
COALESCE(NULLIF(dm.mobile, ''), NULLIF(sh.member_phone, '')) AS phone,
COALESCE(NULLIF(dm.nickname, ''), NULLIF(sh.member_name, '')) AS member_name
FROM dwd.dwd_settlement_head sh
LEFT JOIN dwd.dim_member dm
ON dm.member_id = sh.member_id AND dm.scd2_is_current = 1
WHERE sh.order_settle_id = ANY(%s)
"""
# ── SQL: 数据库全量汇总(同期) ──
SQL_DB_TOTAL = r"""
SELECT
COUNT(*) AS total_orders,
ROUND(SUM(COALESCE(table_charge_money,0) + COALESCE(goods_money,0)
+ COALESCE(assistant_pd_money,0) + COALESCE(assistant_cx_money,0)
+ COALESCE(electricity_money,0)), 2) AS total_items_sum,
ROUND(SUM(COALESCE(table_charge_money,0)), 2) AS total_table_charge,
ROUND(SUM(COALESCE(goods_money,0)), 2) AS total_goods,
ROUND(SUM(COALESCE(assistant_pd_money,0) + COALESCE(assistant_cx_money,0)), 2) AS total_assistant,
ROUND(SUM(COALESCE(consume_money,0)), 2) AS total_consume_money,
COUNT(CASE WHEN member_id IS NOT NULL AND member_id != 0 THEN 1 END) AS member_orders,
COUNT(CASE WHEN member_id IS NULL OR member_id = 0 THEN 1 END) AS non_member_orders
FROM dwd.dwd_settlement_head
WHERE settle_type IN (1, 3)
AND pay_time >= '2025-12-09'
AND pay_time < '2026-03-11'
"""
def d(val: str) -> Decimal:
"""安全转 Decimal"""
return Decimal(val.strip() if val and val.strip() else "0")
def parse_feiqiu_csv(path: str) -> dict:
"""解析飞球 CSV返回 {小票号: row_dict} + 汇总"""
tickets = {}
totals = {
"orders": 0, "consume": Decimal("0"), "table_fee": Decimal("0"),
"goods": Decimal("0"), "service": Decimal("0"), "course": Decimal("0"),
"incentive": Decimal("0"), "bill_types": defaultdict(int),
}
with open(path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
ticket = row.get("小票号", "").strip()
if not ticket or ticket == "-":
continue
totals["orders"] += 1
consume = d(row.get("消费金额", "0"))
table_fee = d(row.get("台费", "0"))
goods = d(row.get("商品费", "0"))
service = d(row.get("服务加收", "0"))
course = d(row.get("课程费", "0"))
incentive = d(row.get("激励费", "0"))
totals["consume"] += consume
totals["table_fee"] += table_fee
totals["goods"] += goods
totals["service"] += service
totals["course"] += course
totals["incentive"] += incentive
bt = row.get("账单类型", "未知")
totals["bill_types"][bt] += 1
tickets[ticket] = {
"consume": consume, "table_fee": table_fee,
"goods": goods, "service": service, "course": course,
"incentive": incentive, "bill_type": bt,
"table_no": row.get("台桌号", ""),
"pay_time": row.get("结账时间", ""),
}
return tickets, totals
def run_compare():
print("=" * 90, flush=True)
print("会员消费报表对比:数据库 vs 飞球导出", flush=True)
print("对比范围2025-12-09 ~ 2026-03-10", flush=True)
print("=" * 90, flush=True)
# ── 1. 解析飞球 CSV ──
print("\n📄 解析飞球 CSV ...", flush=True)
if not os.path.exists(FEIQIU_CSV):
print(f" ❌ 文件不存在: {FEIQIU_CSV}")
return
fq_tickets, fq_totals = parse_feiqiu_csv(FEIQIU_CSV)
print(f" 订单数: {fq_totals['orders']}", flush=True)
print(f" 消费金额合计: {fq_totals['consume']}", flush=True)
print(f" 台费: {fq_totals['table_fee']}", flush=True)
print(f" 商品费: {fq_totals['goods']}", flush=True)
print(f" 服务加收(陪打): {fq_totals['service']}", flush=True)
print(f" 课程费(超休): {fq_totals['course']}", flush=True)
print(f" 激励费: {fq_totals['incentive']}", flush=True)
print(f" 助教合计: {fq_totals['service'] + fq_totals['course']}", flush=True)
print(f" 账单类型: {dict(fq_totals['bill_types'])}", flush=True)
# ── 2. 用小票号反查数据库 ──
print("\n🗄️ 用小票号反查数据库 ...", flush=True)
ticket_ids = [int(t) for t in fq_tickets.keys() if t.isdigit()]
print(f" 待查小票数: {len(ticket_ids)}", flush=True)
db_records = {}
with psycopg2.connect(PG_DSN, connect_timeout=15,
options="-c statement_timeout=300000") as conn:
with conn.cursor() as cur:
# 分批查(每批 500
for i in range(0, len(ticket_ids), 500):
batch = ticket_ids[i:i+500]
cur.execute(SQL_BY_TICKET, (batch,))
for row in cur.fetchall():
db_records[str(row[0])] = {
"order_settle_id": row[0],
"settle_type": row[1],
"pay_time": row[2],
"table_charge": float(row[3]),
"goods": float(row[4]),
"assistant_pd": float(row[5]),
"assistant_cx": float(row[6]),
"electricity": float(row[7]),
"items_sum": float(row[8]),
"consume_money": float(row[9]) if row[9] else 0,
"member_id": row[10],
"phone": row[11],
"member_name": row[12],
}
# 全量汇总
cur.execute(SQL_DB_TOTAL)
db_total = cur.fetchone()
print(f" 数据库匹配到: {len(db_records)}", flush=True)
print(f" 飞球有但数据库无: {len(fq_tickets) - len(db_records)}", flush=True)
# ── 3. 数据库全量 ──
print(f"\n🗄️ 数据库全量(同期 settle_type IN 1,3:", flush=True)
print(f" 订单数: {db_total[0]} (会员: {db_total[6]}, 非会员: {db_total[7]})", flush=True)
print(f" items_sum: {db_total[1]}", flush=True)
print(f" 台费: {db_total[2]}", flush=True)
print(f" 商品费: {db_total[3]}", flush=True)
print(f" 助教费: {db_total[4]}", flush=True)
print(f" consume_money: {db_total[5]}", flush=True)
# ── 4. 总量对比 ──
print("\n" + "=" * 90, flush=True)
print("📊 总量对比(飞球 CSV vs 数据库全量)", flush=True)
print("=" * 90, flush=True)
fq_assistant = float(fq_totals["service"] + fq_totals["course"])
comparisons = [
("订单数", float(fq_totals["orders"]), float(db_total[0])),
("消费金额 vs items_sum", float(fq_totals["consume"]), float(db_total[1])),
("消费金额 vs consume_money", float(fq_totals["consume"]), float(db_total[5])),
("台费", float(fq_totals["table_fee"]), float(db_total[2])),
("商品费", float(fq_totals["goods"]), float(db_total[3])),
("助教费(服务+课程)", fq_assistant, float(db_total[4])),
]
print(f"{'指标':<28} {'飞球CSV':>14} {'数据库':>14} {'差额':>14} {'差异%':>9}", flush=True)
print("-" * 90, flush=True)
for label, fq_val, db_val in comparisons:
diff = db_val - fq_val
pct = (diff / fq_val * 100) if fq_val else 0
print(f"{label:<28} {fq_val:>14,.2f} {db_val:>14,.2f} {diff:>+14,.2f} {pct:>+8.2f}%", flush=True)
# ── 5. 逐单对比:找差异订单 ──
print("\n" + "=" * 90, flush=True)
print("🔍 逐单对比(飞球消费金额 vs 数据库 items_sum", flush=True)
print("=" * 90, flush=True)
# 提前定义输出目录
report_dir = os.path.join(os.path.dirname(SYSTEM_ANALYZE_ROOT), "member_reports")
os.makedirs(report_dir, exist_ok=True)
diffs = []
only_fq = []
only_db_settle_types = defaultdict(int) # 飞球有但数据库 settle_type 不同
for ticket, fq_row in fq_tickets.items():
db_row = db_records.get(ticket)
if not db_row:
only_fq.append(ticket)
continue
fq_consume = float(fq_row["consume"])
db_items = db_row["items_sum"]
diff = abs(db_items - fq_consume)
if diff > 0.02: # 容差 2 分钱
diffs.append({
"ticket": ticket,
"fq_pay_time": fq_row["pay_time"],
"db_pay_time": str(db_row["pay_time"]) if db_row["pay_time"] else "",
"fq_consume": fq_consume,
"db_items_sum": db_items,
"db_consume_money": db_row["consume_money"],
"diff": db_items - fq_consume,
"fq_table": float(fq_row["table_fee"]),
"db_table": db_row["table_charge"],
"fq_goods": float(fq_row["goods"]),
"db_goods": db_row["goods"],
"fq_assistant": float(fq_row["service"] + fq_row["course"]),
"db_assistant": db_row["assistant_pd"] + db_row["assistant_cx"],
"member_name": db_row["member_name"] or "",
"phone": db_row["phone"] or "",
"bill_type": fq_row["bill_type"],
"table_no": fq_row["table_no"],
"settle_type": db_row["settle_type"],
})
print(f" 总对比: {len(fq_tickets)}", flush=True)
print(f" 完全匹配差额≤0.02: {len(fq_tickets) - len(only_fq) - len(diffs)}", flush=True)
print(f" 有差异: {len(diffs)}", flush=True)
print(f" 飞球有但数据库无: {len(only_fq)}", flush=True)
if only_fq:
print(f"\n 飞球有但数据库无的小票号前20:", flush=True)
for t in only_fq[:20]:
fq_row = fq_tickets[t]
print(f" {t} 消费={fq_row['consume']} 类型={fq_row['bill_type']} 台桌={fq_row['table_no']}", flush=True)
if diffs:
# 导出完整差异明细 CSV
diff_csv_path = os.path.join(report_dir, "order_diff_detail_20251209.csv")
sorted_diffs = sorted(diffs, key=lambda x: abs(x["diff"]), reverse=True)
with open(diff_csv_path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow(["小票号", "飞球结账时间", "DB结账时间", "账单类型", "台桌",
"settle_type", "飞球消费", "DB_items_sum", "DB_consume_money",
"差额", "飞球台费", "DB台费", "飞球商品", "DB商品",
"飞球助教", "DB助教", "会员昵称", "手机号"])
for d_row in sorted_diffs:
writer.writerow([
d_row["ticket"], d_row["fq_pay_time"], d_row["db_pay_time"],
d_row["bill_type"], d_row["table_no"], d_row["settle_type"],
f"{d_row['fq_consume']:.2f}", f"{d_row['db_items_sum']:.2f}",
f"{d_row['db_consume_money']:.2f}", f"{d_row['diff']:+.2f}",
f"{d_row['fq_table']:.2f}", f"{d_row['db_table']:.2f}",
f"{d_row['fq_goods']:.2f}", f"{d_row['db_goods']:.2f}",
f"{d_row['fq_assistant']:.2f}", f"{d_row['db_assistant']:.2f}",
d_row["member_name"], d_row["phone"],
])
print(f"\n ✅ 差异明细已导出: {diff_csv_path}", flush=True)
print(f"\n 差异订单前30按差额绝对值排序:", flush=True)
print(f" {'小票号':<20} {'结账时间':<20} {'飞球消费':>10} {'DB items':>10} {'差额':>10} {'台桌':<10} {'会员':>8}", flush=True)
print(f" {'-'*100}", flush=True)
for d_row in sorted_diffs[:30]:
print(f" {d_row['ticket']:<20} {d_row['fq_pay_time']:<20} {d_row['fq_consume']:>10,.2f} {d_row['db_items_sum']:>10,.2f} "
f"{d_row['diff']:>+10,.2f} {d_row['table_no']:<10} {d_row['member_name']:>8}", flush=True)
# ── 6. 按会员归总(飞球 CSV 通过小票号反查会员) ──
print("\n" + "=" * 90, flush=True)
print("👤 按会员归总(飞球订单通过小票号反查数据库会员信息)", flush=True)
print("=" * 90, flush=True)
member_agg = defaultdict(lambda: {
"name": "", "fq_consume": Decimal("0"), "db_items": 0.0,
"fq_table": Decimal("0"), "fq_goods": Decimal("0"),
"fq_assistant": Decimal("0"), "orders": 0,
})
no_member_count = 0
no_db_match = 0
for ticket, fq_row in fq_tickets.items():
db_row = db_records.get(ticket)
if not db_row:
no_db_match += 1
continue
phone = db_row.get("phone") or ""
if not phone:
# 无会员
if db_row.get("member_id") and db_row["member_id"] != 0:
phone = f"member_{db_row['member_id']}"
else:
no_member_count += 1
continue
agg = member_agg[phone]
agg["name"] = db_row.get("member_name") or agg["name"]
agg["fq_consume"] += fq_row["consume"]
agg["db_items"] += db_row["items_sum"]
agg["fq_table"] += fq_row["table_fee"]
agg["fq_goods"] += fq_row["goods"]
agg["fq_assistant"] += fq_row["service"] + fq_row["course"]
agg["orders"] += 1
print(f" 有会员的订单: {sum(a['orders'] for a in member_agg.values())}", flush=True)
print(f" 非会员订单: {no_member_count}", flush=True)
print(f" 数据库无匹配: {no_db_match}", flush=True)
print(f" 会员数: {len(member_agg)}", flush=True)
# 导出 CSV
csv_path = os.path.join(report_dir, "member_consumption_20251209_compare.csv")
sorted_members = sorted(member_agg.items(), key=lambda x: float(x[1]["fq_consume"]), reverse=True)
with open(csv_path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow(["会员昵称", "手机号", "飞球消费合计", "DB items_sum合计",
"差额", "飞球台费", "飞球商品费", "飞球助教费", "订单数"])
for phone, agg in sorted_members:
diff = agg["db_items"] - float(agg["fq_consume"])
writer.writerow([
agg["name"], phone,
f"{float(agg['fq_consume']):.2f}",
f"{agg['db_items']:.2f}",
f"{diff:+.2f}",
f"{float(agg['fq_table']):.2f}",
f"{float(agg['fq_goods']):.2f}",
f"{float(agg['fq_assistant']):.2f}",
agg["orders"],
])
print(f"\n ✅ 会员对比报表已导出: {csv_path}", flush=True)
# 终端显示前 20
print(f"\n {'会员昵称':<12} {'手机号':<14} {'飞球消费':>10} {'DB items':>10} {'差额':>10} {'单数':>5}", flush=True)
print(f" {'-'*70}", flush=True)
for phone, agg in sorted_members[:20]:
diff = agg["db_items"] - float(agg["fq_consume"])
print(f" {agg['name']:<12} {phone:<14} {float(agg['fq_consume']):>10,.2f} "
f"{agg['db_items']:>10,.2f} {diff:>+10,.2f} {agg['orders']:>5}", flush=True)
if len(sorted_members) > 20:
print(f" ... 还有 {len(sorted_members) - 20} 位(见 CSV", flush=True)
if __name__ == "__main__":
run_compare()