From 0110be4b168e8d07ef7d67ae00ff2e99d7f512a9 Mon Sep 17 00:00:00 2001 From: liubiren Date: Thu, 8 Jan 2026 22:15:07 +0800 Subject: [PATCH] 1 --- .isort.cfg | 3 +- database.db | Bin 69632 -> 0 bytes utils/algorithms.py | 24 +- utils/authenticator.py | 13 +- utils/feishu.py | 9 +- utils/logger.py | 7 +- utils/mysql.py | 4 +- utils/request.py | 4 +- utils/restrict.py | 4 +- utils/rules_engine.py | 7 +- utils/sqlite.py | 4 +- 短视频合成自动化/draft.py | 3 +- 短视频合成自动化/edgetts.py | 6 +- 短视频合成自动化/export.py | 3 +- 短视频合成自动化/main.py | 2 +- 票据理赔自动化/case.py | 117 ++++++ 票据理赔自动化/common.py | 17 +- 票据理赔自动化/database.db | Bin 85553152 -> 85553152 bytes 票据理赔自动化/image.py | 733 ++++++++++++++++++----------------- 票据理赔自动化/main.py | 192 +++------ 票据理赔自动化/masterdata.py | 74 ++-- 21 files changed, 635 insertions(+), 591 deletions(-) delete mode 100644 database.db create mode 100644 票据理赔自动化/case.py diff --git a/.isort.cfg b/.isort.cfg index 53202d5..eb008e9 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,5 +1,4 @@ [settings] -order_by_type = true # 按照标准库、第三方库和本地模块分组 -alphabetical = true # 同组按照字母序排序 +order_by_type = true multi_line_output = 3 indent = " " \ No newline at end of file diff --git a/database.db b/database.db deleted file mode 100644 index 3d0bb31fe86de433c13c262bf00b1949d3011f99..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 69632 zcmeI3-ER|D8piFAj3E$0rAofU+K5%XX?BHBRn!WxY2^z<3MHg!>CI?jPr|hJ*o?=h zxu_Obbv{T;TnGzQ!&fQ@pU>ad=bb+$GNVy8$}yuHsfA;c zhQqz@4u=LuLV=ONcZNa%!-ejE#pgB4E9wIS1C{k}rBX>=$V-JQ%KfQIe$w`V@e~)S zR~iX@G!l4!_|$uY!xsXlLl<7Rtzla-mn}bfYI|#~eG>*nP3+9sk-(V`hK8J!c}^rNA<3D~V5U%em9jm89o) z?p5B)xJ(HZJBLv^rOZiXrS!Iq|43IoPA2k&A zO@7?v^Y{0A=RVc)1;-~-0vn-qJCb!p%9J8CHby>JZmY06DMeKBEjmZqzNH;ezSZ( zZQmj_SE)jLDmKars+21ssc=mylqyDynYHp->9?QerD(MB~-b0db z$qlwPIEJL<)4>xIbi_O9r>Fjdp9RHC=aG>n>sGUQ;B83U6UdT&@E6V+;N`BJzf$rMT1blm6B9gmWsv7 z)N*D0$8v76{QWH@zaafQYpa@Ig=Cy(qI8mtj?n@ma`7frrPA$&Rrxqi$JhuLCc?UA zz@4g`n^wgdZKc36YAcCPZ_ByU)0L#>cjWBNh8>lOe3D9vbRr(*!V|Rawr+dWjh^0@ zCOwm!r6UC2*h~3O^;u<(S`3gpKp!;}_Dz1=<@5LVd*?pY@&(5yQvw^Ibvu%EMaq;n z>MmNE=9oK{z9YS&MOd-nj(JH2(LlLLAV%FsH{@K_vI%NHo13eJxT$NLvPdPLmP#At zXC?WreLQO^gcB!daU#L$siX-Zd498eKW*P4HCL%Zd@44|3aXSVA*papDwHZljG49a zTIsi+=H$7IGP|NIu8?&0%awF_dPB))$-!18E2lT)>vQsIPJXynUY)VEP++5KI^&ZU zxr9!_t%b{V(MN16FFclt*DC1+<=#V*aLEm}HaLc)<$i$YvqO=%tUKY$ z^zG4NbLMj`HjRdmIIb&pnqsr=qNRyWv8nUfMO3;C%BtY}rlrZM9~hgb59{t6S4%gQ zA2yZ6d$u~^cr3>9VV3-ebc7LEm+q(OD630OIwC8?I9_vgUB;b`{LPZRo0A)q2h+Aq z9T(!ML^A`=iB8II?c~Ou(gvTp6J42uUY|c0^v-;!Ik~|Ma=W?IVRCWvHOsa+dH7Ct zmQ|^qw{zTa>)};F2M;&2uKM2Egon%1lX7N7o-dHG&FL*!Pd);fW5f~+uWB|S_~!{J z%T5ly-9h=!2Cb5|pI-%3@Z&`oOX-bp)&_>f?i9zGeAT@aRFxA3AK6Qfn2GH{k8VSHAqqexLuy5%09G zHZLVJm6SFYAS2cG@j(*|;}uJ59C@s-sz~Oq%1@sWD9U0{x%;JMZ7LCitXiy&oCY$E zQ^S`@HZoIuK|-G_x*D>TnJe<*y(9-{!}RW#TmQPx=N}mGX8)zRY5j6+eYxA@ zt~;)p7#IppE~_q|EZvdkZ_8I7?_#c21Mk9fbxqw>UrhUFo~z4j*XQ&lH>XLyzL;%T z?ND28@&Kp!skVG3yEXL{0b%JdYQ>}MSX`*Bpwmn&o+9hDO{g^>Rc%;RLlq~VCZy!t zwFcCfF*1v+?hMr0)r^{QYh&xyx-G+6k8!aVX`vY@my5AgS<_;yS=Q3(W|v2?ouYfK z;kUMWY|Pm1oEqCH2AtJHY^OA|73Xq2(zw`E-M%ros&&3NLD{Wpkyi`jHp$MVTm1ij zn@_V~9teN{2!H?xfB*=900@8p2!H?xw15Eq|GxzgQ3VKq00@8p2!H?xfB*=900@8p z2(&o??EklU2f;iL009sH0T2KI5C8!X009sH0T5^b0qp;`03xaY0T2KI5C8!X009sH O0T2KI5CDNTC-8qenn2b7 diff --git a/utils/algorithms.py b/utils/algorithms.py index 9fd7b5d..6f7fca9 100644 --- a/utils/algorithms.py +++ b/utils/algorithms.py @@ -1,23 +1,15 @@ # -*- coding: utf-8 -*- - -''' - -自定义计算相关模块 - -''' - -#加载模块 - -from distance import levenshtein - -import numpy - -import pandas - -from sklearn.cluster import KMeans +""" +算法模块 +""" import warnings +import numpy +import pandas +from distance import levenshtein +from sklearn.cluster import KMeans + warnings.simplefilter('ignore') ''' diff --git a/utils/authenticator.py b/utils/authenticator.py index d6425a9..e376b38 100644 --- a/utils/authenticator.py +++ b/utils/authenticator.py @@ -6,10 +6,10 @@ import hashlib import hmac import json -from pathlib import Path import sys import threading import time +from pathlib import Path from typing import Optional, Tuple sys.path.append(Path(__file__).parent.as_posix()) @@ -19,7 +19,7 @@ from request import Request class Authenticator: """ 认证器,支持: - 1、 + get_token:获取访问令牌 """ def __init__(self): @@ -53,12 +53,11 @@ class Authenticator: certifications = json.load(file) # 获取指定服务商的访问凭证 certification = certifications.get(servicer) - # 若指定服务商的访问凭证非空则解析访问令牌和失效时间戳 if certification: - # 访问令牌 - token = certification["token"] - # 失效时间戳 - expired_timestamp = certification["expired_timestamp"] + token = certification["token"] # 访问令牌 + expired_timestamp = certification[ + "expired_timestamp" + ] # 失效时间戳 except json.decoder.JSONDecodeError: with open(self.certifications_path, "w", encoding="utf-8") as file: diff --git a/utils/feishu.py b/utils/feishu.py index 701cbd6..54658c3 100644 --- a/utils/feishu.py +++ b/utils/feishu.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- - -# 导入模块 +""" +飞书客户端模块 +""" import re import time @@ -10,6 +11,8 @@ from email.utils import parsedate_to_datetime from imaplib import IMAP4_SSL import pandas +from request import Request +from authenticator import Authenticator class Feishu: @@ -17,7 +20,7 @@ class Feishu: def __init__(self): self.authenticator = Authenticator() - self.http_client = HTTPClient() + self.http_client = Request() def _headers(self): """请求头""" diff --git a/utils/logger.py b/utils/logger.py index a377ca6..7ca8b8c 100644 --- a/utils/logger.py +++ b/utils/logger.py @@ -1,13 +1,8 @@ # -*- coding: utf-8 -*- - """ - -基于LOGGING封装日志记录器 - +日志模块 """ -# 加载模块 - import logging from logging.handlers import RotatingFileHandler diff --git a/utils/mysql.py b/utils/mysql.py index 7545ab1..81f0bb2 100644 --- a/utils/mysql.py +++ b/utils/mysql.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- +""" +MySQL客户端模块 +""" -# 导入模块 from urllib.parse import quote_plus import pandas diff --git a/utils/request.py b/utils/request.py index 254a047..7fb6a65 100644 --- a/utils/request.py +++ b/utils/request.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- """ -请求客户端 +请求客户端模块 """ import json -from pathlib import Path import sys import time +from pathlib import Path from typing import Any, Dict, Generator, Literal, Optional, Tuple, Union from xml.etree import ElementTree diff --git a/utils/restrict.py b/utils/restrict.py index 99269e7..b78e329 100644 --- a/utils/restrict.py +++ b/utils/restrict.py @@ -1,11 +1,11 @@ # -*- coding: utf-8 -*- """ -请求限速器 +请求限速器模块 """ -from functools import wraps import threading import time +from functools import wraps from typing import Callable, Tuple diff --git a/utils/rules_engine.py b/utils/rules_engine.py index fc2ab5f..fb343b3 100644 --- a/utils/rules_engine.py +++ b/utils/rules_engine.py @@ -10,6 +10,7 @@ from typing import Any, Dict, Union from zen import ZenDecision, ZenEngine + class RulesEngine: """ 规则引擎,支持: @@ -45,7 +46,7 @@ class RulesEngine: def evaluate(self, decision: str, inputs: Dict[str, Any]) -> Dict[str, Any]: """ - 调用并返回评估结果 + 调用决策并返回评估结果 :param decision: 决策名称 :param inputs: 待评估对象 :return: 评估结果 @@ -70,6 +71,8 @@ class RulesEngine: case list(): return [self._formatter(i) for i in inputs] case dict(): - return {key: self._formatter(value) for key, value in inputs.items()} # 递归格式化 + return { + key: self._formatter(value) for key, value in inputs.items() + } # 递归格式化 case _: return inputs diff --git a/utils/sqlite.py b/utils/sqlite.py index 1113839..6a13b8e 100644 --- a/utils/sqlite.py +++ b/utils/sqlite.py @@ -17,7 +17,7 @@ class SQLite: execute:根据SQL语句执行操作 """ - def __init__(self, database: Union[str, Path]): + def __init__(self, database: Path): """ 初始化 :param database: 数据库地址 @@ -131,7 +131,7 @@ class SQLite: try: # 为当前线程创建数据库连接 self.threads.connection = sqlite3.connect( - database=self.database, + database=self.database.as_posix(), check_same_thread=True, timeout=30, # 数据库锁超时时间(单位:秒),默认为30秒,避免并发锁死 ) diff --git a/短视频合成自动化/draft.py b/短视频合成自动化/draft.py index f4d66e0..b9cb542 100644 --- a/短视频合成自动化/draft.py +++ b/短视频合成自动化/draft.py @@ -1,13 +1,12 @@ # -*- coding: utf-8 -*- """ -draft模块 +生成草稿模块 """ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union import pyJianYingDraft - from edgetts import EdgeTTS diff --git a/短视频合成自动化/edgetts.py b/短视频合成自动化/edgetts.py index eb95de6..b215d35 100644 --- a/短视频合成自动化/edgetts.py +++ b/短视频合成自动化/edgetts.py @@ -1,13 +1,13 @@ # -*- coding: utf-8 -*- """ -EdgeTTS模块 +合成语音模块 """ import asyncio - +from hashlib import md5 from pathlib import Path from typing import Tuple, Union -from hashlib import md5 + import edge_tts from mutagen.mp3 import MP3 diff --git a/短视频合成自动化/export.py b/短视频合成自动化/export.py index 5c94b34..2a802ae 100644 --- a/短视频合成自动化/export.py +++ b/短视频合成自动化/export.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -export模块 +导出草稿模块 """ import random @@ -12,7 +12,6 @@ from typing import Any, Dict, Optional import pyJianYingDraft import win32con import win32gui - from draft import JianYingDraft diff --git a/短视频合成自动化/main.py b/短视频合成自动化/main.py index 222fd87..0af11ee 100644 --- a/短视频合成自动化/main.py +++ b/短视频合成自动化/main.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -主程序 +主模块 """ from export import JianYingExport diff --git a/票据理赔自动化/case.py b/票据理赔自动化/case.py new file mode 100644 index 0000000..04dfeae --- /dev/null +++ b/票据理赔自动化/case.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +"""通用模块""" + +from typing import Any, Dict, List +from decimal import Decimal, ROUND_HALF_UP + +import pandas +from common import masterdata, rules_engine + + +def case_adjust(dossier: Dict[str, Any]) -> None: + """ + 理算赔案并整合至赔案档案 + :param dossier: 赔案档案 + :return: 无 + """ + # 基于拒付决策规则评估 + if not (result := rules_engine.evaluate(decision="拒付", inputs=dossier)): + raise RuntimeError("该保险分公司未配置拒付规则") + + dossier["adjustment_layer"].update( + { + "conclusion": (conclusion := result["conclusion"]), # 理赔结论 + "explanation": result["explanation"], # 结论说明 + } + ) + if conclusion == "拒付": + return + + # 赔案理算记录 + adjustments = ( + pandas.DataFrame(data=dossier["receipts_layer"]).assing( + adjustments=lambda dataframe: dataframe.apply( + lambda row: receipt_adjust( + row=row, liabilities=dossier["liabilities_layer"] + ), + axis="columns", + ) # 票据理算 + ) + ).explode("adjustments", ignore_index=True) + print(adjustments) + + +def receipt_adjust( + row: pandas.Series, liabilities: List[Dict[str, Any]] +) -> List[Dict[str, Any]]: + """ + 理算票据 + :param row: 一张票据数据 + :param liabilities: 理算责任 + :return: 理算记录 + """ + # 初始化票据理算记录 + adjustments = [] + # 初始化剩余个人自费金额 + remaining_personal_self_payment = row["personal_self_payment"] + # 初始化剩余个人自付金额 + remaining_non_medical_payment = row["non_medical_payment"] + # 初始化剩余合理金额 + remaining_reasonable_amount = row["reasonable_amount"] + + # 出险事故 + accident = row["accident"] + # 出险人 + accident_person = row["payer"] + # 出险日期 + accident_date = row["date"] + + # 查验状态 + verification = row["verification"] + + # 初始化理赔责任理算金额 + adjustment_amount = Decimal("0.00") + # 初始化理赔责任理赔金额 + claim_amount = Decimal("0.00") + + # 遍历所有理赔责任,根据出险事故、出险人、出险日期和查验状态匹配责任 + for liability in liabilities: + if ( + accident == liability["accident"] + and accident_person == liability["insured_person"] + and liability["commencement_date"] + <= accident_date + <= liability["termination_date"] + and verification == "真票" + ): + # 理赔责任理算金额 + adjustment_amount = ( + row["personal_self_payment"] + * liability["personal_self_ratio"] # 个人自费金额 + + row["non_medical_payment"] + * liability["non_medical_ratio"] # 个人自付金额 + + row["reasonable_amount"] * liability["reasonable_ratio"] # 合理金额 + ).quantize( + Decimal("0.00"), + rounding=ROUND_HALF_UP, + ) + + # 据变动保单唯一标识查询最新一条保额变动记录的变动后金额(理赔责任的理赔保单余额) + remaining_amount = masterdata.query_remaining_amount( + policy_guid=liability["policy_guid"], + ) + # 理赔责任理赔金额 + claim_amount = min( + remaining_amount, + adjustment_amount, + ) + + # 初始化票据理算记录 + adjustment = { + "liability": liability["liability"], # 理赔责任名称 + "type": row["就诊类型"], + "amount": row["合理金额"], + "payable": 0.0, + } + + return adjustments diff --git a/票据理赔自动化/common.py b/票据理赔自动化/common.py index 7eb85e6..552b8bc 100644 --- a/票据理赔自动化/common.py +++ b/票据理赔自动化/common.py @@ -1,25 +1,16 @@ # -*- coding: utf-8 -*- +"""通用模块""" -from pathlib import Path import sys +from pathlib import Path from masterdata import MasterData sys.path.append(Path(__file__).parent.parent.as_posix()) from utils.rules_engine import RulesEngine -# 初始化赔案档案(保险公司将提供投保公司、保险分公司和报案时间等,TPA作业系统签收后生成赔案号) -dossier = { - "report_layer": {}, # 报案层 - "images_layer": [], # 影像件层 - "insured_person_layer": {}, # 出险人层 - "insured_persons_layer": [], # 被保险人层 - "receipts_layer": [], # 票据层 - "adjustment_layer": {}, # 理算层 -} - # 实例化主数据 -masterdata = MasterData() +masterdata = MasterData(database=Path(__file__).parent / "database.db") # 实例化规则引擎 -rules_engine = RulesEngine(decisions_folder_path=Path("rules")) +rules_engine = RulesEngine(decisions_folder_path=Path(__file__).parent / "rules") diff --git a/票据理赔自动化/database.db b/票据理赔自动化/database.db index dc3147a54bd13c9e070e15093409034bd8257096..ec40379ebe1c1ea789d2f144e77f41f299b1ba0d 100644 GIT binary patch delta 5881 zcmWmELzGx)6h+~xO2xK2wyh33w(Z>5wsX_5ZQHhO+qTnj(q|9$x6b6BytmdJaPM*I z0cu?QsL28XDoqUtNE|01AXTk^fPh@%(`O5rEFfdAjswjt&@IR?((7Tzs_TST`=Zjs%hxJ7k~<`&&8hFeUxSZ=Z1;<&|ii{}>KErDA?x4+yHxg~Z> z;+E7cnOkzV6mBWqQn{seOXHT-EuC9>w+wC>-7>jlcFW?H)h(M_cDEdEIo)!(<#x;C zme(zx+uv^a-3quBbSvam*sX}$KW;_cin$ecE8$kst(047w=!;J-O9O@cdOu5(XEnO zWw$DBRo$w&Rd=i5R@1GPTWz;GZgt)2xz%@T;MUNsky~T8CT{vxP2HNgHFs;_*3zw& zTWhyAZf)J#xwUue;MUQtlUrxEE^b}jy18|C>*3bZt(RMGw?1xt-TJxpcN^d~&~1?0 zV7DP|?l#ozU$jdUC3Hrj2B+gP`8ZsXl1xJ`7MQZqX1L9C zo8>m!ZI0Vqw|Q>!-4?hlbX(-M*lmg1QnzJp%iUJEt#n)Ew%Tot+gi7EZtLAPxNUUX z~ZI9btw|#E=-43`NbUWmB*zJhhQMY4m$K6i2opd|p zcG~TX+gZ1BZs*-DxLtI+xr!_S)@@+grDHZtvYbxP5f{cZsE8_(imaljs4AL@u41T|Dwc|^;;6VP zo{Fy$sD$b-l}II4NmNpmOeI$-R7#afrB-QFT9rZm%Y z&Z>**s=BG}s)y>Sda2&3kLs)Xss3t!8mI=T!D@(dHB|kphNs z@oIvas3xh&YKoewrm5*_hMK8nso83dnycoi`D%e$s1~WkYKdB^mZ{}xg<7drsnu$Y zTC3Kn^=gCKs5Yt1YKz*cwyEuEhuW!jsoiRi+N<`d{px@^s1B*a>WDh3j;Z77ggU8C zsnhC=I;+m9^Xh`Ss4l6?>WaFmuBq$lhPtV4soUy~x~uM~`|5#us2-`u>WO-)o~h^R zg?g!8sn_a_daK^4_v(ZCs6MIB>Ob{GeO2GoclAU4RKL`3^(Sz+1?{%Lp!sh@2b~Rw zyf5hN-U#mkgAN9T8&)WO=-A1U1&u4Vci4}>pg|#`CvDQEQOmXsI<#-ovPsVd&AYa= zq;o5N4-CpSx5A&mppJW^eF+Ro9~>0E-`qlaP#`!UAh^ibL7E~6LO?KtgisI~!a!IE z2jL+CM1)8X8KOW`hz8Li2E>F|5F6q^T!;tpApsE`pL*ZW-2E${h3L3+pl86gv7hAfa3vO#vp0XZQTtO?IgiWv+w!l`{2HRl=?1Wvg8}`6n*a!RJ033uva2SrjQ8)(2 z;RKw7Q*av2z*#s4=ivfegiCN4uE15e2G`*R+=N?j8}7hexCi&)0X&39@ED%JQ+Ni? z;RU>eSMVC%z*~3+@8JV{gir7p{(~>@6~4iD_yIrR7yO1lA$0zNH3A_BLO?KtgisI~ z!a!IE2jL+CM1)8X8KOW`hz8Li2E>F|5F6q^T!;tpApsE`pL*ZW- z2E$ko~&+50_El`~Uy| delta 5850 zcmWmEQ;--~6h+bQPRF(w=ZXw))-9oyBatrMi#x1N{ zIJfX_5!@oWMRJSm7R4>9TQs-mZZX_qy2Wyf?H0!^u3J2}_-+Z@61x56mdGu!TN1aV zZpqw|yQOeT>6XeZwObmuv~KC#(z|7F%jlNLEwft|x2$g2+_JmnaLehI%PqHC9=E)1 z`P}lm6>ux)R>-ZeTM@USZpGY+yOnS&=~l|Ev|AasvTo(v%DYu?tLRqAt+HDcx2kT{ z+^W0PaI5K7%dNIs9k;*T{&B18R?n@zTLZU-ZjIa;yESpsTQzlS=GNS;gyp#*57S_+d#KLZiC&1 zxVhU-A1^LbQ|S1+HH*6ShsO*O>~>&HrZ{8+f=t{ZqwanxXpB%>)kfEZFJk@w%Kiq z+rMsG-L|>?=eFH#hucoKU2ePG_PFhJ+vm35?SR`sw?l4+-Hx~&bvx#E-0g(hNw-sO zr`^uDopn3scHZrR+eNoaZkOGzxLtL-=62ofhTBcITW+`A?zr7`yXSV_?Sb1vw?}S| z-JZBTb$jOa-0g+iOSe~Uuif6by>)x%_TKG-+ef!gZlB%0xP5i|=Jws~hucrLUv9tM z{*17|F&3zTR0tKULaIZXSS3+ORWg-arBEqVDwSHLQE62=m0o2~8C52gS!GdKRW_AfkrlzYIYNnc{W~(`Bu9~Ojs|9MITBH`MC2FZ!rk1M}YNcAGR;x8?ty-tn zs|{+S+N3tCE$UyjRc%xMsqJcq+NpM_-D;28tM;k=>VP_^4ynWHh&rl{spIN|I;l>n z)9Q>mtInzO>VmqcE~(4vin^+VbNw9;wIbiF&G@spsm2 zdZ}Ki*XoUWtKO;i>Vx{IKB>>@i~6d*sqgBC`l)`Y-|A1`2n*U}fk6x2gbp1oAoAYu z{q{t78yIvTXjj9WLB)diMEepLls-5pAYfkMAAvyy!woMKKXmM5$%4ig+jH%GV9=lt z8r{AST3u*boQeLOh5M2_PZ- z1&JUrB!Q%m43a|%NC~MRHKc*GkPgyA2FM7RATwlvtdI?|Lk`FZxga;>fxM6p@pfXf}s!$E8Lk*}2wV*cCfxqD&s0;O= zJ~V)a&iznJ^1x!yK3k z^I$$KfQ7IK7Q+%)3d>+QtbmoU3Rc4!SPSc5J#2uDun9K97Wfyo!Z!F1w!;qC3Av1fLkdU)=WPz-Z4YETH$O*Y1H{^l5kPq@h z0VoKCpfD7HqEHNqLkTDerJyvFfwE8z%0mUH2$i5RRDr5c4XQ&8s0p>8Hq?Q?;UB0A z^`Jg9fQHZr8bcE>XbR1sIkbS5&tH=>fQ_&THp3S97q-GS_z$+j4%i91 zU^nc6y|54V!vQ!5hu|8E!38&yRoPo1&4$i{`xCocvGF*YHa1E}*4Y&!n z;5OWWyKoQg!vlB-kKi#pfv4~cp2G`x39sNayn(my4&K8D_z0iiGkk%s@D0Ah5BLec z;5YmUq4^8e5ePvL0)inVgo4lz2Esx(2oDh;B1D475Cx(_G>8r{AST3u*boQeLOh5M z2_PZ-1&JUrB!Q%m43a|%NC~MRHKc*GkPgyA2FM7RATwlvtdI?|Lk`FZxga;>fxM6p z@pfXf}s!$E8Lk*}2wV*cCfxqD& zs0;O=J~V)a&iznJ^1x z!yK3k^I$$KfQ7IK7Q+%)3d>+QtbmoU3Rc4!SPSc5J#2uDun9K97Wfyo!Z!F1w!;qC z3A numpy.ndarray: +def image_classify(image_index: int, image_path: Path, dossier: Dict[str, Any]) -> None: """ - 打开并读取影像件 - :param image_path: 影像件路径 - :return: 影像件数据(numpy.ndarray对象) - """ - try: - # 打开并读取影像件(默认转为单通道灰度图) - image_ndarray = cv2.imread(image_path.as_posix(), cv2.IMREAD_GRAYSCALE) - if image_ndarray is None: - raise RuntimeError(f"影像件数据为空") - return image_ndarray - except Exception as exception: - raise RuntimeError(f"打开并读取影像件发生异常:{str(exception)}") from exception - - -def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: - """ - 生成影像件唯一标识 - :param image_format: 影像件格式 - :param image_ndarray: 影像件数据 - :return: 影像件唯一标识 - """ - success, image_ndarray_encoded = cv2.imencode(image_format, image_ndarray) - if not success or image_ndarray_encoded is None: - raise RuntimeError("编码影像件发生异常") - - # 转为字节流并生成影像件唯一标识 - image_guid = md5(image_ndarray_encoded.tobytes()).hexdigest().upper() - return image_guid - - -def image_compress( - image_format: str, - image_ndarray: numpy.ndarray, - image_size_specified: float = 2.0, -) -> str: - """ - 压缩影像件 - :param image_format: 影像件格式 - :param image_ndarray: 影像件数据 - :param image_size_specified: 指定压缩影像件大小,单位为兆字节(MB) - :return: 压缩后影像件BASE64编码 - """ - # 转为字节 - image_size_specified = image_size_specified * 1024 * 1024 - - # 通过调整影像件质量和尺寸达到压缩影像件目的(先调整影像件质量再调整影像件尺寸) - for quality in range(100, 50, -10): - image_ndarray_copy = image_ndarray.copy() - for _ in range(10): - success, image_ndarray_encoded = cv2.imencode( - image_format, - image_ndarray_copy, - params=( - [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] - if image_format == "png" - else [cv2.IMWRITE_JPEG_QUALITY, quality] - ), - ) - if not success or image_ndarray_encoded is None: - break - - # 影像件BASE64编码 - image_base64 = b64encode(image_ndarray_encoded.tobytes()).decode("utf-8") - if len(image_base64) <= image_size_specified: - return image_base64 - - image_ndarray_copy = cv2.resize( - image_ndarray_copy, - ( - int(image_ndarray_copy.shape[0] * 0.95), - int(image_ndarray_copy.shape[1] * 0.95), - ), - interpolation=cv2.INTER_AREA, - ) - # 若调整影像件尺寸后宽/高小于350像素则终止循环 - if min(image_ndarray_copy.shape[:2]) < 350: - break - - raise RuntimeError("压缩影像件发生异常") - - -def calculate_age(report_time: datetime, birth_date: datetime) -> int: - """ - 根据报案时间计算周岁 - :param report_time: 报案时间 - :param birth_date: 出生日期 - :return 周岁 - """ - age = report_time.year - birth_date.year - - return ( - age - 1 - if (report_time.month, report_time.day) - < ( - birth_date.month, - birth_date.day, - ) - else age - ) # 若报案时间的月日小于生成日期的月日则前推一年 - - -# TODO: 后续添加居民身份证(国徽面)和居民身份证(头像面)合并 -def identity_card_recognize(image, insurer_company) -> None: - """ - 识别居民身份证并整合至赔案档案 - :param image: 影像件 - :param insurer_company: 保险分公司 - :return: 无 - """ - # 请求深圳快瞳居民身份证识别接口 - response = request.post( - url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"), - headers={ - "X-RequestId-Header": image["image_guid"] - }, # 以影像件唯一标识作为请求唯一标识,用于双方联查 - data={ - "token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌 - "imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符 - }, # 深圳快瞳支持同时识别居民国徽面和头像面 - guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(), - ) - # TODO: 若请求深圳快瞳居民身份证识别接口发生异常则流转至人工处理 - if not (response.get("status") == 200 and response.get("code") == 0): - raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常") - - if image["image_type"] in [ - "居民身份证(国徽、头像面)", - "居民身份证(头像面)", - ]: - dossier["insured_person_layer"].update( - { - "insured_person": ( - insured_person := response["data"]["name"] - ), # 被保险人 - "identity_type": (identity_type := "居民身份证"), # 证件类型 - "identity_number": ( - indentity_number := response["data"]["idNo"] - ), # 证件号码 - "gender": response["data"]["sex"], # 性别 - "birth_date": ( - birth_date := datetime.strptime( - response["data"]["birthday"], "%Y-%m-%d" - ) - ), # 出生日期,转为日期时间(datetime对象),格式默认为%Y-%m-%d - "age": calculate_age( - dossier["report_layer"]["report_time"], birth_date - ), # 年龄 - "province": ( - residential_address := parse_location(response["data"]["address"]) - ).get( - "province" - ), # 就住址解析为所在省、市、区和详细地址 - "city": residential_address.get("city"), - "district": residential_address.get("county"), - "detailed_address": residential_address.get("detail"), - } - ) - - # 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单 - dossier["insured_persons_layer"] = masterdata.query_liabilities( - insurer_company, - insured_person, - identity_type, - indentity_number, - dossier["report_layer"]["report_time"].strftime("%Y-%m-%d"), - ) - - if image["image_type"] in [ - "居民身份证(国徽、头像面)", - "居民身份证(国徽面)", - ]: - dossier["insured_person_layer"].update( - { - "commencement_date": datetime.strptime( - (period := response["data"]["validDate"].split("-"))[0], - "%Y.%m.%d", - ), # 就有效期限解析为有效起期和有效止期。其中,若有效止期为长期则默认为9999-12-31 - "termination_date": ( - datetime(9999, 12, 31) - if period[1] == "长期" - else datetime.strptime(period[1], "%Y.%m.%d") - ), - } - ) - - -def image_classify(image_index: int, image_path: Path) -> None: - """ - 分类影像件并旋正 + 分类影像件、旋正并整合至赔案档案 :param image_index: 影像件编号 - :param image_path: 影像件路径(path对象) + :param image_path: 影像件路径 + :param dossier: 赔案档案 :return: 无 """ - # 打开并读取影像件 - image_ndarray = image_read(image_path) + # 读取影像件 + image_ndarray = image_read(image_path=image_path) image_format = image_path.suffix.lower() # 影像件格式 - # 生成影像件唯一标识 - image_guid = image_serialize(image_format, image_ndarray) + # 影像件序列化 + image_guid = image_serialize(image_format=image_format, image_ndarray=image_ndarray) # 压缩影像件 image_base64 = image_compress( - image_format, image_ndarray, image_size_specified=2 + image_format=image_format, image_ndarray=image_ndarray, image_size_specified=2 ) # 深圳快瞳要求影像件BASE64编码后大小小于等于2兆字节 # 请求深圳快瞳影像件分类接口 @@ -249,15 +60,14 @@ def image_classify(image_index: int, image_path: Path) -> None: }, # 以影像件唯一标识作为请求唯一标识,用于双方联查 data={ "token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌 - "imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 影像件BASE64编码嵌入数据统一资源标识符 + "imgBase64": f"data:image/{image_format.lstrip(".")};base64,{image_base64}", # 将影像件格式和BASE64编码嵌入数据统一资源标识符 }, guid=md5((url + image_guid).encode("utf-8")).hexdigest().upper(), ) - # TODO: 若响应非成功则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): raise RuntimeError("请求深圳快瞳影像件分类接口发生异常") - # 匹配影像件类型 + # 根据响应匹配影像件类型 match (response["data"]["flag"], response["data"]["type"]): case (14, _): image_type = "居民户口簿" @@ -284,14 +94,13 @@ def image_classify(image_index: int, image_path: Path) -> None: case _: image_type = "其它" - # 匹配影像件方向 + # 根据响应匹配影像件方向 image_orientation = { "0": "0度", "90": "顺时针90度", "180": "180度", "270": "逆时针90度", }.get(response["data"]["angle"], "0度") - # 若影像件方向非0度则旋正 if image_orientation != "0度": image_ndarray = cv2.rotate( image_ndarray, @@ -306,20 +115,269 @@ def image_classify(image_index: int, image_path: Path) -> None: image_format, image_ndarray, image_size_specified=2 ) + # 将影像件添加至影像件层 dossier["images_layer"].append( { "image_index": f"{image_index:02d}", # 影像件编号 - "image_path": image_path.as_posix(), - "image_name": image_path.stem, - "image_format": image_format, - "image_guid": image_guid, - "image_base64": image_base64, - "image_type": image_type, + "image_path": image_path.as_posix(), # 影像件路径 + "image_name": image_path.stem, # 影像件名称 + "image_format": image_format, # 影像件格式 + "image_guid": image_guid, # 影像件唯一标识 + "image_base64": image_base64, # 影像件BASE64编码 + "image_type": image_type, # 影像件类型 } ) -def mlm_recognize(image, schema) -> Dict[str, Any]: +def image_read( + image_path: Path, +) -> numpy.ndarray: + """ + 读取影像件 + :param image_path: 影像件路径 + :param dossier: 赔案档案 + :return: 影像件图像数组 + """ + try: + with open(image_path, "rb") as file: + image_bytes = file.read() # 读取影像件字节流 + + # 先将影像件字节流转为 numpy.ndarray 对象,再解码为单通道灰度图数组对象 + image_ndarray = cv2.imdecode( + buf=numpy.frombuffer(image_bytes, numpy.uint8), flags=cv2.IMREAD_GRAYSCALE + ) + if image_ndarray is None: + raise RuntimeError(f"影像件不存在") + return image_ndarray + except Exception as exception: + raise RuntimeError(f"读取影像件发生异常:{str(exception)}") from exception + + +def image_serialize(image_format: str, image_ndarray: numpy.ndarray) -> str: + """ + 影像件序列化 + :param image_format: 影像件格式 + :param image_ndarray: 影像件图像数组 + :return: 影像件唯一标识 + """ + # 将影像件图像数组编码为字节流 + success, image_ndarray_encoded = cv2.imencode(ext=image_format, img=image_ndarray) + if not success or image_ndarray_encoded is None: + raise RuntimeError("影像件编码发生异常") + + # 转为字节流并生成影像件唯一标识 + image_guid = md5(string=image_ndarray_encoded.tobytes()).hexdigest().upper() + return image_guid + + +def image_compress( + image_format: str, + image_ndarray: numpy.ndarray, + image_size_specified: float = 2.0, +) -> str: + """ + 压缩影像件 + :param image_format: 影像件格式 + :param image_ndarray: 影像件图像数组 + :param image_size_specified: 指定压缩影像件大小(单位为兆字节),默认为 2 + :return: 压缩后影像件BASE64编码 + """ + # 通过调整影像件质量和尺寸达到压缩影像件目的(先调整影像件质量再调整影像件尺寸) + for quality in range(100, 50, -10): + image_ndarray_copy = image_ndarray.copy() + for _ in range(10): + # 调整影像件质量后将影像件图像数组编码为字节流 + success, image_ndarray_encoded = cv2.imencode( + ext=image_format, + img=image_ndarray_copy, + params=( + [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] + if image_format == "png" + else [cv2.IMWRITE_JPEG_QUALITY, quality] + ), + ) + if not success or image_ndarray_encoded is None: + break + + # 影像件BASE64编码 + image_base64 = b64encode(s=image_ndarray_encoded.tobytes()).decode("utf-8") + if len(image_base64) <= image_size_specified * 1_048_576: + return image_base64 + + # 调整影像件尺寸 + image_ndarray_copy = cv2.resize( + src=image_ndarray_copy, + dsize=( + int(image_ndarray_copy.shape[0] * 0.95), + int(image_ndarray_copy.shape[1] * 0.95), + ), + interpolation=cv2.INTER_AREA, + ) + if min(image_ndarray_copy.shape[:2]) < 350: + break + + raise RuntimeError("压缩影像件发生异常") + + +def image_recognize( + image: Dict[str, Any], + insurer_company: str, + dossier: Dict[str, Any], +) -> None: + """ + 识别影像件并整合至赔案档案 + :param image: 影像件 + :param insurer_company: 保险分公司 + :param dossier: 赔案档案 + :return: 无 + """ + # 基于影像件识别使能决策规则评估 + if not rules_engine.evaluate( + decision="影像件识别使能", + inputs={ + "insurer_company": insurer_company, + "image_type": image["image_type"], + }, + )["recognize_enabled"]: + return + + # 根据影像件类型匹配影像件识别方法 + match image["image_type"]: + case "居民户口簿": + raise RuntimeError("暂不支持居民户口簿") + case "居民身份证(国徽、头像面)" | "居民身份证(国徽面)" | "居民身份证(头像面)": + # 居民身份证识别并整合至赔案档案 + identity_card_recognize( + image=image, insurer_company=insurer_company, dossier=dossier + ) + case "中国港澳台地区及境外护照": + raise RuntimeError("暂不支持中国港澳台地区及境外护照") + case "理赔申请书": + application_recognize( + image=image, insurer_company=insurer_company, dossier=dossier + ) + case "增值税发票" | "医疗门诊收费票据" | "医疗住院收费票据": + # 票据识别并整合至赔案档案 + receipt_recognize( + image=image, insurer_company=insurer_company, dossier=dossier + ) + case "银行卡": + # 银行卡识别并整合至赔案档案 + bank_card_recognize(image=image, dossier=dossier) + + +def identity_card_recognize( + image: Dict[str, Any], insurer_company: str, dossier: Dict[str, Any] +) -> None: + """ + 识别居民身份证并整合至赔案档案 + :param image: 影像件 + :param insurer_company: 保险分公司 + :param dossier: 赔案档案 + :return: 无 + """ + # 请求深圳快瞳居民身份证识别接口 + response = request.post( + url=(url := "https://ai.inspirvision.cn/s/api/ocr/identityCard"), + headers={ + "X-RequestId-Header": image["image_guid"] + }, # 以影像件唯一标识作为请求唯一标识,用于双方联查 + data={ + "token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌 + "imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 将影像件格式和BASE64编码嵌入数据统一资源标识符 + }, # 深圳快瞳支持同时识别居民国徽面和头像面 + guid=md5(string=(url + image["image_guid"]).encode("utf-8")) + .hexdigest() + .upper(), + ) + if not (response.get("status") == 200 and response.get("code") == 0): + raise RuntimeError("请求深圳快瞳居民身份证识别接口发生异常") + + if image["image_type"] in [ + "居民身份证(国徽、头像面)", + "居民身份证(头像面)", + ]: + dossier["insured_person_layer"].update( + { + "insured_person": ( + insured_person := response["data"]["name"] + ), # 被保险人姓名 + "identity_type": (identity_type := "居民身份证"), # 证件类型 + "identity_number": ( + identity_number := response["data"]["idNo"] + ), # 证件号码 + "gender": response["data"]["sex"], # 性别 + "birth_date": ( + birth_date := datetime.strptime( + response["data"]["birthday"], "%Y-%m-%d" + ) + ), # 出生日期,转为日期时间(datetime对象),格式默认为%Y-%m-%d + "age": calculate_age( + report_time=dossier["report_layer"]["report_time"], + birth_date=birth_date, + ), # 年龄 + "province": ( + residential_address := parse_location( + location_text=response["data"]["address"] + ) + ).get( + "province" + ), # 就住址解析为所在省、市、区和详细地址 + "city": residential_address.get("city"), + "district": residential_address.get("county"), + "detailed_address": residential_address.get("detail"), + } + ) + + # 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询个单 + dossier["liabilities_layer"] = masterdata.query_liabilities( + insurer_company=insurer_company, + insured_person=insured_person, + identity_type=identity_type, + identity_number=identity_number, + report_date=dossier["report_layer"]["report_time"].strftime("%Y-%m-%d"), + ) + + if image["image_type"] in [ + "居民身份证(国徽、头像面)", + "居民身份证(国徽面)", + ]: + dossier["insured_person_layer"].update( + { + "commencement_date": datetime.strptime( + (period := response["data"]["validDate"].split("-"))[0], + "%Y.%m.%d", + ), # 就有效期限解析为有效起期和有效止期。其中,若有效止期为长期则默认为9999-12-31 + "termination_date": ( + datetime(9999, 12, 31) + if period[1] == "长期" + else datetime.strptime(period[1], "%Y.%m.%d") + ), + } + ) + + +def calculate_age(report_time: datetime, birth_date: datetime) -> int: + """ + 计算周岁 + :param report_time: 报案时间 + :param birth_date: 出生日期 + :return 周岁 + """ + age = report_time.year - birth_date.year + + return ( + age - 1 + if (report_time.month, report_time.day) + < ( + birth_date.month, + birth_date.day, + ) + else age + ) # 若报案时间的月和日小于出生日期的月和日则前减去一岁 + + +def mlm_recognize(image: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]: """ 使用多模态大模型就理赔申请书进行光学字符识别并结构化识别结果 :param image: 影像件 @@ -344,7 +402,7 @@ def mlm_recognize(image, schema) -> Dict[str, Any]: "type": "image_url", "image_url": { "url": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}" - }, # 影像件BASE64编码嵌入数据统一资源标识符 + }, # 将影像件格式和BASE64编码嵌入数据统一资源标识符 }, { "type": "text", @@ -368,7 +426,7 @@ def mlm_recognize(image, schema) -> Dict[str, Any]: } ), guid=md5( - json.dumps( + string=json.dumps( json_, sort_keys=True, ensure_ascii=False, @@ -380,16 +438,16 @@ def mlm_recognize(image, schema) -> Dict[str, Any]: # 就响应中消息内容JSON反序列化 try: - return json.loads(response["choices"][0]["message"]["content"]) - # TODO: 若请求火山引擎多模态大模型接口发生异常则流转至人工处理 + return json.loads(s=response["choices"][0]["message"]["content"]) except Exception as exception: raise RuntimeError("请求火山引擎多模态大模型接口发生异常") from exception -def boc_application_recognize(image: str) -> None: +def boc_application_recognize(image: Dict[str, Any], dossier: Dict[str, Any]) -> None: """ 识别中银保险有限公司的理赔申请书并整合至赔案档案 :param image: 影像件 + :param dossier: 赔案档案 :return: 无 """ # JSON格式 @@ -491,7 +549,7 @@ def boc_application_recognize(image: str) -> None: } # 使用多模态大模型就理赔申请书进行光学字符识别并结构化识别结果 - recognition = mlm_recognize(image, schema) + recognition = mlm_recognize(image=image, schema=schema) dossier["insured_person_layer"].update( { "phone_number": recognition["手机"], @@ -502,11 +560,14 @@ def boc_application_recognize(image: str) -> None: ) -def application_recognize(image, insurer_company) -> None: +def application_recognize( + image: Dict[str, Any], insurer_company: str, dossier: Dict[str, Any] +) -> None: """ 识别理赔申请书并整合至赔案档案 :param image: 影像件 :param insurer_company: 保险分公司 + :param dossier: 赔案档案 :return: 无 """ @@ -514,91 +575,17 @@ def application_recognize(image, insurer_company) -> None: match insurer_company: # 中银保险有限公司 case _ if insurer_company.startswith("中银保险有限公司"): - boc_application_recognize(image) + boc_application_recognize(image=image, dossier=dossier) -def fuzzy_match(contents: List[Dict[str, Any]], key: str) -> str: - """ - 根据内容列表(基于深圳快瞳增值税发票和医疗收费票据识别结果)模糊匹配键名 - :param contents: 内容列表 - :param key: 键名 - :return - """ - match contents[0].keys(): - # 对应深圳快瞳增值税发票识别结果 - case _ if "desc" in contents[0].keys(): - for content in contents: - if content["desc"] == key: - return content["value"] if content["value"] else "" - - candidates = [] - for content in contents: - candidates.append( - ( - content["value"], - fuzz.WRatio( - content["desc"], key, force_ascii=False - ), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度 - ) - ) - - return ( - (result[0] if result[0] else "") - if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 - else "" - ) # 返回似度>=80且最大的值 - - # 对应深圳快瞳医疗收费票据识别结果 - case _ if "name" in contents[0].keys(): - for content in contents: - if content["name"] == key: - return ( - content["word"]["value"] if content["word"]["value"] else "" - ) - - candidates = [] - for content in contents: - candidates.append( - ( - content["word"]["value"], - fuzz.WRatio( - content["name"], key, force_ascii=False - ), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度 - ) - ) - - return ( - (result[0] if result[0] else "") - if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 - else "" - ) # 返回>=80且最大的相似度的值 - - return "" - - -def parse_item(item: str) -> Tuple[str, Optional[str]]: - """ - 根据明细项解析明细项类别和具体内容,并根据具体内容查询药品/医疗服务 - :param item: 明细项 - return 明细项类别和药品/医疗服务 - """ - if match := re.match( - r"^\*(?P.*?)\*(?P.*)$", - item, - ): - return match.group("category"), masterdata.query_medicine( - match.group("specific") - ) - # 一般增值税发票明细项格式形如*{category}*{specific},其中category为明细项类别,例如中成药;specific为明细项具体内容,例如[同仁堂]金贵肾气水蜜丸 300丸/瓶,需要据此查询药品。其它格式则将明细项内容作为明细项类别,药品为空值 - else: - return item, None - - -def receipt_recognize(image, insurer_company) -> None: +def receipt_recognize( + image: Dict[str, Any], insurer_company: str, dossier: Dict[str, Any] +) -> None: """ 识别票据并整合至赔案档案 :param image: 影像件 :param insurer_company: 保险分公司 + :param dossier: 赔案档案 :return: 空 """ # 初始化票据数据 @@ -611,9 +598,9 @@ def receipt_recognize(image, insurer_company) -> None: }, # 以影像件唯一标识作为请求唯一标识,用于双方联查 data={ "token": authenticator.get_token(servicer="szkt"), # 获取深圳快瞳访问令牌 - "imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 影像件BASE64编码嵌入数据统一资源标识符 + "imgBase64": f"data:image/{image["image_format"].lstrip(".")};base64,{image["image_base64"]}", # 将影像件格式和BASE64编码嵌入数据统一资源标识符 }, - guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(), + guid=md5(string=(url + image["image_guid"]).encode("utf-8")).hexdigest().upper(), ) # 若查验状态为真票或红票则直接整合至赔案档案 if response.get("status") == 200 and response.get("code") == 10000: @@ -781,7 +768,6 @@ def receipt_recognize(image, insurer_company) -> None: .hexdigest() .upper(), ) - # TODO: 若请求深圳快瞳增值税发票识别接口发生异常则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): raise RuntimeError("请求深圳快瞳增值税发票识别接口发生异常") @@ -945,7 +931,6 @@ def receipt_recognize(image, insurer_company) -> None: .hexdigest() .upper(), ) - # TODO: 若请求深圳快瞳医疗收费票据识别接口发生异常则流转至人工处理 if not (response.get("status") == 200 and response.get("code") == 0): raise @@ -1080,6 +1065,7 @@ def receipt_recognize(image, insurer_company) -> None: .assign( reasonable_amount=lambda dataframe: dataframe.apply( lambda row: Decimal( + # 基于扣除明细项不合理费用决策规则评估 rules_engine.evaluate( decision="扣除明细项不合理费用", inputs={ @@ -1105,7 +1091,7 @@ def receipt_recognize(image, insurer_company) -> None: if dossier["insured_person_layer"]["insured_person"] in receipt["payer"] else None - ), # 出险人 + ), # 出险人姓名 "accident": "药店购药", # 出险事故 "diagnosis": "购药拟诊", # 医疗诊断 "personal_self_payment": Decimal("0.00"), # 个人自费金额 @@ -1126,16 +1112,12 @@ def receipt_recognize(image, insurer_company) -> None: "items": items.to_dict("records"), } ) - # TODO: 后续完善就购药及就医类型为门诊就诊(私立医院)处理 case ("增值税发票", "私立医院"): receipt["购药及就医类型"] = "门诊就医" - # TODO: 后续完善就购药及就医类型为门诊就诊(公立医院)处理 case ("医疗门诊收费票据", "公立医院"): receipt["购药及就医类型"] = "门诊就医" - # TODO: 后续完善就购药及就医类型为住院治疗处理 case ("医疗住院收费票据", "公立医院"): receipt["购药及就医类型"] = "住院治疗" - # TODO: 若根据影像件类型和购药及就医机构类型匹配购药及就医类型发生异常则流转至人工处理 case _: raise RuntimeError( "根据影像件类型和购药及就医机构类型匹配购药及就医类型发生异常" @@ -1144,10 +1126,86 @@ def receipt_recognize(image, insurer_company) -> None: dossier["receipts_layer"].append(receipt) -def bank_card_recognize(image) -> None: +def fuzzy_match(contents: List[Dict[str, Any]], key: str) -> str: + """ + 根据内容列表(基于深圳快瞳增值税发票和医疗收费票据识别结果)模糊匹配键名 + :param contents: 内容列表 + :param key: 键名 + :return + """ + match contents[0].keys(): + # 对应深圳快瞳增值税发票识别结果 + case _ if "desc" in contents[0].keys(): + for content in contents: + if content["desc"] == key: + return content["value"] if content["value"] else "" + + candidates = [] + for content in contents: + candidates.append( + ( + content["value"], + fuzz.WRatio( + content["desc"], key, force_ascii=False + ), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度 + ) + ) + + return ( + (result[0] if result[0] else "") + if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 + else "" + ) # 返回似度>=80且最大的值 + + # 对应深圳快瞳医疗收费票据识别结果 + case _ if "name" in contents[0].keys(): + for content in contents: + if content["name"] == key: + return content["word"]["value"] if content["word"]["value"] else "" + + candidates = [] + for content in contents: + candidates.append( + ( + content["word"]["value"], + fuzz.WRatio( + content["name"], key, force_ascii=False + ), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度 + ) + ) + + return ( + (result[0] if result[0] else "") + if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 + else "" + ) # 返回>=80且最大的相似度的值 + + return "" + + +def parse_item(item: str) -> Tuple[str, Optional[str]]: + """ + 根据明细项解析明细项类别和具体内容,并根据具体内容查询药品/医疗服务 + :param item: 明细项 + return 明细项类别和药品/医疗服务 + """ + if match := re.match( + r"^\*(?P.*?)\*(?P.*)$", + item, + ): + return match.group("category"), masterdata.query_medicine( + match.group("specific") + ) + # 一般增值税发票明细项格式形如*{category}*{specific},其中category为明细项类别,例如中成药;specific为明细项具体内容,例如[同仁堂]金贵肾气水蜜丸 300丸/瓶,需要据此查询药品。其它格式则将明细项内容作为明细项类别,药品为空值 + else: + return item, None + + +def bank_card_recognize(image: Dict[str, Any], dossier: Dict[str, Any]) -> None: """ 识别银行卡并整合至赔案档案 :param image: 影像件 + :param dossier: 赔案档案 :return: 空 """ # 请求深圳快瞳银行卡识别接口 @@ -1162,7 +1220,6 @@ def bank_card_recognize(image) -> None: }, guid=md5((url + image["image_guid"]).encode("utf-8")).hexdigest().upper(), ) - # TODO: 若响应非成功则流转至人工处理 if not ( response.get("status") == 200 and response.get("code") == 0 @@ -1178,45 +1235,3 @@ def bank_card_recognize(image) -> None: "account_number": response["data"]["cardNo"].replace(" ", ""), } ) - - -def image_recognize( - image, - insurer_company, -) -> None: - """ - 识别影像件并整合至赔案档案 - :param image: 影像件 - :param insurer_company: 保险分公司 - :return: 无 - """ - # 基于影像件识别使能规则评估影像件是否识别 - if not rules_engine.evaluate( - decision="影像件识别使能", - inputs={ - "insurer_company": insurer_company, - "image_type": image["image_type"], - }, - )["recognize_enabled"]: - return - - # 根据影像件类型匹配影像件识别方法 - match image["image_type"]: - # TODO: 后续添加居民户口簿识别和整合方法 - case "居民户口簿": - raise RuntimeError("暂不支持居民户口簿") - case "居民身份证(国徽、头像面)" | "居民身份证(国徽面)" | "居民身份证(头像面)": - # 居民身份证识别并整合至赔案档案 - identity_card_recognize(image, insurer_company) - # TODO: 后续添加居民户口簿识别和整合方法 - case "中国港澳台地区及境外护照": - raise RuntimeError("暂不支持中国港澳台地区及境外护照") - # TODO: 暂仅支持增值税发票识别且购药及就医类型为药店购药整合至赔案档案,后续逐步添加 - case "理赔申请书": - application_recognize(image, insurer_company) - case "增值税发票" | "医疗门诊收费票据" | "医疗住院收费票据": - # 票据识别并整合至赔案档案 - receipt_recognize(image, insurer_company) - case "银行卡": - # 银行卡识别并整合至赔案档案 - bank_card_recognize(image) diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index 1d1587d..6694a62 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -1,29 +1,27 @@ # -*- coding: utf-8 -*- """ -票据理赔自动化 +票据理赔自动化主模块 功能清单 https://liubiren.feishu.cn/docx/WFjTdBpzroUjQvxxrNIcKvGnneh?from=from_copylink """ from datetime import datetime from pathlib import Path -from typing import Any, Dict, List +from case import case_adjust +from image import image_classify, image_recognize from jinja2 import Environment, FileSystemLoader -import pandas - -from common import dossier, rule_engine -from image import image_classify -from image import image_recognize - if __name__ == "__main__": - # 初始化工作目录路径 - workplace_path = Path("directory") - workplace_path.mkdir(parents=True, exist_ok=True) # 若工作目录不存在则创建 + # 初始化文件路径 + file_path = Path(__file__).parent + + # 初始化所有赔案的文件夹路径(需要注意在TraeCN中,文件路径需显式声明) + folder_path = file_path / "directory" + folder_path.mkdir(parents=True, exist_ok=True) # 若文件夹路径不存在则创建 # 实例化JINJA2环境 - environment = Environment(loader=FileSystemLoader(".")) + environment = Environment(loader=FileSystemLoader(file_path)) # 添加DATE过滤器 environment.filters["date"] = lambda date: ( date.strftime("%Y-%m-%d") if date else "长期" @@ -31,130 +29,46 @@ if __name__ == "__main__": # 加载赔案档案模版 template = environment.get_template("template.html") - # ------------------------- - # 自定义方法 - # ------------------------- - - # noinspection PyShadowingNames - def case_adjust() -> None: - """ - 理算赔案并整合至赔案档案 - :return: 无 - """ - - def receipt_adjust(row: pandas.Series) -> List[Dict[str, Any]]: - """ - 票据理算 - :param row: 票据 - :return: 理算记录 - """ - date = row["date"] - current_type = row["就诊类型"] - current_amount = row["合理金额"] - remaining_claim = current_amount - claim_details = [] - - if current_amount <= 0: - return [] - - # 筛选有效保单并排序 - valid_rules = sorted( - [ - r - for r in policy_rules - if current_type in r["就诊类型"] - and r["生效日期"] <= current_date <= r["失效日期"] - and r["剩余额度"] > 0.0 - ], - key=lambda x: x["剩余额度"], - reverse=True, - ) - - # 循环分摊赔付,生成分明细列表 - for rule in valid_rules: - if remaining_claim <= 0.0: - break - - pay_ratio = rule["赔付比例"] - rule_name = rule["责任名称"] - remaining_quota = rule["剩余额度"] - - max_payable = remaining_claim * pay_ratio - actual_pay = min(remaining_quota, max_payable) - - if actual_pay > 0.0: - corresponding_actual_amount = actual_pay / pay_ratio - # 构建明细字典(字段与后续DataFrame列对应) - detail = { - "就诊类型": current_type, - "就诊合理金额": current_amount, - "保单责任名称": rule_name, - "保单赔付比例": pay_ratio, - "保单本次赔付金额": round(actual_pay, 2), - "本次对应合理金额部分": round(corresponding_actual_amount, 2), - "保单赔付后剩余额度": round(remaining_quota - actual_pay, 2), - } - claim_details.append(detail) - - # 更新保单额度和剩余待赔付金额 - rule["剩余额度"] -= actual_pay - remaining_claim -= corresponding_actual_amount - - return claim_details - - # 基于据拒付规则评估 - if not (result := rule_engine.evaluate(decision="拒付", inputs=dossier)): - # TODO: 若评估结果为空值(保险分公司未配置拒付规则)则流转至人工处理 - raise - - dossier["adjustment_layer"].update( - { - "conclusion": result["conclusion"], # 理赔结论 - "explanation": result["explanation"], # 结论说明 - } - ) - if result["conclusion"] == "拒付": - return - - adjustments = ( - pandas.DataFrame(dossier["receipts_layer"]).assing( - adjustments=lambda dataframe: dataframe.apply( - receipt_adjust, axis="columns" - ) - ) - ).explode("adjustments", ignore_index=True) - print(adjustments) - - # 遍历工作目录中赔案目录并创建赔案档案(模拟自动化域就待自动化任务创建理赔档案) - for case_path in [x for x in workplace_path.iterdir() if x.is_dir()]: - # 初始化赔案档案(保险公司将提供投保公司、保险分公司和报案时间等,TPA作业系统签收后生成赔案号) - dossier["report_layer"].update( - { - "report_time": datetime(2025, 7, 25, 12, 0, 0), # 指定报案时间 - "case_number": case_path.stem, # 设定:赔案目录名称为赔案号 + # 遍历文件夹中赔案文件夹并创建赔案档案 + for case_path in [x for x in folder_path.iterdir() if x.is_dir()]: + # 初始化赔案档案(推送至TPA时,保险公司会提保险分公司名称、报案时间和影像件等,TPA签收后生成赔案号) + dossier = { + "report_layer": { + "report_time": datetime( + 2025, 7, 25, 12, 0, 0 + ), # 指定报案时间,默认为 datetime对象 + "case_number": case_path.stem, # 默认为赔案文件夹名称 "insurer_company": ( insurer_company := "中银保险有限公司苏州分公司" - ), # 指定保险分公司 - } - ) - # 遍历赔案目录中影像件 + ), # 默认为中银保险有限公司苏州分公司 + }, # 报案层 + "images_layer": [], # 影像件层 + "insured_person_layer": {}, # 出险人层 + "liabilities_layer": [], # 理赔责任层 + "receipts_layer": [], # 票据层 + "adjustment_layer": {}, # 理算层 + } + + # 遍历赔案文件夹内所有影像件路径 for image_index, image_path in enumerate( sorted( [ - x - for x in case_path.glob(pattern="*") - if x.is_file() and x.suffix.lower() in [".jpg", ".jpeg", ".png"] - ], # 实际作业亦仅支持JPG、JPEG或PNG - key=lambda x: x.stat().st_ctime, # 根据影像件创建时间顺序排序 + i + for i in case_path.glob(pattern="*") + if i.is_file() and i.suffix.lower() in [".jpg", ".jpeg", ".png"] + ], + key=lambda i: i.stat().st_birthtime, # 根据影像件创建时间顺序排序 ), 1, ): - # 分类影像件并旋正(较初审自动化无使能检查) - image_classify(image_index, image_path) + # 分类影像件、旋正并整合至赔案档案 + image_classify( + image_index=image_index, image_path=image_path, dossier=dossier + ) # 就影像件层按照影像件类型指定排序 dossier["images_layer"].sort( - key=lambda x: [ + key=lambda i: [ "居民户口簿", "居民身份证(国徽面)", "居民身份证(头像面)", @@ -167,34 +81,20 @@ if __name__ == "__main__": "医疗费用清单", "银行卡", "其它", - ].index(x["image_type"]) + ].index(i["image_type"]) ) - # 遍历影像件层中影像件 + # 遍历影像件层内影像件 for image in dossier["images_layer"]: # 识别影像件并整合至赔案档案 image_recognize( - image, - insurer_company, + image=image, + insurer_company=insurer_company, + dossier=dossier, ) # 就票据层按照开票日期和票据号顺序排序 dossier["receipts_layer"].sort(key=lambda x: (x["date"], x["number"])) - print(dossier["insured_persons_layer"]) - exit() - - # 理算 - case_adjust() - - print(dossier["adjustment_layer"]) - - for receipt in dossier["receipts_layer"]: - print(receipt) - - print(dossier["report_layer"]) - print(dossier["insured_person_layer"]) - print(dossier["insured_persons_layer"]) - - dossier.pop("images_layer") - dossier.pop("receipts_layer") + # 理算赔案并整合至赔案档案 + case_adjust(dossier=dossier) diff --git a/票据理赔自动化/masterdata.py b/票据理赔自动化/masterdata.py index dfc64ed..83675a0 100644 --- a/票据理赔自动化/masterdata.py +++ b/票据理赔自动化/masterdata.py @@ -3,10 +3,10 @@ 主数据模块 """ -from datetime import datetime -from decimal import Decimal, ROUND_HALF_UP -from pathlib import Path import sys +from datetime import datetime +from decimal import ROUND_HALF_UP, Decimal +from pathlib import Path from typing import Any, Dict, List, Optional sys.path.append(Path(__file__).parent.parent.as_posix()) @@ -21,12 +21,13 @@ class MasterData(SQLite): query_medicine:根据明细项中具体内容查询药品/医疗服务名称 """ - def __init__(self): + def __init__(self, database: Path): """ 初始化 + :param database: 数据库路径 """ # 初始化SQLite客户端 - super().__init__(database="database.db") + super().__init__(database=database) try: with self: # 初始化团单表 @@ -86,14 +87,14 @@ class MasterData(SQLite): ) """ ) - # 初始化责任表 + # 初始化理赔责任表 self.execute( sql=""" CREATE TABLE IF NOT EXISTS liabilities ( - --责任唯一标识 + --理赔责任唯一标识 guid TEXT PRIMARY KEY, - --责任名称 + --理赔责任名称 liability TEXT NOT NULL, --出险事故 accident TEXT NOT NULL, @@ -103,10 +104,10 @@ class MasterData(SQLite): non_medical_ratio TEXT NOT NULL, --合理理算比例 reasonable_ratio TEXT NOT NULL, - --理算保单唯一标识 - adjust_policy_guid TEXT NOT NULL, + --理赔保单唯一标识 + claim_policy_guid TEXT NOT NULL, --个单唯一标识 - person_policy_guid TEXT NOT NULL + person_policy_guid TEXT NOT NULL ) """ ) @@ -159,7 +160,7 @@ class MasterData(SQLite): """ ) except Exception as exception: - raise RuntimeError(f"初始化数据库发生异常:{str(exception)}") from exception + raise RuntimeError(f"初始化主数据发生异常:{str(exception)}") from exception def query_liabilities( self, @@ -168,7 +169,7 @@ class MasterData(SQLite): identity_type: str, identity_number: str, report_date: str, - ) -> Optional[List[Dict[str, Any]]]: + ) -> List[Dict[str, Any]]: """ 根据保险分公司名称、被保险人姓名、证件类型、证件号码和报案时间查询被保险人的理赔责任 :param insurer_company: 保险分公司名称 @@ -199,7 +200,7 @@ class MasterData(SQLite): liabilities.personal_self_ratio, liabilities.non_medical_ratio, liabilities.reasonable_ratio, - liabilities.adjust_policy_guid + liabilities.claim_policy_guid FROM insured_persons INNER JOIN person_policies ON insured_persons.person_policy_guid = person_policies.guid @@ -211,10 +212,10 @@ class MasterData(SQLite): INNER JOIN liabilities ON person_policies.guid = liabilities.person_policy_guid INNER JOIN coverage_changes - ON liabilities.adjust_policy_guid = coverage_changes.change_policy_guid + ON liabilities.claim_policy_guid = coverage_changes.change_policy_guid AND coverage_changes.change_time = (SELECT MAX(change_time) FROM coverage_changes - WHERE liabilities.adjust_policy_guid = change_policy_guid) + WHERE liabilities.claim_policy_guid = change_policy_guid) WHERE group_policies.insurer_company = ? AND insured_persons.insured_person = ? AND insured_persons.identity_type = ? @@ -222,6 +223,7 @@ class MasterData(SQLite): AND ? BETWEEN group_policies.commencement_date AND group_policies.termination_date AND ? BETWEEN person_policies.commencement_date AND person_policies.termination_date AND CAST(coverage_changes.after_change_amount AS REAL) > 0 + ORDER BY commencement_date """, parameters=( insurer_company, @@ -232,7 +234,6 @@ class MasterData(SQLite): report_date, ), ) - # TODO: 若查无数据则流转至人工处理 if not result: raise RuntimeError("查无数据") @@ -256,7 +257,7 @@ class MasterData(SQLite): except Exception as exception: raise RuntimeError(f"{str(exception)}") from exception - def query_institution_type(self, institution: str) -> Optional[str]: + def query_institution_type(self, institution: str) -> str: """ 根据购药及就医机构名称查询购药及就医机构类型 :param institution: 购药及就医机构名称 @@ -272,7 +273,6 @@ class MasterData(SQLite): """, parameters=(institution,), ) - # TODO: 若查无数据则流转至人工处理 if not result: raise RuntimeError("查无数据") @@ -284,13 +284,12 @@ class MasterData(SQLite): def query_medicine( self, content: str, - ) -> Optional[str]: + ) -> str: """ 根据明细项中具体内容查询药品/医疗服务名称 :param content: 明细项具体内容 :return: 药品/医疗服务名称 """ - # TODO: 后续提供医疗耗材和服务查询 try: with self: result = self.query_all( @@ -301,7 +300,6 @@ class MasterData(SQLite): """, parameters=(content,), ) - # TODO: 若查无数据则流转至人工处理 if not result: raise RuntimeError("查无数据") @@ -311,3 +309,35 @@ class MasterData(SQLite): except Exception as exception: raise RuntimeError(f"{str(exception)}") from exception + + def query_remaining_amount( + self, + policy_guid: str, + ) -> Decimal: + """ + 根据变动保单唯一标识查询最新一条保额变动记录的变动后金额 + :param policy_guid: 变动保单唯一标识 + :return: 变动后金额 + """ + try: + with self: + result = self.query_one( + sql=""" + SELECT after_change_amount + FROM coverage_changes + WHERE change_policy_guid = ? + ORDER BY change_time DESC + LIMIT 1; + """, + parameters=(policy_guid,), + ) + if not result: + raise RuntimeError("查无数据") + + return Decimal(result["after_change_amount"]).quantize( + Decimal("0.00"), + rounding=ROUND_HALF_UP, + ) + + except Exception as exception: + raise RuntimeError(f"{str(exception)}") from exception