# -*- coding: utf-8 -*-
"""
助教日度业绩明细任务

功能说明:
    以"助教+日期"为粒度,汇总每日业绩明细

数据来源:
    - dwd_assistant_service_log: 助教服务流水
    - dwd_assistant_trash_event: 废除记录(排除)
    - dim_assistant: 助教维度(SCD2,获取当日等级)
    - cfg_skill_type: 技能→课程类型映射

目标表:
    billiards_dws.dws_assistant_daily_detail

更新策略:
    - 更新频率:每小时增量更新
    - 幂等方式:delete-before-insert(按日期窗口)

业务规则:
    - 有效业绩:需排除dwd_assistant_trash_event中的废除记录
    - 助教等级:使用SCD2 as-of取值,获取统计日当日生效的等级
    - 课程类型:通过skill_id映射,分为基础课和附加课

作者:ETL团队
创建日期:2026-02-01
"""

from __future__ import annotations

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_dws_task import BaseDwsTask, CourseType, TaskContext


class AssistantDailyTask(BaseDwsTask):
    """
    Assistant daily performance detail task.

    Builds one row per (site, assistant, service date) containing:
    - service counts (total / base course / bonus course / room)
    - billed duration in seconds and hours, per course bucket
    - ledger (billed) amounts, per course bucket
    - number of distinct non-guest customers served
    - number of distinct tables served
    - count and seconds of trashed (voided) services, which are kept out of
      all of the "valid" totals above
    """

    def get_task_code(self) -> str:
        """Return this task's code; it is also the prefix of every log message."""
        return "DWS_ASSISTANT_DAILY"

    def get_target_table(self) -> str:
        """Return the name of the DWS table this task writes to."""
        return "dws_assistant_daily_detail"

    def get_primary_keys(self) -> List[str]:
        """Return the key columns that identify one row of the target table."""
        return ["site_id", "assistant_id", "stat_date"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Read assistant service records and their trash records from the DWD layer.

        ``context.window_start`` / ``context.window_end`` are reduced to calendar
        dates when they are datetimes. Both ends of the resulting date range are
        inclusive.

        Returns:
            Dict with keys ``service_records``, ``trash_records``,
            ``start_date``, ``end_date`` and ``site_id``.
        """
        start_date = context.window_start.date() if hasattr(context.window_start, 'date') else context.window_start
        end_date = context.window_end.date() if hasattr(context.window_end, 'date') else context.window_end
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,日期范围 %s ~ %s",
            self.get_task_code(), start_date, end_date
        )

        # 1. Assistant service records whose start date lies in the window.
        service_records = self._extract_service_records(site_id, start_date, end_date)

        # 2. Trash records belonging to those services.
        trash_records = self._extract_trash_records(site_id, start_date, end_date)

        # 3. Load the configuration cache. Presumably this backs
        #    get_course_type() used in transform — TODO confirm in BaseDwsTask.
        self.load_config_cache()

        return {
            'service_records': service_records,
            'trash_records': trash_records,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id,
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Aggregate the extracted service records per assistant and service date.

        Args:
            extracted: The dict returned by :meth:`extract`.
            context: Task context (not read here).

        Returns:
            One aggregated row dict per (assistant_id, service_date).
        """
        service_records = extracted['service_records']
        trash_records = extracted['trash_records']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,服务记录 %d 条,废除记录 %d 条",
            self.get_task_code(), len(service_records), len(trash_records)
        )

        # assistant_service_id -> trash record
        trash_index = self._build_trash_index(trash_records)

        return self._aggregate_by_assistant_date(service_records, trash_index, site_id)

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write the aggregated rows to the DWS table using delete-before-insert.

        The delete for the date window always runs, even when ``transformed``
        is empty. Otherwise, rows for days whose source records have all
        disappeared (e.g. later flagged ``is_delete``) would linger in the
        target table, and re-running the window would not be idempotent.

        Returns:
            ``{"counts": {...}, "extra": {"deleted": <rows deleted>}}``.
        """
        # Idempotency: clear the window first, regardless of whether new rows exist.
        deleted = self.delete_existing_data(context, date_col="stat_date")

        if not transformed:
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {
                "counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0},
                "extra": {"deleted": deleted},
            }

        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d 行",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0,
            },
            "extra": {"deleted": deleted},
        }

    # ==========================================================================
    # Extraction helpers
    # ==========================================================================

    def _extract_service_records(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch non-deleted service records that started within the date range.

        Both ``start_date`` and ``end_date`` are inclusive. The start date of
        each service is returned as ``service_date``.
        """
        sql = """
            SELECT
                asl.assistant_service_id,
                asl.order_settle_id,
                asl.site_assistant_id AS assistant_id,
                asl.nickname AS assistant_nickname,
                asl.assistant_level,
                asl.skill_id,
                asl.skill_name,
                asl.tenant_member_id AS member_id,
                asl.site_table_id AS table_id,
                asl.income_seconds,
                asl.real_use_seconds,
                asl.ledger_amount,
                asl.ledger_unit_price,
                DATE(asl.start_use_time) AS service_date
            FROM billiards_dwd.dwd_assistant_service_log asl
            WHERE asl.site_id = %s
              AND DATE(asl.start_use_time) >= %s
              AND DATE(asl.start_use_time) <= %s
              AND asl.is_delete = 0
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_trash_records(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch the trash records of the services extracted for the same window.

        Exclusion rule for valid performance: only records in the assistant
        trash table are excluded.

        Trash events are selected by the service they void, not by
        ``trash_time``. A service that started inside the window but was
        trashed after the window end must still be excluded when that day is
        rebuilt. The old ``DATE(trash_time)`` filter missed such events and
        counted the service as valid.
        """
        sql = """
            SELECT
                te.assistant_service_id,
                te.trash_seconds,
                te.trash_reason,
                te.trash_time
            FROM billiards_dwd.dwd_assistant_trash_event te
            WHERE te.site_id = %s
              AND te.assistant_service_id IN (
                  SELECT asl.assistant_service_id
                  FROM billiards_dwd.dwd_assistant_service_log asl
                  WHERE asl.site_id = %s
                    AND DATE(asl.start_use_time) >= %s
                    AND DATE(asl.start_use_time) <= %s
                    AND asl.is_delete = 0
              )
        """
        rows = self.db.query(sql, (site_id, site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    # ==========================================================================
    # Transformation helpers
    # ==========================================================================

    def _build_trash_index(
        self,
        trash_records: List[Dict[str, Any]]
    ) -> Dict[int, Dict[str, Any]]:
        """
        Index trash records by ``assistant_service_id``.

        Records with a falsy id are skipped. If a service has several trash
        records, the last one wins.
        """
        index = {}
        for record in trash_records:
            service_id = record.get('assistant_service_id')
            if service_id:
                index[service_id] = record
        return index

    def _aggregate_by_assistant_date(
        self,
        service_records: List[Dict[str, Any]],
        trash_index: Dict[int, Dict[str, Any]],
        site_id: int
    ) -> List[Dict[str, Any]]:
        """
        Aggregate service records per (assistant_id, service_date).

        Trashed services only contribute to ``trashed_count`` and
        ``trashed_seconds``. All other services are added to the totals and to
        the base/bonus/room bucket of their course type. Distinct customers
        (guests excluded) and tables are counted for every record, trashed or
        not.

        Returns:
            One dict per (assistant_id, service_date), with hour fields derived
            from the second totals and the customer/table sets replaced by
            their sizes.
        """
        # Loop-invariant: the tenant id does not depend on the record.
        tenant_id = self.config.get("app.tenant_id", site_id)

        agg_dict: Dict[Tuple[int, date], Dict[str, Any]] = {}

        for record in service_records:
            assistant_id = record.get('assistant_id')
            service_date = record.get('service_date')

            if not assistant_id or not service_date:
                continue

            key = (assistant_id, service_date)

            if key not in agg_dict:
                # Level in effect on the service date (SCD2 as-of lookup).
                level_info = self.get_assistant_level_asof(assistant_id, service_date)

                agg_dict[key] = {
                    'site_id': site_id,
                    'tenant_id': tenant_id,
                    'assistant_id': assistant_id,
                    'assistant_nickname': record.get('assistant_nickname'),
                    'stat_date': service_date,
                    'assistant_level_code': level_info.get('level_code') if level_info else record.get('assistant_level'),
                    'assistant_level_name': level_info.get('level_name') if level_info else None,
                    'total_service_count': 0,
                    'base_service_count': 0,
                    'bonus_service_count': 0,
                    'room_service_count': 0,
                    'total_seconds': 0,
                    'base_seconds': 0,
                    'bonus_seconds': 0,
                    'room_seconds': 0,
                    'total_hours': Decimal('0'),
                    'base_hours': Decimal('0'),
                    'bonus_hours': Decimal('0'),
                    'room_hours': Decimal('0'),
                    'total_ledger_amount': Decimal('0'),
                    'base_ledger_amount': Decimal('0'),
                    'bonus_ledger_amount': Decimal('0'),
                    'room_ledger_amount': Decimal('0'),
                    'unique_customers': set(),
                    'unique_tables': set(),
                    'trashed_seconds': 0,
                    'trashed_count': 0,
                }

            agg = agg_dict[key]

            service_id = record.get('assistant_service_id')
            income_seconds = self.safe_int(record.get('income_seconds', 0))
            ledger_amount = self.safe_decimal(record.get('ledger_amount', 0))
            skill_id = record.get('skill_id')
            member_id = record.get('member_id')
            table_id = record.get('table_id')

            # Records without a skill_id default to the base course.
            course_type = self.get_course_type(skill_id) if skill_id else CourseType.BASE

            if service_id in trash_index:
                # The SELECT always returns trash_seconds, so a SQL NULL arrives
                # as None (not as a missing key). Fall back to the billed seconds
                # explicitly, since dict.get's default would never apply.
                raw_trash_seconds = trash_index[service_id].get('trash_seconds')
                if raw_trash_seconds is None:
                    trash_seconds = income_seconds
                else:
                    trash_seconds = self.safe_int(raw_trash_seconds)
                agg['trashed_seconds'] += trash_seconds
                agg['trashed_count'] += 1
            else:
                agg['total_service_count'] += 1
                agg['total_seconds'] += income_seconds
                agg['total_ledger_amount'] += ledger_amount

                if course_type == CourseType.BASE:
                    agg['base_service_count'] += 1
                    agg['base_seconds'] += income_seconds
                    agg['base_ledger_amount'] += ledger_amount
                elif course_type == CourseType.BONUS:
                    agg['bonus_service_count'] += 1
                    agg['bonus_seconds'] += income_seconds
                    agg['bonus_ledger_amount'] += ledger_amount
                elif course_type == CourseType.ROOM:
                    agg['room_service_count'] += 1
                    agg['room_seconds'] += income_seconds
                    agg['room_ledger_amount'] += ledger_amount

            # Distinct customers/tables are counted regardless of trash status.
            if member_id and not self.is_guest(member_id):
                agg['unique_customers'].add(member_id)
            if table_id:
                agg['unique_tables'].add(table_id)

        # Derive hour fields and collapse the distinct-value sets to counts.
        result = []
        for agg in agg_dict.values():
            agg['total_hours'] = self.seconds_to_hours(agg['total_seconds'])
            agg['base_hours'] = self.seconds_to_hours(agg['base_seconds'])
            agg['bonus_hours'] = self.seconds_to_hours(agg['bonus_seconds'])
            agg['room_hours'] = self.seconds_to_hours(agg['room_seconds'])

            agg['unique_customers'] = len(agg['unique_customers'])
            agg['unique_tables'] = len(agg['unique_tables'])

            result.append(agg)

        return result


# Names exported by ``from <module> import *``.
__all__ = ["AssistantDailyTask"]