diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index 5b28404..f8d6b29 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -8,6 +8,7 @@ https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink import json import re +import sqlite3 import uuid from base64 import b64encode from datetime import datetime @@ -29,6 +30,135 @@ from utils.client import Authenticator, HTTPClient # from utils.ocr import fuzzy_match +class SQLiteClient: + """SQLite客户端""" + + def __init__(self, database: str = "SQLite.db"): + """ + :param database: 数据库名称 + """ + # 初始化数据库连接 + self.connection: Optional[sqlite3.Connection] = None + + try: + # 创建数据库连接 + self.connection = sqlite3.connect( + database=( + Path(__file__).parent.resolve() / database + ), # 当前目录下创建缓存数据库 + check_same_thread=False, + timeout=30, # 缓存数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 + ) + + # 创建缓存表和索引、清理过期缓存 + with self.connection: + self.connection.execute( + """CREATE TABLE IF NOT EXISTS caches ( + guid TEXT PRIMARY KEY, + cache TEXT NOT NULL, + timestamp REAL NOT NULL + )""" + ) + self.connection.execute( + """CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)""" + ) + self.connection.execute( + "DELETE FROM caches WHERE timestamp < ?", + (time.time() - self.cache_ttl * 86400,), + ) + + except Exception as exception: + self._disconnect() + raise f"初始缓存数据库失败:{str(exception)}" from exception + + def _disconnect(self) -> None: + """关闭缓存数据库连接""" + if self.connection: + # noinspection PyBroadException + try: + self.connection.close() + except Exception: + pass + + def __enter__(self) -> "CacheClient": + """实现上下文管理""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """退出时关闭连接""" + self._disconnect() + return False + + def query(self, guid: str) -> Optional[Dict]: + """ + 查询缓存 + :param guid: 缓存唯一标识 + :return: 缓存 + """ + with threading.Lock(): # 线程锁,保证并发安全 + # noinspection PyBroadException + try: + # 创建游标 + cursor = self.connection.cursor() + # 根据缓存唯一标识查询有效缓存 + cursor.execute( + "SELECT cache FROM caches WHERE guid = ? AND timestamp >= ?", + (guid, time.time() - self.cache_ttl * 86400), + ) + if result := cursor.fetchone(): + return json.loads(result[0]) + return None + # 若发生异常则回滚事务并返回None + except Exception: + self.connection.rollback() + return None + finally: + # 确保游标关闭(关键:释放资源) + if cursor: + # noinspection PyBroadException + try: + cursor.close() + except Exception: + pass + + def update(self, guid: str, cache: Dict) -> bool: + """ + 更新缓存(存在则覆盖,不存在则新增) + :param guid: 缓存唯一标识 + :param cache: 缓存 + :return: 成功返回True,失败返回False + """ + with threading.Lock(): # 线程锁,保证并发安全 + # noinspection PyBroadException + try: + # 创建游标 + cursor = self.connection.cursor() + # 新增或覆盖缓存 + cursor.execute( + "INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)", + ( + guid, + json.dumps(cache, ensure_ascii=False), + time.time(), + ), + ) + # 提交事务 + self.connection.commit() + return True + # 若发生异常则回滚事务并返回None + except Exception: + self.connection.rollback() + return False + finally: + # 确保游标关闭(关键:释放资源) + if cursor: + # noinspection PyBroadException + try: + cursor.close() + except Exception: + pass + + def common_extraction(**kwargs) -> dict | None: """通用数据提取""" @@ -708,6 +838,7 @@ if __name__ == "__main__": "购药及就医机构": ( institution := response["data"]["details"]["seller"] ), + # 根据购药及就医机构查询其类型若为药店则购药及就医类型为药店购药,其它则为门诊就医 } ) # 门诊/住院收费票据