Files

98 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证路由:登录与令牌刷新。
- POST /api/auth/login — 验证用户名密码,返回 JWT 令牌对
- POST /api/auth/refresh — 用刷新令牌换取新的访问令牌
"""
import logging
from fastapi import APIRouter, HTTPException, status
from jose import JWTError
from app.auth.jwt import (
create_access_token,
create_token_pair,
decode_refresh_token,
verify_password,
)
from app.database import get_connection
from app.schemas.auth import LoginRequest, RefreshRequest, TokenResponse
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/auth", tags=["认证"])
@router.post("/login", response_model=TokenResponse)
async def login(body: LoginRequest):
"""
用户登录。
查询 admin_users 表验证用户名密码,成功后返回 JWT 令牌对。
- 用户不存在或密码错误401
- 账号已禁用is_active=false401
"""
conn = get_connection()
try:
with conn.cursor() as cur:
cur.execute(
"SELECT id, password_hash, site_id, is_active "
"FROM admin_users WHERE username = %s",
(body.username,),
)
row = cur.fetchone()
finally:
conn.close()
if row is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
)
user_id, password_hash, site_id, is_active = row
if not is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="账号已被禁用",
)
if not verify_password(body.password, password_hash):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
)
tokens = create_token_pair(user_id, site_id)
return TokenResponse(**tokens)
@router.post("/refresh", response_model=TokenResponse)
async def refresh(body: RefreshRequest):
"""
刷新访问令牌。
验证 refresh_token 有效性,成功后仅返回新的 access_token
refresh_token 保持不变,由客户端继续持有)。
"""
try:
payload = decode_refresh_token(body.refresh_token)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的刷新令牌",
)
user_id = int(payload["sub"])
site_id = payload["site_id"]
# 生成新的 access_tokenrefresh_token 原样返回
new_access = create_access_token(user_id, site_id)
return TokenResponse(
access_token=new_access,
refresh_token=body.refresh_token,
token_type="bearer",
)