Python/utils/sqlite.py

188 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# 导入模块
import sqlite3
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
class SQLite:
"""
SQLite客户端
"""
def __init__(self, database: Union[str, Path]):
"""
初始化
:param database: 数据库地址
"""
self.database = database
# 初始化本地线程存储
self.threads = threading.local()
def _connect(self):
"""为当前线程创建数据库连接和游标"""
# 检查当前线程有数据库连接,若有则继续否则创建数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
return
# 为当前线程关闭数据库连接和游标
self._disconnect()
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接
self.threads.connection = sqlite3.connect(
database=self.database,
check_same_thread=True,
timeout=30, # 数据库锁超时时间单位默认为30秒避免并发锁死
)
# 开启行映射,支持按照字段名取值
self.threads.connection.row_factory = sqlite3.Row
# 为当前线程创建游标
self.threads.cursor = self.threads.connection.cursor()
except Exception as exception:
self.threads.connection = None
self.threads.cursor = None
raise RuntimeError(
f"为当前线程创建数据库连接和游标发生异常,{str(exception)}"
) from exception
def _disconnect(self) -> None:
"""为当前线程关闭数据库连接和游标"""
# 检查当前线程有游标,若有则关闭游标
if hasattr(self.threads, "cursor") and self.threads.cursor is not None:
# noinspection PyBroadException
try:
# 为当前线程关闭游标
self.threads.cursor.close()
self.threads.cursor = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭游标发生异常,{str(exception)}"
) from exception
# 检查当前线程有数据库连接,若有则关闭数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
# noinspection PyBroadException
try:
# 为当前线程提交事务
self.threads.connection.commit()
# 为当前线程关闭数据库连接
self.threads.connection.close()
self.threads.connection = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭数据库连接发生异常,{str(exception)}"
) from exception
def _query_one(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> Optional[Dict[str, Any]]:
"""
为当前线程查询并获取单行数据
:param sql: 查询SQL语句
:param parameters: SQL参数
:return: 单行数据
"""
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
return (
None
if (result := self.threads.cursor.fetchone()) is None
else dict(result)
)
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _query_all(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> List[Dict[str, Any]]:
"""
为当前线程查询并获取多行数据
:param sql: 查询SQL语句
:param parameters: SQL参数
:return: 多行数据
"""
# noinspection PyBroadException
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
result = []
while batch := self.threads.cursor.fetchmany(1000):
result.extend([dict(row) for row in batch])
return result
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool:
"""
为当前线程执行SQL
:param sql: 新增、删除和修改SQL语句
:param parameters: SQL参数
:return: 执行结果
"""
try:
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
# 为当前线程提交事务
self.threads.connection.commit()
return True
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError("为当前线程执行SQL发生异常") from exception
def __enter__(self):
"""进入上下文管理时为当前线程创建数据库连接和游标"""
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文管理时为当前线程关闭数据库连接和游标"""
self._disconnect()
return False
def __del__(self):
"""析构时为当前线程关闭数据库连接和游标"""
self._disconnect()