# -*- coding: utf-8 -*- """ 认证器模块 """ import hashlib import hmac import json import sys import threading import time from pathlib import Path from typing import Optional, Tuple sys.path.append(Path(__file__).parent.as_posix()) from request import Request class Authenticator: """ 认证器,支持: get_token:获取访问令牌 """ def __init__(self): """初始化""" # 初始化访问凭证路径 self.certifications_path = ( Path(__file__).parent.resolve() / "certifications.json" ) # 若访问凭证路径不存在则创建 if not self.certifications_path.exists(): with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( {}, file, ensure_ascii=False, ) # 初始化请求客户端 self.request = Request() def get_token(self, servicer: str) -> Optional[str]: """ 获取访问令牌 :param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书 :return token: 访问令牌 """ with threading.Lock(): token, expired_timestamp = None, 0 try: with open(self.certifications_path, "r", encoding="utf-8") as file: certifications = json.load(file) # 获取指定服务商的访问凭证 certification = certifications.get(servicer) if certification: token = certification["token"] # 访问令牌 expired_timestamp = certification[ "expired_timestamp" ] # 失效时间戳 except json.decoder.JSONDecodeError: with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( certifications := {}, file, ensure_ascii=False, ) except Exception as exception: raise RuntimeError( f"获取访问凭证发生异常:{str(exception)}" ) from exception if time.time() > expired_timestamp: match servicer: case "szkt": token, expired_timestamp = self._get_szkt_certification() case "hlyj": token, expired_timestamp = self._get_hlyj_certification() case "feishu": token, expired_timestamp = self._get_feishu_certification() case _: raise RuntimeError(f"暂不支持该服务商:{servicer}") # 更新访问凭证 certifications[servicer] = { "token": token, "expired_timestamp": expired_timestamp, } with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( certifications, file, ensure_ascii=False, ) return token def _get_szkt_certification(self) -> tuple[str, float]: """ 获取深圳快瞳访问凭证 :return: 访问令牌和失效时间戳 """ response = self.request.get( url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" ) # 若非响应成功则抛出异常 if not (response["status"] == 200 and response["code"] == 0): raise RuntimeError("获取深圳快瞳访问凭证发生异常") return ( response["data"]["access_token"], time.time() + response["data"]["expires_in"], ) def _get_hlyj_certification(self) -> Tuple[str, float]: """获取合力亿捷访问凭证""" # 企业访问标识 access_key_id = "25938f1c190448829dbdb5d344231e42" # 签名秘钥 secret_access_key = "44dc0299aff84d68ae27712f8784f173" # 时间戳(秒级) timestamp = int(time.time()) # 构建签名 signature = hmac.new( secret_access_key.encode("utf-8"), f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), hashlib.sha256, ).hexdigest() response = self.request.get( url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" ) # 若非响应成功则抛出异常 if not response["success"]: raise RuntimeError("获取合力亿捷访问凭证发生异常") return ( response["data"], time.time() + 1 * 3600, # 访问令牌有效期为1小时 ) def _get_feishu_certification(self) -> tuple[str, float]: """获取飞书访问凭证""" response = self.request.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data={ "app_id": "cli_a1587980be78500c", "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", }, ) # 若非响应成功则抛出异常 if not response["code"] == 0: raise RuntimeError("获取飞书访问凭证发生异常") return ( response["tenant_access_token"], time.time() + response["expire"], )