Files
ZQYY.FQ-ETL/tasks/dws/finance_discount_task.py

487 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
优惠明细分析任务
功能说明:
以"日期+优惠类型"为粒度,分析优惠构成
数据来源:
- dwd_settlement_head: 结账单头表(优惠字段)
- dwd_groupbuy_redemption: 团购核销(团购实付金额)
- dwd_member_balance_change: 余额变动(赠送卡消费)
目标表:
billiards_dws.dws_finance_discount_detail
更新策略:
- 更新频率:每日更新
- 幂等方式:delete-before-insert(按日期)
业务规则:
- 团购优惠 (GROUPBUY): coupon_amount - 团购实付金额
- 会员折扣 (VIP): member_discount_amount
- 赠送卡抵扣 (GIFT_CARD_*): dwd_member_balance_change(台费卡/酒水卡/活动抵用券)
- 抹零 (ROUNDING): rounding_amount
- 大客户优惠 (BIG_CUSTOMER): 手动调整中标记的大客户订单
- 其他优惠 (OTHER): 手动调整中除大客户外的部分
作者:ETL团队
创建日期:2026-02-01
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from .base_dws_task import BaseDwsTask, TaskContext
class FinanceDiscountDetailTask(BaseDwsTask):
    """
    Discount detail analysis task.

    Writes one row per (site_id, stat_date, discount_type_code) into
    ``billiards_dws.dws_finance_discount_detail``. For every date that has
    data in any source, all of the following types are emitted (amount 0
    when unused that day):

    - GROUPBUY: group-buy discount (coupon offset minus amount actually paid)
    - VIP: member discount
    - ROUNDING: rounding
    - GIFT_CARD_TABLE / GIFT_CARD_DRINK / GIFT_CARD_COUPON: gift-card deductions
    - BIG_CUSTOMER: manual adjustments on configured big-customer orders/members
    - OTHER: the remaining manual adjustments
    """

    # Gift-card type id -> (discount_type_code, discount_type_name).
    # Single source of truth for the SQL filter in _extract_gift_card_consumes
    # and for the type mapping in _aggregate_gift_cards / _build_daily_discounts.
    GIFT_CARD_TYPES: Dict[int, Tuple[str, str]] = {
        2791990152417157: ('GIFT_CARD_TABLE', '台费卡抵扣'),  # table-fee card
        2794699703437125: ('GIFT_CARD_DRINK', '酒水卡抵扣'),  # drinks card
        2793266846533445: ('GIFT_CARD_COUPON', '活动抵用券抵扣'),  # promotional voucher
    }

    def get_task_code(self) -> str:
        """Return the code identifying this task."""
        return "DWS_FINANCE_DISCOUNT_DETAIL"

    def get_target_table(self) -> str:
        """Return the target table name (written under the ``billiards_dws`` schema)."""
        return "dws_finance_discount_detail"

    def get_primary_keys(self) -> List[str]:
        """Return the columns that uniquely identify a target row."""
        return ["site_id", "stat_date", "discount_type_code"]

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _window_dates(context: TaskContext) -> Tuple[date, date]:
        """
        Return the task window as an inclusive ``(start_date, end_date)`` pair.

        ``window_start`` / ``window_end`` may be datetime-like objects (anything
        exposing ``.date()``) or plain ``date`` values.
        """
        start = context.window_start
        end = context.window_end
        start_date = start.date() if hasattr(start, 'date') else start
        end_date = end.date() if hasattr(end, 'date') else end
        return start_date, end_date

    @staticmethod
    def _window_params(site_id: int, start_date: date, end_date: date) -> Dict[str, Any]:
        """Build the bind parameters shared by every window-scoped query."""
        return {'site_id': site_id, 'start_date': start_date, 'end_date': end_date}

    # ------------------------------------------------------------------ #
    # Extract
    # ------------------------------------------------------------------ #

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract all discount-related source data for the task window.

        Sources:
        1. dwd_settlement_head: per-day totals of the discount fields
        2. dwd_settlement_head + dwd_groupbuy_redemption: group-buy amount paid
        3. dwd_settlement_head: manual adjustments of configured big customers
        4. dwd_member_balance_change: gift-card consumption per card type

        Returns:
            Dict with keys ``discount_summary``, ``groupbuy_payments``,
            ``big_customer_summary`` and ``gift_card_consumes``.
        """
        start_date, end_date = self._window_dates(context)
        site_id = context.store_id
        # Dict literals evaluate left to right, so the query order is fixed.
        return {
            'discount_summary': self._extract_discount_summary(site_id, start_date, end_date),
            'groupbuy_payments': self._extract_groupbuy_payments(site_id, start_date, end_date),
            'big_customer_summary': self._extract_big_customer_discounts(site_id, start_date, end_date),
            'gift_card_consumes': self._extract_gift_card_consumes(site_id, start_date, end_date),
        }

    def _extract_discount_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate the discount fields of settled orders per pay date.

        Source columns (dwd_settlement_head):
        - coupon_amount: table fee offset by group-buy coupons
        - adjust_amount: manual adjustment (table-fee discount)
        - member_discount_amount: member discount
        - rounding_amount: rounding amount
        - pl_coupon_sale_amount: platform coupon sale amount (group-buy paid, path 1)

        Returns:
            One dict per pay date with ``*_total`` amounts, ``*_order_count``
            counts and ``total_orders``; an empty list when nothing matches.
        """
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                -- 团购相关
                COALESCE(SUM(coupon_amount), 0) AS coupon_amount_total,
                COALESCE(SUM(pl_coupon_sale_amount), 0) AS pl_coupon_sale_total,
                COUNT(CASE WHEN coupon_amount > 0 THEN 1 END) AS coupon_order_count,
                -- 手动调整
                COALESCE(SUM(adjust_amount), 0) AS adjust_amount_total,
                COUNT(CASE WHEN adjust_amount != 0 THEN 1 END) AS adjust_order_count,
                -- 会员折扣
                COALESCE(SUM(member_discount_amount), 0) AS member_discount_total,
                COUNT(CASE WHEN member_discount_amount > 0 THEN 1 END) AS member_discount_order_count,
                -- 抹零
                COALESCE(SUM(rounding_amount), 0) AS rounding_amount_total,
                COUNT(CASE WHEN rounding_amount != 0 THEN 1 END) AS rounding_order_count,
                -- 总订单数
                COUNT(*) AS total_orders
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %(site_id)s
              AND pay_time >= %(start_date)s
              AND pay_time < %(end_date)s + INTERVAL '1 day'
              AND settle_status = 1 -- 已结账
            GROUP BY pay_time::DATE
            ORDER BY stat_date
        """
        rows = self.db.query(sql, self._window_params(site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_groupbuy_payments(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> Dict[date, Decimal]:
        """
        Compute, per pay date, the amount customers actually paid for group-buy.

        Per settled order with ``coupon_amount > 0``:
        - if ``pl_coupon_sale_amount > 0`` that value is used;
        - otherwise the sum of ``ledger_unit_price`` over the order's
          non-deleted rows in dwd_groupbuy_redemption (0 if none).

        Redemptions are aggregated per order before being combined with the
        order row. A plain join repeats the order row once per redemption,
        which counted ``pl_coupon_sale_amount`` several times for orders
        redeemed with multiple vouchers.

        Returns:
            ``{stat_date: paid_total}``.
        """
        sql = """
            SELECT
                sh.pay_time::DATE AS stat_date,
                SUM(
                    CASE
                        WHEN sh.pl_coupon_sale_amount > 0 THEN sh.pl_coupon_sale_amount
                        ELSE COALESCE(gr.ledger_total, 0)
                    END
                ) AS groupbuy_payment
            FROM billiards_dwd.dwd_settlement_head sh
            LEFT JOIN LATERAL (
                SELECT SUM(r.ledger_unit_price) AS ledger_total
                FROM billiards_dwd.dwd_groupbuy_redemption r
                WHERE r.order_settle_id = sh.order_settle_id
                  AND COALESCE(r.is_delete, 0) = 0
            ) gr ON TRUE
            WHERE sh.site_id = %(site_id)s
              AND sh.pay_time >= %(start_date)s
              AND sh.pay_time < %(end_date)s + INTERVAL '1 day'
              AND sh.settle_status = 1
              AND sh.coupon_amount > 0 -- 只统计有团购的订单
            GROUP BY sh.pay_time::DATE
        """
        rows = self.db.query(sql, self._window_params(site_id, start_date, end_date))
        return {
            row['stat_date']: self.safe_decimal(row.get('groupbuy_payment', 0))
            for row in rows or []
        }

    def _extract_gift_card_consumes(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate gift-card consumption per change date and card type.

        Only card types listed in ``GIFT_CARD_TYPES`` are included. Each row
        carries ``stat_date``, ``card_type_id``, ``consume_count`` and
        ``consume_amount`` (the sum of absolute deduction amounts).
        """
        card_type_ids = [int(card_id) for card_id in self.GIFT_CARD_TYPES]
        if not card_type_ids:
            # "IN ()" is invalid SQL; with no card types there is nothing to extract.
            return []
        # The ids are ints from the class constant, so inlining them is injection-safe.
        id_list = ", ".join(str(card_id) for card_id in card_type_ids)
        # Counts only negative, non-deleted balance changes with from_type = 1.
        # NOTE(review): the meaning of from_type = 1 is not documented here - confirm.
        sql = f"""
            SELECT
                change_time::DATE AS stat_date,
                card_type_id,
                COUNT(*) AS consume_count,
                SUM(ABS(change_amount)) AS consume_amount
            FROM billiards_dwd.dwd_member_balance_change
            WHERE site_id = %(site_id)s
              AND change_time >= %(start_date)s
              AND change_time < %(end_date)s + INTERVAL '1 day'
              AND from_type = 1
              AND change_amount < 0
              AND COALESCE(is_delete, 0) = 0
              AND card_type_id IN ({id_list})
            GROUP BY change_time::DATE, card_type_id
        """
        rows = self.db.query(sql, self._window_params(site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_big_customer_discounts(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Sum manual adjustments that belong to configured big customers.

        An order is included when its ``order_settle_id`` is in the
        ``dws.discount.big_customer_order_ids`` config or its ``member_id``
        is in the ``dws.discount.big_customer_member_ids`` config.

        Only settled orders (``settle_status = 1``) are considered. This
        matches ``_extract_discount_summary``, whose ``adjust_amount_total``
        this result is subtracted from in ``transform``.

        Returns:
            One dict per pay date with ``stat_date``, ``big_customer_amount``
            (sum of absolute adjust amounts) and ``big_customer_count``. The
            list is empty when no big customers are configured.
        """
        member_ids = self._parse_id_list(self.config.get("dws.discount.big_customer_member_ids"))
        order_ids = self._parse_id_list(self.config.get("dws.discount.big_customer_order_ids"))
        if not member_ids and not order_ids:
            return []
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                order_settle_id,
                member_id,
                adjust_amount
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %(site_id)s
              AND pay_time >= %(start_date)s
              AND pay_time < %(end_date)s + INTERVAL '1 day'
              AND settle_status = 1
              AND adjust_amount != 0
        """
        rows = self.db.query(sql, self._window_params(site_id, start_date, end_date))
        result: Dict[date, Dict[str, Any]] = {}
        for row in rows or []:
            row_dict = dict(row)
            stat_date = row_dict.get('stat_date')
            if not stat_date:
                continue
            is_big_customer = (
                row_dict.get('order_settle_id') in order_ids
                or row_dict.get('member_id') in member_ids
            )
            if not is_big_customer:
                continue
            entry = result.setdefault(
                stat_date,
                {'stat_date': stat_date, 'big_customer_amount': Decimal('0'), 'big_customer_count': 0},
            )
            entry['big_customer_amount'] += abs(self.safe_decimal(row_dict.get('adjust_amount', 0)))
            entry['big_customer_count'] += 1
        return list(result.values())

    def _parse_id_list(self, value: Any) -> set:
        """
        Normalise a configured id list into a set of ints.

        Accepted inputs:
        - a comma-separated string such as ``"1, 2,3"``; blank and non-numeric
          entries are skipped;
        - a list, tuple, set or frozenset whose items are passed through
          ``int()``; ``None`` items and items that cannot be converted are
          skipped;
        - a single int.

        Anything else, including ``None``, empty values and booleans, yields
        an empty set.
        """
        if not value or isinstance(value, bool):
            return set()
        if isinstance(value, int):
            # A single configured id (e.g. "ids: 123" in config) used to be ignored.
            return {value}
        if isinstance(value, str):
            items = (v.strip() for v in value.split(","))
            # isdecimal() (unlike isdigit()) only accepts characters int() can
            # parse, so input such as "²" is skipped instead of raising ValueError.
            return {int(v) for v in items if v.isdecimal()}
        if isinstance(value, (list, tuple, set, frozenset)):
            result = set()
            for item in value:
                if item is None:
                    continue
                try:
                    result.add(int(item))
                except (ValueError, TypeError):
                    continue
            return result
        return set()

    # ------------------------------------------------------------------ #
    # Transform
    # ------------------------------------------------------------------ #

    def transform(self, data: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Turn the extracted data into target-table records.

        For every date present in any source, one record per discount type is
        produced by ``_build_daily_discounts``. Each record's ``discount_ratio``
        is its share of that day's total discount, rounded to 4 decimal places,
        or 0 when the day's total is 0.

        Args:
            data: Output of ``extract``.
            context: Task context; ``store_id`` is used as ``site_id``.

        Returns:
            Record dicts whose keys match the INSERT statement in ``load``.
        """
        site_id = context.store_id
        # Falls back to the site id when no tenant id is configured.
        tenant_id = self.config.get("app.tenant_id", site_id)

        summary_by_date = {
            row.get('stat_date'): row
            for row in data.get('discount_summary', [])
            if row.get('stat_date')
        }
        groupbuy_payments = data.get('groupbuy_payments', {})
        big_customer_by_date = {r['stat_date']: r for r in data.get('big_customer_summary', [])}
        gift_card_by_date = self._aggregate_gift_cards(data.get('gift_card_consumes', []))

        # A date is reported if any source has data for it.
        stat_dates = set(summary_by_date)
        stat_dates.update(groupbuy_payments)
        stat_dates.update(big_customer_by_date)
        stat_dates.update(gift_card_by_date)

        records: List[Dict[str, Any]] = []
        for stat_date in sorted(stat_dates):
            daily_discounts = self._build_daily_discounts(
                summary_by_date.get(stat_date, {}),
                groupbuy_payments.get(stat_date, Decimal('0')),
                gift_card_by_date.get(stat_date, {}),
                big_customer_by_date.get(stat_date, {}),
            )
            total_discount = sum(
                (info['amount'] for info in daily_discounts.values()), Decimal('0')
            )
            for type_code, info in daily_discounts.items():
                amount = info['amount']
                # Avoid division by zero on days without any discount.
                ratio = (amount / total_discount) if total_discount > 0 else Decimal('0')
                records.append({
                    'site_id': site_id,
                    'tenant_id': tenant_id,
                    'stat_date': stat_date,
                    'discount_type_code': type_code,
                    'discount_type_name': info['type_name'],
                    'discount_amount': amount,
                    'discount_ratio': round(ratio, 4),
                    'usage_count': info['count'],
                    # Simplification: each usage is counted as one affected order.
                    'affected_orders': info['count'],
                })
        return records

    def _aggregate_gift_cards(
        self, gift_card_consumes: List[Dict[str, Any]]
    ) -> Dict[date, Dict[str, Dict[str, Any]]]:
        """
        Group gift-card consumption rows by date and discount type code.

        Rows without a date or with a card type outside ``GIFT_CARD_TYPES``
        are skipped.

        Returns:
            ``{stat_date: {type_code: {'type_name', 'amount', 'count'}}}``.
        """
        by_date: Dict[date, Dict[str, Dict[str, Any]]] = {}
        for row in gift_card_consumes:
            stat_date = row.get('stat_date')
            type_info = self.GIFT_CARD_TYPES.get(row.get('card_type_id'))
            if not stat_date or not type_info:
                continue
            type_code, type_name = type_info
            entry = by_date.setdefault(stat_date, {}).setdefault(
                type_code, {'type_name': type_name, 'amount': Decimal('0'), 'count': 0}
            )
            entry['amount'] += self.safe_decimal(row.get('consume_amount', 0))
            entry['count'] += self.safe_int(row.get('consume_count', 0))
        return by_date

    def _build_daily_discounts(
        self,
        daily_data: Dict[str, Any],
        groupbuy_paid: Decimal,
        gift_daily: Dict[str, Dict[str, Any]],
        big_customer_info: Dict[str, Any],
    ) -> Dict[str, Dict[str, Any]]:
        """
        Compute every discount type's amount and usage count for one day.

        Types are returned in this order: GROUPBUY, VIP, ROUNDING, the
        gift-card types in ``GIFT_CARD_TYPES`` order, BIG_CUSTOMER, OTHER.

        Args:
            daily_data: The day's row from ``_extract_discount_summary``, or {}.
            groupbuy_paid: The day's group-buy paid amount.
            gift_daily: The day's entry from ``_aggregate_gift_cards``, or {}.
            big_customer_info: The day's big-customer summary, or {}.

        Returns:
            ``{type_code: {'type_name', 'amount', 'count'}}``.
        """
        discounts: Dict[str, Dict[str, Any]] = {}

        # Group-buy discount = coupon offset minus amount paid, clamped at zero.
        coupon_amount = self.safe_decimal(daily_data.get('coupon_amount_total', 0))
        discounts['GROUPBUY'] = {
            'type_name': '团购优惠',
            'amount': max(coupon_amount - groupbuy_paid, Decimal('0')),
            'count': daily_data.get('coupon_order_count', 0) or 0,
        }

        # For these types the discount is the magnitude of the summed column.
        plain_types = (
            ('VIP', '会员折扣', 'member_discount_total', 'member_discount_order_count'),
            ('ROUNDING', '抹零', 'rounding_amount_total', 'rounding_order_count'),
        )
        for type_code, type_name, amount_field, count_field in plain_types:
            discounts[type_code] = {
                'type_name': type_name,
                'amount': abs(self.safe_decimal(daily_data.get(amount_field, 0))),
                'count': daily_data.get(count_field, 0) or 0,
            }

        # Gift-card deductions: every configured card type gets a row, zero if unused.
        for type_code, type_name in self.GIFT_CARD_TYPES.values():
            info = gift_daily.get(type_code, {'amount': Decimal('0'), 'count': 0})
            discounts[type_code] = {
                'type_name': type_name,
                'amount': self.safe_decimal(info.get('amount', 0)),
                'count': self.safe_int(info.get('count', 0)),
            }

        # Manual adjustments are split into big-customer and the remainder,
        # with the remainder clamped at zero.
        adjust_amount = abs(self.safe_decimal(daily_data.get('adjust_amount_total', 0)))
        adjust_count = daily_data.get('adjust_order_count', 0) or 0
        big_amount = self.safe_decimal(big_customer_info.get('big_customer_amount', 0))
        big_count = big_customer_info.get('big_customer_count', 0) or 0
        discounts['BIG_CUSTOMER'] = {
            'type_name': '大客户优惠',
            'amount': big_amount,
            'count': big_count,
        }
        discounts['OTHER'] = {
            'type_name': '其他优惠',
            'amount': max(adjust_amount - big_amount, Decimal('0')),
            'count': max(adjust_count - big_count, 0),
        }
        return discounts

    # ------------------------------------------------------------------ #
    # Load
    # ------------------------------------------------------------------ #

    def load(self, records: List[Dict[str, Any]], context: TaskContext) -> Dict[str, Any]:
        """
        Load records into the target table idempotently (delete-before-insert).

        All rows for this site whose ``stat_date`` lies in the task window
        (inclusive) are deleted first, and then ``records`` are inserted one
        by one. The delete also runs when ``records`` is empty. Otherwise,
        re-running a window whose source data has disappeared would leave
        stale rows behind.

        Returns:
            ``{'deleted': <result of db.execute for the DELETE, or 0>,
            'inserted': <number of records inserted>}``.
        """
        site_id = context.store_id
        start_date, end_date = self._window_dates(context)

        delete_sql = """
            DELETE FROM billiards_dws.dws_finance_discount_detail
            WHERE site_id = %(site_id)s
              AND stat_date >= %(start_date)s
              AND stat_date <= %(end_date)s
        """
        deleted = self.db.execute(delete_sql, self._window_params(site_id, start_date, end_date))

        insert_sql = """
            INSERT INTO billiards_dws.dws_finance_discount_detail (
                site_id, tenant_id, stat_date,
                discount_type_code, discount_type_name,
                discount_amount, discount_ratio,
                usage_count, affected_orders,
                created_at, updated_at
            ) VALUES (
                %(site_id)s, %(tenant_id)s, %(stat_date)s,
                %(discount_type_code)s, %(discount_type_name)s,
                %(discount_amount)s, %(discount_ratio)s,
                %(usage_count)s, %(affected_orders)s,
                NOW(), NOW()
            )
        """
        inserted = 0
        for record in records or []:
            self.db.execute(insert_sql, record)
            inserted += 1

        return {
            'deleted': deleted or 0,
            'inserted': inserted,
        }