Files
Neo-ZQYY/apps/backend/app/auth/jwt.py

113 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
JWT 令牌生成、验证与解码。
- access_token短期有效默认 30 分钟),用于 API 请求认证
- refresh_token长期有效默认 7 天),用于刷新 access_token
- payload 包含 user_id、site_id、令牌类型access / refresh
- 密码哈希直接使用 bcrypt 库passlib 与 bcrypt>=4.1 存在兼容性问题)
"""
from datetime import datetime, timedelta, timezone
import bcrypt
from jose import JWTError, jwt
from app import config
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""校验明文密码与哈希是否匹配。"""
return bcrypt.checkpw(
plain_password.encode("utf-8"), hashed_password.encode("utf-8")
)
def hash_password(password: str) -> str:
"""生成密码的 bcrypt 哈希。"""
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def create_access_token(user_id: int, site_id: int) -> str:
"""
生成 access_token。
payload: sub=user_id, site_id, type=access, exp
"""
expire = datetime.now(timezone.utc) + timedelta(
minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES
)
payload = {
"sub": str(user_id),
"site_id": site_id,
"type": "access",
"exp": expire,
}
return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM)
def create_refresh_token(user_id: int, site_id: int) -> str:
"""
生成 refresh_token。
payload: sub=user_id, site_id, type=refresh, exp
"""
expire = datetime.now(timezone.utc) + timedelta(
days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS
)
payload = {
"sub": str(user_id),
"site_id": site_id,
"type": "refresh",
"exp": expire,
}
return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM)
def create_token_pair(user_id: int, site_id: int) -> dict[str, str]:
"""生成 access_token + refresh_token 令牌对。"""
return {
"access_token": create_access_token(user_id, site_id),
"refresh_token": create_refresh_token(user_id, site_id),
"token_type": "bearer",
}
def decode_token(token: str) -> dict:
"""
解码并验证 JWT 令牌。
返回 payload dict包含 sub、site_id、type、exp。
令牌无效或过期时抛出 JWTError。
"""
try:
payload = jwt.decode(
token, config.JWT_SECRET_KEY, algorithms=[config.JWT_ALGORITHM]
)
return payload
except JWTError:
raise
def decode_access_token(token: str) -> dict:
"""
解码 access_token 并验证类型。
令牌类型不是 access 时抛出 JWTError。
"""
payload = decode_token(token)
if payload.get("type") != "access":
raise JWTError("令牌类型不是 access")
return payload
def decode_refresh_token(token: str) -> dict:
"""
解码 refresh_token 并验证类型。
令牌类型不是 refresh 时抛出 JWTError。
"""
payload = decode_token(token)
if payload.get("type") != "refresh":
raise JWTError("令牌类型不是 refresh")
return payload