日常更新

from NUC
This commit is contained in:
liubiren 2025-12-21 22:19:14 +08:00
parent 0a92a848d3
commit 9a06479491
1 changed files with 158 additions and 182 deletions

View File

@ -16,6 +16,7 @@ from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy
import pandas
from fuzzywuzzy import fuzz
from jinja2 import Environment, FileSystemLoader
from jionlp import parse_location
@ -813,14 +814,13 @@ if __name__ == "__main__":
)
candidates = []
# 基于加权补偿的莱文斯坦距离算法计算所有内容的键名和指定键名的相似度
for content in contents:
candidates.append(
(
content["word"]["value"],
fuzz.WRatio(
content["name"], key, force_ascii=False
),
), # 基于加权补偿的莱文斯坦距离算法计算所有内容的键名和指定键名的相似度
)
)
@ -831,63 +831,6 @@ if __name__ == "__main__":
else None
)
def parse_items(contents):
    """
    Parse the line items out of a receipt recognition result.

    Only the flat layout is handled, where every entry is a dict carrying a
    ``desc`` label and a ``value`` string. Entries whose label is
    ``项目名称``/``数量``/``金额``/``税额`` (optionally followed by digits) are
    collected per label in their original order and then combined
    positionally into one line item each.

    :param contents: recognition result, a list of dicts
    :return: list of dicts with the keys ``名称``, ``数量`` and ``金额``;
        an empty list when ``contents`` is empty; ``None`` when the first
        entry has no ``desc`` key (layout not handled here)
    """
    # An empty result has no line items; previously this raised IndexError.
    if not contents:
        return []
    # Layout is detected from the first entry only, as before.
    if "desc" not in contents[0]:
        return None

    # One pass over the entries instead of one regex scan per column.
    label_pattern = re.compile(r"^(项目名称|数量|金额|税额)(\d+)?$")
    columns = {"项目名称": [], "数量": [], "金额": [], "税额": []}
    for entry in contents:
        if matched := label_pattern.match(entry["desc"]):
            columns[matched.group(1)].append(entry["value"])

    two_places = Decimal("0.00")
    return [
        {
            "名称": name,
            "数量": Decimal(quantity).quantize(
                two_places, rounding=ROUND_HALF_UP
            ),
            # Amount and tax arrive as strings; they are converted to
            # Decimal, summed, and the sum is rounded to two decimals.
            "金额": (Decimal(amount) + Decimal(tax)).quantize(
                two_places, rounding=ROUND_HALF_UP
            ),
        }
        # zip stops at the shortest column, so an item missing any of its
        # four fields truncates the result exactly as it did before.
        for name, quantity, amount, tax in zip(
            columns["项目名称"],
            columns["数量"],
            columns["金额"],
            columns["税额"],
        )
    ]
