9.4 KiB
9.4 KiB
微信消息推送 — 安全模式(AES 加解密)说明
校验依据:https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html
当前状态
- GET 验签:已通过,不受加密模式影响
- POST 消息接收:选择了「安全模式」,微信推送的消息体为 AES 加密,需要解密后才能读取
需要的配置
在 apps/backend/.env.local 中添加:
# 微信后台「消息推送」页面的 EncodingAESKey(43位字符串)
WX_ENCODING_AES_KEY=你在微信后台生成的那个43位字符串
# 小程序的 AppID
WX_APP_ID=你的小程序AppID
安全模式与明文模式的关键区别
安全模式下,微信 POST 推送的 URL 参数会多出两个字段:
| 参数 | 说明 |
|---|---|
signature |
普通签名(token + timestamp + nonce),仅用于明文模式 |
msg_signature |
消息签名(token + timestamp + nonce + Encrypt),安全模式必须用这个验签 |
encrypt_type |
值为 aes,表示安全模式 |
timestamp |
时间戳 |
nonce |
随机数 |
重要:安全模式下验签必须使用 msg_signature,不要使用 signature!
验签算法:将 token、timestamp、nonce、Encrypt(包体内的字段)四个参数字典序排序后拼接,做 SHA1,与 msg_signature 比对。
解密原理
安全模式下,微信 POST 推送的 JSON 结构为:
{
"Encrypt": "加密后的密文字符串"
}
解密流程:
- 从请求体取出
Encrypt字段 - AESKey = Base64_Decode(EncodingAESKey + "="),得到 32 字节 AES 密钥
- 将 Encrypt 密文进行 Base64 解码,得到 TmpMsg
- 用 AESKey 对 TmpMsg 进行 AES-256-CBC 解密(IV 取 AESKey 前 16 字节),去除 PKCS#7 填充(K=32)
- 解密后的明文格式:
random(16B) + msg_len(4B) + msg + appid- random(16B):16 字节随机字符串
- msg_len:消息长度,4 字节网络字节序(大端)
- msg:解密后的明文消息
- appid:小程序 AppID,需验证是否与自身一致
回包加密(安全模式下需要加密回包)
如果需要回复非空内容(非 success),安全模式下回包也需要加密:
- 构造 FullStr = random(16B) + msg_len(4B) + msg + appid
- 用 AESKey 进行 AES-256-CBC 加密,PKCS#7 填充(K=32)
- Base64 编码得到 Encrypt
- 生成 MsgSignature:将 token、TimeStamp、Nonce、Encrypt 四个参数字典序排序拼接后 SHA1
- 回包格式(JSON):
{
"Encrypt": "加密后的密文",
"MsgSignature": "签名",
"TimeStamp": 时间戳,
"Nonce": "随机数"
}
如果只需回复
success,无需加密,直接返回纯文本success即可。
代码改动
方案 A:使用微信官方示例代码(推荐)
微信官方提供了多种语言的加解密示例代码(PHP、Java、C++、Python、C#), 建议优先使用官方示例:https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html (页面底部「示例下载」链接)
安装依赖:
pip install pycryptodome # AES 加解密
方案 B:手动实现解密工具
在 apps/backend/app/utils/ 下新建 wx_crypto.py:
"""
微信消息加解密工具
校验依据:
https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html
PKCS#7 填充说明(微信官方规范):
- K 为秘钥字节数(采用 32)
- Buf 为待加密的内容,N 为其字节数
- Buf 需要被填充为 K 的整数倍
- 在 Buf 的尾部填充 (K - N%K) 个字节,每个字节的内容是 (K - N%K)
"""
import base64
import hashlib
import struct
from Crypto.Cipher import AES
BLOCK_SIZE = 32 # 微信规范:PKCS#7 填充使用 K=32
class WXBizMsgCrypt:
"""微信消息加解密"""
def __init__(self, token: str, encoding_aes_key: str, app_id: str):
self.token = token
self.app_id = app_id
# AESKey = Base64_Decode(EncodingAESKey + "=")
self.aes_key = base64.b64decode(encoding_aes_key + "=")
self.iv = self.aes_key[:16] # 初始向量取密钥前 16 字节
def check_msg_signature(
self, msg_signature: str, timestamp: str, nonce: str, encrypt: str
) -> bool:
"""
安全模式验签。
注意:安全模式下必须用 msg_signature 验签,参与排序的是四个参数:
token、timestamp、nonce、Encrypt(包体内的字段)
"""
items = sorted([self.token, timestamp, nonce, encrypt])
hash_str = hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
return hash_str == msg_signature
def decrypt(self, encrypt_text: str) -> str:
"""
解密微信推送的加密消息。
返回解密后的消息明文(JSON 字符串)。
"""
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.iv)
decrypted = cipher.decrypt(base64.b64decode(encrypt_text))
# 去除 PKCS#7 填充(K=32)
pad_len = decrypted[-1]
if pad_len < 1 or pad_len > BLOCK_SIZE:
raise ValueError(f"无效的 PKCS#7 填充: {pad_len}")
content = decrypted[:-pad_len]
# FullStr = random(16B) + msg_len(4B) + msg + appid
msg_len = struct.unpack("!I", content[16:20])[0]
msg = content[20 : 20 + msg_len].decode("utf-8")
from_app_id = content[20 + msg_len :].decode("utf-8")
# 校验 AppID
if from_app_id != self.app_id:
raise ValueError(
f"AppID 不匹配: 期望 {self.app_id}, 实际 {from_app_id}"
)
return msg
def encrypt(self, msg: str) -> str:
"""
加密回包消息(安全模式下回复非 success 内容时需要)。
返回 Base64 编码的密文。
"""
import os
# 构造 FullStr = random(16B) + msg_len(4B) + msg + appid
msg_bytes = msg.encode("utf-8")
app_id_bytes = self.app_id.encode("utf-8")
full_str = (
os.urandom(16)
+ struct.pack("!I", len(msg_bytes))
+ msg_bytes
+ app_id_bytes
)
# PKCS#7 填充(K=32)
pad_len = BLOCK_SIZE - (len(full_str) % BLOCK_SIZE)
full_str += bytes([pad_len] * pad_len)
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.iv)
encrypted = cipher.encrypt(full_str)
return base64.b64encode(encrypted).decode("utf-8")
def generate_msg_signature(
self, timestamp: str, nonce: str, encrypt: str
) -> str:
"""生成回包的 MsgSignature"""
items = sorted([self.token, timestamp, nonce, encrypt])
return hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
改动 wx_callback.py 的 POST 处理
# 在文件顶部新增导入
import json
from app.utils.wx_crypto import WXBizMsgCrypt
from app.config import get
# 初始化解密器(模块级别)
wx_crypt = WXBizMsgCrypt(
token=get("WX_CALLBACK_TOKEN", ""),
encoding_aes_key=get("WX_ENCODING_AES_KEY", ""),
app_id=get("WX_APP_ID", ""),
)
@router.post("/callback")
async def receive_message(
request: Request,
msg_signature: str = Query("", alias="msg_signature"),
signature: str = Query(""),
timestamp: str = Query(""),
nonce: str = Query(""),
encrypt_type: str = Query(""),
):
body = await request.body()
raw = json.loads(body)
# 安全模式:用 msg_signature 验签(四参数:token + timestamp + nonce + Encrypt)
if encrypt_type == "aes" and "Encrypt" in raw:
if not wx_crypt.check_msg_signature(
msg_signature, timestamp, nonce, raw["Encrypt"]
):
logger.warning("安全模式消息推送验签失败")
return Response(content="signature mismatch", status_code=403)
decrypted_str = wx_crypt.decrypt(raw["Encrypt"])
data = json.loads(decrypted_str)
else:
# 明文模式 / 兼容模式:用 signature 验签(三参数)
if not _check_signature(signature, timestamp, nonce):
logger.warning("明文模式消息推送验签失败")
return Response(content="signature mismatch", status_code=403)
data = raw
logger.info(
"收到微信推送: MsgType=%s, Event=%s",
data.get("MsgType", "?"),
data.get("Event", "?"),
)
# TODO: 根据 MsgType/Event 分发处理
# 回复 success 无需加密
return Response(content="success", media_type="text/plain")
依赖安装
pip install pycryptodome
注意:是 pycryptodome 不是 pycrypto(后者已废弃)。
测试方法
部署后可以用微信开发者工具发送客服消息,或者关注/取关小程序触发事件推送, 查看后端日志确认消息是否正确解密:
# 在 Windows 服务器上查看后端日志
# 应该能看到 "收到微信推送: MsgType=event, Event=subscribe" 之类的输出
微信也提供了调试工具,可在消息推送配置页面使用「请求构造」和「调试工具」进行测试。
注意事项
EncodingAESKey必须和微信后台配置的完全一致,一个字符都不能错- 如果后续在微信后台重新生成了
EncodingAESKey,服务器端也要同步更新并重启 - GET 验签逻辑不需要改动,安全模式只影响 POST 消息体
- PKCS#7 填充的 K 值为 32(不是常见的 16),这是微信的特殊规范
- 安全模式下验签用
msg_signature(四参数),不要用signature(三参数)