Python/普康健康自动化录入/main.py

1568 lines
69 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
普康健康_自动化录入
--优先使用深圳快瞳,就增值税发票、医疗发票优先使用深圳快瞳票据查验、其次使用深圳快瞳票据识别,最后使用本地识别
--优先考虑增值税发票
"""
import hashlib
import json
import re
import uuid
from base64 import b64encode
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
import cv2
import numpy
import pandas
from dateutil.parser import parse
from jinja2 import Environment, FileSystemLoader
from jionlp import parse_location
from zen import ZenDecision, ZenEngine
from utils.client import Authenticator, HTTPClient
from utils.ocr import fuzzy_match
# -------------------------
# 工具函数
# -------------------------
def images_compression(**kwargs) -> tuple[str | None, str | None]:
"""影像件压缩并BASE64编码"""
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
return None, None
# 影像件
image = kwargs.get("image", globals()["image"])
if image is None:
return None, None
# 编码为图像字节流
success, image_bytes = cv2.imencode(ext=f".{image_format}", img=image)
# 若发生异常则返回NONE
if not success or image_bytes is None:
return None, None
# 生成影像件唯一标识
image_guid = hashlib.md5(image_bytes.tobytes()).hexdigest().upper()
# BASE64编码
image_base64 = b64encode(image_bytes.tobytes()).decode("utf-8")
# 将指定影像件大小单位由MB转为B
image_size_specified = kwargs.get("image_size_specified", 2) * 1048576
# 若影像件BASE64编码后大小小于指定影像件大小则返回
if len(image_base64) <= image_size_specified:
return image_guid, image_base64
# 双循环压缩影像件
# 外循环压缩:通过降低影像件质量实现压缩影像件大小
for quality in range(90, 0, -10):
image_copy = image.copy()
# 内循环压缩:通过等比例调整影像件尺寸实现压缩影像件大小
for i in range(25):
success, image_bytes = cv2.imencode(
ext=f".{image_format}",
img=image_copy,
params=(
[cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
if image_format == "png"
else [cv2.IMWRITE_JPEG_QUALITY, quality]
),
)
if not success or image_bytes is None:
break
image_base64 = b64encode(image_bytes.tobytes()).decode("utf-8")
if len(image_base64) <= image_size_specified:
return image_guid, image_base64
# 调整影像件尺寸
image_copy = cv2.resize(
src=image_copy,
dsize=(int(image_copy.shape[0] * 0.9), int(image_copy.shape[1] * 0.9)),
interpolation=cv2.INTER_AREA,
)
# 若调整后影像件尺寸中长或宽小于200像素则停止调整影像件尺寸
if min(image_copy.shape[:2]) < 200:
break
# 若仍未压缩至指定影像件大小则返回NONE
return None, None
def images_classification(**kwargs) -> tuple[str | None, str | None]:
"""影像件分类"""
# 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识
image_guid = kwargs.get(
"image_guid", globals().get("image_guid", uuid.uuid4().hex.upper())
)
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
return None, None
# 影像件BASE64编码
image_base64 = kwargs.get("image_base64", globals()["image_base64"])
if image_base64 is None:
return None, None
# 请求深圳快瞳影像件分类接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/genalClassify"),
# 用于和深圳快瞳联查定位
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(
servicer="szkt"
), # 使用全局变量
"imgBase64": f"data:image/{image_format};base64,{image_base64}", # 深圳快瞳要求修饰影像件BASE64编码的DATAURI
},
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则返回NONE
if not (response.get("status") == 200 and response.get("code") == 0):
return None, None
# 根据票据类型和票据二级分类映射影像件类型
match (response["data"]["flag"], response["data"]["type"]):
case (7, "idcard-front-back"):
image_type = "居民身份证(正背面)"
case (7, "idcard-front"):
image_type = "居民身份证(正面)" # 包含国徽一面
case (7, "idcard-back"):
image_type = "居民身份证(背面)" # 包含头像一面
case (8, _):
image_type = "银行卡"
case (4, _):
image_type = "增值税发票"
case (5, _):
image_type = "门诊收费票据"
case (3, _):
image_type = "住院收费票据"
case (18, _):
image_type = "理赔申请书"
case _:
return None, None
# 影像件方向
image_orientation = {
"0": "0度",
"90": "顺时针90度",
"180": "180度",
"270": "逆时针90度",
}.get(response["data"]["angle"], "0度")
return image_type, image_orientation
def idcard_extraction(**kwargs) -> dict | None:
"""居民身份证数据提取"""
# 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识
image_guid = kwargs.get(
"image_guid", globals().get("image_guid", uuid.uuid4().hex.upper())
)
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
return None
# 影像件BASE64编码
image_base64 = kwargs.get("image_base64", globals()["image_base64"])
if image_base64 is None:
return None
# 请求深圳快瞳居民身份证识别接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(
servicer="szkt"
), # 使用全局变量
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
}, # 支持同时识别居民身份证正反面
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则返回NONE
if not (response.get("status") == 200 and response.get("code") == 0):
return None
extraction = {
"姓名": response["data"]["name"],
"性别": response["data"]["sex"],
"民族": response["data"]["nation"],
"出生": response["data"][
"birthday"
], # 深圳快瞳居民身份证出生日期格式为%Y-%m-%d
"住址": response["data"]["address"],
"公民身份号码": response["data"]["idNo"],
"签发机关": response["data"]["issuedBy"],
"有效期起": parse(
(date := response["data"]["validDate"]).split("-")[0]
).strftime(
"%Y-%m-%d"
), # 深圳快瞳居民身份证识别中有效期日期格式为%Y.%m.%d转为%Y-%m-%d
"有效期止": (
date
if (date := date.split("-")[1]) == "长期"
else parse(date).strftime("%Y-%m-%d")
),
}
return extraction
def bankcard_extraction(**kwargs) -> dict | None:
"""银行卡数据提取"""
# 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识
image_guid = kwargs.get(
"image_guid", globals().get("image_guid", uuid.uuid4().hex.upper())
)
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
raise RuntimeError("请入参image_format")
# 影像件BASE64编码
image_base64 = kwargs.get("image_base64", globals()["image_base64"])
if image_base64 is None:
raise RuntimeError("请入参image_base64")
# 请求深圳快瞳银行卡识别接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/bankCard"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(servicer="szkt"),
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
},
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则返回NONE
if not (response.get("status") == 200 and response.get("code") == 0):
return None
extraction = {
"卡类型": {"1": "借记卡", "2": "贷记卡"}.get(
response["data"]["bankCardType"], "其它"
), # 0不能识别、3准贷记卡、4预付卡合并为其它
"银行名称": response["data"]["bankInfo"],
"卡号": response["data"]["cardNo"].replace(" ", ""),
}
return extraction
def invoice_extraction(**kwargs) -> dict | None:
"""增值税发票/收费票据数据提取"""
# 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识
image_guid = kwargs.get(
"image_guid", globals().get("image_guid", uuid.uuid4().hex.upper())
)
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
return None
# 影像件BASE64编码
image_base64 = kwargs.get("image_base64", globals()["image_base64"])
if image_base64 is None:
return None
try:
# 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据)
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(servicer="szkt"),
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
},
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
if not (response.get("status") == 200 and response.get("code") == 10000):
raise RuntimeError("深圳快瞳票据查验发生异常")
match response["data"]["productCode"]:
case "003082": # 增值税发票
extraction = {
"票据类型": {
"10108": "数电票",
"10101": "增值税普通发票",
"10100": "增值税专用发票",
"30100": "数电票",
"30101": "数电票",
"30104": "增值税专用发票",
"30105": "数电票",
"10106": "区块链电子发票",
"30109": "数电票",
"30121": "增值税普通发票",
"10102": "增值税普通发票",
"10103": "增值税普通发票",
"10107": "数电票",
}.get(response["data"]["type"], "其它增值税发票"),
"票据号码": response["data"]["details"]["number"],
"票据代码": (
code if (code := response["data"]["details"]["code"]) else None
), # 深圳快瞳票据查验中数电票票据代码为空字符转为NONE
"开票日期": datetime.strptime(
response["data"]["details"]["date"], "%Y年%m月%d"
).strftime(
"%Y-%m-%d"
), # 深圳快瞳票据查验中就增值税发票开票日期格式为%Y年%m月%d日转为%Y-%m-%d
"校验码": response["data"]["details"]["check_code"],
"收款方": response["data"]["details"]["seller"],
"付款方": response["data"]["details"]["buyer"],
"票据金额": format(
Decimal(response["data"]["details"]["total"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"查验状态": (
"真票"
if response["data"]["details"]["invoiceTypeNo"] == "0"
else "红票"
),
"备注": (
remark
if (remark := response["data"]["details"]["remark"])
else None
), # 深圳快瞳票据查验中增值税发票备注可能为空字符转为NONE
"项目": [
{
"名称": item["name"],
"规格": (
specification
if (specification := item["specification"])
else None
),
"单位": unit if (unit := item["unit"]) else None,
"数量": (
format(
Decimal(quantity).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if (quantity := item["quantity"])
else None
),
"金额": format(
(
Decimal(item["total"]) + Decimal(item["tax"])
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
), # 价税合计
}
for item in response["data"]["details"].get("items", [])
],
} # 深圳快瞳票据查验中就部分增值税发票仅可查,数据标准化抛出异常
return extraction
case "003081": # 门诊/住院收费票据
extraction = {
"票据类型": (
"门诊收费票据"
if "门诊" in response["data"]["billName"]
else "住院收费票据"
),
"票据号码": response["data"]["billNumber"],
"票据代码": response["data"]["billCode"],
"开票日期": response["data"][
"invoiceDate"
], # 深圳快瞳票据查验中就收费票据开票日期格式为%Y-%m-%d
"校验码": response["data"]["checkCode"],
"收款方": response["data"]["payeeName"],
"付款方": response["data"]["payer"],
"票据金额": format(
Decimal(response["data"]["amount"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"查验状态": {"true": "真票", "false": "红票"}[
response["data"]["flushedRed"]
],
"备注": response["data"].get("remark"),
"医保支付": format(
Decimal(response["data"].get("medicarePay", "0.00")).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"其它支付": format(
Decimal(response["data"].get("otherPayment", "0.00")).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"个人自付": format(
Decimal(response["data"].get("personalPay", "0.00")).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"自付一": format(
Decimal(response["data"].get("self_pay_one", "0.00")).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
), # 深圳快瞳票据查验中就部分地区无自付一
"自付二": format(
Decimal(
response["data"].get("classificationPays", "0.00")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
), # 深圳快瞳票据查验中就部分地区无自付二
"个人自费": format(
Decimal(
response["data"].get("personalExpense", "0.00")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"住院日期": (
parse(date.split("-")[0]).strftime("%Y-%m-%d")
if (date := response["data"].get("hospitalizationDate"))
else None
), # 深圳快瞳票据查验中就收费票据住院日期格式为%Y%m%d-%Y%m%d即住院日期-出院日期
"出院日期": (
parse(date.split("-")[1]).strftime("%Y-%m-%d") if date else None
),
"医疗机构类型": response["data"]["institutionsType"],
"项目": [
{
"名称": item["itemName"],
"规格": item[
"medical_level"
], # 甲类无自付、乙类有自付、丙类全自付
"单位": item["unit"],
"数量": format(
Decimal(item["number"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"金额": format(
Decimal(item["totalAmount"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
}
for item in response["data"]["feedetails"]
],
}
return extraction
# 若请求深圳快瞳票据查验接口或解析发生异常,则根据影像件类型请求深圳快瞳增值税发票/收费票据识别接口
except:
# 影像件类型
image_type = kwargs.get("image_type", globals()["image_type"])
if image_type is None:
return None
match image_type:
case "增值税发票":
try:
# 请求深圳快瞳增值税发票识别接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/vatInvoice"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(
servicer="szkt"
),
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
},
guid=hashlib.md5((url + image_guid).encode("utf-8"))
.hexdigest()
.upper(),
)
# 若深圳快瞳增值税发票识别响应非成功则返回NONE
if not (
response.get("status") == 200 and response.get("code") == 0
):
return None
extraction = {
"票据类型": (
invoice_type := (
data := {
item["desc"]: item["value"]
for item in response["data"]
}
).get("发票类型")
),
"票据号码": (number := data.get("发票号码")),
"票据代码": data.get("发票代码"),
"开票日期": (
datetime.strptime(date, "%Y年%m月%d").strftime("%Y-%m-%d")
if re.match(
r"\d{4}\d{1,2}月\d{1,2}日",
(date := data.get("开票日期")),
)
else date
),
"校验码": (
check_code if (check_code := data.get("校验码")) else number
), # 若校验码为空则默认为票据号码
"收款方": data.get("销售方名称"),
"付款方": data.get("购买方名称"),
"票据金额": format(
Decimal(
data.get("小写金额").replace("¥", "")
if invoice_type == "电子发票(普通发票)"
else data.get("合计金额(小写)")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"备注": remark if (remark := data.get("备注")) else None,
"项目": (
[
{
"名称": name,
"规格": specification if specification else None,
"单位": unit if unit else None,
"数量": (
format(
Decimal(quantity).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if quantity
else None
),
"金额": format(
(Decimal(amount) + Decimal(tax)).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f", # 价税合计,保留两位小数
),
}
for name, specification, unit, quantity, amount, tax in zip(
[
component["value"]
for component in response["data"]
if re.match(
r"^项目名称(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^规格型号(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^单位(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^数量(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^金额(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^税额(\d+)?$",
component["desc"],
)
],
)
]
if invoice_type == "电子发票(普通发票)"
else [
{
"名称": name,
"数量": format(
Decimal(quantity).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
"0.2f",
),
"金额": format(
Decimal(amount).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
}
for name, quantity, amount in zip(
[
component["value"]
for component in response["data"]
if re.match(
r"^项目名称明细(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^项目数量明细(\d+)?$",
component["desc"],
)
],
[
component["value"]
for component in response["data"]
if re.match(
r"^项目金额明细(\d+)?$",
component["desc"],
)
],
)
]
),
"查验状态": "无法查验",
}
return extraction
except:
return None
case _:
try:
# 请求深圳快瞳收费票据识别接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/medical"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(
servicer="szkt"
),
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
},
guid=hashlib.md5((url + image_guid).encode("utf-8"))
.hexdigest()
.upper(),
)
# 若深圳快瞳收费票据识别响应非成功则返回NONE
if not (
response.get("status") == 200 and response.get("code") == 0
):
return None
extraction = {
"票据类型": (
"门诊收费票据"
if response["data"]["insured"]["receipt_outpatient"]
else "住院收费票据"
),
"票据号码": (
receipt := (
response["data"]["insured"]["receipt_outpatient"]
or response["data"]["insured"][
"receipt_hospitalization"
]
)["receipts"][0]
)["receipt_no"][
"value"
], # 默认提取门诊/住院收费票据的第一张票据
"票据代码": receipt["global_detail"]["invoice_code"]["value"],
"开票日期": receipt["global_detail"]["invoice_date"][
"value"
], # 深圳快瞳收费票据识别中就开票日期格式为%Y-%m-%d
"校验码": fuzzy_match(
target="校验码",
components=receipt["global_detail"]["region_specific"],
specify_key="name",
return_key="word.value",
),
"收款方": receipt["hospital_name"]["value"],
"付款方": receipt["name"]["value"],
"票据金额": format(
Decimal(receipt["total_amount"]["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
),
"医保支付": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance((field := receipt.get("medicare_pay")), dict)
else None
),
"其它支付": format(
(
Decimal(value).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
if (
value := fuzzy_match(
target="其它支付",
components=receipt.get("global_detail", {}).get(
"pay_list", []
),
specify_key="name",
return_key="word.value",
)
)
else None
),
".2f",
),
"个人自付": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance((field := receipt.get("self_pay")), dict)
else None
),
"自付一": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(field := (receipt.get("self_pay_one")), dict)
else None
),
"自付二": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(field := (receipt.get("self_pay_two")), dict)
else None
),
"个人自费": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(field := (receipt.get("self_cost")), dict)
else None
),
"住院日期": (
datetime.strptime(field["value"], "%Y%m%d").strftime(
"%Y-%m-%d"
)
if isinstance(field := (receipt.get("starttime")), dict)
else None
),
"出院日期": (
datetime.strptime(field["value"], "%Y%m%d").strftime(
"%Y-%m-%d"
)
if isinstance(field := (receipt.get("endtime")), dict)
else None
),
"医疗机构类型": receipt["others"]["medical_institution_type"][
"value"
],
"项目": [
{
"名称": (
field["value"]
if isinstance((field := item["item_name"]), dict)
else None
),
"规格": (
field["value"]
if isinstance(
(field := item["specifications"]), dict
)
else None
),
"单位": (
field["value"]
if isinstance((field := item["unit"]), dict)
else None
),
"数量": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance((field := item["number"]), dict)
else None
),
"金额": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance((field := item["total_amount"]), dict)
else None
),
}
for item in receipt["feeitems"]
],
"查验状态": "无法查验",
}
return extraction
except:
return None
def common_extraction(**kwargs) -> dict | None:
"""通用数据提取"""
# 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识
image_guid = kwargs.get(
"image_guid", globals().get("image_guid", uuid.uuid4().hex.upper())
)
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
raise RuntimeError("请入参image_format")
# 影像件BASE64编码
image_base64 = kwargs.get("image_base64", globals()["image_base64"])
if image_base64 is None:
raise RuntimeError("请入参image_base64")
# 请求深圳快瞳通用文本识别接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/general"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(servicer="szkt"),
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
},
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则返回NONE
if not (response.get("status") == 200 and response.get("code") == 0):
return None
# 基于空间坐标法就识别结果中文本框进行分行排序
texts = []
# 重构文本框列表
for text in response["data"]:
texts.append(
[
# 文本框左上角的X坐标
numpy.float64(text["itemPolygon"]["x"]),
# 文本框左上角的Y坐标
numpy.float64(text["itemPolygon"]["y"]),
# 文本框的高度
numpy.float64(
text["itemPolygon"]["height"]
), # 深圳快瞳基于文本框的Y坐标最大值和最小值的差值
text["value"],
]
)
# 按照文本框Y坐标升序使用空间坐标算法从上到下从左到右
texts.sort(key=lambda x: x[1])
rows = []
# 分行
for index, text in enumerate(texts[1:]):
# 若为第一行则初始化当前行
if index == 0:
row = [texts[0]]
continue
# 若文本框的Y坐标与当前行中最后一个文本框的Y坐标差值小于阈值则归为同一行
# noinspection PyUnboundLocalVariable
# noinspection PyTypeChecker
if text[1] - row[-1][1] < numpy.mean([x[2] for x in row]) * 0.5:
row.append(text)
# 否则结束当前行、初始化当前行
else:
rows.append(row)
row = [text]
# 添加最后一行
rows.append(row)
extraction = []
# 按照文本框X坐标升序
for row in rows:
extraction.extend(
[x[3].replace(" ", "") for x in sorted(row, key=lambda x: x[0])]
)
# 以空格拼接
extraction = " ".join(extraction)
# 根据理赔申请书匹配提示词
match application_form := kwargs.get(
"application_form", globals().get("application_form")
):
case "中行员工福利保障计划索赔申请书":
prompt = f"""
指令你是一个从OCR文本中智能提取信息并生成JSON的工具请严格按照要求执行。
输入OCR文本可能包含错漏
{extraction}
输出要求:
1、只输出可被Python中json.loads()解析的JSON格式字符串不包含任何代码块标记、说明文字等其它非JSON格式内容
2、无法确定的值设置为`null`(不是"null"字符串)
JSON结构
{{
"基础信息": {{
"申请人": "字符串或null",
"性别": "字符串或null",
"年龄": "字符串或null",
"手机": "字符串或null",
"身份证号": "字符串或null",
"开户银行": "字符串或null",
"户名": "字符串或null",
"账号": "字符串或null",
}},
"票据表格": [
{{
"就诊序号": "字符串或null",
"发票日期": "YYYY-MM-DD或null",
"发票上的就诊医院/药店": "字符串或null",
"票据张数": "字符串或null",
"票据金额": "字符串或null",
"诊断": "字符串或null"
}},
]
}}
开始输出:
"""
case _:
raise RuntimeError(f"理赔申请书{application_form}未设置处理方法")
# 请求大语言模型创建对话接口
response = globals()["http_client"].post(
url="https://api.siliconflow.cn/v1/chat/completions",
headers={
"Authorization": "Bearer sk-xsnuwirjjphhfdbvznfdfjqlinfdlrnlxuhkbbqynfnbhiqz", # 基于硅基流动
"Content-Type": "application/json; charset=utf-8",
},
json={
"model": "deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", # 通过从DeepSeek-R1-0528模型蒸馏思维链接至Qwen3-8B-Base获得的模型
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 10240, # 生成文本最大令牌数
"temperature": 0.2,
"top_p": 0.5,
"top_k": 20,
"frequency_penalty": 0.0,
"thinking_budget": 1,
},
guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(),
)
extraction = (
json.loads(match.group("json"))
if (
match := re.search(
r"```json\s*(?P<json>\{.*})\s*```",
response["choices"][0]["message"]["content"],
re.DOTALL,
)
)
else None
)
return extraction
def decision(rules_path: Path) -> ZenDecision:
    """Load a decision model from a rules file through the Zen engine.

    Args:
        rules_path: Path of the rules file; its POSIX form is passed to
            ``ZenEngine.get_decision``.

    Returns:
        The decision object returned by the engine.
    """

    def loader(path):
        # Read explicitly as UTF-8 so that decoding does not depend on the
        # platform's locale encoding.
        with open(path, "r", encoding="utf-8") as file:
            return file.read()

    return ZenEngine({"loader": loader}).get_decision(rules_path.as_posix())
def disease_diagnosis(**kwargs) -> str | None:
    """Ask a language model for the most likely disease for one invoice.

    Keyword Args:
        dossier: Claim dossier dict; falls back to the module-level
            ``dossier``. Applicant gender/age and the "其它信息" entries are
            read from its "赔案层" section.
        items: Text describing the items of the invoice (required).

    Returns:
        The first line of the model reply with leading whitespace removed, or
        ``None`` when the reply carries no text content.

    Raises:
        KeyError: If ``items`` is not supplied.
    """
    import os  # local import: only needed for the optional API-key override

    module_scope = globals()
    dossier = kwargs.get("dossier", module_scope.get("dossier"))
    case_layer = dossier["赔案层"]
    applicant = case_layer["申请人信息"]
    others = case_layer["其它信息"]

    gender = applicant.get("性别")
    gender = gender if gender is not None else "未知"
    age = applicant.get("年龄")
    age = age if age is not None else "未知"
    # The main script never sets "小项合集" and sets "自述症状" only when a
    # claim form was processed, so direct indexing raised KeyError. Fall back
    # to "未知" as for gender and age.
    item_collection = others.get("小项合集", "未知")
    symptoms = others.get("自述症状", "未知")

    prompt = f"""
