# -*- coding: utf-8 -*- ''' 脚本说明:基于Selenium封装常用页面操作,例如在当前标签页打开链接 遗留问题: 普康健康自动审核尚未拆解动作 ''' #导入模块 from urllib.parse import urlparse from selenium.webdriver import ChromeService, ChromeOptions, Chrome from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support.expected_conditions import presence_of_element_located, presence_of_all_elements_located, element_to_be_clickable, text_to_be_present_in_element, title_is from selenium.webdriver.common.by import By import re import time from datetime import datetime import json #导入普康自动化审核决策模型 from cognition import Cognition import os import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) from utils.logger import Logger from utils.operate import FeishuMail #创建日志记录器 logger = Logger(logger_name = 'pageobject').get_logger() ''' 函数说明:初始化浏览器 备注: 暂仅支持一个浏览器包括一个窗口,一个窗口若干标签页(BrowserTab) ''' def Browser(): #使用本地浏览器 service = ChromeService(executable_path = '/usr/local/bin/chromedriver') #设置浏览器参数 options = ChromeOptions() #浏览器启用无头模式 #options.add_argument('--headless') #初始化浏览器 browser = Chrome(service = service, options = options) #最大化浏览器窗口 browser.maximize_window() return browser ''' 类说明:自定义页面对象模型 ''' class PageObject: def __init__(self): #实例化浏览 self.browser = Browser() #隐式等待,超时时间设置为30秒,检查间隔设置为1秒 #self.browser.implicitly_wait(timeout = 30, poll_frequency = 1) #显式等待,超时时间设置为30秒,检查间隔设置为1秒 self.wait = WebDriverWait(driver = self.browser, timeout = 30, poll_frequency = 1) #用于记录已完成任务数,一次启动可执行多个任务 self.tasks = 0 #用于保存在任务执行过程中抽取的内容 self.data = [] #用于保存所有抽取内容 self.dataset = [] #在当前标签页打开链接 def open_link(self, url): #断定链接是否包含协议和网络位置 assert urlparse(url).scheme and urlparse(url).netloc, 'url invalid' self.browser.get(url) time.sleep(1) #点击(如无特殊说明使用XPATH定位元素) def click(self, xpath): #尝试等待定位元素可点击再使用CLIKC方法点击,若超时或发生其它异常则等待定位元素可见再使用JAVASCRIPT方法点击 try: element = self.wait.until(presence_of_element_located((By.XPATH, xpath))) element.click() time.sleep(1) except: try: element = self.wait.until(presence_of_element_located((By.XPATH, xpath))) self.browser.execute_script('arguments[0].click();', element) time.sleep(1) except Exception as exception: raise exception #点击并切换至新标签页 def click_and_switch(self, xpath): #获取点击前所有标签页句柄 window_handles = self.browser.window_handles self.click(xpath = xpath) #等待至点击后所有标签页句柄数等于点击前加一 self.wait.until(lambda condition: len(self.browser.window_handles) == len(window_handles) + 1) #获取新标签页句柄(暂仅支持点击新建一个标签页的场景) new_window_handle = [window_handle for window_handle in self.browser.window_handles if window_handle not in window_handles][0] #切换至新标签页 self.browser.switch_to.window(new_window_handle) time.sleep(1) #选择(适用于非HTML原生SELECT标签的场景) def select(self, xpaths): for xpath in xpaths: element = self.wait.until(presence_of_element_located((By.XPATH, xpath))) self.browser.execute_script('arguments[0].click();', element) time.sleep(1) #输入 def input(self, xpath, content): #等待至定位元素出现 element = self.wait.until(presence_of_element_located((By.XPATH, xpath))) #清除定位元素原内容 element.clear() #使用SENDKEYS方法输入 element.send_keys(content) time.sleep(1) #关闭模态弹窗(适用于非HTML原生DIALOG弹窗开发场景) #需要优化!!! def close_dialog(self): #等待至焦点元素文本为指定内容 self.wait.until(lambda condition: self.browser.execute_script('return document.activeElement;').text.replace(' ', '') in ['确定', '确认', '关闭']) #使用JAVASCRIPT方法获取焦点元素 element = self.browser.execute_script('return document.activeElement;') #使用JAVASCRIPT方法点击 element.click() time.sleep(1) #关闭当前标签页并切换至上一标签页 def close_and_switch(self): #获取关闭当前标签页前所有标签页句柄 window_handles = self.browser.window_handles #若关闭当前标签页前所有标签页句柄数大于等于2则获取上一标签页句柄、关闭当前标签页并切换至上一标签页,否则切换至第一标签页 if len(window_handles) >= 2: current_window_handle = self.browser.current_window_handle target_window_handle = [window_handle for window_handle in window_handles if window_handle != current_window_handle][-1] #关闭当前标签页 self.browser.close() #切换至上一标签页 self.browser.switch_to.window(target_window_handle) time.sleep(1) else: self.browser.switch_to.window(window_handles[0]) #关闭除第一标签页以外的所有标签页并切换至第一标签页 def close_and_switch_to_first(self): #获取关闭前所有标签页句柄 window_handles = self.browser.window_handles #根据标签页句柄数减1关闭当前标签页并切换至上一标签页 for index in range(len(window_handles) - 1): self.close_and_switch() #抽取数据 def extract(self, extractions): #遍历抽取内容 #抽取内容包括两种类型,一种针对字段,另一种针对表格 for extraction in extractions: #考虑部分字段在页面中不存在,使用直接定位元素 try: #针对抽取字段 if isinstance(extraction.get('field'), str): #正则匹配字段XPATH最后一个/后面的内容 matcher = re.search(r'/([^/]*)$', extraction.get('field_xpath')).group(1) #根据正则匹配结果匹配解析方法 match matcher: case matcher if 'input' in matcher: content = self.browser.find_element(By.XPATH, extraction.get('field_xpath')).get_attribute('value') case default: content = self.browser.find_element(By.XPATH, extraction.get('field_xpath')).text #针对抽取表格字段 if isinstance(extraction.get('table'), str): content = [] #遍历表格 for row_index in range(1, int(self.browser.find_element(By.XPATH, extraction.get('table_xpath')).get_attribute('childElementCount')) + 1): row_contents = {} #遍历表格字段 for field in extraction.get('fields'): #先尝试使用字段XPATH定位并解析内容,若为空字符则尝试定位至INPUT标签解析内容 try: #基于模版生成字段XPATH field_xpath = field.get('field_xpath').replace('[index]', '[{}]'.format(row_index)) field_content = self.browser.find_element(By.XPATH, field_xpath).text if field_content == '': #定位至INPUT标签 field_xpath = '{}//input'.format(field_xpath) field_content = self.browser.find_element(By.XPATH, field_xpath).get_attribute('value') except: field_content = '' finally: row_contents.update({field.get('field'): field_content}) content.append(row_contents) except: content = '' #保存抽取内容 finally: if isinstance(extraction.get('field'), str): self.data.append({extraction.get('field'): content}) if isinstance(extraction.get('table'), str): self.data.append({extraction.get('table'): content}) #普康健康-等待至模态弹窗标题是否包含约定,若包含则关闭模态弹窗,若不包含则跳过 def close_stipulation(self, action): logger.info('正在等待 模态弹窗标题 是否包含 约定') try: #等待至模态弹窗标题包含约定 WebDriverWait(driver = self.browser, timeout = 5, poll_frequency = 1).until(lambda condition: '约定' in self.browser.find_element(By.XPATH, action.get('text_stipulation_xpath')).text) logger.info('关闭模态弹窗') self.click(xpath = action.get('button_close_stipulation_xpath')) except: logger.info('继续') #普康健康-拒付票据 def invoices_refuse(self, action, insurance, cognition = None): logger.info('正在点击: 修改信息按钮') self.click(xpath = action.get('button_modify_xpath')) #根据保险总公司匹配点击修改信息按钮之后的动作 match insurance: #使用瑞泰审核页面 case '瑞泰人寿保险有限公司': logger.info('正在选择: 差错原因') self.select(xpaths = action.get('droplist_modify_xpath')) logger.info('正在输入: 原因说明 自动化处理') self.input(xpath = action.get('textarea_modify_xpath'), content = '自动化处理') logger.info('正在点击: 确定按钮') self.click(xpath = action.get('button_modification_confirm_xpath')) #等待至赔案号加载完成 self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) #等待至票据表格所有元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_invoices_xpath')))) #解析票据行数 indices = int(element.get_attribute('childElementCount')) #若COGNITION为空则创建所有的票据索引 if cognition is None: cognition = {'拒付票据索引': {'所有的票据索引': [index + 1 for index in range(indices)]}} #遍历票据表格索引 for index in range(indices): index += 1 #遍历需要拒付的票据索引键值对 for key, value in list(cognition.get('拒付票据索引').items()): #若票据索引在需要拒付的票据索引中则拒付该张票据 if index in cognition.get('拒付票据索引').get(key): #等待至索引行元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('field_invoice_identifier_xpath').replace('tr[index]', 'tr[{}]'.format(index)).rsplit('/', 1)[0]))) #将索引行移动至可见 self.browser.execute_script("arguments[0].scrollIntoView(true);", element) #点击索引行的行唯一标识 self.click(xpath = action.get('field_invoice_identifier_xpath').replace('[index]', '[{}]'.format(index))) #解析合理金额 reasonable_amounts = float(self.wait.until(presence_of_element_located((By.XPATH, action.get('field_reasonable_amounts_xpath').replace('[index]', '[{}]'.format(index))))).text) #解析部分自费 part_self_amounts = float(self.wait.until(presence_of_element_located((By.XPATH, action.get('field_part_self_amounts_xpath').replace('[index]', '[{}]'.format(index))))).text) #解析全部自费 all_self_amounts = float(self.wait.until(presence_of_element_located((By.XPATH, action.get('field_all_self_amounts_xpath').replace('[index]', '[{}]'.format(index))))).text) #解析不合理金额 unreasonable_amounts = reasonable_amounts + part_self_amounts + all_self_amounts if unreasonable_amounts != 0: logger.info('拒付第 {} 张票据'.format(index)) logger.info('正在点击: 修改按钮') self.click(xpath = action.get('button_invoice_modify_xpath').replace('[index]', '[{}]'.format(index))) logger.info('正在输入: 部分自费 0') self.input(xpath = action.get('input_invoice_part_self_xpath').replace('[index]', '[{}]'.format(index)), content = 0) logger.info('正在输入: 全部自费 0') self.input(xpath = action.get('input_invoice_all_self_xpath').replace('[index]', '[{}]'.format(index)), content = 0) logger.info('正在输入: 不合理金额 {}'.format(unreasonable_amounts)) self.input(xpath = action.get('input_invoice_unreasonable_xpath').replace('[index]', '[{}]'.format(index)), content = unreasonable_amounts) match key: case '不在保单保障期的票据索引': content = '不在保单保障期歉难给付' case '交款人非出险人的票据索引': content = '非本人发票歉难给付' case '已验换开的票据索引': content = '已验换开歉难给付' case '已验红冲的票据索引': content = '已验红冲歉难给付' case '已验假票的票据索引': content = '已验假票歉难给付' case '无法验真的票据索引': content = '无法验真歉难给付' case default: content = '' logger.info('正在输入: 票据备注 {}'.format(content)) self.input(xpath = action.get('input_invoice_remark_xpath').replace('[index]', '[{}]'.format(index)), content = content) logger.info('正在点击: 确定按钮') self.click(xpath = action.get('button_invoice_confirm_xpath').replace('[index]', '[{}]'.format(index))) logger.info('正在点击: 保存按钮') self.click(xpath = action.get('button_save_xpath')) logger.info('正在关闭模态弹窗: 保存确认弹窗') self.close_dialog() logger.info('正在点击: 确认修改按钮') self.click(xpath = action.get('button_confirm_xpath')) #普康健康选择保单 def slip_select(self, action, slip_index): logger.info('正在判断 保单是否已选择') #等待至所选保单复选框元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, '{}/span/input'.format(action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index)))))) #若应选保单复选框已选择则跳过,否则选择 if element.get_attribute('checked') != 'true': logger.info('否,选择保单') self.click(xpath = action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index))) logger.info('正在关闭模态弹窗: 选择保单确认弹窗') self.close_dialog() logger.info('正在等待提示选择保单成功') self.close_stipulation(action = action) #等待至赔案号加载完成 self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) #等待至票据表格所有元素加载完成 WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_invoices_xpath')))) else: logger.info('是,继续') self.click(xpath = action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index))) self.click(xpath = action.get('checkbox_select_xpath').replace('[index]', '[{}]'.format(slip_index))) logger.info('正在关闭模态弹窗: 选择保单确认弹窗') self.close_dialog() logger.info('正在等待提示选择保单成功') self.close_stipulation(action = action) #等待至赔案号加载完成 self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) #等待至票据表格所有元素加载完成 WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_invoices_xpath')))) #普康健康报案 def case_report(self, action, insurance, cognition): logger.info('正在判断 该赔案是否需要报案') if cognition.get('在线报案'): logger.info('该赔案需要在线报案') logger.info('正在点击: 在线报案按钮') self.click(xpath = action.get('button_report_xpath')) logger.info('正在点击: 确定按钮') self.click(xpath = action.get('button_report_confirm_xpath')) logger.info('正在等待提示在线报案成功') #等待至提示选择报案成功 WebDriverWait(driver = self.browser, timeout = 10, poll_frequency = 0.2).until(lambda condition: '成功' in self.browser.find_element(By.XPATH, action.get('toast_report')).text) else: logger.info('该赔案无需在线报案,继续') #普康健康理算 def adjust(self, action): logger.info('正在点击: 理算按钮') self.click(xpath = action.get('button_adjust_xpath')) logger.info('正在关闭模态弹窗: 理算确认弹窗') self.close_dialog() logger.info('正在判断 模态弹窗标题 是否包含 警告') try: #等待至等待至模态弹窗标题包含警告(理算锁,该主被保险人存在未审核赔案,该赔案无法理算) WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '警告' in self.browser.find_element(By.XPATH, action.get('text_caution_xpath')).text) logger.info('是,跳过该赔案') return False except: logger.info('否,继续') logger.info('正在判断 模态弹窗标题 是否包含 不一致') try: #等待至等待至模态弹窗标题包含不一致(例如,票据交款人与出险人不一致) WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '不一致' in self.browser.find_element(By.XPATH, action.get('text_caution_xpath')).text) logger.info('是,关闭模态弹窗') self.click(xpath = action.get('button_close_caution_xpath')) except: logger.info('否,继续') logger.info('正在等待提示理算成功') #等待至赔案号加载完成 self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) #等待至票据表格所有元素加载完成 WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_invoices_xpath')))) #等待至理算表格所有元素加载完成 WebDriverWait(driver = self.browser, timeout = 60, poll_frequency = 1).until(lambda condition: presence_of_all_elements_located((By.XPATH, action.get('table_adjustment_xpath')))) self.close_stipulation(action = action) return True #动作解释器。其中,actions为动作组,index为实际索引、默认为空 def translator(self, actions, index = None): #遍历动作 for action in actions: #若实际索引数据类型为整数且包含“[INDEX]”且对象不包含“EXECUTE:”则将其替换为实际索引 try: assert isinstance(index, int) and '[index]' in action.get('object') and 'execute:' not in action.get('object') object_ = action.get('object').replace('[index]', '[{}]'.format(index)) except: object_ = action.get('object') #根据动作类型匹配动作内容 match action.get('action_type'): #在当前标签页打开链接 #动作配置项须包含action_type和object case 'open_link': logger.info('正在当前标签页打开链接: {}'.format(object_)) self.open_link(url = object_) #点击 #动作配置项须包含action_type、object_name和object,若first_row_identifier_xpath数据类型为字符其非空字符则先解析再点击,等待条件为第一行唯一标识与点击之前不相同 case 'click': logger.info('正在点击: {}'.format(action.get('object_name'))) if isinstance(action.get('first_row_identifier_xpath'), str) and action.get('first_row_identifier_xpath') != '': #解析点击之前第一行唯一标识 first_row_identifier = self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text self.click(xpath = object_) if isinstance(action.get('first_row_identifier_xpath'), str) and action.get('first_row_identifier_xpath') != '': #等待至第一行唯一标识与点击之前不相同 WebDriverWait(driver = self.browser, timeout = 300, poll_frequency = 1).until(lambda condition: self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text != first_row_identifier) #选择 case 'select': logger.info('正在选择: {}'.format(action.get('object_name'))) self.select(xpaths = object_) #输入 case 'input': #若对象内容包含“EXECUTE:”则执行函数并将返回作为对象内容 if 'execute:' in action.get('content'): content = eval(action.get('content').split(' ', 1)[1]) else: content = action.get('content') logger.info('正在输入: {} {}'.format(action.get('object_name'), content)) self.input(xpath = object_, content = content) #等待至条件达成或超时 case 'wait_until': if action.get('content') == '': content_ = '空字符' else: content_ = action.get('content') match action.get('expected_condition'): case 'browser_tab_title_is': logger.info('正在等待 标签页标题 为 {}'.format(content_)) #等待至标签页标题为指定内容 self.wait.until(lambda condition: self.browser.title == content_) time.sleep(1) case 'element_text_is': logger.info('正在等待 {} 为 {}'.format(action.get('object_name'), content_)) #等待至定位元素为指定内容 self.wait.until(lambda condition: self.browser.find_element(By.XPATH, object_).text == content_) time.sleep(1) case 'element_text_is_not': logger.info('正在等待 {} 不为 {}'.format(action.get('object_name'), content_)) self.wait.until(lambda condition: self.browser.find_element(By.XPATH, object_).text != content_) time.sleep(1) case 'element_text_is_loaded': logger.info('正在等待 {} 加载完成'.format(action.get('object_name'))) self.wait.until(presence_of_element_located((By.XPATH, object_))) time.sleep(1) case 'table_rows_is_not_zero': logger.info('正在等待 {} 行数不为0'.format(action.get('object_name'))) #等待至表格行数不为0 WebDriverWait(driver = self.browser, timeout = 300, poll_frequency = 1).until(lambda condition: self.browser.find_element(By.XPATH, object_).get_attribute('childElementCount') != '0') time.sleep(1) #若未匹配则返假 case default: raise Exception('等待条件未定义') #认知(需要补充) case 'cognize': match action.get('cognized_condition'): case 'text_is': logger.info('正在判断 {} 是否为 {}'.format(action.get('object_name'), action.get('content'))) #若定位元素非指定内容则终止后续动作 if self.browser.find_element(By.XPATH, object_).text == action.get('content'): match action.get('meet'): case 'pass': logger.info('是,跳过') pass case 'break': logger.info('是,终止执行后续动作') break case default: raise Exception('预期结果为是时动作未定义') else: match action.get('otherwies'): case 'pass': logger.info('否,跳过') pass case 'break': logger.info('否,终止执行后续动作') break case default: raise Exception('预期结果为不是时动作未定义') #普康健康自动化审核 #后续考虑配置项化 case 'auto_audit': #获取保险总公司 insurance = action.get('insurance') self.close_stipulation(action = action) logger.info('正在判断 该赔案是否可自动审核') try: #实例化普康健康认知模型,获取理算前认知模型 cognition = Cognition(extractions = self.data).before_adjustment(insurance = insurance) #获取赔付结论原因(用于赔付时在结论原因中备注票据拒付等信息) payment_remark = cognition.get('赔付结论原因') assert cognition.get('自动审核') except: logger.info('该赔案不可自动审核,转人工审核') self.close_and_switch() return 'failure' logger.info('该赔案可自动审核,继续') logger.info('正在判断 该赔案是否需要拒付票据') try: #拒付票据,考虑在拒付时可能需要将合理金额不为0的票据拒付,故抽象若需要拒付票据再根据认知处理 assert cognition.get('票据拒付') logger.info('该赔案需要拒付票据') try: self.invoices_refuse(action = action, insurance = insurance, cognition = cognition) except: logger.info('拒付票据发生异常,跳过该赔案') self.close_and_switch() return 'failure' except: logger.info('该赔案无需拒付票据,继续') logger.info('正在判断 该赔案理算保单') try: #获取所选保单索引 slip_index = cognition.get('所选保单索引') self.slip_select(action = action, slip_index = slip_index) except: logger.info('判断理算保单发生异常,跳过该赔案') self.close_and_switch() return 'failure' try: self.case_report(action = action, insurance = insurance, cognition = cognition) #在线报案发生异常则跳过 except: logger.info('在线报案发生异常,跳过该赔案') self.close_and_switch() return 'failure' #就中银保天津分公司若票据就诊类型:包含药房购药和门急诊就诊则取消门急诊就诊关联药房购药责任 if cognition.get('所选保单所属保险分公司') == '中银保险有限公司天津分公司': #就诊类型包括药店购药和门急诊就诊 if '药店购药' in [invoice.get('就诊类型') for invoice in cognition.get('转换数据').get('票据信息')] and '门急诊就诊' in [invoice.get('就诊类型') for invoice in cognition.get('转换数据').get('票据信息')]: #等待至票据表格所有元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_invoices_xpath')))) #解析票据行数 indices = int(element.get_attribute('childElementCount')) #遍历票据表格索引 for index in range(indices): index += 1 #等待至索引行元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('field_invoice_identifier_xpath').replace('tr[index]', 'tr[{}]'.format(index)).rsplit('/', 1)[0]))) #将索引行移动至可见 self.browser.execute_script("arguments[0].scrollIntoView(true);", element) #点击索引行的行唯一标识 self.click(xpath = action.get('field_invoice_identifier_xpath').replace('[index]', '[{}]'.format(index))) element = self.wait.until(presence_of_element_located((By.XPATH, '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[7]'.replace('tr[index]', 'tr[{}]'.format(index))))) #若该张票据就诊类型为真急诊就诊 if element.text == '门/急诊': time.sleep(1) self.click(xpath = '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[9]/div/div/div/div[1]/input'.replace('tr[index]', 'tr[{}]'.format(index))) time.sleep(1) self.click(xpath = '(/html/body/div/div[1]/div[1]/ul/li[2])[last()]') time.sleep(1) #就诊类型均为门急诊就诊 if all([invoice.get('就诊类型') == '门急诊就诊' for invoice in cognition.get('转换数据').get('票据信息')]): #等待至票据表格所有元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_invoices_xpath')))) #解析票据行数 indices = int(element.get_attribute('childElementCount')) #遍历票据表格索引 for index in range(indices): index += 1 #等待至索引行元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('field_invoice_identifier_xpath').replace('tr[index]', 'tr[{}]'.format(index)).rsplit('/', 1)[0]))) #将索引行移动至可见 self.browser.execute_script("arguments[0].scrollIntoView(true);", element) #点击索引行的行唯一标识 self.click(xpath = action.get('field_invoice_identifier_xpath').replace('[index]', '[{}]'.format(index))) element = self.wait.until(presence_of_element_located((By.XPATH, '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[7]'.replace('tr[index]', 'tr[{}]'.format(index))))) #若该张票据就诊类型为真急诊就诊 if element.text == '门/急诊': time.sleep(1) #先选择 self.click(xpath = '//*[@id="pane-first"]/div/div[3]/table/tbody/tr[index]/td[9]/div/div/div/div[1]/input'.replace('tr[index]', 'tr[{}]'.format(index))) time.sleep(1) #再选择相应责任 self.click(xpath = '(/html/body/div/div[1]/div[1]/ul/li[1])[last()]') time.sleep(1) self.click(xpath = '//*[@id="app"]/div/div/section/main/section[1]/div[5]/div[3]/button[7]') time.sleep(1) try: assert self.adjust(action = action) is True #理算发生异常则跳过 except: logger.info('理算发生异常,跳过该赔案') self.close_and_switch() return 'failure' logger.info('正在抽取数据') time.sleep(3) #仅保留影像件抽取内容 self.data = [{key: value} for extraction in self.data for key, value in extraction.items() if '影像件' in key] self.extract(extractions = action.get('extractions')) #实例化普康健康认知模型,获取理算后相应认知 cognition = Cognition(extractions = self.data).after_adjustment(insurance = insurance) logger.info('正在判断 该赔案是否赔付') #将页面滑动至底部 self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight);') time.sleep(1) print(cognition) #根据决策结果赔付 if cognition.get('自动化:审核结论') == 1: logger.info('该赔案应该赔付') logger.info('正在选择: 理赔结论-赔付') self.select(xpaths = action.get('droplist_pay_xpaths')) logger.info('正在输入: 结论原因') #若足额赔付为否,需要调整赔付是结论原因 if not cognition.get('足额赔付'): payment_remark = payment_remark + '\n累计赔付已达到个人账户年度限额' self.input(xpath = action.get('textarea_refuse_remark_xpath'), content = payment_remark) else: logger.info('该赔案应该拒付') logger.info('拒付合理金额、部分自费金额和全部自费金额之和不为0的票据') try: self.invoices_refuse(action = action, insurance = insurance) except Exception as e: print(e) logger.info('拒付票据发生异常,跳过该赔案') self.close_and_switch() return 'failure' #刷新页面,取消已选保单 self.browser.refresh() #等待至赔案号加载完成 self.wait.until(lambda condition: presence_of_element_located((By.XPATH, action.get('field_case_number_xpath')))) try: self.slip_select(action = action, slip_index = slip_index) #选择保单发生异常则跳过 except: logger.info('选择保单发生异常,跳过该赔案') self.close_and_switch() return 'failure' try: assert self.adjust(action = action) is True #理算发生异常则跳过 except: logger.info('理算发生异常,跳过该赔案') self.close_and_switch() return 'failure' logger.info('正在选择: 理赔结论-拒付') self.select(xpaths = action.get('droplist_refuse_xpaths')) logger.info('正在输入: 结论原因') self.input(xpath = action.get('textarea_refuse_remark_xpath'), content = cognition.get('自动化:审核说明')) logger.info('正在点击: 通过按钮') self.click(xpath = action.get('button_audit_xpath')) logger.info('正在判断 模态弹窗标题 是否包含 发票日期超保期') try: #等待至模态弹窗标题包含发票日期超保期 WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '发票日期超保期' in self.browser.find_element(By.XPATH, action.get('text_without_assurance_xpath')).text) logger.info('是,关闭模态弹窗') self.click(xpath = action.get('button_confrim_without_assurance_xpath')) except: logger.info('否,继续') self.close_dialog() logger.info('正在判断 模态弹窗标题 是否包含 提示') try: #等待至模态弹窗标题包含提示 WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '提示' in self.browser.find_element(By.XPATH, action.get('text_prompt_xpath')).text) logger.info('是,关闭模态弹窗') self.click(xpath = action.get('button_close_prompt_xpath')) except: logger.info('否,继续') logger.info('正在判断 模态弹窗标题 是否包含 发票关联影像') try: #等待至提示审核成功 WebDriverWait(driver = self.browser, timeout = 3, poll_frequency = 1).until(lambda condition: '发票关联影像' in self.browser.find_element(By.XPATH, action.get('text_prompt_invoices_xpath')).text) logger.info('是,跳过该赔案') self.close_and_switch() return 'failure' except: logger.info('审核成功') case default: raise Exception('判断条件未定义') #关闭模态弹窗 case 'close_dialog': logger.info('正在关闭模态弹窗') #默认在模态弹窗中点击无动作预期 self.close_dialog() #点击并切换至新标签页 case 'click_and_switch': logger.info('正在点击 {} 并切换至 {} 标签页'.format(action.get('object_name')[0], action.get('object_name')[1])) self.click_and_switch(xpath = object_) #关闭当前标签页并切换至上一标签页 case 'close_and_switch': logger.info('正在关闭 {} 标签页并切换至 {} 标签页'.format(action.get('object_name')[0], action.get('object_name')[1])) self.close_and_switch() #抽取数据 case 'extract': logger.info('正在抽取数据') time.sleep(3) self.extract(extractions = action.get('extractions')) #重做 #动作组配置项包含:row_identifier_xpath 行唯一标识 #若行唯一标识包含索引则替换标识: #适用场景: #若动作组配置项包含下一分页按钮XPATH,则依次遍历直至达到预期重复数 #若动作组配置项不包含下一分页按钮XPATH,则重做 case 'repeat': logger.info('重做: {}'.format(action.get('object_name'))) #若行唯一标识包含[index]则使用实际索引替换索引标识(index)至达到预期完成任务数或下一分页不可点击 if '[index]' in action.get('row_identifier_xpath'): while True: #解析当前分页表格行数 indices = int(self.browser.find_element(By.XPATH, action.get('table_xpath')).get_attribute('childElementCount')) #若当前分页表格行数为0则结束重复 if indices == 0: logger.info('表格行数为0,结束重复') break #遍历当前分页表格行 for index in range(1, indices + 1): #解析行唯一标识 row_identifier = self.browser.find_element(By.XPATH, action.get('row_identifier_xpath').replace('[index]', '[{}]'.format(index))).text if row_identifier != '': logger.info('就 {} 执行任务'.format(row_identifier)) else: logger.info('就第 {} 个执行任务'.format(index)) #点击行唯一标识(用于将该行可见) self.click(xpath = action.get('row_identifier_xpath').replace('[index]', '[{}]'.format(index))) #若执行动作时发生异常则跳过 try: assert self.translator(actions = action.get('actions'), index = index) == 'success' logger.info('执行成功,继续下一任务') except: #尝试关闭除第一个标签页以外的标签页,否则抛出异常 try: logger.info('执行任务时发生异常,跳过并继续下一任务') self.close_and_switch_to_first() continue except Exception as exception: raise exception #若动作组配置项重下一分页按钮XPATH数据类型为字符且非空字符则判断是否达到预期完成任务数,若达到则终止并跳出循环 if isinstance(action.get('button_next_xpath'), str) and action.get('button_next_xpath') != '': if self.tasks >= action.get('expected_tasks'): break #若动作组配置项重下一分页按钮XPATH数据类型为字符且非空字符则判断是否达到预期完成任务数或者下一分页按钮不可点击 if isinstance(action.get('button_next_xpath'), str) and action.get('button_next_xpath') != '': if self.tasks >= action.get('expected_tasks'): logger.info('达到预期完成任务数,结束重复') break if self.wait.until(presence_of_element_located((By.XPATH, action.get('button_next_xpath')))).get_attribute('disabled'): logger.info('下一分页按钮不可点击,结束重复') break logger.info('正在点击: 下一分页按钮') #解析点击之前第一行唯一标识 first_row_identifier = self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text self.click(xpath = action.get('button_next_xpath')) #等待至第一行唯一标识与点击之前不相同 WebDriverWait(driver = self.browser, timeout = 300, poll_frequency = 1).until(lambda condition: self.browser.find_element(By.XPATH, action.get('first_row_identifier_xpath')).text != first_row_identifier) #若不满足下一分页按钮XPATH数据类型为字符且非空字符则退出循环(此情况不支持翻页) else: break else: #预期行索引(用于初始化行索引) index = action.get('expected_index') while True: if index > 20: self.select(xpaths = action.get('droplist_more_xpaths')) try: #将索引行移动至可见 self.browser.execute_script("arguments[0].scrollIntoView(true);", self.wait.until(presence_of_element_located((By.XPATH, action.get('row_xpath').replace('tr[index]', 'tr[{}]'.format(index)))))) except: logger.info('行唯一标识加载发生异常,停止重复执行动作组') break #点击行唯一标识(用于将行可见) self.click(xpath = action.get('row_identifier_xpath').replace('tr[1]', 'tr[{}]'.format(index))) #解析行唯一标识 row_identifier = self.browser.find_element(By.XPATH, action.get('row_identifier_xpath').replace('tr[1]', 'tr[{}]'.format(index))).text logger.info('就 {} 执行任务'.format(row_identifier)) #若成功执行则刷新并继续以当前行索引执行重复动作组,若无法执行则以下一个行索引执行重复动作组,若发生异常则重新执行重复动作组 try: if self.translator(actions = action.get('actions'), index = index) == 'success': while True: #刷新页面,等待至行唯一标识与刷新前不一致 try: self.browser.refresh() self.select(xpaths = action.get('droplist_more_xpaths')) self.wait.until(lambda condition: self.browser.find_element(By.XPATH, action.get('row_identifier_xpath').replace('tr[1]', 'tr[{}]'.format(index))).text != row_identifier) logger.info('执行成功') break except: time.sleep(1) else: index += 1 logger.info('执行动作组失败,以下一个行索引执行动作组') except: try: self.close_dialog() self.close_and_switch_to_first() logger.info('执行动作组发生异常,重新执行该动作组') continue except Exception as exception: raise exception #若重复数大于等于预期重复数则终止 if self.tasks >= action.get('expected_tasks'): logger.info('达到预期重复数') break #重复动作结束 case 'repeat_finish': #将抽取内容保存为本地文件 with open('data/{}.json'.format(datetime.now().strftime('%y-%m-%d %H-%M-%S')), 'w', encoding= 'utf-8') as file: json.dump(self.data, file, ensure_ascii = False) #将抽取内容添加至数据集 self.dataset.append(self.data) #重置抽取数据 self.data = [] #重复数自增 self.tasks += 1 #结束 case 'finish': #将所有抽取内容保存为本地文件 with open('dataset/{}.json'.format(datetime.now().strftime('%y-%m-%d %H-%M-%S')), 'w', encoding= 'utf-8') as file: json.dump(self.dataset, file, ensure_ascii = False) #若未匹配则返假 case default: raise Exception('动作类型未定义') return 'success' ''' #等待至票据表格所有元素加载完成 element = self.wait.until(presence_of_element_located((By.XPATH, action.get('table_xpath')))) #解析票据行数 indices = int(element.get_attribute('childElementCount')) #若行索引大于当前分页票据表格行数则点击更多按钮 if index > indices: '''