diff --git a/utils/authenticator.py b/utils/authenticator.py index b2ba0a4..1b3d8d0 100644 --- a/utils/authenticator.py +++ b/utils/authenticator.py @@ -70,7 +70,7 @@ class Authenticator: ) # 指定服务商的访问令牌和失效时间戳 if time.time() > expired_timestamp: match servicer: - # 刷新深圳快瞳访问凭证 + # 刷新深圳快瞳访问凭证cd C:\Python\.venv\Scripts case "inspirvision": token, expired_timestamp = ( self._refresh_inspirvision_certification() @@ -155,7 +155,7 @@ class Authenticator: """ response = self.request.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", - data={ + json={ "app_id": "cli_a1587980be78500c", "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", }, diff --git a/utils/certifications.json b/utils/certifications.json index 52a1f89..c775cdf 100644 --- a/utils/certifications.json +++ b/utils/certifications.json @@ -1 +1 @@ -{"feishu": ["t-g1043icTZGZGOXVHKENQWLCOHTRRBNSQEYJLOVNT", 1773816955.118503]} \ No newline at end of file +{"feishu": ["t-g1043jkq4UBLA2TLB6HFDIUPFXIQBTCHUMZ5XPYX", 1773930401.8437002]} \ No newline at end of file diff --git a/utils/feishu.py b/utils/feishu.py index 87075e8..6461fa6 100644 --- a/utils/feishu.py +++ b/utils/feishu.py @@ -8,13 +8,12 @@ from email.policy import default from email.utils import parsedate_to_datetime from imaplib import IMAP4_SSL import re -from time import sleep, time -from typing import Any, Dict - -import pandas +from time import time +from typing import Any, Dict, Optional, List from authenticator import Authenticator from request import Request +from base64 import b64encode class Feishu: @@ -26,220 +25,220 @@ class Feishu: # 实例化请求客户端 self.http_client = Request() - def _get_headers(self) -> Dict[str, Any]: - """获取请求头""" - # 构建身份认证请求头 - return { - "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", - } - @staticmethod - def get_verification_code(): - try: - # 当前时间戳 - current_timestamp = time() + def get_mail_verification_code( + folder: str, regular_expression: str + ) -> Optional[str]: + """ + 根据邮箱文件夹名获取邮箱验证码 + :param folder: 邮箱文件夹名 + :param regular_expression: 正则表达式 + :return: 邮箱验证码 + """ + """ + 使用示例: + feishu = Feishu() + print(feishu.get_mail_verification_code(folder="邮箱验证码", regular_expression=r"【普康健康】您的验证码是:(\\d+)")) + 输出:123456 + """ + if not folder: + raise RuntimeError("邮箱文件夹名不能为空") + # 若邮箱文件夹名不可ASCII编码(例如包含中文)则按照 IMAP 协议支持的字符串 + try: + folder.encode("ascii") + except UnicodeEncodeError: + folder = f"&{b64encode(folder.encode(encoding="utf-16be")).decode(encoding="ascii").replace("/", ",").replace("+", "-").rstrip("=")}-" + + try: # 建立加密IMAP连接 connection = IMAP4_SSL(host="imap.feishu.cn", port=993) # 登录 connection.login(user="mars@liubiren.cloud", password="aJBZSZzhQN13M11K") + except Exception as exception: + raise RuntimeError(f"登录邮箱发生异常:{str(exception)}") - while True: - if time() <= current_timestamp + 120: - sleep(5) - # 选择邮箱文件夹(邮箱验证码) - connection.select(mailbox="&kK57sZqMi8F4AQ-") - try: - # 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引 - index = server.search(None, "ALL")[1][0].split()[-1] - # 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文 - # noinspection PyUnresolvedReferences - contents = BytesParser(policy=default).parsebytes( - server.fetch(index, "(RFC822)")[1][0][1] + # 开始时间戳(秒级,若无特殊说明时间戳均为秒级) + start_timestamp = time() + # 上一次查询时间戳 + last_timestamp = 0 + + while True: + # 当前时间戳 + current_timestamp = time() + + # 若当前时间戳大于超时时间戳则登出并返回空 + if current_timestamp > start_timestamp + 120: + connection.logout() + return None + + # 若当前时间戳和上一次查询时间戳间隔小于5秒则跳转至下一次循环 + if current_timestamp - last_timestamp < 5: + continue + + last_timestamp = current_timestamp + + # 选择邮箱文件夹 + connection.select(mailbox=folder) + + # 查询该邮箱文件夹内所有邮件 + status, indices = connection.search( + "utf-8", "ALL" + ) # search()返回形如 ('OK', [b'1 2 3 4 5']) 元组。其中,第一个元素为查询状态,第二个元素为查询结果(邮件索引的字节串列表) + # 若查询状态非成功则登出并抛出异常 + if status != "OK": + connection.logout() + raise RuntimeError(f"查询邮箱文件夹内所有邮件失败") + + # 拼接所有邮件索引并拆分为邮件索引列表 + indices = b" ".join(indices).split() + # 若邮件索引列表为空则跳转至下一次循环 + if not indices: + continue + + # 查询该邮箱文件夹内最后一封邮件完整原始内容 + status, contents = connection.fetch(indices[-1].decode("utf-8"), "(RFC822)") + # 若查询状态非成功则登出并抛出异常 + if status != "OK": + connection.logout() + raise RuntimeError(f"查询邮箱文件夹内最后一封邮件完整原始内容失败") + + contents = b"".join( + item[1] if isinstance(item, tuple) else item + for item in contents + if isinstance(item, (bytes, tuple)) + and not (isinstance(item, bytes) and item == b")") + ) # IMAP 协议中约定邮件完整原始内容中,第一个元素为二元组,形如 (b'5 (RFC822 {1234}', b'邮件字节') (第一个元素为元数据,第二个元素为第一部分内容),最后一个元素为结束符 b')' + # 若邮件内容为空则跳转至下一次循环 + if not contents: + continue + + # 解析邮件内容 + contents = BytesParser(policy=default).parsebytes(text=contents) + # 邮件发送时间戳 + send_timestamp = parsedate_to_datetime(contents["Date"]).timestamp() + # 若邮件发送时间戳小于开始时间戳减去冗余(近N秒)则跳转至下一次循环 + if send_timestamp < start_timestamp - 300: + continue + + for content in contents.walk(): + # 若内容类型非 text/html 或 text/plain 则跳转至下一次循环 + if content.get_content_type() not in ["text/plain", "text/html"]: + continue + + # 获取内容载荷 + payload = content.get_payload(decode=True) + match payload: + # 若内容载荷类型为字节串则解码 + case _ if isinstance(payload, bytes): + payload = payload.decode( + encoding=content.get_content_charset() or "utf-8", + errors="replace", ) - # 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码 - for content in contents.walk(): - if ( - content.get_content_type() == "text/plain" - or content.get_content_type() == "text/html" - ): - # 邮件发送时间戳 - # noinspection PyUnresolvedReferences - send_timestamp = parsedate_to_datetime( - content["Date"] - ).timestamp() - # 若邮件发送时间戳大于执行时间戳则解析验证码并返回 - if ( - execute_timestamp - > send_timestamp - >= execute_timestamp - 35 - ): - # 登出 - server.logout() - # 解析验证码 - return re.search( - r"【普康健康】您的验证码是:(\d+)", - content.get_payload(decode=True).decode(), - ).group(1) - - # 若文件夹无邮件则继续 - except: - pass - - # 若超时则登出 - else: - server.logout() - return None - - except Exception: - raise RuntimeError("获取邮箱验证码发生其它异常") - - # 查询多维表格记录,单次最多查询500条记录 - @restrict(refill_rate=5, max_tokens=5) - def query_bitable_records( - self, - bitable: str, - table_id: str, - field_names: Optional[list[str]] = None, - filter_conditions: Optional[dict] = None, - ) -> pandas.DataFrame: - # 先查询多维表格记录,在根据字段解析记录 - # 装配多维表格查询记录地址 - url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20" - - response = self.http_client.post( - url=url, - headers=self._headers(), - json={"field_names": field_names, "filter": filter_conditions}, - ) - - # 响应业务码为0则定义为响应成功 - assert response.get("code") == 0, "查询多维表格记录发生异常" - - # 多维表格记录 - records = response.get("data").get("items") - - # 检查响应中是否包含还有下一页标识,若有则继续请求下一页 - while response.get("data").get("has_more"): - - url_next = url + "&page_token={}".format( - response.get("data").get("page_token") - ) - - response = self.http_client.post( - url=url_next, - headers=self._get_headers(), - json={"field_names": field_names, "filter": filter_conditions}, - ) - - assert response.get("code") == 0, "查询多维表格记录发生异常" - - # 合并记录 - records.append(response.get("data").get("items")) - - # 装配多维表格列出字段地址 - url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20" - - response = self.http_client.get( - url=url, - headers=self._headers(), - ) - - assert response.get("code") == 0, "列出多维表格字段发生异常" - - # 多维表格字段 - fields = response.get("data").get("items") - - while response.get("data").get("has_more"): - - url_next = url + "&page_token={}".format( - response.get("data").get("page_token") - ) - - response = self.http_client.get( - url=url_next, - headers=self._headers(), - ) - - assert response.get("code") == 0, "列出多维表格字段发生异常" - - fields.append(response.get("data").get("items")) - - # 字段映射 - field_mappings = {} - - for field in fields: - - # 字段名 - field_name = field["field_name"] - - # 根据字段类型匹配 - match field["type"]: - - case 1005: - - field_type = "主键" - - case 1: - - field_type = "文本" - - case 3: - - field_type = "单选" - - case 2: - - # 数字、公式字段的显示格式 - match field["property"]["formatter"]: - - case "0": - - field_type = "整数" - - case _: - - raise ValueError("未设置数字、公式字段的显示格式") - - case _: - - raise ValueError("未设置字段类型") - - # noinspection PyUnboundLocalVariable - field_mappings.update({field_name: field_type}) - - # 记录数据体 - records_data = [] - - # 解析记录 - for record in records: - - # 单条记录数据体 - record_data = {} - - for field_name, content in record["fields"].items(): - - match field_mappings[field_name]: - - case "主键" | "单选" | "整数": - - record_data.update({field_name: content}) - - case "文本": - - # 若存在多行文本则拼接 - fragments_content = "" - - for fragment_content in content: - - fragments_content += fragment_content["text"] - - record_data.update({field_name: fragments_content}) - + case _ if isinstance(payload, str): + payload = payload + # 若内容为空则跳转至下一次循环 case _: + continue - raise ValueError("未设置字段解析方法") + matched = re.match( + pattern=regular_expression, + string=payload, + ) + # 若未匹配到验证码则跳转至下一次循环 + if not matched: + continue - records_data.append(record_data) + connection.logout() + return matched.group(1) - return pandas.DataFrame(records_data) + def _get_headers(self) -> Dict[str, Any]: + """ + 获取请求头 + :return: 请求头 + """ + return { + "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", + } + + # 获取多维表格记录 + def get_bitable_records( + self, + app_token: str, + table_id: str, + ) -> List[Dict[str, Any]]: + """ + 获取多维表格记录 + :param app_token: 多维表格应用标识 + :param table_id: 多维表格数据表标识 + :return: 多维表格记录 + """ + # 构建多维表格查询记录的请求地址 + url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search" # https://open.feishu.cn/document/docs/bitable-v1/app-table-record/search 默认分页大小为 20 + + headers = self._get_headers() + # 添加 Content-Type 请求头 + headers.update( + { + "Content-Type": "application/json; charset=utf-8", + } + ) + + records = [] # 用于临时保存多维表格记录 + page_token = None # 分页标识 + while True: + response = self.http_client.post( + url=(f"{url}?&page_token={page_token}" if page_token else url), + headers=headers, + ) + # 若响应错误代码非0则抛出异常 + if response["code"] != 0: + raise RuntimeError( + f"请求多维表格查询记录失败:{response['code']} {response['message']}" + ) + + # 合并多维表格记录 + records.extend(response["data"]["items"]) + + # 若响应没有更多记录则跳出循环 + if not response["data"]["has_more"]: + break + + # 更新分页标识 + page_token = response["data"]["page_token"] + + return records + + def download_material(self, file_token: str, stream_enabled: bool = False) -> str: + """ + 下载素材 + :param file_token: 素材标识 + :param stream_enabled: 使用流式传输,默认 False + :return: 素材 base64 编码的字符串 + """ + # 构建下载素材的请求地址 + url = f"https://open.feishu.cn/open-apis/drive/v1/medias/{file_token}/download" # https://open.feishu.cn/document/server-docs/docs/drive-v1/media/download + + headers = self._get_headers() + # 添加 Content-Type 请求头 + headers.update( + { + "Content-Type": "application/json; charset=utf-8", + } + ) + + response = self.http_client.download( + url=url, + headers=headers, + stream_enabled=stream_enabled, + ) + print(response) + + +a = Feishu() + +print( + a.download_material( + file_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh", + ) +) diff --git a/utils/request.py b/utils/request.py index 7fb6a65..32e2215 100644 --- a/utils/request.py +++ b/utils/request.py @@ -4,10 +4,10 @@ """ import json +from pathlib import Path import sys import time -from pathlib import Path -from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union +from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union from xml.etree import ElementTree from pydantic import BaseModel, Field, HttpUrl, model_validator @@ -15,7 +15,7 @@ from requests import Response, Session from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry -sys.path.append(Path(__file__).parent.as_posix()) +from restrict import restrict from sqlite import SQLite @@ -29,9 +29,9 @@ class Parameters(BaseModel): default=None, description="统一资源定位符的查询参数" ) headers: Optional[Dict[str, str]] = Field(default=None, description="请求头") - data: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") + form: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") json_: Optional[Dict[str, Any]] = Field( - default=None, alias="json", description="JSON数据" + default=None, alias="json", description="JSON 数据" ) files: Optional[ Dict[ @@ -52,13 +52,13 @@ class Parameters(BaseModel): @model_validator(mode="after") def validate_data(self): """校验:表单数据和JSON数据互斥""" - if self.data is not None and self.json_ is not None: - raise ValueError("表单数据和JSON数据不能同时使用") + if self.form and self.json_: + raise ValueError("表单数据和 JSON 数据不能同时使用") return self @model_validator(mode="after") def validate_files(self): - if self.files is not None and self.stream_enabled: + if self.files and self.stream_enabled: raise ValueError("上传文件和使用流式传输不能同时使用") return self @@ -199,6 +199,7 @@ class Request: ) # 初始化超时时间 self.timeout = timeout + # 实例化缓存 self.caches = Caches(cache_ttl=cache_ttl * 86400) if cache_enabled else None @@ -256,6 +257,10 @@ class Request: :param kwargs: 请求参数 :return: 响应内容 """ + # 若表单数据和 JSON 数据同时为空则重构 JSON数据 + if not kwargs.get("form") and not kwargs.get("json"): + kwargs["json"] = {} + return self._request(method="POST", parameters=Parameters(**kwargs)) def download( @@ -279,6 +284,7 @@ class Request: ) return response + @restrict(max_tokens=5, refill_rate=5.0) def _request(self, method: Literal["GET", "POST"], parameters: Parameters) -> Any: """ 请求 @@ -293,10 +299,10 @@ class Request: url = str(kwargs.pop("url")) # 过滤表单数据中空值 - if kwargs.get("data"): - kwargs["data"] = {k: v for k, v in kwargs["data"].items() if v} + if kwargs.get("form"): + kwargs["form"] = {k: v for k, v in kwargs["form"].items() if v} - # 过滤JSON数据中空值 + # 过滤 JSON 数据中空键值对 if kwargs.get("json"): kwargs["json"] = {k: v for k, v in kwargs["json"].items() if v} @@ -332,22 +338,24 @@ class Request: # 重构异常信息 except Exception as exception: - try: - response = getattr(exception, "response", None) - status = ( - response.json().get("status", response.status_code) - if response - else None - ) - message = ( - response.json().get("message", response.text) - if response - else str(exception).splitlines()[0] - ) - except Exception: - status = None - message = f"{method} {kwargs["url"]} 请求发生异常:{str(exception).splitlines()[0]}" - return RequestException(status=status, message=message).__dict__ + response = getattr( + exception, "response", None + ) # 若非 requests 异常或服务未响应则为 None + if response is not None: # 注意 if 返回 False + # 获取响应状态码 + status = response.status_code + # 尝试将异常信息解析为 JSON,若非 + try: + response_json = response.json() + # 错误代码 + code = response_json.get("code", 999999) + # 错误信息 + message = response_json.get("msg", response.text) + except Exception: + code, message = 999999, response.text + else: + status, code, message = 400, 999999, f"{str(exception)}" + return RequestException(status=status, code=code, message=message).__dict__ @staticmethod def _process_response(