chg: [crawler] crawler queue + restart docker on error

This commit is contained in:
Terrtia 2020-07-24 08:54:54 +02:00
parent 7e9115d4d5
commit c31aae4efc
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
4 changed files with 112 additions and 68 deletions

View file

@ -19,6 +19,9 @@ sys.path.append(os.environ['AIL_BIN'])
from Helper import Process
from pubsublogger import publisher
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
import crawlers
# ======== FUNCTIONS ========
def load_blacklist(service_type):
@ -117,43 +120,6 @@ def unpack_url(url):
return to_crawl
# get url, paste and service_type to crawl
def get_elem_to_crawl(rotation_mode):
# Pop the next message to crawl from the Redis crawler queues and convert it
# into a dict with the keys 'url', 'paste', 'type_service' and
# 'original_message'.
# rotation_mode: iterable of service types; each one is substituted into the
# queue key templates below, in iteration order.
# Returns the unchanged falsy value of `message` (initially None) when no
# queue yielded anything.
# NOTE(review): indentation is not visible in this view, so the nesting of the
# statements below is inferred from their content -- confirm against the file.
message = None
domain_service_type = None
#load_priority_queue
# First pass: '<service_type>_crawler_priority_queue', stop at the first hit.
for service_type in rotation_mode:
message = redis_crawler.spop('{}_crawler_priority_queue'.format(service_type))
if message is not None:
# remember which service type the message came from
domain_service_type = service_type
break
#load_discovery_queue
# Second pass, only when the priority queues gave nothing.
if message is None:
for service_type in rotation_mode:
message = redis_crawler.spop('{}_crawler_discovery_queue'.format(service_type))
if message is not None:
domain_service_type = service_type
break
#load_normal_queue
# Last pass: the normal '<service_type>_crawler_queue'.
if message is None:
for service_type in rotation_mode:
message = redis_crawler.spop('{}_crawler_queue'.format(service_type))
if message is not None:
domain_service_type = service_type
break
if message:
# split on the LAST ';' only: left part is the url, right part the paste
splitted = message.rsplit(';', 1)
if len(splitted) == 2:
url, paste = splitted
if paste:
# strip the PASTES_FOLDER + '/' prefix from the paste path
paste = paste.replace(PASTES_FOLDER+'/', '')
# NOTE(review): if this assignment is not nested under the len(splitted) == 2
# check, a message without ';' leaves `url` and `paste` unbound -- confirm.
message = {'url': url, 'paste': paste, 'type_service': domain_service_type, 'original_message': message}
return message
def get_crawler_config(redis_server, mode, service_type, domain, url=None):
crawler_options = {}
if mode=='auto':
@ -237,6 +203,9 @@ def crawl_onion(url, domain, port, type_service, message, crawler_config):
# TODO: relaunch docker or send error message
nb_retry += 1
if nb_retry == 2:
crawlers.restart_splash_docker(splash_url)
if nb_retry == 6:
on_error_send_message_back_in_queue(type_service, domain, message)
publisher.error('{} SPASH DOWN'.format(splash_url))
@ -304,11 +273,23 @@ def search_potential_source_domain(type_service, domain):
if __name__ == '__main__':
if len(sys.argv) != 2 and len(sys.argv) != 3:
print('usage:', 'Crawler.py', 'splash_port')
print('usage:', 'Crawler.py', 'splash_name', 'splash_url')
if len(sys.argv) != 2:
print('usage:', 'Crawler.py', 'splash_url')
exit(1)
##################################################
splash_url = sys.argv[1]
splash_name = crawlers.get_splash_name_by_url(splash_url)
crawler_type = crawlers.get_splash_crawler_type(splash_name)
print(splash_name)
print(crawler_type)
#rotation_mode = deque(['onion', 'regular'])
rotation_mode = deque(crawlers.get_crawler_queue_type_by_proxy(splash_name, crawler_type))
default_proto_map = {'http': 80, 'https': 443}
######################################################## add ftp ???
publisher.port = 6380
publisher.channel = "Script"
@ -318,20 +299,8 @@ if __name__ == '__main__':
# Setup the I/O queues
p = Process(config_section)
if len(sys.argv) == 2:
splash_port = sys.argv[1]
splash_url = '{}:{}'.format( p.config.get("Crawler", "splash_url"), splash_port)
else:
splash_name = sys.argv[1]
splash_url = sys.argv[2]
print(splash_name)
print('splash url: {}'.format(splash_url))
rotation_mode = deque(['onion', 'regular'])
default_proto_map = {'http': 80, 'https': 443}
######################################################## add ftp ???
PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], p.config.get("Directories", "pastes"))
r_serv_metadata = redis.StrictRedis(
@ -391,7 +360,7 @@ if __name__ == '__main__':
update_auto_crawler()
rotation_mode.rotate()
to_crawl = get_elem_to_crawl(rotation_mode)
to_crawl = crawlers.get_elem_to_crawl_by_queue_type(rotation_mode)
if to_crawl:
url_data = unpack_url(to_crawl['url'])
# remove domain from queue

View file

@ -1,9 +1,9 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import json
import os
import sys
import time
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))
import ConfigLoader
@ -21,15 +21,7 @@ config_loader = None
import screen
if __name__ == '__main__':
if not crawlers.ping_splash_manager():
print('Error, Can\'t cnnect to Splash manager')
crawlers.reload_splash_and_proxies_list()
# # TODO: handle multiple splash_manager
def launch_crawlers():
for crawler_splash in crawlers_to_launch:
splash_name = crawler_splash[0]
nb_crawlers = int(crawler_splash[1])
@ -44,4 +36,26 @@ if __name__ == '__main__':
splash_url = all_crawler_urls[i]
print(all_crawler_urls[i])
crawlers.launch_ail_splash_crawler('http://127.0.0.1:8054', script_options='{} {}'.format(splash_name, splash_url))
crawlers.launch_ail_splash_crawler(splash_url, script_options='{}'.format(splash_url))
# # TODO: handle multiple splash_manager
# Script entry point: ping the Splash manager, load the splash/proxy list,
# launch the crawlers, then stay alive in a sleep loop.
# NOTE(review): indentation is not visible in this view; whether the reload
# call below runs unconditionally or only under the failed-ping branch cannot
# be told from here -- confirm against the file.
if __name__ == '__main__':
# a failed ping is only reported on stdout
if not crawlers.ping_splash_manager():
print('Error, Can\'t cnnect to Splash manager')
crawlers.reload_splash_and_proxies_list()
launch_crawlers()
# timestamp is recorded here but not read by any statement in this block
last_refresh = time.time()
while True:
# refresh splash and proxy list
# The refresh branch is disabled by the constant condition, so each
# iteration currently only sleeps for 10 seconds.
if False:
crawlers.reload_splash_and_proxies_list()
print('list of splash and proxies refreshed')
else:
time.sleep(10)
# # TODO: handle multiple splash_manager

