diff --git a/utils/authenticator.py b/utils/authenticator.py index 8261708..24497cb 100644 --- a/utils/authenticator.py +++ b/utils/authenticator.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- """ 封装认证器 +实例化时本地加载所有服务商的访问凭证。根据服务商获取其访问令牌时,若该服务商的访问令牌已过期则刷新该服务商的访问凭证并更新本地文件。 """ import hashlib @@ -21,116 +22,116 @@ sys.path.append(Path(__file__).parent.as_posix()) class Authenticator: """ 认证器,支持: - get_token:获取访问令牌 + get_token:根据服务商获取其访问令牌 """ def __init__(self): """初始化""" - # 初始化访问凭证路径 + # 构建所有服务商的访问凭证本地文件路径 self.certifications_path = ( Path(__file__).parent.resolve() / "certifications.json" ) - # 若访问凭证路径不存在则创建 - if not self.certifications_path.exists(): - with open(self.certifications_path, "w", encoding="utf-8") as file: + # 初始化所有服务商的访问凭证 + try: + with open( + file=self.certifications_path, + mode="r", + encoding="utf-8", + ) as file: + self.certifications = json.load(fp=file) + except FileNotFoundError: + with open( + file=self.certifications_path, mode="w", encoding="utf-8" + ) as file: json.dump( - {}, - file, + obj={}, + fp=file, ensure_ascii=False, ) - - # 初始化所有访问凭证 - self.certifications = {} + self.certifications = {} + except Exception as exception: + raise RuntimeError( + f"初始化所有服务商的访问凭证发生异常:{str(exception)}" + ) from exception # 初始化请求客户端 self.request = Request() def get_token(self, servicer: str) -> Optional[str]: """ - 获取访问令牌 + 根据服务商获取其访问令牌 :param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书 :return token: 访问令牌 """ with threading.Lock(): - token, expired_timestamp = None, 0 - try: - with open(self.certifications_path, "r", encoding="utf-8") as file: - certifications = json.load(file) - # 获取指定服务商的访问凭证 - certification = certifications.get(servicer) - if certification: - token = certification["token"] # 访问令牌 - expired_timestamp = certification[ - "expired_timestamp" - ] # 失效时间戳 - - except json.decoder.JSONDecodeError: - with open(self.certifications_path, "w", encoding="utf-8") as file: - json.dump( - certifications := {}, - file, - ensure_ascii=False, - ) - - except Exception as exception: - raise RuntimeError( - f"获取访问凭证发生异常:{str(exception)}" - ) from exception - + token, expired_timestamp = self.certifications.get( + servicer, + (None, 0), + ) # 指定服务商的访问令牌和失效时间戳 if time.time() > expired_timestamp: match servicer: - case "szkt": + # 刷新深圳快瞳访问凭证 + case "inspirvision": token, expired_timestamp = ( - self._get_inspirvision_certification() + self._refresh_inspirvision_certification() ) + # 刷新合力亿捷访问凭证 case "hollycrm": - token, expired_timestamp = self._get_hollycrm_certification() + token, expired_timestamp = ( + self._refresh_hollycrm_certification() + ) + # 刷新飞书访问凭证 case "feishu": - token, expired_timestamp = self._get_feishu_certification() + token, expired_timestamp = self._refresh_feishu_certification() case _: - raise RuntimeError(f"暂不支持该服务商:{servicer}") + raise RuntimeError(f"未知服务商:{servicer}") - # 更新访问凭证 - certifications[servicer] = { - "token": token, - "expired_timestamp": expired_timestamp, - } - with open(self.certifications_path, "w", encoding="utf-8") as file: + self.certifications.update( + { + servicer: ( + token, + expired_timestamp, + ), + } + ) + # 更新所有服务商的访问凭证本地文件 + with open(self.certifications_path, mode="w", encoding="utf-8") as file: json.dump( - certifications, - file, + obj=self.certifications, + fp=file, ensure_ascii=False, ) return token - def _get_inspirvision_certification(self) -> Tuple[str, float]: + def _refresh_inspirvision_certification(self) -> Tuple[str, float]: """ - 获取深圳快瞳访问凭证 - :return: 访问令牌和失效时间戳 + 刷新深圳快瞳访问凭证 + :return: 深圳快瞳访问令牌和失效时间戳 """ response = self.request.get( url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" ) - # 若非响应成功则抛出异常 if not (response["status"] == 200 and response["code"] == 0): - raise RuntimeError("获取深圳快瞳访问凭证发生异常") - + raise RuntimeError("刷新深圳快瞳访问凭证发生异常") return ( response["data"]["access_token"], time.time() + response["data"]["expires_in"], - ) + ) # https://inspirvision.cn/spa/documentCenter/ocr-finance/mark?id=3 深圳快瞳获取访问凭证接口,其中 access_token 为访问令牌,expires_in 为有效剩余时间(单位为秒) - def _get_hollycrm_certification(self) -> Tuple[str, float]: - """获取合力亿捷访问凭证""" + def _refresh_hollycrm_certification(self) -> Tuple[str, float]: + """ + 刷新合力亿捷访问凭证 + :return: 合力亿捷访问令牌和失效时间戳 + """ # 企业访问标识 access_key_id = "25938f1c190448829dbdb5d344231e42" # 签名秘钥 secret_access_key = "44dc0299aff84d68ae27712f8784f173" - # 时间戳(秒级) + # 时间戳 timestamp = int(time.time()) - # 构建签名 + # 基于 HMAC-SHA256 算法构建签名 signature = hmac.new( secret_access_key.encode("utf-8"), f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), @@ -142,14 +143,17 @@ class Authenticator: ) # 若非响应成功则抛出异常 if not response["success"]: - raise RuntimeError("获取合力亿捷访问凭证发生异常") + raise RuntimeError("刷新合力亿捷访问凭证发生异常") return ( response["data"], time.time() + 1 * 3600, # 访问令牌有效期为1小时 ) - def _get_feishu_certification(self) -> Tuple[str, float]: - """获取飞书访问凭证""" + def _refresh_feishu_certification(self) -> Tuple[str, float]: + """ + 刷新飞书访问凭证 + :return: 飞书访问令牌和失效时间戳 + """ response = self.request.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data={ @@ -159,7 +163,7 @@ class Authenticator: ) # 若非响应成功则抛出异常 if not response["code"] == 0: - raise RuntimeError("获取飞书访问凭证发生异常") + raise RuntimeError("刷新飞书访问凭证发生异常") return ( response["tenant_access_token"], time.time() + response["expire"], diff --git a/utils/certifications.json b/utils/certifications.json index d177044..52a1f89 100644 --- a/utils/certifications.json +++ b/utils/certifications.json @@ -1 +1 @@ -{"szkt": {"token": "a62c56ace614a6546191d5af8ca8b1513cfaeaea7ce67d0a37de994ab6c2aa4e2a0b058e0da575ff376dd51dc19c5ad353ab2761cb6d9db4d521b83adeee2979b78f7ae70765b26985165b6266d084b75f2f918008966e72a116d8bca5ec4c7cecc5223f78fa47b4d40aa9cf5277a11b0b967ad06e84ef7c4acbc53ccdef936c062b2d037ae0dad8c29d50426b668ec349cc8c0099a0270e16f97d31e4f058bc086334468f88d934c7fd1464ed3800833d2f486dc06f0689b99abbb78a8ebf4a3877bd82d0dd765dc09b7a1594fa8849d51f59282a81048c52e82e8320d1ad042a6c307ca831647cba4356564704780f", "expired_timestamp": 1859201579.393386}, "feishu": {"token": "t-g1043hkKMBTKBZNZJPNCVDO7JMNOLWMM56ZCWKR6", "expired_timestamp": 1773758800.9476614}} \ No newline at end of file +{"feishu": ["t-g1043icTZGZGOXVHKENQWLCOHTRRBNSQEYJLOVNT", 1773816955.118503]} \ No newline at end of file