日常更新

from NUC
This commit is contained in:
liubiren 2025-12-16 22:18:59 +08:00
parent 1f0be4f198
commit ef3699973a
4 changed files with 318 additions and 325 deletions

Binary file not shown.

View File

@ -15,7 +15,7 @@ from email.utils import parsedate_to_datetime
from functools import wraps
from imaplib import IMAP4_SSL
from pathlib import Path
from typing import Any, Callable, Dict, Generator, Literal, Optional, Tuple, Union
from typing import Any, Callable, Dict, Generator, List, Literal, Optional, Tuple, Union
from urllib.parse import quote_plus
from xml.etree import ElementTree
@ -188,6 +188,39 @@ class SQLiteClient:
self.threads.connection.rollback()
raise RuntimeError("为当前线程查询并获取单行数据发生异常") from exception
def _query_all(
    self, sql: str, parameters: Tuple[Any, ...] = ()
) -> List[Dict[str, Any]]:
    """
    Run a query for the current thread and return every resulting row.

    :param sql: SQL query statement
    :param parameters: values bound to the statement's placeholders
    :return: one dict per row, in cursor order; an empty list when the query
        yields no rows
    :raises RuntimeError: if the current thread has no cursor or the query
        fails; the current thread's connection (if any) is rolled back first
    """
    # noinspection PyBroadException
    try:
        # Create the database connection and cursor for the current thread
        self._connect()
        # Fail early if the current thread still has no usable cursor
        if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
            raise RuntimeError("为当前线程创建数据库游标发生异常")
        # Execute the SQL on the current thread's cursor
        self.threads.cursor.execute(sql, parameters)
        result = []
        # Pull rows in batches of 1000 until the cursor is exhausted.
        # dict(row) presumably relies on a mapping-style row factory
        # (e.g. sqlite3.Row) - TODO confirm against _connect
        while batch := self.threads.cursor.fetchmany(1000):
            result.extend(dict(row) for row in batch)
        return result
    # On any failure roll back the transaction and re-raise
    except Exception as exception:
        # Roll back only when the current thread actually has a connection
        if (
            hasattr(self.threads, "connection")
            and self.threads.connection is not None
        ):
            self.threads.connection.rollback()
        # The message now names the multi-row query (it used to repeat the
        # single-row query's message)
        raise RuntimeError("为当前线程查询并获取多行数据发生异常") from exception
