Files
feiqiu-ETL/etl_billiards/scripts/analyze_member_discount_usage.py
2026-02-04 21:39:01 +08:00

288 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
会员折扣启用分析脚本
功能说明:
确认 dwd_settlement_head.member_discount_amount 字段是否已启用
分析内容:
1. 统计非零记录数
2. 按时间分布分析
3. 按会员类型分析
4. 与其他字段的关联分析
输出:
- 控制台打印分析结果
- 结论:字段是否已启用,使用场景
作者ETL团队
创建日期2026-02-01
"""
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
from etl_billiards.utils.config import Config
from etl_billiards.utils.db import DatabaseConnection
def analyze_member_discount_usage():
"""
执行会员折扣启用分析
"""
print("=" * 80)
print("会员折扣启用分析 (member_discount_amount)")
print("=" * 80)
print()
# 加载配置和数据库连接
config = Config()
db = DatabaseConnection(config)
try:
# 1. 基础统计
print("【1. 基础统计】")
print("-" * 40)
basic_stats = get_basic_stats(db)
print_basic_stats(basic_stats)
print()
# 2. 时间分布分析
print("【2. 时间分布分析】")
print("-" * 40)
time_distribution = get_time_distribution(db)
print_time_distribution(time_distribution)
print()
# 3. 会员类型分析
print("【3. 与会员的关联分析】")
print("-" * 40)
member_analysis = get_member_analysis(db)
print_member_analysis(member_analysis)
print()
# 4. 样本数据
print("【4. 样本数据】")
print("-" * 40)
samples = get_sample_data(db)
print_samples(samples)
print()
# 5. 结论
print("【5. 分析结论】")
print("-" * 40)
print_conclusion(basic_stats)
finally:
db.close()
def get_basic_stats(db: DatabaseConnection) -> Dict[str, Any]:
"""
获取基础统计数据
"""
sql = """
SELECT
COUNT(*) AS total_orders,
COUNT(CASE WHEN member_discount_amount != 0 THEN 1 END) AS with_member_discount,
COUNT(CASE WHEN member_discount_amount > 0 THEN 1 END) AS positive_discount,
COUNT(CASE WHEN member_discount_amount < 0 THEN 1 END) AS negative_discount,
SUM(member_discount_amount) AS total_member_discount,
AVG(CASE WHEN member_discount_amount != 0 THEN member_discount_amount END) AS avg_discount,
MAX(member_discount_amount) AS max_discount,
MIN(member_discount_amount) AS min_discount,
STDDEV(CASE WHEN member_discount_amount != 0 THEN member_discount_amount END) AS stddev_discount
FROM billiards_dwd.dwd_settlement_head
"""
rows = db.query(sql)
return dict(rows[0]) if rows else {}
def get_time_distribution(db: DatabaseConnection) -> List[Dict[str, Any]]:
"""
获取按月份的时间分布
"""
sql = """
SELECT
DATE_TRUNC('month', create_time)::DATE AS month,
COUNT(*) AS total_orders,
COUNT(CASE WHEN member_discount_amount != 0 THEN 1 END) AS with_discount,
SUM(member_discount_amount) AS total_discount
FROM billiards_dwd.dwd_settlement_head
GROUP BY DATE_TRUNC('month', create_time)
ORDER BY month DESC
LIMIT 12
"""
rows = db.query(sql)
return [dict(row) for row in rows] if rows else []
def get_member_analysis(db: DatabaseConnection) -> Dict[str, Any]:
"""
分析与会员的关联
"""
# 会员vs非会员
sql_member_vs_guest = """
SELECT
CASE WHEN member_id = 0 THEN '散客' ELSE '会员' END AS customer_type,
COUNT(*) AS total_orders,
COUNT(CASE WHEN member_discount_amount != 0 THEN 1 END) AS with_discount,
SUM(member_discount_amount) AS total_discount
FROM billiards_dwd.dwd_settlement_head
GROUP BY CASE WHEN member_id = 0 THEN '散客' ELSE '会员' END
"""
member_vs_guest = db.query(sql_member_vs_guest)
# 按会员卡等级
sql_by_grade = """
SELECT
COALESCE(m.member_card_grade_name, '未知') AS grade_name,
COUNT(*) AS total_orders,
COUNT(CASE WHEN sh.member_discount_amount != 0 THEN 1 END) AS with_discount,
SUM(sh.member_discount_amount) AS total_discount
FROM billiards_dwd.dwd_settlement_head sh
LEFT JOIN billiards_dwd.dim_member m ON sh.member_id = m.member_id
WHERE sh.member_id != 0
GROUP BY COALESCE(m.member_card_grade_name, '未知')
ORDER BY total_orders DESC
"""
by_grade = db.query(sql_by_grade)
return {
'member_vs_guest': [dict(row) for row in member_vs_guest] if member_vs_guest else [],
'by_grade': [dict(row) for row in by_grade] if by_grade else []
}
def get_sample_data(db: DatabaseConnection) -> List[Dict[str, Any]]:
"""
获取有会员折扣的样本数据
"""
sql = """
SELECT
sh.order_settle_id,
sh.order_trade_no,
sh.create_time,
sh.member_id,
m.nickname AS member_name,
m.member_card_grade_name,
sh.consume_money,
sh.pay_amount,
sh.member_discount_amount,
ROUND(sh.member_discount_amount / NULLIF(sh.consume_money, 0) * 100, 2) AS discount_ratio
FROM billiards_dwd.dwd_settlement_head sh
LEFT JOIN billiards_dwd.dim_member m ON sh.member_id = m.member_id
WHERE sh.member_discount_amount != 0
ORDER BY sh.create_time DESC
LIMIT 20
"""
rows = db.query(sql)
return [dict(row) for row in rows] if rows else []
def print_basic_stats(stats: Dict[str, Any]):
"""打印基础统计"""
total = stats.get('total_orders', 1)
with_discount = stats.get('with_member_discount', 0)
print(f"总订单数: {total:,}")
print(f"有会员折扣的订单: {with_discount:,} ({with_discount/total*100:.4f}%)")
print(f" - 正值(折扣): {stats.get('positive_discount', 0):,}")
print(f" - 负值(加价?): {stats.get('negative_discount', 0):,}")
print()
print(f"会员折扣总额: {stats.get('total_member_discount', 0):,.2f}")
print(f"平均折扣: {stats.get('avg_discount', 0) or 0:,.2f}")
print(f"最大折扣: {stats.get('max_discount', 0):,.2f}")
print(f"最小折扣: {stats.get('min_discount', 0):,.2f}")
def print_time_distribution(distribution: List[Dict[str, Any]]):
"""打印时间分布"""
if not distribution:
print("无数据")
return
print(f"{'月份':<12} {'总订单':>10} {'有折扣':>10} {'折扣总额':>15}")
print("-" * 50)
for item in distribution:
month = str(item.get('month', 'N/A'))[:7]
total = item.get('total_orders', 0)
with_discount = item.get('with_discount', 0)
total_discount = item.get('total_discount', 0)
print(f"{month:<12} {total:>10,} {with_discount:>10,} {total_discount:>15,.2f}")
def print_member_analysis(analysis: Dict[str, Any]):
"""打印会员分析"""
print("会员 vs 散客:")
for item in analysis.get('member_vs_guest', []):
print(f" {item.get('customer_type', 'N/A')}: {item.get('total_orders', 0):,} 单, {item.get('with_discount', 0)} 单有折扣, 折扣总额 {item.get('total_discount', 0):,.2f}")
print("\n按会员卡等级:")
for item in analysis.get('by_grade', []):
print(f" {item.get('grade_name', 'N/A')}: {item.get('total_orders', 0):,} 单, {item.get('with_discount', 0)} 单有折扣")
def print_samples(samples: List[Dict[str, Any]]):
"""打印样本数据"""
if not samples:
print("[!] 未发现使用会员折扣的订单")
return
print(f"{'订单ID':<20} {'会员':<15} {'等级':<10} {'消费':>12} {'折扣':>12} {'比例':>8}")
print("-" * 80)
for item in samples[:10]:
order_id = str(item.get('order_settle_id', 'N/A'))[:18]
member = str(item.get('member_name', 'N/A'))[:13]
grade = str(item.get('member_card_grade_name', 'N/A'))[:8]
consume = item.get('consume_money', 0)
discount = item.get('member_discount_amount', 0)
ratio = item.get('discount_ratio', 0)
print(f"{order_id:<20} {member:<15} {grade:<10} {consume:>12,.2f} {discount:>12,.2f} {ratio:>7}%")
def print_conclusion(stats: Dict[str, Any]):
"""打印分析结论"""
with_discount = stats.get('with_member_discount', 0)
total = stats.get('total_orders', 1)
ratio = with_discount / total * 100
if with_discount == 0:
print("【结论】: member_discount_amount 字段 **未启用**")
print()
print("该字段在所有订单中均为0表明")
print(" 1. 会员折扣功能在业务系统中未开启")
print(" 2. 或会员折扣通过其他方式如adjust_amount记录")
print()
print("【建议】:")
print(" 1. 在DWS财务统计中暂时不处理此字段")
print(" 2. 将此字段标记为'预留/待启用'")
print(" 3. 后续如果业务启用,再更新统计逻辑")
elif ratio < 1:
print(f"【结论】: member_discount_amount 字段 **极少使用** (仅{ratio:.4f}%订单)")
print()
print("该字段使用率极低,可能是:")
print(" 1. 会员折扣功能刚启用不久")
print(" 2. 仅特定场景使用")
print()
print("【建议】:")
print(" 1. 在DWS财务统计中保留此字段的处理逻辑")
print(" 2. 定期监控使用率变化")
else:
print(f"【结论】: member_discount_amount 字段 **已启用** ({ratio:.2f}%订单使用)")
print()
print("【建议】:")
print(" 1. 在DWS财务优惠明细中正常统计此字段")
print(" 2. 关注会员折扣与其他优惠的叠加规则")
if __name__ == "__main__":
analyze_member_discount_usage()