# -*- coding: utf-8 -*- """ 租户管理员认证依赖注入。 提供 require_tenant_admin() 依赖,验证 JWT aud=tenant-admin, 与小程序端 get_current_user()(aud 隐含为 xcx)完全隔离。 用法: @router.get("/protected") async def endpoint(admin: CurrentTenantAdmin = Depends(require_tenant_admin)): print(admin.admin_id, admin.managed_site_ids) """ from __future__ import annotations # AI_CHANGELOG # - 2026-03-23 21:00:00 | Prompt: P20260323-210000(根治 tenant_admin managed_site_ids 限制)| Direct cause:JWT managed_site_ids 静态签发,新建店铺后所有端点受限 | Summary:新增 get_tenant_site_ids(tenant_id) 和 get_effective_site_ids(admin) 函数;改造 site_filter_clause 和 verify_site_access 支持 admin= keyword-only 参数(向后兼容旧签名)| Verify:tenant_admin 新建店铺后无需重新登录即可访问所有端点 from dataclasses import dataclass, field from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from jose import JWTError from app.auth.jwt import decode_access_token from app import config as _config from jose import jwt as _jose_jwt # 复用与 dependencies.py 相同的 Bearer 提取器 _bearer_scheme = HTTPBearer(auto_error=True) @dataclass(frozen=True) class CurrentTenantAdmin: """从 JWT 解析出的租户管理员上下文。""" admin_id: int tenant_id: int managed_site_ids: list[int] = field(default_factory=list) display_name: str | None = None admin_type: str = "tenant_admin" # tenant_admin / site_admin async def require_tenant_admin( credentials: HTTPAuthorizationCredentials = Depends(_bearer_scheme), ) -> CurrentTenantAdmin: """ FastAPI 依赖:验证 JWT aud=tenant-admin,提取管理员信息。 拒绝小程序 JWT(aud 不匹配)及任何无效/过期令牌。 """ token = credentials.credentials try: # 直接解码并验证 aud=tenant-admin + type=access # 不能复用 decode_access_token(),因为它不传 audience 参数, # jose 遇到 aud claim 但无 audience 参数时会直接拒绝。 payload = _jose_jwt.decode( token, _config.JWT_SECRET_KEY, algorithms=[_config.JWT_ALGORITHM], audience="tenant-admin", ) except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的令牌", headers={"WWW-Authenticate": "Bearer"}, ) # 验证 token type 为 access(与 decode_access_token 一致) if payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="令牌类型不匹配", headers={"WWW-Authenticate": "Bearer"}, ) # jose 在 aud claim 缺失时不会拒绝,需要显式检查 if payload.get("aud") != "tenant-admin": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="令牌类型不匹配", headers={"WWW-Authenticate": "Bearer"}, ) # 提取必要字段 sub = payload.get("sub") tenant_id = payload.get("tenant_id") managed_site_ids = payload.get("managed_site_ids") if sub is None or tenant_id is None or managed_site_ids is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="令牌缺少必要字段", headers={"WWW-Authenticate": "Bearer"}, ) try: admin_id = int(sub) except (TypeError, ValueError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="令牌中 admin_id 格式无效", headers={"WWW-Authenticate": "Bearer"}, ) return CurrentTenantAdmin( admin_id=admin_id, tenant_id=tenant_id, managed_site_ids=managed_site_ids, display_name=payload.get("display_name"), admin_type=payload.get("admin_type", "tenant_admin"), ) # ── 数据隔离工具函数 ───────────────────────────────────────── # [CHANGE P20260323-210000] intent: 根治 tenant_admin 的 managed_site_ids 限制, # tenant_admin 按 tenant_id 查 biz.sites 获取有效 site_ids, # site_admin 仍用 JWT 中的 managed_site_ids。 # assumptions: biz.sites 数据量极小(几条),无需缓存 # verify: tenant_admin 新建店铺后无需重新登录即可访问 def get_tenant_site_ids(tenant_id: int) -> list[int]: """查询租户下所有活跃店铺的 site_id 列表。 通过 biz.tenants.tenant_id(外部租户标识)→ biz.tenants.id(内部 PK) → biz.sites.tenant_id 关联查询。 """ from app.database import get_connection conn = get_connection() try: with conn.cursor() as cur: cur.execute( """ SELECT s.site_id FROM biz.sites s JOIN biz.tenants t ON t.id = s.tenant_id WHERE t.tenant_id = %s AND t.is_active = true AND s.is_active = true """, (tenant_id,), ) return [row[0] for row in cur.fetchall()] finally: conn.close() def get_effective_site_ids(admin: CurrentTenantAdmin) -> list[int]: """获取管理员的有效 site_id 列表。 - tenant_admin:实时查 biz.sites(覆盖新建店铺) - site_admin:使用 JWT 中的 managed_site_ids(精确控制) """ if admin.admin_type == "tenant_admin": return get_tenant_site_ids(admin.tenant_id) return admin.managed_site_ids def site_filter_clause( managed_site_ids: list[int] | None = None, *, admin: CurrentTenantAdmin | None = None, ) -> tuple[str, tuple]: """生成 site_id IN (...) SQL 片段,用于数据隔离查询。 优先使用 admin 参数(自动区分 tenant_admin/site_admin), 也兼容旧的 managed_site_ids 直传方式。 返回 (sql_fragment, params_tuple),可直接拼入 WHERE 子句。 """ if admin is not None: site_ids = get_effective_site_ids(admin) elif managed_site_ids is not None: site_ids = managed_site_ids else: return "1 = 0", () if not site_ids: return "1 = 0", () placeholders = ", ".join(["%s"] * len(site_ids)) return f"site_id IN ({placeholders})", tuple(site_ids) def verify_site_access( site_id: int, managed_site_ids: list[int] | None = None, *, admin: CurrentTenantAdmin | None = None, ) -> None: """校验 site_id 是否在管辖范围内,不在则抛 403。 优先使用 admin 参数(自动区分 tenant_admin/site_admin), 也兼容旧的 managed_site_ids 直传方式。 """ if admin is not None: effective_ids = get_effective_site_ids(admin) elif managed_site_ids is not None: effective_ids = managed_site_ids else: effective_ids = [] if site_id not in effective_ids: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="无权访问该门店数据", )