def _execute(self, sql: str, parameters: Tuple[Any, ...] = ()) -> bool:
"""
为当前线程执行SQL

Binary file not shown.

View File

@ -8,7 +8,6 @@ https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink
import json
import re
import uuid
from base64 import b64encode
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
@ -25,223 +24,9 @@ from zen import ZenDecision, ZenEngine
from utils.client import Authenticator, HTTPClient, SQLiteClient
# from utils.ocr import fuzzy_match
def common_extraction(**kwargs) -> dict | None:
"""通用数据提取"""
# 影像件全局唯一标识:优先使用关键词变量,其次使用全局变量,再次使用随机唯一标识
image_guid = kwargs.get(
"image_guid", globals().get("image_guid", uuid.uuid4().hex.upper())
)
# 影像件格式
image_format = kwargs.get("image_format", globals()["image_format"])
if image_format is None:
raise RuntimeError("请入参image_format")
# 影像件BASE64编码
image_base64 = kwargs.get("image_base64", globals()["image_base64"])
if image_base64 is None:
raise RuntimeError("请入参image_base64")
# 请求深圳快瞳通用文本识别接口
response = globals()["http_client"].post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/general"),
headers={"X-RequestId-Header": image_guid},
data={
"token": globals()["authenticator"].get_token(servicer="szkt"),
"imgBase64": f"data:image/{image_format};base64,{image_base64}",
},
guid=hashlib.md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功则返回NONE
if not (response.get("status") == 200 and response.get("code") == 0):
return None
# 基于空间坐标法就识别结果中文本框进行分行排序
texts = []
# 重构文本框列表
for text in response["data"]:
texts.append(
[
# 文本框左上角的X坐标
numpy.float64(text["itemPolygon"]["x"]),
# 文本框左上角的Y坐标
numpy.float64(text["itemPolygon"]["y"]),
# 文本框的高度
numpy.float64(
text["itemPolygon"]["height"]
), # 深圳快瞳基于文本框的Y坐标最大值和最小值的差值
text["value"],
]
)
# 按照文本框Y坐标升序使用空间坐标算法从上到下从左到右
texts.sort(key=lambda x: x[1])
rows = []
# 分行
for index, text in enumerate(texts[1:]):
# 若为第一行则初始化当前行
if index == 0:
row = [texts[0]]
continue
# 若文本框的Y坐标与当前行中最后一个文本框的Y坐标差值小于阈值则归为同一行
# noinspection PyUnboundLocalVariable
# noinspection PyTypeChecker
if text[1] - row[-1][1] < numpy.mean([x[2] for x in row]) * 0.5:
row.append(text)
# 否则结束当前行、初始化当前行
else:
rows.append(row)
row = [text]
# 添加最后一行
rows.append(row)
extraction = []
# 按照文本框X坐标升序
for row in rows:
extraction.extend(
[x[3].replace(" ", "") for x in sorted(row, key=lambda x: x[0])]
)
# 以空格拼接
extraction = " ".join(extraction)
# 根据理赔申请书匹配提示词
match application_form := kwargs.get(
"application_form", globals().get("application_form")
):
case "中行员工福利保障计划索赔申请书":
prompt = f"""
指令你是一个从OCR文本中智能提取信息并生成JSON的工具请严格按照要求执行
输入OCR文本可能包含错漏
{extraction}
输出要求
1只输出可被Python中json.loads()解析的JSON格式字符串不包含任何代码块标记说明文字等其它非JSON格式内容
2无法确定的值设置为`null`不是"null"字符串
JSON结构
{{
"基础信息": {{
"申请人": "字符串或null",
"性别": "字符串或null",
"年龄": "字符串或null",
"手机": "字符串或null",
"身份证号": "字符串或null",
"开户银行": "字符串或null",
"户名": "字符串或null",
"账号": "字符串或null",
}},
"票据表格": [
{{
"就诊序号": "字符串或null",
"发票日期": "YYYY-MM-DD或null",
"发票上的就诊医院/药店": "字符串或null",
"票据张数": "字符串或null",
"票据金额": "字符串或null",
"诊断": "字符串或null"
}},
]
}}
开始输出
"""
case _:
raise RuntimeError(f"理赔申请书{application_form}未设置处理方法")
# 请求大语言模型创建对话接口
response = globals()["http_client"].post(
url="https://api.siliconflow.cn/v1/chat/completions",
headers={
"Authorization": "Bearer sk-xsnuwirjjphhfdbvznfdfjqlinfdlrnlxuhkbbqynfnbhiqz", # 基于硅基流动
"Content-Type": "application/json; charset=utf-8",
},
json={
"model": "deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", # 通过从DeepSeek-R1-0528模型蒸馏思维链接至Qwen3-8B-Base获得的模型
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 10240, # 生成文本最大令牌数
"temperature": 0.2,
"top_p": 0.5,
"top_k": 20,
"frequency_penalty": 0.0,
"thinking_budget": 1,
},
guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(),
)
extraction = (
json.loads(match.group("json"))
if (
match := re.search(
r"```json\s*(?P<json>\{.*})\s*```",
response["choices"][0]["message"]["content"],
re.DOTALL,
)
)
else None
)
return extraction
def disease_diagnosis(**kwargs) -> str | None:
    """
    Ask a language model to infer a disease from the claim dossier.

    :param kwargs: ``dossier`` (falls back to the module-level global of the
        same name) and ``items`` (invoice contents, required)
    :return: the first non-blank line of the model reply with leading
        whitespace removed, or None when the reply is blank
    """
    # Claim dossier: keyword argument first, module-level global second
    dossier = kwargs.get("dossier", globals().get("dossier"))

    prompt = f"""
