From 963ce19609bd52fed7039be7f0e87b765551db73 Mon Sep 17 00:00:00 2001 From: liubiren Date: Sun, 14 Dec 2025 02:20:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=B8=B8=E6=9B=B4=E6=96=B0=20from=20m?= =?UTF-8?q?ac=20mini(m1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/{SQLite.db => caches.db} | Bin utils/client.py | 822 +++++++++++++++++++-------------- 票据理赔自动化/SQLite.db | Bin 0 -> 12288 bytes 票据理赔自动化/main.py | 131 +----- 4 files changed, 484 insertions(+), 469 deletions(-) rename utils/{SQLite.db => caches.db} (100%) diff --git a/utils/SQLite.db b/utils/caches.db similarity index 100% rename from utils/SQLite.db rename to utils/caches.db diff --git a/utils/client.py b/utils/client.py index 5968eb0..f5377d6 100644 --- a/utils/client.py +++ b/utils/client.py @@ -15,7 +15,7 @@ from email.utils import parsedate_to_datetime from functools import wraps from imaplib import IMAP4_SSL from pathlib import Path -from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union +from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union from urllib.parse import quote_plus from xml.etree import ElementTree @@ -87,192 +87,235 @@ class MySQLClient: raise RuntimeError("执行SQL查询并返回DATAFRAME发生其它异常") -class CacheClient: - """缓存客户端""" +class SQLiteClient: + """SQLite客户端""" - def __init__(self, cache_ttl: int = 360, database: str = "SQLite.db"): + def __init__(self, database: Union[str, Path]): """ - :param cache_ttl: 缓存生存时间,单位为天 - :param database: 缓存数据库名称 + 初始化SQLite客户端 + :param database: 数据库 """ - # 初始化缓存数据库连接 - self.connection: Optional[sqlite3.Connection] = None - # 初始化缓存生存时间,单位为天 - self.cache_ttl = cache_ttl + self.database = database + # 初始化本地线程存储 + self.threads = threading.local() + def _connect(self): + """为当前线程创建数据库连接和游标""" + # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 + if hasattr(self.threads, "connection") and self.threads.connection is not None: + return + # 为当前线程关闭数据库连接和游标 + self._disconnect() + + # noinspection PyBroadException try: - # 创建缓存数据库连接 - self.connection = sqlite3.connect( - database=( - Path(__file__).parent.resolve() / database - ), # 当前目录下创建缓存数据库 - check_same_thread=False, - timeout=30, # 缓存数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 + # 为当前线程创建数据库连接 + self.threads.connection = sqlite3.connect( + database=self.database, + check_same_thread=True, + timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 ) - - # 创建缓存表和索引、清理过期缓存 - with self.connection: - self.connection.execute( - """CREATE TABLE IF NOT EXISTS caches ( - guid TEXT PRIMARY KEY, - cache TEXT NOT NULL, - timestamp REAL NOT NULL - )""" - ) - self.connection.execute( - """CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)""" - ) - self.connection.execute( - "DELETE FROM caches WHERE timestamp < ?", - (time.time() - self.cache_ttl * 86400,), - ) - + # 开启行映射,支持按照字段名取值 + self.threads.connection.row_factory = sqlite3.Row + # 为当前线程创建数据库游标 + self.threads.cursor = self.threads.connection.cursor() except Exception as exception: - self._disconnect() - raise f"初始缓存数据库失败:{str(exception)}" from exception + self.threads.connection = None + self.threads.cursor = None + raise RuntimeError( + f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" + ) from exception def _disconnect(self) -> None: - """关闭缓存数据库连接""" - if self.connection: + """为当前线程关闭数据库连接和游标""" + # 检查当前线程有数据库游标,若有则关闭数据库游标 + if hasattr(self.threads, "cursor") and self.threads.cursor is not None: # noinspection PyBroadException try: - self.connection.close() - except Exception: - pass + # 为当前线程关闭数据库游标 + self.threads.cursor.close() + 
self.threads.cursor = None + except Exception as exception: + raise RuntimeError( + f"为当前线程关闭数据库游标发生异常,{str(exception)}" + ) from exception - def __enter__(self) -> "CacheClient": - """实现上下文管理""" + # 检查当前线程有数据库连接,若有则关闭数据库连接 + if hasattr(self.threads, "connection") and self.threads.connection is not None: + # noinspection PyBroadException + try: + # 为当前线程提交事务 + self.threads.connection.commit() + # 为当前线程关闭数据库连接 + self.threads.connection.close() + self.threads.connection = None + except Exception as exception: + raise RuntimeError( + f"为当前线程关闭数据库连接发生异常,{str(exception)}" + ) from exception + + def _query_one( + self, sql: str, parameters: Tuple[Any, ...] = () + ) -> Optional[Dict[str, Any]]: + """ + 为当前线程查询并获取单行数据 + :param sql: 查询SQL语句 + :param parameters: SQL参数 + :return: 单行数据 + """ + # noinspection PyBroadException + try: + # 为当前线程创建数据库连接和游标 + self._connect() + # 检查当前线程无数据库游标,若无则抛出异常 + if not hasattr(self.threads, "cursor") or self.threads.cursor is None: + raise RuntimeError("为当前线程创建数据库游标发生异常") + + # 为当前线程执行SQL + self.threads.cursor.execute(sql, parameters) + return ( + None + if (result := self.threads.cursor.fetchone()) is None + else dict(result) + ) + # 若发生异常则回滚事务并抛出异常 + except Exception as exception: + # 检查当前线程有数据库连接,若有则回滚 + if ( + hasattr(self.threads, "connection") + and self.threads.connection is not None + ): + self.threads.connection.rollback() + raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception + + def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: + """ + 为当前线程执行SQL + :param sql: 新增、删除和修改SQL语句 + :param parameters: SQL参数 + :return: 执行结果 + """ + try: + self._connect() + # 检查当前线程无数据库游标,若无则抛出异常 + if not hasattr(self.threads, "cursor") or self.threads.cursor is None: + raise RuntimeError("为当前线程创建数据库游标发生异常") + + # 为当前线程执行SQL + self.threads.cursor.execute(sql, parameters) + # 为当前线程提交事务 + self.threads.connection.commit() + return True + # 若发生异常则回滚事务并抛出异常 + except Exception as exception: + # 检查当前线程有数据库连接,若有则回滚 + if ( + hasattr(self.threads, "connection") + and self.threads.connection is not None + ): + self.threads.connection.rollback() + raise RuntimeError("为当前线程执行SQL发生异常") from exception + + def __enter__(self): + """进入上下文管理时为当前线程创建数据库连接和游标""" + self._connect() return self def __exit__(self, exc_type, exc_val, exc_tb): - """退出时关闭连接""" + """退出上下文管理时为当前线程关闭数据库连接和游标""" self._disconnect() return False - def query(self, guid: str) -> Optional[Dict]: - """ - 查询缓存 - :param guid: 缓存唯一标识 - :return: 缓存 - """ - with threading.Lock(): # 线程锁,保证并发安全 - # noinspection PyBroadException - try: - # 创建游标 - cursor = self.connection.cursor() - # 根据缓存唯一标识查询有效缓存 - cursor.execute( - "SELECT cache FROM caches WHERE guid = ? 
AND timestamp >= ?", - (guid, time.time() - self.cache_ttl * 86400), - ) - if result := cursor.fetchone(): - return json.loads(result[0]) - return None - # 若发生异常则回滚事务并返回None - except Exception: - self.connection.rollback() - return None - finally: - # 确保游标关闭(关键:释放资源) - if cursor: - # noinspection PyBroadException - try: - cursor.close() - except Exception: - pass + def __del__(self): + """析构时为当前线程关闭数据库连接和游标""" + self._disconnect() - def update(self, guid: str, cache: Dict) -> bool: - """ - 更新缓存(存在则覆盖,不存在则新增) - :param guid: 缓存唯一标识 - :param cache: 缓存 - :return: 成功返回True,失败返回False - """ - with threading.Lock(): # 线程锁,保证并发安全 - # noinspection PyBroadException - try: - # 创建游标 - cursor = self.connection.cursor() - # 新增或覆盖缓存 - cursor.execute( - "INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)", - ( - guid, - json.dumps(cache, ensure_ascii=False), - time.time(), + +# 基于令牌桶限流算法的装饰器 +def restrict(refill_rate: float = 5.0, max_tokens: int = 5): + + class TokenBucket: + + # noinspection PyShadowingNames + def __init__(self, max_tokens: int, refill_rate: float): + """ + 初始化令牌桶限流 + :param refill_rate: 令牌填充速率,单位为个/秒 + :param max_tokens: 最大令牌数,单位为个 + """ + # 初始化最大令牌数 + self.max_tokens = max_tokens + # 初始化当前令牌数 + self.tokens = self.max_tokens * 0.5 + # 初始化令牌填充速率 + self.refill_rate = refill_rate + # 初始化上一次填充令牌的时间戳(使用单调时间戳) + self.refill_timestamp = time.monotonic() + # 初始化线程锁(所有线程共用) + self.thread_lock = threading.Lock() + + # 填充令牌 + def _refill(self) -> None: + with self.thread_lock: + # 本次填充令牌的时间戳 + refill_timestamp = time.monotonic() + # 重新计算令牌桶中令牌数 + # noinspection PyTypeChecker + self.tokens = min( + self.max_tokens, + max( + 0, + self.tokens + + self.refill_rate * (refill_timestamp - self.refill_timestamp), ), ) - # 提交事务 - self.connection.commit() - return True - # 若发生异常则回滚事务并返回None - except Exception: - self.connection.rollback() - return False - finally: - # 确保游标关闭(关键:释放资源) - if cursor: - # noinspection PyBroadException - try: - cursor.close() - except Exception: - pass + self.refill_timestamp = refill_timestamp + # 尝试消耗令牌 + def consume(self) -> Tuple[bool, float]: + # 填充令牌 + self._refill() -class TokenBucket: + with self.thread_lock: + if self.tokens >= 1: + self.tokens -= 1 + return True, 0 - def __init__(self, refill_rate, max_tokens): - """令牌桶,基于令牌桶算法限制请求频率""" - # 填充令牌速率(个/秒) - self.refill_rate = refill_rate - # 令牌桶最大令牌数 - self.max_tokens = max_tokens - # 令牌桶当前令牌数 - self.tokens = max_tokens - # 上一次填充令牌时间戳(使用单调递增时间,单位为秒) - self.refill_timestamp = time.monotonic() + # 等待时长 + # noinspection PyTypeChecker + wait_time = min( + 1 / self.refill_rate, + max( + 0, + 1 / self.refill_rate + - (time.monotonic() - self.refill_timestamp), + ), + ) + return False, wait_time - # 获取令牌 - # noinspection PyMissingReturnStatement - def acquire(self) -> tuple[bool, float]: - with threading.Lock(): - # 本次填充令牌时间戳 - refill_timestamp = time.monotonic() - # 重新计算令牌桶中令牌数 - self.tokens = min( - self.max_tokens, - self.tokens - + self.refill_rate * (refill_timestamp - self.refill_timestamp), - ) - self.refill_timestamp = refill_timestamp - - # 若令牌桶当前令牌数大于等于1则减少令牌 - if self.tokens >= 1: - self.tokens -= 1 - return True, 0.0 - - # 同时返回等待时间 - return False, 0.2 - - -# 将令牌桶以装饰函数封装为请求频率限制方法 -def restrict(refill_rate=5, max_tokens=5): - - def decorator(func): - # 初始化令牌桶 - token_bucket = TokenBucket(refill_rate=refill_rate, max_tokens=max_tokens) + # 初始化所有被装饰的函数创建令牌桶限流存储 + buckets = {} + def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(*args, **kwargs): + # 
若当前被装饰的函数不在所有被装饰的函数创建令牌桶限流存储则为当前被装饰的函数实例化令牌桶限流 + if func not in buckets: + # 初始化令牌桶限流 + buckets[func] = TokenBucket( + refill_rate=refill_rate, max_tokens=max_tokens + ) + bucket = buckets[func] + # 重试次数 retries = 0 - # 若重试数小于等于最大重试次数,则循环检查是否允许请求 while retries <= 10: - success, wait_time = token_bucket.acquire() - # 若允许请求则返回嵌套函数,若不允许请求则等待 + # 尝试消耗令牌 + success, wait_time = bucket.consume() + # 若消耗令牌成功则返回被装饰的函数,否则等待 if success: return func(*args, **kwargs) - time.sleep(wait_time * 1.5**retries) + time.sleep(wait_time * 2) retries += 1 raise Exception("request too frequently") @@ -281,82 +324,150 @@ def restrict(refill_rate=5, max_tokens=5): return decorator -class RequestException(Exception): - """请求异常""" - - def __init__( - self, status: int = 400, code: int = 0, message: str = "request failed" - ): - """ - :param status: 状态编码,默认为0 - :param message: 错误信息,默认为RequestException - """ - self.status = status - self.code = code - self.message = message - super().__init__(self.message) - - def __str__(self): - return f"请求发生异常({self.status}, {self.message})" - - -# 请求参数数据模型 -class Arguments(BaseModel): - """ - :param url: 统一资源定位符,基于统一资源定位符校验器进行校验 - :param params: 查询参数 - :param headers: 请求头 - :param data: 表单数据 - :param json_data: JSON # 入参时使用别名,出参时根据BY_ALIAS=TRUE确定是否使用别名 - :param files: 上传文件 - :param stream: 是否启用流式传输 - :param guid: 全局唯一标识 - """ - - # 统一资源定位符 - url: HttpUrl = Field(default=...) - # 查询参数 - params: Optional[Dict] = Field(default=None) - # 请求头 - headers: Optional[Dict] = Field(default=None) - # 表单数据 - data: Optional[Dict] = Field(default=None) - # JSON - json_data: Optional[Dict] = Field(default=None, alias="json") - # 上传文件 - files: Optional[ - Dict[ - str, - Union[ - Tuple[str, bytes], Tuple[str, bytes, str], Tuple[str, bytes, str, dict] - ], - ] - ] = Field(default=None) - # 是否启用流式传输 - stream: Optional[bool] = Field(default=None) - # 全局唯一标识 - guid: Optional[str] = Field(default=None) - - # 表单数据和JSON数据互斥 - @model_validator(mode="after") - def validate_data(self): - if self.data and self.json_data: - raise ValueError("cannot use both data and json parameters simultaneously") - return self - - # 上传文件和启用流式传输互斥 - @model_validator(mode="after") - def validate_files(self): - if self.files and self.stream: - raise ValueError( - "cannot use both files and stream parameters simultaneously" - ) - return self - - class HTTPClient: """请求客户端""" + class RequestException(Exception): + """请求异常""" + + def __init__( + self, + status: Optional[int] = 400, + code: int = 0, + message: str = "请求发生异常", + ): + """ + :param status: 状态码 + :param code: 错误码 + :param message: 错误信息 + """ + self.status = status + self.code = code + self.message = message + super().__init__(self.message) + + def __str__(self): + return f"请求发生异常(status={self.status}, code={self.code},message={self.message})" + + class Parameters(BaseModel): + """ + 请求参数模型,支持自动校验 + """ + + url: HttpUrl = Field( + default=..., description="统一资源定位符,基于HttpUrl自动校验" + ) + params: Optional[Dict[str, Any]] = Field( + default=None, description="统一资源定位符的查询参数" + ) + headers: Optional[Dict[str, str]] = Field(default=None, description="请求头") + data: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") + json_data: Optional[Dict[str, Any]] = Field( + default=None, alias="json", description="JSON数据" + ) + files: Optional[ + Dict[ + str, + Union[ + Tuple[str, bytes], + Tuple[str, bytes, str], + Tuple[str, bytes, str, Dict[str, str]], + ], + ] + ] = Field( + default=None, + description="上传文件,{字段名: (文件名, 字节数据, 内容类型, 请求头)}", + ) + stream_enabled: 
Optional[bool] = Field(default=None, description="使用流式传输") + guid: Optional[str] = Field(default=None, description="缓存全局唯一标识") + + @model_validator(mode="after") + def validate_data(self): + """校验:表单数据和JSON数据互斥""" + if self.data is not None and self.json_data is not None: + raise ValueError("表单数据和JSON数据不能同时使用") + return self + + @model_validator(mode="after") + def validate_files(self): + if self.files is not None and self.stream_enabled: + raise ValueError("上传文件和使用流式传输不能同时使用") + return self + + class CacheClient(SQLiteClient): + """缓存客户端""" + + def __init__(self, cache_ttl: int): + """ + 初始化缓存数据库 + :param cache_ttl: 缓存生存时间,单位为天 + """ + # 初始化SQLite客户端 + super().__init__(database=Path(__file__).parent.resolve() / "caches.db") + # 初始化缓存生存时间,单位为秒 + self.cache_ttl = cache_ttl + + # 初始化缓存表、时间戳索引和清理过期缓存 + try: + with self: + self._execute( + sql="""CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, cache TEXT NOT NULL, timestamp REAL NOT NULL)""" + ) + self._execute( + sql="""CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)""" + ) + self._execute( + sql="DELETE FROM caches WHERE timestamp < ?", + parameters=(time.time() - self.cache_ttl,), + ) + except Exception as exception: + raise RuntimeError( + f"初始化缓存数据库发生异常:{str(exception)}" + ) from exception + + # noinspection PyShadowingNames + def query(self, guid: str) -> Optional[Dict[str, Any]]: + """ + 查询并获取单条缓存 + :param guid: 缓存唯一标识 + :return: 缓存 + """ + # noinspection PyBroadException + try: + with self: + result = self._query_one( + sql="SELECT cache FROM caches WHERE guid = ? AND timestamp >= ?", + parameters=(guid, time.time() - self.cache_ttl), + ) + # 就查询结果JSON反序列化 + if result is not None and "cache" in result: + return json.loads(result["cache"]) + return None + except Exception as exception: + raise RuntimeError("查询并获取单条缓存发生异常") from exception + + # noinspection PyShadowingNames + def update(self, guid: str, cache: Dict) -> Optional[bool]: + """ + 新增或更新缓存(若无则新增缓存,若有则更新缓存) + :param guid: 缓存唯一标识 + :param cache: 缓存 + :return: 成功返回True,失败返回False + """ + # noinspection PyBroadException + try: + with self: + return self._execute( + sql="INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)", + parameters=( + guid, + json.dumps(cache, ensure_ascii=False), + time.time(), + ), + ) + except Exception as exception: + raise RuntimeError("新增或更新缓存发生异常") from exception + def __init__( self, default_headers: Optional[Dict[str, str]] = None, @@ -383,9 +494,19 @@ class HTTPClient: # 初始化使用缓存 self.cache_enabled = cache_enabled # 初始化缓存生存时间,单位为秒 - self.cache_ttl = cache_ttl * 24 * 60 * 60 + self.cache_ttl = cache_ttl * 86400 + + self.cache_client: Optional[HTTPClient.CacheClient] = None + # 若使用缓存则实例化缓存客户端 + if self.cache_enabled: + # 初始化缓存客户端 + self.cache_client = self.CacheClient(cache_ttl=self.cache_ttl) + + def __del__(self): + """析构时关闭请求会话""" + if hasattr(self, "session") and self.session: + self.session.close() - # 创建请求会话并挂载适配器 @staticmethod def _create_session( total: int, @@ -393,183 +514,200 @@ class HTTPClient: default_headers: Optional[Dict[str, str]] = None, ) -> Session: """ + 创建请求会话并挂载适配器 :param default_headers: 默认请求头 :param total: 最大重试次数 :param backoff_factor: 重试间隔退避因子 - :return Session: 会话对象 + :return Session: 请求会话实例 """ - # 创建会话对象 + # 实例化请求会话 session = Session() - # 设置请求头 + # 设置默认请求头 if default_headers: session.headers.update(default_headers) - # 设置重试策略(优先按照服响应等待时长,若未返回则默认按照退避算法等待) - strategy_retries = Retry( - allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "PATCH"], - status_forcelist=[ - 408, - 
502, - 503, - 504, - ], # 408为请求超时,502为网关错误,503为服务不可用,504为网关超时 - total=total, - respect_retry_after_header=True, - backoff_factor=backoff_factor, + # 设置重试策略并挂载适配器 + adapter = HTTPAdapter( + max_retries=Retry( + allowed_methods=["HEAD", "GET", "POST", "PUT", "DELETE", "PATCH"], + status_forcelist=[ + 408, + 502, + 503, + 504, + ], # 408为请求超时,502为网关错误,503为服务不可用,504为网关超时 + total=total, + respect_retry_after_header=True, + backoff_factor=backoff_factor, + ) ) - - # 创建适配器并绑定重试策略 - adapter = HTTPAdapter(max_retries=strategy_retries) - # 就HTTP请求生效 session.mount("http://", adapter) - # 就HTTPS请求生效 session.mount("https://", adapter) return session - # GET请求 - def get(self, **kwargs) -> Union[Dict, str]: - return self._request(method="GET", arguments=Arguments(**kwargs)) + def get( + self, **kwargs + ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]: + """发送GET请求""" + return self._request(method="GET", parameters=self.Parameters(**kwargs)) - # POST请求 - def post(self, **kwargs) -> Union[Dict, str]: - return self._request(method="POST", arguments=Arguments(**kwargs)) + def post( + self, **kwargs + ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]: + """发送POST请求""" + return self._request(method="POST", parameters=self.Parameters(**kwargs)) - # DOWNLOAD请求 def download( - self, stream=False, chunk_size=1024, **kwargs - ) -> Union[Dict, str, Generator[bytes, None, None]]: + self, stream_enabled: bool = False, chunk_size: int = 1024, **kwargs + ) -> Union[ + str, + Tuple[str, bytes], + Dict[str, Any], + ElementTree.Element, + Generator[bytes, None, None], + None, + ]: response = self._request( - method="GET", arguments=Arguments(**{"stream": stream, **kwargs}) + method="GET", + parameters=self.Parameters(**{"stream_enabled": stream_enabled, **kwargs}), ) - # 若禁用流式传输,则返回响应 - if not stream: - return response - # 若启用流式传输,则处理流式传输响应并返回 - return self._process_stream_response(response=response, chunk_size=chunk_size) + """ + 下载文件 + :param stream_enabled: 使用流式传输 + :param chunk_size: 流式传输的分块大小 + """ + # 若是用流式传输则处理流式传输响应 + if stream_enabled: + return self._process_stream_response( + response=response, chunk_size=chunk_size + ) + return response - def _request(self, method: Literal["GET", "POST"], arguments: Arguments) -> Any: + def _request( + self, method: Literal["GET", "POST"], parameters: Parameters + ) -> Union[ + str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, Response, None + ]: """请求""" - # 请求参数模型 - arguments = arguments.model_dump(exclude_none=True, by_alias=True) - # URL对象转为字符串 - arguments["url"] = str(arguments["url"]) + # 将请求参数模型转为请求参数字典 + parameters = parameters.model_dump(exclude_none=True, by_alias=True) + # 将URL由HttpUrl对象转为字符串 + parameters["url"] = str(parameters["url"]) - # 重构表单数据 - if arguments.get("data") is not None: - arguments["data"] = { - k: v for k, v in arguments["data"].items() if v is not None + # 过滤表单数据中None值 + if parameters.get("data") is not None: + parameters["data"] = { + k: v for k, v in parameters["data"].items() if v is not None } - # 重构JSON数据 - if arguments.get("json_data") is not None: - arguments["json_data"] = { - k: v for k, v in arguments["json_data"].items() if v is not None + # 过滤JSON数据中None值 + if parameters.get("json_data") is not None: + parameters["json_data"] = { + k: v for k, v in parameters["json_data"].items() if v is not None } - # 重构文件数据 - if arguments.get("files") is not None: - files = {} - for k, v in arguments["files"].items(): - if isinstance(v, (tuple, list)): - match len(v): - # 
文件数据包括文件名称和文件内容
-                        case 2:
-                            files[k] = (v[0], v[1], None, None)
-                        # 文件数据包含文件名称、文件内容和内容类型
-                        case 3:
-                            files[k] = (v[0], v[1], v[2], None)
-                        # 文件数据包含文件名称、文件内容、内容类型和请求头
-                        case 4:
-                            files[k] = (v[0], v[1], v[2], v[3])
-            arguments.update({"files": files})
+        # 使用流式传输
+        stream_enabled = parameters.pop("stream_enabled", False)

-        # 全局唯一标识
-        guid = arguments.pop("guid", None)
-        # 若使用缓存且全局唯一标识非空则查询缓存
+        # 缓存全局唯一标识
+        guid = parameters.pop("guid", None)
+        # 若使用缓存且缓存全局唯一标识非空则查询并获取单条缓存
         if self.cache_enabled and guid is not None:
-            with CacheClient(cache_ttl=self.cache_ttl) as cache_client:
-                cache = cache_client.query(guid)
-                # 若缓存非空则返回
+            cache = self.cache_client.query(guid)
             if cache is not None:
                 return cache

+        # 发送请求并处理响应
+        # noinspection PyBroadException
         try:
-            # 发送请求
             response = self.session.request(
-                method=method, timeout=self.timeout, **arguments
+                method=method, timeout=self.timeout, stream=stream_enabled, **parameters
             )
-            # 若返回错误状态码则抛出异常
-            response.raise_for_status()
+            response.raise_for_status()  # 若返回非2xx状态码则抛出异常
+
+            # 若使用流式传输则直接返回(不缓存)
+            if stream_enabled:
+                return response

+            # 处理响应
             response = self._process_response(response=response)

-            # 若使用缓存且全局唯一标识非空则更新缓存
+            # 若使用缓存且缓存全局唯一标识非空则新增或更新缓存
             if self.cache_enabled and guid is not None:
-                with CacheClient(cache_ttl=self.cache_ttl) as cache_client:
-                    cache_client.update(guid, response)
+                self.cache_client.update(guid, response)

             return response
         except Exception as exception:
-            # 尝试根据响应解析错误状态码和错误信息,否则进行构造
             # noinspection PyBroadException
             try:
-                # 响应反序列化
-                response_decoded = response.json()
-                # 错误状态码
-                status = response_decoded["status"]
-                # 错误信息
-                message = response_decoded["message"]
-            except:
-                status = getattr(getattr(exception, "response", None), "status", None)
-                # 重构错误信息
-                message = f"{method} {arguments["url"]} failed: {str(exception).split("\n")[0]}"
-            raise RequestException(status=status, message=message)
+                response = response or getattr(exception, "response", None)
+                status = (
+                    response.json().get("status", response.status_code)
+                    if response is not None
+                    else None
+                )
+                message = (
+                    response.json().get("message", str(exception).splitlines()[0])
+                    if response is not None
+                    else str(exception).splitlines()[0]
+                )
+            except Exception:
+                status = None
+                message = f"{method} {parameters["url"]} 请求发生异常:{str(exception).split("\n")[0]}"
+            raise self.RequestException(status=status, message=message) from exception

     # 处理响应
     @staticmethod
-    def _process_response(response: Response) -> Any:
-        # 响应内容
+    def _process_response(
+        response: Response,
+    ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]:
+        # 若响应内容为空则返回None
         content = response.content
-        # 若响应内容为空则返回NONE
-        if not content:
+        if not content:
             return None

-        # 标准化内容类型
-        content_type = (
-            response.headers.get("Content-Type", "").split(";")[0].strip().lower()
-        )
-        # 根据内容类型匹配解析返回内容方法
+        # 响应类型
+        _type = response.headers.get("Content-Type", "").split(";")[0].strip().lower()
+        # 根据响应类型匹配响应内容解析方法并返回
         # noinspection PyUnreachableCode
-        match content_type:
+        match _type:
+            # JSON:JSON反序列化
             case "application/json" | "text/json":
-                # 响应反序列化
                 return response.json()
+            # XML:解析为XML对象(Element实例)
             case "application/xml" | "text/xml":
-                # 解析为XML(ELEMENT对象)
-                return ElementTree.fromstring(text=content)
+            # 以image/开头:返回影像件格式和响应内容
+            case _ if _type.startswith("image/"):
+                # 影像件格式
+                image_format = _type.split(sep="/", maxsplit=1)[1]
+                return image_format, content
+            # 其它的响应类型,先UTF8解码再返回,若解码发生异常则直接返回
             case _:
-                # 若内容类型以IMAGE/开头则返回图片格式和图片数据
-                if content_type.startswith("image/"):
-                    # 
图片格式 - image_format = content_type.split(sep="/", maxsplit=1)[1] - return f"{image_format}", content - else: + try: + return content.decode("utf-8") + except UnicodeDecodeError: return content # 处理流式传输响应 @staticmethod def _process_stream_response( response: Response, chunk_size: int - ) -> Generator[bytes, None, None]: # 生成器不接受发SEND发送至、结束时返回NONE - # 检查数据分块 - if not isinstance(chunk_size, int) and isinstance(chunk_size, bool): - raise ValueError("chunk_size must type=int") + ) -> Generator[bytes, None, None]: + """ + 处理流式响应 + :param response: requests.Response对象 + :param chunk_size: 分块大小 + :return: 字节数据块生成器 + """ + if not isinstance(chunk_size, int): + raise ValueError("分块大小数据类型必须为整数") if chunk_size <= 0: - raise ValueError("chunk_size must >0") + raise ValueError("分块大小必须大于0") try: for chunk in response.iter_content(chunk_size=chunk_size): diff --git a/票据理赔自动化/SQLite.db b/票据理赔自动化/SQLite.db index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..2678b94a4e86e822dbd3a346a21603f7b1dbe419 100644 GIT binary patch literal 12288 zcmeI#&r8EF6bJB^cJ;?##M`d#Qcw_Kc-Yk{Imqg!wSqg9h!p~}Rhx+(=gq&$e@pSQ zR2lT(*vZTIKJrK&c_HL;%jHFEY|eM()4jHQNIN719WfJ;=hAn%2;mm3Md9YUUKJi4 zJimA3CwVd;8OV=S2e1zT2tWV=5P$##AOHafKmY;|_&0%BUu<^6P`vf5zM15vsI0M5 zYs#W(uDz2)MXBO6I*t`LcX+qWOQO-nsk%=2JUKm!k}(g|cz@|B`}&aowhu=s53l0* z&(^)Zx6vJhf=qFnzf_MC_m|J~)RsSW*0{UMdd;f-*KgIH9hX9WksAmIKmY;|fB*y_ z009U<00Izz00h=eAP~NkTDRT1g#0Eq5D1Rwwb2tWV=5P$##AOHafte-&3m)m*{ DA*oXS literal 0 HcmV?d00001 diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index ae04757..31ac156 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -8,7 +8,6 @@ https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink import json import re -import sqlite3 import uuid from base64 import b64encode from datetime import datetime @@ -26,137 +25,15 @@ from zen import ZenDecision, ZenEngine from utils.client import Authenticator, HTTPClient - # from utils.ocr import fuzzy_match -class SQLiteClient: - """SQLite客户端""" +client = SQLiteClient() +a = client._query_one("SELECT * FROM institutions") - def __init__(self, database: str = "SQLite.db"): - """ - :param database: 数据库名称 - """ - # 初始化数据库连接 - self.connection: Optional[sqlite3.Connection] = None +print(a) - try: - # 创建数据库连接 - self.connection = sqlite3.connect( - database=( - Path(__file__).parent.resolve() / database - ), # 当前目录下创建数据库 - check_same_thread=False, - timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 - ) - - # 创建缓存表和索引、清理过期缓存 - with self.connection: - self.connection.execute( - """CREATE TABLE IF NOT EXISTS caches ( - guid TEXT PRIMARY KEY, - cache TEXT NOT NULL, - timestamp REAL NOT NULL - )""" - ) - self.connection.execute( - """CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)""" - ) - self.connection.execute( - "DELETE FROM caches WHERE timestamp < ?", - (time.time() - self.cache_ttl * 86400,), - ) - - except Exception as exception: - self._disconnect() - raise f"初始缓存数据库失败:{str(exception)}" from exception - - def _disconnect(self) -> None: - """关闭缓存数据库连接""" - if self.connection: - # noinspection PyBroadException - try: - self.connection.close() - except Exception: - pass - - def __enter__(self) -> "CacheClient": - """实现上下文管理""" - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """退出时关闭连接""" - self._disconnect() - return False - - def query(self, guid: str) -> Optional[Dict]: - """ - 查询缓存 - :param guid: 缓存唯一标识 - :return: 缓存 - """ - with threading.Lock(): # 线程锁,保证并发安全 - # noinspection PyBroadException - try: - # 创建游标 - cursor = self.connection.cursor() - # 根据缓存唯一标识查询有效缓存 - 
cursor.execute( - "SELECT cache FROM caches WHERE guid = ? AND timestamp >= ?", - (guid, time.time() - self.cache_ttl * 86400), - ) - if result := cursor.fetchone(): - return json.loads(result[0]) - return None - # 若发生异常则回滚事务并返回None - except Exception: - self.connection.rollback() - return None - finally: - # 确保游标关闭(关键:释放资源) - if cursor: - # noinspection PyBroadException - try: - cursor.close() - except Exception: - pass - - def update(self, guid: str, cache: Dict) -> bool: - """ - 更新缓存(存在则覆盖,不存在则新增) - :param guid: 缓存唯一标识 - :param cache: 缓存 - :return: 成功返回True,失败返回False - """ - with threading.Lock(): # 线程锁,保证并发安全 - # noinspection PyBroadException - try: - # 创建游标 - cursor = self.connection.cursor() - # 新增或覆盖缓存 - cursor.execute( - "INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?)", - ( - guid, - json.dumps(cache, ensure_ascii=False), - time.time(), - ), - ) - # 提交事务 - self.connection.commit() - return True - # 若发生异常则回滚事务并返回None - except Exception: - self.connection.rollback() - return False - finally: - # 确保游标关闭(关键:释放资源) - if cursor: - # noinspection PyBroadException - try: - cursor.close() - except Exception: - pass +exit() def common_extraction(**kwargs) -> dict | None:
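
For reference, the snippet below sketches how the API introduced in utils/client.py by this patch is intended to be used, based only on the definitions visible in the diff. It assumes that HTTPClient's __init__ accepts cache_enabled and cache_ttl keyword arguments (its body assigns both), and every literal value below (database path, SQL statement, URL, guid) is an illustrative placeholder, not a value taken from the repository. The new debug lines in 票据理赔自动化/main.py construct SQLiteClient() without the required database argument and without importing the class; the sketch passes the path explicitly.

    # A minimal usage sketch, assuming the patched utils/client.py is importable.
    from pathlib import Path

    from utils.client import HTTPClient, SQLiteClient, restrict

    # SQLiteClient keeps one connection per thread and is context-managed;
    # _query_one returns a single row as a dict, or None if nothing matches.
    with SQLiteClient(database=Path("票据理赔自动化") / "SQLite.db") as sqlite_client:
        print(sqlite_client._query_one("SELECT * FROM institutions LIMIT 1"))

    # HTTPClient with caching: passing a guid stores the parsed response in
    # utils/caches.db for cache_ttl days; restrict() applies the token-bucket
    # rate limit to the decorated caller.
    http_client = HTTPClient(cache_enabled=True, cache_ttl=1)

    @restrict(refill_rate=2.0, max_tokens=5)
    def fetch_ping():
        return http_client.get(url="https://example.com/api/ping", guid="example-ping")

    print(fetch_ping())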