chg: [crawler] manage crawlers

This commit is contained in:
Terrtia 2020-07-27 15:46:09 +02:00
parent c31aae4efc
commit 39c3918d09
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
4 changed files with 139 additions and 25 deletions

View file

@ -143,7 +143,7 @@ def get_crawler_config(redis_server, mode, service_type, domain, url=None):
def load_crawler_config(service_type, domain, paste, url, date):
crawler_config = {}
crawler_config['splash_url'] = splash_url
crawler_config['splash_url'] = f'http://{splash_url}'
crawler_config['item'] = paste
crawler_config['service_type'] = service_type
crawler_config['domain'] = domain
@ -197,7 +197,7 @@ def crawl_onion(url, domain, port, type_service, message, crawler_config):
nb_retry = 0
while retry:
try:
r = requests.get(splash_url , timeout=30.0)
r = requests.get(f'http://{splash_url}' , timeout=30.0)
retry = False
except Exception:
# TODO: relaunch docker or send error message
@ -244,6 +244,8 @@ def crawl_onion(url, domain, port, type_service, message, crawler_config):
print('------------------------------------------------------------------------')
r_cache.hset('metadata_crawler:{}'.format(splash_url), 'status', 'Error')
exit(-2)
else:
crawlers.update_splash_manager_connection_status(True)
else:
print(process.stdout.read())
exit(-1)

View file

@ -21,6 +21,9 @@ config_loader = None
import screen
# # TODO: lauch me in core screen
# # TODO: check if already launched in tor screen
def launch_crawlers():
for crawler_splash in crawlers_to_launch:
splash_name = crawler_splash[0]
@ -41,21 +44,46 @@ def launch_crawlers():
# # TODO: handle mutltiple splash_manager
if __name__ == '__main__':
if not crawlers.ping_splash_manager():
print('Error, Can\'t cnnect to Splash manager')
crawlers.reload_splash_and_proxies_list()
launch_crawlers()
last_refresh = time.time()
is_manager_connected = crawlers.ping_splash_manager()
if not is_manager_connected:
print('Error, Can\'t connect to Splash manager')
session_uuid = None
else:
print('Splash manager connected')
session_uuid = crawlers.get_splash_manager_session_uuid()
is_manager_connected = crawlers.reload_splash_and_proxies_list()
print(is_manager_connected)
if is_manager_connected:
launch_crawlers()
last_check = int(time.time())
while True:
# check if manager is connected
if int(time.time()) - last_check > 60:
is_manager_connected = crawlers.is_splash_manager_connected()
current_session_uuid = crawlers.get_splash_manager_session_uuid()
# reload proxy and splash list
if current_session_uuid and current_session_uuid != session_uuid:
is_manager_connected = crawlers.reload_splash_and_proxies_list()
if is_manager_connected:
print('reload proxies and splash list')
launch_crawlers()
session_uuid = current_session_uuid
if not is_manager_connected:
print('Error, Can\'t connect to Splash manager')
last_check = int(time.time())
# # TODO: lauch crawlers if was never connected
# refresh splash and proxy list
if False:
elif False:
crawlers.reload_splash_and_proxies_list()
print('list of splash and proxies refreshed')
else:
time.sleep(10)
time.sleep(5)
# kill/launch new crawler / crawler manager check if already launched
# # TODO: handle mutltiple splash_manager
# catch reload request

View file

