Files
Neo-ZQYY/docs/deployment/wx-api-security-guide.md

16 KiB
Raw Permalink Blame History

微信小程序 API 安全(证书 + 签名 + 加密)使用说明

校验依据:https://developers.weixin.qq.com/miniprogram/dev/server/getting_started/api_signature.html

背景

微信小程序部分敏感 API如获取手机号等支持服务通信二次加密和签名 可有效防止数据篡改与泄露。开发者在小程序管理后台「开发 → 开发管理 → 开发设置 → API 安全」 进行密钥配置。

你拿到的四样材料

材料 微信后台对应位置 用途
开放平台证书编号SN API 安全页面显示 放在请求头 Wechatmp-Serial 中标识证书
.cer 平台证书文件 配置完公钥后下载 验证微信 API 响应的签名RSAwithSHA256
对称密钥AES256 Key API 对称密钥处配置 加密请求数据 + 解密响应数据AES256_GCM
非对称密钥RSA 私钥) API 非对称密钥处配置 对请求签名RSAwithSHA256 + PSS 填充)

支持的算法组合

微信支持以下算法,在管理后台配置:

类型 可选算法
加密算法 AES256_GCM、SM4_GCM
签名算法 RSAwithSHA256、SM2withSM3

以下以 AES256_GCM + RSAwithSHA256 为例说明。

完整流程概览

请求:原始数据 → AES256_GCM 加密 → RSAwithSHA256 签名 → 发送
响应:接收 → RSAwithSHA256 验签 → AES256_GCM 解密 → 原始数据

配置步骤

1. 将密钥文件放到服务器

New-Item -ItemType Directory -Path D:\NeoZQYY\secrets -Force

将以下文件放入 D:\NeoZQYY\secrets\

  • wx_api_private_key.pem — 应用私钥RSA
  • wx_api_platform_cert.cer — 平台证书(微信的公钥,用于验签响应)

2. 配置环境变量

apps/backend/.env.local 中添加:

# API 安全 - 对称密钥编号(微信后台 API 安全页面显示的 SN
WX_API_SYM_SN=你的对称密钥编号

# API 安全 - 对称密钥AES256 KeyBase64 编码的 32 字节密钥)
WX_API_SYMMETRIC_KEY=你的对称密钥明文

# API 安全 - 非对称密钥编号
WX_API_ASYM_SN=你的非对称密钥编号

# API 安全 - RSA 私钥文件路径
WX_API_PRIVATE_KEY_PATH=D:/NeoZQYY/secrets/wx_api_private_key.pem

# API 安全 - 平台证书文件路径(微信的公钥,用于验签响应)
WX_API_PLATFORM_CERT_PATH=D:/NeoZQYY/secrets/wx_api_platform_cert.cer

# API 安全 - 平台证书编号(微信后台显示,非证书内序列号)
WX_API_PLATFORM_CERT_SN=平台证书编号

3. 后端工具类

apps/backend/app/utils/ 下新建 wx_api_sign.py

"""
微信小程序 API 安全 — 请求加密 + 签名 + 响应验签 + 解密

校验依据:
https://developers.weixin.qq.com/miniprogram/dev/server/getting_started/api_signature.html

流程:
1. 请求加密AES256_GCM 加密请求数据
2. 请求签名RSAwithSHA256 + PSS 填充salt=32对密文签名
3. 响应验签:用平台证书公钥验证响应签名
4. 响应解密AES256_GCM 解密响应数据
"""
import base64
import json
import os
import time
from pathlib import Path

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.x509 import load_pem_x509_certificate

from app.config import get


