# -*- coding: utf-8 -*- # 导入模块 import hashlib import hmac import json import re import sqlite3 import threading import time from email.parser import BytesParser from email.policy import default from email.utils import parsedate_to_datetime from functools import wraps from imaplib import IMAP4_SSL from pathlib import Path from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union from urllib.parse import quote_plus from xml.etree import ElementTree import pandas from pydantic import BaseModel, Field, HttpUrl, model_validator from requests import Response, Session from requests.adapters import HTTPAdapter from sqlalchemy import create_engine, text from urllib3.util.retry import Retry """ 封装sqlalchemy,实现按照SQL查询 类和函数的区别 类作为对象的模板,定义了对象的结构和行为;而函数则作为实现特定功能或操作的代码块,提高了代码的可读性和可维护性。 使用方法: with MySQLClient(database='data_analysis') as client: dataframe = client.execute_query(sql='select * from {{}}')') """ class MySQLClient: """MySQL客户端""" def __init__( self, database, host: str = "cdb-7z9lzx4y.cd.tencentcdb.com", port: int = 10039, username: str = "root", password: str = "Te198752", ): # 默认为刘弼仁的腾讯云MySQL数据库 # 数据库登录密码安全编码 password = quote_plus(password) # 构建数据库连接字符 connect_config = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8" # 创建MySQL引擎并连接数据库 self.engine = create_engine( connect_config, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True, ) # 连接池中保持打开连接的数量为5,额外连接数为10,连接1小时后重新回收重连,检查连接有效性 def __del__(self): """析构时自动关闭连接""" if hasattr(self, "engine") and self.engine: self.engine.dispose() def execute_query(self, sql: str) -> pandas.DataFrame: """执行SQL查询并返回DATAFRAME""" if not hasattr(self, "engine") or not self.engine: raise ConnectionError("未创建数据库连接") try: with self.engine.connect() as connection: dataframe = pandas.read_sql_query( text(sql), connection, coerce_float=False ) # 不尝试将非整数数值转为浮点(维持DECIMAL) return dataframe except: connection.rollback() raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") """ 封装urllib.request的相关操作,实现常用HTTPREQUEST 使用方法: client = HTTPClient() response = clinet.post(url) """ class TokenBucket: def __init__(self, refill_rate, max_tokens): """令牌桶,基于令牌桶算法限制请求频率""" # 填充令牌速率(个/秒) self.refill_rate = refill_rate # 令牌桶最大令牌数 self.max_tokens = max_tokens # 令牌桶当前令牌数 self.tokens = max_tokens # 上一次填充令牌时间戳(使用单调递增时间,单位为秒) self.refill_timestamp = time.monotonic() # 获取令牌 # noinspection PyMissingReturnStatement def acquire(self) -> tuple[bool, float]: with threading.Lock(): # 本次填充令牌时间戳 refill_timestamp = time.monotonic() # 重新计算令牌桶中令牌数 self.tokens = min( self.max_tokens, self.tokens + self.refill_rate * (refill_timestamp - self.refill_timestamp), ) self.refill_timestamp = refill_timestamp # 若令牌桶当前令牌数大于等于1则减少令牌 if self.tokens >= 1: self.tokens -= 1 return True, 0.0 # 同时返回等待时间 return False, 0.2 # 将令牌桶以装饰函数封装为请求频率限制方法 def restrict(refill_rate=5, max_tokens=5): def decorator(func): # 初始化令牌桶 token_bucket = TokenBucket(refill_rate=refill_rate, max_tokens=max_tokens) @wraps(func) def wrapper(*args, **kwargs): # 重试次数 retries = 0 # 若重试数小于等于最大重试次数,则循环检查是否允许请求 while retries <= 10: success, wait_time = token_bucket.acquire() # 若允许请求则返回嵌套函数,若不允许请求则等待 if success: return func(*args, **kwargs) time.sleep(wait_time * 1.5**retries) retries += 1 raise Exception("request too frequently") return wrapper return decorator class RequestException(Exception): """请求异常""" def __init__( self, status: int = 400, code: int = 0, message: str = "request failed" ): """ :param status: 状态编码,默认为0 :param message: 错误信息,默认为RequestException """ self.status = status self.code = code self.message = message super().__init__(self.message) def __str__(self): return f"请求发生异常({self.status}, {self.message})" # 请求参数数据模型 class Arguments(BaseModel): """ :param url: 统一资源定位符,基于统一资源定位符校验器进行校验 :param params: 查询参数 :param headers: 请求头 :param data: 表单数据 :param json_data: JSON # 入参时使用别名,出参时根据BY_ALIAS=TRUE确定是否使用别名 :param files: 上传文件 :param stream: 是否启用流式传输 :param guid: 全局唯一标识 """ # 统一资源定位符 url: HttpUrl = Field(default=...) # 查询参数 params: Optional[Dict] = Field(default=None) # 请求头 headers: Optional[Dict] = Field(default=None) # 表单数据 data: Optional[Dict] = Field(default=None) # JSON json_data: Optional[Dict] = Field(default=None, alias="json") # 上传文件 files: Optional[ Dict[ str, Union[ Tuple[str, bytes], Tuple[str, bytes, str], Tuple[str, bytes, str, dict] ], ] ] = Field(default=None) # 是否启用流式传输 stream: Optional[bool] = Field(default=None) # 全局唯一标识 guid: Optional[str] = Field(default=None) # 表单数据和JSON数据互斥 @model_validator(mode="after") def validate_data(self): if self.data and self.json_data: raise ValueError("cannot use both data and json parameters simultaneously") return self # 上传文件和启用流式传输互斥 @model_validator(mode="after") def validate_files(self): if self.files and self.stream: raise ValueError( "cannot use both files and stream parameters simultaneously" ) return self # HTTP客户端 class HTTPClient: def __init__( self, timeout: int = 60, default_headers: Optional[Dict[str, str]] = None, total: int = 3, backoff_factor: float = 0.5, cache_enabled: bool = False, cache_ttl: int = 90, ): """ :param timeout: 超时时间,单位为秒 :param default_headers: 默认请求头 :param total: 最大重试次数 :param backoff_factor: 重试间隔退避因子 :param cache_enabled: 是否使用缓存 :param cache_ttl: 缓存生存时间,单位为天 """ # 超时时间 self.timeout = timeout # 创建HTTP会话并挂载适配器 self.session = self._create_session( default_headers=default_headers, total=total, backoff_factor=backoff_factor ) # 是否使用缓存 self.cache_enabled = cache_enabled # 缓存生存时间 self.cache_ttl = cache_ttl # 若使用缓存,则初始化缓存数据库 if self.cache_enabled: self._initialize_cache_database() # 创建HTTP会话并挂载适配器 @staticmethod def _create_session( total: int, backoff_factor: float, default_headers: Optional[Dict[str, str]] = None, ) -> Session: """ :param default_headers 默认请求头 :param total 最大重试次数 :param backoff_factor 重试间隔退避因子 """ # 创建会话对象 session = Session() # 设置请求头 if default_headers: session.headers.update(default_headers) # 设置重试策略(优先按照服响应等待时长,若未返回则默认按照退避算法等待) strategy_retries = Retry( allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "PATCH"], status_forcelist=[ 408, 502, 503, 504, ], # 408:请求超时,502:网关错误,503:服务不可用,504:网关超时 total=total, respect_retry_after_header=True, backoff_factor=backoff_factor, ) # 创建适配器并绑定重试策略 adapter = HTTPAdapter(max_retries=strategy_retries) # 就HTTP请求生效 session.mount("http://", adapter) # 就HTTPS请求生效 session.mount("https://", adapter) return session def _initialize_cache_database(self): """初始化缓存数据库""" # 创建缓存数据库连接(使用SQLite) self.cache_connection = sqlite3.connect( database="SQLite.db", check_same_thread=False ) self.cache_connection.execute( """CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, response TEXT, timestamp REAL)""" ) # 创建时间戳索引 self.cache_connection.execute( """CREATE INDEX IF NOT EXISTS index_timestamp ON caches(timestamp)""" ) # 删除过期缓存 self.cache_connection.execute( "DELETE FROM caches WHERE timestamp < ?", (time.time() - self.cache_ttl * 86400,), # 缓存生存时间单位转为秒 ) # 提交事物 self.cache_connection.commit() # 在缓存数据库查询响应 def _query_response(self, guid: str) -> Optional[Dict]: with threading.Lock(): cursor = None try: # 创建游标 cursor = self.cache_connection.cursor() # 根据请求唯一标识查询响应 cursor.execute( "SELECT response FROM caches WHERE guid = ? AND timestamp >= ?", (guid, time.time() - self.cache_ttl * 86400), ) if result := cursor.fetchone(): return json.loads(result[0]) return None # 若发生异常则返回NONE except: self.cache_connection.rollback() return None finally: if cursor: cursor.close() # 将响应保存至缓存数据库 def _save_response(self, guid: str, response: Dict): with threading.Lock(): cursor = None try: # 创建游标 cursor = self.cache_connection.cursor() # 新增或覆盖响应 cursor.execute( "INSERT OR REPLACE INTO caches (guid, response, timestamp) VALUES (?, ?, ?)", (guid, json.dumps(response, ensure_ascii=False), time.time()), ) # 提交事物 self.cache_connection.commit() # 若发生异常则返回NONE except: self.cache_connection.rollback() finally: if cursor: cursor.close() # GET请求 def get(self, **kwargs) -> Union[Dict, str]: return self._request(method="GET", arguments=Arguments(**kwargs)) # POST请求 def post(self, **kwargs) -> Union[Dict, str]: return self._request(method="POST", arguments=Arguments(**kwargs)) # 文件下载 def download( self, stream=False, chunk_size=1024, **kwargs ) -> Union[Dict, str, Generator[bytes, None, None]]: response = self._request( method="GET", arguments=Arguments(**{"stream": stream, **kwargs}) ) # 若禁用流式传输,则返回响应 if not stream: return response # 若启用流式传输,则处理流式传输响应并返回 return self._process_stream_response(response=response, chunk_size=chunk_size) def _request(self, method: Literal["GET", "POST"], arguments: Arguments) -> Any: """发送请求""" # 请求参数模型 arguments = arguments.model_dump(exclude_none=True, by_alias=True) # URL由HTTPURL对象转为字符串 arguments["url"] = str(arguments["url"]) # 重构表单数据 if arguments.get("data") is not None: arguments["data"] = { key: value for key, value in arguments["data"].items() if value is not None } # 重构JSON格式数据 if arguments.get("json_data") is not None: arguments["json_data"] = { key: value for key, value in arguments["json_data"].items() if value is not None } # 重构文件数据 if arguments.get("files") is not None: files_valid = {} # 遍历文件数据键值对 for key, value in arguments["files"].items(): if isinstance(value, (tuple, list)): match len(value): # 若文件数据包括文件名称和文件内容 case 2: files_valid[key] = (value[0], value[1], None, None) # 若文件数据包含文件名称、文件内容和内容类型 case 3: files_valid[key] = (value[0], value[1], value[2], None) # 若文件数据包含文件名称、文件内容、内容类型和请求头 case 4: files_valid[key] = (value[0], value[1], value[2], value[3]) arguments.update({"files": files_valid}) # 全局唯一标识 guid = arguments.pop("guid", None) # 若使用缓存且本次请求参数包含全局唯一标识,则优先返回缓存数据库中响应 if self.cache_enabled and guid is not None: # 在缓存数据库查询响应 response = self._query_response(guid=guid) # 若缓存响应非空则返回 if response is not None: return response try: # 发送请求 response = self.session.request( method=method, timeout=self.timeout, **arguments ) # 若返回错误状态码则抛出异常 response.raise_for_status() # 处理响应 response = self._process_response(response=response) # 若请求全局唯一标识非NONE则响应保存至缓存数据库 # noinspection PyUnboundLocalVariable if guid is not None: # noinspection PyUnboundLocalVariable self._save_response(guid=guid, response=response) return response except Exception as exception: # 尝试根据响应解析响应状态码和错误信息,否则进行构造 try: # JOSN反序列化 # noinspection PyUnboundLocalVariable response_decoded = response.json() # 响应状态码 status = response_decoded["status"] # 错误信息 message = response_decoded["message"] except: status = getattr(getattr(exception, "response", None), "status", None) url = arguments["url"] message = str(exception).split("\n")[0] # 重新构建错误信息 message = f"{method} {url} failed: {message}" raise RequestException(status=status, message=message) # 处理响应 @staticmethod def _process_response(response: Response) -> Any: # 响应内容 content = response.content # 若响应内容为空则返回NONE if not content: return None # 标准化内容类型 content_type = ( response.headers.get("Content-Type", "").split(";")[0].strip().lower() ) # 根据内容类型匹配解析返回内容方法 # noinspection PyUnreachableCode match content_type: case "application/json" | "text/json": # JSON反序列化 return response.json() case "application/xml" | "text/xml": # 解析为XML(ELEMENT对象) return ElementTree.fromstring(text=content) case _: # 若内容类型以IMAGE/开头则返回图片格式和图片数据 if content_type.startswith("image/"): # 图片格式 image_format = content_type.split(sep="/", maxsplit=1)[1] return f"{image_format}", content else: return content # 处理流式传输响应 @staticmethod def _process_stream_response( response: Response, chunk_size: int ) -> Generator[bytes, None, None]: # 生成器不接受发SEND发送至、结束时返回NONE # 检查数据分块 if not isinstance(chunk_size, int) and isinstance(chunk_size, bool): raise ValueError("chunk_size must type=int") if chunk_size <= 0: raise ValueError("chunk_size must >0") try: for chunk in response.iter_content(chunk_size=chunk_size): if chunk: yield chunk finally: response.close() class Authenticator: def __init__( self, ): """认证器(用于获取访问令牌)""" # 初始化 self._initialize() def _initialize(self): """初始化访问凭证""" # 创建访问凭证地址对象 self.certifications_path = ( Path(__file__).parent.resolve() / "certifications.json" ) # 若访问凭证地址对象不存在则创建 if not self.certifications_path.exists(): with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( {}, file, ensure_ascii=False, ) # 初始化HTTP客户端 self.http_client = HTTPClient() def _szkt_get_certification(self) -> tuple[str, float]: """获取深圳快瞳访问凭证""" # 请求深圳快瞳访问凭证获取接口 response = self.http_client.get( url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" ) # 若响应非成功则抛出异常 if not (response["status"] == 200 and response["code"] == 0): raise RuntimeError("获取深圳快瞳访问凭证发生异常") # 返回令牌、失效时间戳 # noinspection PyTypeChecker return ( response["data"]["access_token"], time.time() + response["data"]["expires_in"], ) def _hlyj_get_certification(self) -> tuple[str, float]: """获取合力亿捷访问凭证""" # 企业访问标识 access_key_id = "25938f1c190448829dbdb5d344231e42" # 签名秘钥 secret_access_key = "44dc0299aff84d68ae27712f8784f173" # 时间戳(秒级) timestamp = int(time.time()) # 签名,企业访问标识、签名秘钥和时间戳拼接后计算的十六进制的HMAC-SHA256 signature = hmac.new( secret_access_key.encode("utf-8"), f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), hashlib.sha256, ).hexdigest() # 请求合力亿捷访问凭证获取接口 response = self.http_client.get( url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" ) # 若响应非成功则抛出异常 if not response["success"]: raise RuntimeError("获取合力亿捷访问凭证发生异常") # 返回令牌、失效时间戳 # noinspection PyTypeChecker return ( response["data"], time.time() + 3600, # 访问令牌有效期为1小时 ) def _feishu_get_certification(self) -> tuple[str, float]: """获取飞书访问凭证""" # 请求飞书访问凭证获取接口 response = self.http_client.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data={ "app_id": "cli_a1587980be78500c", "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", }, ) # 若响应非成功则抛出异常 if not response["code"] == 0: raise RuntimeError("获取飞书访问凭证发生异常") # 返回令牌、失效时间戳 # noinspection PyTypeChecker return ( response["tenant_access_token"], time.time() + response["expire"], ) def get_token(self, servicer: str) -> str | None: """获取访问令牌""" """ :param servicer: 服务商,数据类型为字符串 """ with threading.Lock(): # 初始化令牌和失效时间戳 token, expired_timestamp = None, 0 try: with open(self.certifications_path, "r", encoding="utf-8") as file: # 读取所有服务商访问凭证 certifications = json.load(file) # 获取服务商访问凭证 certification = certifications.get(servicer, None) # 若服务商访问凭证非NONE则解析令牌和失效时间戳 if certification is not None: # 解析服务商访问令牌 token = certification["token"] # 解析服务商访问令牌失效时间戳 expired_timestamp = certification["expired_timestamp"] # 若JSON反序列化时发生异常则重置访问凭证 except json.decoder.JSONDecodeError: with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( {}, file, ensure_ascii=False, ) except Exception: raise RuntimeError("获取访问令牌发生异常") # 若当前时间戳大于失效时间戳,则请求服务商获取访问凭证接口 if time.time() > expired_timestamp: # noinspection PyUnreachableCode match servicer: case "szkt": # 获取深圳快瞳访问凭证 token, expired_timestamp = self._szkt_get_certification() case "feishu": token, expired_timestamp = self._feishu_get_certification() case "hlyj": token, expired_timestamp = self._hlyj_get_certification() case _: raise RuntimeError(f"服务商({servicer})未设置获取访问令牌方法") # 更新服务商访问凭证 certifications[servicer] = { "token": token, "expired_timestamp": expired_timestamp, } # 将所有服务商访问凭证保存至本地文件 with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( certifications, file, ensure_ascii=False, ) return token """ 封装飞书客户端,实现获取验证码、操作多维表格等 """ class FeishuClinet: def __init__(self): self.authenticator = Authenticator() self.http_client = HTTPClient() def _headers(self): """请求头""" # 装配飞书访问凭证 return { "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", } @staticmethod def get_verification_code(): try: # 执行时间戳 execute_timestamp = time.time() # 超时时间戳 timeout_timestamp = execute_timestamp + 65 # 建立加密IMAP连接 server = IMAP4_SSL("imap.feishu.cn", 993) # 登录 server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2") while True: # 若当前时间戳大于超时时间戳则返回NONE if time.time() <= timeout_timestamp: # 等待10秒 time.sleep(10) # 选择文件夹(邮箱验证码) server.select("&kK57sZqMi8F4AQ-") try: # 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引 index = server.search(None, "ALL")[1][0].split()[-1] # 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文 # noinspection PyUnresolvedReferences contents = BytesParser(policy=default).parsebytes( server.fetch(index, "(RFC822)")[1][0][1] ) # 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码 for content in contents.walk(): if ( content.get_content_type() == "text/plain" or content.get_content_type() == "text/html" ): # 邮件发送时间戳 # noinspection PyUnresolvedReferences send_timestamp = parsedate_to_datetime( content["Date"] ).timestamp() # 若邮件发送时间戳大于执行时间戳则解析验证码并返回 if ( execute_timestamp > send_timestamp >= execute_timestamp - 35 ): # 登出 server.logout() # 解析验证码 return re.search( r"【普康健康】您的验证码是:(\d+)", content.get_payload(decode=True).decode(), ).group(1) # 若文件夹无邮件则继续 except: pass # 若超时则登出 else: server.logout() return None except Exception: raise RuntimeError("获取邮箱验证码发生其它异常") # 查询多维表格记录,单次最多查询500条记录 @restrict(refill_rate=5, max_tokens=5) def query_bitable_records( self, bitable: str, table_id: str, field_names: Optional[list[str]] = None, filter_conditions: Optional[dict] = None, ) -> pandas.DataFrame: # 先查询多维表格记录,在根据字段解析记录 # 装配多维表格查询记录地址 url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20" response = self.http_client.post( url=url, headers=self._headers(), json={"field_names": field_names, "filter": filter_conditions}, ) # 响应业务码为0则定义为响应成功 assert response.get("code") == 0, "查询多维表格记录发生异常" # 多维表格记录 records = response.get("data").get("items") # 检查响应中是否包含还有下一页标识,若有则继续请求下一页 while response.get("data").get("has_more"): url_next = url + "&page_token={}".format( response.get("data").get("page_token") ) response = self.http_client.post( url=url_next, headers=self._headers(), json={"field_names": field_names, "filter": filter_conditions}, ) assert response.get("code") == 0, "查询多维表格记录发生异常" # 合并记录 records.append(response.get("data").get("items")) # 装配多维表格列出字段地址 url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/fields?page_size=20" response = self.http_client.get( url=url, headers=self._headers(), ) assert response.get("code") == 0, "列出多维表格字段发生异常" # 多维表格字段 fields = response.get("data").get("items") while response.get("data").get("has_more"): url_next = url + "&page_token={}".format( response.get("data").get("page_token") ) response = self.http_client.get( url=url_next, headers=self._headers(), ) assert response.get("code") == 0, "列出多维表格字段发生异常" fields.append(response.get("data").get("items")) # 字段映射 field_mappings = {} for field in fields: # 字段名 field_name = field["field_name"] # 根据字段类型匹配 match field["type"]: case 1005: field_type = "主键" case 1: field_type = "文本" case 3: field_type = "单选" case 2: # 数字、公式字段的显示格式 match field["property"]["formatter"]: case "0": field_type = "整数" case _: raise ValueError("未设置数字、公式字段的显示格式") case _: raise ValueError("未设置字段类型") # noinspection PyUnboundLocalVariable field_mappings.update({field_name: field_type}) # 记录数据体 records_data = [] # 解析记录 for record in records: # 单条记录数据体 record_data = {} for field_name, content in record["fields"].items(): match field_mappings[field_name]: case "主键" | "单选" | "整数": record_data.update({field_name: content}) case "文本": # 若存在多行文本则拼接 fragments_content = "" for fragment_content in content: fragments_content += fragment_content["text"] record_data.update({field_name: fragments_content}) case _: raise ValueError("未设置字段解析方法") records_data.append(record_data) return pandas.DataFrame(records_data)