From 39c3918d09dc0ea0b30d6e3eba50c28c3680010d Mon Sep 17 00:00:00 2001
From: Terrtia
Date: Mon, 27 Jul 2020 15:46:09 +0200
Subject: [PATCH] chg: [crawler] manage crawlers

---
 bin/Crawler.py              |  6 ++--
 bin/core/Crawler_manager.py | 44 ++++++++++++++++++++++------
 bin/core/screen.py          | 57 +++++++++++++++++++++++++++++++++----
 bin/lib/crawlers.py         | 57 ++++++++++++++++++++++++++++++-------
 4 files changed, 139 insertions(+), 25 deletions(-)

diff --git a/bin/Crawler.py b/bin/Crawler.py
index c34f6f80..34406574 100755
--- a/bin/Crawler.py
+++ b/bin/Crawler.py
@@ -143,7 +143,7 @@ def get_crawler_config(redis_server, mode, service_type, domain, url=None):
 
 def load_crawler_config(service_type, domain, paste, url, date):
     crawler_config = {}
-    crawler_config['splash_url'] = splash_url
+    crawler_config['splash_url'] = f'http://{splash_url}'
     crawler_config['item'] = paste
     crawler_config['service_type'] = service_type
     crawler_config['domain'] = domain
@@ -197,7 +197,7 @@ def crawl_onion(url, domain, port, type_service, message, crawler_config):
     nb_retry = 0
     while retry:
         try:
-            r = requests.get(splash_url , timeout=30.0)
+            r = requests.get(f'http://{splash_url}' , timeout=30.0)
             retry = False
         except Exception:
             # TODO: relaunch docker or send error message
@@ -244,6 +244,8 @@ def crawl_onion(url, domain, port, type_service, message, crawler_config):
                 print('------------------------------------------------------------------------')
                 r_cache.hset('metadata_crawler:{}'.format(splash_url), 'status', 'Error')
                 exit(-2)
+            else:
+                crawlers.update_splash_manager_connection_status(True)
         else:
             print(process.stdout.read())
             exit(-1)
diff --git a/bin/core/Crawler_manager.py b/bin/core/Crawler_manager.py
index a5ac7dd6..6f1e3cf7 100755
--- a/bin/core/Crawler_manager.py
+++ b/bin/core/Crawler_manager.py
@@ -21,6 +21,9 @@ config_loader = None
 
 import screen
 
+# # TODO: lauch me in core screen
+# # TODO: check if already launched in tor screen
+
 def launch_crawlers():
     for crawler_splash in crawlers_to_launch:
         splash_name = crawler_splash[0]
@@ -41,21 +44,46 @@ def launch_crawlers():
 
 # # TODO: handle mutltiple splash_manager
 if __name__ == '__main__':
-    if not crawlers.ping_splash_manager():
-        print('Error, Can\'t cnnect to Splash manager')
-
-    crawlers.reload_splash_and_proxies_list()
-    launch_crawlers()
-    last_refresh = time.time()
+    is_manager_connected = crawlers.ping_splash_manager()
+    if not is_manager_connected:
+        print('Error, Can\'t connect to Splash manager')
+        session_uuid = None
+    else:
+        print('Splash manager connected')
+        session_uuid = crawlers.get_splash_manager_session_uuid()
+        is_manager_connected = crawlers.reload_splash_and_proxies_list()
+        print(is_manager_connected)
+        if is_manager_connected:
+            launch_crawlers()
+    last_check = int(time.time())
 
     while True:
+        # check if manager is connected
+        if int(time.time()) - last_check > 60:
+            is_manager_connected = crawlers.is_splash_manager_connected()
+            current_session_uuid = crawlers.get_splash_manager_session_uuid()
+            # reload proxy and splash list
+            if current_session_uuid and current_session_uuid != session_uuid:
+                is_manager_connected = crawlers.reload_splash_and_proxies_list()
+                if is_manager_connected:
+                    print('reload proxies and splash list')
+                    launch_crawlers()
+                    session_uuid = current_session_uuid
+            if not is_manager_connected:
+                print('Error, Can\'t connect to Splash manager')
+            last_check = int(time.time())
+
+        # # TODO: lauch crawlers if was never connected
 
         # refresh splash and proxy list
-        if False:
+        elif False:
             crawlers.reload_splash_and_proxies_list()
             print('list of splash and proxies refreshed')
         else:
-            time.sleep(10)
+            time.sleep(5)
+
+        # kill/launch new crawler / crawler manager check if already launched
+
+        # # TODO: handle mutltiple splash_manager
+        # catch reload request
diff --git a/bin/core/screen.py b/bin/core/screen.py
index 1be37e68..8b65daa4 100755
--- a/bin/core/screen.py
+++ b/bin/core/screen.py
@@ -53,6 +53,14 @@ def get_screen_pid(screen_name, with_sudoer=False):
         return all_pids
     return []
 
+def detach_screen(screen_name):
+    cmd = ['screen', '-d', screen_name]
+    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    #if p.stdout:
+    #    print(p.stdout)
+    if p.stderr:
+        print(p.stderr)
+
 def create_screen(screen_name):
     if not exist_screen(screen_name):
         cmd = ['screen', '-dmS', screen_name]
@@ -79,15 +87,44 @@ def kill_screen(screen_name, with_sudoer=False):
 # # TODO: add check if len(window_name) == 20
 # use: screen -S 'pid.screen_name' -p %window_id% -Q title
 # if len(windows_name) > 20 (truncated by default)
-def get_screen_windows_list(screen_name):
+def get_screen_windows_list(screen_name, r_set=True):
+    # detach screen to avoid incomplete result
+    detach_screen(screen_name)
+    if r_set:
+        all_windows_name = set()
+    else:
+        all_windows_name = []
     cmd = ['screen', '-S', screen_name, '-Q', 'windows']
     p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     if p.stdout:
         for window_row in p.stdout.split(b' '):
             window_id, window_name = window_row.decode().split()
-            print(window_id)
-            print(window_name)
-            print('---')
+            #print(window_id)
+            #print(window_name)
+            #print('---')
+            if r_set:
+                all_windows_name.add(window_name)
+            else:
+                all_windows_name.append(window_name)
+    if p.stderr:
+        print(p.stderr)
+    return all_windows_name
+
+def get_screen_windows_id(screen_name):
+    # detach screen to avoid incomplete result
+    detach_screen(screen_name)
+    all_windows_id = {}
+    cmd = ['screen', '-S', screen_name, '-Q', 'windows']
+    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    if p.stdout:
+        for window_row in p.stdout.split(b' '):
+            window_id, window_name = window_row.decode().split()
+            if window_name not in all_windows_id:
+                all_windows_id[window_name] = []
+            all_windows_id[window_name].append(window_id)
+    if p.stderr:
+        print(p.stderr)
+    return all_windows_id
 
 # script_location ${AIL_BIN}
 def launch_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options=''):
@@ -98,6 +135,16 @@ def launch_windows_script(screen_name, window_name, dir_project, script_location
     print(p.stdout)
     print(p.stderr)
 
+def launch_uniq_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options='', kill_previous_windows=False):
+    all_screen_name = get_screen_windows_id(screen_name)
+    if window_name in all_screen_name:
+        if kill_previous_windows:
+            kill_screen_window(screen_name, all_screen_name[window_name][0], force=True)
+        else:
+            print('Error: screen {} already contain a windows with this name {}'.format(screen_name, window_name))
+            return None
+    launch_windows_script(screen_name, window_name, dir_project, script_location, script_name, script_options=script_options)
+
 def kill_screen_window(screen_name, window_id, force=False):
     if force:# kill
         cmd = ['screen', '-S', screen_name, '-p', window_id, '-X', 'kill']
@@ -108,5 +155,5 @@ def kill_screen_window(screen_name, window_id, force=False):
         print(p.stderr)
 
 if __name__ == '__main__':
-    res = kill_screen('Docker_Splash', with_sudoer=True)
+    res = get_screen_windows_list('Script_AIL')
     print(res)
diff --git a/bin/lib/crawlers.py b/bin/lib/crawlers.py
index 06399658..3fcf82bf 100755
--- a/bin/lib/crawlers.py
+++ b/bin/lib/crawlers.py
@@ -13,6 +13,7 @@ import os
 import re
 import redis
 import sys
+import time
 import uuid
 
 from datetime import datetime, timedelta
@@ -590,16 +591,16 @@ def get_elem_to_crawl_by_queue_type(l_queue_type):
 
 #### SPLASH MANAGER ####
-def get_splash_manager_url(reload=False): # TODO: add config reload
+def get_splash_manager_url(reload=False): # TODO: add in db config
     return splash_manager_url
 
-def get_splash_api_key(reload=False): # TODO: add config reload
+def get_splash_api_key(reload=False): # TODO: add in db config
     return splash_api_key
 
 def get_splash_url_from_manager_url(splash_manager_url, splash_port):
     url = urlparse(splash_manager_url)
     host = url.netloc.split(':', 1)[0]
-    return 'http://{}:{}'.format(host, splash_port)
+    return '{}:{}'.format(host, splash_port)
 
 def is_splash_used_in_discovery(splash_name):
     res = r_serv_onion.hget('splash:metadata:{}'.format(splash_name), 'discovery_queue')
@@ -612,14 +613,47 @@ def restart_splash_docker(splash_url):
     splash_port = splash_url.split(':')[-1]
     return _restart_splash_docker(splash_port)
 
+def is_splash_manager_connected(delta_check=30):
+    last_check = r_cache.hget('crawler:splash:manager', 'last_check')
+    if last_check:
+        if int(time.time()) - int(last_check) > delta_check:
+            ping_splash_manager()
+    else:
+        ping_splash_manager()
+    res = r_cache.hget('crawler:splash:manager', 'connected')
+    return res == 'True'
+
+def update_splash_manager_connection_status(is_connected):
+    r_cache.hset('crawler:splash:manager', 'connected', is_connected)
+    r_cache.hset('crawler:splash:manager', 'last_check', int(time.time()))
+
 ## API ##
 def ping_splash_manager():
-    req = requests.get('{}/api/v1/ping'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
-    if req.status_code == 200:
-        return True
-    else:
-        print(req.json())
-        return False
+    try:
+        req = requests.get('{}/api/v1/ping'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
+        if req.status_code == 200:
+            update_splash_manager_connection_status(True)
+            return True
+        else:
+            print(req.json())
+    except requests.exceptions.ConnectionError:
+        pass
+    # splash manager unreachable
+    update_splash_manager_connection_status(False)
+    return False
+
+def get_splash_manager_session_uuid():
+    try:
+        req = requests.get('{}/api/v1/get/session_uuid'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
+        if req.status_code == 200:
+            res = req.json()
+            if res:
+                return res['session_uuid']
+        else:
+            print(req.json())
+    except requests.exceptions.ConnectionError:
+        # splash manager unreachable
+        update_splash_manager_connection_status(False)
 
 def get_all_splash_manager_containers_name():
     req = requests.get('{}/api/v1/get/splash/name/all'.format(get_splash_manager_url()), headers={"Authorization": get_splash_api_key()}, verify=False)
@@ -764,6 +798,9 @@ def reload_splash_and_proxies_list():
         # LOAD PROXIES containers
         delete_all_proxies()
         load_all_proxy()
+        return True
+    else:
+        return False
 
 # # TODO: kill crawler screen ?
 ## -- ##
@@ -774,7 +811,7 @@ def launch_ail_splash_crawler(splash_url, script_options=''):
     script_location = os.path.join(os.environ['AIL_BIN'])
     script_name = 'Crawler.py'
     screen.create_screen(screen_name)
-    screen.launch_windows_script(screen_name, splash_url, dir_project, script_location, script_name, script_options=script_options)
+    screen.launch_uniq_windows_script(screen_name, splash_url, dir_project, script_location, script_name, script_options=script_options, kill_previous_windows=True)
 
 ## -- ##
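
Note (reviewer addition, not part of the patch): a minimal usage sketch of the Splash-manager
status helpers added to bin/lib/crawlers.py, mirroring the polling loop in
bin/core/Crawler_manager.py. The function names and the 'crawler:splash:manager' cache key come
from the diff above; the sys.path setup and the standalone script itself are assumptions about
how AIL scripts usually import the bin/lib modules.

    #!/usr/bin/env python3
    # -*-coding:UTF-8 -*
    # Illustrative sketch only: assumes AIL_BIN is set and the Redis caches used by
    # bin/lib/crawlers.py are running.
    import os
    import sys

    sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib'))  # assumed import path
    import crawlers

    # is_splash_manager_connected() re-pings the manager when the cached status in
    # r_cache ('crawler:splash:manager') is missing or older than delta_check (30s by default),
    # then returns the cached 'connected' flag.
    if crawlers.is_splash_manager_connected():
        session_uuid = crawlers.get_splash_manager_session_uuid()
        print('Splash manager connected, session_uuid: {}'.format(session_uuid))
        # reload_splash_and_proxies_list() now returns True/False, so callers can decide
        # whether to (re)launch the Crawler.py screen windows.
        if crawlers.reload_splash_and_proxies_list():
            print('splash containers and proxies reloaded')
    else:
        print('Error, Can\'t connect to Splash manager')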