This commit is contained in:
liubiren 2026-03-23 18:05:00 +08:00
parent 06f90694eb
commit ca3e1977a8
4 changed files with 152 additions and 79 deletions

View File

@ -1,5 +1 @@
<<<<<<< HEAD
{"feishu": ["t-g1043mboYRA4SDDKUY5RTRLCHPF3YGJQIYCLGDVP", 1774157093.7511718]}
=======
{"feishu": ["t-g1043kdp74T7OBBWKU32LNRRUIOSIE4Q77JNQTEZ", 1773991516.999697]}
>>>>>>> 239b7d7f2837822ce7a85afdcb85bafc4888d6f7
{"feishu": ["t-g1043ngrEEDZF356VR4UCQ53TWQLEUJ5MRGSKLFX", 1774261660.611816], "cloudreve": ["eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwic3ViIjoiYUpIRCIsImV4cCI6MTc3NDI2Mjg1NywibmJmIjoxNzc0MjU5MjU3fQ.OHAHMg_R_1KXB9Qd88xwx148THGneViUdR6ZyLnN8Ws", 1774262857.749695]}

View File

@ -3,20 +3,11 @@
封装 Cloudreve 客户端
"""
from email.parser import BytesParser
from email.policy import default
from email.utils import parsedate_to_datetime
from imaplib import IMAP4_SSL
import re
from time import time
from typing import Any, Dict, Optional, List
from pydantic.config import JsonEncoder
from typing import Generator
from urllib.parse import quote
from authenticator import Authenticator
from request import Request
from base64 import b64encode
from urllib.parse import quote
class Cloudreve:
@ -28,9 +19,9 @@ class Cloudreve:
# 实例化请求客户端
self.http_client = Request()
def _create_upload_session(self, uri: str, size: int) -> str:
def _get_upload_session(self, uri: str, size: int) -> str:
"""
创建上传会话
获取上传会话标识
:param uri: 统一资源标识符
:param size: 文件大小
:return: 上传会话标识
@ -44,6 +35,7 @@ class Cloudreve:
json={
"uri": quote(string=uri, safe=":/?&="), # 编码统一资源标识
"size": size,
"policy_id": "zpHb", # https://cloudreve.liubiren.cloud/api/v4/file?uri=cloudreve://my/转直链 可查看该文件夹储存策略
},
)
# 若非响应成功则抛出异常
@ -51,19 +43,86 @@ class Cloudreve:
raise RuntimeError("创建上传会话发生异常")
return response["data"]["session_id"]
def _upload_file_chunk(self, session_id: str, index: int, file_chunk: bytes) -> str:
def _upload_file_chunk(self, session_id: str, index: int, chunk: bytes) -> None:
"""
上传文件块
:param session_id: 上传会话标识
:param index: 文件块索引
:return: 上传文件标识
:param chunk: 文件块数据
:return: None
"""
response = self.http_client.post(
url=f"https://cloudreve.liubiren.cloud/api/v4/file/upload/{session_id}/{index}",
headers={
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
"Content-Type": "application/octet-stream",
"Content-Length": len(file_chunk),
"Content-Length": str(len(chunk)),
},
json=open(file_path, "rb").read(),
data=chunk,
)
if not response["code"] == 0:
raise RuntimeError("上传文件块发生异常")
def _upload_file(self, uri: str, size: int, generator: Generator) -> None:
"""
上传文件
:param uri: 统一资源标识符
:param size: 文件大小
:param generator: 文件块生成器
:return: None
"""
# 获取上传会话标识
session_id = self._get_upload_session(uri=uri, size=size)
for index, chunk in enumerate(generator):
# 上传文件块
self._upload_file_chunk(session_id=session_id, index=index, chunk=chunk)
def _create_direct_link(self, uri: str) -> str:
"""
创建直链
:param uri: 统一资源标识符
:return: 直链
"""
response = self.http_client.put(
url="https://cloudreve.liubiren.cloud/api/v4/file/source",
headers={
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
"Content-Type": "application/json; charset=utf-8",
},
json={
"uris": [quote(string=uri, safe=":/?&=")], # 编码统一资源标识
},
)
if not response["code"] == 0:
raise RuntimeError("获取直链发生异常")
return response["data"][0]["link"]
def get_direct_link(self, uri: str, size: int, generator: Generator) -> str:
"""
获取直链
:param uri: 统一资源标识符
:return: 直链
"""
response = self.http_client.get(
url=f"https://cloudreve.liubiren.cloud/api/v4/file/info",
headers={
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
"Content-Type": "application/json; charset=utf-8",
},
params={
"uri": quote(string=uri, safe=":/?&="), # 编码统一资源标识
"extended": "true", # 获取文件扩展信息
},
)
# 若文件已存在则返回直链,否则上传文件并创建直连
if response["code"] == 0:
return response["data"]["extended_info"]["direct_links"][0]["url"]
# 上传文件
self._upload_file(
generator=generator,
uri=uri,
size=size,
)
# 创建直连
direct_link = self._create_direct_link(uri=uri)
return direct_link

