# -*- coding: utf-8 -*-
"""
Assistant-customer service statistics task.

Purpose:
    Builds one row per (assistant, customer) pair with service-relationship
    facts and rolling-window metrics as of a statistics date.

Data sources:
    - billiards_dwd.dwd_assistant_service_log: assistant service ledger
    - billiards_dwd.dim_member: member dimension
    - billiards_dwd.dim_assistant: assistant dimension (current rows only)

Target table:
    billiards_dws.dws_assistant_customer_stats

Update strategy:
    - Frequency: daily
    - Idempotency: delete-before-insert, keyed by statistics date

Business rules:
    - Walk-in guests (member_id = 0) are excluded from this table
    - Rolling windows: 7/10/15/30/60/90 days, each ending on (and including)
      the statistics date
    - Activity flags: whether the pair had any service in the last 7/30 days
    - Only services dated on or before the statistics date are counted, so
      backfilling a historical date does not pick up later services

Author: ETL team
Created: 2026-02-01
"""

from __future__ import annotations

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_dws_task import BaseDwsTask, TaskContext


class AssistantCustomerTask(BaseDwsTask):
    """
    Assistant-customer service statistics task.

    For every assistant/customer pair it produces:
      - first / most recent service date
      - cumulative service count, hours and amount (all history up to stat_date)
      - rolling-window count, hours and amount (7/10/15/30/60/90 days)
      - activity indicators (days since last service, active in 7d / 30d)
    """

    # Rolling window lengths in days. A window of N days covers
    # [stat_date - (N - 1), stat_date], i.e. it includes stat_date itself.
    # Output columns (service_count_7d, service_hours_7d, ...) are derived from
    # these values, so they must match the target table's columns.
    ROLLING_WINDOWS: Tuple[int, ...] = (7, 10, 15, 30, 60, 90)

    # Pairs whose most recent service is more than this many days before
    # stat_date are not emitted at all.
    LOOKBACK_DAYS: int = 90

    def get_task_code(self) -> str:
        """Return this task's code (also used as the prefix of its log lines)."""
        return "DWS_ASSISTANT_CUSTOMER"

    def get_target_table(self) -> str:
        """Return the target table name (schema not included)."""
        return "dws_assistant_customer_stats"

    def get_primary_keys(self) -> List[str]:
        """Return the primary-key columns of the target table."""
        return ["site_id", "assistant_id", "member_id", "stat_date"]

    # ==========================================================================
    # ETL main flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract the source data for one statistics date.

        The statistics date is derived from ``context.window_end``: its
        ``.date()`` when it has one (a datetime), otherwise the value as-is.

        Returns:
            Dict with ``service_pairs`` (per-pair aggregates from SQL),
            ``member_info`` / ``assistant_info`` (lookups keyed by id),
            ``stat_date`` and ``site_id``.
        """
        window_end = context.window_end
        stat_date = window_end.date() if hasattr(window_end, 'date') else window_end
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,统计日期 %s", self.get_task_code(), stat_date
        )

        # 1. Assistant-customer aggregates (all history up to stat_date, so the
        #    cumulative columns can be computed in the same pass).
        service_pairs = self._extract_service_pairs(site_id, stat_date)

        # 2. Member lookup.
        member_info = self._extract_member_info(site_id)

        # 3. Assistant lookup.
        assistant_info = self._extract_assistant_info(site_id)

        return {
            'service_pairs': service_pairs,
            'member_info': member_info,
            'assistant_info': assistant_info,
            'stat_date': stat_date,
            'site_id': site_id,
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Turn the extracted aggregates into target-table rows.

        Pairs whose member is a walk-in guest (per ``is_guest``) are skipped.

        Returns:
            One dict per remaining pair, keyed by target-table column name.
        """
        service_pairs = extracted['service_pairs']
        member_info = extracted['member_info']
        assistant_info = extracted['assistant_info']
        stat_date = extracted['stat_date']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,%d 条服务关系记录", self.get_task_code(), len(service_pairs)
        )

        # Loop-invariant: the tenant id falls back to the site id when unset.
        tenant_id = self.config.get("app.tenant_id", site_id)

        results: List[Dict[str, Any]] = []
        for pair in service_pairs:
            member_id = pair.get('member_id')

            # Skip walk-in guests. The SQL already drops member_id 0 / NULL;
            # this keeps the rule enforced even if the query changes.
            if self.is_guest(member_id):
                continue

            results.append(
                self._build_record(
                    pair,
                    site_id=site_id,
                    tenant_id=tenant_id,
                    stat_date=stat_date,
                    asst_info=assistant_info.get(pair.get('assistant_id'), {}),
                    memb_info=member_info.get(member_id, {}),
                )
            )

        return results

    def _build_record(
        self,
        pair: Dict[str, Any],
        *,
        site_id: Any,
        tenant_id: Any,
        stat_date: date,
        asst_info: Dict[str, Any],
        memb_info: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Build one target-table row from a single aggregated assistant-customer pair."""
        record: Dict[str, Any] = {
            'site_id': site_id,
            'tenant_id': tenant_id,
            'assistant_id': pair.get('assistant_id'),
            # Prefer the dimension nickname; fall back to the one carried on the
            # service log when the assistant is missing or its nickname is empty.
            'assistant_nickname': asst_info.get('nickname') or pair.get('assistant_nickname'),
            'member_id': pair.get('member_id'),
            'member_nickname': memb_info.get('nickname'),
            'member_mobile': self._mask_mobile(memb_info.get('mobile')),
            'stat_date': stat_date,

            # Cumulative statistics (all history up to stat_date).
            'first_service_date': pair.get('first_service_date'),
            'last_service_date': pair.get('last_service_date'),
            'total_service_count': self.safe_int(pair.get('total_service_count', 0)),
            'total_service_hours': self.safe_decimal(pair.get('total_service_hours', 0)),
            'total_service_amount': self.safe_decimal(pair.get('total_service_amount', 0)),
        }

        # Rolling-window statistics, metric-major: all counts, then all hours,
        # then all amounts, each in ROLLING_WINDOWS order.
        for metric, convert in (
            ('service_count', self.safe_int),
            ('service_hours', self.safe_decimal),
            ('service_amount', self.safe_decimal),
        ):
            for days in self.ROLLING_WINDOWS:
                column = f"{metric}_{days}d"
                record[column] = convert(pair.get(column, 0))

        # Activity indicators.
        record['days_since_last'] = self._calc_days_since(stat_date, pair.get('last_service_date'))
        record['is_active_7d'] = self.safe_int(pair.get('service_count_7d', 0)) > 0
        record['is_active_30d'] = self.safe_int(pair.get('service_count_30d', 0)) > 0

        return record

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write the transformed rows using delete-before-insert.

        Existing rows are removed via ``delete_existing_data`` on ``stat_date``
        BEFORE checking whether there is anything to insert. A re-run for a
        date whose source data has disappeared therefore clears the stale rows
        instead of leaving them in place.

        Returns:
            ``{"counts": {...}, "extra": {"deleted": n}}``.
        """
        deleted = self.delete_existing_data(context, date_col="stat_date")

        if not transformed:
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {
                "counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0},
                "extra": {"deleted": deleted},
            }

        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d 行",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0,
            },
            "extra": {"deleted": deleted},
        }

    # ==========================================================================
    # Extraction helpers
    # ==========================================================================

    def _extract_service_pairs(
        self,
        site_id: int,
        stat_date: date
    ) -> List[Dict[str, Any]]:
        """
        Aggregate the service log per (assistant, member) pair, including rolling windows.

        Only non-deleted services of real members (member id not NULL / 0)
        dated on or before ``stat_date`` are considered. Rows whose
        ``start_use_time`` is NULL have no service date and are therefore
        excluded. Pairs whose last service is more than ``LOOKBACK_DAYS``
        before ``stat_date`` are dropped.

        Window boundaries are computed in Python and bound as parameters. Only
        the integer window lengths are interpolated into the SQL text, and
        only to build column aliases.
        """
        windows = tuple(int(days) for days in self.ROLLING_WINDOWS)

        window_sql = ",\n".join(
            f"""
                -- {days}-day window
                COUNT(CASE WHEN service_date >= %s THEN 1 END) AS service_count_{days}d,
                SUM(CASE WHEN service_date >= %s THEN income_seconds ELSE 0 END) / 3600.0 AS service_hours_{days}d,
                SUM(CASE WHEN service_date >= %s THEN ledger_amount ELSE 0 END) AS service_amount_{days}d"""
            for days in windows
        )

        sql = f"""
            WITH service_base AS (
                SELECT
                    site_assistant_id AS assistant_id,
                    nickname AS assistant_nickname,
                    tenant_member_id AS member_id,
                    DATE(start_use_time) AS service_date,
                    income_seconds,
                    ledger_amount
                FROM billiards_dwd.dwd_assistant_service_log
                WHERE site_id = %s
                  AND tenant_member_id IS NOT NULL
                  AND tenant_member_id != 0
                  AND is_delete = 0
                  AND DATE(start_use_time) <= %s
            )
            SELECT
                assistant_id,
                MAX(assistant_nickname) AS assistant_nickname,
                member_id,
                MIN(service_date) AS first_service_date,
                MAX(service_date) AS last_service_date,
                -- cumulative (all history up to stat_date)
                COUNT(*) AS total_service_count,
                SUM(income_seconds) / 3600.0 AS total_service_hours,
                SUM(ledger_amount) AS total_service_amount,{window_sql}
            FROM service_base
            GROUP BY assistant_id, member_id
            HAVING MAX(service_date) >= %s
        """

        # Parameter order must follow placeholder order in the SQL above:
        # site_id, upper date bound, 3 x window start per window, lookback bound.
        params: List[Any] = [site_id, stat_date]
        for days in windows:
            # An N-day window includes stat_date, so it starts N-1 days earlier.
            params.extend([stat_date - timedelta(days=days - 1)] * 3)
        params.append(stat_date - timedelta(days=self.LOOKBACK_DAYS))

        rows = self.db.query(sql, tuple(params))
        return [dict(row) for row in rows] if rows else []

    def _extract_member_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """
        Load member nickname/mobile for the site, keyed by ``member_id``.

        NOTE(review): unlike the assistant lookup, no current-version filter is
        applied here. If dim_member holds several rows per member, the last row
        returned wins — confirm whether a current-row filter is needed.
        """
        sql = """
            SELECT member_id, nickname, mobile
            FROM billiards_dwd.dim_member
            WHERE site_id = %s
        """
        rows = self.db.query(sql, (site_id,))
        return {record['member_id']: record for record in map(dict, rows or [])}

    def _extract_assistant_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """Load current (``scd2_is_current = 1``) assistant nicknames for the site, keyed by ``assistant_id``."""
        sql = """
            SELECT assistant_id, nickname
            FROM billiards_dwd.dim_assistant
            WHERE site_id = %s
              AND scd2_is_current = 1
        """
        rows = self.db.query(sql, (site_id,))
        return {record['assistant_id']: record for record in map(dict, rows or [])}

    # ==========================================================================
    # Utilities
    # ==========================================================================

    def _mask_mobile(self, mobile: Optional[str]) -> Optional[str]:
        """
        Mask a mobile number, keeping the first 3 and last 4 characters.

        Falsy values are returned unchanged. Values shorter than 7 characters
        are returned unmasked. Non-string values are converted with ``str``
        first.

        NOTE(review): for 7-10 character values the kept prefix and suffix
        overlap or nearly cover the whole value, so little or nothing is
        hidden — confirm the intended rule for short numbers.
        """
        if not mobile:
            return mobile
        text = str(mobile)
        if len(text) < 7:
            return text
        return text[:3] + "****" + text[-4:]

    def _calc_days_since(self, stat_date: date, last_date: Optional[date]) -> Optional[int]:
        """
        Return the number of days from ``last_date`` to ``stat_date``.

        Returns None when ``last_date`` is empty. A datetime is truncated to
        its date before subtracting.
        """
        if not last_date:
            return None
        if isinstance(last_date, datetime):
            last_date = last_date.date()
        return (stat_date - last_date).days


# Public API of this module.
__all__ = ['AssistantCustomerTask']