# -*- coding: utf-8 -*- """ 根据现普康票据理赔自动化最小化实现 功能清单 https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink """ import json import re import uuid from base64 import b64encode from datetime import datetime from decimal import Decimal, ROUND_HALF_UP from hashlib import md5 from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy from dateutil.parser import parse from jinja2 import Environment, FileSystemLoader from jionlp import parse_location from zen import ZenDecision, ZenEngine from utils.client import Authenticator, HTTPClient, SQLiteClient # from utils.ocr import fuzzy_match def common_extraction(**kwargs) -> dict | None: """通用数据提取""" # 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识 image_guid = kwargs.get( "image_guid", globals().get("image_guid", uuid.uuid4().hex.upper()) ) # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: raise RuntimeError("请入参:image_format") # 影像件BASE64编码 image_base64 = kwargs.get("image_base64", globals()["image_base64"]) if image_base64 is None: raise RuntimeError("请入参:image_base64") # 请求深圳快瞳通用文本识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/general"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token(servicer="szkt"), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功,则返回NONE if not (response.get("status") == 200 and response.get("code") == 0): return None # 基于空间坐标法就识别结果中文本框进行分行排序 texts = [] # 重构文本框列表 for text in response["data"]: texts.append( [ # 文本框左上角的X坐标 numpy.float64(text["itemPolygon"]["x"]), # 文本框左上角的Y坐标 numpy.float64(text["itemPolygon"]["y"]), # 文本框的高度 numpy.float64( text["itemPolygon"]["height"] ), # 深圳快瞳基于文本框的Y坐标最大值和最小值的差值 text["value"], ] ) # 按照文本框Y坐标升序(使用空间坐标算法,从上到下,从左到右) texts.sort(key=lambda x: x[1]) rows = [] # 分行 for index, text in enumerate(texts[1:]): # 若为第一行则初始化当前行 if index == 0: row = [texts[0]] continue # 若文本框的Y坐标与当前行中最后一个文本框的Y坐标差值小于阈值,则归为同一行 # noinspection PyUnboundLocalVariable # noinspection PyTypeChecker if text[1] - row[-1][1] < numpy.mean([x[2] for x in row]) * 0.5: row.append(text) # 否则结束当前行、初始化当前行 else: rows.append(row) row = [text] # 添加最后一行 rows.append(row) extraction = [] # 按照文本框X坐标升序 for row in rows: extraction.extend( [x[3].replace(" ", "") for x in sorted(row, key=lambda x: x[0])] ) # 以空格拼接 extraction = " ".join(extraction) # 根据理赔申请书匹配提示词 match application_form := kwargs.get( "application_form", globals().get("application_form") ): case "中行员工福利保障计划索赔申请书": prompt = f""" 指令:你是一个从OCR文本中智能提取信息并生成JSON的工具,请严格按照要求执行。 输入:OCR文本(可能包含错漏): {extraction} 输出要求: 1、只输出可被Python中json.loads()解析的JSON格式字符串,不包含任何代码块标记、说明文字等其它非JSON格式内容 2、无法确定的值设置为`null`(不是"null"字符串) JSON结构: {{ "基础信息": {{ "申请人": "字符串或null", "性别": "字符串或null", "年龄": "字符串或null", "手机": "字符串或null", "身份证号": "字符串或null", "开户银行": "字符串或null", "户名": "字符串或null", "账号": "字符串或null", }}, "票据表格": [ {{ "就诊序号": "字符串或null", "发票日期": "YYYY-MM-DD或null", "发票上的就诊医院/药店": "字符串或null", "票据张数": "字符串或null", "票据金额": "字符串或null", "诊断": "字符串或null" }}, ] }} 开始输出: """ case _: raise RuntimeError(f"理赔申请书{application_form}未设置处理方法") # 请求大语言模型创建对话接口 response = globals()["http_client"].post( url="https://api.siliconflow.cn/v1/chat/completions", headers={ "Authorization": "Bearer sk-xsnuwirjjphhfdbvznfdfjqlinfdlrnlxuhkbbqynfnbhiqz", # 基于硅基流动 "Content-Type": "application/json; charset=utf-8", }, json={ "model": "deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", # 通过从DeepSeek-R1-0528模型蒸馏思维链接至Qwen3-8B-Base获得的模型 "messages": [{"role": "user", "content": prompt}], "max_tokens": 10240, # 生成文本最大令牌数 "temperature": 0.2, "top_p": 0.5, "top_k": 20, "frequency_penalty": 0.0, "thinking_budget": 1, }, guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(), ) extraction = ( json.loads(match.group("json")) if ( match := re.search( r"```json\s*(?P\{.*})\s*```", response["choices"][0]["message"]["content"], re.DOTALL, ) ) else None ) return extraction def disease_diagnosis(**kwargs) -> str | None: """疾病推定""" # 赔案档案:优先使用关键词变量,其次使用全局变量 dossier = kwargs.get("dossier", globals().get("dossier")) prompt = f""" 指令:你是一个医学疾病分类诊断的工具,请严格按照要求执行。 患者信息: 性别 {gender if (gender := dossier["赔案层"]["申请人信息"].get("性别")) is not None else "未知"}, 年龄 {age if (age := dossier["赔案层"]["申请人信息"].get("年龄")) is not None else "未知"}, 近期在药房/医院开具发票中内容 {dossier["赔案层"]["其它信息"]["小项合集"]} 输出要求: 1、患者自述症状在 {dossier["赔案层"]["其它信息"]["自述症状"]} 其中之一 2、依据患者信息、自述症状和其提供的发票中内容 {kwargs["items"]} 综合诊断,只输出一个最可能的ICD-11中的疾病分类中亚类目代码对应的中文名称字符串,不包含任何代码块标记、说明文字等 开始输出: """ # 请求大语言模型创建对话接口 response = globals()["http_client"].post( url="https://ark.cn-beijing.volces.com/api/v3/chat/completions", headers={ "Authorization": "Bearer 2c28ab07-888c-45be-84a2-fc4b2cb5f3f2", # 火山引擎 "Content-Type": "application/json; charset=utf-8", }, json={ "model": "deepseek-r1-250528", "messages": [ {"role": "system", "content": "你是人工智能助手"}, {"role": "user", "content": prompt}, ], "temperature": 0.2, "top_p": 0.5, "top_k": 20, "frequency_penalty": 0.0, "thinking_budget": 1, }, guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(), ) recognition = ( match.group("text") if ( match := re.match( r"\s*(?P.*)", response["choices"][0]["message"]["content"] ) ) else None ) return recognition # ------------------------- # 主逻辑 # ------------------------- if __name__ == "__main__": # 实例认证器 authenticator = Authenticator() # 实例请求客户端 http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存 # 初始化工作目录地址对象 directory_path = Path("directory") # 若不存在则创建 directory_path.mkdir(parents=True, exist_ok=True) def rule_engine(rule_path: Path) -> ZenDecision: """ 本地打开并读取规则文件并实例化规则引擎 :param rule_path: 规则文件路径对象 """ def loader(path): with open(path, "r", encoding="utf-8") as file: return file.read() return ZenEngine({"loader": loader}).get_decision(rule_path.as_posix()) # 影像件识别使能 recognition_enable = rule_engine(Path("rules/影像件识别使能.json")) class MasterData(SQLiteClient): """主数据""" def __init__(self): """ 初始化主数据 """ # 初始化SQLite客户端 super().__init__(database="SQLite.db") try: with self: # 初始化购药及就医机构表 self._execute( sql=""" CREATE TABLE IF NOT EXISTS institutions ( --购药及就医机构 institution TEXT PRIMARY KEY, --购药及就医机构类型 institution_type TEXT NOT NULL, --所在省 province TEXT NOT NULL, --所在市 city TEXT NOT NULL ) """ ) # 初始化团单表 self._execute( sql=""" CREATE TABLE IF NOT EXISTS group_policies ( --团单号,一张团单包括多张个单 group_policy TEXT NOT NULL, --投保公司 insurance_company TEXT NOT NULL, --保险分公司 insurer_company TEXT NOT NULL, --团单有效起期 from_date REAL NOT NULL, --团单有效止期 to_date REAL NOT NULL, --联合主键:团单号+投保公司+保险分公司 PRIMARY KEY (group_policy, insurance_company, insurer_company) ) """ ) # 初始化个单表 self._execute( sql=""" CREATE TABLE IF NOT EXISTS person_policies ( group_policy TEXT NOT NULL, person_policy TEXT NOT NULL, from_date REAL NOT NULL, to_date REAL NOT NULL, PRIMARY KEY (person_policy, group_policy) ) """ ) # 初始化被保人表 self._execute( sql=""" CREATE TABLE IF NOT EXISTS insured_persons ( insured_person TEXT NOT NULL, identity_type TEXT NOT NULL, identity_number TEXT NOT NULL, relationship TEXT NOT NULL, person_policy TEXT NOT NULL, PRIMARY KEY (person_policy, insured_person, identity_type, identity_number) ) """ ) except Exception as exception: raise RuntimeError( f"初始化数据库发生异常:{str(exception)}" ) from exception # noinspection PyShadowingNames def query_institution_type(self, institution: str) -> Optional[str]: """ 查询并获取单条购药及就医机构类型 :param institution: 购药及就医机构 :return: 购药及就医机构类型 """ # noinspection PyBroadException try: with self: # noinspection SqlResolve result = self._query_one( sql="SELECT institution_type FROM institutions WHERE institution = ?", parameters=(institution,), ) return ( None if result is None else result["institution_type"] ) # 返回购药及就医机构类型 except Exception as exception: raise RuntimeError( "查询并获取单条购药及就医机构类型发生异常" ) from exception # noinspection PyShadowingNames def query_individual_policy( self, insurer_company: str, certificate_type: str, certificate_number: str ) -> Optional[List[Dict[str, Any]]]: pass # 实例化主数据 master_data = MasterData() # 实例化JINJA2环境 environment = Environment(loader=FileSystemLoader(".")) # 添加DATE过滤器 environment.filters["date"] = lambda date: ( date.strftime("%Y-%m-%d") if date else "长期" ) # 加载赔案档案模版 template = environment.get_template("template.html") # ------------------------- # 自定义方法 # ------------------------- # noinspection PyShadowingNames def image_read( image_path: Path, ) -> Optional[numpy.ndarray | None]: """ 本地打开并读取影像件 :param image_path: 影像件路径对象 :return: 影像件数组 """ # noinspection PyBroadException try: # 影像件打开并读取(默认转为单通道灰度图) image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE) if image_ndarray is None: raise RuntimeError("影像件打开并读取发生异常") return image_ndarray except Exception: # 若本地打开并读取影像件发生异常则抛出异常(实际作业需从影像件服务器下载并读取影像件,因签收时会转存,故必可下载) raise RuntimeError("影像件打开并读取发生异常") # noinspection PyShadowingNames def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: """ 影像件序列化 :param image_format: 影像件格式 :param image_ndarray: 影像件数组 :return: 影像件唯一标识 """ # 按照影像件格式就影像件数组编码 success, image_ndarray_encoded = cv2.imencode(image_format, image_ndarray) if not success or image_ndarray_encoded is None: raise RuntimeError("编码为图像字节数组发生异常") # 将编码后图像数组转为字节流 image_bytes = image_ndarray_encoded.tobytes() # 生成影像件唯一标识 image_guid = md5(image_bytes).hexdigest().upper() return image_guid # noinspection PyShadowingNames def image_classify( image_guid: str, image_format: str, image_ndarray: numpy.ndarray ) -> Optional[Tuple[str, str]]: """ 影像件分类并旋正 :param image_guid: 影像件唯一标识 :param image_format: 影像件格式 :param image_ndarray: 影像件数据 :return: 压缩后影像件BASE64编码和影像件类型 """ # noinspection PyShadowingNames def image_compress( image_format, image_ndarray, image_size_specified=2 ) -> Optional[str]: """ 影像件压缩 :param image_ndarray: 影像件数组 :param image_format: 影像件格式 :param image_size_specified: 指定影像件大小,单位为兆字节(MB) :return: 压缩后影像件BASE64编码 """ # 将指定影像件大小单位由兆字节转为字节 image_size_specified = image_size_specified * 1024 * 1024 # 通过调整影像件质量和尺寸达到压缩影像件目的 # 外循环压缩:通过调整影像件质量实现压缩影像件大小 for quality in range(100, 50, -10): image_ndarray_copy = image_ndarray.copy() # 内循环压缩:通过调整影像件尺寸实现压缩影像件大小 for i in range(10): # 按照影像件格式和影像件质量将影像件数组编码 success, image_ndarray_encoded = cv2.imencode( image_format, image_ndarray_copy, params=( [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] if image_format == "png" else [cv2.IMWRITE_JPEG_QUALITY, quality] ), ) # 若编码发生异常则停止循环 if not success or image_ndarray_encoded is None: break # 影像件BASE64编码 image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode( "utf-8" ) if len(image_base64) <= image_size_specified: return image_base64 # 调整影像件尺寸 image_ndarray_copy = cv2.resize( image_ndarray_copy, ( int(image_ndarray_copy.shape[0] * 0.95), int(image_ndarray_copy.shape[1] * 0.95), ), interpolation=cv2.INTER_AREA, ) # 若调整后影像件尺寸中长或宽小于350像素则停止调整影像件尺寸 if min(image_ndarray_copy.shape[:2]) < 350: break return None # 影像件压缩 image_base64 = image_compress( image_format, image_ndarray, image_size_specified=2 ) # 深圳快瞳要求为2兆字节 # TODO: 若影像件压缩发生异常则流转至人工处理 if image_base64 is None: raise RuntimeError("影像件压缩发生异常") # 请求深圳快瞳影像件分类接口 response = http_client.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"), headers={ "X-RequestId-Header": image_guid }, # 以影像件唯一标识作为请求唯一标识,用于双方联查 data={ "token": authenticator.get_token( servicer="szkt" ), # 获取深圳快瞳访问令牌 "imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 影像件BASE64编码嵌入数据统一资源标识符 }, guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功则抛出异常 # TODO: 若响应非成功则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): raise RuntimeError("请求深圳快瞳影像件分类接口发生异常") # 解析影像件类型 # noinspection PyTypeChecker match (response["data"]["flag"], response["data"]["type"]): case (14, _): image_type = "居民户口簿" case (7, "idcard-front-back"): image_type = "居民身份证(国徽、头像面)" case (7, "idcard-front"): image_type = "居民身份证(国徽面)" case (7, "idcard-back"): image_type = "居民身份证(头像面)" case (11, _): image_type = "中国港澳台地区及境外护照" case (8, _): image_type = "银行卡" case (4, _): image_type = "增值税发票" case (1, _): image_type = "医疗费用清单" case (5, _): image_type = "医疗门诊收费票据" case (3, _): image_type = "医疗住院收费票据" case (18, _): image_type = "理赔申请书" case _: image_type = "其它" # 解析影像件方向 # noinspection PyTypeChecker image_orientation = { "0": "0度", "90": "顺时针90度", "180": "180度", "270": "逆时针90度", }.get(response["data"]["angle"], "0度") # 若影像件方向非0度则旋正 if image_orientation != "0度": image_ndarray = cv2.rotate( image_ndarray, { "顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE, # 逆时针旋转90度 "180度": cv2.ROTATE_180, # 旋转180度 "逆时针90度": cv2.ROTATE_90_CLOCKWISE, # 顺时针旋转90度 }[image_orientation], ) # 旋正后影像件再次压缩 image_base64 = image_compress( image_format, image_ndarray, image_size_specified=2 ) # TODO: 若旋正后影像件再次压缩发生异常则流转至人工处理 if image_base64 is None: raise RuntimeError("旋正后影像件再次压缩发生异常") return image_base64, image_type # noinspection PyShadowingNames def image_recognize( image, insurance_company, insurer_company, ) -> None: """ 影像件识别并整合至赔案档案 :param image: 影像件 :param insurance_company: 投保公司 :param insurer_company: 保险分公司 :return: 空 """ # TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并 # noinspection PyShadowingNames def identity_card_recognize(image, insurance_company, insurer_company) -> None: """ 居民身份证识别并整合至赔案档案 :param image: 影像件 :param insurance_company: 投保公司 :param insurer_company: 保险分公司 :return: 空 """ # 请求深圳快瞳居民身份证识别接口 response = http_client.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"), headers={ "X-RequestId-Header": image["影像件唯一标识"] }, # 以影像件唯一标识作为请求唯一标识,用于双方联查 data={ "token": authenticator.get_token( servicer="szkt" ), # 获取深圳快瞳访问令牌 "imgBase64": f"data:image/{image["影像件格式"].lstrip(".")};base64,{image["影像件BASE64编码"]}", }, # 深圳快瞳支持同时识别居民国徽面和头像面 guid=md5((url + image["影像件唯一标识"]).encode("utf-8")) .hexdigest() .upper(), ) # 若响应非成功则抛出异常 # TODO: 若响应非成功则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常") if image["影像件类型"] in [ "居民身份证(国徽、头像面)", "居民身份证(国徽面)", ]: # noinspection PyTypeChecker dossier["出险人层"].update( { "有效期起": parse( (period := response["data"]["validDate"].split("-"))[0] ).strftime( "%Y-%m-%d" ), # 就有效期限解析为有效期起和有效期止。其中,若有效期止为长期则默认为9999-12-31 "有效期止": ( datetime(9999, 12, 31).strftime("%Y-%m-%d") if period[1] == "长期" else parse(period[1]).strftime("%Y-%m-%d") ), } ) if image["影像件类型"] in [ "居民身份证(国徽、头像面)", "居民身份证(头像面)", ]: # noinspection PyTypeChecker dossier["出险人层"].update( { "姓名": response["data"]["name"], "证件类型": "居民身份证", "证件号码": response["data"]["idNo"], "性别": response["data"]["sex"], "出生": datetime.strptime( response["data"]["birthday"], "%Y-%m-%d" ), # 深圳快瞳居民身份证识别接口中出生由字符串(%Y.%m.%d)转为日期,日期格式默认为%Y-%m-%d "省": ( address := parse_location(response["data"]["address"]) ).get( "province" ), # 就住址解析为省、市、区和详细地址 "市": address.get("city"), "区": address.get("county"), "详细地址": address.get("detail"), } ) # 查询 print(dossier["报案层"]) print(1) exit() # noinspection PyShadowingNames def bank_card_recognize(image_guid, image_format, image_base64) -> None: """ 银行卡识别并整合至赔案档案 :param image_guid: 影像件唯一标识 :param image_format: 影像件格式 :param image_base64: 影像件BASE64编码 :return: 空 """ # 请求深圳快瞳居民身份证识别接口 response = http_client.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/bankCard"), headers={"X-RequestId-Header": image_guid}, data={ "token": authenticator.get_token( servicer="szkt" ), # 获取深圳快瞳访问令牌 "imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", }, guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功或银行卡类型非借记卡则抛出异常 # TODO: 若响应非成功则流转至人工处理 if not ( response.get("status") == 200 and response.get("code") == 0 and response.get("data", {}).get("bankCardType") == 1 ): raise RuntimeError( "请求深圳快瞳居民身份证识别接口发生异常或已识别非借记卡" ) # noinspection PyTypeChecker dossier["受益人层"].update( { "开户行": response["data"]["bankInfo"], "户名": None, "户号": response["data"]["cardNo"].replace(" ", ""), } ) # noinspection PyShadowingNames def receipt_recognize( image_index, image_guid, image_format, image_base64, image_type ) -> None: """ 票据识别并整合至赔案档案 :param image_index: 影像件编号 :param image_guid: 影像件唯一标识 :param image_format: 影像件格式 :param image_base64: 影像件BASE64编码 :param image_type: 影像件类型 :return: 空 """ # 初始化票据数据 receipt = {"影像件编号": image_index} # 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据) response = http_client.post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll"), headers={"X-RequestId-Header": image_guid}, data={ "token": authenticator.get_token( servicer="szkt" ), # 获取深圳快瞳访问令牌 "imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", }, guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若查验成功则直接整合至赔案档案 if response.get("status") == 200 and response.get("code") == 10000: # noinspection PyTypeChecker match response["data"]["productCode"]: # 增值税发票,目前深圳快瞳支持全电和全电纸质增值税发票查验 case "003082": # noinspection PyTypeChecker receipt.update( { "查验状态": ( "真票" if response["data"]["details"]["invoiceTypeNo"] == "0" else "红票" ), "票据号码": response["data"]["details"]["number"], "票据代码": ( response["data"]["details"]["code"] if response["data"]["details"]["code"] else None ), # 全电发票无发票代码,深圳快瞳票据查验接口中票据代码由空字符转为None "开票日期": datetime.strptime( response["data"]["details"]["date"], "%Y年%m月%d日" ).strftime( "%Y-%m-%d" ), # 深圳快瞳票据查验接口中开票日期由字符串(%Y年%m月%d日)转为日期 "校验码": response["data"]["details"]["check_code"], "开票金额": Decimal( response["data"]["details"]["total"] ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), # 深圳快瞳票据查验接口中开票金额由字符串转为Decimal,保留两位小数 "姓名": response["data"]["details"]["buyer"], "购药及就医机构": ( institution := response["data"]["details"]["seller"] ), "备注": ( response["data"]["details"]["remark"] if response["data"]["details"]["remark"] else None ), # 深圳快瞳票据查验接口中备注由空字符转为None "费项层": [ { "名称": item["name"], "规格": ( item["specification"] if item["specification"] else None ), # 深圳快瞳票据查验接口中明细规则由空字符转为None "单位": ( item["unit"] if item["unit"] else None ), # 深圳快瞳票据查验接口中明细单位由空字符转为None "数量": ( Decimal(item["quantity"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if item["quantity"] else None ), # 深圳快瞳票据查验接口中明细单位由空字符转为None,若非空字符由字符串转为Decimal,保留两位小数 "金额": ( Decimal(item["total"]) + Decimal(item["tax"]) ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), # 深圳快瞳票据查验接口中明细金额税额由字符串转为Decimal,保留两位小数,求和 } for item in response["data"]["details"].get( "items", [] ) ], } ) # 查询并获取单条购药及就医机构类型 institution_type = master_data.query_institution_type( institution ) # TODO: 若查询并获取单条购药及就医机构类型发生异常则流转至主数据人工处理 if institution_type is None: raise RuntimeError( "查询并获取单条购药及就医机构类型发生异常" ) receipt["购药及就医机构类型"] = institution_type # 根据购药及就医机构类型匹配处理方法 match institution_type: # 若购药及就医机构类型为药店,则根据 case "药店": pass case "私立医院": pass case _: raise RuntimeError("") # 门诊/住院收费票据 case "003081": # noinspection PyTypeChecker receipt.update( { "查验状态": ( "真票" if response["data"]["flushedRed"] == "true" else "红票" ), "票据号码": response["data"]["billNumber"], "票据代码": ( response["data"]["billCode"] if response["data"]["billCode"] else None ), # 部分地区医疗收费票据无发票代码,深圳快瞳票据查验接口中票据代码由空字符转为None "开票日期": parse( response["data"]["invoiceDate"] ).strftime( "%Y-%m-%d" ), # 深圳快瞳票据查验接口中开票日期由字符串(%Y-%m-%d)转为日期 "校验码": response["data"]["checkCode"], "票据金额": Decimal( response["data"]["amount"] ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), "姓名": response["data"]["payer"], "购药及就医机构": response["data"][ "receivablesInstitution" ], "医保支付": format( Decimal( response["data"].get("medicarePay", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "其它支付": format( Decimal( response["data"].get("otherPayment", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "个人自付": format( Decimal( response["data"].get("personalPay", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "自付一": format( Decimal( response["data"].get("self_pay_one", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), # 深圳快瞳票据查验中就部分地区无自付一 "自付二": format( Decimal( response["data"].get( "classificationPays", "0.00" ) ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), # 深圳快瞳票据查验中就部分地区无自付二 "个人自费": format( Decimal( response["data"].get("personalExpense", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "住院日期": ( parse(date.split("-")[0]).strftime("%Y-%m-%d") if ( date := response["data"].get( "hospitalizationDate" ) ) else None ), # 深圳快瞳票据查验中就收费票据住院日期格式为%Y%m%d-%Y%m%d,即住院日期-出院日期 "出院日期": ( parse(date.split("-")[1]).strftime("%Y-%m-%d") if date else None ), "医疗机构类型": response["data"]["institutionsType"], "项目": [ { "名称": item["itemName"], "规格": item[ "medical_level" ], # 甲类无自付、乙类有自付、丙类全自付 "单位": item["unit"], "数量": format( Decimal(item["number"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "金额": format( Decimal(item["totalAmount"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), } for item in response["data"]["feedetails"] ], } ) # 若查验为假票或无法查验则 else: if response.get("status") == 400 and ( response.get("code") == 10100 or response.get("code") == 10001 ): receipt["查验结果"] = "假票" else: receipt["查验结果"] = "无法查验" try: match image_type: case "增值税发票": try: # 请求深圳快瞳增值税发票识别接口 response = globals()["http_client"].post( url=( url := "https://ai.inspirvision.cn/s/api/ocr/vatInvoice" ), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()[ "authenticator" ].get_token(servicer="szkt"), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5( (url + image_guid).encode("utf-8") ) .hexdigest() .upper(), ) # 若深圳快瞳增值税发票识别响应非成功则返回None if not ( response.get("status") == 200 and response.get("code") == 0 ): return None extraction = { "票据类型": ( invoice_type := ( data := { item["desc"]: item["value"] for item in response["data"] } ).get("发票类型") ), "票据号码": (number := data.get("发票号码")), "票据代码": data.get("发票代码"), "开票日期": ( datetime.strptime( date, "%Y年%m月%d日" ).strftime("%Y-%m-%d") if re.match( r"\d{4}年\d{1,2}月\d{1,2}日", (date := data.get("开票日期")), ) else date ), "校验码": ( check_code if (check_code := data.get("校验码")) else number ), # 若校验码为空则默认为票据号码 "收款方": data.get("销售方名称"), "付款方": data.get("购买方名称"), "票据金额": format( Decimal( data.get("小写金额").replace("¥", "") if invoice_type == "电子发票(普通发票)" else data.get("合计金额(小写)") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "备注": ( remark if (remark := data.get("备注")) else None ), "项目": ( [ { "名称": name, "规格": ( specification if specification else None ), "单位": unit if unit else None, "数量": ( format( Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if quantity else None ), "金额": format( ( Decimal(amount) + Decimal(tax) ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", # 价税合计,保留两位小数 ), } for name, specification, unit, quantity, amount, tax in zip( [ component["value"] for component in response[ "data" ] if re.match( r"^项目名称(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^规格型号(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^单位(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^数量(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^金额(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^税额(\d+)?$", component["desc"], ) ], ) ] if invoice_type == "电子发票(普通发票)" else [ { "名称": name, "数量": format( Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), "0.2f", ), "金额": format( Decimal(amount).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), } for name, quantity, amount in zip( [ component["value"] for component in response[ "data" ] if re.match( r"^项目名称明细(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^项目数量明细(\d+)?$", component["desc"], ) ], [ component["value"] for component in response[ "data" ] if re.match( r"^项目金额明细(\d+)?$", component["desc"], ) ], ) ] ), "查验状态": "无法查验", } except: pass # 请求深圳快瞳收费票据识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/medical"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token( servicer="szkt" ), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")) .hexdigest() .upper(), ) # 若深圳快瞳收费票据识别响应非成功则返回NONE if not ( response.get("status") == 200 and response.get("code") == 0 ): return None extraction = { "票据类型": ( "门诊收费票据" if response["data"]["insured"]["receipt_outpatient"] else "住院收费票据" ), "票据号码": ( receipt := ( response["data"]["insured"]["receipt_outpatient"] or response["data"]["insured"][ "receipt_hospitalization" ] )["receipts"][0] )["receipt_no"][ "value" ], # 默认提取门诊/住院收费票据的第一张票据 "票据代码": receipt["global_detail"]["invoice_code"][ "value" ], "开票日期": receipt["global_detail"]["invoice_date"][ "value" ], # 深圳快瞳收费票据识别中就开票日期格式为%Y-%m-%d "校验码": fuzzy_match( target="校验码", components=receipt["global_detail"]["region_specific"], specify_key="name", return_key="word.value", ), "收款方": receipt["hospital_name"]["value"], "付款方": receipt["name"]["value"], "票据金额": format( Decimal(receipt["total_amount"]["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "医保支付": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance( (field := receipt.get("medicare_pay")), dict ) else None ), "其它支付": format( ( Decimal(value).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if ( value := fuzzy_match( target="其它支付", components=receipt.get( "global_detail", {} ).get("pay_list", []), specify_key="name", return_key="word.value", ) ) else None ), ".2f", ), "个人自付": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance((field := receipt.get("self_pay")), dict) else None ), "自付一": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance( field := (receipt.get("self_pay_one")), dict ) else None ), "自付二": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance( field := (receipt.get("self_pay_two")), dict ) else None ), "个人自费": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance(field := (receipt.get("self_cost")), dict) else None ), "住院日期": ( datetime.strptime(field["value"], "%Y%m%d").strftime( "%Y-%m-%d" ) if isinstance(field := (receipt.get("starttime")), dict) else None ), "出院日期": ( datetime.strptime(field["value"], "%Y%m%d").strftime( "%Y-%m-%d" ) if isinstance(field := (receipt.get("endtime")), dict) else None ), "医疗机构类型": receipt["others"][ "medical_institution_type" ]["value"], "项目": [ { "名称": ( field["value"] if isinstance( (field := item["item_name"]), dict ) else None ), "规格": ( field["value"] if isinstance( (field := item["specifications"]), dict ) else None ), "单位": ( field["value"] if isinstance((field := item["unit"]), dict) else None ), "数量": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance((field := item["number"]), dict) else None ), "金额": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance( (field := item["total_amount"]), dict ) else None ), } for item in receipt["feeitems"] ], "查验状态": "无法查验", } return extraction except: return None # 影像件识别使能检查,若影像件不识别则跳过 if not recognition_enable.evaluate( { "insurer_company": insurer_company, "image_type": image["影像件类型"], } )["result"]["recognition_enable"]: return # 根据影像件类型匹配影像件识别方法 match image["影像件类型"]: # TODO: 后续添加居民户口簿识别和整合方法 case "居民户口簿": raise RuntimeError("暂不支持居民户口簿") case ( "居民身份证(国徽、头像面)" | "居民身份证(国徽面)" | "居民身份证(头像面)" ): # 居民身份证识别并整合至赔案档案 identity_card_recognize(image, insurance_company, insurer_company) # TODO: 后续添加居民户口簿识别和整合方法 case "中国港澳台地区及境外护照": raise RuntimeError("暂不支持中国港澳台地区及境外护照") case "银行卡": # 银行卡识别并整合至赔案档案 bank_card_recognize(image_guid, image_format, image_base64) # TODO: 暂仅支持增值税发票识别且购药及就医类型为药店购药整合至赔案档案,后续逐步添加 case "增值税发票" | "医疗门诊收费票据" | "医疗住院收费票据": # 票据识别并整合至赔案档案 receipt_recognize(image_guid, image_format, image_base64, image_type) # 遍历工作目录中赔案目录并创建赔案档案(模拟自动化域就待自动化任务创建理赔档案) for case_path in [x for x in directory_path.iterdir() if x.is_dir()]: # 初始化赔案档案(保险公司将提供投保公司、保险分公司和报案时间等,TPA作业系统签收后生成赔案号) dossier = { "报案层": { "投保公司": ( insurance_company := "中国银行股份有限公司昆山分行" ), # 指定投保公司 "保险分公司": ( insurer_company := "中银保险有限公司苏州分公司" ), # 指定保险分公司 "报案时间": datetime(2025, 7, 25, 12, 0, 0).strftime( "%Y-%m-%d %H:%M:%S" ), # 指定报案时间 "赔案号": (case_number := case_path.stem), # 设定:赔案目录名称为赔案号 }, "影像件层": [], "出险人层": {}, "受益人层": {}, } # 遍历赔案目录中影像件 for image_index, image_path in enumerate( sorted( [ x for x in case_path.glob(pattern="*") if x.is_file() and x.suffix.lower() in [".jpg", ".jpeg", ".png"] ], # 实际作业亦仅支持JPG、JPEG或PNG key=lambda x: x.stat().st_ctime, # 根据影像件创建时间顺序排序 ), 1, ): # 初始化影像件数据 image = { "影像件编号": image_index, "影像件地址": image_path.as_posix(), # 将影像件路径对象转为字符串 "影像件名称": image_path.stem, "影像件格式": (image_format := image_path.suffix.lower()), } # 本地打开并读取影像件 image_ndarray = image_read(image_path) # 影像件序列化 # noinspection PyTypeChecker image["影像件唯一标识"] = ( image_guid := image_serialize(image_format, image_ndarray) ) # 影像件分类并旋正(较初审自动化无使能检查) image_base64, image_type = image_classify( image_guid, image_format, image_ndarray ) image["影像件BASE64编码"] = image_base64 image["影像件类型"] = image_type dossier["影像件层"].append(image) # 就影像件按照影像件类型排序 dossier["影像件层"].sort( key=lambda x: [ "居民户口簿", "居民身份证(国徽面)", "居民身份证(头像面)", "居民身份证(国徽、头像面)", "中国港澳台地区及境外护照", "理赔申请书", "增值税发票", "医疗门诊收费票据", "医疗住院收费票据", "医疗费用清单", "银行卡", "其它", ].index(x["影像件类型"]) ) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保人信息 # 遍历影像件层中影像件 for image in dossier["影像件层"]: # 影像件识别并整合至赔案档案 image_recognize( image, insurance_company, insurer_company, ) """ case "增值税发票" | "门诊收费票据" | "住院收费票据": extraction = invoice_extraction() # 若发生异常则跳过该影像件 if extraction is None: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["发票层"].append( { "关联影像件序号": image_index, "票据类型": extraction["票据类型"], "票据号码": extraction["票据号码"], "票据代码": ( extraction["票据代码"] if extraction["票据代码"] else "--" ), # 数电票无票据代码,校验码同票据号码 "开票日期": datetime.strptime( extraction["开票日期"], "%Y-%m-%d" ), "校验码后六位": ( check_code[-6:] if (check_code := extraction["校验码"]) else "--" ), "医药机构": extraction["收款方"], "就诊人": ( match.group("name") if ( match := re.search( r"^(?P[^((]+)", extraction["付款方"] ) ) else extraction["付款方"] ), "票据金额": Decimal(extraction["票据金额"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP ), # 默认金额转为小数,保留两位小数 "查验状态": extraction["查验状态"], "项目": ( pandas.DataFrame(extraction["项目"]) .assign( 数量=lambda dataframe: dataframe["数量"].apply( lambda row: ( Decimal(row).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if row else Decimal("0.00") ) ), 金额=lambda dataframe: dataframe["金额"].apply( lambda row: ( Decimal(row).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if row else Decimal("0.00") ) ), ) .groupby(by="名称", as_index=False) .agg(数量=("数量", "sum"), 金额=("金额", "sum")) .assign( 大项=lambda dataframe: dataframe["名称"].apply( lambda row: ( match.group("category") if ( match := re.match( r"^\*(?P.+?)\*.*$", row, ) ) else row ) ), 小项=lambda dataframe: dataframe["名称"].apply( lambda row: ( re.sub( r"[^\u4e00-\u9fa5a-zA-Z0-9./%*]", "", match.group("name"), ) if ( match := re.match( r"^\*.+?\*(?:\[[^]]+])?(?P[^\s(]+)(?:\([^\s(]+\))?(?:.*?)?$", row, ) ) else "" ) ), ) .loc[ lambda dataframe: dataframe["金额"] != 0, ["名称", "大项", "小项", "数量", "金额"], ] .to_dict(orient="records") ), "就诊类型": ( "药店购药" if "增值税发票" in image_type else ( "门诊就诊" if "门诊收费票据" in image_type else "住院治疗" ) ), } ) case "理赔申请书": # 根据保险总公司匹配理赔申请书 # noinspection PyUnreachableCode match insurer: case "中银保险有限公司": extraction = common_extraction( application_form="中行员工福利保障计划索赔申请书" ) # 若识别异常则跳过该影像件 if extraction is None: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["赔案层"]["申请人信息"].update( { "与被保险人关系": "本人", # 中银保险有限公司:默认申请人与被保险人关系为本人 "年龄": ( Decimal(age).quantize( Decimal("0"), rounding=ROUND_HALF_UP, ) if ( age := extraction.get("基础信息", {}).get( "年龄", "--" ) ).isdigit() else age ), # 若年龄仅数字则转为小数、取整,否则默认为“--” "手机号": ( phone_number if re.match( r"^1[3-9]\d{9}$", phone_number := extraction.get( "基础信息", {} ).get("手机", "--"), ) else phone_number ), # 若手机未正则匹配手机号格式则为“--” } ) dossier["赔案层"]["受益人信息"].update( { "与被保险人关系": "本人", # 中银保险有限公司:默认受益人与被保人关系为本人 "户名": ( account_name if ( account_name := extraction.get( "基础信息", {} ).get("户名") ) else "--" ), # 若户名为NONE则为“--” "开户银行": ( account_name if ( account_name := extraction.get( "基础信息", {} ).get("开户银行") ) else "--" ), # 若开户银行为NONE则为“--” "银行账号": ( account_name if ( account_name := extraction.get( "基础信息", {} ).get("账号") ) is not None else "--" ), # 若银行账号为NONE则为“--” } ) dossier["赔案层"]["其它信息"]["自述症状"] = ( ("、".join(diagnoses)) if ( diagnoses := sorted( set( "、".join( [ diagnosis for invoice in extraction.get( "票据表格", [] ) if ( diagnosis := invoice.get("诊断") ) ] ).split("、") ) ) ) else "--" ) case _: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue case _: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["影像件层"][-1].update( { "已识别": "是", "识别结果": extraction, } ) # 发票层根据开票日期顺序排序 dossier["发票层"] = sorted( dossier["发票层"], key=lambda x: (x["开票日期"], x["票据号码"]) ) # 构建小项层 # noinspection PyTypeChecker dossier["小项层"] = ( pandas.DataFrame( [ { "小项": item["小项"], "数量": item["数量"], "金额": item["金额"], } for invoice in dossier["发票层"] for item in invoice["项目"] ] ) .groupby(by="小项", as_index=False) .agg(数量=("数量", "sum"), 金额=("金额", "sum")) .to_dict(orient="records") ) for invoice in dossier["发票层"]: # noinspection PyTypeChecker invoice["推定疾病"] = disease_diagnosis( items="、".join(sorted(set([item["小项"] for item in invoice["项目"]]))) ) print(dossier) exit() with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file: file.write( template.render( { "dossier": dossier, } ) ) """