diff --git a/.isort.cfg b/.isort.cfg new file mode 100644 index 0000000..53202d5 --- /dev/null +++ b/.isort.cfg @@ -0,0 +1,5 @@ +[settings] +order_by_type = true # 按照标准库、第三方库和本地模块分组 +alphabetical = true # 同组按照字母序排序 +multi_line_output = 3 +indent = " " \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..db3b42b --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,13 @@ +{ + "version": "0.0.1", + "configurations": [ + { + "name": "Python: 运行", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "cwd": "${fileDirname}", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/database.db b/database.db new file mode 100644 index 0000000..3d0bb31 Binary files /dev/null and b/database.db differ diff --git a/utils/__init__.py b/utils/__init__.py deleted file mode 100644 index aad0a9d..0000000 --- a/utils/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from mysql import MySQL -from sqlite import SQLite -from request import restrict, Authenticator, Request -from feishu import Feishu -from rules_engine import RulesEngine \ No newline at end of file diff --git a/utils/authenticator.py b/utils/authenticator.py new file mode 100644 index 0000000..d6425a9 --- /dev/null +++ b/utils/authenticator.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +""" +认证器 +""" + +import hashlib +import hmac +import json +from pathlib import Path +import sys +import threading +import time +from typing import Optional, Tuple + +sys.path.append(Path(__file__).parent.as_posix()) +from request import Request + + +class Authenticator: + """ + 认证器,支持: + 1、 + """ + + def __init__(self): + """初始化""" + # 初始化访问凭证路径 + self.certifications_path = ( + Path(__file__).parent.resolve() / "certifications.json" + ) + # 若访问凭证路径不存在则创建 + if not self.certifications_path.exists(): + with open(self.certifications_path, "w", encoding="utf-8") as file: + json.dump( + {}, + file, + ensure_ascii=False, + ) + + # 初始化请求客户端 + self.request = Request() + + def get_token(self, servicer: str) -> Optional[str]: + """ + 获取访问令牌 + :param servicer: 服务商名称,暂仅支持深圳快瞳、合力亿捷和飞书 + :return token: 访问令牌 + """ + with threading.Lock(): + token, expired_timestamp = None, 0 + try: + with open(self.certifications_path, "r", encoding="utf-8") as file: + certifications = json.load(file) + # 获取指定服务商的访问凭证 + certification = certifications.get(servicer) + # 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳 + if certification: + # 访问令牌 + token = certification["token"] + # 失效时间戳 + expired_timestamp = certification["expired_timestamp"] + + except json.decoder.JSONDecodeError: + with open(self.certifications_path, "w", encoding="utf-8") as file: + json.dump( + certifications := {}, + file, + ensure_ascii=False, + ) + + except Exception as exception: + raise RuntimeError( + f"获取访问凭证发生异常:{str(exception)}" + ) from exception + + if time.time() > expired_timestamp: + match servicer: + case "szkt": + token, expired_timestamp = self._get_szkt_certification() + case "hlyj": + token, expired_timestamp = self._get_hlyj_certification() + case "feishu": + token, expired_timestamp = self._get_feishu_certification() + case _: + raise RuntimeError(f"暂不支持该服务商:{servicer}") + + # 更新访问凭证 + certifications[servicer] = { + "token": token, + "expired_timestamp": expired_timestamp, + } + with open(self.certifications_path, "w", encoding="utf-8") as file: + json.dump( + certifications, + file, + ensure_ascii=False, + ) + + return token + + def _get_szkt_certification(self) -> tuple[str, float]: + """ + 获取深圳快瞳访问凭证 + :return: 访问令牌和失效时间戳 + """ + response = self.request.get( + url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" + ) + + # 若非响应成功则抛出异常 + if not (response["status"] == 200 and response["code"] == 0): + raise RuntimeError("获取深圳快瞳访问凭证发生异常") + + return ( + response["data"]["access_token"], + time.time() + response["data"]["expires_in"], + ) + + def _get_hlyj_certification(self) -> Tuple[str, float]: + """获取合力亿捷访问凭证""" + # 企业访问标识 + access_key_id = "25938f1c190448829dbdb5d344231e42" + # 签名秘钥 + secret_access_key = "44dc0299aff84d68ae27712f8784f173" + # 时间戳(秒级) + timestamp = int(time.time()) + # 构建签名 + signature = hmac.new( + secret_access_key.encode("utf-8"), + f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), + hashlib.sha256, + ).hexdigest() + response = self.request.get( + url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" + ) + + # 若非响应成功则抛出异常 + if not response["success"]: + raise RuntimeError("获取合力亿捷访问凭证发生异常") + + return ( + response["data"], + time.time() + 1 * 3600, # 访问令牌有效期为1小时 + ) + + def _get_feishu_certification(self) -> tuple[str, float]: + """获取飞书访问凭证""" + response = self.request.post( + url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", + data={ + "app_id": "cli_a1587980be78500c", + "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", + }, + ) + + # 若非响应成功则抛出异常 + if not response["code"] == 0: + raise RuntimeError("获取飞书访问凭证发生异常") + + return ( + response["tenant_access_token"], + time.time() + response["expire"], + ) diff --git a/utils/logger.py b/utils/logger.py index 4ec7655..a377ca6 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -9,7 +9,6 @@ # 加载模块 import logging - from logging.handlers import RotatingFileHandler """ diff --git a/utils/request.py b/utils/request.py index 96ab79e..254a047 100644 --- a/utils/request.py +++ b/utils/request.py @@ -1,15 +1,13 @@ # -*- coding: utf-8 -*- +""" +请求客户端 +""" -# 导入模块 - -import hashlib -import hmac import json -import threading -import time -from functools import wraps from pathlib import Path -from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union +import sys +import time +from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union from xml.etree import ElementTree from pydantic import BaseModel, Field, HttpUrl, model_validator @@ -17,259 +15,166 @@ from requests import Response, Session from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry +sys.path.append(Path(__file__).parent.as_posix()) from sqlite import SQLite -def restrict(refill_rate: float = 5.0, max_tokens: int = 5): +class Parameters(BaseModel): """ - 请求限速装饰器 - :param refill_rate: 令牌填充速率,单位为个/秒 - :param max_tokens: 最大令牌数,单位为个 + 请求参数模型 """ - class TokenBucket: + url: HttpUrl = Field(default=..., description="统一资源定位符,基于HttpUrl自动校验") + params: Optional[Dict[str, Any]] = Field( + default=None, description="统一资源定位符的查询参数" + ) + headers: Optional[Dict[str, str]] = Field(default=None, description="请求头") + data: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") + json_: Optional[Dict[str, Any]] = Field( + default=None, alias="json", description="JSON数据" + ) + files: Optional[ + Dict[ + str, + Union[ + Tuple[str, bytes], + Tuple[str, bytes, str], + Tuple[str, bytes, str, Dict[str, str]], + ], + ] + ] = Field( + default=None, + description="上传文件,{字段名: (文件名, 字节数据, 内容类型, 请求头)}", + ) + stream_enabled: Optional[bool] = Field(default=None, description="使用流式传输") + guid: Optional[str] = Field(default=None, description="缓存全局唯一标识") - # noinspection PyShadowingNames - def __init__(self, max_tokens: int, refill_rate: float): - """ - 初始化令牌桶限流 - :param refill_rate: 令牌填充速率,单位为个/秒 - :param max_tokens: 最大令牌数,单位为个 - """ - # 初始化最大令牌数 - self.max_tokens = max_tokens - # 初始化当前令牌数 - self.tokens = self.max_tokens * 0.5 - # 初始化令牌填充速率 - self.refill_rate = refill_rate - # 初始化上一次填充令牌的时间戳(使用单调时间戳) - self.refill_timestamp = time.monotonic() - # 初始化线程锁(所有线程共用) - self.thread_lock = threading.Lock() + @model_validator(mode="after") + def validate_data(self): + """校验:表单数据和JSON数据互斥""" + if self.data is not None and self.json_ is not None: + raise ValueError("表单数据和JSON数据不能同时使用") + return self - # 填充令牌 - def _refill(self) -> None: - with self.thread_lock: - # 本次填充令牌的时间戳 - refill_timestamp = time.monotonic() - # 重新计算令牌桶中令牌数 - # noinspection PyTypeChecker - self.tokens = min( - self.max_tokens, - max( - 0, - self.tokens - + self.refill_rate * (refill_timestamp - self.refill_timestamp), + @model_validator(mode="after") + def validate_files(self): + if self.files is not None and self.stream_enabled: + raise ValueError("上传文件和使用流式传输不能同时使用") + return self + + +class RequestException(Exception): + """请求异常""" + + def __init__( + self, + status: Optional[int] = 400, + code: int = 0, + message: str = "请求发生异常", + ): + """ + :param status: 状态码 + :param code: 错误码 + :param message: 错误信息 + """ + self.status = status + self.code = code + self.message = message + super().__init__(self.message) + + +class Caches(SQLite): + """ + 缓存,支持: + query:查询并返回单条缓存 + update:新增或更新单条缓存 + """ + + def __init__(self, cache_ttl: int): + """ + 初始化 + :param cache_ttl: 缓存生存时间,单位为秒 + """ + # 初始化 + super().__init__(database=Path(__file__).parent.resolve() / "caches.db") + self.cache_ttl = cache_ttl + + # 初始化缓存表(不清理过期缓存) + try: + with self: + self.execute( + sql=""" + CREATE TABLE IF NOT EXISTS caches + ( + --缓存唯一标识 + guid TEXT PRIMARY KEY, + --缓存(JSON序列化) + cache TEXT NOT NULL, + --缓存时间 + timestamp REAL NOT NULL + ) + """ + ) + self.execute( + sql=""" + CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp) + """ + ) + except Exception as exception: + raise RuntimeError(f"初始化缓存表发生异常:{str(exception)}") from exception + + def query(self, guid: str) -> Optional[Dict[str, Any]]: + """ + 查询并返回单条缓存 + :param guid: 缓存唯一标识 + :return: 缓存 + """ + try: + with self: + result = self.query_one( + sql=""" + SELECT cache + FROM caches + WHERE guid = ? AND timestamp >= ? + """, + parameters=(guid, time.time() - self.cache_ttl), + ) + return None if result is None else json.loads(result["cache"]) + except Exception as exception: + raise RuntimeError( + f"查询并获取单条缓存发生异常:{str(exception)}" + ) from exception + + def update(self, guid: str, cache: Dict) -> Optional[bool]: + """ + 新增或更新单条缓存(若无则新增缓存,若有则更新缓存) + :param guid: 缓存唯一标识 + :param cache: 缓存 + :return: 成功返回True,失败返回False + """ + try: + with self: + return self.execute( + sql=""" + INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?) + """, + parameters=( + guid, + json.dumps(cache, ensure_ascii=False), + time.time(), ), ) - self.refill_timestamp = refill_timestamp - - # 尝试消耗令牌 - def consume(self) -> Tuple[bool, float]: - # 填充令牌 - self._refill() - - with self.thread_lock: - if self.tokens >= 1: - self.tokens -= 1 - return True, 0 - - # 等待时长 - # noinspection PyTypeChecker - wait_time = min( - 1 / self.refill_rate, - max( - 0, - 1 / self.refill_rate - - (time.monotonic() - self.refill_timestamp), - ), - ) - return False, wait_time - - # 初始化所有被装饰的函数创建令牌桶限流存储 - buckets = {} - - def decorator(func: Callable) -> Callable: - @wraps(func) - def wrapper(*args, **kwargs): - # 若当前被装饰的函数不在所有被装饰的函数创建令牌桶限流存储则为当前被装饰的函数实例化令牌桶限流 - if func not in buckets: - # 初始化令牌桶限流 - buckets[func] = TokenBucket( - refill_rate=refill_rate, max_tokens=max_tokens - ) - bucket = buckets[func] - - # 重试次数 - retries = 0 - while retries <= 10: - # 尝试消耗令牌 - success, wait_time = bucket.consume() - # 若消耗令牌成功则返回被装饰的函数,否则等待 - if success: - return func(*args, **kwargs) - time.sleep(wait_time * 2) - retries += 1 - raise Exception("request too frequently") - - return wrapper - - return decorator + except Exception as exception: + raise RuntimeError("新增或更新缓存发生异常") from exception class Request: - """请求客户端""" - - class RequestException(Exception): - """请求异常""" - - def __init__( - self, - status: Optional[int] = 400, - code: int = 0, - message: str = "请求发生异常", - ): - """ - :param status: 状态码 - :param code: 错误码 - :param message: 错误信息 - """ - self.status = status - self.code = code - self.message = message - super().__init__(self.message) - - class Parameters(BaseModel): - """ - 请求参数模型,支持自动校验 - """ - - url: HttpUrl = Field( - default=..., description="统一资源定位符,基于HttpUrl自动校验" - ) - params: Optional[Dict[str, Any]] = Field( - default=None, description="统一资源定位符的查询参数" - ) - headers: Optional[Dict[str, str]] = Field(default=None, description="请求头") - data: Optional[Dict[str, Any]] = Field(default=None, description="表单数据") - json_: Optional[Dict[str, Any]] = Field( - default=None, alias="json", description="JSON数据" - ) - files: Optional[ - Dict[ - str, - Union[ - Tuple[str, bytes], - Tuple[str, bytes, str], - Tuple[str, bytes, str, Dict[str, str]], - ], - ] - ] = Field( - default=None, - description="上传文件,{字段名: (文件名, 字节数据, 内容类型, 请求头)}", - ) - stream_enabled: Optional[bool] = Field(default=None, description="使用流式传输") - guid: Optional[str] = Field(default=None, description="缓存全局唯一标识") - - @model_validator(mode="after") - def validate_data(self): - """校验:表单数据和JSON数据互斥""" - if self.data is not None and self.json_ is not None: - raise ValueError("表单数据和JSON数据不能同时使用") - return self - - @model_validator(mode="after") - def validate_files(self): - if self.files is not None and self.stream_enabled: - raise ValueError("上传文件和使用流式传输不能同时使用") - return self - - class Caches(SQLite): - """请求缓存""" - - def __init__(self, cache_ttl: int): - """ - 初始化 - :param cache_ttl: 缓存生存时间,单位为秒 - """ - # 初始化 - super().__init__(database=Path(__file__).parent.resolve() / "caches.db") - # 初始化缓存生存时间,单位为秒 - self.cache_ttl = cache_ttl - - # 初始化缓存表和时间戳索引(不清理过期缓存) - try: - with self: - self._execute( - sql=""" - CREATE TABLE IF NOT EXISTS caches - ( - --缓存唯一标识 - guid TEXT PRIMARY KEY, - --缓存(JSON序列化) - cache TEXT NOT NULL, - --缓存时间 - timestamp REAL NOT NULL - ) - """ - ) - self._execute( - sql=""" - CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp) - """ - ) - except Exception as exception: - raise RuntimeError( - f"初始化缓存数据库发生异常:{str(exception)}" - ) from exception - - # noinspection PyShadowingNames - def query(self, guid: str) -> Optional[Dict[str, Any]]: - """ - 查询并获取单条缓存 - :param guid: 缓存唯一标识 - :return: 缓存 - """ - # noinspection PyBroadException - try: - with self: - result = self._query_one( - sql=""" - SELECT cache - FROM caches - WHERE guid = ? AND timestamp >= ? - """, - parameters=(guid, time.time() - self.cache_ttl), - ) - return ( - None if result is None else json.loads(result["cache"]) - ) # 就缓存JSON反序列化 - except Exception as exception: - raise RuntimeError("查询并获取单条缓存发生异常") from exception - - # noinspection PyShadowingNames - def update(self, guid: str, cache: Dict) -> Optional[bool]: - """ - 新增或更新缓存(若无则新增缓存,若有则更新缓存) - :param guid: 缓存唯一标识 - :param cache: 缓存 - :return: 成功返回True,失败返回False - """ - # noinspection PyBroadException - try: - with self: - return self._execute( - sql=""" - INSERT OR REPLACE INTO caches (guid, cache, timestamp) VALUES (?, ?, ?) - """, - parameters=( - guid, - json.dumps(cache, ensure_ascii=False), - time.time(), - ), - ) - except Exception as exception: - raise RuntimeError("新增或更新缓存发生异常") from exception + """ + 请求客户端,支持: + get:GET请求 + post:POST请求 + download:下载 + """ def __init__( self, @@ -282,11 +187,11 @@ class Request: ): """ :param default_headers: 默认请求头 - :param total: 最大重试次数 - :param backoff_factor: 重试间隔退避因子 - :param timeout: 超时时间,单位为秒 - :param cache_enabled: 使用缓存 - :param cache_ttl: 缓存生存时间,单位为天 + :param total: 最大重试次数,默认 3 + :param backoff_factor: 重试间隔退避因子,默认 0.5 + :param timeout: 超时时间(单位为秒),默认为 60 + :param cache_enabled: 使用缓存,默认 False + :param cache_ttl: 缓存生存时间(单位为天),默认为 360 """ # 创建请求会话并挂载适配器 self.session = self._create_session( @@ -294,20 +199,8 @@ class Request: ) # 初始化超时时间 self.timeout = timeout - # 初始化使用缓存 - self.cache_enabled = cache_enabled - # 初始化缓存生存时间,单位由天转为秒 - self.cache_ttl = cache_ttl * 86400 - - self.caches: Optional[Request.Caches] = None - # 若使用缓存则实例化缓存 - if self.cache_enabled: - self.caches = Request.Caches(cache_ttl=self.cache_ttl) - - def __del__(self): - """析构时关闭请求会话""" - if hasattr(self, "session") and self.session: - self.session.close() + # 实例化缓存 + self.caches = Caches(cache_ttl=cache_ttl * 86400) if cache_enabled else None @staticmethod def _create_session( @@ -349,29 +242,35 @@ class Request: return session - def get( - self, **kwargs - ) -> Any: - """发送GET请求""" - return self._request(method="GET", parameters=self.Parameters(**kwargs)) + def get(self, **kwargs) -> Any: + """ + GET请求 + :param kwargs: 请求参数 + :return: 响应内容 + """ + return self._request(method="GET", parameters=Parameters(**kwargs)) - def post( - self, **kwargs - ) -> Any: - """发送POST请求""" - return self._request(method="POST", parameters=self.Parameters(**kwargs)) + def post(self, **kwargs) -> Any: + """ + POST请求 + :param kwargs: 请求参数 + :return: 响应内容 + """ + return self._request(method="POST", parameters=Parameters(**kwargs)) def download( self, stream_enabled: bool = False, chunk_size: int = 1024, **kwargs ) -> Any: """ - 下载文件 + 下载 :param stream_enabled: 使用流式传输 :param chunk_size: 流式传输的分块大小 + :param kwargs: 请求参数 + :return: 响应内容 """ response = self._request( method="GET", - parameters=self.Parameters(**{"stream_enabled": stream_enabled, **kwargs}), + parameters=Parameters(**{"stream_enabled": stream_enabled, **kwargs}), ) # 若使用流式传输则处理流式传输响应 if stream_enabled: @@ -380,107 +279,104 @@ class Request: ) return response - def _request( - self, method: Literal["GET", "POST"], parameters: Parameters - ) -> Union[ - str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, Response, None - ]: - """请求""" + def _request(self, method: Literal["GET", "POST"], parameters: Parameters) -> Any: + """ + 请求 + :param method: 请求方法 + :param parameters: 请求参数模型 + :return: 响应内容 + """ # 将请求参数模型转为请求参数字典 - parameters = parameters.model_dump(exclude_none=True, by_alias=True) - # 将URL由HttpUrl对象转为字符串 - parameters["url"] = str(parameters["url"]) + kwargs = parameters.model_dump(exclude_none=True, by_alias=True) - # 过滤表单数据中None值 - if parameters.get("data") is not None: - parameters["data"] = { - k: v for k, v in parameters["data"].items() if v is not None - } + # 将统一资源定位符转为字符串 + url = str(kwargs.pop("url")) - # 过滤JSON数据中None值 - if parameters.get("json") is not None: - parameters["json"] = { - k: v for k, v in parameters["json"].items() if v is not None - } + # 过滤表单数据中空值 + if kwargs.get("data"): + kwargs["data"] = {k: v for k, v in kwargs["data"].items() if v} + + # 过滤JSON数据中空值 + if kwargs.get("json"): + kwargs["json"] = {k: v for k, v in kwargs["json"].items() if v} # 使用流式传输 - stream_enabled = parameters.pop("stream_enabled", False) + stream_enabled = kwargs.pop("stream_enabled", False) # 缓存全局唯一标识 - guid = parameters.pop("guid", None) - # 若使用缓存且缓存全局唯一标识非空则查询并获取单条缓存 - if self.cache_enabled and guid is not None: - cache = self.cache_client.query(guid) - if cache is not None: + guid = kwargs.pop("guid", None) + # 若缓存非空且缓存全局唯一标识非空则查询并获取单条缓存 + if self.caches and guid: + cache = self.caches.query(guid) + if cache: return cache # 发送请求并处理响应 - # noinspection PyBroadException try: response = self.session.request( - method=method, timeout=self.timeout, **parameters + method=method, url=url, timeout=self.timeout, **kwargs ) response.raise_for_status() # 若返回非2??状态码则抛出异常 - # 若使用流式传输则直接返回(不缓存) + # 若使用流式传输则直接返回响应对象(不缓存) if stream_enabled: return response - # 处理响应 + # 处理响应对象 response = self._process_response(response=response) - # 若使用缓存且缓存全局唯一标识非空则新增或更新缓存 - if self.cache_enabled and guid is not None: - self.cache_client.update(guid, response) + if self.caches and guid: + self.caches.update(guid, response) return response + # 重构异常信息 except Exception as exception: - # noinspection PyBroadException try: response = getattr(exception, "response", None) status = ( response.json().get("status", response.status_code) - if response is not None + if response else None ) message = ( response.json().get("message", response.text) - if response is not None + if response else str(exception).splitlines()[0] ) except Exception: status = None - message = f"{method} {parameters["url"]} 请求发生异常:{str(exception).splitlines()[0]}" - return self.RequestException(status=status, message=message).__dict__ + message = f"{method} {kwargs["url"]} 请求发生异常:{str(exception).splitlines()[0]}" + return RequestException(status=status, message=message).__dict__ - # 处理响应 @staticmethod def _process_response( response: Response, - ) -> Union[str, Tuple[str, bytes], Dict[str, Any], ElementTree.Element, None]: - # 若响应内容为空则返回None + ) -> Any: + """ + 处理响应对象 + :param response: 响应对象 + :return: 响应内容 + """ content = response.content if not content: return None # 响应类型 - _type = response.headers.get("Content-Type", "").split(";")[0].strip().lower() + response_type = ( + response.headers.get("Content-Type", "").split(";")[0].strip().lower() + ) # 根据响应类型匹配响应内容解析方法并返回 - # noinspection PyUnreachableCode - match _type: - # JSON:JSON反序列化 + match response_type: + # 若为JSON则反序列化 case "application/json" | "text/json": return response.json() - # XML:解析为XML对象(Element实例) + # 若为XML解析为Element对象 case "application/xml" | "text/xml": return ElementTree.fromstring(content) - # 以image/开头:返回影像件格式和响应内容 - case _ if _type.startswith("image/"): - # 影像件格式 - image_format = _type.split(sep="/", maxsplit=1)[1] - return image_format, content - # 其它的响应类型,先UTF8解码再返回,若解码发生异常则直接返回 + # 若为影像件格式则返回影像件格式和响应内容 + case _ if response_type.startswith("image/"): + return response_type.split(sep="/", maxsplit=1)[1], content case _: try: return content.decode("utf-8") @@ -494,176 +390,13 @@ class Request: ) -> Generator[bytes, None, None]: """ 处理流式响应 - :param response: requests.Response对象 + :param response: 响应对象 :param chunk_size: 分块大小 - :return: 字节数据块生成器 + :return: 响应内容迭代器 """ - if not isinstance(chunk_size, int): - raise ValueError("分块大小数据类型必须为整数") - - if chunk_size <= 0: - raise ValueError("分块大小必须大于0") - try: for chunk in response.iter_content(chunk_size=chunk_size): if chunk: yield chunk finally: response.close() - - -class Authenticator: - - def __init__( - self, - ): - """认证器(用于获取访问令牌)""" - # 初始化 - self._initialize() - - def _initialize(self): - """初始化""" - # 初始化访问凭证地址对象 - self.certifications_path = ( - Path(__file__).parent.resolve() / "certifications.json" - ) - # 若访问凭证地址对象不存在则创建 - if not self.certifications_path.exists(): - with open(self.certifications_path, "w", encoding="utf-8") as file: - json.dump( - {}, - file, - ensure_ascii=False, - ) - - # 初始化请求客户端 - self.http_client = HTTPClient() - - def _szkt_get_certification(self) -> tuple[str, float]: - """获取深圳快瞳访问凭证""" - response = self.http_client.get( - url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" - ) - - # 若非响应成功则抛出异常 - if not (response["status"] == 200 and response["code"] == 0): - raise RuntimeError("获取深圳快瞳访问凭证发生异常") - - # 返回访问令牌、失效时间戳 - # noinspection PyTypeChecker - return ( - response["data"]["access_token"], - time.time() + response["data"]["expires_in"], - ) - - def _hlyj_get_certification(self) -> Tuple[str, float]: - """获取合力亿捷访问凭证""" - # 企业访问标识 - access_key_id = "25938f1c190448829dbdb5d344231e42" - # 签名秘钥 - secret_access_key = "44dc0299aff84d68ae27712f8784f173" - # 时间戳(秒级) - timestamp = int(time.time()) - # 签名,企业访问标识、签名秘钥和时间戳拼接后计算的十六进制的HMAC-SHA256 - signature = hmac.new( - secret_access_key.encode("utf-8"), - f"{access_key_id}{secret_access_key}{timestamp}".encode("utf-8"), - hashlib.sha256, - ).hexdigest() - - response = self.http_client.get( - url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" - ) - - # 若非响应成功则抛出异常 - if not response["success"]: - raise RuntimeError("获取合力亿捷访问凭证发生异常") - - # 返回访问令牌、失效时间戳 - # noinspection PyTypeChecker - return ( - response["data"], - time.time() + 1 * 60 * 60, # 访问令牌有效期为1小时 - ) - - def _feishu_get_certification(self) -> tuple[str, float]: - """获取飞书访问凭证""" - response = self.http_client.post( - url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", - data={ - "app_id": "cli_a1587980be78500c", - "app_secret": "vZXGZomwfmyaHXoG8s810d1YYGLsIqCA", - }, - ) - - # 若非响应成功则抛出异常 - if not response["code"] == 0: - raise RuntimeError("获取飞书访问凭证发生异常") - - # 返回访问令牌、失效时间戳 - # noinspection PyTypeChecker - return ( - response["tenant_access_token"], - time.time() + response["expire"], - ) - - def get_token(self, servicer: str) -> str | None: - """ - 获取访问令牌 - :param servicer: 服务商,暂仅支持深圳快瞳、合力亿捷和飞书 - :return token: 访问令牌 - """ - with threading.Lock(): - # 初始化访问令牌和失效时间戳 - token, expired_timestamp = None, 0 - try: - with open(self.certifications_path, "r", encoding="utf-8") as file: - # 本地打开并读取所有服务商的访问凭证 - certifications = json.load(file) - # 获取指定服务商的访问凭证 - certification = certifications.get(servicer, None) - # 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳 - if certification is not None: - # 访问令牌 - token = certification["token"] - # 失效时间戳 - expired_timestamp = certification["expired_timestamp"] - - # 若反序列化发生异常则重置访问凭证储存文件 - except json.decoder.JSONDecodeError: - with open(self.certifications_path, "w", encoding="utf-8") as file: - json.dump( - {}, - file, - ensure_ascii=False, - ) - - except Exception: - raise RuntimeError("获取访问令牌发生异常") - - if time.time() > expired_timestamp: - # noinspection PyUnreachableCode - match servicer: - case "szkt": - token, expired_timestamp = self._szkt_get_certification() - case "hlyj": - token, expired_timestamp = self._hlyj_get_certification() - case "feishu": - token, expired_timestamp = self._feishu_get_certification() - case _: - raise RuntimeError(f"未设置服务商:{servicer}获取访问凭证方法") - # 更新服务商访问凭证 - certifications[servicer] = { - "token": token, - "expired_timestamp": expired_timestamp, - } - - # 将所有服务商访问凭证保存至本地文件 - with open(self.certifications_path, "w", encoding="utf-8") as file: - json.dump( - certifications, - file, - ensure_ascii=False, - ) - - return token diff --git a/utils/restrict.py b/utils/restrict.py new file mode 100644 index 0000000..99269e7 --- /dev/null +++ b/utils/restrict.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +""" +请求限速器 +""" + +from functools import wraps +import threading +import time +from typing import Callable, Tuple + + +class TokenBucket: + """ + 令牌桶 + """ + + def __init__(self, max_tokens: int, refill_rate: float): + """ + 初始化 + :param refill_rate: 令牌填充速率,单位为个/秒 + :param max_tokens: 最大令牌数,单位为个 + """ + self.max_tokens = max_tokens + # 初始化当前令牌数 + self.tokens = int(round(self.max_tokens * 0.5)) + self.refill_rate = refill_rate + # 初始化上一次填充令牌的时间戳(使用单调时间戳) + self.refill_timestamp = time.monotonic() + # 初始化线程锁(所有线程共用) + self.thread_lock = threading.Lock() + + def _refill(self) -> None: + """ + 填充令牌 + :return: 无 + """ + with self.thread_lock: + # 本次填充令牌的时间戳 + refill_timestamp = time.monotonic() + # 重新计算令牌桶中令牌数 + self.tokens = min( + self.max_tokens, + max( + 0, + self.tokens + + self.refill_rate * (refill_timestamp - self.refill_timestamp), + ), + ) + self.refill_timestamp = refill_timestamp + + def consume(self) -> Tuple[bool, float]: + """ + 消耗令牌 + :return: 消耗令牌是否成功与否和等待时长 + """ + # 填充令牌 + self._refill() + + with self.thread_lock: + if self.tokens >= 1: + self.tokens -= 1 + return True, 0 + + # 等待时长 + wait_time = min( + 1 / self.refill_rate, + max( + 0, + 1 / self.refill_rate - (time.monotonic() - self.refill_timestamp), + ), + ) + return False, wait_time + + +def restrict( + max_tokens: int = 5, + refill_rate: float = 5.0, +) -> Callable: + """ + 请求限速器 + :param max_tokens: 最大令牌数 + :param refill_rate: 令牌填充速率,单位为个/秒 + :return: 限速器装饰器 + """ + + # 初始化所有被装饰的函数创建令牌桶限流存储 + buckets = {} + + def decorator(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + # 若当前被装饰的函数不在所有被装饰的函数创建令牌桶存储则为当前被装饰的函数实例化令牌桶 + if func not in buckets: + # 初始化令牌桶 + buckets[func] = TokenBucket( + refill_rate=refill_rate, max_tokens=max_tokens + ) + + # 重试次数 + retries = 0 + bucket = buckets[func] + while retries <= 10: + # 消耗令牌 + success, wait_time = bucket.consume() + # 若消耗令牌成功则返回被装饰的函数,否则等待 + if success: + return func(*args, **kwargs) + time.sleep(wait_time * 2) + retries += 1 + raise Exception("request too frequently") + + return wrapper + + return decorator diff --git a/utils/rules_engine.py b/utils/rules_engine.py index f3a8187..fc2ab5f 100644 --- a/utils/rules_engine.py +++ b/utils/rules_engine.py @@ -1,40 +1,40 @@ # -*- coding: utf-8 -*- - -# 导入模块 +""" +规则引擎 +""" from datetime import datetime from decimal import Decimal from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, Union from zen import ZenDecision, ZenEngine - class RulesEngine: """ - 规则引擎,实现打开并读取规则,根据规则和输入评估并输出 + 规则引擎,支持: + evaluate:调用并返回评估结果 """ - def __init__(self, rules_path: Path): + def __init__(self, decisions_folder_path: Path): """ 初始化规则引擎 - :param rules_path: 规则文件夹路径(path对象) + :param decisions_folder_path: 决策文件夹路径 """ - rules_path = Path(rules_path) - rules_path.mkdir(parents=True, exist_ok=True) # 若规则文件夹不存在则创建 + decisions_folder_path.mkdir(exist_ok=True) # 若规则文件夹不存在则创建 - # 初始化规则缓存 + # 初始化决策缓存 self.decisions = {} - for decision_path in rules_path.glob("*.json"): - # 打开并读取规则文件并实例化规则 + for decision_path in decisions_folder_path.glob("*.json"): + # 打开并读取决策文件并实例化 ZenDecision self.decisions[decision_path.stem] = self._get_decision(decision_path) @staticmethod def _get_decision(decision_path: Path) -> ZenDecision: """ - 打开并读取规则文件并实例化规则 - :param decision_path: 规则文件路径(path对象) - :return: 实例化规则 + 打开并读取决策文件并实例化 ZenDecision + :param decision_path: 决策文件路径 + :return: 实例化 ZenDecision """ def loader(path): @@ -45,33 +45,31 @@ class RulesEngine: def evaluate(self, decision: str, inputs: Dict[str, Any]) -> Dict[str, Any]: """ - 调用规则并评估 - :param decision: 规则 - :param inputs: 输入 - :return: 输出 + 调用并返回评估结果 + :param decision: 决策名称 + :param inputs: 待评估对象 + :return: 评估结果 """ - if decision not in self.decisions: - raise ValueError(f"规则不存在:{decision}") + return self.decisions[decision].evaluate(self._formatter(inputs))["result"] - return self.decisions[decision].evaluate(self._format_value(inputs))["result"] - - def _format_value(self, value: Any) -> Any: + def _formatter( + self, inputs: Union[str, int, Decimal, datetime, list, dict, Any] + ) -> Any: """ - 格式化值为字符串 - :param value: 值 - :return: 格式化后值 + 格式化器 + :param inputs: 待评估对象 + :return: 格式化后待评估对象 """ - match value: + match inputs: case int(): - return str(value) + return str(inputs) case Decimal(): - # noinspection PyTypeChecker - return format(value, f".{abs(value.as_tuple().exponent)}f") + return format(inputs, ".2f") case datetime(): - return value.strftime("%Y-%m-%d %H:%M:%S") + return inputs.strftime("%Y-%m-%d %H:%M:%S") case list(): - return [self._format_value(e) for e in value] + return [self._formatter(i) for i in inputs] case dict(): - return {k: self._format_value(v) for k, v in value.items()} + return {key: self._formatter(value) for key, value in inputs.items()} # 递归格式化 case _: - return value + return inputs diff --git a/utils/sqlite.py b/utils/sqlite.py index 6c28901..1113839 100644 --- a/utils/sqlite.py +++ b/utils/sqlite.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- - -# 导入模块 +""" +SQLite客户端 +""" import sqlite3 import threading @@ -10,7 +11,10 @@ from typing import Any, Dict, List, Optional, Tuple, Union class SQLite: """ - SQLite客户端 + SQLite客户端,支持: + query_one:根据SQL语句执行查询并返回一行查询数据 + query_all:根据SQL语句执行查询并返回所有查询数据 + execute:根据SQL语句执行操作 """ def __init__(self, database: Union[str, Path]): @@ -22,77 +26,21 @@ class SQLite: # 初始化本地线程存储 self.threads = threading.local() - def _connect(self): - """为当前线程创建数据库连接和游标""" - # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 - if hasattr(self.threads, "connection") and self.threads.connection is not None: - return - # 为当前线程关闭数据库连接和游标 - self._disconnect() - - # noinspection PyBroadException - try: - # 为当前线程创建数据库连接 - self.threads.connection = sqlite3.connect( - database=self.database, - check_same_thread=True, - timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 - ) - # 开启行映射,支持按照字段名取值 - self.threads.connection.row_factory = sqlite3.Row - # 为当前线程创建游标 - self.threads.cursor = self.threads.connection.cursor() - except Exception as exception: - self.threads.connection = None - self.threads.cursor = None - raise RuntimeError( - f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" - ) from exception - - def _disconnect(self) -> None: - """为当前线程关闭数据库连接和游标""" - # 检查当前线程有游标,若有则关闭游标 - if hasattr(self.threads, "cursor") and self.threads.cursor is not None: - # noinspection PyBroadException - try: - # 为当前线程关闭游标 - self.threads.cursor.close() - self.threads.cursor = None - except Exception as exception: - raise RuntimeError( - f"为当前线程关闭游标发生异常,{str(exception)}" - ) from exception - - # 检查当前线程有数据库连接,若有则关闭数据库连接 - if hasattr(self.threads, "connection") and self.threads.connection is not None: - # noinspection PyBroadException - try: - # 为当前线程提交事务 - self.threads.connection.commit() - # 为当前线程关闭数据库连接 - self.threads.connection.close() - self.threads.connection = None - except Exception as exception: - raise RuntimeError( - f"为当前线程关闭数据库连接发生异常,{str(exception)}" - ) from exception - - def _query_one( + def query_one( self, sql: str, parameters: Tuple[Any, ...] = () ) -> Optional[Dict[str, Any]]: """ - 为当前线程查询并获取单行数据 - :param sql: 查询SQL语句 + 在当前线程,根据SQL语句执行查询并返回一行查询数据 + :param sql: SQL语句 :param parameters: SQL参数 - :return: 单行数据 + :return: 查询数据 """ - # noinspection PyBroadException try: # 为当前线程创建数据库连接和游标 self._connect() - # 检查当前线程无游标,若无则抛出异常 + # 检查当前线程游标,若游标为空值则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: - raise RuntimeError("为当前线程创建游标发生异常") + raise RuntimeError("创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) @@ -109,28 +57,28 @@ class SQLite: and self.threads.connection is not None ): self.threads.connection.rollback() - raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception + raise RuntimeError(f"执行查询发生异常:{str(exception)}") from exception - def _query_all( + def query_all( self, sql: str, parameters: Tuple[Any, ...] = () ) -> List[Dict[str, Any]]: """ - 为当前线程查询并获取多行数据 - :param sql: 查询SQL语句 + 在当前线程,根据SQL语句执行查询并返回所有查询数据 + :param sql: SQL语句 :param parameters: SQL参数 - :return: 多行数据 + :return: 查询数据 """ - # noinspection PyBroadException try: # 为当前线程创建数据库连接和游标 self._connect() - # 检查当前线程无游标,若无则抛出异常 + # 检查当前线程游标,若游标为空值则抛出异常 if not hasattr(self.threads, "cursor") or self.threads.cursor is None: - raise RuntimeError("为当前线程创建游标发生异常") + raise RuntimeError("创建游标发生异常") # 为当前线程执行SQL self.threads.cursor.execute(sql, parameters) result = [] + # 分批处理 while batch := self.threads.cursor.fetchmany(1000): result.extend([dict(row) for row in batch]) return result @@ -142,11 +90,11 @@ class SQLite: and self.threads.connection is not None ): self.threads.connection.rollback() - raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception + raise RuntimeError(f"执行查询发生异常:{str(exception)}") from exception - def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: + def execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool: """ - 为当前线程执行SQL + 在当前线程,根据SQL语句执行操作 :param sql: 新增、删除和修改SQL语句 :param parameters: SQL参数 :return: 执行结果 @@ -170,7 +118,59 @@ class SQLite: and self.threads.connection is not None ): self.threads.connection.rollback() - raise RuntimeError("为当前线程执行SQL发生异常") from exception + raise RuntimeError(f"执行SQL发生异常:{str(exception)}") from exception + + def _connect(self): + """为当前线程创建数据库连接和游标""" + # 检查当前线程有数据库连接,若有则继续否则创建数据库连接 + if hasattr(self.threads, "connection") and self.threads.connection is not None: + return + # 为当前线程关闭数据库连接和游标 + self._disconnect() + + try: + # 为当前线程创建数据库连接 + self.threads.connection = sqlite3.connect( + database=self.database, + check_same_thread=True, + timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 + ) + # 开启行映射,支持按照字段名取值 + self.threads.connection.row_factory = sqlite3.Row + # 为当前线程创建游标 + self.threads.cursor = self.threads.connection.cursor() + except Exception as exception: + self.threads.connection = None + self.threads.cursor = None + raise RuntimeError( + f"为当前线程创建数据库连接和游标发生异常,{str(exception)}" + ) from exception + + def _disconnect(self) -> None: + """为当前线程关闭数据库连接和游标""" + # 检查当前线程有游标,若有则关闭游标 + if hasattr(self.threads, "cursor") and self.threads.cursor is not None: + try: + # 为当前线程关闭游标 + self.threads.cursor.close() + self.threads.cursor = None + except Exception as exception: + raise RuntimeError( + f"为当前线程关闭游标发生异常,{str(exception)}" + ) from exception + + # 检查当前线程有数据库连接,若有则关闭数据库连接 + if hasattr(self.threads, "connection") and self.threads.connection is not None: + try: + # 为当前线程提交事务 + self.threads.connection.commit() + # 为当前线程关闭数据库连接 + self.threads.connection.close() + self.threads.connection = None + except Exception as exception: + raise RuntimeError( + f"为当前线程关闭数据库连接发生异常,{str(exception)}" + ) from exception def __enter__(self): """进入上下文管理时为当前线程创建数据库连接和游标""" diff --git a/票据理赔自动化/__init__.py b/票据理赔自动化/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/票据理赔自动化/a.py b/票据理赔自动化/a.py deleted file mode 100644 index ab3cf4d..0000000 --- a/票据理赔自动化/a.py +++ /dev/null @@ -1,2 +0,0 @@ -a = 1 -print(a) \ No newline at end of file diff --git a/票据理赔自动化/common.py b/票据理赔自动化/common.py index b3a0fcf..7eb85e6 100644 --- a/票据理赔自动化/common.py +++ b/票据理赔自动化/common.py @@ -1,9 +1,12 @@ # -*- coding: utf-8 -*- from pathlib import Path +import sys from masterdata import MasterData -# from ..utils import RuleEngine + +sys.path.append(Path(__file__).parent.parent.as_posix()) +from utils.rules_engine import RulesEngine # 初始化赔案档案(保险公司将提供投保公司、保险分公司和报案时间等,TPA作业系统签收后生成赔案号) dossier = { @@ -16,7 +19,7 @@ dossier = { } # 实例化主数据 -master_data = MasterData() +masterdata = MasterData() # 实例化规则引擎 -rule_engine = RuleEngine(rules_path=Path("rules")) +rules_engine = RulesEngine(decisions_folder_path=Path("rules")) diff --git a/票据理赔自动化/image.py b/票据理赔自动化/image.py index e4563a1..70a8f8a 100644 --- a/票据理赔自动化/image.py +++ b/票据理赔自动化/image.py @@ -1,51 +1,55 @@ # -*- coding: utf-8 -*- +""" +影像件处理模块 +""" -import json -import re from base64 import b64encode from datetime import datetime from decimal import Decimal, ROUND_HALF_UP - -from pathlib import Path -from typing import Optional, Tuple, Dict, Any from hashlib import md5 +import json +from pathlib import Path +import re +import sys +from typing import Any, Dict, List, Optional, Tuple + import cv2 -import numpy -import pandas from fuzzywuzzy import fuzz from jionlp import parse_location +import numpy +import pandas -from common import dossier, master_data, rule_engine +from common import dossier, masterdata, rules_engine + +sys.path.append(Path(__file__).parent.parent.as_posix()) +from utils.authenticator import Authenticator +from utils.request import Request -from utils import Authenticator, Request # 实例化认证器 authenticator = Authenticator() # 实例化请求客户端 -http_client = Request(timeout=300, cache_enabled=True) # 使用缓存 +request = Request(timeout=300, cache_enabled=True) # 使用缓存 -# noinspection PyShadowingNames def image_read( image_path: Path, -) -> Optional[numpy.ndarray | None]: +) -> numpy.ndarray: """ 打开并读取影像件 - :param image_path: 影像件路径(path对象) + :param image_path: 影像件路径 :return: 影像件数据(numpy.ndarray对象) """ - # noinspection PyBroadException try: # 打开并读取影像件(默认转为单通道灰度图) image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE) if image_ndarray is None: - raise + raise RuntimeError(f"影像件数据为空") return image_ndarray except Exception as exception: - raise RuntimeError(f"打开并读取影像件发生异常:{str(exception)}") + raise RuntimeError(f"打开并读取影像件发生异常:{str(exception)}") from exception -# noinspection PyShadowingNames def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: """ 生成影像件唯一标识 @@ -62,12 +66,11 @@ def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: return image_guid -# noinspection PyShadowingNames def image_compress( image_format: str, image_ndarray: numpy.ndarray, image_size_specified: float = 2.0, -) -> Optional[str]: +) -> str: """ 压缩影像件 :param image_format: 影像件格式 @@ -111,10 +114,9 @@ def image_compress( if min(image_ndarray_copy.shape[:2]) < 350: break - return None + raise RuntimeError("压缩影像件发生异常") -# noinspection PyShadowingNames def calculate_age(report_time: datetime, birth_date: datetime) -> int: """ 根据报案时间计算周岁 @@ -136,7 +138,6 @@ def calculate_age(report_time: datetime, birth_date: datetime) -> int: # TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并 -# noinspection PyShadowingNames def identity_card_recognize(image, insurer_company) -> None: """ 识别居民身份证并整合至赔案档案 @@ -145,7 +146,7 @@ def identity_card_recognize(image, insurer_company) -> None: :return: 无 """ # 请求深圳快瞳居民身份证识别接口 - response = http_client.post( + response = request.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"), headers={ "X-RequestId-Header": image["image_guid"] @@ -158,13 +159,12 @@ def identity_card_recognize(image, insurer_company) -> None: ) # TODO: 若请求深圳快瞳居民身份证识别接口发生异常则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): - raise + raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常") if image["image_type"] in [ "居民身份证(国徽、头像面)", "居民身份证(头像面)", ]: - # noinspection PyTypeChecker dossier["insured_person_layer"].update( { "insured_person": ( @@ -195,7 +195,7 @@ def identity_card_recognize(image, insurer_company) -> None: ) # 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单 - dossier["insured_persons_layer"] = master_data.query_liabilities( + dossier["insured_persons_layer"] = masterdata.query_liabilities( insurer_company, insured_person, identity_type, @@ -207,7 +207,6 @@ def identity_card_recognize(image, insurer_company) -> None: "居民身份证(国徽、头像面)", "居民身份证(国徽面)", ]: - # noinspection PyTypeChecker dossier["insured_person_layer"].update( { "commencement_date": datetime.strptime( @@ -223,8 +222,7 @@ def identity_card_recognize(image, insurer_company) -> None: ) -# noinspection PyShadowingNames -def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, str]]: +def image_classify(image_index: int, image_path: Path) -> None: """ 分类影像件并旋正 :param image_index: 影像件编号 @@ -233,23 +231,18 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st """ # 打开并读取影像件 image_ndarray = image_read(image_path) - image_index = f"{image_index:02d}" image_format = image_path.suffix.lower() # 影像件格式 # 生成影像件唯一标识 - # noinspection PyTypeChecker image_guid = image_serialize(image_format, image_ndarray) # 压缩影像件 image_base64 = image_compress( image_format, image_ndarray, image_size_specified=2 ) # 深圳快瞳要求影像件BASE64编码后大小小于等于2兆字节 - # TODO: 若压缩影像件发生异常则流转至人工处理 - if not image_base64: - raise # 请求深圳快瞳影像件分类接口 - response = http_client.post( + response = request.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"), headers={ "X-RequestId-Header": image_guid @@ -262,10 +255,9 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st ) # TODO: 若响应非成功则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): - raise + raise RuntimeError("请求深圳快瞳影像件分类接口发生异常") # 匹配影像件类型 - # noinspection PyTypeChecker match (response["data"]["flag"], response["data"]["type"]): case (14, _): image_type = "居民户口簿" @@ -293,7 +285,6 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st image_type = "其它" # 匹配影像件方向 - # noinspection PyTypeChecker image_orientation = { "0": "0度", "90": "顺时针90度", @@ -314,13 +305,10 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st image_base64 = image_compress( image_format, image_ndarray, image_size_specified=2 ) - # TODO: 若旋正后再次压缩影像件发生异常则流转至人工处理 - if not image_base64: - raise dossier["images_layer"].append( { - "image_index": image_index, + "image_index": f"{image_index:02d}", # 影像件编号 "image_path": image_path.as_posix(), "image_name": image_path.stem, "image_format": image_format, @@ -331,8 +319,7 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st ) -# noinspection PyShadowingNames -def mlm_recognize(image, schema) -> Optional[Dict[str, Any]]: +def mlm_recognize(image, schema) -> Dict[str, Any]: """ 使用多模态大模型就理赔申请书进行光学字符识别并结构化识别结果 :param image: 影像件 @@ -340,7 +327,7 @@ def mlm_recognize(image, schema) -> Optional[Dict[str, Any]]: :return: 结构化后识别结果 """ # 请求火山引擎多模态大模型接口并就消息内容JSON反序列化 - response = http_client.post( + response = request.post( url="https://ark.cn-beijing.volces.com/api/v3/chat/completions", headers={ "Authorization": "Bearer 2c28ab07-888c-45be-84a2-fc4b2cb5f3f2", @@ -392,15 +379,13 @@ def mlm_recognize(image, schema) -> Optional[Dict[str, Any]]: ) # 就响应中消息内容JSON反序列化 - # noinspection PyBroadException try: - # noinspection PyTypeChecker return json.loads(response["choices"][0]["message"]["content"]) - except: - return None + # TODO: 若请求火山引擎多模态大模型接口发生异常则流转至人工处理 + except Exception as exception: + raise RuntimeError("请求火山引擎多模态大模型接口发生异常") from exception -# noinspection PyShadowingNames def boc_application_recognize(image: str) -> None: """ 识别中银保险有限公司的理赔申请书并整合至赔案档案 @@ -507,9 +492,6 @@ def boc_application_recognize(image: str) -> None: # 使用多模态大模型就理赔申请书进行光学字符识别并结构化识别结果 recognition = mlm_recognize(image, schema) - # TODO: 若识别中银保险有限公司的理赔申请书并整合至赔案档案发生异常则流转至人工处理 - if not recognition: - raise dossier["insured_person_layer"].update( { "phone_number": recognition["手机"], @@ -520,7 +502,6 @@ def boc_application_recognize(image: str) -> None: ) -# noinspection PyShadowingNames def application_recognize(image, insurer_company) -> None: """ 识别理赔申请书并整合至赔案档案 @@ -536,25 +517,19 @@ def application_recognize(image, insurer_company) -> None: boc_application_recognize(image) -# noinspection PyShadowingNames -def fuzzy_match(contents: list, key: str) -> Optional[str]: +def fuzzy_match(contents: List[Dict[str, Any]], key: str) -> str: """ 根据内容列表(基于深圳快瞳增值税发票和医疗收费票据识别结果)模糊匹配键名 :param contents: 内容列表 :param key: 键名 - :return 值 + :return """ - # 若内容列表为空值则返回None - if not contents: - return None - - # noinspection PyInconsistentReturns match contents[0].keys(): # 对应深圳快瞳增值税发票识别结果 case _ if "desc" in contents[0].keys(): for content in contents: if content["desc"] == key: - return content["value"] if content["value"] else None + return content["value"] if content["value"] else "" candidates = [] for content in contents: @@ -568,9 +543,9 @@ def fuzzy_match(contents: list, key: str) -> Optional[str]: ) return ( - (result[0] if result[0] else None) + (result[0] if result[0] else "") if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 - else None + else "" ) # 返回似度>=80且最大的值 # 对应深圳快瞳医疗收费票据识别结果 @@ -578,7 +553,7 @@ def fuzzy_match(contents: list, key: str) -> Optional[str]: for content in contents: if content["name"] == key: return ( - content["word"]["value"] if content["word"]["value"] else None + content["word"]["value"] if content["word"]["value"] else "" ) candidates = [] @@ -593,11 +568,13 @@ def fuzzy_match(contents: list, key: str) -> Optional[str]: ) return ( - (result[0] if result[0] else None) + (result[0] if result[0] else "") if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 - else None + else "" ) # 返回>=80且最大的相似度的值 + return "" + def parse_item(item: str) -> Tuple[str, Optional[str]]: """ @@ -609,7 +586,7 @@ def parse_item(item: str) -> Tuple[str, Optional[str]]: r"^\*(?P.*?)\*(?P.*)$", item, ): - return match.group("category"), master_data.query_medicine( + return match.group("category"), masterdata.query_medicine( match.group("specific") ) # 一般增值税发票明细项格式形如*{category}*{specific},其中category为明细项类别,例如中成药;specific为明细项具体内容,例如[同仁堂]金贵肾气水蜜丸 300丸/瓶,需要据此查询药品。其它格式则将明细项内容作为明细项类别,药品为空值 @@ -617,7 +594,6 @@ def parse_item(item: str) -> Tuple[str, Optional[str]]: return item, None -# noinspection PyShadowingNames def receipt_recognize(image, insurer_company) -> None: """ 识别票据并整合至赔案档案 @@ -628,7 +604,7 @@ def receipt_recognize(image, insurer_company) -> None: # 初始化票据数据 receipt = {"image_index": image["image_index"]} # 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据) - response = http_client.post( + response = request.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll"), headers={ "X-RequestId-Header": image["image_guid"] @@ -641,11 +617,9 @@ def receipt_recognize(image, insurer_company) -> None: ) # 若查验状态为真票或红票则直接整合至赔案档案 if response.get("status") == 200 and response.get("code") == 10000: - # noinspection PyTypeChecker match response["data"]["productCode"]: # 增值税发票,目前深圳快瞳支持全电发票和全电纸质发票、区块链发票和增值税发票查验 case "003082": - # noinspection PyTypeChecker receipt.update( { "verification": ( @@ -700,7 +674,6 @@ def receipt_recognize(image, insurer_company) -> None: ) # 医疗门诊、住院收费票据 case "003081": - # noinspection PyTypeChecker receipt.update( { "verification": ( @@ -793,7 +766,7 @@ def receipt_recognize(image, insurer_company) -> None: match image["image_type"]: case "增值税发票": # 请求深圳快瞳增值税发票识别接口 - response = http_client.post( + response = request.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/vatInvoice"), headers={ "X-RequestId-Header": image["image_guid"] @@ -810,11 +783,10 @@ def receipt_recognize(image, insurer_company) -> None: ) # TODO: 若请求深圳快瞳增值税发票识别接口发生异常则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): - raise + raise RuntimeError("请求深圳快瞳增值税发票识别接口发生异常") match fuzzy_match(response["data"], "发票类型"): case "电子发票(普通发票)": - # noinspection PyTypeChecker receipt.update( { "number": fuzzy_match(response["data"], "发票号码"), @@ -891,7 +863,6 @@ def receipt_recognize(image, insurer_company) -> None: } ) case "增值税普通发票(卷票)": - # noinspection PyTypeChecker receipt.update( { "number": fuzzy_match(response["data"], "发票号码"), @@ -959,7 +930,7 @@ def receipt_recognize(image, insurer_company) -> None: ) case "医疗门诊收费票据" | "医疗住院收费票据": # 请求深圳快瞳医疗收费票据识别接口 - response = http_client.post( + response = request.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/medical"), headers={ "X-RequestId-Header": image["image_guid"] @@ -978,7 +949,6 @@ def receipt_recognize(image, insurer_company) -> None: if not (response.get("status") == 200 and response.get("code") == 0): raise - # noinspection PyTypeChecker receipt.update( { "number": ( @@ -1080,7 +1050,7 @@ def receipt_recognize(image, insurer_company) -> None: ) # 根据购药及就医机构查询购药及就医机构类型 - receipt["institution_type"] = master_data.query_institution_type( + receipt["institution_type"] = masterdata.query_institution_type( receipt["institution"] ) @@ -1110,7 +1080,7 @@ def receipt_recognize(image, insurer_company) -> None: .assign( reasonable_amount=lambda dataframe: dataframe.apply( lambda row: Decimal( - rule_engine.evaluate( + rules_engine.evaluate( decision="扣除明细项不合理费用", inputs={ "insurer_company": insurer_company, @@ -1174,7 +1144,6 @@ def receipt_recognize(image, insurer_company) -> None: dossier["receipts_layer"].append(receipt) -# noinspection PyShadowingNames def bank_card_recognize(image) -> None: """ 识别银行卡并整合至赔案档案 @@ -1182,7 +1151,7 @@ def bank_card_recognize(image) -> None: :return: 空 """ # 请求深圳快瞳银行卡识别接口 - response = http_client.post( + response = request.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/bankCard"), headers={ "X-RequestId-Header": image["image_guid"] @@ -1201,7 +1170,6 @@ def bank_card_recognize(image) -> None: == 1 # # 实际作业亦仅支持借记卡 ): raise RuntimeError("请求深圳快瞳银行卡识别接口发生异常或非借记卡") - # noinspection PyTypeChecker dossier["insured_person_layer"].update( { "phone_number": None, @@ -1212,7 +1180,6 @@ def bank_card_recognize(image) -> None: ) -# noinspection PyShadowingNames def image_recognize( image, insurer_company, @@ -1224,7 +1191,7 @@ def image_recognize( :return: 无 """ # 基于影像件识别使能规则评估影像件是否识别 - if not rule_engine.evaluate( + if not rules_engine.evaluate( decision="影像件识别使能", inputs={ "insurer_company": insurer_company, diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index 3e53463..1d1587d 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -1,21 +1,22 @@ # -*- coding: utf-8 -*- - """ 票据理赔自动化 功能清单 https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink """ + from datetime import datetime from pathlib import Path from typing import Any, Dict, List -import pandas from jinja2 import Environment, FileSystemLoader +import pandas from common import dossier, rule_engine from image import image_classify from image import image_recognize + if __name__ == "__main__": # 初始化工作目录路径 workplace_path = Path("directory") diff --git a/票据理赔自动化/masterdata.py b/票据理赔自动化/masterdata.py index 0fc557f..dfc64ed 100644 --- a/票据理赔自动化/masterdata.py +++ b/票据理赔自动化/masterdata.py @@ -1,27 +1,36 @@ # -*- coding: utf-8 -*- +""" +主数据模块 +""" from datetime import datetime from decimal import Decimal, ROUND_HALF_UP +from pathlib import Path +import sys from typing import Any, Dict, List, Optional -import sys -sys.path.append(".") -from utils import SQLite +sys.path.append(Path(__file__).parent.parent.as_posix()) +from utils.sqlite import SQLite class MasterData(SQLite): - """主数据""" + """ + 主数据,支持: + query_liabilities:根据保险分公司名称、被保险人姓名、证件类型、证件号码和报案时间查询被保险人的理赔责任 + query_institution_type:根据购药及就医机构名称查询购药及就医机构类型 + query_medicine:根据明细项中具体内容查询药品/医疗服务名称 + """ def __init__(self): """ - 初始化主数据 + 初始化 """ # 初始化SQLite客户端 super().__init__(database="database.db") try: with self: # 初始化团单表 - self._execute( + self.execute( sql=""" CREATE TABLE IF NOT EXISTS group_policies ( @@ -39,7 +48,7 @@ class MasterData(SQLite): """ ) # 初始化个单表 - self._execute( + self.execute( sql=""" CREATE TABLE IF NOT EXISTS person_policies ( @@ -56,8 +65,8 @@ class MasterData(SQLite): ) """ ) - # 初始化被保险人表,保司推送赔案时,一般无团单号,需先根据保险分公司名称、被保险人姓名、证件类型和证件号码查询被保人,再在票据理算时根据事故起期确定个单和相应责任 - self._execute( + # 初始化被保险人表,保司推送赔案时,一般无团单号,需先根据保险分公司名称、被保险人姓名、证件类型和证件号码查询被保人 + self.execute( sql=""" CREATE TABLE IF NOT EXISTS insured_persons ( @@ -78,7 +87,7 @@ class MasterData(SQLite): """ ) # 初始化责任表 - self._execute( + self.execute( sql=""" CREATE TABLE IF NOT EXISTS liabilities ( @@ -102,7 +111,7 @@ class MasterData(SQLite): """ ) # 初始化保额变动表 - self._execute( + self.execute( sql=""" CREATE TABLE IF NOT EXISTS coverage_changes ( @@ -124,7 +133,7 @@ class MasterData(SQLite): """ ) # 初始化购药及就医机构表 - self._execute( + self.execute( sql=""" CREATE TABLE IF NOT EXISTS institutions ( @@ -140,7 +149,7 @@ class MasterData(SQLite): """ ) # 初始化药品表 - self._execute( + self.execute( sql=""" CREATE TABLE IF NOT EXISTS medicines ( @@ -150,9 +159,8 @@ class MasterData(SQLite): """ ) except Exception as exception: - raise RuntimeError(f"初始化数据库发生异常:{str(exception)}") + raise RuntimeError(f"初始化数据库发生异常:{str(exception)}") from exception - # noinspection PyShadowingNames def query_liabilities( self, insurer_company: str, @@ -162,34 +170,30 @@ class MasterData(SQLite): report_date: str, ) -> Optional[List[Dict[str, Any]]]: """ - 根据保险分公司名称、被保险人姓名、证件类型、证件号码和出险时间查询个单和责任数据 + 根据保险分公司名称、被保险人姓名、证件类型、证件号码和报案时间查询被保险人的理赔责任 :param insurer_company: 保险分公司名称 :param insured_person: 被保险人姓名 :param identity_type: 证件类型 :param identity_number: 证件号码 :param report_date: 报案时间 - :return: 个单和责任数据 + :return: 被保险人的责任数据 """ - # noinspection PyBroadException try: with self: - # noinspection SqlResolve - results = self._query_all( + result = self.query_all( sql=""" SELECT group_policies.group_policy, group_policies.insurer_company, person_policies.person_policy, - person_policy_coverage_changes.after_change_amount AS remaining_amount, - master_insured_persons.insured_person AS master_insured_person, insured_persons.insured_person, insured_persons.identity_type, insured_persons.identity_number, insured_persons.relationship, + master_insured_persons.insured_person AS master_insured_person, MAX(group_policies.commencement_date, - person_policies.commencement_date) AS commencement_date, + person_policies.commencement_date) AS commencement_date, MIN(group_policies.termination_date, - person_policies.termination_date) AS termination_date, - person_policies.guid AS person_policy_guid, + person_policies.termination_date) AS termination_date, liabilities.liability, liabilities.accident, liabilities.personal_self_ratio, @@ -197,22 +201,15 @@ class MasterData(SQLite): liabilities.reasonable_ratio, liabilities.adjust_policy_guid FROM insured_persons + INNER JOIN person_policies + ON insured_persons.person_policy_guid = person_policies.guid INNER JOIN insured_persons master_insured_persons ON person_policies.guid = master_insured_persons.person_policy_guid AND master_insured_persons.relationship = "本人" - INNER JOIN person_policies - ON insured_persons.person_policy_guid = person_policies.guid INNER JOIN group_policies ON person_policies.group_policy_guid = group_policies.guid INNER JOIN liabilities ON person_policies.guid = liabilities.person_policy_guid - INNER JOIN coverage_changes person_policy_coverage_changes - ON person_policies.guid = - person_policy_coverage_changes.change_policy_guid - AND - person_policy_coverage_changes.change_time = (SELECT MAX(change_time) - FROM coverage_changes - WHERE change_policy_guid = person_policies.guid) INNER JOIN coverage_changes ON liabilities.adjust_policy_guid = coverage_changes.change_policy_guid AND coverage_changes.change_time = (SELECT MAX(change_time) @@ -235,81 +232,39 @@ class MasterData(SQLite): report_date, ), ) - if results: - # 就个人自费比例、个人自付比例和合理比例转为小数(decimal对象),保险起期、止期则转为日期时间(datetime对象) - results = [ - { - k: ( - Decimal(v).quantize( + # TODO: 若查无数据则流转至人工处理 + if not result: + raise RuntimeError("查无数据") + + # 就个人自费比例、个人自付比例和合理比例转为小数(decimal对象),保险起期、止期则转为日期时间(datetime对象) + for index, record in enumerate(result): + for key, value in record.items(): + match key: + case _ if key in [ + "personal_self_ratio", + "non_medical_ratio", + "reasonable_ratio", + ]: + result[index][key] = Decimal(value).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) - if k - in [ - "remaining_amount", - "personal_self_ratio", - "non_medical_ratio", - "reasonable_ratio", - ] - else ( - datetime.strptime(v, "%Y-%m-%d") - if k in ["commencement_date", "termination_date"] - else v - ) - ) - for k, v in e.items() - } - for e in results - ] + case _ if key in ["commencement_date", "termination_date"]: + result[index][key] = datetime.strptime(value, "%Y-%m-%d") + return result - person_policies = {} # 重构数据结构 - for result in results: - liabilities = { - "liability": result["liability"], - "accident": result["accident"], - "personal_self_ratio": result["personal_self_ratio"], - "non_medical_ratio": result["non_medical_ratio"], - "reasonable_ratio": result["reasonable_ratio"], - "adjust_policy_guid": result["adjust_policy_guid"], - } - person_policy_guid = result["person_policy_guid"] - if person_policy_guid in person_policies: - person_policies[person_policy_guid]["liabilities"].append( - liabilities - ) - else: - person_policies[person_policy_guid] = { - "group_policy": result["group_policy"], - "insurer_company": result["insurer_company"], - "person_policy": result["person_policy"], - "master_insured_person": result["master_insured_person"], - "insured_person": result["insured_person"], - "identity_type": result["identity_type"], - "identity_number": result["identity_number"], - "relationship": result["relationship"], - "commencement_date": result["commencement_date"], - "termination_date": result["termination_date"], - "remaining_amount": result["remaining_amount"], - "liabilities": [liabilities], - } - return [v for k, v in person_policies.items()] - raise RuntimeError("查无数据") - # TODO: 若根据保险分公司名称、被保险人姓名、证件类型、证件号码和出险时间查询被保险人发生异常则流转至主数据人工处理 except Exception as exception: - raise RuntimeError(f"{str(exception)}") + raise RuntimeError(f"{str(exception)}") from exception - # noinspection PyShadowingNames def query_institution_type(self, institution: str) -> Optional[str]: """ - 根据购药及就医机构查询购药及就医机构类型 - :param institution: 购药及就医机构 + 根据购药及就医机构名称查询购药及就医机构类型 + :param institution: 购药及就医机构名称 :return: 购药及就医机构类型 """ - # noinspection PyBroadException try: with self: - # noinspection SqlResolve - result = self._query_one( + result = self.query_one( sql=""" SELECT institution_type FROM institutions @@ -317,29 +272,28 @@ class MasterData(SQLite): """, parameters=(institution,), ) - if result: - return result["institution_type"] - raise - # TODO: 若根据购药及就医机构查询购药及就医机构类型发生异常则流转至主数据人工处理 - except Exception: - raise + # TODO: 若查无数据则流转至人工处理 + if not result: + raise RuntimeError("查无数据") + + return result["institution_type"] + + except Exception as exception: + raise RuntimeError(f"{str(exception)}") from exception - # noinspection PyShadowingNames def query_medicine( self, content: str, ) -> Optional[str]: """ - 根据明细项中具体内容查询药品/医疗服务 + 根据明细项中具体内容查询药品/医疗服务名称 :param content: 明细项具体内容 - :return: 药品/医疗服务 + :return: 药品/医疗服务名称 """ - # TODO: 暂仅支持查询药品、通过药品/医疗服务包含明细项中具体内容查询 - # noinspection PyBroadException + # TODO: 后续提供医疗耗材和服务查询 try: with self: - # noinspection SqlResolve - result = self._query_all( + result = self.query_all( sql=""" SELECT medicine FROM medicines @@ -347,11 +301,13 @@ class MasterData(SQLite): """, parameters=(content,), ) - if result: - return max(result, key=lambda x: len(x["medicine"]))[ - "medicine" - ] # 返回最大长度的药品/医疗服务 - raise - # TODO: 若根据明细项中具体内容查询药品/医疗服务发生异常则流转至主数据人工处理 - except Exception: - raise + # TODO: 若查无数据则流转至人工处理 + if not result: + raise RuntimeError("查无数据") + + return max(result, key=lambda x: len(x["medicine"]))[ + "medicine" + ] # 返回最大长度的药品/医疗服务 + + except Exception as exception: + raise RuntimeError(f"{str(exception)}") from exception