# -*- coding: utf-8 -*- """ 会员层召回/转化指数共享逻辑 """ from __future__ import annotations from dataclasses import dataclass, field from datetime import date, datetime, timedelta from decimal import Decimal from typing import Any, Dict, List, Optional, Tuple from .base_index_task import BaseIndexTask from ..base_dws_task import TaskContext @dataclass class MemberActivityData: """Shared member activity features for WBI/NCI.""" member_id: int site_id: int tenant_id: int member_create_time: Optional[datetime] = None first_visit_time: Optional[datetime] = None last_visit_time: Optional[datetime] = None last_recharge_time: Optional[datetime] = None t_v: float = 60.0 t_r: float = 60.0 t_a: float = 60.0 days_since_first_visit: Optional[int] = None days_since_last_visit: Optional[int] = None days_since_last_recharge: Optional[int] = None visits_14d: int = 0 visits_60d: int = 0 visits_total: int = 0 spend_30d: float = 0.0 spend_180d: float = 0.0 sv_balance: float = 0.0 recharge_60d_amt: float = 0.0 interval_count: int = 0 intervals: List[float] = field(default_factory=list) interval_ages_days: List[int] = field(default_factory=list) recharge_unconsumed: int = 0 class MemberIndexBaseTask(BaseIndexTask): """Shared extraction and feature building for WBI/NCI.""" DEFAULT_VISIT_LOOKBACK_DAYS = 180 DEFAULT_RECENCY_LOOKBACK_DAYS = 60 CASH_CARD_TYPE_ID = 2793249295533893 def _get_site_id(self, context: Optional[TaskContext]) -> int: """获取门店ID""" if context and hasattr(context, 'store_id') and context.store_id: return context.store_id site_id = self.config.get('app.default_site_id') or self.config.get('app.store_id') if site_id is not None: return int(site_id) sql = "SELECT DISTINCT site_id FROM billiards_dwd.dwd_settlement_head WHERE site_id IS NOT NULL LIMIT 1" rows = self.db.query(sql) if rows: value = dict(rows[0]).get('site_id') if value is not None: return int(value) self.logger.warning("无法确定门店ID,使用 0 继续执行") return 0 def _get_tenant_id(self) -> int: """获取租户ID""" tenant_id = self.config.get('app.tenant_id') if tenant_id is not None: return int(tenant_id) sql = "SELECT DISTINCT tenant_id FROM billiards_dwd.dwd_settlement_head WHERE tenant_id IS NOT NULL LIMIT 1" rows = self.db.query(sql) if rows: value = dict(rows[0]).get('tenant_id') if value is not None: return int(value) self.logger.warning("无法确定租户ID,使用 0 继续执行") return 0 def _load_params(self) -> Dict[str, float]: """Load index parameters with defaults and runtime overrides.""" params = self.load_index_parameters() result = dict(self.DEFAULT_PARAMS) result.update(params) # GUI/环境变量可通过 run.index_lookback_days 覆盖 recency 窗口 override_days = self.config.get('run.index_lookback_days') if override_days is not None: try: override_days_int = int(override_days) if override_days_int < 7 or override_days_int > 180: self.logger.warning( "%s: run.index_lookback_days=%s 超出建议范围[7,180],已自动截断", self.get_task_code(), override_days, ) override_days_int = max(7, min(180, override_days_int)) result['lookback_days_recency'] = float(override_days_int) self.logger.info( "%s: 使用回溯天数覆盖 lookback_days_recency=%d", self.get_task_code(), override_days_int, ) except (TypeError, ValueError): self.logger.warning( "%s: run.index_lookback_days=%s is invalid; ignore override and use parameter table value", self.get_task_code(), override_days, ) return result def _build_visit_condition_sql(self) -> str: """Build visit-scope condition SQL.""" return """ ( s.settle_type = 1 OR ( s.settle_type = 3 AND EXISTS ( SELECT 1 FROM billiards_dwd.dwd_assistant_service_log asl JOIN billiards_dws.cfg_skill_type st ON asl.skill_id = st.skill_id AND st.course_type_code = 'BONUS' AND st.is_active = TRUE WHERE asl.order_settle_id = s.order_settle_id AND asl.site_id = s.site_id AND asl.tenant_member_id = s.member_id AND asl.is_delete = 0 ) ) ) """ def _extract_visit_day_rows( self, site_id: int, start_date: date, end_date: date, ) -> List[Dict[str, Any]]: """提取到店记录(按天去重)""" condition_sql = self._build_visit_condition_sql() sql = f""" WITH visit_source AS ( SELECT COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id, s.pay_time, s.pay_amount FROM billiards_dwd.dwd_settlement_head s LEFT JOIN billiards_dwd.dim_member_card_account mca ON s.member_card_account_id = mca.member_card_id AND mca.scd2_is_current = 1 AND mca.register_site_id = s.site_id AND COALESCE(mca.is_delete, 0) = 0 WHERE s.site_id = %s AND s.pay_time >= %s AND s.pay_time < %s + INTERVAL '1 day' AND {condition_sql} ) SELECT canonical_member_id AS member_id, DATE(pay_time) AS visit_date, MAX(pay_time) AS last_visit_time, SUM(COALESCE(pay_amount, 0)) AS day_pay_amount FROM visit_source WHERE canonical_member_id > 0 GROUP BY canonical_member_id, DATE(pay_time) ORDER BY canonical_member_id, visit_date """ rows = self.db.query(sql, (site_id, start_date, end_date)) return [dict(row) for row in (rows or [])] def _extract_recharge_rows( self, site_id: int, start_date: date, end_date: date, ) -> Dict[int, Dict[str, Any]]: """提取充值记录(近60天)""" sql = """ WITH recharge_source AS ( SELECT COALESCE(NULLIF(r.member_id, 0), mca.tenant_member_id) AS canonical_member_id, r.pay_time, r.pay_amount FROM billiards_dwd.dwd_recharge_order r LEFT JOIN billiards_dwd.dim_member_card_account mca ON r.tenant_member_card_id = mca.member_card_id AND mca.scd2_is_current = 1 AND mca.register_site_id = r.site_id AND COALESCE(mca.is_delete, 0) = 0 WHERE r.site_id = %s AND r.settle_type = 5 AND r.pay_time >= %s AND r.pay_time < %s + INTERVAL '1 day' ) SELECT canonical_member_id AS member_id, MAX(pay_time) AS last_recharge_time, SUM(COALESCE(pay_amount, 0)) AS recharge_60d_amt FROM recharge_source WHERE canonical_member_id > 0 GROUP BY canonical_member_id """ rows = self.db.query(sql, (site_id, start_date, end_date)) result: Dict[int, Dict[str, Any]] = {} for row in (rows or []): row_dict = dict(row) result[int(row_dict['member_id'])] = row_dict return result def _extract_member_create_times(self, member_ids: List[int]) -> Dict[int, datetime]: """提取会员建档时间""" if not member_ids: return {} member_ids_str = ','.join(str(m) for m in member_ids) sql = f""" SELECT member_id, create_time FROM billiards_dwd.dim_member WHERE member_id IN ({member_ids_str}) AND scd2_is_current = 1 """ rows = self.db.query(sql) result = {} for row in (rows or []): row_dict = dict(row) member_id = int(row_dict['member_id']) create_time = row_dict.get('create_time') if create_time: result[member_id] = create_time return result def _extract_first_visit_times(self, site_id: int, member_ids: List[int]) -> Dict[int, datetime]: """提取首次到店时间(全量)""" if not member_ids: return {} member_ids_str = ','.join(str(m) for m in member_ids) condition_sql = self._build_visit_condition_sql() sql = f""" WITH visit_source AS ( SELECT COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id, s.pay_time FROM billiards_dwd.dwd_settlement_head s LEFT JOIN billiards_dwd.dim_member_card_account mca ON s.member_card_account_id = mca.member_card_id AND mca.scd2_is_current = 1 AND mca.register_site_id = s.site_id AND COALESCE(mca.is_delete, 0) = 0 WHERE s.site_id = %s AND {condition_sql} ) SELECT canonical_member_id AS member_id, MIN(pay_time) AS first_visit_time FROM visit_source WHERE canonical_member_id IN ({member_ids_str}) GROUP BY canonical_member_id """ rows = self.db.query(sql, (site_id,)) result = {} for row in (rows or []): row_dict = dict(row) member_id = int(row_dict['member_id']) first_visit_time = row_dict.get('first_visit_time') if first_visit_time: result[member_id] = first_visit_time return result def _extract_sv_balances(self, site_id: int, tenant_id: int, member_ids: List[int]) -> Dict[int, Decimal]: """Fetch member stored-value card balances.""" if not member_ids: return {} member_ids_str = ','.join(str(m) for m in member_ids) sql = f""" SELECT tenant_member_id AS member_id, SUM(CASE WHEN card_type_id = %s THEN balance ELSE 0 END) AS sv_balance FROM billiards_dwd.dim_member_card_account WHERE tenant_id = %s AND register_site_id = %s AND scd2_is_current = 1 AND COALESCE(is_delete, 0) = 0 AND tenant_member_id IN ({member_ids_str}) GROUP BY tenant_member_id """ rows = self.db.query(sql, (self.CASH_CARD_TYPE_ID, tenant_id, site_id)) result: Dict[int, Decimal] = {} for row in (rows or []): row_dict = dict(row) member_id = int(row_dict['member_id']) result[member_id] = row_dict.get('sv_balance') or Decimal('0') return result def _build_member_activity( self, site_id: int, tenant_id: int, params: Dict[str, float], ) -> Dict[int, MemberActivityData]: """构建会员活动特征""" now = datetime.now(self.tz) base_date = now.date() visit_lookback_days = int(params.get('visit_lookback_days', self.DEFAULT_VISIT_LOOKBACK_DAYS)) recency_days = int(params.get('lookback_days_recency', self.DEFAULT_RECENCY_LOOKBACK_DAYS)) visit_start_date = base_date - timedelta(days=visit_lookback_days) visit_rows = self._extract_visit_day_rows(site_id, visit_start_date, base_date) member_day_rows: Dict[int, List[Dict[str, Any]]] = {} for row in (visit_rows or []): member_id = int(row['member_id']) member_day_rows.setdefault(member_id, []).append(row) recharge_start_date = base_date - timedelta(days=recency_days) recharge_rows = self._extract_recharge_rows(site_id, recharge_start_date, base_date) member_ids = set(member_day_rows.keys()) | set(recharge_rows.keys()) if not member_ids: return {} member_id_list = list(member_ids) member_create_times = self._extract_member_create_times(member_id_list) first_visit_times = self._extract_first_visit_times(site_id, member_id_list) sv_balances = self._extract_sv_balances(site_id, tenant_id, member_id_list) results: Dict[int, MemberActivityData] = {} for member_id in member_ids: data = MemberActivityData( member_id=member_id, site_id=site_id, tenant_id=tenant_id, ) day_rows = member_day_rows.get(member_id, []) if day_rows: day_rows_sorted = sorted(day_rows, key=lambda x: x['visit_date']) data.visits_total = len(day_rows_sorted) last_visit_time = max(r.get('last_visit_time') for r in day_rows_sorted) data.last_visit_time = last_visit_time # 近14/60天到店次数 days_14_ago = base_date - timedelta(days=14) days_60_ago = base_date - timedelta(days=60) for r in day_rows_sorted: visit_date = r.get('visit_date') if visit_date is None: continue if visit_date >= days_14_ago: data.visits_14d += 1 if visit_date >= days_60_ago: data.visits_60d += 1 # 消费金额 days_30_ago = base_date - timedelta(days=30) for r in day_rows_sorted: visit_date = r.get('visit_date') day_pay = float(r.get('day_pay_amount') or 0) data.spend_180d += day_pay if visit_date and visit_date >= days_30_ago: data.spend_30d += day_pay # 计算到店间隔(按天) visit_dates = [r.get('visit_date') for r in day_rows_sorted if r.get('visit_date')] intervals: List[float] = [] interval_ages_days: List[int] = [] for i in range(1, len(visit_dates)): interval = (visit_dates[i] - visit_dates[i - 1]).days intervals.append(float(min(recency_days, interval))) interval_ages_days.append(max(0, (base_date - visit_dates[i]).days)) data.intervals = intervals data.interval_ages_days = interval_ages_days data.interval_count = len(intervals) recharge_info = recharge_rows.get(member_id) if recharge_info: data.last_recharge_time = recharge_info.get('last_recharge_time') data.recharge_60d_amt = float(recharge_info.get('recharge_60d_amt') or 0) data.member_create_time = member_create_times.get(member_id) data.first_visit_time = first_visit_times.get(member_id) sv_balance = sv_balances.get(member_id) if sv_balance is not None: data.sv_balance = float(sv_balance) # 时间差计算 if data.first_visit_time: data.days_since_first_visit = (base_date - data.first_visit_time.date()).days if data.last_visit_time: data.days_since_last_visit = (base_date - data.last_visit_time.date()).days if data.last_recharge_time: data.days_since_last_recharge = (base_date - data.last_recharge_time.date()).days # tV/tR/tA data.t_v = float(min(recency_days, data.days_since_last_visit)) if data.days_since_last_visit is not None else float(recency_days) data.t_r = float(min(recency_days, data.days_since_last_recharge)) if data.days_since_last_recharge is not None else float(recency_days) data.t_a = float(min(data.t_v, data.t_r)) # 充值是否未回访 if data.last_recharge_time and (data.last_visit_time is None or data.last_recharge_time > data.last_visit_time): data.recharge_unconsumed = 1 results[member_id] = data return results def classify_segment( self, data: MemberActivityData, params: Dict[str, float], ) -> Tuple[str, str, bool]: """Classify member into NEW/OLD/STOP buckets.""" recency_days = int(params.get('lookback_days_recency', self.DEFAULT_RECENCY_LOOKBACK_DAYS)) enable_stop_exception = int(params.get('enable_stop_high_balance_exception', 0)) == 1 high_balance_threshold = float(params.get('high_balance_threshold', 1000)) if data.t_a >= recency_days: if enable_stop_exception and data.sv_balance >= high_balance_threshold: return "STOP", "STOP_HIGH_BALANCE", True return "STOP", "STOP", False new_visit_threshold = int(params.get('new_visit_threshold', 2)) new_days_threshold = int(params.get('new_days_threshold', 30)) recharge_recent_days = int(params.get('recharge_recent_days', 14)) new_recharge_max_visits = int(params.get('new_recharge_max_visits', 10)) is_new_by_visits = data.visits_total <= new_visit_threshold is_new_by_first_visit = data.days_since_first_visit is not None and data.days_since_first_visit <= new_days_threshold is_new_by_recharge = ( data.recharge_unconsumed == 1 and data.days_since_last_recharge is not None and data.days_since_last_recharge <= recharge_recent_days and data.visits_total <= new_recharge_max_visits ) if is_new_by_visits or is_new_by_first_visit or is_new_by_recharge: return "NEW", "NEW", True return "OLD", "OLD", True