# 初始化票据数据
receipt = {"影像件编号": image["影像件编号"]}
# 请求深圳快瞳票据查验接口(兼容增值税发票、医疗门诊/住院收费票据)
@ -1115,7 +1058,54 @@ if __name__ == "__main__":
"购药及就医机构": query_value(
response["data"], "销售方名称"
),
"明细项": parse_items(response["data"]),
"明细项": [
{
"名称": name,
"数量": Decimal(quantity).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
"金额": (
Decimal(amount) + Decimal(tax)
).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
), # 深圳快瞳票据识别接口中明细的金额和税额由字符串转为Decimal保留两位小数并求和
}
for name, quantity, amount, tax in zip(
[
x["value"]
for x in response["data"]
if re.match(
r"^项目名称(\d+)?$",
x["desc"],
)
],
[
x["value"]
for x in response["data"]
if re.match(
r"^数量(\d+)?$",
x["desc"],
)
],
[
x["value"]
for x in response["data"]
if re.match(
r"^金额(\d+)?$",
x["desc"],
)
],
[
x["value"]
for x in response["data"]
if re.match(
r"^税额(\d+)?$",
x["desc"],
)
],
)
],
"备注": query_value(response["data"], "备注"),
}
)
@ -1168,6 +1158,20 @@ if __name__ == "__main__":
receipt["global_detail"]["invoice_date"]["value"],
"%Y-%m-%d",
),
"入院日期": (
datetime.strptime(
receipt["starttime"]["value"], "%Y-%m-%d"
)
if isinstance(receipt["starttime"], dict)
else None
),
"出院日期": (
datetime.strptime(
receipt["endtime"]["value"], "%Y-%m-%d"
)
if isinstance(receipt["endtime"], dict)
else None
),
"校验码": query_value(
receipt["global_detail"]["region_specific"],
"校验码",
@ -1182,142 +1186,114 @@ if __name__ == "__main__":
"购药及就医机构": receipt["hospital_name"]["value"],
"明细项": [
{
"名称": field["value"],
"数量": Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=(
ROUND_HALF_UP
if isinstance(
(field := item["number"]), dict
)
else None
),
),
"金额": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(
(field := item["total_amount"]), dict
)
"名称": (
item["item_name"]["value"]
if isinstance(item["item_name"], dict)
else None
),
"数量": Decimal(
item["number"]["value"]
if isinstance(item["number"], dict)
else Decimal("1.00")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
"金额": Decimal(
item["total_amount"]["value"]
if isinstance(item["total_amount"], dict)
else Decimal("1.00")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
}
for item in receipt["feeitems"]
if isinstance(item, dict)
],
"医保支付": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
"个人自费": (
Decimal(receipt["self_cost"]["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
if isinstance(
(field := receipt.get("medicare_pay")), dict
)
else None
),
"其它支付": format(
(
Decimal(value).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
if (
value := fuzzy_match(
target="其它支付",
components=receipt.get(
"global_detail", {}
).get("pay_list", []),
specify_key="name",
return_key="word.value",
)
)
else None
),
".2f",
),
"个人自付": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
Decimal(receipt["self_pay"]["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
if isinstance(
(field := receipt.get("self_pay")), dict
)
else None
),
"自付一": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(
field := (receipt.get("self_pay_one")), dict
)
else None
"医保支付": (
Decimal(
receipt["medicare_pay"]["value"]
) # 医保基金统筹支付
+ (
Decimal(receipt["addition_pay"]["value"])
if isinstance(receipt["addition_pay"], dict)
else Decimal("0.00")
) # 附加支付
+ (
Decimal(receipt["third_pay"]["value"])
if isinstance(receipt["third_pay"], dict)
else Decimal("0.00")
) # 第三方支付
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
"自付二": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(
field := (receipt.get("self_pay_two")), dict
)
else None
),
"个人自费": (
format(
Decimal(field["value"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
".2f",
)
if isinstance(
field := (receipt.get("self_cost")), dict
)
else None
),
"住院日期": (
datetime.strptime(
field["value"], "%Y%m%d"
).strftime("%Y-%m-%d")
if isinstance(
field := (receipt.get("starttime")), dict
)
else None
),
"出院日期": (
datetime.strptime(
field["value"], "%Y%m%d"
).strftime("%Y-%m-%d")
if isinstance(
field := (receipt.get("endtime")), dict
)
else None
),
"医疗机构类型": receipt["others"][
"medical_institution_type"
]["value"],
}
)
# Look up the institution type for the pharmacy / medical institution named on the receipt
receipt["购药及就医机构类型"] = master_data.query_institution_type(
receipt["购药及就医机构"]
)
# Derive the purchase/treatment type from the (image type, institution type) pair.
# Stated intent of the original note: for VAT invoice + pharmacy, deduct unreasonable fees;
# for VAT invoice + private hospital, parse the self-funded, self-paid and medical-insurance amounts.
match (image["影像件类型"], receipt["购药及就医机构类型"]):
case ("增值税发票", "药店"):
receipt["购药及就医类型"] = "药店购药"
# Merge identical line items by summing their quantity and amount
items = (
pandas.DataFrame(receipt["明细项"])
.groupby("名称")
.agg(数量=("数量", "sum"), 金额=("金额", "sum"))
.loc[
lambda dataframe: dataframe["金额"] != 0
] # keep only line items whose amount is non-zero
.reset_index()
.to_dict("records")
)
for item in items:
# Split the item name of the form "*category*specific" into its two parts
if match := re.match(
r"^\*(?P<category_name>.*?)\*(?P<specific_name>.*)$",
item["名称"],
):
category_name = match.group(
"category_name"
) # category name of the line item
specific_name = match.group(
"specific_name"
) # specific name of the line item
else:
pass
# NOTE(review): the else branch binds nothing, so specific_name may be unbound here
# when the item name does not match the pattern — confirm.
# NOTE(review): print + exit() look like leftover debugging; exit() stops the process
# at this point, so the remaining cases are never reached on this path — confirm.
print(specific_name)
exit()
case ("增值税发票", "私立医院"):
receipt["购药及就医类型"] = "门诊就医"
case ("医疗门诊收费票据", "公立医院"):
receipt["购药及就医类型"] = "门诊就医"
case ("医疗住院收费票据", "公立医院"):
receipt["购药及就医类型"] = "住院治疗"
case _:
# TODO: when no purchase/treatment type matches, route the case to manual handling
raise RuntimeError("匹配购药及就医类型发生异常")
# noinspection PyShadowingNames
def bank_card_recognize(image) -> None:
"""