70 lines
2.2 KiB
Python
70 lines
2.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
封装 Cloudreve 客户端
|
|
"""
|
|
|
|
from email.parser import BytesParser
|
|
from email.policy import default
|
|
from email.utils import parsedate_to_datetime
|
|
from imaplib import IMAP4_SSL
|
|
import re
|
|
from time import time
|
|
from typing import Any, Dict, Optional, List
|
|
|
|
from pydantic.config import JsonEncoder
|
|
|
|
from authenticator import Authenticator
|
|
from request import Request
|
|
from base64 import b64encode
|
|
from urllib.parse import quote
|
|
|
|
|
|
class Cloudreve:
|
|
"""Cloudreve 客户端"""
|
|
|
|
def __init__(self):
|
|
# 实例化认证器
|
|
self.authenticator = Authenticator()
|
|
# 实例化请求客户端
|
|
self.http_client = Request()
|
|
|
|
def _create_upload_session(self, uri: str, size: int) -> str:
|
|
"""
|
|
创建上传会话
|
|
:param uri: 统一资源标识符
|
|
:param size: 文件大小
|
|
:return: 上传会话标识
|
|
"""
|
|
response = self.http_client.put(
|
|
url=f"https://cloudreve.liubiren.cloud/api/v4/file/upload",
|
|
headers={
|
|
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
|
|
"Content-Type": "application/json; charset=utf-8",
|
|
},
|
|
json={
|
|
"uri": quote(string=uri, safe=":/?&="), # 编码统一资源标识
|
|
"size": size,
|
|
},
|
|
)
|
|
# 若非响应成功则抛出异常
|
|
if not response["code"] == 0:
|
|
raise RuntimeError("创建上传会话发生异常")
|
|
return response["data"]["session_id"]
|
|
|
|
def _upload_file_chunk(self, session_id: str, index: int, file_chunk: bytes) -> str:
|
|
"""
|
|
上传文件块
|
|
:param session_id: 上传会话标识
|
|
:param index: 文件块索引
|
|
:return: 上传文件标识
|
|
"""
|
|
response = self.http_client.post(
|
|
url=f"https://cloudreve.liubiren.cloud/api/v4/file/upload/{session_id}/{index}",
|
|
headers={
|
|
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
|
|
"Content-Type": "application/octet-stream",
|
|
"Content-Length": len(file_chunk),
|
|
},
|
|
json=open(file_path, "rb").read(),
|
|
)
|