# -*- coding: utf-8 -*- """ SQLite客户端 """ from pathlib import Path import sqlite3 import threading from typing import Any, Dict, List, Optional, Tuple class SQLite: """ SQLite客户端,支持: query_one:根据SQL语句执行查询并返回一行查询数据 query_all:根据SQL语句执行查询并返回所有查询数据 execute:根据SQL语句执行操作 """ def __init__(self, database: Path): """ 初始化 :param database: 数据库地址 """ self.database = database # 初始化本地线程存储 self.threads = threading.local() def query_one( self, sql: str, parameters: Tuple[Any, ...] = () ) -> Optional[Dict[str, Any]]: """ 在当前线程,根据SQL语句执行查询并返回一行查询数据 :param sql: SQL语句 :param parameters: SQL参数 :return: 查询数据 """ try: # 为当前线程创建数据库连接和游标 self._connect() # 检查当前线程游标,若游标为空值则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) return ( None if (result := self.threads.cursor.fetchone()) is None else dict(result) ) # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError(f"执行查询发生异常:{str(exception)}") from exception def query_all( self, sql: str, parameters: Tuple[Any, ...] = () ) -> List[Dict[str, Any]]: """ 在当前线程,根据SQL语句执行查询并返回所有查询数据 :param sql: SQL语句 :param parameters: SQL参数 :return: 查询数据 """ try: # 为当前线程创建数据库连接和游标 self._connect() # 检查当前线程游标,若游标为空值则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) result = [] # 分批处理 while batch := self.threads.cursor.fetchmany(1000): result.extend([dict(row) for row in batch]) return result # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError(f"执行查询发生异常:{str(exception)}") from exception def execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: """ 在当前线程,根据SQL语句执行操作 :param sql: 新增、删除和修改SQL语句 :param parameters: SQL参数 :return: 执行结果 """ try: self._connect() # 检查当前线程无游标,若无则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: raise RuntimeError("为当前线程创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) # 为当前线程提交事务 self.threads.connection.commit() return True # 若发生异常则回滚事务并抛出异常 except Exception as exception: # 检查当前线程有数据库连接,若有则回滚 if ( hasattr(self.threads, "connection") and self.threads.connection is not None ): self.threads.connection.rollback() raise RuntimeError(f"执行SQL发生异常:{str(exception)}") from exception def _connect(self): """为当前线程创建数据库连接和游标""" # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 if hasattr(self.threads, "connection") and self.threads.connection is not None: return # 为当前线程关闭数据库连接和游标 self._disconnect() try: # 为当前线程创建数据库连接 self.threads.connection = sqlite3.connect( database=self.database.as_posix(), check_same_thread=True, timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 ) # 开启行映射,支持按照字段名取值 self.threads.connection.row_factory = sqlite3.Row # 为当前线程创建游标 self.threads.cursor = self.threads.connection.cursor() except Exception as exception: self.threads.connection = None self.threads.cursor = None raise RuntimeError( f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" ) from exception def _disconnect(self) -> None: """为当前线程关闭数据库连接和游标""" # 检查当前线程有游标,若有则关闭游标 if hasattr(self.threads, "cursor") and self.threads.cursor is not None: try: # 为当前线程关闭游标 self.threads.cursor.close() self.threads.cursor = None except Exception as exception: raise RuntimeError( f"为当前线程关闭游标发生异常,{str(exception)}" ) from exception # 检查当前线程有数据库连接,若有则关闭数据库连接 if hasattr(self.threads, "connection") and self.threads.connection is not None: try: # 为当前线程提交事务 self.threads.connection.commit() # 为当前线程关闭数据库连接 self.threads.connection.close() self.threads.connection = None except Exception as exception: raise RuntimeError( f"为当前线程关闭数据库连接发生异常,{str(exception)}" ) from exception def __enter__(self): """进入上下文管理时为当前线程创建数据库连接和游标""" self._connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """退出上下文管理时为当前线程关闭数据库连接和游标""" self._disconnect() return False def __del__(self): """析构时为当前线程关闭数据库连接和游标""" self._disconnect()