251203更新

This commit is contained in:
liubiren 2025-12-03 12:10:05 +08:00
parent 093444f590
commit 00ff6a5577
1 changed files with 86 additions and 138 deletions

View File

@ -92,12 +92,15 @@ class CacheClient:
def __init__(self, cache_ttl: int = 360, database: str = "Caches.db"): def __init__(self, cache_ttl: int = 360, database: str = "Caches.db"):
""" """
:param cache_ttl: 缓存生存时间单位默认为360 :param cache_ttl: 缓存生存时间单位为
:param database: 缓存数据库名称 :param database: 缓存数据库名称
""" """
try: # 初始化缓存数据库连接
self.connection: Optional[sqlite3.Connection] = None
# 初始化缓存生存时间,单位为天
self.cache_ttl = cache_ttl self.cache_ttl = cache_ttl
try:
# 创建缓存数据库连接 # 创建缓存数据库连接
self.connection = sqlite3.connect( self.connection = sqlite3.connect(
database=database, database=database,
@ -105,149 +108,93 @@ class CacheClient:
timeout=30, # 缓存数据库锁超时时间单位默认为30秒避免并发锁死 timeout=30, # 缓存数据库锁超时时间单位默认为30秒避免并发锁死
) )
# 创建缓存数据库连接使用SQLite # 创建缓存表和索引、清理过期缓存
self.cache_connection = sqlite3.connect( with self.connection:
database="SQLite.db", check_same_thread=False
)
# 创建缓存表
self.connection.execute( self.connection.execute(
"""CREATE TABLE IF NOT EXISTS caches ( """CREATE TABLE IF NOT EXISTS caches (
guid TEXT PRIMARY KEY, guid TEXT PRIMARY KEY,
scene TEXT,
cache TEXT NOT NULL, cache TEXT NOT NULL,
timestamp REAL NOT NULL timestamp REAL NOT NULL
)""" )"""
) )
# 创建时间戳索引(优化过期缓存查询效率)
self.connection.execute( self.connection.execute(
"""CREATE INDEX IF NOT EXISTS index_timestamp ON caches(timestamp)""" """CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)"""
) )
# 删除过期缓存
self.connection.execute( self.connection.execute(
"DELETE FROM caches WHERE timestamp < ?", "DELETE FROM caches WHERE timestamp < ?",
(time.time() - self.cache_ttl * 86400,), (time.time() - self.cache_ttl * 86400,),
) )
except Exception as exception:
self._disconnect()
raise f"初始缓存数据库失败:{str(exception)}" from exception
def _disconnect(self) -> None:
"""关闭缓存数据库连接"""
if self.connection:
# noinspection PyBroadException
try:
self.connection.close()
except Exception:
pass
def __enter__(self) -> "CacheClient":
"""实现上下文管理"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出时关闭连接"""
self._disconnect()
return False
def query(self, guid: str) -> Optional[Dict]:
"""
查询缓存
:param guid: 缓存唯一标识
:return: 缓存
"""
with threading.Lock(): # 线程锁,保证并发安全
with self.connection.cursor() as cursor:
# noinspection PyBroadException
try:
# 根据缓存唯一标识查询有效缓存
cursor.execute(
"SELECT cache FROM caches WHERE guid = ? AND timestamp >= ?",
(guid, time.time() - self.cache_ttl * 86400),
)
if result := cursor.fetchone():
return json.loads(result[0])
return None
except Exception:
self.connection.rollback()
return None
def update(self, guid: str, cache: Dict) -> bool:
"""
更新缓存存在则覆盖不存在则新增
:param guid: 缓存唯一标识
:param cache: 缓存
:return: 成功返回True失败返回False
"""
with threading.Lock(): # 线程锁,保证并发安全
with self.connection.cursor() as cursor:
# noinspection PyBroadException
try:
# 新增或覆盖缓存
cursor.execute(
"INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)",
(
guid,
json.dumps(cache, ensure_ascii=False),
time.time(),
),
)
# 提交事务 # 提交事务
self.connection.commit() self.connection.commit()
except Exception as exception:
if self.connection:
self.connection.close()
raise f"{str(exception)}" from exception
def _query_response(self, guid: str) -> Optional[Dict]:
"""
私有方法根据guid查询有效缓存记录未过期
:param guid: 记录唯一标识
:return: 未过期的响应数据Dict不存在/过期/异常时返回None
"""
if not self.cache_connection:
logger.error("查询失败:缓存数据库未连接")
return None
with threading.Lock(): # 线程锁,保证并发安全
cursor = None
try:
cursor = self.cache_connection.cursor()
# 查询条件guid匹配 + 未过期
expire_time = time.time() - self.cache_ttl * 86400
cursor.execute(
"SELECT response FROM caches WHERE guid = ? AND timestamp >= ?",
(guid, expire_time),
)
result = cursor.fetchone() # 获取单条记录guid唯一
if result:
logger.info(f"查询缓存成功guid={guid}")
return json.loads(result[0]) # JSON字符串转Dict
logger.info(f"未查询到有效缓存guid={guid}(不存在或已过期)")
return None
except json.JSONDecodeError as e:
logger.error(
f"缓存数据解析失败JSON格式错误guid={guid}", exc_info=True
)
return None
except Exception as e:
logger.error(f"查询缓存异常guid={guid}", exc_info=True)
self.cache_connection.rollback() # 异常回滚事务
return None
finally:
if cursor:
cursor.close() # 确保游标关闭,释放资源
def _save_response(self, guid: str, response: Dict) -> bool:
"""
私有方法添加/更新缓存记录存在则覆盖不存在则新增
:param guid: 记录唯一标识
:param response: 待保存的响应数据Dict
:return: 保存成功返回True失败返回False
"""
if not self.cache_connection:
logger.error("保存失败:缓存数据库未连接")
return False
with threading.Lock(): # 线程锁,保证并发安全
cursor = None
try:
cursor = self.cache_connection.cursor()
# 转换Dict为JSON字符串ensure_ascii=False支持中文
response_str = json.dumps(response, ensure_ascii=False, indent=2)
# INSERT OR REPLACE存在则更新不存在则插入
cursor.execute(
"INSERT OR REPLACE INTO caches (guid, response, timestamp) VALUES (?, ?, ?)",
(guid, response_str, time.time()), # timestamp存储当前时间戳
)
self.cache_connection.commit() # 提交事务
logger.info(f"保存缓存成功guid={guid}")
return True return True
except json.JSONEncoderError as e: except Exception:
logger.error( self.connection.rollback() # 异常回滚事务
f"缓存数据序列化失败Dict转JSON错误guid={guid}", exc_info=True
)
self.cache_connection.rollback()
return False return False
except Exception as e:
logger.error(f"保存缓存异常guid={guid}", exc_info=True)
self.cache_connection.rollback() # 异常回滚事务
return False
finally:
if cursor:
cursor.close() # 确保游标关闭
def query_or_save_response(
self, guid: str, response: Optional[Dict] = None
) -> Optional[Dict]:
"""
二合一公开方法支持查询记录 / 添加/更新记录灵活复用
:param guid: 记录唯一标识必填
:param response: 待保存的响应数据可选
- 不传仅查询有效记录返回Dict/None
- 传入添加/更新记录返回保存后的有效记录/Dict
:return: 查询到的记录 / 保存后的记录 / 失败时返回None
"""
# 参数校验guid不能为空
if not guid or not isinstance(guid, str):
logger.error("guid无效必须是非空字符串")
return None
# 仅查询模式未传入response
if response is None:
return self._query_response(guid)
# 添加/更新模式传入response先保存再查询返回最新记录
if self._save_response(guid, response):
return self._query_response(guid)
logger.error(f"保存缓存失败无法返回记录guid={guid}")
return None
def close(self):
"""关闭数据库连接(程序退出时调用)"""
if self.cache_connection:
self.cache_connection.close()
self.cache_connection = None
""" """
@ -859,15 +806,16 @@ class Authenticator:
if time.time() > expired_timestamp: if time.time() > expired_timestamp:
# noinspection PyUnreachableCode # noinspection PyUnreachableCode
match servicer: match servicer:
case "szkt":
# 获取深圳快瞳访问凭证 # 获取深圳快瞳访问凭证
case "szkt":
token, expired_timestamp = self._szkt_get_certification() token, expired_timestamp = self._szkt_get_certification()
case "feishu": case "feishu":
token, expired_timestamp = self._feishu_get_certification() token, expired_timestamp = self._feishu_get_certification()
# 获取合力亿捷访问凭证
case "hlyj": case "hlyj":
token, expired_timestamp = self._hlyj_get_certification() token, expired_timestamp = self._hlyj_get_certification()
case _: case _:
raise RuntimeError(f"服务商({servicer})未设置获取访问令牌方法") raise RuntimeError(f"未设置服务商({servicer})")
# 更新服务商访问凭证 # 更新服务商访问凭证
certifications[servicer] = { certifications[servicer] = {