206 lines
8.4 KiB
Python
206 lines
8.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
Assistant revenue / cost analysis task.

Purpose:
    Analyse the revenue produced and the cost incurred by each assistant,
    at "date + assistant" granularity.

Data sources:
    - dwd_assistant_service_log: assistant service records (revenue)
    - cfg_skill_type: skill -> course type mapping (splits revenue into
      base / bonus / room)
    - dws_assistant_salary_calc: salary calculation (cost)
    - dws_assistant_daily_detail: per-day assistant records, used to count
      working days (the denominator of the daily cost)

Target table:
    billiards_dws.dws_assistant_finance_analysis

Update strategy:
    - Frequency: daily
    - Idempotency: delete-before-insert (by date)

Author: ETL team
Created: 2026-02-01
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import date, datetime, timedelta
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .base_dws_task import BaseDwsTask, CourseType, TaskContext
|
||
|
||
|
||
class AssistantFinanceTask(BaseDwsTask):
    """
    Assistant revenue / cost analysis task.

    Emits one row per (site_id, stat_date, assistant_id). Revenue comes from
    the assistant service log; cost is the assistant's gross salary for the
    month containing ``stat_date`` divided by the number of days worked in
    that month.
    """

    # Working-day fallback used when dws_assistant_daily_detail has no rows
    # for an (assistant, month) pair.
    DEFAULT_WORK_DAYS = 20

    def get_task_code(self) -> str:
        """Return the code that identifies this task."""
        return "DWS_ASSISTANT_FINANCE"

    def get_target_table(self) -> str:
        """Return the name of the table this task writes to."""
        return "dws_assistant_finance_analysis"

    def get_primary_keys(self) -> List[str]:
        """Return the columns that uniquely identify a target row."""
        return ["site_id", "stat_date", "assistant_id"]

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Pull the raw inputs for the task window.

        Args:
            context: task context. ``window_start`` / ``window_end`` are
                reduced to dates via ``.date()`` when they have one (i.e. are
                datetimes); ``store_id`` is used as the site id.

        Returns:
            dict with ``daily_revenue`` and ``monthly_salary`` row lists plus
            the resolved ``start_date``, ``end_date`` and ``site_id``.
        """
        start_date = context.window_start.date() if hasattr(context.window_start, 'date') else context.window_start
        end_date = context.window_end.date() if hasattr(context.window_end, 'date') else context.window_end
        site_id = context.store_id

        # Per-day revenue for each assistant within the window.
        daily_revenue = self._extract_daily_revenue(site_id, start_date, end_date)

        # Salary rows for every month the window touches (basis of daily cost).
        monthly_salary = self._extract_monthly_salary(site_id, start_date, end_date)

        # Refresh the base-class configuration cache.
        self.load_config_cache()

        return {
            'daily_revenue': daily_revenue,
            'monthly_salary': monthly_salary,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id,
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Join daily revenue with the matching month's salary and compute margins.

        For each revenue row:
            cost_daily   = gross_salary / work_days
            gross_profit = revenue_total - cost_daily
            gross_margin = gross_profit / revenue_total (0 when revenue_total <= 0)

        A missing salary row yields ``cost_daily = 0``; a falsy ``work_days``
        is treated as 1 to avoid division by zero.

        Returns:
            list of dicts, one per (stat_date, assistant_id) revenue row.
        """
        daily_revenue = extracted['daily_revenue']
        monthly_salary = extracted['monthly_salary']
        site_id = extracted['site_id']
        # Loop-invariant: resolve the tenant once instead of once per row.
        tenant_id = self.config.get("app.tenant_id", site_id)

        # Index salary rows by (assistant_id, first day of salary month).
        # Keys are normalised to ``date`` because a datetime never compares
        # equal to a date, which would silently drop the match.
        salary_index: Dict[Tuple[Any, Any], Dict[str, Any]] = {}
        for sal in monthly_salary:
            asst_id = sal.get('assistant_id')
            month = self._to_date(sal.get('salary_month'))
            if asst_id and month:
                salary_index[(asst_id, month)] = sal

        results = []
        for rev in daily_revenue:
            assistant_id = rev.get('assistant_id')
            stat_date = rev.get('stat_date')

            # Salary row for the month containing stat_date ({} when absent).
            stat_day = self._to_date(stat_date)
            month_start = stat_day.replace(day=1) if isinstance(stat_day, date) else None
            salary = salary_index.get((assistant_id, month_start), {})

            # Daily cost: monthly gross salary spread over the days worked.
            gross_salary = self.safe_decimal(salary.get('gross_salary', 0))
            work_days = self.safe_int(salary.get('work_days', 1)) or 1
            cost_daily = gross_salary / Decimal(str(work_days))

            revenue_total = self.safe_decimal(rev.get('revenue_total', 0))
            gross_profit = revenue_total - cost_daily
            gross_margin = gross_profit / revenue_total if revenue_total > 0 else Decimal('0')

            results.append({
                'site_id': site_id,
                'tenant_id': tenant_id,
                'stat_date': stat_date,
                'assistant_id': assistant_id,
                'assistant_nickname': rev.get('assistant_nickname'),
                'revenue_total': revenue_total,
                'revenue_base': self.safe_decimal(rev.get('revenue_base', 0)),
                'revenue_bonus': self.safe_decimal(rev.get('revenue_bonus', 0)),
                'revenue_room': self.safe_decimal(rev.get('revenue_room', 0)),
                'cost_daily': cost_daily,
                'gross_profit': gross_profit,
                'gross_margin': gross_margin,
                'service_count': self.safe_int(rev.get('service_count', 0)),
                'service_hours': self.safe_decimal(rev.get('service_hours', 0)),
                'room_service_count': self.safe_int(rev.get('room_service_count', 0)),
                'room_service_hours': self.safe_decimal(rev.get('room_service_hours', 0)),
                'unique_customers': self.safe_int(rev.get('unique_customers', 0)),
            })

        return results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write rows with delete-before-insert on ``stat_date``.

        Existing rows for the task window are removed via the base-class
        ``delete_existing_data`` and the new rows are bulk-inserted.

        Returns:
            ``{"counts": {...}, "extra": {"deleted": n}}``; when ``transformed``
            is empty, zero counts are returned and nothing is deleted.
        """
        # NOTE(review): an empty result leaves previously loaded rows for the
        # window in place (no delete); confirm this is intended.
        if not transformed:
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        deleted = self.delete_existing_data(context, date_col="stat_date")
        inserted = self.bulk_insert(transformed)

        return {
            "counts": {"fetched": len(transformed), "inserted": inserted, "updated": 0, "skipped": 0, "errors": 0},
            "extra": {"deleted": deleted},
        }

    def _extract_daily_revenue(self, site_id: int, start_date: date, end_date: date) -> List[Dict[str, Any]]:
        """
        Aggregate non-deleted service records per (service start date, assistant).

        Revenue (``ledger_amount``) is split by ``cfg_skill_type.course_type_code``
        (BASE / BONUS / ROOM); records without an active mapping count as BASE.
        Hours are ``income_seconds / 3600``. ``unique_customers`` counts distinct
        positive ``tenant_member_id`` values.
        """
        sql = """
            SELECT
                DATE(s.start_use_time) AS stat_date,
                s.site_assistant_id AS assistant_id,
                MAX(s.nickname) AS assistant_nickname,
                COUNT(*) AS service_count,
                SUM(s.income_seconds) / 3600.0 AS service_hours,
                SUM(s.ledger_amount) AS revenue_total,
                SUM(CASE WHEN COALESCE(st.course_type_code, 'BASE') = 'BASE' THEN s.ledger_amount ELSE 0 END) AS revenue_base,
                SUM(CASE WHEN COALESCE(st.course_type_code, 'BASE') = 'BONUS' THEN s.ledger_amount ELSE 0 END) AS revenue_bonus,
                SUM(CASE WHEN COALESCE(st.course_type_code, 'BASE') = 'ROOM' THEN s.ledger_amount ELSE 0 END) AS revenue_room,
                COUNT(CASE WHEN COALESCE(st.course_type_code, 'BASE') = 'ROOM' THEN 1 END) AS room_service_count,
                SUM(CASE WHEN COALESCE(st.course_type_code, 'BASE') = 'ROOM' THEN s.income_seconds ELSE 0 END) / 3600.0 AS room_service_hours,
                COUNT(DISTINCT CASE WHEN s.tenant_member_id > 0 THEN s.tenant_member_id END) AS unique_customers
            FROM billiards_dwd.dwd_assistant_service_log s
            LEFT JOIN billiards_dws.cfg_skill_type st
                ON st.skill_id = s.skill_id AND st.is_active = TRUE
            WHERE s.site_id = %s
              AND DATE(s.start_use_time) >= %s
              AND DATE(s.start_use_time) <= %s
              AND s.is_delete = 0
            GROUP BY DATE(s.start_use_time), s.site_assistant_id
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_monthly_salary(self, site_id: int, start_date: date, end_date: date) -> List[Dict[str, Any]]:
        """
        Fetch salary rows for every month touched by [start_date, end_date].

        Each row gets a ``work_days`` value: the number of distinct
        ``stat_date`` values the assistant has in dws_assistant_daily_detail
        over the WHOLE calendar month (not just the task window). When the
        assistant has no rows for that month, ``DEFAULT_WORK_DAYS`` is used.
        """
        # First day of the first and of the last month covered by the window.
        month_start = start_date.replace(day=1)
        month_end = end_date.replace(day=1)
        # Exclusive upper bound: first day of the month after the last month.
        months_upper = self._next_month_start(month_end)

        sql = """
            SELECT
                assistant_id,
                salary_month,
                gross_salary,
                effective_hours
            FROM billiards_dws.dws_assistant_salary_calc
            WHERE site_id = %s
              AND salary_month >= %s
              AND salary_month <= %s
        """
        rows = self.db.query(sql, (site_id, month_start, month_end))

        # Days worked per assistant per month. The range must span the full
        # months: counting only the task window (a single day on a daily run)
        # yields work_days = 1 and charges the whole monthly salary to that day.
        work_days_sql = """
            SELECT
                assistant_id,
                DATE_TRUNC('month', stat_date)::DATE AS month,
                COUNT(DISTINCT stat_date) AS work_days
            FROM billiards_dws.dws_assistant_daily_detail
            WHERE site_id = %s
              AND stat_date >= %s
              AND stat_date < %s
            GROUP BY assistant_id, DATE_TRUNC('month', stat_date)
        """
        work_days_rows = self.db.query(work_days_sql, (site_id, month_start, months_upper))
        work_days_index = {
            (r['assistant_id'], self._to_date(r['month'])): r['work_days']
            for r in (work_days_rows or [])
        }

        results = []
        for row in (rows or []):
            row_dict = dict(row)
            key = (row_dict.get('assistant_id'), self._to_date(row_dict.get('salary_month')))
            row_dict['work_days'] = work_days_index.get(key, self.DEFAULT_WORK_DAYS)
            results.append(row_dict)

        return results

    @staticmethod
    def _next_month_start(day: date) -> date:
        """Return the first day of the calendar month following ``day``'s month."""
        if day.month == 12:
            return date(day.year + 1, 1, 1)
        return date(day.year, day.month + 1, 1)

    @staticmethod
    def _to_date(value: Any) -> Any:
        """Collapse a ``datetime`` to its ``date`` so index keys compare equal; pass other values through."""
        if isinstance(value, datetime):
            return value.date()
        return value
|
||
|
||
|
||
# Names exported by ``from <module> import *``.
__all__ = [
    "AssistantFinanceTask",
]
|