Files
ZQYY.FQ-ETL/tasks/dws/index/member_index_base.py

462 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
会员层召回/转化指数共享逻辑
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from .base_index_task import BaseIndexTask
from ..base_dws_task import TaskContext
@dataclass
class MemberActivityData:
    """Per-member activity features shared by the WBI and NCI index tasks.

    Only the three identifiers are required; every feature starts at its
    "no activity" default and is filled in by the member-index base task.
    Field order is significant: positional construction follows it.
    """

    # --- identity -----------------------------------------------------------
    member_id: int
    site_id: int
    tenant_id: int

    # --- raw timestamps (None when unknown / never happened) ----------------
    member_create_time: Optional[datetime] = field(default=None)
    first_visit_time: Optional[datetime] = field(default=None)
    last_visit_time: Optional[datetime] = field(default=None)
    last_recharge_time: Optional[datetime] = field(default=None)

    # --- recency in days, capped at the recency window ----------------------
    # t_v: since last visit, t_r: since last recharge, t_a: min of the two.
    # The 60.0 default equals a fully-elapsed default recency window.
    t_v: float = field(default=60.0)
    t_r: float = field(default=60.0)
    t_a: float = field(default=60.0)

    # --- uncapped day differences (None when the timestamp is missing) ------
    days_since_first_visit: Optional[int] = field(default=None)
    days_since_last_visit: Optional[int] = field(default=None)
    days_since_last_recharge: Optional[int] = field(default=None)

    # --- visit-day counts ----------------------------------------------------
    visits_14d: int = field(default=0)
    visits_60d: int = field(default=0)
    visits_total: int = field(default=0)

    # --- money ---------------------------------------------------------------
    spend_30d: float = field(default=0.0)
    spend_180d: float = field(default=0.0)
    sv_balance: float = field(default=0.0)  # stored-value card balance
    recharge_60d_amt: float = field(default=0.0)

    # --- gaps between consecutive visit days --------------------------------
    interval_count: int = field(default=0)
    # Fresh list per instance (default_factory), never shared between members.
    intervals: List[float] = field(default_factory=list)
    interval_ages_days: List[int] = field(default_factory=list)

    # 1 when the latest recharge happened after the latest visit, else 0.
    recharge_unconsumed: int = field(default=0)