View File

@ -3,17 +3,19 @@
封装飞书客户端
"""
from base64 import b64encode
from email.parser import BytesParser
from email.policy import default
from email.utils import parsedate_to_datetime
from imaplib import IMAP4_SSL
import re
from time import time
from typing import Any, Dict, Optional, List
from typing import Any, Dict, List, Optional
from urllib.parse import unquote
from authenticator import Authenticator
from cloudreve import Cloudreve
from request import Request
from base64 import b64encode
class Feishu:
@ -22,9 +24,13 @@ class Feishu:
def __init__(self):
# 实例化认证器
self.authenticator = Authenticator()
# 实例化请求客户端
self.http_client = Request()
# 实例化 Cloudreve 客户端
self.cloudreve = Cloudreve()
@staticmethod
def get_mail_verification_code(
folder: str, regular_expression: str
@ -39,7 +45,6 @@ class Feishu:
使用示例
feishu = Feishu()
feishu.get_mail_verification_code(folder="邮箱验证码", regular_expression=r"【普康健康】您的验证码是:(\\d+)")
输出123456
"""
if not folder:
raise RuntimeError("邮箱文件夹名不能为空")
@ -173,6 +178,11 @@ class Feishu:
:param table_id: 多维表格数据表标识
:return: 多维表格记录
"""
"""
使用示例
feishu = Feishu()
feishu.get_bitable_records(app_token="A17bbGqZZaVWnfsFgencdls7nNf", table_id="tblijCBxHdfWyGcu")
"""
# 构建多维表格查询记录的请求地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search" # https://open.feishu.cn/document/docs/bitable-v1/app-table-record/search 默认分页大小为 20
@ -209,18 +219,17 @@ class Feishu:
return records
def _convert_to_cloudreve_direct_link(self, material_token: str) -> str:
def generate_direct_link(self, material_token: str) -> str:
"""
转为Cloudreve直链
生成直链
:param material_token: 素材标识
:return: 素材直链地址
:return: 直链
"""
"""
使用示例
feishu = Feishu()
feishu.generate_direct_link(material_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh")
"""
# 获取 Cloudreve 上传 session_id
session_id = self.http_client.
# 构建下载素材的请求地址
url = f"https://open.feishu.cn/open-apis/drive/v1/medias/{material_token}/download" # https://open.feishu.cn/document/server-docs/docs/drive-v1/media/download
@ -232,27 +241,39 @@ class Feishu:
}
)
response = self.http_client.download(
response_headers, generator = self.http_client.download(
url=url,
headers=headers,
<<<<<<< HEAD
stream_enabled=True,
) # 默认使用流式传输
=======
stream_enabled=stream_enabled,
)
print(type(response[0]))
print(type(response[1]))
print(type(response[2]))
stream=True,
) # 开启流式传输
>>>>>>> 239b7d7f2837822ce7a85afdcb85bafc4888d6f7
matched = re.search(
pattern=r"filename\*=(?P<encoding>[^']+)'([^']*)'(?P<material_name>.+)",
string=response_headers["Content-Disposition"],
) # 按照 RFC 5987 标准格式(形如 filename*=UTF-8'zh-CN'%E7%BA%A2%E5%AF%8C%E5%A3%AB.jpeg )解析素材名称
# 解析素材名称
if not matched:
raise RuntimeError("解析素材名称失败")
# 解码素材名称
material_name = unquote(
string=matched.group("material_name"),
encoding=matched.group("encoding"),
)
# 解析素材大小
material_size = int(response_headers["Content-Length"])
# 构建素材统一资源标识符
material_uri = f"cloudreve://my/转直链/{material_name}"
return self.cloudreve.get_direct_link(
uri=material_uri, size=material_size, generator=generator
)
a = Feishu()
print(
a.download_material(
file_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh",
a.generate_direct_link(
material_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh",
)
)

View File

@ -25,13 +25,13 @@ class Parameters(BaseModel):
"""
url: HttpUrl = Field(default=..., description="统一资源定位符基于HttpUrl自动校验")
params: Optional[Dict[str, Any]] = Field(
default=None, description="统一资源定位符的查询参数"
params: Optional[Dict[str, Any]] = Field(default=None, description="查询参数")
headers: Optional[Dict[str, Any]] = Field(default=None, description="请求头")
data: Optional[Union[str, bytes, Dict[str, Any]]] = Field(
default=None, description="数据参数"
)
headers: Optional[Dict[str, str]] = Field(default=None, description="请求头")
form: Optional[Dict[str, Any]] = Field(default=None, description="表单数据")
json_: Optional[Dict[str, Any]] = Field(
default=None, alias="json", description="JSON "
default=None, alias="json", description="JSON 参数"
)
files: Optional[
Dict[
@ -44,22 +44,22 @@ class Parameters(BaseModel):
]
] = Field(
default=None,
description="上传文件,{字段名: (文件名, 字节数据, 内容类型, 请求头)}",
description="文件上传参数",
)
stream_enabled: Optional[bool] = Field(default=None, description="使用流式传输")
stream: Optional[bool] = Field(default=None, description="开启流式传输")
guid: Optional[str] = Field(default=None, description="缓存全局唯一标识")
@model_validator(mode="after")
def validate_data(self):
"""校验:表单和JSON数互斥"""
if self.form and self.json_:
raise ValueError("表单和 JSON 数不能同时使用")
"""校验:表单数和JSON数互斥"""
if self.data and self.json_:
raise ValueError("表单数和 JSON 数不能同时使用")
return self
@model_validator(mode="after")
def validate_files(self):
if self.files and self.stream_enabled:
raise ValueError("上传文件和使用流式传输不能同时使用")
if self.files and self.stream:
raise ValueError("文件上传参数和开启流式传输不能同时使用")
return self
@ -264,29 +264,30 @@ class Request:
:param kwargs: 请求参数
:return: 响应内容
"""
# 若表单和 JSON 数同时为空则重构 JSON数
if not kwargs.get("form") and not kwargs.get("json"):
# 若表单数和 JSON 数同时为空则重构 JSON
if not kwargs.get("data") and not kwargs.get("json"):
kwargs["json"] = {}
return self._request(method="POST", parameters=Parameters(**kwargs))
def download(
self, stream_enabled: bool = False, chunk_size: int = 1024 * 1024, **kwargs
self, stream: bool = False, chunk_size: int = 2 * 1024 * 1024, **kwargs
) -> Any:
"""
下载
:param stream_enabled: 使用流式传输默认为关闭流式传输
:param chunk_size: 分块大小若开启流式传输则分块大小默认为 1MB
:param stream: 开启流式传输默认为关闭流式传输
:param chunk_size: 分块大小若开启流式传输则分块大小默认为 2MB
:param kwargs: 请求参数
:return: 响应内容
"""
response = self._request(
method="GET",
parameters=Parameters(**{"stream_enabled": stream_enabled, **kwargs}),
parameters=Parameters(**{"stream": stream, **kwargs}),
)
# 若使用流式传输则返回响应内容迭代器
if stream_enabled:
return self._process_stream_response(
# 若开启流式传输则返回响应内容迭代器
if stream:
return response.headers, self._process_stream_response(
response=response, chunk_size=chunk_size
)
@ -308,17 +309,14 @@ class Request:
# 将统一资源定位符转为字符串
url = str(kwargs.pop("url"))
# 过滤表单中空值
if kwargs.get("form"):
kwargs["form"] = {k: v for k, v in kwargs["form"].items() if v}
# 过滤表单数中空值
if kwargs.get("data") and isinstance(kwargs["data"], dict):
kwargs["data"] = {k: v for k, v in kwargs["data"].items() if v}
# 过滤 JSON 中空键值对
# 过滤 JSON 数中空键值对
if kwargs.get("json"):
kwargs["json"] = {k: v for k, v in kwargs["json"].items() if v}
# 使用流式传输
stream_enabled = kwargs.pop("stream_enabled", False)
# 缓存全局唯一标识
guid = kwargs.pop("guid", None)
# 若缓存非空且缓存全局唯一标识非空则查询并获取单条缓存
@ -334,8 +332,8 @@ class Request:
)
response.raise_for_status() # 若返回非2??状态码则抛出异常
# 若使用流式传输则直接返回响应对象(不缓存)
if stream_enabled:
# 若开启流式传输则直接返回响应对象(不缓存)
if kwargs.get("stream"):
return response
# 处理响应对象
@ -345,7 +343,6 @@ class Request:
self.caches.update(guid, response)
return response
# 重构异常信息
except Exception as exception:
response = getattr(