Python/utils/authenticator.py

164 lines
5.6 KiB
Python

# -*- coding: utf-8 -*-
"""
认证器
"""
import hashlib
import hmac
import json
from pathlib import Path
import sys
import threading
import time
from typing import Optional, Tuple
sys.path.append(Path(__file__).parent.as_posix())
from request import Request
class Authenticator:
"""
认证器,支持:
1、
"""
def __init__(self):
"""初始化"""
# 初始化访问凭证路径
self.certifications_path = (
Path(__file__).parent.resolve() / "certifications.json"
)
# 若访问凭证路径不存在则创建
if not self.certifications_path.exists():
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
{},
file,
ensure_ascii=False,
)
# 初始化请求客户端
self.request = Request()
def get_token(self, servicer: str) -> Optional[str]:
"""
获取访问令牌
:param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书
:return token: 访问令牌
"""
with threading.Lock():
token, expired_timestamp = None, 0
try:
with open(self.certifications_path, "r", encoding="utf-8") as file:
certifications = json.load(file)
# 获取指定服务商的访问凭证
certification = certifications.get(servicer)
# 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳
if certification:
# 访问令牌
token = certification["token"]
# 失效时间戳
expired_timestamp = certification["expired_timestamp"]
except json.decoder.JSONDecodeError:
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
certifications := {},
file,
ensure_ascii=False,
)
except Exception as exception:
raise RuntimeError(
f"获取访问凭证发生异常:{str(exception)}"
) from exception
if time.time() > expired_timestamp:
match servicer:
case "szkt":
token, expired_timestamp = self._get_szkt_certification()
case "hlyj":
token, expired_timestamp = self._get_hlyj_certification()
case "feishu":
token, expired_timestamp = self._get_feishu_certification()
case _:
raise RuntimeError(f"暂不支持该服务商:{servicer}")
# 更新访问凭证
certifications[servicer] = {
"token": token,
"expired_timestamp": expired_timestamp,
}
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
certifications,
file,
ensure_ascii=False,
)
return token
def _get_szkt_certification(self) -> tuple[str, float]:
"""
获取深圳快瞳访问凭证
:return: 访问令牌和失效时间戳
"""
response = self.request.get(
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
)
# 若非响应成功则抛出异常
if not (response["status"] == 200 and response["code"] == 0):
raise RuntimeError("获取深圳快瞳访问凭证发生异常")
return (
response["data"]["access_token"],
time.time() + response["data"]["expires_in"],
)
def _get_hlyj_certification(self) -> Tuple[str, float]:
"""获取合力亿捷访问凭证"""
# 企业访问标识
access_key_id = "25938f1c190448829dbdb5d344231e42"
# 签名秘钥
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
# 时间戳(秒级)
timestamp = int(time.time())
# 构建签名
signature = hmac.new(
secret_access_key.encode("utf-8"),
f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
hashlib.sha256,
).hexdigest()
response = self.request.get(
url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}&timestamp={timestamp}&signature={signature}"
)
# 若非响应成功则抛出异常
if not response["success"]:
raise RuntimeError("获取合力亿捷访问凭证发生异常")
return (
response["data"],
time.time() + 1 * 3600, # 访问令牌有效期为1小时
)
def _get_feishu_certification(self) -> tuple[str, float]:
"""获取飞书访问凭证"""
response = self.request.post(
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
data={
"app_id": "cli_a1587980be78500c",
"app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
},
)
# 若非响应成功则抛出异常
if not response["code"] == 0:
raise RuntimeError("获取飞书访问凭证发生异常")
return (
response["tenant_access_token"],
time.time() + response["expire"],
)