# -*- coding: utf-8 -*-
"""
|
||
助教服务客户统计任务
|
||
|
||
功能说明:
|
||
以"助教+客户"为粒度,统计服务关系和滚动窗口指标
|
||
|
||
数据来源:
|
||
- dwd_assistant_service_log: 助教服务流水
|
||
- dim_member: 会员维度
|
||
|
||
目标表:
|
||
billiards_dws.dws_assistant_customer_stats
|
||
|
||
更新策略:
|
||
- 更新频率:每日更新
|
||
- 幂等方式:delete-before-insert(按统计日期)
|
||
|
||
业务规则:
|
||
- 散客处理:member_id=0 不进入此表统计
|
||
- 滚动窗口:7/10/15/30/60/90天
|
||
- 活跃度:近7天/30天是否有服务
|
||
|
||
作者:ETL团队
|
||
创建日期:2026-02-01
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import date, datetime, timedelta
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||
|
||
from .base_dws_task import BaseDwsTask, TaskContext
|
||
|
||
|
||
class AssistantCustomerTask(BaseDwsTask):
    """DWS task that aggregates assistant-to-customer service statistics.

    One output record is built per assistant/member pair for a statistics
    date and carries:

    - first / most recent service date
    - lifetime totals (service count, hours, amount)
    - rolling-window totals for every entry in ``_WINDOW_DAYS``
    - activity indicators (days since last service, active in 7d / 30d)
    """

    # Rolling-window lengths in days. This single constant drives both the
    # SQL aggregation columns and the keys copied into each output record.
    _WINDOW_DAYS: Tuple[int, ...] = (7, 10, 15, 30, 60, 90)

    def get_task_code(self) -> str:
        """Return the identifier of this task."""
        return "DWS_ASSISTANT_CUSTOMER"

    def get_target_table(self) -> str:
        """Return the name of the table this task writes to."""
        return "dws_assistant_customer_stats"

    def get_primary_keys(self) -> List[str]:
        """Return the columns that uniquely identify one output row."""
        return ["site_id", "assistant_id", "member_id", "stat_date"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """Read everything needed to build the statistics for one date.

        Args:
            context: Task context; ``window_end`` supplies the statistics
                date and ``store_id`` the site.

        Returns:
            A dict with the aggregated service pairs, the member and
            assistant lookup tables, the statistics date and the site id.
        """
        window_end = context.window_end
        # window_end may be a datetime or already a date; normalise to a date.
        stat_date = window_end.date() if hasattr(window_end, 'date') else window_end
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,统计日期 %s",
            self.get_task_code(), stat_date
        )

        return {
            # 1. Assistant/member aggregates (lifetime plus rolling windows).
            'service_pairs': self._extract_service_pairs(site_id, stat_date),
            # 2. Member lookup keyed by member_id.
            'member_info': self._extract_member_info(site_id),
            # 3. Assistant lookup keyed by assistant_id.
            'assistant_info': self._extract_assistant_info(site_id),
            'stat_date': stat_date,
            'site_id': site_id,
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """Turn the extracted aggregates into output records.

        Args:
            extracted: The dict returned by :meth:`extract`.
            context: Task context (not used here).

        Returns:
            One dict per assistant/member pair, excluding pairs for which
            ``self.is_guest(member_id)`` is true.
        """
        service_pairs = extracted['service_pairs']
        member_info = extracted['member_info']
        assistant_info = extracted['assistant_info']
        stat_date = extracted['stat_date']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,%d 条服务关系记录",
            self.get_task_code(), len(service_pairs)
        )

        # Loop-invariant: resolve the tenant once instead of once per row.
        tenant_id = self.config.get("app.tenant_id", site_id)

        results: List[Dict[str, Any]] = []
        for pair in service_pairs:
            assistant_id = pair.get('assistant_id')
            member_id = pair.get('member_id')

            # Walk-in customers are not part of this table.
            if self.is_guest(member_id):
                continue

            asst_info = assistant_info.get(assistant_id, {})
            memb_info = member_info.get(member_id, {})
            last_service_date = pair.get('last_service_date')

            record: Dict[str, Any] = {
                'site_id': site_id,
                'tenant_id': tenant_id,
                'assistant_id': assistant_id,
                # Prefer the dimension nickname; fall back to the one from
                # the service log when the dimension row is missing OR its
                # nickname is empty/NULL.
                'assistant_nickname': asst_info.get('nickname') or pair.get('assistant_nickname'),
                'member_id': member_id,
                'member_nickname': memb_info.get('nickname'),
                'member_mobile': self._mask_mobile(memb_info.get('mobile')),
                'stat_date': stat_date,
                # Lifetime totals
                'first_service_date': pair.get('first_service_date'),
                'last_service_date': last_service_date,
                'total_service_count': self.safe_int(pair.get('total_service_count', 0)),
                'total_service_hours': self.safe_decimal(pair.get('total_service_hours', 0)),
                'total_service_amount': self.safe_decimal(pair.get('total_service_amount', 0)),
            }

            # Rolling-window totals, grouped by metric so the key order is
            # count_* for every window, then hours_*, then amount_*.
            for days in self._WINDOW_DAYS:
                key = f'service_count_{days}d'
                record[key] = self.safe_int(pair.get(key, 0))
            for days in self._WINDOW_DAYS:
                key = f'service_hours_{days}d'
                record[key] = self.safe_decimal(pair.get(key, 0))
            for days in self._WINDOW_DAYS:
                key = f'service_amount_{days}d'
                record[key] = self.safe_decimal(pair.get(key, 0))

            # Activity indicators
            record['days_since_last'] = self._calc_days_since(stat_date, last_service_date)
            record['is_active_7d'] = self.safe_int(pair.get('service_count_7d', 0)) > 0
            record['is_active_30d'] = self.safe_int(pair.get('service_count_30d', 0)) > 0

            results.append(record)

        return results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """Write the records using delete-before-insert.

        Args:
            transformed: Records produced by :meth:`transform`.
            context: Task context, forwarded to ``delete_existing_data``.

        Returns:
            A dict with a ``counts`` summary and, when rows were written,
            an ``extra`` entry holding the number of deleted rows.
        """
        if not transformed:
            # NOTE(review): nothing is deleted on this path, so rows written
            # by an earlier run for the same date stay in place - confirm
            # that this is the intended behaviour.
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        # Remove what is already stored, then insert the fresh rows.
        deleted = self.delete_existing_data(context, date_col="stat_date")
        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d 行",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Data extraction
    # ==========================================================================

    def _extract_service_pairs(
        self,
        site_id: int,
        stat_date: date
    ) -> List[Dict[str, Any]]:
        """Aggregate the service log per assistant/member pair.

        Only services dated on or before ``stat_date`` are considered, so
        re-running the task for a past date does not pick up later records.

        Args:
            site_id: Site whose service log is read.
            stat_date: Statistics date; upper bound of every window.

        Returns:
            One dict per pair with lifetime totals and, for each N in
            ``_WINDOW_DAYS``, ``service_count_Nd`` / ``service_hours_Nd`` /
            ``service_amount_Nd``. Pairs whose latest service is older than
            the largest window (in days) before ``stat_date`` are dropped.
        """
        window_columns: List[str] = []
        for days in self._WINDOW_DAYS:
            # An N-day window spans stat_date and the N-1 days before it.
            # `days` comes from a class constant, never from external input.
            in_window = f"service_date >= %s - INTERVAL '{days - 1} days'"
            window_columns.extend([
                f"COUNT(CASE WHEN {in_window} THEN 1 END) AS service_count_{days}d",
                f"SUM(CASE WHEN {in_window} THEN income_seconds ELSE 0 END) / 3600.0"
                f" AS service_hours_{days}d",
                f"SUM(CASE WHEN {in_window} THEN ledger_amount ELSE 0 END)"
                f" AS service_amount_{days}d",
            ])
        window_sql = ",\n                ".join(window_columns)
        retention_days = max(self._WINDOW_DAYS)

        sql = f"""
            WITH service_base AS (
                SELECT
                    site_assistant_id AS assistant_id,
                    nickname AS assistant_nickname,
                    tenant_member_id AS member_id,
                    DATE(start_use_time) AS service_date,
                    income_seconds,
                    ledger_amount
                FROM billiards_dwd.dwd_assistant_service_log
                WHERE site_id = %s
                  AND tenant_member_id IS NOT NULL
                  AND tenant_member_id != 0
                  AND is_delete = 0
                  AND DATE(start_use_time) <= %s
            )
            SELECT
                assistant_id,
                MAX(assistant_nickname) AS assistant_nickname,
                member_id,
                MIN(service_date) AS first_service_date,
                MAX(service_date) AS last_service_date,
                -- lifetime totals
                COUNT(*) AS total_service_count,
                SUM(income_seconds) / 3600.0 AS total_service_hours,
                SUM(ledger_amount) AS total_service_amount,
                -- rolling windows
                {window_sql}
            FROM service_base
            GROUP BY assistant_id, member_id
            HAVING MAX(service_date) >= %s - INTERVAL '{retention_days} days'
        """

        # Placeholders in order: site_id, the upper bound in the CTE, three
        # per window, and one for the HAVING clause - all but the first are
        # the statistics date.
        date_param_count = 1 + 3 * len(self._WINDOW_DAYS) + 1
        params = (site_id,) + (stat_date,) * date_param_count

        rows = self.db.query(sql, params)
        return [dict(row) for row in rows] if rows else []

    def _extract_member_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """Load member nickname and mobile for a site, keyed by ``member_id``.

        NOTE(review): unlike the assistant lookup, this query has no
        ``scd2_is_current`` filter; if ``dim_member`` keeps history rows the
        last row returned wins - confirm.
        """
        sql = """
            SELECT
                member_id,
                nickname,
                mobile
            FROM billiards_dwd.dim_member
            WHERE site_id = %s
        """
        rows = self.db.query(sql, (site_id,))
        return self._rows_by_key(rows, 'member_id')

    def _extract_assistant_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """Load current assistant nicknames for a site, keyed by ``assistant_id``."""
        sql = """
            SELECT
                assistant_id,
                nickname
            FROM billiards_dwd.dim_assistant
            WHERE site_id = %s
              AND scd2_is_current = 1
        """
        rows = self.db.query(sql, (site_id,))
        return self._rows_by_key(rows, 'assistant_id')

    # ==========================================================================
    # Helpers
    # ==========================================================================

    @staticmethod
    def _rows_by_key(rows: Any, key: str) -> Dict[Any, Dict[str, Any]]:
        """Index query rows by ``key``; later rows overwrite earlier ones."""
        indexed: Dict[Any, Dict[str, Any]] = {}
        for row in (rows or []):
            row_dict = dict(row)
            indexed[row_dict[key]] = row_dict
        return indexed

    def _mask_mobile(self, mobile: Optional[str]) -> Optional[str]:
        """Mask a mobile number so that it cannot be read back in full.

        Values longer than 7 characters keep their first 3 and last 4
        characters around ``****`` (``13812345678`` -> ``138****5678``).
        Values of 7 characters or fewer are replaced entirely by ``*``,
        because keeping 3 + 4 characters would expose the whole value.

        Args:
            mobile: Raw mobile number; may be ``None`` or empty.

        Returns:
            The masked text, or the input unchanged when it is falsy.
        """
        if not mobile:
            return mobile
        text = str(mobile)
        if len(text) <= 7:
            return "*" * len(text)
        return text[:3] + "****" + text[-4:]

    def _calc_days_since(self, stat_date: date, last_date: Optional[date]) -> Optional[int]:
        """Return the number of days from ``last_date`` to ``stat_date``.

        Args:
            stat_date: Statistics date (``date`` or ``datetime``).
            last_date: Most recent service date (``date`` or ``datetime``).

        Returns:
            The day difference, or ``None`` when ``last_date`` is falsy.
        """
        if not last_date:
            return None
        # Subtracting a date from a datetime (or vice versa) raises
        # TypeError, so reduce both sides to plain dates first.
        if isinstance(stat_date, datetime):
            stat_date = stat_date.date()
        if isinstance(last_date, datetime):
            last_date = last_date.date()
        return (stat_date - last_date).days
|
||
|
||
|
||
# Public API of this module (what `from module import *` exposes).
__all__ = ["AssistantCustomerTask"]
|