日常更新

from NUC
This commit is contained in:
liubiren 2025-12-29 20:41:38 +08:00
parent c9a025d2f6
commit 2ba89f571e
2 changed files with 1079 additions and 1110 deletions

View File

@ -24,15 +24,6 @@ authenticator = Authenticator()
http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存 http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存
# noinspection PyShadowingNames
def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, str]]:
"""
分类影像件并旋正
:param image_index: 影像件编号
:param image_path: 影像件路径path对象
:return:
"""
# noinspection PyShadowingNames # noinspection PyShadowingNames
def image_read( def image_read(
image_path: Path, image_path: Path,
@ -52,6 +43,7 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st
except Exception as exception: except Exception as exception:
raise RuntimeError(f"打开并读取影像件发生异常:{str(exception)}") raise RuntimeError(f"打开并读取影像件发生异常:{str(exception)}")
# noinspection PyShadowingNames # noinspection PyShadowingNames
def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str:
""" """
@ -68,6 +60,7 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st
image_guid = md5(image_ndarray_encoded.tobytes()).hexdigest().upper() image_guid = md5(image_ndarray_encoded.tobytes()).hexdigest().upper()
return image_guid return image_guid
# noinspection PyShadowingNames # noinspection PyShadowingNames
def image_compress( def image_compress(
image_format: str, image_format: str,
@ -101,9 +94,7 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st
break break
# 影像件BASE64编码 # 影像件BASE64编码
image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode( image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8")
"utf-8"
)
if len(image_base64) <= image_size_specified: if len(image_base64) <= image_size_specified:
return image_base64 return image_base64
@ -121,6 +112,124 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st
return None return None
# noinspection PyShadowingNames
def calculate_age(report_time: datetime, birth_date: datetime) -> int:
"""
根据报案时间计算周岁
:param report_time: 报案时间
:param birth_date: 出生日期
:return 周岁
"""
age = report_time.year - birth_date.year
return (
age - 1
if (report_time.month, report_time.day)
< (
birth_date.month,
birth_date.day,
)
else age
) # 若报案时间的月日小于生成日期的月日则前推一年
# TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并
# noinspection PyShadowingNames
def identity_card_recognize(image, insurer_company) -> None:
"""
识别居民身份证并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 请求深圳快瞳居民身份证识别接口
response = http_client.post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"),
headers={
"X-RequestId-Header": image["image_guid"]
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符
}, # 深圳快瞳支持同时识别居民国徽面和头像面
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
)
# TODO: 若请求深圳快瞳居民身份证识别接口发生异常则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(头像面)",
]:
# noinspection PyTypeChecker
dossier["insured_person_layer"].update(
{
"insured_person": (
insured_person := response["data"]["name"]
), # 被保险人
"identity_type": (identity_type := "居民身份证"), # 证件类型
"identity_number": (
indentity_number := response["data"]["idNo"]
), # 证件号码
"gender": response["data"]["sex"], # 性别
"birth_date": (
birth_date := datetime.strptime(
response["data"]["birthday"], "%Y-%m-%d"
)
), # 出生日期转为日期时间datetime对象格式默认为%Y-%m-%d
"age": calculate_age(
dossier["report_layer"]["report_time"], birth_date
), # 年龄
"province": (
residential_address := parse_location(response["data"]["address"])
).get(
"province"
), # 就住址解析为所在省、市、区和详细地址
"city": residential_address.get("city"),
"district": residential_address.get("county"),
"detailed_address": residential_address.get("detail"),
}
)
# 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单
dossier["insured_persons_layer"] = master_data.query_liabilities(
insurer_company,
insured_person,
identity_type,
indentity_number,
dossier["report_layer"]["report_time"].strftime("%Y-%m-%d"),
)
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(国徽面)",
]:
# noinspection PyTypeChecker
dossier["insured_person_layer"].update(
{
"commencement_date": datetime.strptime(
(period := response["data"]["validDate"].split("-"))[0],
"%Y.%m.%d",
), # 就有效期限解析为有效起期和有效止期。其中若有效止期为长期则默认为9999-12-31
"termination_date": (
datetime(9999, 12, 31)
if period[1] == "长期"
else datetime.strptime(period[1], "%Y.%m.%d")
),
}
)
# noinspection PyShadowingNames
def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, str]]:
"""
分类影像件并旋正
:param image_index: 影像件编号
:param image_path: 影像件路径path对象
:return:
"""
# 打开并读取影像件 # 打开并读取影像件
image_ndarray = image_read(image_path) image_ndarray = image_read(image_path)
image_index = f"{image_index:02d}" image_index = f"{image_index:02d}"
@ -221,139 +330,6 @@ def image_classify(image_index: int, image_path: Path) -> Optional[Tuple[str, st
) )
# noinspection PyShadowingNames
def image_recognize(
image,
insurer_company,
) -> None:
"""
识别影像件并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并
# noinspection PyShadowingNames
def identity_card_recognize(image, insurer_company) -> None:
"""
识别居民身份证并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# noinspection PyShadowingNames
def calculate_age(report_time: datetime, birth_date: datetime) -> int:
"""
根据报案时间计算周岁
:param report_time: 报案时间
:param birth_date: 出生日期
:return 周岁
"""
age = report_time.year - birth_date.year
return (
age - 1
if (report_time.month, report_time.day)
< (
birth_date.month,
birth_date.day,
)
else age
) # 若报案时间的月日小于生成日期的月日则前推一年
# 请求深圳快瞳居民身份证识别接口
response = http_client.post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"),
headers={
"X-RequestId-Header": image["image_guid"]
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(
servicer="szkt"
), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符
}, # 深圳快瞳支持同时识别居民国徽面和头像面
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
)
# TODO: 若请求深圳快瞳居民身份证识别接口发生异常则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(头像面)",
]:
# noinspection PyTypeChecker
dossier["insured_person_layer"].update(
{
"insured_person": (
insured_person := response["data"]["name"]
), # 被保险人
"identity_type": (identity_type := "居民身份证"), # 证件类型
"identity_number": (
indentity_number := response["data"]["idNo"]
), # 证件号码
"gender": response["data"]["sex"], # 性别
"birth_date": (
birth_date := datetime.strptime(
response["data"]["birthday"], "%Y-%m-%d"
)
), # 出生日期转为日期时间datetime对象格式默认为%Y-%m-%d
"age": calculate_age(
dossier["report_layer"]["report_time"], birth_date
), # 年龄
"province": (
residential_address := parse_location(
response["data"]["address"]
)
).get(
"province"
), # 就住址解析为所在省、市、区和详细地址
"city": residential_address.get("city"),
"district": residential_address.get("county"),
"detailed_address": residential_address.get("detail"),
}
)
# 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单
dossier["insured_persons_layer"] = master_data.query_liabilities(
insurer_company,
insured_person,
identity_type,
indentity_number,
dossier["report_layer"]["report_time"].strftime("%Y-%m-%d"),
)
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(国徽面)",
]:
# noinspection PyTypeChecker
dossier["insured_person_layer"].update(
{
"commencement_date": datetime.strptime(
(period := response["data"]["validDate"].split("-"))[0],
"%Y.%m.%d",
), # 就有效期限解析为有效起期和有效止期。其中若有效止期为长期则默认为9999-12-31
"termination_date": (
datetime(9999, 12, 31)
if period[1] == "长期"
else datetime.strptime(period[1], "%Y.%m.%d")
),
}
)
# noinspection PyShadowingNames
def application_recognize(image, insurer_company) -> None:
"""
识别理赔申请书并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# noinspection PyShadowingNames # noinspection PyShadowingNames
def mlm_recognize(image, schema) -> Optional[Dict[str, Any]]: def mlm_recognize(image, schema) -> Optional[Dict[str, Any]]:
""" """
@ -422,6 +398,7 @@ def image_recognize(
except: except:
return None return None
# noinspection PyShadowingNames # noinspection PyShadowingNames
def boc_application_recognize(image: str) -> None: def boc_application_recognize(image: str) -> None:
""" """
@ -541,20 +518,22 @@ def image_recognize(
} }
) )
# noinspection PyShadowingNames
def application_recognize(image, insurer_company) -> None:
"""
识别理赔申请书并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 根据保险分公司匹配处理方法 # 根据保险分公司匹配处理方法
match insurer_company: match insurer_company:
# 中银保险有限公司 # 中银保险有限公司
case _ if insurer_company.startswith("中银保险有限公司"): case _ if insurer_company.startswith("中银保险有限公司"):
boc_application_recognize(image) boc_application_recognize(image)
# noinspection PyShadowingNames
def receipt_recognize(image, insurer_company) -> None:
"""
识别票据并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# noinspection PyShadowingNames # noinspection PyShadowingNames
def fuzzy_match(contents: list, key: str) -> Optional[str]: def fuzzy_match(contents: list, key: str) -> Optional[str]:
@ -598,9 +577,7 @@ def image_recognize(
for content in contents: for content in contents:
if content["name"] == key: if content["name"] == key:
return ( return (
content["word"]["value"] content["word"]["value"] if content["word"]["value"] else None
if content["word"]["value"]
else None
) )
candidates = [] candidates = []
@ -620,6 +597,7 @@ def image_recognize(
else None else None
) # 返回>=80且最大的相似度的值 ) # 返回>=80且最大的相似度的值
def parse_item(item: str) -> Tuple[str, Optional[str]]: def parse_item(item: str) -> Tuple[str, Optional[str]]:
""" """
根据明细项解析明细项类别和具体内容并根据具体内容查询药品/医疗服务 根据明细项解析明细项类别和具体内容并根据具体内容查询药品/医疗服务
@ -637,6 +615,15 @@ def image_recognize(
else: else:
return item, None return item, None
# noinspection PyShadowingNames
def receipt_recognize(image, insurer_company) -> None:
"""
识别票据并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 初始化票据数据 # 初始化票据数据
receipt = {"image_index": image["image_index"]} receipt = {"image_index": image["image_index"]}
# 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据) # 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据)
@ -646,9 +633,7 @@ def image_recognize(
"X-RequestId-Header": image["image_guid"] "X-RequestId-Header": image["image_guid"]
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查 }, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={ data={
"token": authenticator.get_token( "token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
servicer="szkt"
), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符 "imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符
}, },
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(), guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
@ -676,9 +661,7 @@ def image_recognize(
"date": datetime.strptime( "date": datetime.strptime(
response["data"]["details"]["date"], "%Y年%m月%d" response["data"]["details"]["date"], "%Y年%m月%d"
), # 转为日期时间datetime对象 ), # 转为日期时间datetime对象
"verification_code": response["data"]["details"][ "verification_code": response["data"]["details"]["check_code"],
"check_code"
],
"amount": Decimal( "amount": Decimal(
response["data"]["details"]["total"] response["data"]["details"]["total"]
).quantize( ).quantize(
@ -731,9 +714,7 @@ def image_recognize(
), # 转为日期时间datetime对象 ), # 转为日期时间datetime对象
"admission_date": ( "admission_date": (
datetime.strptime( datetime.strptime(
response["data"]["hospitalizationDate"].split("-")[ response["data"]["hospitalizationDate"].split("-")[0],
0
],
"%Y%m%d", "%Y%m%d",
) )
if response["data"]["hospitalizationDate"] if response["data"]["hospitalizationDate"]
@ -741,9 +722,7 @@ def image_recognize(
), # 深圳快瞳票据查验接口中住院日期解析为入院日期和出院日期 ), # 深圳快瞳票据查验接口中住院日期解析为入院日期和出院日期
"discharge_date": ( "discharge_date": (
datetime.strptime( datetime.strptime(
response["data"]["hospitalizationDate"].split("-")[ response["data"]["hospitalizationDate"].split("-")[1],
1
],
"%Y%m%d", "%Y%m%d",
) )
if response["data"]["hospitalizationDate"] if response["data"]["hospitalizationDate"]
@ -829,9 +808,7 @@ def image_recognize(
.upper(), .upper(),
) )
# TODO: 若请求深圳快瞳增值税发票识别接口发生异常则流转至人工处理 # TODO: 若请求深圳快瞳增值税发票识别接口发生异常则流转至人工处理
if not ( if not (response.get("status") == 200 and response.get("code") == 0):
response.get("status") == 200 and response.get("code") == 0
):
raise raise
match fuzzy_match(response["data"], "发票类型"): match fuzzy_match(response["data"], "发票类型"):
@ -849,16 +826,14 @@ def image_recognize(
response["data"], "校验码" response["data"], "校验码"
), ),
"amount": Decimal( "amount": Decimal(
fuzzy_match( fuzzy_match(response["data"], "小写金额").replace(
response["data"], "小写金额" "¥", ""
).replace("¥", "") )
).quantize( ).quantize(
Decimal("0.00"), Decimal("0.00"),
rounding=ROUND_HALF_UP, rounding=ROUND_HALF_UP,
), ),
"payer": fuzzy_match( "payer": fuzzy_match(response["data"], "购买方名称"),
response["data"], "购买方名称"
),
"institution": fuzzy_match( "institution": fuzzy_match(
response["data"], "销售方名称" response["data"], "销售方名称"
), ),
@ -935,9 +910,7 @@ def image_recognize(
Decimal("0.00"), Decimal("0.00"),
rounding=ROUND_HALF_UP, rounding=ROUND_HALF_UP,
), ),
"payer": fuzzy_match( "payer": fuzzy_match(response["data"], "购买方名称"),
response["data"], "购买方名称"
),
"institution": fuzzy_match( "institution": fuzzy_match(
response["data"], "销售方名称" response["data"], "销售方名称"
), ),
@ -1001,9 +974,7 @@ def image_recognize(
.upper(), .upper(),
) )
# TODO: 若请求深圳快瞳医疗收费票据识别接口发生异常则流转至人工处理 # TODO: 若请求深圳快瞳医疗收费票据识别接口发生异常则流转至人工处理
if not ( if not (response.get("status") == 200 and response.get("code") == 0):
response.get("status") == 200 and response.get("code") == 0
):
raise raise
# noinspection PyTypeChecker # noinspection PyTypeChecker
@ -1028,16 +999,12 @@ def image_recognize(
"%Y-%m-%d", "%Y-%m-%d",
), ),
"admission_date": ( "admission_date": (
datetime.strptime( datetime.strptime(receipt["starttime"]["value"], "%Y-%m-%d")
receipt["starttime"]["value"], "%Y-%m-%d"
)
if isinstance(receipt["starttime"], dict) if isinstance(receipt["starttime"], dict)
else None else None
), ),
"discharge_date": ( "discharge_date": (
datetime.strptime( datetime.strptime(receipt["endtime"]["value"], "%Y-%m-%d")
receipt["endtime"]["value"], "%Y-%m-%d"
)
if isinstance(receipt["endtime"], dict) if isinstance(receipt["endtime"], dict)
else None else None
), ),
@ -1045,9 +1012,7 @@ def image_recognize(
receipt["global_detail"]["region_specific"], receipt["global_detail"]["region_specific"],
"校验码", "校验码",
), ),
"amount": Decimal( "amount": Decimal(receipt["total_amount"]["value"]).quantize(
receipt["total_amount"]["value"]
).quantize(
Decimal("0.00"), Decimal("0.00"),
rounding=ROUND_HALF_UP, rounding=ROUND_HALF_UP,
), ),
@ -1207,6 +1172,7 @@ def image_recognize(
dossier["receipts_layer"].append(receipt) dossier["receipts_layer"].append(receipt)
# noinspection PyShadowingNames # noinspection PyShadowingNames
def bank_card_recognize(image) -> None: def bank_card_recognize(image) -> None:
""" """
@ -1221,9 +1187,7 @@ def image_recognize(
"X-RequestId-Header": image["image_guid"] "X-RequestId-Header": image["image_guid"]
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查 }, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={ data={
"token": authenticator.get_token( "token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
servicer="szkt"
), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符 "imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符
}, },
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(), guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
@ -1246,6 +1210,18 @@ def image_recognize(
} }
) )
# noinspection PyShadowingNames
def image_recognize(
image,
insurer_company,
) -> None:
"""
识别影像件并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 基于影像件识别使能规则评估影像件是否识别 # 基于影像件识别使能规则评估影像件是否识别
if not rule_engine.evaluate( if not rule_engine.evaluate(
decision="影像件识别使能", decision="影像件识别使能",

View File

@ -15,18 +15,11 @@ from jinja2 import Environment, FileSystemLoader
from common import dossier, rule_engine from common import dossier, rule_engine
from image import image_classify from image import image_classify
from image import image_recognize from image import image_recognize
from utils.client import Authenticator, HTTPClient
# ------------------------- # -------------------------
# 主逻辑 # 主逻辑
# ------------------------- # -------------------------
if __name__ == "__main__": if __name__ == "__main__":
# 实例化认证器
authenticator = Authenticator()
# 实例化请求客户端
http_client = HTTPClient(timeout=300, cache_enabled=True) # 使用缓存
# 初始化工作目录路径 # 初始化工作目录路径
workplace_path = Path("directory") workplace_path = Path("directory")
workplace_path.mkdir(parents=True, exist_ok=True) # 若工作目录不存在则创建 workplace_path.mkdir(parents=True, exist_ok=True) # 若工作目录不存在则创建