426 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			426 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			Python
		
	
	
	
| # -*- coding: utf-8 -*-
 | ||
| 
 | ||
| '''
 | ||
| 
 | ||
| 脚本名称:
 | ||
| 
 | ||
| 监控数据工厂
 | ||
| 
 | ||
| 脚本说明:
 | ||
| 
 | ||
| 根据监控任务初始化搜索任务,根据搜索任务初始化爬虫并启动
 | ||
| 
 | ||
| '''
 | ||
| 
 | ||
| #加载模块
 | ||
| 
 | ||
| from urllib.parse import quote
 | ||
| 
 | ||
| import time
 | ||
| 
 | ||
| import json
 | ||
| 
 | ||
| from selenium.webdriver import Chrome
 | ||
| 
 | ||
| #from undetected_chromedriver import Chrome
 | ||
| 
 | ||
| from selenium.webdriver.support.wait import WebDriverWait
 | ||
| 
 | ||
| from selenium.webdriver.support import expected_conditions
 | ||
| 
 | ||
| from selenium.webdriver.common.by import By
 | ||
| 
 | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed
 | ||
| 
 | ||
| import sys
 | ||
| 
 | ||
| sys.path.append('..')
 | ||
| 
 | ||
| from utils.request import Feishu
 | ||
| 
 | ||
| '''
 | ||
| 
 | ||
| 
 | ||
| 
 | ||
| '''
 | ||
| 
 | ||
#Initialize search tasks from the monitoring tasks
def Initialize_search() -> None:
	"""Synchronise the search-task table with the monitoring-task table.

	For every monitoring task, one search task is created per
	(search channel, keyword) pair that does not already exist; search
	tasks whose monitoring task has been removed are deleted.
	Progress and failures are reported on stdout; returns None.
	"""
	print('根据监控任务初始化搜索任务...', end = '')

	#List records (monitoring tasks)
	url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblcVxN52hZKT3z1/records'
	response = Feishu().request(url)

	#Abort if the monitoring tasks could not be fetched
	if response.code != 200:
		print('获取监控任务失败,本次启动终止!')
		print()
		return None

	monitor_items = response.data.get('items')

	#Abort if there are no monitoring tasks at all
	if monitor_items is None:
		print('无监控任务,本次启动终止!')
		print()
		return None

	#List records (search tasks)
	url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records'
	response = Feishu().request(url)

	#Abort if the search tasks could not be fetched
	if response.code != 200:
		print('获取搜索任务失败,本次启动终止!')
		print()
		return None

	search_items = response.data.get('items')

	#Keep only the identifying fields of existing search tasks so that
	#freshly generated candidates can be compared against them
	if search_items is None:
		existing = []
	else:
		existing = [
			{
				'监控任务标识': item.get('fields').get('监控任务标识'),
				'搜索渠道': item.get('fields').get('搜索渠道'),
				'关键词': item.get('fields').get('关键词')
			}
			for item in search_items
		]

	#Baseline "last searched" timestamp: 2023-12-25 00:00:00 in milliseconds
	baseline_ms = int(time.mktime(time.strptime('23-12-25', '%y-%m-%d')) * 1000)

	#Collect search tasks that need to be created
	new_records = []

	for task in monitor_items:

		#Monitoring-task record id
		task_id = task.get('record_id')

		#Search channels of this monitoring task
		channels = task.get('fields').get('搜索渠道')

		#Keywords; several keywords are separated by a fullwidth comma
		words = task.get('fields').get('关键词').split(',')

		#Every (channel, keyword) combination is one search task
		for channel in channels:
			for word in words:
				candidate = {
					'监控任务标识': task_id,
					'搜索渠道': channel,
					'关键词': word
				}
				if candidate in existing:
					continue
				#Defaults for a new task: 1 page, 168 h period,
				#baseline last-search time, state "停用" (disabled)
				candidate.update({'最多搜索页数': 1, '搜索周期': 168, '最近搜索时间': baseline_ms, '搜索状态': '停用'})
				new_records.append({'fields': candidate})

	#Create the missing search tasks, if any
	if new_records != []:

		#Batch-create records
		url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_create'
		response = Feishu().request(url, 'post', {'records': new_records})

		#Abort if the creation failed
		if response.code != 200:
			print('新增搜索任务失败,本次启动终止!')
			print()
			return None

	#Remove search tasks whose monitoring task no longer exists
	if search_items is not None:

		orphans = [item.get('record_id') for item in search_items if item.get('fields').get('监控任务标识') is None]

		if orphans != []:

			#Batch-delete records
			url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_delete'
			response = Feishu().request(url, 'post', {'records': orphans})

			#Abort if the deletion failed
			if response.code != 200:
				print('删除搜索任务失败,本次启动终止!')
				print()
				return None

	print('初始化成功。')
	print()
 | ||
