Files
Neo-ZQYY/docs/deployment/wx-encrypt-guide.md

9.4 KiB
Raw Permalink Blame History

微信消息推送 — 安全模式AES 加解密)说明

校验依据:https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html

当前状态

  • GET 验签:已通过,不受加密模式影响
  • POST 消息接收:选择了「安全模式」,微信推送的消息体为 AES 加密,需要解密后才能读取

需要的配置

apps/backend/.env.local 中添加:

# 微信后台「消息推送」页面的 EncodingAESKey43位字符串
WX_ENCODING_AES_KEY=你在微信后台生成的那个43位字符串

# 小程序的 AppID
WX_APP_ID=你的小程序AppID

安全模式与明文模式的关键区别

安全模式下,微信 POST 推送的 URL 参数会多出两个字段:

参数 说明
signature 普通签名token + timestamp + nonce仅用于明文模式
msg_signature 消息签名token + timestamp + nonce + Encrypt安全模式必须用这个验签
encrypt_type 值为 aes,表示安全模式
timestamp 时间戳
nonce 随机数

重要:安全模式下验签必须使用 msg_signature,不要使用 signature 验签算法:将 token、timestamp、nonce、Encrypt包体内的字段四个参数字典序排序后拼接做 SHA1msg_signature 比对。

解密原理

安全模式下,微信 POST 推送的 JSON 结构为:

{
    "Encrypt": "加密后的密文字符串"
}

解密流程:

  1. 从请求体取出 Encrypt 字段
  2. AESKey = Base64_Decode(EncodingAESKey + "="),得到 32 字节 AES 密钥
  3. 将 Encrypt 密文进行 Base64 解码,得到 TmpMsg
  4. 用 AESKey 对 TmpMsg 进行 AES-256-CBC 解密IV 取 AESKey 前 16 字节),去除 PKCS#7 填充K=32
  5. 解密后的明文格式:random(16B) + msg_len(4B) + msg + appid
    • random(16B)16 字节随机字符串
    • msg_len消息长度4 字节网络字节序(大端)
    • msg解密后的明文消息
    • appid小程序 AppID需验证是否与自身一致

回包加密(安全模式下需要加密回包)

如果需要回复非空内容(非 success),安全模式下回包也需要加密:

  1. 构造 FullStr = random(16B) + msg_len(4B) + msg + appid
  2. 用 AESKey 进行 AES-256-CBC 加密PKCS#7 填充K=32
  3. Base64 编码得到 Encrypt
  4. 生成 MsgSignature将 token、TimeStamp、Nonce、Encrypt 四个参数字典序排序拼接后 SHA1
  5. 回包格式JSON
{
    "Encrypt": "加密后的密文",
    "MsgSignature": "签名",
    "TimeStamp": 时间戳,
    "Nonce": "随机数"
}

如果只需回复 success,无需加密,直接返回纯文本 success 即可。

代码改动

方案 A使用微信官方示例代码推荐

微信官方提供了多种语言的加解密示例代码PHP、Java、C++、Python、C# 建议优先使用官方示例:https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html (页面底部「示例下载」链接)

安装依赖:

pip install pycryptodome  # AES 加解密

方案 B手动实现解密工具

apps/backend/app/utils/ 下新建 wx_crypto.py

"""
微信消息加解密工具

校验依据:
https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html

PKCS#7 填充说明(微信官方规范):
- K 为秘钥字节数(采用 32
- Buf 为待加密的内容N 为其字节数
- Buf 需要被填充为 K 的整数倍
- 在 Buf 的尾部填充 (K - N%K) 个字节,每个字节的内容是 (K - N%K)
"""
import base64
import hashlib
import struct
from Crypto.Cipher import AES


BLOCK_SIZE = 32  # 微信规范PKCS#7 填充使用 K=32


