163 lines
5.6 KiB
Python
163 lines
5.6 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
认证器模块
|
||
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import sys
|
||
import threading
|
||
import time
|
||
from pathlib import Path
|
||
from typing import Optional, Tuple
|
||
|
||
sys.path.append(Path(__file__).parent.as_posix())
|
||
from request import Request
|
||
|
||
|
||
class Authenticator:
|
||
"""
|
||
认证器,支持:
|
||
get_token:获取访问令牌
|
||
"""
|
||
|
||
def __init__(self):
|
||
"""初始化"""
|
||
# 初始化访问凭证路径
|
||
self.certifications_path = (
|
||
Path(__file__).parent.resolve() / "certifications.json"
|
||
)
|
||
# 若访问凭证路径不存在则创建
|
||
if not self.certifications_path.exists():
|
||
with open(self.certifications_path, "w", encoding="utf-8") as file:
|
||
json.dump(
|
||
{},
|
||
file,
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
# 初始化请求客户端
|
||
self.request = Request()
|
||
|
||
def get_token(self, servicer: str) -> Optional[str]:
|
||
"""
|
||
获取访问令牌
|
||
:param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书
|
||
:return token: 访问令牌
|
||
"""
|
||
with threading.Lock():
|
||
token, expired_timestamp = None, 0
|
||
try:
|
||
with open(self.certifications_path, "r", encoding="utf-8") as file:
|
||
certifications = json.load(file)
|
||
# 获取指定服务商的访问凭证
|
||
certification = certifications.get(servicer)
|
||
if certification:
|
||
token = certification["token"] # 访问令牌
|
||
expired_timestamp = certification[
|
||
"expired_timestamp"
|
||
] # 失效时间戳
|
||
|
||
except json.decoder.JSONDecodeError:
|
||
with open(self.certifications_path, "w", encoding="utf-8") as file:
|
||
json.dump(
|
||
certifications := {},
|
||
file,
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
except Exception as exception:
|
||
raise RuntimeError(
|
||
f"获取访问凭证发生异常:{str(exception)}"
|
||
) from exception
|
||
|
||
if time.time() > expired_timestamp:
|
||
match servicer:
|
||
case "szkt":
|
||
token, expired_timestamp = self._get_szkt_certification()
|
||
case "hlyj":
|
||
token, expired_timestamp = self._get_hlyj_certification()
|
||
case "feishu":
|
||
token, expired_timestamp = self._get_feishu_certification()
|
||
case _:
|
||
raise RuntimeError(f"暂不支持该服务商:{servicer}")
|
||
|
||
# 更新访问凭证
|
||
certifications[servicer] = {
|
||
"token": token,
|
||
"expired_timestamp": expired_timestamp,
|
||
}
|
||
with open(self.certifications_path, "w", encoding="utf-8") as file:
|
||
json.dump(
|
||
certifications,
|
||
file,
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
return token
|
||
|
||
def _get_szkt_certification(self) -> tuple[str, float]:
|
||
"""
|
||
获取深圳快瞳访问凭证
|
||
:return: 访问令牌和失效时间戳
|
||
"""
|
||
response = self.request.get(
|
||
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
|
||
)
|
||
|
||
# 若非响应成功则抛出异常
|
||
if not (response["status"] == 200 and response["code"] == 0):
|
||
raise RuntimeError("获取深圳快瞳访问凭证发生异常")
|
||
|
||
return (
|
||
response["data"]["access_token"],
|
||
time.time() + response["data"]["expires_in"],
|
||
)
|
||
|
||
def _get_hlyj_certification(self) -> Tuple[str, float]:
|
||
"""获取合力亿捷访问凭证"""
|
||
# 企业访问标识
|
||
access_key_id = "25938f1c190448829dbdb5d344231e42"
|
||
# 签名秘钥
|
||
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
|
||
# 时间戳(秒级)
|
||
timestamp = int(time.time())
|
||
# 构建签名
|
||
signature = hmac.new(
|
||
secret_access_key.encode("utf-8"),
|
||
f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
|
||
hashlib.sha256,
|
||
).hexdigest()
|
||
response = self.request.get(
|
||
url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}"
|
||
)
|
||
|
||
# 若非响应成功则抛出异常
|
||
if not response["success"]:
|
||
raise RuntimeError("获取合力亿捷访问凭证发生异常")
|
||
|
||
return (
|
||
response["data"],
|
||
time.time() + 1 * 3600, # 访问令牌有效期为1小时
|
||
)
|
||
|
||
def _get_feishu_certification(self) -> tuple[str, float]:
|
||
"""获取飞书访问凭证"""
|
||
response = self.request.post(
|
||
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
|
||
data={
|
||
"app_id": "cli_a1587980be78500c",
|
||
"app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
|
||
},
|
||
)
|
||
|
||
# 若非响应成功则抛出异常
|
||
if not response["code"] == 0:
|
||
raise RuntimeError("获取飞书访问凭证发生异常")
|
||
|
||
return (
|
||
response["tenant_access_token"],
|
||
time.time() + response["expire"],
|
||
)
|