Files
ZQYY.FQ-ETL/tasks/dws/finance_daily_task.py

628 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
财务日度汇总任务
功能说明:
"日期"为粒度,汇总当日财务数据
数据来源:
- dwd_settlement_head: 结账单头表
- dwd_groupbuy_redemption: 团购核销
- dwd_recharge_order: 充值订单
- dws_finance_expense_summary: 支出汇总Excel导入
- dws_platform_settlement: 平台回款/服务费Excel导入
目标表:
billiards_dws.dws_finance_daily_summary
更新策略:
- 更新频率:每小时更新当日数据
- 幂等方式delete-before-insert按日期
业务规则:
- 发生额table_charge_money + goods_money + assistant_pd_money + assistant_cx_money
- 团购优惠coupon_amount - 团购支付金额
- 团购支付pl_coupon_sale_amount 或关联 groupbuy_redemption.ledger_unit_price
- 首充/续充:通过 is_first 字段区分
作者ETL团队
创建日期2026-02-01
"""
from __future__ import annotations
import calendar
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from .base_dws_task import BaseDwsTask, TaskContext
class FinanceDailyTask(BaseDwsTask):
    """
    Daily finance summary task.

    Produces one row per (site_id, stat_date) containing:
    - gross amount (list price)
    - discount breakdown
    - confirmed income
    - cash flow (inflow / outflow)
    - recharge statistics (first recharge / renewal)
    - order statistics
    """

    # Balance-change card types whose consumption is counted as gift-card
    # consumption: table-fee card, drinks card, promotional voucher.
    # A deployment may override them with the "dws.finance.gift_card_type_ids"
    # config key (comma-separated string, list, or single int).
    DEFAULT_GIFT_CARD_TYPE_IDS: Tuple[int, ...] = (
        2791990152417157,  # table-fee card
        2794699703437125,  # drinks card
        2793266846533445,  # promotional voucher
    )

    def get_task_code(self) -> str:
        """Return the task code; it is also the prefix of every log line."""
        return "DWS_FINANCE_DAILY"

    def get_target_table(self) -> str:
        """Return the name of the table this task writes to."""
        return "dws_finance_daily_summary"

    def get_primary_keys(self) -> List[str]:
        """Return the key columns of the target table (one row per site per day)."""
        return ["site_id", "stat_date"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    @staticmethod
    def _to_date(value: Any) -> Any:
        """Return ``value.date()`` for datetime-like values, otherwise ``value`` unchanged."""
        return value.date() if hasattr(value, 'date') else value

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Pull every per-day source summary for the context's store and window.

        Args:
            context: task context. ``window_start`` / ``window_end`` may be
                datetimes or dates (both are reduced to dates, inclusive);
                ``store_id`` is used as ``site_id``.

        Returns:
            A dict holding one list of per-day rows per source, plus
            ``start_date``, ``end_date`` and ``site_id``.
        """
        start_date = self._to_date(context.window_start)
        end_date = self._to_date(context.window_end)
        site_id = context.store_id
        self.logger.info(
            "%s: 提取数据,日期范围 %s ~ %s",
            self.get_task_code(), start_date, end_date
        )
        return {
            # 1. settlement totals
            'settlement_summary': self._extract_settlement_summary(site_id, start_date, end_date),
            # 2. group-buy redemption totals
            'groupbuy_summary': self._extract_groupbuy_summary(site_id, start_date, end_date),
            # 3. recharge totals
            'recharge_summary': self._extract_recharge_summary(site_id, start_date, end_date),
            # 3.1 gift-card consumption (balance changes)
            'gift_card_summary': self._extract_gift_card_consume_summary(site_id, start_date, end_date),
            # 4. expenses (imported table)
            'expense_summary': self._extract_expense_summary(site_id, start_date, end_date),
            # 5. platform payouts (imported table)
            'platform_summary': self._extract_platform_summary(site_id, start_date, end_date),
            # 6. big-customer adjustments (used to split manual discounts)
            'big_customer_summary': self._extract_big_customer_discounts(site_id, start_date, end_date),
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Merge the per-source daily summaries into one record per date.

        Args:
            extracted: the dict returned by ``extract``.
            context: task context (not used here; part of the task interface).

        Returns:
            One record per date, in ascending date order. A date is included
            if any source summary reports it.
        """
        settlement_summary = extracted['settlement_summary']
        groupbuy_summary = extracted['groupbuy_summary']
        recharge_summary = extracted['recharge_summary']
        gift_card_summary = extracted['gift_card_summary']
        expense_summary = extracted['expense_summary']
        platform_summary = extracted['platform_summary']
        big_customer_summary = extracted['big_customer_summary']
        site_id = extracted['site_id']
        self.logger.info(
            "%s: 转换数据,%d 天结账数据,%d 天充值数据",
            self.get_task_code(), len(settlement_summary), len(recharge_summary)
        )

        sources = (
            settlement_summary, groupbuy_summary, recharge_summary, gift_card_summary,
            expense_summary, platform_summary, big_customer_summary,
        )
        # Union of dates across *all* sources, so that no source's day is lost
        # merely because another summary has no row for it.
        dates = {
            row['stat_date']
            for rows in sources
            for row in rows
            if row.get('stat_date')
        }

        def index_by_date(rows: List[Dict[str, Any]]) -> Dict[Any, Dict[str, Any]]:
            # One row per stat_date is expected (every source query groups by date).
            return {row['stat_date']: row for row in rows}

        settle_index = index_by_date(settlement_summary)
        groupbuy_index = index_by_date(groupbuy_summary)
        recharge_index = index_by_date(recharge_summary)
        gift_card_index = index_by_date(gift_card_summary)
        expense_index = index_by_date(expense_summary)
        platform_index = index_by_date(platform_summary)
        big_customer_index = index_by_date(big_customer_summary)

        results = []
        for stat_date in sorted(dates):
            results.append(self._build_daily_record(
                stat_date,
                settle_index.get(stat_date, {}),
                groupbuy_index.get(stat_date, {}),
                recharge_index.get(stat_date, {}),
                gift_card_index.get(stat_date, {}),
                expense_index.get(stat_date, {}),
                platform_index.get(stat_date, {}),
                big_customer_index.get(stat_date, {}),
                site_id,
            ))
        return results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write the daily records using delete-before-insert.

        Existing rows are removed with ``delete_existing_data`` (inherited,
        called with ``date_col="stat_date"``) and the new records are then
        bulk inserted.

        Returns:
            ``{"counts": {...}, "extra": {"deleted": n}}``. When there is
            nothing to write, only ``counts`` (all zero) is returned.
        """
        if not transformed:
            # NOTE(review): no delete happens in this case, so rows written by an
            # earlier run for this window remain even if the source data has since
            # disappeared. Confirm this is intended.
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}
        deleted = self.delete_existing_data(context, date_col="stat_date")
        inserted = self.bulk_insert(transformed)
        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d",
            self.get_task_code(), deleted, inserted
        )
        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Extraction helpers
    # ==========================================================================

    def _extract_settlement_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate settlement headers per pay_time date over [start_date, end_date].

        ``gross_amount`` treats NULL components as 0. Without this, a single
        NULL column would drop the whole order from the gross total while the
        per-component sums still counted it.
        The pay_time filter is a plain range so an index on pay_time can be used.
        """
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                COUNT(*) AS order_count,
                COUNT(CASE WHEN member_id != 0 AND member_id IS NOT NULL THEN 1 END) AS member_order_count,
                COUNT(CASE WHEN member_id = 0 OR member_id IS NULL THEN 1 END) AS guest_order_count,
                -- 发生额(正价)
                SUM(table_charge_money) AS table_fee_amount,
                SUM(goods_money) AS goods_amount,
                SUM(assistant_pd_money) AS assistant_pd_amount,
                SUM(assistant_cx_money) AS assistant_cx_amount,
                SUM(
                    COALESCE(table_charge_money, 0) + COALESCE(goods_money, 0)
                    + COALESCE(assistant_pd_money, 0) + COALESCE(assistant_cx_money, 0)
                ) AS gross_amount,
                -- 支付
                SUM(pay_amount) AS cash_pay_amount,
                SUM(recharge_card_amount) AS card_pay_amount,
                SUM(balance_amount) AS balance_pay_amount,
                -- 优惠
                SUM(coupon_amount) AS coupon_amount,
                SUM(adjust_amount) AS adjust_amount,
                SUM(member_discount_amount) AS member_discount_amount,
                SUM(rounding_amount) AS rounding_amount,
                SUM(pl_coupon_sale_amount) AS pl_coupon_sale_amount,
                -- 消费金额
                SUM(consume_money) AS total_consume
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %s
              AND pay_time >= %s
              AND pay_time < %s + INTERVAL '1 day'
            GROUP BY pay_time::DATE
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_groupbuy_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate group-buy payments per pay_time date.

        The payment is decided per settlement: for orders with
        ``coupon_amount > 0`` it is ``pl_coupon_sale_amount`` when positive,
        otherwise the sum of the order's non-deleted redemption
        ``ledger_unit_price`` values.

        Redemptions are pre-aggregated per settlement in a LATERAL subquery so
        that each settlement row appears exactly once. A plain 1:N join would
        add ``pl_coupon_sale_amount`` (and the order count) once per redemption.
        """
        sql = """
            SELECT
                sh.pay_time::DATE AS stat_date,
                COUNT(CASE WHEN sh.coupon_amount > 0 THEN 1 END) AS groupbuy_count,
                SUM(
                    CASE
                        WHEN sh.coupon_amount > 0 THEN
                            CASE
                                WHEN sh.pl_coupon_sale_amount > 0 THEN sh.pl_coupon_sale_amount
                                ELSE COALESCE(gr.ledger_total, 0)
                            END
                        ELSE 0
                    END
                ) AS groupbuy_pay_total
            FROM billiards_dwd.dwd_settlement_head sh
            LEFT JOIN LATERAL (
                SELECT SUM(r.ledger_unit_price) AS ledger_total
                FROM billiards_dwd.dwd_groupbuy_redemption r
                WHERE r.order_settle_id = sh.order_settle_id
                  AND COALESCE(r.is_delete, 0) = 0
            ) gr ON TRUE
            WHERE sh.site_id = %s
              AND sh.pay_time >= %s
              AND sh.pay_time < %s + INTERVAL '1 day'
            GROUP BY sh.pay_time::DATE
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_recharge_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate recharge orders per pay_time date, split by ``is_first``.

        ``is_first = 1`` counts as a first recharge; 0 or NULL counts as a
        renewal. In the ``pay_money + gift_money`` totals a NULL side is treated
        as 0, so that the other side is not lost.
        """
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                COUNT(*) AS recharge_count,
                SUM(COALESCE(pay_money, 0) + COALESCE(gift_money, 0)) AS recharge_total,
                SUM(pay_money) AS recharge_cash,
                SUM(gift_money) AS recharge_gift,
                COUNT(CASE WHEN is_first = 1 THEN 1 END) AS first_recharge_count,
                SUM(CASE WHEN is_first = 1 THEN COALESCE(pay_money, 0) + COALESCE(gift_money, 0) ELSE 0 END) AS first_recharge_total,
                SUM(CASE WHEN is_first = 1 THEN pay_money ELSE 0 END) AS first_recharge_cash,
                SUM(CASE WHEN is_first = 1 THEN gift_money ELSE 0 END) AS first_recharge_gift,
                COUNT(CASE WHEN is_first = 0 OR is_first IS NULL THEN 1 END) AS renewal_count,
                SUM(CASE WHEN is_first = 0 OR is_first IS NULL THEN COALESCE(pay_money, 0) + COALESCE(gift_money, 0) ELSE 0 END) AS renewal_total,
                SUM(CASE WHEN is_first = 0 OR is_first IS NULL THEN pay_money ELSE 0 END) AS renewal_cash,
                SUM(CASE WHEN is_first = 0 OR is_first IS NULL THEN gift_money ELSE 0 END) AS renewal_gift,
                COUNT(DISTINCT member_id) AS recharge_member_count
            FROM billiards_dwd.dwd_recharge_order
            WHERE site_id = %s
              AND pay_time >= %s
              AND pay_time < %s + INTERVAL '1 day'
            GROUP BY pay_time::DATE
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_gift_card_consume_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Sum gift-card consumption per change_time date from member balance changes.

        Counts ``ABS(change_amount)`` of non-deleted rows with ``from_type = 1``,
        ``change_amount < 0`` and a card type in the configured gift-card set.
        The set comes from config key ``dws.finance.gift_card_type_ids``, falling
        back to ``DEFAULT_GIFT_CARD_TYPE_IDS``.
        """
        configured = self._parse_id_list(self.config.get("dws.finance.gift_card_type_ids"))
        card_type_ids = sorted(configured) if configured else list(self.DEFAULT_GIFT_CARD_TYPE_IDS)
        if not card_type_ids:
            # An empty IN () list is invalid SQL; nothing to count anyway.
            return []
        # Every id is forced through int(), so inlining them cannot inject SQL.
        id_list = ", ".join(str(int(card_id)) for card_id in card_type_ids)
        sql = f"""
            SELECT
                change_time::DATE AS stat_date,
                SUM(ABS(change_amount)) AS gift_card_consume
            FROM billiards_dwd.dwd_member_balance_change
            WHERE site_id = %s
              AND change_time >= %s
              AND change_time < %s + INTERVAL '1 day'
              AND from_type = 1
              AND change_amount < 0
              AND COALESCE(is_delete, 0) = 0
              AND card_type_id IN ({id_list})
            GROUP BY change_time::DATE
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_expense_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Spread monthly expenses (imported table) evenly across the days of each month.

        Months whose ``expense_month`` lies between the first day of
        ``start_date``'s month and the first day of ``end_date``'s month are
        read. Each month's total is divided by its number of days, and the daily
        share is emitted for the days inside [start_date, end_date].

        Returns:
            ``[{'stat_date': date, 'expense_amount': Decimal}, ...]`` sorted by
            date. The list is empty for an inverted window or when no expenses
            exist. Shares are unrounded Decimals.
        """
        if start_date > end_date:
            return []
        sql = """
            SELECT
                expense_month,
                SUM(expense_amount) AS expense_amount
            FROM billiards_dws.dws_finance_expense_summary
            WHERE site_id = %s
              AND expense_month >= %s
              AND expense_month <= %s
            GROUP BY expense_month
        """
        rows = self.db.query(sql, (site_id, start_date.replace(day=1), end_date.replace(day=1)))
        if not rows:
            return []
        daily_totals: Dict[date, Decimal] = {}
        for row in rows:
            row_dict = dict(row)
            month_date = row_dict.get('expense_month')
            if not month_date:
                continue
            amount = self.safe_decimal(row_dict.get('expense_amount', 0))
            days_in_month = calendar.monthrange(month_date.year, month_date.month)[1]
            daily_amount = amount / Decimal(days_in_month)
            for day in range(1, days_in_month + 1):
                stat_date = date(month_date.year, month_date.month, day)
                if start_date <= stat_date <= end_date:
                    daily_totals[stat_date] = daily_totals.get(stat_date, Decimal('0')) + daily_amount
        return [
            {'stat_date': stat_date, 'expense_amount': amount}
            for stat_date, amount in sorted(daily_totals.items())
        ]

    def _extract_platform_summary(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Sum platform payouts, commissions and service fees per settlement_date
        (imported table) over [start_date, end_date].
        """
        sql = """
            SELECT
                settlement_date AS stat_date,
                SUM(settlement_amount) AS settlement_amount,
                SUM(commission_amount) AS commission_amount,
                SUM(service_fee) AS service_fee
            FROM billiards_dws.dws_platform_settlement
            WHERE site_id = %s
              AND settlement_date >= %s
              AND settlement_date <= %s
            GROUP BY settlement_date
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_big_customer_discounts(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Sum manual adjustments belonging to configured big customers, per day.

        A settlement with a non-zero ``adjust_amount`` qualifies when its
        ``order_settle_id`` is listed in ``dws.discount.big_customer_order_ids``
        or its ``member_id`` is listed in ``dws.discount.big_customer_member_ids``.
        Returns ``[]`` when neither list is configured.

        Returns:
            ``[{'stat_date', 'big_customer_amount', 'big_customer_count'}, ...]``.
        """
        member_ids = self._parse_id_list(self.config.get("dws.discount.big_customer_member_ids"))
        order_ids = self._parse_id_list(self.config.get("dws.discount.big_customer_order_ids"))
        if not member_ids and not order_ids:
            return []
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                order_settle_id,
                member_id,
                adjust_amount
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %s
              AND pay_time >= %s
              AND pay_time < %s + INTERVAL '1 day'
              AND adjust_amount != 0
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        if not rows:
            return []
        result: Dict[date, Dict[str, Any]] = {}
        for row in rows:
            row_dict = dict(row)
            stat_date = row_dict.get('stat_date')
            if not stat_date:
                continue
            if row_dict.get('order_settle_id') not in order_ids and row_dict.get('member_id') not in member_ids:
                continue
            # NOTE(review): abs() turns negative adjustments into positive amounts,
            # while _build_daily_record subtracts this from the *signed* adjust_amount.
            # Confirm that adjust_amount is never negative for these orders.
            amount = abs(self.safe_decimal(row_dict.get('adjust_amount', 0)))
            entry = result.setdefault(
                stat_date,
                {'stat_date': stat_date, 'big_customer_amount': Decimal('0'), 'big_customer_count': 0}
            )
            entry['big_customer_amount'] += amount
            entry['big_customer_count'] += 1
        return list(result.values())

    def _parse_id_list(self, value: Any) -> set:
        """
        Normalise a config value into a set of integer ids.

        Accepted forms:
        - a comma-separated string (``"1, 2,3"``); only non-negative integer
          tokens are kept;
        - a list/tuple/set/frozenset; every element ``int()`` accepts is kept;
        - a single int (not bool).

        None, empty values and 0 yield an empty set. Unparseable entries are
        skipped, never raised.
        """
        if not value or isinstance(value, bool):
            return set()
        if isinstance(value, int):
            return {value}
        if isinstance(value, str):
            ids = set()
            for token in value.split(","):
                token = token.strip()
                if not token.isdigit():
                    continue
                # isdigit() also accepts characters such as "²" that int() rejects.
                try:
                    ids.add(int(token))
                except ValueError:
                    continue
            return ids
        if isinstance(value, (list, tuple, set, frozenset)):
            ids = set()
            for item in value:
                if item is None:
                    continue
                try:
                    ids.add(int(item))
                except (ValueError, TypeError):
                    continue
            return ids
        return set()

    # ==========================================================================
    # Transformation helpers
    # ==========================================================================

    def _build_daily_record(
        self,
        stat_date: date,
        settle: Dict[str, Any],
        groupbuy: Dict[str, Any],
        recharge: Dict[str, Any],
        gift_card: Dict[str, Any],
        expense: Dict[str, Any],
        platform: Dict[str, Any],
        big_customer: Dict[str, Any],
        site_id: int
    ) -> Dict[str, Any]:
        """
        Build one target-table row for ``stat_date`` from the per-source rows.

        Each source argument is that source's summary row for the date, or an
        empty dict when the source has nothing that day (missing values become 0).
        """
        # Gross amount (list price)
        gross_amount = self.safe_decimal(settle.get('gross_amount', 0))
        table_fee_amount = self.safe_decimal(settle.get('table_fee_amount', 0))
        goods_amount = self.safe_decimal(settle.get('goods_amount', 0))
        assistant_pd_amount = self.safe_decimal(settle.get('assistant_pd_amount', 0))
        assistant_cx_amount = self.safe_decimal(settle.get('assistant_cx_amount', 0))

        # Payments
        cash_pay_amount = self.safe_decimal(settle.get('cash_pay_amount', 0))
        card_pay_amount = self.safe_decimal(settle.get('card_pay_amount', 0))
        balance_pay_amount = self.safe_decimal(settle.get('balance_pay_amount', 0))

        # Group-buy payment: _extract_groupbuy_summary already applies the
        # per-order rule ("pl_coupon_sale_amount if positive, else the ledger
        # price"). The day-wide pl_coupon_sale total is only used when no
        # groupbuy row exists for the date. Choosing between the two per *day*
        # would drop the ledger-price fallback of orders on mixed days.
        coupon_amount = self.safe_decimal(settle.get('coupon_amount', 0))
        raw_groupbuy_pay = groupbuy.get('groupbuy_pay_total')
        if raw_groupbuy_pay is not None:
            groupbuy_pay_amount = self.safe_decimal(raw_groupbuy_pay)
        else:
            groupbuy_pay_amount = self.safe_decimal(settle.get('pl_coupon_sale_amount', 0))

        # Group-buy discount = coupon offset - group-buy payment, floored at 0
        discount_groupbuy = max(coupon_amount - groupbuy_pay_amount, Decimal('0'))

        adjust_amount = self.safe_decimal(settle.get('adjust_amount', 0))
        member_discount = self.safe_decimal(settle.get('member_discount_amount', 0))
        rounding_amount = self.safe_decimal(settle.get('rounding_amount', 0))
        big_customer_amount = self.safe_decimal(big_customer.get('big_customer_amount', 0))
        # Manual adjustments not attributed to big customers, floored at 0.
        # NOTE(review): discount_manual below is the full adjust_amount, so it
        # already includes discount_other; do not add the two together.
        other_discount = max(adjust_amount - big_customer_amount, Decimal('0'))

        # Gift-card consumption (from balance changes)
        gift_card_consume_amount = self.safe_decimal(gift_card.get('gift_card_consume', 0))

        # Total discount and confirmed income
        discount_total = discount_groupbuy + member_discount + gift_card_consume_amount + adjust_amount + rounding_amount
        confirmed_income = gross_amount - discount_total

        # Cash flow
        platform_settlement_amount = self.safe_decimal(platform.get('settlement_amount', 0))
        platform_fee_amount = (
            self.safe_decimal(platform.get('commission_amount', 0))
            + self.safe_decimal(platform.get('service_fee', 0))
        )
        recharge_cash_inflow = self.safe_decimal(recharge.get('recharge_cash', 0))
        # Prefer the imported platform payout; fall back to the group-buy payment.
        platform_inflow = platform_settlement_amount if platform_settlement_amount > 0 else groupbuy_pay_amount
        cash_inflow_total = cash_pay_amount + platform_inflow + recharge_cash_inflow
        cash_outflow_total = self.safe_decimal(expense.get('expense_amount', 0)) + platform_fee_amount
        cash_balance_change = cash_inflow_total - cash_outflow_total

        # Card consumption
        cash_card_consume = card_pay_amount + balance_pay_amount
        gift_card_consume = gift_card_consume_amount
        card_consume_total = cash_card_consume + gift_card_consume

        # Recharge statistics
        recharge_count = self.safe_int(recharge.get('recharge_count', 0))
        recharge_total = self.safe_decimal(recharge.get('recharge_total', 0))
        recharge_cash = self.safe_decimal(recharge.get('recharge_cash', 0))
        recharge_gift = self.safe_decimal(recharge.get('recharge_gift', 0))
        first_recharge_count = self.safe_int(recharge.get('first_recharge_count', 0))
        first_recharge_amount = self.safe_decimal(recharge.get('first_recharge_total', 0))
        renewal_count = self.safe_int(recharge.get('renewal_count', 0))
        renewal_amount = self.safe_decimal(recharge.get('renewal_total', 0))

        # Order statistics
        order_count = self.safe_int(settle.get('order_count', 0))
        member_order_count = self.safe_int(settle.get('member_order_count', 0))
        guest_order_count = self.safe_int(settle.get('guest_order_count', 0))
        avg_order_amount = gross_amount / order_count if order_count > 0 else Decimal('0')

        return {
            'site_id': site_id,
            'tenant_id': self.config.get("app.tenant_id", site_id),
            'stat_date': stat_date,
            # Gross amount
            'gross_amount': gross_amount,
            'table_fee_amount': table_fee_amount,
            'goods_amount': goods_amount,
            'assistant_pd_amount': assistant_pd_amount,
            'assistant_cx_amount': assistant_cx_amount,
            # Discounts
            'discount_total': discount_total,
            'discount_groupbuy': discount_groupbuy,
            'discount_vip': member_discount,
            'discount_gift_card': gift_card_consume_amount,
            'discount_manual': adjust_amount,
            'discount_rounding': rounding_amount,
            'discount_other': other_discount,
            # Confirmed income
            'confirmed_income': confirmed_income,
            # Cash flow
            'cash_inflow_total': cash_inflow_total,
            'cash_pay_amount': cash_pay_amount,
            'groupbuy_pay_amount': groupbuy_pay_amount,
            'platform_settlement_amount': platform_settlement_amount,
            'platform_fee_amount': platform_fee_amount,
            'recharge_cash_inflow': recharge_cash_inflow,
            'card_consume_total': card_consume_total,
            'cash_card_consume': cash_card_consume,
            'gift_card_consume': gift_card_consume,
            'cash_outflow_total': cash_outflow_total,
            'cash_balance_change': cash_balance_change,
            # Recharge statistics
            'recharge_count': recharge_count,
            'recharge_total': recharge_total,
            'recharge_cash': recharge_cash,
            'recharge_gift': recharge_gift,
            'first_recharge_count': first_recharge_count,
            'first_recharge_amount': first_recharge_amount,
            'renewal_count': renewal_count,
            'renewal_amount': renewal_amount,
            # Order statistics
            'order_count': order_count,
            'member_order_count': member_order_count,
            'guest_order_count': guest_order_count,
            'avg_order_amount': avg_order_amount,
        }
# Explicit public API of this module (what `from ... import *` exposes).
__all__ = ["FinanceDailyTask"]