Files
Neo-ZQYY/docs/deployment/wx-encrypt-guide.md

284 lines
9.4 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 微信消息推送 — 安全模式AES 加解密)说明
> 校验依据https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html
## 当前状态
- GET 验签:已通过,不受加密模式影响
- POST 消息接收:选择了「安全模式」,微信推送的消息体为 AES 加密,需要解密后才能读取
## 需要的配置
`apps/backend/.env.local` 中添加:
```env
# 微信后台「消息推送」页面的 EncodingAESKey43位字符串
WX_ENCODING_AES_KEY=你在微信后台生成的那个43位字符串
# 小程序的 AppID
WX_APP_ID=你的小程序AppID
```
## 安全模式与明文模式的关键区别
安全模式下,微信 POST 推送的 URL 参数会多出两个字段:
| 参数 | 说明 |
|------|------|
| `signature` | 普通签名token + timestamp + nonce仅用于明文模式 |
| `msg_signature` | 消息签名token + timestamp + nonce + Encrypt安全模式必须用这个验签 |
| `encrypt_type` | 值为 `aes`,表示安全模式 |
| `timestamp` | 时间戳 |
| `nonce` | 随机数 |
**重要:安全模式下验签必须使用 `msg_signature`,不要使用 `signature`**
验签算法:将 token、timestamp、nonce、Encrypt包体内的字段四个参数字典序排序后拼接做 SHA1`msg_signature` 比对。
## 解密原理
安全模式下,微信 POST 推送的 JSON 结构为:
```json
{
"Encrypt": "加密后的密文字符串"
}
```
解密流程:
1. 从请求体取出 `Encrypt` 字段
2. AESKey = Base64_Decode(EncodingAESKey + "="),得到 32 字节 AES 密钥
3. 将 Encrypt 密文进行 Base64 解码,得到 TmpMsg
4. 用 AESKey 对 TmpMsg 进行 AES-256-CBC 解密IV 取 AESKey 前 16 字节),去除 PKCS#7 填充K=32
5. 解密后的明文格式:`random(16B) + msg_len(4B) + msg + appid`
- random(16B)16 字节随机字符串
- msg_len消息长度4 字节网络字节序(大端)
- msg解密后的明文消息
- appid小程序 AppID需验证是否与自身一致
## 回包加密(安全模式下需要加密回包)
如果需要回复非空内容(非 `success`),安全模式下回包也需要加密:
1. 构造 FullStr = random(16B) + msg_len(4B) + msg + appid
2. 用 AESKey 进行 AES-256-CBC 加密PKCS#7 填充K=32
3. Base64 编码得到 Encrypt
4. 生成 MsgSignature将 token、TimeStamp、Nonce、Encrypt 四个参数字典序排序拼接后 SHA1
5. 回包格式JSON
```json
{
"Encrypt": "加密后的密文",
"MsgSignature": "签名",
"TimeStamp": ,
"Nonce": "随机数"
}
```
> 如果只需回复 `success`,无需加密,直接返回纯文本 `success` 即可。
## 代码改动
### 方案 A使用微信官方示例代码推荐
微信官方提供了多种语言的加解密示例代码PHP、Java、C++、Python、C#
建议优先使用官方示例https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html
(页面底部「示例下载」链接)
安装依赖:
```bash
pip install pycryptodome # AES 加解密
```
### 方案 B手动实现解密工具
`apps/backend/app/utils/` 下新建 `wx_crypto.py`
```python
"""
微信消息加解密工具
校验依据:
https://developers.weixin.qq.com/miniprogram/dev/framework/server-ability/message-push.html
PKCS#7 填充说明(微信官方规范):
- K 为秘钥字节数(采用 32
- Buf 为待加密的内容N 为其字节数
- Buf 需要被填充为 K 的整数倍
- 在 Buf 的尾部填充 (K - N%K) 个字节,每个字节的内容是 (K - N%K)
"""
import base64
import hashlib
import struct
from Crypto.Cipher import AES
BLOCK_SIZE = 32 # 微信规范PKCS#7 填充使用 K=32
class WXBizMsgCrypt:
"""微信消息加解密"""
def __init__(self, token: str, encoding_aes_key: str, app_id: str):
self.token = token
self.app_id = app_id
# AESKey = Base64_Decode(EncodingAESKey + "=")
self.aes_key = base64.b64decode(encoding_aes_key + "=")
self.iv = self.aes_key[:16] # 初始向量取密钥前 16 字节
def check_msg_signature(
self, msg_signature: str, timestamp: str, nonce: str, encrypt: str
) -> bool:
"""
安全模式验签。
注意:安全模式下必须用 msg_signature 验签,参与排序的是四个参数:
token、timestamp、nonce、Encrypt包体内的字段
"""
items = sorted([self.token, timestamp, nonce, encrypt])
hash_str = hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
return hash_str == msg_signature
def decrypt(self, encrypt_text: str) -> str:
"""
解密微信推送的加密消息。
返回解密后的消息明文JSON 字符串)。
"""
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.iv)
decrypted = cipher.decrypt(base64.b64decode(encrypt_text))
# 去除 PKCS#7 填充K=32
pad_len = decrypted[-1]
if pad_len < 1 or pad_len > BLOCK_SIZE:
raise ValueError(f"无效的 PKCS#7 填充: {pad_len}")
content = decrypted[:-pad_len]
# FullStr = random(16B) + msg_len(4B) + msg + appid
msg_len = struct.unpack("!I", content[16:20])[0]
msg = content[20 : 20 + msg_len].decode("utf-8")
from_app_id = content[20 + msg_len :].decode("utf-8")
# 校验 AppID
if from_app_id != self.app_id:
raise ValueError(
f"AppID 不匹配: 期望 {self.app_id}, 实际 {from_app_id}"
)
return msg
def encrypt(self, msg: str) -> str:
"""
加密回包消息(安全模式下回复非 success 内容时需要)。
返回 Base64 编码的密文。
"""
import os
# 构造 FullStr = random(16B) + msg_len(4B) + msg + appid
msg_bytes = msg.encode("utf-8")
app_id_bytes = self.app_id.encode("utf-8")
full_str = (
os.urandom(16)
+ struct.pack("!I", len(msg_bytes))
+ msg_bytes
+ app_id_bytes
)
# PKCS#7 填充K=32
pad_len = BLOCK_SIZE - (len(full_str) % BLOCK_SIZE)
full_str += bytes([pad_len] * pad_len)
cipher = AES.new(self.aes_key, AES.MODE_CBC, self.iv)
encrypted = cipher.encrypt(full_str)
return base64.b64encode(encrypted).decode("utf-8")
def generate_msg_signature(
self, timestamp: str, nonce: str, encrypt: str
) -> str:
"""生成回包的 MsgSignature"""
items = sorted([self.token, timestamp, nonce, encrypt])
return hashlib.sha1("".join(items).encode("utf-8")).hexdigest()
```
### 改动 `wx_callback.py` 的 POST 处理
```python
# 在文件顶部新增导入
import json
from app.utils.wx_crypto import WXBizMsgCrypt
from app.config import get
# 初始化解密器(模块级别)
wx_crypt = WXBizMsgCrypt(
token=get("WX_CALLBACK_TOKEN", ""),
encoding_aes_key=get("WX_ENCODING_AES_KEY", ""),
app_id=get("WX_APP_ID", ""),
)
@router.post("/callback")
async def receive_message(
request: Request,
msg_signature: str = Query("", alias="msg_signature"),
signature: str = Query(""),
timestamp: str = Query(""),
nonce: str = Query(""),
encrypt_type: str = Query(""),
):
body = await request.body()
raw = json.loads(body)
# 安全模式:用 msg_signature 验签四参数token + timestamp + nonce + Encrypt
if encrypt_type == "aes" and "Encrypt" in raw:
if not wx_crypt.check_msg_signature(
msg_signature, timestamp, nonce, raw["Encrypt"]
):
logger.warning("安全模式消息推送验签失败")
return Response(content="signature mismatch", status_code=403)
decrypted_str = wx_crypt.decrypt(raw["Encrypt"])
data = json.loads(decrypted_str)
else:
# 明文模式 / 兼容模式:用 signature 验签(三参数)
if not _check_signature(signature, timestamp, nonce):
logger.warning("明文模式消息推送验签失败")
return Response(content="signature mismatch", status_code=403)
data = raw
logger.info(
"收到微信推送: MsgType=%s, Event=%s",
data.get("MsgType", "?"),
data.get("Event", "?"),
)
# TODO: 根据 MsgType/Event 分发处理
# 回复 success 无需加密
return Response(content="success", media_type="text/plain")
```
## 依赖安装
```bash
pip install pycryptodome
```
注意:是 `pycryptodome` 不是 `pycrypto`(后者已废弃)。
## 测试方法
部署后可以用微信开发者工具发送客服消息,或者关注/取关小程序触发事件推送,
查看后端日志确认消息是否正确解密:
```bash
# 在 Windows 服务器上查看后端日志
# 应该能看到 "收到微信推送: MsgType=event, Event=subscribe" 之类的输出
```
微信也提供了调试工具,可在消息推送配置页面使用「请求构造」和「调试工具」进行测试。
## 注意事项
- `EncodingAESKey` 必须和微信后台配置的完全一致,一个字符都不能错
- 如果后续在微信后台重新生成了 `EncodingAESKey`,服务器端也要同步更新并重启
- GET 验签逻辑不需要改动,安全模式只影响 POST 消息体
- PKCS#7 填充的 K 值为 32不是常见的 16这是微信的特殊规范
- 安全模式下验签用 `msg_signature`(四参数),不要用 `signature`(三参数)