# -*- coding: utf-8 -*- """ 主运行模块 """ # 列举导入模块 from base64 import b64decode, b64encode from json import dumps, loads from pathlib import Path from secrets import token_bytes import sys from time import time_ns from typing import cast from typing import Any, Dict, Literal, Optional, Tuple from uuid import uuid4 from cryptography.exceptions import InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.serialization import load_pem_public_key sys.path.append(Path(__file__).parent.parent.as_posix()) from utils.request import Request class Encryptor: """ 加密器 """ def __init__(self, rsa_certificate_path: Optional[Path] = None): """ 初始化 :param rsa_certificate_path: RSA 加密证书路径,默认为 rsa_public_key.pem """ # RSA 加密证书路径 if not rsa_certificate_path: rsa_certificate_path = Path(__file__).parent / "rsa_public_key.pem" if not rsa_certificate_path.exists(): raise FileNotFoundError("RSA 加密证书不存在") # 实例 RSA 加密器 self._initialize_rsa_encryptor(rsa_certificate_path=rsa_certificate_path) def _initialize_rsa_encryptor(self, rsa_certificate_path: Path) -> None: """ 实例 RSA 加密器 """ # RSA 公钥 rsa_public_key = rsa_certificate_path.read_text(encoding="utf-8") # 实例 RSA 加密器 self.rsa_encryptor = cast( rsa.RSAPublicKey, load_pem_public_key( data=rsa_public_key.encode("utf-8"), backend=default_backend() ), ) def encrypt( self, payload: Dict[str, Any], fields_mapping: Dict[str, str] = {}, strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm", ) -> Dict[str, Any]: """ 加密载荷 :param payload: 载荷 :param fields_mapping: 字段映射表,默认为空字典(不进行字段映射) :param strategy: 加密策略,默认为 rsa+aes-gcm :return: 请求体 """ match strategy: case "rsa+aes-gcm": # AES-256 密钥,初始化向量,毫秒级时间戳,随机数 self.aes_key, self.initialization_vector, self.timestamp, self.nonce = ( token_bytes(32), token_bytes(12), time_ns() // 1_000_000, # time() 返回为浮点,转为毫秒级时间戳会出现精度丢失 uuid4().hex.lower(), ) # AES 加密 payload_encoded, tag_encoded = self._aes_encrypt(payload=payload) request_body = { "aes_key_encoded": self._rsa_encrypt(aes_key=self.aes_key), "initialization_vector_encoded": b64encode( s=self.initialization_vector ).decode(), "timestamp": self.timestamp, "nonce": self.nonce, "payload_encoded": payload_encoded, "tag_encoded": tag_encoded, } case _: raise ValueError(f"暂不支持该加密策略: {strategy}") # 若指定字段映射表则按照字段映射表返回 if fields_mapping: return { value: request_body.get(key) for key, value in fields_mapping.items() } else: return request_body def _rsa_encrypt(self, aes_key: bytes) -> str: """ RSA 加密 :param aes_key: AES-256 :return: BASE64 编码后密文 """ # 使用 RSA 公钥加密明文 ciphertext = self.rsa_encryptor.encrypt( plaintext=aes_key, padding=padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None, ), ) return b64encode(s=ciphertext).decode() # BASE64 编码 def _aes_encrypt(self, payload: Dict[str, Any]) -> Tuple[str, str]: """ AES 加密 :param payload: 载荷 :return: BASE64 编码加密载荷和标签 """ # 实例 AES 加密器 aes_encryptor = Cipher( algorithm=algorithms.AES(self.aes_key), mode=modes.GCM(initialization_vector=self.initialization_vector), backend=default_backend(), ).encryptor() # 附加认证 aes_encryptor.authenticate_additional_data( data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8") ) # 加密载荷 payload_encrypted = ( aes_encryptor.update( data=dumps(obj=payload, ensure_ascii=False).encode("utf-8") ) + aes_encryptor.finalize() ) return ( b64encode(s=payload_encrypted).decode(), b64encode(s=aes_encryptor.tag).decode(), ) # 就加密载荷和标签 BASE64 编码 def decrypt( self, initialization_vector_encoded: str, tag_encoded: str, payload_encoded: str, strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm", ) -> Dict[str, Any]: """ 解密载荷 :param initialization_vector_encoded: BASE64 编码初始化向量 :param tag_encoded: BASE64 编码标签 :param payload_encoded: BASE64 编码加密载荷 :param strategy: 解密策略 :return: 载荷 """ match strategy: case "rsa+aes-gcm": return self._aes_decrypt( initialization_vector_encoded=initialization_vector_encoded, tag_encoded=tag_encoded, payload_encoded=payload_encoded, ) case _: raise ValueError(f"暂不支持该解密策略: {strategy}") def _aes_decrypt( self, initialization_vector_encoded: str, tag_encoded: str, payload_encoded: str, ) -> Dict[str, Any]: """ AES 解密 :param payload_encoded: BASE64 编码加密载荷 :param iv_encoded: BASE64 编码初始化向量 :param tag_encoded: BASE64 编码标签 :return: 载荷 """ # 实例 AES 解密器 aes_decryptor = Cipher( algorithm=algorithms.AES(self.aes_key), mode=modes.GCM( initialization_vector=b64decode(s=initialization_vector_encoded), tag=b64decode(s=tag_encoded), ), backend=default_backend(), ).decryptor() # AES-GCM 解密器附加认证 aes_decryptor.authenticate_additional_data( data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8") ) try: return loads( s=( aes_decryptor.update(b64decode(s=payload_encoded)) + aes_decryptor.finalize() ).decode("utf-8") ) except InvalidTag: raise ValueError("AES 验签发生异常") except Exception as exception: raise ValueError(f"AES 解密发生异常: {str(exception)}") from exception if __name__ == "__main__": # 实例加密器 encryptor = Encryptor() # 杭州汤然请求同步响应 # https://risk-gw.pangjukeji.com/api/v1/riskgateway/product/invokeSync.json # 杭州汤然请求异步响应(创建任务) # https://risk-gw.pangjukeji.com/api/v1/riskgateway/product/submitAsyncTask.json # 杭州汤然请求异步响应(查询任务) # https://risk-gw.pangjukeji.com/api/v1/riskgateway/result/queryAsyncResult.json # 实例请求客户端 request = Request() # 不使用缓存 response = request.post( url="https://risk-gw.pangjukeji.com/api/v1/riskgateway/result/queryAsyncResult.json", headers={ "Authorization": "Bearer 337162980141699072", "Content-Type": "application/json;charset=utf-8", }, json=encryptor.encrypt( payload={ "recordId": "337895642829557760", }, fields_mapping={ "aes_key_encoded": "encryptedAesKey", "initialization_vector_encoded": "iv", "payload_encoded": "encryptedPayload", "tag_encoded": "tag", "timestamp": "timestamp", "nonce": "nonce", }, # 加密器 AES-GCM 请求体和请求字段映射表 ), ) if response.get("code") != "GW_SUCCESS_000": raise RuntimeError( f"请求失败: {response.get("status")} {response.get("code")} {response.get("message")}" ) print(response) # 解密响应体 data = encryptor.decrypt( payload_encoded=response.get("data").get("encryptedPayload"), initialization_vector_encoded=response.get("data").get("iv"), tag_encoded=response.get("data").get("tag"), ) print(data) """ { "productId": "CREDIT_RISK_PRE_SCREEN", "mobilephone": "13833652839", "certNo": "131002196212124620" } { "productId": "BANK_CARD_4", "name": "刘弼仁", "certNo": "131002198705024619", "mobilephone": "18058798752", "bankCardNo": "6228480328744527172", }, { "productId": "SOCIAL_CAPACITY", "name": "刘弼仁", "certNo": "131002198705024619", "mobilephone": "18058798752", }, { "productId": "CREDIT_RISK_PRE_SCREEN", "mobilephone": "13833652939", "certNo": "131002196212124620" }, { "productId": "CAR_ACCIDENT_DETAIL", "vin": "L6T7712Z2GN076782", } """