View file

@ -34,6 +34,7 @@ config_loader = ConfigLoader.ConfigLoader()
r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata")
r_serv_onion = config_loader.get_redis_conn("ARDB_Onion")
r_cache = config_loader.get_redis_conn("Redis_Cache")
PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes"))
config_loader = None
# load crawler config
@ -545,6 +546,48 @@ def save_har(har_dir, item_id, har_content):
with open(filename, 'w') as f:
f.write(json.dumps(har_content))
#### CRAWLER QUEUES ####
def get_crawler_queue_type_by_proxy(splash_name, proxy_type):
    """Return the list of queue types a splash crawler should consume.

    A splash other than the two defaults ('default_splash',
    'default_splash_tor') gets a queue named after itself; when it is not
    flagged for discovery, that is its only queue. Otherwise the shared
    queues are appended: 'onion' then 'regular' for a 'tor' proxy,
    'regular' alone for any other proxy type.
    """
    queue_types = []
    if splash_name not in ('default_splash', 'default_splash_tor'):
        queue_types.append(splash_name)
        # a dedicated splash only joins the shared queues when flagged for discovery
        if not is_splash_used_in_discovery(splash_name):
            return queue_types
    if proxy_type == 'tor':
        queue_types.extend(('onion', 'regular'))
    else:
        # every non-tor proxy type is handled as a web proxy
        queue_types.append('regular')
    return queue_types
