Files
Neo-ZQYY/apps/backend/app/routers/tenant_auth.py
Neo 6f8f12314f feat: 累积功能变更 — 聊天集成、租户管理、小程序更新、ETL 增强、迁移脚本
包含多个会话的累积代码变更:
- backend: AI 聊天服务、触发器调度、认证增强、WebSocket、调度器最小间隔
- admin-web: ETL 状态页、任务管理、调度配置、登录优化
- miniprogram: 看板页面、聊天集成、UI 组件、导航更新
- etl: DWS 新任务(finance_area_daily/board_cache)、连接器增强
- tenant-admin: 项目初始化
- db: 19 个迁移脚本(etl_feiqiu 11 + zqyy_app 8)
- packages/shared: 枚举和工具函数更新
- tools: 数据库工具、报表生成、健康检查
- docs: PRD/架构/部署/合约文档更新

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-06 00:03:48 +08:00

248 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
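Tenant-admin authentication routes: login and token refresh.
- POST /api/tenant/auth/login — verify username + password, issue JWTs (aud=tenant-admin)
- POST /api/tenant/auth/refresh — exchange a refresh token for a new token pair
JWT payload contains: sub=admin_id, tenant_id, managed_site_ids, aud=tenant-admin, type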
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException, status
from jose import JWTError, jwt as jose_jwt
from pydantic import BaseModel, Field
from app import config
from app.auth.jwt import verify_password
from app.database import get_connection
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/tenant/auth", tags=["租户认证"])
# ── Pydantic 模型 ────────────────────────────────────────────
class TenantLoginRequest(BaseModel):
    """Request body for the tenant-admin login endpoint."""

    # Login name: required, 1 to 100 characters.
    username: str = Field(
        ...,
        description="用户名",
        min_length=1,
        max_length=100,
    )
    # Password as submitted by the client: required, at least 1 character.
    password: str = Field(
        ...,
        description="密码",
        min_length=1,
    )
class TenantRefreshRequest(BaseModel):
    """Request body for the tenant-admin token refresh endpoint."""

    # Refresh token to exchange: required, at least 1 character.
    refresh_token: str = Field(
        ...,
        description="刷新令牌",
        min_length=1,
    )
class TenantTokenResponse(BaseModel):
    """Response body carrying an access/refresh token pair."""

    # Encoded access token.
    access_token: str
    # Encoded refresh token.
    refresh_token: str
    # Token scheme label; defaults to "bearer".
    token_type: str = "bearer"
# ── JWT issuance (tenant-admin only, carries aud=tenant-admin) ──────────
def _create_tenant_access_token(
    admin_id: int,
    tenant_id: int,
    managed_site_ids: list[int],
    admin_type: str = "tenant_admin",
    display_name: str | None = None,
) -> str:
    """Issue a signed access token for a tenant administrator.

    The claims carry ``aud="tenant-admin"`` and ``type="access"``. The token
    expires ``config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after the
    current UTC time.

    Args:
        admin_id: Administrator id; stored as a string in the ``sub`` claim.
        tenant_id: Stored in the ``tenant_id`` claim.
        managed_site_ids: Stored in the ``managed_site_ids`` claim.
        admin_type: Stored in the ``admin_type`` claim.
        display_name: Added as a ``display_name`` claim only when not ``None``.

    Returns:
        The token produced by ``jose_jwt.encode`` with the configured secret
        key and algorithm.
    """
    lifetime = timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES)
    claims: dict = {
        "sub": str(admin_id),
        "tenant_id": tenant_id,
        "managed_site_ids": managed_site_ids,
        "admin_type": admin_type,
        "aud": "tenant-admin",
        "type": "access",
        "exp": datetime.now(timezone.utc) + lifetime,
    }
    # The display name is optional and omitted entirely when absent.
    if display_name is not None:
        claims["display_name"] = display_name
    return jose_jwt.encode(
        claims,
        config.JWT_SECRET_KEY,
        algorithm=config.JWT_ALGORITHM,
    )
def _create_tenant_refresh_token(
    admin_id: int,
    tenant_id: int,
    managed_site_ids: list[int],
    admin_type: str = "tenant_admin",
) -> str:
    """Issue a signed refresh token for a tenant administrator.

    The claims carry ``aud="tenant-admin"`` and ``type="refresh"``. The token
    expires ``config.JWT_REFRESH_TOKEN_EXPIRE_DAYS`` days after the current
    UTC time. No ``display_name`` claim is written.

    Args:
        admin_id: Administrator id; stored as a string in the ``sub`` claim.
        tenant_id: Stored in the ``tenant_id`` claim.
        managed_site_ids: Stored in the ``managed_site_ids`` claim.
        admin_type: Stored in the ``admin_type`` claim.

    Returns:
        The token produced by ``jose_jwt.encode`` with the configured secret
        key and algorithm.
    """
    now_utc = datetime.now(timezone.utc)
    expires_at = now_utc + timedelta(days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS)
    claims: dict = {
        "sub": str(admin_id),
        "tenant_id": tenant_id,
        "managed_site_ids": managed_site_ids,
        "admin_type": admin_type,
        "aud": "tenant-admin",
        "type": "refresh",
        "exp": expires_at,
    }
    token = jose_jwt.encode(
        claims,
        config.JWT_SECRET_KEY,
        algorithm=config.JWT_ALGORITHM,
    )
    return token
# ── 路由端点 ─────────────────────────────────────────────────
@router.post("/login", response_model=TenantTokenResponse)
async def tenant_login(body: TenantLoginRequest):
    """Authenticate a tenant administrator and issue a JWT token pair.

    The account is looked up in ``auth.tenant_admins`` by case-insensitive
    username, excluding soft-deleted rows. The password is verified first and
    the ``is_active`` flag is checked afterwards, so the "disabled" response
    is only shown to a caller who has proven knowledge of the password.
    On success ``last_login_at`` is updated on a best-effort basis (a failure
    is logged and does not fail the login).

    Args:
        body: Login credentials (username and password).

    Returns:
        TenantTokenResponse with a new access token and refresh token.

    Raises:
        HTTPException: 401 with one generic message when the user does not
            exist, the stored password hash is empty, or the password is
            wrong.
        HTTPException: 403 when the credentials are valid but the account is
            disabled (``is_active`` is false).
    """
    # NOTE(review): get_connection() and verify_password() are plain
    # synchronous calls inside an ``async def`` handler — confirm this is
    # acceptable for the event loop under load.

    def _invalid_credentials() -> HTTPException:
        # One shared shape for every "bad credentials" outcome so the
        # response never reveals which check failed.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
        )

    conn = get_connection()
    try:
        with conn.cursor() as cur:
            # CHANGE 2026-03-22 | Prompt: separate delete from disable | filter out deleted rows
            # CHANGE 2026-03-23 | Prompt: case-insensitive login username | LOWER() comparison
            cur.execute(
                "SELECT id, password_hash, display_name, tenant_id, "
                "managed_site_ids, is_active, admin_type "
                "FROM auth.tenant_admins "
                "WHERE LOWER(username) = LOWER(%s) AND deleted_at IS NULL",
                (body.username,),
            )
            row = cur.fetchone()
    finally:
        conn.close()

    # Unknown user -> 401 (generic message).
    # NOTE(review): this path returns without running verify_password, so its
    # latency may differ from the wrong-password path.
    if row is None:
        raise _invalid_credentials()

    (
        admin_id,
        password_hash,
        display_name,
        tenant_id,
        managed_site_ids,
        is_active,
        admin_type,
    ) = row

    # Verify the password BEFORE consulting is_active: answering 403 to an
    # unauthenticated caller would disclose that the username exists and is
    # disabled. An empty/NULL hash can never match, so treat it as a failure
    # instead of handing it to verify_password.
    if not password_hash or not verify_password(body.password, password_hash):
        raise _invalid_credentials()

    # Correct password but disabled account -> 403.
    if not is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="账号已被禁用",
        )

    # Successful login: record last_login_at. Best effort only — a failure
    # here is logged and must not block the login.
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE auth.tenant_admins SET last_login_at = NOW() WHERE id = %s",
                (admin_id,),
            )
        conn.commit()
    except Exception:
        logger.warning("更新 last_login_at 失败admin_id=%s", admin_id, exc_info=True)
    finally:
        conn.close()

    # Keep NULL column values out of the token: the token helpers expect a
    # list of site ids and default admin_type to "tenant_admin".
    site_ids = managed_site_ids if managed_site_ids is not None else []
    effective_admin_type = admin_type if admin_type is not None else "tenant_admin"

    # Issue the token pair.
    access_token = _create_tenant_access_token(
        admin_id=admin_id,
        tenant_id=tenant_id,
        managed_site_ids=site_ids,
        admin_type=effective_admin_type,
        display_name=display_name,
    )
    refresh_token = _create_tenant_refresh_token(
        admin_id=admin_id,
        tenant_id=tenant_id,
        managed_site_ids=site_ids,
        admin_type=effective_admin_type,
    )
    return TenantTokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
    )
@router.post("/refresh", response_model=TenantTokenResponse)
async def tenant_refresh(body: TenantRefreshRequest):
    """Exchange a tenant-admin refresh token for a new token pair.

    The refresh token must be validly signed, unexpired, carry
    ``aud="tenant-admin"`` and ``type="refresh"``, and have an integer ``sub``.
    The administrator is then re-loaded from ``auth.tenant_admins`` so that:

    * a soft-deleted or disabled account can no longer refresh;
    * the new tokens carry the current ``tenant_id``, ``managed_site_ids`` and
      ``admin_type`` from the database rather than the values frozen in the
      old token;
    * the new access token keeps its ``display_name`` claim (refresh tokens
      do not carry one, so it cannot be copied from the token payload).

    Args:
        body: Request containing the refresh token.

    Returns:
        TenantTokenResponse with a new access token and refresh token.

    Raises:
        HTTPException: 401 when the token is invalid, has the wrong type or
            audience, has a malformed ``sub``, or the account no longer
            exists.
        HTTPException: 403 when the account is disabled.
    """
    try:
        payload = jose_jwt.decode(
            body.refresh_token,
            config.JWT_SECRET_KEY,
            algorithms=[config.JWT_ALGORITHM],
            audience="tenant-admin",
        )
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的刷新令牌",
        )

    # Only refresh tokens may be exchanged here.
    if payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="令牌类型不匹配",
        )

    # Check aud explicitly: jose does not reject a token whose aud claim is
    # missing, even when an expected audience is supplied.
    if payload.get("aud") != "tenant-admin":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="令牌类型不匹配",
        )

    # A signed token with a missing or non-numeric sub is still a bad token,
    # not a server error.
    try:
        admin_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的刷新令牌",
        )

    # Re-load the account so revocation and scope changes take effect at the
    # next refresh instead of living on for the whole refresh-token lifetime.
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT display_name, tenant_id, managed_site_ids, "
                "is_active, admin_type "
                "FROM auth.tenant_admins "
                "WHERE id = %s AND deleted_at IS NULL",
                (admin_id,),
            )
            row = cur.fetchone()
    finally:
        conn.close()

    # Account removed since the token was issued -> the token is no longer valid.
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的刷新令牌",
        )

    display_name, tenant_id, managed_site_ids, is_active, admin_type = row

    # Account disabled since the token was issued -> 403, same as at login.
    if not is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="账号已被禁用",
        )

    # Keep NULL column values out of the token: the token helpers expect a
    # list of site ids and default admin_type to "tenant_admin".
    site_ids = managed_site_ids if managed_site_ids is not None else []
    effective_admin_type = admin_type if admin_type is not None else "tenant_admin"

    # Issue the new token pair.
    access_token = _create_tenant_access_token(
        admin_id=admin_id,
        tenant_id=tenant_id,
        managed_site_ids=site_ids,
        admin_type=effective_admin_type,
        display_name=display_name,
    )
    refresh_token = _create_tenant_refresh_token(
        admin_id=admin_id,
        tenant_id=tenant_id,
        managed_site_ids=site_ids,
        admin_type=effective_admin_type,
    )
    return TenantTokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
    )