Python/utils/authenticator.py

163 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
Authenticator module: obtains and caches access tokens for third-party services.
"""
import hashlib
import hmac
import json
import sys
import threading
import time
from pathlib import Path
from typing import Optional, Tuple
sys.path.append(Path(__file__).parent.as_posix())
from request import Request
class Authenticator:
    """
    Access-token provider for third-party services.

    Tokens are cached, together with their expiry timestamps, in a
    ``certifications.json`` file located next to this module, and are fetched
    again from the provider once the cached entry has expired.

    Supported servicer keys: ``"szkt"``, ``"hlyj"`` and ``"feishu"``.
    """

    # A single lock shared by every instance: all instances read and write the
    # same certifications file, so the read-refresh-write cycle in get_token
    # must be serialized across them. (Creating a new Lock per call, as a bare
    # ``with threading.Lock():`` does, never blocks anyone.)
    _lock = threading.Lock()

    def __init__(self):
        """Locate (and create if missing) the cache file and build the HTTP client."""
        # The credential cache lives next to this module.
        self.certifications_path = (
            Path(__file__).parent.resolve() / "certifications.json"
        )
        # Create an empty cache the first time so later reads find a JSON object.
        if not self.certifications_path.exists():
            self._write_certifications({})
        # HTTP client used by the provider-specific fetchers.
        self.request = Request()

    def _read_certifications(self):
        """Return the parsed content of the certifications file."""
        with open(self.certifications_path, "r", encoding="utf-8") as file:
            return json.load(file)

    def _write_certifications(self, certifications) -> None:
        """Overwrite the certifications file with ``certifications`` as JSON."""
        with open(self.certifications_path, "w", encoding="utf-8") as file:
            json.dump(
                certifications,
                file,
                ensure_ascii=False,
            )

    def get_token(self, servicer: str) -> Optional[str]:
        """
        Return a valid access token for ``servicer``.

        The cached token is returned while the current time has not passed its
        stored expiry timestamp; otherwise a new token is requested from the
        provider and written back to the cache file.

        :param servicer: provider key, one of "szkt" (深圳快瞳),
            "hlyj" (合力亿捷) or "feishu" (飞书)
        :return: the access token
        :raises RuntimeError: if the cache cannot be read, the provider is not
            supported, or the provider rejects the token request
        """
        with self._lock:
            token, expired_timestamp = None, 0
            try:
                certifications = self._read_certifications()
                # Look up the cached entry for the requested provider.
                certification = certifications.get(servicer)
                if certification:
                    token = certification["token"]
                    expired_timestamp = certification["expired_timestamp"]
            except json.JSONDecodeError:
                # The cache is corrupted: reset it to an empty object and
                # fall through to fetching a fresh token.
                certifications = {}
                self._write_certifications(certifications)
            except Exception as exception:
                raise RuntimeError(
                    f"获取访问凭证发生异常:{exception}"
                ) from exception

            if time.time() > expired_timestamp:
                # No cached entry (expiry 0) or the cached token has expired.
                fetchers = {
                    "szkt": self._get_szkt_certification,
                    "hlyj": self._get_hlyj_certification,
                    "feishu": self._get_feishu_certification,
                }
                fetch = fetchers.get(servicer)
                if fetch is None:
                    raise RuntimeError(f"暂不支持该服务商:{servicer}")
                token, expired_timestamp = fetch()

                # Persist the refreshed credential for subsequent calls.
                certifications[servicer] = {
                    "token": token,
                    "expired_timestamp": expired_timestamp,
                }
                self._write_certifications(certifications)

            return token

    def _get_szkt_certification(self) -> tuple[str, float]:
        """
        Request a new access token from the "szkt" (深圳快瞳) endpoint.

        :return: ``(access_token, expiry_timestamp)`` where the expiry is the
            current time plus the ``expires_in`` value from the response
        :raises RuntimeError: if the response does not report status 200 and code 0
        """
        # NOTE(review): the access key and secret are hard-coded in the URL;
        # consider loading them from configuration or a secret store.
        response = self.request.get(
            url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
        )
        # Anything other than status 200 with code 0 is treated as a failure.
        if response["status"] != 200 or response["code"] != 0:
            raise RuntimeError("获取深圳快瞳访问凭证发生异常")
        payload = response["data"]
        return (
            payload["access_token"],
            time.time() + payload["expires_in"],
        )

    def _get_hlyj_certification(self) -> tuple[str, float]:
        """
        Request a new access token from the "hlyj" (合力亿捷) endpoint.

        :return: ``(access_token, expiry_timestamp)``; the expiry is fixed at
            one hour from now
        :raises RuntimeError: if the response's ``success`` field is falsy
        """
        # NOTE(review): the key id and signing secret are hard-coded; consider
        # loading them from configuration or a secret store.
        # Enterprise access key id.
        access_key_id = "25938f1c190448829dbdb5d344231e42"
        # Signing secret.
        secret_access_key = "44dc0299aff84d68ae27712f8784f173"
        # Second-resolution timestamp.
        timestamp = int(time.time())
        # Signature: hex HMAC-SHA256, keyed with the secret, over the
        # concatenation of key id, secret and timestamp.
        signature = hmac.new(
            secret_access_key.encode("utf-8"),
            f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()
        response = self.request.get(
            url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}&timestamp={timestamp}&signature={signature}"
        )
        # A falsy "success" field is treated as a failure.
        if not response["success"]:
            raise RuntimeError("获取合力亿捷访问凭证发生异常")
        return (
            response["data"],
            time.time() + 3600,  # the token is treated as valid for one hour
        )

    def _get_feishu_certification(self) -> tuple[str, float]:
        """
        Request a new tenant access token from the "feishu" (飞书) endpoint.

        :return: ``(tenant_access_token, expiry_timestamp)`` where the expiry
            is the current time plus the ``expire`` value from the response
        :raises RuntimeError: if the response code is not 0
        """
        # NOTE(review): the app id and app secret are hard-coded; consider
        # loading them from configuration or a secret store.
        response = self.request.post(
            url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
            data={
                "app_id": "cli_a1587980be78500c",
                "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
            },
        )
        # Any non-zero code is treated as a failure.
        if response["code"] != 0:
            raise RuntimeError("获取飞书访问凭证发生异常")
        return (
            response["tenant_access_token"],
            time.time() + response["expire"],
        )