# -*- coding: utf-8 -*-
# Import modules
import hashlib
import hmac
import json
import re
import sqlite3
import threading
import time
from email.parser import BytesParser
from email.policy import default
from email.utils import parsedate_to_datetime
from functools import wraps
from imaplib import IMAP4_SSL
from pathlib import Path
from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union
from urllib.parse import quote_plus
from xml.etree import ElementTree
import pandas
from pydantic import BaseModel, Field, HttpUrl, model_validator
from requests import Response, Session
from requests.adapters import HTTPAdapter
from sqlalchemy import create_engine, text
from urllib3.util.retry import Retry
"""
封装sqlalchemy实现按照SQL查询
类和函数的区别
类作为对象的模板,定义了对象的结构和行为;而函数则作为实现特定功能或操作的代码块,提高了代码的可读性和可维护性。
使用方法:
with MySQLClient(database='data_analysis') as client:
dataframe = client.execute_query(sql='select * from {{}}')')
"""
class MySQLClient:
"""MySQL客户端"""
def __init__(
self,
database,
host: str = "cdb-7z9lzx4y.cd.tencentcdb.com",
port: int = 10039,
username: str = "root",
password: str = "Te198752",
): # Defaults to 刘弼仁's Tencent Cloud MySQL database
# URL-encode the database password
password = quote_plus(password)
# Build the database connection string
connect_config = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8"
# Create the MySQL engine and connect to the database
self.engine = create_engine(
connect_config,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True,
) # Keep 5 open connections in the pool, allow 10 overflow connections, recycle connections after 1 hour, and ping before use to verify validity
def __enter__(self):
"""Entering the context manager returns the client itself (supports the documented `with` usage)"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exiting the context manager disposes of the engine"""
if hasattr(self, "engine") and self.engine:
self.engine.dispose()
return False
def __del__(self):
"""Dispose of the engine automatically on destruction"""
if hasattr(self, "engine") and self.engine:
self.engine.dispose()
def execute_query(self, sql: str) -> pandas.DataFrame:
"""执行SQL查询并返回DATAFRAME"""
if not hasattr(self, "engine") or not self.engine:
raise ConnectionError("未创建数据库连接")
try:
with self.engine.connect() as connection:
dataframe = pandas.read_sql_query(
text(sql), connection, coerce_float=False
) # Do not coerce non-integer numerics to float, preserving DECIMAL values
return dataframe
except Exception as exception:
raise RuntimeError("Executing the SQL query and returning a DataFrame failed") from exception
class SQLiteClient:
"""SQLite客户端"""
def __init__(self, database: Union[str, Path]):
"""
初始化SQLite客户端
:param database: 数据库
"""
self.database = database
# 初始化本地线程存储
self.threads = threading.local()
def _connect(self):
"""Create a database connection and cursor for the current thread"""
# If the current thread already has a connection, keep it; otherwise create one
if hasattr(self.threads, "connection") and self.threads.connection is not None:
return
# Close any stale connection and cursor for the current thread
self._disconnect()
# noinspection PyBroadException
try:
# Create a database connection for the current thread
self.threads.connection = sqlite3.connect(
database=self.database,
check_same_thread=True,
timeout=30, # Lock timeout of 30 seconds to avoid deadlocks under concurrency
)
# Enable row mapping so columns can be accessed by field name
self.threads.connection.row_factory = sqlite3.Row
# Create a database cursor for the current thread
self.threads.cursor = self.threads.connection.cursor()
except Exception as exception:
self.threads.connection = None
self.threads.cursor = None
raise RuntimeError(
f"Creating the database connection and cursor for the current thread failed: {str(exception)}"
) from exception
def _disconnect(self) -> None:
"""Close the database connection and cursor for the current thread"""
# If the current thread has a cursor, close it
if hasattr(self.threads, "cursor") and self.threads.cursor is not None:
# noinspection PyBroadException
try:
# Close the cursor for the current thread
self.threads.cursor.close()
self.threads.cursor = None
except Exception as exception:
raise RuntimeError(
f"Closing the database cursor for the current thread failed: {str(exception)}"
) from exception
# If the current thread has a connection, close it
if hasattr(self.threads, "connection") and self.threads.connection is not None:
# noinspection PyBroadException
try:
# Commit the transaction for the current thread
self.threads.connection.commit()
# Close the connection for the current thread
self.threads.connection.close()
self.threads.connection = None
except Exception as exception:
raise RuntimeError(
f"Closing the database connection for the current thread failed: {str(exception)}"
) from exception
def _query_one(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> Optional[Dict[str, Any]]:
"""
Query and fetch a single row for the current thread
:param sql: query SQL statement
:param parameters: SQL parameters
:return: single row of data
"""
# noinspection PyBroadException
try:
# Create the database connection and cursor for the current thread
self._connect()
# If the current thread has no cursor, raise an exception
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("Failed to create a database cursor for the current thread")
# Execute the SQL for the current thread
self.threads.cursor.execute(sql, parameters)
return (
None
if (result := self.threads.cursor.fetchone()) is None
else dict(result)
)
# On exception, roll back the transaction and re-raise
except Exception as exception:
# If the current thread has a connection, roll it back
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("Querying a single row for the current thread failed") from exception
def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool:
"""
Execute SQL for the current thread
:param sql: INSERT, DELETE, or UPDATE SQL statement
:param parameters: SQL parameters
:return: execution result
"""
try:
self._connect()
# If the current thread has no cursor, raise an exception
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("Failed to create a database cursor for the current thread")
# Execute the SQL for the current thread
self.threads.cursor.execute(sql, parameters)
# Commit the transaction for the current thread
self.threads.connection.commit()
return True
# On exception, roll back the transaction and re-raise
except Exception as exception:
# If the current thread has a connection, roll it back
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("Executing SQL for the current thread failed") from exception
def __enter__(self):
"""Create the connection and cursor for the current thread when entering the context"""
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Close the connection and cursor for the current thread when exiting the context"""
self._disconnect()
return False
def __del__(self):
"""Close the connection and cursor for the current thread on destruction"""
self._disconnect()
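# Usage sketch for SQLiteClient (a minimal illustration of its internal helpers, not called anywhere
# in this module): an in-memory database is used so nothing is written to disk; table and column
# names below are hypothetical placeholders.
def _example_sqlite_usage() -> Optional[Dict[str, Any]]:
    with SQLiteClient(database=":memory:") as client:
        # Create a table, insert one row, then read it back as a dict keyed by column name
        client._execute(sql="CREATE TABLE IF NOT EXISTS demo (id INTEGER PRIMARY KEY, name TEXT)")
        client._execute(sql="INSERT INTO demo (name) VALUES (?)", parameters=("example",))
        return client._query_one(sql="SELECT id, name FROM demo WHERE name = ?", parameters=("example",))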
# Decorator implementing token-bucket rate limiting
def restrict(refill_rate: float = 5.0, max_tokens: int = 5):
class TokenBucket:
# noinspection PyShadowingNames
def __init__(self, max_tokens: int, refill_rate: float):
"""
Initialize the token bucket
:param refill_rate: token refill rate, in tokens per second
:param max_tokens: maximum number of tokens
"""
# Initialize the maximum number of tokens
self.max_tokens = max_tokens
# Initialize the current number of tokens
self.tokens = self.max_tokens * 0.5
# Initialize the token refill rate
self.refill_rate = refill_rate
# Initialize the timestamp of the last refill (monotonic clock)
self.refill_timestamp = time.monotonic()
# Initialize the thread lock (shared by all threads)
self.thread_lock = threading.Lock()
# Refill tokens
def _refill(self) -> None:
with self.thread_lock:
# Timestamp of this refill
refill_timestamp = time.monotonic()
# Recompute the number of tokens in the bucket
# noinspection PyTypeChecker
self.tokens = min(
self.max_tokens,
max(
0,
self.tokens
+ self.refill_rate * (refill_timestamp - self.refill_timestamp),
),
)
self.refill_timestamp = refill_timestamp
# Try to consume a token
def consume(self) -> Tuple[bool, float]:
# Refill tokens
self._refill()
with self.thread_lock:
if self.tokens >= 1:
self.tokens -= 1
return True, 0
# Time to wait before retrying
# noinspection PyTypeChecker
wait_time = min(
1 / self.refill_rate,
max(
0,
1 / self.refill_rate
- (time.monotonic() - self.refill_timestamp),
),
)
return False, wait_time
# Storage holding one token bucket per decorated function
buckets = {}
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
# If the decorated function does not yet have a token bucket in the store, create one for it
if func not in buckets:
# Initialize the token bucket
buckets[func] = TokenBucket(
refill_rate=refill_rate, max_tokens=max_tokens
)
bucket = buckets[func]
# Retry counter
retries = 0
while retries <= 10:
# Try to consume a token
success, wait_time = bucket.consume()
# If a token was consumed, call the decorated function; otherwise wait
if success:
return func(*args, **kwargs)
time.sleep(wait_time * 2)
retries += 1
raise Exception("Requests are too frequent")
return wrapper
return decorator
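# Usage sketch for the token-bucket decorator: the decorated function below is hypothetical and
# simply echoes its argument; on average at most 2 calls per second are allowed, with bursts up to 4.
@restrict(refill_rate=2.0, max_tokens=4)
def _example_rate_limited_call(payload: Dict[str, Any]) -> Dict[str, Any]:
    # Placeholder for a real outbound call (e.g. an HTTP request)
    return payload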
class HTTPClient:
"""HTTP request client"""
class RequestException(Exception):
"""Request exception"""
def __init__(
self,
status: Optional[int] = 400,
code: int = 0,
message: str = "Request failed",
):
"""
:param status: HTTP status code
:param code: error code
:param message: error message
"""
self.status = status
self.code = code
self.message = message
super().__init__(self.message)
def __str__(self):
return f"Request failed (status={self.status}, code={self.code}, message={self.message})"
class Parameters(BaseModel):
"""
Request parameter model with automatic validation
"""
url: HttpUrl = Field(
default=..., description="URL, validated automatically via HttpUrl"
)
params: Optional[Dict[str, Any]] = Field(
default=None, description="URL query parameters"
)
headers: Optional[Dict[str, str]] = Field(default=None, description="Request headers")
data: Optional[Dict[str, Any]] = Field(default=None, description="Form data")
json_data: Optional[Dict[str, Any]] = Field(
default=None, alias="json", description="JSON data"
)
files: Optional[
Dict[
str,
Union[
Tuple[str, bytes],
Tuple[str, bytes, str],
Tuple[str, bytes, str, Dict[str, str]],
],
]
] = Field(
default=None,
description="Files to upload, {field name: (file name, byte data, content type, headers)}",
)
stream_enabled: Optional[bool] = Field(default=None, description="Whether to use streaming")
guid: Optional[str] = Field(default=None, description="Globally unique cache identifier")
@model_validator(mode="after")
def validate_data(self):
"""Validate that form data and JSON data are mutually exclusive"""
if self.data is not None and self.json_data is not None:
raise ValueError("Form data and JSON data cannot be used together")
return self
@model_validator(mode="after")
def validate_files(self):
if self.files is not None and self.stream_enabled:
raise ValueError("File upload and streaming cannot be used together")
return self
class CacheClient(SQLiteClient):
"""Cache client"""
def __init__(self, cache_ttl: int):
"""
Initialize the cache database
:param cache_ttl: cache time-to-live, in seconds
"""
# Initialize the SQLite client
super().__init__(database=Path(__file__).parent.resolve() / "caches.db")
# Initialize the cache time-to-live, in seconds
self.cache_ttl = cache_ttl
# Initialize the cache table and the timestamp index (expired caches are not purged)
try:
with self:
self._execute(
sql="""CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, cache TEXT NOT NULL, timestamp REAL NOT NULL)"""
)
self._execute(
sql="""CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)"""
)
except Exception as exception:
raise RuntimeError(
f"Initializing the cache database failed: {str(exception)}"
) from exception
# noinspection PyShadowingNames
def query(self, guid: str) -> Optional[Dict[str, Any]]:
"""
Query and fetch a single cache entry
:param guid: unique cache identifier
:return: cache entry
"""
# noinspection PyBroadException
try:
with self:
result = self._query_one(
sql="SELECT cache FROM caches WHERE guid = ? AND timestamp >= ?",
parameters=(guid, time.time() - self.cache_ttl),
)
return (
None if result is None else json.loads(result["cache"])
) # Deserialize the cached JSON
except Exception as exception:
raise RuntimeError("Querying a single cache entry failed") from exception
# noinspection PyShadowingNames
def update(self, guid: str, cache: Dict) -> Optional[bool]:
"""
Insert or update a cache entry (insert if absent, update if present)
:param guid: unique cache identifier
:param cache: cache entry
:return: True on success, False on failure
"""
# noinspection PyBroadException
try:
with self:
return self._execute(
sql="INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)",
parameters=(
guid,
json.dumps(cache, ensure_ascii=False),
time.time(),
),
)
except Exception as exception:
raise RuntimeError("Inserting or updating the cache entry failed") from exception
def __init__(
self,
default_headers: Optional[Dict[str, str]] = None,
total: int = 3,
backoff_factor: float = 0.5,
timeout: int = 60,
cache_enabled: bool = False,
cache_ttl: int = 360,
):
"""
:param default_headers: 默认请求头
:param total: 最大重试次数
:param backoff_factor: 重试间隔退避因子
:param timeout: 超时时间,单位为秒
:param cache_enabled: 使用缓存
:param cache_ttl: 缓存生存时间,单位为天
"""
# 创建请求会话并挂载适配器
self.session = self._create_session(
default_headers=default_headers, total=total, backoff_factor=backoff_factor
)
# 初始化超时时间
self.timeout = timeout
# 初始化使用缓存
self.cache_enabled = cache_enabled
# 初始化缓存生存时间,单位由天转为秒
self.cache_ttl = cache_ttl * 86400
self.cache_client: Optional[HTTPClient.CacheClient] = None
# 若使用缓存则实例化缓存客户端
if self.cache_enabled:
# 初始化缓存客户端
self.cache_client = self.CacheClient(cache_ttl=self.cache_ttl)
def __del__(self):
"""Close the request session on destruction"""
if hasattr(self, "session") and self.session:
self.session.close()
@staticmethod
def _create_session(
total: int,
backoff_factor: float,
default_headers: Optional[Dict[str, str]] = None,
) -> Session:
"""
创建请求会话并挂载适配器
:param default_headers: 默认请求头
:param total: 最大重试次数
:param backoff_factor: 重试间隔退避因子
:return Session: 请求会话实例
"""
# 实例化请求会话
session = Session()
# 设置默认请求头
if default_headers:
session.headers.update(default_headers)
# 设置重试策略并挂载适配器
adapter = HTTPAdapter(
max_retries=Retry(
allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "PATCH"],
status_forcelist=[
408,
502,
503,
504,
], # 408为请求超时502为网关错误503为服务不可用504为网关超时
total=total,
respect_retry_after_header=True,
backoff_factor=backoff_factor,
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def get(
self, **kwargs
) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]:
"""发送GET请求"""
return self._request(method="GET", parameters=self.Parameters(**kwargs))
def post(
self, **kwargs
) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]:
"""发送POST请求"""
return self._request(method="POST", parameters=self.Parameters(**kwargs))
def download(
self, stream_enabled: bool = False, chunk_size: int = 1024, **kwargs
) -> Union[
str,
Tuple[str, bytes],
Dict[str, Any],
ElementTree.Element,
Generator[bytes, None, None],
None,
]:
"""
下载文件
:param stream_enabled: 使用流式传输
:param chunk_size: 流式传输的分块大小
"""
response = self._request(
method="GET",
parameters=self.Parameters(**{"stream_enabled": stream_enabled, **kwargs}),
)
# If streaming is enabled, process the streaming response
if stream_enabled:
return self._process_stream_response(
response=response, chunk_size=chunk_size
)
return response
def _request(
self, method: Literal["GET", "POST"], parameters: Parameters
) -> Union[
str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, Response, None
]:
"""请求"""
# 将请求参数模型转为请求参数字典
parameters = parameters.model_dump(exclude_none=True, by_alias=True)
# 将URL由HttpUrl对象转为字符串
parameters["url"] = str(parameters["url"])
# 过滤表单数据中None值
if parameters.get("data") is not None:
parameters["data"] = {
k: v for k, v in parameters["data"].items() if v is not None
}
# 过滤JSON数据中None值
if parameters.get("json_data") is not None:
parameters["json_data"] = {
k: v for k, v in parameters["json_data"].items() if v is not None
}
# 使用流式传输
stream_enabled = parameters.pop("stream_enabled", False)
# 缓存全局唯一标识
guid = parameters.pop("guid", None)
# 若使用缓存且缓存全局唯一标识非空则查询并获取单条缓存
if self.cache_enabled and guid is not None:
cache = self.cache_client.query(guid)
if cache is not None:
return cache
# 发送请求并处理响应
# noinspection PyBroadException
try:
response = self.session.request(
method=method, timeout=self.timeout, **parameters
)
response.raise_for_status() # 若返回非2??状态码则抛出异常
# 若使用流式传输则直接返回(不缓存)
if stream_enabled:
return response
# 处理响应
response = self._process_response(response=response)
# 若使用缓存且缓存全局唯一标识非空则新增或更新缓存
if self.cache_enabled and guid is not None:
self.cache_client.update(guid, response)
return response
except Exception as exception:
# noinspection PyBroadException
try:
response = getattr(exception, "response", None)
status = (
response.json().get("status", response.status_code)
if response is not None
else None
)
message = (
response.json().get("message", str(exception).splitlines()[0])
if response is not None
else str(exception).splitlines()[0]
)
except Exception:
status = None
message = f"{method} {parameters["url"]} 请求发生异常:{str(exception).splitlines()[0]}"
raise self.RequestException(status=status, message=message) from exception
# Process the response
@staticmethod
def _process_response(
response: Response,
) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]:
# If the response body is empty, return None
content = response.content
if not content:
return None
# Response content type
_type = response.headers.get("Content-Type", "").split(";")[0].strip().lower()
# Match the content type to a parsing method and return the result
# noinspection PyUnreachableCode
match _type:
# JSON: deserialize the JSON body
case "application/json" | "text/json":
return response.json()
# XML: parse into an XML Element instance
case "application/xml" | "text/xml":
return ElementTree.fromstring(content)
# Starts with image/: return the image format and the raw content
case _ if _type.startswith("image/"):
# Image format
image_format = _type.split(sep="/", maxsplit=1)[1]
return image_format, content
# Other content types: try UTF-8 decoding first; if decoding fails, return the raw bytes
case _:
try:
return content.decode("utf-8")
except UnicodeDecodeError:
return content
# Process a streaming response
@staticmethod
def _process_stream_response(
response: Response, chunk_size: int
) -> Generator[bytes, None, None]:
"""
Process a streaming response
:param response: requests.Response object
:param chunk_size: chunk size
:return: generator of byte chunks
"""
if not isinstance(chunk_size, int):
raise ValueError("Chunk size must be an integer")
if chunk_size <= 0:
raise ValueError("Chunk size must be greater than 0")
try:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
yield chunk
finally:
response.close()
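# Usage sketch for HTTPClient (a minimal illustration, not called anywhere in this module):
# the URL below (httpbin.org) is a hypothetical example endpoint.
def _example_http_get() -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]:
    client = HTTPClient(default_headers={"User-Agent": "client-demo"}, timeout=10)
    # A JSON response body is deserialized into a dict by _process_response
    return client.get(url="https://httpbin.org/get", params={"q": "demo"})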
class Authenticator:
# Class-level lock shared by all instances, protecting the local credentials file
_lock = threading.Lock()
def __init__(
self,
):
"""Authenticator (used to obtain access tokens)"""
# Initialize
self._initialize()
def _initialize(self):
"""Initialize"""
# Initialize the path of the local credentials file
self.certifications_path = (
Path(__file__).parent.resolve() / "certifications.json"
)
# If the credentials file does not exist, create it
if not self.certifications_path.exists():
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
{},
file,
ensure_ascii=False,
)
# Initialize the HTTP client
self.http_client = HTTPClient()
def _szkt_get_certification(self) -> tuple[str, float]:
"""Obtain the szkt (深圳快瞳) access credential"""
response = self.http_client.get(
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
)
# If the response is not successful, raise an exception
if not (response["status"] == 200 and response["code"] == 0):
raise RuntimeError("Obtaining the szkt (深圳快瞳) access credential failed")
# Return the access token and the expiry timestamp
# noinspection PyTypeChecker
return (
response["data"]["access_token"],
time.time() + response["data"]["expires_in"],
)
def _hlyj_get_certification(self) -> Tuple[str, float]:
"""Obtain the hlyj (合力亿捷) access credential"""
# Enterprise access key id
access_key_id = "25938f1c190448829dbdb5d344231e42"
# Signing secret
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
# Timestamp (in seconds)
timestamp = int(time.time())
# Signature: hex HMAC-SHA256 over the concatenation of the access key id, signing secret, and timestamp
signature = hmac.new(
secret_access_key.encode("utf-8"),
f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
hashlib.sha256,
).hexdigest()
response = self.http_client.get(
url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}&timestamp={timestamp}&signature={signature}"
)
# If the response is not successful, raise an exception
if not response["success"]:
raise RuntimeError("Obtaining the hlyj (合力亿捷) access credential failed")
# Return the access token and the expiry timestamp
# noinspection PyTypeChecker
return (
response["data"],
time.time() + 1 * 60 * 60, # The access token is valid for 1 hour
)
def _feishu_get_certification(self) -> tuple[str, float]:
"""Obtain the Feishu access credential"""
response = self.http_client.post(
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
data={
"app_id": "cli_a1587980be78500c",
"app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
},
)
# If the response is not successful, raise an exception
if not response["code"] == 0:
raise RuntimeError("Obtaining the Feishu access credential failed")
# Return the access token and the expiry timestamp
# noinspection PyTypeChecker
return (
response["tenant_access_token"],
time.time() + response["expire"],
)
def get_token(self, servicer: str) -> str | None:
"""
Obtain an access token
:param servicer: service provider; currently only szkt, hlyj, and feishu are supported
:return token: access token
"""
with self._lock:
# Initialize the access token, expiry timestamp, and credentials store
token, expired_timestamp = None, 0
certifications = {}
try:
with open(self.certifications_path, "r", encoding="utf-8") as file:
# Open and read the locally stored credentials for all service providers
certifications = json.load(file)
# Get the credential for the specified service provider
certification = certifications.get(servicer, None)
# If the credential is not None, parse the access token and expiry timestamp
if certification is not None:
# Access token
token = certification["token"]
# Expiry timestamp
expired_timestamp = certification["expired_timestamp"]
# If deserialization fails, reset the credentials file
except json.decoder.JSONDecodeError:
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
{},
file,
ensure_ascii=False,
)
except Exception:
raise RuntimeError("Obtaining the access token failed")
if time.time() > expired_timestamp:
# noinspection PyUnreachableCode
match servicer:
case "szkt":
token, expired_timestamp = self._szkt_get_certification()
case "hlyj":
token, expired_timestamp = self._hlyj_get_certification()
case "feishu":
token, expired_timestamp = self._feishu_get_certification()
case _:
raise RuntimeError(f"No credential method configured for service provider: {servicer}")
# Update the credential for this service provider
certifications[servicer] = {
"token": token,
"expired_timestamp": expired_timestamp,
}
# Save the credentials for all service providers to the local file
with open(self.certifications_path, "w", encoding="utf-8") as file:
json.dump(
certifications,
file,
ensure_ascii=False,
)
return token
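# Usage sketch for Authenticator (a minimal illustration, not called anywhere in this module):
# fetches a Feishu tenant access token, refreshing and caching it in certifications.json when expired.
def _example_get_feishu_token() -> Optional[str]:
    authenticator = Authenticator()
    return authenticator.get_token(servicer="feishu")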
class FeishuClient:
"""飞书客户端"""
def __init__(self):
self.authenticator = Authenticator()
self.http_client = HTTPClient()
def _headers(self):
"""请求头"""
# 装配飞书访问凭证
return {
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
}
@staticmethod
def get_verification_code():
try:
# Execution timestamp
execute_timestamp = time.time()
# Timeout timestamp
timeout_timestamp = execute_timestamp + 65
# Establish an encrypted IMAP connection
server = IMAP4_SSL("imap.feishu.cn", 993)
# Log in
server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2")
while True:
# If the current timestamp exceeds the timeout timestamp, return None
if time.time() <= timeout_timestamp:
# Wait 10 seconds
time.sleep(10)
# Select the folder (mailbox verification codes)
server.select("&kK57sZqMi8F4AQ-")
# noinspection PyBroadException
try:
# Get the index of the last email: server.search() returns a tuple whose first element is the query status and whose second element is the query result (a list of byte strings of email indexes); take the byte string from the list, split it, and use the last element as the index of the last email
index = server.search(None, "ALL")[1][0].split()[-1]
# Fetch and parse the last email: server.fetch() returns a tuple whose first element is the query status and whose second element is the query result (a list of byte strings of email content); take the byte string from the list and parse the body
# noinspection PyUnresolvedReferences
contents = BytesParser(policy=default).parsebytes(
server.fetch(index, "(RFC822)")[1][0][1]
)
# Walk the email parts; if the body content type is plain text or HTML, parse the send time and the verification code
for content in contents.walk():
if (
content.get_content_type() == "text/plain"
or content.get_content_type() == "text/html"
):
# Email send timestamp
# noinspection PyUnresolvedReferences
send_timestamp = parsedate_to_datetime(
content["Date"]
).timestamp()
# If the email was sent within 35 seconds before the execution timestamp, parse and return the verification code
if (
execute_timestamp
> send_timestamp
>= execute_timestamp - 35
):
# Log out
server.logout()
# Parse the verification code
return re.search(
r"【普康健康】您的验证码是:(\d+)",
content.get_payload(decode=True).decode(),
).group(1)
# If the folder has no matching emails, keep polling
except Exception:
pass
# On timeout, log out
else:
server.logout()
return None
except Exception as exception:
raise RuntimeError("Obtaining the mailbox verification code failed") from exception
# Query bitable records (a single request returns at most 500 records)
@restrict(refill_rate=5, max_tokens=5)
def query_bitable_records(
self,
bitable: str,
table_id: str,
field_names: Optional[list[str]] = None,
filter_conditions: Optional[dict] = None,
) -> pandas.DataFrame:
# First query the bitable records, then parse them according to the field definitions
# Assemble the URL for searching bitable records
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20"
response = self.http_client.post(
url=url,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
# A business code of 0 in the response is treated as success
assert response.get("code") == 0, "Querying bitable records failed"
# Bitable records
records = response.get("data").get("items")
# While the response indicates there are more pages, keep requesting the next page
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.post(
url=url_next,
headers=self._headers(),
json={"field_names": field_names, "filter": filter_conditions},
)
assert response.get("code") == 0, "Querying bitable records failed"
# Merge the records
records.extend(response.get("data").get("items"))
# Assemble the URL for listing bitable fields
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20"
response = self.http_client.get(
url=url,
headers=self._headers(),
)
assert response.get("code") == 0, "Listing bitable fields failed"
# Bitable fields
fields = response.get("data").get("items")
while response.get("data").get("has_more"):
url_next = url + "&page_token={}".format(
response.get("data").get("page_token")
)
response = self.http_client.get(
url=url_next,
headers=self._headers(),
)
assert response.get("code") == 0, "Listing bitable fields failed"
fields.extend(response.get("data").get("items"))
# Field mappings
field_mappings = {}
for field in fields:
# Field name
field_name = field["field_name"]
# Match on the field type
match field["type"]:
case 1005:
field_type = "primary key"
case 1:
field_type = "text"
case 3:
field_type = "single select"
case 2:
# Display format of number/formula fields
match field["property"]["formatter"]:
case "0":
field_type = "integer"
case _:
raise ValueError("No display format configured for this number/formula field")
case _:
raise ValueError("No handling configured for this field type")
# noinspection PyUnboundLocalVariable
field_mappings.update({field_name: field_type})
# Record data container
records_data = []
# Parse the records
for record in records:
# Data container for a single record
record_data = {}
for field_name, content in record["fields"].items():
match field_mappings[field_name]:
case "primary key" | "single select" | "integer":
record_data.update({field_name: content})
case "text":
# If the text has multiple fragments, concatenate them
fragments_content = ""
for fragment_content in content:
fragments_content += fragment_content["text"]
record_data.update({field_name: fragments_content})
case _:
raise ValueError("No parsing method configured for this field type")
records_data.append(record_data)
return pandas.DataFrame(records_data)
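# Usage sketch for FeishuClient.query_bitable_records (a minimal illustration, not called anywhere
# in this module): the bitable app token, table id, and field names below are hypothetical placeholders.
def _example_query_bitable() -> pandas.DataFrame:
    feishu_client = FeishuClient()
    return feishu_client.query_bitable_records(
        bitable="bascnXXXXXXXXXXXX",
        table_id="tblXXXXXXXXXXXX",
        field_names=["record_id", "status"],
    )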