# -*- coding: utf-8 -*-
"""
Main entry module.

Encrypts a JSON payload with a freshly generated AES-256-GCM key, wraps that
key with an RSA public key (OAEP/SHA-256), and posts the resulting envelope
to the service endpoint.
"""

# Imports
import sys
from base64 import b64encode
from json import dumps
from pathlib import Path
from secrets import token_bytes
from time import time
from typing import Any, Dict, cast
from uuid import uuid4

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.serialization import load_pem_public_key

# Put the grandparent directory on the import path before importing
# ``utils`` (presumably where that package lives -- confirm against the
# repository layout).
sys.path.append(Path(__file__).parent.parent.as_posix())

from utils.request import Request  # noqa: E402


def encrypt(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Encrypt a payload into a hybrid RSA + AES-GCM envelope.

    A random 32-byte AES key and 12-byte IV are generated per call. The
    payload is serialised as UTF-8 JSON and encrypted with AES-GCM, with the
    string ``timestamp=<ts>&nonce=<nonce>`` bound as additional authenticated
    data. The AES key itself is encrypted with the RSA public key read from
    ``rsa_public_key.pem`` next to this file.

    :param payload: JSON-serialisable payload
    :return: envelope with the keys ``encryptedAesKey``, ``iv``,
        ``timestamp``, ``nonce``, ``encryptedPayload`` and ``tag``; all
        binary values are Base64-encoded strings
    :raises FileNotFoundError: if the RSA public key PEM file is missing
    :raises TypeError: if the PEM file does not contain an RSA public key
    """
    pem_path = Path(__file__).parent / "rsa_public_key.pem"
    # Read once and handle the failure, instead of exists() followed by a
    # read; bytes are what the PEM loader wants, so no decode/encode trip.
    try:
        pem_data = pem_path.read_bytes()
    except FileNotFoundError as error:
        raise FileNotFoundError("RSA 公钥 PEM 文件不存在") from error

    # Load the public key and verify at runtime that it really is RSA;
    # typing.cast would only silence the type checker.
    rsa_encryptor = load_pem_public_key(data=pem_data, backend=default_backend())
    if not isinstance(rsa_encryptor, rsa.RSAPublicKey):
        raise TypeError("PEM 文件中的公钥不是 RSA 公钥")

    # Generate the AES-256 key
    aes_key = token_bytes(32)

    # Encrypt the AES-256 key with the RSA public key (OAEP, SHA-256)
    aes_key_encrypted = rsa_encryptor.encrypt(
        plaintext=aes_key,
        padding=padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        ),
    )
    aes_key_encoded = b64encode(aes_key_encrypted).decode()  # Base64

    # Initialisation vector
    iv = token_bytes(12)
    iv_encoded = b64encode(iv).decode()  # Base64

    # AES-GCM encryptor
    aes_gcm_encryptor = Cipher(
        algorithm=algorithms.AES(aes_key),
        mode=modes.GCM(iv),
        backend=default_backend(),
    ).encryptor()

    # Timestamp (whole seconds) and nonce; uuid4().hex is already lowercase
    timestamp, nonce = int(time()), uuid4().hex

    # Additional authenticated data: must be supplied before any plaintext
    aes_gcm_encryptor.authenticate_additional_data(
        data=f"timestamp={timestamp}&nonce={nonce}".encode("utf-8")
    )

    ciphertext = (
        aes_gcm_encryptor.update(
            data=dumps(payload, ensure_ascii=False).encode("utf-8")
        )
        + aes_gcm_encryptor.finalize()
    )
    ciphertext_encoded = b64encode(ciphertext).decode()  # Base64

    # The GCM tag is only available after finalize()
    tag = aes_gcm_encryptor.tag
    tag_encoded = b64encode(tag).decode()  # Base64

    return {
        "encryptedAesKey": aes_key_encoded,
        "iv": iv_encoded,
        "timestamp": timestamp,
        "nonce": nonce,
        "encryptedPayload": ciphertext_encoded,
        "tag": tag_encoded,
    }


request = Request()  # cache is not used

# NOTE(review): the bearer token and the personal data below are hard-coded
# in source; consider loading them from configuration or the environment.
response = request.post(
    url="http://192.168.3.103:30380/",
    headers={
        "Authorization": "Bearer C52FB4D10BC424D9F",
        "Content-Type": "application/json;charset=utf-8",
    },
    json=encrypt(
        payload={
            "productId": "BANK_CARD_4",
            "name": "刘弼仁",
            "idNumber": "131002198705020000",
            "bankCard": "1234567890123456",
            "phone": "18058798752",
        }
    ),
)

print(response)