# -*- coding: utf-8 -*- """ 封装认证器 实例化时本地加载所有服务商的访问凭证。根据服务商获取其访问令牌时,若该服务商的访问令牌已过期则刷新该服务商的访问凭证并更新本地文件。 """ import hashlib import hmac import json from pathlib import Path import threading import time from typing import Optional, Tuple from request import Request import sys sys.path.append(Path(__file__).parent.as_posix()) class Authenticator: """ 认证器,支持: get_token:根据服务商获取其访问令牌 """ def __init__(self): """初始化""" # 构建所有服务商的访问凭证本地文件路径 self.certifications_path = ( Path(__file__).parent.resolve() / "certifications.json" ) # 初始化所有服务商的访问凭证 try: with open( file=self.certifications_path, mode="r", encoding="utf-8", ) as file: self.certifications = json.load(fp=file) except FileNotFoundError: with open( file=self.certifications_path, mode="w", encoding="utf-8" ) as file: json.dump( obj={}, fp=file, ensure_ascii=False, ) self.certifications = {} except Exception as exception: raise RuntimeError( f"初始化所有服务商的访问凭证发生异常:{str(exception)}" ) from exception # 实例化请求客户端 self.request = Request() def get_token(self, servicer: str) -> Optional[str]: """ 根据服务商获取其访问令牌 :param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书 :return token: 访问令牌 """ with threading.Lock(): token, expired_timestamp = self.certifications.get( servicer, (None, 0), ) # 指定服务商的访问令牌和失效时间戳 if time.time() > expired_timestamp: match servicer: # 刷新深圳快瞳访问凭证 case "inspirvision": token, expired_timestamp = ( self._refresh_inspirvision_certification() ) # 刷新合力亿捷访问凭证 case "hollycrm": token, expired_timestamp = ( self._refresh_hollycrm_certification() ) # 刷新飞书访问凭证 case "feishu": token, expired_timestamp = self._refresh_feishu_certification() case _: raise RuntimeError(f"未知服务商:{servicer}") self.certifications.update( { servicer: ( token, expired_timestamp, ), } ) # 更新所有服务商的访问凭证本地文件 with open(self.certifications_path, mode="w", encoding="utf-8") as file: json.dump( obj=self.certifications, fp=file, ensure_ascii=False, ) return token def _refresh_inspirvision_certification(self) -> Tuple[str, float]: """ 刷新深圳快瞳访问凭证 :return: 深圳快瞳访问令牌和失效时间戳 """ response = self.request.get( url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" ) # 若非响应成功则抛出异常 if not (response["status"] == 200 and response["code"] == 0): raise RuntimeError("刷新深圳快瞳访问凭证发生异常") return ( response["data"]["access_token"], time.time() + response["data"]["expires_in"], ) # https://inspirvision.cn/spa/documentCenter/ocr-finance/mark?id=3 深圳快瞳获取访问凭证接口,其中 access_token 为访问令牌,expires_in 为有效剩余时间(单位为秒) def _refresh_hollycrm_certification(self) -> Tuple[str, float]: """ 刷新合力亿捷访问凭证 :return: 合力亿捷访问令牌和失效时间戳 """ # 企业访问标识 access_key_id = "25938f1c190448829dbdb5d344231e42" # 签名秘钥 secret_access_key = "44dc0299aff84d68ae27712f8784f173" # 时间戳 timestamp = int(time.time()) # 基于 HMAC-SHA256 算法构建签名 signature = hmac.new( secret_access_key.encode("utf-8"), f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), hashlib.sha256, ).hexdigest() response = self.request.get( url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" ) # 若非响应成功则抛出异常 if not response["success"]: raise RuntimeError("刷新合力亿捷访问凭证发生异常") return ( response["data"], time.time() + 1 * 3600, # 访问令牌有效期为1小时 ) def _refresh_feishu_certification(self) -> Tuple[str, float]: """ 刷新飞书访问凭证 :return: 飞书访问令牌和失效时间戳 """ response = self.request.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data={ "app_id": "cli_a1587980be78500c", "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", }, ) # 若非响应成功则抛出异常 if not response["code"] == 0: raise RuntimeError("刷新飞书访问凭证发生异常") return ( response["tenant_access_token"], time.time() + response["expire"], ) # https://open.feishu.cn/document/server-docs/api-call-guide/calling-process/get-access-token 飞书获取访问凭证接口