diff --git a/utils/certifications.json b/utils/certifications.json index 0c59df0..efdc1be 100644 --- a/utils/certifications.json +++ b/utils/certifications.json @@ -1,5 +1 @@ -<<<<<<< HEAD -{"feishu": ["t-g1043mboYRA4SDDKUY5RTRLCHPF3YGJQIYCLGDVP", 1774157093.7511718]} -======= -{"feishu": ["t-g1043kdp74T7OBBWKU32LNRRUIOSIE4Q77JNQTEZ", 1773991516.999697]} ->>>>>>> 239b7d7f2837822ce7a85afdcb85bafc4888d6f7 +{"feishu": ["t-g1043ngrEEDZF356VR4UCQ53TWQLEUJ5MRGSKLFX", 1774261660.611816], "cloudreve": ["eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwic3ViIjoiYUpIRCIsImV4cCI6MTc3NDI2Mjg1NywibmJmIjoxNzc0MjU5MjU3fQ.OHAHMg_R_1KXB9Qd88xwx148THGneViUdR6ZyLnN8Ws", 1774262857.749695]} \ No newline at end of file diff --git a/utils/cloudreve.py b/utils/cloudreve.py index 74db1f0..f8f3b58 100644 --- a/utils/cloudreve.py +++ b/utils/cloudreve.py @@ -3,20 +3,11 @@ 封装 Cloudreve 客户端 """ -from email.parser import BytesParser -from email.policy import default -from email.utils import parsedate_to_datetime -from imaplib import IMAP4_SSL -import re -from time import time -from typing import Any, Dict, Optional, List - -from pydantic.config import JsonEncoder +from typing import Generator +from urllib.parse import quote from authenticator import Authenticator from request import Request -from base64 import b64encode -from urllib.parse import quote class Cloudreve: @@ -28,9 +19,9 @@ class Cloudreve: # 实例化请求客户端 self.http_client = Request() - def _create_upload_session(self, uri: str, size: int) -> str: + def _get_upload_session(self, uri: str, size: int) -> str: """ - 创建上传会话 + 获取上传会话标识 :param uri: 统一资源标识符 :param size: 文件大小 :return: 上传会话标识 @@ -44,6 +35,7 @@ class Cloudreve: json={ "uri": quote(string=uri, safe=":/?&="), # 编码统一资源标识 "size": size, + "policy_id": "zpHb", # https://cloudreve.liubiren.cloud/api/v4/file?uri=cloudreve://my/转直链 可查看该文件夹储存策略 }, ) # 若非响应成功则抛出异常 @@ -51,19 +43,86 @@ class Cloudreve: raise RuntimeError("创建上传会话发生异常") return response["data"]["session_id"] - def _upload_file_chunk(self, session_id: str, index: int, file_chunk: bytes) -> str: + def _upload_file_chunk(self, session_id: str, index: int, chunk: bytes) -> None: """ 上传文件块 :param session_id: 上传会话标识 :param index: 文件块索引 - :return: 上传文件标识 + :param chunk: 文件块数据 + :return: None """ response = self.http_client.post( url=f"https://cloudreve.liubiren.cloud/api/v4/file/upload/{session_id}/{index}", headers={ "Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}", "Content-Type": "application/octet-stream", - "Content-Length": len(file_chunk), + "Content-Length": str(len(chunk)), }, - json=open(file_path, "rb").read(), + data=chunk, ) + if not response["code"] == 0: + raise RuntimeError("上传文件块发生异常") + + def _upload_file(self, uri: str, size: int, generator: Generator) -> None: + """ + 上传文件 + :param uri: 统一资源标识符 + :param size: 文件大小 + :param generator: 文件块生成器 + :return: None + """ + # 获取上传会话标识 + session_id = self._get_upload_session(uri=uri, size=size) + for index, chunk in enumerate(generator): + # 上传文件块 + self._upload_file_chunk(session_id=session_id, index=index, chunk=chunk) + + def _create_direct_link(self, uri: str) -> str: + """ + 创建直链 + :param uri: 统一资源标识符 + :return: 直链 + """ + response = self.http_client.put( + url="https://cloudreve.liubiren.cloud/api/v4/file/source", + headers={ + "Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}", + "Content-Type": "application/json; charset=utf-8", + }, + json={ + "uris": [quote(string=uri, safe=":/?&=")], # 编码统一资源标识 + }, + ) + if not response["code"] == 0: + raise RuntimeError("获取直链发生异常") + return response["data"][0]["link"] + + def get_direct_link(self, uri: str, size: int, generator: Generator) -> str: + """ + 获取直链 + :param uri: 统一资源标识符 + :return: 直链 + """ + response = self.http_client.get( + url=f"https://cloudreve.liubiren.cloud/api/v4/file/info", + headers={ + "Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}", + "Content-Type": "application/json; charset=utf-8", + }, + params={ + "uri": quote(string=uri, safe=":/?&="), # 编码统一资源标识 + "extended": "true", # 获取文件扩展信息 + }, + ) + # 若文件已存在则返回直链,否则上传文件并创建直连 + if response["code"] == 0: + return response["data"]["extended_info"]["direct_links"][0]["url"] + # 上传文件 + self._upload_file( + generator=generator, + uri=uri, + size=size, + ) + # 创建直连 + direct_link = self._create_direct_link(uri=uri) + return direct_link diff --git a/utils/feishu.py b/utils/feishu.py index 4d24d36..368aa31 100644 --- a/utils/feishu.py +++ b/utils/feishu.py @@ -3,17 +3,19 @@ 封装飞书客户端 """ +from base64 import b64encode from email.parser import BytesParser from email.policy import default from email.utils import parsedate_to_datetime from imaplib import IMAP4_SSL import re from time import time -from typing import Any, Dict, Optional, List +from typing import Any, Dict, List, Optional +from urllib.parse import unquote from authenticator import Authenticator +from cloudreve import Cloudreve from request import Request -from base64 import b64encode class Feishu: @@ -22,9 +24,13 @@ class Feishu: def __init__(self): # 实例化认证器 self.authenticator = Authenticator() + # 实例化请求客户端 self.http_client = Request() + # 实例化 Cloudreve 客户端 + self.cloudreve = Cloudreve() + @staticmethod def get_mail_verification_code( folder: str, regular_expression: str @@ -39,7 +45,6 @@ class Feishu: 使用示例: feishu = Feishu() feishu.get_mail_verification_code(folder="邮箱验证码", regular_expression=r"【普康健康】您的验证码是:(\\d+)") - 输出:123456 """ if not folder: raise RuntimeError("邮箱文件夹名不能为空") @@ -173,6 +178,11 @@ class Feishu: :param table_id: 多维表格数据表标识 :return: 多维表格记录 """ + """ + 使用示例: + feishu = Feishu() + feishu.get_bitable_records(app_token="A17bbGqZZaVWnfsFgencdls7nNf", table_id="tblijCBxHdfWyGcu") + """ # 构建多维表格查询记录的请求地址 url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search" # https://open.feishu.cn/document/docs/bitable-v1/app-table-record/search 默认分页大小为 20 @@ -209,18 +219,17 @@ class Feishu: return records - def _convert_to_cloudreve_direct_link(self, material_token: str) -> str: + def generate_direct_link(self, material_token: str) -> str: """ - 转为Cloudreve直链 + 生成直链 :param material_token: 素材标识 - :return: 素材直链地址 + :return: 直链 + """ + """ + 使用示例: + feishu = Feishu() + feishu.generate_direct_link(material_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh") """ - # 获取 Cloudreve 上传 session_id - session_id = self.http_client. - - - - # 构建下载素材的请求地址 url = f"https://open.feishu.cn/open-apis/drive/v1/medias/{material_token}/download" # https://open.feishu.cn/document/server-docs/docs/drive-v1/media/download @@ -232,27 +241,39 @@ class Feishu: } ) - response = self.http_client.download( + response_headers, generator = self.http_client.download( url=url, headers=headers, -<<<<<<< HEAD - stream_enabled=True, - ) # 默认使用流式传输 - -======= - stream_enabled=stream_enabled, - ) - print(type(response[0])) - print(type(response[1])) - print(type(response[2])) + stream=True, + ) # 开启流式传输 ->>>>>>> 239b7d7f2837822ce7a85afdcb85bafc4888d6f7 + matched = re.search( + pattern=r"filename\*=(?P[^']+)'([^']*)'(?P.+)", + string=response_headers["Content-Disposition"], + ) # 按照 RFC 5987 标准格式(形如 filename*=UTF-8'zh-CN'%E7%BA%A2%E5%AF%8C%E5%A3%AB.jpeg )解析素材名称 + # 解析素材名称 + if not matched: + raise RuntimeError("解析素材名称失败") + # 解码素材名称 + material_name = unquote( + string=matched.group("material_name"), + encoding=matched.group("encoding"), + ) + # 解析素材大小 + material_size = int(response_headers["Content-Length"]) + + # 构建素材统一资源标识符 + material_uri = f"cloudreve://my/转直链/{material_name}" + + return self.cloudreve.get_direct_link( + uri=material_uri, size=material_size, generator=generator + ) a = Feishu() print( - a.download_material( - file_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh", + a.generate_direct_link( + material_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh", ) ) diff --git a/utils/request.py b/utils/request.py index a0d0fb2..6446d86 100644 --- a/utils/request.py +++ b/utils/request.py @@ -25,13 +25,13 @@ class Parameters(BaseModel): """ url: HttpUrl = Field(default=..., description="统一资源定位符,基于HttpUrl自动校验") - params: Optional[Dict[str, Any]] = Field( - default=None, description="统一资源定位符的查询参数" + params: Optional[Dict[str, Any]] = Field(default=None, description="查询参数") + headers: Optional[Dict[str, Any]] = Field(default=None, description="请求头") + data: Optional[Union[str, bytes, Dict[str, Any]]] = Field( + default=None, description="数据参数" ) - headers: Optional[Dict[str, str]] = Field(default=None, description="请求头") - form: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") json_: Optional[Dict[str, Any]] = Field( - default=None, alias="json", description="JSON 数据" + default=None, alias="json", description="JSON 参数" ) files: Optional[ Dict[ @@ -44,22 +44,22 @@ class Parameters(BaseModel): ] ] = Field( default=None, - description="上传文件,{字段名: (文件名, 字节数据, 内容类型, 请求头)}", + description="文件上传参数", ) - stream_enabled: Optional[bool] = Field(default=None, description="使用流式传输") + stream: Optional[bool] = Field(default=None, description="开启流式传输") guid: Optional[str] = Field(default=None, description="缓存全局唯一标识") @model_validator(mode="after") def validate_data(self): - """校验:表单数据和JSON数据互斥""" - if self.form and self.json_: - raise ValueError("表单数据和 JSON 数据不能同时使用") + """校验:表单参数和JSON参数互斥""" + if self.data and self.json_: + raise ValueError("表单参数和 JSON 参数不能同时使用") return self @model_validator(mode="after") def validate_files(self): - if self.files and self.stream_enabled: - raise ValueError("上传文件和使用流式传输不能同时使用") + if self.files and self.stream: + raise ValueError("文件上传参数和开启流式传输不能同时使用") return self @@ -264,29 +264,30 @@ class Request: :param kwargs: 请求参数 :return: 响应内容 """ - # 若表单数据和 JSON 数据同时为空则重构 JSON数据 - if not kwargs.get("form") and not kwargs.get("json"): + # 若表单参数和 JSON 参数同时为空则重构 JSON 参数 + if not kwargs.get("data") and not kwargs.get("json"): kwargs["json"] = {} return self._request(method="POST", parameters=Parameters(**kwargs)) def download( - self, stream_enabled: bool = False, chunk_size: int = 1024 * 1024, **kwargs + self, stream: bool = False, chunk_size: int = 2 * 1024 * 1024, **kwargs ) -> Any: """ 下载 - :param stream_enabled: 使用流式传输,默认为关闭流式传输 - :param chunk_size: 分块大小,若开启流式传输则分块大小默认为 1MB + :param stream: 开启流式传输,默认为关闭流式传输 + :param chunk_size: 分块大小,若开启流式传输则分块大小默认为 2MB :param kwargs: 请求参数 :return: 响应内容 """ response = self._request( method="GET", - parameters=Parameters(**{"stream_enabled": stream_enabled, **kwargs}), + parameters=Parameters(**{"stream": stream, **kwargs}), ) - # 若使用流式传输则返回响应内容迭代器 - if stream_enabled: - return self._process_stream_response( + + # 若开启流式传输则返回响应内容迭代器 + if stream: + return response.headers, self._process_stream_response( response=response, chunk_size=chunk_size ) @@ -308,17 +309,14 @@ class Request: # 将统一资源定位符转为字符串 url = str(kwargs.pop("url")) - # 过滤表单数据中空值 - if kwargs.get("form"): - kwargs["form"] = {k: v for k, v in kwargs["form"].items() if v} + # 过滤表单参数中空值 + if kwargs.get("data") and isinstance(kwargs["data"], dict): + kwargs["data"] = {k: v for k, v in kwargs["data"].items() if v} - # 过滤 JSON 数据中空键值对 + # 过滤 JSON 参数中空键值对 if kwargs.get("json"): kwargs["json"] = {k: v for k, v in kwargs["json"].items() if v} - # 使用流式传输 - stream_enabled = kwargs.pop("stream_enabled", False) - # 缓存全局唯一标识 guid = kwargs.pop("guid", None) # 若缓存非空且缓存全局唯一标识非空则查询并获取单条缓存 @@ -334,8 +332,8 @@ class Request: ) response.raise_for_status() # 若返回非2??状态码则抛出异常 - # 若使用流式传输则直接返回响应对象(不缓存) - if stream_enabled: + # 若开启流式传输则直接返回响应对象(不缓存) + if kwargs.get("stream"): return response # 处理响应对象 @@ -345,7 +343,6 @@ class Request: self.caches.update(guid, response) return response - # 重构异常信息 except Exception as exception: response = getattr(