# -*- coding: utf-8 -*- """ 普康健康_自动化录入 --优先使用深圳快瞳,就增值税发票、医疗发票优先使用深圳快瞳票据查验、其次使用深圳快瞳票据识别,最后使用本地识别 --优先考虑增值税发票 """ import hashlib import json import re import uuid from base64 import b64encode from datetime import datetime from decimal import Decimal, ROUND_HALF_UP from pathlib import Path import cv2 import numpy import pandas from dateutil.parser import parse from jinja2 import Environment, FileSystemLoader from jionlp import parse_location from zen import ZenDecision, ZenEngine from utils.client import Authenticator, HTTPClient from utils.ocr import fuzzy_match # ------------------------- # 工具函数 # ------------------------- def images_compression(**kwargs) -> tuple[str | None, str | None]: """影像件压缩并BASE64编码""" # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: return None, None # 影像件 image = kwargs.get("image", globals()["image"]) if image is None: return None, None # 编码为图像字节流 success, image_bytes = cv2.imencode(ext=f".{image_format}", img=image) # 若发生异常则返回NONE if not success or image_bytes is None: return None, None # 生成影像件唯一标识 image_guid = hashlib.md5(image_bytes.tobytes()).hexdigest().upper() # BASE64编码 image_base64 = b64encode(image_bytes.tobytes()).decode("utf-8") # 将指定影像件大小单位由MB转为B image_size_specified = kwargs.get("image_size_specified", 2) * 1048576 # 若影像件BASE64编码后大小小于指定影像件大小则返回 if len(image_base64) <= image_size_specified: return image_guid, image_base64 # 双循环压缩影像件 # 外循环压缩:通过降低影像件质量实现压缩影像件大小 for quality in range(90, 0, -10): image_copy = image.copy() # 内循环压缩:通过等比例调整影像件尺寸实现压缩影像件大小 for i in range(25): success, image_bytes = cv2.imencode( ext=f".{image_format}", img=image_copy, params=( [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] if image_format == "png" else [cv2.IMWRITE_JPEG_QUALITY, quality] ), ) if not success or image_bytes is None: break image_base64 = b64encode(image_bytes.tobytes()).decode("utf-8") if len(image_base64) <= image_size_specified: return image_guid, image_base64 # 调整影像件尺寸 image_copy = cv2.resize( src=image_copy, dsize=(int(image_copy.shape[0] * 0.9), int(image_copy.shape[1] * 0.9)), interpolation=cv2.INTER_AREA, ) # 若调整后影像件尺寸中长或宽小于200像素则停止调整影像件尺寸 if min(image_copy.shape[:2]) < 200: break # 若仍未压缩至指定影像件大小则返回NONE return None, None def images_classification(**kwargs) -> tuple[str | None, str | None]: """影像件分类""" # 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识 image_guid = kwargs.get( "image_guid", globals().get("image_guid", uuid.uuid4().hex.upper()) ) # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: return None, None # 影像件BASE64编码 image_base64 = kwargs.get("image_base64", globals()["image_base64"]) if image_base64 is None: return None, None # 请求深圳快瞳影像件分类接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"), # 用于和深圳快瞳联查定位 headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token( servicer="szkt" ), # 使用全局变量 "imgBase64": f"data:image/{image_format};base64,{image_base64}", # 深圳快瞳要求修饰影像件BASE64编码的DATAURI }, guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功,则返回NONE if not (response.get("status") == 200 and response.get("code") == 0): return None, None # 根据票据类型和票据二级分类映射影像件类型 match (response["data"]["flag"], response["data"]["type"]): case (7, "idcard-front-back"): image_type = "居民身份证(正背面)" case (7, "idcard-front"): image_type = "居民身份证(正面)" # 包含国徽一面 case (7, "idcard-back"): image_type = "居民身份证(背面)" # 包含头像一面 case (8, _): image_type = "银行卡" case (4, _): image_type = "增值税发票" case (5, _): image_type = "门诊收费票据" case (3, _): image_type = "住院收费票据" case (18, _): image_type = "理赔申请书" case _: return None, None # 影像件方向 image_orientation = { "0": "0度", "90": "顺时针90度", "180": "180度", "270": "逆时针90度", }.get(response["data"]["angle"], "0度") return image_type, image_orientation def idcard_extraction(**kwargs) -> dict | None: """居民身份证数据提取""" # 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识 image_guid = kwargs.get( "image_guid", globals().get("image_guid", uuid.uuid4().hex.upper()) ) # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: return None # 影像件BASE64编码 image_base64 = kwargs.get("image_base64", globals()["image_base64"]) if image_base64 is None: return None # 请求深圳快瞳居民身份证识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token( servicer="szkt" ), # 使用全局变量 "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, # 支持同时识别居民身份证正反面 guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功,则返回NONE if not (response.get("status") == 200 and response.get("code") == 0): return None extraction = { "姓名": response["data"]["name"], "性别": response["data"]["sex"], "民族": response["data"]["nation"], "出生": response["data"][ "birthday" ], # 深圳快瞳居民身份证出生日期格式为%Y-%m-%d "住址": response["data"]["address"], "公民身份号码": response["data"]["idNo"], "签发机关": response["data"]["issuedBy"], "有效期起": parse( (date := response["data"]["validDate"]).split("-")[0] ).strftime( "%Y-%m-%d" ), # 深圳快瞳居民身份证识别中有效期日期格式为%Y.%m.%d,转为%Y-%m-%d "有效期止": ( date if (date := date.split("-")[1]) == "长期" else parse(date).strftime("%Y-%m-%d") ), } return extraction def bankcard_extraction(**kwargs) -> dict | None: """银行卡数据提取""" # 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识 image_guid = kwargs.get( "image_guid", globals().get("image_guid", uuid.uuid4().hex.upper()) ) # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: raise RuntimeError("请入参:image_format") # 影像件BASE64编码 image_base64 = kwargs.get("image_base64", globals()["image_base64"]) if image_base64 is None: raise RuntimeError("请入参:image_base64") # 请求深圳快瞳银行卡识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/bankCard"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token(servicer="szkt"), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功,则返回NONE if not (response.get("status") == 200 and response.get("code") == 0): return None extraction = { "卡类型": {"1": "借记卡", "2": "贷记卡"}.get( response["data"]["bankCardType"], "其它" ), # 0不能识别、3准贷记卡、4预付卡合并为其它 "银行名称": response["data"]["bankInfo"], "卡号": response["data"]["cardNo"].replace(" ", ""), } return extraction def invoice_extraction(**kwargs) -> dict | None: """增值税发票/收费票据数据提取""" # 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识 image_guid = kwargs.get( "image_guid", globals().get("image_guid", uuid.uuid4().hex.upper()) ) # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: return None # 影像件BASE64编码 image_base64 = kwargs.get("image_base64", globals()["image_base64"]) if image_base64 is None: return None try: # 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据) response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token(servicer="szkt"), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) if not (response.get("status") == 200 and response.get("code") == 10000): raise RuntimeError("深圳快瞳票据查验发生异常") match response["data"]["productCode"]: case "003082": # 增值税发票 extraction = { "票据类型": { "10108": "数电票", "10101": "增值税普通发票", "10100": "增值税专用发票", "30100": "数电票", "30101": "数电票", "30104": "增值税专用发票", "30105": "数电票", "10106": "区块链电子发票", "30109": "数电票", "30121": "增值税普通发票", "10102": "增值税普通发票", "10103": "增值税普通发票", "10107": "数电票", }.get(response["data"]["type"], "其它增值税发票"), "票据号码": response["data"]["details"]["number"], "票据代码": ( code if (code := response["data"]["details"]["code"]) else None ), # 深圳快瞳票据查验中数电票票据代码为空字符,转为NONE "开票日期": datetime.strptime( response["data"]["details"]["date"], "%Y年%m月%d日" ).strftime( "%Y-%m-%d" ), # 深圳快瞳票据查验中就增值税发票开票日期格式为%Y年%m月%d日,转为%Y-%m-%d "校验码": response["data"]["details"]["check_code"], "收款方": response["data"]["details"]["seller"], "付款方": response["data"]["details"]["buyer"], "票据金额": format( Decimal(response["data"]["details"]["total"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "查验状态": ( "真票" if response["data"]["details"]["invoiceTypeNo"] == "0" else "红票" ), "备注": ( remark if (remark := response["data"]["details"]["remark"]) else None ), # 深圳快瞳票据查验中增值税发票备注可能为空字符,转为NONE "项目": [ { "名称": item["name"], "规格": ( specification if (specification := item["specification"]) else None ), "单位": unit if (unit := item["unit"]) else None, "数量": ( format( Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if (quantity := item["quantity"]) else None ), "金额": format( ( Decimal(item["total"]) + Decimal(item["tax"]) ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), # 价税合计 } for item in response["data"]["details"].get("items", []) ], } # 深圳快瞳票据查验中就部分增值税发票仅可查,数据标准化抛出异常 return extraction case "003081": # 门诊/住院收费票据 extraction = { "票据类型": ( "门诊收费票据" if "门诊" in response["data"]["billName"] else "住院收费票据" ), "票据号码": response["data"]["billNumber"], "票据代码": response["data"]["billCode"], "开票日期": response["data"][ "invoiceDate" ], # 深圳快瞳票据查验中就收费票据开票日期格式为%Y-%m-%d "校验码": response["data"]["checkCode"], "收款方": response["data"]["payeeName"], "付款方": response["data"]["payer"], "票据金额": format( Decimal(response["data"]["amount"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "查验状态": {"true": "真票", "false": "红票"}[ response["data"]["flushedRed"] ], "备注": response["data"].get("remark"), "医保支付": format( Decimal(response["data"].get("medicarePay", "0.00")).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "其它支付": format( Decimal(response["data"].get("otherPayment", "0.00")).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "个人自付": format( Decimal(response["data"].get("personalPay", "0.00")).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "自付一": format( Decimal(response["data"].get("self_pay_one", "0.00")).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), # 深圳快瞳票据查验中就部分地区无自付一 "自付二": format( Decimal( response["data"].get("classificationPays", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), # 深圳快瞳票据查验中就部分地区无自付二 "个人自费": format( Decimal( response["data"].get("personalExpense", "0.00") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "住院日期": ( parse(date.split("-")[0]).strftime("%Y-%m-%d") if (date := response["data"].get("hospitalizationDate")) else None ), # 深圳快瞳票据查验中就收费票据住院日期格式为%Y%m%d-%Y%m%d,即住院日期-出院日期 "出院日期": ( parse(date.split("-")[1]).strftime("%Y-%m-%d") if date else None ), "医疗机构类型": response["data"]["institutionsType"], "项目": [ { "名称": item["itemName"], "规格": item[ "medical_level" ], # 甲类无自付、乙类有自付、丙类全自付 "单位": item["unit"], "数量": format( Decimal(item["number"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "金额": format( Decimal(item["totalAmount"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), } for item in response["data"]["feedetails"] ], } return extraction # 若请求深圳快瞳票据查验接口或解析发生异常,则根据影像件类型请求深圳快瞳增值税发票/收费票据识别接口 except: # 影像件类型 image_type = kwargs.get("image_type", globals()["image_type"]) if image_type is None: return None match image_type: case "增值税发票": try: # 请求深圳快瞳增值税发票识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/vatInvoice"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token( servicer="szkt" ), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")) .hexdigest() .upper(), ) # 若深圳快瞳增值税发票识别响应非成功则返回NONE if not ( response.get("status") == 200 and response.get("code") == 0 ): return None extraction = { "票据类型": ( invoice_type := ( data := { item["desc"]: item["value"] for item in response["data"] } ).get("发票类型") ), "票据号码": (number := data.get("发票号码")), "票据代码": data.get("发票代码"), "开票日期": ( datetime.strptime(date, "%Y年%m月%d日").strftime("%Y-%m-%d") if re.match( r"\d{4}年\d{1,2}月\d{1,2}日", (date := data.get("开票日期")), ) else date ), "校验码": ( check_code if (check_code := data.get("校验码")) else number ), # 若校验码为空则默认为票据号码 "收款方": data.get("销售方名称"), "付款方": data.get("购买方名称"), "票据金额": format( Decimal( data.get("小写金额").replace("¥", "") if invoice_type == "电子发票(普通发票)" else data.get("合计金额(小写)") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "备注": remark if (remark := data.get("备注")) else None, "项目": ( [ { "名称": name, "规格": specification if specification else None, "单位": unit if unit else None, "数量": ( format( Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if quantity else None ), "金额": format( (Decimal(amount) + Decimal(tax)).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", # 价税合计,保留两位小数 ), } for name, specification, unit, quantity, amount, tax in zip( [ component["value"] for component in response["data"] if re.match( r"^项目名称(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^规格型号(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^单位(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^数量(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^金额(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^税额(\d+)?$", component["desc"], ) ], ) ] if invoice_type == "电子发票(普通发票)" else [ { "名称": name, "数量": format( Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), "0.2f", ), "金额": format( Decimal(amount).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), } for name, quantity, amount in zip( [ component["value"] for component in response["data"] if re.match( r"^项目名称明细(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^项目数量明细(\d+)?$", component["desc"], ) ], [ component["value"] for component in response["data"] if re.match( r"^项目金额明细(\d+)?$", component["desc"], ) ], ) ] ), "查验状态": "无法查验", } return extraction except: return None case _: try: # 请求深圳快瞳收费票据识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/medical"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token( servicer="szkt" ), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")) .hexdigest() .upper(), ) # 若深圳快瞳收费票据识别响应非成功则返回NONE if not ( response.get("status") == 200 and response.get("code") == 0 ): return None extraction = { "票据类型": ( "门诊收费票据" if response["data"]["insured"]["receipt_outpatient"] else "住院收费票据" ), "票据号码": ( receipt := ( response["data"]["insured"]["receipt_outpatient"] or response["data"]["insured"][ "receipt_hospitalization" ] )["receipts"][0] )["receipt_no"][ "value" ], # 默认提取门诊/住院收费票据的第一张票据 "票据代码": receipt["global_detail"]["invoice_code"]["value"], "开票日期": receipt["global_detail"]["invoice_date"][ "value" ], # 深圳快瞳收费票据识别中就开票日期格式为%Y-%m-%d "校验码": fuzzy_match( target="校验码", components=receipt["global_detail"]["region_specific"], specify_key="name", return_key="word.value", ), "收款方": receipt["hospital_name"]["value"], "付款方": receipt["name"]["value"], "票据金额": format( Decimal(receipt["total_amount"]["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ), "医保支付": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance((field := receipt.get("medicare_pay")), dict) else None ), "其它支付": format( ( Decimal(value).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if ( value := fuzzy_match( target="其它支付", components=receipt.get("global_detail", {}).get( "pay_list", [] ), specify_key="name", return_key="word.value", ) ) else None ), ".2f", ), "个人自付": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance((field := receipt.get("self_pay")), dict) else None ), "自付一": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance(field := (receipt.get("self_pay_one")), dict) else None ), "自付二": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance(field := (receipt.get("self_pay_two")), dict) else None ), "个人自费": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance(field := (receipt.get("self_cost")), dict) else None ), "住院日期": ( datetime.strptime(field["value"], "%Y%m%d").strftime( "%Y-%m-%d" ) if isinstance(field := (receipt.get("starttime")), dict) else None ), "出院日期": ( datetime.strptime(field["value"], "%Y%m%d").strftime( "%Y-%m-%d" ) if isinstance(field := (receipt.get("endtime")), dict) else None ), "医疗机构类型": receipt["others"]["medical_institution_type"][ "value" ], "项目": [ { "名称": ( field["value"] if isinstance((field := item["item_name"]), dict) else None ), "规格": ( field["value"] if isinstance( (field := item["specifications"]), dict ) else None ), "单位": ( field["value"] if isinstance((field := item["unit"]), dict) else None ), "数量": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance((field := item["number"]), dict) else None ), "金额": ( format( Decimal(field["value"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), ".2f", ) if isinstance((field := item["total_amount"]), dict) else None ), } for item in receipt["feeitems"] ], "查验状态": "无法查验", } return extraction except: return None def common_extraction(**kwargs) -> dict | None: """通用数据提取""" # 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识 image_guid = kwargs.get( "image_guid", globals().get("image_guid", uuid.uuid4().hex.upper()) ) # 影像件格式 image_format = kwargs.get("image_format", globals()["image_format"]) if image_format is None: raise RuntimeError("请入参:image_format") # 影像件BASE64编码 image_base64 = kwargs.get("image_base64", globals()["image_base64"]) if image_base64 is None: raise RuntimeError("请入参:image_base64") # 请求深圳快瞳通用文本识别接口 response = globals()["http_client"].post( url=(url := "https://ai.inspirvision.cn/s/api/ocr/general"), headers={"X-RequestId-Header": image_guid}, data={ "token": globals()["authenticator"].get_token(servicer="szkt"), "imgBase64": f"data:image/{image_format};base64,{image_base64}", }, guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) # 若响应非成功,则返回NONE if not (response.get("status") == 200 and response.get("code") == 0): return None # 基于空间坐标法就识别结果中文本框进行分行排序 texts = [] # 重构文本框列表 for text in response["data"]: texts.append( [ # 文本框左上角的X坐标 numpy.float64(text["itemPolygon"]["x"]), # 文本框左上角的Y坐标 numpy.float64(text["itemPolygon"]["y"]), # 文本框的高度 numpy.float64( text["itemPolygon"]["height"] ), # 深圳快瞳基于文本框的Y坐标最大值和最小值的差值 text["value"], ] ) # 按照文本框Y坐标升序(使用空间坐标算法,从上到下,从左到右) texts.sort(key=lambda x: x[1]) rows = [] # 分行 for index, text in enumerate(texts[1:]): # 若为第一行则初始化当前行 if index == 0: row = [texts[0]] continue # 若文本框的Y坐标与当前行中最后一个文本框的Y坐标差值小于阈值,则归为同一行 # noinspection PyUnboundLocalVariable # noinspection PyTypeChecker if text[1] - row[-1][1] < numpy.mean([x[2] for x in row]) * 0.5: row.append(text) # 否则结束当前行、初始化当前行 else: rows.append(row) row = [text] # 添加最后一行 rows.append(row) extraction = [] # 按照文本框X坐标升序 for row in rows: extraction.extend( [x[3].replace(" ", "") for x in sorted(row, key=lambda x: x[0])] ) # 以空格拼接 extraction = " ".join(extraction) # 根据理赔申请书匹配提示词 match application_form := kwargs.get( "application_form", globals().get("application_form") ): case "中行员工福利保障计划索赔申请书": prompt = f""" 指令:你是一个从OCR文本中智能提取信息并生成JSON的工具,请严格按照要求执行。 输入:OCR文本(可能包含错漏): {extraction} 输出要求: 1、只输出可被Python中json.loads()解析的JSON格式字符串,不包含任何代码块标记、说明文字等其它非JSON格式内容 2、无法确定的值设置为`null`(不是"null"字符串) JSON结构: {{ "基础信息": {{ "申请人": "字符串或null", "性别": "字符串或null", "年龄": "字符串或null", "手机": "字符串或null", "身份证号": "字符串或null", "开户银行": "字符串或null", "户名": "字符串或null", "账号": "字符串或null", }}, "票据表格": [ {{ "就诊序号": "字符串或null", "发票日期": "YYYY-MM-DD或null", "发票上的就诊医院/药店": "字符串或null", "票据张数": "字符串或null", "票据金额": "字符串或null", "诊断": "字符串或null" }}, ] }} 开始输出: """ case _: raise RuntimeError(f"理赔申请书{application_form}未设置处理方法") # 请求大语言模型创建对话接口 response = globals()["http_client"].post( url="https://api.siliconflow.cn/v1/chat/completions", headers={ "Authorization": "Bearer sk-xsnuwirjjphhfdbvznfdfjqlinfdlrnlxuhkbbqynfnbhiqz", # 基于硅基流动 "Content-Type": "application/json; charset=utf-8", }, json={ "model": "deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", # 通过从DeepSeek-R1-0528模型蒸馏思维链接至Qwen3-8B-Base获得的模型 "messages": [{"role": "user", "content": prompt}], "max_tokens": 10240, # 生成文本最大令牌数 "temperature": 0.2, "top_p": 0.5, "top_k": 20, "frequency_penalty": 0.0, "thinking_budget": 1, }, guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(), ) extraction = ( json.loads(match.group("json")) if ( match := re.search( r"```json\s*(?P\{.*})\s*```", response["choices"][0]["message"]["content"], re.DOTALL, ) ) else None ) return extraction # 规则模型初始化 def decision(rules_path: Path) -> ZenDecision: def loader(path): with open(path, "r") as file: return file.read() return ZenEngine({"loader": loader}).get_decision(rules_path.as_posix()) def disease_diagnosis(**kwargs) -> str | None: """疾病推定""" # 赔案档案:优先使用关键词变量,其次使用全局变量 dossier = kwargs.get("dossier", globals().get("dossier")) prompt = f""" 指令:你是一个医学疾病分类诊断的工具,请严格按照要求执行。 患者信息: 性别 {gender if (gender := dossier["赔案层"]["申请人信息"].get("性别")) is not None else "未知"}, 年龄 {age if (age := dossier["赔案层"]["申请人信息"].get("年龄")) is not None else "未知"}, 近期在药房/医院开具发票中内容 {dossier["赔案层"]["其它信息"]["小项合集"]} 输出要求: 1、患者自述症状在 {dossier["赔案层"]["其它信息"]["自述症状"]} 其中之一 2、依据患者信息、自述症状和其提供的发票中内容 {kwargs["items"]} 综合诊断,只输出一个最可能的ICD-11中的疾病分类中亚类目代码对应的中文名称字符串,不包含任何代码块标记、说明文字等 开始输出: """ # 请求大语言模型创建对话接口 response = globals()["http_client"].post( url="https://ark.cn-beijing.volces.com/api/v3/chat/completions", headers={ "Authorization": "Bearer 2c28ab07-888c-45be-84a2-fc4b2cb5f3f2", # 火山引擎 "Content-Type": "application/json; charset=utf-8", }, json={ "model": "deepseek-r1-250528", "messages": [ {"role": "system", "content": "你是人工智能助手"}, {"role": "user", "content": prompt}, ], "temperature": 0.2, "top_p": 0.5, "top_k": 20, "frequency_penalty": 0.0, "thinking_budget": 1, }, guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(), ) recognition = ( match.group("text") if ( match := re.match( r"\s*(?P.*)", response["choices"][0]["message"]["content"] ) ) else None ) return recognition # ------------------------- # 主程序 # ------------------------- if __name__ == "__main__": # 初始化HTTP客户端 http_client = HTTPClient(timeout=300, cache_enabled=True) # 初始化认证器 authenticator = Authenticator() # 初始化工作目录地址对象 directory_path = Path("directory") # 若不存在则创建 directory_path.mkdir(parents=True, exist_ok=True) # 初始化影像件识别规则引擎 recognize_decision = decision(Path("rules/影像件是否需要数据提取.json")) # 初始化JINJA2环境 environment = Environment(loader=FileSystemLoader(".")) # 添加DATE过滤器 environment.filters["date"] = lambda date: ( date.strftime("%Y-%m-%d") if date else "长期" ) # 加载赔案档案模版 template = environment.get_template("template.html") # 遍历工作目录中赔案目录 for case_path in [ case_path for case_path in directory_path.iterdir() if case_path.is_dir() ]: # 初始化赔案档案 dossier = { "影像件层": [], "赔案层": { "赔案编号": ( case_number := case_path.stem ), # 假设赔案已签收,系统已生成赔案编号并根据签收时填报保单编号可知保险总公司、保险分公司和投保公司 "签收保单编号": "3291120243205000000002", "保险总公司": "中银保险有限公司", "保险分公司": None, # 实验阶段保险分公司、投保公司和申请时间为NONE "投保公司": None, "申请时间": None, "申请人信息": {}, "受益人信息": {}, "被保人信息": {}, # 实验阶段被保人信息为空字典 "其它信息": {}, }, "发票层": [], "小项层": [], } # 遍历赔案目录中影像件地址 for image_index, image_path in enumerate( sorted(case_path.glob(pattern="*"), key=lambda x: x.stat().st_ctime), 1 ): dossier["影像件层"].append( { "影像件序号": (image_index := f"{image_index:02d}"), "影像件名称": (image_name := image_path.name), } ) # 若影像件格式非JPG/JPEG/PNG则跳过该影像件 if (image_format := image_path.suffix.lower().lstrip(".")) not in [ "jpg", "jpeg", "png", ]: dossier["影像件层"][-1]["已分类"] = "否,不支持的影像件" continue # 影像件读取 image = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE) # 若发生异常则跳过该影像件 if image is None: dossier["影像件层"][-1]["已分类"] = "否,读取异常" continue # 影像件压缩(输出BASE64编码) image_guid, image_base64 = images_compression() # 若发生异常则跳过该影像件 if image_guid is None or image_base64 is None: dossier["影像件层"][-1]["已分类"] = "否,压缩异常" continue # 通过请求深圳快瞳影像件分类接口获取影像件类型和方向 image_type, image_orientation = images_classification() # 若发生异常则跳过该影像件 if image_type is None or image_orientation is None: dossier["影像件层"][-1]["已分类"] = "否,影像件分类异常" continue # 若影像件方向非0度,则影像件旋正并在此压缩 if image_orientation != "0度": # 影像件旋正 image = cv2.rotate( image, { "顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE, # 逆时针旋转90度 "180度": cv2.ROTATE_180, # 旋转180度 "逆时针90度": cv2.ROTATE_90_CLOCKWISE, # 顺时针旋转90度 }[image_orientation], ) # 影像件再次压缩 image_guid, image_base64 = images_compression() if image_guid is None or image_base64 is None: dossier["影像件层"][-1]["已分类"] = "否,压缩异常" continue dossier["影像件层"][-1].update({"已分类": "是", "影像件类型": image_type}) # 根据保险总公司和影像件类型评估影像件是否需要数据提取,若无需数据提取则跳过该影像件(例如,中银保险有限公司理赔申请书包含户名、开户银行和银行账号,无需识别银行卡) if not recognize_decision.evaluate( { "insurer": (insurer := dossier["赔案层"]["保险总公司"]), "image_type": image_type, } )["result"]["extract"]: dossier["影像件层"][-1]["已识别"] = "否,无需识别" continue # 根据影像件类型匹配影像件数据提取 # noinspection PyUnreachableCode match image_type: case "居民身份证(正背面)" | "居民身份证(正面)" | "居民身份证(背面)": extraction = idcard_extraction() # 若发生异常则跳过该影像件 if extraction is None: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue if image_type in ["居民身份证(正背面)", "居民身份证(正面)"]: dossier["赔案层"]["申请人信息"].update( { "证件有效期起": datetime.strptime( extraction["有效期起"], "%Y-%m-%d" ), "证件有效期止": ( date if (date := extraction["有效期止"]) == "长期" else datetime.strptime(date, "%Y-%m-%d") ), # 若证件有效期止为NONE默认为“长期”, } ) # 原则上由影像件数据提取环节负责数据标准化,赔案档案数据填充环节负责数据机构化 if image_type in ["居民身份证(正背面)", "居民身份证(背面)"]: dossier["赔案层"]["申请人信息"].update( { "姓名": extraction["姓名"], "证件类型": "居民身份证", "证件号码": extraction["公民身份号码"], "性别": extraction["性别"], "出生": datetime.strptime( extraction["出生"], "%Y-%m-%d" ), # 默认日期格式为%Y-%m-%d "省": ( address := parse_location(extraction["住址"]) ).get("province"), "地": address.get("city"), "县": address.get("county"), "详细地址": address.get("detail"), } ) case "银行卡": extraction = bankcard_extraction() # 若发生异常则跳过该影像件 if extraction is None: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["赔案层"]["受益人信息"].update( { "开户银行": extraction["银行名称"], "银行账号": extraction["卡号"], } ) case "增值税发票" | "门诊收费票据" | "住院收费票据": extraction = invoice_extraction() # 若发生异常则跳过该影像件 if extraction is None: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["发票层"].append( { "关联影像件序号": image_index, "票据类型": extraction["票据类型"], "票据号码": extraction["票据号码"], "票据代码": ( extraction["票据代码"] if extraction["票据代码"] else "--" ), # 数电票无票据代码,校验码同票据号码 "开票日期": datetime.strptime( extraction["开票日期"], "%Y-%m-%d" ), "校验码后六位": ( check_code[-6:] if (check_code := extraction["校验码"]) else "--" ), "医药机构": extraction["收款方"], "就诊人": ( match.group("name") if ( match := re.search( r"^(?P[^((]+)", extraction["付款方"] ) ) else extraction["付款方"] ), "票据金额": Decimal(extraction["票据金额"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP ), # 默认金额转为小数,保留两位小数 "查验状态": extraction["查验状态"], "项目": ( pandas.DataFrame(extraction["项目"]) .assign( 数量=lambda dataframe: dataframe["数量"].apply( lambda row: ( Decimal(row).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if row else Decimal("0.00") ) ), 金额=lambda dataframe: dataframe["金额"].apply( lambda row: ( Decimal(row).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ) if row else Decimal("0.00") ) ), ) .groupby(by="名称", as_index=False) .agg(数量=("数量", "sum"), 金额=("金额", "sum")) .assign( 大项=lambda dataframe: dataframe["名称"].apply( lambda row: ( match.group("category") if ( match := re.match( r"^\*(?P.+?)\*.*$", row, ) ) else row ) ), 小项=lambda dataframe: dataframe["名称"].apply( lambda row: ( re.sub( r"[^\u4e00-\u9fa5a-zA-Z0-9./%*]", "", match.group("name"), ) if ( match := re.match( r"^\*.+?\*(?:\[[^]]+])?(?P[^\s(]+)(?:\([^\s(]+\))?(?:.*?)?$", row, ) ) else "" ) ), ) .loc[ lambda dataframe: dataframe["金额"] != 0, ["名称", "大项", "小项", "数量", "金额"], ] .to_dict(orient="records") ), "就诊类型": ( "药店购药" if "增值税发票" in image_type else ( "门诊就诊" if "门诊收费票据" in image_type else "住院治疗" ) ), } ) case "理赔申请书": # 根据保险总公司匹配理赔申请书 # noinspection PyUnreachableCode match insurer: case "中银保险有限公司": extraction = common_extraction( application_form="中行员工福利保障计划索赔申请书" ) # 若识别异常则跳过该影像件 if extraction is None: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["赔案层"]["申请人信息"].update( { "与被保险人关系": "本人", # 中银保险有限公司:默认申请人与被保险人关系为本人 "年龄": ( Decimal(age).quantize( Decimal("0"), rounding=ROUND_HALF_UP, ) if ( age := extraction.get("基础信息", {}).get( "年龄", "--" ) ).isdigit() else age ), # 若年龄仅数字则转为小数、取整,否则默认为“--” "手机号": ( phone_number if re.match( r"^1[3-9]\d{9}$", phone_number := extraction.get( "基础信息", {} ).get("手机", "--"), ) else phone_number ), # 若手机未正则匹配手机号格式则为“--” } ) dossier["赔案层"]["受益人信息"].update( { "与被保险人关系": "本人", # 中银保险有限公司:默认受益人与被保人关系为本人 "户名": ( account_name if ( account_name := extraction.get( "基础信息", {} ).get("户名") ) else "--" ), # 若户名为NONE则为“--” "开户银行": ( account_name if ( account_name := extraction.get( "基础信息", {} ).get("开户银行") ) else "--" ), # 若开户银行为NONE则为“--” "银行账号": ( account_name if ( account_name := extraction.get( "基础信息", {} ).get("账号") ) is not None else "--" ), # 若银行账号为NONE则为“--” } ) dossier["赔案层"]["其它信息"]["自述症状"] = ( ("、".join(diagnoses)) if ( diagnoses := sorted( set( "、".join( [ diagnosis for invoice in extraction.get( "票据表格", [] ) if ( diagnosis := invoice.get("诊断") ) ] ).split("、") ) ) ) else "--" ) case _: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue case _: dossier["影像件层"][-1]["已识别"] = "否,无法识别" continue dossier["影像件层"][-1].update( { "已识别": "是", "识别结果": extraction, } ) # 发票层根据开票日期顺序排序 dossier["发票层"] = sorted( dossier["发票层"], key=lambda x: (x["开票日期"], x["票据号码"]) ) # 构建小项层 # noinspection PyTypeChecker dossier["小项层"] = ( pandas.DataFrame( [ { "小项": item["小项"], "数量": item["数量"], "金额": item["金额"], } for invoice in dossier["发票层"] for item in invoice["项目"] ] ) .groupby(by="小项", as_index=False) .agg(数量=("数量", "sum"), 金额=("金额", "sum")) .to_dict(orient="records") ) for invoice in dossier["发票层"]: # noinspection PyTypeChecker invoice["推定疾病"] = disease_diagnosis( items="、".join(sorted(set([item["小项"] for item in invoice["项目"]]))) ) print(dossier) exit() with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file: file.write( template.render( { "dossier": dossier, } ) )