# Source path: ZQYY.FQ-ETL/tasks/dws/index/newconv_index_task.py (web-viewer page header and Unicode notice removed; not part of the module)
# -*- coding: utf-8 -*-
"""
New-customer conversion index (NCI) calculation task."""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from .member_index_base import MemberActivityData, MemberIndexBaseTask
from ..base_dws_task import TaskContext
@dataclass
class MemberNewconvData:
    """Per-member working record for one NCI (new-customer conversion index) run.

    Bundles the member's activity snapshot with the component scores, the raw
    scores and the normalised display scores that ``NewconvIndexTask`` fills
    in step by step.  Field order is part of the constructor signature and
    must not be rearranged.
    """

    # --- identity / classification (required) ---
    activity: MemberActivityData  # activity snapshot the scores are derived from
    status: str                   # status label returned by the segment classifier
    segment: str                  # segment label; the task only keeps "NEW" rows

    # --- component scores, all default to 0.0 until calculated ---
    need_new: float = 0.0      # urgency component
    salvage_new: float = 0.0   # salvage-window decay component
    recharge_new: float = 0.0  # recharged-but-not-returned pressure component
    value_new: float = 0.0     # spend / balance value component
    welcome_new: float = 0.0   # welcome-contact component

    # --- raw scores; None means "not calculated yet" ---
    raw_score_welcome: Optional[float] = None
    raw_score_convert: Optional[float] = None
    raw_score: Optional[float] = None

    # --- display scores produced by normalisation; None means "not normalised" ---
    display_score_welcome: Optional[float] = None
    display_score_convert: Optional[float] = None
    display_score: Optional[float] = None
