Python/utils/cloudreve.py

70 lines
2.2 KiB
Python

# -*- coding: utf-8 -*-
"""
封装 Cloudreve 客户端
"""
from email.parser import BytesParser
from email.policy import default
from email.utils import parsedate_to_datetime
from imaplib import IMAP4_SSL
import re
from time import time
from typing import Any, Dict, Optional, List
from pydantic.config import JsonEncoder
from authenticator import Authenticator
from request import Request
from base64 import b64encode
from urllib.parse import quote
class Cloudreve:
"""Cloudreve 客户端"""
def __init__(self):
# 实例化认证器
self.authenticator = Authenticator()
# 实例化请求客户端
self.http_client = Request()
def _create_upload_session(self, uri: str, size: int) -> str:
"""
创建上传会话
:param uri: 统一资源标识符
:param size: 文件大小
:return: 上传会话标识
"""
response = self.http_client.put(
url=f"https://cloudreve.liubiren.cloud/api/v4/file/upload",
headers={
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
"Content-Type": "application/json; charset=utf-8",
},
json={
"uri": quote(string=uri, safe=":/?&="), # 编码统一资源标识
"size": size,
},
)
# 若非响应成功则抛出异常
if not response["code"] == 0:
raise RuntimeError("创建上传会话发生异常")
return response["data"]["session_id"]
def _upload_file_chunk(self, session_id: str, index: int, file_chunk: bytes) -> str:
"""
上传文件块
:param session_id: 上传会话标识
:param index: 文件块索引
:return: 上传文件标识
"""
response = self.http_client.post(
url=f"https://cloudreve.liubiren.cloud/api/v4/file/upload/{session_id}/{index}",
headers={
"Authorization": f"Bearer {self.authenticator.get_token(servicer="cloudreve")}",
"Content-Type": "application/octet-stream",
"Content-Length": len(file_chunk),
},
json=open(file_path, "rb").read(),
)