# -*- coding: utf-8 -*- # 导入模块 import sqlite3 import threading from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union class SQLite: """ SQLite客户端 """ def __init__(self, database: Union[str, Path]): """ 初始化 :param database: 数据库地址 """ self.database = database # 初始化本地线程存储 self.threads = threading.local() def _connect(self): """为当前线程创建数据库连接和游标""" # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 if hasattr(self.threads, "connection") and self.threads.connection is not None: return # 为当前线程关闭数据库连接和游标 self._disconnect() # noinspection PyBroadException try: # 为当前线程创建数据库连接 self.threads.connection = sqlite3.connect( database=self.database, check_same_thread=True, timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 ) # 开启行映射,支持按照字段名取值 self.threads.connection.row_factory = sqlite3.Row # 为当前线程创建游标 self.threads.cursor = self.threads.connection.cursor() except Exception as exception: self.threads.connection = None self.threads.cursor = None raise RuntimeError( f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" ) from exception def _disconnect(self) -> None: """为当前线程关闭数据库连接和游标""" # 检查当前线程有游标,若有则关闭游标 if hasattr(self.threads, "cursor") and self.threads.cursor is not None: # noinspection PyBroadException try: # 为当前线程关闭游标 self.threads.cursor.close() self.threads.cursor = None except Exception as exception: raise RuntimeError( f"为当前线程关闭游标发生异常,{str(exception)}" ) from exception # 检查当前线程有数据库连接,若有则关闭数据库连接 if hasattr(self.threads, "connection") and self.threads.connection is not None: # noinspection PyBroadException try: # 为当前线程提交事务 self.threads.connection.commit() # 为当前线程关闭数据库连接 self.threads.connection.close() self.threads.connection = None except Exception as exception: raise RuntimeError( f"为当前线程关闭数据库连接发生异常,{str(exception)}" ) from exception def _query_one( self, sql: str, parameters: Tuple[Any, ...] = () ) -> Optional[Dict[str, Any]]: """ 为当前线程查询并获取单行数据 :param sql: 查询SQL语句 :param parameters: SQL参数 :return: 单行数据 """ # noinspection PyBroadException try: # 为当前线程创建数据库连接和游标 self._connect() # 检查当前线程无游标,若无则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("为当前线程创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) return ( None if (result := self.threads.cursor.fetchone()) is None else dict(result) ) # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception def _query_all( self, sql: str, parameters: Tuple[Any, ...] = () ) -> List[Dict[str, Any]]: """ 为当前线程查询并获取多行数据 :param sql: 查询SQL语句 :param parameters: SQL参数 :return: 多行数据 """ # noinspection PyBroadException try: # 为当前线程创建数据库连接和游标 self._connect() # 检查当前线程无游标,若无则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("为当前线程创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) result = [] while batch := self.threads.cursor.fetchmany(1000): result.extend([dict(row) for row in batch]) return result # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: """ 为当前线程执行SQL :param sql: 新增、删除和修改SQL语句 :param parameters: SQL参数 :return: 执行结果 """ try: self._connect() # 检查当前线程无游标,若无则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("为当前线程创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) # 为当前线程提交事务 self.threads.connection.commit() return True # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError("为当前线程执行SQL发生异常") from exception def __enter__(self): """进入上下文管理时为当前线程创建数据库连接和游标""" self._connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """退出上下文管理时为当前线程关闭数据库连接和游标""" self._disconnect() return False def __del__(self): """析构时为当前线程关闭数据库连接和游标""" self._disconnect()