class WxApiSecurity:
    """微信 API 安全:加密 + 签名 + 验签 + 解密"""

    def __init__(self):
        self.app_id: str = get("WX_APP_ID", "")
        self.sym_sn: str = get("WX_API_SYM_SN", "")
        self.asym_sn: str = get("WX_API_ASYM_SN", "")
        self.platform_cert_sn: str = get("WX_API_PLATFORM_CERT_SN", "")

        # 对称密钥AES256Base64 编码)
        sym_key_b64 = get("WX_API_SYMMETRIC_KEY", "")
        self.sym_key = base64.b64decode(sym_key_b64) if sym_key_b64 else None

        # 加载 RSA 私钥(应用私钥,用于签名请求)
        key_path = get("WX_API_PRIVATE_KEY_PATH", "")
        if key_path and Path(key_path).exists():
            with open(key_path, "rb") as f:
                self.private_key = serialization.load_pem_private_key(
                    f.read(), password=None
                )
        else:
            self.private_key = None

        # 加载平台证书(微信公钥,用于验签响应)
        cert_path = get("WX_API_PLATFORM_CERT_PATH", "")
        if cert_path and Path(cert_path).exists():
            with open(cert_path, "rb") as f:
                cert = load_pem_x509_certificate(f.read())
                self.platform_public_key = cert.public_key()
        else:
            self.platform_public_key = None

    # ========== 请求加密AES256_GCM==========

    def encrypt_request(
        self, url_path: str, req_data: dict, timestamp: int
    ) -> dict:
        """
        加密请求数据。

        参数:
            url_path: 完整 API URL含协议不含 query 参数),
                      如 "https://api.weixin.qq.com/wxa/business/getuserphonenumber"
            req_data: 原始请求参数字典
            timestamp: 时间戳

        返回:
            加密后的请求体 {"iv": "...", "data": "...", "authtag": "..."}
        """
        if not self.sym_key:
            raise RuntimeError("对称密钥未配置")

        # 在原数据内添加安全字段
        req_data["_n"] = base64.b64encode(os.urandom(16)).decode()
        req_data["_appid"] = self.app_id
        req_data["_timestamp"] = timestamp

        plaintext = json.dumps(req_data, ensure_ascii=False).encode("utf-8")

        # GCM 额外认证数据AADurlpath|appid|timestamp|sn
        aad = f"{url_path}|{self.app_id}|{timestamp}|{self.sym_sn}".encode()

        # 12 字节随机 IV
        iv = os.urandom(12)
        aesgcm = AESGCM(self.sym_key)
        # encrypt 返回 ciphertext + tag最后 16 字节是 tag
        ct_with_tag = aesgcm.encrypt(iv, plaintext, aad)
        ciphertext = ct_with_tag[:-16]
        authtag = ct_with_tag[-16:]

        return {
            "iv": base64.b64encode(iv).decode(),
            "data": base64.b64encode(ciphertext).decode(),
            "authtag": base64.b64encode(authtag).decode(),
        }

    # ========== 请求签名RSAwithSHA256 + PSS==========

    def sign_request(
        self, url_path: str, post_data: str, timestamp: int
    ) -> dict[str, str]:
        """
        生成签名请求头。

        参数:
            url_path: 完整 API URL含协议不含 query 参数)
            post_data: 加密后的请求体 JSON 字符串
            timestamp: 时间戳(与加密时一致)

        返回:
            签名相关的请求头字典
        """
        if not self.private_key:
            raise RuntimeError("RSA 私钥未配置")

        # 待签名串格式urlpath\nappid\ntimestamp\npostdata
        # 字段之间用换行符分隔,末尾无额外换行符
        payload = f"{url_path}\n{self.app_id}\n{timestamp}\n{post_data}"

        # RSAwithSHA256 + PSS 填充salt 长度为 32
        signature = self.private_key.sign(
            payload.encode("utf-8"),
            asym_padding.PSS(
                mgf=asym_padding.MGF1(hashes.SHA256()),
                salt_length=32,
            ),
            hashes.SHA256(),
        )

        return {
            "Wechatmp-Appid": self.app_id,
            "Wechatmp-TimeStamp": str(timestamp),
            "Wechatmp-Signature": base64.b64encode(signature).decode(),
        }

    # ========== 响应验签 ==========

    def verify_response(
        self, url_path: str, resp_data: str, timestamp: str,
        signature_b64: str, serial: str
    ) -> bool:
        """
        验证微信 API 响应签名。

        参数:
            url_path: 请求的 API URL
            resp_data: 响应体字符串
            timestamp: 响应头 Wechatmp-TimeStamp
            signature_b64: 响应头 Wechatmp-SignatureBase64
            serial: 响应头 Wechatmp-Serial平台证书编号
        """
        if not self.platform_public_key:
            raise RuntimeError("平台证书未配置")

        # 待验签串urlpath\nappid\ntimestamp\nrespdata
        payload = f"{url_path}\n{self.app_id}\n{timestamp}\n{resp_data}"
        signature = base64.b64decode(signature_b64)

        try:
            self.platform_public_key.verify(
                signature,
                payload.encode("utf-8"),
                asym_padding.PSS(
                    mgf=asym_padding.MGF1(hashes.SHA256()),
                    salt_length=asym_padding.PSS.AUTO,
                ),
                hashes.SHA256(),
            )
            return True
        except Exception:
            return False

    # ========== 响应解密AES256_GCM==========

    def decrypt_response(
        self, resp_body: dict, url_path: str, timestamp: str
    ) -> dict:
        """
        解密微信 API 响应数据。

        参数:
            resp_body: 响应体 {"iv": "...", "data": "...", "authtag": "..."}
            url_path: 请求的 API URL
            timestamp: 响应头 Wechatmp-TimeStamp
        """
        if not self.sym_key:
            raise RuntimeError("对称密钥未配置")

        iv = base64.b64decode(resp_body["iv"])
        ciphertext = base64.b64decode(resp_body["data"])
        authtag = base64.b64decode(resp_body["authtag"])

        # AAD 格式与请求时一致
        aad = f"{url_path}|{self.app_id}|{timestamp}|{self.sym_sn}".encode()

        aesgcm = AESGCM(self.sym_key)
        plaintext = aesgcm.decrypt(iv, ciphertext + authtag, aad)
        return json.loads(plaintext)

    # ========== 便捷方法:加密 + 签名一步到位 ==========

    def prepare_request(
        self, url_path: str, req_data: dict
    ) -> tuple[str, dict[str, str]]:
        """
        一步完成加密 + 签名,返回 (加密后的body字符串, 请求头字典)。
        """
        timestamp = int(time.time())

        # 1. 加密
        encrypted = self.encrypt_request(url_path, req_data, timestamp)
        body_str = json.dumps(encrypted, ensure_ascii=False)

        # 2. 签名
        headers = self.sign_request(url_path, body_str, timestamp)
        headers["Content-Type"] = "application/json"

        return body_str, headers


