462 lines
18 KiB
Python
462 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
会员层召回/转化指数共享逻辑
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass, field
|
||
from datetime import date, datetime, timedelta
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .base_index_task import BaseIndexTask
|
||
from ..base_dws_task import TaskContext
|
||
|
||
|
||
@dataclass
class MemberActivityData:
    """Per-member activity features consumed by both the WBI and NCI tasks.

    Only the three identity fields are required; every other field starts
    at a neutral default and is filled in later by
    ``MemberIndexBaseTask._build_member_activity``.
    """

    # --- identity -------------------------------------------------------
    member_id: int
    site_id: int
    tenant_id: int

    # --- raw timestamps (None when not found) ----------------------------
    member_create_time: Optional[datetime] = None
    first_visit_time: Optional[datetime] = None
    last_visit_time: Optional[datetime] = None
    last_recharge_time: Optional[datetime] = None

    # --- recency terms, in days, clipped to the recency window ----------
    # 60.0 is only the placeholder used before the features are computed.
    t_v: float = 60.0  # days since last visit
    t_r: float = 60.0  # days since last recharge
    t_a: float = 60.0  # min(t_v, t_r)

    # --- unclipped day differences relative to the run date --------------
    days_since_first_visit: Optional[int] = None
    days_since_last_visit: Optional[int] = None
    days_since_last_recharge: Optional[int] = None

    # --- visit-day counts --------------------------------------------------
    visits_14d: int = 0
    visits_60d: int = 0
    visits_total: int = 0  # visit days inside the visit lookback window

    # --- money amounts -----------------------------------------------------
    spend_30d: float = 0.0
    spend_180d: float = 0.0  # sum over every visit day in the lookback window
    sv_balance: float = 0.0
    recharge_60d_amt: float = 0.0  # recharge sum over the recency window

    # --- gaps between consecutive visit days -------------------------------
    interval_count: int = 0
    intervals: List[float] = field(default_factory=list)
    interval_ages_days: List[int] = field(default_factory=list)

    # 1 when the latest recharge is newer than the latest visit, else 0.
    recharge_unconsumed: int = 0
