日常更新

from NUC
This commit is contained in:
liubiren 2025-12-22 22:14:59 +08:00
parent a2e03510f7
commit 27a1b5e07b
7 changed files with 55745 additions and 30227 deletions

View File

@ -117,7 +117,7 @@ class SQLiteClient:
)
# 开启行映射,支持按照字段名取值
self.threads.connection.row_factory = sqlite3.Row
# 为当前线程创建数据库游标
# 为当前线程创建游标
self.threads.cursor = self.threads.connection.cursor()
except Exception as exception:
self.threads.connection = None
@ -128,16 +128,16 @@ class SQLiteClient:
def _disconnect(self) -> None:
"""为当前线程关闭数据库连接和游标"""
# 检查当前线程有数据库游标,若有则关闭数据库游标
# 检查当前线程有游标,若有则关闭游标
if hasattr(self.threads, "cursor") and self.threads.cursor is not None:
# noinspection PyBroadException
try:
# 为当前线程关闭数据库游标
# 为当前线程关闭游标
self.threads.cursor.close()
self.threads.cursor = None
except Exception as exception:
raise RuntimeError(
f"为当前线程关闭数据库游标发生异常,{str(exception)}"
f"为当前线程关闭游标发生异常,{str(exception)}"
) from exception
# 检查当前线程有数据库连接,若有则关闭数据库连接
@ -167,9 +167,9 @@ class SQLiteClient:
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无数据库游标,若无则抛出异常
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建数据库游标发生异常")
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
@ -201,9 +201,9 @@ class SQLiteClient:
try:
# 为当前线程创建数据库连接和游标
self._connect()
# 检查当前线程无数据库游标,若无则抛出异常
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建数据库游标发生异常")
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)
@ -230,9 +230,9 @@ class SQLiteClient:
"""
try:
self._connect()
# 检查当前线程无数据库游标,若无则抛出异常
# 检查当前线程无游标,若无则抛出异常
if not hasattr(self.threads, "cursor") or self.threads.cursor is None:
raise RuntimeError("为当前线程创建数据库游标发生异常")
raise RuntimeError("为当前线程创建游标发生异常")
# 为当前线程执行SQL
self.threads.cursor.execute(sql, parameters)

View File

@ -0,0 +1,55 @@
import csv
import chardet
def convert_csv_to_utf8(input_csv: str, output_csv: str, encoding: str = None) -> bool:
    """
    Re-encode a CSV file as UTF-8 with a BOM.

    :param input_csv: path of the CSV file to read
    :param output_csv: path of the UTF-8 CSV file to write
    :param encoding: encoding of the input file, e.g. "gbk"; when omitted it is
        detected with chardet from the first 10 KB of the file
    :return: True when the file was converted, False when reading or writing failed
    """
    # Step 1: detect the encoding of the input file unless the caller supplied one
    if not encoding:
        print(f"🔍 正在检测 {input_csv} 的编码...")
        with open(input_csv, "rb") as f:
            raw_data = f.read(10240)  # only the first 10 KB is sampled for detection
        result = chardet.detect(raw_data)
        encoding = result["encoding"]
        confidence = result["confidence"]
        print(f"✅ 检测到编码:{encoding}(置信度:{confidence:.2f})")
        # chardet reports no encoding when detection fails; fall back to gbk
        if not encoding:
            encoding = "gbk"
            print(f"⚠️ 编码检测失败,兜底使用 {encoding}")
    # Step 2: read with the source encoding, then write the rows back as UTF-8
    try:
        # errors="replace" substitutes undecodable bytes instead of raising.
        # newline="" hands line-ending handling to the csv module, so CR/LF
        # inside quoted fields is preserved rather than translated on read.
        with open(
            input_csv, "r", encoding=encoding, errors="replace", newline=""
        ) as infile:
            # Default comma delimiter; pass delimiter="\t" here for tab-separated input
            rows = list(csv.reader(infile))
        # All rows are held in memory before the output is opened, so
        # output_csv may be the same path as input_csv.
        # utf-8-sig writes a BOM; newline="" avoids blank lines between rows.
        with open(output_csv, "w", encoding="utf-8-sig", newline="") as outfile:
            csv.writer(outfile).writerows(rows)
        print(f"✅ 转换完成UTF-8编码文件已保存至{output_csv}")
        return True
    except Exception as e:
        # Best-effort utility: report the failure and signal it via the return value
        print(f"❌ 转换失败:{str(e)}")
        return False
# ========== 示例调用 ==========
if __name__ == "__main__":
    # Example invocation; replace both paths with real ones.
    # The two paths are identical here, so the file is rewritten in place.
    input_path = "转换后_UTF8.csv"
    output_path = "转换后_UTF8.csv"
    # No encoding is passed, so it is auto-detected before converting
    convert_csv_to_utf8(input_csv=input_path, output_csv=output_path)

View File

@ -56,3 +56,67 @@ def general_text_recognize(image) -> str:
[re.sub(r"\s", "", x[3]) for x in sorted(line, key=lambda x: x[0])]
) # 按照文本块的X坐标升序从左到右并去除文本块的文本内容中所有空字符
return "\n".join(blocks)
class JiojioTokenizer:
    """Chinese word tokenizer backed by jiojio."""

    def __init__(self):
        """
        Initialise the jiojio tokenizer.

        :raises RuntimeError: if jiojio fails to initialise
        """
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate, and keep the original error as the cause.
        # noinspection PyBroadException
        try:
            jiojio.init()
        except Exception as exception:
            raise RuntimeError("初始化jiojio分词器发生异常") from exception

    # noinspection PyShadowingNames
    @staticmethod
    def callback(text: str, flags: int, cursor) -> None:
        """
        Tokenization callback.

        Cuts ``text`` with jiojio and sends one ``(token, begin, end)`` tuple
        per non-blank token to ``cursor``.

        :param text: text to tokenize
        :param flags: FTS5 tokenization flags (not used here)
        :param cursor: object whose ``send`` method receives each token tuple
        :return: None
        """
        # Empty or non-string input produces no tokens
        if not text or not isinstance(text, str):
            return

        tokens = []
        offset = 0  # start index of the current word within text
        for word in jiojio.cut(text):
            width = len(word)
            # Blank words are skipped but still advance the offset
            if word.strip():
                # NOTE(review): offsets are character indices, not UTF-8 byte
                # offsets — confirm which the consumer of these tuples expects.
                tokens.append((word, offset, offset + width))
            offset += width

        # Tokens are sent only after the whole text was cut successfully
        for token in tokens:
            cursor.send(token)
# 实例化jiojio分词器
self.threads.jiojio_tokenizer = self.JiojioTokenizer()
# 创建分词器方法
def create_tokenizer_module(tokenizer):
    """
    Wrap a tokenizer in an object exposing a ``tokenize`` method.

    :param tokenizer: object providing ``callback(text, flags, cursor)``
    :return: a ``JiojioTokenizerModule`` instance that forwards to ``tokenizer``
    """

    class JiojioTokenizerModule:
        """Adapter whose ``tokenize`` delegates to the wrapped tokenizer's callback."""

        # noinspection PyShadowingNames
        @staticmethod
        def tokenize(text: str, flags: int, cursor) -> None:
            # Forward unchanged; the callback's return value is discarded
            tokenizer.callback(text, flags, cursor)

    module = JiojioTokenizerModule()
    return module