# 模块级单例
wx_security = WxApiSecurity()

4. 使用示例:获取手机号

import httpx
import json
from app.utils.wx_api_sign import wx_security


async def get_user_phone_number(access_token: str, code: str) -> dict:
    """
    调用微信接口获取用户手机号(带加密 + 签名)。
    code 来自小程序端 getPhoneNumber 按钮回调。

    接口文档:
    https://developers.weixin.qq.com/miniprogram/dev/server/API/user-info/phone-number/api_getphonenumber.html
    """
    url_path = "https://api.weixin.qq.com/wxa/business/getuserphonenumber"
    full_url = f"{url_path}?access_token={access_token}"

    # 加密 + 签名
    body_str, headers = wx_security.prepare_request(
        url_path, {"code": code}
    )

    async with httpx.AsyncClient() as client:
        resp = await client.post(full_url, content=body_str, headers=headers)

        # 验签(从响应头获取签名信息)
        resp_sig = resp.headers.get("Wechatmp-Signature", "")
        resp_ts = resp.headers.get("Wechatmp-TimeStamp", "")
        resp_serial = resp.headers.get("Wechatmp-Serial", "")

        if resp_sig:
            verified = wx_security.verify_response(
                url_path, resp.text, resp_ts, resp_sig, resp_serial
            )
            if not verified:
                raise ValueError("微信响应签名验证失败")

        # 解密响应
        resp_body = resp.json()
        if "iv" in resp_body and "data" in resp_body:
            return wx_security.decrypt_response(resp_body, url_path, resp_ts)
        else:
            # 未加密的响应(错误码等)
            return resp_body

