Files
Neo-ZQYY/apps/backend/app/auth/jwt.py
Neo 17f045a89e fix(backend): Wave 1 Day 1 三个 P0 D Bug 修复
- W1-T3 修 4 处 fdw_etl.* 必坏残留 → app.* (P0-5 致命 1)
  · tenant_users.py L431/L456-457: v_dim_assistant + v_dim_staff(_ex)
  · tenant_excel.py L394/L411: v_dim_assistant + v_dim_staff
  · tenant_clues.py L119: v_dim_member
  · 修复后 tenant-admin 用户审核 / Excel 上传 / 维客线索恢复正常

- W1-T4 JWT aud sign 端写入 (P0-5 致命 2 最小止血)
  · jwt.py 全部 token 创建/解码函数加 audience 参数
  · auth.py admin 端加 audience="admin"
  · xcx_auth.py miniapp 端加 audience="miniapp" (8 处调用)
  · 18 router 切强制 aud 校验留 Wave 2

- W1-T5 DBViewer 白名单 + 黑名单双保险 (P0-8)
  · 白名单: SELECT/WITH/EXPLAIN/SHOW 开头
  · 黑名单: 17 关键词覆盖全 DML/DDL/DCL
  · 注释剥离避免误伤;15/15 单测 PASS

参考: docs/audit/changes/2026-05-04__wave1_day1_d_bug_triple_fix.md
2026-05-04 07:36:20 +08:00

198 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
JWT 令牌生成、验证与解码。
- access_token短期有效默认 30 分钟),用于 API 请求认证
- refresh_token长期有效默认 7 天),用于刷新 access_token
- payload 包含 user_id、site_id、令牌类型access / refresh
- 密码哈希直接使用 bcrypt 库passlib 与 bcrypt>=4.1 存在兼容性问题)
"""
from datetime import datetime, timedelta, timezone
import bcrypt
from jose import JWTError, jwt
from app import config
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""校验明文密码与哈希是否匹配。"""
return bcrypt.checkpw(
plain_password.encode("utf-8"), hashed_password.encode("utf-8")
)
def hash_password(password: str) -> str:
"""生成密码的 bcrypt 哈希。"""
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def create_access_token(
user_id: int,
site_id: int,
roles: list[str] | None = None,
audience: str | None = None,
) -> str:
"""
生成 access_token。
payload: sub=user_id, site_id, roles, type=access, exp, aud(可选)
roles / audience 参数默认 None,保持向后兼容。
新增 audience 参数(P0-5 致命 2 修复):
- admin-web 端登录传 audience="admin"
- 小程序登录传 audience="miniapp"
- tenant-admin 在自己的 router 内手动签发,不走本函数
"""
expire = datetime.now(timezone.utc) + timedelta(
minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES
)
payload: dict = {
"sub": str(user_id),
"site_id": site_id,
"type": "access",
"exp": expire,
}
if roles is not None:
payload["roles"] = roles
if audience is not None:
payload["aud"] = audience
return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM)
def create_refresh_token(
user_id: int,
site_id: int,
audience: str | None = None,
) -> str:
"""
生成 refresh_token。
payload: sub=user_id, site_id, type=refresh, exp, aud(可选)
"""
expire = datetime.now(timezone.utc) + timedelta(
days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS
)
payload: dict = {
"sub": str(user_id),
"site_id": site_id,
"type": "refresh",
"exp": expire,
}
if audience is not None:
payload["aud"] = audience
return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM)
def create_token_pair(
user_id: int,
site_id: int,
roles: list[str] | None = None,
audience: str | None = None,
) -> dict[str, str]:
"""生成 access_token + refresh_token 令牌对。
audience 参数:admin-web 传 "admin";小程序传 "miniapp"
"""
return {
"access_token": create_access_token(user_id, site_id, roles=roles, audience=audience),
"refresh_token": create_refresh_token(user_id, site_id, audience=audience),
"token_type": "bearer",
}
def create_limited_token_pair(
user_id: int,
audience: str | None = None,
) -> dict[str, str]:
"""
为 pending 用户签发受限令牌。
payload 不含 site_id 和 roles,仅包含 user_id + type + limited=True + aud(可选)。
受限令牌仅允许访问申请提交和状态查询端点。
audience 参数:小程序 pending 用户传 "miniapp"
"""
now = datetime.now(timezone.utc)
access_payload: dict = {
"sub": str(user_id),
"type": "access",
"limited": True,
"exp": now + timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
}
refresh_payload: dict = {
"sub": str(user_id),
"type": "refresh",
"limited": True,
"exp": now + timedelta(days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS),
}
if audience is not None:
access_payload["aud"] = audience
refresh_payload["aud"] = audience
return {
"access_token": jwt.encode(
access_payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM
),
"refresh_token": jwt.encode(
refresh_payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM
),
"token_type": "bearer",
}
def decode_token(token: str, audience: str | None = None) -> dict:
"""
解码并验证 JWT 令牌。
返回 payload dict,包含 sub、site_id、type、exp、aud(可选)。
令牌无效或过期时抛出 JWTError。
audience 参数(P0-5 致命 2 修复):
- 传入时强制校验 token 的 aud 字段,不匹配抛 JWTError
- 不传时,如果 token 含 aud 字段 jose 会拒绝(因此默认 options 关闭 aud 校验)
- 旧 token(无 aud)兼容:不传 audience 时通过 options 关闭 aud 校验,放行
"""
try:
if audience is not None:
payload = jwt.decode(
token,
config.JWT_SECRET_KEY,
algorithms=[config.JWT_ALGORITHM],
audience=audience,
)
else:
# 兼容:不强制 aud 校验(旧 token 与新 token 都能解码)
payload = jwt.decode(
token,
config.JWT_SECRET_KEY,
algorithms=[config.JWT_ALGORITHM],
options={"verify_aud": False},
)
return payload
except JWTError:
raise
def decode_access_token(token: str, audience: str | None = None) -> dict:
"""
解码 access_token 并验证类型。
令牌类型不是 access 时抛出 JWTError。
audience 参数:见 decode_token。
"""
payload = decode_token(token, audience=audience)
if payload.get("type") != "access":
raise JWTError("令牌类型不是 access")
return payload
def decode_refresh_token(token: str, audience: str | None = None) -> dict:
"""
解码 refresh_token 并验证类型。
令牌类型不是 refresh 时抛出 JWTError。
audience 参数:见 decode_token。
"""
payload = decode_token(token, audience=audience)
if payload.get("type") != "refresh":
raise JWTError("令牌类型不是 refresh")
return payload