# -*- coding: utf-8 -*- """ 一次性脚本:验证消费用例对账公式在全表中的成立率。 连接 test_etl_feiqiu 测试库,对每个对账公式计算: - 成立的订单数 / 不成立的订单数 / 不成立率 - 不成立时的最大偏差 - 前 5 个不成立的订单 ID 和偏差值 输出报告到 {EXPORT_ROOT}/REPORTS/formula_verification_report.md """ from __future__ import annotations import os from datetime import datetime from decimal import Decimal from pathlib import Path # ── 环境加载 ────────────────────────────────────────────── from _env_paths import get_output_path TEST_DB_DSN = os.environ.get("TEST_DB_DSN") if not TEST_DB_DSN: raise RuntimeError("环境变量 TEST_DB_DSN 未定义。请在根 .env 中配置测试库连接串。") import psycopg2 import psycopg2.extras # ── 输出路径 ────────────────────────────────────────────── EXPORT_ROOT = get_output_path("EXPORT_ROOT") OUTPUT_DIR = EXPORT_ROOT / "REPORTS" OUTPUT_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_FILE = OUTPUT_DIR / "formula_verification_report.md" def get_conn(): return psycopg2.connect(TEST_DB_DSN) def run_formula(cur, formula_id: str, formula_name: str, sql: str) -> dict: """ 执行一个公式验证 SQL,返回结构化结果。 SQL 必须返回以下列: - order_id: 订单 ID - diff: 偏差值(0 表示成立) """ print(f" 验证 {formula_id}: {formula_name} ...") cur.execute(sql) rows = cur.fetchall() total = len(rows) ok = sum(1 for r in rows if abs(r["diff"]) < Decimal("0.005")) fail = total - ok # 不成立的记录 failures = sorted( [r for r in rows if abs(r["diff"]) >= Decimal("0.005")], key=lambda r: abs(r["diff"]), reverse=True, ) max_diff = abs(failures[0]["diff"]) if failures else Decimal("0") top5 = failures[:5] result = { "id": formula_id, "name": formula_name, "total": total, "ok": ok, "fail": fail, "fail_rate": f"{fail / total * 100:.2f}%" if total > 0 else "N/A", "max_diff": float(max_diff), "top5": [ {"order_id": r["order_id"], "diff": float(r["diff"])} for r in top5 ], } print(f" 总数={total}, 成立={ok}, 不成立={fail} ({result['fail_rate']}), 最大偏差={max_diff}") return result # ── 公式定义 ────────────────────────────────────────────── FORMULAS = [ # ── 消费结算公式 (settle_type=1) ────────────────────── # F1: 消费构成等式(含 pl_coupon) { "id": "F1", "name": "消费构成: consume = table + goods + asst_pd + asst_cx + elec + pl_coupon (settle_type=1)", "sql": """ SELECT order_settle_id AS order_id, consume_money - (table_charge_money + goods_money + assistant_pd_money + assistant_cx_money + electricity_money + pl_coupon_sale_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 """, }, # F1b: 仅最近5个月(样本采样范围) { "id": "F1b", "name": "消费构成(最近5月): consume = table + goods + asst + elec + pl_coupon", "sql": """ SELECT order_settle_id AS order_id, consume_money - (table_charge_money + goods_money + assistant_pd_money + assistant_cx_money + electricity_money + pl_coupon_sale_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 AND create_time >= NOW() - INTERVAL '5 months' """, }, # F2: 收支平衡等式(减去 rounding) { "id": "F2", "name": "收支平衡: consume = point + balance + coupon + member_disc + adjust + pl_coupon - rounding", "sql": """ SELECT order_settle_id AS order_id, consume_money - (point_amount + balance_amount + coupon_amount + member_discount_amount + adjust_amount + pl_coupon_sale_amount - rounding_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 """, }, # F2b: 收支平衡(最近5月) { "id": "F2b", "name": "收支平衡(最近5月): consume = point + balance + coupon + disc + adjust + pl_coupon - rounding", "sql": """ SELECT order_settle_id AS order_id, consume_money - (point_amount + balance_amount + coupon_amount + member_discount_amount + adjust_amount + pl_coupon_sale_amount - rounding_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 AND create_time >= NOW() - INTERVAL '5 months' """, }, # F3: 台费明细一致性 { "id": "F3", "name": "台费明细: table_charge_money = SUM(table_fee_logs.ledger_amount)", "sql": """ SELECT h.order_settle_id AS order_id, h.table_charge_money - COALESCE(t.total_ledger, 0) AS diff FROM dwd.dwd_settlement_head h LEFT JOIN ( SELECT order_settle_id, SUM(ledger_amount) AS total_ledger FROM dwd.dwd_table_fee_log GROUP BY order_settle_id ) t ON h.order_settle_id = t.order_settle_id WHERE h.settle_type = 1 """, }, # F4: pay_amount = point_amount { "id": "F4", "name": "pay_amount = point_amount (消费结算)", "sql": """ SELECT order_settle_id AS order_id, pay_amount - point_amount AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 """, }, # F5: 余额构成: balance >= gift_card + recharge_card { "id": "F5", "name": "余额构成: balance >= gift_card + recharge_card", "sql": """ SELECT order_settle_id AS order_id, balance_amount - (gift_card_amount + recharge_card_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 AND balance_amount > 0 """, }, # F6: 抹零等式 (纯积分支付) { "id": "F6", "name": "抹零等式: point = consume + rounding (纯积分支付,无其他优惠)", "sql": """ SELECT order_settle_id AS order_id, point_amount - (consume_money + rounding_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 1 AND point_amount > 0 AND balance_amount = 0 AND coupon_amount = 0 AND member_discount_amount = 0 AND adjust_amount = 0 AND pl_coupon_sale_amount = 0 """, }, # ── 商品结算 / 充值撤销 ────────────────────────────── # F7: 商品结算: consume = goods_money { "id": "F7", "name": "商品结算: consume = goods_money (settle_type=3)", "sql": """ SELECT order_settle_id AS order_id, consume_money - goods_money AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 3 """, }, # F7b: 商品结算含 pl_coupon { "id": "F7b", "name": "商品结算: consume = goods_money + pl_coupon (settle_type=3)", "sql": """ SELECT order_settle_id AS order_id, consume_money - (goods_money + pl_coupon_sale_amount) AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 3 """, }, # F8: 充值撤销: consume = pay_amount { "id": "F8", "name": "充值撤销: consume = pay_amount (settle_type=7)", "sql": """ SELECT order_settle_id AS order_id, consume_money - pay_amount AS diff FROM dwd.dwd_settlement_head WHERE settle_type = 7 """, }, # ── 充值公式 ───────────────────────────────────────── # R1: 充值: pay_amount = point_amount { "id": "R1", "name": "充值: pay_amount = point_amount (pay>0)", "sql": """ SELECT recharge_order_id AS order_id, pay_amount - point_amount AS diff FROM dwd.dwd_recharge_order WHERE pay_amount > 0 """, }, # R2: 充值: ex.consume_money = pay_amount { "id": "R2", "name": "充值: ex.consume_money = pay_amount", "sql": """ SELECT r.recharge_order_id AS order_id, rex.consume_money - r.pay_amount AS diff FROM dwd.dwd_recharge_order r JOIN dwd.dwd_recharge_order_ex rex USING (recharge_order_id) WHERE r.pay_amount > 0 """, }, # ── 余额变动公式 ───────────────────────────────────── # B1: 余额变动一致性 { "id": "B1", "name": "余额变动: balance_after = balance_before + change_amount", "sql": """ SELECT balance_change_id AS order_id, balance_after - (balance_before + change_amount) AS diff FROM dwd.dwd_member_balance_change """, }, # B2: 本金变动一致性(过滤 NULL) { "id": "B2", "name": "本金变动: principal_after = principal_before + principal_change", "sql": """ SELECT balance_change_id AS order_id, principal_after - (principal_before + principal_change_amount) AS diff FROM dwd.dwd_member_balance_change WHERE principal_after IS NOT NULL AND principal_before IS NOT NULL AND principal_change_amount IS NOT NULL """, }, ] def generate_report(results: list[dict]) -> str: """生成 Markdown 格式的验证报告""" lines = [ "# 对账公式验证报告", "", f"> 数据库:test_etl_feiqiu DWD 层", f"> 生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", f"> 验证公式数:{len(results)}", "", "## 汇总", "", "| 编号 | 公式 | 总数 | 成立 | 不成立 | 不成立率 | 最大偏差 |", "|------|------|------|------|--------|----------|----------|", ] for r in results: status = "✅" if r["fail"] == 0 else "⚠️" if r["fail_rate"].replace("%", "") and float(r["fail_rate"].replace("%", "")) < 1 else "❌" lines.append( f"| {r['id']} | {r['name'][:50]}{'...' if len(r['name']) > 50 else ''} " f"| {r['total']:,} | {r['ok']:,} | {r['fail']:,} " f"| {r['fail_rate']} | {r['max_diff']:.2f} |" ) lines.append("") lines.append("## 详情") lines.append("") for r in results: lines.append(f"### {r['id']}: {r['name']}") lines.append("") lines.append(f"- 总数:{r['total']:,}") lines.append(f"- 成立:{r['ok']:,}") lines.append(f"- 不成立:{r['fail']:,} ({r['fail_rate']})") lines.append(f"- 最大偏差:{r['max_diff']:.2f}") lines.append("") if r["top5"]: lines.append("**不成立的前 5 条记录:**") lines.append("") lines.append("| 订单 ID | 偏差 |") lines.append("|---------|------|") for t in r["top5"]: lines.append(f"| {t['order_id']} | {t['diff']:.2f} |") lines.append("") else: lines.append("(全部成立)") lines.append("") return "\n".join(lines) def main(): print(f"连接测试库: {TEST_DB_DSN[:30]}...") conn = get_conn() results = [] try: cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) for formula in FORMULAS: try: result = run_formula(cur, formula["id"], formula["name"], formula["sql"]) results.append(result) except Exception as e: print(f" ❌ {formula['id']} 执行失败: {e}") results.append({ "id": formula["id"], "name": formula["name"], "total": 0, "ok": 0, "fail": 0, "fail_rate": "ERROR", "max_diff": 0, "top5": [], }) cur.close() finally: conn.close() # 生成报告 report = generate_report(results) with open(OUTPUT_FILE, "w", encoding="utf-8") as f: f.write(report) print(f"\n✅ 报告已生成: {OUTPUT_FILE}") print(f" 文件大小: {OUTPUT_FILE.stat().st_size / 1024:.1f} KB") # 打印汇总 print("\n=== 汇总 ===") all_ok = True for r in results: status = "✅" if r["fail"] == 0 else "❌" print(f" {status} {r['id']}: {r['name'][:60]} — {r['fail_rate']}") if r["fail"] > 0: all_ok = False if all_ok: print("\n🎉 所有公式全部成立!") else: print("\n⚠️ 部分公式存在不成立的记录,请查看详细报告。") if __name__ == "__main__": main()