This commit is contained in:
liubiren 2026-01-08 22:15:07 +08:00
parent 6efdd9e1bf
commit 0110be4b16
21 changed files with 635 additions and 591 deletions

View File

@ -1,5 +1,4 @@
[settings]
order_by_type = true # 按照标准库、第三方库和本地模块分组
alphabetical = true # 同组按照字母序排序
order_by_type = true
multi_line_output = 3
indent = " "

Binary file not shown.

View File

@ -1,23 +1,15 @@
# -*- coding: utf-8 -*-
'''
自定义计算相关模块
'''
#加载模块
from distance import levenshtein
import numpy
import pandas
from sklearn.cluster import KMeans
"""
算法模块
"""
import warnings
import numpy
import pandas
from distance import levenshtein
from sklearn.cluster import KMeans
warnings.simplefilter('ignore')
'''

View File

@ -6,10 +6,10 @@
import hashlib
import hmac
import json
from pathlib import Path
import sys
import threading
import time
from pathlib import Path
from typing import Optional, Tuple
sys.path.append(Path(__file__).parent.as_posix())
@ -19,7 +19,7 @@ from request import Request
class Authenticator:
"""
认证器支持
1
get_token获取访问令牌
"""
def __init__(self):
@ -53,12 +53,11 @@ class Authenticator:
certifications = json.load(file)
# 获取指定服务商的访问凭证
certification = certifications.get(servicer)
# 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳
if certification:
# 访问令牌
token = certification["token"]
# 失效时间戳
expired_timestamp = certification["expired_timestamp"]
token = certification["token"] # 访问令牌
expired_timestamp = certification[
"expired_timestamp"
] # 失效时间戳
except json.decoder.JSONDecodeError:
with open(self.certifications_path, "w", encoding="utf-8") as file:

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# 导入模块
"""
飞书客户端模块
"""
import re
import time
@ -10,6 +11,8 @@ from email.utils import parsedate_to_datetime
from imaplib import IMAP4_SSL
import pandas
from request import Request
from authenticator import Authenticator
class Feishu:
@ -17,7 +20,7 @@ class Feishu:
def __init__(self):
self.authenticator = Authenticator()
self.http_client = HTTPClient()
self.http_client = Request()
def _headers(self):
"""请求头"""

View File

@ -1,13 +1,8 @@
# -*- coding: utf-8 -*-
"""
基于LOGGING封装日志记录器
日志模块
"""
# 加载模块
import logging
from logging.handlers import RotatingFileHandler

View File

@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
"""
MySQL客户端模块
"""
# 导入模块
from urllib.parse import quote_plus
import pandas

View File

@ -1,12 +1,12 @@
# -*- coding: utf-8 -*-
"""
请求客户端
请求客户端模块
"""
import json
from pathlib import Path
import sys
import time
from pathlib import Path
from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union
from xml.etree import ElementTree

View File

@ -1,11 +1,11 @@
# -*- coding: utf-8 -*-
"""
请求限速器
请求限速器模块
"""
from functools import wraps
import threading
import time
from functools import wraps
from typing import Callable, Tuple

View File

@ -10,6 +10,7 @@ from typing import Any, Dict, Union
from zen import ZenDecision, ZenEngine
class RulesEngine:
"""
规则引擎支持
@ -45,7 +46,7 @@ class RulesEngine:
def evaluate(self, decision: str, inputs: Dict[str, Any]) -> Dict[str, Any]:
"""
调用并返回评估结果
调用决策并返回评估结果
:param decision: 决策名称
:param inputs: 待评估对象
:return: 评估结果
@ -70,6 +71,8 @@ class RulesEngine:
case list():
return [self._formatter(i) for i in inputs]
case dict():
return {key: self._formatter(value) for key, value in inputs.items()} # 递归格式化
return {
key: self._formatter(value) for key, value in inputs.items()
} # 递归格式化
case _:
return inputs

View File

@ -17,7 +17,7 @@ class SQLite:
execute根据SQL语句执行操作
"""
def __init__(self, database: Union[str, Path]):
def __init__(self, database: Path):
"""
初始化
:param database: 数据库地址
@ -131,7 +131,7 @@ class SQLite:
try:
# 为当前线程创建数据库连接
self.threads.connection = sqlite3.connect(
database=self.database,
database=self.database.as_posix(),
check_same_thread=True,
timeout=30, # 数据库锁超时时间单位默认为30秒避免并发锁死
)

View File

@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
"""
draft模块
生成草稿模块
"""
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import pyJianYingDraft
from edgetts import EdgeTTS

View File

@ -1,13 +1,13 @@
# -*- coding: utf-8 -*-
"""
EdgeTTS模块
合成语音模块
"""
import asyncio
from hashlib import md5
from pathlib import Path
from typing import Tuple, Union
from hashlib import md5
import edge_tts
from mutagen.mp3 import MP3

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
export模块
导出草稿模块
"""
import random
@ -12,7 +12,6 @@ from typing import Any, Dict, Optional
import pyJianYingDraft
import win32con
import win32gui
from draft import JianYingDraft

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
程序
模块
"""
from export import JianYingExport

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
"""通用模块"""
from typing import Any, Dict, List
from decimal import Decimal, ROUND_HALF_UP
import pandas
from common import masterdata, rules_engine
def case_adjust(dossier: Dict[str, Any]) -> None:
"""
理算赔案并整合至赔案档案
:param dossier: 赔案档案
:return:
"""
# 基于拒付决策规则评估
if not (result := rules_engine.evaluate(decision="拒付", inputs=dossier)):
raise RuntimeError("该保险分公司未配置拒付规则")
dossier["adjustment_layer"].update(
{
"conclusion": (conclusion := result["conclusion"]), # 理赔结论
"explanation": result["explanation"], # 结论说明
}
)
if conclusion == "拒付":
return
# 赔案理算记录
adjustments = (
pandas.DataFrame(data=dossier["receipts_layer"]).assing(
adjustments=lambda dataframe: dataframe.apply(
lambda row: receipt_adjust(
row=row, liabilities=dossier["liabilities_layer"]
),
axis="columns",
) # 票据理算
)
).explode("adjustments", ignore_index=True)
print(adjustments)
def receipt_adjust(
row: pandas.Series, liabilities: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
理算票据
:param row: 一张票据数据
:param liabilities: 理算责任
:return: 理算记录
"""
# 初始化票据理算记录
adjustments = []
# 初始化剩余个人自费金额
remaining_personal_self_payment = row["personal_self_payment"]
# 初始化剩余个人自付金额
remaining_non_medical_payment = row["non_medical_payment"]
# 初始化剩余合理金额
remaining_reasonable_amount = row["reasonable_amount"]
# 出险事故
accident = row["accident"]
# 出险人
accident_person = row["payer"]
# 出险日期
accident_date = row["date"]
# 查验状态
verification = row["verification"]
# 初始化理赔责任理算金额
adjustment_amount = Decimal("0.00")
# 初始化理赔责任理赔金额
claim_amount = Decimal("0.00")
# 遍历所有理赔责任,根据出险事故、出险人、出险日期和查验状态匹配责任
for liability in liabilities:
if (
accident == liability["accident"]
and accident_person == liability["insured_person"]
and liability["commencement_date"]
<= accident_date
<= liability["termination_date"]
and verification == "真票"
):
# 理赔责任理算金额
adjustment_amount = (
row["personal_self_payment"]
* liability["personal_self_ratio"] # 个人自费金额
+ row["non_medical_payment"]
* liability["non_medical_ratio"] # 个人自付金额
+ row["reasonable_amount"] * liability["reasonable_ratio"] # 合理金额
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
# 据变动保单唯一标识查询最新一条保额变动记录的变动后金额(理赔责任的理赔保单余额)
remaining_amount = masterdata.query_remaining_amount(
policy_guid=liability["policy_guid"],
)
# 理赔责任理赔金额
claim_amount = min(
remaining_amount,
adjustment_amount,
)
# 初始化票据理算记录
adjustment = {
"liability": liability["liability"], # 理赔责任名称
"type": row["就诊类型"],
"amount": row["合理金额"],
"payable": 0.0,
}
return adjustments

View File

@ -1,25 +1,16 @@
# -*- coding: utf-8 -*-
"""通用模块"""
from pathlib import Path
import sys
from pathlib import Path
from masterdata import MasterData
sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.rules_engine import RulesEngine
# 初始化赔案档案保险公司将提供投保公司、保险分公司和报案时间等TPA作业系统签收后生成赔案号
dossier = {
"report_layer": {}, # 报案层
"images_layer": [], # 影像件层
"insured_person_layer": {}, # 出险人层
"insured_persons_layer": [], # 被保险人层
"receipts_layer": [], # 票据层
"adjustment_layer": {}, # 理算层
}
# 实例化主数据
masterdata = MasterData()
masterdata = MasterData(database=Path(__file__).parent / "database.db")
# 实例化规则引擎
rules_engine = RulesEngine(decisions_folder_path=Path("rules"))
rules_engine = RulesEngine(decisions_folder_path=Path(__file__).parent / "rules")

Binary file not shown.

View File

@ -3,28 +3,28 @@
影像件处理模块
"""
from base64 import b64encode
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from hashlib import md5
import json
from pathlib import Path
import re
import sys
from base64 import b64encode
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal
from hashlib import md5
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import cv2
from fuzzywuzzy import fuzz
from jionlp import parse_location
import numpy
import pandas
from common import masterdata, rules_engine
from fuzzywuzzy import fuzz
from jionlp import parse_location
from common import dossier, masterdata, rules_engine
sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.authenticator import Authenticator
from utils.request import Request
sys.path.append(Path(__file__).parent.parent.as_posix())
# 实例化认证器
authenticator = Authenticator()
@ -32,213 +32,24 @@ authenticator = Authenticator()
request = Request(timeout=300, cache_enabled=True) # 使用缓存
def image_read(
image_path: Path,
) -> numpy.ndarray:
def image_classify(image_index: int, image_path: Path, dossier: Dict[str, Any]) -> None:
"""
打开并读取影像件
:param image_path: 影像件路径
:return: 影像件数据numpy.ndarray对象
"""
try:
# 打开并读取影像件(默认转为单通道灰度图)
image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE)
if image_ndarray is None:
raise RuntimeError(f"影像件数据为空")
return image_ndarray
except Exception as exception:
raise RuntimeError(f"打开并读取影像件发生异常:{str(exception)}") from exception
def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str:
"""
生成影像件唯一标识
:param image_format: 影像件格式
:param image_ndarray: 影像件数据
:return: 影像件唯一标识
"""
success, image_ndarray_encoded = cv2.imencode(image_format, image_ndarray)
if not success or image_ndarray_encoded is None:
raise RuntimeError("编码影像件发生异常")
# 转为字节流并生成影像件唯一标识
image_guid = md5(image_ndarray_encoded.tobytes()).hexdigest().upper()
return image_guid
def image_compress(
image_format: str,
image_ndarray: numpy.ndarray,
image_size_specified: float = 2.0,
) -> str:
"""
压缩影像件
:param image_format: 影像件格式
:param image_ndarray: 影像件数据
:param image_size_specified: 指定压缩影像件大小单位为兆字节MB
:return: 压缩后影像件BASE64编码
"""
# 转为字节
image_size_specified = image_size_specified * 1024 * 1024
# 通过调整影像件质量和尺寸达到压缩影像件目的(先调整影像件质量再调整影像件尺寸)
for quality in range(100, 50, -10):
image_ndarray_copy = image_ndarray.copy()
for _ in range(10):
success, image_ndarray_encoded = cv2.imencode(
image_format,
image_ndarray_copy,
params=(
[cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
if image_format == "png"
else [cv2.IMWRITE_JPEG_QUALITY, quality]
),
)
if not success or image_ndarray_encoded is None:
break
# 影像件BASE64编码
image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8")
if len(image_base64) <= image_size_specified:
return image_base64
image_ndarray_copy = cv2.resize(
image_ndarray_copy,
(
int(image_ndarray_copy.shape[0] * 0.95),
int(image_ndarray_copy.shape[1] * 0.95),
),
interpolation=cv2.INTER_AREA,
)
# 若调整影像件尺寸后宽/高小于350像素则终止循环
if min(image_ndarray_copy.shape[:2]) < 350:
break
raise RuntimeError("压缩影像件发生异常")
def calculate_age(report_time: datetime, birth_date: datetime) -> int:
"""
根据报案时间计算周岁
:param report_time: 报案时间
:param birth_date: 出生日期
:return 周岁
"""
age = report_time.year - birth_date.year
return (
age - 1
if (report_time.month, report_time.day)
< (
birth_date.month,
birth_date.day,
)
else age
) # 若报案时间的月日小于生成日期的月日则前推一年
# TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并
def identity_card_recognize(image, insurer_company) -> None:
"""
识别居民身份证并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 请求深圳快瞳居民身份证识别接口
response = request.post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"),
headers={
"X-RequestId-Header": image["image_guid"]
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符
}, # 深圳快瞳支持同时识别居民国徽面和头像面
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
)
# TODO: 若请求深圳快瞳居民身份证识别接口发生异常则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常")
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(头像面)",
]:
dossier["insured_person_layer"].update(
{
"insured_person": (
insured_person := response["data"]["name"]
), # 被保险人
"identity_type": (identity_type := "居民身份证"), # 证件类型
"identity_number": (
indentity_number := response["data"]["idNo"]
), # 证件号码
"gender": response["data"]["sex"], # 性别
"birth_date": (
birth_date := datetime.strptime(
response["data"]["birthday"], "%Y-%m-%d"
)
), # 出生日期转为日期时间datetime对象格式默认为%Y-%m-%d
"age": calculate_age(
dossier["report_layer"]["report_time"], birth_date
), # 年龄
"province": (
residential_address := parse_location(response["data"]["address"])
).get(
"province"
), # 就住址解析为所在省、市、区和详细地址
"city": residential_address.get("city"),
"district": residential_address.get("county"),
"detailed_address": residential_address.get("detail"),
}
)
# 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单
dossier["insured_persons_layer"] = masterdata.query_liabilities(
insurer_company,
insured_person,
identity_type,
indentity_number,
dossier["report_layer"]["report_time"].strftime("%Y-%m-%d"),
)
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(国徽面)",
]:
dossier["insured_person_layer"].update(
{
"commencement_date": datetime.strptime(
(period := response["data"]["validDate"].split("-"))[0],
"%Y.%m.%d",
), # 就有效期限解析为有效起期和有效止期。其中若有效止期为长期则默认为9999-12-31
"termination_date": (
datetime(9999, 12, 31)
if period[1] == "长期"
else datetime.strptime(period[1], "%Y.%m.%d")
),
}
)
def image_classify(image_index: int, image_path: Path) -> None:
"""
分类影像件并旋正
分类影像件旋正并整合至赔案档案
:param image_index: 影像件编号
:param image_path: 影像件路径path对象
:param image_path: 影像件路径
:param dossier: 赔案档案
:return:
"""
# 打开并读取影像件
image_ndarray = image_read(image_path)
# 读取影像件
image_ndarray = image_read(image_path=image_path)
image_format = image_path.suffix.lower() # 影像件格式
# 生成影像件唯一标识
image_guid = image_serialize(image_format, image_ndarray)
# 影像件序列化
image_guid = image_serialize(image_format=image_format, image_ndarray=image_ndarray)
# 压缩影像件
image_base64 = image_compress(
image_format, image_ndarray, image_size_specified=2
image_format=image_format, image_ndarray=image_ndarray, image_size_specified=2
) # 深圳快瞳要求影像件BASE64编码后大小小于等于2兆字节
# 请求深圳快瞳影像件分类接口
@ -249,15 +60,14 @@ def image_classify(image_index: int, image_path: Path) -> None:
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 影像件BASE64编码嵌入数据统一资源标识符
"imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 影像件格式和BASE64编码嵌入数据统一资源标识符
},
guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# TODO: 若响应非成功则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise RuntimeError("请求深圳快瞳影像件分类接口发生异常")
# 匹配影像件类型
# 根据响应匹配影像件类型
match (response["data"]["flag"], response["data"]["type"]):
case (14, _):
image_type = "居民户口簿"
@ -284,14 +94,13 @@ def image_classify(image_index: int, image_path: Path) -> None:
case _:
image_type = "其它"
# 匹配影像件方向
# 根据响应匹配影像件方向
image_orientation = {
"0": "0度",
"90": "顺时针90度",
"180": "180度",
"270": "逆时针90度",
}.get(response["data"]["angle"], "0度")
# 若影像件方向非0度则旋正
if image_orientation != "0度":
image_ndarray = cv2.rotate(
image_ndarray,
@ -306,20 +115,269 @@ def image_classify(image_index: int, image_path: Path) -> None:
image_format, image_ndarray, image_size_specified=2
)
# 将影像件添加至影像件层
dossier["images_layer"].append(
{
"image_index": f"{image_index:02d}", # 影像件编号
"image_path": image_path.as_posix(),
"image_name": image_path.stem,
"image_format": image_format,
"image_guid": image_guid,
"image_base64": image_base64,
"image_type": image_type,
"image_path": image_path.as_posix(), # 影像件路径
"image_name": image_path.stem, # 影像件名称
"image_format": image_format, # 影像件格式
"image_guid": image_guid, # 影像件唯一标识
"image_base64": image_base64, # 影像件BASE64编码
"image_type": image_type, # 影像件类型
}
)
def mlm_recognize(image, schema) -> Dict[str, Any]:
def image_read(
image_path: Path,
) -> numpy.ndarray:
"""
读取影像件
:param image_path: 影像件路径
:param dossier: 赔案档案
:return: 影像件图像数组
"""
try:
with open(image_path, "rb") as file:
image_bytes = file.read() # 读取影像件字节流
# 先将影像件字节流转为 numpy.ndarray 对象,再解码为单通道灰度图数组对象
image_ndarray = cv2.imdecode(
buf=numpy.frombuffer(image_bytes, numpy.uint8), flags=cv2.IMREAD_GRAYSCALE
)
if image_ndarray is None:
raise RuntimeError(f"影像件不存在")
return image_ndarray
except Exception as exception:
raise RuntimeError(f"读取影像件发生异常:{str(exception)}") from exception
def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str:
"""
影像件序列化
:param image_format: 影像件格式
:param image_ndarray: 影像件图像数组
:return: 影像件唯一标识
"""
# 将影像件图像数组编码为字节流
success, image_ndarray_encoded = cv2.imencode(ext=image_format, img=image_ndarray)
if not success or image_ndarray_encoded is None:
raise RuntimeError("影像件编码发生异常")
# 转为字节流并生成影像件唯一标识
image_guid = md5(string=image_ndarray_encoded.tobytes()).hexdigest().upper()
return image_guid
def image_compress(
image_format: str,
image_ndarray: numpy.ndarray,
image_size_specified: float = 2.0,
) -> str:
"""
压缩影像件
:param image_format: 影像件格式
:param image_ndarray: 影像件图像数组
:param image_size_specified: 指定压缩影像件大小单位为兆字节默认为 2
:return: 压缩后影像件BASE64编码
"""
# 通过调整影像件质量和尺寸达到压缩影像件目的(先调整影像件质量再调整影像件尺寸)
for quality in range(100, 50, -10):
image_ndarray_copy = image_ndarray.copy()
for _ in range(10):
# 调整影像件质量后将影像件图像数组编码为字节流
success, image_ndarray_encoded = cv2.imencode(
ext=image_format,
img=image_ndarray_copy,
params=(
[cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
if image_format == "png"
else [cv2.IMWRITE_JPEG_QUALITY, quality]
),
)
if not success or image_ndarray_encoded is None:
break
# 影像件BASE64编码
image_base64 = b64encode(s=image_ndarray_encoded.tobytes()).decode("utf-8")
if len(image_base64) <= image_size_specified * 1_048_576:
return image_base64
# 调整影像件尺寸
image_ndarray_copy = cv2.resize(
src=image_ndarray_copy,
dsize=(
int(image_ndarray_copy.shape[0] * 0.95),
int(image_ndarray_copy.shape[1] * 0.95),
),
interpolation=cv2.INTER_AREA,
)
if min(image_ndarray_copy.shape[:2]) < 350:
break
raise RuntimeError("压缩影像件发生异常")
def image_recognize(
image: Dict[str, Any],
insurer_company: str,
dossier: Dict[str, Any],
) -> None:
"""
识别影像件并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:param dossier: 赔案档案
:return:
"""
# 基于影像件识别使能决策规则评估
if not rules_engine.evaluate(
decision="影像件识别使能",
inputs={
"insurer_company": insurer_company,
"image_type": image["image_type"],
},
)["recognize_enabled"]:
return
# 根据影像件类型匹配影像件识别方法
match image["image_type"]:
case "居民户口簿":
raise RuntimeError("暂不支持居民户口簿")
case "居民身份证(国徽、头像面)" | "居民身份证(国徽面)" | "居民身份证(头像面)":
# 居民身份证识别并整合至赔案档案
identity_card_recognize(
image=image, insurer_company=insurer_company, dossier=dossier
)
case "中国港澳台地区及境外护照":
raise RuntimeError("暂不支持中国港澳台地区及境外护照")
case "理赔申请书":
application_recognize(
image=image, insurer_company=insurer_company, dossier=dossier
)
case "增值税发票" | "医疗门诊收费票据" | "医疗住院收费票据":
# 票据识别并整合至赔案档案
receipt_recognize(
image=image, insurer_company=insurer_company, dossier=dossier
)
case "银行卡":
# 银行卡识别并整合至赔案档案
bank_card_recognize(image=image, dossier=dossier)
def identity_card_recognize(
image: Dict[str, Any], insurer_company: str, dossier: Dict[str, Any]
) -> None:
"""
识别居民身份证并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:param dossier: 赔案档案
:return:
"""
# 请求深圳快瞳居民身份证识别接口
response = request.post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"),
headers={
"X-RequestId-Header": image["image_guid"]
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 将影像件格式和BASE64编码嵌入数据统一资源标识符
}, # 深圳快瞳支持同时识别居民国徽面和头像面
guid=md5(string=(url + image["image_guid"]).encode("utf-8"))
.hexdigest()
.upper(),
)
if not (response.get("status") == 200 and response.get("code") == 0):
raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常")
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(头像面)",
]:
dossier["insured_person_layer"].update(
{
"insured_person": (
insured_person := response["data"]["name"]
), # 被保险人姓名
"identity_type": (identity_type := "居民身份证"), # 证件类型
"identity_number": (
identity_number := response["data"]["idNo"]
), # 证件号码
"gender": response["data"]["sex"], # 性别
"birth_date": (
birth_date := datetime.strptime(
response["data"]["birthday"], "%Y-%m-%d"
)
), # 出生日期转为日期时间datetime对象格式默认为%Y-%m-%d
"age": calculate_age(
report_time=dossier["report_layer"]["report_time"],
birth_date=birth_date,
), # 年龄
"province": (
residential_address := parse_location(
location_text=response["data"]["address"]
)
).get(
"province"
), # 就住址解析为所在省、市、区和详细地址
"city": residential_address.get("city"),
"district": residential_address.get("county"),
"detailed_address": residential_address.get("detail"),
}
)
# 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单
dossier["liabilities_layer"] = masterdata.query_liabilities(
insurer_company=insurer_company,
insured_person=insured_person,
identity_type=identity_type,
identity_number=identity_number,
report_date=dossier["report_layer"]["report_time"].strftime("%Y-%m-%d"),
)
if image["image_type"] in [
"居民身份证(国徽、头像面)",
"居民身份证(国徽面)",
]:
dossier["insured_person_layer"].update(
{
"commencement_date": datetime.strptime(
(period := response["data"]["validDate"].split("-"))[0],
"%Y.%m.%d",
), # 就有效期限解析为有效起期和有效止期。其中若有效止期为长期则默认为9999-12-31
"termination_date": (
datetime(9999, 12, 31)
if period[1] == "长期"
else datetime.strptime(period[1], "%Y.%m.%d")
),
}
)
def calculate_age(report_time: datetime, birth_date: datetime) -> int:
"""
计算周岁
:param report_time: 报案时间
:param birth_date: 出生日期
:return 周岁
"""
age = report_time.year - birth_date.year
return (
age - 1
if (report_time.month, report_time.day)
< (
birth_date.month,
birth_date.day,
)
else age
) # 若报案时间的月和日小于出生日期的月和日则前减去一岁
def mlm_recognize(image: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
"""
使用多模态大模型就理赔申请书进行光学字符识别并结构化识别结果
:param image: 影像件
@ -344,7 +402,7 @@ def mlm_recognize(image, schema) -> Dict[str, Any]:
"type": "image_url",
"image_url": {
"url": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}"
}, # 影像件BASE64编码嵌入数据统一资源标识符
}, # 影像件格式和BASE64编码嵌入数据统一资源标识符
},
{
"type": "text",
@ -368,7 +426,7 @@ def mlm_recognize(image, schema) -> Dict[str, Any]:
}
),
guid=md5(
json.dumps(
string=json.dumps(
json_,
sort_keys=True,
ensure_ascii=False,
@ -380,16 +438,16 @@ def mlm_recognize(image, schema) -> Dict[str, Any]:
# 就响应中消息内容JSON反序列化
try:
return json.loads(response["choices"][0]["message"]["content"])
# TODO: 若请求火山引擎多模态大模型接口发生异常则流转至人工处理
return json.loads(s=response["choices"][0]["message"]["content"])
except Exception as exception:
raise RuntimeError("请求火山引擎多模态大模型接口发生异常") from exception
def boc_application_recognize(image: str) -> None:
def boc_application_recognize(image: Dict[str, Any], dossier: Dict[str, Any]) -> None:
"""
识别中银保险有限公司的理赔申请书并整合至赔案档案
:param image: 影像件
:param dossier: 赔案档案
:return:
"""
# JSON格式
@ -491,7 +549,7 @@ def boc_application_recognize(image: str) -> None:
}
# 使用多模态大模型就理赔申请书进行光学字符识别并结构化识别结果
recognition = mlm_recognize(image, schema)
recognition = mlm_recognize(image=image, schema=schema)
dossier["insured_person_layer"].update(
{
"phone_number": recognition["手机"],
@ -502,11 +560,14 @@ def boc_application_recognize(image: str) -> None:
)
def application_recognize(image, insurer_company) -> None:
def application_recognize(
image: Dict[str, Any], insurer_company: str, dossier: Dict[str, Any]
) -> None:
"""
识别理赔申请书并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:param dossier: 赔案档案
:return:
"""
@ -514,91 +575,17 @@ def application_recognize(image, insurer_company) -> None:
match insurer_company:
# 中银保险有限公司
case _ if insurer_company.startswith("中银保险有限公司"):
boc_application_recognize(image)
boc_application_recognize(image=image, dossier=dossier)
def fuzzy_match(contents: List[Dict[str, Any]], key: str) -> str:
"""
根据内容列表基于深圳快瞳增值税发票和医疗收费票据识别结果模糊匹配键名
:param contents: 内容列表
:param key: 键名
:return
"""
match contents[0].keys():
# 对应深圳快瞳增值税发票识别结果
case _ if "desc" in contents[0].keys():
for content in contents:
if content["desc"] == key:
return content["value"] if content["value"] else ""
candidates = []
for content in contents:
candidates.append(
(
content["value"],
fuzz.WRatio(
content["desc"], key, force_ascii=False
), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度
)
)
return (
(result[0] if result[0] else "")
if (result := max(candidates, key=lambda x: x[1]))[1] >= 80
else ""
) # 返回似度>=80且最大的值
# 对应深圳快瞳医疗收费票据识别结果
case _ if "name" in contents[0].keys():
for content in contents:
if content["name"] == key:
return (
content["word"]["value"] if content["word"]["value"] else ""
)
candidates = []
for content in contents:
candidates.append(
(
content["word"]["value"],
fuzz.WRatio(
content["name"], key, force_ascii=False
), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度
)
)
return (
(result[0] if result[0] else "")
if (result := max(candidates, key=lambda x: x[1]))[1] >= 80
else ""
) # 返回>=80且最大的相似度的值
return ""
def parse_item(item: str) -> Tuple[str, Optional[str]]:
"""
根据明细项解析明细项类别和具体内容并根据具体内容查询药品/医疗服务
:param item: 明细项
return 明细项类别和药品/医疗服务
"""
if match := re.match(
r"^\*(?P<category>.*?)\*(?P<specific>.*)$",
item,
):
return match.group("category"), masterdata.query_medicine(
match.group("specific")
)
# 一般增值税发票明细项格式形如*{category}*{specific}其中category为明细项类别例如中成药specific为明细项具体内容例如[同仁堂]金贵肾气水蜜丸 300丸/瓶,需要据此查询药品。其它格式则将明细项内容作为明细项类别,药品为空值
else:
return item, None
def receipt_recognize(image, insurer_company) -> None:
def receipt_recognize(
image: Dict[str, Any], insurer_company: str, dossier: Dict[str, Any]
) -> None:
"""
识别票据并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:param dossier: 赔案档案
:return:
"""
# 初始化票据数据
@ -611,9 +598,9 @@ def receipt_recognize(image, insurer_company) -> None:
}, # 以影像件唯一标识作为请求唯一标识,用于双方联查
data={
"token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符
"imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件格式和BASE64编码嵌入数据统一资源标识符
},
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
guid=md5(string=(url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
)
# 若查验状态为真票或红票则直接整合至赔案档案
if response.get("status") == 200 and response.get("code") == 10000:
@ -781,7 +768,6 @@ def receipt_recognize(image, insurer_company) -> None:
.hexdigest()
.upper(),
)
# TODO: 若请求深圳快瞳增值税发票识别接口发生异常则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise RuntimeError("请求深圳快瞳增值税发票识别接口发生异常")
@ -945,7 +931,6 @@ def receipt_recognize(image, insurer_company) -> None:
.hexdigest()
.upper(),
)
# TODO: 若请求深圳快瞳医疗收费票据识别接口发生异常则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise
@ -1080,6 +1065,7 @@ def receipt_recognize(image, insurer_company) -> None:
.assign(
reasonable_amount=lambda dataframe: dataframe.apply(
lambda row: Decimal(
# 基于扣除明细项不合理费用决策规则评估
rules_engine.evaluate(
decision="扣除明细项不合理费用",
inputs={
@ -1105,7 +1091,7 @@ def receipt_recognize(image, insurer_company) -> None:
if dossier["insured_person_layer"]["insured_person"]
in receipt["payer"]
else None
), # 出险人
), # 出险人姓名
"accident": "药店购药", # 出险事故
"diagnosis": "购药拟诊", # 医疗诊断
"personal_self_payment": Decimal("0.00"), # 个人自费金额
@ -1126,16 +1112,12 @@ def receipt_recognize(image, insurer_company) -> None:
"items": items.to_dict("records"),
}
)
# TODO: 后续完善就购药及就医类型为门诊就诊(私立医院)处理
case ("增值税发票", "私立医院"):
receipt["购药及就医类型"] = "门诊就医"
# TODO: 后续完善就购药及就医类型为门诊就诊(公立医院)处理
case ("医疗门诊收费票据", "公立医院"):
receipt["购药及就医类型"] = "门诊就医"
# TODO: 后续完善就购药及就医类型为住院治疗处理
case ("医疗住院收费票据", "公立医院"):
receipt["购药及就医类型"] = "住院治疗"
# TODO: 若根据影像件类型和购药及就医机构类型匹配购药及就医类型发生异常则流转至人工处理
case _:
raise RuntimeError(
"根据影像件类型和购药及就医机构类型匹配购药及就医类型发生异常"
@ -1144,10 +1126,86 @@ def receipt_recognize(image, insurer_company) -> None:
dossier["receipts_layer"].append(receipt)
def bank_card_recognize(image) -> None:
def fuzzy_match(contents: List[Dict[str, Any]], key: str) -> str:
"""
根据内容列表基于深圳快瞳增值税发票和医疗收费票据识别结果模糊匹配键名
:param contents: 内容列表
:param key: 键名
:return
"""
match contents[0].keys():
# 对应深圳快瞳增值税发票识别结果
case _ if "desc" in contents[0].keys():
for content in contents:
if content["desc"] == key:
return content["value"] if content["value"] else ""
candidates = []
for content in contents:
candidates.append(
(
content["value"],
fuzz.WRatio(
content["desc"], key, force_ascii=False
), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度
)
)
return (
(result[0] if result[0] else "")
if (result := max(candidates, key=lambda x: x[1]))[1] >= 80
else ""
) # 返回似度>=80且最大的值
# 对应深圳快瞳医疗收费票据识别结果
case _ if "name" in contents[0].keys():
for content in contents:
if content["name"] == key:
return content["word"]["value"] if content["word"]["value"] else ""
candidates = []
for content in contents:
candidates.append(
(
content["word"]["value"],
fuzz.WRatio(
content["name"], key, force_ascii=False
), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度
)
)
return (
(result[0] if result[0] else "")
if (result := max(candidates, key=lambda x: x[1]))[1] >= 80
else ""
) # 返回>=80且最大的相似度的值
return ""
def parse_item(item: str) -> Tuple[str, Optional[str]]:
"""
根据明细项解析明细项类别和具体内容并根据具体内容查询药品/医疗服务
:param item: 明细项
return 明细项类别和药品/医疗服务
"""
if match := re.match(
r"^\*(?P<category>.*?)\*(?P<specific>.*)$",
item,
):
return match.group("category"), masterdata.query_medicine(
match.group("specific")
)
# 一般增值税发票明细项格式形如*{category}*{specific}其中category为明细项类别例如中成药specific为明细项具体内容例如[同仁堂]金贵肾气水蜜丸 300丸/瓶,需要据此查询药品。其它格式则将明细项内容作为明细项类别,药品为空值
else:
return item, None
def bank_card_recognize(image: Dict[str, Any], dossier: Dict[str, Any]) -> None:
"""
识别银行卡并整合至赔案档案
:param image: 影像件
:param dossier: 赔案档案
:return:
"""
# 请求深圳快瞳银行卡识别接口
@ -1162,7 +1220,6 @@ def bank_card_recognize(image) -> None:
},
guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(),
)
# TODO: 若响应非成功则流转至人工处理
if not (
response.get("status") == 200
and response.get("code") == 0
@ -1178,45 +1235,3 @@ def bank_card_recognize(image) -> None:
"account_number": response["data"]["cardNo"].replace(" ", ""),
}
)
def image_recognize(
image,
insurer_company,
) -> None:
"""
识别影像件并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 基于影像件识别使能规则评估影像件是否识别
if not rules_engine.evaluate(
decision="影像件识别使能",
inputs={
"insurer_company": insurer_company,
"image_type": image["image_type"],
},
)["recognize_enabled"]:
return
# 根据影像件类型匹配影像件识别方法
match image["image_type"]:
# TODO: 后续添加居民户口簿识别和整合方法
case "居民户口簿":
raise RuntimeError("暂不支持居民户口簿")
case "居民身份证(国徽、头像面)" | "居民身份证(国徽面)" | "居民身份证(头像面)":
# 居民身份证识别并整合至赔案档案
identity_card_recognize(image, insurer_company)
# TODO: 后续添加居民户口簿识别和整合方法
case "中国港澳台地区及境外护照":
raise RuntimeError("暂不支持中国港澳台地区及境外护照")
# TODO: 暂仅支持增值税发票识别且购药及就医类型为药店购药整合至赔案档案,后续逐步添加
case "理赔申请书":
application_recognize(image, insurer_company)
case "增值税发票" | "医疗门诊收费票据" | "医疗住院收费票据":
# 票据识别并整合至赔案档案
receipt_recognize(image, insurer_company)
case "银行卡":
# 银行卡识别并整合至赔案档案
bank_card_recognize(image)

View File

@ -1,29 +1,27 @@
# -*- coding: utf-8 -*-
"""
票据理赔自动化
票据理赔自动化主模块
功能清单
https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink
"""
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
from case import case_adjust
from image import image_classify, image_recognize
from jinja2 import Environment, FileSystemLoader
import pandas
from common import dossier, rule_engine
from image import image_classify
from image import image_recognize
if __name__ == "__main__":
# 初始化工作目录路径
workplace_path = Path("directory")
workplace_path.mkdir(parents=True, exist_ok=True) # 若工作目录不存在则创建
# 初始化文件路径
file_path = Path(__file__).parent
# 初始化所有赔案的文件夹路径需要注意在TraeCN中文件路径需显式声明
folder_path = file_path / "directory"
folder_path.mkdir(parents=True, exist_ok=True) # 若文件夹路径不存在则创建
# 实例化JINJA2环境
environment = Environment(loader=FileSystemLoader("."))
environment = Environment(loader=FileSystemLoader(file_path))
# 添加DATE过滤器
environment.filters["date"] = lambda date: (
date.strftime("%Y-%m-%d") if date else "长期"
@ -31,130 +29,46 @@ if __name__ == "__main__":
# 加载赔案档案模版
template = environment.get_template("template.html")
# -------------------------
# 自定义方法
# -------------------------
# noinspection PyShadowingNames
def case_adjust() -> None:
"""
理算赔案并整合至赔案档案
:return:
"""
def receipt_adjust(row: pandas.Series) -> List[Dict[str, Any]]:
"""
票据理算
:param row: 票据
:return: 理算记录
"""
date = row["date"]
current_type = row["就诊类型"]
current_amount = row["合理金额"]
remaining_claim = current_amount
claim_details = []
if current_amount <= 0:
return []
# 筛选有效保单并排序
valid_rules = sorted(
[
r
for r in policy_rules
if current_type in r["就诊类型"]
and r["生效日期"] <= current_date <= r["失效日期"]
and r["剩余额度"] > 0.0
],
key=lambda x: x["剩余额度"],
reverse=True,
)
# 循环分摊赔付,生成分明细列表
for rule in valid_rules:
if remaining_claim <= 0.0:
break
pay_ratio = rule["赔付比例"]
rule_name = rule["责任名称"]
remaining_quota = rule["剩余额度"]
max_payable = remaining_claim * pay_ratio
actual_pay = min(remaining_quota, max_payable)
if actual_pay > 0.0:
corresponding_actual_amount = actual_pay / pay_ratio
# 构建明细字典字段与后续DataFrame列对应
detail = {
"就诊类型": current_type,
"就诊合理金额": current_amount,
"保单责任名称": rule_name,
"保单赔付比例": pay_ratio,
"保单本次赔付金额": round(actual_pay, 2),
"本次对应合理金额部分": round(corresponding_actual_amount, 2),
"保单赔付后剩余额度": round(remaining_quota - actual_pay, 2),
}
claim_details.append(detail)
# 更新保单额度和剩余待赔付金额
rule["剩余额度"] -= actual_pay
remaining_claim -= corresponding_actual_amount
return claim_details
# 基于据拒付规则评估
if not (result := rule_engine.evaluate(decision="拒付", inputs=dossier)):
# TODO: 若评估结果为空值(保险分公司未配置拒付规则)则流转至人工处理
raise
dossier["adjustment_layer"].update(
{
"conclusion": result["conclusion"], # 理赔结论
"explanation": result["explanation"], # 结论说明
}
)
if result["conclusion"] == "拒付":
return
adjustments = (
pandas.DataFrame(dossier["receipts_layer"]).assing(
adjustments=lambda dataframe: dataframe.apply(
receipt_adjust, axis="columns"
)
)
).explode("adjustments", ignore_index=True)
print(adjustments)
# 遍历工作目录中赔案目录并创建赔案档案(模拟自动化域就待自动化任务创建理赔档案)
for case_path in [x for x in workplace_path.iterdir() if x.is_dir()]:
# 初始化赔案档案保险公司将提供投保公司、保险分公司和报案时间等TPA作业系统签收后生成赔案号
dossier["report_layer"].update(
{
"report_time": datetime(2025, 7, 25, 12, 0, 0), # 指定报案时间
"case_number": case_path.stem, # 设定:赔案目录名称为赔案号
# 遍历文件夹中赔案文件夹并创建赔案档案
for case_path in [x for x in folder_path.iterdir() if x.is_dir()]:
# 初始化赔案档案推送至TPA时保险公司会提保险分公司名称、报案时间和影像件等TPA签收后生成赔案号
dossier = {
"report_layer": {
"report_time": datetime(
2025, 7, 25, 12, 0, 0
), # 指定报案时间,默认为 datetime对象
"case_number": case_path.stem, # 默认为赔案文件夹名称
"insurer_company": (
insurer_company := "中银保险有限公司苏州分公司"
), # 指定保险分公司
}
)
# 遍历赔案目录中影像件
), # 默认为中银保险有限公司苏州分公司
}, # 报案层
"images_layer": [], # 影像件层
"insured_person_layer": {}, # 出险人层
"liabilities_layer": [], # 理赔责任层
"receipts_layer": [], # 票据层
"adjustment_layer": {}, # 理算层
}
# 遍历赔案文件夹内所有影像件路径
for image_index, image_path in enumerate(
sorted(
[
x
for x in case_path.glob(pattern="*")
if x.is_file() and x.suffix.lower() in [".jpg", ".jpeg", ".png"]
], # 实际作业亦仅支持JPG、JPEG或PNG
key=lambda x: x.stat().st_ctime, # 根据影像件创建时间顺序排序
i
for i in case_path.glob(pattern="*")
if i.is_file() and i.suffix.lower() in [".jpg", ".jpeg", ".png"]
],
key=lambda i: i.stat().st_birthtime, # 根据影像件创建时间顺序排序
),
1,
):
# 分类影像件并旋正(较初审自动化无使能检查)
image_classify(image_index, image_path)
# 分类影像件、旋正并整合至赔案档案
image_classify(
image_index=image_index, image_path=image_path, dossier=dossier
)
# 就影像件层按照影像件类型指定排序
dossier["images_layer"].sort(
key=lambda x: [
key=lambda i: [
"居民户口簿",
"居民身份证(国徽面)",
"居民身份证(头像面)",
@ -167,34 +81,20 @@ if __name__ == "__main__":
"医疗费用清单",
"银行卡",
"其它",
].index(x["image_type"])
].index(i["image_type"])
)
# 遍历影像件层影像件
# 遍历影像件层影像件
for image in dossier["images_layer"]:
# 识别影像件并整合至赔案档案
image_recognize(
image,
insurer_company,
image=image,
insurer_company=insurer_company,
dossier=dossier,
)
# 就票据层按照开票日期和票据号顺序排序
dossier["receipts_layer"].sort(key=lambda x: (x["date"], x["number"]))
print(dossier["insured_persons_layer"])
exit()
# 理算
case_adjust()
print(dossier["adjustment_layer"])
for receipt in dossier["receipts_layer"]:
print(receipt)
print(dossier["report_layer"])
print(dossier["insured_person_layer"])
print(dossier["insured_persons_layer"])
dossier.pop("images_layer")
dossier.pop("receipts_layer")
# 理算赔案并整合至赔案档案
case_adjust(dossier=dossier)

View File

@ -3,10 +3,10 @@
主数据模块
"""
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
import sys
from datetime import datetime
from decimal import ROUND_HALF_UP, Decimal
from pathlib import Path
from typing import Any, Dict, List, Optional
sys.path.append(Path(__file__).parent.parent.as_posix())
@ -21,12 +21,13 @@ class MasterData(SQLite):
query_medicine根据明细项中具体内容查询药品/医疗服务名称
"""
def __init__(self):
def __init__(self, database: Path):
"""
初始化
:param database: 数据库路径
"""
# 初始化SQLite客户端
super().__init__(database="database.db")
super().__init__(database=database)
try:
with self:
# 初始化团单表
@ -86,14 +87,14 @@ class MasterData(SQLite):
)
"""
)
# 初始化责任表
# 初始化理赔责任表
self.execute(
sql="""
CREATE TABLE IF NOT EXISTS liabilities
(
--责任唯一标识
--理赔责任唯一标识
guid TEXT PRIMARY KEY,
--责任名称
--理赔责任名称
liability TEXT NOT NULL,
--出险事故
accident TEXT NOT NULL,
@ -103,10 +104,10 @@ class MasterData(SQLite):
non_medical_ratio TEXT NOT NULL,
--合理理算比例
reasonable_ratio TEXT NOT NULL,
--保单唯一标识
adjust_policy_guid TEXT NOT NULL,
--保单唯一标识
claim_policy_guid TEXT NOT NULL,
--个单唯一标识
person_policy_guid TEXT NOT NULL
person_policy_guid TEXT NOT NULL
)
"""
)
@ -159,7 +160,7 @@ class MasterData(SQLite):
"""
)
except Exception as exception:
raise RuntimeError(f"初始化数据发生异常:{str(exception)}") from exception
raise RuntimeError(f"初始化数据发生异常:{str(exception)}") from exception
def query_liabilities(
self,
@ -168,7 +169,7 @@ class MasterData(SQLite):
identity_type: str,
identity_number: str,
report_date: str,
) -> Optional[List[Dict[str, Any]]]:
) -> List[Dict[str, Any]]:
"""
根据保险分公司名称被保险人姓名证件类型证件号码和报案时间查询被保险人的理赔责任
:param insurer_company: 保险分公司名称
@ -199,7 +200,7 @@ class MasterData(SQLite):
liabilities.personal_self_ratio,
liabilities.non_medical_ratio,
liabilities.reasonable_ratio,
liabilities.adjust_policy_guid
liabilities.claim_policy_guid
FROM insured_persons
INNER JOIN person_policies
ON insured_persons.person_policy_guid = person_policies.guid
@ -211,10 +212,10 @@ class MasterData(SQLite):
INNER JOIN liabilities
ON person_policies.guid = liabilities.person_policy_guid
INNER JOIN coverage_changes
ON liabilities.adjust_policy_guid = coverage_changes.change_policy_guid
ON liabilities.claim_policy_guid = coverage_changes.change_policy_guid
AND coverage_changes.change_time = (SELECT MAX(change_time)
FROM coverage_changes
WHERE liabilities.adjust_policy_guid = change_policy_guid)
WHERE liabilities.claim_policy_guid = change_policy_guid)
WHERE group_policies.insurer_company = ?
AND insured_persons.insured_person = ?
AND insured_persons.identity_type = ?
@ -222,6 +223,7 @@ class MasterData(SQLite):
AND ? BETWEEN group_policies.commencement_date AND group_policies.termination_date
AND ? BETWEEN person_policies.commencement_date AND person_policies.termination_date
AND CAST(coverage_changes.after_change_amount AS REAL) > 0
ORDER BY commencement_date
""",
parameters=(
insurer_company,
@ -232,7 +234,6 @@ class MasterData(SQLite):
report_date,
),
)
# TODO: 若查无数据则流转至人工处理
if not result:
raise RuntimeError("查无数据")
@ -256,7 +257,7 @@ class MasterData(SQLite):
except Exception as exception:
raise RuntimeError(f"{str(exception)}") from exception
def query_institution_type(self, institution: str) -> Optional[str]:
def query_institution_type(self, institution: str) -> str:
"""
根据购药及就医机构名称查询购药及就医机构类型
:param institution: 购药及就医机构名称
@ -272,7 +273,6 @@ class MasterData(SQLite):
""",
parameters=(institution,),
)
# TODO: 若查无数据则流转至人工处理
if not result:
raise RuntimeError("查无数据")
@ -284,13 +284,12 @@ class MasterData(SQLite):
def query_medicine(
self,
content: str,
) -> Optional[str]:
) -> str:
"""
根据明细项中具体内容查询药品/医疗服务名称
:param content: 明细项具体内容
:return: 药品/医疗服务名称
"""
# TODO: 后续提供医疗耗材和服务查询
try:
with self:
result = self.query_all(
@ -301,7 +300,6 @@ class MasterData(SQLite):
""",
parameters=(content,),
)
# TODO: 若查无数据则流转至人工处理
if not result:
raise RuntimeError("查无数据")
@ -311,3 +309,35 @@ class MasterData(SQLite):
except Exception as exception:
raise RuntimeError(f"{str(exception)}") from exception
def query_remaining_amount(
self,
policy_guid: str,
) -> Decimal:
"""
根据变动保单唯一标识查询最新一条保额变动记录的变动后金额
:param policy_guid: 变动保单唯一标识
:return: 变动后金额
"""
try:
with self:
result = self.query_one(
sql="""
SELECT after_change_amount
FROM coverage_changes
WHERE change_policy_guid = ?
ORDER BY change_time DESC
LIMIT 1;
""",
parameters=(policy_guid,),
)
if not result:
raise RuntimeError("查无数据")
return Decimal(result["after_change_amount"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
except Exception as exception:
raise RuntimeError(f"{str(exception)}") from exception