"""客户消费数据获取模块(应用 3/6/7 共用)。 从 ETL 库 app.v_* RLS 视图获取客户近 N 个月消费数据,从业务库获取备注。 金额口径统一使用拆分字段(table_charge_money + assistant_pd/cx_money + goods_money),禁止 consume_money。 会员信息通过 member_id JOIN v_dim_member (scd2_is_current=1) 获取。 """ # CHANGE 2026-03-23 | Prompt: FDW 迁移——fdw_etl.* → app.* 直连 ETL 库 # intent: 将所有 fdw_etl.* 外部表引用改为 app.v_* RLS 视图(直连 ETL 库),列名同步修正 # 连接方式不变(get_etl_readonly_connection),仅改 SQL 表名和列名 from __future__ import annotations import asyncio import logging from datetime import date, datetime from decimal import Decimal from functools import partial from typing import Any from app.database import get_connection, get_etl_readonly_connection logger = logging.getLogger(__name__) # 消费记录最大返回数 MAX_CONSUMPTION_RECORDS = 100 # 备注最大返回数 MAX_NOTES = 50 # 备注单条最大字符数 MAX_NOTE_LENGTH = 500 # FDW 查询超时(秒) FDW_QUERY_TIMEOUT_SEC = 5 async def fetch_member_consumption_data( site_id: int, member_id: int, months: int = 3, ) -> dict[str, Any]: """获取客户近 N 个月消费数据。 返回结构对应 NS2 设计文档中 main_data: - consumption_records: 消费记录列表(最多 100 条,settle_date DESC) - member_cards: 会员卡明细列表 - card_balance_total: 储值卡余额合计 - stored_value_balance_total: 储值余额合计 - expected_visit_date: 预计到店日期 - days_since_last_visit: 距上次到店天数 - member_nickname: 会员昵称 Raises: TimeoutError: FDW 查询超时(>5s) ConnectionError: FDW 连接失败 """ loop = asyncio.get_event_loop() return await loop.run_in_executor( None, partial(_fetch_member_consumption_data_sync, site_id, member_id, months), ) def _fetch_member_consumption_data_sync( site_id: int, member_id: int, months: int, ) -> dict[str, Any]: """同步实现:在单个 FDW 连接上串行执行多个查询。""" conn = None try: conn = get_etl_readonly_connection(site_id) # RLS 隔离 + 语句超时(get_etl_readonly_connection 的 SET LOCAL 在 commit 后失效, # 需在查询事务中重新设置) with conn.cursor() as cur: cur.execute( "SET LOCAL app.current_site_id = %s", (str(site_id),) ) cur.execute( "SET LOCAL statement_timeout = %s", (f"{FDW_QUERY_TIMEOUT_SEC * 1000}",), # 毫秒 ) # 1. 会员昵称 nickname = _query_member_nickname(conn, member_id) # 2. 消费记录(台桌结账 + 商城订单) records, total_count = _query_consumption_records(conn, member_id, months) # 3. 会员卡明细 cards = _query_member_cards(conn, member_id) # 4. 余额汇总 balance_info = _query_balance_summary(conn, member_id) # 5. 到店数据 visit_info = _query_visit_info(conn, member_id) result: dict[str, Any] = { "member_nickname": nickname, "consumption_records": records, "member_cards": cards, "card_balance_total": balance_info.get("card_balance_total", Decimal("0")), "stored_value_balance_total": balance_info.get( "stored_value_balance_total", Decimal("0") ), "expected_visit_date": visit_info.get("expected_visit_date"), "days_since_last_visit": visit_info.get("days_since_last_visit"), } if total_count > MAX_CONSUMPTION_RECORDS: result["truncated"] = True result["total_count"] = total_count conn.commit() return result except Exception as e: # psycopg2 超时异常包含 "statement timeout" err_msg = str(e).lower() if "statement timeout" in err_msg or "timeout" in err_msg: raise TimeoutError( f"FDW 查询超时(>{FDW_QUERY_TIMEOUT_SEC}s): member_id={member_id}" ) from e if "connection" in err_msg or "connect" in err_msg: raise ConnectionError( f"FDW 连接失败: member_id={member_id}, error={e}" ) from e raise finally: if conn: conn.close() def _query_member_nickname(conn: Any, member_id: int) -> str: """从 app.v_dim_member 获取会员昵称(scd2_is_current=1)。""" with conn.cursor() as cur: cur.execute( """ SELECT nickname FROM app.v_dim_member WHERE member_id = %s AND scd2_is_current = 1 LIMIT 1 """, (member_id,), ) row = cur.fetchone() return row[0] if row and row[0] else "" def _query_consumption_records( conn: Any, member_id: int, months: int ) -> tuple[list[dict], int]: """从 app.v_dwd_settlement_head + app.v_dwd_table_fee_log 获取消费记录。 仅包含正向交易(settle_type IN (1, 3))。 ⚠️ 费用拆分字段(table_charge_money, assistant_pd/cx_money)在 settlement_head 上。 ⚠️ table_fee_log 提供台桌时长(real_table_use_seconds)和桌台ID(site_table_id)。 ⚠️ 列名映射: settle_date→create_time, settle_id→order_settle_id, sale_amount→ledger_amount。 返回 (records, total_count)。 """ with conn.cursor() as cur: # 先查总数 cur.execute( """ SELECT COUNT(*) FROM app.v_dwd_settlement_head sh WHERE sh.member_id = %s AND sh.settle_type IN (1, 3) AND sh.create_time >= (CURRENT_DATE - INTERVAL '%s months') """, (member_id, months), ) total_count = cur.fetchone()[0] # 查询消费记录(限制 100 条) # table_charge_money/assistant_pd_money/assistant_cx_money 直接从 settlement_head 取 # 台桌信息从 table_fee_log 取(site_table_id, real_table_use_seconds) # 商品金额从 store_goods_sale 聚合 # 助教姓名从 service_log JOIN dim_assistant 获取 cur.execute( """ SELECT sh.create_time AS settle_date, sh.settle_type, sh.table_charge_money + sh.assistant_pd_money + sh.assistant_cx_money + COALESCE(sg.goods_money, 0) AS items_sum, COALESCE(sh.table_charge_money, 0) AS table_charge_money, COALESCE(sh.assistant_pd_money, 0) AS assistant_pd_money, COALESCE(sh.assistant_cx_money, 0) AS assistant_cx_money, COALESCE(sg.goods_money, 0) AS goods_money, tfl.site_table_id AS room_name, COALESCE(tfl.real_table_use_seconds / 60, 0) AS duration_minutes, coaches.assistant_names FROM app.v_dwd_settlement_head sh LEFT JOIN app.v_dwd_table_fee_log tfl ON sh.order_settle_id = tfl.order_settle_id LEFT JOIN LATERAL ( SELECT SUM(sgs.ledger_amount) AS goods_money FROM app.v_dwd_store_goods_sale sgs WHERE sgs.order_settle_id = sh.order_settle_id ) sg ON true LEFT JOIN LATERAL ( SELECT string_agg(DISTINCT COALESCE(da.nickname, da.real_name, ''), ', ') AS assistant_names FROM app.v_dwd_assistant_service_log sl LEFT JOIN app.v_dim_assistant da ON sl.site_assistant_id = da.assistant_id AND da.scd2_is_current = 1 WHERE sl.order_settle_id = sh.order_settle_id AND sl.is_delete = 0 ) coaches ON true WHERE sh.member_id = %s AND sh.settle_type IN (1, 3) AND sh.create_time >= (CURRENT_DATE - INTERVAL '%s months') ORDER BY sh.create_time DESC LIMIT %s """, (member_id, months, MAX_CONSUMPTION_RECORDS), ) columns = [desc[0] for desc in cur.description] rows = cur.fetchall() records = [] for row in rows: record = {} for col, val in zip(columns, row): if isinstance(val, (date, datetime)): record[col] = val.isoformat() elif isinstance(val, Decimal): record[col] = float(val) else: record[col] = val # assistant_names: 确保是列表 names = record.get("assistant_names") if names and isinstance(names, str): record["assistant_names"] = [n.strip() for n in names.split(",") if n.strip()] elif not names: record["assistant_names"] = [] records.append(record) return records, total_count def _query_member_cards(conn: Any, member_id: int) -> list[dict]: """从 app.v_dim_member_card_account 获取会员卡明细。 ⚠️ 列名映射: member_id→tenant_member_id, gift_balance 不存在(用 balance - principal_balance 近似)。 """ with conn.cursor() as cur: cur.execute( """ SELECT member_card_type_name AS card_type, COALESCE(balance, 0) AS balance, COALESCE(balance, 0) - COALESCE(principal_balance, 0) AS gift_balance FROM app.v_dim_member_card_account WHERE tenant_member_id = %s AND scd2_is_current = 1 """, (member_id,), ) rows = cur.fetchall() return [ { "card_type": row[0] or "", "balance": float(row[1]) if row[1] else 0.0, "gift_balance": float(row[2]) if row[2] else 0.0, } for row in rows ] def _query_balance_summary(conn: Any, member_id: int) -> dict: """从 app.v_dws_member_consumption_summary 获取余额汇总。 ⚠️ 列名映射: recharge_card_amount→cash_card_balance, balance_amount→total_card_balance。 """ with conn.cursor() as cur: cur.execute( """ SELECT COALESCE(cash_card_balance, 0) AS card_balance_total, COALESCE(total_card_balance, 0) AS stored_value_balance_total FROM app.v_dws_member_consumption_summary WHERE member_id = %s LIMIT 1 """, (member_id,), ) row = cur.fetchone() if not row: return { "card_balance_total": Decimal("0"), "stored_value_balance_total": Decimal("0"), } return { "card_balance_total": row[0], "stored_value_balance_total": row[1], } def _query_visit_info(conn: Any, member_id: int) -> dict: """从 app.v_dws_member_visit_detail 获取到店数据,推算预计到店日期。 ⚠️ 列名映射: last_visit_date→MAX(visit_date), avg_visit_interval_days 需从明细计算。 """ with conn.cursor() as cur: # 获取最近到店日期和平均到店间隔 cur.execute( """ WITH visits AS ( SELECT visit_date, LAG(visit_date) OVER (ORDER BY visit_date) AS prev_visit FROM app.v_dws_member_visit_detail WHERE member_id = %s ) SELECT MAX(visit_date) AS last_visit_date, AVG(visit_date - prev_visit) AS avg_visit_interval_days FROM visits WHERE prev_visit IS NOT NULL """, (member_id,), ) row = cur.fetchone() if not row or not row[0]: return {"expected_visit_date": None, "days_since_last_visit": None} last_visit = row[0] avg_interval = row[1] today = date.today() days_since = (today - last_visit).days if isinstance(last_visit, date) else None expected = None if avg_interval and last_visit: from datetime import timedelta expected_date = last_visit + timedelta(days=int(avg_interval)) expected = expected_date.isoformat() return { "expected_visit_date": expected, "days_since_last_visit": days_since, } async def fetch_member_notes( site_id: int, member_id: int, limit: int = MAX_NOTES, ) -> list[dict]: """获取客户的全部备注(按 created_at DESC,最多 limit 条)。 从业务库 biz.notes 查询。 单条备注内容截断 500 字符,超出附加"…(已截断)"。 """ loop = asyncio.get_event_loop() return await loop.run_in_executor( None, partial(_fetch_member_notes_sync, site_id, member_id, limit), ) def _fetch_member_notes_sync( site_id: int, member_id: int, limit: int, ) -> list[dict]: """同步实现:从业务库查询备注。""" conn = get_connection() try: with conn.cursor() as cur: cur.execute( """ SELECT n.content, u.nickname AS recorded_by, n.created_at FROM biz.notes n LEFT JOIN biz.coach_tasks ct ON ct.id = n.task_id LEFT JOIN public.users u ON u.id = n.user_id WHERE n.target_id = %s AND n.site_id = %s ORDER BY n.created_at DESC LIMIT %s """, (member_id, site_id, limit), ) rows = cur.fetchall() notes = [] for row in rows: content = row[0] or "" recorded_by = row[1] or "" created_at = row[2] # 截断处理 if len(content) > MAX_NOTE_LENGTH: content = content[:MAX_NOTE_LENGTH] + "…(已截断)" notes.append({ "recorded_by": recorded_by, "content": content, "created_at": created_at.isoformat() if isinstance(created_at, (date, datetime)) else str(created_at) if created_at else "", }) return notes finally: conn.close()