# -*- coding: utf-8 -*-
"""
Assistant monthly performance summary task.

Purpose:
    Aggregate performance at "assistant + month" granularity and compute the
    matched performance tier and the monthly rank.

Data sources:
    - dws_assistant_daily_detail: daily detail rows (aggregated here by month)
    - dim_assistant: assistant dimension (entry/hire date, level)
    - cfg_performance_tier: performance tier configuration

Target table:
    billiards_dws.dws_assistant_monthly_summary

Update strategy:
    - Frequency: refreshed daily for the current month
    - Idempotency: delete-before-insert, per month

Business rules:
    - New hire: the hire date falls after 00:00 on the 1st of the month
    - Effective hours: total_hours - trashed_hours
    - Tier: matched against cfg_performance_tier by effective hours
    - Rank: by effective hours descending, ties share a rank
      (two assistants tied for first are both 1, the next one is 3)

Author: ETL team
Created: 2026-02-01
"""

from __future__ import annotations

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_dws_task import BaseDwsTask, TaskContext


class AssistantMonthlyTask(BaseDwsTask):
    """
    Assistant monthly performance summary task.

    For every assistant and month it summarises:
      - working days, service count and service hours
      - effective hours (after deducting trashed records)
      - the matched performance tier
      - the monthly rank (used for the Top-3 bonus)
    """

    def get_task_code(self) -> str:
        return "DWS_ASSISTANT_MONTHLY"

    def get_target_table(self) -> str:
        return "dws_assistant_monthly_summary"

    def get_primary_keys(self) -> List[str]:
        return ["site_id", "assistant_id", "stat_month"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract source data for every month touched by the task window.

        Returns:
            A dict with:
              daily_aggregates: one row per assistant per month, summed from
                                the daily detail table
              assistant_info:   current dim_assistant rows keyed by assistant_id
              months:           first day of each month in the window
              site_id:          the store id taken from the context
        """
        start_date = self._to_date(context.window_start)
        end_date = self._to_date(context.window_end)
        site_id = context.store_id

        months = self._get_months_in_range(start_date, end_date)
        self.logger.info(
            "%s: 提取数据,月份范围 %s",
            self.get_task_code(), [str(m) for m in months]
        )

        # 1. Monthly aggregates built from the daily detail table.
        daily_aggregates = self._extract_daily_aggregates(site_id, months)

        # 2. Assistant master data (hire date, nickname, level).
        assistant_info = self._extract_assistant_info(site_id)

        # 3. Warm the base-class configuration cache before transform runs.
        self.load_config_cache()

        return {
            'daily_aggregates': daily_aggregates,
            'assistant_info': assistant_info,
            'months': months,
            'site_id': site_id
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Build the monthly summary rows (hours, tier, rank) for every extracted month.

        Returns:
            The concatenated records of all months, in month order.
        """
        daily_aggregates = extracted['daily_aggregates']
        assistant_info = extracted['assistant_info']
        months = extracted['months']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,%d 个月份,%d 条聚合记录",
            self.get_task_code(), len(months), len(daily_aggregates)
        )

        all_results: List[Dict[str, Any]] = []
        for month in months:
            all_results.extend(
                self._process_month(daily_aggregates, assistant_info, month, site_id)
            )
        return all_results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write the records into the DWS table.

        Existing rows for every month present in `transformed` are deleted
        first, then all records are bulk-inserted.

        Returns:
            {"counts": {...}} and, when rows were written, {"extra": {"deleted": n}}.
        """
        if not transformed:
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        deleted = self._delete_by_months(context, transformed)
        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d 行",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Date helpers
    # ==========================================================================

    @staticmethod
    def _to_date(value: Any) -> Any:
        """Return value.date() for datetime-like values; otherwise return value unchanged."""
        if hasattr(value, 'date'):
            return value.date()
        return value

    @staticmethod
    def _first_of_next_month(day: date) -> date:
        """Return the first day of the month following the month of `day`."""
        if day.month == 12:
            return date(day.year + 1, 1, 1)
        return date(day.year, day.month + 1, 1)

    # ==========================================================================
    # Extraction
    # ==========================================================================

    def _get_months_in_range(self, start_date: date, end_date: date) -> List[date]:
        """
        Return the first day of every month between start_date and end_date (inclusive).

        Returns an empty list when start_date's month is after end_date's month.
        """
        months = []
        current = start_date.replace(day=1)
        end_month = end_date.replace(day=1)
        while current <= end_month:
            months.append(current)
            current = self._first_of_next_month(current)
        return months

    def _extract_daily_aggregates(
        self,
        site_id: int,
        months: List[date]
    ) -> List[Dict[str, Any]]:
        """
        Aggregate the daily detail table into one row per assistant per month.

        Rows are grouped by (assistant_id, month) only, so an assistant whose
        nickname or level changed during the month still yields exactly one
        row. The descriptive columns take the value from the latest stat_date
        in that month. Grouping by those columns too would split one
        assistant-month into several rows: hours and ranks would be divided
        across them, and the insert would hit duplicate primary keys.

        Month boundaries are passed as bind parameters, not formatted into SQL.
        """
        if not months:
            return []

        params: List[Any] = [site_id]
        month_conditions = []
        for month in months:
            month_conditions.append("(stat_date >= %s AND stat_date < %s)")
            params.extend([month, self._first_of_next_month(month)])
        month_where = " OR ".join(month_conditions)

        sql = f"""
            SELECT
                assistant_id,
                (ARRAY_AGG(assistant_nickname ORDER BY stat_date DESC))[1] AS assistant_nickname,
                (ARRAY_AGG(assistant_level_code ORDER BY stat_date DESC))[1] AS assistant_level_code,
                (ARRAY_AGG(assistant_level_name ORDER BY stat_date DESC))[1] AS assistant_level_name,
                DATE_TRUNC('month', stat_date)::DATE AS stat_month,
                COUNT(DISTINCT stat_date) AS work_days,
                SUM(total_service_count) AS total_service_count,
                SUM(base_service_count) AS base_service_count,
                SUM(bonus_service_count) AS bonus_service_count,
                SUM(total_hours) AS total_hours,
                SUM(base_hours) AS base_hours,
                SUM(bonus_hours) AS bonus_hours,
                SUM(total_ledger_amount) AS total_ledger_amount,
                SUM(base_ledger_amount) AS base_ledger_amount,
                SUM(bonus_ledger_amount) AS bonus_ledger_amount,
                SUM(unique_customers) AS total_unique_customers,
                SUM(unique_tables) AS total_unique_tables,
                SUM(trashed_seconds) AS trashed_seconds,
                SUM(trashed_count) AS trashed_count
            FROM billiards_dws.dws_assistant_daily_detail
            WHERE site_id = %s
              AND ({month_where})
            GROUP BY assistant_id, DATE_TRUNC('month', stat_date)
        """
        # NOTE(review): summing the daily unique_customers / unique_tables
        # counts a customer once per day visited, so the monthly figure is
        # not a true distinct count; confirm this is the intended metric.

        rows = self.db.query(sql, tuple(params))
        return [dict(row) for row in rows] if rows else []

    def _extract_assistant_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """
        Load the current dim_assistant rows (valid_to IS NULL) for a site.

        Returns:
            assistant_id -> {assistant_id, nickname, assistant_level, hire_date}.
        """
        sql = """
            SELECT
                site_assistant_id AS assistant_id,
                nickname,
                assistant_level,
                entry_date AS hire_date
            FROM billiards_dwd.dim_assistant
            WHERE site_id = %s
              AND valid_to IS NULL  -- 当前有效记录
        """
        rows = self.db.query(sql, (site_id,))
        records = (dict(row) for row in (rows or []))
        return {record['assistant_id']: record for record in records}

    # ==========================================================================
    # Transformation
    # ==========================================================================

    def _process_month(
        self,
        daily_aggregates: List[Dict[str, Any]],
        assistant_info: Dict[int, Dict[str, Any]],
        month: date,
        site_id: int
    ) -> List[Dict[str, Any]]:
        """
        Build the summary records for one month and assign their ranks.

        Args:
            daily_aggregates: rows from _extract_daily_aggregates (all months).
            assistant_info: dim_assistant rows keyed by assistant_id.
            month: first day of the month being processed.
            site_id: store id written into every record.

        Returns:
            One record per assistant with data in `month`, with rank_by_hours
            and rank_with_ties filled in; an empty list if there is no data.
        """
        # Normalise stat_month so that a driver returning a datetime for the
        # ::DATE column still matches the `date` month key (datetime != date).
        month_data = [
            agg for agg in daily_aggregates
            if self._to_date(agg.get('stat_month')) == month
        ]
        if not month_data:
            return []

        month_records = [
            self._build_month_record(agg, assistant_info, month, site_id)
            for agg in month_data
        ]
        self._calculate_ranks(month_records)
        return month_records

    def _build_month_record(
        self,
        agg: Dict[str, Any],
        assistant_info: Dict[int, Dict[str, Any]],
        month: date,
        site_id: int
    ) -> Dict[str, Any]:
        """Turn one monthly aggregate row into a target-table record (ranks left as None)."""
        assistant_id = agg.get('assistant_id')
        asst_info = assistant_info.get(assistant_id, {})
        work_days = self.safe_int(agg.get('work_days', 0))

        # Effective hours = total hours minus trashed service time.
        total_hours = self.safe_decimal(agg.get('total_hours', 0))
        trashed_hours = self.seconds_to_hours(self.safe_int(agg.get('trashed_seconds', 0)))
        effective_hours = total_hours - trashed_hours

        hire_date = self._to_date(asst_info.get('hire_date'))
        is_new_hire = False
        if hire_date:
            is_new_hire = self.is_new_hire_in_month(hire_date, month)

        # New hires are tiered on a 30-day projection of their daily average.
        # The projection is only passed to tier matching; the stored hours and
        # the ranking use the real effective hours.
        tier_hours = effective_hours
        max_tier_level = None
        if is_new_hire:
            tier_hours = self._calc_new_hire_tier_hours(effective_hours, work_days)
            # Hired after the 25th: pass max_tier_level=3 (presumably a tier cap).
            if hire_date and hire_date.day > 25:
                max_tier_level = 3
        tier = self.get_performance_tier(
            tier_hours,
            is_new_hire,
            effective_date=month,
            max_tier_level=max_tier_level
        )

        # Level as of the month end; fall back to the level on the daily rows.
        level_info = self.get_assistant_level_asof(assistant_id, self._get_month_end(month))
        if level_info:
            level_code = level_info.get('level_code')
            level_name = level_info.get('level_name')
        else:
            level_code = agg.get('assistant_level_code')
            level_name = agg.get('assistant_level_name')

        return {
            'site_id': site_id,
            'tenant_id': self.config.get("app.tenant_id", site_id),
            'assistant_id': assistant_id,
            # Fall back to the dimension nickname when the daily rows have none.
            'assistant_nickname': agg.get('assistant_nickname') or asst_info.get('nickname'),
            'stat_month': month,
            'assistant_level_code': level_code,
            'assistant_level_name': level_name,
            'hire_date': hire_date,
            'is_new_hire': is_new_hire,
            'work_days': work_days,
            'total_service_count': self.safe_int(agg.get('total_service_count', 0)),
            'base_service_count': self.safe_int(agg.get('base_service_count', 0)),
            'bonus_service_count': self.safe_int(agg.get('bonus_service_count', 0)),
            'total_hours': total_hours,
            'base_hours': self.safe_decimal(agg.get('base_hours', 0)),
            'bonus_hours': self.safe_decimal(agg.get('bonus_hours', 0)),
            'effective_hours': effective_hours,
            'trashed_hours': trashed_hours,
            'total_ledger_amount': self.safe_decimal(agg.get('total_ledger_amount', 0)),
            'base_ledger_amount': self.safe_decimal(agg.get('base_ledger_amount', 0)),
            'bonus_ledger_amount': self.safe_decimal(agg.get('bonus_ledger_amount', 0)),
            'unique_customers': self.safe_int(agg.get('total_unique_customers', 0)),
            'unique_tables': self.safe_int(agg.get('total_unique_tables', 0)),
            'avg_service_seconds': self._calc_avg_service_seconds(agg),
            'tier_id': tier.get('tier_id') if tier else None,
            'tier_code': tier.get('tier_code') if tier else None,
            'tier_name': tier.get('tier_name') if tier else None,
            'rank_by_hours': None,    # filled in by _calculate_ranks
            'rank_with_ties': None,   # filled in by _calculate_ranks
        }

    def _get_month_end(self, month: date) -> date:
        """Return the last day of the month containing `month`."""
        return self._first_of_next_month(month) - timedelta(days=1)

    def _calc_avg_service_seconds(self, agg: Dict[str, Any]) -> Decimal:
        """Return total service seconds / service count, or 0 when there were no services."""
        total_count = self.safe_int(agg.get('total_service_count', 0))
        if total_count == 0:
            return Decimal('0')

        total_seconds = self.safe_decimal(agg.get('total_hours', 0)) * Decimal('3600')
        return total_seconds / Decimal(str(total_count))

    def _calc_new_hire_tier_hours(self, effective_hours: Decimal, work_days: int) -> Decimal:
        """
        Project a new hire's hours to 30 days: (effective_hours / work_days) * 30.

        Used for tier matching only (not for bonus or ranking). Returns 0 when
        work_days <= 0.
        """
        if work_days <= 0:
            return Decimal('0')
        return (effective_hours / Decimal(str(work_days))) * Decimal('30')

    def _calculate_ranks(self, records: List[Dict[str, Any]]) -> None:
        """
        Assign ranks in place, with ties sharing a rank.

        Top-3 rule: rank by total effective hours; tied assistants share the
        rank, e.g. two firsts are both 1 and the next assistant is 3.
        Both rank_by_hours and rank_with_ties receive that tied rank.
        """
        if not records:
            return

        sorted_records = sorted(
            records,
            key=lambda x: x.get('effective_hours', Decimal('0')),
            reverse=True
        )
        values = [
            (r.get('assistant_id'), r.get('effective_hours', Decimal('0')))
            for r in sorted_records
        ]
        ranked = self.calculate_rank_with_ties(values)

        rank_map = {
            assistant_id: rank
            for assistant_id, rank, _dense_rank in ranked
        }

        for record in records:
            assistant_id = record.get('assistant_id')
            if assistant_id in rank_map:
                rank = rank_map[assistant_id]
                record['rank_by_hours'] = rank
                record['rank_with_ties'] = rank

    def _delete_by_months(
        self,
        context: TaskContext,
        records: List[Dict[str, Any]]
    ) -> int:
        """
        Delete existing target rows for every stat_month present in `records`.

        Deletion is scoped to context.store_id and runs on a cursor from
        self.db.conn (no commit is issued here).

        Returns:
            The total number of rows deleted.
        """
        months = sorted({r.get('stat_month') for r in records if r.get('stat_month')})
        if not months:
            return 0

        full_table = f"{self.DWS_SCHEMA}.{self.get_target_table()}"
        sql = f"""
            DELETE FROM {full_table}
            WHERE site_id = %s
              AND stat_month = %s
        """

        total_deleted = 0
        with self.db.conn.cursor() as cur:
            for month in months:
                cur.execute(sql, (context.store_id, month))
                total_deleted += cur.rowcount
        return total_deleted


# Public API of this module.
__all__ = ['AssistantMonthlyTask']