指令你是一个医学疾病分类诊断的工具请严格按照要求执行
患者信息
性别 {gender if (gender := dossier["赔案层"]["申请人信息"].get("性别")) is not None else "未知"}
年龄 {age if (age := dossier["赔案层"]["申请人信息"].get("年龄")) is not None else "未知"}
近期在药房/医院开具发票中内容 {dossier["赔案层"]["其它信息"]["小项合集"]}
输出要求
1患者自述症状在 {dossier["赔案层"]["其它信息"]["自述症状"]} 其中之一
2依据患者信息自述症状和其提供的发票中内容 {kwargs["items"]} 综合诊断只输出一个最可能的ICD-11中的疾病分类中亚类目代码对应的中文名称字符串不包含任何代码块标记说明文字等
开始输出
"""

    # Call the chat-completions endpoint
    # NOTE(review): the bearer token below is a hard-coded credential; it
    # should come from configuration or a secret store
    response = globals()["http_client"].post(
        url="https://ark.cn-beijing.volces.com/api/v3/chat/completions",
        headers={
            "Authorization": "Bearer 2c28ab07-888c-45be-84a2-fc4b2cb5f3f2",
            "Content-Type": "application/json; charset=utf-8",
        },
        json={
            "model": "deepseek-r1-250528",
            "messages": [
                {"role": "system", "content": "你是人工智能助手"},
                {"role": "user", "content": prompt},
            ],
            "temperature": 0.2,
            "top_p": 0.5,
            "top_k": 20,
            "frequency_penalty": 0.0,
            "thinking_budget": 1,
        },
        guid=hashlib.md5(prompt.encode("utf-8")).hexdigest().upper(),
    )
    content = response["choices"][0]["message"]["content"]

    # The pattern matches any string (possibly with empty text), so the old
    # "no match" branch never ran and a blank reply came back as "".
    # A blank result is now mapped to None, as the return type promises
    found = re.match(r"\s*(?P<text>.*)", content)
    if not found:
        return None
    return found.group("text") or None
# -------------------------
# 主逻辑
# -------------------------
@ -298,57 +83,41 @@ if __name__ == "__main__":
--所在市
city TEXT NOT NULL
)
"""
"""
)
# 初始化团单表
# 初始化在保被保人表TPA作业系统包括团单、个单和被保人,此处直接整合为宽表)
self._execute(
sql="""
CREATE TABLE IF NOT EXISTS group_policies
CREATE TABLE IF NOT EXISTS insured_person_policies
(
--团单号一张团单包括多张个单
group_policy TEXT NOT NULL,
--被保人
insured_person TEXT NOT NULL,
--被保人的证件类型
identity_type TEXT NOT NULL,
--被保人的证件号码
identity_number TEXT NOT NULL,
--与主被保人关系包括本人和附属配偶父母和子女
relationship TEXT NOT NULL,
--个单号
person_policy TEXT NOT NULL,
--主被保人
master_insured_person TEXT NOT NULL,
--保险起期取个单和团单起期最大值
commencement_date REAL NOT NULL,
--保险止期取个单和团单止期最小值
termination_date REAL NOT NULL,
--团单号
group_policy TEXT NOT NULL,
--投保公司
insurance_company TEXT NOT NULL,
insurance_company TEXT NOT NULL,
--保险分公司
insurer_company TEXT NOT NULL,
--团单有效起期
from_date REAL NOT NULL,
--团单有效止期
to_date REAL NOT NULL,
--联合主键团单号+投保公司+保险分公司
PRIMARY KEY (group_policy, insurance_company, insurer_company)
insurer_company TEXT NOT NULL,
--联合主键投保公司+保险分公司+被保人+被保人的证件类型+被保人的证件号码
PRIMARY KEY (insurance_company, insurer_company, insured_person, identity_type,
identity_number)
)
"""
)
# 初始化个单表
self._execute(
sql="""
CREATE TABLE IF NOT EXISTS person_policies
(
group_policy TEXT NOT NULL,
person_policy TEXT NOT NULL,
from_date REAL NOT NULL,
to_date REAL NOT NULL,
PRIMARY KEY (person_policy, group_policy)
)
"""
)
# 初始化被保人表
self._execute(
sql="""
CREATE TABLE IF NOT EXISTS insured_persons
(
insured_person TEXT NOT NULL,
identity_type TEXT NOT NULL,
identity_number TEXT NOT NULL,
relationship TEXT NOT NULL,
person_policy TEXT NOT NULL,
PRIMARY KEY (person_policy, insured_person, identity_type, identity_number)
)
"""
"""
)
except Exception as exception:
@ -368,7 +137,11 @@ if __name__ == "__main__":
with self:
# noinspection SqlResolve
result = self._query_one(
sql="SELECT institution_type FROM institutions WHERE institution = ?",
sql="""
SELECT institution_type
FROM institutions
WHERE institution = ?
""",
parameters=(institution,),
)
return (
@ -380,10 +153,70 @@ if __name__ == "__main__":
) from exception
# noinspection PyShadowingNames
def query_individual_policy(
self, insurer_company: str, certificate_type: str, certificate_number: str
def query_insured_person_records(
    self,
    insurance_company: str,
    insurer_company: str,
    insured_person: str,
    identity_type: str,
    identity_number: str,
) -> Optional[List[Dict[str, Any]]]:
    """
    Query all insured-person records matching one person.

    A person may have several records; for example, when both spouses belong
    to the insuring company each is an additional insured of the other, so
    one person's records include both "self" and "spouse".

    :param insurance_company: insuring company
    :param insurer_company: insurer branch company
    :param insured_person: insured person
    :param identity_type: identity document type of the insured person
    :param identity_number: identity document number of the insured person
    :return: list of records keyed by the aliased column names, with the
        coverage start and end converted from timestamps to datetime
        objects; None when no record matches
    :raises RuntimeError: if the query or the conversion fails
    """
    # Columns that hold timestamps and are converted to datetime objects
    date_keys = ("保险起期", "保险止期")
    # noinspection PyBroadException
    try:
        with self:
            # noinspection SqlResolve
            result = self._query_all(
                sql="""
SELECT insured_person AS "被保人",
relationship AS "与主被保人关系",
person_policy AS "个单号",
master_insured_person AS "主被保人",
commencement_date AS "保险起期",
termination_date AS "保险止期"
FROM insured_person_policies
WHERE insurance_company = ?
AND insurer_company = ?
AND insured_person = ?
AND identity_type = ?
AND identity_number = ?
""",
                parameters=(
                    insurance_company,
                    insurer_company,
                    insured_person,
                    identity_type,
                    identity_number,
                ),
            )
            # No matching record
            if not result:
                return None
            return [
                {
                    key: (
                        datetime.fromtimestamp(value)
                        if key in date_keys
                        else value
                    )
                    for key, value in record.items()
                }
                for record in result
            ]
    except Exception as exception:
        raise RuntimeError(
            "查询并获取多条个单和被保人记录发生异常"
        ) from exception
# 实例化主数据
master_data = MasterData()
@ -605,6 +438,73 @@ if __name__ == "__main__":
:return:
"""
# noinspection PyShadowingNames
def general_text_recognize(image) -> str:
    """
    Recognise the text of an image with the general OCR endpoint.

    :param image: image record; the keys read here are "影像件唯一标识",
        "影像件格式" and "影像件BASE64编码"
    :return: recognised text, one text box per line, ordered top-to-bottom
        and then left-to-right, with all whitespace inside each box removed;
        an empty string when no box is returned
    :raises RuntimeError: if the OCR response is not successful
    """
    image_guid = image["影像件唯一标识"]
    image_format = image["影像件格式"].lstrip(".")
    image_base64 = image["影像件BASE64编码"]

    # Call the general text recognition endpoint
    response = http_client.post(
        url=(url := "https://ai.inspirvision.cn/s/api/ocr/general"),
        # The image GUID doubles as the request id
        headers={"X-RequestId-Header": image_guid},
        data={
            "token": authenticator.get_token(servicer="szkt"),
            "imgBase64": f"data:image/{image_format};base64,{image_base64}",
        },
        guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
    )
    # TODO: route non-successful responses to manual handling
    if not (response.get("status") == 200 and response.get("code") == 0):
        raise RuntimeError("请求深圳快瞳通用文本识别接口发生异常")

    # Each box becomes [x, y, height, text]
    # noinspection PyTypeChecker
    boxes = [
        [
            numpy.float64(box["itemPolygon"]["x"]),
            numpy.float64(box["itemPolygon"]["y"]),
            numpy.float64(box["itemPolygon"]["height"]),
            box["value"],
        ]
        for box in response["data"]
    ]
    # Top-to-bottom first: sort by the Y coordinate
    boxes.sort(key=lambda x: x[1])

    # Group boxes into rows. Every box is visited, including the second one
    # that the previous loop skipped, and zero or one box no longer leaves
    # the current row undefined
    rows = []
    row = []
    for box in boxes:
        # Same row when the Y gap to the row's last box is below half of
        # that box's height; otherwise close the row and start a new one
        if not row or box[1] - row[-1][1] < row[-1][2] * 0.5:
            row.append(box)
        else:
            rows.append(row)
            row = [box]
    if row:
        rows.append(row)

    # Then left-to-right inside each row, removing all whitespace characters
    lines = [
        re.sub(r"\s", "", x[3])
        for row in rows
        for x in sorted(row, key=lambda x: x[0])
    ]
    return "\n".join(lines)
# TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并
# noinspection PyShadowingNames
def identity_card_recognize(image, insurance_company, insurer_company) -> None:
@ -631,7 +531,6 @@ if __name__ == "__main__":
.hexdigest()
.upper(),
)
# 若响应非成功则抛出异常
# TODO: 若响应非成功则流转至人工处理
if not (response.get("status") == 200 and response.get("code") == 0):
raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常")
@ -643,15 +542,14 @@ if __name__ == "__main__":
# noinspection PyTypeChecker
dossier["出险人层"].update(
{
"有效期起": parse(
(period := response["data"]["validDate"].split("-"))[0]
).strftime(
"%Y-%m-%d"
), # 就有效期限解析为有效期起和有效期止。其中若有效期止为长期则默认为9999-12-31
"有效期止": (
datetime(9999, 12, 31).strftime("%Y-%m-%d")
"有效起期": datetime.strptime(
(period := response["data"]["validDate"].split("-"))[0],
"%Y.%m.%d",
), # 就有效期限解析为有效起期和有效止期。其中若有效止期为长期则默认为9999-12-31
"有效止期": (
datetime(9999, 12, 31)
if period[1] == "长期"
else parse(period[1]).strftime("%Y-%m-%d")
else datetime.strptime(period[1], "%Y.%m.%d")
),
}
)
@ -663,9 +561,9 @@ if __name__ == "__main__":
# noinspection PyTypeChecker
dossier["出险人层"].update(
{
"姓名": response["data"]["name"],
"证件类型": "居民身份证",
"证件号码": response["data"]["idNo"],
"姓名": (insured_person := response["data"]["name"]),
"证件类型": (identity_type := "居民身份证"),
"证件号码": (indentity_number := response["data"]["idNo"]),
"性别": response["data"]["sex"],
"出生": datetime.strptime(
response["data"]["birthday"], "%Y-%m-%d"
@ -681,63 +579,83 @@ if __name__ == "__main__":
}
)
# 查询
print(dossier["报案层"])
print(1)
exit()
# 查询并获取多条被保人记录
insured_person_records = master_data.query_insured_person_records(
insurance_company,
insurer_company,
insured_person, # 出险人和被保人为同一人,视角不同:出险人为理赔,被保人为承保/保全
identity_type,
indentity_number,
)
# TODO: 若查询并获取多条被保人记录发生异常则流转至项目运营岗人工处理
if insured_person_records is None:
raise RuntimeError("查询并获取多条被保人记录发生异常")
dossier["被保人层"] = insured_person_records
# noinspection PyShadowingNames
def bank_card_recognize(image_guid, image_format, image_base64) -> None:
def application_recognize(image, insurer_company) -> None:
"""
银行卡识别并整合至赔案档案
:param image_guid: 影像件唯一标识
:param image_format: 影像件格式
:param image_base64: 影像件BASE64编码
理赔申请书识别并整合至赔案档案
:param image: 影像件
:param insurer_company: 保险分公司
:return:
"""
# 请求深圳快瞳居民身份证识别接口
response = http_client.post(
url=(url := "https://ai.inspirvision.cn/s/api/ocr/bankCard"),
headers={"X-RequestId-Header": image_guid},
data={
"token": authenticator.get_token(
servicer="szkt"
), # 获取深圳快瞳访问令牌
"imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}",
},
guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(),
)
# 若响应非成功或银行卡类型非借记卡则抛出异常
# TODO: 若响应非成功则流转至人工处理
if not (
response.get("status") == 200
and response.get("code") == 0
and response.get("data", {}).get("bankCardType") == 1
):
raise RuntimeError(
"请求深圳快瞳居民身份证识别接口发生异常或已识别非借记卡"
# noinspection PyShadowingNames
def boc_application_recognize(image: str) -> str:
"""
中银保险有限公司-理赔申请书识别并整合至赔案档案
:param image: 影像件
:return:
"""
# 方法1先使用深圳快瞳通用文本识别再使用硅基流动中大语言模型结构化可行但是需要请求二次
# 方法2使用硅基流动中支持OCR的大语言模型
# 请求硅基流动的大语言模型接口
response = http_client.post(
url="https://api.siliconflow.cn/v1/chat/completions",
headers={
"Authorization": "Bearer sk-xsnuwirjjphhfdbvznfdfjqlinfdlrnlxuhkbbqynfnbhiqz",
"Content-Type": "application/json; charset=utf-8",
},
json={
"model": "deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", # 通过从DeepSeek-R1-0528模型蒸馏思维链接至Qwen3-8B-Base获得的模型
"messages": [{"role": "user", "content": ""}],
"max_tokens": 10240, # 生成文本最大令牌数
"temperature": 0.2,
"top_p": 0.5,
"top_k": 20,
"frequency_penalty": 0.0,
"thinking_budget": 1,
},
guid=md5(prompt.encode("utf-8")).hexdigest().upper(),
)
# noinspection PyTypeChecker
dossier["受益人层"].update(
{
"开户行": response["data"]["bankInfo"],
"户名": None,
"户号": response["data"]["cardNo"].replace(" ", ""),
}
)
recognition = (
json.loads(match.group("json"))
if (
match := re.search(
r"```json\s*(?P<json>\{.*})\s*```",
response["choices"][0]["message"]["content"],
re.DOTALL,
)
)
else None
)
print(recognition)
exit()
# 根据保险分公司匹配结构化识别文本方法
match insurer_company:
case _ if insurer_company.startswith("中银保险有限公司"):
boc_application_recognize(image)
# noinspection PyShadowingNames
def receipt_recognize(
image_index, image_guid, image_format, image_base64, image_type
) -> None:
def receipt_recognize(image) -> None:
"""
票据识别并整合至赔案档案
:param image_index: 影像件编号
:param image_guid: 影像件唯一标识
:param image_format: 影像件格式
:param image_base64: 影像件BASE64编码
:param image_type: 影像件类型
:param image: 影像件
:return:
"""
# 初始化票据数据
@ -1425,6 +1343,44 @@ if __name__ == "__main__":
except:
return None
# noinspection PyShadowingNames
def bank_card_recognize(image) -> None:
    """
    Recognise a bank card image and merge the result into the claim dossier.

    :param image: image record; the keys read here are "影像件唯一标识",
        "影像件格式" and "影像件BASE64编码"
    :return: None; the beneficiary layer of ``dossier`` is updated in place
    :raises RuntimeError: if the response is not successful or the card type
        reported by the service is not 1
    """
    endpoint = "https://ai.inspirvision.cn/s/api/ocr/bankCard"
    image_guid = image["影像件唯一标识"]
    request_headers = {"X-RequestId-Header": image_guid}
    # Access token for the OCR service
    token = authenticator.get_token(servicer="szkt")
    image_format = image["影像件格式"].lstrip(".")
    image_base64 = image["影像件BASE64编码"]
    payload = {
        "token": token,
        "imgBase64": f"data:image/{image_format};base64,{image_base64}",
    }
    request_guid = md5((endpoint + image_guid).encode("utf-8")).hexdigest().upper()

    # Call the bank card recognition endpoint
    response = http_client.post(
        url=endpoint,
        headers=request_headers,
        data=payload,
        guid=request_guid,
    )

    # TODO: route non-successful responses to manual handling
    succeeded = response.get("status") == 200 and response.get("code") == 0
    # Per the original note, only debit cards are supported in production;
    # the service reports them as card type 1
    accepted = succeeded and response.get("data", {}).get("bankCardType") == 1
    if not accepted:
        raise RuntimeError("请求深圳快瞳银行卡识别接口发生异常或非借记卡")

    # noinspection PyTypeChecker
    beneficiary = dossier["受益人层"]
    beneficiary.update(
        {
            "开户行": response["data"]["bankInfo"],
            "户名": None,
            "户号": response["data"]["cardNo"].replace(" ", ""),
        }
    )
# 影像件识别使能检查,若影像件不识别则跳过
if not recognition_enable.evaluate(
{
@ -1447,13 +1403,16 @@ if __name__ == "__main__":
# TODO: 后续添加居民户口簿识别和整合方法
case "中国港澳台地区及境外护照":
raise RuntimeError("暂不支持中国港澳台地区及境外护照")
case "银行卡":
# 银行卡识别并整合至赔案档案
bank_card_recognize(image_guid, image_format, image_base64)
# TODO: 暂仅支持增值税发票识别且购药及就医类型为药店购药整合至赔案档案,后续逐步添加
case "理赔申请书":
application_recognize(image, insurer_company)
case "增值税发票" | "医疗门诊收费票据" | "医疗住院收费票据":
# 票据识别并整合至赔案档案
receipt_recognize(image_guid, image_format, image_base64, image_type)
# receipt_recognize(image)
pass
case "银行卡":
# 银行卡识别并整合至赔案档案
bank_card_recognize(image)
# 遍历工作目录中赔案目录并创建赔案档案(模拟自动化域就待自动化任务创建理赔档案)
for case_path in [x for x in directory_path.iterdir() if x.is_dir()]:
@ -1473,6 +1432,7 @@ if __name__ == "__main__":
},
"影像件层": [],
"出险人层": {},
"被保人层": [],
"受益人层": {},
}