993 lines
33 KiB
Python
993 lines
33 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
# 导入模块
|
||
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import re
|
||
import sqlite3
|
||
import threading
|
||
import time
|
||
from email.parser import BytesParser
|
||
from email.policy import default
|
||
from email.utils import parsedate_to_datetime
|
||
from functools import wraps
|
||
from imaplib import IMAP4_SSL
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union
|
||
from urllib.parse import quote_plus
|
||
from xml.etree import ElementTree
|
||
|
||
import pandas
|
||
from pydantic import BaseModel, Field, HttpUrl, model_validator
|
||
from requests import Response, Session
|
||
from requests.adapters import HTTPAdapter
|
||
from sqlalchemy import create_engine, text
|
||
from urllib3.util.retry import Retry
|
||
|
||
|
||
"""
|
||
封装sqlalchemy,实现按照SQL查询
|
||
类和函数的区别
|
||
类作为对象的模板,定义了对象的结构和行为;而函数则作为实现特定功能或操作的代码块,提高了代码的可读性和可维护性。
|
||
使用方法:
|
||
with MySQLClient(database='data_analysis') as client:
|
||
    dataframe = client.execute_query(sql='select * from {{}}')
|
||
"""
|
||
|
||
|
||
class MySQLClient:
    """MySQL client that runs SQL through a pooled SQLAlchemy engine.

    The client can be used as a context manager; the engine is disposed on
    exit::

        with MySQLClient(database="data_analysis") as client:
            dataframe = client.execute_query(sql="select 1")
    """

    def __init__(
        self,
        database,
        host: str = "cdb-7z9lzx4y.cd.tencentcdb.com",
        port: int = 10039,
        username: str = "root",
        password: str = "Te198752",
    ):
        """Create the engine for ``database``.

        :param database: name of the database to connect to
        :param host: MySQL host
        :param port: MySQL port
        :param username: login user
        :param password: login password (URL-encoded before use)
        """
        # NOTE(review): real credentials are hard-coded as defaults. They are
        # kept so existing callers keep working, but they should be moved to
        # environment variables or a secrets store and rotated.

        # URL-encode the password so special characters survive in the DSN.
        password = quote_plus(password)

        # Build the connection string.
        connect_config = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8"

        # Keep 5 pooled connections plus up to 10 overflow connections,
        # recycle connections after one hour and ping before each checkout.
        self.engine = create_engine(
            connect_config,
            pool_size=5,
            max_overflow=10,
            pool_recycle=3600,
            pool_pre_ping=True,
        )

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Dispose the engine; exceptions from the block are not suppressed."""
        self.close()
        return False

    def __del__(self):
        """Dispose the engine when the client is garbage collected."""
        self.close()

    def close(self):
        """Dispose the engine if one exists; safe to call more than once."""
        # getattr: __init__ may have failed before ``engine`` was assigned.
        engine = getattr(self, "engine", None)
        if engine:
            engine.dispose()
            self.engine = None

    def execute_query(self, sql: str) -> pandas.DataFrame:
        """Run ``sql`` and return the result set as a DataFrame.

        :param sql: SQL text to execute
        :raises ConnectionError: if the client has no engine (never created or closed)
        :raises RuntimeError: if connecting or querying fails; the original
            error is attached as ``__cause__``
        """
        if not getattr(self, "engine", None):
            raise ConnectionError("未创建数据库连接")

        try:
            with self.engine.connect() as connection:
                # coerce_float=False keeps DECIMAL columns as Decimal values.
                return pandas.read_sql_query(
                    text(sql), connection, coerce_float=False
                )
        except Exception as exception:
            # No explicit rollback: the connection may never have been opened,
            # and leaving the ``with`` block already releases it.
            raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") from exception
|
||
|
||
|
||
"""
|
||
封装requests的相关操作,实现常用HTTP请求
|
||
使用方法:
|
||
client = HTTPClient()
|
||
    response = client.post(url=url)