self.threads.connection.create_module(
"jiojio_fts5_module",
create_tokenizer_module(self.threads.jiojio_tokenizer),
)
self.threads.connection.execute(
"""
CREATE VIRTUAL TABLE IF NOT EXISTS jiojio_tokenizer USING fts5tokenizer(jiojio_fts5_module)
"""
)

Binary file not shown.

View File

@ -15,7 +15,6 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import cv2
import jiojio
import numpy
import pandas
from fuzzywuzzy import fuzz
@ -67,6 +66,39 @@ if __name__ == "__main__":
try:
with self:
# 初始化在保被保险人表TPA作业系统包括团单、个单和被保险人表此处直接整合为宽表
self._execute(
sql="""
CREATE TABLE IF NOT EXISTS insured_person_policies
(
--被保险人
insured_person TEXT NOT NULL,
--被保险人的证件类型
identity_type TEXT NOT NULL,
--被保险人的证件号码
identity_number TEXT NOT NULL,
--与主被保险人关系包括本人和附属配偶父母和子女
relationship TEXT NOT NULL,
--个单号
person_policy TEXT NOT NULL,
--主被保险人
master_insured_person TEXT NOT NULL,
--保险起期取个单和团单起期最大值
commencement_date REAL NOT NULL,
--保险止期取个单和团单止期最小值
termination_date REAL NOT NULL,
--团单号
group_policy TEXT NOT NULL,
--投保公司
insurance_company TEXT NOT NULL,
--保险分公司
insurer_company TEXT NOT NULL,
--联合主键投保公司+保险分公司+被保险人+被保险人的证件类型+被保险人的证件号码
PRIMARY KEY (insurance_company, insurer_company, insured_person, identity_type,
identity_number)
)
"""
)
# 初始化购药及就医机构表
self._execute(
sql="""
@ -83,45 +115,19 @@ if __name__ == "__main__":
)
"""
)
# 初始化在保被保人表TPA作业系统包括团单、个单和被保人表此处直接整合为宽表
# 初始化药品表
self._execute(
sql="""
CREATE TABLE IF NOT EXISTS insured_person_policies
CREATE TABLE IF NOT EXISTS drugs
(
--被保人
insured_person TEXT NOT NULL,
--被保人的证件类型
identity_type TEXT NOT NULL,
--被保人的证件号码
identity_number TEXT NOT NULL,
--与主被保人关系包括本人和附属配偶父母和子女
relationship TEXT NOT NULL,
--个单号
person_policy TEXT NOT NULL,
--主被保人
master_insured_person TEXT NOT NULL,
--保险起期取个单和团单起期最大值
commencement_date REAL NOT NULL,
--保险止期取个单和团单止期最小值
termination_date REAL NOT NULL,
--团单号
group_policy TEXT NOT NULL,
--投保公司
insurance_company TEXT NOT NULL,
--保险分公司
insurer_company TEXT NOT NULL,
--联合主键投保公司+保险分公司+被保人+被保人的证件类型+被保人的证件号码
PRIMARY KEY (insurance_company, insurer_company, insured_person, identity_type,
identity_number)
--药品
drug TEXT PRIMARY KEY
)
"""
)
except Exception as exception:
raise RuntimeError(
f"初始化数据库发生异常:{str(exception)}"
) from exception
raise RuntimeError(f"初始化数据库发生异常:{str(exception)}")
# noinspection PyShadowingNames
def query_institution_type(self, institution: str) -> Optional[str]:
@ -142,14 +148,12 @@ if __name__ == "__main__":
""",
parameters=(institution,),
)
# TODO: 若购药及就医机构类型为空值则流转至主数据人工处理
if result is None:
raise RuntimeError("购药及就医机构类型为空值")
if result:
return result["institution_type"]
except Exception as exception:
raise RuntimeError(
"查询并获取单条购药及就医机构类型发生异常"
) from exception
raise
# TODO: 若购药及就医机构类型为空值则流转至主数据人工处理
except Exception:
raise RuntimeError("查询并获取单条购药及就医机构类型发生异常")
# noinspection PyShadowingNames
def query_insured_person_records(
@ -161,13 +165,13 @@ if __name__ == "__main__":
identity_number: str,
) -> Optional[List[Dict[str, Any]]]:
"""
查询并获取多条被保人记录例如若夫妻同在投保公司则互为附加被保一方被保人记录包括本人和配偶两条
查询并获取多条被保人记录例如若夫妻同在投保公司则互为附加被保一方被保人记录包括本人和配偶两条
:param insurance_company: 投保公司
:param insurer_company: 保险分公司
:param insured_person: 被保
:param identity_type: 被保人的证件类型
:param identity_number: 被保人的证件号码
:return: 被保人记录
:param insured_person: 被保
:param identity_type: 被保人的证件类型
:param identity_number: 被保人的证件号码
:return: 被保人记录
"""
# noinspection PyBroadException
try:
@ -175,10 +179,10 @@ if __name__ == "__main__":
# noinspection SqlResolve
result = self._query_all(
sql="""
SELECT insured_person AS "被保",
relationship AS "与主被保人关系",
SELECT insured_person AS "被保",
relationship AS "与主被保人关系",
person_policy AS "个单号",
master_insured_person AS "主被保",
master_insured_person AS "主被保",
commencement_date AS "保险起期",
termination_date AS "保险止期"
FROM insured_person_policies
@ -196,10 +200,8 @@ if __name__ == "__main__":
identity_number,
),
)
return (
None
if result == []
else [
if result:
return [
{
k: (
datetime.fromtimestamp(v)
@ -210,12 +212,41 @@ if __name__ == "__main__":
}
for e in result
] # 将保险起期和保险止期由时间戳转为datetime对象
)
raise
# TODO: 若查询并获取多条个单和被保险人记录发生异常则流转至主数据人工处理
except Exception:
raise RuntimeError("查询并获取多条个单和被保险人记录发生异常")
except Exception as exception:
raise RuntimeError(
"查询并获取多条个单和被保人记录发生异常"
) from exception
# noinspection PyShadowingNames
def query_drug(
    self,
    content: str,
) -> Optional[str]:
    """
    Look up the drug named in a line item's specific content.

    Selects every row of ``drugs`` whose ``drug`` value occurs as a substring
    of ``content`` and returns the longest match.

    :param content: specific content of the line item
    :return: the longest matching drug
    :raises RuntimeError: if the query fails or no drug matches
    """
    message = "根据明细项具体内容查询药品发生异常"
    # noinspection PyBroadException
    try:
        with self:
            # noinspection SqlResolve
            result = self._query_all(
                sql="""
                    SELECT drug
                    FROM drugs
                    WHERE ? LIKE '%' || drug || '%'
                    """,
                parameters=(content,),
            )
            if result:
                # Several drugs may be substrings of content; keep the longest
                return max(result, key=lambda x: len(x["drug"]))["drug"]
    except Exception as exception:
        # Keep the underlying failure as the cause
        raise RuntimeError(message) from exception
    # TODO: route to manual master-data handling when no drug matches
    # Explicit raise instead of a bare `raise` with no active exception
    raise RuntimeError(message)
# 实例化主数据
master_data = MasterData()
@ -537,18 +568,14 @@ if __name__ == "__main__":
}
)
# 查询并获取多条被保人记录
insured_person_records = master_data.query_insured_person_records(
# 查询并获取多条被保人记录
dossier["被保险人层"] = master_data.query_insured_person_records(
insurance_company,
insurer_company,
insured_person, # 出险人和被保人为同一人,视角不同:出险人为理赔,被保人为承保/保全
insured_person, # 出险人和被保人为同一人,视角不同:出险人为理赔,被保人为承保/保全
identity_type,
indentity_number,
)
# TODO: 若查询并获取多条被保人记录发生异常则流转至项目运营岗人工处理
if insured_person_records is None:
raise RuntimeError("查询并获取多条被保人记录发生异常")
dossier["被保人层"] = insured_person_records
# noinspection PyShadowingNames
def application_recognize(image, insurer_company) -> None:
@ -1268,27 +1295,19 @@ if __name__ == "__main__":
)
for item in items:
# 解析明细项大类名称和具体名称
# 匹配并解析明细项大类和具体内容
if match := re.match(
r"^\*(?P<category_name>.*?)\*(?P<specific_name>.*)$",
r"^\*(?P<category>.*?)\*(?P<content>.*)$",
item["名称"],
):
category_name = match.group(
"category_name"
) # 明细项大类名称
specific_name = match.group(
"specific_name"
) # 明细项具体名称
category = match.group("category") # 明细项大类
# 根据明细项具体内容查询药品
drug = master_data.query_drug(match.group("content"))
# TODO: 若匹配明细项大类和具体内容发生异常则流转至人工处理
else:
pass
raise RuntimeError("匹配明细项大类和具体内容发生异常")
jiojio.init()
print(jiojio.cut(specific_name))
print()
print(specific_name)
print(dossier["被保险人层"])
exit()
case ("增值税发票", "私立医院"):
@ -1388,8 +1407,9 @@ if __name__ == "__main__":
},
"影像件层": [],
"出险人层": {},
"被保人层": [],
"被保人层": [],
"受益人层": {},
"费项层": [],
}
# 遍历赔案目录中影像件
@ -1446,7 +1466,7 @@ if __name__ == "__main__":
"银行卡",
"其它",
].index(x["影像件类型"])
) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保人信息
) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保人信息
# 遍历影像件层中影像件
for image in dossier["影像件层"]:

File diff suppressed because it is too large Load Diff