From eb1e20813bc5160d3152ce33d2cf4ec98afb8761 Mon Sep 17 00:00:00 2001 From: marslbr Date: Wed, 29 Oct 2025 12:54:32 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=9B=AE=E5=BD=95=E5=92=8C?= =?UTF-8?q?=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/Python.iml | 8 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/modules.xml | 8 + utils/ocr.py | 471 ++++++ utils/operate.py | 61 + utils/pandas_extension.py | 1489 +++++++++++++++++ 普康健康审核机器人/pageobject.py | 1385 +++++++++++++++ 7 files changed, 3428 insertions(+) create mode 100644 .idea/Python.iml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/modules.xml create mode 100644 utils/ocr.py create mode 100644 utils/operate.py create mode 100644 utils/pandas_extension.py create mode 100644 普康健康审核机器人/pageobject.py diff --git a/.idea/Python.iml b/.idea/Python.iml new file mode 100644 index 0000000..9f1a926 --- /dev/null +++ b/.idea/Python.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..3097039 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/utils/ocr.py b/utils/ocr.py new file mode 100644 index 0000000..a3cbb43 --- /dev/null +++ b/utils/ocr.py @@ -0,0 +1,471 @@ +# -*- coding: utf-8 -*- + +# 导入模块 + +import warnings + +# 过滤使用提醒 +warnings.filterwarnings( + "ignore", + category=UserWarning, +) + +from fuzzywuzzy import fuzz + +import re + +import numpy + +import cv2 + +from decimal import Decimal, ROUND_HALF_UP + +from paddleocr import PaddleOCR + +""" + 
+封装百度飞桨PADDLEOCR + +""" + + +def fuzzy_match( + target: str, components: list, specify_key: str, return_key: str +) -> str: + """ + 根据目标在组成部分列表模糊匹指定键名的键值,并返回匹配的组成部分的返回键名的键值 + 需要匹配的键名的键值 + """ + + def _get_value(component, keys): + """根据键名递归获取键值,支持嵌套结构""" + key = keys[0] + if isinstance(component, dict) and key in component: + return ( + _get_value(component[key], keys[1:]) + if len(keys) > 1 + else component[key] + ) + return None + + results = [] + + for component in components: + # 在组成部分根据指定键名获取对应键值 + specify_value = _get_value(component, specify_key.split(".")) + if specify_value is None: + continue + + # 在组成部分根据返回键名获取对应键值 + return_value = _get_value(component, return_key.split(".")) + if return_value is not None: + results.append( + (return_value, fuzz.WRatio(target, specify_value)) + ) # 基于加权补偿莱文斯坦相似度算法 + + return max(results, key=lambda x: x[1])[0] if results else None + + +class PPOCR: + """OCR客户端""" + + def __init__(self): + + # 初始化PADDLEOCR + self.ocr_engine = PaddleOCR( + ocr_version="PP-OCRv4", + use_doc_orientation_classify=True, + use_doc_unwarping=True, + use_textline_orientation=True, + ) + + @staticmethod + def _texts_sort(texts): + """文本排序""" + + texts_merged = [] + + for texts, coordinates in zip( + texts[0]["rec_texts"], texts[0]["rec_polys"] + ): # 默认识别结果仅包含一张影像件 + + # 合并文本框的X/Y坐标、高度和文本 + texts_merged.append( + [ + # X坐标 + numpy.min(coordinates[:, 0]), + # Y坐标 + numpy.min(coordinates[:, 1]), + # 高度 + numpy.max(coordinates[:, 1]) - numpy.min(coordinates[:, 1]), + texts, + ] + ) + + # 按照文本框Y坐标升序(使用空间坐标算法) + texts_merged.sort(key=lambda x: x[1]) + + texts_sorted = [] + + for index, text in enumerate(texts_merged[1:]): + + if index == 0: + + # 初始化当前行 + row = [texts_merged[0]] + + continue + + # 若文本框Y坐标与当前行中最后一个文本框的Y坐标差值小于阈值,则归为同一行 + # noinspection PyUnboundLocalVariable + # noinspection PyTypeChecker + if ( + text[1] - row[-1][1] < numpy.mean([text[2] for text in row]) * 0.5 + ): # 注意NUMPY.NDARRAY和LIST区别,ROW[:, 
1]仅适用于NUMPY.NDARRAY,故使用列表推导式计算当前行文本框Y坐标和高度 + + row.append(text) + + # 否则按照文本框X坐标就当前行中文本框升序 + else: + + row_sorted = sorted(row, key=lambda x: x[0]) + + texts_sorted.extend(row_sorted) + + row = [text] + + # 按照文本框X坐标就最后一行中文本框升序 + row_sorted = sorted(row, key=lambda x: x[0]) + + texts_sorted.extend(row_sorted) + + # 返回排序后文本 + return [text_sorted[3] for text_sorted in texts_sorted] + + def identity_card_recognition(self, image_path: str) -> dict: + """居民身份证识别""" + + # 读取影像件(数据类型为NUMPY.NDARRAY) + image = cv2.imread(image_path) + + texts = self.ocr_engine.predict( + image, + use_doc_orientation_classify=False, + use_doc_unwarping=False, + use_textline_orientation=True, + text_rec_score_thresh=0.5, + ) + + # 文本排序 + texts = self._texts_sort(texts) + + # 居民身份证模版 + result = { + "姓名": "", + "性别": "", + "民族": "", + "出生": "", + "住址": "", + "公民身份号码": "", + "有效期限": "", + "签发机关": "", + } + + for text in texts: # 默认只包含一套居民身份证正反面 + + # 姓名 + if not result["姓名"] and "姓名" in text: + + result["姓名"] = text.replace("姓名", "").strip() + + elif "性别" in text or "民族" in text: # 姓名和民族常同时返回 + + # 性别 + if not result["性别"] and "性别" in text: + + result["性别"] = ( + text.split("性别")[-1].strip().split("民族")[0].strip() + ) + + # 民族 + if not result["民族"] and "民族" in text: + + result["民族"] = text.split("民族")[-1].strip() + + # 出生 + elif not result["出生"] and "出生" in text: + + result["出生"] = text.replace("出生", "").strip() + + # 住址 + elif "住址" in text or ( + ( + not any( + keyword in text + for keyword in [ + "姓名", + "性别", + "民族", + "出生", + "公民身份号码", + "中华人民共和国", + "居民身份证", + "签发机关", + "有效期限", + ] + ) + ) + and not re.fullmatch( + r"^(\d{4}[.]\d{2}[.]\d{2})$", text.split("-")[0].strip() + ) + ): + + if not result["住址"] and "住址" in text: + + result["住址"] = text.replace("住址", "").strip() + + if result["住址"] and not "住址" in text: + + result["住址"] += text.strip() + + # 公民身份号码 + elif not result["公民身份号码"] and ("公民身份号码" in text): + + result["公民身份号码"] = text.replace("公民身份号码", "").strip() + + # 有效期限 + elif not 
result["有效期限"] and ( + "有效期限" in text + or re.fullmatch( + r"^(\d{4}[.]\d{2}[.]\d{2})$", text.split("-")[0].strip() + ) + ): + + result["有效期限"] = text.replace("有效期限", "").strip() + + # 签发机关 + elif not result["签发机关"] and "签发机关" in text: + + result["签发机关"] = text.replace("签发机关", "").strip() + + return result + + def invoice_recognition(self, image_path: str) -> dict: + """增值税发票识别""" + + # 读取影像件(数据类型为NUMPY.NDARRAY) + image = cv2.imread(image_path) + + texts = self.ocr_engine.predict( + image, + use_doc_orientation_classify=False, + use_doc_unwarping=False, + use_textline_orientation=False, + text_rec_score_thresh=0.5, + ) + + # 文本排序 + texts = self._texts_sort(texts) + + print(texts) + + # 增值税发票模版 + result = { + "票据类型": "", + "票据号码": "", + "票据代码": "", + "开票日期": "", + "票据金额": "", + "校验码": "", + "收款方": "", + "付款方": "", + "项目": [], + } + + for i, text in enumerate(texts): + + if not result["票据类型"] and "电子发票" in text: + + result["票据类型"] = "数电发票" + + elif not result["票据号码"] and "发票号码" in text: + + result["票据号码"] = ( + text.replace("发票号码", "") + .replace(":", "") + .replace(":", "") + .strip() + ) + + elif not result["开票日期"] and "开票日期" in text: + + result["开票日期"] = ( + text.replace("开票日期", "") + .replace(":", "") + .replace(":", "") + .strip() + ) + + elif not result["票据金额"] and "小写" in text: + + if re.match( + r"^-?\d+(\.\d+)?$", text.replace("¥", "¥").split("¥")[-1].strip() + ): + + result["票据金额"] = text.replace("¥", "¥").split("¥")[-1].strip() + + elif re.match( + r"^-?\d+(\.\d+)?$", + texts[i + 1].replace("¥", "¥").split("¥")[-1].strip(), + ): + + result["票据金额"] = ( + texts[i + 1].replace("¥", "¥").split("¥")[-1].strip() + ) + + elif "名称" in text and not "项目名称" in text: + + if not result["付款方"]: + + result["付款方"] = ( + text.replace("名称", "") + .replace(":", "") + .replace(":", "") + .strip() + ) + + else: + + result["收款方"] = ( + text.replace("名称", "") + .replace(":", "") + .replace(":", "") + .strip() + ) + + # 项目 + items = [] + + for i, text in enumerate(texts): + + # 
通过首位为星号定位名称、规格和单位 + if text.startswith("*"): + + # 项目模版 + # noinspection PyDictCreation + item = { + "名称": "", + "规格": "", + "单位": "", + "数量": "", + "单价": "", + "金额": "", + "税率": "", + "税额": "", + } + + item["名称"] = text.strip("") + + # 若非数值则名称后一项为规格 + if not re.match( + r"^-?\d+(\.\d+)?$", + texts[i + 1].replace("%", "").strip(), + ): + + item["规格"] = texts[i + 1].strip() + + # 若非数值则名称后二项为单位 + if not re.match( + r"^-?\d+(\.\d+)?$", + texts[i + 2].replace("%", "").strip(), + ): + + item["单位"] = texts[i + 2].strip() + + for j, text_ in enumerate(texts): + + # 若内循环索引小于等于外循环索引则跳过 + if j <= i: + + continue + + # 若内循环首位为星号或为小计则将识别结果添加至项目并停止内循环 + if j > i and ( + text_.startswith("*") or text_ in "小计" or text_ in "合计" + ): + + items.append(item) + + break + + # 通过包含百分号定位税率、税额、数量、单价和金额 + if "%" in text_ and re.match( + r"^\d+(\.\d+)?$", + texts[j].replace("%", "").strip(), + ): + + item["税率"] = texts[j].replace("%", "").strip() + "%" + + # 税率后一项为税额 + if re.match( + r"^-?\d+(\.\d+)?$", + texts[j + 1].strip(), + ): + + item["税额"] = texts[j + 1].strip() + + # 税率前一项为金额 + if re.match( + r"^-?\d+(\.\d+)?$", + texts[j - 1].strip(), + ): + + item["金额"] = texts[j - 1].strip() + + # 若金额包含负号,税率前二项为单价、前三项为数量 + if not "-" in item["金额"]: + + if re.match( + r"^\d+(\.\d+)?$", + texts[j - 2].strip(), + ): + + item["单价"] = texts[j - 2].strip() + + if texts[j - 3].strip().isdigit(): + + item["数量"] = texts[j - 3].strip() + + elif j > i + 2 and not re.match( + r"^-?\d+(\.\d+)?$", + text_.replace("%", "").strip(), + ): + + item["名称"] += texts[j].strip() + + # 数值修正 + for item in items: + + if ( + not item["数量"] + and item["金额"] + and not "-" in item["金额"] + and item["单价"] + ): + + item["数量"] = ( + "" + if ( + quantity := int( + (Decimal(item["金额"]) / Decimal(item["单价"])).quantize( + Decimal("0"), rounding=ROUND_HALF_UP + ) + ) + ) + == 0 + else str(quantity) + ) + + result["项目"] = items + + return result diff --git a/utils/operate.py b/utils/operate.py new file mode 100644 index 
0000000..e534604 --- /dev/null +++ b/utils/operate.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- + +""" + +脚本说明:基于MySQL、MongoDB、Request和飞书等API封装成常用功能 + +备注: +后续需要考虑优化,后续utils中脚本尽可能相互独立 + +""" + +# 导入模块 + +import json + + +import pandas + +import warnings + +import numpy + +from pydantic import BaseModel, ValidationError, AfterValidator, Field, HttpUrl + +from typing import Optional, Union, Unpack, Literal, Dict, TypedDict, Annotated + +from requests_toolbelt import MultipartEncoder + +import cv2 + +from requests import Session, Response + +from requests.adapters import HTTPAdapter + +from urllib.parse import ( + urlparse, + urlsplit, + urlunsplit, + parse_qs, + quote, + quote_plus, + unquote, + urlencode, +) + +from urllib.request import Request as request, urlopen + +from urllib.util.retry import Retry + +from urllib.error import HTTPError + +from pymongo import MongoClient + + +import os + +import threading + +import time + +from functools import wraps diff --git a/utils/pandas_extension.py b/utils/pandas_extension.py new file mode 100644 index 0000000..a82dec3 --- /dev/null +++ b/utils/pandas_extension.py @@ -0,0 +1,1489 @@ +# -*- coding: utf-8 -*- + +""" + +扩展PANDAS常用功能 + +""" + +# 加载模块 + +from pathlib import Path + +import pandas + +from prettytable import PrettyTable + +from openpyxl import Workbook + +from openpyxl.styles import Font, Alignment, Border, Side, PatternFill + +from openpyxl.utils import get_column_letter + +from pyecharts import __file__ as default_path, options + +from pyecharts.components import Table + +from pyecharts.charts import Scatter, Pie, Bar, Line, HeatMap + +from pyecharts.globals import ThemeType, CurrentConfig + +from jinja2 import Environment, FileSystemLoader + + +def open_csv(file_name): + """打开并读取CSV文件,返回数据体""" + + # 创建路径对象 + file_path = Path(file_name) + + if not file_path.exists(): + + raise FileNotFoundError(f"路径({file_path})不存在") + + if not file_path.is_file(): + + raise IsADirectoryError(f"路径({file_path})非文件") + + if 
file_path.suffix.lower() != ".csv": + + raise ValueError(f"文件名后缀({file_path.suffix.lower()})非CSV") + + # 使用PANDAS读取CSV文件并返回 + return pandas.read_csv( + filepath_or_buffer=file_path, + # 指定所有列数据类型为字符串(后续人工处理数据类型) + dtype=str, + encoding="utf-8", + # 若遇行解析错误则抛出PARSERERROR + on_bad_lines="error", + ) + + +def traverse_directory(directory_path, suffixes): + """遍历目录根据文件名后缀读取文件,返回数据体""" + + # 创建路径对象 + directory_path = Path(directory_path) + + if not directory_path.exists(): + + raise FileNotFoundError(f"路径({directory_path})不存在") + + if not directory_path.is_dir(): + + raise NotADirectoryError(f"路径({directory_path})非目录") + + if not isinstance(suffixes, list): + + raise TypeError("文件名后缀(suffixes)数据类型非列表") + + dataframe = [] + + for file_path in directory_path.rglob("*"): + + if file_path.is_file() and file_path.suffix.lower() in suffixes: + + dataframe.append( + { + "文件名": file_path.name, + "父目录": file_path.parent.name, + "相对路径": str(file_path.relative_to(".")), + } + ) + + # 创建并返回数据体 + return pandas.DataFrame(dataframe).sort_values(by=["父目录", "文件名"]) + + +def save_as_workbook(worksheets, workbook_name): + """将数据体保存为工作簿""" + + # 创建工作簿 + workbook = Workbook() + + # 删除默认创建的工作表 + for worksheet_name in workbook.sheetnames: + + workbook.remove(workbook[worksheet_name]) + + for worksheet_name, worksheet_dataframe in worksheets: + + # 若工作表数据体为空则跳过 + if not worksheet_dataframe.empty: + + # 创建工作表 + worksheet = workbook.create_sheet(title=worksheet_name) + + # 若为多层行索引则重置行索引(单层行索引则跳过) + if isinstance(worksheet_dataframe.index, pandas.MultiIndex): + + worksheet_dataframe.reset_index(inplace=True) + + # 遍历列索引层 + for level in range(worksheet_dataframe.columns.nlevels): + + # 添加列名行 + worksheet.append( + worksheet_dataframe.columns.get_level_values(level).tolist() + ) # 工作表一般包括标题行、列名行和数据行,在这里仅考虑列名行和数据行 + + # 初始化合并单元格开始列号 + merge_start_column = 0 + + # 初始化上一个单元格值 + previous_cell_value = worksheet.cell( + row=level + 1, column=merge_start_column + 1 + ).value + + for column in range( + 
len(worksheet_dataframe.columns.get_level_values(level)) + ): + + # 单元格值 + value = worksheet.cell( + row=level + 1, column=column + 1 + ).value # 工作表数据体为PANDAS.DATAFRAME,行号和列号从0开始,工作表为OPENPYXL,行号和列号从1开始 + + if value != previous_cell_value: + + # 合并单元格结束列号 + merge_end_column = column - 1 + + # 判断合并单元格结束列号和合并单元格开始列号的差值是否大于0 + if ( + previous_cell_value + and merge_end_column - merge_start_column > 0 + ): + + # 合并单元格(同行合并) + worksheet.merge_cells( + start_row=level + 1, + end_row=level + 1, + start_column=merge_start_column + 1, + end_column=merge_end_column + 1, + ) + + # 重新赋值合并单元格开始列号 + merge_start_column = column + + # 重新赋值上一单元格值 + previous_cell_value = value + + # 若单元格值和上一个单元格值相同,若列号为最大值则合并单元格 + else: + + # 判断列号是否为最大值 + if ( + column + == len(worksheet_dataframe.columns.get_level_values(level)) + - 1 + ): + + # 重新赋值合并单元格结束列号 + merge_end_column = column + + # 合并单元格(同行合并) + worksheet.merge_cells( + start_row=level + 1, + end_row=level + 1, + start_column=merge_start_column + 1, + end_column=merge_end_column + 1, + ) + + # 若单元格值为空则同列合并 + if not value: + + # 合并单元格(同列合并) + worksheet.merge_cells( + start_row=level, + end_row=level + 1, + start_column=column + 1, + end_column=column + 1, + ) + + # 工作表列宽:24磅 + for column in range(worksheet_dataframe.shape[1]): + + worksheet.column_dimensions[get_column_letter(column + 1)].width = 24 + + # 列名行行高:24磅 + for level in range(worksheet_dataframe.columns.nlevels): + + worksheet.row_dimensions[level + 1].height = 24 + + # 列名行单元格样式 + for column in range( + len(worksheet_dataframe.columns.get_level_values(level)) + ): + + cell = worksheet.cell(level + 1, column + 1) + + # 字体 + cell.font = Font(bold=True, size=12, name="Arial", color="00FFFFFF") + + # 对齐方式 + cell.alignment = Alignment(horizontal="center", vertical="center") + + # 边框 + cell.border = Border( + left=Side(style="thin", color="00333333"), + right=Side(style="thin", color="00333333"), + top=Side(style="thin", color="00333333"), + bottom=Side(style="thin", 
color="00333333"), + ) + + # 填充 + cell.fill = PatternFill(fill_type="solid", start_color="003366FF") + + for row, row_data in worksheet_dataframe.iterrows(): + + # 数据行行高:20磅 + worksheet.row_dimensions[ + row + worksheet_dataframe.columns.nlevels + 1 + ].height = 20 + + for column in range(worksheet_dataframe.shape[1]): + + cell = worksheet.cell( + row + worksheet_dataframe.columns.nlevels + 1, column + 1 + ) + + # 单元格值 + cell.value = row_data.iloc[column] + + # 字体 + cell.font = Font(size=12, name="Arial", color="00333333") + + # 对齐方式 + cell.alignment = Alignment(horizontal="center", vertical="center") + + cell.border = Border( + left=Side(style="thin", color="00333333"), + right=Side(style="thin", color="00333333"), + top=Side(style="thin", color="00333333"), + bottom=Side(style="thin", color="00333333"), + ) + + workbook.save(workbook_name) + + +def print_table(dataframe, index_as_the_first_column=False, title=None, remark=None): + """将数据体打印为表格""" + + dataframe_temporary = dataframe.copy() + + table = PrettyTable() + + # 若索引作为第一列,则重置数据集索引 + if index_as_the_first_column: + + # 重置索引 + dataframe_temporary.reset_index(inplace=True) + + # 修改第一列列名 + dataframe_temporary.rename(columns={"index": ""}, inplace=True) + + # 列标签 + column_labels = dataframe_temporary.columns.tolist() + + # 赋值给表格列标签 + table.field_names = column_labels + + # 行数据(若数据类型为浮点则保留两位小数并转为字符) + row_data = dataframe_temporary.map( + lambda x: f"{x:.2f}" if isinstance(x, float) else x + ).to_numpy() + + # 赋值给表格行数据 + table.add_rows(row_data) + + # 第一列左对齐,其余列右对齐 + table.align[column_labels[0]] = "l" + + for column_label in column_labels[1:]: + + table.align[column_label] = "r" + + # 若表格标题数据类型为字符则打印 + if isinstance(title, str): + + print(title) + + print(table) + + # 若表格备注数据类型为字符则打印 + if isinstance(remark, str): + + print(remark) + + +class DrawAsHTML: + + def __init__(self): + """绘制图表并保存为HTML""" + + # 全局网页标题 + CurrentConfig.PAGE_TITLE = "DrawAsHTML" + + # 全局环境 + CurrentConfig.GLOBAL_ENV = Environment( + 
loader=FileSystemLoader( + [ + Path(default_path).parent / "render" / "templates", # 默认模版路径 + Path(__file__).parent / "templates", # 自定义模版路径 + ] + ) + ) + + @staticmethod + def table(dataframe: pandas.DataFrame, file_name: str) -> None: + """表格""" + + # 创建表格,HTML标题默认为全局网页标题 + chart = Table(page_title=CurrentConfig.PAGE_TITLE) + + chart.add( + # 表格表头 + headers=dataframe.columns.tolist(), + # 表格数据 + rows=dataframe.values.tolist(), + ) + + chart.render( + template_name="table.html", + html_content=chart.html_content, + path=file_name, + ) + + # 直角坐标系图表 + + def scatter( + self, + dataframe: pandas.DataFrame, + file_name: str, + xaxis_opts_min: int | float | None = None, + xaxis_opts_max: int | float | None = None, + yaxis_opts_min: int | float | None = None, + yaxis_opts_max: int | float | None = None, + ) -> None: + """散点图""" + + # 根据数据体列数匹配生成散点图方法 + match dataframe.shape[1]: + + # 默认为数据行第一个数据为系列名称、第二个数据为X坐标轴值、第三个数据为Y坐标轴值、第四个数据为标记大小 + case 4: + + pass + + # 创建散点图(默认初始化配置项) + chart = Scatter(init_opts=self._init_opts()) + + chart.set_global_opts( + # 不显示图例 + legend_opts=self._legend_opts(), + # 不显示提示框 + tooltip_opts=self._tooltip_opts(), + # X坐标轴配置项,默认第二个列名作为X坐标轴名称 + xaxis_opts=self._axis_opts( + name=dataframe.columns[1], min_=xaxis_opts_min, max_=xaxis_opts_max + ), + # Y坐标轴配置项,默认第三个列名作为X坐标轴名称 + yaxis_opts=self._axis_opts( + name=dataframe.columns[2], min_=yaxis_opts_min, max_=yaxis_opts_max + ), + ) + + for series_name in dataframe[dataframe.columns[0]].unique(): + + chart.add_xaxis( + xaxis_data=dataframe.loc[ + dataframe[dataframe.columns[0]] == series_name, + dataframe.columns[1], + ].tolist() + ) + + # noinspection PyTypeChecker + chart.add_yaxis( + # 系列名称 + series_name=series_name, + # 系列数据 + y_axis=[ + options.ScatterItem( + # 数据项名称 + name=row.iloc[3], + # 数据项值 + value=[row.iloc[1], row.iloc[2]], + # 标记大小,经过MIN-MAX归一化至20~60 + symbol_size=20 + + 40 + * (row.iloc[3] - dataframe[dataframe.columns[3]].min()) + / ( + dataframe[dataframe.columns[3]].max() + - 
dataframe[dataframe.columns[3]].min() + ), + # 标签配置项 + label_opts=self._label_opts( + # 标签的位置 + position="right", + # 标签内容格式器,在散点图中:{a}(系列名称),{b}(数据名称),{c}(数值数组) + formatter="{a|{a}}\n{b|{b}}", + # 文字块富文本样式 + rich={ + "a": {"fontWeight": "bold"}, + "b": {"lineHeight": 25}, + }, + ), + ) + for _, row in dataframe[ + dataframe[dataframe.columns[0]] == series_name + ].iterrows() + ], # 使用散点图数据项设置系列数据 + ) + + chart.render(file_name) + + def bar( + self, dataframe: pandas.DataFrame, file_name: str, stack: bool = False + ) -> None: + """柱状图""" + + # 创建柱状图(默认初始化配置项) + chart = Bar(init_opts=self._init_opts()) + + chart.set_global_opts( + # 显示图例 + legend_opts=self._legend_opts(is_show=True if stack else False), + # 不显示提示框 + tooltip_opts=self._tooltip_opts(), + # X坐标轴配置项,默认第二个列名作为X坐标轴名称 + xaxis_opts=self._axis_opts( + type_="category", name=dataframe.columns[1], axislabel_rotate=-30 + ), + # Y坐标轴配置项,默认第三个列名作为Y坐标轴名称 + yaxis_opts=self._axis_opts( + name=dataframe.columns[2], + ), + ) + + chart.add_xaxis( + xaxis_data=dataframe[dataframe.columns[1]].unique().tolist() + ) # 柱状图和散点图在新增X坐标轴方法不相同 + + for series_name in dataframe[dataframe.columns[0]].unique(): + + # noinspection PyTypeChecker + chart.add_yaxis( + # 系列名称 + series_name=series_name, + # 系列数据 + y_axis=[ + options.BarItem( + # 数据项名称 + name=None, + # 数据项值 + value=[row.iloc[1], row.iloc[2]], + # 标签配置项 + label_opts=self._label_opts( + # 标签内容格式器,在柱状图中:{a}(系列名称),{b}(数据名称),{c}(数值数组) + # 显示数据项值的第二个值 + formatter="{value|{@[1]}}", + # 文字块富文本样式 + rich={ + "value": {"fontWeight": "bold"}, + }, + ), + ) + for _, row in dataframe[ + dataframe[dataframe.columns[0]] == series_name + ].iterrows() + ], # 使用散点图数据项设置系列数据 + # 数据堆叠 + stack="bar_stacked" if stack else None, + ) + + chart.render(file_name) + + def line( + self, + dataframe: pandas.DataFrame, + file_name: str, + yaxis_opts_min: int | float | None = None, + yaxis_opts_max: int | float | None = None, + ) -> None: + """折线图""" + + # 创建折线图(默认初始化配置项) + chart = 
Line(init_opts=self._init_opts()) + + chart.set_global_opts( + # 不显示图例 + legend_opts=self._legend_opts(is_show=True), + # 不显示提示框 + tooltip_opts=self._tooltip_opts(), + # X坐标轴配置项,默认第二个列名作为X坐标轴名称 + xaxis_opts=self._axis_opts( + type_="category", + name=dataframe.columns[1], + boundary_gap=False, + axislabel_rotate=-30, + ), + # Y坐标轴配置项,默认第三个列名作为Y坐标轴名称 + yaxis_opts=self._axis_opts( + name=dataframe.columns[2], + min_=yaxis_opts_min, + max_=yaxis_opts_max, + ), + ) + + chart.add_xaxis(xaxis_data=dataframe[dataframe.columns[1]].unique().tolist()) + + for series_name in dataframe[dataframe.columns[0]].unique(): + + # noinspection PyTypeChecker + chart.add_yaxis( + series_name=series_name, + # 系列数据 + y_axis=[ + options.LineItem( + # 数据项值 + value=[row.iloc[1], row.iloc[2]], + # 标签配置项 + label_opts=self._label_opts( + # 标签的位置 + position="right", + # 标签内容格式器,在折线图中:{a}(系列名称),{b}(数据名称),{c}(数值数组) + # 显示数据项值的第二个值 + formatter="{value|{@[1]}}", + # 文字块富文本样式 + rich={ + "value": {"fontWeight": "bold"}, + }, + ), + ) + for _, row in dataframe[ + dataframe[dataframe.columns[0]] == series_name + ].iterrows() + ], # 使用折线图数据项设置系列数据 + # 是否平滑曲线 + is_smooth=True, + # 线样式配置项,默认线宽为2.5、线的颜色为跟随系列颜色 + linestyle_opts=self._linestyle_opts(width=2.5, color=None), + ) + + chart.render(file_name) + + def area( + self, + dataframe: pandas.DataFrame, + file_name: str, + yaxis_opts_min: int | float | None = None, + yaxis_opts_max: int | float | None = None, + ) -> None: + """面积图""" + + # 创建面积图(默认初始化配置项) + chart = Line(init_opts=self._init_opts()) + + chart.set_global_opts( + # 不显示图例 + legend_opts=self._legend_opts(), + # 不显示提示框 + tooltip_opts=self._tooltip_opts(), + # X坐标轴配置项,默认第一个列名作为X坐标轴名称 + xaxis_opts=self._axis_opts( + type_="category", + name=dataframe.columns[0], + boundary_gap=False, + axislabel_rotate=-30, + ), + # Y坐标轴配置项,默认第二个列名作为Y坐标轴名称 + yaxis_opts=self._axis_opts( + name=dataframe.columns[1], + min_=yaxis_opts_min, + max_=yaxis_opts_max, + ), + ) + + 
chart.add_xaxis(xaxis_data=dataframe[dataframe.columns[0]].tolist()) + + # noinspection PyTypeChecker + chart.add_yaxis( + series_name="series_name", + # 系列数据 + y_axis=[ + options.LineItem( + # 数据项值 + value=[row.iloc[0], row.iloc[1]], + # 标签配置项 + label_opts=self._label_opts( + # 标签的位置 + position="right", + # 标签内容格式器,在折线图中:{a}(系列名称),{b}(数据名称),{c}(数值数组) + # 显示数据项值的第二个值 + formatter="{value|{@[1]}}", + # 文字块富文本样式 + rich={ + "value": {"fontWeight": "bold"}, + }, + ), + ) + for _, row in dataframe.iterrows() + ], # 使用折线图数据项设置系列数据 + # 是否平滑曲线 + is_smooth=True, + # 线样式配置项,默认线宽为2.5、线的颜色为跟随系列颜色 + linestyle_opts=self._linestyle_opts(width=2.5, color=None), + # 填充区域配置项 + areastyle_opts=self._areastyle_opts(), + ) + + chart.render(file_name) + + # 基本图表 + + def pie( + self, + dataframe: pandas.DataFrame, + file_name: str, + rosetype: str | None = "area", + ) -> None: + """饼图""" + + # 创建饼图(默认初始化配置项) + chart = Pie(init_opts=self._init_opts()) + + chart.set_global_opts( + # 不显示图例 + legend_opts=self._legend_opts(), + # 不显示提示框 + tooltip_opts=self._tooltip_opts(), + ) + + chart.add( + # 系列名称,默认为第一个列名 + series_name=dataframe.columns.tolist()[0], + # 系列数据项,默认为数据行前两个数据 + data_pair=[[row.iloc[0], row.iloc[1]] for _, row in dataframe.iterrows()], + # 是否展示成南丁格尔图,玫瑰图,扇区圆心角相同,通过半径区分组数据大小 + # area:所有扇区圆心角相同,通过半径展现数据大小 + # radius:扇区圆心角展现数据的百分比,半径展现数据的大小 + # None:不展示成南丁格尔图(饼图) + rosetype=rosetype, + # 中心坐标 + center=["50%", "60%"], + # 饼图的扇区顺时针排布 + is_clockwise=True, + # 标签配置项 + label_opts=self._label_opts( + # 标签的位置默认为外部 + position="outside", + # 标签内容格式器,在饼图中:{a}系列名称,{b}数据项名称,{c}数值,{d}百分比 + formatter="{b|{b}}\n{c|{c}}", + # 文字块富文本样式 + rich={"b": {"fontWeight": "bold"}, "c": {"lineHeight": 25}}, + ), + ) + + chart.render(file_name) + + # 全局配置项(先总再分,先全局再系列) + + # 初始化配置项 + def _init_opts( + self, + # 图表画布宽度 + width="500px", + # 图表画布高度 + height="350px", + # 图表主题 + theme=ThemeType.WALDEN, + # 网页标题,默认为全局网页标题 + page_title=CurrentConfig.PAGE_TITLE, + # 图表动画初始化配置 + animation_opts=None, + ): + + # 
若图表动画初始化配置为空则为默认配置 + if not animation_opts: + + animation_opts = self._animation_opts() + + return options.InitOpts( + width=width, + height=height, + theme=theme, + page_title=page_title, + animation_opts=animation_opts, + ) + + # 图表动画初始化配置 + @staticmethod + def _animation_opts( + # 是否开启动画 + animation=False, + ): + + return options.AnimationOpts(animation=animation) + + # 图例配置项 + def _legend_opts( + self, + # 图例字体样式,默认为中黑、常规、12像素 + textstyle_opts=None, + # 是否显示图例,默认为不显示 + is_show=False, + # 组件距离容器左侧位置,默认为居中 + pos_left="center", + # 组件距离容器上侧位置,默认为上侧 + pos_top="top", + # 组件布局朝向,默认为水平 + orient="horizontal", + # 组件对齐方式,默认为自动 + align="auto", + # 组件内边距,默认为5 + padding=5, + # 图例标记的图形宽度,默认为10 + item_width=10, + # 图例标记的图形高度,默认为10 + item_height=10, + # 图例关闭时的颜色,默认为中黑 + inactive_color="#86909C", + # 图例的边框线宽 + border_width=0, + ): + + # 若文字样式配置项为空则为默认配置 + if not textstyle_opts: + + textstyle_opts = self._textstyle_opts() + + return options.LegendOpts( + textstyle_opts=textstyle_opts, + is_show=is_show, + pos_left=pos_left, + pos_top=pos_top, + orient=orient, + align=align, + padding=padding, + item_width=item_width, + item_height=item_height, + inactive_color=inactive_color, + border_width=border_width, + ) + + # 提示框配置项 + @staticmethod + def _tooltip_opts( + # 是否显示提示框,默认为不显示 + is_show=False, + ): + + return options.TooltipOpts(is_show=is_show) + + # 标题配置项 + def _title_opts( + self, + # 标题的文字样式,默认为黑、加粗、20 + title_textstyle_opts=None, + # 副标题的文字样式,默认为中黑、常规、14 + subtitle_textstyle_opts=None, + # 否显示标题,默认为不显示 + is_show=False, + # 标题的文本,默认为空 + title=None, + # 副标题的文本,默认为空 + subtitle=None, + # 组件距离容器左侧位置,默认为左侧 + pos_left="left", + # 组件距离容器上侧位置,默认为上侧 + pos_top="top", + # 组件内边距,默认为[上0、右0、下0、左60] + padding=None, + ): + + # 若标题的文字样式配置项为空则为默认配置 + if not title_textstyle_opts: + + title_textstyle_opts = self._textstyle_opts( + color="#1D2129", font_weight="bold", font_size=20 + ) + + # 若标题的文字样式配置项为空则为默认配置 + if not subtitle_textstyle_opts: + + subtitle_textstyle_opts = 
self._textstyle_opts(font_size=14) + + # 若组件内边距为空则为默认配置 + if not padding: + + padding = [0, 0, 0, 60] + + return options.TitleOpts( + title_textstyle_opts=title_textstyle_opts, + subtitle_textstyle_opts=subtitle_textstyle_opts, + is_show=is_show, + title=title, + subtitle=subtitle, + pos_left=pos_left, + pos_top=pos_top, + padding=padding, + ) + + # 坐标轴配置项 + def _axis_opts( + self, + # 坐标轴轴线配置项,默认为中黑 + axisline_opts=None, + # 坐标轴刻度配置项,默认为中黑 + axistick_opts=None, + # 坐标轴标签配置项-标签旋转 + axislabel_rotate=None, + # 坐标轴名称文字样式,默认为中黑、常规、12像素 + name_textstyle_opts=None, + # 分割线配置项,默认为不显示 + splitline_opts=None, + # 坐标轴类型,默认为数值轴,可选CATEGORY类目轴、TIME时间轴 + type_="value", + # 坐标轴名称,默认为空 + name=None, + # 是否显示坐标轴,默认为显示 + is_show=True, + # 坐标轴名称与轴线之间的距离,默认为15 + name_gap=15, + # 强制设置坐标轴分割间隔 + interval=None, + # 坐标轴的分割段数,默认为5 + split_number=5, + # 坐标轴两边留白策略 + boundary_gap=None, + # 坐标轴刻度最小值 + min_=None, + # 坐标轴刻度最大值 + max_=None, + ): + + # 若坐标轴轴线配置项为空则为默认配置 + if not axisline_opts: + + axisline_opts = self._axisline_opts() + + # 若坐标轴刻度配置项为空则为默认配置 + if not axistick_opts: + + axistick_opts = self._axistick_opts() + + # 若坐标轴名称文字样式为空则为默认配置 + if not name_textstyle_opts: + + name_textstyle_opts = self._textstyle_opts() + + if not splitline_opts: + + splitline_opts = self._splitline_opts() + + return options.AxisOpts( + axisline_opts=axisline_opts, + axistick_opts=axistick_opts, + name_textstyle_opts=name_textstyle_opts, + splitline_opts=splitline_opts, + type_=type_, + name=name, + is_show=is_show, + name_gap=name_gap, + interval=interval, + split_number=split_number, + boundary_gap=boundary_gap, + min_=min_, + max_=max_, + axislabel_opts=self._label_opts(rotate=axislabel_rotate), + ) + + # 坐标轴轴线配置项 + def _axisline_opts( + self, + # 是否显示轴线,默认显示 + is_show=True, + # 线样式配置项,默认为中黑 + linestyle_opts=None, + ): + + # 若线样式配置项为空则为默认配置 + if not linestyle_opts: + + linestyle_opts = self._linestyle_opts() + + return options.AxisLineOpts(is_show=is_show, linestyle_opts=linestyle_opts) + + # 坐标轴刻度配置项 + def 
_axistick_opts( + self, + # 是否显示坐标轴刻度,默认为否 + is_show=False, + # 线样式配置项,默认为中黑 + linestyle_opts=None, + # 坐标轴刻度是否朝内,默认为朝内 + is_inside=True, + ): + + # 若线样式配置项为空则为默认配置 + if not linestyle_opts: + + linestyle_opts = self._linestyle_opts() + + return options.AxisTickOpts( + is_show=is_show, is_inside=is_inside, linestyle_opts=linestyle_opts + ) + + # 视觉映射配置项 + def _visualmap_opts( + self, + # 指定visualMapPiecewise组件的最大值 + max_=1, + # 如何放置visualMap组件 + orient="horizontal", + # visualMap组件离容器左侧的距离 + pos_left="center", + # visualMap组件离容器上侧的距离 + pos_top="top", + # 数据展示的小数精度 + precision=2, + # 文字样式配置项 + textstyle_opts=None, + ): + + # 若文字样式配置项为空则为默认配置 + if not textstyle_opts: + + textstyle_opts = self._textstyle_opts() + + return options.VisualMapOpts( + max_=max_, + orient=orient, + pos_left=pos_left, + pos_top=pos_top, + precision=precision, + textstyle_opts=textstyle_opts, + ) + + # 系列配置项 + + # 图元样式配置项 + @staticmethod + def _itemstyle_opts( + # 图形透明度 + opacity=0.2, + ): + + return options.ItemStyleOpts(opacity=opacity) + + # 文本样式配置项 + @staticmethod + def _textstyle_opts( + # 文字颜色,默认为中黑 + color="#86909C", + # 文字字体的风格,默认为常规 + font_style="normal", + # 文字字体的粗细,默认为常规 + font_weight="normal", + # 文字字体的系列,默认为苹方 + font_family="PingFang SC", + # 文字字体的大小,默认为12 + font_size=12, + ): + + return options.TextStyleOpts( + color=color, + font_style=font_style, + font_weight=font_weight, + font_family=font_family, + font_size=font_size, + ) + + # 标签配置项 + @staticmethod + def _label_opts( + # 是否显示标签 + is_show=True, + # 标签的位置 + position="inside", + # 文字的颜色,默认为中黑 + color="#86909C", + # 文字的字体大小,默认为12 + font_size=12, + # 文字的字体粗细,默认为常规 + font_weight="normal", + # 文字的字体系列,默认为苹方 + font_family="PingFang SC", + # 标签旋转 + rotate=None, + # 标签内容格式器,默认为空 + formatter=None, + # 文字块背景色,默认为无 + background_color=None, + # 文字块边框颜色 + border_color=None, + # 文字块边框宽度 + border_width=None, + # 文字块圆角 + border_radius=None, + # 文字块富文本样式 + rich=None, + ): + + return options.LabelOpts( + is_show=is_show, + position=position, 
+ color=color, + font_size=font_size, + font_weight=font_weight, + font_family=font_family, + rotate=rotate, + formatter=formatter, + background_color=background_color, + border_color=border_color, + border_width=border_width, + border_radius=border_radius, + rich=rich, + ) + + # 线样式配置项 + @staticmethod + def _linestyle_opts( + # 线宽,默认为1 + width=1, + # 线的类型,默认为实线 + type_="solid", + # 线的颜色,默认为中黑 + color="#86909C", + ): + + return options.LineStyleOpts(width=width, type_=type_, color=color) + + # 分割线配置项 + def _splitline_opts( + self, + # 线样式配置项,默认为虚线、浅黑 + linestyle_opts=None, + # 是否显示分割线,默认为不显示 + is_show=False, + ): + + # 若线样式配置项为空则为默认配置 + if not linestyle_opts: + # 线样式配置项,默认为虚线、浅黑 + linestyle_opts = self._linestyle_opts(type_="dashed", color="#E5E6EB") + + return options.SplitLineOpts(is_show=is_show, linestyle_opts=linestyle_opts) + + # 标记线配置项 + def _markline_opts( + self, + # 标记线数据 + data, + # 响应和触发鼠标事件,默认为不响应 + is_silent=False, + # 标签配置项 + label_opts=None, + # 线样式配置项 + linestyle_opts=None, + ): + + # 若标签配置项为空则默认配置 + if not label_opts: + + label_opts = self._label_opts() + + # 线样式配置项为空则默认配置 + if not linestyle_opts: + # 线样式配置项,默认为虚线、浅黑 + linestyle_opts = self._linestyle_opts(type_="dashed", color="#E5E6EB") + + return options.MarkLineOpts( + is_silent=is_silent, + data=data, + label_opts=label_opts, + linestyle_opts=linestyle_opts, + ) + + # 区域填充样式配置项 + @staticmethod + def _areastyle_opts( + # 图形透明度(0为透明,1为不透明),默认为0.2 + opacity=0.2, + ): + + return options.AreaStyleOpts(opacity) + + +""" + + + + # 条柱和折线图,左侧为条柱图纵坐标,右侧为折线图纵坐标 + case "bar+line": + + # 先生成柱状图再叠加折线图 + + bar = ( + Bar(init_opts=self._init_opts()) + .set_global_opts( + # 显示图例 + legend_opts=self.LegendOpts(is_show=True), + # 不显示提示框 + tooltip_opts=self.TooltipOpts(), + # X轴配置项 + xaxis_opts=self.AxisOpts( + # 以第一个列名作为坐标轴名称 + name=dataframe.columns[0], + # 坐标轴轴线配置项 + axisline_opts=self.AxisLineOpts(), + # 显示刻线 + axistick_opts=self.AxisTickOpts(is_show=True), + # 不显示分割线 + splitline_opts=self.SplitLineOpts(), + 
# 坐标轴名称配置项,加粗、14像素 + name_textstyle_opts=self.TextStyleOpts( + font_weight="bold", font_size="14px" + ), + # 坐标轴标签配置项 + axislabel_opts=self.LabelOpts( + # 标签旋转30度(逆时针) + rotate=30 + ), + ), + # 不显示轴线 + yaxis_opts=self.AxisOpts( + # 不显示坐标轴 + is_show=False + ), + ) + # 第一列作为X轴 + .add_xaxis( + # 将X轴数值转为字符 + xaxis_data=dataframe[dataframe.columns[0]] + .astype("str") + .tolist() + ) + # 第二列作为左侧Y轴 + .add_yaxis( + # 系列名称 + series_name=dataframe.columns[1], + # Y轴数据 + y_axis=dataframe[dataframe.columns[1]].tolist(), + # 使用的Y轴的索引,存在多个Y轴的时候有用 + yaxis_index=0, + # 系列颜色 + color="#165DFF", + # 同一系列的柱间距离 + category_gap="35%", + # 不同系列的柱间距离 + gap="15%", + # 标签配置项 + label_opts=self.LabelOpts( + # 标签位置,左侧 + position="left", + # 文字字体的粗细,加粗 + font_weight="bold", + ), + ) + .extend_axis( + yaxis=self.AxisOpts( + # 不显示坐标轴 + is_show=False + ) + ) + ) + + line = ( + Line(init_opts=self._init_opts()).add_xaxis( + xaxis_data=dataframe[dataframe.columns[0]] + .astype("str") + .tolist() + ) + # 第三列作为右侧Y轴 + .add_yaxis( + series_name=dataframe.columns[-1], + y_axis=dataframe[dataframe.columns[-1]].tolist(), + yaxis_index=1, + # 系列颜色,碧涛青 + color="#14C9C9", + # 标记的图形 + symbol="circle", + # 标记的大小 + symbol_size=6, + # 是否平滑曲线 + is_smooth=True, + # 折线图在柱状图上 + z_level=1, + # 标签配置项 + label_opts=self.LabelOpts( + # 标签位置,顶部 + position="top", + # 文字字体的粗细,加粗 + font_weight="bold", + ), + ) + ) + + bar.overlap(line).render(path) + + # 双折线图 + case "line+line": + + # 先生成左侧折线图再叠加右侧折线图 + + line_left = ( + Line(init_opts=self._init_opts()) + .set_global_opts( + # 显示图例 + legend_opts=self.LegendOpts(is_show=True), + # 不显示提示框 + tooltip_opts=self.TooltipOpts(), + # X轴配置项 + xaxis_opts=self.AxisOpts( + # 以第一个列名作为坐标轴名称 + name=dataframe.columns[0], + # 坐标轴轴线配置项 + axisline_opts=self.AxisLineOpts(), + # 显示刻线 + axistick_opts=self.AxisTickOpts(is_show=True), + # 不显示分割线 + splitline_opts=self.SplitLineOpts(), + # 坐标轴名称配置项,加粗、14像素 + name_textstyle_opts=self.TextStyleOpts( + font_weight="bold", font_size="14px" + ), + # 
坐标轴标签配置项 + axislabel_opts=self.LabelOpts( + # 标签旋转30度(逆时针) + rotate=30 + ), + ), + # 不显示轴线 + yaxis_opts=self.AxisOpts( + # 不显示坐标轴 + is_show=False + ), + ) + # 第一列作为X轴 + .add_xaxis( + # 将X轴数值转为字符 + xaxis_data=dataframe[dataframe.columns[0]] + .astype("str") + .tolist() + ) + # 第二列作为左侧Y轴 + .add_yaxis( + # 系列名称 + series_name=dataframe.columns[1], + y_axis=dataframe[dataframe.columns[1]].tolist(), + # 使用的Y轴的索引,存在多个Y轴的时候有用 + yaxis_index=0, + # 系列颜色 + color="#165DFF", + # 标记的图形 + symbol="circle", + # 标记的大小 + symbol_size=6, + # 是否平滑曲线 + is_smooth=True, + # 标签配置项 + label_opts=self.LabelOpts( + # 标签位置,左侧 + position="left", + # 文字字体的粗细,加粗 + font_weight="bold", + ), + ) + .extend_axis( + yaxis=self.AxisOpts( + # 不显示坐标轴 + is_show=False + ) + ) + ) + + line_right = ( + Line(init_opts=self._init_opts()).add_xaxis( + xaxis_data=dataframe[dataframe.columns[0]] + .astype("str") + .tolist() + ) + # 第三列作为右侧Y轴 + .add_yaxis( + series_name=dataframe.columns[-1], + y_axis=dataframe[dataframe.columns[-1]].tolist(), + yaxis_index=1, + # 系列颜色,碧涛青 + color="#14C9C9", + # 标记的图形 + symbol="circle", + # 标记的大小 + symbol_size=6, + # 是否平滑曲线 + is_smooth=True, + # 折线图在柱状图上 + z_level=1, + # 标签配置项 + label_opts=self.LabelOpts( + # 标签位置,顶部 + position="top", + # 文字字体的粗细,加粗 + font_weight="bold", + ), + ) + ) + + line_left.overlap(line_right).render(path) + + # 热力图 + case "heatmap": + + chart = ( + HeatMap(init_opts=self.InitOpts()) + .set_global_opts( + # 不显示图例 + legend_opts=self.LegendOpts(), + # 不显示提示框 + tooltip_opts=self.TooltipOpts(), + # X轴配置项 + xaxis_opts=self.AxisOpts( + # 坐标轴名称 + name=None, + # 坐标轴轴线配置项 + axisline_opts=self.AxisLineOpts(), + # 显示刻线 + axistick_opts=self.AxisTickOpts(is_show=True), + # 不显示分割线 + splitline_opts=self.SplitLineOpts(), + # 坐标轴名称配置项,加粗、14像素 + name_textstyle_opts=self.TextStyleOpts( + font_weight="bold", font_size="14px" + ), + # 坐标轴标签配置项 + axislabel_opts=self.LabelOpts( + # 标签旋转30度(逆时针) + rotate=30 + ), + ), + # Y轴配置项 + yaxis_opts=self.AxisOpts( + # 坐标轴名称 + name=None, + # 
坐标轴轴线配置项 + axisline_opts=self.AxisLineOpts(), + # 显示刻线 + axistick_opts=self.AxisTickOpts(is_show=True), + # 不显示分割线 + splitline_opts=self.SplitLineOpts(), + # 坐标轴名称配置项,加粗、14像素 + name_textstyle_opts=self.TextStyleOpts( + font_weight="bold", font_size="14px" + ), + # 坐标轴标签配置项 + axislabel_opts=self.LabelOpts( + # 标签旋转30度(逆时针) + rotate=30 + ), + ), + visualmap_opts=self.VisualMapOpts(), + ) + .add_xaxis( + # 索引作为X轴 + xaxis_data=dataframe.index.tolist() + ) + .add_yaxis( + # 第三个列名作为系列名称 + series_name=dataframe.columns[2], + # 列名作为Y轴 + yaxis_data=dataframe.columns.tolist(), + # 系列数据项 + value=[ + [i, j, formatter(dataframe.iloc[i, j])] + for i in range(dataframe.shape[0]) + for j in range(dataframe.shape[1]) + ], + # 标签配置项 + label_opts=self.LabelOpts( + # 标签的位置默认为内部 + position="inside" + ), + ) + ) + + chart.render(path) + + + # 堆积柱形图 + case "stacked_bar": + + chart = ( + Bar(init_opts=self._init_opts()).set_global_opts( + # 显示图例 + legend_opts=self.LegendOpts(is_show=True), + # 不显示提示框 + tooltip_opts=self.TooltipOpts(), + # X轴配置项 + xaxis_opts=self.AxisOpts( + # 以第一个列名作为坐标轴名称 + name=None, + # 坐标轴轴线配置项 + axisline_opts=self.AxisLineOpts(), + # 显示刻线 + axistick_opts=self.AxisTickOpts(is_show=True), + # 不显示分割线 + splitline_opts=self.SplitLineOpts(), + # 坐标轴名称配置项,加粗、14像素 + name_textstyle_opts=self.TextStyleOpts( + font_weight="bold", font_size="14px" + ), + # 坐标轴标签配置项 + axislabel_opts=self.LabelOpts( + # 标签旋转30度(逆时针) + rotate=30 + ), + ), + # 不显示轴线 + yaxis_opts=self.AxisOpts( + # 不显示坐标轴 + is_show=False + ), + ) + # 第一列作为X轴 + .add_xaxis( + # 将X轴数值转为字符 + xaxis_data=dataframe[dataframe.columns[0]] + .astype("str") + .tolist() + ) + # 颜色配置项 + .set_colors(paint(dataframe.shape[1] - 1)) + ) + + for column in dataframe.columns[1:]: + + chart.add_yaxis( + # 系列名称 + series_name=column, + # Y轴数据 + y_axis=dataframe[column].tolist(), + stack="stacked_bar", + # 同一系列的柱间距离 + category_gap="35%", + # 不同系列的柱间距离 + gap="15%", + # 标签配置项 + label_opts=self.LabelOpts( + # 标签位置,左侧 + position="left", + # 
文字字体的粗细,加粗 + font_weight="bold", + ), + )) + + chart.render(path) + + +""" diff --git a/普康健康审核机器人/pageobject.py b/普康健康审核机器人/pageobject.py new file mode 100644 index 0000000..c6677c0 --- /dev/null +++ b/普康健康审核机器人/pageobject.py @@ -0,0 +1,1385 @@ +# -*- coding: utf-8 -*- + +''' + +脚本说明:基于Selenium封装常用页面操作,例如在当前标签页打开链接 + +遗留问题: + +普康健康自动审核尚未拆解动作 + +''' + +#导入模块 + +from urllib.parse import urlparse + +from selenium.webdriver import ChromeService, ChromeOptions, Chrome + +from selenium.webdriver.support.wait import WebDriverWait + +from selenium.webdriver.support.expected_conditions import presence_of_element_located, presence_of_all_elements_located, element_to_be_clickable, text_to_be_present_in_element, title_is + +from selenium.webdriver.common.by import By + +import re + +import time + +from datetime import datetime + +import json + +#导入普康自动化审核决策模型 +from cognition import Cognition + +import os + +import sys + +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + +from utils.logger import Logger + +from utils.operate import FeishuMail + +#创建日志记录器 +logger = Logger(logger_name = 'pageobject').get_logger() + +''' + +函数说明:初始化浏览器 + +备注: + +暂仅支持一个浏览器包括一个窗口,一个窗口若干标签页(BrowserTab) + +''' + +def Browser(): + + #使用本地浏览器 + service = ChromeService(executable_path = '/usr/local/bin/chromedriver') + + #设置浏览器参数 + options = ChromeOptions() + + #浏览器启用无头模式 + #options.add_argument('--headless') + + #初始化浏览器 + browser = Chrome(service = service, options = options) + + #最大化浏览器窗口 + browser.maximize_window() + + return browser + +''' + +类说明:自定义页面对象模型 + +''' + +class PageObject: + + def __init__(self): + + #实例化浏览 + self.browser = Browser() + + #隐式等待,超时时间设置为30秒,检查间隔设置为1秒 + #self.browser.implicitly_wait(timeout = 30, poll_frequency = 1) + + #显式等待,超时时间设置为30秒,检查间隔设置为1秒 + self.wait = WebDriverWait(driver = self.browser, timeout = 30, poll_frequency = 1) + + #用于记录已完成任务数,一次启动可执行多个任务 + self.tasks = 0 + + #用于保存在任务执行过程中抽取的内容 + self.data = [] + + #用于保存所有抽取内容 + self.dataset = [] 
# NOTE: the functions below are methods of PageObject (each takes ``self``);
# the class header and __init__ precede this span.


# Open a link in the current tab
def open_link(self, url):
    """Navigate the current tab to *url*, then pause for one second.

    Args:
        url: Address to open; it must contain both a scheme and a network
            location (as reported by ``urllib.parse.urlparse``).

    Raises:
        AssertionError: If *url* lacks a scheme or a network location.
    """
    parsed = urlparse(url)

    # Raise explicitly rather than via ``assert`` so the validation is not
    # stripped under ``python -O``; exception type and message are unchanged.
    if not (parsed.scheme and parsed.netloc):
        raise AssertionError('url invalid')

    self.browser.get(url)

    time.sleep(1)


# Click (elements are located by XPath unless stated otherwise)
def click(self, xpath):
    """Click the element located by *xpath*, falling back to a JavaScript click.

    First attempt: wait until the element is present and call its native
    ``click()``. If that attempt raises, wait for the element again and
    click it through ``execute_script``. A one-second pause follows a
    successful click of either kind.

    NOTE(review): the original comment described waiting for the element to
    be "clickable" and then "visible"; the code has always waited for
    presence only, and that behavior is kept here.

    Args:
        xpath: XPath expression locating the element to click.

    Raises:
        Exception: Whatever the JavaScript fallback raises is propagated
            unchanged to the caller.
    """
    try:
        element = self.wait.until(presence_of_element_located((By.XPATH, xpath)))

        element.click()

    # ``except Exception`` (not a bare ``except``) so that KeyboardInterrupt and
    # SystemExit are not swallowed and retried as a JavaScript click.
    except Exception:
        element = self.wait.until(presence_of_element_located((By.XPATH, xpath)))

        self.browser.execute_script('arguments[0].click();', element)

    time.sleep(1)


# Click and switch to the newly opened tab
def click_and_switch(self, xpath):
    """Click the element at *xpath* and switch to the tab that the click opens.

    Only the case where the click opens exactly one new tab is supported:
    the method waits until the number of window handles has grown by one.

    Args:
        xpath: XPath expression locating the element to click.
    """
    # Handles of all tabs before the click
    handles_before = self.browser.window_handles

    self.click(xpath = xpath)

    # Wait until exactly one more tab exists than before the click
    self.wait.until(lambda condition: len(self.browser.window_handles) == len(handles_before) + 1)

    # The new tab is the first handle that was not present before the click
    new_window_handle = [window_handle for window_handle in self.browser.window_handles if window_handle not in handles_before][0]

    self.browser.switch_to.window(new_window_handle)

    time.sleep(1)


# Select (for drop-downs that are not native HTML <select> elements)
def select(self, xpaths):
    """Click each element in *xpaths* in order via JavaScript.

    Args:
        xpaths: Iterable of XPath expressions, clicked one after another with
            a one-second pause after each click.
    """
    for xpath in xpaths:

        element = self.wait.until(presence_of_element_located((By.XPATH, xpath)))

        self.browser.execute_script('arguments[0].click();', element)

        time.sleep(1)


# Input
def input(self, xpath, content):
    """Replace the content of the element at *xpath* with *content*.

    Args:
        xpath: XPath expression locating the input element.
        content: Value passed to the element's ``send_keys``.
    """
    # Wait until the element is present
    element = self.wait.until(presence_of_element_located((By.XPATH, xpath)))

    # Clear the existing content before typing
    element.clear()

    element.send_keys(content)

    time.sleep(1)


# Close the modal dialog (for dialogs not built on the native HTML <dialog> element)
# Needs optimization!
+ def close_dialog(self): + + #等待至焦点元素文本为指定内容 + self.wait.until(lambda condition: self.browser.execute_script('return document.activeElement;').text.replace(' ', '') in ['确定', '确认', '关闭']) + + #使用JAVASCRIPT方法获取焦点元素 + element = self.browser.execute_script('return document.activeElement;') + + #使用JAVASCRIPT方法点击 + element.click() + + time.sleep(1) + + #关闭当前标签页并切换至上一标签页 + def close_and_switch(self): + + #获取关闭当前标签页前所有标签页句柄 + window_handles = self.browser.window_handles + + #若关闭当前标签页前所有标签页句柄数大于等于2则获取上一标签页句柄、关闭当前标签页并切换至上一标签页,否则切换至第一标签页 + if len(window_handles) >= 2: + + current_window_handle = self.browser.current_window_handle + + target_window_handle = [window_handle for window_handle in window_handles if window_handle != current_window_handle][-1] + + #关闭当前标签页 + self.browser.close() + + #切换至上一标签页 + self.browser.switch_to.window(target_window_handle) + + time.sleep(1) + + else: + + self.browser.switch_to.window(window_handles[0]) + + #关闭除第一标签页以外的所有标签页并切换至第一标签页 + def close_and_switch_to_first(self): + + #获取关闭前所有标签页句柄 + window_handles = self.browser.window_handles + + #根据标签页句柄数减1关闭当前标签页并切换至上一标签页 + for index in range(len(window_handles) - 1): + + self.close_and_switch() + + #抽取数据 + def extract(self, extractions): + + #遍历抽取内容 + #抽取内容包括两种类型,一种针对字段,另一种针对表格 + for extraction in extractions: + + #考虑部分字段在页面中不存在,使用直接定位元素 + try: + + #针对抽取字段 + if isinstance(extraction.get('field'), str): + + #正则匹配字段XPATH最后一个/后面的内容 + matcher = re.search(r'/([^/]*)$', extraction.get('field_xpath')).group(1) + + #根据正则匹配结果匹配解析方法 + match matcher: + + case matcher if 'input' in matcher: + + content = self.browser.find_element(By.XPATH, extraction.get('field_xpath')).get_attribute('value') + + case default: + + content = self.browser.find_element(By.XPATH, extraction.get('field_xpath')).text + + #针对抽取表格字段 + if isinstance(extraction.get('table'), str): + + content = [] + + #遍历表格 + for row_index in range(1, int(self.browser.find_element(By.XPATH, 
extraction.get('table_xpath')).get_attribute('childElementCount')) + 1): + + row_contents = {} + + #遍历表格字段 + for field in extraction.get('fields'): + + #先尝试使用字段XPATH定位并解析内容,若为空字符则尝试定位至INPUT标签解析内容 + try: + + #基于模版生成字段XPATH + field_xpath = field.get('field_xpath').replace('[index]', '[{}]'.format(row_index)) + + field_content = self.browser.find_element(By.XPATH, field_xpath).text + + if field_content == '': + + #定位至INPUT标签 + field_xpath = '{}//input'.format(field_xpath) + + field_content = self.browser.find_element(By.XPATH, field_xpath).get_attribute('value') + + except: + + field_content = '' + + finally: + + row_contents.update({field.get('field'): field_content}) + + content.append(row_contents) + + except: + + content = '' + + #保存抽取内容 + finally: + + if isinstance(extraction.get('field'), str): + + self.data.append({extraction.get('field'): content}) + + if isinstance(extraction.get('table'), str): + + self.data.append({extraction.get('table'): content}) + + #普康健康-等待至模态弹窗标题是否包含约定,若包含则关闭模态弹窗,若不包含则跳过 + def close_stipulation(self, action): + + logger.info('正在等待 模态弹窗标题 是否包含 约定') + + try: + + #等待至模态弹窗标题包含约定 + WebDriverWait(driver = self.browser, timeout = 5, poll_frequency = 1).until(lambda condition: '约定' in self.browser.find_element(By.XPATH, action.get('text_stipulation_xpath')).text) + + logger.info('关闭模态弹窗') + + self.click(xpath = action.get('button_close_stipulation_xpath')) + + except: + + logger.info('继续') + + #普康健康-拒付票据 + def invoices_refuse(self, action, insurance, cognition = None): + + logger.info('正在点击: 修改信息按钮') + + self.click(xpath = action.get('button_modify_xpath')) + + #根据保险总公司匹配点击修改信息按钮之后的动作 + match insurance: + + #使用瑞泰审核页面 + case '瑞泰人寿保险有限公司': + + logger.info('正在选择: 差错原因') + + self.select(xpaths = action.get('droplist_modify_xpath')) + + logger.info('正在输入: 原因说明 自动化处理') + + self.input(xpath = action.get('textarea_modify_xpath'), content = '自动化处理') + + logger.info('正在点击: 确定按钮') + + self.click(xpath = action.get('button_modification_confirm_xpath')) + 
+ #等待至赔案号加载完成 + self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) + + #等待至票据表格所有元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_invoices_xpath')))) + + #解析票据行数 + indices = int(element.get_attribute('childElementCount')) + + #若COGNITION为空则创建所有的票据索引 + if cognition is None: + + cognition = {'拒付票据索引': {'所有的票据索引': [index + 1 for index in range(indices)]}} + + #遍历票据表格索引 + for index in range(indices): + + index += 1 + + #遍历需要拒付的票据索引键值对 + for key, value in list(cognition.get('拒付票据索引').items()): + + #若票据索引在需要拒付的票据索引中则拒付该张票据 + if index in cognition.get('拒付票据索引').get(key): + + #等待至索引行元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('field_invoice_identifier_xpath').replace('tr[index]', 'tr[{}]'.format(index)).rsplit('/', 1)[0]))) + + #将索引行移动至可见 + self.browser.execute_script("arguments[0].scrollIntoView(true);", element) + + #点击索引行的行唯一标识 + self.click(xpath = action.get('field_invoice_identifier_xpath').replace('[index]', '[{}]'.format(index))) + + #解析合理金额 + reasonable_amounts = float(self.wait.until(presence_of_element_located((By.XPATH, action.get('field_reasonable_amounts_xpath').replace('[index]', '[{}]'.format(index))))).text) + + #解析部分自费 + part_self_amounts = float(self.wait.until(presence_of_element_located((By.XPATH, action.get('field_part_self_amounts_xpath').replace('[index]', '[{}]'.format(index))))).text) + + #解析全部自费 + all_self_amounts = float(self.wait.until(presence_of_element_located((By.XPATH, action.get('field_all_self_amounts_xpath').replace('[index]', '[{}]'.format(index))))).text) + + #解析不合理金额 + unreasonable_amounts = reasonable_amounts + part_self_amounts + all_self_amounts + + if unreasonable_amounts != 0: + + logger.info('拒付第 {} 张票据'.format(index)) + + logger.info('正在点击: 修改按钮') + + self.click(xpath = action.get('button_invoice_modify_xpath').replace('[index]', '[{}]'.format(index))) + + logger.info('正在输入: 部分自费 0') 
+ + self.input(xpath = action.get('input_invoice_part_self_xpath').replace('[index]', '[{}]'.format(index)), content = 0) + + logger.info('正在输入: 全部自费 0') + + self.input(xpath = action.get('input_invoice_all_self_xpath').replace('[index]', '[{}]'.format(index)), content = 0) + + logger.info('正在输入: 不合理金额 {}'.format(unreasonable_amounts)) + + self.input(xpath = action.get('input_invoice_unreasonable_xpath').replace('[index]', '[{}]'.format(index)), content = unreasonable_amounts) + + match key: + + case '不在保单保障期的票据索引': + + content = '不在保单保障期歉难给付' + + case '交款人非出险人的票据索引': + + content = '非本人发票歉难给付' + + case '已验换开的票据索引': + + content = '已验换开歉难给付' + + case '已验红冲的票据索引': + + content = '已验红冲歉难给付' + + case '已验假票的票据索引': + + content = '已验假票歉难给付' + + case '无法验真的票据索引': + + content = '无法验真歉难给付' + + case default: + + content = '' + + logger.info('正在输入: 票据备注 {}'.format(content)) + + self.input(xpath = action.get('input_invoice_remark_xpath').replace('[index]', '[{}]'.format(index)), content = content) + + logger.info('正在点击: 确定按钮') + + self.click(xpath = action.get('button_invoice_confirm_xpath').replace('[index]', '[{}]'.format(index))) + + logger.info('正在点击: 保存按钮') + + self.click(xpath = action.get('button_save_xpath')) + + logger.info('正在关闭模态弹窗: 保存确认弹窗') + + self.close_dialog() + + logger.info('正在点击: 确认修改按钮') + + self.click(xpath = action.get('button_confirm_xpath')) + + #普康健康选择保单 + def slip_select(self, action, slip_index): + + logger.info('正在判断 保单是否已选择') + + #等待至所选保单复选框元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, '{}/span/input'.format(action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index)))))) + + #若应选保单复选框已选择则跳过,否则选择 + if element.get_attribute('checked') != 'true': + + logger.info('否,选择保单') + + self.click(xpath = action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index))) + + logger.info('正在关闭模态弹窗: 选择保单确认弹窗') + + self.close_dialog() + + logger.info('正在等待提示选择保单成功') + + self.close_stipulation(action = 
action) + + #等待至赔案号加载完成 + self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) + + #等待至票据表格所有元素加载完成 + WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_invoices_xpath')))) + + else: + + logger.info('是,继续') + + self.click(xpath = action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index))) + + self.click(xpath = action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index))) + + logger.info('正在关闭模态弹窗: 选择保单确认弹窗') + + self.close_dialog() + + logger.info('正在等待提示选择保单成功') + + self.close_stipulation(action = action) + + #等待至赔案号加载完成 + self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) + + #等待至票据表格所有元素加载完成 + WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_invoices_xpath')))) + + #普康健康报案 + def case_report(self, action, insurance, cognition): + + logger.info('正在判断 该赔案是否需要报案') + + if cognition.get('在线报案'): + + logger.info('该赔案需要在线报案') + + logger.info('正在点击: 在线报案按钮') + + self.click(xpath = action.get('button_report_xpath')) + + logger.info('正在点击: 确定按钮') + + self.click(xpath = action.get('button_report_confirm_xpath')) + + logger.info('正在等待提示在线报案成功') + + #等待至提示选择报案成功 + WebDriverWait(driver = self.browser, timeout = 10, poll_frequency = 0.2).until(lambda condition: '成功' in self.browser.find_element(By.XPATH, action.get('toast_report')).text) + + else: + + logger.info('该赔案无需在线报案,继续') + + #普康健康理算 + def adjust(self, action): + + logger.info('正在点击: 理算按钮') + + self.click(xpath = action.get('button_adjust_xpath')) + + logger.info('正在关闭模态弹窗: 理算确认弹窗') + + self.close_dialog() + + logger.info('正在判断 模态弹窗标题 是否包含 警告') + + try: + + #等待至等待至模态弹窗标题包含警告(理算锁,该主被保险人存在未审核赔案,该赔案无法理算) + WebDriverWait(driver = self.browser, 
timeout = 3, poll_frequency = 1).until(lambda condition: '警告' in self.browser.find_element(By.XPATH, action.get('text_caution_xpath')).text) + + logger.info('是,跳过该赔案') + + return False + + except: + + logger.info('否,继续') + + logger.info('正在判断 模态弹窗标题 是否包含 不一致') + + try: + + #等待至等待至模态弹窗标题包含不一致(例如,票据交款人与出险人不一致) + WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '不一致' in self.browser.find_element(By.XPATH, action.get('text_caution_xpath')).text) + + logger.info('是,关闭模态弹窗') + + self.click(xpath = action.get('button_close_caution_xpath')) + + except: + + logger.info('否,继续') + + logger.info('正在等待提示理算成功') + + #等待至赔案号加载完成 + self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) + + #等待至票据表格所有元素加载完成 + WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_invoices_xpath')))) + + #等待至理算表格所有元素加载完成 + WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_adjustment_xpath')))) + + self.close_stipulation(action = action) + + return True + + #动作解释器。其中,actions为动作组,index为实际索引、默认为空 + def translator(self, actions, index = None): + + #遍历动作 + for action in actions: + + #若实际索引数据类型为整数且包含“[INDEX]”且对象不包含“EXECUTE:”则将其替换为实际索引 + try: + + assert isinstance(index, int) and '[index]' in action.get('object') and 'execute:' not in action.get('object') + + object_ = action.get('object').replace('[index]', '[{}]'.format(index)) + + except: + + object_ = action.get('object') + + #根据动作类型匹配动作内容 + match action.get('action_type'): + + #在当前标签页打开链接 + #动作配置项须包含action_type和object + case 'open_link': + + logger.info('正在当前标签页打开链接: {}'.format(object_)) + + self.open_link(url = object_) + + #点击 + #动作配置项须包含action_type、object_name和object,若first_row_identifier_xpath数据类型为字符其非空字符则先解析再点击,等待条件为第一行唯一标识与点击之前不相同 + case 
'click': + + logger.info('正在点击: {}'.format(action.get('object_name'))) + + if isinstance(action.get('first_row_identifier_xpath'), str) and action.get('first_row_identifier_xpath') != '': + + #解析点击之前第一行唯一标识 + first_row_identifier = self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text + + self.click(xpath = object_) + + if isinstance(action.get('first_row_identifier_xpath'), str) and action.get('first_row_identifier_xpath') != '': + + #等待至第一行唯一标识与点击之前不相同 + WebDriverWait(driver = self.browser, timeout = 300, poll_frequency = 1).until(lambda condition: self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text != first_row_identifier) + + #选择 + case 'select': + + logger.info('正在选择: {}'.format(action.get('object_name'))) + + self.select(xpaths = object_) + + #输入 + case 'input': + + #若对象内容包含“EXECUTE:”则执行函数并将返回作为对象内容 + if 'execute:' in action.get('content'): + + content = eval(action.get('content').split(' ', 1)[1]) + + else: + + content = action.get('content') + + logger.info('正在输入: {} {}'.format(action.get('object_name'), content)) + + self.input(xpath = object_, content = content) + + #等待至条件达成或超时 + case 'wait_until': + + if action.get('content') == '': + + content_ = '空字符' + + else: + + content_ = action.get('content') + + match action.get('expected_condition'): + + case 'browser_tab_title_is': + + logger.info('正在等待 标签页标题 为 {}'.format(content_)) + + #等待至标签页标题为指定内容 + self.wait.until(lambda condition: self.browser.title == content_) + + time.sleep(1) + + case 'element_text_is': + + logger.info('正在等待 {} 为 {}'.format(action.get('object_name'), content_)) + + #等待至定位元素为指定内容 + self.wait.until(lambda condition: self.browser.find_element(By.XPATH, object_).text == content_) + + time.sleep(1) + + case 'element_text_is_not': + + logger.info('正在等待 {} 不为 {}'.format(action.get('object_name'), content_)) + + self.wait.until(lambda condition: self.browser.find_element(By.XPATH, object_).text != content_) + + time.sleep(1) + + case 
'element_text_is_loaded': + + logger.info('正在等待 {} 加载完成'.format(action.get('object_name'))) + + self.wait.until(presence_of_element_located((By.XPATH, object_))) + + time.sleep(1) + + case 'table_rows_is_not_zero': + + logger.info('正在等待 {} 行数不为0'.format(action.get('object_name'))) + + #等待至表格行数不为0 + WebDriverWait(driver = self.browser, timeout = 300, poll_frequency = 1).until(lambda condition: self.browser.find_element(By.XPATH, object_).get_attribute('childElementCount') != '0') + + time.sleep(1) + + #若未匹配则返假 + case default: + + raise Exception('等待条件未定义') + + #认知(需要补充) + case 'cognize': + + match action.get('cognized_condition'): + + case 'text_is': + + logger.info('正在判断 {} 是否为 {}'.format(action.get('object_name'), action.get('content'))) + + #若定位元素非指定内容则终止后续动作 + if self.browser.find_element(By.XPATH, object_).text == action.get('content'): + + match action.get('meet'): + + case 'pass': + + logger.info('是,跳过') + + pass + + case 'break': + + logger.info('是,终止执行后续动作') + + break + + case default: + + raise Exception('预期结果为是时动作未定义') + + else: + + match action.get('otherwies'): + + case 'pass': + + logger.info('否,跳过') + + pass + + case 'break': + + logger.info('否,终止执行后续动作') + + break + + case default: + + raise Exception('预期结果为不是时动作未定义') + + #普康健康自动化审核 + #后续考虑配置项化 + case 'auto_audit': + + #获取保险总公司 + insurance = action.get('insurance') + + self.close_stipulation(action = action) + + logger.info('正在判断 该赔案是否可自动审核') + + try: + + #实例化普康健康认知模型,获取理算前认知模型 + cognition = Cognition(extractions = self.data).before_adjustment(insurance = insurance) + + #获取赔付结论原因(用于赔付时在结论原因中备注票据拒付等信息) + payment_remark = cognition.get('赔付结论原因') + + assert cognition.get('自动审核') + + except: + + logger.info('该赔案不可自动审核,转人工审核') + + self.close_and_switch() + + return 'failure' + + logger.info('该赔案可自动审核,继续') + + logger.info('正在判断 该赔案是否需要拒付票据') + + try: + + #拒付票据,考虑在拒付时可能需要将合理金额不为0的票据拒付,故抽象若需要拒付票据再根据认知处理 + assert cognition.get('票据拒付') + + logger.info('该赔案需要拒付票据') + + try: + + self.invoices_refuse(action = 
action, insurance = insurance, cognition = cognition) + + except: + + logger.info('拒付票据发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + except: + + logger.info('该赔案无需拒付票据,继续') + + logger.info('正在判断 该赔案理算保单') + + try: + + #获取所选保单索引 + slip_index = cognition.get('所选保单索引') + + self.slip_select(action = action, slip_index = slip_index) + + except: + + logger.info('判断理算保单发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + try: + + self.case_report(action = action, insurance = insurance, cognition = cognition) + + #在线报案发生异常则跳过 + except: + + logger.info('在线报案发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + #就中银保天津分公司若票据就诊类型:包含药房购药和门急诊就诊则取消门急诊就诊关联药房购药责任 + if cognition.get('所选保单所属保险分公司') == '中银保险有限公司天津分公司': + + #就诊类型包括药店购药和门急诊就诊 + if '药店购药' in [invoice.get('就诊类型') for invoice in cognition.get('转换数据').get('票据信息')] and '门急诊就诊' in [invoice.get('就诊类型') for invoice in cognition.get('转换数据').get('票据信息')]: + + #等待至票据表格所有元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_invoices_xpath')))) + + #解析票据行数 + indices = int(element.get_attribute('childElementCount')) + + #遍历票据表格索引 + for index in range(indices): + + index += 1 + + #等待至索引行元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('field_invoice_identifier_xpath').replace('tr[index]', 'tr[{}]'.format(index)).rsplit('/', 1)[0]))) + + #将索引行移动至可见 + self.browser.execute_script("arguments[0].scrollIntoView(true);", element) + + #点击索引行的行唯一标识 + self.click(xpath = action.get('field_invoice_identifier_xpath').replace('[index]', '[{}]'.format(index))) + + element = self.wait.until(presence_of_element_located((By.XPATH, '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[7]'.replace('tr[index]', 'tr[{}]'.format(index))))) + + #若该张票据就诊类型为真急诊就诊 + if element.text == '门/急诊': + + time.sleep(1) + + self.click(xpath = 
'//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[9]/div/div/div/div[1]/input'.replace('tr[index]', 'tr[{}]'.format(index))) + + time.sleep(1) + + self.click(xpath = '(/html/body/div/div[1]/div[1]/ul/li[2])[last()]') + + time.sleep(1) + + #就诊类型均为门急诊就诊 + if all([invoice.get('就诊类型') == '门急诊就诊' for invoice in cognition.get('转换数据').get('票据信息')]): + + #等待至票据表格所有元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_invoices_xpath')))) + + #解析票据行数 + indices = int(element.get_attribute('childElementCount')) + + #遍历票据表格索引 + for index in range(indices): + + index += 1 + + #等待至索引行元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('field_invoice_identifier_xpath').replace('tr[index]', 'tr[{}]'.format(index)).rsplit('/', 1)[0]))) + + #将索引行移动至可见 + self.browser.execute_script("arguments[0].scrollIntoView(true);", element) + + #点击索引行的行唯一标识 + self.click(xpath = action.get('field_invoice_identifier_xpath').replace('[index]', '[{}]'.format(index))) + + element = self.wait.until(presence_of_element_located((By.XPATH, '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[7]'.replace('tr[index]', 'tr[{}]'.format(index))))) + + #若该张票据就诊类型为真急诊就诊 + if element.text == '门/急诊': + + time.sleep(1) + + #先选择 + self.click(xpath = '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[9]/div/div/div/div[1]/input'.replace('tr[index]', 'tr[{}]'.format(index))) + + time.sleep(1) + + #再选择相应责任 + self.click(xpath = '(/html/body/div/div[1]/div[1]/ul/li[1])[last()]') + + time.sleep(1) + + self.click(xpath = '//*[@id="app"]/div/div/section/main/section[1]/div[5]/div[3]/button[7]') + + time.sleep(1) + + try: + + assert self.adjust(action = action) is True + + #理算发生异常则跳过 + except: + + logger.info('理算发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + logger.info('正在抽取数据') + + time.sleep(3) + + #仅保留影像件抽取内容 + self.data = [{key: value} for extraction in self.data for key, value in extraction.items() if '影像件' 
in key] + + self.extract(extractions = action.get('extractions')) + + #实例化普康健康认知模型,获取理算后相应认知 + cognition = Cognition(extractions = self.data).after_adjustment(insurance = insurance) + + logger.info('正在判断 该赔案是否赔付') + + #将页面滑动至底部 + self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight);') + + time.sleep(1) + + print(cognition) + + #根据决策结果赔付 + if cognition.get('自动化:审核结论') == 1: + + logger.info('该赔案应该赔付') + + logger.info('正在选择: 理赔结论-赔付') + + self.select(xpaths = action.get('droplist_pay_xpaths')) + + logger.info('正在输入: 结论原因') + + #若足额赔付为否,需要调整赔付是结论原因 + if not cognition.get('足额赔付'): + + payment_remark = payment_remark + '\n累计赔付已达到个人账户年度限额' + + self.input(xpath = action.get('textarea_refuse_remark_xpath'), content = payment_remark) + + else: + + logger.info('该赔案应该拒付') + + logger.info('拒付合理金额、部分自费金额和全部自费金额之和不为0的票据') + + try: + + self.invoices_refuse(action = action, insurance = insurance) + + except Exception as e: + + print(e) + + logger.info('拒付票据发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + #刷新页面,取消已选保单 + self.browser.refresh() + + #等待至赔案号加载完成 + self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) + + try: + + self.slip_select(action = action, slip_index = slip_index) + + #选择保单发生异常则跳过 + except: + + logger.info('选择保单发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + try: + + assert self.adjust(action = action) is True + + #理算发生异常则跳过 + except: + + logger.info('理算发生异常,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + logger.info('正在选择: 理赔结论-拒付') + + self.select(xpaths = action.get('droplist_refuse_xpaths')) + + logger.info('正在输入: 结论原因') + + self.input(xpath = action.get('textarea_refuse_remark_xpath'), content = cognition.get('自动化:审核说明')) + + logger.info('正在点击: 通过按钮') + + self.click(xpath = action.get('button_audit_xpath')) + + logger.info('正在判断 模态弹窗标题 是否包含 发票日期超保期') + + try: + + #等待至模态弹窗标题包含发票日期超保期 + WebDriverWait(driver = self.browser, timeout 
= 3, poll_frequency = 1).until(lambda condition: '发票日期超保期' in self.browser.find_element(By.XPATH, action.get('text_without_assurance_xpath')).text) + + logger.info('是,关闭模态弹窗') + + self.click(xpath = action.get('button_confrim_without_assurance_xpath')) + + except: + + logger.info('否,继续') + + self.close_dialog() + + logger.info('正在判断 模态弹窗标题 是否包含 提示') + + try: + + #等待至模态弹窗标题包含提示 + WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '提示' in self.browser.find_element(By.XPATH, action.get('text_prompt_xpath')).text) + + logger.info('是,关闭模态弹窗') + + self.click(xpath = action.get('button_close_prompt_xpath')) + + except: + + logger.info('否,继续') + + logger.info('正在判断 模态弹窗标题 是否包含 发票关联影像') + + try: + + #等待至提示审核成功 + WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '发票关联影像' in self.browser.find_element(By.XPATH, action.get('text_prompt_invoices_xpath')).text) + + logger.info('是,跳过该赔案') + + self.close_and_switch() + + return 'failure' + + except: + + logger.info('审核成功') + + case default: + + raise Exception('判断条件未定义') + + #关闭模态弹窗 + case 'close_dialog': + + logger.info('正在关闭模态弹窗') + + #默认在模态弹窗中点击无动作预期 + self.close_dialog() + + #点击并切换至新标签页 + case 'click_and_switch': + + logger.info('正在点击 {} 并切换至 {} 标签页'.format(action.get('object_name')[0], action.get('object_name')[1])) + + self.click_and_switch(xpath = object_) + + #关闭当前标签页并切换至上一标签页 + case 'close_and_switch': + + logger.info('正在关闭 {} 标签页并切换至 {} 标签页'.format(action.get('object_name')[0], action.get('object_name')[1])) + + self.close_and_switch() + + #抽取数据 + case 'extract': + + logger.info('正在抽取数据') + + time.sleep(3) + + self.extract(extractions = action.get('extractions')) + + #重做 + #动作组配置项包含:row_identifier_xpath 行唯一标识 + #若行唯一标识包含索引则替换标识: + #适用场景: + #若动作组配置项包含下一分页按钮XPATH,则依次遍历直至达到预期重复数 + #若动作组配置项不包含下一分页按钮XPATH,则重做 + case 'repeat': + + logger.info('重做: {}'.format(action.get('object_name'))) + + 
#若行唯一标识包含[index]则使用实际索引替换索引标识(index)至达到预期完成任务数或下一分页不可点击 + if '[index]' in action.get('row_identifier_xpath'): + + while True: + + #解析当前分页表格行数 + indices = int(self.browser.find_element(By.XPATH, action.get('table_xpath')).get_attribute('childElementCount')) + + #若当前分页表格行数为0则结束重复 + if indices == 0: + + logger.info('表格行数为0,结束重复') + + break + + #遍历当前分页表格行 + for index in range(1, indices + 1): + + #解析行唯一标识 + row_identifier = self.browser.find_element(By.XPATH, action.get('row_identifier_xpath').replace('[index]', '[{}]'.format(index))).text + + if row_identifier != '': + + logger.info('就 {} 执行任务'.format(row_identifier)) + + else: + + logger.info('就第 {} 个执行任务'.format(index)) + + #点击行唯一标识(用于将该行可见) + self.click(xpath = action.get('row_identifier_xpath').replace('[index]', '[{}]'.format(index))) + + #若执行动作时发生异常则跳过 + try: + + assert self.translator(actions = action.get('actions'), index = index) == 'success' + + logger.info('执行成功,继续下一任务') + + except: + + #尝试关闭除第一个标签页以外的标签页,否则抛出异常 + try: + + logger.info('执行任务时发生异常,跳过并继续下一任务') + + self.close_and_switch_to_first() + + continue + + except Exception as exception: + + raise exception + + #若动作组配置项重下一分页按钮XPATH数据类型为字符且非空字符则判断是否达到预期完成任务数,若达到则终止并跳出循环 + if isinstance(action.get('button_next_xpath'), str) and action.get('button_next_xpath') != '': + + if self.tasks >= action.get('expected_tasks'): + + break + + #若动作组配置项重下一分页按钮XPATH数据类型为字符且非空字符则判断是否达到预期完成任务数或者下一分页按钮不可点击 + if isinstance(action.get('button_next_xpath'), str) and action.get('button_next_xpath') != '': + + if self.tasks >= action.get('expected_tasks'): + + logger.info('达到预期完成任务数,结束重复') + + break + + if self.wait.until(presence_of_element_located((By.XPATH, action.get('button_next_xpath')))).get_attribute('disabled'): + + logger.info('下一分页按钮不可点击,结束重复') + + break + + logger.info('正在点击: 下一分页按钮') + + #解析点击之前第一行唯一标识 + first_row_identifier = self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text + + self.click(xpath = action.get('button_next_xpath')) + + 
#等待至第一行唯一标识与点击之前不相同 + WebDriverWait(driver = self.browser, timeout = 300, poll_frequency = 1).until(lambda condition: self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text != first_row_identifier) + + #若不满足下一分页按钮XPATH数据类型为字符且非空字符则退出循环(此情况不支持翻页) + else: + + break + + else: + + #预期行索引(用于初始化行索引) + index = action.get('expected_index') + + while True: + + if index > 20: + + self.select(xpaths = action.get('droplist_more_xpaths')) + + try: + + #将索引行移动至可见 + self.browser.execute_script("arguments[0].scrollIntoView(true);", self.wait.until(presence_of_element_located((By.XPATH, action.get('row_xpath').replace('tr[index]', 'tr[{}]'.format(index)))))) + + except: + + logger.info('行唯一标识加载发生异常,停止重复执行动作组') + + break + + #点击行唯一标识(用于将行可见) + self.click(xpath = action.get('row_identifier_xpath').replace('tr[1]', 'tr[{}]'.format(index))) + + #解析行唯一标识 + row_identifier = self.browser.find_element(By.XPATH, action.get('row_identifier_xpath').replace('tr[1]', 'tr[{}]'.format(index))).text + + logger.info('就 {} 执行任务'.format(row_identifier)) + + #若成功执行则刷新并继续以当前行索引执行重复动作组,若无法执行则以下一个行索引执行重复动作组,若发生异常则重新执行重复动作组 + try: + + if self.translator(actions = action.get('actions'), index = index) == 'success': + + while True: + + #刷新页面,等待至行唯一标识与刷新前不一致 + try: + + self.browser.refresh() + + self.select(xpaths = action.get('droplist_more_xpaths')) + + self.wait.until(lambda condition: self.browser.find_element(By.XPATH, action.get('row_identifier_xpath').replace('tr[1]', 'tr[{}]'.format(index))).text != row_identifier) + + logger.info('执行成功') + + break + + except: + + time.sleep(1) + + else: + + index += 1 + + logger.info('执行动作组失败,以下一个行索引执行动作组') + + except: + + try: + + self.close_dialog() + + self.close_and_switch_to_first() + + logger.info('执行动作组发生异常,重新执行该动作组') + + continue + + except Exception as exception: + + raise exception + + #若重复数大于等于预期重复数则终止 + if self.tasks >= action.get('expected_tasks'): + + logger.info('达到预期重复数') + + break + + #重复动作结束 + case 'repeat_finish': + + 
#将抽取内容保存为本地文件 + with open('data/{}.json'.format(datetime.now().strftime('%y-%m-%d %H-%M-%S')), 'w', encoding= 'utf-8') as file: + + json.dump(self.data, file, ensure_ascii = False) + + #将抽取内容添加至数据集 + self.dataset.append(self.data) + + #重置抽取数据 + self.data = [] + + #重复数自增 + self.tasks += 1 + + #结束 + case 'finish': + + #将所有抽取内容保存为本地文件 + with open('dataset/{}.json'.format(datetime.now().strftime('%y-%m-%d %H-%M-%S')), 'w', encoding= 'utf-8') as file: + + json.dump(self.dataset, file, ensure_ascii = False) + + #若未匹配则返假 + case default: + + raise Exception('动作类型未定义') + + return 'success' + +''' + #等待至票据表格所有元素加载完成 + element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_xpath')))) + + #解析票据行数 + indices = int(element.get_attribute('childElementCount')) + + #若行索引大于当前分页票据表格行数则点击更多按钮 + if index > indices: +'''