diff --git a/utils/request.py b/utils/request.py index a77463c..802b158 100644 --- a/utils/request.py +++ b/utils/request.py @@ -364,11 +364,10 @@ class Request: status = response.status_code # 尝试将异常信息解析为 JSON,若非 try: - response_json = response.json() # 错误代码 - code = response_json.get("code", 999999) + code = response.json().get("code", 999999) # 错误信息 - message = response_json.get("msg", response.text) + message = response.json().get("message", response.text) except Exception: code, message = 999999, response.text else: diff --git a/安全加密请求/main.py b/安全加密请求/main.py index 89736e8..99f2c87 100644 --- a/安全加密请求/main.py +++ b/安全加密请求/main.py @@ -4,15 +4,16 @@ """ # 列举导入模块 -from base64 import b64encode -from json import dumps +from base64 import b64decode, b64encode +from json import dumps, loads from pathlib import Path from secrets import token_bytes -from time import time +from time import time_ns from typing import cast -from typing import Any, Dict +from typing import Any, Dict, Literal, Optional, Tuple from uuid import uuid4 +from cryptography.exceptions import InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding @@ -21,94 +22,240 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.serialization import load_pem_public_key import sys + sys.path.append(Path(__file__).parent.parent.as_posix()) + from utils.request import Request -def encrypt(payload: Dict[str, Any]) -> Dict[str, Any]: +class Encryptor: """ - 加密载荷 - :param payload: 载荷 - :return: 加密后载荷 + 加密器 """ - pem_path = Path(__file__).parent / "rsa_public_key.pem" - if not pem_path.exists(): - raise FileNotFoundError("RSA 公钥 PEM 文件不存在") - # RSA 公钥 - rsa_public_key = pem_path.read_text(encoding="utf-8") - # 实例 RSA 加密器 - rsa_encryptor = cast( - rsa.RSAPublicKey, - load_pem_public_key( - data=rsa_public_key.encode("utf-8"), backend=default_backend() - ), - ) - # 生成 AES-256 密钥 - aes_key = token_bytes(32) - # 使用 RSA 公钥加密 AES-256 密钥 - aes_key_encrypted = rsa_encryptor.encrypt( - plaintext=aes_key, - padding=padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA256()), - algorithm=hashes.SHA256(), - label=None, - ), - ) - aes_key_encoded = b64encode(aes_key_encrypted).decode() # BASE64 编码 + def __init__(self, rsa_certificate_path: Optional[Path] = None): + """ + 初始化 + :param rsa_certificate_path: RSA 加密证书路径,默认为 rsa_public_key.pem + """ + # RSA 加密证书路径 + if not rsa_certificate_path: + rsa_certificate_path = Path(__file__).parent / "rsa_public_key.pem" + if not rsa_certificate_path.exists(): + raise FileNotFoundError("RSA 加密证书不存在") + # 实例 RSA 加密器 + self._initialize_rsa_encryptor(rsa_certificate_path=rsa_certificate_path) - # 初始向量 - iv = token_bytes(12) - iv_encoded = b64encode(iv).decode() # BASE64 编码 + def _initialize_rsa_encryptor(self, rsa_certificate_path: Path) -> None: + """ + 实例 RSA 加密器 + """ + # RSA 公钥 + rsa_public_key = rsa_certificate_path.read_text(encoding="utf-8") - # 实例 AES-GCM 加密器 - aes_gcm_encryptor = Cipher( - algorithm=algorithms.AES(aes_key), mode=modes.GCM(iv), backend=default_backend() - ).encryptor() - - # 时间戳和随机码 - timestamp, nonce = int(time()), uuid4().hex.lower() - # 附加认证 - aes_gcm_encryptor.authenticate_additional_data( - data=f"timestamp={timestamp}&nonce={nonce}".encode("utf-8") - ) - - ciphertext = ( - aes_gcm_encryptor.update( - data=dumps(payload, ensure_ascii=False).encode("utf-8") + # 实例 RSA 加密器 + self.rsa_encryptor = cast( + rsa.RSAPublicKey, + load_pem_public_key( + data=rsa_public_key.encode("utf-8"), backend=default_backend() + ), ) - + aes_gcm_encryptor.finalize() + + def encrypt( + self, + payload: Dict[str, Any], + fields_mapping: Dict[str, str] = {}, + strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm", + ) -> Dict[str, Any]: + """ + 加密载荷 + :param payload: 载荷 + :param fields_mapping: 字段映射表,默认为空字典(不进行字段映射) + :param strategy: 加密策略,默认为 rsa+aes-gcm + :return: 请求体 + """ + match strategy: + case "rsa+aes-gcm": + # AES-256 密钥,初始化向量,毫秒级时间戳,随机数 + self.aes_key, self.initialization_vector, self.timestamp, self.nonce = ( + token_bytes(32), + token_bytes(12), + time_ns() + // 1_000_000, # time() 返回为浮点,转为毫秒级时间戳会出现精度丢失 + uuid4().hex.lower(), + ) + # AES 加密 + payload_encoded, tag_encoded = self._aes_encrypt(payload=payload) + request_body = { + "aes_key_encoded": self._rsa_encrypt(aes_key=self.aes_key), + "initialization_vector_encoded": b64encode( + s=self.initialization_vector + ).decode(), + "timestamp": self.timestamp, + "nonce": self.nonce, + "payload_encoded": payload_encoded, + "tag_encoded": tag_encoded, + } + case _: + raise ValueError(f"暂不支持该加密策略: {strategy}") + + # 若指定字段映射表则按照字段映射表返回 + if fields_mapping: + return { + value: request_body.get(key) for key, value in fields_mapping.items() + } + else: + return request_body + + def _rsa_encrypt(self, aes_key: bytes) -> str: + """ + RSA 加密 + :param aes_key: AES-256 + :return: BASE64 编码后密文 + """ + # 使用 RSA 公钥加密明文 + ciphertext = self.rsa_encryptor.encrypt( + plaintext=aes_key, + padding=padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + return b64encode(s=ciphertext).decode() # BASE64 编码 + + def _aes_encrypt(self, payload: Dict[str, Any]) -> Tuple[str, str]: + """ + AES 加密 + :param payload: 载荷 + :return: BASE64 编码加密载荷和标签 + """ + # 实例 AES 加密器 + aes_encryptor = Cipher( + algorithm=algorithms.AES(self.aes_key), + mode=modes.GCM(initialization_vector=self.initialization_vector), + backend=default_backend(), + ).encryptor() + # 附加认证 + aes_encryptor.authenticate_additional_data( + data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8") + ) + + # 加密载荷 + payload_encrypted = ( + aes_encryptor.update( + data=dumps(obj=payload, ensure_ascii=False).encode("utf-8") + ) + + aes_encryptor.finalize() + ) + return ( + b64encode(s=payload_encrypted).decode(), + b64encode(s=aes_encryptor.tag).decode(), + ) # 就加密载荷和标签 BASE64 编码 + + def decrypt( + self, + initialization_vector_encoded: str, + tag_encoded: str, + payload_encoded: str, + strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm", + ) -> Dict[str, Any]: + """ + 解密载荷 + :param initialization_vector_encoded: BASE64 编码初始化向量 + :param tag_encoded: BASE64 编码标签 + :param payload_encoded: BASE64 编码加密载荷 + :param strategy: 解密策略 + :return: 载荷 + """ + match strategy: + case "rsa+aes-gcm": + return self._aes_decrypt( + initialization_vector_encoded=initialization_vector_encoded, + tag_encoded=tag_encoded, + payload_encoded=payload_encoded, + ) + case _: + raise ValueError(f"暂不支持该解密策略: {strategy}") + + def _aes_decrypt( + self, + initialization_vector_encoded: str, + tag_encoded: str, + payload_encoded: str, + ) -> Dict[str, Any]: + """ + AES 解密 + :param payload_encoded: BASE64 编码加密载荷 + :param iv_encoded: BASE64 编码初始化向量 + :param tag_encoded: BASE64 编码标签 + :return: 载荷 + """ + # 实例 AES 解密器 + aes_decryptor = Cipher( + algorithm=algorithms.AES(self.aes_key), + mode=modes.GCM( + initialization_vector=b64decode(s=initialization_vector_encoded), + tag=b64decode(s=tag_encoded), + ), + backend=default_backend(), + ).decryptor() + # AES-GCM 解密器附加认证 + aes_decryptor.authenticate_additional_data( + data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8") + ) + try: + return loads( + s=( + aes_decryptor.update(b64decode(s=payload_encoded)) + + aes_decryptor.finalize() + ).decode("utf-8") + ) + except InvalidTag: + raise ValueError("AES 验签发生异常") + except Exception as exception: + raise ValueError(f"AES 解密发生异常: {str(exception)}") from exception + + +if __name__ == "__main__": + + # 实例加密器 + encryptor = Encryptor() + + # 实例请求客户端 + request = Request() # 不使用缓存 + response = request.post( + url="http://192.168.3.103:30380/api/v1/riskgateway/product/invokeSync.json", + headers={ + "Authorization": "Bearer C52FB4D10BC424D9F", + "Content-Type": "application/json;charset=utf-8", + }, + json=encryptor.encrypt( + payload={ + "productId": "BANK_CARD_4", + "name": "刘弼仁", + "idNumber": "131002198705020000", + "bankCard": "1234567890123456", + "phone": "18058798752", + }, + fields_mapping={ + "aes_key_encoded": "encryptedAesKey", + "initialization_vector_encoded": "iv", + "payload_encoded": "encryptedPayload", + "tag_encoded": "tag", + "timestamp": "timestamp", + "nonce": "nonce", + }, # 加密器 AES-GCM 请求体和请求字段映射表 + ), ) - ciphertext_encoded = b64encode(ciphertext).decode() # BASE64 编码 - - tag = aes_gcm_encryptor.tag - tag_encoded = b64encode(tag).decode() # BASE64 编码 - - return { - "encryptedAesKey": aes_key_encoded, - "iv": iv_encoded, - "timestamp": timestamp, - "nonce": nonce, - "encryptedPayload": ciphertext_encoded, - "tag": tag_encoded, - } - - -request = Request() # 不使用缓存 -response = request.post( - url="http://192.168.3.103:30380/", - headers={ - "Authorization": "Bearer C52FB4D10BC424D9F", - "Content-Type": "application/json;charset=utf-8", - }, - json=encrypt( - payload={ - "productId": "BANK_CARD_4", - "name": "刘弼仁", - "idNumber": "131002198705020000", - "bankCard": "1234567890123456", - "phone": "18058798752", - } - ), -) -print(response) \ No newline at end of file + if response.get("code") != "GW_SUCCESS_000": + raise RuntimeError( + f"请求失败: {response.get("status")} {response.get("code")} {response.get("message")}" + ) + # 解密响应体 + data = encryptor.decrypt( + payload_encoded=response.get("data").get("encryptedPayload"), + initialization_vector_encoded=response.get("data").get("iv"), + tag_encoded=response.get("data").get("tag"), + ) + print(data)