# -*- coding: utf-8 -*-
"""
Assistant daily performance detail task.

Purpose:
    Aggregate daily performance details at "assistant + date" granularity.

Data sources:
    - dwd_assistant_service_log: assistant service records
    - dwd_assistant_trash_event: voided ("trashed") records (excluded from valid performance)
    - dim_assistant: assistant dimension (SCD2, used to get the level in effect on the day)
    - cfg_skill_type: skill -> course type mapping

Target table:
    billiards_dws.dws_assistant_daily_detail

Update strategy:
    - Frequency: hourly incremental update
    - Idempotency: delete-before-insert (by date window)

Business rules:
    - Valid performance: records voided in dwd_assistant_trash_event are excluded
    - Assistant level: SCD2 as-of lookup, i.e. the level in effect on the stat date
    - Course type: mapped via skill_id into base courses and bonus (add-on) courses

Author: ETL team
Created: 2026-02-01
"""

from __future__ import annotations

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_dws_task import BaseDwsTask, CourseType, TaskContext


class AssistantDailyTask(BaseDwsTask):
    """
    Assistant daily performance detail task.

    For each assistant and day, aggregates:
        - service counts (total / base course / bonus course)
        - billed duration (seconds / hours)
        - ledger (billed) amount
        - distinct customers served
        - distinct tables served
        - statistics for voided (trashed) records
    """

    def get_task_code(self) -> str:
        return "DWS_ASSISTANT_DAILY"

    def get_target_table(self) -> str:
        return "dws_assistant_daily_detail"

    def get_primary_keys(self) -> List[str]:
        return ["site_id", "assistant_id", "stat_date"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract assistant service records (and their trash records) from the DWD layer.

        Args:
            context: Task context. ``window_start`` / ``window_end`` (datetime or
                date) bound the inclusive stat-date range; ``store_id`` is the site.

        Returns:
            Dict with keys ``service_records``, ``trash_records``, ``start_date``,
            ``end_date`` and ``site_id``.
        """
        start_date = self._window_bound_to_date(context.window_start)
        end_date = self._window_bound_to_date(context.window_end)
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,日期范围 %s ~ %s",
            self.get_task_code(), start_date, end_date
        )

        # 1. Assistant service records whose service started inside the window.
        service_records = self._extract_service_records(site_id, start_date, end_date)

        # 2. Trash records belonging to those services (regardless of when voided).
        trash_records = self._extract_trash_records(site_id, start_date, end_date)

        # 3. Load configuration caches (presumably backing get_course_type etc.).
        self.load_config_cache()

        return {
            'service_records': service_records,
            'trash_records': trash_records,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Aggregate the extracted records by assistant + date.

        Args:
            extracted: Output of :meth:`extract`.
            context: Task context (unused here; kept for the base-class interface).

        Returns:
            One row dict per (assistant_id, service_date) pair.
        """
        service_records = extracted['service_records']
        trash_records = extracted['trash_records']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,服务记录 %d 条,废除记录 %d 条",
            self.get_task_code(), len(service_records), len(trash_records)
        )

        # Index of trash records: assistant_service_id -> trash record.
        trash_index = self._build_trash_index(trash_records)

        return self._aggregate_by_assistant_date(
            service_records, trash_index, site_id
        )

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write aggregated rows into the DWS table (delete-before-insert).

        The delete for the context's date window always runs, even when there
        is nothing to insert. Otherwise a window whose source data disappeared
        would keep stale rows, which breaks the idempotency promise of
        delete-before-insert.

        Returns:
            Dict with ``counts`` (fetched/inserted/updated/skipped/errors) and
            ``extra.deleted``.
        """
        # Idempotency: clear the window first.
        deleted = self.delete_existing_data(context, date_col="stat_date")

        if not transformed:
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {
                "counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0},
                "extra": {"deleted": deleted}
            }

        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d 行",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Extraction helpers
    # ==========================================================================

    @staticmethod
    def _window_bound_to_date(value: Any) -> Any:
        """Return ``value.date()`` for datetime-like values, else ``value`` unchanged."""
        return value.date() if hasattr(value, 'date') else value

    def _extract_service_records(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch assistant service records whose start date lies in
        [start_date, end_date] (inclusive) for the given site.
        """
        sql = """
            SELECT
                asl.assistant_service_id,
                asl.order_settle_id,
                asl.site_assistant_id AS assistant_id,
                asl.nickname AS assistant_nickname,
                asl.assistant_level,
                asl.skill_id,
                asl.skill_name,
                asl.tenant_member_id AS member_id,
                asl.site_table_id AS table_id,
                asl.income_seconds,
                asl.real_use_seconds,
                asl.ledger_amount,
                asl.ledger_unit_price,
                DATE(asl.start_use_time) AS service_date
            FROM billiards_dwd.dwd_assistant_service_log asl
            WHERE asl.site_id = %s
              AND DATE(asl.start_use_time) >= %s
              AND DATE(asl.start_use_time) <= %s
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_trash_records(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch trash (void) records for the services extracted for this window.

        Exclusion rule for valid performance: only records present in the
        assistant trash table are excluded.

        Trash events are selected by the service they void, not by their own
        ``trash_time``. A service started in the window but voided after the
        window end must still be excluded. Rows are ordered by ``trash_time``
        so that, when one service has several events, the latest one is kept
        when indexed.
        """
        sql = """
            SELECT
                te.assistant_service_id,
                te.trash_seconds,
                te.trash_reason,
                te.trash_time
            FROM billiards_dwd.dwd_assistant_trash_event te
            WHERE te.site_id = %s
              AND te.assistant_service_id IN (
                  SELECT asl.assistant_service_id
                  FROM billiards_dwd.dwd_assistant_service_log asl
                  WHERE asl.site_id = %s
                    AND DATE(asl.start_use_time) >= %s
                    AND DATE(asl.start_use_time) <= %s
              )
            ORDER BY te.trash_time
        """
        rows = self.db.query(sql, (site_id, site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    # ==========================================================================
    # Transformation helpers
    # ==========================================================================

    def _build_trash_index(
        self,
        trash_records: List[Dict[str, Any]]
    ) -> Dict[int, Dict[str, Any]]:
        """
        Build an ``assistant_service_id -> trash record`` index.

        Records with a falsy service id are skipped. If several records share
        an id, the last one in input order wins.
        """
        index = {}
        for record in trash_records:
            service_id = record.get('assistant_service_id')
            if service_id:
                index[service_id] = record
        return index

    def _new_agg_row(
        self,
        record: Dict[str, Any],
        site_id: int,
        tenant_id: Any,
        assistant_id: Any,
        service_date: Any
    ) -> Dict[str, Any]:
        """Create the zeroed aggregation row for one (assistant, date) pair."""
        # Assistant level as of the stat date (SCD2 lookup). Fall back to the
        # level recorded on the service row when the lookup yields no code.
        level_info = self.get_assistant_level_asof(assistant_id, service_date)
        level_code = level_info.get('level_code') if level_info else None
        if level_code is None:
            level_code = record.get('assistant_level')
        level_name = level_info.get('level_name') if level_info else None

        # Key order is kept stable; it may map to target-table columns.
        return {
            'site_id': site_id,
            'tenant_id': tenant_id,
            'assistant_id': assistant_id,
            'assistant_nickname': record.get('assistant_nickname'),
            'stat_date': service_date,
            'assistant_level_code': level_code,
            'assistant_level_name': level_name,
            'total_service_count': 0,
            'base_service_count': 0,
            'bonus_service_count': 0,
            'total_seconds': 0,
            'base_seconds': 0,
            'bonus_seconds': 0,
            'total_hours': Decimal('0'),
            'base_hours': Decimal('0'),
            'bonus_hours': Decimal('0'),
            'total_ledger_amount': Decimal('0'),
            'base_ledger_amount': Decimal('0'),
            'bonus_ledger_amount': Decimal('0'),
            'unique_customers': set(),
            'unique_tables': set(),
            'trashed_seconds': 0,
            'trashed_count': 0,
        }

    def _aggregate_by_assistant_date(
        self,
        service_records: List[Dict[str, Any]],
        trash_index: Dict[int, Dict[str, Any]],
        site_id: int
    ) -> List[Dict[str, Any]]:
        """
        Aggregate service records by (assistant_id, service_date).

        - Voided services only contribute to ``trashed_seconds`` / ``trashed_count``.
        - Valid services contribute to the total counters and to either the base
          or the bonus counters, depending on course type.
        - Distinct customers and tables are counted for all services, voided or not.

        Returns:
            List of row dicts; hour fields are derived from second totals, and
            ``unique_customers`` / ``unique_tables`` are converted to counts.
        """
        agg_dict: Dict[Tuple[int, date], Dict[str, Any]] = {}
        # Tenant id does not vary per row; resolve it once.
        tenant_id = self.config.get("app.tenant_id", site_id)

        for record in service_records:
            assistant_id = record.get('assistant_id')
            service_date = record.get('service_date')
            if not assistant_id or not service_date:
                continue

            key = (assistant_id, service_date)
            agg = agg_dict.get(key)
            if agg is None:
                agg = self._new_agg_row(record, site_id, tenant_id, assistant_id, service_date)
                agg_dict[key] = agg

            service_id = record.get('assistant_service_id')
            income_seconds = self.safe_int(record.get('income_seconds', 0))
            ledger_amount = self.safe_decimal(record.get('ledger_amount', 0))
            skill_id = record.get('skill_id')
            member_id = record.get('member_id')
            table_id = record.get('table_id')

            # Records without a skill_id are treated as base courses.
            course_type = self.get_course_type(skill_id) if skill_id else CourseType.BASE
            is_base = course_type == CourseType.BASE

            trash_info = trash_index.get(service_id)
            if trash_info is not None:
                # Voided record: tracked separately. A NULL trash_seconds column
                # falls back to the service's billed seconds.
                raw_trash_seconds = trash_info.get('trash_seconds')
                trash_seconds = (
                    income_seconds if raw_trash_seconds is None
                    else self.safe_int(raw_trash_seconds)
                )
                agg['trashed_seconds'] += trash_seconds
                agg['trashed_count'] += 1
            else:
                agg['total_service_count'] += 1
                agg['total_seconds'] += income_seconds
                agg['total_ledger_amount'] += ledger_amount
                prefix = 'base' if is_base else 'bonus'
                agg[f'{prefix}_service_count'] += 1
                agg[f'{prefix}_seconds'] += income_seconds
                agg[f'{prefix}_ledger_amount'] += ledger_amount

            # Distinct customers / tables, counted whether or not voided.
            if member_id and not self.is_guest(member_id):
                agg['unique_customers'].add(member_id)
            if table_id:
                agg['unique_tables'].add(table_id)

        result = []
        for agg in agg_dict.values():
            agg['total_hours'] = self.seconds_to_hours(agg['total_seconds'])
            agg['base_hours'] = self.seconds_to_hours(agg['base_seconds'])
            agg['bonus_hours'] = self.seconds_to_hours(agg['bonus_seconds'])
            agg['unique_customers'] = len(agg['unique_customers'])
            agg['unique_tables'] = len(agg['unique_tables'])
            result.append(agg)

        return result


# Public API for external imports.
__all__ = ['AssistantDailyTask']