"""应用 2 财务洞察 Prompt 拼装。 cron 每日 10:00 预热触发,对所有筛选组合(时间 × 区域)生成洞察。 - 数据源:board_service.get_finance_board(time, area, compare=1, site_id) - 筛选维度:8 个时间维度 × 9 个区域 = 72 组合 - 输出字段:insights 数组(seq + title + body) - system prompt 在百炼控制台配置 Prompt 中 board_data 字段名会自动翻译为中文(KEY_TRANSLATIONS), 目的:减少 AI 理解英文变量的成本,生成的洞察正文可读性更强。 """ from __future__ import annotations import json import logging from datetime import datetime from typing import Any from app.services.board_service import get_finance_board, _calc_date_range, _calc_prev_range logger = logging.getLogger(__name__) # App2 时间维度 → board_service 时间枚举 DIMENSION_MAP: dict[str, str] = { "this_month": "month", "last_month": "lastMonth", "this_week": "week", "last_week": "lastWeek", "this_quarter": "quarter", "last_quarter": "lastQuarter", "last_3_months": "last_3m", "last_6_months": "last_6m", } DIMENSION_LABELS: dict[str, str] = { "this_month": "本月", "last_month": "上月", "this_week": "本周", "last_week": "上周", "this_quarter": "本季度", "last_quarter": "上季度", "last_3_months": "近三个月(不含本月)", "last_6_months": "近六个月(不含本月)", } # 区域枚举与中文标签(与 miniprogram/board-finance.ts areaOptions 对齐) AREA_OPTIONS: tuple[str, ...] = ( "all", "hall", "hallA", "hallB", "hallC", "vip", "snooker", "mahjong", "ktv", ) AREA_LABELS: dict[str, str] = { "all": "全部区域", "hall": "大厅", "hallA": "A区", "hallB": "B区", "hallC": "C区", "vip": "台球包厢", "snooker": "斯诺克", "mahjong": "麻将房", "ktv": "团建房", } # 业务字段 → 中文名。覆盖 board_service 返回的所有层级字段。 # 只做键名翻译,不改变值与结构;未命中的键原样保留。 KEY_TRANSLATIONS: dict[str, str] = { # 顶层板块 "overview": "经营一览", "recharge": "预收资产", "revenue": "应计收入确认", "cashflow": "现金流入", "expense": "现金流出", "coach_analysis": "助教分析", # 经营一览 "occurrence": "发生额", "discount": "总优惠", "discount_rate": "优惠率", "confirmed_revenue": "成交收入", "cash_in": "现金流入", "cash_out": "现金流出", "cash_balance": "现金结余", "balance_rate": "结余率", # 预收资产 "actual_income": "储值卡充值实收", "first_charge": "首充", "renew_charge": "续费", "consumed": "储值卡消耗", "card_balance": "储值卡总余额", "all_card_balance": "全类别卡余额合计", "gift_rows": "赠送卡矩阵", "liquor": "酒水卡", "table_fee": "台费卡", "voucher": "抵用券", # 应计收入确认 "total_occurrence": "发生额合计", "discount_total": "优惠合计", "confirmed_total": "确认收入合计", "structure_rows": "收入结构", "price_items": "价目明细", "discount_items": "优惠明细", "channel_items": "渠道明细", "booked": "入账金额", "booked_compare": "入账环比", # 现金流入/流出 "consume_items": "消费收款项", "recharge_items": "充值收款项", "operation_items": "运营支出", "fixed_items": "固定支出", "coach_items": "助教支出", "platform_items": "平台支出", # 助教分析 "basic": "基础助教", "incentive": "激励助教", "total_pay": "合计薪酬", "total_share": "合计分成", "avg_hourly": "平均时薪", "level": "级别", "pay": "薪酬", "share": "分成", "hourly": "时薪", "rows": "明细", # 通用元素 "label": "名称", "amount": "金额", "desc": "说明", "total": "合计", "value": "数值", "compare": "环比", "id": "编号", # 环比后缀(小程序约定) "occurrence_compare": "发生额环比", "occurrence_down": "发生额是否下降", "occurrence_flat": "发生额是否持平", "discount_compare": "总优惠环比", "discount_down": "总优惠是否下降", "discount_flat": "总优惠是否持平", "discount_rate_compare": "优惠率环比", "discount_rate_down": "优惠率是否下降", "discount_rate_flat": "优惠率是否持平", "confirmed_revenue_compare": "成交收入环比", "confirmed_revenue_down": "成交收入是否下降", "confirmed_revenue_flat": "成交收入是否持平", "cash_in_compare": "现金流入环比", "cash_in_down": "现金流入是否下降", "cash_in_flat": "现金流入是否持平", "cash_out_compare": "现金流出环比", "cash_out_down": "现金流出是否下降", "cash_out_flat": "现金流出是否持平", "cash_balance_compare": "现金结余环比", "cash_balance_down": "现金结余是否下降", "cash_balance_flat": "现金结余是否持平", "balance_rate_compare": "结余率环比", "balance_rate_down": "结余率是否下降", "balance_rate_flat": "结余率是否持平", "actual_income_compare": "储值卡充值实收环比", "actual_income_down": "储值卡充值实收是否下降", "first_charge_compare": "首充环比", "first_charge_down": "首充是否下降", "renew_charge_compare": "续费环比", "renew_charge_down": "续费是否下降", "consumed_compare": "储值卡消耗环比", "consumed_down": "储值卡消耗是否下降", "card_balance_compare": "储值卡总余额环比", "card_balance_down": "储值卡总余额是否下降", "all_card_balance_compare": "全类别卡余额合计环比", "all_card_balance_down": "全类别卡余额合计是否下降", "total_compare": "合计环比", "total_down": "合计是否下降", "total_flat": "合计是否持平", "total_pay_compare": "合计薪酬环比", "total_pay_down": "合计薪酬是否下降", "total_share_compare": "合计分成环比", "total_share_down": "合计分成是否下降", "avg_hourly_compare": "平均时薪环比", "avg_hourly_flat": "平均时薪是否持平", "pay_compare": "薪酬环比", "pay_down": "薪酬是否下降", "share_compare": "分成环比", "share_down": "分成是否下降", "hourly_compare": "时薪环比", "hourly_flat": "时薪是否持平", # 赠送卡矩阵 "wine": "酒水", "table": "台费", "coupon": "抵用券", # 元数据 "down": "是否下降", "flat": "是否持平", } # 裁剪时丢弃的"冗余"字段:_down / _flat 布尔元数据(*_compare 字符串已携带符号) _DROP_SUFFIX = ("_down", "_flat") # 行级明细字段:展示用,AI 洞察不需要 _DROP_DETAIL_KEYS = { "structure_rows", "price_items", "channel_items", "gift_rows", "discount_items", # 2026-04-22:升顶层"优惠构成"后,明细源从 revenue 里 drop 去重 } def _is_drop_key(k: str) -> bool: if not isinstance(k, str): return False if k in _DROP_DETAIL_KEYS: return True return k.endswith(_DROP_SUFFIX) def _slim(data: Any) -> Any: """递归裁剪:drop 明细 + _down/_flat + None 值。""" if isinstance(data, dict): out = {} for k, v in data.items(): if _is_drop_key(k): continue slim_v = _slim(v) if slim_v is None: continue out[k] = slim_v return out if out else None if isinstance(data, list): return [_slim(item) for item in data] return data def _pct(numerator: float, denominator: float) -> float: """百分比(小数),分母 0 返回 0。保留 4 位便于 AI 读取。""" if not denominator: return 0.0 return round(numerator / denominator, 4) # 日粒度异常检测参数 _ANOMALY_MIN_DAYS = 7 # 少于 7 天样本不检测(噪声太大) _ANOMALY_DEVIATION = 0.4 # 偏离"同星期均值" > 40% 标记为异常(2026-04-22 改为同星期基线) _ANOMALY_MAX_ITEMS = 10 # 最多保留 10 条(按 |偏离度| 降序截断,防 prompt 膨胀) _ANOMALY_MIN_SAME_WEEKDAY = 2 # 同星期至少 2 天样本才可作基线;不足时回退到整体均值 # 星期中文映射(0=Monday) _WEEKDAY_ZH = ("周一", "周二", "周三", "周四", "周五", "周六", "周日") # 行业基线常量(综合商业球房) # 2026-04-22:移除各类警戒线/健康区间(各球房定位/地段/业态差异大,不宜一刀切)。 # 仅保留"周中客流规律"这类行业普适的时间分布特征。 INDUSTRY_BASELINES: dict[str, Any] = { "周中客流规律": "周五至周日旺季 / 周一最淡 / 周二至周四逐步回升", } def _fetch_daily_series( site_id: int, start_date: str, end_date: str, ) -> list[tuple] | None: """查 [start, end] 日粒度财务流水,一次查完供多个分析函数复用。 返回字段顺序:(stat_date, gross, cash_in, order_count, member_order_count, confirmed) 过滤全 0 停业日;样本不足时返回 None。 """ from app.services.fdw_queries import _fdw_context from app.database import get_connection try: conn = get_connection() except Exception: logger.debug("日粒度查询连接失败", exc_info=True) return None try: with _fdw_context(conn, site_id) as cur: cur.execute( """ SELECT stat_date, COALESCE(gross_amount, 0) AS gross, COALESCE(cash_inflow_total, 0) AS cash_in, COALESCE(order_count, 0) AS order_count, COALESCE(member_order_count, 0) AS member_order_count, COALESCE(confirmed_income, 0) AS confirmed FROM app.v_dws_finance_daily_summary WHERE stat_date >= %s::date AND stat_date <= %s::date ORDER BY stat_date """, (start_date, end_date), ) rows = cur.fetchall() except Exception: logger.debug("日粒度数据查询失败: site_id=%s", site_id, exc_info=True) return None finally: try: conn.close() except Exception: pass active = [ (r[0], float(r[1]), float(r[2]), int(r[3] or 0), int(r[4] or 0), float(r[5] or 0)) for r in rows if float(r[1] or 0) > 0 or float(r[2] or 0) > 0 ] return active if active else None _WEEKDAY_MIN_DAYS = 14 # 月初场景:样本 < 14 天时,每个星期最多 1-2 天,"日均"接近单日值,不注入以免 AI 被误导 def _aggregate_by_weekday(series: list[tuple] | None) -> dict | None: """按星期聚合 7 段日均值(发生额/现金流入/订单数),供 AI 观察周中规律。 要求至少 14 天样本(保证每个星期至少有 2 天),否则返回 None; 防止月初场景下单日值被包装成"日均"迷惑 AI 做周规律判断。 """ if not series or len(series) < _WEEKDAY_MIN_DAYS: return None from collections import defaultdict buckets: dict[int, list[tuple]] = defaultdict(list) for row in series: buckets[row[0].weekday()].append(row) out: dict[str, dict] = {} for wd in range(7): rows = buckets.get(wd) or [] if not rows: continue n = len(rows) out[_WEEKDAY_ZH[wd]] = { "日均发生额": round(sum(r[1] for r in rows) / n, 2), "日均现金流入": round(sum(r[2] for r in rows) / n, 2), "日均订单数": round(sum(r[3] for r in rows) / n, 1), "营业日数": n, } return out or None def _build_unit_economics( series: list[tuple] | None, prev_series: list[tuple] | None = None, ) -> dict | None: """单位经济派生:客单价 / 日均订单数 / 会员订单占比 / 散客订单占比。 口径:全期汇总后再算(避免日均 avg 失真)。 客单价取两口径: - 按成交收入(去除优惠的真实收入单价) — 反映真实收入能力 - 按发生额(含优惠的账单均值) — 反映顾客端认知的单次消费量级 若 prev_series 可用,则附加 _环比 字段避免 AI 推测幻觉。 """ if not series: return None total_orders = sum(r[3] for r in series) if total_orders <= 0: return None total_member_orders = sum(r[4] for r in series) total_confirmed = sum(r[5] for r in series) total_gross = sum(r[1] for r in series) days = len(series) price_confirmed = total_confirmed / total_orders price_gross = total_gross / total_orders member_share = total_member_orders / total_orders daily_orders = total_orders / days out: dict[str, Any] = { "总订单数": total_orders, "日均订单数": round(daily_orders, 1), "客单价_按成交收入": round(price_confirmed, 2), "客单价_按发生额": round(price_gross, 2), "会员订单数": total_member_orders, "会员订单占比": round(member_share, 4), "散客订单数": total_orders - total_member_orders, "散客订单占比": round((total_orders - total_member_orders) / total_orders, 4), } if prev_series: prev_orders = sum(r[3] for r in prev_series) if prev_orders > 0: prev_days = len(prev_series) prev_confirmed = sum(r[5] for r in prev_series) prev_gross = sum(r[1] for r in prev_series) prev_member = sum(r[4] for r in prev_series) # 月初场景:上期样本 < 5 天时客单价环比噪声极大(单日波动主导),加标注供 AI 降权引用 low_sample = prev_days < 5 def _pct_change(cur: float, prev: float) -> str: if prev <= 0: return "无上期数据" value = f"{(cur - prev) / prev * 100:+.1f}%" return f"{value}(上期仅 {prev_days} 天,样本不足仅供参考)" if low_sample else value out["客单价_按成交收入_环比"] = _pct_change(price_confirmed, prev_confirmed / prev_orders) out["客单价_按发生额_环比"] = _pct_change(price_gross, prev_gross / prev_orders) out["日均订单数_环比"] = _pct_change(daily_orders, prev_orders / prev_days) out["会员订单占比_环比"] = _pct_change(member_share, prev_member / prev_orders) return out def _detect_anomaly_days( site_id: int, start_date: str, end_date: str, series: list[tuple] | None = None, ) -> list[dict] | None: """扫描日粒度财务数据,标记偏离同星期均值 > 40% 的异常日。 series 可由调用方传入复用,避免重复查 DB。 """ if series is None: series = _fetch_daily_series(site_id, start_date, end_date) if not series or len(series) < _ANOMALY_MIN_DAYS: return None active = series # 2026-04-22 改进:按"同星期均值"做基线,比"期均"更贴近业态(周一淡/周末旺) # 同星期样本 < _ANOMALY_MIN_SAME_WEEKDAY 天时回退到整体均值 from collections import defaultdict def _scan(idx: int, label: str) -> list[dict]: vals = [row[idx] for row in active] global_mean = sum(vals) / len(vals) if global_mean <= 0: return [] # 按 weekday 分组统计均值 by_weekday: dict[int, list[float]] = defaultdict(list) for d, *metrics in active: by_weekday[d.weekday()].append(metrics[idx - 1]) weekday_mean: dict[int, float] = { wd: (sum(xs) / len(xs)) for wd, xs in by_weekday.items() } flagged: list[dict] = [] for d, *metrics in active: v = metrics[idx - 1] wd = d.weekday() same_count = len(by_weekday.get(wd, [])) # 基线选择:同星期样本 >= 2 用同星期均值,否则用整体均值 if same_count >= _ANOMALY_MIN_SAME_WEEKDAY and weekday_mean[wd] > 0: base = weekday_mean[wd] base_label = f"同{_WEEKDAY_ZH[wd]}均值" else: base = global_mean base_label = "期均" deviation = (v - base) / base if abs(deviation) >= _ANOMALY_DEVIATION: weekday_zh = _WEEKDAY_ZH[wd] flagged.append({ "日期": f"{d} {weekday_zh}", "指标": label, "当日": round(v, 2), "基线": round(base, 2), "基线类型": base_label, "偏离": f"{deviation * 100:+.1f}%", "_abs_dev": abs(deviation), }) return flagged candidates: list[dict] = _scan(1, "发生额") + _scan(2, "现金流入") if not candidates: return None # 按绝对偏离排序,取 top N,去掉排序用辅助键 candidates.sort(key=lambda x: x["_abs_dev"], reverse=True) out = [] for c in candidates[:_ANOMALY_MAX_ITEMS]: c.pop("_abs_dev", None) out.append(c) return out def _fetch_card_balance_opening(site_id: int, start_date: str) -> float | None: """取 start_date 前一日的储值卡总余额(作为本期期初余额)。 数据源:etl 库 app.v_dws_finance_recharge_summary(每日快照,total_card_balance 字段)。 若前一日无数据(门店刚开业 / 数据缺失),返回 None。 """ from app.services.fdw_queries import _fdw_context from app.database import get_connection try: conn = get_connection() except Exception: logger.debug("期初余额查询连接失败", exc_info=True) return None try: with _fdw_context(conn, site_id) as cur: cur.execute( """ SELECT total_card_balance FROM app.v_dws_finance_recharge_summary WHERE stat_date < %s::date ORDER BY stat_date DESC LIMIT 1 """, (start_date,), ) row = cur.fetchone() except Exception: logger.debug("期初余额查询失败: site_id=%s", site_id, exc_info=True) return None finally: try: conn.close() except Exception: pass if not row or row[0] is None: return None return float(row[0]) def _aggregate_expense(expense: dict | None) -> dict | None: """从 expense 四类明细聚合出顶层金额,便于 AI 直接看四大块支出占比。""" if not isinstance(expense, dict): return None def _sum(key: str) -> float: items = expense.get(key) or [] if not isinstance(items, list): return 0.0 return round(sum(float(x.get("amount", 0) or 0) for x in items if isinstance(x, dict)), 2) total = float(expense.get("total", 0) or 0) if total <= 0: return None # 全 0 数据对 AI 无意义,直接丢 return { "合计": round(total, 2), "合计环比": expense.get("total_compare") or "持平", "运营支出": _sum("operation_items"), "固定支出": _sum("fixed_items"), "助教支出": _sum("coach_items"), "平台支出": _sum("platform_items"), } def _build_discount_kpi(revenue: dict | None, overview: dict | None) -> dict | None: """把优惠拆成顶层 KPI + 派生指标(占比、贡献率)。 AI 数据挖掘视角: - 按金额排序展示,top1 一眼看出来 - 每项带 amount / compare / share(占总优惠比) - 整体带优惠率(discount / occurrence)便于判断利润侵蚀程度 """ if not isinstance(revenue, dict): return None items = revenue.get("discount_items") or [] if not isinstance(items, list) or not items: return None total = round(sum(float(x.get("amount", 0) or 0) for x in items if isinstance(x, dict)), 2) breakdown = [] for it in items: if not isinstance(it, dict): continue amt = float(it.get("amount", 0) or 0) row: dict[str, Any] = { "名称": it.get("label"), "金额": round(amt, 2), "占总优惠": _pct(amt, total), } if it.get("compare"): row["环比"] = it["compare"] breakdown.append(row) # 按金额从大到小排序 → AI 阅读顺序 = 重要度顺序 breakdown.sort(key=lambda r: float(r.get("金额") or 0), reverse=True) overview = overview or {} occurrence = float(overview.get("occurrence", 0) or 0) kpi: dict[str, Any] = { "总优惠": total, "优惠率": _pct(total, occurrence), # 0.3796 表示 37.96% "占比排序": breakdown, } if breakdown: top = breakdown[0] kpi["最大优惠来源"] = f"{top.get('名称')}(金额 {top.get('金额')} 元,占总优惠 {int(float(top.get('占总优惠', 0))*100)}%)" return kpi def _build_cashflow_kpi(cashflow: dict | None) -> dict | None: """消费收款拆三档(纸币/线上/团购)+ 充值到账,给 AI 直接看资金来源结构。""" if not isinstance(cashflow, dict): return None consume = cashflow.get("consume_items") or [] recharge = cashflow.get("recharge_items") or [] total = float(cashflow.get("total", 0) or 0) if total <= 0: return None consume_map = {} for it in consume: if not isinstance(it, dict): continue consume_map[it.get("label")] = { "金额": round(float(it.get("amount", 0) or 0), 2), "环比": it.get("compare") or "持平", } recharge_total = round(sum(float(x.get("amount", 0) or 0) for x in recharge if isinstance(x, dict)), 2) consume_total = round(sum(float(v.get("金额", 0) or 0) for v in consume_map.values()), 2) return { "合计": round(total, 2), "合计环比": cashflow.get("total_compare") or "持平", "消费收款合计": consume_total, "消费收款占比": _pct(consume_total, total), "充值收款合计": recharge_total, "充值收款占比": _pct(recharge_total, total), "按渠道": consume_map, } def _build_coach_kpi(coach: dict | None) -> dict | None: """助教成本压缩:只保留两档的合计薪酬+合计分成+平均时薪+3 级别薪酬分布。""" if not isinstance(coach, dict): return None def _slim_tier(t: dict | None) -> dict | None: if not isinstance(t, dict): return None rows = t.get("rows") or [] # 只保留级别-薪酬-时薪 3 字段,作为分布快照 tier_dist = [ {"级别": r.get("level"), "薪酬": r.get("pay"), "时薪": r.get("hourly")} for r in rows if isinstance(r, dict) ] total_pay = float(t.get("total_pay", 0) or 0) if total_pay <= 0: return None return { "合计薪酬": round(total_pay, 2), "合计薪酬环比": t.get("total_pay_compare") or "持平", "合计分成": round(float(t.get("total_share", 0) or 0), 2), "平均时薪": round(float(t.get("avg_hourly", 0) or 0), 2), "各级别分布": tier_dist, } basic = _slim_tier(coach.get("basic")) incentive = _slim_tier(coach.get("incentive")) if not basic and not incentive: return None out: dict[str, Any] = {} if basic: out["基础助教"] = basic if incentive: out["激励助教"] = incentive # 派生:人力成本占收入比(需要收入传进来,这里只给基础值) total_pay = (basic or {}).get("合计薪酬", 0) + (incentive or {}).get("合计薪酬", 0) if total_pay > 0: out["人力薪酬合计"] = round(total_pay, 2) return out def _build_derived_ratios(overview: dict | None, cashflow_kpi: dict | None, coach_kpi: dict | None, discount_kpi: dict | None) -> dict: """数据挖掘视角:派生关键比率,让 AI 不用自己算。 - 储值卡贡献率:充值到账 / 总现金流入 - 人力成本占收入比:助教薪酬合计 / 成交收入 - 优惠侵蚀率:总优惠 / 发生额 - 现金结余率:现金结余 / 现金流入 """ ov = overview or {} confirmed = float(ov.get("confirmed_revenue", 0) or 0) occurrence = float(ov.get("occurrence", 0) or 0) cash_in = float(ov.get("cash_in", 0) or 0) cash_balance = float(ov.get("cash_balance", 0) or 0) total_pay = (coach_kpi or {}).get("人力薪酬合计", 0) recharge_in = (cashflow_kpi or {}).get("充值收款合计", 0) discount_total = (discount_kpi or {}).get("总优惠", 0) out: dict[str, Any] = {} if confirmed > 0 and total_pay: out["人力成本占成交收入比"] = _pct(total_pay, confirmed) if cash_in > 0 and recharge_in: out["储值卡充值占现金流入比"] = _pct(recharge_in, cash_in) if occurrence > 0 and discount_total: out["优惠侵蚀率"] = _pct(discount_total, occurrence) if cash_in > 0: out["现金结余率"] = _pct(cash_balance, cash_in) return out # 2026-04-22:异常检测由 AI 侧自行判断,后端只提供客观 KPI(不给规则结论) def _translate_keys(data: Any) -> Any: """递归翻译 dict/list 中所有键为中文;值保持不变。 - dict: 键命中 KEY_TRANSLATIONS 则替换,未命中保留原键 - list: 逐项递归 - 其他类型(str/int/float/bool/None)原样返回 """ if isinstance(data, dict): return { KEY_TRANSLATIONS.get(k, k): _translate_keys(v) for k, v in data.items() } if isinstance(data, list): return [_translate_keys(item) for item in data] return data async def build_prompt( context: dict, cache_svc: Any | None = None, # 兼容统一签名,App2 不用 ) -> str: """构建 App2 prompt 字符串。 Args: context: site_id, time_dimension, area(可选,默认 all) Returns: JSON 序列化后的 prompt 字符串,所有 board 数据字段已翻译为中文。 """ site_id = context["site_id"] time_dimension = context["time_dimension"] area = context.get("area", "all") board_time = DIMENSION_MAP.get(time_dimension) if not board_time: raise ValueError(f"App2 不支持的时间维度: {time_dimension}") if area not in AREA_LABELS: raise ValueError(f"App2 不支持的区域: {area}") try: board_data = await get_finance_board( time=board_time, area=area, compare=1, site_id=site_id, ) except Exception: logger.warning( "App2 财务看板查询失败: site_id=%s dimension=%s area=%s", site_id, time_dimension, area, exc_info=True, ) board_data = {} # 2026-04-22 数据挖掘视角 prompt 结构化: # - 优惠/现金流/助教/支出 四大领域分别派生 KPI(带占比/排序/派生指标) # - 异常检测:规则法标注 AI 必看异常点 # - 派生比率:人力成本占比/优惠侵蚀率/储值卡贡献率 等不用 AI 再算 # - 原始财务数据经 _slim 裁剪后作为"原始指标"补充,避免 AI 失去追溯能力 overview = board_data.get("overview") if isinstance(board_data, dict) else None revenue = board_data.get("revenue") if isinstance(board_data, dict) else None cashflow = board_data.get("cashflow") if isinstance(board_data, dict) else None expense = board_data.get("expense") if isinstance(board_data, dict) else None coach = board_data.get("coach_analysis") if isinstance(board_data, dict) else None discount_kpi = _build_discount_kpi(revenue, overview) cashflow_kpi = _build_cashflow_kpi(cashflow) expense_kpi = _aggregate_expense(expense) coach_kpi = _build_coach_kpi(coach) ratios = _build_derived_ratios(overview, cashflow_kpi, coach_kpi, discount_kpi) # 原始数据:slim 后再翻译,供 AI 追溯细节 slim_data = _slim(board_data) or {} raw_cn = _translate_keys(slim_data) # 对比口径说明:当期/对比期均为"同天数对齐",避免 AI 把环比误读为"当期部分 vs 上期整月" compare_caliber: dict[str, Any] | None = None try: from app.services.runtime_context import get_runtime_context runtime_ctx = get_runtime_context(site_id) cur_start, cur_end = _calc_date_range(board_time, ref_date=runtime_ctx.business_date) prev_start, prev_end = _calc_prev_range(board_time, cur_start, cur_end) cur_days = (cur_end - cur_start).days + 1 prev_days = (prev_end - prev_start).days + 1 compare_caliber = { "当期范围": f"{cur_start} ~ {cur_end}({cur_days} 天)", "对比期范围": f"{prev_start} ~ {prev_end}({prev_days} 天)", "对齐方式": "上期同天数对齐(非整月/整周对比)", "说明": "所有 _环比 / _compare 字段均按上表口径计算;月中调用时对比期会自动截断到与当期相同天数", } except Exception: logger.debug("对比口径字段生成失败(不影响主流程)", exc_info=True) payload: dict[str, Any] = { "当前时间": get_runtime_context(site_id).business_now.strftime("%Y-%m-%d %H:%M"), "门店编号": site_id, "时间维度": DIMENSION_LABELS.get(time_dimension, time_dimension), "区域": AREA_LABELS.get(area, area), # 0. 对比口径:让 AI 正确解读环比字段 **({"对比口径": compare_caliber} if compare_caliber else {}), # 1. 核心 KPI:AI 洞察首要依据 "核心KPI": { "发生额": float(overview.get("occurrence", 0)) if overview else 0, "发生额环比": (overview or {}).get("occurrence_compare") or "持平", "成交收入": float(overview.get("confirmed_revenue", 0)) if overview else 0, "成交收入环比": (overview or {}).get("confirmed_revenue_compare") or "持平", "现金流入": (overview or {}).get("cash_in"), "现金流入环比": (overview or {}).get("cash_in_compare") or "持平", "现金结余": (overview or {}).get("cash_balance"), "现金结余环比": (overview or {}).get("cash_balance_compare") or "持平", }, # 2. 派生比率:不用 AI 再算 "派生比率": ratios, } # 3. 优惠构成(带排序/占比/环比/最大来源提示) if discount_kpi: payload["优惠构成"] = discount_kpi # 4. 现金流入来源分布 if cashflow_kpi: payload["现金流入来源"] = cashflow_kpi # 5. 支出概况(聚合到四大类,total=0 则不给 AI) if expense_kpi: payload["支出概况"] = expense_kpi # 6. 助教成本画像 if coach_kpi: payload["助教成本"] = coach_kpi # 7. 储值卡余额变化:期初 + 期末 + 充值 + 消耗 + 其他调整(揭示"充值-消耗≠余额变化"的差异) # 避免 AI 在只看当期充值/消耗时对"余额为何涨"的矛盾自圆其说 if area == "all" and isinstance(recharge := board_data.get("recharge"), dict): try: start_date_obj, _end = _calc_date_range(board_time) opening = _fetch_card_balance_opening(site_id, str(start_date_obj)) closing = float(recharge.get("card_balance") or 0) period_recharge = float(recharge.get("actual_income") or 0) period_consume = float(recharge.get("consumed") or 0) if opening is not None and (opening > 0 or closing > 0): diff = closing - opening other_adj = round(diff - (period_recharge - period_consume), 2) payload["储值卡余额变化"] = { "期初余额": round(opening, 2), "期末余额": round(closing, 2), "余额变化": round(diff, 2), "本期充值": round(period_recharge, 2), "本期消耗": round(period_consume, 2), "其他调整": other_adj, # 含过期/赠送/退款/手动调整,非 0 时 AI 需要关注 } except Exception: logger.debug("储值卡余额变化注入失败", exc_info=True) # 8. 日粒度派生(仅 area=all,样本 ≥ 7 天):一次 DB 查询,三段派生 # - 单位经济:客单价/订单数/会员占比(含环比,避免 AI 对客单走势推测幻觉) # - 按星期聚合:供 E 板块做周中规律宏观洞察 # - 日粒度异常:同星期均值基线下的极端偏离 if area == "all": try: start_date, end_date = _calc_date_range(board_time) series = _fetch_daily_series(site_id, str(start_date), str(end_date)) # 上期序列(用于客单价环比) prev_series: list[tuple] | None = None try: prev_start, prev_end = _calc_prev_range(board_time, start_date, end_date) prev_series = _fetch_daily_series(site_id, str(prev_start), str(prev_end)) except Exception: logger.debug("上期 series 查询失败,客单价环比字段将省略", exc_info=True) if series: unit_econ = _build_unit_economics(series, prev_series=prev_series) if unit_econ: payload["单位经济"] = unit_econ by_weekday = _aggregate_by_weekday(series) if by_weekday: payload["按星期聚合"] = by_weekday anomalies = _detect_anomaly_days( site_id, str(start_date), str(end_date), series=series, ) if anomalies: payload["日粒度异常"] = anomalies except Exception: logger.debug("日粒度派生字段注入失败(不影响主流程)", exc_info=True) # 9. 行业基线:AI 判断是否超警戒线的参照 payload["行业基线"] = INDUSTRY_BASELINES # 10. 原始财务数据:供 AI 追溯(大部分 prompt 长度来自这里,已 slim) payload["原始指标"] = raw_cn if not board_data: payload["数据缺失提示"] = "财务看板数据获取失败,请基于已有缓存或常识分析" return json.dumps(payload, ensure_ascii=False, default=str)