# -*- coding: utf-8 -*- """ MySQL客户端模块 """ from urllib.parse import quote_plus import pandas from sqlalchemy import create_engine, text class MySQL: """ MySQL客户端 """ def __init__( self, database: str, host: str = "cdb-7z9lzx4y.cd.tencentcdb.com", # 默认为刘弼仁的腾讯云MySQL数据库 port: int = 10039, username: str = "root", password: str = "Te198752", ) -> None: """ 初始化 :param database: 数据库名称 :param host: 主机 :param port: 端口 :param username: 用户名 :param password: 登录密码 """ # 就登录密码编码 password = quote_plus(password) # 构建数据库连接 connection_url = f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8" # 创建MySQL引擎并连接数据库 self.engine = create_engine( connection_url, pool_size=5, max_overflow=10, pool_recycle=3600, pool_pre_ping=True, ) # 连接池中保持打开连接的数量为5,额外连接数为10,连接1小时后重新回收重连,检查连接有效性 def execute_query(self, sql: str) -> pandas.DataFrame: """执行SQL查询并返回DATAFRAME""" if not hasattr(self, "engine") or not self.engine: raise ConnectionError("未创建数据库连接") try: with self.engine.connect() as connection: dataframe = pandas.read_sql_query( text(sql), connection, coerce_float=False ) # 不尝试将非整数数值转为浮点(维持DECIMAL) return dataframe except: connection.rollback() raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") def __del__(self): """析构时自动关闭连接""" if hasattr(self, "engine") and self.engine: self.engine.dispose()