class NewconvIndexTask(MemberIndexBaseTask):
    """DWS task that computes the new-customer conversion index (NCI).

    For every in-scope member classified into the "NEW" segment the task
    derives component scores, combines them into raw scores, normalises the
    raw scores into display scores, records the percentile bounds used and
    finally rewrites the site's rows in
    ``billiards_dws.dws_member_newconv_index``.

    Helpers such as ``_get_site_id``, ``_load_params``,
    ``_build_member_activity``, ``classify_segment``, ``decay``,
    ``calculate_percentiles``, ``batch_normalize_to_display``,
    ``_apply_ewma_smoothing`` and ``save_percentile_history`` are not defined
    in this class (presumably inherited from ``MemberIndexBaseTask``).
    """

    INDEX_TYPE = "NCI"

    DEFAULT_PARAMS = {
        # General parameters
        'lookback_days_recency': 60,
        'visit_lookback_days': 180,
        'percentile_lower': 5,
        'percentile_upper': 95,
        'compression_mode': 0,
        'use_smoothing': 1,
        'ewma_alpha': 0.2,
        # Segment-routing parameters
        'new_visit_threshold': 2,
        'new_days_threshold': 30,
        'recharge_recent_days': 14,
        'new_recharge_max_visits': 10,
        # NCI parameters
        'no_touch_days_new': 3,
        't2_target_days': 7,
        'salvage_start': 30,
        'salvage_end': 60,
        'welcome_window_days': 3,
        'active_new_visit_threshold_14d': 2,
        'active_new_recency_days': 7,
        'active_new_penalty': 0.2,
        'h_recharge': 7,
        'amount_base_M0': 300,
        'balance_base_B0': 500,
        'value_w_spend': 1.0,
        'value_w_bal': 0.8,
        'w_welcome': 1.0,
        'w_need': 1.6,
        'w_re': 0.8,
        'w_value': 1.0,
        # STOP high-balance exception, disabled by default
        'enable_stop_high_balance_exception': 0,
        'high_balance_threshold': 1000,
    }

    # (raw-score attribute, display-score attribute, honours 'use_smoothing').
    # Only the total score uses the configured smoothing flag; the welcome and
    # convert channels are always normalised without smoothing.
    _SCORE_CHANNELS = (
        ('raw_score', 'display_score', True),
        ('raw_score_welcome', 'display_score_welcome', False),
        ('raw_score_convert', 'display_score_convert', False),
    )

    def get_task_code(self) -> str:
        """Return the task code of this task."""
        return "DWS_NEWCONV_INDEX"

    def get_target_table(self) -> str:
        """Return the name of the table this task writes to."""
        return "dws_member_newconv_index"

    def get_primary_keys(self) -> List[str]:
        """Return the primary-key columns of the target table."""
        return ['site_id', 'member_id']

    def get_index_type(self) -> str:
        """Return the index type identifier (``INDEX_TYPE``)."""
        return self.INDEX_TYPE

    def execute(self, context: Optional[TaskContext]) -> Dict[str, Any]:
        """Run the NCI calculation and persist the result.

        Args:
            context: Task context passed through to ``_get_site_id``.

        Returns:
            ``{'status': 'skipped', 'reason': ...}`` when there is no activity
            data or no member in the "NEW" segment; otherwise
            ``{'status': 'success', 'member_count': ..., 'records_inserted': ...}``.
        """
        self.logger.info("开始计算新客转化指数(NCI)")

        site_id = self._get_site_id(context)
        tenant_id = self._get_tenant_id()
        params = self._load_params()

        activity_map = self._build_member_activity(site_id, tenant_id, params)
        if not activity_map:
            self.logger.warning("No member activity data available; skip calculation")
            return {'status': 'skipped', 'reason': 'no_data'}

        newconv_list: List[MemberNewconvData] = []
        for activity in activity_map.values():
            segment, status, in_scope = self.classify_segment(activity, params)
            if not in_scope or segment != "NEW":
                continue
            data = MemberNewconvData(activity=activity, status=status, segment=segment)
            self._calculate_nci_scores(data, params)
            newconv_list.append(data)

        if not newconv_list:
            # NOTE(review): rows written for this site by an earlier run are
            # not removed on this path — confirm whether that is intended.
            self.logger.warning("No new-member rows to calculate")
            return {'status': 'skipped', 'reason': 'no_new_members'}

        # Bind the flag before any branch so every later use sees it.
        use_smoothing = int(params.get('use_smoothing', 1)) == 1

        all_raw = [float(d.raw_score) for d in newconv_list if d.raw_score is not None]
        if all_raw:
            # Normalise each raw-score channel into its display score.
            for raw_attr, display_attr, smoothable in self._SCORE_CHANNELS:
                self._assign_display_scores(
                    newconv_list,
                    raw_attr,
                    display_attr,
                    params=params,
                    site_id=site_id,
                    use_smoothing=use_smoothing and smoothable,
                )

            # Record the percentile bounds of the total raw score.
            q_l, q_u = self.calculate_percentiles(
                all_raw,
                int(params['percentile_lower']),
                int(params['percentile_upper']),
            )
            if use_smoothing:
                smoothed_l, smoothed_u = self._apply_ewma_smoothing(site_id, q_l, q_u)
            else:
                smoothed_l, smoothed_u = q_l, q_u
            self.save_percentile_history(
                site_id=site_id,
                percentile_5=q_l,
                percentile_95=q_u,
                percentile_5_smoothed=smoothed_l,
                percentile_95_smoothed=smoothed_u,
                record_count=len(all_raw),
                min_raw=min(all_raw),
                max_raw=max(all_raw),
                avg_raw=sum(all_raw) / len(all_raw),
            )

        inserted = self._save_newconv_data(newconv_list)
        self.logger.info("NCI calculation finished, inserted %d rows", inserted)
        return {
            'status': 'success',
            'member_count': len(newconv_list),
            'records_inserted': inserted,
        }

    def _assign_display_scores(
        self,
        newconv_list: List[MemberNewconvData],
        raw_attr: str,
        display_attr: str,
        params: Dict[str, float],
        site_id: int,
        use_smoothing: bool,
    ) -> None:
        """Normalise one raw-score attribute and store it as a display score.

        Members whose raw value is ``None``, or that are missing from the
        normalisation result, keep their current ``display_attr`` value.
        """
        pairs = [
            (d.activity.member_id, getattr(d, raw_attr))
            for d in newconv_list
            if getattr(d, raw_attr) is not None
        ]
        display_map = self._normalize_score_pairs(
            pairs,
            params=params,
            site_id=site_id,
            use_smoothing=use_smoothing,
        )
        for d in newconv_list:
            member_id = d.activity.member_id
            if member_id in display_map:
                setattr(d, display_attr, display_map[member_id])

    @staticmethod
    def _log_ratio(amount: float, base: float) -> float:
        """Return ``log1p(amount / base)``, or 0.0 when ``base`` is not positive.

        The ratio is floored at 0 so that a negative amount (e.g. net refunds
        or a negative balance) contributes no value instead of producing a
        negative score or a ``math.log1p`` domain error at ratio <= -1.
        """
        if base > 0:
            return math.log1p(max(0.0, amount / base))
        return 0.0

    def _calculate_nci_scores(self, data: MemberNewconvData, params: Dict[str, float]) -> None:
        """Fill the component scores and raw scores of ``data`` in place.

        Reads ``t_v``, ``t_a``, ``t_r``, ``recharge_unconsumed``,
        ``spend_180d``, ``sv_balance``, ``visits_total`` and ``visits_14d``
        from ``data.activity``.  (``t_v`` / ``t_a`` / ``t_r`` look like
        day counts since last visit / activity / recharge — TODO confirm
        against ``MemberActivityData``.)

        Args:
            data: Record to update; its ``*_new`` and ``raw_score*`` fields
                are overwritten.
            params: Effective task parameters (see ``DEFAULT_PARAMS`` keys).
        """
        activity = data.activity

        # 1) Urgency: ramps linearly from 0 at ``no_touch_days_new`` to 1 at
        #    twice ``t2_target_days``; 0 when that range is empty.
        no_touch_days = float(params['no_touch_days_new'])
        t2_target_days = float(params['t2_target_days'])
        t2_max_days = t2_target_days * 2.0
        if t2_max_days <= no_touch_days:
            data.need_new = 0.0
        else:
            data.need_new = self._clip(
                (activity.t_v - no_touch_days) / (t2_max_days - no_touch_days),
                0.0, 1.0,
            )

        # 2) Salvage: 1 up to ``salvage_start``, linear decay to 0 at
        #    ``salvage_end``; 0 when the window is empty or inverted.
        salvage_start = float(params['salvage_start'])
        salvage_end = float(params['salvage_end'])
        if salvage_end <= salvage_start:
            data.salvage_new = 0.0
        elif activity.t_a <= salvage_start:
            data.salvage_new = 1.0
        elif activity.t_a >= salvage_end:
            data.salvage_new = 0.0
        else:
            data.salvage_new = (salvage_end - activity.t_a) / (salvage_end - salvage_start)

        # 3) Recharged-but-not-returned pressure, decayed over ``h_recharge``.
        if activity.recharge_unconsumed == 1:
            data.recharge_new = self.decay(activity.t_r, params['h_recharge'])
        else:
            data.recharge_new = 0.0

        # 4) Value: weighted log-scaled 180-day spend and stored-value balance.
        spend_score = self._log_ratio(activity.spend_180d, float(params['amount_base_M0']))
        bal_score = self._log_ratio(activity.sv_balance, float(params['balance_base_B0']))
        data.value_new = (
            float(params['value_w_spend']) * spend_score
            + float(params['value_w_bal']) * bal_score
        )

        # 5) Welcome contact: favours reaching out right after the first
        #    visit; only for members with at most one visit inside the window.
        welcome_window_days = float(params.get('welcome_window_days', 3))
        data.welcome_new = 0.0
        if (
            welcome_window_days > 0
            and activity.visits_total <= 1
            and activity.t_v <= welcome_window_days
        ):
            data.welcome_new = self._clip(1.0 - (activity.t_v / welcome_window_days), 0.0, 1.0)

        # 6) Down-weight already highly active new members in the
        #    conversion/recall ranking.
        active_visit_threshold = int(params.get('active_new_visit_threshold_14d', 2))
        active_recency_days = float(params.get('active_new_recency_days', 7))
        active_penalty = float(params.get('active_new_penalty', 0.2))
        if activity.visits_14d >= active_visit_threshold and activity.t_v <= active_recency_days:
            active_multiplier = self._clip(active_penalty, 0.0, 1.0)
        else:
            active_multiplier = 1.0

        # 7) Value / recharge terms ramp in over the no-touch window so they
        #    mainly take effect once that window has passed.
        if no_touch_days > 0:
            touch_multiplier = self._clip(activity.t_v / no_touch_days, 0.0, 1.0)
        else:
            touch_multiplier = 1.0

        welcome_raw = float(params.get('w_welcome', 1.0)) * data.welcome_new
        convert_raw = active_multiplier * (
            float(params['w_need']) * (data.need_new * data.salvage_new)
            + float(params['w_re']) * data.recharge_new * touch_multiplier
            + float(params['w_value']) * data.value_new * touch_multiplier
        )

        # Both channels are floored at 0, so their sum needs no extra clamp.
        data.raw_score_welcome = max(0.0, welcome_raw)
        data.raw_score_convert = max(0.0, convert_raw)
        data.raw_score = data.raw_score_welcome + data.raw_score_convert

    @staticmethod
    def _newconv_row(data: MemberNewconvData) -> tuple:
        """Build the INSERT parameter tuple for one member, in column order."""
        activity = data.activity
        return (
            activity.site_id, activity.tenant_id, activity.member_id,
            data.status, data.segment,
            activity.member_create_time, activity.first_visit_time,
            activity.last_visit_time, activity.last_recharge_time,
            activity.t_v, activity.t_r, activity.t_a,
            activity.visits_14d, activity.visits_60d, activity.visits_total,
            activity.spend_30d, activity.spend_180d, activity.sv_balance, activity.recharge_60d_amt,
            activity.interval_count,
            data.need_new, data.salvage_new, data.recharge_new, data.value_new,
            data.welcome_new,
            data.raw_score_welcome, data.raw_score_convert, data.raw_score,
            data.display_score_welcome, data.display_score_convert, data.display_score,
            None,  # last_wechat_touch_time is always written as NULL here
        )

    def _save_newconv_data(self, data_list: List[MemberNewconvData]) -> int:
        """Replace the site's NCI rows with ``data_list``.

        All existing rows for the site are deleted and the new rows inserted
        before a single commit; on any error the connection is rolled back
        and the exception re-raised.

        Args:
            data_list: Records to persist.  The site is taken from the first
                record (assumes all records share one site — TODO confirm).

        Returns:
            Sum of the cursor ``rowcount`` over all INSERTs; 0 for an empty list.
        """
        if not data_list:
            return 0

        site_id = data_list[0].activity.site_id

        # Full refresh per site, so rows that became stale because a member
        # changed segment do not linger.
        delete_sql = """
            DELETE FROM billiards_dws.dws_member_newconv_index
            WHERE site_id = %s
        """
        insert_sql = """
            INSERT INTO billiards_dws.dws_member_newconv_index (
                site_id, tenant_id, member_id,
                status, segment,
                member_create_time, first_visit_time, last_visit_time, last_recharge_time,
                t_v, t_r, t_a,
                visits_14d, visits_60d, visits_total,
                spend_30d, spend_180d, sv_balance, recharge_60d_amt,
                interval_count,
                need_new, salvage_new, recharge_new, value_new,
                welcome_new,
                raw_score_welcome, raw_score_convert, raw_score,
                display_score_welcome, display_score_convert, display_score,
                last_wechat_touch_time,
                calc_time, created_at, updated_at
            ) VALUES (
                %s, %s, %s,
                %s, %s,
                %s, %s, %s, %s,
                %s, %s, %s,
                %s, %s, %s,
                %s, %s, %s, %s,
                %s,
                %s, %s, %s, %s,
                %s,
                %s, %s, %s,
                %s, %s, %s,
                %s,
                NOW(), NOW(), NOW()
            )
        """

        conn = self.db.conn
        inserted = 0
        try:
            with conn.cursor() as cur:
                cur.execute(delete_sql, (site_id,))
                for data in data_list:
                    cur.execute(insert_sql, self._newconv_row(data))
                    inserted += cur.rowcount
            conn.commit()
        except Exception:
            # Do not leave the connection in a failed/open transaction with
            # the DELETE half-applied.
            conn.rollback()
            raise
        return inserted

    def _clip(self, value: float, low: float, high: float) -> float:
        """Clamp ``value`` into the closed interval ``[low, high]``."""
        return max(low, min(high, value))

    def _map_compression(self, params: Dict[str, float]) -> str:
        """Translate ``compression_mode`` (0/1/2) into a compression name.

        1 maps to ``"log1p"``, 2 to ``"asinh"``; any other value (including a
        missing key) maps to ``"none"``.
        """
        mode = int(params.get('compression_mode', 0))
        return {1: "log1p", 2: "asinh"}.get(mode, "none")

    def _normalize_score_pairs(
        self,
        raw_scores: List[tuple[int, Optional[float]]],
        params: Dict[str, float],
        site_id: int,
        use_smoothing: bool,
    ) -> Dict[int, float]:
        """Normalise ``(member_id, raw_score)`` pairs into display scores.

        Args:
            raw_scores: Pairs to normalise; pairs whose score is ``None`` are
                dropped.
            params: Effective task parameters (percentile bounds, compression).
            site_id: Site passed through to ``batch_normalize_to_display``.
            use_smoothing: Passed through to ``batch_normalize_to_display``.

        Returns:
            Mapping of member id to display score; empty when no valid pair
            remains.
        """
        valid_scores = [
            (member_id, float(score))
            for member_id, score in raw_scores
            if score is not None
        ]
        if not valid_scores:
            return {}

        # All-zero input: return zeros directly so min-max normalisation does
        # not degenerate.
        if all(abs(score) <= 1e-9 for _, score in valid_scores):
            return {member_id: 0.0 for member_id, _ in valid_scores}

        normalized = self.batch_normalize_to_display(
            valid_scores,
            compression=self._map_compression(params),
            percentile_lower=int(params['percentile_lower']),
            percentile_upper=int(params['percentile_upper']),
            use_smoothing=use_smoothing,
            site_id=site_id,
        )
        return {member_id: display for member_id, _, display in normalized}
# Public API of this module: only the task class is exported by a star-import.
__all__ = ['NewconvIndexTask']