""" JWT 令牌生成、验证与解码。 - access_token:短期有效(默认 30 分钟),用于 API 请求认证 - refresh_token:长期有效(默认 7 天),用于刷新 access_token - payload 包含 user_id、site_id、令牌类型(access / refresh) - 密码哈希直接使用 bcrypt 库(passlib 与 bcrypt>=4.1 存在兼容性问题) """ from datetime import datetime, timedelta, timezone import bcrypt from jose import JWTError, jwt from app import config def verify_password(plain_password: str, hashed_password: str) -> bool: """校验明文密码与哈希是否匹配。""" return bcrypt.checkpw( plain_password.encode("utf-8"), hashed_password.encode("utf-8") ) def hash_password(password: str) -> str: """生成密码的 bcrypt 哈希。""" return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def create_access_token(user_id: int, site_id: int) -> str: """ 生成 access_token。 payload: sub=user_id, site_id, type=access, exp """ expire = datetime.now(timezone.utc) + timedelta( minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES ) payload = { "sub": str(user_id), "site_id": site_id, "type": "access", "exp": expire, } return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM) def create_refresh_token(user_id: int, site_id: int) -> str: """ 生成 refresh_token。 payload: sub=user_id, site_id, type=refresh, exp """ expire = datetime.now(timezone.utc) + timedelta( days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS ) payload = { "sub": str(user_id), "site_id": site_id, "type": "refresh", "exp": expire, } return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM) def create_token_pair(user_id: int, site_id: int) -> dict[str, str]: """生成 access_token + refresh_token 令牌对。""" return { "access_token": create_access_token(user_id, site_id), "refresh_token": create_refresh_token(user_id, site_id), "token_type": "bearer", } def decode_token(token: str) -> dict: """ 解码并验证 JWT 令牌。 返回 payload dict,包含 sub、site_id、type、exp。 令牌无效或过期时抛出 JWTError。 """ try: payload = jwt.decode( token, config.JWT_SECRET_KEY, algorithms=[config.JWT_ALGORITHM] ) return payload except JWTError: raise def decode_access_token(token: str) -> dict: """ 解码 access_token 并验证类型。 令牌类型不是 access 时抛出 JWTError。 """ payload = decode_token(token) if payload.get("type") != "access": raise JWTError("令牌类型不是 access") return payload def decode_refresh_token(token: str) -> dict: """ 解码 refresh_token 并验证类型。 令牌类型不是 refresh 时抛出 JWTError。 """ payload = decode_token(token) if payload.get("type") != "refresh": raise JWTError("令牌类型不是 refresh") return payload