413 lines
15 KiB
Python
413 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
收入结构分析任务
|
||
|
||
功能说明:
|
||
以"日期+区域/类型"为粒度,分析收入结构
|
||
|
||
数据来源:
|
||
- dwd_settlement_head: 结账单头表(台费、商品、助教正价)
|
||
- dwd_table_fee_log: 台费流水(区域关联)
|
||
- dwd_assistant_service_log: 助教服务流水(区域关联)
|
||
- dim_table: 台桌维度(提供台桌所属区域名称)
- cfg_area_category: 区域分类映射
|
||
|
||
目标表:
|
||
billiards_dws.dws_finance_income_structure
|
||
|
||
更新策略:
|
||
- 更新频率:每日更新
|
||
- 幂等方式:delete-before-insert(按站点+日期范围删除,覆盖全部结构类型)
|
||
|
||
业务规则:
|
||
- 结构类型1(INCOME_TYPE):按收入类型分析(台费/商品/助教基础课/助教附加课)
|
||
- 结构类型2(AREA):按区域分析(普通台球区/VIP包厢/斯诺克/麻将/KTV等)
|
||
- 区域映射使用cfg_area_category配置
|
||
|
||
作者:ETL团队
|
||
创建日期:2026-02-01
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import date, datetime, timedelta
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .base_dws_task import BaseDwsTask, TaskContext
|
||
|
||
|
||
class FinanceIncomeStructureTask(BaseDwsTask):
    """
    Income structure analysis task.

    Writes rows to ``billiards_dws.dws_finance_income_structure`` along two
    structure types:

    1. INCOME_TYPE: one row per income type per day
       (table fee / goods / assistant base course / assistant bonus course),
       sourced from ``dwd_settlement_head``.
    2. AREA: one row per area category per day. The area name comes from
       ``dim_table`` and is mapped to a category via ``get_area_category``.
    """

    # (category_code, category_name, income column, order-count column) for the
    # INCOME_TYPE structure; column names match the aliases in
    # _extract_income_by_type.
    _INCOME_TYPES: Tuple[Tuple[str, str, str, str], ...] = (
        ('TABLE_FEE', '台费收入', 'table_fee_income', 'table_fee_orders'),
        ('GOODS', '商品收入', 'goods_income', 'goods_orders'),
        ('ASSISTANT_BASE', '助教基础课', 'assistant_base_income', 'assistant_base_orders'),
        ('ASSISTANT_BONUS', '助教附加课', 'assistant_bonus_income', 'assistant_bonus_orders'),
    )

    def get_task_code(self) -> str:
        """Return the unique code of this task."""
        return "DWS_FINANCE_INCOME_STRUCTURE"

    def get_target_table(self) -> str:
        """Return the target table name (without schema)."""
        return "dws_finance_income_structure"

    def get_primary_keys(self) -> List[str]:
        """Return the logical primary-key columns of the target table."""
        return ["site_id", "stat_date", "structure_type", "category_code"]

    @staticmethod
    def _resolve_window_dates(context: TaskContext) -> Tuple[date, date]:
        """
        Return ``(start_date, end_date)`` of the task window as dates.

        ``datetime`` values are truncated to their date part. Plain ``date``
        values (which have no ``.date()`` method) pass through unchanged.

        NOTE(review): both extraction and deletion treat ``end_date`` as an
        inclusive day. If ``window_end`` is an exclusive midnight boundary, the
        following day is also processed. Confirm against how TaskContext
        windows are built.
        """
        start = context.window_start
        end = context.window_end
        start_date = start.date() if hasattr(start, 'date') else start
        end_date = end.date() if hasattr(end, 'date') else end
        return start_date, end_date

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract raw daily aggregates for the task window.

        Two independent paths are queried:

        1. By income type, from ``dwd_settlement_head``.
        2. By area, from ``dwd_table_fee_log`` and ``dwd_assistant_service_log``.

        Returns:
            ``{'income_by_type': [...], 'income_by_area': [...]}``. Each list
            holds one dict per SQL result row.
        """
        start_date, end_date = self._resolve_window_dates(context)
        site_id = context.store_id

        return {
            'income_by_type': self._extract_income_by_type(site_id, start_date, end_date),
            'income_by_area': self._extract_income_by_area(site_id, start_date, end_date),
        }

    def _extract_income_by_type(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate income per day, split by income type.

        Income types and their source columns in ``dwd_settlement_head``:

        - TABLE_FEE: table fee income (``table_charge_money``)
        - GOODS: goods income (``goods_money``)
        - ASSISTANT_BASE: assistant base course (``assistant_pd_money``)
        - ASSISTANT_BONUS: assistant bonus course (``assistant_cx_money``)

        Only rows with ``settle_status = 1`` and ``pay_time`` falling on a day
        in ``[start_date, end_date]`` (both inclusive) are counted.

        Returns:
            One dict per ``stat_date`` with ``*_income`` sums, ``*_orders``
            counts (orders with a positive amount of that type) and
            ``total_orders``. Returns an empty list when nothing matches.
        """
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                -- 台费收入
                COALESCE(SUM(table_charge_money), 0) AS table_fee_income,
                COUNT(CASE WHEN table_charge_money > 0 THEN 1 END) AS table_fee_orders,
                -- 商品收入
                COALESCE(SUM(goods_money), 0) AS goods_income,
                COUNT(CASE WHEN goods_money > 0 THEN 1 END) AS goods_orders,
                -- 助教基础课收入(PD=陪打)
                COALESCE(SUM(assistant_pd_money), 0) AS assistant_base_income,
                COUNT(CASE WHEN assistant_pd_money > 0 THEN 1 END) AS assistant_base_orders,
                -- 助教附加课收入(CX=超休/促销)
                COALESCE(SUM(assistant_cx_money), 0) AS assistant_bonus_income,
                COUNT(CASE WHEN assistant_cx_money > 0 THEN 1 END) AS assistant_bonus_orders,
                -- 总订单数
                COUNT(*) AS total_orders
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %(site_id)s
              AND pay_time >= %(start_date)s
              AND pay_time < %(end_date)s + INTERVAL '1 day'
              AND settle_status = 1  -- 已结账
            GROUP BY pay_time::DATE
            ORDER BY stat_date
        """
        rows = self.db.query(sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })
        return [dict(row) for row in rows] if rows else []

    def _extract_income_by_area(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate income per day and raw area name.

        Table-fee rows (dated by ``pay_time``) and assistant-service rows
        (dated by ``start_use_time``) are unioned. Each is joined to
        ``dim_table`` for ``site_table_area_name``. Soft-deleted rows are
        excluded in both branches, and a NULL ``is_delete`` counts as
        "not deleted" in both.

        NOTE(review): if ``dim_table`` can hold several rows per
        ``site_table_id`` (e.g. historical versions), the LEFT JOINs would
        multiply income. Confirm the join is one-to-one.

        Returns:
            One dict per ``(stat_date, area_name)`` with ``income_amount``,
            ``duration_seconds`` and ``order_count`` (distinct
            ``order_settle_id`` within that area and day).
        """
        sql = """
            WITH area_orders AS (
                SELECT
                    tfl.pay_time::DATE AS stat_date,
                    dt.site_table_area_name AS area_name,
                    tfl.order_settle_id,
                    COALESCE(tfl.ledger_amount, 0) AS income_amount,
                    COALESCE(tfl.ledger_time_seconds, 0) AS duration_seconds
                FROM billiards_dwd.dwd_table_fee_log tfl
                LEFT JOIN billiards_dwd.dim_table dt
                    ON dt.site_table_id = tfl.site_table_id
                WHERE tfl.site_id = %(site_id)s
                  AND tfl.pay_time >= %(start_date)s
                  AND tfl.pay_time < %(end_date)s + INTERVAL '1 day'
                  AND COALESCE(tfl.is_delete, 0) = 0

                UNION ALL

                SELECT
                    asl.start_use_time::DATE AS stat_date,
                    dt.site_table_area_name AS area_name,
                    asl.order_settle_id,
                    COALESCE(asl.ledger_amount, 0) AS income_amount,
                    COALESCE(asl.income_seconds, 0) AS duration_seconds
                FROM billiards_dwd.dwd_assistant_service_log asl
                LEFT JOIN billiards_dwd.dim_table dt
                    ON dt.site_table_id = asl.site_table_id
                WHERE asl.site_id = %(site_id)s
                  AND asl.start_use_time >= %(start_date)s
                  AND asl.start_use_time < %(end_date)s + INTERVAL '1 day'
                  AND COALESCE(asl.is_delete, 0) = 0
            )
            SELECT
                stat_date,
                area_name,
                COALESCE(SUM(income_amount), 0) AS income_amount,
                COALESCE(SUM(duration_seconds), 0) AS duration_seconds,
                COUNT(DISTINCT order_settle_id) AS order_count
            FROM area_orders
            GROUP BY stat_date, area_name
            ORDER BY stat_date, area_name
        """
        rows = self.db.query(sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })
        return [dict(row) for row in rows] if rows else []

    def transform(self, data: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Convert extracted aggregates into target-table records.

        1. Expand each day's income-type totals into one row per type.
        2. Map raw areas to categories and emit one row per category per day.
        3. Compute each row's share of its day's total (``income_ratio``).

        ``tenant_id`` comes from config key ``app.tenant_id`` and falls back to
        the site id.
        """
        site_id = context.store_id
        tenant_id = self.config.get("app.tenant_id", site_id)

        records: List[Dict[str, Any]] = []
        records.extend(self._transform_income_by_type(
            data.get('income_by_type') or [],
            site_id,
            tenant_id,
        ))
        records.extend(self._transform_income_by_area(
            data.get('income_by_area') or [],
            site_id,
            tenant_id,
        ))
        return records

    def _transform_income_by_type(
        self,
        income_data: List[Dict[str, Any]],
        site_id: int,
        tenant_id: int
    ) -> List[Dict[str, Any]]:
        """
        Expand each daily row into four INCOME_TYPE records.

        The four records are table fee, goods, assistant base and assistant
        bonus. ``income_ratio`` is the type's share of the sum of the four
        types for that day, rounded to 4 decimal places. It is 0 when that sum
        is not positive. Duration is not tracked for this structure, so
        ``duration_minutes`` is always 0.
        """
        records: List[Dict[str, Any]] = []

        for daily_data in income_data:
            stat_date = daily_data.get('stat_date')

            amounts = [
                self.safe_decimal(daily_data.get(income_field, 0))
                for _, _, income_field, _ in self._INCOME_TYPES
            ]
            # Day total across the four types; the ratio denominator.
            total_income = sum(amounts)

            for (type_code, type_name, _, order_field), income_amount in zip(self._INCOME_TYPES, amounts):
                order_count = daily_data.get(order_field, 0) or 0
                # Guard against division by zero on days with no (or net non-positive) income.
                income_ratio = (income_amount / total_income) if total_income > 0 else Decimal('0')

                records.append({
                    'site_id': site_id,
                    'tenant_id': tenant_id,
                    'stat_date': stat_date,
                    'structure_type': 'INCOME_TYPE',
                    'category_code': type_code,
                    'category_name': type_name,
                    'income_amount': income_amount,
                    'income_ratio': round(income_ratio, 4),
                    'order_count': order_count,
                    'duration_minutes': 0,
                })

        return records

    def _transform_income_by_area(
        self,
        area_data: List[Dict[str, Any]],
        site_id: int,
        tenant_id: int
    ) -> List[Dict[str, Any]]:
        """
        Map raw area rows to area categories and emit one AREA record per
        ``(stat_date, category_code)``.

        Areas with no name are treated as ``'未知区域'``. Categories missing a
        code or name fall back to ``'OTHER'`` / ``'其他区域'``. ``income_ratio``
        is the category's share of the day's total area income, rounded to 4
        decimal places, or 0 when that total is not positive.
        ``duration_minutes`` is the summed seconds floored to whole minutes.

        NOTE(review): ``order_count`` sums per-area distinct counts, so an
        order touching two areas that map to the same category is counted
        twice.
        """
        self.load_config_cache()

        daily_totals: Dict[Any, Decimal] = {}
        aggregated: Dict[Tuple[Any, str], Dict[str, Any]] = {}

        # One pass: accumulate the per-day total (ratio denominator) and the
        # per-(day, category) sums. Several raw areas may share one category.
        for row in area_data:
            stat_date = row.get('stat_date')
            area_name = row.get('area_name') or '未知区域'
            income_amount = self.safe_decimal(row.get('income_amount', 0))
            duration_seconds = row.get('duration_seconds', 0) or 0
            order_count = row.get('order_count', 0) or 0

            daily_totals[stat_date] = daily_totals.get(stat_date, Decimal('0')) + income_amount

            # Tolerate a lookup that yields nothing by falling back to OTHER.
            category = self.get_area_category(area_name) or {}
            category_code = category.get('category_code', 'OTHER')
            category_name = category.get('category_name', '其他区域')

            bucket = aggregated.setdefault((stat_date, category_code), {
                'stat_date': stat_date,
                'category_code': category_code,
                'category_name': category_name,
                'income_amount': Decimal('0'),
                'duration_seconds': 0,
                'order_count': 0,
            })
            bucket['income_amount'] += income_amount
            bucket['duration_seconds'] += duration_seconds
            bucket['order_count'] += order_count

        records: List[Dict[str, Any]] = []
        for agg_data in aggregated.values():
            stat_date = agg_data['stat_date']
            total_income = daily_totals.get(stat_date, Decimal('0'))
            income_amount = agg_data['income_amount']
            income_ratio = (income_amount / total_income) if total_income > 0 else Decimal('0')

            records.append({
                'site_id': site_id,
                'tenant_id': tenant_id,
                'stat_date': stat_date,
                'structure_type': 'AREA',
                'category_code': agg_data['category_code'],
                'category_name': agg_data['category_name'],
                'income_amount': income_amount,
                'income_ratio': round(income_ratio, 4),
                'order_count': agg_data['order_count'],
                'duration_minutes': agg_data['duration_seconds'] // 60,
            })

        return records

    def _map_area_to_category(
        self,
        area_name: str,
        area_categories: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Legacy mapping entry point, kept for backward compatibility.

        ``area_categories`` is ignored; the lookup is delegated to
        ``get_area_category``.
        """
        return self.get_area_category(area_name)

    def load(self, records: List[Dict[str, Any]], context: TaskContext) -> Dict[str, Any]:
        """
        Load records into the target table idempotently (delete-before-insert).

        All rows for this site with ``stat_date`` in the task window (all
        structure types) are deleted first, then ``records`` are inserted.
        The delete also runs when ``records`` is empty, so a re-run over a
        window that no longer has source data clears previously written rows.

        Returns:
            ``{'deleted': <value returned by db.execute for the DELETE, or 0>,
            'inserted': <number of records inserted>}``.
        """
        site_id = context.store_id
        start_date, end_date = self._resolve_window_dates(context)

        delete_sql = """
            DELETE FROM billiards_dws.dws_finance_income_structure
            WHERE site_id = %(site_id)s
              AND stat_date >= %(start_date)s
              AND stat_date <= %(end_date)s
        """
        deleted = self.db.execute(delete_sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })

        if not records:
            return {'deleted': deleted or 0, 'inserted': 0}

        insert_sql = """
            INSERT INTO billiards_dws.dws_finance_income_structure (
                site_id, tenant_id, stat_date,
                structure_type, category_code, category_name,
                income_amount, income_ratio,
                order_count, duration_minutes,
                created_at, updated_at
            ) VALUES (
                %(site_id)s, %(tenant_id)s, %(stat_date)s,
                %(structure_type)s, %(category_code)s, %(category_name)s,
                %(income_amount)s, %(income_ratio)s,
                %(order_count)s, %(duration_minutes)s,
                NOW(), NOW()
            )
        """
        for record in records:
            self.db.execute(insert_sql, record)

        return {
            'deleted': deleted or 0,
            'inserted': len(records),
        }
|