# -*- coding: utf-8 -*-
"""
Income structure analysis task.

Purpose:
    Analyse income structure at "date + area / income type" granularity.

Data sources (as referenced by the SQL in this module):
    - billiards_dwd.dwd_settlement_head: settlement header amounts
      (table fee, goods, assistant amounts)
    - billiards_dwd.dwd_table_fee_log: table-fee ledger rows
    - billiards_dwd.dwd_assistant_service_log: assistant-service ledger rows
    - billiards_dwd.dim_table: supplies the area name for a table
    - cfg_area_category: area-name -> category mapping (accessed through
      ``load_config_cache`` / ``get_area_category`` on the base task)

Target table:
    billiards_dws.dws_finance_income_structure

Update strategy:
    - Frequency: daily
    - Idempotency: delete-before-insert over the site's date window

Business rules:
    - Structure type ``INCOME_TYPE``: breakdown by income type
      (table fee / goods / assistant base course / assistant bonus course)
    - Structure type ``AREA``: breakdown by area category
    - Area mapping uses the cfg_area_category configuration

Author: ETL team
Created: 2026-02-01
"""

from __future__ import annotations

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

from .base_dws_task import BaseDwsTask, TaskContext


class FinanceIncomeStructureTask(BaseDwsTask):
    """
    Income structure analysis task.

    Produces two families of rows per site and day:
    1. ``INCOME_TYPE``: one row per income type
       (table fee / goods / assistant base course / assistant bonus course)
    2. ``AREA``: one row per area category (mapped via cfg_area_category)
    """

    # (category_code, category_name, income column, order-count column) for
    # each income type; the column names match the aliases selected in
    # ``_extract_income_by_type``.
    _INCOME_TYPES = (
        ('TABLE_FEE', '台费收入', 'table_fee_income', 'table_fee_orders'),
        ('GOODS', '商品收入', 'goods_income', 'goods_orders'),
        ('ASSISTANT_BASE', '助教基础课', 'assistant_base_income', 'assistant_base_orders'),
        ('ASSISTANT_BONUS', '助教附加课', 'assistant_bonus_income', 'assistant_bonus_orders'),
    )

    def get_task_code(self) -> str:
        """Return the task code of this task."""
        return "DWS_FINANCE_INCOME_STRUCTURE"

    def get_target_table(self) -> str:
        """Return the (schema-less) name of the target table."""
        return "dws_finance_income_structure"

    def get_primary_keys(self) -> List[str]:
        """Return the column names forming the target table's primary key."""
        return ["site_id", "stat_date", "structure_type", "category_code"]

    @staticmethod
    def _resolve_window_dates(context: TaskContext) -> Tuple[date, date]:
        """
        Return ``(start_date, end_date)`` for the context window.

        A bound exposing a ``date`` attribute (e.g. ``datetime``) is reduced
        via ``.date()``; any other value is passed through unchanged.
        """
        def as_date(value):
            return value.date() if hasattr(value, 'date') else value

        return as_date(context.window_start), as_date(context.window_end)

    @staticmethod
    def _income_ratio(part: Decimal, total: Decimal) -> Decimal:
        """
        Return ``part / total`` rounded to 4 places.

        Yields ``Decimal('0')`` when ``total`` is not strictly positive,
        which also avoids division by zero.
        """
        ratio = (part / total) if total > 0 else Decimal('0')
        return round(ratio, 4)

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract the source data for the context window.

        Two independent queries are run:
        1. income summed per income type (from dwd_settlement_head)
        2. income summed per area (from dwd_table_fee_log and
           dwd_assistant_service_log)

        Returns:
            Dict with keys ``'income_by_type'`` and ``'income_by_area'``,
            each a list of row dicts.
        """
        start_date, end_date = self._resolve_window_dates(context)
        site_id = context.store_id

        return {
            'income_by_type': self._extract_income_by_type(site_id, start_date, end_date),
            'income_by_area': self._extract_income_by_area(site_id, start_date, end_date),
        }

    def _extract_income_by_type(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Sum income per day and income type from dwd_settlement_head.

        Income types and their source columns:
        - TABLE_FEE: table_charge_money
        - GOODS: goods_money
        - ASSISTANT_BASE: assistant_pd_money
        - ASSISTANT_BONUS: assistant_cx_money

        Only rows with ``settle_status = 1`` are included. ``end_date`` is
        inclusive: the filter is ``pay_time < end_date + 1 day``.

        Returns:
            One dict per ``pay_time::DATE``; empty list when there are no rows.
        """
        sql = """
            SELECT
                pay_time::DATE AS stat_date,
                -- 台费收入
                COALESCE(SUM(table_charge_money), 0) AS table_fee_income,
                COUNT(CASE WHEN table_charge_money > 0 THEN 1 END) AS table_fee_orders,
                -- 商品收入
                COALESCE(SUM(goods_money), 0) AS goods_income,
                COUNT(CASE WHEN goods_money > 0 THEN 1 END) AS goods_orders,
                -- 助教基础课收入(PD=陪打)
                COALESCE(SUM(assistant_pd_money), 0) AS assistant_base_income,
                COUNT(CASE WHEN assistant_pd_money > 0 THEN 1 END) AS assistant_base_orders,
                -- 助教附加课收入(CX=超休/促销)
                COALESCE(SUM(assistant_cx_money), 0) AS assistant_bonus_income,
                COUNT(CASE WHEN assistant_cx_money > 0 THEN 1 END) AS assistant_bonus_orders,
                -- 总订单数
                COUNT(*) AS total_orders
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %(site_id)s
              AND pay_time >= %(start_date)s
              AND pay_time < %(end_date)s + INTERVAL '1 day'
              AND settle_status = 1  -- 已结账
            GROUP BY pay_time::DATE
            ORDER BY stat_date
        """
        rows = self.db.query(sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })
        return [dict(row) for row in rows] if rows else []

    def _extract_income_by_area(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Sum income, duration and distinct orders per day and area name.

        Table-fee rows and assistant-service rows are unioned, each joined to
        dim_table for the area name; mapping the area name onto
        cfg_area_category happens later, in ``_transform_income_by_area``.

        Both branches treat a NULL ``is_delete`` as "not deleted". Previously
        the assistant branch used ``asl.is_delete = 0``, which silently
        dropped NULL rows while the table-fee branch kept them.

        NOTE(review): the table-fee branch buckets by ``pay_time`` while the
        assistant branch buckets by ``start_use_time`` -- confirm that the
        differing date basis is intended.
        NOTE(review): the dim_table join has no version/site filter here; if
        dim_table can hold several rows per site_table_id, amounts would be
        multiplied -- confirm uniqueness.

        Returns:
            One dict per (stat_date, area_name); empty list when no rows.
        """
        sql = """
            WITH area_orders AS (
                SELECT
                    tfl.pay_time::DATE AS stat_date,
                    dt.site_table_area_name AS area_name,
                    tfl.order_settle_id,
                    COALESCE(tfl.ledger_amount, 0) AS income_amount,
                    COALESCE(tfl.ledger_time_seconds, 0) AS duration_seconds
                FROM billiards_dwd.dwd_table_fee_log tfl
                LEFT JOIN billiards_dwd.dim_table dt
                    ON dt.site_table_id = tfl.site_table_id
                WHERE tfl.site_id = %(site_id)s
                  AND tfl.pay_time >= %(start_date)s
                  AND tfl.pay_time < %(end_date)s + INTERVAL '1 day'
                  AND COALESCE(tfl.is_delete, 0) = 0

                UNION ALL

                SELECT
                    asl.start_use_time::DATE AS stat_date,
                    dt.site_table_area_name AS area_name,
                    asl.order_settle_id,
                    COALESCE(asl.ledger_amount, 0) AS income_amount,
                    COALESCE(asl.income_seconds, 0) AS duration_seconds
                FROM billiards_dwd.dwd_assistant_service_log asl
                LEFT JOIN billiards_dwd.dim_table dt
                    ON dt.site_table_id = asl.site_table_id
                WHERE asl.site_id = %(site_id)s
                  AND asl.start_use_time >= %(start_date)s
                  AND asl.start_use_time < %(end_date)s + INTERVAL '1 day'
                  AND COALESCE(asl.is_delete, 0) = 0
            )
            SELECT
                stat_date,
                area_name,
                COALESCE(SUM(income_amount), 0) AS income_amount,
                COALESCE(SUM(duration_seconds), 0) AS duration_seconds,
                COUNT(DISTINCT order_settle_id) AS order_count
            FROM area_orders
            GROUP BY stat_date, area_name
            ORDER BY stat_date, area_name
        """
        rows = self.db.query(sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })
        return [dict(row) for row in rows] if rows else []

    def transform(self, data: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Convert extracted rows into target-table records.

        1. Expand each daily income-type row into one record per income type.
        2. Collapse area rows into one record per (date, area category).
        3. Compute each record's share of its day's total.

        Returns:
            INCOME_TYPE records followed by AREA records.
        """
        site_id = context.store_id
        # Falls back to the site id when "app.tenant_id" is not configured.
        tenant_id = self.config.get("app.tenant_id", site_id)

        records = []
        records.extend(self._transform_income_by_type(
            data.get('income_by_type', []), site_id, tenant_id
        ))
        records.extend(self._transform_income_by_area(
            data.get('income_by_area', []), site_id, tenant_id
        ))
        return records

    def _transform_income_by_type(
        self,
        income_data: List[Dict[str, Any]],
        site_id: int,
        tenant_id: int
    ) -> List[Dict[str, Any]]:
        """
        Expand each daily summary row into four INCOME_TYPE records.

        A record is emitted for every income type, even when its amount is
        zero. ``income_ratio`` is the type's share of the day's total across
        the four types.
        """
        records = []
        for daily_data in income_data:
            stat_date = daily_data.get('stat_date')

            # Convert each amount once; reused for the total and the records.
            amounts = {
                income_field: self.safe_decimal(daily_data.get(income_field, 0))
                for _, _, income_field, _ in self._INCOME_TYPES
            }
            # Day total is the denominator for the ratios.
            total_income = sum(amounts.values(), Decimal('0'))

            for type_code, type_name, income_field, order_field in self._INCOME_TYPES:
                income_amount = amounts[income_field]
                records.append({
                    'site_id': site_id,
                    'tenant_id': tenant_id,
                    'stat_date': stat_date,
                    'structure_type': 'INCOME_TYPE',
                    'category_code': type_code,
                    'category_name': type_name,
                    'income_amount': income_amount,
                    'income_ratio': self._income_ratio(income_amount, total_income),
                    'order_count': daily_data.get(order_field, 0) or 0,
                    'duration_minutes': 0,  # duration is not tracked per income type
                })

        return records

    def _transform_income_by_area(
        self,
        area_data: List[Dict[str, Any]],
        site_id: int,
        tenant_id: int
    ) -> List[Dict[str, Any]]:
        """
        Map area names to categories and build one AREA record per
        (date, category).

        Several area names may map to the same category, so amounts,
        durations and order counts are summed per (stat_date, category_code).

        NOTE(review): ``order_count`` is a distinct count per area name; when
        one order touches two areas of the same category it is counted twice
        after summing -- confirm this is acceptable.
        """
        # Load the area-category configuration before any lookups.
        self.load_config_cache()

        daily_totals = {}  # stat_date -> total income of that day (ratio denominator)
        aggregated = {}    # (stat_date, category_code) -> running sums

        for row in area_data:
            stat_date = row.get('stat_date')
            area_name = row.get('area_name') or '未知区域'
            income_amount = self.safe_decimal(row.get('income_amount', 0))
            duration_seconds = row.get('duration_seconds', 0) or 0
            order_count = row.get('order_count', 0) or 0

            daily_totals[stat_date] = daily_totals.get(stat_date, Decimal('0')) + income_amount

            # Map the area name onto its configured category.
            category = self.get_area_category(area_name)
            category_code = category.get('category_code', 'OTHER')
            category_name = category.get('category_name', '其他区域')

            key = (stat_date, category_code)
            if key not in aggregated:
                aggregated[key] = {
                    'stat_date': stat_date,
                    'category_code': category_code,
                    'category_name': category_name,
                    'income_amount': Decimal('0'),
                    'duration_seconds': 0,
                    'order_count': 0,
                }

            bucket = aggregated[key]
            bucket['income_amount'] += income_amount
            bucket['duration_seconds'] += duration_seconds
            bucket['order_count'] += order_count

        records = []
        for agg_data in aggregated.values():
            stat_date = agg_data['stat_date']
            income_amount = agg_data['income_amount']
            # Every aggregated date was added to daily_totals in the loop above.
            total_income = daily_totals[stat_date]

            records.append({
                'site_id': site_id,
                'tenant_id': tenant_id,
                'stat_date': stat_date,
                'structure_type': 'AREA',
                'category_code': agg_data['category_code'],
                'category_name': agg_data['category_name'],
                'income_amount': income_amount,
                'income_ratio': self._income_ratio(income_amount, total_income),
                'order_count': agg_data['order_count'],
                'duration_minutes': agg_data['duration_seconds'] // 60,
            })

        return records

    def _map_area_to_category(
        self,
        area_name: str,
        area_categories: Dict[str, Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Backward-compatible wrapper around ``get_area_category``.

        ``area_categories`` is accepted for signature compatibility only and
        is ignored.
        """
        return self.get_area_category(area_name)

    def load(self, records: List[Dict[str, Any]], context: TaskContext) -> Dict[str, Any]:
        """
        Write records to the target table using delete-before-insert.

        All rows of the site whose ``stat_date`` lies in the context window
        (both bounds inclusive) are deleted, then every record is inserted.

        NOTE(review): when ``records`` is empty nothing is deleted, so rows
        from an earlier run of the same window stay in place -- confirm this
        is the intended safeguard rather than a stale-data gap.

        Returns:
            Dict with ``'deleted'`` (the value returned by the delete
            ``execute`` call, or 0 when falsy) and ``'inserted'`` (number of
            records inserted).
        """
        if not records:
            return {'inserted': 0, 'deleted': 0}

        site_id = context.store_id
        start_date, end_date = self._resolve_window_dates(context)

        # Remove existing rows inside the window.
        delete_sql = """
            DELETE FROM billiards_dws.dws_finance_income_structure
            WHERE site_id = %(site_id)s
              AND stat_date >= %(start_date)s
              AND stat_date <= %(end_date)s
        """
        deleted = self.db.execute(delete_sql, {
            'site_id': site_id,
            'start_date': start_date,
            'end_date': end_date,
        })

        # Insert the fresh rows one statement per record.
        insert_sql = """
            INSERT INTO billiards_dws.dws_finance_income_structure (
                site_id, tenant_id, stat_date, structure_type,
                category_code, category_name,
                income_amount, income_ratio, order_count, duration_minutes,
                created_at, updated_at
            ) VALUES (
                %(site_id)s, %(tenant_id)s, %(stat_date)s, %(structure_type)s,
                %(category_code)s, %(category_name)s,
                %(income_amount)s, %(income_ratio)s, %(order_count)s, %(duration_minutes)s,
                NOW(), NOW()
            )
        """
        inserted = 0
        for record in records:
            self.db.execute(insert_sql, record)
            inserted += 1

        return {
            'deleted': deleted or 0,
            'inserted': inserted,
        }