Python/安全加密请求/main.py

275 lines
9.2 KiB
Python

# -*- coding: utf-8 -*-
"""
主运行模块
"""
# 列举导入模块
from base64 import b64decode, b64encode
from json import dumps, loads
from pathlib import Path
from secrets import token_bytes
from time import time_ns
from typing import cast
from typing import Any, Dict, Literal, Optional, Tuple
from uuid import uuid4
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_public_key
import sys
sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.request import Request
class Encryptor:
"""
加密器
"""
def __init__(self, rsa_certificate_path: Optional[Path] = None):
"""
初始化
:param rsa_certificate_path: RSA 加密证书路径,默认为 rsa_public_key.pem
"""
# RSA 加密证书路径
if not rsa_certificate_path:
rsa_certificate_path = Path(__file__).parent / "rsa_public_key.pem"
if not rsa_certificate_path.exists():
raise FileNotFoundError("RSA 加密证书不存在")
# 实例 RSA 加密器
self._initialize_rsa_encryptor(rsa_certificate_path=rsa_certificate_path)
def _initialize_rsa_encryptor(self, rsa_certificate_path: Path) -> None:
"""
实例 RSA 加密器
"""
# RSA 公钥
rsa_public_key = rsa_certificate_path.read_text(encoding="utf-8")
# 实例 RSA 加密器
self.rsa_encryptor = cast(
rsa.RSAPublicKey,
load_pem_public_key(
data=rsa_public_key.encode("utf-8"), backend=default_backend()
),
)
def encrypt(
self,
payload: Dict[str, Any],
fields_mapping: Dict[str, str] = {},
strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm",
) -> Dict[str, Any]:
"""
加密载荷
:param payload: 载荷
:param fields_mapping: 字段映射表,默认为空字典(不进行字段映射)
:param strategy: 加密策略,默认为 rsa+aes-gcm
:return: 请求体
"""
match strategy:
case "rsa+aes-gcm":
# AES-256 密钥,初始化向量,毫秒级时间戳,随机数
self.aes_key, self.initialization_vector, self.timestamp, self.nonce = (
token_bytes(32),
token_bytes(12),
time_ns()
// 1_000_000, # time() 返回为浮点,转为毫秒级时间戳会出现精度丢失
uuid4().hex.lower(),
)
# AES 加密
payload_encoded, tag_encoded = self._aes_encrypt(payload=payload)
request_body = {
"aes_key_encoded": self._rsa_encrypt(aes_key=self.aes_key),
"initialization_vector_encoded": b64encode(
s=self.initialization_vector
).decode(),
"timestamp": self.timestamp,
"nonce": self.nonce,
"payload_encoded": payload_encoded,
"tag_encoded": tag_encoded,
}
case _:
raise ValueError(f"暂不支持该加密策略: {strategy}")
# 若指定字段映射表则按照字段映射表返回
if fields_mapping:
return {
value: request_body.get(key) for key, value in fields_mapping.items()
}
else:
return request_body
def _rsa_encrypt(self, aes_key: bytes) -> str:
"""
RSA 加密
:param aes_key: AES-256
:return: BASE64 编码后密文
"""
# 使用 RSA 公钥加密明文
ciphertext = self.rsa_encryptor.encrypt(
plaintext=aes_key,
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)
return b64encode(s=ciphertext).decode() # BASE64 编码
def _aes_encrypt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
"""
AES 加密
:param payload: 载荷
:return: BASE64 编码加密载荷和标签
"""
# 实例 AES 加密器
aes_encryptor = Cipher(
algorithm=algorithms.AES(self.aes_key),
mode=modes.GCM(initialization_vector=self.initialization_vector),
backend=default_backend(),
).encryptor()
# 附加认证
aes_encryptor.authenticate_additional_data(
data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8")
)
# 加密载荷
payload_encrypted = (
aes_encryptor.update(
data=dumps(obj=payload, ensure_ascii=False).encode("utf-8")
)
+ aes_encryptor.finalize()
)
return (
b64encode(s=payload_encrypted).decode(),
b64encode(s=aes_encryptor.tag).decode(),
) # 就加密载荷和标签 BASE64 编码
def decrypt(
self,
initialization_vector_encoded: str,
tag_encoded: str,
payload_encoded: str,
strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm",
) -> Dict[str, Any]:
"""
解密载荷
:param initialization_vector_encoded: BASE64 编码初始化向量
:param tag_encoded: BASE64 编码标签
:param payload_encoded: BASE64 编码加密载荷
:param strategy: 解密策略
:return: 载荷
"""
match strategy:
case "rsa+aes-gcm":
return self._aes_decrypt(
initialization_vector_encoded=initialization_vector_encoded,
tag_encoded=tag_encoded,
payload_encoded=payload_encoded,
)
case _:
raise ValueError(f"暂不支持该解密策略: {strategy}")
def _aes_decrypt(
self,
initialization_vector_encoded: str,
tag_encoded: str,
payload_encoded: str,
) -> Dict[str, Any]:
"""
AES 解密
:param payload_encoded: BASE64 编码加密载荷
:param iv_encoded: BASE64 编码初始化向量
:param tag_encoded: BASE64 编码标签
:return: 载荷
"""
# 实例 AES 解密器
aes_decryptor = Cipher(
algorithm=algorithms.AES(self.aes_key),
mode=modes.GCM(
initialization_vector=b64decode(s=initialization_vector_encoded),
tag=b64decode(s=tag_encoded),
),
backend=default_backend(),
).decryptor()
# AES-GCM 解密器附加认证
aes_decryptor.authenticate_additional_data(
data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8")
)
try:
return loads(
s=(
aes_decryptor.update(b64decode(s=payload_encoded))
+ aes_decryptor.finalize()
).decode("utf-8")
)
except InvalidTag:
raise ValueError("AES 验签发生异常")
except Exception as exception:
raise ValueError(f"AES 解密发生异常: {str(exception)}") from exception
if __name__ == "__main__":
# 实例加密器
encryptor = Encryptor()
# 实例请求客户端
request = Request() # 不使用缓存
response = request.post(
url="https://risk-gw.pangjukeji.com/api/v1/riskgateway/product/invokeSync.json",
headers={
"Authorization": "Bearer 337162980141699072",
"Content-Type": "application/json;charset=utf-8",
},
json=encryptor.encrypt(
payload={
"productId": "BANK_CARD_4",
"name": "刘弼仁",
"certNo": "131002198705024619",
"mobilephone": "18058798752",
"bankCardNo": "6214835712066453",
},
fields_mapping={
"aes_key_encoded": "encryptedAesKey",
"initialization_vector_encoded": "iv",
"payload_encoded": "encryptedPayload",
"tag_encoded": "tag",
"timestamp": "timestamp",
"nonce": "nonce",
}, # 加密器 AES-GCM 请求体和请求字段映射表
),
)
if response.get("code") != "GW_SUCCESS_000":
raise RuntimeError(
f"请求失败: {response.get("status")} {response.get("code")} {response.get("message")}"
)
# 解密响应体
data = encryptor.decrypt(
payload_encoded=response.get("data").get("encryptedPayload"),
initialization_vector_encoded=response.get("data").get("iv"),
tag_encoded=response.get("data").get("tag"),
)
print(data)
"""
{
"productId": "CREDIT_RISK_PRE_SCREEN",
"mobilephone": "13833652839",
"certNo": "131002196212124620"
}
"""