Files
ZQYY.FQ-ETL/tasks/dws/member_visit_task.py

424 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
会员来店明细任务
功能说明:
以"会员+订单"为粒度,记录每次来店消费明细
数据来源:
- dwd_settlement_head: 结账单头表
- dwd_assistant_service_log: 助教服务流水
- dim_member: 会员维度
- dim_table: 台桌维度
- cfg_area_category: 区域分类映射
目标表:
billiards_dws.dws_member_visit_detail
更新策略:
- 更新频率:每日增量更新
- 幂等方式:delete-before-insert,按日期窗口
业务规则:
- 散客处理:member_id=0 不进入此表
- 区域分类使用cfg_area_category映射
- 助教服务以JSON格式存储多个助教的服务明细
作者:ETL团队
创建日期:2026-02-01
"""
from __future__ import annotations
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple
from .base_dws_task import BaseDwsTask, TaskContext
class MemberVisitTask(BaseDwsTask):
    """
    Member visit detail task.

    Produces one row per (site_id, member_id, order_settle_id) describing a
    single store visit by a member:
    - table information and area category
    - consumption amount breakdown
    - payment method breakdown
    - assistant service details, serialized as a JSON string
    """

    def get_task_code(self) -> str:
        """Return this task's code (also used as the log message prefix)."""
        return "DWS_MEMBER_VISIT"

    def get_target_table(self) -> str:
        """Return the target DWS table name."""
        return "dws_member_visit_detail"

    def get_primary_keys(self) -> List[str]:
        """Return the primary-key columns of the target table."""
        return ["site_id", "member_id", "order_settle_id"]

    # ==========================================================================
    # Main ETL flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract all source data needed for the context's date window.

        ``context.window_start`` / ``window_end`` may be datetimes or dates;
        they are normalized to dates, and the extraction queries apply them
        inclusively on both ends.

        Returns:
            A dict with the raw settlements, assistant services, table-fee
            durations, member/table lookup dicts, plus the normalized
            ``start_date``, ``end_date`` and ``site_id``.
        """
        # datetime exposes .date(); a plain date does not, so pass it through.
        start_date = context.window_start.date() if hasattr(context.window_start, 'date') else context.window_start
        end_date = context.window_end.date() if hasattr(context.window_end, 'date') else context.window_end
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,日期范围 %s ~ %s",
            self.get_task_code(), start_date, end_date
        )

        settlements = self._extract_settlements(site_id, start_date, end_date)
        assistant_services = self._extract_assistant_services(site_id, start_date, end_date)
        # Real table-use seconds aggregated per settlement order.
        table_fee_durations = self._extract_table_fee_durations(site_id, start_date, end_date)
        member_info = self._extract_member_info(site_id)
        table_info = self._extract_table_info(site_id)
        # Populates the config cache used by get_area_category() in transform.
        self.load_config_cache()

        return {
            'settlements': settlements,
            'assistant_services': assistant_services,
            'member_info': member_info,
            'table_info': table_info,
            'table_fee_durations': table_fee_durations,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """
        Build one output record per member settlement.

        Settlements whose member is a guest (per ``is_guest``) are skipped.
        Amounts go through ``safe_decimal``; durations are whole minutes
        (truncated); the member mobile number is masked.

        Args:
            extracted: The dict returned by ``extract``.
            context: The task context (unused here, kept for the interface).

        Returns:
            A list of row dicts keyed by target-table column names.
        """
        settlements = extracted['settlements']
        assistant_services = extracted['assistant_services']
        member_info = extracted['member_info']
        table_info = extracted['table_info']
        table_fee_durations = extracted['table_fee_durations']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,%d 条结账单",
            self.get_task_code(), len(settlements)
        )

        # order_settle_id -> list of assistant service rows for that order
        service_index = self._build_service_index(assistant_services)
        # order_settle_id -> total real table-use seconds for that order
        table_duration_index = {
            row.get('order_settle_id'): self.safe_int(row.get('table_use_seconds', 0))
            for row in (table_fee_durations or [])
            if row.get('order_settle_id')
        }
        # Loop-invariant: resolve the tenant id once rather than per record.
        tenant_id = self.config.get("app.tenant_id", site_id)

        results: List[Dict[str, Any]] = []
        for settle in settlements:
            member_id = settle.get('member_id')
            # Guests (walk-in customers) are excluded from this table.
            if self.is_guest(member_id):
                continue

            order_settle_id = settle.get('order_settle_id')
            table_id = settle.get('table_id')
            memb_info = member_info.get(member_id, {})
            tbl_info = table_info.get(table_id, {})
            services = service_index.get(order_settle_id, [])

            area_name = tbl_info.get('area_name')
            # Tolerate a missing category mapping instead of failing on .get().
            area_cat = self.get_area_category(area_name) or {}

            table_seconds = table_duration_index.get(order_settle_id, 0)
            assistant_seconds = sum(
                self.safe_int(s.get('income_seconds', 0)) for s in services
            )

            results.append({
                'site_id': site_id,
                'tenant_id': tenant_id,
                'member_id': member_id,
                'order_settle_id': order_settle_id,
                'visit_date': settle.get('visit_date'),
                'visit_time': settle.get('create_time'),
                # Member info
                'member_nickname': memb_info.get('nickname'),
                'member_mobile': self._mask_mobile(memb_info.get('mobile')),
                'member_birthday': memb_info.get('birthday'),
                # Table info
                'table_id': table_id,
                'table_name': tbl_info.get('table_name'),
                'area_name': area_name,
                'area_category': area_cat.get('category_name'),
                # Consumption amounts
                'table_fee': self.safe_decimal(settle.get('table_charge_money', 0)),
                'goods_amount': self.safe_decimal(settle.get('goods_money', 0)),
                'assistant_amount': (
                    self.safe_decimal(settle.get('assistant_pd_money', 0))
                    + self.safe_decimal(settle.get('assistant_cx_money', 0))
                ),
                'total_consume': self.safe_decimal(settle.get('consume_money', 0)),
                'total_discount': self._calc_total_discount(settle),
                'actual_pay': self.safe_decimal(settle.get('pay_amount', 0)),
                # Payment methods
                'cash_pay': self.safe_decimal(settle.get('pay_amount', 0)),
                'cash_card_pay': self.safe_decimal(settle.get('balance_amount', 0)),
                'gift_card_pay': self.safe_decimal(settle.get('gift_card_amount', 0)),
                'groupbuy_pay': self.safe_decimal(settle.get('coupon_amount', 0)),
                # Durations (minutes)
                'table_duration_min': self._calc_table_duration(table_seconds),
                'assistant_duration_min': assistant_seconds // 60,
                # Assistant service details (JSON string or None)
                'assistant_services': self._build_assistant_services_json(services),
            })
        return results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write records with delete-before-insert.

        Existing rows are removed via ``delete_existing_data`` keyed on
        ``visit_date`` (presumably scoped to the context window; see the base
        class). All transformed rows are then bulk-inserted.

        Returns:
            A result dict with ``counts`` (fetched/inserted/updated/skipped/
            errors) and, when a write happened, ``extra.deleted``.
        """
        if not transformed:
            # NOTE(review): with no new rows the delete is skipped too, so rows
            # written earlier for this window survive a rerun. Presumably a
            # deliberate guard against wiping history when upstream is empty
            # — confirm against the idempotency contract.
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        deleted = self.delete_existing_data(context, date_col="visit_date")
        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d",
            self.get_task_code(), deleted, inserted
        )
        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Extraction helpers
    # ==========================================================================

    def _extract_settlements(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Extract member settlement heads paid within [start_date, end_date].

        The window is applied to the pay date, which also becomes
        ``visit_date``. Rows with a NULL or 0 ``member_id`` are excluded in SQL.
        """
        sql = """
            SELECT
                order_settle_id,
                order_trade_no,
                table_id,
                member_id,
                create_time,
                pay_time,
                DATE(pay_time) AS visit_date,
                consume_money,
                pay_amount,
                table_charge_money,
                goods_money,
                assistant_pd_money,
                assistant_cx_money,
                coupon_amount,
                adjust_amount,
                member_discount_amount,
                rounding_amount,
                gift_card_amount,
                balance_amount,
                recharge_card_amount
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %s
              AND DATE(pay_time) >= %s
              AND DATE(pay_time) <= %s
              AND member_id IS NOT NULL
              AND member_id != 0
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_assistant_services(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Extract non-deleted assistant service rows for the window's settlements.

        Rows are selected by their settlement's pay date (same predicate as
        ``_extract_settlements``), not by the service start time. Otherwise a
        service started before the window whose order was paid inside it
        (e.g. a session crossing midnight) would be dropped from the visit.
        """
        sql = """
            SELECT
                asl.order_settle_id,
                asl.site_assistant_id AS assistant_id,
                asl.nickname AS assistant_nickname,
                asl.income_seconds,
                asl.ledger_amount
            FROM billiards_dwd.dwd_assistant_service_log asl
            WHERE asl.site_id = %s
              AND asl.is_delete = 0
              AND EXISTS (
                  SELECT 1
                  FROM billiards_dwd.dwd_settlement_head sh
                  WHERE sh.site_id = asl.site_id
                    AND sh.order_settle_id = asl.order_settle_id
                    AND DATE(sh.pay_time) >= %s
                    AND DATE(sh.pay_time) <= %s
                    AND sh.member_id IS NOT NULL
                    AND sh.member_id != 0
              )
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_table_fee_durations(
        self,
        site_id: int,
        start_date: date,
        end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Extract real table-use seconds summed per settlement order.

        Like ``_extract_assistant_services``, rows are selected by their
        settlement's pay date rather than by ledger end time. An order paid
        in the window therefore keeps table time that ended before the window
        started. NULL seconds count as 0 and NULL ``is_delete`` counts as not
        deleted.
        """
        sql = """
            SELECT
                tfl.order_settle_id,
                SUM(COALESCE(tfl.real_table_use_seconds, 0)) AS table_use_seconds
            FROM billiards_dwd.dwd_table_fee_log tfl
            WHERE tfl.site_id = %s
              AND COALESCE(tfl.is_delete, 0) = 0
              AND EXISTS (
                  SELECT 1
                  FROM billiards_dwd.dwd_settlement_head sh
                  WHERE sh.site_id = tfl.site_id
                    AND sh.order_settle_id = tfl.order_settle_id
                    AND DATE(sh.pay_time) >= %s
                    AND DATE(sh.pay_time) <= %s
                    AND sh.member_id IS NOT NULL
                    AND sh.member_id != 0
              )
            GROUP BY tfl.order_settle_id
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_member_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """Return current (scd2_is_current = 1) member rows keyed by member_id."""
        sql = """
            SELECT
                member_id,
                nickname,
                mobile,
                birthday
            FROM billiards_dwd.dim_member
            WHERE site_id = %s
              AND scd2_is_current = 1
        """
        rows = self.db.query(sql, (site_id,))
        return {r['member_id']: dict(r) for r in (rows or [])}

    def _extract_table_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """Return current (scd2_is_current = 1) table rows keyed by table_id."""
        sql = """
            SELECT
                site_table_id AS table_id,
                site_table_name AS table_name,
                site_table_area_name AS area_name
            FROM billiards_dwd.dim_table
            WHERE site_id = %s
              AND scd2_is_current = 1
        """
        rows = self.db.query(sql, (site_id,))
        return {r['table_id']: dict(r) for r in (rows or [])}

    # ==========================================================================
    # Utility helpers
    # ==========================================================================

    def _build_service_index(
        self,
        services: List[Dict[str, Any]]
    ) -> Dict[int, List[Dict[str, Any]]]:
        """
        Group service rows by ``order_settle_id``, preserving input order.

        Rows with a falsy ``order_settle_id`` (None/0) are dropped.
        """
        index: Dict[int, List[Dict[str, Any]]] = {}
        for service in services:
            order_id = service.get('order_settle_id')
            if order_id:
                index.setdefault(order_id, []).append(service)
        return index

    def _build_assistant_services_json(
        self,
        services: List[Dict[str, Any]]
    ) -> Optional[str]:
        """
        Serialize assistant services to a JSON array string.

        Each element has ``assistant_id``, ``nickname``, ``duration_min``
        (truncated minutes) and ``amount`` (as float). Returns None when
        there are no services. ``ensure_ascii=False`` keeps non-ASCII
        nicknames readable in the stored JSON.
        """
        if not services:
            return None
        json_data = [
            {
                'assistant_id': s.get('assistant_id'),
                'nickname': s.get('assistant_nickname'),
                'duration_min': self.safe_int(s.get('income_seconds', 0)) // 60,
                'amount': float(self.safe_decimal(s.get('ledger_amount', 0))),
            }
            for s in services
        ]
        return json.dumps(json_data, ensure_ascii=False)

    def _calc_table_duration(self, table_use_seconds: int) -> int:
        """
        Convert real table-use seconds to whole minutes (truncated).

        Falsy or non-positive input yields 0.
        """
        if not table_use_seconds or table_use_seconds <= 0:
            return 0
        return int(table_use_seconds // 60)

    def _calc_total_discount(self, settle: Dict[str, Any]) -> Decimal:
        """Return adjust + member discount + rounding amounts as a Decimal."""
        adjust = self.safe_decimal(settle.get('adjust_amount', 0))
        member_discount = self.safe_decimal(settle.get('member_discount_amount', 0))
        rounding = self.safe_decimal(settle.get('rounding_amount', 0))
        return adjust + member_discount + rounding

    def _mask_mobile(self, mobile: Optional[str]) -> Optional[str]:
        """
        Mask a mobile number as its first 3 chars + ``****`` + last 4 chars.

        Falsy values are returned unchanged. Other values are converted with
        ``str`` and stripped first. This handles numeric columns, which would
        otherwise crash ``len()``, and padded values, whose padding would
        otherwise be kept as the "last 4". Values shorter than 7 characters
        after normalization are returned as-is.
        """
        if not mobile:
            return mobile
        text = str(mobile).strip()
        if len(text) < 7:
            return text
        return text[:3] + "****" + text[-4:]
# Explicit public API of this module: the only name exported by a star-import.
__all__ = ["MemberVisitTask"]