| 
 | ||
| #根据搜索任务启动爬虫
 | ||
| def Spider(search_task) -> list:
 | ||
| 
 | ||
| 	records = []
 | ||
| 
 | ||
| 	#搜索链接
 | ||
| 	search_url = search_task.get('search_url')
 | ||
| 
 | ||
| 	print('正在爬取 %s ...' % search_url, end = '')
 | ||
| 
 | ||
| 	try:
 | ||
| 
 | ||
| 		#初始化chrome
 | ||
| 		#chrome = Chrome(headless = False, use_subprocess = True)
 | ||
| 
 | ||
| 		chrome = Chrome()
 | ||
| 
 | ||
| 		chrome.get(search_url)
 | ||
| 
 | ||
| 		#搜索渠道
 | ||
| 		search_channel = search_task.get('search_channel')
 | ||
| 
 | ||
| 		match search_channel:
 | ||
| 
 | ||
| 			case '百度':
 | ||
| 
 | ||
| 				#定位元素
 | ||
| 				elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"c-title t t tts-title")]/a')))
 | ||
| 
 | ||
| 			case '搜狗':
 | ||
| 
 | ||
| 				elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"vr-title")]/a')))
 | ||
| 
 | ||
| 			case '360':
 | ||
| 
 | ||
| 				elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"res-title")]/a')))
 | ||
| 
 | ||
| 		#解析监控对象
 | ||
| 		monitor_object = search_task.get('monitor_object')
 | ||
| 
 | ||
| 		#解析关键词
 | ||
| 		keyword = search_task.get('keyword')
 | ||
| 
 | ||
| 		#定义搜索时间
 | ||
| 		search_time = int(time.time() * 1000)
 | ||
| 
 | ||
| 		for element in elements:
 | ||
| 
 | ||
| 			records.append(
 | ||
| 
 | ||
| 				{
 | ||
| 
 | ||
| 					'monitor_object': monitor_object,
 | ||
| 
 | ||
| 					'search_channel': search_channel,
 | ||
| 
 | ||
| 					'keyword': keyword,
 | ||
| 
 | ||
| 					'search_url': search_url,
 | ||
| 
 | ||
| 					'fields':
 | ||
| 
 | ||
| 					{
 | ||
| 
 | ||
| 						#搜索结果标题
 | ||
| 						'title': element.get_attribute('outerText'),
 | ||
| 
 | ||
| 						#搜索结果连接
 | ||
| 						'hyperlink': element.get_attribute('href')
 | ||
| 
 | ||
| 					},
 | ||
| 
 | ||
| 					'search_time': search_time
 | ||
| 
 | ||
| 				}
 | ||
| 
 | ||
| 			)
 | ||
| 
 | ||
| 		print('已完成。')
 | ||
| 		print()
 | ||
| 
 | ||
| 	except:
 | ||
| 
 | ||
| 		print('爬取失败!')
 | ||
| 		print()
 | ||
| 
 | ||
| 	finally:
 | ||
| 
 | ||
| 		try:
 | ||
| 
 | ||
| 			chrome.quit()
 | ||
| 
 | ||
| 		except:
 | ||
| 
 | ||
| 			pass   
 | ||
| 
 | ||
| 		return records
 | ||
| 
 | ||
| #根据搜索任务初始化爬取任务
 | ||
| def Initialize_crawl() -> None:
 | ||
| 
 | ||
| 	print('根据搜索任务初始化爬取任务...', end = '')
 | ||
| 
 | ||
| 	#列出记录(搜索状态为「启用」的搜索任务)
 | ||
| 	url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records?filter={}'.format(quote('CurrentValue.[搜索状态]="启用"'))
 | ||
| 
 | ||
| 	search_tasks = Feishu().request(url)
 | ||
| 
 | ||
| 	#检查获取启用的搜索任务是否成功
 | ||
| 	if search_tasks.code != 200:
 | ||
| 
 | ||
| 		print('获取启用的搜索任务失败,本次启动终止!')
 | ||
| 		print()
 | ||
| 
 | ||
| 		return None
 | ||
| 
 | ||
| 	search_tasks = search_tasks.data.get('items')
 | ||
| 
 | ||
| 	#检查启用的搜索任务是否为空
 | ||
| 	if search_tasks is None:
 | ||
| 
 | ||
| 		print('无启用的搜索任务,本次启动终止!')
 | ||
| 		print()
 | ||
| 
 | ||
