20260520更新

This commit is contained in:
liubiren 2026-05-20 22:00:06 +08:00
parent a4c76c576d
commit d656683d9b
2 changed files with 233 additions and 87 deletions

View File

@ -364,11 +364,10 @@ class Request:
status = response.status_code status = response.status_code
# 尝试将异常信息解析为 JSON若非 # 尝试将异常信息解析为 JSON若非
try: try:
response_json = response.json()
# 错误代码 # 错误代码
code = response_json.get("code", 999999) code = response.json().get("code", 999999)
# 错误信息 # 错误信息
message = response_json.get("msg", response.text) message = response.json().get("message", response.text)
except Exception: except Exception:
code, message = 999999, response.text code, message = 999999, response.text
else: else:

View File

@ -4,15 +4,16 @@
""" """
# 列举导入模块 # 列举导入模块
from base64 import b64encode from base64 import b64decode, b64encode
from json import dumps from json import dumps, loads
from pathlib import Path from pathlib import Path
from secrets import token_bytes from secrets import token_bytes
from time import time from time import time_ns
from typing import cast from typing import cast
from typing import Any, Dict from typing import Any, Dict, Literal, Optional, Tuple
from uuid import uuid4 from uuid import uuid4
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import padding
@ -21,94 +22,240 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_public_key from cryptography.hazmat.primitives.serialization import load_pem_public_key
import sys import sys
sys.path.append(Path(__file__).parent.parent.as_posix()) sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.request import Request from utils.request import Request
def encrypt(payload: Dict[str, Any]) -> Dict[str, Any]: class Encryptor:
""" """
加密载荷 加密器
:param payload: 载荷
:return: 加密后载荷
""" """
pem_path = Path(__file__).parent / "rsa_public_key.pem"
if not pem_path.exists():
raise FileNotFoundError("RSA 公钥 PEM 文件不存在")
# RSA 公钥
rsa_public_key = pem_path.read_text(encoding="utf-8")
# 实例 RSA 加密器 def __init__(self, rsa_certificate_path: Optional[Path] = None):
rsa_encryptor = cast( """
rsa.RSAPublicKey, 初始化
load_pem_public_key( :param rsa_certificate_path: RSA 加密证书路径默认为 rsa_public_key.pem
data=rsa_public_key.encode("utf-8"), backend=default_backend() """
), # RSA 加密证书路径
) if not rsa_certificate_path:
# 生成 AES-256 密钥 rsa_certificate_path = Path(__file__).parent / "rsa_public_key.pem"
aes_key = token_bytes(32) if not rsa_certificate_path.exists():
# 使用 RSA 公钥加密 AES-256 密钥 raise FileNotFoundError("RSA 加密证书不存在")
aes_key_encrypted = rsa_encryptor.encrypt( # 实例 RSA 加密器
plaintext=aes_key, self._initialize_rsa_encryptor(rsa_certificate_path=rsa_certificate_path)
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
aes_key_encoded = b64encode(aes_key_encrypted).decode() # BASE64 编码
# 初始向量 def _initialize_rsa_encryptor(self, rsa_certificate_path: Path) -> None:
iv = token_bytes(12) """
iv_encoded = b64encode(iv).decode() # BASE64 编码 实例 RSA 加密器
"""
# RSA 公钥
rsa_public_key = rsa_certificate_path.read_text(encoding="utf-8")
# 实例 AES-GCM 加密器 # 实例 RSA 加密器
aes_gcm_encryptor = Cipher( self.rsa_encryptor = cast(
algorithm=algorithms.AES(aes_key), mode=modes.GCM(iv), backend=default_backend() rsa.RSAPublicKey,
).encryptor() load_pem_public_key(
data=rsa_public_key.encode("utf-8"), backend=default_backend()
# 时间戳和随机码 ),
timestamp, nonce = int(time()), uuid4().hex.lower()
# 附加认证
aes_gcm_encryptor.authenticate_additional_data(
data=f"timestamp={timestamp}&nonce={nonce}".encode("utf-8")
)
ciphertext = (
aes_gcm_encryptor.update(
data=dumps(payload, ensure_ascii=False).encode("utf-8")
) )
+ aes_gcm_encryptor.finalize()
def encrypt(
self,
payload: Dict[str, Any],
fields_mapping: Dict[str, str] = {},
strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm",
) -> Dict[str, Any]:
"""
加密载荷
:param payload: 载荷
:param fields_mapping: 字段映射表默认为空字典不进行字段映射
:param strategy: 加密策略默认为 rsa+aes-gcm
:return: 请求体
"""
match strategy:
case "rsa+aes-gcm":
# AES-256 密钥,初始化向量,毫秒级时间戳,随机数
self.aes_key, self.initialization_vector, self.timestamp, self.nonce = (
token_bytes(32),
token_bytes(12),
time_ns()
// 1_000_000, # time() 返回为浮点,转为毫秒级时间戳会出现精度丢失
uuid4().hex.lower(),
)
# AES 加密
payload_encoded, tag_encoded = self._aes_encrypt(payload=payload)
request_body = {
"aes_key_encoded": self._rsa_encrypt(aes_key=self.aes_key),
"initialization_vector_encoded": b64encode(
s=self.initialization_vector
).decode(),
"timestamp": self.timestamp,
"nonce": self.nonce,
"payload_encoded": payload_encoded,
"tag_encoded": tag_encoded,
}
case _:
raise ValueError(f"暂不支持该加密策略: {strategy}")
# 若指定字段映射表则按照字段映射表返回
if fields_mapping:
return {
value: request_body.get(key) for key, value in fields_mapping.items()
}
else:
return request_body
def _rsa_encrypt(self, aes_key: bytes) -> str:
"""
RSA 加密
:param aes_key: AES-256
:return: BASE64 编码后密文
"""
# 使用 RSA 公钥加密明文
ciphertext = self.rsa_encryptor.encrypt(
plaintext=aes_key,
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
return b64encode(s=ciphertext).decode() # BASE64 编码
def _aes_encrypt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
"""
AES 加密
:param payload: 载荷
:return: BASE64 编码加密载荷和标签
"""
# 实例 AES 加密器
aes_encryptor = Cipher(
algorithm=algorithms.AES(self.aes_key),
mode=modes.GCM(initialization_vector=self.initialization_vector),
backend=default_backend(),
).encryptor()
# 附加认证
aes_encryptor.authenticate_additional_data(
data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8")
)
# 加密载荷
payload_encrypted = (
aes_encryptor.update(
data=dumps(obj=payload, ensure_ascii=False).encode("utf-8")
)
+ aes_encryptor.finalize()
)
return (
b64encode(s=payload_encrypted).decode(),
b64encode(s=aes_encryptor.tag).decode(),
) # 就加密载荷和标签 BASE64 编码
def decrypt(
self,
initialization_vector_encoded: str,
tag_encoded: str,
payload_encoded: str,
strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm",
) -> Dict[str, Any]:
"""
解密载荷
:param initialization_vector_encoded: BASE64 编码初始化向量
:param tag_encoded: BASE64 编码标签
:param payload_encoded: BASE64 编码加密载荷
:param strategy: 解密策略
:return: 载荷
"""
match strategy:
case "rsa+aes-gcm":
return self._aes_decrypt(
initialization_vector_encoded=initialization_vector_encoded,
tag_encoded=tag_encoded,
payload_encoded=payload_encoded,
)
case _:
raise ValueError(f"暂不支持该解密策略: {strategy}")
def _aes_decrypt(
self,
initialization_vector_encoded: str,
tag_encoded: str,
payload_encoded: str,
) -> Dict[str, Any]:
"""
AES 解密
:param payload_encoded: BASE64 编码加密载荷
:param iv_encoded: BASE64 编码初始化向量
:param tag_encoded: BASE64 编码标签
:return: 载荷
"""
# 实例 AES 解密器
aes_decryptor = Cipher(
algorithm=algorithms.AES(self.aes_key),
mode=modes.GCM(
initialization_vector=b64decode(s=initialization_vector_encoded),
tag=b64decode(s=tag_encoded),
),
backend=default_backend(),
).decryptor()
# AES-GCM 解密器附加认证
aes_decryptor.authenticate_additional_data(
data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8")
)
try:
return loads(
s=(
aes_decryptor.update(b64decode(s=payload_encoded))
+ aes_decryptor.finalize()
).decode("utf-8")
)
except InvalidTag:
raise ValueError("AES 验签发生异常")
except Exception as exception:
raise ValueError(f"AES 解密发生异常: {str(exception)}") from exception
if __name__ == "__main__":
# 实例加密器
encryptor = Encryptor()
# 实例请求客户端
request = Request() # 不使用缓存
response = request.post(
url="http://192.168.3.103:30380/api/v1/riskgateway/product/invokeSync.json",
headers={
"Authorization": "Bearer C52FB4D10BC424D9F",
"Content-Type": "application/json;charset=utf-8",
},
json=encryptor.encrypt(
payload={
"productId": "BANK_CARD_4",
"name": "刘弼仁",
"idNumber": "131002198705020000",
"bankCard": "1234567890123456",
"phone": "18058798752",
},
fields_mapping={
"aes_key_encoded": "encryptedAesKey",
"initialization_vector_encoded": "iv",
"payload_encoded": "encryptedPayload",
"tag_encoded": "tag",
"timestamp": "timestamp",
"nonce": "nonce",
}, # 加密器 AES-GCM 请求体和请求字段映射表
),
) )
ciphertext_encoded = b64encode(ciphertext).decode() # BASE64 编码 if response.get("code") != "GW_SUCCESS_000":
raise RuntimeError(
tag = aes_gcm_encryptor.tag f"请求失败: {response.get("status")} {response.get("code")} {response.get("message")}"
tag_encoded = b64encode(tag).decode() # BASE64 编码 )
# 解密响应体
return { data = encryptor.decrypt(
"encryptedAesKey": aes_key_encoded, payload_encoded=response.get("data").get("encryptedPayload"),
"iv": iv_encoded, initialization_vector_encoded=response.get("data").get("iv"),
"timestamp": timestamp, tag_encoded=response.get("data").get("tag"),
"nonce": nonce, )
"encryptedPayload": ciphertext_encoded, print(data)
"tag": tag_encoded,
}
request = Request() # 不使用缓存
response = request.post(
url="http://192.168.3.103:30380/",
headers={
"Authorization": "Bearer C52FB4D10BC424D9F",
"Content-Type": "application/json;charset=utf-8",
},
json=encrypt(
payload={
"productId": "BANK_CARD_4",
"name": "刘弼仁",
"idNumber": "131002198705020000",
"bankCard": "1234567890123456",
"phone": "18058798752",
}
),
)
print(response)