不使用加密的简单调用(仅签名)

如果某些接口只需要签名不需要加密,可以只用签名部分:

async def call_wx_api_sign_only(
    access_token: str, url_path: str, data: dict
) -> dict:
    """仅签名,不加密请求体"""
    full_url = f"{url_path}?access_token={access_token}"
    timestamp = int(time.time())
    body_str = json.dumps(data, ensure_ascii=False)

    headers = wx_security.sign_request(url_path, body_str, timestamp)
    headers["Content-Type"] = "application/json"

    async with httpx.AsyncClient() as client:
        resp = await client.post(full_url, content=body_str, headers=headers)
        return resp.json()

依赖安装

pip install cryptography httpx
  • cryptography — RSA 签名PSS 填充)+ AES256_GCM 加解密 + X.509 证书解析
  • httpx — 异步 HTTP 客户端

注意API 安全使用的是 AES256_GCM不是 CBCcryptography 库原生支持,不需要 pycryptodome

请求头说明

Header 说明
Wechatmp-Appid 当前小程序的 AppID
Wechatmp-TimeStamp 签名时的时间戳
Wechatmp-Signature 签名数据Base64 编码)

响应头说明

Header 说明
Wechatmp-Appid 小程序 AppID
Wechatmp-TimeStamp 签名时间戳
Wechatmp-Serial 平台证书编号(在 MP 管理页获取,非证书内序列号)
Wechatmp-Signature 平台证书签名Base64
Wechatmp-Serial-Deprecated 即将失效的证书编号(仅证书更换周期内出现)
Wechatmp-Signature-Deprecated 即将失效的证书签名(仅证书更换周期内出现)

Wechatmp-Serial-Deprecated 出现,说明当前使用的平台证书即将过期,请尽快到 MP 后台更新。

签名算法关键细节

  1. 签名使用 PSS 填充方式salt 长度为 32
  2. 待签名串格式:urlpath\nappid\ntimestamp\npostdata,字段间用 \n 分隔,末尾无额外换行
  3. urlpath 需要包含 HTTP 协议头(如 https://api.weixin.qq.com/wxa/...),不包含 URL 参数
  4. PSS 签名包含随机因子,每次签名结果都会变化,这是正常的

加密算法关键细节

  1. 使用 AES256_GCM 模式(不是 CBC
  2. IV 为 12 字节随机字符串
  3. AAD额外认证数据格式urlpath|appid|timestamp|sn,用竖线 | 分隔
  4. 加密后的 data 明文需要额外包含 _n(随机串)、_appid_timestamp 三个安全字段
  5. 不包含 AccessTokenAccessToken 放在 URL 参数中)

错误码

错误码 说明
40230 缺少 Wechatmp-Serial
40231 缺少 Wechatmp-Timestamp
40232 缺少 Wechatmp-Signature
40233 缺少 Wechatmp-Appid
40234 签名错误
40235 加密错误
40236 无效的 Wechatmp-Appid
40237 Wechatmp-Appid 和 Token 不匹配
40238 开发者未设置对称密钥
40239 开发者未设置公钥
40240 超时的数据

安全注意事项

  • RSA 私钥文件绝对不能提交到 Git放在 D:\NeoZQYY\secrets\ 并确保 .gitignore 排除
  • 对称密钥只放在 .env.local 中,不要硬编码
  • 平台证书有有效期,注意响应头中的 Wechatmp-Serial-Deprecated 提示
  • 测试环境和正式环境使用同一套密钥API 安全是 AppID 级别的)

参考文档