diff --git a/票据理赔自动化/SQLite_old.db b/utils/SQLite.db similarity index 51% rename from 票据理赔自动化/SQLite_old.db rename to utils/SQLite.db index 006b8ea..d2ef0d0 100644 Binary files a/票据理赔自动化/SQLite_old.db and b/utils/SQLite.db differ diff --git a/utils/certifications.json b/utils/certifications.json index 4158d3d..f8ea234 100644 --- a/utils/certifications.json +++ b/utils/certifications.json @@ -1 +1 @@ -{"szkt": {"token": "a62c56ace614a6546191d5af8ca8b1513cfaeaea7ce67d0a37de994ab6c2aa4e2a0b058e0da575ff376dd51dc19c5ad353ab2761cb6d9db4d521b83adeee2979b78f7ae70765b26985165b6266d084b75f2f918008966e72a116d8bca5ec4c7cecc5223f78fa47b4d40aa9cf5277a11b0b967ad06e84ef7c4acbc53ccdef936c062b2d037ae0dad8c29d50426b668ec349cc8c0099a0270e16f97d31e4f058bc086334468f88d934c7fd1464ed3800833d2f486dc06f0689b99abbb78a8ebf4a3877bd82d0dd765dc09b7a1594fa8849d51f59282a81048c52e82e8320d1ad042a6c307ca831647cba4356564704780f", "expired_timestamp": 1759201579.393386}} \ No newline at end of file +{"szkt": {"token": "a62c56ace614a6546191d5af8ca8b1513cfaeaea7ce67d0a37de994ab6c2aa4e2a0b058e0da575ff376dd51dc19c5ad353ab2761cb6d9db4d521b83adeee2979b78f7ae70765b26985165b6266d084b75f2f918008966e72a116d8bca5ec4c7cecc5223f78fa47b4d40aa9cf5277a11b0b967ad06e84ef7c4acbc53ccdef936c062b2d037ae0dad8c29d50426b668ec349cc8c0099a0270e16f97d31e4f058bc086334468f88d934c7fd1464ed3800833d2f486dc06f0689b99abbb78a8ebf4a3877bd82d0dd765dc09b7a1594fa8849d51f59282a81048c52e82e8320d1ad042a6c307ca831647cba4356564704780f", "expired_timestamp": 1859201579.393386}} \ No newline at end of file diff --git a/utils/client.py b/utils/client.py index 7521acb..4f01b06 100644 --- a/utils/client.py +++ b/utils/client.py @@ -103,7 +103,7 @@ class CacheClient: try: # 创建缓存数据库连接 self.connection = sqlite3.connect( - database=database, + database=(Path(__file__).parent.resolve() / database), # 当前目录下创建缓存数据库 check_same_thread=False, timeout=30, # 缓存数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 ) @@ -164,7 +164,6 @@ class CacheClient: (guid, time.time() - self.cache_ttl * 86400), ) if result := cursor.fetchone(): - print() return json.loads(result[0]) return None # 若发生异常则回滚事务并返回None @@ -218,19 +217,10 @@ class CacheClient: pass -""" -封装urllib.request的相关操作 -使用方法: -client = HTTPClient() -response = client.post(url) -""" - - class TokenBucket: def __init__(self, refill_rate, max_tokens): """令牌桶,基于令牌桶算法限制请求频率""" - # 填充令牌速率(个/秒) self.refill_rate = refill_rate # 令牌桶最大令牌数 @@ -243,18 +233,15 @@ class TokenBucket: # 获取令牌 # noinspection PyMissingReturnStatement def acquire(self) -> tuple[bool, float]: - with threading.Lock(): # 本次填充令牌时间戳 refill_timestamp = time.monotonic() - # 重新计算令牌桶中令牌数 self.tokens = min( self.max_tokens, self.tokens + self.refill_rate * (refill_timestamp - self.refill_timestamp), ) - self.refill_timestamp = refill_timestamp # 若令牌桶当前令牌数大于等于1则减少令牌 @@ -323,7 +310,6 @@ class Arguments(BaseModel): :param stream: 是否启用流式传输 :param guid: 全局唯一标识 """ - # 统一资源定位符 url: HttpUrl = Field(default=...) # 查询参数 @@ -365,43 +351,38 @@ class Arguments(BaseModel): return self -# HTTP客户端 class HTTPClient: + """请求客户端""" def __init__( self, - timeout: int = 60, default_headers: Optional[Dict[str, str]] = None, total: int = 3, backoff_factor: float = 0.5, + timeout: int = 60, cache_enabled: bool = False, cache_ttl: int = 360, ): """ - :param timeout: 超时时间,单位为秒 :param default_headers: 默认请求头 :param total: 最大重试次数 :param backoff_factor: 重试间隔退避因子 - :param cache_enabled: 是否使用缓存 + :param timeout: 超时时间,单位为秒 + :param cache_enabled: 使用缓存 :param cache_ttl: 缓存生存时间,单位为天 """ - - # 超时时间 - self.timeout = timeout - # 创建HTTP会话并挂载适配器 + # 创建请求会话并挂载适配器 self.session = self._create_session( default_headers=default_headers, total=total, backoff_factor=backoff_factor ) - - # 是否使用缓存 + # 初始化超时时间 + self.timeout = timeout + # 初始化使用缓存 self.cache_enabled = cache_enabled - # 缓存生存时间 - self.cache_ttl = cache_ttl - # 若使用缓存,则初始化缓存数据库 - if self.cache_enabled: - self._initialize_cache_database() + # 初始化缓存生存时间,单位为秒 + self.cache_ttl = cache_ttl * 24 * 60 * 60 - # 创建HTTP会话并挂载适配器 + # 创建请求会话并挂载适配器 @staticmethod def _create_session( total: int, @@ -409,11 +390,11 @@ class HTTPClient: default_headers: Optional[Dict[str, str]] = None, ) -> Session: """ - :param default_headers 默认请求头 - :param total 最大重试次数 - :param backoff_factor 重试间隔退避因子 + :param default_headers: 默认请求头 + :param total: 最大重试次数 + :param backoff_factor: 重试间隔退避因子 + :return Session: 会话对象 """ - # 创建会话对象 session = Session() @@ -429,7 +410,7 @@ class HTTPClient: 502, 503, 504, - ], # 408:请求超时,502:网关错误,503:服务不可用,504:网关超时 + ], # 408为请求超时,502为网关错误,503为服务不可用,504为网关超时 total=total, respect_retry_after_header=True, backoff_factor=backoff_factor, @@ -444,94 +425,21 @@ class HTTPClient: return session - def _initialize_cache_database(self): - """初始化缓存数据库""" - - # 创建缓存数据库连接(使用SQLite) - self.cache_connection = sqlite3.connect( - database="SQLite.db", check_same_thread=False - ) - - self.cache_connection.execute( - """CREATE TABLE IF NOT EXISTS caches (guid TEXT PRIMARY KEY, response TEXT, timestamp REAL)""" - ) - # 创建时间戳索引 - self.cache_connection.execute( - """CREATE INDEX IF NOT EXISTS index_timestamp ON caches(timestamp)""" - ) - # 删除过期缓存 - self.cache_connection.execute( - "DELETE FROM caches WHERE timestamp < ?", - (time.time() - self.cache_ttl * 86400,), # 缓存生存时间单位转为秒 - ) - # 提交事物 - self.cache_connection.commit() - - # 在缓存数据库查询响应 - def _query_response(self, guid: str) -> Optional[Dict]: - - with threading.Lock(): - cursor = None - try: - # 创建游标 - cursor = self.cache_connection.cursor() - # 根据请求唯一标识查询响应 - cursor.execute( - "SELECT response FROM caches WHERE guid = ? AND timestamp >= ?", - (guid, time.time() - self.cache_ttl * 86400), - ) - if result := cursor.fetchone(): - return json.loads(result[0]) - return None - # 若发生异常则返回NONE - except: - self.cache_connection.rollback() - return None - finally: - if cursor: - cursor.close() - - # 将响应保存至缓存数据库 - def _save_response(self, guid: str, response: Dict): - - with threading.Lock(): - cursor = None - try: - # 创建游标 - cursor = self.cache_connection.cursor() - # 新增或覆盖响应 - cursor.execute( - "INSERT OR REPLACE INTO caches (guid, response, timestamp) VALUES (?, ?, ?)", - (guid, json.dumps(response, ensure_ascii=False), time.time()), - ) - # 提交事物 - self.cache_connection.commit() - # 若发生异常则返回NONE - except: - self.cache_connection.rollback() - finally: - if cursor: - cursor.close() - # GET请求 def get(self, **kwargs) -> Union[Dict, str]: - return self._request(method="GET", arguments=Arguments(**kwargs)) # POST请求 def post(self, **kwargs) -> Union[Dict, str]: - return self._request(method="POST", arguments=Arguments(**kwargs)) - # 文件下载 + # DOWNLOAD请求 def download( self, stream=False, chunk_size=1024, **kwargs ) -> Union[Dict, str, Generator[bytes, None, None]]: - response = self._request( method="GET", arguments=Arguments(**{"stream": stream, **kwargs}) ) - # 若禁用流式传输,则返回响应 if not stream: return response @@ -539,100 +447,91 @@ class HTTPClient: return self._process_stream_response(response=response, chunk_size=chunk_size) def _request(self, method: Literal["GET", "POST"], arguments: Arguments) -> Any: - """发送请求""" - + """请求""" # 请求参数模型 arguments = arguments.model_dump(exclude_none=True, by_alias=True) - - # URL转为字符串 + # URL对象转为字符串 arguments["url"] = str(arguments["url"]) # 重构表单数据 if arguments.get("data") is not None: arguments["data"] = { - key: value - for key, value in arguments["data"].items() - if value is not None + k: v + for k, v in arguments["data"].items() + if v is not None } - # 重构JSON格式数据 + # 重构JSON数据 if arguments.get("json_data") is not None: arguments["json_data"] = { - key: value - for key, value in arguments["json_data"].items() - if value is not None + k: v + for k, v in arguments["json_data"].items() + if v is not None } # 重构文件数据 if arguments.get("files") is not None: - files_valid = {} - # 遍历文件数据键值对 - for key, value in arguments["files"].items(): - if isinstance(value, (tuple, list)): - match len(value): - # 若文件数据包括文件名称和文件内容 + files = {} + for k, v in arguments["files"].items(): + if isinstance(v, (tuple, list)): + match len(v): + # 文件数据包括文件名称和文件内容 case 2: - files_valid[key] = (value[0], value[1], None, None) - # 若文件数据包含文件名称、文件内容和内容类型 + files[k] = (v[0], v[1], None, None) + # 文件数据包含文件名称、文件内容和内容类型 case 3: - files_valid[key] = (value[0], value[1], value[2], None) - # 若文件数据包含文件名称、文件内容、内容类型和请求头 + files[k] = (v[0], v[1], v[2], None) + # 文件数据包含文件名称、文件内容、内容类型和请求头 case 4: - files_valid[key] = (value[0], value[1], value[2], value[3]) - arguments.update({"files": files_valid}) + files[k] = (v[0], v[1], v[2], v[3]) + arguments.update({"files": files}) # 全局唯一标识 guid = arguments.pop("guid", None) - - # 若使用缓存且本次请求参数包含全局唯一标识,则优先返回缓存数据库中响应 + # 若使用缓存且全局唯一标识非空则查询缓存 if self.cache_enabled and guid is not None: - # 在缓存数据库查询响应 - response = self._query_response(guid=guid) - # 若缓存响应非空则返回 - if response is not None: - return response + with CacheClient(cache_ttl=self.cache_ttl) as cache_client: + cache = cache_client.query(guid) + # 若缓存非空则返回 + if cache is not None: + return cache try: # 发送请求 response = self.session.request( method=method, timeout=self.timeout, **arguments ) - # 若返回错误状态码则抛出异常 response.raise_for_status() # 处理响应 response = self._process_response(response=response) - # 若请求全局唯一标识非NONE则响应保存至缓存数据库 - # noinspection PyUnboundLocalVariable - if guid is not None: - # noinspection PyUnboundLocalVariable - self._save_response(guid=guid, response=response) + # 若使用缓存且全局唯一标识非空则更新缓存 + if self.cache_enabled and guid is not None: + with CacheClient(cache_ttl=self.cache_ttl) as cache_client: + cache_client.update(guid, response) return response except Exception as exception: - # 尝试根据响应解析响应状态码和错误信息,否则进行构造 + # 尝试根据响应解析错误状态码和错误信息,否则进行构造 + # noinspection PyBroadException try: - # JSON反序列化 - # noinspection PyUnboundLocalVariable + # 响应反序列化 response_decoded = response.json() - # 响应状态码 + # 错误状态码 status = response_decoded["status"] # 错误信息 message = response_decoded["message"] except: status = getattr(getattr(exception, "response", None), "status", None) - url = arguments["url"] - message = str(exception).split("\n")[0] - # 重新构建错误信息 - message = f"{method} {url} failed: {message}" + # 重构错误信息 + message = f"{method} {arguments["url"]} failed: {str(exception).split("\n")[0]}" raise RequestException(status=status, message=message) # 处理响应 @staticmethod def _process_response(response: Response) -> Any: - # 响应内容 content = response.content # 若响应内容为空则返回NONE @@ -643,12 +542,11 @@ class HTTPClient: content_type = ( response.headers.get("Content-Type", "").split(";")[0].strip().lower() ) - # 根据内容类型匹配解析返回内容方法 # noinspection PyUnreachableCode match content_type: case "application/json" | "text/json": - # JSON反序列化 + # 响应反序列化 return response.json() case "application/xml" | "text/xml": # 解析为XML(ELEMENT对象) @@ -667,7 +565,6 @@ class HTTPClient: def _process_stream_response( response: Response, chunk_size: int ) -> Generator[bytes, None, None]: # 生成器不接受发SEND发送至、结束时返回NONE - # 检查数据分块 if not isinstance(chunk_size, int) and isinstance(chunk_size, bool): raise ValueError("chunk_size must type=int") @@ -689,14 +586,12 @@ class Authenticator: self, ): """认证器(用于获取访问令牌)""" - # 初始化 self._initialize() def _initialize(self): - """初始化访问凭证""" - - # 创建访问凭证地址对象 + """初始化""" + # 初始化访问凭证地址对象 self.certifications_path = ( Path(__file__).parent.resolve() / "certifications.json" ) @@ -709,40 +604,34 @@ class Authenticator: ensure_ascii=False, ) - # 初始化HTTP客户端 + # 初始化请求客户端 self.http_client = HTTPClient() def _szkt_get_certification(self) -> tuple[str, float]: """获取深圳快瞳访问凭证""" - - # 请求深圳快瞳访问凭证获取接口 response = self.http_client.get( url="https://ai.inspirvision.cn/s/api/getAccessToken?accessKey=APPID_6Gf78H59D3O2Q81u&accessSecret=947b8829d4d5d55890b304d322ac2d0d" ) - # 若响应非成功则抛出异常 + # 若非响应成功则抛出异常 if not (response["status"] == 200 and response["code"] == 0): raise RuntimeError("获取深圳快瞳访问凭证发生异常") - # 返回令牌、失效时间戳 + # 返回访问令牌、失效时间戳 # noinspection PyTypeChecker return ( response["data"]["access_token"], time.time() + response["data"]["expires_in"], ) - def _hlyj_get_certification(self) -> tuple[str, float]: + def _hlyj_get_certification(self) -> Tuple[str, float]: """获取合力亿捷访问凭证""" - # 企业访问标识 access_key_id = "25938f1c190448829dbdb5d344231e42" - # 签名秘钥 secret_access_key = "44dc0299aff84d68ae27712f8784f173" - # 时间戳(秒级) timestamp = int(time.time()) - # 签名,企业访问标识、签名秘钥和时间戳拼接后计算的十六进制的HMAC-SHA256 signature = hmac.new( secret_access_key.encode("utf-8"), @@ -750,26 +639,23 @@ class Authenticator: hashlib.sha256, ).hexdigest() - # 请求合力亿捷访问凭证获取接口 response = self.http_client.get( url=f"https://kms.7x24cc.com/api/v1/corp/auth/token?access_key_id={access_key_id}×tamp={timestamp}&signature={signature}" ) - # 若响应非成功则抛出异常 + # 若非响应成功则抛出异常 if not response["success"]: raise RuntimeError("获取合力亿捷访问凭证发生异常") - # 返回令牌、失效时间戳 + # 返回访问令牌、失效时间戳 # noinspection PyTypeChecker return ( response["data"], - time.time() + 3600, # 访问令牌有效期为1小时 + time.time() + 1 * 60 * 60, # 访问令牌有效期为1小时 ) def _feishu_get_certification(self) -> tuple[str, float]: """获取飞书访问凭证""" - - # 请求飞书访问凭证获取接口 response = self.http_client.post( url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", data={ @@ -778,11 +664,11 @@ class Authenticator: }, ) - # 若响应非成功则抛出异常 + # 若非响应成功则抛出异常 if not response["code"] == 0: raise RuntimeError("获取飞书访问凭证发生异常") - # 返回令牌、失效时间戳 + # 返回访问令牌、失效时间戳 # noinspection PyTypeChecker return ( response["tenant_access_token"], @@ -790,28 +676,28 @@ class Authenticator: ) def get_token(self, servicer: str) -> str | None: - """获取访问令牌""" """ - :param servicer: 服务商,数据类型为字符串 + 获取访问令牌 + :param servicer: 服务商,暂仅支持深圳快瞳、合力亿捷和飞书 + :return token: 访问令牌 """ - with threading.Lock(): - # 初始化令牌和失效时间戳 + # 初始化访问令牌和失效时间戳 token, expired_timestamp = None, 0 try: with open(self.certifications_path, "r", encoding="utf-8") as file: - # 读取所有服务商访问凭证 + # 本地打开并读取所有服务商的访问凭证 certifications = json.load(file) - # 获取服务商访问凭证 + # 获取指定服务商的访问凭证 certification = certifications.get(servicer, None) - # 若服务商访问凭证非NONE则解析令牌和失效时间戳 + # 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳 if certification is not None: - # 解析服务商访问令牌 + # 访问令牌 token = certification["token"] - # 解析服务商访问令牌失效时间戳 + # 失效时间戳 expired_timestamp = certification["expired_timestamp"] - # 若JSON反序列化时发生异常则重置访问凭证 + # 若反序列化发生异常则重置访问凭证储存文件 except json.decoder.JSONDecodeError: with open(self.certifications_path, "w", encoding="utf-8") as file: json.dump( @@ -823,21 +709,17 @@ class Authenticator: except Exception: raise RuntimeError("获取访问令牌发生异常") - # 若当前时间戳大于失效时间戳,则请求服务商获取访问凭证接口 if time.time() > expired_timestamp: # noinspection PyUnreachableCode match servicer: - # 获取深圳快瞳访问凭证 case "szkt": token, expired_timestamp = self._szkt_get_certification() - case "feishu": - token, expired_timestamp = self._feishu_get_certification() - # 获取合力亿捷访问凭证 case "hlyj": token, expired_timestamp = self._hlyj_get_certification() + case "feishu": + token, expired_timestamp = self._feishu_get_certification() case _: - raise RuntimeError(f"未设置服务商:({servicer})") - + raise RuntimeError(f"未设置服务商:{servicer}获取访问凭证方法") # 更新服务商访问凭证 certifications[servicer] = { "token": token, @@ -855,22 +737,15 @@ class Authenticator: return token -""" -封装飞书客户端,实现获取验证码、操作多维表格等 -""" - - -class FeishuClinet: +class FeishuClient: + """飞书客户端""" def __init__(self): - self.authenticator = Authenticator() - self.http_client = HTTPClient() def _headers(self): """请求头""" - # 装配飞书访问凭证 return { "Authorization": f"Bearer {self.authenticator.get_token(servicer='feishu')}", @@ -878,67 +753,51 @@ class FeishuClinet: @staticmethod def get_verification_code(): - try: - # 执行时间戳 execute_timestamp = time.time() - # 超时时间戳 timeout_timestamp = execute_timestamp + 65 - # 建立加密IMAP连接 server = IMAP4_SSL("imap.feishu.cn", 993) - # 登录 server.login("mars@liubiren.cloud", "a2SfPUgbKDmrjPV2") while True: - # 若当前时间戳大于超时时间戳则返回NONE if time.time() <= timeout_timestamp: - # 等待10秒 time.sleep(10) - # 选择文件夹(邮箱验证码) server.select("&kK57sZqMi8F4AQ-") - + # noinspection PyBroadException try: - # 获取最后一封邮件索引,server.search()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件索引字节串的列表);然后,从列表获取字节串并分割取最后一个,作为最后一封邮件索引 index = server.search(None, "ALL")[1][0].split()[-1] - # 获取最后一封邮件内容并解析,server.fetch()返回数据类型为元组,第一个元素为查询状态,第二个元素为查询结果(邮件内容字节串的列表);然后,从列表获取字节串并解析正文 # noinspection PyUnresolvedReferences contents = BytesParser(policy=default).parsebytes( server.fetch(index, "(RFC822)")[1][0][1] ) - # 遍历邮件内容,若正文内容类型为纯文本或HTML则解析发送时间和验证码 for content in contents.walk(): - if ( content.get_content_type() == "text/plain" or content.get_content_type() == "text/html" ): - # 邮件发送时间戳 # noinspection PyUnresolvedReferences send_timestamp = parsedate_to_datetime( content["Date"] ).timestamp() - # 若邮件发送时间戳大于执行时间戳则解析验证码并返回 if ( execute_timestamp > send_timestamp >= execute_timestamp - 35 ): - # 登出 server.logout() - # 解析验证码 return re.search( r"【普康健康】您的验证码是:(\d+)", @@ -947,18 +806,14 @@ class FeishuClinet: # 若文件夹无邮件则继续 except: - pass # 若超时则登出 else: - server.logout() - return None except Exception: - raise RuntimeError("获取邮箱验证码发生其它异常") # 查询多维表格记录,单次最多查询500条记录 @@ -970,9 +825,7 @@ class FeishuClinet: field_names: Optional[list[str]] = None, filter_conditions: Optional[dict] = None, ) -> pandas.DataFrame: - # 先查询多维表格记录,在根据字段解析记录 - # 装配多维表格查询记录地址 url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{bitable}/tables/{table_id}/records/search?page_size=20" diff --git a/票据理赔自动化/SQLite.db b/票据理赔自动化/SQLite.db index 5b06124..e69de29 100644 Binary files a/票据理赔自动化/SQLite.db and b/票据理赔自动化/SQLite.db differ diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index 7f8e3a3..5e6ae00 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -27,175 +27,6 @@ from utils.client import Authenticator, HTTPClient, CacheClient # from utils.ocr import fuzzy_match -# ------------------------- -# 封装方法 -# ------------------------- - - -# noinspection PyShadowingNames -def image_read( - image_path: Path, -) -> Tuple[numpy.ndarray | None, str | None, bytes | None]: - """ - 本地打开并读取影像件 - :param image_path: 影像件路径对象 - :return: 影像件数组、影像件格式和影像件字节流 - """ - # noinspection PyBroadException - try: - # 影像件打开并读取(默认转为单通道灰度图) - image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE) - if image_ndarray is None: - raise RuntimeError("影像件打开并读取发生异常") - - # 影像件格式 - image_format = image_path.suffix.lower() - - # 按照影像件格式将影像件数组编码 - success, image_ndarray_encoded = cv2.imencode(image_format, image_ndarray) - if not success or image_ndarray_encoded is None: - raise RuntimeError("编码为图像字节数组发生异常") - # 将编码后图像数组转为字节流 - image_bytes = image_ndarray_encoded.tobytes() - - return image_ndarray, image_format, image_bytes - except Exception: - return None, None, None - - -# noinspection PyShadowingNames -def images_compress( - image_ndarray, image_format, image_bytes, image_size_specified=2 -) -> Optional[str]: - """ - 影像件压缩 - :param image_ndarray: 影像件数组 - :param image_format: 影像件格式 - :param image_bytes: 影像件字节流 - :param image_size_specified: 指定影像件大小,单位为兆字节(MB) - :return: 压缩后影像件BASE64编码 - """ - # 将指定影像件大小单位由兆字节转为字节 - image_size_specified = image_size_specified * 1024 * 1024 - - # 影像件BASE64编码 - image_base64 = b64encode(image_bytes).decode("utf-8") - if len(image_base64) <= image_size_specified: - return image_base64 - - # 通过调整影像件质量和尺寸达到压缩影像件目的 - # 外循环压缩:通过调整影像件质量实现压缩影像件大小 - for quality in range(90, 50, -10): - image_ndarray_copy = image_ndarray.copy() - # 内循环压缩:通过调整影像件尺寸实现压缩影像件大小 - for i in range(25): - # 按照影像件格式和影像件质量将影像件数组编码 - success, image_ndarray_encoded = cv2.imencode( - image_format, - image_ndarray_copy, - params=( - [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] - if image_format == "png" - else [cv2.IMWRITE_JPEG_QUALITY, quality] - ), - ) - if not success or image_ndarray_encoded is None: - break - - # 影像件BASE64编码 - image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8") - if len(image_base64) <= image_size_specified: - return image_base64 - - # 调整影像件尺寸 - image_ndarray_copy = cv2.resize( - image_ndarray_copy, - ( - int(image_ndarray_copy.shape[0] * 0.9), - int(image_ndarray_copy.shape[1] * 0.9), - ), - interpolation=cv2.INTER_AREA, - ) - # 若调整后影像件尺寸中长或宽小于350像素则停止调整影像件尺寸 - if min(image_ndarray_copy.shape[:2]) < 350: - break - - return None - - -def images_classify( - image, image_ndarray, image_format, image_bytes -) -> tuple[str | None, str | None]: - """ - 影像件分类并旋正 - :param image: 影像件数据 - :param image_ndarray: 影像件数组 - :param image_format: 影像件格式 - :param image_bytes: 影像件字节流 - :return: 压缩后影像件BASE64编码 - """ - # 影像件唯一标识 - image_uuid = image["影像件唯一标识"] - with CacheClient() as client: - # 根据作业环节和影像件唯一标识生成缓存唯一标识 - cache_guid = md5(("初审" + image_uuid).encode("utf-8")).hexdigest().upper() - cache = client.query(cache_guid) - - if cache is not None: - # 影像件类型 - image_type = cache["image_type"] - # 影像件方向 - image_o - - # 请求深圳快瞳影像件分类接口 - response = globals()["http_client"].post( - url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"), - # 用于和深圳快瞳联查定位 - headers={"X-RequestId-Header": image_guid}, - data={ - "token": globals()["authenticator"].get_token( - servicer="szkt" - ), # 使用全局变量 - "imgBase64": f"data:image/{image_format};base64,{image_base64}", # 深圳快瞳要求修饰影像件BASE64编码的DATAURI - }, - guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), - ) - - # 若响应非成功,则返回NONE - if not (response.get("status") == 200 and response.get("code") == 0): - return None, None - - # 根据票据类型和票据二级分类映射影像件类型 - match (response["data"]["flag"], response["data"]["type"]): - case (7, "idcard-front-back"): - image_type = "居民身份证(正背面)" - case (7, "idcard-front"): - image_type = "居民身份证(正面)" # 包含国徽一面 - case (7, "idcard-back"): - image_type = "居民身份证(背面)" # 包含头像一面 - case (8, _): - image_type = "银行卡" - case (4, _): - image_type = "增值税发票" - case (5, _): - image_type = "门诊收费票据" - case (3, _): - image_type = "住院收费票据" - case (18, _): - image_type = "理赔申请书" - case _: - return None, None - - # 影像件方向 - image_orientation = { - "0": "0度", - "90": "顺时针90度", - "180": "180度", - "270": "逆时针90度", - }.get(response["data"]["angle"], "0度") - - return image_type, image_orientation - def idcard_extraction(**kwargs) -> dict | None: """居民身份证数据提取""" @@ -1140,29 +971,25 @@ def disease_diagnosis(**kwargs) -> str | None: # ------------------------- -# 主逻辑部分 +# 主逻辑 # ------------------------- - - if __name__ == "__main__": - # 初始化HTTP客户端 - http_client = HTTPClient(timeout=300, cache_enabled=True) - - # 初始化认证器 + # 实例认证器 authenticator = Authenticator() + # 实例请求客户端 + http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存 + + # 初始化影像件识别规则引擎 + recognize_decision = decision(Path("rules/影像件是否需要数据提取.json")) # 初始化工作目录地址对象 directory_path = Path("directory") # 若不存在则创建 directory_path.mkdir(parents=True, exist_ok=True) - # 初始化影像件识别规则引擎 - recognize_decision = decision(Path("rules/影像件是否需要数据提取.json")) - # 初始化JINJA2环境 environment = Environment(loader=FileSystemLoader(".")) - # 添加DATE过滤器 environment.filters["date"] = lambda date: ( date.strftime("%Y-%m-%d") if date else "长期" @@ -1170,21 +997,212 @@ if __name__ == "__main__": # 加载赔案档案模版 template = environment.get_template("template.html") - # 遍历工作目录中赔案目录,根据赔案创建赔案档案(模拟自动化域就待自动化任务创建理赔档案) + + # ------------------------- + # 自定义方法 + # ------------------------- + # noinspection PyShadowingNames + def image_read( + image_path: Path, + ) -> Optional[numpy.ndarray | None]: + """ + 本地打开并读取影像件 + :param image_path: 影像件路径对象 + :return: 影像件数组 + """ + # noinspection PyBroadException + try: + # 影像件打开并读取(默认转为单通道灰度图) + image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE) + if image_ndarray is None: + raise RuntimeError("影像件打开并读取发生异常") + + return image_ndarray + except Exception: + # 若本地打开并读取影像件发生异常则抛出异常(实际作业需从影像件服务器下载并读取影像件,因签收时会转存,故必可下载) + raise RuntimeError("影像件打开并读取发生异常") + + + # noinspection PyShadowingNames + def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: + """ + 影像件序列化 + :param image_format: 影像件格式 + :param image_ndarray: 影像件数组 + :return: 影像件唯一标识 + """ + # 按照影像件格式就影像件数组编码 + success, image_ndarray_encoded = cv2.imencode(image_format, image_ndarray) + if not success or image_ndarray_encoded is None: + raise RuntimeError("编码为图像字节数组发生异常") + + # 将编码后图像数组转为字节流 + image_bytes = image_ndarray_encoded.tobytes() + # 生成影像件唯一标识 + image_guid = md5(image_bytes).hexdigest().upper() + return image_guid + + + # noinspection PyShadowingNames + def images_classify( + image_guid: str, image_format: str, image_ndarray: numpy.ndarray + ) -> Optional[Tuple[str, str, str]]: + """ + 影像件分类并旋正 + :param image_guid: 影像件唯一标识 + :param image_format: 影像件格式 + :param image_ndarray: 影像件数据 + :return: 压缩后影像件BASE64编码,影像件类型和影像件方向 + """ + + # noinspection PyShadowingNames + def images_compress( + image_format, image_ndarray, image_size_specified=2 + ) -> Optional[str]: + """ + 影像件压缩 + :param image_ndarray: 影像件数组 + :param image_format: 影像件格式 + :param image_size_specified: 指定影像件大小,单位为兆字节(MB) + :return: 压缩后影像件BASE64编码 + """ + # 将指定影像件大小单位由兆字节转为字节 + image_size_specified = image_size_specified * 1024 * 1024 + + # 通过调整影像件质量和尺寸达到压缩影像件目的 + # 外循环压缩:通过调整影像件质量实现压缩影像件大小 + for quality in range(100, 50, -10): + image_ndarray_copy = image_ndarray.copy() + # 内循环压缩:通过调整影像件尺寸实现压缩影像件大小 + for i in range(10): + # 按照影像件格式和影像件质量将影像件数组编码 + success, image_ndarray_encoded = cv2.imencode( + image_format, + image_ndarray_copy, + params=( + [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] + if image_format == "png" + else [cv2.IMWRITE_JPEG_QUALITY, quality] + ), + ) + # 若编码发生异常则停止循环 + if not success or image_ndarray_encoded is None: + break + + # 影像件BASE64编码 + image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8") + if len(image_base64) <= image_size_specified: + return image_base64 + + # 调整影像件尺寸 + image_ndarray_copy = cv2.resize( + image_ndarray_copy, + ( + int(image_ndarray_copy.shape[0] * 0.95), + int(image_ndarray_copy.shape[1] * 0.95), + ), + interpolation=cv2.INTER_AREA, + ) + # 若调整后影像件尺寸中长或宽小于350像素则停止调整影像件尺寸 + if min(image_ndarray_copy.shape[:2]) < 350: + break + + return None + + # 影像件压缩 + image_base64 = images_compress(image_format, image_ndarray, image_size_specified=2) # 深圳快瞳要求为2兆字节 + # TODO: 若影像件压缩发生异常则流转至人工处理 + if image_base64 is None: + raise RuntimeError("影像件压缩发生异常") + + # 请求深圳快瞳影像件分类接口 + response = http_client.post( + url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"), + headers={"X-RequestId-Header": image_guid}, # 以影像件唯一标识作为请求唯一标识,用于双方联查 + data={ + "token": authenticator.get_token( + servicer="szkt" + ), # 获取深圳快瞳访问令牌 + "imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 影像件BASE64编码嵌入数据统一资源标识符 + }, + guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(), + ) + # 若响应非成功则抛出异常 + # TODO: 若响应非成功则流转至人工处理 + if not (response.get("status") == 200 and response.get("code") == 0): + raise RuntimeError("请求深圳快瞳影像件分类接口发生异常") + + # 解析影像件类型 + # TODO: 后续完成居民户口簿、中国港澳台地区及境外护照和医疗费用清单 + # noinspection PyTypeChecker + match (response["data"]["flag"], response["data"]["type"]): + case (14, _): + image_type = "居民户口簿" + case (7, "idcard-front-back"): + image_type = "居民身份证(国徽、头像面)" + case (7, "idcard-front"): + image_type = "居民身份证(国徽面)" + case (7, "idcard-back"): + image_type = "居民身份证(头像面)" + case (11, _): + image_type = "中国港澳台地区及境外护照" + case (8, _): + image_type = "银行卡" + case (4, _): + image_type = "增值税发票" + case (1, _): + image_type = "医疗费用清单" + case (5, _): + image_type = "门诊收费票据" + case (3, _): + image_type = "住院收费票据" + case (18, _): + image_type = "理赔申请书" + case _: + image_type = "其它" + + # 解析影像件方向 + # noinspection PyTypeChecker + image_orientation = { + "0": "0度", + "90": "顺时针90度", + "180": "180度", + "270": "逆时针90度", + }.get(response["data"]["angle"], "0度") + # 若影像件方向非0度则旋正 + if image_orientation != "0度": + image_ndarray = cv2.rotate( + image_ndarray, + { + "顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE, # 逆时针旋转90度 + "180度": cv2.ROTATE_180, # 旋转180度 + "逆时针90度": cv2.ROTATE_90_CLOCKWISE, # 顺时针旋转90度 + }[image_orientation], + ) + # 旋正后影像件再次压缩 + image_base64 = images_compress(image_format, image_ndarray, image_size_specified=2) + # TODO: 若旋正后影像件再次压缩发生异常则流转至人工处理 + if image_base64 is None: + raise RuntimeError("旋正后影像件再次压缩发生异常") + + return image_base64, image_type, image_orientation + + + # 遍历工作目录中赔案目录并创建赔案档案(模拟自动化域就待自动化任务创建理赔档案) for case_path in [ - case_path for case_path in directory_path.iterdir() if case_path.is_dir() + x for x in directory_path.iterdir() if x.is_dir() ]: # 初始化赔案档案(实际报案层包括保险分公司名称、报案渠道、批次号、报案号和报案时间等) # 报案渠道包括:保司定义,例如中银项目包括总行和各地分行驻场报案和普康宝自助报案等 dossier = { "报案层": { - "保险分公司": "中银保险有限公司广东分公司", # 设定:保险分公司 + "保险分公司": "中银保险有限公司广东分公司", # 设定:保险分公司为中银保险有限公司广东分公司 "赔案号": (case_number := case_path.stem), # 设定:赔案目录名称为赔案号 }, "影像件层": [], } - # 遍历赔案目录中影像件地址 + # 遍历赔案目录中影像件路径对象 for image_index, image_path in enumerate( sorted( [ @@ -1201,64 +1219,31 @@ if __name__ == "__main__": "原始影像件": { "影像件地址": image_path.as_posix(), # 将影像件路径对象转为字符串 "影像件名称": (image_name := image_path.stem), + "影像件格式": (image_format := image_path.suffix.lower()), }, } - # 本地打开并读取影像件(实际作业为从作业系统的影像件服务器下载并读取影像件,因赔案签收时会转存影像件至影像件服务器,若下载并读取影像件发生异常则技术支持排查) - image_ndarray, image_format, image_bytes = image_read(image_path) - # 若本地打开并读取影像件发生异常则跳过该影像件 - if image_format is None or image_bytes is None: - raise RuntimeError("本地打开并读取影像件发生异常") + # 本地打开并读取影像件 + image_ndarray = image_read(image_path) - image["原始影像件"]["影像件格式"] = image_format - # 生成影像件唯一标识 + # 影像件序列化 # noinspection PyTypeChecker - image["影像件唯一标识"] = md5(image_bytes).hexdigest().upper() + image["影像件唯一标识"] = (image_guid := image_serialize(image_format, image_ndarray)) - # 影像件分类 + # 影像件分类并旋正(较初审自动化,无使能检查) + image_base64, image_type, image_orientation = images_classify(image_guid, image_format, image_ndarray) + image["影像件类型"] = image_type + image["影像件方向"] = image_orientation + # 将影像件数据添加至影像件层 + dossier["影像件层"].append(image) - print(image) - exit() + # 影像件识别 + + + print(dossier) + exit() """ - # 影像件压缩(输出BASE64编码) - image_guid, image_base64 = images_compression() - # 若发生异常则跳过该影像件 - if image_guid is None or image_base64 is None: - dossier["影像件层"][-1]["已分类"] = "否,压缩异常" - continue - - # 通过请求深圳快瞳影像件分类接口获取影像件类型和方向 - image_type, image_orientation = images_classification() - # 若发生异常则跳过该影像件 - if image_type is None or image_orientation is None: - dossier["影像件层"][-1]["已分类"] = "否,影像件分类异常" - continue - # - dossier["影像件层"].append( - { - "影像件序号": (image_index := f"{image_index:02d}"), - "影像件名称": (image_name := image_path.name), - } - ) - # 若影像件方向非0度,则影像件旋正并在此压缩 - if image_orientation != "0度": - # 影像件旋正 - image = cv2.rotate( - image, - { - "顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE, # 逆时针旋转90度 - "180度": cv2.ROTATE_180, # 旋转180度 - "逆时针90度": cv2.ROTATE_90_CLOCKWISE, # 顺时针旋转90度 - }[image_orientation], - ) - # 影像件再次压缩 - image_guid, image_base64 = images_compression() - if image_guid is None or image_base64 is None: - dossier["影像件层"][-1]["已分类"] = "否,压缩异常" - continue - - dossier["影像件层"][-1].update({"已分类": "是", "影像件类型": image_type}) # 根据保险总公司和影像件类型评估影像件是否需要数据提取,若无需数据提取则跳过该影像件(例如,中银保险有限公司理赔申请书包含户名、开户银行和银行账号,无需识别银行卡) if not recognize_decision.evaluate(