Python/utils/sqlite.py

188 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
SQLite客户端
"""
from pathlib import Path
import sqlite3
import threading
from typing import Any, Dict, List, Optional, Tuple
class SQLite:
"""
SQLite客户端支持
query_one根据SQL语句执行查询并返回一行查询数据
query_all根据SQL语句执行查询并返回所有查询数据
execute根据SQL语句执行操作
"""
def __init__(self, database: Path):
"""
初始化
:param database: 数据库地址
"""
self.database = database
# 初始化本地线程存储
self.threads = threading.local()
def query_one(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> Optional[Dict[str, Any]]:
"""
在当前线程根据SQL语句执行查询并返回一行查询数据
:param sql: SQL语句
:param parameters: SQL参数
:return: 查询数据
"""
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程游标,若游标为空值则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
return (
None
if (result := self.threads.cursor.fetchone()) is None
else dict(result)
)
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError(f"执行查询发生异常:{str(exception)}") from exception
def query_all(
self, sql: str, parameters: Tuple[Any, ...] = ()
) -> List[Dict[str, Any]]:
"""
在当前线程根据SQL语句执行查询并返回所有查询数据
:param sql: SQL语句
:param parameters: SQL参数
:return: 查询数据
"""
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程游标,若游标为空值则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
result = []
# 分批处理
while batch := self.threads.cursor.fetchmany(1000):
result.extend([dict(row) for row in batch])
return result
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError(f"执行查询发生异常:{str(exception)}") from exception
def execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool:
"""
在当前线程根据SQL语句执行操作
:param sql: 新增、删除和修改SQL语句
:param parameters: SQL参数
:return: 执行结果
"""
try:
self._connect()
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
# 为当前线程提交事务
self.threads.connection.commit()
return True
# 若发生异常则回滚事务并抛出异常
except Exception as exception:
# 检查当前线程有数据库连接,若有则回滚
if (
hasattr(self.threads, "connection")
and self.threads.connection is not None
):
self.threads.connection.rollback()
raise RuntimeError(f"执行SQL发生异常{str(exception)}") from exception
def _connect(self):
"""为当前线程创建数据库连接和游标"""
# 检查当前线程有数据库连接,若有则继续否则创建数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
return
# 为当前线程关闭数据库连接和游标
self._disconnect()
try:
# 为当前线程创建数据库连接
self.threads.connection = sqlite3.connect(
database=self.database.as_posix(),
check_same_thread=True,
timeout=30, # 数据库锁超时时间单位默认为30秒避免并发锁死
)
# 开启行映射,支持按照字段名取值
self.threads.connection.row_factory = sqlite3.Row
# 为当前线程创建游标
self.threads.cursor = self.threads.connection.cursor()
except Exception as exception:
self.threads.connection = None
self.threads.cursor = None
raise RuntimeError(
f"为当前线程创建数据库连接和游标发生异常,{str(exception)}"
) from exception
def _disconnect(self) -> None:
"""为当前线程关闭数据库连接和游标"""
# 检查当前线程有游标,若有则关闭游标
if hasattr(self.threads, "cursor") and self.threads.cursor is not None:
try:
# 为当前线程关闭游标
self.threads.cursor.close()
self.threads.cursor = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭游标发生异常,{str(exception)}"
) from exception
# 检查当前线程有数据库连接,若有则关闭数据库连接
if hasattr(self.threads, "connection") and self.threads.connection is not None:
try:
# 为当前线程提交事务
self.threads.connection.commit()
# 为当前线程关闭数据库连接
self.threads.connection.close()
self.threads.connection = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭数据库连接发生异常,{str(exception)}"
) from exception
def __enter__(self):
"""进入上下文管理时为当前线程创建数据库连接和游标"""
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文管理时为当前线程关闭数据库连接和游标"""
self._disconnect()
return False
def __del__(self):
"""析构时为当前线程关闭数据库连接和游标"""
self._disconnect()