Files
feiqiu-ETL/etl_billiards/tasks/dws/assistant_daily_task.py
2026-02-04 21:39:01 +08:00

345 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
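Assistant daily performance detail task.

Purpose:
    Summarise daily performance details at the "assistant + date" grain.

Data sources:
    - dwd_assistant_service_log: assistant service log
    - dwd_assistant_trash_event: trashed (voided) records, to be excluded
    - dim_assistant: assistant dimension (SCD2), used to get the level in
      effect on the day
    - cfg_skill_type: skill -> course type mapping

Target table:
    billiards_dws.dws_assistant_daily_detail

Update strategy:
    - Frequency: incremental update every hour
    - Idempotency: delete-before-insert, by date window

Business rules:
    - Valid performance must exclude the voided records found in
      dwd_assistant_trash_event
    - Assistant level uses an SCD2 as-of lookup: the level in effect on the
      statistics date
    - Course type is derived from skill_id and is either base course or
      bonus course

Author: ETL team
Created: 2026-02-01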
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple
from .base_dws_task import BaseDwsTask, CourseType, TaskContext
class AssistantDailyTask(BaseDwsTask):
    """
    Daily per-assistant performance detail task.

    For every (assistant, day) pair this task summarises:
    - service counts (total / base course / bonus course)
    - billed duration (seconds and hours)
    - billed ledger amount
    - number of distinct customers served
    - number of distinct tables served
    - statistics about trashed (voided) service records
    """

    def get_task_code(self) -> str:
        """Return the task code used to identify this task (also used in logs)."""
        return "DWS_ASSISTANT_DAILY"

    def get_target_table(self) -> str:
        """Return the name of the DWS table this task writes to."""
        return "dws_assistant_daily_detail"

    def get_primary_keys(self) -> List[str]:
        """Return the columns that identify one output row."""
        return ["site_id", "assistant_id", "stat_date"]

    # ======================================================================
    # ETL main flow
    # ======================================================================
    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Read the assistant service rows and trash rows for the task window.

        Args:
            context: task context; ``window_start`` / ``window_end`` may be
                either ``datetime`` or ``date`` values, ``store_id`` is used
                as the site id.

        Returns:
            A dict with keys ``service_records``, ``trash_records``,
            ``start_date``, ``end_date`` and ``site_id``.
        """
        window_start = context.window_start
        window_end = context.window_end
        # Normalise datetime-like bounds to plain dates; values without a
        # ``date`` attribute are passed through unchanged.
        start_date = window_start.date() if hasattr(window_start, 'date') else window_start
        end_date = window_end.date() if hasattr(window_end, 'date') else window_end
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,日期范围 %s ~ %s",
            self.get_task_code(), start_date, end_date
        )

        # 1. Assistant service records.
        service_records = self._extract_service_records(site_id, start_date, end_date)
        # 2. Trash (void) records.
        trash_records = self._extract_trash_records(site_id, start_date, end_date)
        # 3. Load the configuration cache (helper provided by the base class).
        self.load_config_cache()

        return {
            'service_records': service_records,
            'trash_records': trash_records,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Aggregate the extracted rows by assistant and service date.

        Args:
            extracted: the dict produced by :meth:`extract`.
            context: task context (not used by this step).

        Returns:
            One aggregated dict per (assistant, date) pair.
        """
        service_records = extracted['service_records']
        trash_records = extracted['trash_records']

        self.logger.info(
            "%s: 转换数据,服务记录 %d 条,废除记录 %d",
            self.get_task_code(), len(service_records), len(trash_records)
        )

        # Index trash rows: assistant_service_id -> trash row.
        trash_index = self._build_trash_index(trash_records)
        return self._aggregate_by_assistant_date(
            service_records,
            trash_index,
            extracted['site_id']
        )

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write the aggregated rows to the DWS table (delete, then insert).

        Args:
            transformed: rows produced by :meth:`transform`.
            context: task context, forwarded to ``delete_existing_data``.

        Returns:
            A dict with a ``counts`` entry and, when rows were written, an
            ``extra`` entry holding the value returned by
            ``delete_existing_data``.
        """
        if not transformed:
            # NOTE(review): nothing is deleted on this path, so rows written
            # by an earlier run for this window stay in place even if the
            # source rows have since disappeared - confirm this is intended.
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        # Remove rows that may already exist so a re-run does not duplicate
        # them, then insert the fresh rows in bulk.
        deleted = self.delete_existing_data(context, date_col="stat_date")
        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d",
            self.get_task_code(), deleted, inserted
        )
        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ======================================================================
    # Extraction helpers
    # ======================================================================
    def _extract_service_records(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch assistant service rows whose start date lies in the window.

        Args:
            site_id: site to read.
            start_date: first service date, inclusive.
            end_date: last service date, inclusive.

        Returns:
            The rows as plain dicts; an empty list when the query returns
            nothing.
        """
        sql = """
SELECT
asl.assistant_service_id,
asl.order_settle_id,
asl.site_assistant_id AS assistant_id,
asl.nickname AS assistant_nickname,
asl.assistant_level,
asl.skill_id,
asl.skill_name,
asl.tenant_member_id AS member_id,
asl.site_table_id AS table_id,
asl.income_seconds,
asl.real_use_seconds,
asl.ledger_amount,
asl.ledger_unit_price,
DATE(asl.start_use_time) AS service_date
FROM billiards_dwd.dwd_assistant_service_log asl
WHERE asl.site_id = %s
AND DATE(asl.start_use_time) >= %s
AND DATE(asl.start_use_time) <= %s
"""
        rows = self.db.query(sql, (site_id, start_date, end_date))
        if not rows:
            return []
        return [dict(row) for row in rows]

    def _extract_trash_records(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch trash (void) events whose trash date lies in the window.

        Exclusion rule for valid performance: only records present in the
        assistant trash table are excluded.

        NOTE(review): the filter is on ``trash_time``, not on the service
        date, so a service inside the window that was voided on a date
        outside the window is not returned here - confirm this is intended.

        Args:
            site_id: site to read.
            start_date: first trash date, inclusive.
            end_date: last trash date, inclusive.

        Returns:
            The rows as plain dicts; an empty list when the query returns
            nothing.
        """
        sql = """
SELECT
assistant_service_id,
trash_seconds,
trash_reason,
trash_time
FROM billiards_dwd.dwd_assistant_trash_event
WHERE site_id = %s
AND DATE(trash_time) >= %s
AND DATE(trash_time) <= %s
"""
        rows = self.db.query(sql, (site_id, start_date, end_date))
        if not rows:
            return []
        return [dict(row) for row in rows]

    # ======================================================================
    # Transformation helpers
    # ======================================================================
    def _build_trash_index(
        self,
        trash_records: List[Dict[str, Any]]
    ) -> Dict[int, Dict[str, Any]]:
        """
        Index trash rows by ``assistant_service_id``.

        Rows with a missing or falsy id are skipped. When several rows share
        an id, the last one in ``trash_records`` wins.
        """
        return {
            row.get('assistant_service_id'): row
            for row in trash_records
            if row.get('assistant_service_id')
        }

    def _new_daily_accumulator(
        self,
        site_id: int,
        assistant_id: int,
        service_date: date,
        record: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Create the zeroed accumulator for one (assistant, date) pair.

        The assistant level comes from ``get_assistant_level_asof`` (base
        class helper; presumably an SCD2 as-of lookup). When that returns
        nothing, the level code falls back to the ``assistant_level`` stored
        on the service row and the level name is left as ``None``.
        """
        level_info = self.get_assistant_level_asof(assistant_id, service_date)
        if level_info:
            level_code = level_info.get('level_code')
            level_name = level_info.get('level_name')
        else:
            level_code = record.get('assistant_level')
            level_name = None

        return {
            'site_id': site_id,
            'tenant_id': self.config.get("app.tenant_id", site_id),
            'assistant_id': assistant_id,
            'assistant_nickname': record.get('assistant_nickname'),
            'stat_date': service_date,
            'assistant_level_code': level_code,
            'assistant_level_name': level_name,
            'total_service_count': 0,
            'base_service_count': 0,
            'bonus_service_count': 0,
            'total_seconds': 0,
            'base_seconds': 0,
            'bonus_seconds': 0,
            'total_hours': Decimal('0'),
            'base_hours': Decimal('0'),
            'bonus_hours': Decimal('0'),
            'total_ledger_amount': Decimal('0'),
            'base_ledger_amount': Decimal('0'),
            'bonus_ledger_amount': Decimal('0'),
            # Sets while accumulating; replaced by their sizes at the end.
            'unique_customers': set(),
            'unique_tables': set(),
            'trashed_seconds': 0,
            'trashed_count': 0,
        }

    def _aggregate_by_assistant_date(
        self,
        service_records: List[Dict[str, Any]],
        trash_index: Dict[int, Dict[str, Any]],
        site_id: int
    ) -> List[Dict[str, Any]]:
        """
        Aggregate service rows into one output row per (assistant, date).

        Rows without an assistant id or service date are ignored. A row whose
        service id appears in ``trash_index`` only contributes to the
        ``trashed_*`` counters; every other row contributes to the total and
        to either the base or the bonus counters. Customers and tables are
        counted for every row, trashed or not.

        Args:
            service_records: rows from :meth:`_extract_service_records`.
            trash_index: mapping built by :meth:`_build_trash_index`.
            site_id: site id written to every output row.

        Returns:
            The aggregated rows, in first-seen order of their keys.
        """
        # (assistant_id, service_date) -> accumulator
        agg_dict: Dict[Tuple[int, date], Dict[str, Any]] = {}

        for record in service_records:
            assistant_id = record.get('assistant_id')
            service_date = record.get('service_date')
            if not assistant_id or not service_date:
                continue

            key = (assistant_id, service_date)
            if key not in agg_dict:
                agg_dict[key] = self._new_daily_accumulator(
                    site_id, assistant_id, service_date, record
                )
            agg = agg_dict[key]

            service_id = record.get('assistant_service_id')
            income_seconds = self.safe_int(record.get('income_seconds', 0))
            ledger_amount = self.safe_decimal(record.get('ledger_amount', 0))
            skill_id = record.get('skill_id')
            member_id = record.get('member_id')
            table_id = record.get('table_id')

            # Rows without a skill id are treated as base course.
            course_type = self.get_course_type(skill_id) if skill_id else CourseType.BASE

            if service_id in trash_index:
                # Trashed rows are tallied separately from valid performance.
                # The trash query always returns a trash_seconds column, so
                # a missing value shows up as None rather than as an absent
                # key; test for None explicitly so the fallback to the
                # service's billed seconds actually applies.
                trash_seconds = trash_index[service_id].get('trash_seconds')
                if trash_seconds is None:
                    trash_seconds = income_seconds
                agg['trashed_seconds'] += self.safe_int(trash_seconds)
                agg['trashed_count'] += 1
            else:
                agg['total_service_count'] += 1
                agg['total_seconds'] += income_seconds
                agg['total_ledger_amount'] += ledger_amount
                prefix = 'base' if course_type == CourseType.BASE else 'bonus'
                agg[prefix + '_service_count'] += 1
                agg[prefix + '_seconds'] += income_seconds
                agg[prefix + '_ledger_amount'] += ledger_amount

            # Distinct customers / tables, regardless of trash status.
            if member_id and not self.is_guest(member_id):
                agg['unique_customers'].add(member_id)
            if table_id:
                agg['unique_tables'].add(table_id)

        # Finalise: derive the hour fields and collapse the sets to counts.
        result = []
        for agg in agg_dict.values():
            agg['total_hours'] = self.seconds_to_hours(agg['total_seconds'])
            agg['base_hours'] = self.seconds_to_hours(agg['base_seconds'])
            agg['bonus_hours'] = self.seconds_to_hours(agg['bonus_seconds'])
            agg['unique_customers'] = len(agg['unique_customers'])
            agg['unique_tables'] = len(agg['unique_tables'])
            result.append(agg)
        return result
# Names exported by this module, for convenient import elsewhere.
__all__ = ['AssistantDailyTask']