| 		return None
 | ||
| 
 | ||
| 	#用于记录爬取任务
 | ||
| 	crawl_tasks = []
 | ||
| 
 | ||
| 	for search_task in search_tasks:
 | ||
| 
 | ||
| 		#解析最近搜索时间
 | ||
| 		last_search_time = search_task.get('fields').get('最近搜索时间')
 | ||
| 
 | ||
| 		#解析搜索周期
 | ||
| 		search_period = int(search_task.get('fields').get('搜索周期')
 | ||
| 
 | ||
| 		#若最近搜索时间距现在的小时数超过搜索周期则生成爬取任务
 | ||
| 		if (int(time.time() * 1000) - last_search_time) / 3600000 >= search_period):
 | ||
| 
 | ||
| 			#解析搜索任务标识
 | ||
| 			record_id = search_task.get('record_id')
 | ||
| 
 | ||
| 			#解析搜索渠道
 | ||
| 			search_channel = search_task.get('fields').get('搜索渠道')
 | ||
| 
 | ||
| 			#解析关键词
 | ||
| 			keyword = search_task.get('fields').get('关键词')
 | ||
| 
 | ||
| 			#解析最多搜索页数
 | ||
| 			maximun_pages = int(search_task.get('fields').get('最多搜索页数'))
 | ||
| 
 | ||
| 			#根据最多搜索页数生成爬取任务
 | ||
| 			for page in range(maximun_pages):
 | ||
| 
 | ||
| 				#仅考虑通过链接爬取数据
 | ||
| 				match search_channel:
 | ||
| 
 | ||
| 					case '百度':
 | ||
| 
 | ||
| 						url = 'https://www.baidu.com/s?wd={}&ie=utf-8&pn={}'.format(quote(keyword), page * 10)
 | ||
| 
 | ||
| 						xpath = '//h3[contains(@class,"c-title t t tts-title")]/a'
 | ||
| 
 | ||
| 					case '搜狗':
 | ||
| 
 | ||
| 						#ie为关键词编码
 | ||
| 						url = 'https://www.sogou.com/web?query={}&page={}&ie=utf8'.format(quote(keyword), page + 1)
 | ||
| 
 | ||
| 						xpath = '//h3[contains(@class,"vr-title")]/a'
 | ||
| 
 | ||
| 					case '360':
 | ||
| 
 | ||
| 						url = 'https://www.so.com/s?q={}&pn={}'.format(quote(keyword), page + 1)
 | ||
| 
 | ||
| 						xpath = '//h3[contains(@class,"res-title")]/a'
 | ||
| 
 | ||
| 				crawl_tasks.append(
 | ||
| 
 | ||
| 					{
 | ||
| 
 | ||
| 						'monitor_task':
 | ||
| 
 | ||
| 							{
 | ||
| 
 | ||
| 								'record_id': 
 | ||
| 
 | ||
| 							},
 | ||
| 
 | ||
| 						'search_task':
 | ||
| 
 | ||
| 							{
 | ||
| 
 | ||
| 								'search_channel': search_channel,
 | ||
| 
 | ||
| 								'keyword': keyword,
 | ||
| 
 | ||
| 								'search_url': search_url,
 | ||
| 
 | ||
| 							}
 | ||
| 
 | ||
| 					}
 | ||
| 
 | ||
| 				)
 | ||
| 
 | ||
| 	print('初始化成功。')
 | ||
| 	print()
 | ||
| 
 | ||
| 	print('启动爬虫... \\')
 | ||
| 	print()
 | ||
| 
 | ||
| 	#创建线程池
 | ||
| 	with ThreadPoolExecutor(max_workers = 2) as executor:
 | ||
| 
 | ||
| 		threads = []
 | ||
| 
 | ||
| 		for search_task in search_tasks:
 | ||
| 
 | ||
| 			thread = executor.submit(Spider, search_task)
 | ||
| 
 | ||
| 			#将搜索任务提交至进程池
 | ||
| 			threads.append(thread)
 | ||
| 
 | ||
| 		results = []
 | ||
| 
 | ||
| 		for thread in as_completed(threads):
 | ||
| 
 | ||
| 			result = thread.result()
 | ||
| 
 | ||
| 			#若爬虫返回非空则添加至结果
 | ||
| 			if result != []:
 | ||
| 
 | ||
| 				results.extend(result)
 | ||
| 
 | ||
| 	print(results)
 | ||
| 
 | ||
| 
 | ||
#Script entry point. Guarded so importing this module does not start a
#crawl; the original also printed the function's return value, which is
#always None.
if __name__ == '__main__':
	Initialize_crawl()
 | ||
| 
 |