# Source: ZQYY.FQ-ETL/tasks/dws/assistant_salary_task.py
# -*- coding: utf-8 -*-
"""
助教工资计算任务

功能说明:
    以"助教+月份"为粒度,计算月度工资明细。

数据来源:
    - dws_assistant_monthly_summary: 月度业绩汇总
    - dws_assistant_recharge_commission: 充值提成(Excel导入)
    - cfg_performance_tier: 绩效档位配置
    - cfg_assistant_level_price: 等级定价配置
    - cfg_bonus_rules: 奖金规则配置

目标表:
    billiards_dws.dws_assistant_salary_calc

更新策略:
    - 更新频率:月初计算上月工资
    - 幂等方式:delete-before-insert(按月份)

业务规则(来自 DWS数据库处理需求.md):
    - 基础课收入 = 基础课小时数 × (客户支付价格 - 专业课抽成)
      例:中级助教基础课170小时(3档)= 170 × (108 - 13) = 16,150元
    - 附加课收入 = 附加课小时数 × 附加课价格 × (1 - 打赏课抽成比例)
      例:附加课15小时(3档)= 15 × 190 × (1 - 0.35) = 1,852.5元
    - 包厢课收入 = 包厢课小时数 × (包厢课客户支付价格 - 专业课抽成)
    - 冲刺奖金:按规则表配置(历史口径,不累计,取最高档)
    - Top3奖金:1st:1000, 2nd:600, 3rd:400(并列都算)
    - 充值提成:来自 dws_assistant_recharge_commission
    - SCD2口径:等级定价使用月份对应的历史值

作者:ETL团队
创建日期:2026-02-01
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from .base_dws_task import BaseDwsTask, TaskContext
class AssistantSalaryTask(BaseDwsTask):
    """
    Assistant salary calculation task.

    Produces one row per (site, assistant, salary month) in
    ``dws_assistant_salary_calc`` containing:
    - course income: base course + bonus (add-on) course + private-room course
    - bonuses: sprint bonus + Top-3 rank bonus + recharge commission
      + other bonus (reserved, currently always 0)
    - gross salary = course income + bonuses

    The tier's per-hour deduction (``base_deduction``) and the bonus-course
    deduction ratio are netted into course income; no separate deduction
    total is written.
    """

    # Cutoff day used by ``_get_salary_month`` when ``dws.salary.run_days``
    # is not a positive number (this was the historical hard-coded value).
    _DEFAULT_MONTH_START_DAYS = 5

    def get_task_code(self) -> str:
        """Return this task's code; it also prefixes every log message."""
        return "DWS_ASSISTANT_SALARY"

    def get_target_table(self) -> str:
        """Return the target table name, without the schema prefix."""
        return "dws_assistant_salary_calc"

    def get_primary_keys(self) -> List[str]:
        """Return the target table's key columns (one row per site, assistant, month)."""
        return ["site_id", "assistant_id", "salary_month"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract the inputs needed to compute one month of salaries.

        The salary month is derived from ``context.window_end``. Outside the
        configured run window the task is skipped: an empty payload with
        ``salary_month=None`` is returned, which ``transform`` treats as
        "nothing to do".

        Returns:
            Dict with keys ``monthly_summary``, ``recharge_commission``,
            ``salary_month`` and ``site_id``.
        """
        window_end = context.window_end
        # window_end may be a datetime (has .date()) or already a date.
        end_date = window_end.date() if hasattr(window_end, 'date') else window_end
        site_id = context.store_id

        if self._should_skip_run(end_date):
            self.logger.info("%s: 非工资结算期,跳过", self.get_task_code())
            return {
                'monthly_summary': [],
                'recharge_commission': [],
                'salary_month': None,
                'site_id': site_id,
            }

        salary_month = self._get_salary_month(end_date)
        self.logger.info(
            "%s: 提取数据,工资月份 %s",
            self.get_task_code(), salary_month
        )

        monthly_summary = self._extract_monthly_summary(site_id, salary_month)
        recharge_commission = self._extract_recharge_commission(site_id, salary_month)

        # Load the base-class config cache before transform() does its
        # level-price / tier / bonus lookups.
        self.load_config_cache()

        return {
            'monthly_summary': monthly_summary,
            'recharge_commission': recharge_commission,
            'salary_month': salary_month,
            'site_id': site_id
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Compute one salary record per monthly-summary row.

        Recharge commissions are summed per assistant and attached to the
        matching summary row. Commissions for assistants with no summary row
        for the month cannot produce a salary record; they are reported as a
        warning instead of being dropped silently.

        Returns:
            List of salary records (empty when the run was skipped).
        """
        salary_month = extracted.get('salary_month')
        if not salary_month:
            return []

        monthly_summary = extracted['monthly_summary']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,%d 条月度汇总记录",
            self.get_task_code(), len(monthly_summary)
        )

        commission_index = self._build_commission_index(extracted['recharge_commission'])
        self._warn_unmatched_commissions(commission_index, monthly_summary, salary_month)

        return [
            self._calculate_salary(summary, commission_index, salary_month, site_id)
            for summary in monthly_summary
        ]

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Replace the salary rows for the months present in ``transformed``.

        Existing rows for each month are deleted, then all records are bulk
        inserted. With no records nothing is deleted. A month whose summary
        became empty therefore keeps its previous rows.

        Returns:
            Dict with ``counts`` (fetched/inserted/updated/skipped/errors) and,
            when anything was loaded, ``extra.deleted``.
        """
        if not transformed:
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        deleted = self._delete_by_month(context, transformed)
        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Run-window helpers
    # ==========================================================================

    def _salary_run_days(self) -> int:
        """Return ``dws.salary.run_days`` (default 5) as an int."""
        return self.safe_int(self.config.get("dws.salary.run_days", 5))

    def _salary_bool_config(self, key: str, default: bool = False) -> bool:
        """
        Read a boolean config flag.

        ``bool("false")`` is True in Python, so textual values are parsed
        explicitly. This matters in case the config source yields strings,
        e.g. from environment variables.
        """
        value = self.config.get(key, default)
        if isinstance(value, str):
            return value.strip().lower() not in ("", "0", "false", "no", "off")
        return bool(value)

    def _get_salary_month(self, end_date: date) -> date:
        """
        Return the first day of the month whose salary should be computed.

        Within the first ``dws.salary.run_days`` days of a month, this is the
        previous month. The cutoff is the same one used by
        ``_should_skip_run``, falling back to 5 days when ``run_days`` is not
        positive. Later in the month this is the current month, e.g. for an
        adjustment run. That case is only reachable when out-of-cycle runs
        are allowed or ``run_days <= 0``.
        """
        cutoff_day = self._salary_run_days()
        if cutoff_day <= 0:
            cutoff_day = self._DEFAULT_MONTH_START_DAYS

        first_of_month = end_date.replace(day=1)
        if end_date.day <= cutoff_day:
            # Step back one day from the 1st to land in the previous month.
            return (first_of_month - timedelta(days=1)).replace(day=1)
        return first_of_month

    def _should_skip_run(self, end_date: date) -> bool:
        """
        Return True when this run falls outside the salary run window.

        Salaries are only computed within the first ``dws.salary.run_days``
        days of a month (default 5). The check is disabled when
        ``dws.salary.allow_out_of_cycle`` is truthy or ``run_days <= 0``.
        """
        if self._salary_bool_config("dws.salary.allow_out_of_cycle", False):
            return False

        run_days = self._salary_run_days()
        if run_days <= 0:
            return False

        return end_date.day > run_days

    # ==========================================================================
    # Extraction
    # ==========================================================================

    def _extract_monthly_summary(
        self,
        site_id: int,
        salary_month: date
    ) -> List[Dict[str, Any]]:
        """Fetch the month's per-assistant performance summary rows for ``site_id``."""
        sql = """
            SELECT
                assistant_id,
                assistant_nickname,
                stat_month,
                assistant_level_code,
                assistant_level_name,
                hire_date,
                is_new_hire,
                effective_hours,
                base_hours,
                bonus_hours,
                room_hours,
                tier_id,
                tier_code,
                tier_name,
                rank_with_ties
            FROM billiards_dws.dws_assistant_monthly_summary
            WHERE site_id = %s AND stat_month = %s
        """
        rows = self.db.query(sql, (site_id, salary_month))
        return [dict(row) for row in rows] if rows else []

    def _extract_recharge_commission(
        self,
        site_id: int,
        salary_month: date
    ) -> List[Dict[str, Any]]:
        """Fetch the month's recharge-commission rows (assistant_id, commission_amount)."""
        sql = """
            SELECT
                assistant_id,
                commission_amount
            FROM billiards_dws.dws_assistant_recharge_commission
            WHERE site_id = %s AND commission_month = %s
        """
        rows = self.db.query(sql, (site_id, salary_month))
        return [dict(row) for row in rows] if rows else []

    # ==========================================================================
    # Salary calculation
    # ==========================================================================

    def _build_commission_index(
        self,
        recharge_commission: List[Dict[str, Any]]
    ) -> Dict[Any, Decimal]:
        """
        Sum ``commission_amount`` per ``assistant_id``.

        Rows whose assistant_id is falsy are ignored. An assistant may have
        several commission rows in a month.
        """
        index: Dict[Any, Decimal] = {}
        for row in recharge_commission:
            asst_id = row.get('assistant_id')
            if asst_id:
                index[asst_id] = index.get(asst_id, Decimal('0')) + \
                    self.safe_decimal(row.get('commission_amount', 0))
        return index

    def _warn_unmatched_commissions(
        self,
        commission_index: Dict[Any, Decimal],
        monthly_summary: List[Dict[str, Any]],
        salary_month: date
    ) -> None:
        """Log assistants whose recharge commission has no monthly-summary row to attach to."""
        summary_ids = {row.get('assistant_id') for row in monthly_summary}
        unmatched = [asst_id for asst_id in commission_index if asst_id not in summary_ids]
        if unmatched:
            self.logger.warning(
                "%s: %s 有 %d 名助教存在充值提成但无月度汇总记录,提成未计入工资: %s",
                self.get_task_code(), salary_month, len(unmatched), unmatched
            )

    def _calculate_salary(
        self,
        summary: Dict[str, Any],
        commission_index: Dict[int, Decimal],
        salary_month: date,
        site_id: int
    ) -> Dict[str, Any]:
        """
        Compute one assistant's monthly salary record from a summary row.

        Formulas:
            base_income  = base_hours  * (base_course_price - base_deduction)
            bonus_income = bonus_hours * bonus_course_price * (1 - bonus_deduction_ratio)
            room_income  = room_hours  * (room_course_price - base_deduction)
            total_bonus  = sprint_bonus + top_rank_bonus + recharge_commission + other_bonus
            gross_salary = base_income + bonus_income + room_income + total_bonus

        Fallback defaults apply when config is missing: base price 98, bonus
        price 190, room price 138, base deduction 18 and deduction ratio 0.40.
        """
        assistant_id = summary.get('assistant_id')
        level_code = summary.get('assistant_level_code')
        effective_hours = self.safe_decimal(summary.get('effective_hours', 0))
        base_hours = self.safe_decimal(summary.get('base_hours', 0))
        bonus_hours = self.safe_decimal(summary.get('bonus_hours', 0))
        room_hours = self.safe_decimal(summary.get('room_hours', 0))
        is_new_hire = summary.get('is_new_hire', False)
        rank = summary.get('rank_with_ties')

        # Level pricing, looked up for the salary month (SCD2: the historical
        # value in force during that month). Per the module docs, the base
        # price is the customer-paid hourly price per level.
        level_price = self.get_level_price(level_code, salary_month)
        base_course_price = self.safe_decimal(
            level_price.get('base_course_price', 98) if level_price else 98
        )
        bonus_course_price = self.safe_decimal(
            level_price.get('bonus_course_price', 190) if level_price else 190
        )
        # The private-room price is a single config value, not per level.
        room_course_price = self.safe_decimal(
            self.config.get("dws.salary.room_course_price", 138)
        )

        # Performance tier: prefer the tier already assigned in the monthly
        # summary; otherwise derive it from effective hours / new-hire status.
        summary_tier_id = summary.get('tier_id')
        tier_id = summary_tier_id
        tier = self.get_performance_tier_by_id(summary_tier_id, salary_month)
        if not tier:
            tier = self.get_performance_tier(
                effective_hours,
                is_new_hire,
                effective_date=salary_month
            )
            # Keep tier_id consistent with tier_code/tier_name below, which
            # come from the tier actually applied (only when it carries an id).
            if tier and tier.get('tier_id') is not None:
                tier_id = tier.get('tier_id')

        # base_deduction: per-hour amount the venue keeps from course prices.
        # bonus_deduction_ratio: share of bonus-course income the venue keeps.
        base_deduction = self.safe_decimal(tier.get('base_deduction', 18)) if tier else Decimal('18')
        bonus_deduction_ratio = self.safe_decimal(tier.get('bonus_deduction_ratio', 0.40)) if tier else Decimal('0.40')
        vacation_days = tier.get('vacation_days', 0) if tier else 0
        vacation_unlimited = tier.get('vacation_unlimited', False) if tier else False

        # e.g. intermediate level, 170 h at tier 3: 170 * (108 - 13) = 16,150
        base_income = base_hours * (base_course_price - base_deduction)
        # e.g. 15 h at tier 3: 15 * 190 * (1 - 0.35) = 1,852.5
        bonus_income = bonus_hours * bonus_course_price * (Decimal('1') - bonus_deduction_ratio)
        room_income = room_hours * (room_course_price - base_deduction)
        total_course_income = base_income + bonus_income + room_income

        # Sprint bonus is driven by rule config via the base class.
        sprint_bonus = self.calculate_sprint_bonus(effective_hours, salary_month)

        # Top-3 bonus applies to ranks 1..3 (rank_with_ties, so ties share a rank).
        top_rank_bonus = Decimal('0')
        if rank and rank <= 3:
            top_rank_bonus = self.calculate_top_rank_bonus(rank, salary_month)

        recharge_commission = commission_index.get(assistant_id, Decimal('0'))

        other_bonus = Decimal('0')  # reserved for future bonus types
        total_bonus = sprint_bonus + top_rank_bonus + recharge_commission + other_bonus

        gross_salary = total_course_income + total_bonus

        return {
            'site_id': site_id,
            'tenant_id': self.config.get("app.tenant_id", site_id),
            'assistant_id': assistant_id,
            'assistant_nickname': summary.get('assistant_nickname'),
            'salary_month': salary_month,
            'assistant_level_code': level_code,
            'assistant_level_name': summary.get('assistant_level_name'),
            'hire_date': summary.get('hire_date'),
            'is_new_hire': is_new_hire,
            'effective_hours': effective_hours,
            'base_hours': base_hours,
            'bonus_hours': bonus_hours,
            'room_hours': room_hours,
            'tier_id': tier_id,
            'tier_code': tier.get('tier_code') if tier else None,
            'tier_name': tier.get('tier_name') if tier else None,
            'rank_with_ties': rank,
            # Pricing inputs
            'base_course_price': base_course_price,
            'bonus_course_price': bonus_course_price,
            'base_deduction': base_deduction,
            'bonus_deduction_ratio': bonus_deduction_ratio,
            # Income breakdown
            'base_income': base_income,
            'bonus_income': bonus_income,
            'room_income': room_income,
            'total_course_income': total_course_income,
            # Bonus breakdown
            'sprint_bonus': sprint_bonus,
            'top_rank_bonus': top_rank_bonus,
            'recharge_commission': recharge_commission,
            'other_bonus': other_bonus,
            'total_bonus': total_bonus,
            # Gross salary
            'gross_salary': gross_salary,
            # Vacation entitlement from the tier
            'vacation_days': vacation_days,
            'vacation_unlimited': vacation_unlimited,
            'calc_notes': self._build_calc_notes(summary, tier, sprint_bonus, top_rank_bonus),
        }

    def _build_calc_notes(
        self,
        summary: Dict[str, Any],
        tier: Optional[Dict[str, Any]],
        sprint_bonus: Decimal,
        top_rank_bonus: Decimal
    ) -> Optional[str]:
        """Build a '; '-joined human-readable note for the record, or None if empty."""
        notes = []
        if summary.get('is_new_hire'):
            notes.append("新入职首月")
        if tier:
            notes.append(f"档位: {tier.get('tier_name', 'N/A')}")
        if sprint_bonus > 0:
            notes.append(f"冲刺奖金: {sprint_bonus}")
        if top_rank_bonus > 0:
            rank = summary.get('rank_with_ties')
            notes.append(f"Top{rank}奖金: {top_rank_bonus}")
        return "; ".join(notes) if notes else None

    def _delete_by_month(
        self,
        context: TaskContext,
        records: List[Dict[str, Any]]
    ) -> int:
        """
        Delete existing rows for every salary month present in ``records``.

        Deletion is scoped to ``context.store_id``. The return value is the
        total ``cursor.rowcount`` across the deletes. No commit is issued
        here. NOTE(review): presumably the caller/base class commits together
        with the insert; confirm.
        """
        months = {r.get('salary_month') for r in records if r.get('salary_month')}
        if not months:
            return 0

        full_table = f"{self.DWS_SCHEMA}.{self.get_target_table()}"
        sql = f"""
            DELETE FROM {full_table}
            WHERE site_id = %s AND salary_month = %s
        """
        total_deleted = 0
        with self.db.conn.cursor() as cur:
            for month in months:
                cur.execute(sql, (context.store_id, month))
                total_deleted += cur.rowcount
        return total_deleted
# Public API of this module: only the task class is exported.
__all__ = ["AssistantSalaryTask"]