This commit is contained in:
liubiren 2026-03-18 13:06:20 +08:00
parent 423a6e7b6f
commit 4f0739fa92
2 changed files with 69 additions and 65 deletions

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""
封装认证器
实例化时本地加载所有服务商的访问凭证根据服务商获取其访问令牌时若该服务商的访问令牌已过期则刷新该服务商的访问凭证并更新本地文件
"""
import hashlib
@ -21,116 +22,116 @@ sys.path.append(Path(__file__).parent.as_posix())
class Authenticator:
"""
认证器支持
get_token获取访问令牌
get_token根据服务商获取访问令牌
"""
def __init__(self):
"""初始化"""
# 初始化访问凭证路径
# 构建所有服务商的访问凭证本地文件路径
self.certifications_path = (
Path(__file__).parent.resolve() / "certifications.json"
)
# 若访问凭证路径不存在则创建
if not self.certifications_path.exists():
with open(self.certifications_path, "w", encoding="utf-8") as file:
# 初始化所有服务商的访问凭证
try:
with open(
file=self.certifications_path,
mode="r",
encoding="utf-8",
) as file:
self.certifications = json.load(fp=file)
except FileNotFoundError:
with open(
file=self.certifications_path, mode="w", encoding="utf-8"
) as file:
json.dump(
{},
file,
obj={},
fp=file,
ensure_ascii=False,
)
# 初始化所有访问凭证
self.certifications = {}
except Exception as exception:
raise RuntimeError(
f"初始化所有服务商的访问凭证发生异常:{str(exception)}"
) from exception
# 初始化请求客户端
self.request = Request()
def get_token(self, servicer: str) -> Optional[str]:
"""
获取访问令牌
根据服务商获取访问令牌
:param servicer: 服务商名称暂仅支持深圳快瞳合力亿捷和飞书
:return token: 访问令牌
"""
with threading.Lock():
token, expired_timestamp = None, 0
try:
with open(self.certifications_path, "r", encoding="utf-8") as file:
certifications = json.load(file)
# 获取指定服务商的访问凭证
certification = certifications.get(servicer)
if certification:
token = certification["token"] # 访问令牌
expired_timestamp = certification[
"expired_timestamp"
] # 失效时间戳
except json.decoder.JSONDecodeError:
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
certifications := {},
file,
ensure_ascii=False,
)
except Exception as exception:
raise RuntimeError(
f"获取访问凭证发生异常:{str(exception)}"
) from exception
token, expired_timestamp = self.certifications.get(
servicer,
(None, 0),
) # 指定服务商的访问令牌和失效时间戳
if time.time() > expired_timestamp:
match servicer:
case "szkt":
# 刷新深圳快瞳访问凭证
case "inspirvision":
token, expired_timestamp = (
self._get_inspirvision_certification()
self._refresh_inspirvision_certification()
)
# 刷新合力亿捷访问凭证
case "hollycrm":
token, expired_timestamp = self._get_hollycrm_certification()
token, expired_timestamp = (
self._refresh_hollycrm_certification()
)
# 刷新飞书访问凭证
case "feishu":
token, expired_timestamp = self._get_feishu_certification()
token, expired_timestamp = self._refresh_feishu_certification()
case _:
raise RuntimeError(f"暂不支持该服务商:{servicer}")
raise RuntimeError(f"未知服务商:{servicer}")
# 更新访问凭证
certifications[servicer] = {
"token": token,
"expired_timestamp": expired_timestamp,
self.certifications.update(
{
servicer: (
token,
expired_timestamp,
),
}
with open(self.certifications_path, "w", encoding="utf-8") as file:
)
# 更新所有服务商的访问凭证本地文件
with open(self.certifications_path, mode="w", encoding="utf-8") as file:
json.dump(
certifications,
file,
obj=self.certifications,
fp=file,
ensure_ascii=False,
)
return token
def _get_inspirvision_certification(self) -> Tuple[str, float]:
def _refresh_inspirvision_certification(self) -> Tuple[str, float]:
"""
获取深圳快瞳访问凭证
:return: 访问令牌和失效时间戳
刷新深圳快瞳访问凭证
:return: 深圳快瞳访问令牌和失效时间戳
"""
response = self.request.get(
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
)
# 若非响应成功则抛出异常
if not (response["status"] == 200 and response["code"] == 0):
raise RuntimeError("获取深圳快瞳访问凭证发生异常")
raise RuntimeError("刷新深圳快瞳访问凭证发生异常")
return (
response["data"]["access_token"],
time.time() + response["data"]["expires_in"],
)
) # https://inspirvision.cn/spa/documentCenter/ocr-finance/mark?id=3 深圳快瞳获取访问凭证接口,其中 access_token 为访问令牌expires_in 为有效剩余时间(单位为秒)
def _get_hollycrm_certification(self) -> Tuple[str, float]:
"""获取合力亿捷访问凭证"""
def _refresh_hollycrm_certification(self) -> Tuple[str, float]:
"""
刷新合力亿捷访问凭证
:return: 合力亿捷访问令牌和失效时间戳
"""
# 企业访问标识
access_key_id = "25938f1c190448829dbdb5d344231e42"
# 签名秘钥
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
# 时间戳(秒级)
# 时间戳
timestamp = int(time.time())
# 构建签名
# 基于 HMAC-SHA256 算法构建签名
signature = hmac.new(
secret_access_key.encode("utf-8"),
f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
@ -142,14 +143,17 @@ class Authenticator:
)
# 若非响应成功则抛出异常
if not response["success"]:
raise RuntimeError("获取合力亿捷访问凭证发生异常")
raise RuntimeError("刷新合力亿捷访问凭证发生异常")
return (
response["data"],
time.time() + 1 * 3600, # 访问令牌有效期为1小时
)
def _get_feishu_certification(self) -> Tuple[str, float]:
"""获取飞书访问凭证"""
def _refresh_feishu_certification(self) -> Tuple[str, float]:
"""
刷新飞书访问凭证
:return: 飞书访问令牌和失效时间戳
"""
response = self.request.post(
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
data={
@ -159,7 +163,7 @@ class Authenticator:
)
# 若非响应成功则抛出异常
if not response["code"] == 0:
raise RuntimeError("获取飞书访问凭证发生异常")
raise RuntimeError("刷新飞书访问凭证发生异常")
return (
response["tenant_access_token"],
time.time() + response["expire"],

View File

@ -1 +1 @@
{"szkt": {"token": "a62c56ace614a6546191d5af8ca8b1513cfaeaea7ce67d0a37de994ab6c2aa4e2a0b058e0da575ff376dd51dc19c5ad353ab2761cb6d9db4d521b83adeee2979b78f7ae70765b26985165b6266d084b75f2f918008966e72a116d8bca5ec4c7cecc5223f78fa47b4d40aa9cf5277a11b0b967ad06e84ef7c4acbc53ccdef936c062b2d037ae0dad8c29d50426b668ec349cc8c0099a0270e16f97d31e4f058bc086334468f88d934c7fd1464ed3800833d2f486dc06f0689b99abbb78a8ebf4a3877bd82d0dd765dc09b7a1594fa8849d51f59282a81048c52e82e8320d1ad042a6c307ca831647cba4356564704780f", "expired_timestamp": 1859201579.393386}, "feishu": {"token": "t-g1043hkKMBTKBZNZJPNCVDO7JMNOLWMM56ZCWKR6", "expired_timestamp": 1773758800.9476614}}
{"feishu": ["t-g1043icTZGZGOXVHKENQWLCOHTRRBNSQEYJLOVNT", 1773816955.118503]}