class WXBizMsgCrypt:
    """微信消息加解密"""

    def __init__(self, token: str, encoding_aes_key: str, app_id: str):
        self.token = token
        self.app_id = app_id
        # AESKey = Base64_Decode(EncodingAESKey + "=")
        self.aes_key = base64.b64decode(encoding_aes_key + "=")
        self.iv = self.aes_key[:16]  # 初始向量取密钥前 16 字节

    def check_msg_signature(
        self, msg_signature: str, timestamp: str, nonce: str, encrypt: str
    ) -> bool:
        """
        安全模式验签。
        注意:安全模式下必须用 msg_signature 验签,参与排序的是四个参数:
        token、timestamp、nonce、Encrypt包体内的字段
        """
        items = sorted([self.token, timestamp, nonce, encrypt])
        hash_str = hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
        return hash_str == msg_signature

    def decrypt(self, encrypt_text: str) -> str:
        """
        解密微信推送的加密消息。
        返回解密后的消息明文JSON 字符串)。
        """
        cipher = AES.new(self.aes_key, AES.MODE_CBC, self.iv)
        decrypted = cipher.decrypt(base64.b64decode(encrypt_text))

        # 去除 PKCS#7 填充K=32
        pad_len = decrypted[-1]
        if pad_len < 1 or pad_len > BLOCK_SIZE:
            raise ValueError(f"无效的 PKCS#7 填充: {pad_len}")
        content = decrypted[:-pad_len]

        # FullStr = random(16B) + msg_len(4B) + msg + appid
        msg_len = struct.unpack("!I", content[16:20])[0]
        msg = content[20 : 20 + msg_len].decode("utf-8")
        from_app_id = content[20 + msg_len :].decode("utf-8")

        # 校验 AppID
        if from_app_id != self.app_id:
            raise ValueError(
                f"AppID 不匹配: 期望 {self.app_id}, 实际 {from_app_id}"
            )

        return msg

    def encrypt(self, msg: str) -> str:
        """
        加密回包消息(安全模式下回复非 success 内容时需要)。
        返回 Base64 编码的密文。
        """
        import os

        # 构造 FullStr = random(16B) + msg_len(4B) + msg + appid
        msg_bytes = msg.encode("utf-8")
        app_id_bytes = self.app_id.encode("utf-8")
        full_str = (
            os.urandom(16)
            + struct.pack("!I", len(msg_bytes))
            + msg_bytes
            + app_id_bytes
        )

        # PKCS#7 填充K=32
        pad_len = BLOCK_SIZE - (len(full_str) % BLOCK_SIZE)
        full_str += bytes([pad_len] * pad_len)

        cipher = AES.new(self.aes_key, AES.MODE_CBC, self.iv)
        encrypted = cipher.encrypt(full_str)
        return base64.b64encode(encrypted).decode("utf-8")

    def generate_msg_signature(
        self, timestamp: str, nonce: str, encrypt: str
    ) -> str:
        """生成回包的 MsgSignature"""
        items = sorted([self.token, timestamp, nonce, encrypt])
        return hashlib.sha1("".join(items).encode("utf-8")).hexdigest()

改动 wx_callback.py 的 POST 处理

# 在文件顶部新增导入
import json
from app.utils.wx_crypto import WXBizMsgCrypt
from app.config import get

# 初始化解密器(模块级别)
wx_crypt = WXBizMsgCrypt(
    token=get("WX_CALLBACK_TOKEN", ""),
    encoding_aes_key=get("WX_ENCODING_AES_KEY", ""),
    app_id=get("WX_APP_ID", ""),
)


@router.post("/callback")
async def receive_message(
    request: Request,
    msg_signature: str = Query("", alias="msg_signature"),
    signature: str = Query(""),
    timestamp: str = Query(""),
    nonce: str = Query(""),
    encrypt_type: str = Query(""),
):
    body = await request.body()
    raw = json.loads(body)

    # 安全模式:用 msg_signature 验签四参数token + timestamp + nonce + Encrypt
    if encrypt_type == "aes" and "Encrypt" in raw:
        if not wx_crypt.check_msg_signature(
            msg_signature, timestamp, nonce, raw["Encrypt"]
        ):
            logger.warning("安全模式消息推送验签失败")
            return Response(content="signature mismatch", status_code=403)

        decrypted_str = wx_crypt.decrypt(raw["Encrypt"])
        data = json.loads(decrypted_str)
    else:
        # 明文模式 / 兼容模式:用 signature 验签(三参数)
        if not _check_signature(signature, timestamp, nonce):
            logger.warning("明文模式消息推送验签失败")
            return Response(content="signature mismatch", status_code=403)
        data = raw

    logger.info(
        "收到微信推送: MsgType=%s, Event=%s",
        data.get("MsgType", "?"),
        data.get("Event", "?"),
    )

    # TODO: 根据 MsgType/Event 分发处理
    # 回复 success 无需加密
    return Response(content="success", media_type="text/plain")

依赖安装

pip install pycryptodome

注意:是 pycryptodome 不是 pycrypto(后者已废弃)。

测试方法

部署后可以用微信开发者工具发送客服消息,或者关注/取关小程序触发事件推送, 查看后端日志确认消息是否正确解密:

# 在 Windows 服务器上查看后端日志
# 应该能看到 "收到微信推送: MsgType=event, Event=subscribe" 之类的输出

微信也提供了调试工具,可在消息推送配置页面使用「请求构造」和「调试工具」进行测试。

注意事项

  • EncodingAESKey 必须和微信后台配置的完全一致,一个字符都不能错
  • 如果后续在微信后台重新生成了 EncodingAESKey,服务器端也要同步更新并重启
  • GET 验签逻辑不需要改动,安全模式只影响 POST 消息体
  • PKCS#7 填充的 K 值为 32不是常见的 16这是微信的特殊规范
  • 安全模式下验签用 msg_signature(四参数),不要用 signature(三参数)