fix(backend): Wave 1 Day 1 三个 P0 D Bug 修复
- W1-T3 修 4 处 fdw_etl.* 必坏残留 → app.* (P0-5 致命 1) · tenant_users.py L431/L456-457: v_dim_assistant + v_dim_staff(_ex) · tenant_excel.py L394/L411: v_dim_assistant + v_dim_staff · tenant_clues.py L119: v_dim_member · 修复后 tenant-admin 用户审核 / Excel 上传 / 维客线索恢复正常 - W1-T4 JWT aud sign 端写入 (P0-5 致命 2 最小止血) · jwt.py 全部 token 创建/解码函数加 audience 参数 · auth.py admin 端加 audience="admin" · xcx_auth.py miniapp 端加 audience="miniapp" (8 处调用) · 18 router 切强制 aud 校验留 Wave 2 - W1-T5 DBViewer 白名单 + 黑名单双保险 (P0-8) · 白名单: SELECT/WITH/EXPLAIN/SHOW 开头 · 黑名单: 17 关键词覆盖全 DML/DDL/DCL · 注释剥离避免误伤;15/15 单测 PASS 参考: docs/audit/changes/2026-05-04__wave1_day1_d_bug_triple_fix.md
This commit is contained in:
@@ -28,18 +28,25 @@ def hash_password(password: str) -> str:
|
||||
|
||||
|
||||
def create_access_token(
|
||||
user_id: int, site_id: int, roles: list[str] | None = None
|
||||
user_id: int,
|
||||
site_id: int,
|
||||
roles: list[str] | None = None,
|
||||
audience: str | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
生成 access_token。
|
||||
|
||||
payload: sub=user_id, site_id, roles, type=access, exp
|
||||
roles 参数默认 None,保持向后兼容。
|
||||
payload: sub=user_id, site_id, roles, type=access, exp, aud(可选)
|
||||
roles / audience 参数默认 None,保持向后兼容。
|
||||
新增 audience 参数(P0-5 致命 2 修复):
|
||||
- admin-web 端登录传 audience="admin"
|
||||
- 小程序登录传 audience="miniapp"
|
||||
- tenant-admin 在自己的 router 内手动签发,不走本函数
|
||||
"""
|
||||
expire = datetime.now(timezone.utc) + timedelta(
|
||||
minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES
|
||||
)
|
||||
payload = {
|
||||
payload: dict = {
|
||||
"sub": str(user_id),
|
||||
"site_id": site_id,
|
||||
"type": "access",
|
||||
@@ -47,56 +54,79 @@ def create_access_token(
|
||||
}
|
||||
if roles is not None:
|
||||
payload["roles"] = roles
|
||||
if audience is not None:
|
||||
payload["aud"] = audience
|
||||
return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM)
|
||||
|
||||
|
||||
def create_refresh_token(user_id: int, site_id: int) -> str:
|
||||
def create_refresh_token(
|
||||
user_id: int,
|
||||
site_id: int,
|
||||
audience: str | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
生成 refresh_token。
|
||||
|
||||
payload: sub=user_id, site_id, type=refresh, exp
|
||||
payload: sub=user_id, site_id, type=refresh, exp, aud(可选)
|
||||
"""
|
||||
expire = datetime.now(timezone.utc) + timedelta(
|
||||
days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS
|
||||
)
|
||||
payload = {
|
||||
payload: dict = {
|
||||
"sub": str(user_id),
|
||||
"site_id": site_id,
|
||||
"type": "refresh",
|
||||
"exp": expire,
|
||||
}
|
||||
if audience is not None:
|
||||
payload["aud"] = audience
|
||||
return jwt.encode(payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM)
|
||||
|
||||
|
||||
def create_token_pair(user_id: int, site_id: int, roles: list[str] | None = None) -> dict[str, str]:
|
||||
"""生成 access_token + refresh_token 令牌对。"""
|
||||
def create_token_pair(
|
||||
user_id: int,
|
||||
site_id: int,
|
||||
roles: list[str] | None = None,
|
||||
audience: str | None = None,
|
||||
) -> dict[str, str]:
|
||||
"""生成 access_token + refresh_token 令牌对。
|
||||
|
||||
audience 参数:admin-web 传 "admin";小程序传 "miniapp"。
|
||||
"""
|
||||
return {
|
||||
"access_token": create_access_token(user_id, site_id, roles=roles),
|
||||
"refresh_token": create_refresh_token(user_id, site_id),
|
||||
"access_token": create_access_token(user_id, site_id, roles=roles, audience=audience),
|
||||
"refresh_token": create_refresh_token(user_id, site_id, audience=audience),
|
||||
"token_type": "bearer",
|
||||
}
|
||||
|
||||
|
||||
def create_limited_token_pair(user_id: int) -> dict[str, str]:
|
||||
def create_limited_token_pair(
|
||||
user_id: int,
|
||||
audience: str | None = None,
|
||||
) -> dict[str, str]:
|
||||
"""
|
||||
为 pending 用户签发受限令牌。
|
||||
|
||||
payload 不含 site_id 和 roles,仅包含 user_id + type + limited=True。
|
||||
payload 不含 site_id 和 roles,仅包含 user_id + type + limited=True + aud(可选)。
|
||||
受限令牌仅允许访问申请提交和状态查询端点。
|
||||
audience 参数:小程序 pending 用户传 "miniapp"。
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
access_payload = {
|
||||
access_payload: dict = {
|
||||
"sub": str(user_id),
|
||||
"type": "access",
|
||||
"limited": True,
|
||||
"exp": now + timedelta(minutes=config.JWT_ACCESS_TOKEN_EXPIRE_MINUTES),
|
||||
}
|
||||
refresh_payload = {
|
||||
refresh_payload: dict = {
|
||||
"sub": str(user_id),
|
||||
"type": "refresh",
|
||||
"limited": True,
|
||||
"exp": now + timedelta(days=config.JWT_REFRESH_TOKEN_EXPIRE_DAYS),
|
||||
}
|
||||
if audience is not None:
|
||||
access_payload["aud"] = audience
|
||||
refresh_payload["aud"] = audience
|
||||
return {
|
||||
"access_token": jwt.encode(
|
||||
access_payload, config.JWT_SECRET_KEY, algorithm=config.JWT_ALGORITHM
|
||||
@@ -108,41 +138,60 @@ def create_limited_token_pair(user_id: int) -> dict[str, str]:
|
||||
}
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict:
|
||||
def decode_token(token: str, audience: str | None = None) -> dict:
|
||||
"""
|
||||
解码并验证 JWT 令牌。
|
||||
|
||||
返回 payload dict,包含 sub、site_id、type、exp。
|
||||
返回 payload dict,包含 sub、site_id、type、exp、aud(可选)。
|
||||
令牌无效或过期时抛出 JWTError。
|
||||
|
||||
audience 参数(P0-5 致命 2 修复):
|
||||
- 传入时强制校验 token 的 aud 字段,不匹配抛 JWTError
|
||||
- 不传时,如果 token 含 aud 字段 jose 会拒绝(因此默认 options 关闭 aud 校验)
|
||||
- 旧 token(无 aud)兼容:不传 audience 时通过 options 关闭 aud 校验,放行
|
||||
"""
|
||||
try:
|
||||
payload = jwt.decode(
|
||||
token, config.JWT_SECRET_KEY, algorithms=[config.JWT_ALGORITHM]
|
||||
)
|
||||
if audience is not None:
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
config.JWT_SECRET_KEY,
|
||||
algorithms=[config.JWT_ALGORITHM],
|
||||
audience=audience,
|
||||
)
|
||||
else:
|
||||
# 兼容:不强制 aud 校验(旧 token 与新 token 都能解码)
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
config.JWT_SECRET_KEY,
|
||||
algorithms=[config.JWT_ALGORITHM],
|
||||
options={"verify_aud": False},
|
||||
)
|
||||
return payload
|
||||
except JWTError:
|
||||
raise
|
||||
|
||||
|
||||
def decode_access_token(token: str) -> dict:
|
||||
def decode_access_token(token: str, audience: str | None = None) -> dict:
|
||||
"""
|
||||
解码 access_token 并验证类型。
|
||||
|
||||
令牌类型不是 access 时抛出 JWTError。
|
||||
audience 参数:见 decode_token。
|
||||
"""
|
||||
payload = decode_token(token)
|
||||
payload = decode_token(token, audience=audience)
|
||||
if payload.get("type") != "access":
|
||||
raise JWTError("令牌类型不是 access")
|
||||
return payload
|
||||
|
||||
|
||||
def decode_refresh_token(token: str) -> dict:
|
||||
def decode_refresh_token(token: str, audience: str | None = None) -> dict:
|
||||
"""
|
||||
解码 refresh_token 并验证类型。
|
||||
|
||||
令牌类型不是 refresh 时抛出 JWTError。
|
||||
audience 参数:见 decode_token。
|
||||
"""
|
||||
payload = decode_token(token)
|
||||
payload = decode_token(token, audience=audience)
|
||||
if payload.get("type") != "refresh":
|
||||
raise JWTError("令牌类型不是 refresh")
|
||||
return payload
|
||||
|
||||
Reference in New Issue
Block a user