689 lines
26 KiB
Python
689 lines
26 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
客户-助教亲密指数计算任务
|
||
|
||
功能说明:
|
||
- 衡量客户与助教的关系强度和近期温度
|
||
- 用于助教约课精力分配和约课成功率预估
|
||
- 附加课权重 = 基础课的1.5倍
|
||
- 检测频率激增并放大权重
|
||
|
||
算法公式:
|
||
Raw Score = (w_F × F + w_R × R + w_M × M + w_D × D) × mult
|
||
|
||
其中:
|
||
- F = Σ(τ_i × decay(d_i, h_sess)) # 频次强度
|
||
- R = decay(d_last, h_last) # 最近温度
|
||
- M = Σ(ln(1+amt/A0) × decay(d_r, h_pay)) # 归因充值强度
|
||
- D = Σ(sqrt(dur/60) × τ × decay(d, h)) # 时长贡献
|
||
- mult = 1 + γ × burst # 激增放大
|
||
- burst = max(0, ln(1 + (F_short/F_long - 1)))
|
||
|
||
特殊逻辑:
|
||
- 会话合并:同一客人对同一助教,间隔<4小时算同次服务
|
||
- 充值归因:服务结束后1小时内的充值算做该助教贡献
|
||
|
||
数据来源:
|
||
- dwd_assistant_service_log: 服务记录
|
||
- dwd_recharge_order: 充值记录
|
||
|
||
更新频率:每4小时
|
||
|
||
作者:ETL团队
|
||
创建日期:2026-02-03
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import math
|
||
from dataclasses import dataclass, field
|
||
from datetime import date, datetime, timedelta
|
||
from decimal import Decimal
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
from .base_index_task import BaseIndexTask, PercentileHistory
|
||
from ..base_dws_task import TaskContext
|
||
|
||
|
||
# =============================================================================
|
||
# 数据类定义
|
||
# =============================================================================
|
||
|
||
@dataclass
|
||
class ServiceSession:
|
||
"""合并后的服务会话"""
|
||
session_start: datetime
|
||
session_end: datetime
|
||
total_duration_minutes: int = 0
|
||
course_weight: float = 1.0 # 1.0=基础课, 1.5=附加课
|
||
is_incentive: bool = False # 是否为附加课
|
||
|
||
|
||
@dataclass
|
||
class AttributedRecharge:
|
||
"""归因充值"""
|
||
pay_time: datetime
|
||
pay_amount: float
|
||
days_ago: float
|
||
|
||
|
||
@dataclass
|
||
class MemberAssistantIntimacyData:
|
||
"""客户-助教亲密数据"""
|
||
member_id: int
|
||
assistant_no: str # 助教工号(字符串,如 "1", "2", "15")
|
||
assistant_nickname: str # 助教昵称
|
||
site_id: int
|
||
tenant_id: int
|
||
|
||
# 计算输入特征
|
||
session_count: int = 0
|
||
total_duration_minutes: int = 0
|
||
basic_session_count: int = 0
|
||
incentive_session_count: int = 0
|
||
days_since_last_session: Optional[int] = None
|
||
attributed_recharge_count: int = 0
|
||
attributed_recharge_amount: float = 0.0
|
||
|
||
# 分项得分
|
||
score_frequency: float = 0.0
|
||
score_recency: float = 0.0
|
||
score_recharge: float = 0.0
|
||
score_duration: float = 0.0
|
||
burst_multiplier: float = 1.0
|
||
|
||
# 最终分数
|
||
raw_score: float = 0.0
|
||
display_score: float = 0.0
|
||
|
||
# 中间数据
|
||
sessions: List[ServiceSession] = field(default_factory=list)
|
||
recharges: List[AttributedRecharge] = field(default_factory=list)
|
||
|
||
|
||
# =============================================================================
|
||
# 亲密指数任务
|
||
# =============================================================================
|
||
|
||
class IntimacyIndexTask(BaseIndexTask):
|
||
"""
|
||
客户-助教亲密指数计算任务
|
||
|
||
计算流程:
|
||
1. 提取近60天的助教服务记录
|
||
2. 按(member_id, assistant_id)分组,合并4小时内的服务
|
||
3. 提取归因充值(服务结束后1小时内)
|
||
4. 计算5项分数(频次、最近、充值、时长、激增)
|
||
5. 汇总Raw Score
|
||
6. 分位截断 + Log压缩 + MinMax映射到0-10
|
||
7. 写入DWS表
|
||
"""
|
||
|
||
INDEX_TYPE = "INTIMACY"
|
||
|
||
# 技能ID映射
|
||
SKILL_ID_BASIC = 2790683529513797 # 基础课
|
||
SKILL_ID_INCENTIVE = 2790683529513798 # 附加课/激励课
|
||
SKILL_ID_BOX = 3039912271463941 # 包厢课
|
||
|
||
# 默认参数
|
||
DEFAULT_PARAMS = {
|
||
'lookback_days': 60,
|
||
'halflife_session': 14.0,
|
||
'halflife_last': 10.0,
|
||
'halflife_recharge': 21.0,
|
||
'halflife_short': 7.0,
|
||
'halflife_long': 30.0,
|
||
'amount_base': 500.0,
|
||
'incentive_weight': 1.5,
|
||
'session_merge_hours': 4,
|
||
'recharge_attribute_hours': 1,
|
||
'weight_frequency': 2.0,
|
||
'weight_recency': 1.5,
|
||
'weight_recharge': 2.0,
|
||
'weight_duration': 0.5,
|
||
'burst_gamma': 0.6,
|
||
'percentile_lower': 5,
|
||
'percentile_upper': 95,
|
||
}
|
||
|
||
# ==========================================================================
|
||
# 抽象方法实现
|
||
# ==========================================================================
|
||
|
||
def get_task_code(self) -> str:
|
||
return "DWS_INTIMACY_INDEX"
|
||
|
||
def get_target_table(self) -> str:
|
||
return "dws_member_assistant_intimacy"
|
||
|
||
def get_primary_keys(self) -> List[str]:
|
||
return ['site_id', 'member_id', 'assistant_id']
|
||
|
||
def get_index_type(self) -> str:
|
||
return self.INDEX_TYPE
|
||
|
||
# ==========================================================================
|
||
# 任务执行
|
||
# ==========================================================================
|
||
|
||
def execute(self, context: Optional[TaskContext]) -> Dict[str, Any]:
|
||
"""执行亲密指数计算"""
|
||
self.logger.info("开始计算客户-助教亲密指数")
|
||
|
||
# 获取门店ID
|
||
site_id = self._get_site_id(context)
|
||
tenant_id = self._get_tenant_id()
|
||
|
||
# 加载参数
|
||
params = self._load_params()
|
||
lookback_days = int(params['lookback_days'])
|
||
|
||
# 计算基准日期和时间
|
||
now = datetime.now(self.tz)
|
||
base_date = now.date()
|
||
start_datetime = now - timedelta(days=lookback_days)
|
||
|
||
self.logger.info(
|
||
"参数: lookback=%d天, h_sess=%.1f, h_last=%.1f, h_pay=%.1f, γ=%.2f",
|
||
lookback_days, params['halflife_session'], params['halflife_last'],
|
||
params['halflife_recharge'], params['burst_gamma']
|
||
)
|
||
|
||
# 1. 提取服务记录
|
||
raw_services = self._extract_service_records(site_id, start_datetime, now)
|
||
self.logger.info("提取到 %d 条原始服务记录", len(raw_services))
|
||
|
||
if not raw_services:
|
||
self.logger.warning("没有服务记录,跳过计算")
|
||
return {'status': 'skipped', 'reason': 'no_data'}
|
||
|
||
# 2. 按(member_id, assistant_id)分组并合并会话
|
||
pair_data = self._group_and_merge_sessions(raw_services, params, now)
|
||
self.logger.info("合并为 %d 个客户-助教对", len(pair_data))
|
||
|
||
# 3. 提取归因充值
|
||
self._extract_attributed_recharges(site_id, pair_data, params, now)
|
||
|
||
# 4. 计算每个pair的特征和分数
|
||
intimacy_data_list: List[MemberAssistantIntimacyData] = []
|
||
|
||
for key, data in pair_data.items():
|
||
data.site_id = site_id
|
||
data.tenant_id = tenant_id
|
||
|
||
# 计算分项得分
|
||
self._calculate_component_scores(data, params, now)
|
||
|
||
# 汇总Raw Score
|
||
base_score = (
|
||
params['weight_frequency'] * data.score_frequency +
|
||
params['weight_recency'] * data.score_recency +
|
||
params['weight_recharge'] * data.score_recharge +
|
||
params['weight_duration'] * data.score_duration
|
||
)
|
||
data.raw_score = base_score * data.burst_multiplier
|
||
|
||
intimacy_data_list.append(data)
|
||
|
||
self.logger.info("计算完成 %d 个pair的Raw Score", len(intimacy_data_list))
|
||
|
||
# 5. 归一化到Display Score(使用对数压缩)
|
||
raw_scores = [((d.member_id, d.assistant_no), d.raw_score) for d in intimacy_data_list]
|
||
normalized = self.batch_normalize_to_display(
|
||
raw_scores,
|
||
use_log=True, # 亲密指数建议使用对数压缩
|
||
percentile_lower=int(params['percentile_lower']),
|
||
percentile_upper=int(params['percentile_upper']),
|
||
use_smoothing=True,
|
||
site_id=site_id
|
||
)
|
||
|
||
# 更新display_score
|
||
score_map = {key: (raw, display) for key, raw, display in normalized}
|
||
for data in intimacy_data_list:
|
||
key = (data.member_id, data.assistant_no)
|
||
if key in score_map:
|
||
_, data.display_score = score_map[key]
|
||
|
||
# 6. 保存分位点历史
|
||
if intimacy_data_list:
|
||
all_raw = [d.raw_score for d in intimacy_data_list]
|
||
q_l, q_u = self.calculate_percentiles(
|
||
all_raw,
|
||
int(params['percentile_lower']),
|
||
int(params['percentile_upper'])
|
||
)
|
||
smoothed_l, smoothed_u = self._apply_ewma_smoothing(site_id, q_l, q_u)
|
||
|
||
self.save_percentile_history(
|
||
site_id=site_id,
|
||
percentile_5=q_l,
|
||
percentile_95=q_u,
|
||
percentile_5_smoothed=smoothed_l,
|
||
percentile_95_smoothed=smoothed_u,
|
||
record_count=len(all_raw),
|
||
min_raw=min(all_raw),
|
||
max_raw=max(all_raw),
|
||
avg_raw=sum(all_raw) / len(all_raw)
|
||
)
|
||
|
||
# 7. 写入DWS表
|
||
inserted = self._save_intimacy_data(intimacy_data_list)
|
||
|
||
self.logger.info("亲密指数计算完成,写入 %d 条记录", inserted)
|
||
|
||
return {
|
||
'status': 'success',
|
||
'pair_count': len(intimacy_data_list),
|
||
'records_inserted': inserted
|
||
}
|
||
|
||
# ==========================================================================
|
||
# 数据提取方法
|
||
# ==========================================================================
|
||
|
||
def _extract_service_records(
|
||
self,
|
||
site_id: int,
|
||
start_datetime: datetime,
|
||
end_datetime: datetime
|
||
) -> List[Dict[str, Any]]:
|
||
"""
|
||
提取服务记录
|
||
|
||
注意: 使用 assistant_no (助教工号) 作为助教标识,而不是 site_assistant_id
|
||
因为 site_assistant_id 在数据中是每次服务的唯一ID,不是助教的唯一标识
|
||
|
||
Returns:
|
||
[{'member_id', 'assistant_no', 'assistant_nickname', 'start_time', 'end_time', 'duration_minutes', 'skill_id'}, ...]
|
||
"""
|
||
sql = """
|
||
SELECT
|
||
tenant_member_id AS member_id,
|
||
assistant_no,
|
||
nickname AS assistant_nickname,
|
||
start_use_time,
|
||
last_use_time,
|
||
COALESCE(income_seconds, 0) / 60 AS duration_minutes,
|
||
skill_id
|
||
FROM billiards_dwd.dwd_assistant_service_log
|
||
WHERE site_id = %s
|
||
AND tenant_member_id > 0 -- 排除散客
|
||
AND is_delete = 0
|
||
AND assistant_no IS NOT NULL -- 确保有助教工号
|
||
AND last_use_time >= %s
|
||
AND last_use_time < %s
|
||
ORDER BY tenant_member_id, assistant_no, start_use_time
|
||
"""
|
||
|
||
rows = self.db.query(sql, (site_id, start_datetime, end_datetime))
|
||
|
||
result = []
|
||
for row in (rows or []):
|
||
row_dict = dict(row)
|
||
# 使用 assistant_no 作为助教标识
|
||
assistant_no = row_dict['assistant_no']
|
||
if assistant_no:
|
||
result.append({
|
||
'member_id': int(row_dict['member_id']),
|
||
'assistant_no': str(assistant_no), # 助教工号(字符串)
|
||
'assistant_nickname': row_dict['assistant_nickname'] or '',
|
||
'start_time': row_dict['start_use_time'],
|
||
'end_time': row_dict['last_use_time'],
|
||
'duration_minutes': int(row_dict['duration_minutes'] or 0),
|
||
'skill_id': int(row_dict['skill_id'] or 0)
|
||
})
|
||
|
||
return result
|
||
|
||
def _group_and_merge_sessions(
|
||
self,
|
||
raw_services: List[Dict[str, Any]],
|
||
params: Dict[str, float],
|
||
now: datetime
|
||
) -> Dict[Tuple[int, str], MemberAssistantIntimacyData]:
|
||
"""
|
||
按(member_id, assistant_no)分组并合并会话
|
||
|
||
合并逻辑:同一客人对同一助教,间隔<4小时算同次服务
|
||
"""
|
||
merge_threshold_hours = int(params['session_merge_hours'])
|
||
merge_threshold = timedelta(hours=merge_threshold_hours)
|
||
incentive_weight = params['incentive_weight']
|
||
|
||
pair_data: Dict[Tuple[int, str], MemberAssistantIntimacyData] = {}
|
||
|
||
# 按pair分组(使用assistant_no)
|
||
pair_services: Dict[Tuple[int, str], List[Dict[str, Any]]] = {}
|
||
for svc in raw_services:
|
||
key = (svc['member_id'], svc['assistant_no'])
|
||
if key not in pair_services:
|
||
pair_services[key] = []
|
||
pair_services[key].append(svc)
|
||
|
||
# 对每个pair合并会话
|
||
for key, services in pair_services.items():
|
||
member_id, assistant_no = key
|
||
# 取第一个服务记录的昵称
|
||
assistant_nickname = services[0]['assistant_nickname'] if services else ''
|
||
|
||
data = MemberAssistantIntimacyData(
|
||
member_id=member_id,
|
||
assistant_no=assistant_no,
|
||
assistant_nickname=assistant_nickname,
|
||
site_id=0, # 稍后填充
|
||
tenant_id=0
|
||
)
|
||
|
||
# 按开始时间排序
|
||
sorted_services = sorted(services, key=lambda x: x['start_time'])
|
||
|
||
# 合并会话
|
||
current_session: Optional[ServiceSession] = None
|
||
|
||
for svc in sorted_services:
|
||
start_time = svc['start_time']
|
||
end_time = svc['end_time']
|
||
duration = svc['duration_minutes']
|
||
skill_id = svc['skill_id']
|
||
|
||
# 判断课型
|
||
is_incentive = (skill_id == self.SKILL_ID_INCENTIVE)
|
||
weight = incentive_weight if is_incentive else 1.0
|
||
|
||
if current_session is None:
|
||
# 开始新会话
|
||
current_session = ServiceSession(
|
||
session_start=start_time,
|
||
session_end=end_time,
|
||
total_duration_minutes=duration,
|
||
course_weight=weight,
|
||
is_incentive=is_incentive
|
||
)
|
||
elif start_time - current_session.session_end <= merge_threshold:
|
||
# 合并到当前会话
|
||
current_session.session_end = max(current_session.session_end, end_time)
|
||
current_session.total_duration_minutes += duration
|
||
# 同次服务取最高权重
|
||
current_session.course_weight = max(current_session.course_weight, weight)
|
||
current_session.is_incentive = current_session.is_incentive or is_incentive
|
||
else:
|
||
# 保存当前会话,开始新会话
|
||
data.sessions.append(current_session)
|
||
current_session = ServiceSession(
|
||
session_start=start_time,
|
||
session_end=end_time,
|
||
total_duration_minutes=duration,
|
||
course_weight=weight,
|
||
is_incentive=is_incentive
|
||
)
|
||
|
||
# 保存最后一个会话
|
||
if current_session is not None:
|
||
data.sessions.append(current_session)
|
||
|
||
# 统计特征
|
||
data.session_count = len(data.sessions)
|
||
data.total_duration_minutes = sum(s.total_duration_minutes for s in data.sessions)
|
||
data.basic_session_count = sum(1 for s in data.sessions if not s.is_incentive)
|
||
data.incentive_session_count = sum(1 for s in data.sessions if s.is_incentive)
|
||
|
||
# 最近一次服务
|
||
if data.sessions:
|
||
last_session = max(data.sessions, key=lambda s: s.session_end)
|
||
data.days_since_last_session = (now - last_session.session_end).days
|
||
|
||
pair_data[key] = data
|
||
|
||
return pair_data
|
||
|
||
def _extract_attributed_recharges(
|
||
self,
|
||
site_id: int,
|
||
pair_data: Dict[Tuple[int, int], MemberAssistantIntimacyData],
|
||
params: Dict[str, float],
|
||
now: datetime
|
||
) -> None:
|
||
"""
|
||
提取归因充值
|
||
|
||
归因逻辑:服务结束后1小时内的充值算做该助教贡献
|
||
"""
|
||
attribution_hours = int(params['recharge_attribute_hours'])
|
||
attribution_window = timedelta(hours=attribution_hours)
|
||
|
||
# 获取所有相关会员ID
|
||
member_ids = set(key[0] for key in pair_data.keys())
|
||
if not member_ids:
|
||
return
|
||
|
||
member_ids_str = ','.join(str(m) for m in member_ids)
|
||
|
||
# 查询充值记录
|
||
sql = f"""
|
||
SELECT
|
||
member_id,
|
||
pay_time,
|
||
pay_amount
|
||
FROM billiards_dwd.dwd_recharge_order
|
||
WHERE site_id = %s
|
||
AND member_id IN ({member_ids_str})
|
||
AND settle_type = 5 -- 充值订单
|
||
AND pay_time >= %s
|
||
"""
|
||
|
||
lookback_days = int(params['lookback_days'])
|
||
start_datetime = now - timedelta(days=lookback_days)
|
||
|
||
rows = self.db.query(sql, (site_id, start_datetime))
|
||
|
||
# 为每个充值找到归因助教
|
||
for row in (rows or []):
|
||
row_dict = dict(row)
|
||
member_id = int(row_dict['member_id'])
|
||
pay_time = row_dict['pay_time']
|
||
pay_amount = float(row_dict['pay_amount'] or 0)
|
||
|
||
if pay_amount <= 0:
|
||
continue
|
||
|
||
# 查找该会员在pay_time前1小时内结束服务的助教
|
||
for key, data in pair_data.items():
|
||
if key[0] != member_id:
|
||
continue
|
||
|
||
for session in data.sessions:
|
||
# 服务结束后1小时内的充值
|
||
if (session.session_end <= pay_time and
|
||
pay_time - session.session_end <= attribution_window):
|
||
# 归因给这个助教
|
||
data.attributed_recharge_count += 1
|
||
data.attributed_recharge_amount += pay_amount
|
||
data.recharges.append(AttributedRecharge(
|
||
pay_time=pay_time,
|
||
pay_amount=pay_amount,
|
||
days_ago=(now - pay_time).total_seconds() / 86400
|
||
))
|
||
break # 一笔充值只归因给一个助教
|
||
|
||
# ==========================================================================
|
||
# 分数计算方法
|
||
# ==========================================================================
|
||
|
||
def _calculate_component_scores(
|
||
self,
|
||
data: MemberAssistantIntimacyData,
|
||
params: Dict[str, float],
|
||
now: datetime
|
||
) -> None:
|
||
"""计算5项分数"""
|
||
epsilon = 1e-6
|
||
|
||
h_sess = params['halflife_session']
|
||
h_last = params['halflife_last']
|
||
h_pay = params['halflife_recharge']
|
||
h_short = params['halflife_short']
|
||
h_long = params['halflife_long']
|
||
A0 = params['amount_base']
|
||
gamma = params['burst_gamma']
|
||
|
||
# 1. 频次强度 F = Σ(τ_i × decay(d_i, h_sess))
|
||
F = 0.0
|
||
for session in data.sessions:
|
||
days_ago = (now - session.session_end).total_seconds() / 86400
|
||
F += session.course_weight * self.decay(days_ago, h_sess)
|
||
data.score_frequency = F
|
||
|
||
# 2. 最近温度 R = decay(d_last, h_last)
|
||
if data.days_since_last_session is not None:
|
||
data.score_recency = self.decay(data.days_since_last_session, h_last)
|
||
else:
|
||
data.score_recency = 0.0
|
||
|
||
# 3. 归因充值强度 M = Σ(ln(1+amt/A0) × decay(d_r, h_pay))
|
||
M = 0.0
|
||
for recharge in data.recharges:
|
||
m_amt = math.log1p(recharge.pay_amount / A0)
|
||
M += m_amt * self.decay(recharge.days_ago, h_pay)
|
||
data.score_recharge = M
|
||
|
||
# 4. 时长贡献 D = Σ(sqrt(dur/60) × τ × decay(d, h_sess))
|
||
D = 0.0
|
||
for session in data.sessions:
|
||
days_ago = (now - session.session_end).total_seconds() / 86400
|
||
dur_hours = session.total_duration_minutes / 60.0
|
||
D += math.sqrt(dur_hours) * session.course_weight * self.decay(days_ago, h_sess)
|
||
data.score_duration = D
|
||
|
||
# 5. 频率激增放大 mult = 1 + γ × burst
|
||
# F_short = Σ(τ × decay(d, h_short))
|
||
# F_long = Σ(τ × decay(d, h_long))
|
||
F_short = 0.0
|
||
F_long = 0.0
|
||
for session in data.sessions:
|
||
days_ago = (now - session.session_end).total_seconds() / 86400
|
||
F_short += session.course_weight * self.decay(days_ago, h_short)
|
||
F_long += session.course_weight * self.decay(days_ago, h_long)
|
||
|
||
# burst = max(0, ln(1 + (F_short/F_long - 1)))
|
||
ratio = F_short / (F_long + epsilon)
|
||
if ratio > 1:
|
||
burst = self.safe_ln1p(ratio - 1)
|
||
else:
|
||
burst = 0.0
|
||
|
||
data.burst_multiplier = 1 + gamma * burst
|
||
|
||
# ==========================================================================
|
||
# 数据保存方法
|
||
# ==========================================================================
|
||
|
||
def _save_intimacy_data(self, data_list: List[MemberAssistantIntimacyData]) -> int:
|
||
"""保存亲密数据到DWS表"""
|
||
if not data_list:
|
||
return 0
|
||
|
||
# 先删除已存在的记录
|
||
site_id = data_list[0].site_id
|
||
|
||
# 构建删除条件(使用assistant_no)
|
||
# 注意:assistant_id字段在数据库中存储assistant_no的整数形式
|
||
keys = [(d.member_id, d.assistant_no) for d in data_list]
|
||
conditions = " OR ".join(
|
||
f"(member_id = {m} AND assistant_id = {int(a)})" for m, a in keys
|
||
)
|
||
|
||
delete_sql = f"""
|
||
DELETE FROM billiards_dws.dws_member_assistant_intimacy
|
||
WHERE site_id = %s AND ({conditions})
|
||
"""
|
||
|
||
with self.db.conn.cursor() as cur:
|
||
cur.execute(delete_sql, (site_id,))
|
||
|
||
# 插入新记录
|
||
# 使用assistant_no的整数值作为assistant_id
|
||
insert_sql = """
|
||
INSERT INTO billiards_dws.dws_member_assistant_intimacy (
|
||
site_id, tenant_id, member_id, assistant_id,
|
||
session_count, total_duration_minutes,
|
||
basic_session_count, incentive_session_count,
|
||
days_since_last_session,
|
||
attributed_recharge_count, attributed_recharge_amount,
|
||
score_frequency, score_recency, score_recharge, score_duration,
|
||
burst_multiplier, raw_score, display_score,
|
||
calc_time, created_at, updated_at
|
||
) VALUES (
|
||
%s, %s, %s, %s,
|
||
%s, %s,
|
||
%s, %s,
|
||
%s,
|
||
%s, %s,
|
||
%s, %s, %s, %s,
|
||
%s, %s, %s,
|
||
NOW(), NOW(), NOW()
|
||
)
|
||
"""
|
||
|
||
inserted = 0
|
||
with self.db.conn.cursor() as cur:
|
||
for data in data_list:
|
||
# 将assistant_no转为整数作为assistant_id
|
||
assistant_id = int(data.assistant_no) if data.assistant_no.isdigit() else 0
|
||
cur.execute(insert_sql, (
|
||
data.site_id, data.tenant_id, data.member_id, assistant_id,
|
||
data.session_count, data.total_duration_minutes,
|
||
data.basic_session_count, data.incentive_session_count,
|
||
data.days_since_last_session,
|
||
data.attributed_recharge_count, data.attributed_recharge_amount,
|
||
data.score_frequency, data.score_recency, data.score_recharge, data.score_duration,
|
||
data.burst_multiplier, data.raw_score, data.display_score
|
||
))
|
||
inserted += cur.rowcount
|
||
|
||
# 提交事务
|
||
self.db.conn.commit()
|
||
|
||
return inserted
|
||
|
||
# ==========================================================================
|
||
# 辅助方法
|
||
# ==========================================================================
|
||
|
||
def _load_params(self) -> Dict[str, float]:
|
||
"""加载参数,缺失时使用默认值"""
|
||
params = self.load_index_parameters()
|
||
result = dict(self.DEFAULT_PARAMS)
|
||
result.update(params)
|
||
return result
|
||
|
||
def _get_site_id(self, context: Optional[TaskContext]) -> int:
|
||
"""获取门店ID"""
|
||
if context and hasattr(context, 'store_id') and context.store_id:
|
||
return context.store_id
|
||
|
||
site_id = self.config.get('app.default_site_id')
|
||
if site_id:
|
||
return int(site_id)
|
||
|
||
sql = "SELECT DISTINCT site_id FROM billiards_dwd.dwd_assistant_service_log LIMIT 1"
|
||
rows = self.db.query(sql)
|
||
if rows:
|
||
return int(dict(rows[0])['site_id'])
|
||
|
||
raise ValueError("无法确定门店ID")
|
||
|
||
def _get_tenant_id(self) -> int:
|
||
"""获取租户ID"""
|
||
tenant_id = self.config.get('app.tenant_id')
|
||
if tenant_id:
|
||
return int(tenant_id)
|
||
|
||
sql = "SELECT DISTINCT tenant_id FROM billiards_dwd.dwd_assistant_service_log LIMIT 1"
|
||
rows = self.db.query(sql)
|
||
if rows:
|
||
return int(dict(rows[0])['tenant_id'])
|
||
|
||
return 0
|