# -*- coding: utf-8 -*-
'''
Script name:

    Monitoring data factory

Description:

    Initializes search tasks from the monitoring tasks, then initializes and
    starts the spiders from those search tasks.
'''

#Load modules
from urllib.parse import quote
import time
import json
from selenium.webdriver import Chrome
#from undetected_chromedriver import Chrome
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.common.by import By
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys

sys.path.append('..')

from utils.request import Feishu
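
#The Feishu wrapper imported above is assumed to expose
#request(url, method = 'get', payload = None) and to return an object with a
#.code status (checked against 200 below) and a .data dict. A minimal sketch
#of that assumed interface, illustrative only (the real utils.request.Feishu
#may differ, e.g. in how it authenticates):
#
#    import requests
#    from types import SimpleNamespace
#
#    class Feishu:
#        def request(self, url, method = 'get', payload = None):
#            headers = {'Authorization': 'Bearer <tenant_access_token>'}
#            response = requests.request(method, url, json = payload, headers = headers)
#            return SimpleNamespace(code = response.status_code,
#                                   data = response.json().get('data', {}))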

#Initialize search tasks from the monitoring tasks
def Initialize_search() -> None:

    print('Initializing search tasks from the monitoring tasks...', end = '')

    #List records (monitoring tasks)
    url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblcVxN52hZKT3z1/records'

    monitor_tasks = Feishu().request(url)

    #Check whether fetching the monitoring tasks succeeded
    if monitor_tasks.code != 200:

        print('Failed to fetch the monitoring tasks; this run is aborted!')
        print()

        return None

    monitor_tasks = monitor_tasks.data.get('items')

    #Check whether there are any monitoring tasks
    if monitor_tasks is None:

        print('No monitoring tasks; this run is aborted!')
        print()

        return None

    #List records (search tasks)
    url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records'

    search_tasks = Feishu().request(url)

    #Check whether fetching the search tasks succeeded
    if search_tasks.code != 200:

        print('Failed to fetch the search tasks; this run is aborted!')
        print()

        return None

    search_tasks = search_tasks.data.get('items')

    #Check whether there are any search tasks
    if search_tasks is None:

        comparators = []

    else:

        #Keep only the monitoring task id, search channel and keyword, for
        #comparing against the search tasks to be added or deleted
        comparators = [{'监控任务标识': search_task.get('fields').get('监控任务标识'), '搜索渠道': search_task.get('fields').get('搜索渠道'), '关键词': search_task.get('fields').get('关键词')} for search_task in search_tasks]

    #Holds the search tasks to be added
    records = []

    #Define the last search time as 23-12-25 00:00:00 (millisecond timestamp)
    last_search_time = int(time.mktime(time.strptime('23-12-25', '%y-%m-%d')) * 1000)
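
    #Note: time.mktime() interprets the date in local time, so the exact value
    #depends on the host timezone; on a UTC+8 host, for example, it works out
    #to 1703433600000 ms.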

    for monitor_task in monitor_tasks:

        #Parse the monitoring task id
        record_id = monitor_task.get('record_id')

        #Parse the search channels
        search_channels = monitor_task.get('fields').get('搜索渠道')

        #Parse the keywords (there may be several, separated by ",")
        keywords = monitor_task.get('fields').get('关键词').split(',')

        #Iterate over the search channels and keywords to generate the new search tasks
        for search_channel in search_channels:

            for keyword in keywords:

                fields = {
                    '监控任务标识': record_id,
                    '搜索渠道': search_channel,
                    '关键词': keyword
                }

                if fields not in comparators:

                    #Add the maximum page count, search period, last search
                    #time and search status (disabled by default)
                    fields.update({'最多搜索页数': 1, '搜索周期': 168, '最近搜索时间': last_search_time, '搜索状态': '停用'})

                    records.append(
                        {
                            'fields': fields
                        }
                    )

    #Check whether there are search tasks to add
    if records != []:

        #Create multiple records
        url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_create'

        response = Feishu().request(url, 'post', {'records': records})

        #Check whether adding the search tasks succeeded
        if response.code != 200:

            print('Failed to add the search tasks; this run is aborted!')
            print()

            return None
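
    #The batch_create payload has this shape (values illustrative):
    #    {'records': [{'fields': {'监控任务标识': 'recXXXXXXXX', '搜索渠道': '百度',
    #                             '关键词': '数据泄露', '最多搜索页数': 1,
    #                             '搜索周期': 168, '最近搜索时间': 1703433600000,
    #                             '搜索状态': '停用'}}]}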

    #Check whether the search tasks are non-empty
    if search_tasks is not None:

        #Generate the search tasks to delete (those whose monitoring task no longer exists)
        records = [search_task.get('record_id') for search_task in search_tasks if search_task.get('fields').get('监控任务标识') is None]

        if records != []:

            #Delete multiple records
            url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_delete'

            response = Feishu().request(url, 'post', {'records': records})

            #Check whether deleting the search tasks succeeded
            if response.code != 200:

                print('Failed to delete the search tasks; this run is aborted!')
                print()

                return None

    print('Initialization succeeded.')
    print()

#Start a spider for a search task
def Spider(search_task) -> list:

    records = []

    #Search URL
    search_url = search_task.get('search_url')

    print('Crawling %s ...' % search_url, end = '')

    try:

        #Initialize Chrome
        #chrome = Chrome(headless = False, use_subprocess = True)

        chrome = Chrome()

        chrome.get(search_url)

        #Search channel
        search_channel = search_task.get('search_channel')

        match search_channel:

            case '百度':

                #Locate the result title elements
                elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"c-title t t tts-title")]/a')))

            case '搜狗':

                elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"vr-title")]/a')))

            case '360':

                elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"res-title")]/a')))

            case _:

                #Unknown channel: return no results instead of raising a NameError below
                elements = []

        #Parse the monitored object
        monitor_object = search_task.get('monitor_object')

        #Parse the keyword
        keyword = search_task.get('keyword')

        #Define the search time
        search_time = int(time.time() * 1000)

        for element in elements:

            records.append(
                {
                    'monitor_object': monitor_object,
                    'search_channel': search_channel,
                    'keyword': keyword,
                    'search_url': search_url,
                    'fields':
                    {
                        #Title of the search result
                        'title': element.get_attribute('outerText'),

                        #Hyperlink of the search result
                        'hyperlink': element.get_attribute('href')
                    },
                    'search_time': search_time
                }
            )

        print('done.')
        print()

    except Exception:

        print('Crawl failed!')
        print()

    finally:

        #Quit Chrome even when its initialization itself failed
        try:

            chrome.quit()

        except Exception:

            pass

    return records
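
#A hypothetical call, for reference (values illustrative; in this script the
#dict is built by Initialize_crawl below, which currently leaves
#'monitor_object' unset, so it is stored as None):
#
#    Spider({
#        'monitor_object': 'ACME',
#        'search_channel': '百度',
#        'keyword': 'ACME 舆情',
#        'search_url': 'https://www.baidu.com/s?wd={}&ie=utf-8&pn=0'.format(quote('ACME 舆情'))
#    })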

#Initialize the crawl tasks from the search tasks
def Initialize_crawl() -> None:

    print('Initializing crawl tasks from the search tasks...', end = '')

    #List records (search tasks whose search status is '启用', i.e. enabled)
    url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records?filter={}'.format(quote('CurrentValue.[搜索状态]="启用"'))
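
    #After percent-encoding, the filter parameter reads:
    #    filter=CurrentValue.%5B%E6%90%9C%E7%B4%A2%E7%8A%B6%E6%80%81%5D%3D%22%E5%90%AF%E7%94%A8%22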

    search_tasks = Feishu().request(url)

    #Check whether fetching the enabled search tasks succeeded
    if search_tasks.code != 200:

        print('Failed to fetch the enabled search tasks; this run is aborted!')
        print()

        return None

    search_tasks = search_tasks.data.get('items')

    #Check whether there are any enabled search tasks
    if search_tasks is None:

        print('No enabled search tasks; this run is aborted!')
        print()

        return None

    #Holds the crawl tasks
    crawl_tasks = []

    for search_task in search_tasks:

        #Parse the last search time
        last_search_time = search_task.get('fields').get('最近搜索时间')

        #Parse the search period
        search_period = int(search_task.get('fields').get('搜索周期'))

        #Generate crawl tasks when more hours than the search period have
        #passed since the last search (3600000 ms per hour)
        if (int(time.time() * 1000) - last_search_time) / 3600000 >= search_period:

            #Parse the search task record id
            record_id = search_task.get('record_id')

            #Parse the search channel
            search_channel = search_task.get('fields').get('搜索渠道')

            #Parse the keyword
            keyword = search_task.get('fields').get('关键词')

            #Parse the maximum number of pages to search
            maximum_pages = int(search_task.get('fields').get('最多搜索页数'))

            #Generate one crawl task per page, up to the maximum page count
            for page in range(maximum_pages):

                #Only crawling via a constructed URL is considered
                match search_channel:

                    case '百度':

                        #Baidu paginates by result offset: pn = page * 10
                        search_url = 'https://www.baidu.com/s?wd={}&ie=utf-8&pn={}'.format(quote(keyword), page * 10)

                        xpath = '//h3[contains(@class,"c-title t t tts-title")]/a'

                    case '搜狗':

                        #ie sets the keyword encoding
                        search_url = 'https://www.sogou.com/web?query={}&page={}&ie=utf8'.format(quote(keyword), page + 1)

                        xpath = '//h3[contains(@class,"vr-title")]/a'

                    case '360':

                        search_url = 'https://www.so.com/s?q={}&pn={}'.format(quote(keyword), page + 1)

                        xpath = '//h3[contains(@class,"res-title")]/a'
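
                #For example, a hypothetical keyword '数据泄露' with page = 1
                #yields, in the 百度 branch:
                #    https://www.baidu.com/s?wd=%E6%95%B0%E6%8D%AE%E6%B3%84%E9%9C%B2&ie=utf-8&pn=10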

                crawl_tasks.append(
                    {
                        'monitor_task':
                        {
                            #The search task record id parsed above
                            'record_id': record_id
                        },
                        'search_task':
                        {
                            'search_channel': search_channel,
                            'keyword': keyword,
                            'search_url': search_url,
                        }
                    }
                )

    print('Initialization succeeded.')
    print()

    print('Starting the spiders...')
    print()

    #Create a thread pool
    with ThreadPoolExecutor(max_workers = 2) as executor:

        threads = []

        for crawl_task in crawl_tasks:

            #Submit each search task to the thread pool
            thread = executor.submit(Spider, crawl_task.get('search_task'))

            threads.append(thread)

        results = []

        for thread in as_completed(threads):

            result = thread.result()

            #Append to the results when the spider returned anything
            if result != []:

                results.extend(result)

        print(results)


Initialize_crawl()