# -*- coding: utf-8 -*- """ 助教工资计算任务 功能说明: 以"助教+月份"为粒度,计算月度工资明细 数据来源: - dws_assistant_monthly_summary: 月度业绩汇总 - dws_assistant_recharge_commission: 充值提成(Excel导入) - cfg_performance_tier: 绩效档位配置 - cfg_assistant_level_price: 等级定价配置 - cfg_bonus_rules: 奖金规则配置 目标表: billiards_dws.dws_assistant_salary_calc 更新策略: - 更新频率:月初计算上月工资 - 幂等方式:delete-before-insert(按月份) 业务规则(来自DWS数据库处理需求.md): - 基础课收入 = 基础课小时数 × (客户支付价格 - 专业课抽成) 例:中级助教基础课170小时,3档 = 170 × (108 - 13) = 16,150元 - 附加课收入 = 附加课小时数 × 附加课价格 × (1 - 打赏课抽成比例) 例:附加课15小时,3档 = 15 × 190 × (1 - 0.35) = 1,852.5元 - 冲刺奖金:H>=190:300, H>=220:800(不累计,取最高档) - Top3奖金:1st:1000, 2nd:600, 3rd:400(并列都算) - 充值提成:来自dws_assistant_recharge_commission - SCD2口径:等级定价使用月份对应的历史值 作者:ETL团队 创建日期:2026-02-01 """ from __future__ import annotations from datetime import date, datetime, timedelta from decimal import Decimal from typing import Any, Dict, List, Optional, Tuple from .base_dws_task import BaseDwsTask, TaskContext class AssistantSalaryTask(BaseDwsTask): """ 助教工资计算任务 计算每个助教每月的工资明细: - 课时收入(基础课+附加课) - 扣款(档位扣款+其他) - 奖金(档位奖金+冲刺+Top3+充值提成+其他) - 应发工资 """ def get_task_code(self) -> str: return "DWS_ASSISTANT_SALARY" def get_target_table(self) -> str: return "dws_assistant_salary_calc" def get_primary_keys(self) -> List[str]: return ["site_id", "assistant_id", "salary_month"] # ========================================================================== # ETL主流程 # ========================================================================== def extract(self, context: TaskContext) -> Dict[str, Any]: """ 提取数据 """ # 确定工资月份(通常是上月) end_date = context.window_end.date() if hasattr(context.window_end, 'date') else context.window_end salary_month = self._get_salary_month(end_date) site_id = context.store_id self.logger.info( "%s: 提取数据,工资月份 %s", self.get_task_code(), salary_month ) # 1. 获取月度业绩汇总 monthly_summary = self._extract_monthly_summary(site_id, salary_month) # 2. 获取充值提成 recharge_commission = self._extract_recharge_commission(site_id, salary_month) # 3. 加载配置缓存 self.load_config_cache() return { 'monthly_summary': monthly_summary, 'recharge_commission': recharge_commission, 'salary_month': salary_month, 'site_id': site_id } def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]: """ 转换数据:计算工资 """ monthly_summary = extracted['monthly_summary'] recharge_commission = extracted['recharge_commission'] salary_month = extracted['salary_month'] site_id = extracted['site_id'] self.logger.info( "%s: 转换数据,%d 条月度汇总记录", self.get_task_code(), len(monthly_summary) ) # 构建充值提成索引 commission_index = {} for comm in recharge_commission: asst_id = comm.get('assistant_id') if asst_id: commission_index[asst_id] = commission_index.get(asst_id, Decimal('0')) + \ self.safe_decimal(comm.get('commission_amount', 0)) # 计算工资 results = [] for summary in monthly_summary: record = self._calculate_salary(summary, commission_index, salary_month, site_id) results.append(record) return results def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict: """ 加载数据 """ if not transformed: self.logger.info("%s: 无数据需要写入", self.get_task_code()) return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}} # 删除已存在的数据 deleted = self._delete_by_month(context, transformed) # 批量插入 inserted = self.bulk_insert(transformed) self.logger.info( "%s: 加载完成,删除 %d 行,插入 %d 行", self.get_task_code(), deleted, inserted ) return { "counts": { "fetched": len(transformed), "inserted": inserted, "updated": 0, "skipped": 0, "errors": 0 }, "extra": {"deleted": deleted} } # ========================================================================== # 数据提取方法 # ========================================================================== def _get_salary_month(self, end_date: date) -> date: """ 获取工资月份(默认为上月) """ # 如果是月初,计算上月工资 if end_date.day <= 5: if end_date.month == 1: return date(end_date.year - 1, 12, 1) else: return date(end_date.year, end_date.month - 1, 1) else: # 否则计算当月(可能是调整) return end_date.replace(day=1) def _extract_monthly_summary( self, site_id: int, salary_month: date ) -> List[Dict[str, Any]]: """ 提取月度业绩汇总 """ sql = """ SELECT assistant_id, assistant_nickname, stat_month, assistant_level_code, assistant_level_name, hire_date, is_new_hire, effective_hours, base_hours, bonus_hours, tier_id, tier_code, tier_name, rank_with_ties FROM billiards_dws.dws_assistant_monthly_summary WHERE site_id = %s AND stat_month = %s """ rows = self.db.query(sql, (site_id, salary_month)) return [dict(row) for row in rows] if rows else [] def _extract_recharge_commission( self, site_id: int, salary_month: date ) -> List[Dict[str, Any]]: """ 提取充值提成 """ sql = """ SELECT assistant_id, commission_amount FROM billiards_dws.dws_assistant_recharge_commission WHERE site_id = %s AND commission_month = %s """ rows = self.db.query(sql, (site_id, salary_month)) return [dict(row) for row in rows] if rows else [] # ========================================================================== # 工资计算方法 # ========================================================================== def _calculate_salary( self, summary: Dict[str, Any], commission_index: Dict[int, Decimal], salary_month: date, site_id: int ) -> Dict[str, Any]: """ 计算单个助教的月度工资 """ assistant_id = summary.get('assistant_id') level_code = summary.get('assistant_level_code') effective_hours = self.safe_decimal(summary.get('effective_hours', 0)) base_hours = self.safe_decimal(summary.get('base_hours', 0)) bonus_hours = self.safe_decimal(summary.get('bonus_hours', 0)) is_new_hire = summary.get('is_new_hire', False) rank = summary.get('rank_with_ties') # 获取等级定价(SCD2口径,按月份取值) # base_course_price: 客户支付价格(初级98/中级108/高级118/星级138) # bonus_course_price: 附加课客户支付价格(固定190元) level_price = self.get_level_price(level_code, salary_month) base_course_price = self.safe_decimal( level_price.get('base_course_price', 98) if level_price else 98 ) bonus_course_price = self.safe_decimal( level_price.get('bonus_course_price', 190) if level_price else 190 ) # 获取档位配置 # base_deduction: 专业课抽成(元/小时),球房从每小时扣除 # bonus_deduction_ratio: 打赏课抽成比例,球房从附加课收入扣除的比例 tier = self.get_performance_tier_by_id(summary.get('tier_id'), salary_month) if not tier: tier = self.get_performance_tier( effective_hours, is_new_hire, effective_date=salary_month ) base_deduction = self.safe_decimal(tier.get('base_deduction', 18)) if tier else Decimal('18') bonus_deduction_ratio = self.safe_decimal(tier.get('bonus_deduction_ratio', 0.40)) if tier else Decimal('0.40') vacation_days = tier.get('vacation_days', 0) if tier else 0 vacation_unlimited = tier.get('vacation_unlimited', False) if tier else False # ============================================================ # 工资计算公式(来自DWS数据库处理需求.md) # ============================================================ # 基础课收入 = 基础课小时数 × (客户支付价格 - 专业课抽成) # 例:中级助教170小时,3档 = 170 × (108 - 13) = 16,150元 base_income = base_hours * (base_course_price - base_deduction) # 附加课收入 = 附加课小时数 × 附加课价格 × (1 - 打赏课抽成比例) # 例:15小时,3档 = 15 × 190 × (1 - 0.35) = 1,852.5元 bonus_income = bonus_hours * bonus_course_price * (Decimal('1') - bonus_deduction_ratio) # 课时收入合计 total_course_income = base_income + bonus_income # 计算冲刺奖金(H>=190:300, H>=220:800,不累计取最高) sprint_bonus = self.calculate_sprint_bonus(effective_hours, salary_month) # 计算Top3排名奖金(1st:1000, 2nd:600, 3rd:400,并列都算) top_rank_bonus = Decimal('0') if rank and rank <= 3: top_rank_bonus = self.calculate_top_rank_bonus(rank, salary_month) # 获取充值提成 recharge_commission = commission_index.get(assistant_id, Decimal('0')) # 汇总奖金 other_bonus = Decimal('0') # 预留其他奖金 total_bonus = sprint_bonus + top_rank_bonus + recharge_commission + other_bonus # 计算应发工资 = 课时收入 + 奖金 gross_salary = total_course_income + total_bonus # 构建记录 return { 'site_id': site_id, 'tenant_id': self.config.get("app.tenant_id", site_id), 'assistant_id': assistant_id, 'assistant_nickname': summary.get('assistant_nickname'), 'salary_month': salary_month, 'assistant_level_code': level_code, 'assistant_level_name': summary.get('assistant_level_name'), 'hire_date': summary.get('hire_date'), 'is_new_hire': is_new_hire, 'effective_hours': effective_hours, 'base_hours': base_hours, 'bonus_hours': bonus_hours, 'tier_id': summary.get('tier_id'), 'tier_code': tier.get('tier_code') if tier else None, 'tier_name': tier.get('tier_name') if tier else None, 'rank_with_ties': rank, # 定价信息 'base_course_price': base_course_price, 'bonus_course_price': bonus_course_price, 'base_deduction': base_deduction, 'bonus_deduction_ratio': bonus_deduction_ratio, # 收入明细 'base_income': base_income, 'bonus_income': bonus_income, 'total_course_income': total_course_income, # 奖金明细 'sprint_bonus': sprint_bonus, 'top_rank_bonus': top_rank_bonus, 'recharge_commission': recharge_commission, 'other_bonus': other_bonus, 'total_bonus': total_bonus, # 应发工资 'gross_salary': gross_salary, # 假期 'vacation_days': vacation_days, 'vacation_unlimited': vacation_unlimited, 'calc_notes': self._build_calc_notes(summary, tier, sprint_bonus, top_rank_bonus), } def _build_calc_notes( self, summary: Dict[str, Any], tier: Optional[Dict[str, Any]], sprint_bonus: Decimal, top_rank_bonus: Decimal ) -> Optional[str]: """ 构建计算备注 """ notes = [] if summary.get('is_new_hire'): notes.append("新入职首月") if tier: notes.append(f"档位: {tier.get('tier_name', 'N/A')}") if sprint_bonus > 0: notes.append(f"冲刺奖金: {sprint_bonus}") if top_rank_bonus > 0: rank = summary.get('rank_with_ties') notes.append(f"Top{rank}奖金: {top_rank_bonus}") return "; ".join(notes) if notes else None def _delete_by_month( self, context: TaskContext, records: List[Dict[str, Any]] ) -> int: """ 按月份删除已存在的数据 """ months = set(r.get('salary_month') for r in records if r.get('salary_month')) if not months: return 0 target_table = self.get_target_table() full_table = f"{self.DWS_SCHEMA}.{target_table}" total_deleted = 0 with self.db.conn.cursor() as cur: for month in months: sql = f""" DELETE FROM {full_table} WHERE site_id = %s AND salary_month = %s """ cur.execute(sql, (context.store_id, month)) total_deleted += cur.rowcount return total_deleted # 便于外部导入 __all__ = ['AssistantSalaryTask']