class MemberIndexBaseTask(BaseIndexTask):
    """Shared extraction and feature building for the WBI/NCI member indices.

    This class reads ``self.db``, ``self.config``, ``self.logger``, ``self.tz``,
    ``self.DEFAULT_PARAMS``, ``self.load_index_parameters()`` and
    ``self.get_task_code()``, none of which are defined here.
    NOTE(review): presumably supplied by ``BaseIndexTask`` / concrete
    subclasses — confirm.
    """

    DEFAULT_VISIT_LOOKBACK_DAYS = 180
    DEFAULT_RECENCY_LOOKBACK_DAYS = 60
    # The only card_type_id whose balance is summed into ``sv_balance``.
    # NOTE(review): hard-coded source-system id — confirm it is stable per tenant.
    CASH_CARD_TYPE_ID = 2793249295533893

    # ------------------------------------------------------------------
    # Context resolution
    # ------------------------------------------------------------------
    def _query_first_int(self, sql: str, column: str) -> Optional[int]:
        """Run ``sql`` and return ``column`` of the first row as an int, or None."""
        rows = self.db.query(sql)
        if not rows:
            return None
        value = dict(rows[0]).get(column)
        return int(value) if value is not None else None

    def _get_site_id(self, context: Optional[TaskContext]) -> int:
        """Resolve the store (site) id.

        Precedence: a truthy ``context.store_id``; then config
        ``app.default_site_id`` or ``app.store_id``; then any non-null site_id in
        ``billiards_dwd.dwd_settlement_head``. Falls back to 0 with a warning.
        """
        store_id = getattr(context, 'store_id', None) if context else None
        if store_id:
            # Cast like every other branch so callers always receive an int.
            return int(store_id)
        configured = self.config.get('app.default_site_id') or self.config.get('app.store_id')
        if configured is not None:
            return int(configured)
        site_id = self._query_first_int(
            "SELECT DISTINCT site_id FROM billiards_dwd.dwd_settlement_head WHERE site_id IS NOT NULL LIMIT 1",
            'site_id',
        )
        if site_id is not None:
            return site_id
        self.logger.warning("无法确定门店ID使用 0 继续执行")
        return 0

    def _get_tenant_id(self) -> int:
        """Resolve the tenant id.

        Precedence: config ``app.tenant_id``; then any non-null tenant_id in
        ``billiards_dwd.dwd_settlement_head``. Falls back to 0 with a warning.
        """
        configured = self.config.get('app.tenant_id')
        if configured is not None:
            return int(configured)
        tenant_id = self._query_first_int(
            "SELECT DISTINCT tenant_id FROM billiards_dwd.dwd_settlement_head WHERE tenant_id IS NOT NULL LIMIT 1",
            'tenant_id',
        )
        if tenant_id is not None:
            return tenant_id
        self.logger.warning("无法确定租户ID使用 0 继续执行")
        return 0

    # ------------------------------------------------------------------
    # Parameters
    # ------------------------------------------------------------------
    def _load_params(self) -> Dict[str, float]:
        """Merge default index parameters with the parameter table and overrides.

        Precedence (lowest to highest): ``self.DEFAULT_PARAMS``, the values
        returned by ``self.load_index_parameters()``, then config
        ``run.index_lookback_days``. When that override parses as an int, it
        replaces ``lookback_days_recency`` after being clamped to [7, 180]. A
        missing or blank override is ignored. An unparsable one is ignored with
        a warning.
        """
        params = self.load_index_parameters()
        result = dict(self.DEFAULT_PARAMS)
        result.update(params)

        # The GUI / environment variables may override the recency window.
        override_days = self.config.get('run.index_lookback_days')
        if override_days is None:
            return result
        if isinstance(override_days, str) and not override_days.strip():
            # A blank value (empty env var / GUI field) means "not set".
            return result

        try:
            requested_days = int(override_days)
        except (TypeError, ValueError):
            self.logger.warning(
                "%s: run.index_lookback_days=%s is invalid; ignore override and use parameter table value",
                self.get_task_code(),
                override_days,
            )
            return result

        clamped_days = max(7, min(180, requested_days))
        if clamped_days != requested_days:
            self.logger.warning(
                "%s: run.index_lookback_days=%s 超出建议范围[7,180],已自动截断",
                self.get_task_code(),
                override_days,
            )
        result['lookback_days_recency'] = float(clamped_days)
        self.logger.info(
            "%s: 使用回溯天数覆盖 lookback_days_recency=%d",
            self.get_task_code(),
            clamped_days,
        )
        return result

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------
    def _build_visit_condition_sql(self) -> str:
        """Return the SQL predicate (on alias ``s``) that defines a counted visit.

        A settlement counts when ``settle_type = 1``. It also counts when
        ``settle_type = 3`` and the same order/site/member has a non-deleted
        assistant service log whose skill maps to an active ``BONUS`` course
        type in ``billiards_dws.cfg_skill_type``.
        """
        return """
            (
                s.settle_type = 1
                OR (
                    s.settle_type = 3
                    AND EXISTS (
                        SELECT 1
                        FROM billiards_dwd.dwd_assistant_service_log asl
                        JOIN billiards_dws.cfg_skill_type st
                          ON asl.skill_id = st.skill_id
                         AND st.course_type_code = 'BONUS'
                         AND st.is_active = TRUE
                        WHERE asl.order_settle_id = s.order_settle_id
                          AND asl.site_id = s.site_id
                          AND asl.tenant_member_id = s.member_id
                          AND asl.is_delete = 0
                    )
                )
            )
        """

    @staticmethod
    def _format_member_id_list(member_ids: List[int]) -> str:
        """Render member ids as a comma-separated list for an SQL ``IN (...)``.

        Every id goes through ``int()`` so that only integer literals are ever
        interpolated into the query text.
        """
        return ','.join(str(int(m)) for m in member_ids)

    def _extract_visit_day_rows(
        self,
        site_id: int,
        start_date: date,
        end_date: date,
    ) -> List[Dict[str, Any]]:
        """Extract visit rows, de-duplicated to one row per member per day.

        Covers settlements with ``start_date <= pay_time < end_date + 1 day``
        that satisfy ``_build_visit_condition_sql``. The member id falls back to
        the current card account's ``tenant_member_id`` when the settlement's
        ``member_id`` is 0/NULL. Each row has ``member_id``, ``visit_date``,
        ``last_visit_time`` (latest pay_time that day) and ``day_pay_amount``.
        """
        condition_sql = self._build_visit_condition_sql()
        sql = f"""
            WITH visit_source AS (
                SELECT
                    COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
                    s.pay_time,
                    s.pay_amount
                FROM billiards_dwd.dwd_settlement_head s
                LEFT JOIN billiards_dwd.dim_member_card_account mca
                  ON s.member_card_account_id = mca.member_card_id
                 AND mca.scd2_is_current = 1
                 AND mca.register_site_id = s.site_id
                 AND COALESCE(mca.is_delete, 0) = 0
                WHERE s.site_id = %s
                  AND s.pay_time >= %s
                  AND s.pay_time < %s + INTERVAL '1 day'
                  AND {condition_sql}
            )
            SELECT
                canonical_member_id AS member_id,
                DATE(pay_time) AS visit_date,
                MAX(pay_time) AS last_visit_time,
                SUM(COALESCE(pay_amount, 0)) AS day_pay_amount
            FROM visit_source
            WHERE canonical_member_id > 0
            GROUP BY canonical_member_id, DATE(pay_time)
            ORDER BY canonical_member_id, visit_date
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in (rows or [])]

    def _extract_recharge_rows(
        self,
        site_id: int,
        start_date: date,
        end_date: date,
    ) -> Dict[int, Dict[str, Any]]:
        """Aggregate recharge orders (``settle_type = 5``) per member.

        The window is ``start_date <= pay_time < end_date + 1 day``, as chosen
        by the caller (``_build_member_activity`` passes the recency window).
        It is not a fixed 60 days. Returns
        ``{member_id: {'member_id', 'last_recharge_time', 'recharge_60d_amt'}}``.
        """
        sql = """
            WITH recharge_source AS (
                SELECT
                    COALESCE(NULLIF(r.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
                    r.pay_time,
                    r.pay_amount
                FROM billiards_dwd.dwd_recharge_order r
                LEFT JOIN billiards_dwd.dim_member_card_account mca
                  ON r.tenant_member_card_id = mca.member_card_id
                 AND mca.scd2_is_current = 1
                 AND mca.register_site_id = r.site_id
                 AND COALESCE(mca.is_delete, 0) = 0
                WHERE r.site_id = %s
                  AND r.settle_type = 5
                  AND r.pay_time >= %s
                  AND r.pay_time < %s + INTERVAL '1 day'
            )
            SELECT
                canonical_member_id AS member_id,
                MAX(pay_time) AS last_recharge_time,
                SUM(COALESCE(pay_amount, 0)) AS recharge_60d_amt
            FROM recharge_source
            WHERE canonical_member_id > 0
            GROUP BY canonical_member_id
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        result: Dict[int, Dict[str, Any]] = {}
        for row in (rows or []):
            row_dict = dict(row)
            result[int(row_dict['member_id'])] = row_dict
        return result

    def _extract_member_create_times(self, member_ids: List[int]) -> Dict[int, datetime]:
        """Return ``{member_id: create_time}`` from the current ``dim_member`` rows.

        Members with a NULL/empty create_time are omitted.
        """
        if not member_ids:
            return {}
        member_ids_str = self._format_member_id_list(member_ids)
        sql = f"""
            SELECT
                member_id,
                create_time
            FROM billiards_dwd.dim_member
            WHERE member_id IN ({member_ids_str})
              AND scd2_is_current = 1
        """
        rows = self.db.query(sql)
        result: Dict[int, datetime] = {}
        for row in (rows or []):
            row_dict = dict(row)
            create_time = row_dict.get('create_time')
            if create_time:
                result[int(row_dict['member_id'])] = create_time
        return result

    def _extract_first_visit_times(self, site_id: int, member_ids: List[int]) -> Dict[int, datetime]:
        """Return each member's earliest qualifying visit at the site.

        No date bound is applied; the full settlement history is scanned.
        Member resolution and visit predicate match ``_extract_visit_day_rows``.
        """
        if not member_ids:
            return {}
        member_ids_str = self._format_member_id_list(member_ids)
        condition_sql = self._build_visit_condition_sql()
        sql = f"""
            WITH visit_source AS (
                SELECT
                    COALESCE(NULLIF(s.member_id, 0), mca.tenant_member_id) AS canonical_member_id,
                    s.pay_time
                FROM billiards_dwd.dwd_settlement_head s
                LEFT JOIN billiards_dwd.dim_member_card_account mca
                  ON s.member_card_account_id = mca.member_card_id
                 AND mca.scd2_is_current = 1
                 AND mca.register_site_id = s.site_id
                 AND COALESCE(mca.is_delete, 0) = 0
                WHERE s.site_id = %s
                  AND {condition_sql}
            )
            SELECT
                canonical_member_id AS member_id,
                MIN(pay_time) AS first_visit_time
            FROM visit_source
            WHERE canonical_member_id IN ({member_ids_str})
            GROUP BY canonical_member_id
        """
        rows = self.db.query(sql, (site_id,))
        result: Dict[int, datetime] = {}
        for row in (rows or []):
            row_dict = dict(row)
            first_visit_time = row_dict.get('first_visit_time')
            if first_visit_time:
                result[int(row_dict['member_id'])] = first_visit_time
        return result

    def _extract_sv_balances(self, site_id: int, tenant_id: int, member_ids: List[int]) -> Dict[int, Decimal]:
        """Return each member's summed balance over ``CASH_CARD_TYPE_ID`` cards.

        Only current, non-deleted card accounts registered at ``site_id`` for
        ``tenant_id`` are considered. A NULL sum becomes ``Decimal('0')``.
        """
        if not member_ids:
            return {}
        member_ids_str = self._format_member_id_list(member_ids)
        sql = f"""
            SELECT
                tenant_member_id AS member_id,
                SUM(CASE WHEN card_type_id = %s THEN balance ELSE 0 END) AS sv_balance
            FROM billiards_dwd.dim_member_card_account
            WHERE tenant_id = %s
              AND register_site_id = %s
              AND scd2_is_current = 1
              AND COALESCE(is_delete, 0) = 0
              AND tenant_member_id IN ({member_ids_str})
            GROUP BY tenant_member_id
        """
        rows = self.db.query(sql, (self.CASH_CARD_TYPE_ID, tenant_id, site_id))
        result: Dict[int, Decimal] = {}
        for row in (rows or []):
            row_dict = dict(row)
            result[int(row_dict['member_id'])] = row_dict.get('sv_balance') or Decimal('0')
        return result

    # ------------------------------------------------------------------
    # Feature building
    # ------------------------------------------------------------------
    @staticmethod
    def _apply_visit_features(
        data: MemberActivityData,
        day_rows: List[Dict[str, Any]],
        base_date: date,
        recency_days: int,
    ) -> None:
        """Fill visit counts, spend and inter-visit intervals from per-day rows.

        ``day_rows`` must be non-empty (one row per visit day, as produced by
        ``_extract_visit_day_rows``).
        """
        day_rows_sorted = sorted(day_rows, key=lambda r: r['visit_date'])
        data.visits_total = len(day_rows_sorted)
        data.last_visit_time = max(r.get('last_visit_time') for r in day_rows_sorted)

        # Inclusive cut-offs: a visit exactly N days ago counts for the N-day window.
        days_14_ago = base_date - timedelta(days=14)
        days_30_ago = base_date - timedelta(days=30)
        days_60_ago = base_date - timedelta(days=60)
        for row in day_rows_sorted:
            visit_date = row.get('visit_date')
            day_pay = float(row.get('day_pay_amount') or 0)
            # spend_180d covers the whole visit lookback window (180 days by default).
            data.spend_180d += day_pay
            if visit_date is None:
                continue
            if visit_date >= days_14_ago:
                data.visits_14d += 1
            if visit_date >= days_60_ago:
                data.visits_60d += 1
            if visit_date >= days_30_ago:
                data.spend_30d += day_pay

        # Gaps (in days) between consecutive visit days, each capped at the
        # recency window, plus how long ago the later visit of each gap was.
        visit_dates = [r.get('visit_date') for r in day_rows_sorted if r.get('visit_date')]
        intervals: List[float] = []
        interval_ages_days: List[int] = []
        for previous, current in zip(visit_dates, visit_dates[1:]):
            intervals.append(float(min(recency_days, (current - previous).days)))
            interval_ages_days.append(max(0, (base_date - current).days))
        data.intervals = intervals
        data.interval_ages_days = interval_ages_days
        data.interval_count = len(intervals)

    @staticmethod
    def _apply_recency_features(data: MemberActivityData, base_date: date, recency_days: int) -> None:
        """Derive day differences, capped tV/tR/tA and the unconsumed-recharge flag.

        Expects the timestamp fields of ``data`` to be populated already.
        """
        # NOTE(review): ``.date()`` is taken on DB timestamps as returned by the
        # driver, while ``base_date`` comes from ``datetime.now(self.tz)``; if the
        # driver returns aware timestamps in another zone the day counts can be
        # off by one around midnight — confirm the session/driver timezone.
        if data.first_visit_time:
            data.days_since_first_visit = (base_date - data.first_visit_time.date()).days
        if data.last_visit_time:
            data.days_since_last_visit = (base_date - data.last_visit_time.date()).days
        if data.last_recharge_time:
            data.days_since_last_recharge = (base_date - data.last_recharge_time.date()).days

        def capped(days: Optional[int]) -> float:
            # Missing activity is treated as "a full recency window ago".
            return float(recency_days if days is None else min(recency_days, days))

        data.t_v = capped(data.days_since_last_visit)
        data.t_r = capped(data.days_since_last_recharge)
        data.t_a = float(min(data.t_v, data.t_r))

        # A recharge with no visit after it has not been "consumed" yet.
        if data.last_recharge_time and (
            data.last_visit_time is None or data.last_recharge_time > data.last_visit_time
        ):
            data.recharge_unconsumed = 1

    def _build_member_activity(
        self,
        site_id: int,
        tenant_id: int,
        params: Dict[str, float],
    ) -> Dict[int, MemberActivityData]:
        """Build ``MemberActivityData`` for every member active at the site.

        A member is included when they have a visit day within
        ``visit_lookback_days`` (default 180) or a recharge within
        ``lookback_days_recency`` (default 60), both counted back from today in
        ``self.tz``. Returns ``{member_id: MemberActivityData}`` (empty when no
        member qualifies).
        """
        base_date = datetime.now(self.tz).date()
        visit_lookback_days = int(params.get('visit_lookback_days', self.DEFAULT_VISIT_LOOKBACK_DAYS))
        recency_days = int(params.get('lookback_days_recency', self.DEFAULT_RECENCY_LOOKBACK_DAYS))

        visit_start_date = base_date - timedelta(days=visit_lookback_days)
        visit_rows = self._extract_visit_day_rows(site_id, visit_start_date, base_date)
        member_day_rows: Dict[int, List[Dict[str, Any]]] = {}
        for row in (visit_rows or []):
            member_day_rows.setdefault(int(row['member_id']), []).append(row)

        recharge_start_date = base_date - timedelta(days=recency_days)
        recharge_rows = self._extract_recharge_rows(site_id, recharge_start_date, base_date)

        member_ids = set(member_day_rows) | set(recharge_rows)
        if not member_ids:
            return {}

        member_id_list = list(member_ids)
        member_create_times = self._extract_member_create_times(member_id_list)
        first_visit_times = self._extract_first_visit_times(site_id, member_id_list)
        sv_balances = self._extract_sv_balances(site_id, tenant_id, member_id_list)

        results: Dict[int, MemberActivityData] = {}
        for member_id in member_ids:
            data = MemberActivityData(member_id=member_id, site_id=site_id, tenant_id=tenant_id)

            day_rows = member_day_rows.get(member_id)
            if day_rows:
                self._apply_visit_features(data, day_rows, base_date, recency_days)

            recharge_info = recharge_rows.get(member_id)
            if recharge_info:
                data.last_recharge_time = recharge_info.get('last_recharge_time')
                data.recharge_60d_amt = float(recharge_info.get('recharge_60d_amt') or 0)

            data.member_create_time = member_create_times.get(member_id)
            data.first_visit_time = first_visit_times.get(member_id)
            sv_balance = sv_balances.get(member_id)
            if sv_balance is not None:
                data.sv_balance = float(sv_balance)

            self._apply_recency_features(data, base_date, recency_days)
            results[member_id] = data
        return results

    def classify_segment(
        self,
        data: MemberActivityData,
        params: Dict[str, float],
    ) -> Tuple[str, str, bool]:
        """Classify a member into the NEW / OLD / STOP buckets.

        Rules, evaluated in order:
        1. STOP when ``t_a >= lookback_days_recency``. With
           ``enable_stop_high_balance_exception == 1`` and
           ``sv_balance >= high_balance_threshold`` (default 1000) the result is
           ``("STOP", "STOP_HIGH_BALANCE", True)``, otherwise
           ``("STOP", "STOP", False)``.
        2. NEW when any of these holds:
           - ``visits_total <= new_visit_threshold`` (default 2);
           - first visit within ``new_days_threshold`` days (default 30);
           - an unconsumed recharge within ``recharge_recent_days`` (default 14)
             with ``visits_total <= new_recharge_max_visits`` (default 10).
        3. Otherwise OLD.

        The third tuple element is False only for a plain STOP.
        NOTE(review): presumably "should be scored" — confirm with callers.
        """
        recency_days = int(params.get('lookback_days_recency', self.DEFAULT_RECENCY_LOOKBACK_DAYS))
        enable_stop_exception = int(params.get('enable_stop_high_balance_exception', 0)) == 1
        high_balance_threshold = float(params.get('high_balance_threshold', 1000))

        if data.t_a >= recency_days:
            if enable_stop_exception and data.sv_balance >= high_balance_threshold:
                return "STOP", "STOP_HIGH_BALANCE", True
            return "STOP", "STOP", False

        new_visit_threshold = int(params.get('new_visit_threshold', 2))
        new_days_threshold = int(params.get('new_days_threshold', 30))
        recharge_recent_days = int(params.get('recharge_recent_days', 14))
        new_recharge_max_visits = int(params.get('new_recharge_max_visits', 10))

        is_new_by_visits = data.visits_total <= new_visit_threshold
        is_new_by_first_visit = (
            data.days_since_first_visit is not None
            and data.days_since_first_visit <= new_days_threshold
        )
        is_new_by_recharge = (
            data.recharge_unconsumed == 1
            and data.days_since_last_recharge is not None
            and data.days_since_last_recharge <= recharge_recent_days
            and data.visits_total <= new_recharge_max_visits
        )
        if is_new_by_visits or is_new_by_first_visit or is_new_by_recharge:
            return "NEW", "NEW", True
        return "OLD", "OLD", True