Python/普康健康发票查验/main.py

788 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
普康健康_发票查验
"""
# 加载模块
import hashlib
import json
import shutil
import uuid
from base64 import b64decode, b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import cv2
import numpy
import pandas
from utils.client import Authenticator, HTTPClient, RequestException, restrict
from utils.pandas_extension import open_csv, save_as_workbook, traverse_directory
# 影像件压缩
def image_compression(
image_path: str | None = None,
image_format: str | None = None,
image_data: bytes | None = None, # 数据类型为包含图像文件的二进制数据的字节串
image_size_specified: int = 2, # 指定影像件大小
raw: bool = False, # 250804新增返回是否为完整URI数据格式
) -> str | None:
try:
# 若影像件路径数据类型为STR则创建路径对象
if isinstance(image_path, str):
image_path = Path(image_path)
# 影像件文件名称后缀
image_format = image_path.suffix.strip().lstrip(".").lower()
# 读取影像件数据
with open(image_path, "rb") as image:
image_data = image.read()
# 影像件数据BASE64编码
image_data_base64 = b64encode(image_data).decode("utf-8")
# 指定影像件大小的单位由MB转为KB
image_size_specified = image_size_specified * 1024 * 1024
# 若影像件大小小于指定影像件大小则返回BASE64编码后影像件数据
if len(image_data_base64) < image_size_specified:
if raw:
# 返回非完整URI数据格式
return image_data_base64
else:
# 返回完整URI数据格式
return f"data:image/{image_format};base64,{image_data_base64}"
# OPENCV解码数据类型为NUMPY-UINT8
image_data_cv2 = cv2.imdecode(
numpy.frombuffer(image_data, numpy.uint8), cv2.IMREAD_COLOR
)
# 若OPENCV解码失败则抛出异常
if image_data_cv2 is None:
raise RuntimeError(f"OPENCV解码发生异常")
# 初始化近似BASE64编码后影像件数据
proximate_image_data_base64 = None
# 初始化最小压缩前后影像件大小差值
min_image_size_difference = float("inf")
# 基于双层压缩方法:先外层降低图像质量,再内层缩小图像尺寸
for quality in range(90, 0, -10):
image_data_cv2_ = image_data_cv2.copy()
# 根据影像件格式匹配图片质量配置
# noinspection PyUnreachableCode
match image_format:
case "png":
encoding_params = [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
case _:
encoding_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
for i in range(25):
# 降低图像质量
# noinspection PyTypeChecker
success, image_data_encoded = cv2.imencode(
image_format, image_data_cv2_, encoding_params
) # 图像编码
# 若图像编码失败则退出
if not success:
break
image_data_base64 = b64encode(image_data_encoded.tobytes()).decode(
"utf-8"
)
# 压缩前后影像件大小差值
image_size_difference = len(image_data_base64) - image_size_specified
if image_size_difference <= 0:
if raw:
return image_data_base64
else:
return f"data:image/{image_format};base64,{image_data_base64}"
if image_size_difference < min_image_size_difference:
min_image_size_difference = image_size_difference
proximate_image_data_base64 = image_data_base64
# 影像件高度和宽度
image_height, image_weight = image_data_cv2_.shape[:2]
# 若仍超过影像件指定大小则调整图像尺寸
image_data_cv2_ = cv2.resize(
image_data_cv2_,
dsize=(int(image_weight * 0.9), int(image_height * 0.9)),
interpolation=cv2.INTER_AREA,
)
if proximate_image_data_base64:
if raw:
return proximate_image_data_base64
else:
return f"data:image/{image_format};base64,{image_data_base64}"
else:
raise RuntimeError("影像件压缩失败")
except:
return None
# 票据查验接口(需要)
@restrict(refill_rate=5, max_tokens=5) # 限速至5QPS
def invoices_verification(
image_index,
image_path=None,
invoice_number=None,
invoice_code=None,
invoice_check_code=None,
invoice_date=None,
invoice_amount=None,
id_number=None,
process_mode=None,
supplier=None,
):
try:
# 若影像件地址非空则imgBASE64请求否则根据发票五要素请求
if image_path:
match process_mode:
case "通过影像件本地地址":
# 创建路径对象
image_path = Path(image_path)
# 影像件文件名称后缀
image_format = image_path.suffix.strip().lstrip(".").lower()
# 读取影像件数据
with open(image_path, "rb") as image:
image_data = image.read()
case "通过影像件对象服务器地址":
image_format, image_data = http_client.download(url=image_path)
# 断定影像件格式为JGP、JPEG或者PNG
# noinspection PyUnboundLocalVariable
assert image_format in [
"jpg",
"jpeg",
"png",
], f"影像件格式({image_format})不支持"
match supplier:
case "szkt":
image_data_base64 = image_compression(
image_format=image_format, image_data=image_data
)
# noinspection PyUnusedLocal
response = http_client.post(
# 深圳快瞳增值税发票、医疗发票查验兼容版
url="https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll",
# 用于和深圳快瞳联查时定位请求
headers={"X-RequestId-Header": image_index},
data={
"token": authenticator.get_token(servicer="szkt"),
"imgBase64": image_data_base64,
},
)
case "bjfd":
image_data_base64 = image_compression(
image_format=image_format, image_data=image_data, raw=True
) # 北京分单不支持完整URI数据格式
# 业务入参序列化并BASE64编码
data = b64encode(
json.dumps(
{
"fileByte": image_data_base64,
"fileType": (
"png" if image_format == "png" else "jpg"
), # 北京分单影像件格式支持JPG、PNG或PDF本脚本暂不支持PDF
}
).encode("utf-8")
).decode("utf-8")
# 应用账号
appid = "mbYr11Rc_42"
# 随机标识
noise = image_index
# 版本号
version = "1.0"
# 装配签名
sign = (
hashlib.md5(
f"appid={appid}&data={data}&noise={noise}&key=80357535c95333c3b133dfe5533f6334fe5e9321&version={version}".encode(
"utf-8"
)
)
.hexdigest()
.upper()
)
# noinspection PyUnusedLocal
response = http_client.post(
# 北京分单增值税发票、医疗票据二维码查验接口
url="https://api.fendanyun.com/rsx/api/checkByQRCode",
headers={"Content-Type": "application/json; charset=utf-8"},
json={
"appid": appid,
"data": data,
"noise": noise,
"version": version,
"sign": sign,
},
)
else:
response = http_client.post(
# 深圳快瞳增值税发票、医疗发票查验兼容版
url="https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll",
data={
"token": authenticator.get_token(servicer="szkt"),
"invoiceNumber": invoice_number,
"invoiceCode": invoice_code,
"checkCode": invoice_check_code,
"invoicingDate": invoice_date,
"pretaxAmount": invoice_amount,
"idCardNo": id_number,
},
)
except RequestException as request_exception:
response = {
"status": request_exception.status,
"code": request_exception.code,
"message": request_exception.message,
}
except Exception as exception:
response = {
"code": "40000",
"message": f"发生其它异常{exception}",
}
return image_index, response
if __name__ == "__main__":
print("已启动批量票据查验")
match input("请选择票据查验供应商(1:深圳快瞳2:北京分单,其它任意字符:退出脚本):"):
case "1":
supplier = "szkt"
case "2":
supplier = "bjfd"
case _:
print("选择退出脚本!")
exit(0)
match input(
"请选择处理流程(1:批量解析已归档响应报文2:根据影像件地址或票据五要素批量查验,其它任意字符:退出脚本):"
):
case "1":
# 打开前置影像件索引CSV文件
dataframe = open_csv(file_name="dataframe_indexed.csv")
case "2":
print("正在归档响应报文...", end="")
# 创建响应报文目录路径对象
responses_path = Path("temporary/responses")
# 若响应报文目录路径不存在则创建
if not responses_path.exists():
responses_path.mkdir(parents=True, exist_ok=True)
# 创建归档响应报文目录路径对象
archives_path = Path("temporary/archives")
# 若归档响应报文目录路径不存在则创建
if not archives_path.exists():
archives_path.mkdir(parents=True, exist_ok=True)
# 遍历响应报文目录下所有文件名后缀为JSON的文件路径
for file_path in Path(responses_path).glob("*.json"):
# 若文件路径为文件
if file_path.is_file():
# 移动响应报文由响应报文目录至归档响应报文目录
shutil.move(str(file_path), str(archives_path / file_path.name))
print("已完成")
match input(
"请选择批量查验方法(1:通过影像件本地地址2:通过影像件对象服务器地址3:通过增值税发票和医疗票据的五要素,其它任意字符:退出脚本):"
):
case "1":
print("正在读取影像件本地地址...", end="")
dataframe = traverse_directory(
directory_path="待查验发票", suffixes=[".jpg", ".jpeg", ".png"]
)
# 修改列名相对路径为影像件地址
dataframe.rename(columns={"相对路径": "影像件地址"}, inplace=True)
process_mode = "通过影像件本地地址"
case "2":
print("正在读取影像件对象服务器地址...", end="")
dataframe = open_csv(file_name="dataframe.csv")
# 断定列名包括赔案编号、发票编号和影像件地址
assert all(
[
column_name in dataframe.columns
for column_name in ["赔案编号", "发票编号", "影像件地址"]
]
), "CSV文件中列名必须包括赔案编号、发票编号和影像件地址"
# 根据赔案编号和发票编号去重
dataframe.drop_duplicates(
subset=["赔案编号", "发票编号"], keep="first", inplace=True
)
# 处理方式
process_mode = "通过影像件对象服务器地址"
case "3":
print("正在读取增值税发票和医疗票据的五要素...", end="")
dataframe = open_csv(file_name="dataframe.csv")
# 断定列名包括身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额
assert all(
[
column_name in dataframe.columns
for column_name in [
"身份证号码后六位",
"发票编号",
"发票代码",
"校验号码后六位",
"开票日期",
"发票金额",
]
]
), "CSV文件中列名必须包括身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额"
# 根据身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额去重
dataframe.drop_duplicates(
subset=[
"身份证号码后六位",
"发票编号",
"发票代码",
"校验号码后六位",
"开票日期",
"发票金额",
],
keep="first",
inplace=True,
)
# 格式化开票日期
dataframe["开票日期"] = dataframe["开票日期"].str.replace(
"-", "", regex=False
)
# 处理方式
process_mode = "通过增值税发票和医疗票据的五要素"
case _:
print("选择退出脚本!")
exit(0)
# 统计待查验发票张数
rows = dataframe.shape[0]
# 若待查验发票张数为0则退出脚本
if rows == 0:
print("待查验发票张数为0退出脚本")
exit(0)
print(f"已完成,待查验发票张数为 {rows}")
# 添加索引
dataframe["索引"] = dataframe.apply(
lambda x: uuid.uuid4().hex, axis="columns"
)
dataframe.to_csv("dataframe_indexed.csv", index=False)
# 创建深圳快瞳获取访问令牌方法
authenticator = Authenticator()
# 初始化请求客户端
http_client = HTTPClient()
# 用于记录已完成任务数
completed_futures = 0
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# noinspection PyUnreachableCode
# noinspection PyUnboundLocalVariable
match process_mode:
case "通过影像件本地地址" | "通过影像件对象服务器地址":
futures = [
executor.submit(
invoices_verification,
image_index=row.索引,
image_path=row.影像件地址,
process_mode=process_mode,
supplier=supplier,
)
for row in dataframe[["索引", "影像件地址"]].itertuples(
index=False, name="row"
)
]
case "通过增值税发票和医疗票据的五要素":
# 提交任务
futures = [
executor.submit(
invoices_verification,
image_index=row.索引,
invoice_number=row.发票编号,
invoice_code=row.发票代码,
invoice_check_code=row.校验号码后六位,
invoice_date=row.开票日期,
invoice_amount=row.发票金额,
id_number=row.身份证号码后六位,
process_mode=process_mode,
supplier=supplier,
)
for row in dataframe[
[
"索引",
"发票编号",
"发票代码",
"校验号码后六位",
"开票日期",
"发票金额",
"身份证号码后六位",
]
].itertuples(index=False, name="row")
]
for future in as_completed(futures):
index, response = future.result()
# 保存报文
with open(
"temporary/responses/{}.json".format(index),
"w",
encoding="utf-8",
) as file:
json.dump(response, file, ensure_ascii=False)
completed_futures += 1
print(f"已完成 {completed_futures / rows * 100:.2f} %")
case _:
print("选择退出脚本!")
exit(0)
print("正在解析报文...", end="")
# 解析后数据体
dataframe_parsed = []
# 遍历报文所在目录
for path_object in list(Path("temporary/responses").glob("*.json")):
# 解析报文结构
parse = {
"索引": "",
"机打发票号码": "",
"发票金额": "",
"购买方": "",
"销售方": "",
"发票状态": "",
"最大销售项目名称": "",
"最大销售项目数量": "",
"XML版式文件": "",
}
# 若路径对象包含下划线则在解析报文结构添加赔案编号和发票编号
if "_" in path_object.stem:
parse["赔案编号"] = path_object.stem.split("_")[0]
parse["发票编号"] = path_object.stem.split("_")[1]
# 打开报文并JSON逆序列化
with open(path_object, "r", encoding="utf-8") as file:
response = json.load(file)
# 索引
parse["索引"] = path_object.stem
match supplier:
case "szkt":
try:
# 响应状态码
status_code = response.get("status", "")
# 错误码
code = response.get("code", "")
# 流水号
serial = response.get("serialNo", "")
# 若响应状态码为200且错误码为10000则定义为响应成功
if status_code == 200 and code == 10000:
# 查验类型若查验类型为003081则为医疗票据查验003082则为增值税发票查验两者报文结构不一致
match response.get("data").get(
"productCode"
): # 若响应成功则必定存在键DATA和PRODUCTCODE
# 解析医疗票据查验结果
case "003081":
parse["机打发票号码"] = response.get("data").get(
"billNumber"
)
parse["校验码"] = response.get("data").get("checkCode")
parse["发票金额"] = response.get("data").get("amount")
parse["购买方"] = response.get("data").get("payer")
parse["销售方"] = response.get("data").get("payeeName")
# 发票状态
match response.get("data").get("flushedRed"):
case "true":
parse["发票状态"] = "正常"
case "false":
parse["发票状态"] = "已红冲"
# 最大销售项目数量
max_item_quantity = 0
# 遍历销售项目列表
for item in response.get("data").get("feeitems", []):
# 销售项目数量
item_quantity = item.get("number")
# 若销售项目数量非空,进一步判断是否包含斜杠,若包含斜杠则分割并取第一部分,最后转为浮点
if item_quantity:
if "/" in item_quantity:
item_quantity = item_quantity.split("/")[0]
item_quantity = float(item_quantity)
else:
item_quantity = 1
if item_quantity > max_item_quantity:
parse["最大销售项目名称"] = item.get(
"itemName", ""
)
parse["最大销售项目数量"] = str(item_quantity)
parse["XML版式文件"] = response.get("PDFInfo", {}).get(
"fileUrl"
)
# 解析增值税发票查验结果
case "003082":
parse["机打发票号码"] = (
response.get("data").get("details").get("number")
)
parse["校验码"] = (
response.get("data")
.get("details")
.get("check_code")
)
parse["发票金额"] = (
response.get("data").get("details").get("total")
)
parse["购买方"] = (
response.get("data").get("details").get("buyer")
)
parse["销售方"] = (
response.get("data").get("details").get("seller")
)
# 发票状态
match response.get("data").get("details").get(
"invoiceTypeNo"
):
case "0":
parse["发票状态"] = "正常"
case "1":
parse["发票状态"] = "无法查验"
case "2" | "3" | "7" | "8":
parse["发票状态"] = "已红冲"
max_item_quantity = 0
items = (
response.get("data").get("details").get("items", [])
)
for item in items:
item_quantity = (
float(item.get("quantity", 1))
if item.get("quantity")
else 1
)
if item_quantity > max_item_quantity:
parse["最大销售项目名称"] = item.get("name")
parse["最大销售项目数量"] = str(item_quantity)
# XML版式文件25-06-11本接口不在提供版式文件通过另一接口可获取数电增值税发票版式文件
parse["XML版式文件"] = "本接口不再提供版式文件"
# 若响应状态码为400且错误码为10001或10100则定义为假票
elif status_code == 400 and (code == 10001 or code == 10100):
parse["发票状态"] = "假票"
else:
raise Exception("解析报文发生其它异常")
except Exception as exception:
parse["发票状态"] = "{}".format(response.get("message"))
case "bjfd":
try:
# 不验签业务出参BASE64解码并反序列化
response = json.loads(
b64decode(response.get("data")).decode("utf-8")
)
# 增值税发票、医疗票据查验结果BASE64解码并反序列化
response["message"] = json.loads(
b64decode(response.get("message")).decode("utf-8")
)
# 错误码
code = response.get("result")
# 流水号
serial = response.get("message").get("checkId")
# 核验结果代码
result_code = response.get("message").get("resultCode")
# 若错误码为S0000则定义为响应成功
if code == "S0000":
# noinspection PyUnreachableCode
match result_code:
# 若查验成功则根据增值税发票、医疗票据状态匹配发票状态
case "200":
parse["机打发票号码"] = (
response.get("message")
.get("tickMainInfo")
.get("invoiceNo")
)
parse["发票金额"] = (
response.get("message")
.get("tickMainInfo")
.get("invoiceTotalPrice")
)
parse["购买方"] = (
response.get("message")
.get("tickMainInfo")
.get("payerPartyName")
)
parse["销售方"] = (
response.get("message")
.get("tickMainInfo")
.get("invoicingPartyName")
)
max_item_quantity = 0
for item in (
response.get("message")
.get("tickMainInfo")
.get("chargeItems", [])
):
item_quantity = (
float(item.get("num", 1))
if item.get("num")
else 1
)
if item_quantity > max_item_quantity:
parse["最大销售项目名称"] = item.get(
"chargeName"
)
parse["最大销售项目数量"] = str(item_quantity)
match response.get("message").get("invoiceStatus"):
case "0":
parse["发票状态"] = "正常"
case (
"1" | "2"
): # 沿用深圳快瞳解析规则,北京分单已开红票和已作废映射为已红冲
parse["发票状态"] = "已红冲"
case "E20003" | "E20007 ":
parse["发票状态"] = "假票"
# 其它情况发票状态根据核验结果描述
case _:
parse["发票状态"] = response.get("message").get(
"resultMsg"
)
except Exception as exception:
parse["发票状态"] = str(exception)
dataframe_parsed.append(parse)
dataframe_parsed = pandas.DataFrame(data=dataframe_parsed, dtype=str)
# 将解析数据集拼接至数据集
dataframe = dataframe.merge(right=dataframe_parsed, how="left", on=["索引"])
# 填补缺失值
dataframe = dataframe.fillna(value="")
print("已完成")
print("正在保存为工作簿...", end="")
save_as_workbook(worksheets=[("Sheet1", dataframe)], workbook_name="results.xlsx")
print("已完成")