指令:你是一个医学疾病分类诊断的工具,请严格按照要求执行。
患者信息:
性别 {gender}
年龄 {age}
近期在药房/医院开具发票中内容 {item_collection}
输出要求:
1、患者自述症状在 {symptoms} 其中之一
2、依据患者信息、自述症状和其提供的发票中内容 {kwargs["items"]} 综合诊断只输出一个最可能的ICD-11中的疾病分类中亚类目代码对应的中文名称字符串不包含任何代码块标记、说明文字等
开始输出:
"""

    # NOTE(review): a credential is hard-coded below. It is kept as a fallback
    # for backward compatibility, but should be rotated and supplied only via
    # the ARK_API_KEY environment variable.
    api_key = os.environ.get("ARK_API_KEY", "2c28ab07-888c-45be-84a2-fc4b2cb5f3f2")
    response = module_scope["http_client"].post(
        url="https://ark.cn-beijing.volces.com/api/v3/chat/completions",
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json; charset=utf-8",
        },
        json={
            "model": "deepseek-r1-250528",
            "messages": [
                {"role": "system", "content": "你是人工智能助手"},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.2,
            "top_p": 0.5,
            "top_k": 20,
            "frequency_penalty": 0.0,
            "thinking_budget": 1,
        },
        guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(),
    )

    try:
        content = response["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError):
        return None
    if not isinstance(content, str):
        return None

    # "." does not match newlines, so this keeps only the first line after
    # any leading whitespace.
    found = re.match(r"\s*(?P<text>.*)", content)
    return found.group("text") if found else None
# -------------------------
# 主程序
# -------------------------
if __name__ == "__main__":
    # Entry point: for every case directory, classify each image, extract its
    # data, fill the claim dossier, then infer a disease per invoice.
    # NOTE: the extraction helpers read http_client, authenticator, image,
    # image_format, image_guid, image_base64, image_type and dossier from
    # module globals, so the assignments below are their implicit inputs.
    # Initialize the HTTP client
    http_client = HTTPClient(timeout=300, cache_enabled=True)
    # Initialize the authenticator
    authenticator = Authenticator()
    # Working directory path object
    directory_path = Path("directory")
    # Create it if it does not exist
    directory_path.mkdir(parents=True, exist_ok=True)
    # Initialize the rules decision that tells whether an image needs extraction
    recognize_decision = decision(Path("rules/影像件是否需要数据提取.json"))
    # Initialize the Jinja2 environment
    environment = Environment(loader=FileSystemLoader("."))
    # Register the "date" filter (falsy values render as "长期")
    environment.filters["date"] = lambda date: (
        date.strftime("%Y-%m-%d") if date else "长期"
    )
    # Load the dossier template
    template = environment.get_template("template.html")
    # Iterate over the case directories inside the working directory
    for case_path in [
        case_path for case_path in directory_path.iterdir() if case_path.is_dir()
    ]:
        # Initialize the dossier
        dossier = {
            "影像件层": [],
            "赔案层": {
                "赔案编号": (
                    case_number := case_path.stem
                ),  # Assumes the case was signed for: the case number exists and the insurer head office, branch and policyholder are known from the policy number
                "签收保单编号": "3291120243205000000002",
                "保险总公司": "中银保险有限公司",
                "保险分公司": None,  # Experimental stage: branch, policyholder company and application time are None
                "投保公司": None,
                "申请时间": None,
                "申请人信息": {},
                "受益人信息": {},
                "被保人信息": {},  # Experimental stage: insured-person information stays empty
                "其它信息": {},
            },
            "发票层": [],
            "小项层": [],
        }
        # Iterate over the image files of the case, ordered by st_ctime
        for image_index, image_path in enumerate(
            sorted(case_path.glob(pattern="*"), key=lambda x: x.stat().st_ctime), 1
        ):
            dossier["影像件层"].append(
                {
                    "影像件序号": (image_index := f"{image_index:02d}"),
                    "影像件名称": (image_name := image_path.name),
                }
            )
            # Skip images whose format is not JPG/JPEG/PNG
            if (image_format := image_path.suffix.lower().lstrip(".")) not in [
                "jpg",
                "jpeg",
                "png",
            ]:
                dossier["影像件层"][-1]["已分类"] = "否,不支持的影像件"
                continue
            # Read the image (as grayscale)
            image = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE)
            # Skip the image if it could not be read
            if image is None:
                dossier["影像件层"][-1]["已分类"] = "否,读取异常"
                continue
            # Compress the image and get its Base64 text
            image_guid, image_base64 = images_compression()
            # Skip the image if compression failed
            if image_guid is None or image_base64 is None:
                dossier["影像件层"][-1]["已分类"] = "否,压缩异常"
                continue
            # Get the image type and orientation from the classification endpoint
            image_type, image_orientation = images_classification()
            # Skip the image if classification failed
            if image_type is None or image_orientation is None:
                dossier["影像件层"][-1]["已分类"] = "否,影像件分类异常"
                continue
            # If the image is not upright, rotate it upright and compress again
            if image_orientation != "0度":
                # Rotate the image upright (opposite to the detected rotation)
                image = cv2.rotate(
                    image,
                    {
                        "顺时针90度": cv2.ROTATE_90_COUNTERCLOCKWISE,  # rotate 90 degrees counter-clockwise
                        "180度": cv2.ROTATE_180,  # rotate 180 degrees
                        "逆时针90度": cv2.ROTATE_90_CLOCKWISE,  # rotate 90 degrees clockwise
                    }[image_orientation],
                )
                # Compress the rotated image again
                image_guid, image_base64 = images_compression()
                if image_guid is None or image_base64 is None:
                    dossier["影像件层"][-1]["已分类"] = "否,压缩异常"
                    continue
            # NOTE(review): "已分类" is set to an empty string here while the
            # failure paths use "否,…"; a value appears to be missing.
            dossier["影像件层"][-1].update({"已分类": "", "影像件类型": image_type})
            # Ask the rules decision whether this insurer/image type needs extraction; skip the image if not (e.g. a claim form that already carries the bank details makes the bank card unnecessary)
            if not recognize_decision.evaluate(
                {
                    "insurer": (insurer := dossier["赔案层"]["保险总公司"]),
                    "image_type": image_type,
                }
            )["result"]["extract"]:
                dossier["影像件层"][-1]["已识别"] = "否,无需识别"
                continue
            # Dispatch the extraction on the image type
            # noinspection PyUnreachableCode
            match image_type:
                case "居民身份证(正背面)" | "居民身份证(正面)" | "居民身份证(背面)":
                    extraction = idcard_extraction()
                    # Skip the image if extraction failed
                    if extraction is None:
                        dossier["影像件层"][-1]["已识别"] = "否,无法识别"
                        continue
                    if image_type in ["居民身份证(正背面)", "居民身份证(正面)"]:
                        dossier["赔案层"]["申请人信息"].update(
                            {
                                "证件有效期起": datetime.strptime(
                                    extraction["有效期起"], "%Y-%m-%d"
                                ),
                                "证件有效期止": (
                                    date
                                    if (date := extraction["有效期止"]) == "长期"
                                    else datetime.strptime(date, "%Y-%m-%d")
                                ),  # the literal "长期" is kept as is, anything else is parsed as %Y-%m-%d
                            }
                        )  # In principle the extraction step standardises the data and the dossier-filling step structures it
                    if image_type in ["居民身份证(正背面)", "居民身份证(背面)"]:
                        # NOTE(review): the three "" keys below collide, so only
                        # the last one (county) survives in the dict; the key
                        # names appear to have been lost.
                        dossier["赔案层"]["申请人信息"].update(
                            {
                                "姓名": extraction["姓名"],
                                "证件类型": "居民身份证",
                                "证件号码": extraction["公民身份号码"],
                                "性别": extraction["性别"],
                                "出生": datetime.strptime(
                                    extraction["出生"], "%Y-%m-%d"
                                ),  # the date is expected as %Y-%m-%d
                                "": (
                                    address := parse_location(extraction["住址"])
                                ).get("province"),
                                "": address.get("city"),
                                "": address.get("county"),
                                "详细地址": address.get("detail"),
                            }
                        )
                case "银行卡":
                    extraction = bankcard_extraction()
                    # Skip the image if extraction failed
                    if extraction is None:
                        dossier["影像件层"][-1]["已识别"] = "否,无法识别"
                        continue
                    dossier["赔案层"]["受益人信息"].update(
                        {
                            "开户银行": extraction["银行名称"],
                            "银行账号": extraction["卡号"],
                        }
                    )
                case "增值税发票" | "门诊收费票据" | "住院收费票据":
                    extraction = invoice_extraction()
                    # Skip the image if extraction failed
                    if extraction is None:
                        dossier["影像件层"][-1]["已识别"] = "否,无法识别"
                        continue
                    dossier["发票层"].append(
                        {
                            "关联影像件序号": image_index,
                            "票据类型": extraction["票据类型"],
                            "票据号码": extraction["票据号码"],
                            "票据代码": (
                                extraction["票据代码"]
                                if extraction["票据代码"]
                                else "--"
                            ),  # fully digitalised e-invoices have no invoice code; their check code equals the invoice number
                            "开票日期": datetime.strptime(
                                extraction["开票日期"], "%Y-%m-%d"
                            ),
                            "校验码后六位": (
                                check_code[-6:]
                                if (check_code := extraction["校验码"])
                                else "--"
                            ),
                            "医药机构": extraction["收款方"],
                            "就诊人": (
                                match.group("name")
                                if (
                                    match := re.search(
                                        r"^(?P<name>[^(]+)", extraction["付款方"]
                                    )
                                )
                                else extraction["付款方"]
                            ),
                            "票据金额": Decimal(extraction["票据金额"]).quantize(
                                Decimal("0.00"), rounding=ROUND_HALF_UP
                            ),  # amounts become Decimal with two decimal places
                            "查验状态": extraction["查验状态"],
                            # Items: convert quantity/amount to Decimal, sum per
                            # name, derive category ("大项") and cleaned item
                            # name ("小项") from the "*category*name" pattern,
                            # then drop rows whose amount is zero.
                            "项目": (
                                pandas.DataFrame(extraction["项目"])
                                .assign(
                                    数量=lambda dataframe: dataframe["数量"].apply(
                                        lambda row: (
                                            Decimal(row).quantize(
                                                Decimal("0.00"),
                                                rounding=ROUND_HALF_UP,
                                            )
                                            if row
                                            else Decimal("0.00")
                                        )
                                    ),
                                    金额=lambda dataframe: dataframe["金额"].apply(
                                        lambda row: (
                                            Decimal(row).quantize(
                                                Decimal("0.00"),
                                                rounding=ROUND_HALF_UP,
                                            )
                                            if row
                                            else Decimal("0.00")
                                        )
                                    ),
                                )
                                .groupby(by="名称", as_index=False)
                                .agg(数量=("数量", "sum"), 金额=("金额", "sum"))
                                .assign(
                                    大项=lambda dataframe: dataframe["名称"].apply(
                                        lambda row: (
                                            match.group("category")
                                            if (
                                                match := re.match(
                                                    r"^\*(?P<category>.+?)\*.*$",
                                                    row,
                                                )
                                            )
                                            else row
                                        )
                                    ),
                                    小项=lambda dataframe: dataframe["名称"].apply(
                                        lambda row: (
                                            re.sub(
                                                r"[^\u4e00-\u9fa5a-zA-Z0-9./%*]",
                                                "",
                                                match.group("name"),
                                            )
                                            if (
                                                match := re.match(
                                                    r"^\*.+?\*(?:\[[^]]+])?(?P<name>[^\s(]+)(?:\([^\s(]+\))?(?:.*?)?$",
                                                    row,
                                                )
                                            )
                                            else ""
                                        )
                                    ),
                                )
                                .loc[
                                    lambda dataframe: dataframe["金额"] != 0,
                                    ["名称", "大项", "小项", "数量", "金额"],
                                ]
                                .to_dict(orient="records")
                            ),
                            "就诊类型": (
                                "药店购药"
                                if "增值税发票" in image_type
                                else (
                                    "门诊就诊"
                                    if "门诊收费票据" in image_type
                                    else "住院治疗"
                                )
                            ),
                        }
                    )
                case "理赔申请书":
                    # Select the claim form handling by insurer head office
                    # noinspection PyUnreachableCode
                    match insurer:
                        case "中银保险有限公司":
                            extraction = common_extraction(
                                application_form="中行员工福利保障计划索赔申请书"
                            )
                            # Skip the image if extraction failed
                            if extraction is None:
                                dossier["影像件层"][-1]["已识别"] = "否,无法识别"
                                continue
                            dossier["赔案层"]["申请人信息"].update(
                                {
                                    "与被保险人关系": "本人",  # for this insurer the applicant is assumed to be the insured person
                                    "年龄": (
                                        Decimal(age).quantize(
                                            Decimal("0"),
                                            rounding=ROUND_HALF_UP,
                                        )
                                        if (
                                            age := extraction.get("基础信息", {}).get(
                                                "年龄", "--"
                                            )
                                        ).isdigit()
                                        else age
                                    ),  # digit-only ages become a whole-number Decimal; other values are kept ("--" when the key is absent). NOTE(review): a None value (JSON null) has no .isdigit() and raises AttributeError
                                    "手机号": (
                                        phone_number
                                        if re.match(
                                            r"^1[3-9]\d{9}$",
                                            phone_number := extraction.get(
                                                "基础信息", {}
                                            ).get("手机", "--"),
                                        )
                                        else phone_number
                                    ),  # NOTE(review): both branches yield phone_number, so a non-matching value is kept although the original comment says it should become "--"
                                }
                            )
                            dossier["赔案层"]["受益人信息"].update(
                                {
                                    "与被保险人关系": "本人",  # for this insurer the beneficiary is assumed to be the insured person
                                    "户名": (
                                        account_name
                                        if (
                                            account_name := extraction.get(
                                                "基础信息", {}
                                            ).get("户名")
                                        )
                                        else "--"
                                    ),  # "--" when the account name is missing or empty
                                    "开户银行": (
                                        account_name
                                        if (
                                            account_name := extraction.get(
                                                "基础信息", {}
                                            ).get("开户银行")
                                        )
                                        else "--"
                                    ),  # "--" when the bank name is missing or empty
                                    "银行账号": (
                                        account_name
                                        if (
                                            account_name := extraction.get(
                                                "基础信息", {}
                                            ).get("账号")
                                        )
                                        is not None
                                        else "--"
                                    ),  # "--" when the account number is None
                                }
                            )
                            # Collect the distinct diagnoses listed in the invoice table.
                            # NOTE(review): str.split("") raises ValueError (empty
                            # separator); the delimiter used for join/split appears
                            # to have been lost from these literals.
                            dossier["赔案层"]["其它信息"]["自述症状"] = (
                                ("".join(diagnoses))
                                if (
                                    diagnoses := sorted(
                                        set(
                                            "".join(
                                                [
                                                    diagnosis
                                                    for invoice in extraction.get(
                                                        "票据表格", []
                                                    )
                                                    if (
                                                        diagnosis := invoice.get("诊断")
                                                    )
                                                ]
                                            ).split("")
                                        )
                                    )
                                )
                                else "--"
                            )
                        case _:
                            dossier["影像件层"][-1]["已识别"] = "否,无法识别"
                            continue
                case _:
                    dossier["影像件层"][-1]["已识别"] = "否,无法识别"
                    continue
            # Record the successful extraction on the image entry.
            # NOTE(review): "已识别" is set to an empty string here while the
            # failure paths use "否,…"; a value appears to be missing.
            dossier["影像件层"][-1].update(
                {
                    "已识别": "",
                    "识别结果": extraction,
                }
            )
        # Sort the invoice layer by invoice date, then invoice number
        dossier["发票层"] = sorted(
            dossier["发票层"], key=lambda x: (x["开票日期"], x["票据号码"])
        )
        # Build the item layer: quantity and amount summed per item name across all invoices
        # noinspection PyTypeChecker
        dossier["小项层"] = (
            pandas.DataFrame(
                [
                    {
                        "小项": item["小项"],
                        "数量": item["数量"],
                        "金额": item["金额"],
                    }
                    for invoice in dossier["发票层"]
                    for item in invoice["项目"]
                ]
            )
            .groupby(by="小项", as_index=False)
            .agg(数量=("数量", "sum"), 金额=("金额", "sum"))
            .to_dict(orient="records")
        )
        # Infer a disease for every invoice from its distinct item names
        for invoice in dossier["发票层"]:
            # noinspection PyTypeChecker
            invoice["推定疾病"] = disease_diagnosis(
                items="".join(sorted(set([item["小项"] for item in invoice["项目"]])))
            )
        # NOTE(review): print() followed by exit() terminates the script here,
        # so the HTML rendering below is never reached; this looks like leftover
        # debugging code (nesting level reconstructed, confirm).
        print(dossier)
        exit()
        # Render the dossier with the template and write it as HTML
        with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file:
            file.write(
                template.render(
                    {
                        "dossier": dossier,
                    }
                )
            )