This commit is contained in:
liubiren 2026-03-19 21:53:38 +08:00
parent 0f8c56e72a
commit 0f37c81377
4 changed files with 246 additions and 239 deletions

View File

@ -70,7 +70,7 @@ class Authenticator:
) # 指定服务商的访问令牌和失效时间戳
if time.time() > expired_timestamp:
match servicer:
# 刷新深圳快瞳访问凭证
# 刷新深圳快瞳访问凭证cd C:\Python\.venv\Scripts
case "inspirvision":
token, expired_timestamp = (
self._refresh_inspirvision_certification()
@ -155,7 +155,7 @@ class Authenticator:
"""
response = self.request.post(
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
data={
json={
"app_id": "cli_a1587980be78500c",
"app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
},

View File

@ -1 +1 @@
{"feishu": ["t-g1043icTZGZGOXVHKENQWLCOHTRRBNSQEYJLOVNT", 1773816955.118503]}
{"feishu": ["t-g1043jkq4UBLA2TLB6HFDIUPFXIQBTCHUMZ5XPYX", 1773930401.8437002]}

View File

@ -8,13 +8,12 @@ from email.policy import default
from email.utils import parsedate_to_datetime
from imaplib import IMAP4_SSL
import re
from time import sleep, time
from typing import Any, Dict
import pandas
from time import time
from typing import Any, Dict, Optional, List
from authenticator import Authenticator
from request import Request
from base64 import b64encode
class Feishu:
@ -26,220 +25,220 @@ class Feishu:
# 实例化请求客户端
self.http_client = Request()
def _get_headers(self) -> Dict[str, Any]:
"""获取请求头"""
# 构建身份认证请求头
return {
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
}
@staticmethod
def get_verification_code():
try:
# 当前时间戳
current_timestamp = time()
def get_mail_verification_code(
folder: str, regular_expression: str
) -> Optional[str]:
"""
根据邮箱文件夹名获取邮箱验证码
:param folder: 邮箱文件夹名
:param regular_expression: 正则表达式
:return: 邮箱验证码
"""
"""
使用示例
feishu = Feishu()
print(feishu.get_mail_verification_code(folder="邮箱验证码", regular_expression=r"【普康健康】您的验证码是:(\\d+)"))
输出123456
"""
if not folder:
raise RuntimeError("邮箱文件夹名不能为空")
# 若邮箱文件夹名不可ASCII编码例如包含中文则按照 IMAP 协议支持的字符串
try:
folder.encode("ascii")
except UnicodeEncodeError:
folder = f"&{b64encode(folder.encode(encoding="utf-16be")).decode(encoding="ascii").replace("/", ",").replace("+", "-").rstrip("=")}-"
try:
# 建立加密IMAP连接
connection = IMAP4_SSL(host="imap.feishu.cn", port=993)
# 登录
connection.login(user="mars@liubiren.cloud", password="aJBZSZzhQN13M11K")
except Exception as exception:
raise RuntimeError(f"登录邮箱发生异常:{str(exception)}")
while True:
if time() <= current_timestamp + 120:
sleep(5)
# 选择邮箱文件夹(邮箱验证码)
connection.select(mailbox="&kK57sZqMi8F4AQ-")
try:
# 获取最后一封邮件索引server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引
index = server.search(None, "ALL")[1][0].split()[-1]
# 获取最后一封邮件内容并解析server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文
# noinspection PyUnresolvedReferences
contents = BytesParser(policy=default).parsebytes(
server.fetch(index, "(RFC822)")[1][0][1]
# 开始时间戳(秒级,若无特殊说明时间戳均为秒级)
start_timestamp = time()
# 上一次查询时间戳
last_timestamp = 0
while True:
# 当前时间戳
current_timestamp = time()
# 若当前时间戳大于超时时间戳则登出并返回空
if current_timestamp > start_timestamp + 120:
connection.logout()
return None
# 若当前时间戳和上一次查询时间戳间隔小于5秒则跳转至下一次循环
if current_timestamp - last_timestamp < 5:
continue
last_timestamp = current_timestamp
# 选择邮箱文件夹
connection.select(mailbox=folder)
# 查询该邮箱文件夹内所有邮件
status, indices = connection.search(
"utf-8", "ALL"
) # search()返回形如 ('OK', [b'1 2 3 4 5']) 元组。其中,第一个元素为查询状态,第二个元素为查询结果(邮件索引的字节串列表)
# 若查询状态非成功则登出并抛出异常
if status != "OK":
connection.logout()
raise RuntimeError(f"查询邮箱文件夹内所有邮件失败")
# 拼接所有邮件索引并拆分为邮件索引列表
indices = b" ".join(indices).split()
# 若邮件索引列表为空则跳转至下一次循环
if not indices:
continue
# 查询该邮箱文件夹内最后一封邮件完整原始内容
status, contents = connection.fetch(indices[-1].decode("utf-8"), "(RFC822)")
# 若查询状态非成功则登出并抛出异常
if status != "OK":
connection.logout()
raise RuntimeError(f"查询邮箱文件夹内最后一封邮件完整原始内容失败")
contents = b"".join(
item[1] if isinstance(item, tuple) else item
for item in contents
if isinstance(item, (bytes, tuple))
and not (isinstance(item, bytes) and item == b")")
) # IMAP 协议中约定邮件完整原始内容中,第一个元素为二元组,形如 (b'5 (RFC822 {1234}', b'邮件字节') (第一个元素为元数据,第二个元素为第一部分内容),最后一个元素为结束符 b')'
# 若邮件内容为空则跳转至下一次循环
if not contents:
continue
# 解析邮件内容
contents = BytesParser(policy=default).parsebytes(text=contents)
# 邮件发送时间戳
send_timestamp = parsedate_to_datetime(contents["Date"]).timestamp()
# 若邮件发送时间戳小于开始时间戳减去冗余近N秒则跳转至下一次循环
if send_timestamp < start_timestamp - 300:
continue
for content in contents.walk():
# 若内容类型非 text/html 或 text/plain 则跳转至下一次循环
if content.get_content_type() not in ["text/plain", "text/html"]:
continue
# 获取内容载荷
payload = content.get_payload(decode=True)
match payload:
# 若内容载荷类型为字节串则解码
case _ if isinstance(payload, bytes):
payload = payload.decode(
encoding=content.get_content_charset() or "utf-8",
errors="replace",
)
# 遍历邮件内容若正文内容类型为纯文本或HTML则解析发送时间和验证码
for content in contents.walk():
if (
content.get_content_type() == "text/plain"
or content.get_content_type() == "text/html"
):
# 邮件发送时间戳
# noinspection PyUnresolvedReferences
send_timestamp = parsedate_to_datetime(
content["Date"]
).timestamp()
# 若邮件发送时间戳大于执行时间戳则解析验证码并返回
if (
execute_timestamp
> send_timestamp
>= execute_timestamp - 35
):
# 登出
server.logout()
# 解析验证码
return re.search(
r"【普康健康】您的验证码是:(\d+)",
content.get_payload(decode=True).decode(),
).group(1)
# 若文件夹无邮件则继续
except:
pass
# 若超时则登出
else:
server.logout()
return None
except Exception:
raise RuntimeError("获取邮箱验证码发生其它异常")
# 查询多维表格记录单次最多查询500条记录
@restrict(refill_rate=5, max_tokens=5)
def query_bitable_records(
self,
bitable: str,
table_id: str,
field_names: Optional[list[str]] = None,
filter_conditions: Optional[dict] = None,
) -> pandas.DataFrame:
# 先查询多维表格记录,在根据字段解析记录
# 装配多维表格查询记录地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20"
response = self.http_client.post(
url=url,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
# 响应业务码为0则定义为响应成功
assert response.get("code") == 0, "查询多维表格记录发生异常"
# 多维表格记录
records = response.get("data").get("items")
# 检查响应中是否包含还有下一页标识,若有则继续请求下一页
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.post(
url=url_next,
headers=self._get_headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
assert response.get("code") == 0, "查询多维表格记录发生异常"
# 合并记录
records.append(response.get("data").get("items"))
# 装配多维表格列出字段地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20"
response = self.http_client.get(
url=url,
headers=self._headers(),
)
assert response.get("code") == 0, "列出多维表格字段发生异常"
# 多维表格字段
fields = response.get("data").get("items")
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.get(
url=url_next,
headers=self._headers(),
)
assert response.get("code") == 0, "列出多维表格字段发生异常"
fields.append(response.get("data").get("items"))
# 字段映射
field_mappings = {}
for field in fields:
# 字段名
field_name = field["field_name"]
# 根据字段类型匹配
match field["type"]:
case 1005:
field_type = "主键"
case 1:
field_type = "文本"
case 3:
field_type = "单选"
case 2:
# 数字、公式字段的显示格式
match field["property"]["formatter"]:
case "0":
field_type = "整数"
case _:
raise ValueError("未设置数字、公式字段的显示格式")
case _:
raise ValueError("未设置字段类型")
# noinspection PyUnboundLocalVariable
field_mappings.update({field_name: field_type})
# 记录数据体
records_data = []
# 解析记录
for record in records:
# 单条记录数据体
record_data = {}
for field_name, content in record["fields"].items():
match field_mappings[field_name]:
case "主键" | "单选" | "整数":
record_data.update({field_name: content})
case "文本":
# 若存在多行文本则拼接
fragments_content = ""
for fragment_content in content:
fragments_content += fragment_content["text"]
record_data.update({field_name: fragments_content})
case _ if isinstance(payload, str):
payload = payload
# 若内容为空则跳转至下一次循环
case _:
continue
raise ValueError("未设置字段解析方法")
matched = re.match(
pattern=regular_expression,
string=payload,
)
# 若未匹配到验证码则跳转至下一次循环
if not matched:
continue
records_data.append(record_data)
connection.logout()
return matched.group(1)
return pandas.DataFrame(records_data)
def _get_headers(self) -> Dict[str, Any]:
"""
获取请求头
:return: 请求头
"""
return {
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
}
# 获取多维表格记录
def get_bitable_records(
self,
app_token: str,
table_id: str,
) -> List[Dict[str, Any]]:
"""
获取多维表格记录
:param app_token: 多维表格应用标识
:param table_id: 多维表格数据表标识
:return: 多维表格记录
"""
# 构建多维表格查询记录的请求地址
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search" # https://open.feishu.cn/document/docs/bitable-v1/app-table-record/search 默认分页大小为 20
headers = self._get_headers()
# 添加 Content-Type 请求头
headers.update(
{
"Content-Type": "application/json; charset=utf-8",
}
)
records = [] # 用于临时保存多维表格记录
page_token = None # 分页标识
while True:
response = self.http_client.post(
url=(f"{url}?&page_token={page_token}" if page_token else url),
headers=headers,
)
# 若响应错误代码非0则抛出异常
if response["code"] != 0:
raise RuntimeError(
f"请求多维表格查询记录失败:{response['code']} {response['message']}"
)
# 合并多维表格记录
records.extend(response["data"]["items"])
# 若响应没有更多记录则跳出循环
if not response["data"]["has_more"]:
break
# 更新分页标识
page_token = response["data"]["page_token"]
return records
def download_material(self, file_token: str, stream_enabled: bool = False) -> str:
"""
下载素材
:param file_token: 素材标识
:param stream_enabled: 使用流式传输默认 False
:return: 素材 base64 编码的字符串
"""
# 构建下载素材的请求地址
url = f"https://open.feishu.cn/open-apis/drive/v1/medias/{file_token}/download" # https://open.feishu.cn/document/server-docs/docs/drive-v1/media/download
headers = self._get_headers()
# 添加 Content-Type 请求头
headers.update(
{
"Content-Type": "application/json; charset=utf-8",
}
)
response = self.http_client.download(
url=url,
headers=headers,
stream_enabled=stream_enabled,
)
print(response)
a = Feishu()
print(
a.download_material(
file_token="DsG4bY3iKo0n6Bx6O5fcKAbnnCh",
)
)

View File

@ -4,10 +4,10 @@
"""
import json
from pathlib import Path
import sys
import time
from pathlib import Path
from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union
from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union
from xml.etree import ElementTree
from pydantic import BaseModel, Field, HttpUrl, model_validator
@ -15,7 +15,7 @@ from requests import Response, Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
sys.path.append(Path(__file__).parent.as_posix())
from restrict import restrict
from sqlite import SQLite
@ -29,9 +29,9 @@ class Parameters(BaseModel):
default=None, description="统一资源定位符的查询参数"
)
headers: Optional[Dict[str, str]] = Field(default=None, description="请求头")
data: Optional[Dict[str, Any]] = Field(default=None, description="表单数据")
form: Optional[Dict[str, Any]] = Field(default=None, description="表单数据")
json_: Optional[Dict[str, Any]] = Field(
default=None, alias="json", description="JSON数据"
default=None, alias="json", description="JSON 数据"
)
files: Optional[
Dict[
@ -52,13 +52,13 @@ class Parameters(BaseModel):
@model_validator(mode="after")
def validate_data(self):
"""校验表单数据和JSON数据互斥"""
if self.data is not None and self.json_ is not None:
raise ValueError("表单数据和JSON数据不能同时使用")
if self.form and self.json_:
raise ValueError("表单数据和 JSON 数据不能同时使用")
return self
@model_validator(mode="after")
def validate_files(self):
if self.files is not None and self.stream_enabled:
if self.files and self.stream_enabled:
raise ValueError("上传文件和使用流式传输不能同时使用")
return self
@ -199,6 +199,7 @@ class Request:
)
# 初始化超时时间
self.timeout = timeout
# 实例化缓存
self.caches = Caches(cache_ttl=cache_ttl * 86400) if cache_enabled else None
@ -256,6 +257,10 @@ class Request:
:param kwargs: 请求参数
:return: 响应内容
"""
# 若表单数据和 JSON 数据同时为空则重构 JSON数据
if not kwargs.get("form") and not kwargs.get("json"):
kwargs["json"] = {}
return self._request(method="POST", parameters=Parameters(**kwargs))
def download(
@ -279,6 +284,7 @@ class Request:
)
return response
@restrict(max_tokens=5, refill_rate=5.0)
def _request(self, method: Literal["GET", "POST"], parameters: Parameters) -> Any:
"""
请求
@ -293,10 +299,10 @@ class Request:
url = str(kwargs.pop("url"))
# 过滤表单数据中空值
if kwargs.get("data"):
kwargs["data"] = {k: v for k, v in kwargs["data"].items() if v}
if kwargs.get("form"):
kwargs["form"] = {k: v for k, v in kwargs["form"].items() if v}
# 过滤JSON数据中空值
# 过滤 JSON 数据中空
if kwargs.get("json"):
kwargs["json"] = {k: v for k, v in kwargs["json"].items() if v}
@ -332,22 +338,24 @@ class Request:
# 重构异常信息
except Exception as exception:
try:
response = getattr(exception, "response", None)
status = (
response.json().get("status", response.status_code)
if response
else None
)
message = (
response.json().get("message", response.text)
if response
else str(exception).splitlines()[0]
)
except Exception:
status = None
message = f"{method} {kwargs["url"]} 请求发生异常:{str(exception).splitlines()[0]}"
return RequestException(status=status, message=message).__dict__
response = getattr(
exception, "response", None
) # 若非 requests 异常或服务未响应则为 None
if response is not None: # 注意 if <Response [400]> 返回 False
# 获取响应状态码
status = response.status_code
# 尝试将异常信息解析为 JSON若非
try:
response_json = response.json()
# 错误代码
code = response_json.get("code", 999999)
# 错误信息
message = response_json.get("msg", response.text)
except Exception:
code, message = 999999, response.text
else:
status, code, message = 400, 999999, f"{str(exception)}"
return RequestException(status=status, code=code, message=message).__dict__
@staticmethod
def _process_response(