""" JWT 令牌生成、验证与解码。 - access_token:短期有效(默认 30 分钟),用于 API 请求认证 - refresh_token:长期有效(默认 7 天),用于刷新 access_token - payload 包含 user_id、site_id、令牌类型(access / refresh) - 密码哈希直接使用 bcrypt 库(passlib 与 bcrypt>=4.1 存在兼容性问题) """ from datetime import datetime, timedelta, timezone import bcrypt from jose import JWTError, jwt from app import config def verify_password(plain_password: str, hashed_password: str) -> bool: """校验明文密码与哈希是否匹配。""" return bcrypt.checkpw( plain_password.encode("utf-8"), hashed_password.encode("utf-8") ) def hash_password(password: str) -> str: """生成密码的 bcrypt 哈希。""" return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def create_access_token( user_id: int, site_id: int, roles: list[str] | None = None, audience: str | None = None, ) -> str: """ 生成 access_token。 payload: sub=user_id, site_id, roles, type=access, exp, aud(可选) roles / audience 参数默认 None,保持向后兼容。 新增 audience 参数(P0-5 致命 2 修复): - admin-web 端登录传 audience="admin" - 小程序登录传 audience="miniapp" - tenant-admin 在自己的 router 内手动签发,不走本函数 """ expire = datetime.now(timezone.utc) + timedelta( minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES ) payload: dict = { "sub": str(user_id), "site_id": site_id, "type": "access", "exp": expire, } if roles is not None: payload["roles"] = roles if audience is not None: payload["aud"] = audience return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM) def create_refresh_token( user_id: int, site_id: int, audience: str | None = None, ) -> str: """ 生成 refresh_token。 payload: sub=user_id, site_id, type=refresh, exp, aud(可选) """ expire = datetime.now(timezone.utc) + timedelta( days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS ) payload: dict = { "sub": str(user_id), "site_id": site_id, "type": "refresh", "exp": expire, } if audience is not None: payload["aud"] = audience return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM) def create_token_pair( user_id: int, site_id: int, roles: list[str] | None = None, audience: str | None = None, ) -> dict[str, str]: """生成 access_token + refresh_token 令牌对。 audience 参数:admin-web 传 "admin";小程序传 "miniapp"。 """ return { "access_token": create_access_token(user_id, site_id, roles=roles, audience=audience), "refresh_token": create_refresh_token(user_id, site_id, audience=audience), "token_type": "bearer", } def create_limited_token_pair( user_id: int, audience: str | None = None, ) -> dict[str, str]: """ 为 pending 用户签发受限令牌。 payload 不含 site_id 和 roles,仅包含 user_id + type + limited=True + aud(可选)。 受限令牌仅允许访问申请提交和状态查询端点。 audience 参数:小程序 pending 用户传 "miniapp"。 """ now = datetime.now(timezone.utc) access_payload: dict = { "sub": str(user_id), "type": "access", "limited": True, "exp": now + timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES), } refresh_payload: dict = { "sub": str(user_id), "type": "refresh", "limited": True, "exp": now + timedelta(days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS), } if audience is not None: access_payload["aud"] = audience refresh_payload["aud"] = audience return { "access_token": jwt.encode( access_payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM ), "refresh_token": jwt.encode( refresh_payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM ), "token_type": "bearer", } def decode_token(token: str, audience: str | None = None) -> dict: """ 解码并验证 JWT 令牌。 返回 payload dict,包含 sub、site_id、type、exp、aud(可选)。 令牌无效或过期时抛出 JWTError。 audience 参数(P0-5 致命 2 修复): - 传入时强制校验 token 的 aud 字段,不匹配抛 JWTError - 不传时,如果 token 含 aud 字段 jose 会拒绝(因此默认 options 关闭 aud 校验) - 旧 token(无 aud)兼容:不传 audience 时通过 options 关闭 aud 校验,放行 """ try: if audience is not None: payload = jwt.decode( token, config.JWT_SECRET_KEY, algorithms=[config.JWT_ALGORITHM], audience=audience, ) else: # 兼容:不强制 aud 校验(旧 token 与新 token 都能解码) payload = jwt.decode( token, config.JWT_SECRET_KEY, algorithms=[config.JWT_ALGORITHM], options={"verify_aud": False}, ) return payload except JWTError: raise def decode_access_token(token: str, audience: str | None = None) -> dict: """ 解码 access_token 并验证类型。 令牌类型不是 access 时抛出 JWTError。 audience 参数:见 decode_token。 """ payload = decode_token(token, audience=audience) if payload.get("type") != "access": raise JWTError("令牌类型不是 access") return payload def decode_refresh_token(token: str, audience: str | None = None) -> dict: """ 解码 refresh_token 并验证类型。 令牌类型不是 refresh 时抛出 JWTError。 audience 参数:见 decode_token。 """ payload = decode_token(token, audience=audience) if payload.get("type") != "refresh": raise JWTError("令牌类型不是 refresh") return payload