295 lines
10 KiB
Python
295 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
主运行模块
|
|
"""
|
|
|
|
# 列举导入模块
|
|
from base64 import b64decode, b64encode
|
|
from json import dumps, loads
|
|
from pathlib import Path
|
|
from secrets import token_bytes
|
|
import sys
|
|
from time import time_ns
|
|
from typing import cast
|
|
from typing import Any, Dict, Literal, Optional, Tuple
|
|
from uuid import uuid4
|
|
|
|
from cryptography.exceptions import InvalidTag
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.primitives.serialization import load_pem_public_key
|
|
|
|
sys.path.append(Path(__file__).parent.parent.as_posix())
|
|
from utils.request import Request
|
|
|
|
|
|
class Encryptor:
|
|
"""
|
|
加密器
|
|
"""
|
|
|
|
def __init__(self, rsa_certificate_path: Optional[Path] = None):
|
|
"""
|
|
初始化
|
|
:param rsa_certificate_path: RSA 加密证书路径,默认为 rsa_public_key.pem
|
|
"""
|
|
# RSA 加密证书路径
|
|
if not rsa_certificate_path:
|
|
rsa_certificate_path = Path(__file__).parent / "rsa_public_key.pem"
|
|
if not rsa_certificate_path.exists():
|
|
raise FileNotFoundError("RSA 加密证书不存在")
|
|
# 实例 RSA 加密器
|
|
self._initialize_rsa_encryptor(rsa_certificate_path=rsa_certificate_path)
|
|
|
|
def _initialize_rsa_encryptor(self, rsa_certificate_path: Path) -> None:
|
|
"""
|
|
实例 RSA 加密器
|
|
"""
|
|
# RSA 公钥
|
|
rsa_public_key = rsa_certificate_path.read_text(encoding="utf-8")
|
|
|
|
# 实例 RSA 加密器
|
|
self.rsa_encryptor = cast(
|
|
rsa.RSAPublicKey,
|
|
load_pem_public_key(
|
|
data=rsa_public_key.encode("utf-8"), backend=default_backend()
|
|
),
|
|
)
|
|
|
|
def encrypt(
|
|
self,
|
|
payload: Dict[str, Any],
|
|
fields_mapping: Dict[str, str] = {},
|
|
strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm",
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
加密载荷
|
|
:param payload: 载荷
|
|
:param fields_mapping: 字段映射表,默认为空字典(不进行字段映射)
|
|
:param strategy: 加密策略,默认为 rsa+aes-gcm
|
|
:return: 请求体
|
|
"""
|
|
match strategy:
|
|
case "rsa+aes-gcm":
|
|
# AES-256 密钥,初始化向量,毫秒级时间戳,随机数
|
|
self.aes_key, self.initialization_vector, self.timestamp, self.nonce = (
|
|
token_bytes(32),
|
|
token_bytes(12),
|
|
time_ns()
|
|
// 1_000_000, # time() 返回为浮点,转为毫秒级时间戳会出现精度丢失
|
|
uuid4().hex.lower(),
|
|
)
|
|
# AES 加密
|
|
payload_encoded, tag_encoded = self._aes_encrypt(payload=payload)
|
|
request_body = {
|
|
"aes_key_encoded": self._rsa_encrypt(aes_key=self.aes_key),
|
|
"initialization_vector_encoded": b64encode(
|
|
s=self.initialization_vector
|
|
).decode(),
|
|
"timestamp": self.timestamp,
|
|
"nonce": self.nonce,
|
|
"payload_encoded": payload_encoded,
|
|
"tag_encoded": tag_encoded,
|
|
}
|
|
case _:
|
|
raise ValueError(f"暂不支持该加密策略: {strategy}")
|
|
|
|
# 若指定字段映射表则按照字段映射表返回
|
|
if fields_mapping:
|
|
return {
|
|
value: request_body.get(key) for key, value in fields_mapping.items()
|
|
}
|
|
else:
|
|
return request_body
|
|
|
|
def _rsa_encrypt(self, aes_key: bytes) -> str:
|
|
"""
|
|
RSA 加密
|
|
:param aes_key: AES-256
|
|
:return: BASE64 编码后密文
|
|
"""
|
|
# 使用 RSA 公钥加密明文
|
|
ciphertext = self.rsa_encryptor.encrypt(
|
|
plaintext=aes_key,
|
|
padding=padding.OAEP(
|
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
|
algorithm=hashes.SHA256(),
|
|
label=None,
|
|
),
|
|
)
|
|
return b64encode(s=ciphertext).decode() # BASE64 编码
|
|
|
|
def _aes_encrypt(self, payload: Dict[str, Any]) -> Tuple[str, str]:
|
|
"""
|
|
AES 加密
|
|
:param payload: 载荷
|
|
:return: BASE64 编码加密载荷和标签
|
|
"""
|
|
# 实例 AES 加密器
|
|
aes_encryptor = Cipher(
|
|
algorithm=algorithms.AES(self.aes_key),
|
|
mode=modes.GCM(initialization_vector=self.initialization_vector),
|
|
backend=default_backend(),
|
|
).encryptor()
|
|
# 附加认证
|
|
aes_encryptor.authenticate_additional_data(
|
|
data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8")
|
|
)
|
|
|
|
# 加密载荷
|
|
payload_encrypted = (
|
|
aes_encryptor.update(
|
|
data=dumps(obj=payload, ensure_ascii=False).encode("utf-8")
|
|
)
|
|
+ aes_encryptor.finalize()
|
|
)
|
|
return (
|
|
b64encode(s=payload_encrypted).decode(),
|
|
b64encode(s=aes_encryptor.tag).decode(),
|
|
) # 就加密载荷和标签 BASE64 编码
|
|
|
|
def decrypt(
|
|
self,
|
|
initialization_vector_encoded: str,
|
|
tag_encoded: str,
|
|
payload_encoded: str,
|
|
strategy: Literal["rsa+aes-gcm"] = "rsa+aes-gcm",
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
解密载荷
|
|
:param initialization_vector_encoded: BASE64 编码初始化向量
|
|
:param tag_encoded: BASE64 编码标签
|
|
:param payload_encoded: BASE64 编码加密载荷
|
|
:param strategy: 解密策略
|
|
:return: 载荷
|
|
"""
|
|
match strategy:
|
|
case "rsa+aes-gcm":
|
|
return self._aes_decrypt(
|
|
initialization_vector_encoded=initialization_vector_encoded,
|
|
tag_encoded=tag_encoded,
|
|
payload_encoded=payload_encoded,
|
|
)
|
|
case _:
|
|
raise ValueError(f"暂不支持该解密策略: {strategy}")
|
|
|
|
def _aes_decrypt(
|
|
self,
|
|
initialization_vector_encoded: str,
|
|
tag_encoded: str,
|
|
payload_encoded: str,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
AES 解密
|
|
:param payload_encoded: BASE64 编码加密载荷
|
|
:param iv_encoded: BASE64 编码初始化向量
|
|
:param tag_encoded: BASE64 编码标签
|
|
:return: 载荷
|
|
"""
|
|
# 实例 AES 解密器
|
|
aes_decryptor = Cipher(
|
|
algorithm=algorithms.AES(self.aes_key),
|
|
mode=modes.GCM(
|
|
initialization_vector=b64decode(s=initialization_vector_encoded),
|
|
tag=b64decode(s=tag_encoded),
|
|
),
|
|
backend=default_backend(),
|
|
).decryptor()
|
|
# AES-GCM 解密器附加认证
|
|
aes_decryptor.authenticate_additional_data(
|
|
data=f"timestamp={self.timestamp}&nonce={self.nonce}".encode("utf-8")
|
|
)
|
|
try:
|
|
return loads(
|
|
s=(
|
|
aes_decryptor.update(b64decode(s=payload_encoded))
|
|
+ aes_decryptor.finalize()
|
|
).decode("utf-8")
|
|
)
|
|
except InvalidTag:
|
|
raise ValueError("AES 验签发生异常")
|
|
except Exception as exception:
|
|
raise ValueError(f"AES 解密发生异常: {str(exception)}") from exception
|
|
|
|
|
|
if __name__ == "__main__":

    # Build the encryptor (loads rsa_public_key.pem next to this module).
    encryptor = Encryptor()

    # Hangzhou Tangran gateway endpoints:
    # synchronous invocation
    # https://risk-gw.pangjukeji.com/api/v1/riskgateway/product/invokeSync.json
    # asynchronous invocation (create task)
    # https://risk-gw.pangjukeji.com/api/v1/riskgateway/product/submitAsyncTask.json
    # asynchronous invocation (query task result)
    # https://risk-gw.pangjukeji.com/api/v1/riskgateway/result/queryAsyncResult.json

    # Request client (original note: caching is not used).
    request = Request()
    response = request.post(
        url="https://risk-gw.pangjukeji.com/api/v1/riskgateway/result/queryAsyncResult.json",
        headers={
            # NOTE(review): a bearer credential is hard-coded in source;
            # consider loading it from configuration or the environment.
            "Authorization": "Bearer 337162980141699072",
            "Content-Type": "application/json;charset=utf-8",
        },
        json=encryptor.encrypt(
            payload={
                "recordId": "337895642829557760",
            },
            # Maps the encryptor's internal field names to the field names
            # used in the gateway request body.
            fields_mapping={
                "aes_key_encoded": "encryptedAesKey",
                "initialization_vector_encoded": "iv",
                "payload_encoded": "encryptedPayload",
                "tag_encoded": "tag",
                "timestamp": "timestamp",
                "nonce": "nonce",
            },
        ),
    )
    if response.get("code") != "GW_SUCCESS_000":
        # Single quotes inside the replacement fields: reusing the outer
        # double quotes is only valid syntax on Python 3.12+ (PEP 701).
        raise RuntimeError(
            f"请求失败: {response.get('status')} {response.get('code')} {response.get('message')}"
        )
    print(response)

    # Decrypt the response body with the session material of the request.
    encrypted_data = response.get("data")
    data = encryptor.decrypt(
        payload_encoded=encrypted_data.get("encryptedPayload"),
        initialization_vector_encoded=encrypted_data.get("iv"),
        tag_encoded=encrypted_data.get("tag"),
    )
    print(data)
|
|
|
|
"""
|
|
{
|
|
"productId": "CREDIT_RISK_PRE_SCREEN",
|
|
"mobilephone": "13833652839",
|
|
"certNo": "131002196212124620"
|
|
}
|
|
|
|
{
|
|
"productId": "BANK_CARD_4",
|
|
"name": "刘弼仁",
|
|
"certNo": "131002198705024619",
|
|
"mobilephone": "18058798752",
|
|
"bankCardNo": "6228480328744527172",
|
|
},
|
|
|
|
{
|
|
"productId": "SOCIAL_CAPACITY",
|
|
"name": "刘弼仁",
|
|
"certNo": "131002198705024619",
|
|
"mobilephone": "18058798752",
|
|
},
|
|
{
|
|
"productId": "CREDIT_RISK_PRE_SCREEN",
|
|
"mobilephone": "13833652939",
|
|
"certNo": "131002196212124620"
|
|
},
|
|
{
|
|
"productId": "CAR_ACCIDENT_DETAIL",
|
|
"vin": "L6T7712Z2GN076782",
|
|
}
|
|
"""
|