@ -53,6 +53,14 @@ def get_screen_pid(screen_name, with_sudoer=False):
return all_pids
return []
def detach_screen(screen_name):
cmd = ['screen', '-d', screen_name]
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#if p.stdout:
# print(p.stdout)
if p.stderr:
print(p.stderr)
def create_screen(screen_name):
if not exist_screen(screen_name):
cmd = ['screen', '-dmS', screen_name]
@ -79,15 +87,44 @@ def kill_screen(screen_name, with_sudoer=False):
# # TODO: add check if len(window_name) == 20
# use: screen -S 'pid.screen_name' -p %window_id% -Q title
# if len(windows_name) > 20 (truncated by default)
def get_screen_windows_list(screen_name):
def get_screen_windows_list(screen_name, r_set=True):
# detach screen to avoid incomplete result
detach_screen(screen_name)
if r_set:
all_windows_name = set()
else:
all_windows_name = []
cmd = ['screen', '-S', screen_name, '-Q', 'windows']
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p.stdout:
for window_row in p.stdout.split(b' '):
window_id, window_name = window_row.decode().split()
print(window_id)
print(window_name)
print('---')
#print(window_id)
#print(window_name)
#print('---')
if r_set:
all_windows_name.add(window_name)
else:
all_windows_name.append(window_name)
if p.stderr:
print(p.stderr)
return all_windows_name
def get_screen_windows_id(screen_name):
# detach screen to avoid incomplete result
detach_screen(screen_name)
all_windows_id = {}
cmd = ['screen', '-S', screen_name, '-Q', 'windows']
p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p.stdout:
for window_row in p.stdout.split(b' '):
window_id, window_name = window_row.decode().split()
if window_name not in all_windows_id:
all_windows_id[window_name] = []
all_windows_id[window_name].append(window_id)
if p.stderr:
print(p.stderr)
return all_windows_id
# script_location ${AIL_BIN}
def launch_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options=''):
@ -98,6 +135,16 @@ def launch_windows_script(screen_name, window_name, dir_project, script_location
print(p.stdout)
print(p.stderr)
def launch_uniq_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options='', kill_previous_windows=False):
all_screen_name = get_screen_windows_id(screen_name)
if window_name in all_screen_name:
if kill_previous_windows:
kill_screen_window(screen_name, all_screen_name[window_name][0], force=True)
else:
print('Error: screen {} already contain a windows with this name {}'.format(screen_name, window_name))
return None
launch_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options=script_options)
def kill_screen_window(screen_name, window_id, force=False):
if force:# kill
cmd = ['screen', '-S', screen_name, '-p', window_id, '-X', 'kill']
@ -108,5 +155,5 @@ def kill_screen_window(screen_name, window_id, force=False):
print(p.stderr)
if __name__ == '__main__':
res = kill_screen('Docker_Splash', with_sudoer=True)
res = get_screen_windows_list('Script_AIL')
print(res)

View file

@ -13,6 +13,7 @@ import os
import re
import redis
import sys
import time
import uuid
from datetime import datetime, timedelta
@ -590,16 +591,16 @@ def get_elem_to_crawl_by_queue_type(l_queue_type):
#### SPLASH MANAGER ####
def get_splash_manager_url(reload=False): # TODO: add config reload
def get_splash_manager_url(reload=False): # TODO: add in db config
return splash_manager_url
def get_splash_api_key(reload=False): # TODO: add config reload
def get_splash_api_key(reload=False): # TODO: add in db config
return splash_api_key
def get_splash_url_from_manager_url(splash_manager_url, splash_port):
url = urlparse(splash_manager_url)
host = url.netloc.split(':', 1)[0]
return 'http://{}:{}'.format(host, splash_port)
return '{}:{}'.format(host, splash_port)
def is_splash_used_in_discovery(splash_name):
res = r_serv_onion.hget('splash:metadata:{}'.format(splash_name), 'discovery_queue')
@ -612,14 +613,47 @@ def restart_splash_docker(splash_url):
splash_port = splash_url.split(':')[-1]
return _restart_splash_docker(splash_port)
def is_splash_manager_connected(delta_check=30):
last_check = r_cache.hget('crawler:splash:manager', 'last_check')
if last_check:
if int(time.time()) - int(last_check) > delta_check:
ping_splash_manager()
else:
ping_splash_manager()
res = r_cache.hget('crawler:splash:manager', 'connected')
return res == 'True'
def update_splash_manager_connection_status(is_connected):
r_cache.hset('crawler:splash:manager', 'connected', is_connected)
r_cache.hset('crawler:splash:manager', 'last_check', int(time.time()))
## API ##
def ping_splash_manager():
req = requests.get('{}/api/v1/ping'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
if req.status_code == 200:
return True
else:
print(req.json())
return False
try:
req = requests.get('{}/api/v1/ping'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
if req.status_code == 200:
update_splash_manager_connection_status(True)
return True
else:
print(req.json())
except requests.exceptions.ConnectionError:
pass
# splash manager unreachable
update_splash_manager_connection_status(False)
return False
def get_splash_manager_session_uuid():
try:
req = requests.get('{}/api/v1/get/session_uuid'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
if req.status_code == 200:
res = req.json()
if res:
return res['session_uuid']
else:
print(req.json())
except requests.exceptions.ConnectionError:
# splash manager unreachable
update_splash_manager_connection_status(False)
def get_all_splash_manager_containers_name():
req = requests.get('{}/api/v1/get/splash/name/all'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
@ -764,6 +798,9 @@ def reload_splash_and_proxies_list():
# LOAD PROXIES containers
delete_all_proxies()
load_all_proxy()
return True
else:
return False
# # TODO: kill crawler screen ?
## -- ##
@ -774,7 +811,7 @@ def launch_ail_splash_crawler(splash_url, script_options=''):
script_location = os.path.join(os.environ['AIL_BIN'])
script_name = 'Crawler.py'
screen.create_screen(screen_name)
screen.launch_windows_script(screen_name, splash_url, dir_project, script_location, script_name, script_options=script_options)
screen.launch_uniq_windows_script(screen_name, splash_url, dir_project, script_location, script_name, script_options=script_options, kill_previous_windows=True)
## -- ##