Files
Neo-ZQYY/apps/backend/app/routers/tenant_clues.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

313 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
租户管理后台 — 维客线索管理路由。
端点清单:
- GET /api/tenant/customers/search — 客户搜索keyword + site_id
- GET /api/tenant/customers/{member_id}/clues — 线索列表source / is_hidden 筛选)
- PATCH /api/tenant/clues/{id} — 编辑线索
- DELETE /api/tenant/clues/{id} — 物理删除线索
- PATCH /api/tenant/clues/{id}/visibility — 切换隐藏/显示
需求: 9.1-9.4, 10.1, 11.1-11.3, 12.2-12.3, 13.1-13.4
AI_CHANGELOG
- 2026-03-23 21:00:00 | Prompt: P20260323-210000根治 tenant_admin managed_site_ids 限制)| Direct causeJWT managed_site_ids 静态签发,新建店铺后所有端点受限 | Summary_get_clue_with_site_check 签名改为接受 admin: CurrentTenantAdminsearch_customers 用 get_effective_site_idslist_customer_clues 用 site_filter_clause(admin=admin);三个调用点改传 admin | Verify维客线索管理覆盖新建店铺
"""
from __future__ import annotations
import logging
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, status
from app.auth.tenant_admins import (
CurrentTenantAdmin,
get_effective_site_ids,
require_tenant_admin,
site_filter_clause,
)
from app.database import get_connection, get_etl_readonly_connection
from app.schemas.tenant_clues import (
ClueEditRequest,
ClueListItem,
ClueVisibilityRequest,
CustomerSearchItem,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/tenant", tags=["维客线索管理"])
def _mask_mobile(mobile: str | None) -> str | None:
"""手机号脱敏:中间 4 位替换为 ****,如 138****1234。"""
if not mobile or len(mobile) < 7:
return mobile
return mobile[:3] + "****" + mobile[7:]
def _get_clue_with_site_check(clue_id: int, admin: CurrentTenantAdmin):
"""
查询线索并校验 site_id 是否在管辖范围内。
不在管辖范围或不存在均返回 404避免泄露线索存在性
返回 (id, site_id, member_id, category, summary, detail,
recorded_by_name, source, recorded_at, is_hidden)。
"""
site_sql, site_params = site_filter_clause(admin=admin)
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
f"""
SELECT id, site_id, member_id, category, summary, detail,
recorded_by_name, source, recorded_at::text, is_hidden
FROM public.member_retention_clue
WHERE id = %s AND {site_sql}
""",
(clue_id, *site_params),
)
row = cur.fetchone()
finally:
conn.close()
if row is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="线索不存在")
return row
# ── GET /api/tenant/customers/search ──────────────────────
@router.get("/customers/search")
async def search_customers(
keyword: str = Query(..., min_length=1, description="搜索关键词(姓名模糊/手机号精确)"),
site_id: Optional[int] = Query(None, description="指定门店 ID 筛选"),
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""
客户搜索:在管辖门店范围内搜索 v_dim_member。
nickname 模糊匹配 OR mobile 精确匹配scd2_is_current=1。
手机号脱敏返回。
"""
# 确定要搜索的门店列表
# [CHANGE P20260323-210000] intent: 使用 get_effective_site_ids 统一获取有效 site_ids
effective_ids = get_effective_site_ids(admin)
if site_id is not None:
if site_id not in effective_ids:
return {"items": []}
search_site_ids = [site_id]
else:
search_site_ids = effective_ids
if not search_site_ids:
return {"items": []}
# 逐 site_id 查询 FDWRLS 要求逐个设置 current_site_id
all_items: list[dict] = []
for sid in search_site_ids:
try:
etl_conn = get_etl_readonly_connection(sid)
try:
with etl_conn.cursor() as cur:
cur.execute(
"""
SELECT member_id, nickname, mobile
FROM fdw_etl.v_dim_member
WHERE scd2_is_current = 1
AND (nickname ILIKE %s OR mobile = %s)
LIMIT 50
""",
(f"%{keyword}%", keyword),
)
for row in cur.fetchall():
all_items.append(
CustomerSearchItem(
member_id=row[0],
nickname=row[1],
mobile_masked=_mask_mobile(row[2]),
site_id=sid,
).model_dump(by_alias=True)
)
finally:
etl_conn.close()
except Exception:
logger.warning("v_dim_member 搜索失败site_id=%s", sid, exc_info=True)
# 补充 site_name
if all_items:
site_ids_set = list({item.get("siteId") for item in all_items if item.get("siteId")})
if site_ids_set:
conn = get_connection()
try:
with conn.cursor() as cur:
placeholders = ", ".join(["%s"] * len(site_ids_set))
cur.execute(
f"SELECT site_id, site_name FROM biz.sites WHERE site_id IN ({placeholders})",
tuple(site_ids_set),
)
site_name_map = {r[0]: r[1] for r in cur.fetchall()}
finally:
conn.close()
for item in all_items:
sid_val = item.get("siteId")
if sid_val and sid_val in site_name_map:
item["siteName"] = site_name_map[sid_val]
return {"items": all_items}
# ── GET /api/tenant/customers/{member_id}/clues ───────────
@router.get("/customers/{member_id}/clues")
async def list_customer_clues(
member_id: int,
source: Optional[str] = Query(None, description="按来源筛选manual/ai_consumption/ai_note"),
is_hidden: Optional[bool] = Query(None, description="按隐藏状态筛选"),
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""返回该客户在管辖门店范围内的全部线索,支持 source 和 is_hidden 筛选。"""
site_sql, site_params = site_filter_clause(admin=admin)
conn = get_connection()
try:
with conn.cursor() as cur:
where_parts = [f"{site_sql}", "member_id = %s"]
params: list = list(site_params) + [member_id]
if source is not None:
where_parts.append("source = %s")
params.append(source)
if is_hidden is not None:
where_parts.append("is_hidden = %s")
params.append(is_hidden)
where_clause = " AND ".join(where_parts)
cur.execute(
f"""
SELECT id, category, summary, detail,
recorded_by_name, source, recorded_at::text, is_hidden
FROM public.member_retention_clue
WHERE {where_clause}
ORDER BY recorded_at DESC
""",
tuple(params),
)
rows = cur.fetchall()
finally:
conn.close()
items = [
ClueListItem(
id=r[0], category=r[1], summary=r[2], detail=r[3],
recorded_by_name=r[4], source=r[5], recorded_at=r[6], is_hidden=r[7],
).model_dump(by_alias=True)
for r in rows
]
return {"items": items}
# ── PATCH /api/tenant/clues/{id} ──────────────────────────
@router.patch("/clues/{clue_id}")
async def edit_clue(
clue_id: int,
body: ClueEditRequest,
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""编辑线索 category/summary/detail。校验 category 枚举和 summary 长度。"""
# 先校验线索存在且在管辖范围内
_get_clue_with_site_check(clue_id, admin)
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE public.member_retention_clue
SET category = %s, summary = %s, detail = %s
WHERE id = %s
""",
(body.category.value, body.summary, body.detail, clue_id),
)
conn.commit()
except Exception:
conn.rollback()
logger.error("编辑线索失败clue_id=%s", clue_id, exc_info=True)
raise HTTPException(status_code=500, detail="编辑操作失败")
finally:
conn.close()
return {"message": "更新成功"}
# ── DELETE /api/tenant/clues/{id} ─────────────────────────
@router.delete("/clues/{clue_id}")
async def delete_clue(
clue_id: int,
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""物理删除线索。线索不存在或不在管辖范围返回 404。"""
_get_clue_with_site_check(clue_id, admin)
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM public.member_retention_clue WHERE id = %s",
(clue_id,),
)
conn.commit()
except Exception:
conn.rollback()
logger.error("删除线索失败clue_id=%s", clue_id, exc_info=True)
raise HTTPException(status_code=500, detail="删除操作失败")
finally:
conn.close()
return {"message": "删除成功"}
# ── PATCH /api/tenant/clues/{id}/visibility ───────────────
@router.patch("/clues/{clue_id}/visibility")
async def toggle_clue_visibility(
clue_id: int,
body: ClueVisibilityRequest,
admin: CurrentTenantAdmin = Depends(require_tenant_admin),
):
"""切换线索 is_hidden 状态。"""
_get_clue_with_site_check(clue_id, admin)
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE public.member_retention_clue
SET is_hidden = %s
WHERE id = %s
""",
(body.is_hidden, clue_id),
)
conn.commit()
except Exception:
conn.rollback()
logger.error("切换线索可见性失败clue_id=%s", clue_id, exc_info=True)
raise HTTPException(status_code=500, detail="操作失败")
finally:
conn.close()
return {"message": "更新成功"}