# -*- coding: utf-8 -*- # 导入模块 import hashlib import hmac import json import re import sqlite3 import threading import time from email.parser import BytesParser from email.policy import default from email.utils import parsedate_to_datetime from functools import wraps from imaplib import IMAP4_SSL from pathlib import Path from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union from urllib.parse import quote_plus from xml.etree import ElementTree import pandas from pydantic import BaseModel, Field, HttpUrl, model_validator from requests import Response, Session from requests.adapters import HTTPAdapter from sqlalchemy import create_engine, text from urllib3.util.retry import Retry """ 封装sqlalchemy,实现按照SQL查询 类和函数的区别 类作为对象的模板,定义了对象的结构和行为;而函数则作为实现特定功能或操作的代码块,提高了代码的可读性和可维护性。 使用方法: with MySQLClient(database='data_analysis') as client: dataframe = client.execute_query(sql='select * from {{}}')') """ class MySQLClient: """MySQL客户端""" def __init__( self, database, host: str = "cdb-7z9lzx4y.cd.tencentcdb.com", port: int = 10039, username: str = "root", password: str = "Te198752", ): # 默认为刘弼仁的腾讯云MySQL数据库 # 数据库登录密码安全编码 password = quote_plus(password) # 构建数据库连接字符 connect_config = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8" # 创建MySQL引擎并连接数据库 self.engine = create_engine( connect_config, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True, ) # 连接池中保持打开连接的数量为5,额外连接数为10,连接1小时后重新回收重连,检查连接有效性 def __del__(self): """析构时自动关闭连接""" if hasattr(self, "engine") and self.engine: self.engine.dispose() def execute_query(self, sql: str) -> pandas.DataFrame: """执行SQL查询并返回DATAFRAME""" if not hasattr(self, "engine") or not self.engine: raise ConnectionError("未创建数据库连接") try: with self.engine.connect() as connection: dataframe = pandas.read_sql_query( text(sql), connection, coerce_float=False ) # 不尝试将非整数数值转为浮点(维持DECIMAL) return dataframe except: connection.rollback() raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") class SQLiteClient: """SQLite客户端""" def __init__(self, database: Union[str, Path]): """ 初始化SQLite客户端 :param database: 数据库 """ self.database = database # 初始化本地线程存储 self.threads = threading.local() def _connect(self): """为当前线程创建数据库连接和游标""" # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 if hasattr(self.threads, "connection") and self.threads.connection is not None: return # 为当前线程关闭数据库连接和游标 self._disconnect() # noinspection PyBroadException try: # 为当前线程创建数据库连接 self.threads.connection = sqlite3.connect( database=self.database, check_same_thread=True, timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 ) # 开启行映射,支持按照字段名取值 self.threads.connection.row_factory = sqlite3.Row # 为当前线程创建数据库游标 self.threads.cursor = self.threads.connection.cursor() except Exception as exception: self.threads.connection = None self.threads.cursor = None raise RuntimeError( f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" ) from exception def _disconnect(self) -> None: """为当前线程关闭数据库连接和游标""" # 检查当前线程有数据库游标,若有则关闭数据库游标 if hasattr(self.threads, "cursor") and self.threads.cursor is not None: # noinspection PyBroadException try: # 为当前线程关闭数据库游标 self.threads.cursor.close() self.threads.cursor = None except Exception as exception: raise RuntimeError( f"为当前线程关闭数据库游标发生异常,{str(exception)}" ) from exception # 检查当前线程有数据库连接,若有则关闭数据库连接 if hasattr(self.threads, "connection") and self.threads.connection is not None: # noinspection PyBroadException try: # 为当前线程提交事务 self.threads.connection.commit() # 为当前线程关闭数据库连接 self.threads.connection.close() self.threads.connection = None except Exception as exception: raise RuntimeError( f"为当前线程关闭数据库连接发生异常,{str(exception)}" ) from exception def _query_one( self, sql: str, parameters: Tuple[Any, ...] = () ) -> Optional[Dict[str, Any]]: """ 为当前线程查询并获取单行数据 :param sql: 查询SQL语句 :param parameters: SQL参数 :return: 单行数据 """ # noinspection PyBroadException try: # 为当前线程创建数据库连接和游标 self._connect() # 检查当前线程无数据库游标,若无则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("为当前线程创建数据库游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) return ( None if (result := self.threads.cursor.fetchone()) is None else dict(result) ) # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: """ 为当前线程执行SQL :param sql: 新增、删除和修改SQL语句 :param parameters: SQL参数 :return: 执行结果 """ try: self._connect() # 检查当前线程无数据库游标,若无则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("为当前线程创建数据库游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) # 为当前线程提交事务 self.threads.connection.commit() return True # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError("为当前线程执行SQL发生异常") from exception def __enter__(self): """进入上下文管理时为当前线程创建数据库连接和游标""" self._connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """退出上下文管理时为当前线程关闭数据库连接和游标""" self._disconnect() return False def __del__(self): """析构时为当前线程关闭数据库连接和游标""" self._disconnect() # 基于令牌桶限流算法的装饰器 def restrict(refill_rate: float = 5.0, max_tokens: int = 5): class TokenBucket: # noinspection PyShadowingNames def __init__(self, max_tokens: int, refill_rate: float): """ 初始化令牌桶限流 :param refill_rate: 令牌填充速率,单位为个/秒 :param max_tokens: 最大令牌数,单位为个 """ # 初始化最大令牌数 self.max_tokens = max_tokens # 初始化当前令牌数 self.tokens = self.max_tokens * 0.5 # 初始化令牌填充速率 self.refill_rate = refill_rate # 初始化上一次填充令牌的时间戳(使用单调时间戳) self.refill_timestamp = time.monotonic() # 初始化线程锁(所有线程共用) self.thread_lock = threading.Lock() # 填充令牌 def _refill(self) -> None: with self.thread_lock: # 本次填充令牌的时间戳 refill_timestamp = time.monotonic() # 重新计算令牌桶中令牌数 # noinspection PyTypeChecker self.tokens = min( self.max_tokens, max( 0, self.tokens + self.refill_rate * (refill_timestamp - self.refill_timestamp), ), ) self.refill_timestamp = refill_timestamp # 尝试消耗令牌 def consume(self) -> Tuple[bool, float]: # 填充令牌 self._refill() with self.thread_lock: if self.tokens >= 1: self.tokens -= 1 return True, 0 # 等待时长 # noinspection PyTypeChecker wait_time = min( 1 / self.refill_rate, max( 0, 1 / self.refill_rate - (time.monotonic() - self.refill_timestamp), ), ) return False, wait_time # 初始化所有被装饰的函数创建令牌桶限流存储 buckets = {} def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs): # 若当前被装饰的函数不在所有被装饰的函数创建令牌桶限流存储则为当前被装饰的函数实例化令牌桶限流 if func not in buckets: # 初始化令牌桶限流 buckets[func] = TokenBucket( refill_rate=refill_rate, max_tokens=max_tokens ) bucket = buckets[func] # 重试次数 retries = 0 while retries <= 10: # 尝试消耗令牌 success, wait_time = bucket.consume() # 若消耗令牌成功则返回被装饰的函数,否则等待 if success: return func(*args, **kwargs) time.sleep(wait_time * 2) retries += 1 raise Exception("request too frequently") return wrapper return decorator class HTTPClient: """请求客户端""" class RequestException(Exception): """请求异常""" def __init__( self, status: Optional[int] = 400, code: int = 0, message: str = "请求发生异常", ): """ :param status: 状态码 :param code: 错误码 :param message: 错误信息 """ self.status = status self.code = code self.message = message super().__init__(self.message) def __str__(self): return f"请求发生异常(status={self.status}, code={self.code},message={self.message})" class Parameters(BaseModel): """ 请求参数模型,支持自动校验 """ url: HttpUrl = Field( default=..., description="统一资源定位符,基于HttpUrl自动校验" ) params: Optional[Dict[str, Any]] = Field( default=None, description="统一资源定位符的查询参数" ) headers: Optional[Dict[str, str]] = Field(default=None, description="请求头") data: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") json_data: Optional[Dict[str, Any]] = Field( default=None, alias="json", description="JSON数据" ) files: Optional[ Dict[ str, Union[ Tuple[str, bytes], Tuple[str, bytes, str], Tuple[str, bytes, str, Dict[str, str]], ], ] ] = Field( default=None, description="上传文件,{字段名: (文件名, 字节数据, 内容类型, 请求头)}", ) stream_enabled: Optional[bool] = Field(default=None, description="使用流式传输") guid: Optional[str] = Field(default=None, description="缓存全局唯一标识") @model_validator(mode="after") def validate_data(self): """校验:表单数据和JSON数据互斥""" if self.data is not None and self.json_data is not None: raise ValueError("表单数据和JSON数据不能同时使用") return self @model_validator(mode="after") def validate_files(self): if self.files is not None and self.stream_enabled: raise ValueError("上传文件和使用流式传输不能同时使用") return self class CacheClient(SQLiteClient): """缓存客户端""" def __init__(self, cache_ttl: int): """ 初始化缓存数据库 :param cache_ttl: 缓存生存时间,单位为天 """ # 初始化SQLite客户端 super().__init__(database=Path(__file__).parent.resolve() / "caches.db") # 初始化缓存生存时间,单位为秒 self.cache_ttl = cache_ttl # 初始化缓存表、时间戳索引和清理过期缓存 try: with self: self._execute( sql="""CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, cache TEXT NOT NULL, timestamp REAL NOT NULL)""" ) self._execute( sql="""CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)""" ) self._execute( sql="DELETE FROM caches WHERE timestamp < ?", parameters=(time.time() - self.cache_ttl,), ) except Exception as exception: raise RuntimeError( f"初始化缓存数据库发生异常:{str(exception)}" ) from exception # noinspection PyShadowingNames def query(self, guid: str) -> Optional[Dict[str, Any]]: """ 查询并获取单条缓存 :param guid: 缓存唯一标识 :return: 缓存 """ # noinspection PyBroadException try: with self: result = self._query_one( sql="SELECT cache FROM caches WHERE guid = ? AND timestamp >= ?", parameters=(guid, time.time() - self.cache_ttl), ) # 就查询结果JSON反序列化 if result is not None and "cache" in result: return json.loads(result["cache"]) return None except Exception as exception: raise RuntimeError("查询并获取单条缓存发生异常") from exception # noinspection PyShadowingNames def update(self, guid: str, cache: Dict) -> Optional[bool]: """ 新增或更新缓存(若无则新增缓存,若有则更新缓存) :param guid: 缓存唯一标识 :param cache: 缓存 :return: 成功返回True,失败返回False """ # noinspection PyBroadException try: with self: return self._execute( sql="INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)", parameters=( guid, json.dumps(cache, ensure_ascii=False), time.time(), ), ) except Exception as exception: raise RuntimeError("新增或更新缓存发生异常") from exception def __init__( self, default_headers: Optional[Dict[str, str]] = None, total: int = 3, backoff_factor: float = 0.5, timeout: int = 60, cache_enabled: bool = False, cache_ttl: int = 360, ): """ :param default_headers: 默认请求头 :param total: 最大重试次数 :param backoff_factor: 重试间隔退避因子 :param timeout: 超时时间,单位为秒 :param cache_enabled: 使用缓存 :param cache_ttl: 缓存生存时间,单位为天 """ # 创建请求会话并挂载适配器 self.session = self._create_session( default_headers=default_headers, total=total, backoff_factor=backoff_factor ) # 初始化超时时间 self.timeout = timeout # 初始化使用缓存 self.cache_enabled = cache_enabled # 初始化缓存生存时间,单位为秒 self.cache_ttl = cache_ttl * 86400 self.cache_client: Optional[HTTPClient.CacheClient] = None # 若使用缓存则实例化缓存客户端 if self.cache_enabled: # 初始化缓存客户端 self.cache_client = self.CacheClient(cache_ttl=self.cache_ttl) def __del__(self): """析构时关闭请求会话""" if hasattr(self, "session") and self.session: self.session.close() @staticmethod def _create_session( total: int, backoff_factor: float, default_headers: Optional[Dict[str, str]] = None, ) -> Session: """ 创建请求会话并挂载适配器 :param default_headers: 默认请求头 :param total: 最大重试次数 :param backoff_factor: 重试间隔退避因子 :return Session: 请求会话实例 """ # 实例化请求会话 session = Session() # 设置默认请求头 if default_headers: session.headers.update(default_headers) # 设置重试策略并挂载适配器 adapter = HTTPAdapter( max_retries=Retry( allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "PATCH"], status_forcelist=[ 408, 502, 503, 504, ], # 408为请求超时,502为网关错误,503为服务不可用,504为网关超时 total=total, respect_retry_after_header=True, backoff_factor=backoff_factor, ) ) session.mount("http://", adapter) session.mount("https://", adapter) return session def get( self, **kwargs ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]: """发送GET请求""" return self._request(method="GET", parameters=self.Parameters(**kwargs)) def post( self, **kwargs ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]: """发送POST请求""" return self._request(method="POST", parameters=self.Parameters(**kwargs)) def download( self, stream_enabled: bool = False, chunk_size: int = 1024, **kwargs ) -> Union[ str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, Generator[bytes, None, None], None, ]: response = self._request( method="GET", parameters=self.Parameters(**{"stream_enabled": stream_enabled, **kwargs}), ) """ 下载文件 :param stream_enabled: 使用流式传输 :param chunk_size: 流式传输的分块大小 """ # 若是用流式传输则处理流式传输响应 if stream_enabled: return self._process_stream_response( response=response, chunk_size=chunk_size ) return response def _request( self, method: Literal["GET", "POST"], parameters: Parameters ) -> Union[ str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, Response, None ]: """请求""" # 将请求参数模型转为请求参数字典 parameters = parameters.model_dump(exclude_none=True, by_alias=True) # 将URL由HttpUrl对象转为字符串 parameters["url"] = str(parameters["url"]) # 过滤表单数据中None值 if parameters.get("data") is not None: parameters["data"] = { k: v for k, v in parameters["data"].items() if v is not None } # 过滤JSON数据中None值 if parameters.get("json_data") is not None: parameters["json_data"] = { k: v for k, v in parameters["json_data"].items() if v is not None } # 使用流式传输 stream_enabled = parameters.pop("stream_enabled", False) # 缓存全局唯一标识 guid = parameters.pop("guid", None) # 若使用缓存且缓存全局唯一标识非空则查询并获取单条缓存 if self.cache_enabled and guid is not None: cache = self.cache_client.query(guid) if cache is not None: return cache # 发送请求并处理响应 # noinspection PyBroadException try: response = self.session.request( method=method, timeout=self.timeout, **parameters ) response.raise_for_status() # 若返回非2??状态码则抛出异常 # 若使用流式传输则直接返回(不缓存) if stream_enabled: return response # 处理响应 response = self._process_response(response=response) # 若使用缓存且缓存全局唯一标识非空则新增或更新缓存 if self.cache_enabled and guid is not None: self.cache_client.update(guid, response) return response except Exception as exception: # noinspection PyBroadException try: response = response or getattr(exception, "response", None) status = ( response.json().get("status", response.status_code) if response is not None else None ) message = ( response.json().get("message", str(exception).splitlines()[0]) if response is not None else str(exception).splitlines()[0] ) except Exception: status = None message = f"{method} {parameters["url"]} 请求发生异常:{str(exception).split("\n")[0]}" raise self.RequestException(status=status, message=message) from exception # 处理响应 @staticmethod def _process_response( response: Response, ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]: # 若响应内容为空则返回None content = response.content if not response: return None # 响应类型 _type = response.headers.get("Content-Type", "").split(";")[0].strip().lower() # 根据响应类型匹配响应内容解析方法并返回 # noinspection PyUnreachableCode match _type: # JSON:JSON反序列化 case "application/json" | "text/json": return response.json() # XML:解析为XML对象(Element实例) case "application/xml" | "text/xml": return ElementTree.fromstring(content) # 以image/开头:返回影像件格式和响应内容 case _ if _type.startswith("image/"): # 影像件格式 image_format = _type.split(sep="/", maxsplit=1)[1] return image_format, content # 其它的响应类型,先UTF8解码再返回,若解码发生异常则直接返回 case _: try: return content.decode("utf-8") except UnicodeDecodeError: return content # 处理流式传输响应 @staticmethod def _process_stream_response( response: Response, chunk_size: int ) -> Generator[bytes, None, None]: """ 处理流式响应 :param response: requests.Response对象 :param chunk_size: 分块大小 :return: 字节数据块生成器 """ if not isinstance(chunk_size, int): raise ValueError("分块大小数据类型必须为整数") if chunk_size <= 0: raise ValueError("分块大小必须大于0") try: for chunk in response.iter_content(chunk_size=chunk_size): if chunk: yield chunk finally: response.close() class Authenticator: def __init__( self, ): """认证器(用于获取访问令牌)""" # 初始化 self._initialize() def _initialize(self): """初始化""" # 初始化访问凭证地址对象 self.certifications_path = ( Path(__file__).parent.resolve() / "certifications.json" ) # 若访问凭证地址对象不存在则创建 if not self.certifications_path.exists(): with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( {}, file, ensure_ascii=False, ) # 初始化请求客户端 self.http_client = HTTPClient() def _szkt_get_certification(self) -> tuple[str, float]: """获取深圳快瞳访问凭证""" response = self.http_client.get( url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" ) # 若非响应成功则抛出异常 if not (response["status"] == 200 and response["code"] == 0): raise RuntimeError("获取深圳快瞳访问凭证发生异常") # 返回访问令牌、失效时间戳 # noinspection PyTypeChecker return ( response["data"]["access_token"], time.time() + response["data"]["expires_in"], ) def _hlyj_get_certification(self) -> Tuple[str, float]: """获取合力亿捷访问凭证""" # 企业访问标识 access_key_id = "25938f1c190448829dbdb5d344231e42" # 签名秘钥 secret_access_key = "44dc0299aff84d68ae27712f8784f173" # 时间戳(秒级) timestamp = int(time.time()) # 签名,企业访问标识、签名秘钥和时间戳拼接后计算的十六进制的HMAC-SHA256 signature = hmac.new( secret_access_key.encode("utf-8"), f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), hashlib.sha256, ).hexdigest() response = self.http_client.get( url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" ) # 若非响应成功则抛出异常 if not response["success"]: raise RuntimeError("获取合力亿捷访问凭证发生异常") # 返回访问令牌、失效时间戳 # noinspection PyTypeChecker return ( response["data"], time.time() + 1 * 60 * 60, # 访问令牌有效期为1小时 ) def _feishu_get_certification(self) -> tuple[str, float]: """获取飞书访问凭证""" response = self.http_client.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data={ "app_id": "cli_a1587980be78500c", "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", }, ) # 若非响应成功则抛出异常 if not response["code"] == 0: raise RuntimeError("获取飞书访问凭证发生异常") # 返回访问令牌、失效时间戳 # noinspection PyTypeChecker return ( response["tenant_access_token"], time.time() + response["expire"], ) def get_token(self, servicer: str) -> str | None: """ 获取访问令牌 :param servicer: 服务商,暂仅支持深圳快瞳、合力亿捷和飞书 :return token: 访问令牌 """ with threading.Lock(): # 初始化访问令牌和失效时间戳 token, expired_timestamp = None, 0 try: with open(self.certifications_path, "r", encoding="utf-8") as file: # 本地打开并读取所有服务商的访问凭证 certifications = json.load(file) # 获取指定服务商的访问凭证 certification = certifications.get(servicer, None) # 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳 if certification is not None: # 访问令牌 token = certification["token"] # 失效时间戳 expired_timestamp = certification["expired_timestamp"] # 若反序列化发生异常则重置访问凭证储存文件 except json.decoder.JSONDecodeError: with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( {}, file, ensure_ascii=False, ) except Exception: raise RuntimeError("获取访问令牌发生异常") if time.time() > expired_timestamp: # noinspection PyUnreachableCode match servicer: case "szkt": token, expired_timestamp = self._szkt_get_certification() case "hlyj": token, expired_timestamp = self._hlyj_get_certification() case "feishu": token, expired_timestamp = self._feishu_get_certification() case _: raise RuntimeError(f"未设置服务商:{servicer}获取访问凭证方法") # 更新服务商访问凭证 certifications[servicer] = { "token": token, "expired_timestamp": expired_timestamp, } # 将所有服务商访问凭证保存至本地文件 with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( certifications, file, ensure_ascii=False, ) return token class FeishuClient: """飞书客户端""" def __init__(self): self.authenticator = Authenticator() self.http_client = HTTPClient() def _headers(self): """请求头""" # 装配飞书访问凭证 return { "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", } @staticmethod def get_verification_code(): try: # 执行时间戳 execute_timestamp = time.time() # 超时时间戳 timeout_timestamp = execute_timestamp + 65 # 建立加密IMAP连接 server = IMAP4_SSL("imap.feishu.cn", 993) # 登录 server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2") while True: # 若当前时间戳大于超时时间戳则返回NONE if time.time() <= timeout_timestamp: # 等待10秒 time.sleep(10) # 选择文件夹(邮箱验证码) server.select("&kK57sZqMi8F4AQ-") # noinspection PyBroadException try: # 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引 index = server.search(None, "ALL")[1][0].split()[-1] # 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文 # noinspection PyUnresolvedReferences contents = BytesParser(policy=default).parsebytes( server.fetch(index, "(RFC822)")[1][0][1] ) # 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码 for content in contents.walk(): if ( content.get_content_type() == "text/plain" or content.get_content_type() == "text/html" ): # 邮件发送时间戳 # noinspection PyUnresolvedReferences send_timestamp = parsedate_to_datetime( content["Date"] ).timestamp() # 若邮件发送时间戳大于执行时间戳则解析验证码并返回 if ( execute_timestamp > send_timestamp >= execute_timestamp - 35 ): # 登出 server.logout() # 解析验证码 return re.search( r"【普康健康】您的验证码是:(\d+)", content.get_payload(decode=True).decode(), ).group(1) # 若文件夹无邮件则继续 except: pass # 若超时则登出 else: server.logout() return None except Exception: raise RuntimeError("获取邮箱验证码发生其它异常") # 查询多维表格记录,单次最多查询500条记录 @restrict(refill_rate=5, max_tokens=5) def query_bitable_records( self, bitable: str, table_id: str, field_names: Optional[list[str]] = None, filter_conditions: Optional[dict] = None, ) -> pandas.DataFrame: # 先查询多维表格记录,在根据字段解析记录 # 装配多维表格查询记录地址 url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20" response = self.http_client.post( url=url, headers=self._headers(), json={"field_names": field_names, "filter": filter_conditions}, ) # 响应业务码为0则定义为响应成功 assert response.get("code") == 0, "查询多维表格记录发生异常" # 多维表格记录 records = response.get("data").get("items") # 检查响应中是否包含还有下一页标识,若有则继续请求下一页 while response.get("data").get("has_more"): url_next = url + "&page_token={}".format( response.get("data").get("page_token") ) response = self.http_client.post( url=url_next, headers=self._headers(), json={"field_names": field_names, "filter": filter_conditions}, ) assert response.get("code") == 0, "查询多维表格记录发生异常" # 合并记录 records.append(response.get("data").get("items")) # 装配多维表格列出字段地址 url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20" response = self.http_client.get( url=url, headers=self._headers(), ) assert response.get("code") == 0, "列出多维表格字段发生异常" # 多维表格字段 fields = response.get("data").get("items") while response.get("data").get("has_more"): url_next = url + "&page_token={}".format( response.get("data").get("page_token") ) response = self.http_client.get( url=url_next, headers=self._headers(), ) assert response.get("code") == 0, "列出多维表格字段发生异常" fields.append(response.get("data").get("items")) # 字段映射 field_mappings = {} for field in fields: # 字段名 field_name = field["field_name"] # 根据字段类型匹配 match field["type"]: case 1005: field_type = "主键" case 1: field_type = "文本" case 3: field_type = "单选" case 2: # 数字、公式字段的显示格式 match field["property"]["formatter"]: case "0": field_type = "整数" case _: raise ValueError("未设置数字、公式字段的显示格式") case _: raise ValueError("未设置字段类型") # noinspection PyUnboundLocalVariable field_mappings.update({field_name: field_type}) # 记录数据体 records_data = [] # 解析记录 for record in records: # 单条记录数据体 record_data = {} for field_name, content in record["fields"].items(): match field_mappings[field_name]: case "主键" | "单选" | "整数": record_data.update({field_name: content}) case "文本": # 若存在多行文本则拼接 fragments_content = "" for fragment_content in content: fragments_content += fragment_content["text"] record_data.update({field_name: fragments_content}) case _: raise ValueError("未设置字段解析方法") records_data.append(record_data) return pandas.DataFrame(records_data)