194 lines
7.4 KiB
Python
194 lines
7.4 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
封装认证器
|
||
实例化时本地加载所有服务商的访问凭证。根据服务商获取其访问令牌时,若该服务商的访问令牌已过期则刷新该服务商的访问凭证并更新本地文件。
|
||
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
from pathlib import Path
|
||
import threading
|
||
import time
|
||
from typing import Optional, Tuple
|
||
from datetime import datetime
|
||
|
||
from request import Request
|
||
|
||
import sys
|
||
|
||
sys.path.append(Path(__file__).parent.as_posix())
|
||
|
||
|
||
class Authenticator:
|
||
"""
|
||
认证器,支持:
|
||
get_token:根据服务商获取其访问令牌
|
||
"""
|
||
|
||
def __init__(self):
|
||
"""初始化"""
|
||
# 构建所有服务商的访问凭证本地文件路径
|
||
self.certifications_path = (
|
||
Path(__file__).parent.resolve() / "certifications.json"
|
||
)
|
||
# 初始化所有服务商的访问凭证
|
||
try:
|
||
with open(
|
||
file=self.certifications_path,
|
||
mode="r",
|
||
encoding="utf-8",
|
||
) as file:
|
||
self.certifications = json.load(fp=file)
|
||
except FileNotFoundError:
|
||
with open(
|
||
file=self.certifications_path, mode="w", encoding="utf-8"
|
||
) as file:
|
||
json.dump(
|
||
obj={},
|
||
fp=file,
|
||
ensure_ascii=False,
|
||
)
|
||
self.certifications = {}
|
||
except Exception as exception:
|
||
raise RuntimeError(
|
||
f"初始化所有服务商的访问凭证发生异常:{str(exception)}"
|
||
) from exception
|
||
|
||
# 实例化请求客户端
|
||
self.request = Request()
|
||
|
||
def get_token(self, servicer: str) -> Optional[str]:
|
||
"""
|
||
根据服务商获取其访问令牌
|
||
:param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书
|
||
:return token: 访问令牌
|
||
"""
|
||
with threading.Lock():
|
||
token, expired_timestamp = self.certifications.get(
|
||
servicer,
|
||
(None, 0),
|
||
) # 指定服务商的访问令牌和失效时间戳
|
||
if time.time() > expired_timestamp:
|
||
match servicer:
|
||
# 获取深圳快瞳访问凭证cd C:\Python\.venv\Scripts
|
||
case "inspirvision":
|
||
token, expired_timestamp = (
|
||
self._get_inspirvision_certification()
|
||
)
|
||
# 获取合力亿捷访问凭证
|
||
case "hollycrm":
|
||
token, expired_timestamp = self._get_hollycrm_certification()
|
||
# 获取飞书访问凭证
|
||
case "feishu":
|
||
token, expired_timestamp = self._get_feishu_certification()
|
||
# 获取 Cloudreve 访问凭证
|
||
case "cloudreve":
|
||
token, expired_timestamp = self._get_cloudreve_certification()
|
||
case _:
|
||
raise RuntimeError(f"未知服务商:{servicer}")
|
||
|
||
self.certifications.update(
|
||
{
|
||
servicer: (
|
||
token,
|
||
expired_timestamp,
|
||
),
|
||
}
|
||
)
|
||
# 更新所有服务商的访问凭证本地文件
|
||
with open(self.certifications_path, mode="w", encoding="utf-8") as file:
|
||
json.dump(
|
||
obj=self.certifications,
|
||
fp=file,
|
||
ensure_ascii=False,
|
||
)
|
||
return token
|
||
|
||
def _get_inspirvision_certification(self) -> Tuple[str, float]:
|
||
"""
|
||
获取深圳快瞳访问凭证
|
||
:return: 深圳快瞳访问令牌和失效时间戳
|
||
"""
|
||
response = self.request.get(
|
||
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
|
||
)
|
||
# 若非响应成功则抛出异常
|
||
if not (response["status"] == 200 and response["code"] == 0):
|
||
raise RuntimeError("获取深圳快瞳访问凭证发生异常")
|
||
return (
|
||
response["data"]["access_token"],
|
||
time.time() + response["data"]["expires_in"],
|
||
) # https://inspirvision.cn/spa/documentCenter/ocr-finance/mark?id=3 深圳快瞳获取访问凭证接口,其中 access_token 为访问令牌,expires_in 为有效剩余时间(单位为秒)
|
||
|
||
def _get_hollycrm_certification(self) -> Tuple[str, float]:
|
||
"""
|
||
获取合力亿捷访问凭证
|
||
:return: 合力亿捷访问令牌和失效时间戳
|
||
"""
|
||
# 企业访问标识
|
||
access_key_id = "25938f1c190448829dbdb5d344231e42"
|
||
# 签名秘钥
|
||
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
|
||
# 时间戳
|
||
timestamp = int(time.time())
|
||
# 基于 HMAC-SHA256 算法构建签名
|
||
signature = hmac.new(
|
||
secret_access_key.encode("utf-8"),
|
||
f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
|
||
hashlib.sha256,
|
||
).hexdigest()
|
||
|
||
response = self.request.get(
|
||
url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}"
|
||
)
|
||
# 若非响应成功则抛出异常
|
||
if not response["success"]:
|
||
raise RuntimeError("获取合力亿捷访问凭证发生异常")
|
||
return (
|
||
response["data"],
|
||
time.time() + 1 * 3600, # 访问令牌有效期为1小时
|
||
)
|
||
|
||
def _get_feishu_certification(self) -> Tuple[str, float]:
|
||
"""
|
||
获取飞书访问凭证
|
||
:return: 飞书访问令牌和失效时间戳
|
||
"""
|
||
response = self.request.post(
|
||
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
|
||
json={
|
||
"app_id": "cli_a1587980be78500c",
|
||
"app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
|
||
},
|
||
)
|
||
# 若非响应成功则抛出异常
|
||
if not response["code"] == 0:
|
||
raise RuntimeError("获取飞书访问凭证发生异常")
|
||
return (
|
||
response["tenant_access_token"],
|
||
time.time() + response["expire"],
|
||
) # https://open.feishu.cn/document/server-docs/api-call-guide/calling-process/get-access-token 飞书获取访问凭证接口
|
||
|
||
def _get_cloudreve_certification(self) -> Tuple[str, float]:
|
||
"""
|
||
获取 Cloudreve 访问凭证
|
||
:return: Cloudreve 访问令牌和失效时间戳
|
||
"""
|
||
response = self.request.post(
|
||
url="https://cloudreve.liubiren.cloud/api/v4/session/token",
|
||
json={
|
||
"email": "marslbr@qq.com",
|
||
"password": "Cl198752",
|
||
},
|
||
)
|
||
# 若非响应成功则抛出异常
|
||
if not response["code"] == 0:
|
||
raise RuntimeError("获取 Cloudreve 访问凭证发生异常")
|
||
return (
|
||
response["data"]["token"]["access_token"],
|
||
datetime.fromisoformat(
|
||
response["data"]["token"]["access_expires"]
|
||
).timestamp(),
|
||
) # https://docs.cloudreve.org/zh/api/auth 通过登录获取 AccessToken,时效时重新登录重新获取
|