# -*- coding: utf-8 -*-
"""
Member visit detail task.

Purpose:
    Record the details of every in-store visit at "member + order"
    granularity.

Data sources:
    - dwd_settlement_head: settlement (checkout) header table
    - dwd_assistant_service_log: assistant service log
    - dim_member: member dimension
    - dim_table: table dimension
    - cfg_area_category: area category mapping

Target table:
    billiards_dws.dws_member_visit_detail

Update strategy:
    - Frequency: daily incremental update
    - Idempotency: delete-before-insert (by date window)

Business rules:
    - Walk-in guests: member_id=0 does not enter this table
    - Area category: resolved through the cfg_area_category mapping
    - Assistant services: the details of multiple assistants are stored
      as JSON

Author: ETL team
Created: 2026-02-01
"""

from __future__ import annotations

import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Set, Tuple

from .base_dws_task import BaseDwsTask, TaskContext


def _to_date(value: Any) -> Any:
    """Return ``value.date()`` if *value* has a ``date`` attribute, else *value* unchanged."""
    return value.date() if hasattr(value, 'date') else value


class MemberVisitTask(BaseDwsTask):
    """
    Member visit detail task.

    For every visit of every member, one record is produced holding:
    - table information and area category
    - consumption amount breakdown
    - payment method breakdown
    - assistant service details (JSON string)

    Helpers such as ``safe_decimal``, ``safe_int``, ``is_guest``,
    ``get_area_category``, ``load_config_cache``, ``delete_existing_data``
    and ``bulk_insert`` are called on ``self`` but are not defined in this
    file (presumably inherited from ``BaseDwsTask`` -- verify there).
    """

    # Assumed average table rate (currency units per minute) used by
    # _calc_table_duration to estimate playing time from the table fee.
    ASSUMED_TABLE_RATE_PER_MIN = Decimal('0.5')

    def get_task_code(self) -> str:
        """Return the task code of this task."""
        return "DWS_MEMBER_VISIT"

    def get_target_table(self) -> str:
        """Return the name of the target table (without schema)."""
        return "dws_member_visit_detail"

    def get_primary_keys(self) -> List[str]:
        """Return the column names forming the primary key of the target table."""
        return ["site_id", "member_id", "order_settle_id"]

    # ==========================================================================
    # Main ETL flow
    # ==========================================================================

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """
        Extract the source data for the context's window.

        Args:
            context: Task context; ``window_start``, ``window_end`` and
                ``store_id`` are read from it.

        Returns:
            A dict with the keys ``settlements``, ``assistant_services``,
            ``member_info``, ``table_info``, ``start_date``, ``end_date``
            and ``site_id``.
        """
        # The window bounds may or may not expose .date(); normalise both.
        start_date = _to_date(context.window_start)
        end_date = _to_date(context.window_end)
        site_id = context.store_id

        self.logger.info(
            "%s: 提取数据,日期范围 %s ~ %s",
            self.get_task_code(), start_date, end_date
        )

        # 1. Settlement headers
        settlements = self._extract_settlements(site_id, start_date, end_date)

        # 2. Assistant service details
        assistant_services = self._extract_assistant_services(
            site_id, start_date, end_date
        )

        # 3. Member information
        member_info = self._extract_member_info(site_id)

        # 4. Table information
        table_info = self._extract_table_info(site_id)

        # 5. Load configuration
        self.load_config_cache()

        return {
            'settlements': settlements,
            'assistant_services': assistant_services,
            'member_info': member_info,
            'table_info': table_info,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id
        }

    def transform(
        self, extracted: Dict[str, Any], context: TaskContext
    ) -> List[Dict[str, Any]]:
        """
        Turn the extracted settlements into target-table records.

        Args:
            extracted: The dict returned by :meth:`extract`.
            context: Task context (not read by this method).

        Returns:
            One record dict per settlement whose member is not a guest
            according to ``self.is_guest``.
        """
        settlements = extracted['settlements']
        assistant_services = extracted['assistant_services']
        member_info = extracted['member_info']
        table_info = extracted['table_info']
        site_id = extracted['site_id']

        self.logger.info(
            "%s: 转换数据,%d 条结账单",
            self.get_task_code(), len(settlements)
        )

        # Service index: order_settle_id -> [services]
        service_index = self._build_service_index(assistant_services)

        # Same value for every record, so look it up once instead of per row.
        tenant_id = self.config.get("app.tenant_id", site_id)

        results = []
        for settle in settlements:
            member_id = settle.get('member_id')

            # Skip walk-in guests
            if self.is_guest(member_id):
                continue

            order_settle_id = settle.get('order_settle_id')
            table_id = settle.get('table_id')

            # Missing dimension rows degrade to empty dicts, i.e. None fields.
            memb_info = member_info.get(member_id, {})
            tbl_info = table_info.get(table_id, {})
            services = service_index.get(order_settle_id, [])

            # Area category; the helper's contract is not visible here, so
            # guard against a None result instead of failing on .get below.
            area_name = tbl_info.get('area_name')
            area_cat = self.get_area_category(area_name) or {}

            # Assistant service JSON
            assistant_services_json = self._build_assistant_services_json(services)

            # Durations
            table_duration = self._calc_table_duration(settle)
            assistant_duration = sum(
                self.safe_int(s.get('income_seconds', 0)) for s in services
            ) // 60  # seconds -> whole minutes

            record = {
                'site_id': site_id,
                'tenant_id': tenant_id,
                'member_id': member_id,
                'order_settle_id': order_settle_id,
                'visit_date': settle.get('visit_date'),
                'visit_time': settle.get('create_time'),
                # Member information
                'member_nickname': memb_info.get('nickname'),
                'member_mobile': self._mask_mobile(memb_info.get('mobile')),
                'member_birthday': memb_info.get('birthday'),
                # Table information
                'table_id': table_id,
                'table_name': tbl_info.get('table_name'),
                'area_name': area_name,
                'area_category': area_cat.get('category_name'),
                # Consumption amounts
                'table_fee': self.safe_decimal(settle.get('table_charge_money', 0)),
                'goods_amount': self.safe_decimal(settle.get('goods_money', 0)),
                'assistant_amount': (
                    self.safe_decimal(settle.get('assistant_pd_money', 0))
                    + self.safe_decimal(settle.get('assistant_cx_money', 0))
                ),
                'total_consume': self.safe_decimal(settle.get('consume_money', 0)),
                'total_discount': self._calc_total_discount(settle),
                'actual_pay': self.safe_decimal(settle.get('pay_amount', 0)),
                # Payment methods
                # NOTE(review): cash_pay reads the same source column as
                # actual_pay (pay_amount) -- confirm this is intended.
                'cash_pay': self.safe_decimal(settle.get('pay_amount', 0)),
                'cash_card_pay': self.safe_decimal(settle.get('balance_amount', 0)),
                'gift_card_pay': self.safe_decimal(settle.get('gift_card_amount', 0)),
                'groupbuy_pay': self.safe_decimal(settle.get('coupon_amount', 0)),
                # Durations
                'table_duration_min': table_duration,
                'assistant_duration_min': assistant_duration,
                # Assistant service details
                'assistant_services': assistant_services_json,
            }
            results.append(record)

        return results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """
        Write the transformed records using delete-before-insert.

        Args:
            transformed: Records produced by :meth:`transform`.
            context: Task context, forwarded to ``delete_existing_data``.

        Returns:
            A dict with a ``counts`` entry and, when rows were written, an
            ``extra`` entry holding the number of deleted rows.
        """
        if not transformed:
            # NOTE(review): returning here also skips the delete, so rows
            # written by an earlier run for this window stay in place when a
            # re-run yields nothing -- confirm that is the intended behaviour.
            self.logger.info("%s: 无数据需要写入", self.get_task_code())
            return {
                "counts": {
                    "fetched": 0,
                    "inserted": 0,
                    "updated": 0,
                    "skipped": 0,
                    "errors": 0,
                }
            }

        deleted = self.delete_existing_data(context, date_col="visit_date")
        inserted = self.bulk_insert(transformed)

        self.logger.info(
            "%s: 加载完成,删除 %d 行,插入 %d 行",
            self.get_task_code(), deleted, inserted
        )

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0
            },
            "extra": {"deleted": deleted}
        }

    # ==========================================================================
    # Data extraction
    # ==========================================================================

    def _extract_settlements(
        self, site_id: int, start_date: date, end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch member settlement headers of a site for an inclusive date range.

        Rows whose ``member_id`` is NULL or 0 are filtered out in SQL.
        ``order_trade_no`` and ``recharge_card_amount`` are selected but not
        read by :meth:`transform`.

        Returns:
            A list of row dicts; empty when the query yields nothing.
        """
        sql = """
            SELECT
                order_settle_id,
                order_trade_no,
                table_id,
                member_id,
                create_time,
                DATE(create_time) AS visit_date,
                consume_money,
                pay_amount,
                table_charge_money,
                goods_money,
                assistant_pd_money,
                assistant_cx_money,
                coupon_amount,
                adjust_amount,
                member_discount_amount,
                rounding_amount,
                gift_card_amount,
                balance_amount,
                recharge_card_amount
            FROM billiards_dwd.dwd_settlement_head
            WHERE site_id = %s
              AND DATE(create_time) >= %s
              AND DATE(create_time) <= %s
              AND member_id IS NOT NULL
              AND member_id != 0
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_assistant_services(
        self, site_id: int, start_date: date, end_date: date
    ) -> List[Dict[str, Any]]:
        """
        Fetch assistant service rows of a site for an inclusive date range.

        NOTE(review): the range is applied to ``start_use_time`` while
        settlements are filtered on ``create_time``; a service that started
        outside the window is not attached to its settlement -- confirm this
        is acceptable.

        Returns:
            A list of row dicts; empty when the query yields nothing.
        """
        sql = """
            SELECT
                order_settle_id,
                site_assistant_id AS assistant_id,
                nickname AS assistant_nickname,
                income_seconds,
                ledger_amount
            FROM billiards_dwd.dwd_assistant_service_log
            WHERE site_id = %s
              AND DATE(start_use_time) >= %s
              AND DATE(start_use_time) <= %s
        """
        rows = self.db.query(sql, (site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_member_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """
        Fetch member attributes of a site.

        Returns:
            A dict keyed by ``member_id``; if the query returns several rows
            for one member, the last one wins.
        """
        sql = """
            SELECT
                member_id,
                nickname,
                mobile,
                birthday
            FROM billiards_dwd.dim_member
            WHERE site_id = %s
        """
        rows = self.db.query(sql, (site_id,))
        return {r['member_id']: dict(r) for r in (rows or [])}

    def _extract_table_info(self, site_id: int) -> Dict[int, Dict[str, Any]]:
        """
        Fetch table attributes of a site (only rows with ``valid_to IS NULL``).

        Returns:
            A dict keyed by ``table_id`` (aliased from ``site_table_id``).
        """
        sql = """
            SELECT
                site_table_id AS table_id,
                site_table_name AS table_name,
                site_table_area_name AS area_name
            FROM billiards_dwd.dim_table
            WHERE site_id = %s
              AND valid_to IS NULL
        """
        rows = self.db.query(sql, (site_id,))
        return {r['table_id']: dict(r) for r in (rows or [])}

    # ==========================================================================
    # Utilities
    # ==========================================================================

    def _build_service_index(
        self, services: List[Dict[str, Any]]
    ) -> Dict[int, List[Dict[str, Any]]]:
        """
        Group service rows by ``order_settle_id``.

        Rows whose ``order_settle_id`` is missing or falsy (None, 0) are
        dropped.
        """
        index: Dict[int, List[Dict[str, Any]]] = {}
        for service in services:
            order_id = service.get('order_settle_id')
            if order_id:
                index.setdefault(order_id, []).append(service)
        return index

    def _build_assistant_services_json(
        self, services: List[Dict[str, Any]]
    ) -> Optional[str]:
        """
        Serialise the services of one order into a JSON array string.

        Each element carries ``assistant_id``, ``nickname``, ``duration_min``
        (``income_seconds`` floor-divided by 60) and ``amount`` (as float).

        Returns:
            The JSON string, or None when *services* is empty.
        """
        if not services:
            return None

        json_data = [
            {
                'assistant_id': s.get('assistant_id'),
                'nickname': s.get('assistant_nickname'),
                'duration_min': self.safe_int(s.get('income_seconds', 0)) // 60,
                'amount': float(self.safe_decimal(s.get('ledger_amount', 0)))
            }
            for s in services
        ]
        # ensure_ascii=False keeps non-ASCII nicknames readable in the column.
        return json.dumps(json_data, ensure_ascii=False)

    def _calc_table_duration(self, settle: Dict[str, Any]) -> int:
        """
        Estimate the table usage time in minutes.

        Simplification: the duration is not read from the source; it is
        derived from the table fee and ``ASSUMED_TABLE_RATE_PER_MIN``.

        Returns:
            0 when the table fee is not positive, otherwise the fee divided
            by the assumed rate, truncated to an int.
        """
        table_fee = self.safe_decimal(settle.get('table_charge_money', 0))
        if table_fee <= 0:
            return 0
        return int(table_fee / self.ASSUMED_TABLE_RATE_PER_MIN)

    def _calc_total_discount(self, settle: Dict[str, Any]) -> Decimal:
        """Return adjust_amount + member_discount_amount + rounding_amount of a settlement."""
        adjust = self.safe_decimal(settle.get('adjust_amount', 0))
        member_discount = self.safe_decimal(settle.get('member_discount_amount', 0))
        rounding = self.safe_decimal(settle.get('rounding_amount', 0))
        return adjust + member_discount + rounding

    def _mask_mobile(self, mobile: Optional[str]) -> Optional[str]:
        """
        Mask a mobile number: keep the first 3 and last 4 characters.

        Empty values and values shorter than 7 characters are returned
        unchanged (NOTE(review): i.e. unmasked -- confirm this is acceptable).
        """
        if not mobile or len(mobile) < 7:
            return mobile
        return mobile[:3] + "****" + mobile[-4:]


# Public API of this module
__all__ = ['MemberVisitTask']