|
||
"""
|
||
|
||
|
||
class TokenBucket:
    """Token bucket used to limit how often an operation may run."""

    def __init__(self, refill_rate, max_tokens):
        """Create a bucket that starts full.

        :param refill_rate: tokens added per second
        :param max_tokens: maximum number of tokens the bucket can hold
        """
        # Refill rate (tokens per second).
        self.refill_rate = refill_rate
        # Bucket capacity.
        self.max_tokens = max_tokens
        # Tokens currently available.
        self.tokens = max_tokens
        # Time of the last refill (monotonic clock, seconds).
        self.refill_timestamp = time.monotonic()
        # One lock per bucket. Creating a new Lock inside acquire() would
        # give every caller its own lock and therefore no mutual exclusion.
        self._lock = threading.Lock()

    def acquire(self) -> tuple[bool, float]:
        """Try to take one token.

        :return: ``(True, 0.0)`` when a token was taken, otherwise
            ``(False, 0.2)`` where the second item is the suggested wait in seconds
        """
        with self._lock:
            now = time.monotonic()

            # Top the bucket up for the time elapsed since the last refill.
            self.tokens = min(
                self.max_tokens,
                self.tokens + self.refill_rate * (now - self.refill_timestamp),
            )
            self.refill_timestamp = now

            if self.tokens >= 1:
                self.tokens -= 1
                return True, 0.0

            return False, 0.2
|
||
|
||
|
||
# 将令牌桶以装饰函数封装为请求频率限制方法
|
||
def restrict(refill_rate=5, max_tokens=5):
    """Build a decorator that rate-limits a function with a token bucket.

    :param refill_rate: tokens added to the bucket per second
    :param max_tokens: bucket capacity
    :return: a decorator; the wrapped function raises ``Exception`` when no
        token could be obtained after 11 attempts
    """

    def decorator(func):
        # One bucket per decorated function, shared by all of its calls.
        bucket = TokenBucket(refill_rate=refill_rate, max_tokens=max_tokens)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Attempts 0..10; the pause grows by a factor of 1.5 each time.
            for attempt in range(11):
                allowed, delay = bucket.acquire()
                if allowed:
                    return func(*args, **kwargs)
                time.sleep(delay * 1.5**attempt)

            raise Exception("request too frequently")

        return wrapper

    return decorator
|
||
|
||
|
||
class RequestException(Exception):
    """Error raised when an HTTP request fails."""

    def __init__(
        self, status: int = 400, code: int = 0, message: str = "request failed"
    ):
        """
        :param status: status code, defaults to 400
        :param code: business error code, defaults to 0
        :param message: error message, defaults to "request failed"
        """
        super().__init__(message)
        self.status, self.code, self.message = status, code, message

    def __str__(self):
        # Only the status and message are rendered; ``code`` is not included.
        status, message = self.status, self.message
        return f"请求发生异常({status}, {message})"
|
||
|
||
|
||
# 请求参数数据模型
|
||
class Arguments(BaseModel):
    """Validated arguments for a single HTTP request.

    :param url: target URL, validated by pydantic's ``HttpUrl``
    :param params: query-string parameters
    :param headers: request headers
    :param data: form body
    :param json_data: JSON body; declared with the alias ``json``
    :param files: files to upload, keyed by form field name
    :param stream: whether to stream the response
    :param guid: globally unique identifier of the request
    """

    # Target URL (required).
    url: HttpUrl
    # Query-string parameters.
    params: Optional[Dict] = None
    # Request headers.
    headers: Optional[Dict] = None
    # Form body.
    data: Optional[Dict] = None
    # JSON body, exposed under the alias "json".
    json_data: Optional[Dict] = Field(default=None, alias="json")
    # Uploads: (name, content), (name, content, content type) or
    # (name, content, content type, headers) per form field.
    files: Optional[
        Dict[
            str,
            Union[
                Tuple[str, bytes], Tuple[str, bytes, str], Tuple[str, bytes, str, dict]
            ],
        ]
    ] = None
    # Whether to stream the response.
    stream: Optional[bool] = None
    # Globally unique identifier of the request.
    guid: Optional[str] = None

    @model_validator(mode="after")
    def validate_data(self):
        """Reject a request that carries both a form body and a JSON body."""
        has_form = bool(self.data)
        has_json = bool(self.json_data)
        if has_form and has_json:
            raise ValueError("cannot use both data and json parameters simultaneously")
        return self

    @model_validator(mode="after")
    def validate_files(self):
        """Reject a request that both uploads files and asks for streaming."""
        uploads = bool(self.files)
        streaming = bool(self.stream)
        if uploads and streaming:
            raise ValueError(
                "cannot use both files and stream parameters simultaneously"
            )
        return self
|
||
|
||
|
||
# HTTP客户端
|
||
class HTTPClient:
|
||
|
||
def __init__(
|
||
self,
|
||
timeout: int = 60,
|
||
default_headers: Optional[Dict[str, str]] = None,
|
||
total: int = 3,
|
||
backoff_factor: float = 0.5,
|
||
cache_enabled: bool = False,
|
||
cache_ttl: int = 90,
|
||
):
|
||
"""
|
||
:param timeout: 超时时间,单位为秒
|
||
:param default_headers: 默认请求头
|
||
:param total: 最大重试次数
|
||
:param backoff_factor: 重试间隔退避因子
|
||
:param cache_enabled: 是否使用缓存
|
||
:param cache_ttl: 缓存生存时间,单位为天
|
||
"""
|
||
|
||
# 超时时间
|
||
self.timeout = timeout
|
||
# 创建HTTP会话并挂载适配器
|
||
self.session = self._create_session(
|
||
default_headers=default_headers, total=total, backoff_factor=backoff_factor
|
||
)
|
||
|
||
# 是否使用缓存
|
||
self.cache_enabled = cache_enabled
|
||
# 缓存生存时间
|
||
self.cache_ttl = cache_ttl
|
||
# 若使用缓存,则初始化缓存数据库
|
||
if self.cache_enabled:
|
||
self._initialize_cache_database()
|
||
|
||
# 创建HTTP会话并挂载适配器
|
||
@staticmethod
|
||
def _create_session(
|
||
total: int,
|
||
backoff_factor: float,
|
||
default_headers: Optional[Dict[str, str]] = None,
|
||
) -> Session:
|
||
"""
|
||
:param default_headers 默认请求头
|
||
:param total 最大重试次数
|
||
:param backoff_factor 重试间隔退避因子
|
||
"""
|
||
|
||
# 创建会话对象
|
||
session = Session()
|
||
|
||
# 设置请求头
|
||
if default_headers:
|
||
session.headers.update(default_headers)
|
||
|
||
# 设置重试策略(优先按照服响应等待时长,若未返回则默认按照退避算法等待)
|
||
strategy_retries = Retry(
|
||
allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "PATCH"],
|
||
status_forcelist=[
|
||
408,
|
||
502,
|
||
503,
|
||
504,
|
||
], # 408:请求超时,502:网关错误,503:服务不可用,504:网关超时
|
||
total=total,
|
||
respect_retry_after_header=True,
|
||
backoff_factor=backoff_factor,
|
||
)
|
||
|
||
# 创建适配器并绑定重试策略
|
||
adapter = HTTPAdapter(max_retries=strategy_retries)
|
||
# 就HTTP请求生效
|
||
session.mount("http://", adapter)
|
||
# 就HTTPS请求生效
|
||
session.mount("https://", adapter)
|
||
|
||
return session
|
||
|
||
def _initialize_cache_database(self):
|
||
"""初始化缓存数据库"""
|
||
|
||
# 创建缓存数据库连接(使用SQLite)
|
||
self.cache_connection = sqlite3.connect(
|
||
database="SQLite.db", check_same_thread=False
|
||
)
|
||
|
||
self.cache_connection.execute(
|
||
"""CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, response TEXT, timestamp REAL)"""
|
||
)
|
||
# 创建时间戳索引
|
||
self.cache_connection.execute(
|
||
"""CREATE INDEX IF NOT EXISTS index_timestamp ON caches(timestamp)"""
|
||
)
|
||
# 删除过期缓存
|
||
self.cache_connection.execute(
|
||
"DELETE FROM caches WHERE timestamp < ?",
|
||
(time.time() - self.cache_ttl * 86400,), # 缓存生存时间单位转为秒
|
||
)
|
||
# 提交事物
|
||
self.cache_connection.commit()
|
||
|
||
# 在缓存数据库查询响应
|
||
def _query_response(self, guid: str) -> Optional[Dict]:
|
||
|
||
with threading.Lock():
|
||
cursor = None
|
||
try:
|
||
# 创建游标
|
||
cursor = self.cache_connection.cursor()
|
||
# 根据请求唯一标识查询响应
|
||
cursor.execute(
|
||
"SELECT response FROM caches WHERE guid = ? AND timestamp >= ?",
|
||
(guid, time.time() - self.cache_ttl * 86400),
|
||
)
|
||
if result := cursor.fetchone():
|
||
return json.loads(result[0])
|
||
return None
|
||
# 若发生异常则返回NONE
|
||
except:
|
||
self.cache_connection.rollback()
|
||
return None
|
||
finally:
|
||
if cursor:
|
||
cursor.close()
|
||
|
||
# 将响应保存至缓存数据库
|
||
def _save_response(self, guid: str, response: Dict):
|
||
|
||
with threading.Lock():
|
||
cursor = None
|
||
try:
|
||
# 创建游标
|
||
cursor = self.cache_connection.cursor()
|
||
# 新增或覆盖响应
|
||
cursor.execute(
|
||
"INSERT OR REPLACE INTO caches (guid, response, timestamp) VALUES (?, ?, ?)",
|
||
(guid, json.dumps(response, ensure_ascii=False), time.time()),
|
||
)
|
||
# 提交事物
|
||
self.cache_connection.commit()
|
||
# 若发生异常则返回NONE
|
||
except:
|
||
self.cache_connection.rollback()
|
||
finally:
|
||
if cursor:
|
||
cursor.close()
|
||
|
||
# GET请求
|
||
def get(self, **kwargs) -> Union[Dict, str]:
|
||
|
||
return self._request(method="GET", arguments=Arguments(**kwargs))
|
||
|
||
# POST请求
|
||
def post(self, **kwargs) -> Union[Dict, str]:
|
||
|
||
return self._request(method="POST", arguments=Arguments(**kwargs))
|
||
|
||
# 文件下载
|
||
def download(
|
||
self, stream=False, chunk_size=1024, **kwargs
|
||
) -> Union[Dict, str, Generator[bytes, None, None]]:
|
||
|
||
response = self._request(
|
||
method="GET", arguments=Arguments(**{"stream": stream, **kwargs})
|
||
)
|
||
|
||
# 若禁用流式传输,则返回响应
|
||
if not stream:
|
||
return response
|
||
# 若启用流式传输,则处理流式传输响应并返回
|
||
return self._process_stream_response(response=response, chunk_size=chunk_size)
|
||
|
||
def _request(self, method: Literal["GET", "POST"], arguments: Arguments) -> Any:
|
||
"""发送请求"""
|
||
|
||
# 请求参数模型
|
||
arguments = arguments.model_dump(exclude_none=True, by_alias=True)
|
||
|
||
# URL由HTTPURL对象转为字符串
|
||
arguments["url"] = str(arguments["url"])
|
||
|
||
# 重构表单数据
|
||
if arguments.get("data") is not None:
|
||
arguments["data"] = {
|
||
key: value
|
||
for key, value in arguments["data"].items()
|
||
if value is not None
|
||
}
|
||
|
||
# 重构JSON格式数据
|
||
if arguments.get("json_data") is not None:
|
||
arguments["json_data"] = {
|
||
key: value
|
||
for key, value in arguments["json_data"].items()
|
||
if value is not None
|
||
}
|
||
|
||
# 重构文件数据
|
||
if arguments.get("files") is not None:
|
||
files_valid = {}
|
||
# 遍历文件数据键值对
|
||
for key, value in arguments["files"].items():
|
||
if isinstance(value, (tuple, list)):
|
||
match len(value):
|
||
# 若文件数据包括文件名称和文件内容
|
||
case 2:
|
||
files_valid[key] = (value[0], value[1], None, None)
|
||
# 若文件数据包含文件名称、文件内容和内容类型
|
||
case 3:
|
||
files_valid[key] = (value[0], value[1], value[2], None)
|
||
# 若文件数据包含文件名称、文件内容、内容类型和请求头
|
||
case 4:
|
||
files_valid[key] = (value[0], value[1], value[2], value[3])
|
||
arguments.update({"files": files_valid})
|
||
|
||
# 全局唯一标识
|
||
guid = arguments.pop("guid", None)
|
||
|
||
# 若使用缓存且本次请求参数包含全局唯一标识,则优先返回缓存数据库中响应
|
||
if self.cache_enabled and guid is not None:
|
||
# 在缓存数据库查询响应
|
||
response = self._query_response(guid=guid)
|
||
# 若缓存响应非空则返回
|
||
if response is not None:
|
||
return response
|
||
|
||
try:
|
||
# 发送请求
|
||
response = self.session.request(
|
||
method=method, timeout=self.timeout, **arguments
|
||
)
|
||
|
||
# 若返回错误状态码则抛出异常
|
||
response.raise_for_status()
|
||
# 处理响应
|
||
response = self._process_response(response=response)
|
||
|
||
# 若请求全局唯一标识非NONE则响应保存至缓存数据库
|
||
# noinspection PyUnboundLocalVariable
|
||
if guid is not None:
|
||
# noinspection PyUnboundLocalVariable
|
||
self._save_response(guid=guid, response=response)
|
||
|
||
return response
|
||
|
||
except Exception as exception:
|
||
# 尝试根据响应解析响应状态码和错误信息,否则进行构造
|
||
try:
|
||
# JOSN反序列化
|
||
# noinspection PyUnboundLocalVariable
|
||
response_decoded = response.json()
|
||
# 响应状态码
|
||
status = response_decoded["status"]
|
||
# 错误信息
|
||
message = response_decoded["message"]
|
||
except:
|
||
status = getattr(getattr(exception, "response", None), "status", None)
|
||
url = arguments["url"]
|
||
message = str(exception).split("\n")[0]
|
||
# 重新构建错误信息
|
||
message = f"{method} {url} failed: {message}"
|
||
raise RequestException(status=status, message=message)
|
||
|
||
# 处理响应
|
||
@staticmethod
|
||
def _process_response(response: Response) -> Any:
|
||
|
||
# 响应内容
|
||
content = response.content
|
||
# 若响应内容为空则返回NONE
|
||
if not content:
|
||
return None
|
||
|
||
# 标准化内容类型
|
||
content_type = (
|
||
response.headers.get("Content-Type", "").split(";")[0].strip().lower()
|
||
)
|
||
|
||
# 根据内容类型匹配解析返回内容方法
|
||
# noinspection PyUnreachableCode
|
||
match content_type:
|
||
case "application/json" | "text/json":
|
||
# JSON反序列化
|
||
return response.json()
|
||
case "application/xml" | "text/xml":
|
||
# 解析为XML(ELEMENT对象)
|
||
return ElementTree.fromstring(text=content)
|
||
case _:
|
||
# 若内容类型以IMAGE/开头则返回图片格式和图片数据
|
||
if content_type.startswith("image/"):
|
||
# 图片格式
|
||
image_format = content_type.split(sep="/", maxsplit=1)[1]
|
||
return f"{image_format}", content
|
||
else:
|
||
return content
|
||
|
||
# 处理流式传输响应
|
||
@staticmethod
|
||
def _process_stream_response(
|
||
response: Response, chunk_size: int
|
||
) -> Generator[bytes, None, None]: # 生成器不接受发SEND发送至、结束时返回NONE
|
||
|
||
# 检查数据分块
|
||
if not isinstance(chunk_size, int) and isinstance(chunk_size, bool):
|
||
raise ValueError("chunk_size must type=int")
|
||
|
||
if chunk_size <= 0:
|
||
raise ValueError("chunk_size must >0")
|
||
|
||
try:
|
||
for chunk in response.iter_content(chunk_size=chunk_size):
|
||
if chunk:
|
||
yield chunk
|
||
finally:
|
||
response.close()
|
||
|
||
|
||
class Authenticator:
|
||
|
||
def __init__(
|
||
self,
|
||
):
|
||
"""认证器(用于获取访问令牌)"""
|
||
|
||
# 初始化
|
||
self._initialize()
|
||
|
||
def _initialize(self):
|
||
"""初始化访问凭证"""
|
||
|
||
# 创建访问凭证地址对象
|
||
self.certifications_path = (
|
||
Path(__file__).parent.resolve() / "certifications.json"
|
||
)
|
||
# 若访问凭证地址对象不存在则创建
|
||
if not self.certifications_path.exists():
|
||
with open(self.certifications_path, "w", encoding="utf-8") as file:
|
||
json.dump(
|
||
{},
|
||
file,
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
# 初始化HTTP客户端
|
||
self.http_client = HTTPClient()
|
||
|
||
def _szkt_get_certification(self) -> tuple[str, float]:
|
||
"""获取深圳快瞳访问凭证"""
|
||
|
||
# 请求深圳快瞳访问凭证获取接口
|
||
response = self.http_client.get(
|
||
url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d"
|
||
)
|
||
|
||
# 若响应非成功则抛出异常
|
||
if not (response["status"] == 200 and response["code"] == 0):
|
||
raise RuntimeError("获取深圳快瞳访问凭证发生异常")
|
||
|
||
# 返回令牌、失效时间戳
|
||
# noinspection PyTypeChecker
|
||
return (
|
||
response["data"]["access_token"],
|
||
time.time() + response["data"]["expires_in"],
|
||
)
|
||
|
||
def _hlyj_get_certification(self) -> tuple[str, float]:
|
||
"""获取合力亿捷访问凭证"""
|
||
|
||
# 企业访问标识
|
||
access_key_id = "25938f1c190448829dbdb5d344231e42"
|
||
|
||
# 签名秘钥
|
||
secret_access_key = "44dc0299aff84d68ae27712f8784f173"
|
||
|
||
# 时间戳(秒级)
|
||
timestamp = int(time.time())
|
||
|
||
# 签名,企业访问标识、签名秘钥和时间戳拼接后计算的十六进制的HMAC-SHA256
|
||
signature = hmac.new(
|
||
secret_access_key.encode("utf-8"),
|
||
f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"),
|
||
hashlib.sha256,
|
||
).hexdigest()
|
||
|
||
# 请求合力亿捷访问凭证获取接口
|
||
response = self.http_client.get(
|
||
url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}"
|
||
)
|
||
|
||
# 若响应非成功则抛出异常
|
||
if not response["success"]:
|
||
raise RuntimeError("获取合力亿捷访问凭证发生异常")
|
||
|
||
# 返回令牌、失效时间戳
|
||
# noinspection PyTypeChecker
|
||
return (
|
||
response["data"],
|
||
time.time() + 3600, # 访问令牌有效期为1小时
|
||
)
|
||
|
||
def _feishu_get_certification(self) -> tuple[str, float]:
|
||
"""获取飞书访问凭证"""
|
||
|
||
# 请求飞书访问凭证获取接口
|
||
response = self.http_client.post(
|
||
url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
|
||
data={
|
||
"app_id": "cli_a1587980be78500c",
|
||
"app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA",
|
||
},
|
||
)
|
||
|
||
# 若响应非成功则抛出异常
|
||
if not response["code"] == 0:
|
||
raise RuntimeError("获取飞书访问凭证发生异常")
|
||
|
||
# 返回令牌、失效时间戳
|
||
# noinspection PyTypeChecker
|
||
return (
|
||
response["tenant_access_token"],
|
||
time.time() + response["expire"],
|
||
)
|
||
|
||
def get_token(self, servicer: str) -> str | None:
|
||
"""获取访问令牌"""
|
||
"""
|
||
:param servicer: 服务商,数据类型为字符串
|
||
"""
|
||
|
||
with threading.Lock():
|
||
# 初始化令牌和失效时间戳
|
||
token, expired_timestamp = None, 0
|
||
try:
|
||
with open(self.certifications_path, "r", encoding="utf-8") as file:
|
||
# 读取所有服务商访问凭证
|
||
certifications = json.load(file)
|
||
# 获取服务商访问凭证
|
||
certification = certifications.get(servicer, None)
|
||
# 若服务商访问凭证非NONE则解析令牌和失效时间戳
|
||
if certification is not None:
|
||
# 解析服务商访问令牌
|
||
token = certification["token"]
|
||
# 解析服务商访问令牌失效时间戳
|
||
expired_timestamp = certification["expired_timestamp"]
|
||
|
||
# 若JSON反序列化时发生异常则重置访问凭证
|
||
except json.decoder.JSONDecodeError:
|
||
with open(self.certifications_path, "w", encoding="utf-8") as file:
|
||
json.dump(
|
||
{},
|
||
file,
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
except Exception:
|
||
raise RuntimeError("获取访问令牌发生异常")
|
||
|
||
# 若当前时间戳大于失效时间戳,则请求服务商获取访问凭证接口
|
||
if time.time() > expired_timestamp:
|
||
# noinspection PyUnreachableCode
|
||
match servicer:
|
||
case "szkt":
|
||
# 获取深圳快瞳访问凭证
|
||
token, expired_timestamp = self._szkt_get_certification()
|
||
case "feishu":
|
||
token, expired_timestamp = self._feishu_get_certification()
|
||
case "hlyj":
|
||
token, expired_timestamp = self._hlyj_get_certification()
|
||
case _:
|
||
raise RuntimeError(f"服务商({servicer})未设置获取访问令牌方法")
|
||
|
||
# 更新服务商访问凭证
|
||
certifications[servicer] = {
|
||
"token": token,
|
||
"expired_timestamp": expired_timestamp,
|
||
}
|
||
|
||
# 将所有服务商访问凭证保存至本地文件
|
||
with open(self.certifications_path, "w", encoding="utf-8") as file:
|
||
json.dump(
|
||
certifications,
|
||
file,
|
||
ensure_ascii=False,
|
||
)
|
||
|
||
return token
|
||
|
||
|
||
"""
|
||
封装飞书客户端,实现获取验证码、操作多维表格等
|
||
"""
|
||
|
||
|
||
class FeishuClinet:
|
||
|
||
def __init__(self):
|
||
|
||
self.authenticator = Authenticator()
|
||
|
||
self.http_client = HTTPClient()
|
||
|
||
def _headers(self):
|
||
"""请求头"""
|
||
|
||
# 装配飞书访问凭证
|
||
return {
|
||
"Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}",
|
||
}
|
||
|
||
@staticmethod
|
||
def get_verification_code():
|
||
|
||
try:
|
||
|
||
# 执行时间戳
|
||
execute_timestamp = time.time()
|
||
|
||
# 超时时间戳
|
||
timeout_timestamp = execute_timestamp + 65
|
||
|
||
# 建立加密IMAP连接
|
||
server = IMAP4_SSL("imap.feishu.cn", 993)
|
||
|
||
# 登录
|
||
server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2")
|
||
|
||
while True:
|
||
|
||
# 若当前时间戳大于超时时间戳则返回NONE
|
||
if time.time() <= timeout_timestamp:
|
||
|
||
# 等待10秒
|
||
time.sleep(10)
|
||
|
||
# 选择文件夹(邮箱验证码)
|
||
server.select("&kK57sZqMi8F4AQ-")
|
||
|
||
try:
|
||
|
||
# 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引
|
||
index = server.search(None, "ALL")[1][0].split()[-1]
|
||
|
||
# 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文
|
||
# noinspection PyUnresolvedReferences
|
||
contents = BytesParser(policy=default).parsebytes(
|
||
server.fetch(index, "(RFC822)")[1][0][1]
|
||
)
|
||
|
||
# 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码
|
||
for content in contents.walk():
|
||
|
||
if (
|
||
content.get_content_type() == "text/plain"
|
||
or content.get_content_type() == "text/html"
|
||
):
|
||
|
||
# 邮件发送时间戳
|
||
# noinspection PyUnresolvedReferences
|
||
send_timestamp = parsedate_to_datetime(
|
||
content["Date"]
|
||
).timestamp()
|
||
|
||
# 若邮件发送时间戳大于执行时间戳则解析验证码并返回
|
||
if (
|
||
execute_timestamp
|
||
> send_timestamp
|
||
>= execute_timestamp - 35
|
||
):
|
||
|
||
# 登出
|
||
server.logout()
|
||
|
||
# 解析验证码
|
||
return re.search(
|
||
r"【普康健康】您的验证码是:(\d+)",
|
||
content.get_payload(decode=True).decode(),
|
||
).group(1)
|
||
|
||
# 若文件夹无邮件则继续
|
||
except:
|
||
|
||
pass
|
||
|
||
# 若超时则登出
|
||
else:
|
||
|
||
server.logout()
|
||
|
||
return None
|
||
|
||
except Exception:
|
||
|
||
raise RuntimeError("获取邮箱验证码发生其它异常")
|
||
|
||
# 查询多维表格记录,单次最多查询500条记录
|
||
@restrict(refill_rate=5, max_tokens=5)
|
||
def query_bitable_records(
|
||
self,
|
||
bitable: str,
|
||
table_id: str,
|
||
field_names: Optional[list[str]] = None,
|
||
filter_conditions: Optional[dict] = None,
|
||
) -> pandas.DataFrame:
|
||
|
||
# 先查询多维表格记录,在根据字段解析记录
|
||
|
||
# 装配多维表格查询记录地址
|
||
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20"
|
||
|
||
response = self.http_client.post(
|
||
url=url,
|
||
headers=self._headers(),
|
||
json={"field_names": field_names, "filter": filter_conditions},
|
||
)
|
||
|
||
# 响应业务码为0则定义为响应成功
|
||
assert response.get("code") == 0, "查询多维表格记录发生异常"
|
||
|
||
# 多维表格记录
|
||
records = response.get("data").get("items")
|
||
|
||
# 检查响应中是否包含还有下一页标识,若有则继续请求下一页
|
||
while response.get("data").get("has_more"):
|
||
|
||
url_next = url + "&page_token={}".format(
|
||
response.get("data").get("page_token")
|
||
)
|
||
|
||
response = self.http_client.post(
|
||
url=url_next,
|
||
headers=self._headers(),
|
||
json={"field_names": field_names, "filter": filter_conditions},
|
||
)
|
||
|
||
assert response.get("code") == 0, "查询多维表格记录发生异常"
|
||
|
||
# 合并记录
|
||
records.append(response.get("data").get("items"))
|
||
|
||
# 装配多维表格列出字段地址
|
||
url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20"
|
||
|
||
response = self.http_client.get(
|
||
url=url,
|
||
headers=self._headers(),
|
||
)
|
||
|
||
assert response.get("code") == 0, "列出多维表格字段发生异常"
|
||
|
||
# 多维表格字段
|
||
fields = response.get("data").get("items")
|
||
|
||
while response.get("data").get("has_more"):
|
||
|
||
url_next = url + "&page_token={}".format(
|
||
response.get("data").get("page_token")
|
||
)
|
||
|
||
response = self.http_client.get(
|
||
url=url_next,
|
||
headers=self._headers(),
|
||
)
|
||
|
||
assert response.get("code") == 0, "列出多维表格字段发生异常"
|
||
|
||
fields.append(response.get("data").get("items"))
|
||
|
||
# 字段映射
|
||
field_mappings = {}
|
||
|
||
for field in fields:
|
||
|
||
# 字段名
|
||
field_name = field["field_name"]
|
||
|
||
# 根据字段类型匹配
|
||
match field["type"]:
|
||
|
||
case 1005:
|
||
|
||
field_type = "主键"
|
||
|
||
case 1:
|
||
|
||
field_type = "文本"
|
||
|
||
case 3:
|
||
|
||
field_type = "单选"
|
||
|
||
case 2:
|
||
|
||
# 数字、公式字段的显示格式
|
||
match field["property"]["formatter"]:
|
||
|
||
case "0":
|
||
|
||
field_type = "整数"
|
||
|
||
case _:
|
||
|
||
raise ValueError("未设置数字、公式字段的显示格式")
|
||
|
||
case _:
|
||
|
||
raise ValueError("未设置字段类型")
|
||
|
||
# noinspection PyUnboundLocalVariable
|
||
field_mappings.update({field_name: field_type})
|
||
|
||
# 记录数据体
|
||
records_data = []
|
||
|
||
# 解析记录
|
||
for record in records:
|
||
|
||
# 单条记录数据体
|
||
record_data = {}
|
||
|
||
for field_name, content in record["fields"].items():
|
||
|
||
match field_mappings[field_name]:
|
||
|
||
case "主键" | "单选" | "整数":
|
||
|
||
record_data.update({field_name: content})
|
||
|
||
case "文本":
|
||
|
||
# 若存在多行文本则拼接
|
||
fragments_content = ""
|
||
|
||
for fragment_content in content:
|
||
|
||
fragments_content += fragment_content["text"]
|
||
|
||
record_data.update({field_name: fragments_content})
|
||
|
||
case _:
|
||
|
||
raise ValueError("未设置字段解析方法")
|
||
|
||
records_data.append(record_data)
|
||
|
||
return pandas.DataFrame(records_data)
|