Files
feiqiu-ETL/etl_billiards/tasks/dws/finance_income_task.py
2026-02-04 21:39:01 +08:00

438 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
收入结构分析任务
功能说明:
"日期+区域/类型"为粒度,分析收入结构
数据来源:
- dwd_settlement_head: 结账单头表(台费、商品、助教正价)
- dwd_table_fee_log: 台费流水(区域关联)
- dwd_assistant_service_log: 助教服务流水(区域关联)
- cfg_area_category: 区域分类映射
目标表:
billiards_dws.dws_finance_income_structure
更新策略:
- 更新频率:每日更新
- 幂等方式:delete-before-insert(按日期+类型)
业务规则:
- 结构类型1(INCOME_TYPE):按收入类型分析(台费/商品/助教基础课/助教附加课)
- 结构类型2(AREA):按区域分析(普通台球区/VIP包厢/斯诺克/麻将/KTV等)
- 区域映射使用cfg_area_category配置
作者:ETL团队
创建日期:2026-02-01
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from .base_dws_task import BaseDwsTask, TaskContext
class FinanceIncomeStructureTask(BaseDwsTask):
    """Build the daily income-structure rows of the DWS layer.

    Every day in the task window is broken down along two dimensions, both
    written to ``billiards_dws.dws_finance_income_structure``:

    1. ``INCOME_TYPE`` - table fee / goods / assistant base lesson /
       assistant bonus lesson, read from ``dwd_settlement_head``.
    2. ``AREA`` - income per area category, read from the table-fee and
       assistant-service logs and mapped through the ``area_categories``
       configuration.

    ``self.db``, ``self.config``, ``self.safe_decimal`` and
    ``self._get_config_cache`` are not defined in this class; they are
    presumably supplied by ``BaseDwsTask`` - verify against the base class.
    """

    # (category_code, category_name, income column, order-count column) for
    # each INCOME_TYPE record generated per day. Column names refer to the
    # aliases produced by the query in ``_extract_income_by_type``.
    _INCOME_TYPES: Tuple[Tuple[str, str, str, str], ...] = (
        ('TABLE_FEE', '台费收入', 'table_fee_income', 'table_fee_orders'),
        ('GOODS', '商品收入', 'goods_income', 'goods_orders'),
        ('ASSISTANT_BASE', '助教基础课', 'assistant_base_income', 'assistant_base_orders'),
        ('ASSISTANT_BONUS', '助教附加课', 'assistant_bonus_income', 'assistant_bonus_orders'),
    )

    # Fallback category for area names that match no configured pattern.
    # Always handed out as a copy so callers cannot mutate the shared value.
    _DEFAULT_AREA_CATEGORY: Dict[str, str] = {
        'category_code': 'OTHER',
        'category_name': '其他区域',
    }

    def get_task_code(self) -> str:
        """Return the identifier of this task."""
        return "DWS_FINANCE_INCOME_STRUCTURE"

    def get_target_table(self) -> str:
        """Return the unqualified name of the table this task writes to."""
        return "dws_finance_income_structure"

    def get_primary_keys(self) -> List[str]:
        """Return the columns that uniquely identify one target row."""
        return ["site_id", "stat_date", "structure_type", "category_code"]

    @staticmethod
    def _window_dates(context: TaskContext) -> Tuple[Any, Any]:
        """Return ``(start, end)`` of the task window as plain dates.

        Bounds exposing a ``date`` attribute (e.g. ``datetime``) are reduced
        with ``.date()``; anything else is passed through unchanged.
        """
        def as_date(value: Any) -> Any:
            return value.date() if hasattr(value, 'date') else value

        return as_date(context.window_start), as_date(context.window_end)

    def _query_window(
        self,
        sql: str,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """Run ``sql`` with the standard window parameters.

        The statement receives the named parameters ``site_id``,
        ``start_date`` and ``end_date``. Rows are converted to plain dicts;
        a falsy result (``None`` or empty) becomes an empty list.
        """
        rows = self.db.query(sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })
        if not rows:
            return []
        return [dict(row) for row in rows]

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """Read the source data for the task window.

        Two independent result sets are fetched for ``context.store_id``:

        - ``income_by_type``: one row per day from ``dwd_settlement_head``.
        - ``income_by_area``: one row per day and area name from the
          table-fee and assistant-service logs.

        Returns:
            A dict with the keys ``income_by_type`` and ``income_by_area``,
            each holding a list of row dicts.
        """
        start_date, end_date = self._window_dates(context)
        site_id = context.store_id
        return {
            'income_by_type': self._extract_income_by_type(site_id, start_date, end_date),
            'income_by_area': self._extract_income_by_area(site_id, start_date, end_date),
        }

    def _extract_income_by_type(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """Summarise settled orders per day, split by income type.

        Only rows with ``settle_status = 1`` are read, and ``end_date`` is
        inclusive (the filter is ``pay_time < end_date + 1 day``). Each
        returned row carries, per income type, the summed amount and the
        number of orders whose amount is positive:

        - ``table_fee_*``       from ``table_charge_money``
        - ``goods_*``           from ``goods_money``
        - ``assistant_base_*``  from ``assistant_pd_money``
        - ``assistant_bonus_*`` from ``assistant_cx_money``

        plus ``total_orders`` and the grouping key ``stat_date``.
        """
        # The SQL text is kept exactly as it was, including its in-query notes.
        sql = """
SELECT
pay_time::DATE AS stat_date,
-- 台费收入
COALESCE(SUM(table_charge_money), 0) AS table_fee_income,
COUNT(CASE WHEN table_charge_money > 0 THEN 1 END) AS table_fee_orders,
-- 商品收入
COALESCE(SUM(goods_money), 0) AS goods_income,
COUNT(CASE WHEN goods_money > 0 THEN 1 END) AS goods_orders,
-- 助教基础课收入PD=陪打)
COALESCE(SUM(assistant_pd_money), 0) AS assistant_base_income,
COUNT(CASE WHEN assistant_pd_money > 0 THEN 1 END) AS assistant_base_orders,
-- 助教附加课收入CX=超休/促销)
COALESCE(SUM(assistant_cx_money), 0) AS assistant_bonus_income,
COUNT(CASE WHEN assistant_cx_money > 0 THEN 1 END) AS assistant_bonus_orders,
-- 总订单数
COUNT(*) AS total_orders
FROM billiards_dwd.dwd_settlement_head
WHERE site_id = %(site_id)s
AND pay_time >= %(start_date)s
AND pay_time < %(end_date)s + INTERVAL '1 day'
AND settle_status = 1 -- 已结账
GROUP BY pay_time::DATE
ORDER BY stat_date
"""
        return self._query_window(sql, site_id, start_date, end_date)

    def _extract_income_by_area(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """Summarise table-fee and assistant-service income per day and area.

        Both logs are left-joined to ``dim_table`` to obtain
        ``site_table_area_name``. Table fees are dated by ``pay_time`` and
        assistant services by ``start_use_time``; ``end_date`` is inclusive
        for both. The two per-area summaries are combined with a FULL OUTER
        JOIN on date and area name: amounts and durations are added, while
        ``order_count`` is the larger of the two distinct-order counts.

        Because the join compares ``area_name`` with ``=``, rows whose area
        name is NULL (no ``dim_table`` match) never pair up and are returned
        as separate rows; ``_transform_income_by_area`` merges them again
        when it aggregates by category.

        NOTE(review): if ``dim_table`` holds more than one row per
        ``site_table_id`` the joins would multiply log rows - confirm the
        dimension is unique on that key.
        """
        # The SQL text is kept exactly as it was, including its in-query notes.
        sql = """
WITH
-- 台费按区域汇总
table_fee_by_area AS (
SELECT
tfl.pay_time::DATE AS stat_date,
dt.site_table_area_name AS area_name,
COALESCE(SUM(tfl.ledger_amount), 0) AS income_amount,
COALESCE(SUM(tfl.ledger_time_seconds), 0) AS duration_seconds,
COUNT(DISTINCT tfl.order_settle_id) AS order_count
FROM billiards_dwd.dwd_table_fee_log tfl
LEFT JOIN billiards_dwd.dim_table dt
ON dt.site_table_id = tfl.site_table_id
WHERE tfl.site_id = %(site_id)s
AND tfl.pay_time >= %(start_date)s
AND tfl.pay_time < %(end_date)s + INTERVAL '1 day'
GROUP BY tfl.pay_time::DATE, dt.site_table_area_name
),
-- 助教服务按区域汇总
assistant_by_area AS (
SELECT
asl.start_use_time::DATE AS stat_date,
dt.site_table_area_name AS area_name,
COALESCE(SUM(asl.ledger_amount), 0) AS income_amount,
COALESCE(SUM(asl.income_seconds), 0) AS duration_seconds,
COUNT(DISTINCT asl.order_settle_id) AS order_count
FROM billiards_dwd.dwd_assistant_service_log asl
LEFT JOIN billiards_dwd.dim_table dt
ON dt.site_table_id = asl.site_table_id
WHERE asl.site_id = %(site_id)s
AND asl.start_use_time >= %(start_date)s
AND asl.start_use_time < %(end_date)s + INTERVAL '1 day'
GROUP BY asl.start_use_time::DATE, dt.site_table_area_name
)
-- 合并台费和助教服务
SELECT
COALESCE(t.stat_date, a.stat_date) AS stat_date,
COALESCE(t.area_name, a.area_name) AS area_name,
COALESCE(t.income_amount, 0) + COALESCE(a.income_amount, 0) AS income_amount,
COALESCE(t.duration_seconds, 0) + COALESCE(a.duration_seconds, 0) AS duration_seconds,
GREATEST(COALESCE(t.order_count, 0), COALESCE(a.order_count, 0)) AS order_count
FROM table_fee_by_area t
FULL OUTER JOIN assistant_by_area a
ON t.stat_date = a.stat_date AND t.area_name = a.area_name
ORDER BY stat_date, area_name
"""
        return self._query_window(sql, site_id, start_date, end_date)

    def transform(self, data: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """Turn the extracted result sets into target-table records.

        Args:
            data: The dict produced by ``extract``; missing keys are treated
                as empty lists.
            context: Supplies ``store_id``, used as ``site_id`` and as the
                fallback for ``tenant_id`` when the ``app.tenant_id`` config
                entry is absent.

        Returns:
            All ``INCOME_TYPE`` records followed by all ``AREA`` records.
        """
        site_id = context.store_id
        tenant_id = self.config.get("app.tenant_id", site_id)

        records: List[Dict[str, Any]] = []
        records.extend(self._transform_income_by_type(
            data.get('income_by_type', []),
            site_id,
            tenant_id
        ))
        records.extend(self._transform_income_by_area(
            data.get('income_by_area', []),
            site_id,
            tenant_id
        ))
        return records

    def _transform_income_by_type(
        self,
        income_data: List[Dict[str, Any]],
        site_id: int,
        tenant_id: int
    ) -> List[Dict[str, Any]]:
        """Expand each daily summary row into one record per income type.

        Every input row yields exactly ``len(_INCOME_TYPES)`` records, even
        for types with no income that day. ``income_ratio`` is the type's
        share of the day's total across these types, rounded to 4 decimal
        places; it is 0 whenever that total is not positive.
        """
        records: List[Dict[str, Any]] = []
        for daily_data in income_data:
            stat_date = daily_data.get('stat_date')

            # Convert each amount once; the same values feed the daily total
            # and the per-type records.
            amounts = {
                type_code: self.safe_decimal(daily_data.get(income_field, 0))
                for type_code, _, income_field, _ in self._INCOME_TYPES
            }
            total_income = sum(amounts.values(), Decimal('0'))

            for type_code, type_name, _, order_field in self._INCOME_TYPES:
                income_amount = amounts[type_code]
                # Guard against division by zero (and negative totals).
                if total_income > 0:
                    income_ratio = income_amount / total_income
                else:
                    income_ratio = Decimal('0')
                records.append({
                    'site_id': site_id,
                    'tenant_id': tenant_id,
                    'stat_date': stat_date,
                    'structure_type': 'INCOME_TYPE',
                    'category_code': type_code,
                    'category_name': type_name,
                    'income_amount': income_amount,
                    'income_ratio': round(income_ratio, 4),
                    'order_count': daily_data.get(order_field, 0) or 0,
                    'duration_minutes': 0,  # duration is not tracked per income type
                })
        return records

    def _transform_income_by_area(
        self,
        area_data: List[Dict[str, Any]],
        site_id: int,
        tenant_id: int
    ) -> List[Dict[str, Any]]:
        """Aggregate per-area rows into one record per day and category.

        Area names are mapped to categories with ``_map_area_to_category``
        using the ``area_categories`` entry of the config cache; a missing
        area name is mapped under the placeholder name ``'未知区域'``. Rows
        that land in the same ``(stat_date, category_code)`` are summed.

        ``income_ratio`` is the category's share of that day's total area
        income, rounded to 4 decimal places, and 0 when the total is not
        positive. ``duration_minutes`` is the summed seconds floor-divided
        by 60. Records are returned in order of first appearance of each
        ``(stat_date, category_code)`` pair.
        """
        area_categories = self._get_config_cache().get('area_categories') or {}

        daily_totals: Dict[Any, Decimal] = {}
        aggregated: Dict[Tuple[Any, Any], Dict[str, Any]] = {}

        # Single pass: accumulate the per-day total (denominator of the
        # ratio) and the per-category sums together.
        for row in area_data:
            stat_date = row.get('stat_date')
            income_amount = self.safe_decimal(row.get('income_amount', 0))
            daily_totals[stat_date] = daily_totals.get(stat_date, Decimal('0')) + income_amount

            area_name = row.get('area_name') or '未知区域'
            category = self._map_area_to_category(area_name, area_categories)
            category_code = category.get('category_code', 'OTHER')

            key = (stat_date, category_code)
            bucket = aggregated.get(key)
            if bucket is None:
                bucket = aggregated[key] = {
                    'stat_date': stat_date,
                    'category_code': category_code,
                    'category_name': category.get('category_name', '其他区域'),
                    'income_amount': Decimal('0'),
                    'duration_seconds': 0,
                    'order_count': 0,
                }
            bucket['income_amount'] += income_amount
            bucket['duration_seconds'] += row.get('duration_seconds', 0) or 0
            bucket['order_count'] += row.get('order_count', 0) or 0

        records: List[Dict[str, Any]] = []
        for bucket in aggregated.values():
            # Every aggregated date was added to daily_totals in the loop
            # above, so direct indexing cannot fail.
            total_income = daily_totals[bucket['stat_date']]
            income_amount = bucket['income_amount']
            if total_income > 0:
                income_ratio = income_amount / total_income
            else:
                income_ratio = Decimal('0')
            records.append({
                'site_id': site_id,
                'tenant_id': tenant_id,
                'stat_date': bucket['stat_date'],
                'structure_type': 'AREA',
                'category_code': bucket['category_code'],
                'category_name': bucket['category_name'],
                'income_amount': income_amount,
                'income_ratio': round(income_ratio, 4),
                'order_count': bucket['order_count'],
                # int() keeps the column an integer even when the summed
                # seconds arrive as Decimal from the database driver.
                'duration_minutes': int(bucket['duration_seconds'] // 60),
            })
        return records

    def _map_area_to_category(
        self,
        area_name: str,
        area_categories: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Resolve an area name to its configured category.

        Matching precedence:

        1. An entry keyed by exactly ``area_name`` whose ``match_type`` is
           ``'exact'`` (the default when the key is absent).
        2. The first ``'like'`` entry, in configuration order, whose pattern
           with all ``%`` removed is contained in ``area_name``. This is a
           plain substring test, not full SQL LIKE semantics.
        3. The default ``OTHER`` category.

        Exact entries are checked before any ``'like'`` entry, so a broad
        pattern listed earlier in the configuration cannot shadow a specific
        exact mapping.

        Args:
            area_name: Area name to classify; empty or ``None`` yields the
                default category.
            area_categories: Mapping of match pattern to category dict.

        Returns:
            The matching category dict from ``area_categories``, or a fresh
            copy of the default category.
        """
        if not area_name:
            return dict(self._DEFAULT_AREA_CATEGORY)

        # Pass 1: exact match. Patterns are the dict keys, so a direct
        # lookup is equivalent to scanning for ``pattern == area_name``.
        candidate = area_categories.get(area_name)
        if candidate is not None and candidate.get('match_type', 'exact') == 'exact':
            return candidate

        # Pass 2: substring ("like") match, in configuration order.
        for pattern, category in area_categories.items():
            if category.get('match_type', 'exact') != 'like':
                continue
            if pattern.replace('%', '') in area_name:
                return category

        return dict(self._DEFAULT_AREA_CATEGORY)

    def load(self, records: List[Dict[str, Any]], context: TaskContext) -> Dict[str, Any]:
        """Replace the target rows of the task window with ``records``.

        All rows of ``context.store_id`` whose ``stat_date`` lies in the
        window (both bounds inclusive) are deleted, then every record is
        inserted with ``created_at``/``updated_at`` set to ``NOW()``.

        Returns:
            ``{'deleted': ..., 'inserted': ...}`` where ``deleted`` is
            whatever ``self.db.execute`` returned for the DELETE (presumably
            the affected row count - verify against the db wrapper), or 0
            when that is falsy, and ``inserted`` is the number of INSERT
            statements issued.
        """
        if not records:
            # NOTE(review): returning here leaves any previously loaded rows
            # for the window in place. If an empty result should clear the
            # window, the DELETE below has to run first - confirm intent.
            return {'inserted': 0, 'deleted': 0}

        start_date, end_date = self._window_dates(context)

        # Remove existing rows for the window so a re-run does not duplicate.
        delete_sql = """
DELETE FROM billiards_dws.dws_finance_income_structure
WHERE site_id = %(site_id)s
AND stat_date >= %(start_date)s
AND stat_date <= %(end_date)s
"""
        deleted = self.db.execute(delete_sql, {
            'site_id': context.store_id,
            'start_date': start_date,
            'end_date': end_date,
        })

        insert_sql = """
INSERT INTO billiards_dws.dws_finance_income_structure (
site_id, tenant_id, stat_date,
structure_type, category_code, category_name,
income_amount, income_ratio,
order_count, duration_minutes,
created_at, updated_at
) VALUES (
%(site_id)s, %(tenant_id)s, %(stat_date)s,
%(structure_type)s, %(category_code)s, %(category_name)s,
%(income_amount)s, %(income_ratio)s,
%(order_count)s, %(duration_minutes)s,
NOW(), NOW()
)
"""
        inserted = 0
        for record in records:
            self.db.execute(insert_sql, record)
            inserted += 1

        return {
            'deleted': deleted or 0,
            'inserted': inserted,
        }