def get_elem_to_crawl_by_queue_type(l_queue_type):
    """Pop the next element to crawl, honouring queue priority.

    Queue families are polled in this order: priority queue, discovery
    queue, normal queue. Within each family the entries of
    ``l_queue_type`` are tried in iteration order and the first truthy
    message popped from ``r_serv_onion`` is used.

    Returns a dict with the keys 'url', 'paste', 'type_service' and
    'original_message', or None when no queue yielded a message.
    """
    queue_templates = (
        '{}_crawler_priority_queue',
        '{}_crawler_discovery_queue',
        '{}_crawler_queue',
    )
    for template in queue_templates:
        for queue_type in l_queue_type:
            message = r_serv_onion.spop(template.format(queue_type))
            if not message:
                continue
            # split on the last ';' only: url on the left, item path on the right
            parts = message.rsplit(';', 1)
            if len(parts) == 2:
                url = parts[0]
                item_id = parts[1].replace(PASTES_FOLDER+'/', '')
            else:
                # TODO: check/refactor -- message carries no item id
                url, item_id = message, None
            return {'url': url, 'paste': item_id, 'type_service': queue_type, 'original_message': message}
    return None
#### ---- ####
#### SPLASH MANAGER ####
def get_splash_manager_url(reload=False): # TODO: add config reload
@ -558,6 +601,17 @@ def get_splash_url_from_manager_url(splash_manager_url, splash_port):
host = url.netloc.split(':', 1)[0]
return 'http://{}:{}'.format(host, splash_port)
def is_splash_used_in_discovery(splash_name):
    """Tell whether the splash is flagged for the discovery queue.

    Reads the 'discovery_queue' field of the 'splash:metadata:<splash_name>'
    hash from ``r_serv_onion``; only the exact string 'True' counts as set.
    """
    metadata_key = 'splash:metadata:{}'.format(splash_name)
    return r_serv_onion.hget(metadata_key, 'discovery_queue') == 'True'
def restart_splash_docker(splash_url):
    """Request a restart of the splash docker behind ``splash_url``.

    The port is whatever follows the last ':' of the url (the whole string
    when it contains no ':'). Returns the result of
    ``_restart_splash_docker`` for that port.
    """
    _, _, splash_port = splash_url.rpartition(':')
    return _restart_splash_docker(splash_port)
## API ##
def ping_splash_manager():
req = requests.get('{}/api/v1/ping'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
@ -580,6 +634,14 @@ def get_all_splash_manager_proxies():
return req.json()
else:
print(req.json())
def _restart_splash_docker(splash_port):
    """POST a restart request for the docker on ``splash_port``.

    The request goes to '<splash manager url>/api/v1/splash/restart' with
    the API key in the Authorization header and {'docker_port': splash_port}
    as JSON body.

    Returns the decoded JSON response on HTTP 200; for any other status the
    JSON body is printed and None is returned.
    """
    endpoint = '{}/api/v1/splash/restart'.format(get_splash_manager_url())
    auth_headers = {"Authorization": get_splash_api_key()}
    payload = {'docker_port': splash_port}
    # NOTE: certificate verification is switched off for this request
    response = requests.post(endpoint, headers=auth_headers, verify=False, json=payload)
    if response.status_code != 200:
        print(response.json())
        return None
    return response.json()
## -- ##
## SPLASH ##
@ -648,6 +710,9 @@ def delete_all_proxies():
for proxy_name in get_all_proxies():
delete_proxy(proxy_name)
def set_proxy_used_in_discovery(proxy_name, value):
    """Store the discovery-queue flag in the metadata hash.

    Writes ``value`` to the 'discovery_queue' field of the
    'splash:metadata:<proxy_name>' hash in ``r_serv_onion``.
    """
    # Build the key from this function's own parameter: the previous body
    # formatted it with ``splash_name``, a name not defined in this function,
    # which raised NameError on every call.
    # NOTE(review): the key prefix is 'splash:metadata:' while the parameter
    # is called proxy_name -- confirm which identifier callers pass.
    r_serv_onion.hset('splash:metadata:{}'.format(proxy_name), 'discovery_queue', value)
def delete_proxy(proxy_name): # # TODO: force delete (delete all proxy)
proxy_splash = get_all_splash_by_proxy(proxy_name)
if proxy_splash:

View file

@ -1,4 +0,0 @@
[proxy]
host=localhost
port=9050
type=SOCKS5