class MemberIndexBaseTask(BaseIndexTask):
    """Shared extraction and feature building for the WBI/NCI member indices.

    ``DEFAULT_PARAMS`` is read by ``_load_params`` but is not defined in this
    class; presumably a subclass (or ``BaseIndexTask``) supplies it, together
    with the ``config``, ``db``, ``logger`` and ``tz`` attributes used below.
    TODO confirm.
    """

    # Visit history window (days) used when the parameters omit it.
    DEFAULT_VISIT_LOOKBACK_DAYS = 180
    # Recency window (days) used when the parameters omit it; it clips
    # tV/tR and the visit intervals, and bounds the recharge query window.
    DEFAULT_RECENCY_LOOKBACK_DAYS = 60
    # card_type_id whose balance is summed as stored value in _extract_sv_balances.
    CASH_CARD_TYPE_ID = 2793249295533893

    @staticmethod
    def _format_id_list(ids: List[int]) -> str:
        """Render ids as a comma-separated list of integer SQL literals.

        Every value is coerced through ``int()`` so nothing but integer
        literals can reach the SQL text. Duplicates are dropped and the order
        is made deterministic; neither changes ``IN (...)`` semantics.
        """
        return ','.join(str(i) for i in sorted({int(m) for m in ids}))

    def _get_site_id(self, context: Optional[TaskContext]) -> int:
        """Resolve the store (site) ID.

        Order: a truthy ``context.store_id``; config ``app.default_site_id``
        or ``app.store_id``; the first non-NULL ``site_id`` found in
        ``dwd_settlement_head``. Falls back to 0 with a warning.
        """
        store_id = getattr(context, 'store_id', None) if context else None
        if store_id:
            # Cast like the other branches so the declared ``int`` return holds.
            return int(store_id)

        site_id = self.config.get('app.default_site_id') or self.config.get('app.store_id')
        if site_id is not None:
            return int(site_id)

        sql = "SELECT DISTINCT site_id FROM billiards_dwd.dwd_settlement_head WHERE site_id IS NOT NULL LIMIT 1"
        rows = self.db.query(sql)
        if rows:
            value = dict(rows[0]).get('site_id')
            if value is not None:
                return int(value)

        self.logger.warning("无法确定门店ID,使用 0 继续执行")
        return 0

    def _get_tenant_id(self) -> int:
        """Resolve the tenant ID.

        Order: config ``app.tenant_id``; the first non-NULL ``tenant_id``
        found in ``dwd_settlement_head``. Falls back to 0 with a warning.
        """
        tenant_id = self.config.get('app.tenant_id')
        if tenant_id is not None:
            return int(tenant_id)

        sql = "SELECT DISTINCT tenant_id FROM billiards_dwd.dwd_settlement_head WHERE tenant_id IS NOT NULL LIMIT 1"
        rows = self.db.query(sql)
        if rows:
            value = dict(rows[0]).get('tenant_id')
            if value is not None:
                return int(value)

        self.logger.warning("无法确定租户ID,使用 0 继续执行")
        return 0

    def _load_params(self) -> Dict[str, float]:
        """Load index parameters: class defaults, then loaded values, then runtime override.

        ``run.index_lookback_days`` (settable from the GUI or environment)
        replaces ``lookback_days_recency``. It is clamped to [7, 180], and it
        is ignored with a warning when it cannot be converted with ``int()``.
        """
        params = self.load_index_parameters()
        result = dict(self.DEFAULT_PARAMS)
        result.update(params)

        override_days = self.config.get('run.index_lookback_days')
        if override_days is None:
            return result

        try:
            override_days_int = int(override_days)
        except (TypeError, ValueError):
            self.logger.warning(
                "%s: run.index_lookback_days=%s is invalid; ignore override and use parameter table value",
                self.get_task_code(),
                override_days,
            )
            return result

        if override_days_int < 7 or override_days_int > 180:
            self.logger.warning(
                "%s: run.index_lookback_days=%s 超出建议范围[7,180],已自动截断",
                self.get_task_code(),
                override_days,
            )
            override_days_int = max(7, min(180, override_days_int))

        result['lookback_days_recency'] = float(override_days_int)
        self.logger.info(
            "%s: 使用回溯天数覆盖 lookback_days_recency=%d",
            self.get_task_code(),
            override_days_int,
        )
        return result

    def _build_visit_condition_sql(self) -> str:
        """Return the SQL predicate (on alias ``s``) that counts a settlement as a visit.

        A settlement counts when ``settle_type = 1``, or when ``settle_type = 3``
        and a non-deleted assistant service log for the same order, site and
        member uses an active skill whose ``course_type_code`` is 'BONUS'.
        """
        return """
            (
                s.settle_type = 1
                OR (
                    s.settle_type = 3
                    AND EXISTS (
                        SELECT 1
                        FROM billiards_dwd.dwd_assistant_service_log asl
                        JOIN billiards_dws.cfg_skill_type st
                          ON asl.skill_id = st.skill_id
                         AND st.course_type_code = 'BONUS'
                         AND st.is_active = TRUE
                        WHERE asl.order_settle_id = s.order_settle_id
                          AND asl.site_id = s.site_id
                          AND asl.tenant_member_id = s.member_id
                          AND asl.is_delete = 0
                    )
                )
            )
        """

    def _extract_visit_day_rows(
        self,
        site_id: int,
        start_date: date,
        end_date: date,
    ) -> List[Dict[str, Any]]:
        """Return visit settlements aggregated to one row per (member, day).

        The member is ``s.member_id``, falling back to the current card
        account's ``tenant_member_id`` when it is 0 or NULL. ``pay_time`` must
        fall in ``[start_date, end_date + 1 day)``. Each row carries
        ``member_id``, ``visit_date``, ``last_visit_time`` and
        ``day_pay_amount``.
        """
        condition_sql = self._build_visit_condition_sql()
        sql = f"""
            WITH visit_source AS (
                SELECT
                    COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
                    s.pay_time,
                    s.pay_amount
                FROM billiards_dwd.dwd_settlement_head s
                LEFT JOIN billiards_dwd.dim_member_card_account mca
                  ON s.member_card_account_id = mca.member_card_id
                 AND mca.scd2_is_current = 1
                 AND mca.register_site_id = s.site_id
                 AND COALESCE(mca.is_delete, 0) = 0
                WHERE s.site_id = %s
                  AND s.pay_time >= %s
                  AND s.pay_time < %s + INTERVAL '1 day'
                  AND {condition_sql}
            )
            SELECT
                canonical_member_id AS member_id,
                DATE(pay_time) AS visit_date,
                MAX(pay_time) AS last_visit_time,
                SUM(COALESCE(pay_amount, 0)) AS day_pay_amount
            FROM visit_source
            WHERE canonical_member_id > 0
            GROUP BY canonical_member_id, DATE(pay_time)
            ORDER BY canonical_member_id, visit_date
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in (rows or [])]

    def _extract_recharge_rows(
        self,
        site_id: int,
        start_date: date,
        end_date: date,
    ) -> Dict[int, Dict[str, Any]]:
        """Aggregate ``settle_type = 5`` recharge orders per member.

        The member is resolved the same way as for visits, and ``pay_time``
        must fall in ``[start_date, end_date + 1 day)``. Returns a mapping of
        member_id to a row with ``last_recharge_time`` and
        ``recharge_60d_amt``. The window is set by the caller, so the "60d"
        in the column name is nominal.
        """
        sql = """
            WITH recharge_source AS (
                SELECT
                    COALESCE(NULLIF(r.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
                    r.pay_time,
                    r.pay_amount
                FROM billiards_dwd.dwd_recharge_order r
                LEFT JOIN billiards_dwd.dim_member_card_account mca
                  ON r.tenant_member_card_id = mca.member_card_id
                 AND mca.scd2_is_current = 1
                 AND mca.register_site_id = r.site_id
                 AND COALESCE(mca.is_delete, 0) = 0
                WHERE r.site_id = %s
                  AND r.settle_type = 5
                  AND r.pay_time >= %s
                  AND r.pay_time < %s + INTERVAL '1 day'
            )
            SELECT
                canonical_member_id AS member_id,
                MAX(pay_time) AS last_recharge_time,
                SUM(COALESCE(pay_amount, 0)) AS recharge_60d_amt
            FROM recharge_source
            WHERE canonical_member_id > 0
            GROUP BY canonical_member_id
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        result: Dict[int, Dict[str, Any]] = {}
        for row in rows or []:
            row_dict = dict(row)
            result[int(row_dict['member_id'])] = row_dict
        return result

    def _extract_member_create_times(self, member_ids: List[int]) -> Dict[int, datetime]:
        """Fetch profile creation times from the current ``dim_member`` rows.

        Members whose ``create_time`` is NULL or otherwise falsy are omitted.
        """
        if not member_ids:
            return {}
        sql = f"""
            SELECT
                member_id,
                create_time
            FROM billiards_dwd.dim_member
            WHERE member_id IN ({self._format_id_list(member_ids)})
              AND scd2_is_current = 1
        """
        rows = self.db.query(sql)
        result: Dict[int, datetime] = {}
        for row in rows or []:
            row_dict = dict(row)
            member_id = int(row_dict['member_id'])
            create_time = row_dict.get('create_time')
            if create_time:
                result[member_id] = create_time
        return result

    def _extract_first_visit_times(self, site_id: int, member_ids: List[int]) -> Dict[int, datetime]:
        """Fetch each member's earliest visit time at the site over the full history.

        This applies no date window, only the same visit predicate and member
        resolution used by ``_extract_visit_day_rows``.
        """
        if not member_ids:
            return {}
        condition_sql = self._build_visit_condition_sql()
        sql = f"""
            WITH visit_source AS (
                SELECT
                    COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
                    s.pay_time
                FROM billiards_dwd.dwd_settlement_head s
                LEFT JOIN billiards_dwd.dim_member_card_account mca
                  ON s.member_card_account_id = mca.member_card_id
                 AND mca.scd2_is_current = 1
                 AND mca.register_site_id = s.site_id
                 AND COALESCE(mca.is_delete, 0) = 0
                WHERE s.site_id = %s
                  AND {condition_sql}
            )
            SELECT
                canonical_member_id AS member_id,
                MIN(pay_time) AS first_visit_time
            FROM visit_source
            WHERE canonical_member_id IN ({self._format_id_list(member_ids)})
            GROUP BY canonical_member_id
        """
        rows = self.db.query(sql, (site_id,))
        result: Dict[int, datetime] = {}
        for row in rows or []:
            row_dict = dict(row)
            member_id = int(row_dict['member_id'])
            first_visit_time = row_dict.get('first_visit_time')
            if first_visit_time:
                result[member_id] = first_visit_time
        return result

    def _extract_sv_balances(self, site_id: int, tenant_id: int, member_ids: List[int]) -> Dict[int, Decimal]:
        """Sum each member's balance on ``CASH_CARD_TYPE_ID`` card accounts.

        Only current, non-deleted accounts registered at ``site_id`` under
        ``tenant_id`` are included. A NULL or zero sum maps to ``Decimal('0')``.
        """
        if not member_ids:
            return {}
        sql = f"""
            SELECT
                tenant_member_id AS member_id,
                SUM(CASE WHEN card_type_id = %s THEN balance ELSE 0 END) AS sv_balance
            FROM billiards_dwd.dim_member_card_account
            WHERE tenant_id = %s
              AND register_site_id = %s
              AND scd2_is_current = 1
              AND COALESCE(is_delete, 0) = 0
              AND tenant_member_id IN ({self._format_id_list(member_ids)})
            GROUP BY tenant_member_id
        """
        rows = self.db.query(sql, (self.CASH_CARD_TYPE_ID, tenant_id, site_id))
        result: Dict[int, Decimal] = {}
        for row in rows or []:
            row_dict = dict(row)
            member_id = int(row_dict['member_id'])
            result[member_id] = row_dict.get('sv_balance') or Decimal('0')
        return result

    @staticmethod
    def _fill_visit_features(
        data: MemberActivityData,
        day_rows: List[Dict[str, Any]],
        base_date: date,
        recency_days: int,
    ) -> None:
        """Populate visit counts, spend totals and visit intervals from per-day rows."""
        day_rows_sorted = sorted(day_rows, key=lambda x: x['visit_date'])
        data.visits_total = len(day_rows_sorted)

        # Skip NULL timestamps so max() cannot fail comparing None with datetime.
        data.last_visit_time = max(
            (r.get('last_visit_time') for r in day_rows_sorted if r.get('last_visit_time') is not None),
            default=None,
        )

        days_14_ago = base_date - timedelta(days=14)
        days_30_ago = base_date - timedelta(days=30)
        days_60_ago = base_date - timedelta(days=60)
        for r in day_rows_sorted:
            day_pay = float(r.get('day_pay_amount') or 0)
            # spend_180d covers every row in the lookback window, dated or not.
            data.spend_180d += day_pay
            visit_date = r.get('visit_date')
            if visit_date is None:
                continue
            if visit_date >= days_14_ago:
                data.visits_14d += 1
            if visit_date >= days_60_ago:
                data.visits_60d += 1
            if visit_date >= days_30_ago:
                data.spend_30d += day_pay

        # Gaps (in days) between consecutive visit days, clipped to the recency
        # window, plus how many days ago each gap ended.
        visit_dates = [r.get('visit_date') for r in day_rows_sorted if r.get('visit_date')]
        intervals: List[float] = []
        interval_ages_days: List[int] = []
        for prev_date, cur_date in zip(visit_dates, visit_dates[1:]):
            intervals.append(float(min(recency_days, (cur_date - prev_date).days)))
            interval_ages_days.append(max(0, (base_date - cur_date).days))
        data.intervals = intervals
        data.interval_ages_days = interval_ages_days
        data.interval_count = len(intervals)

    @staticmethod
    def _fill_recency_features(data: MemberActivityData, base_date: date, recency_days: int) -> None:
        """Derive day differences, tV/tR/tA and ``recharge_unconsumed`` from the timestamps."""
        if data.first_visit_time:
            data.days_since_first_visit = (base_date - data.first_visit_time.date()).days
        if data.last_visit_time:
            data.days_since_last_visit = (base_date - data.last_visit_time.date()).days
        if data.last_recharge_time:
            data.days_since_last_recharge = (base_date - data.last_recharge_time.date()).days

        def _clip(days: Optional[int]) -> float:
            # Unknown means "at least the whole window ago".
            return float(recency_days) if days is None else float(min(recency_days, days))

        data.t_v = _clip(data.days_since_last_visit)
        data.t_r = _clip(data.days_since_last_recharge)
        data.t_a = float(min(data.t_v, data.t_r))

        # The member recharged and has not visited since (or has never visited).
        if data.last_recharge_time and (
            data.last_visit_time is None or data.last_recharge_time > data.last_visit_time
        ):
            data.recharge_unconsumed = 1

    def _build_member_activity(
        self,
        site_id: int,
        tenant_id: int,
        params: Dict[str, float],
    ) -> Dict[int, MemberActivityData]:
        """Build activity features for every member who visited or recharged recently.

        Visits are read over ``visit_lookback_days`` and recharges over
        ``lookback_days_recency``, both ending today in ``self.tz``. The
        result is keyed by member_id and is empty when no member qualifies.
        """
        now = datetime.now(self.tz)
        base_date = now.date()

        visit_lookback_days = int(params.get('visit_lookback_days', self.DEFAULT_VISIT_LOOKBACK_DAYS))
        recency_days = int(params.get('lookback_days_recency', self.DEFAULT_RECENCY_LOOKBACK_DAYS))

        visit_start_date = base_date - timedelta(days=visit_lookback_days)
        visit_rows = self._extract_visit_day_rows(site_id, visit_start_date, base_date)

        member_day_rows: Dict[int, List[Dict[str, Any]]] = {}
        for row in visit_rows or []:
            member_day_rows.setdefault(int(row['member_id']), []).append(row)

        recharge_start_date = base_date - timedelta(days=recency_days)
        recharge_rows = self._extract_recharge_rows(site_id, recharge_start_date, base_date)

        member_ids = set(member_day_rows) | set(recharge_rows)
        if not member_ids:
            return {}

        member_id_list = list(member_ids)
        member_create_times = self._extract_member_create_times(member_id_list)
        first_visit_times = self._extract_first_visit_times(site_id, member_id_list)
        sv_balances = self._extract_sv_balances(site_id, tenant_id, member_id_list)

        results: Dict[int, MemberActivityData] = {}
        for member_id in member_ids:
            data = MemberActivityData(
                member_id=member_id,
                site_id=site_id,
                tenant_id=tenant_id,
            )

            day_rows = member_day_rows.get(member_id)
            if day_rows:
                self._fill_visit_features(data, day_rows, base_date, recency_days)

            recharge_info = recharge_rows.get(member_id)
            if recharge_info:
                data.last_recharge_time = recharge_info.get('last_recharge_time')
                data.recharge_60d_amt = float(recharge_info.get('recharge_60d_amt') or 0)

            data.member_create_time = member_create_times.get(member_id)
            data.first_visit_time = first_visit_times.get(member_id)
            sv_balance = sv_balances.get(member_id)
            if sv_balance is not None:
                data.sv_balance = float(sv_balance)

            self._fill_recency_features(data, base_date, recency_days)
            results[member_id] = data

        return results

    def classify_segment(
        self,
        data: MemberActivityData,
        params: Dict[str, float],
    ) -> Tuple[str, str, bool]:
        """Classify a member into the NEW / OLD / STOP buckets.

        Returns ``(segment, status, flag)``.
        - STOP: ``t_a`` has reached the recency window. The status becomes
          ``STOP_HIGH_BALANCE``, with flag True, when the high-balance
          exception is enabled and the stored value meets the threshold.
        - NEW: few visit days, a recent first visit, or a recent recharge that
          has not been followed by a visit.
        - OLD: everything else.
        The flag is False only for plain STOP. It is presumably an
        "eligible for scoring" marker (TODO confirm).
        """
        recency_days = int(params.get('lookback_days_recency', self.DEFAULT_RECENCY_LOOKBACK_DAYS))
        enable_stop_exception = int(params.get('enable_stop_high_balance_exception', 0)) == 1
        high_balance_threshold = float(params.get('high_balance_threshold', 1000))

        if data.t_a >= recency_days:
            if enable_stop_exception and data.sv_balance >= high_balance_threshold:
                return "STOP", "STOP_HIGH_BALANCE", True
            return "STOP", "STOP", False

        new_visit_threshold = int(params.get('new_visit_threshold', 2))
        new_days_threshold = int(params.get('new_days_threshold', 30))
        recharge_recent_days = int(params.get('recharge_recent_days', 14))
        new_recharge_max_visits = int(params.get('new_recharge_max_visits', 10))

        is_new_by_visits = data.visits_total <= new_visit_threshold
        is_new_by_first_visit = (
            data.days_since_first_visit is not None
            and data.days_since_first_visit <= new_days_threshold
        )
        is_new_by_recharge = (
            data.recharge_unconsumed == 1
            and data.days_since_last_recharge is not None
            and data.days_since_last_recharge <= recharge_recent_days
            and data.visits_total <= new_recharge_max_visits
        )

        if is_new_by_visits or is_new_by_first_visit or is_new_by_recharge:
            return "NEW", "NEW", True

        return "OLD", "OLD", True