From 57fbacc49cb82f9578bd58d54e998634aec131dd Mon Sep 17 00:00:00 2001 From: Terrtia Date: Thu, 14 Oct 2021 14:23:11 +0200 Subject: [PATCH 01/10] chg: [crawler] add auto crawler functions --- bin/lib/crawlers.py | 48 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/bin/lib/crawlers.py b/bin/lib/crawlers.py index 23ba0ae1..c035b8bd 100755 --- a/bin/lib/crawlers.py +++ b/bin/lib/crawlers.py @@ -479,6 +479,15 @@ def is_crawler_activated(): def get_crawler_all_types(): return ['onion', 'regular'] +def sanitize_crawler_types(l_crawler_types): + all_crawler_types = get_crawler_all_types() + if not l_crawler_types: + return all_crawler_types + for crawler_type in l_crawler_types: + if crawler_type not in all_crawler_types: + return all_crawler_types + return l_crawler_types + def get_all_spash_crawler_status(): crawler_metadata = [] all_crawlers = r_cache.smembers('all_splash_crawlers') @@ -600,9 +609,40 @@ def api_set_nb_crawlers_to_launch(dict_splash_name): else: return ({'error':'invalid input'}, 400) - ##-- CRAWLER GLOBAL --## +#### AUTOMATIC CRAWLER #### + +def get_auto_crawler_all_domain(l_crawler_types=[]): + l_crawler_types = sanitize_crawler_types(l_crawler_types) + if len(l_crawler_types) == 1: + return r_serv_onion.smembers(f'auto_crawler_url:{crawler_type[0]}') + else: + l_keys_name = [] + for crawler_type in l_crawler_types: + l_keys_name.append(f'auto_crawler_url:{crawler_type}') + return r_serv_onion.sunion(l_keys_name[0], *l_keys_name[1:]) + +def add_auto_crawler_in_queue(domain, domain_type, port, epoch, delta, message): + r_serv_onion.zadd('crawler_auto_queue', int(time.time() + delta) , f'{message};{domain_type}') + # update list, last auto crawled domains + r_serv_onion.lpush('last_auto_crawled', f'{domain}:{port};{epoch}') + r_serv_onion.ltrim('last_auto_crawled', 0, 9) + +def update_auto_crawler_queue(): + current_epoch = int(time.time()) + current_epoch = 1631096842 + # check if current_epoch > domain_next_epoch + l_queue = r_serv_onion.zrangebyscore('crawler_auto_queue', 0, current_epoch) + for elem in l_queue: + mess, domain_type = elem.rsplit(';', 1) + print(domain_type) + print(mess) + r_serv_onion.sadd(f'{domain_type}_crawler_priority_queue', mess) + + +##-- AUTOMATIC CRAWLER --## + #### CRAWLER TASK #### def create_crawler_task(url, screenshot=True, har=True, depth_limit=1, max_pages=100, auto_crawler=False, crawler_delta=3600, crawler_type=None, cookiejar_uuid=None, user_agent=None): @@ -1448,10 +1488,14 @@ def test_ail_crawlers(): #### ---- #### -#if __name__ == '__main__': +if __name__ == '__main__': # res = get_splash_manager_version() # res = test_ail_crawlers() # res = is_test_ail_crawlers_successful() # print(res) # print(get_test_ail_crawlers_message()) #print(get_all_queues_stats()) + + #res = get_auto_crawler_all_domain() + res = update_auto_crawler_queue() + print(res) From 966f61bb946646b7318fcedf228fd67291ade594 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Fri, 29 Oct 2021 18:48:12 +0200 Subject: [PATCH 02/10] chg_ [AIL 2 AIL] add backend --- bin/LAUNCH.sh | 39 ++- bin/core/Sync_importer.py | 89 ++++++ bin/core/Sync_manager.py | 22 ++ bin/core/Sync_module.py | 94 +++++++ bin/core/ail_2_ail.py | 492 +++++++++++++++++++++++++++++++++ bin/core/ail_2_ail_client.py | 115 ++++++++ bin/core/ail_2_ail_server.py | 250 +++++++++++++++++ bin/lib/ail_objects.py | 47 +++- bin/modules/Categ.py | 7 + bin/modules/Tags.py | 7 +- bin/modules/abstract_module.py | 3 + bin/packages/Item.py | 32 ++- bin/packages/Tag.py | 9 +- bin/packages/User.py | 3 + bin/packages/modules.cfg | 11 +- 15 files changed, 1197 insertions(+), 23 deletions(-) create mode 100755 bin/core/Sync_importer.py create mode 100755 bin/core/Sync_manager.py create mode 100755 bin/core/Sync_module.py create mode 100755 bin/core/ail_2_ail.py create mode 100755 bin/core/ail_2_ail_client.py create mode 100755 bin/core/ail_2_ail_server.py diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 2ca79326..f3b3a71f 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -44,6 +44,8 @@ isredis=`screen -ls | egrep '[0-9]+.Redis_AIL' | cut -d. -f1` isardb=`screen -ls | egrep '[0-9]+.ARDB_AIL' | cut -d. -f1` islogged=`screen -ls | egrep '[0-9]+.Logging_AIL' | cut -d. -f1` isqueued=`screen -ls | egrep '[0-9]+.Queue_AIL' | cut -d. -f1` +is_ail_core=`screen -ls | egrep '[0-9]+.Core_AIL' | cut -d. -f1` +is_ail_2_ail=`screen -ls | egrep '[0-9]+.AIL_2_AIL' | cut -d. -f1` isscripted=`screen -ls | egrep '[0-9]+.Script_AIL' | cut -d. -f1` isflasked=`screen -ls | egrep '[0-9]+.Flask_AIL' | cut -d. -f1` iscrawler=`screen -ls | egrep '[0-9]+.Crawler_AIL' | cut -d. -f1` @@ -145,11 +147,25 @@ function launching_scripts { screen -dmS "Script_AIL" sleep 0.1 - echo -e $GREEN"\t* Launching scripts"$DEFAULT ################################## # CORE MODULES # ################################## + # screen -dmS "Core_AIL" + # sleep 0.1 + echo -e $GREEN"\t* Launching core scripts ..."$DEFAULT + + # TODO: MOOVE IMPORTER ???? => multiple scripts + + #### SYNC #### + screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "ail_2_ail_server" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./ail_2_ail_server.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Sync_manager" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_manager.py; read x" + sleep 0.1 + ##-- SYNC --## + screen -S "Script_AIL" -X screen -t "JSON_importer" bash -c "cd ${AIL_BIN}/import; ${ENV_PY} ./JSON_importer.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Crawler_manager" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Crawler_manager.py; read x" @@ -165,6 +181,10 @@ function launching_scripts { ################################## # MODULES # ################################## + # screen -dmS "Script_AIL" + # sleep 0.1 + echo -e $GREEN"\t* Launching scripts"$DEFAULT + screen -S "Script_AIL" -X screen -t "Global" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Global.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Categ" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Categ.py; read x" @@ -176,6 +196,9 @@ function launching_scripts { screen -S "Script_AIL" -X screen -t "SubmitPaste" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./submit_paste.py; read x" sleep 0.1 + screen -S "Script_AIL" -X screen -t "Sync_module" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_module.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "ApiKey" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./ApiKey.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Credential" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Credential.py; read x" @@ -366,7 +389,7 @@ function launch_queues { } function launch_scripts { - if [[ ! $isscripted ]]; then + if [[ ! $isscripted ]]; then ############################# is core sleep 1 if checking_ardb && checking_redis; then launching_scripts; @@ -414,19 +437,19 @@ function launch_feeder { } function killscript { - if [[ $islogged || $isqueued || $isscripted || $isflasked || $isfeeded || $iscrawler ]]; then + if [[ $islogged || $isqueued || $is_ail_core || $isscripted || $isflasked || $isfeeded || $iscrawler ]]; then echo -e $GREEN"Killing Script"$DEFAULT - kill $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler + kill $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $iscrawler sleep 0.2 echo -e $ROSE`screen -ls`$DEFAULT - echo -e $GREEN"\t* $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler killed."$DEFAULT + echo -e $GREEN"\t* $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $iscrawler killed."$DEFAULT else echo -e $RED"\t* No script to kill"$DEFAULT fi } function killall { - if [[ $isredis || $isardb || $islogged || $isqueued || $isscripted || $isflasked || $isfeeded || $iscrawler ]]; then + if [[ $isredis || $isardb || $islogged || $isqueued || $is_ail_2_ail || $isscripted || $isflasked || $isfeeded || $iscrawler || $is_ail_core ]]; then if [[ $isredis ]]; then echo -e $GREEN"Gracefully closing redis servers"$DEFAULT shutting_down_redis; @@ -437,10 +460,10 @@ function killall { shutting_down_ardb; fi echo -e $GREEN"Killing all"$DEFAULT - kill $isredis $isardb $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler + kill $isredis $isardb $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $iscrawler $is_ail_2_ail sleep 0.2 echo -e $ROSE`screen -ls`$DEFAULT - echo -e $GREEN"\t* $isredis $isardb $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler killed."$DEFAULT + echo -e $GREEN"\t* $isredis $isardb $islogged $isqueued $isscripted $is_ail_2_ail $isflasked $isfeeded $iscrawler $is_ail_core killed."$DEFAULT else echo -e $RED"\t* No screen to kill"$DEFAULT fi diff --git a/bin/core/Sync_importer.py b/bin/core/Sync_importer.py new file mode 100755 index 00000000..7ed7a8f0 --- /dev/null +++ b/bin/core/Sync_importer.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +""" +The SYNC Module +================================ + +This module . + +""" + +################################## +# Import External packages +################################## +import json +import os +import sys +import time + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail +from modules.abstract_module import AbstractModule +from packages.Item import Item +from packages import Tag + + +class Sync_importer(AbstractModule): + """ + Tags module for AIL framework + """ + + def __init__(self): + super(Sync_importer, self).__init__() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 + + #self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict() + #self.last_refresh = time.time() + + # Send module state to logs + self.redis_logger.info(f'Module {self.module_name} Launched') + + + def run(self): + while self.proceed: + ### REFRESH DICT + # if self.last_refresh < ail_2_ail.get_last_updated_ail_instance(): + # self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict() + # self.last_refresh = time.time() + + ail_stream = ail_2_ail.get_sync_importer_ail_stream() + if ail_stream: + ail_stream = json.loads(ail_stream) + self.compute(ail_stream) + + else: + self.computeNone() + # Wait before next process + self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s") + time.sleep(self.pending_seconds) + + + def compute(self, ail_stream): + + # # TODO: SANITYZE AIL STREAM + # # TODO: CHECK FILTER + + # import Object + b64_gzip_content = ail_stream['payload']['raw'] + + # # TODO: create default id + item_id = ail_stream['meta']['ail:id'] + 'test' + + message = f'{item_id} {b64_gzip_content}' + print(message) + self.send_message_to_queue(message, 'Mixer') + + # # increase nb of paste by feeder name + # server_cache.hincrby("mixer_cache:list_feeder", Sync, 1) + + +if __name__ == '__main__': + + module = Sync_importer() + module.run() diff --git a/bin/core/Sync_manager.py b/bin/core/Sync_manager.py new file mode 100755 index 00000000..1ee97bcb --- /dev/null +++ b/bin/core/Sync_manager.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import sys +import time + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib')) +import ail_2_ail + +# # TODO: lauch me in core screen + +if __name__ == '__main__': + + Client_Manager = ail_2_ail.AIL2AILClientManager() + + while True: + command = Client_Manager.get_manager_command() + if command: + Client_Manager.execute_manager_command(command) + else: + time.sleep(5) diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py new file mode 100755 index 00000000..144cf37f --- /dev/null +++ b/bin/core/Sync_module.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +""" +The SYNC Module +================================ + +This module . + +""" + +################################## +# Import External packages +################################## +import os +import sys +import time + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail +from modules.abstract_module import AbstractModule +from packages.Item import Item +from packages import Tag + + +class Sync_module(AbstractModule): + """ + Sync_module module for AIL framework + """ + + def __init__(self): + super(Sync_module, self).__init__() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 + + self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() + self.last_refresh = time.time() + + print(self.dict_sync_queues) + + # Send module state to logs + self.redis_logger.info(f'Module {self.module_name} Launched') + + + def compute(self, message): + + print(message) + + ### REFRESH DICT + if self.last_refresh < ail_2_ail.get_last_updated_ail_instance(): + self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() + self.last_refresh = time.time() + + # Extract object from message + # # TODO: USE JSON DICT ???? + mess_split = message.split(';') + if len(mess_split) == 3: + obj_type = mess_split[0] + obj_subtype = mess_split[1] + obj_id = mess_split[2] + + # OBJECT => Item + if obj_type == 'item': + obj = Item(obj_id) + tags = obj.get_tags(r_set=True) + + # check filter + tags + for queue_uuid in self.dict_sync_queues: + filter_tags = self.dict_sync_queues[queue_uuid]['filter'] + print(tags) + print(filter_tags) + print(tags.issubset(filter_tags)) + if filter_tags and tags: + if tags.issubset(filter_tags): + obj_dict = obj.get_default_meta() + # send to queue push and/or pull + for dict_ail in self.dict_sync_queues[queue_uuid]['ail_instances']: + + ail_2_ail.add_object_to_sync_queue(queue_uuid, dict_ail['ail_uuid'], obj_dict, + push=dict_ail['push'], pull=dict_ail['pull']) + + else: + # Malformed message + raise Exception(f'too many values to unpack (expected 3) given {len(mess_split)} with message {message}') + + +if __name__ == '__main__': + + module = Sync_module() + module.run() diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py new file mode 100755 index 00000000..43f5c706 --- /dev/null +++ b/bin/core/ail_2_ail.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import json +import secrets +import sys +import time +import uuid + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'core/')) +import screen + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/')) +from Item import Item + +config_loader = ConfigLoader.ConfigLoader() +r_cache = config_loader.get_redis_conn("Redis_Cache") +r_serv_db = config_loader.get_redis_conn("ARDB_DB") +r_serv_sync = config_loader.get_redis_conn("ARDB_DB") +config_loader = None + +def generate_uuid(): + return str(uuid.uuid4()).replace('-', '') + +def generate_sync_api_key(): + return secrets.token_urlsafe(42) + +def get_ail_uuid(): + return r_serv_db.get('ail:uuid') + +# # TODO: # TODO: # TODO: # TODO: # TODO: ADD SYNC MODE == PUSH +# # TODO: get connection status +# # TODO: get connection METADATA +############################# +# # +#### SYNC CLIENT MANAGER #### + +def get_all_sync_clients(r_set=False): + res = r_cache.smembers('ail_2_ail:all_sync_clients') + if r_set: + return set(res) + else: + return res + +def get_sync_client_ail_uuid(client_id): + return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid') + +def get_sync_client_queue_uuid(client_id): + return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'queue_uuid') + +def delete_sync_client_cache(client_id): + ail_uuid = get_sync_client_ail_uuid(client_id) + queue_uuid = get_sync_client_queue_uuid(client_id) + # map ail_uuid/queue_uuid + r_cache.srem(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) + r_cache.srem(f'ail_2_ail:queue_uuid:{queue_uuid}', client_id) + + r_cache.delete(f'ail_2_ail:sync_client:{client_id}') + r_cache.srem('ail_2_ail:all_sync_clients', client_id) + +def delete_all_sync_clients_cache(): + for client_id in get_all_sync_clients(): + delete_sync_client_cache(client_id) + r_cache.delete('ail_2_ail:all_sync_clients') + +# command: -launch +# -kill +# -relaunch +## TODO: check command +def send_command_to_manager(command, client_id=-1): + dict_action = {'command': command, 'client_id': client_id} + r_cache.sadd('ail_2_ail:client_manager:command', dict_action) + +class AIL2AILClientManager(object): + """AIL2AILClientManager.""" + + SCREEN_NAME = 'AIL_2_AIL' + SCRIPT_NAME = 'ail_2_ail_client.py' + SCRIPT_DIR = os.path.join(os.environ['AIL_BIN'], 'core') + + def __init__(self): + # dict client_id: AIL2AILCLIENT or websocket + self.clients = {} + # launch all sync clients + self.relaunch_all_sync_clients() + + def get_all_clients(self): + return self.clients + + # return new client id + def get_new_sync_client_id(self): + for new_id in range(100000): + if new_id not in self.clients: + return str(new_id) + + def get_sync_client_ail_uuid(self, client_id): + return self.clients[client_id]['ail_uuid'] + + def get_sync_client_queue_uuid(self, client_id): + return self.clients[client_id]['queue_uuid'] + + # # TODO: check PUSH ACL + def get_all_sync_clients_to_launch(self): + return get_all_ail_instance() + + def relaunch_all_sync_clients(self): + delete_all_sync_clients_cache() + self.clients = {} + for ail_uuid in self.get_all_sync_clients_to_launch(): + self.launch_sync_client(ail_uuid) + + def launch_sync_client(self, ail_uuid): + dir_project = os.environ['AIL_HOME'] + client_id = self.get_new_sync_client_id() + script_options = f'-a {ail_uuid} -m push -i {client_id}' + screen.create_screen(AIL2AILClientManager.SCREEN_NAME) + screen.launch_uniq_windows_script(AIL2AILClientManager.SCREEN_NAME, + client_id, dir_project, + AIL2AILClientManager.SCRIPT_DIR, + AIL2AILClientManager.SCRIPT_NAME, + script_options=script_options, kill_previous_windows=True) + # save sync client status + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid', ail_uuid) + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'launch_time', int(time.time())) + + # create map ail_uuid/queue_uuid + r_cache.sadd(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) + + self.clients[client_id] = {'ail_uuid': ail_uuid} + + # # TODO: FORCE KILL ???????????? + # # TODO: check if exists + def kill_sync_client(self, client_id): + if not kill_screen_window('AIL_2_AIL', client_id): + # # TODO: log kill error + pass + + delete_sync_client_cache(client_id) + self.clients.pop(client_id) + + ## COMMANDS ## + + def get_manager_command(self): + res = r_cache.spop('ail_2_ail:client_manager:command') + if res: + return json.dumps(res) + else: + return None + + def execute_manager_command(self, command_dict): + command = command_dict.get('command') + if command == 'launch': + ail_uuid = int(command_dict.get('ail_uuid')) + queue_uuid = int(command_dict.get('queue_uuid')) + self.launch_sync_client(ail_uuid, queue_uuid) + elif command == 'relaunch': + self.relaunch_all_sync_clients() + else: + # only one sync client + client_id = int(command_dict.get('client_id')) + if client_id < 1: + print('Invalid client id') + return None + if command == 'kill': + self.kill_sync_client(client_id) + elif command == 'relaunch': + ail_uuid = self.get_sync_client_ail_uuid(client_id) + queue_uuid = self.get_sync_client_queue_uuid(client_id) + self.kill_sync_client(client_id) + self.launch_sync_client(ail_uuid, queue_uuid) + +######################################## +######################################## +######################################## + +# # TODO: ADD METADATA +def get_sync_client_status(client_id): + dict_client = {'id': client_id} + dict_client['ail_uuid'] = get_sync_client_ail_uuid(client_id) + dict_client['queue_uuid'] = get_sync_client_queue_uuid(client_id) + return dict_client + +def get_all_sync_client_status(): + sync_clients = [] + all_sync_clients = r_cache.smembers('ail_2_ail:all_sync_clients') + for client_id in all_sync_clients: + sync_clients.append(get_sync_client_status(client_id)) + return sync_clients + +###################### +# # +#### AIL INSTANCE #### + +## AIL KEYS ## + +def get_all_ail_instance_keys(): + return r_serv_sync.smembers(f'ail:instance:key:all') + +def is_allowed_ail_instance_key(key): + return r_serv_sync.sismember(f'ail:instance:key:all', key) + +def get_ail_instance_key(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'api_key') + +def get_ail_instance_by_key(key): + return r_serv_sync.get(f'ail:instance:key:{key}') + +# def check_acl_sync_queue_ail(ail_uuid, queue_uuid, key): +# return is_ail_instance_queue(ail_uuid, queue_uuid) + +def update_ail_instance_key(ail_uuid, new_key): + old_key = get_ail_instance_key(ail_uuid) + r_serv_sync.srem(f'ail:instance:key:all', old_key) + r_serv_sync.delete(f'ail:instance:key:{old_key}') + + r_serv_sync.sadd(f'ail:instance:key:all', new_key) + r_serv_sync.delete(f'ail:instance:key:{new_key}', ail_uuid) + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', new_key) + +#- AIL KEYS -# + +def get_all_ail_instance(): + return r_serv_sync.smembers('ail:instance:all') + +def get_ail_instance_all_sync_queue(ail_uuid): + return r_serv_sync.smembers(f'ail:instance:sync_queue:{ail_uuid}') + +def is_ail_instance_queue(ail_uuid, queue_uuid): + return r_serv_sync.sismember(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + +def get_ail_instance_url(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'url') + +def get_ail_instance_description(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'description') + +def is_ail_instance_push_enabled(ail_uuid): + res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'push') + return res == 'True' + +def is_ail_instance_pull_enabled(ail_uuid): + res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'pull') + return res == 'True' + +def is_ail_instance_sync_enabled(ail_uuid, sync_mode=None): + if sync_mode is None: + return is_ail_instance_push_enabled(ail_uuid) or is_ail_instance_pull_enabled(ail_uuid) + elif sync_mode == 'pull': + return is_ail_instance_pull_enabled(ail_uuid) + elif sync_mode == 'push': + return is_ail_instance_push_enabled(ail_uuid) + else: + return False + +def change_pull_push_state(ail_uuid, pull=False, push=False): + # sanityze pull/push + if pull: + pull = True + else: + pull = False + if push: + push = True + else: + push = False + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'push', push) + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'pull', pull) + + +# # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) +# # TODO: push/pull +def get_ail_instance_metadata(ail_uuid): + dict_meta = {} + dict_meta['uuid'] = ail_uuid + dict_meta['url'] = get_ail_instance_url(ail_uuid) + dict_meta['description'] = get_ail_instance_description(ail_uuid) + + # # TODO: HIDE + dict_meta['api_key'] = get_ail_instance_key(ail_uuid) + + # # TODO: + # - set UUID sync_queue + + return dict_meta + +# # TODO: VALIDATE URL +# API KEY +def create_ail_instance(ail_uuid, url, api_key=None, description=None): + r_serv_sync.sadd('ail:instance:all', ail_uuid) + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'url', url) + ## API KEY ## + if not api_key: + api_key = generate_sync_api_key() + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', api_key) + r_serv_sync.sadd('ail:instance:key:all', api_key) + r_serv_sync.set(f'ail:instance:key:{api_key}', ail_uuid) + #- API KEY -# + if description: + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'description', description) + return ail_uuid + +def delete_ail_instance(ail_uuid): + for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid): + unregister_ail_to_sync_queue(ail_uuid, queue_uuid) + r_serv_sync.delete(f'ail:instance:sync_queue:{ail_uuid}') + key = get_ail_instance_by_key(ail_uuid) + r_serv_sync.delete(f'ail:instance:{ail_uuid}') + r_serv_sync.srem('ail:instance:key:all', ail_uuid) + r_serv_sync.delete(f'ail:instance:key:{key}', ail_uuid) + r_serv_sync.srem('ail:instance:all', ail_uuid) + +def get_last_updated_ail_instance(): + epoch = r_serv_sync.get(f'ail:instance:queue:last_updated') + if not epoch: + epoch = 0 + return float(epoch) + +#################### +# # +#### SYNC QUEUE #### + +class Sync_Queue(object): # # TODO: use for edit + """Sync_Queue.""" + + def __init__(self, uuid): + self.uuid = uuid + +def get_all_sync_queue(): + return r_serv_sync.smembers('ail2ail:sync_queue:all') + +def get_sync_queue_all_ail_instance(queue_uuid): + return r_serv_sync.smembers(f'ail2ail:sync_queue:ail_instance:{queue_uuid}') + +# # TODO: check if push or pull enabled ? +def is_queue_used_by_ail_instace(queue_uuid): + return r_serv_sync.exists(f'ail2ail:sync_queue:ail_instance:{queue_uuid}') + +# # TODO: add others filter +def get_sync_queue_filter(queue_uuid): + return r_serv_sync.smembers(f'ail2ail:sync_queue:filter:tags:{queue_uuid}') + +# # TODO: ADD FILTER +def get_sync_queue_metadata(queue_uuid): + dict_meta = {} + dict_meta['uuid'] = queue_uuid + dict_meta['name'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'name') + dict_meta['description'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'description') + dict_meta['max_size'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'max_size') + + # # TODO: TO ADD: + # - set uuid instance + + return dict_meta + +##################################################### +def get_all_sync_queue_dict(): + dict_sync_queues = {} + for queue_uuid in get_all_sync_queue(): + if is_queue_used_by_ail_instace(queue_uuid): + dict_queue = {} + dict_queue['filter'] = get_sync_queue_filter(queue_uuid) + + dict_queue['ail_instances'] = [] ############ USE DICT ????????? + for ail_uuid in get_sync_queue_all_ail_instance(queue_uuid): + dict_ail = {'ail_uuid': ail_uuid, + 'pull': is_ail_instance_pull_enabled(ail_uuid), + 'push': is_ail_instance_push_enabled(ail_uuid)} + if dict_ail['pull'] or dict_ail['push']: + dict_queue['ail_instances'].append(dict_ail) + if dict_queue['ail_instances']: + dict_sync_queues[queue_uuid] = dict_queue + return dict_sync_queues + + +def register_ail_to_sync_queue(ail_uuid, queue_uuid): + r_serv_sync.sadd(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid) + r_serv_sync.sadd(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + +def unregister_ail_to_sync_queue(ail_uuid, queue_uuid): + r_serv_sync.srem(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid) + r_serv_sync.srem(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + +# # TODO: optionnal name ??? +# # TODO: SANITYZE TAGS +def create_sync_queue(name, tags=[], description=None, max_size=100): + queue_uuid = generate_uuid() + r_serv_sync.sadd('ail2ail:sync_queue:all', queue_uuid) + + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'name', name) + if description: + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'description', description) + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'max_size', max_size) + + for tag in tags: + r_serv_sync.sadd(f'ail2ail:sync_queue:filter:tags:{queue_uuid}', tag) + + return queue_uuid + +def delete_sync_queue(queue_uuid): + for ail_uuid in get_sync_queue_all_ail_instance(queue_uuid): + unregister_ail_to_sync_queue(ail_uuid, queue_uuid) + r_serv_sync.delete(f'ail2ail:sync_queue:{queue_uuid}') + r_serv_sync.srem('ail2ail:sync_queue:all', queue_uuid) + return queue_uuid + +############################# +# # +#### SYNC REDIS QUEUE ####### + +def get_sync_queue_object(ail_uuid, push=True): + for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid): + obj_dict = get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=push) + if obj_dict: + return obj_dict + return None + +def get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=True): + if push: + sync_mode = 'push' + else: + sync_mode = 'pull' + obj_dict = r_serv_sync.lpop(f'sync:queue:{sync_mode}:{queue_uuid}:{ail_uuid}') + if obj_dict: + obj_dict = json.loads(obj_dict) + # # REVIEW: # TODO: create by obj type + return Item(obj_dict['id']) + +def add_object_to_sync_queue(queue_uuid, ail_uuid, obj_dict, push=True, pull=True): + obj = json.dumps(obj_dict) + + # # TODO: # FIXME: USE CACHE ?????? + if push: + r_serv_sync.lpush(f'sync:queue:push:{queue_uuid}:{ail_uuid}', obj) + r_serv_sync.ltrim(f'sync:queue:push:{queue_uuid}:{ail_uuid}', 0, 200) + + if pull: + r_serv_sync.lpush(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', obj) + r_serv_sync.ltrim(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', 0, 200) + +# # TODO: # REVIEW: USE CACHE ????? USE QUEUE FACTORY ????? +def get_sync_importer_ail_stream(): + return r_serv_sync.spop('sync:queue:importer') + +def add_ail_stream_to_sync_importer(ail_stream): + ail_stream = json.dumps(ail_stream) + r_serv_sync.sadd('sync:queue:importer', ail_stream) + +############################# +# # +#### AIL EXCHANGE FORMAT #### + +def create_ail_stream(Object): + ail_stream = {'format': 'ail', + 'version': 1, + 'type': Object.get_type()} + + # OBJECT META + ail_stream['meta'] = {'ail_mime-type': 'text/plain'} + ail_stream['meta']['ail:id'] = Object.get_id() + ail_stream['meta']['ail:tags'] = Object.get_tags() + # GLOBAL PAYLOAD + ail_stream['meta']['ail:uuid'] = get_ail_uuid() + + # OBJECT PAYLOAD + ail_stream['payload'] = Object.get_ail_2_ail_payload() + + return ail_stream + +if __name__ == '__main__': + + ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + url = "wss://localhost:4443" + api_key = 'secret' + #description = 'first test instance' + queue_uuid = '79bcafc0a6d644deb2c75fb5a83d7caa' + tags = ['infoleak:submission="manual"'] + name = 'submitted queue' + description = 'first test queue, all submitted items' + #queue_uuid = '' + + #res = create_ail_instance(ail_uuid, url, api_key=api_key, description=description) + + #res = create_sync_queue(name, tags=tags, description=description, max_size=100) + #res = delete_sync_queue(queue_uuid) + + #res = register_ail_to_sync_queue(ail_uuid, queue_uuid) + res = change_pull_push_state(ail_uuid, push=True, pull=True) + + print(res) diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py new file mode 100755 index 00000000..678999a3 --- /dev/null +++ b/bin/core/ail_2_ail_client.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import argparse +import json +import os +import sys +import time +from urllib.parse import urljoin + +import asyncio +import http +import ssl +import websockets + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail + +#################################################################### + +class AIL2AILClient(object): + """AIL2AILClient.""" + + def __init__(self, client_id, ail_uuid, sync_mode): + self.client_id + self.ail_uuid = ail_uuid + self.sync_mode = sync_mode + + # # TODO: + self.ail_url = "wss://localhost:4443" + + self.uri = f"{ail_url}/{sync_mode}/{ail_uuid}" + +#################################################################### + +# # TODO: ADD TIMEOUT +async def pull(websocket, ail_uuid): + while True: + obj = await websocket.recv() + print(obj) + +async def push(websocket, ail_uuid): + + while True: + # get elem to send + Obj = ail_2_ail.get_sync_queue_object(ail_uuid) + if Obj: + obj_ail_stream = ail_2_ail.create_ail_stream(Obj) + obj_ail_stream = json.dumps(obj_ail_stream) + print(obj_ail_stream) + + # send objects + await websocket.send(obj_ail_stream) + # DEBUG: + await asyncio.sleep(0.1) + else: + await asyncio.sleep(10) + + +async def ail_to_ail_client(ail_uuid, sync_mode, ail_key=None): + if not ail_key: + ail_key = ail_2_ail.get_ail_instance_key(ail_uuid) + ail_url = "wss://localhost:4443" + + uri = f"{ail_url}/{sync_mode}/{ail_uuid}" + print(uri) + + async with websockets.connect( + uri, + ssl=ssl_context, + extra_headers={"Authorization": f"{ail_key}"} + ) as websocket: + + if sync_mode == 'pull': + await pull(websocket, ail_uuid) + + elif sync_mode == 'push': + await push(websocket, ail_uuid) + await websocket.close() + + elif sync_mode == 'api': + await websocket.close() + +##########################################################3 +# # TODO:manual key +########################################################## +if __name__ == '__main__': + + parser = argparse.ArgumentParser(description='Websocket SYNC Client') + parser.add_argument('-a', '--ail', help='AIL UUID', type=str, dest='ail_uuid', required=True, default=None) + parser.add_argument('-i', '--client_id', help='Client ID', type=str, dest='client_id', default=None) + parser.add_argument('-m', '--mode', help='SYNC Mode, pull or push', type=str, dest='sync_mode', default='pull') + #parser.add_argument('-k', '--key', type=str, default='', help='AIL Key') + args = parser.parse_args() + + ail_uuid = args.ail_uuid + sync_mode = args.sync_mode + + if ail_uuid is None or sync_mode not in ['pull', 'push']: + parser.print_help() + sys.exit(0) + + #ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + #sync_mode = 'pull' + + # SELF SIGNED CERTIFICATES + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + # SELF SIGNED CERTIFICATES + + asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode)) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py new file mode 100755 index 00000000..d4a6646d --- /dev/null +++ b/bin/core/ail_2_ail_server.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import json +import os +import sys +import uuid + +import asyncio +import http +import ssl +import websockets + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail + + +obj_test = { + "format": "ail", + "version": 1, + "type": "item", + "meta": { + "ail:uuid": "03c51929-eeab-4d47-9dc0-c667f94c7d2c", + "ail:uuid_org": "28bc3db3-16da-461c-b20b-b944f4058708", + }, + "payload": { + "raw" : "MjhiYzNkYjMtMTZkYS00NjFjLWIyMGItYjk0NGY0MDU4NzA4Cg==", + "compress": "gzip", + "encoding": "base64" + } +} + +############################# + +CONNECTED_CLIENT = set() +# # TODO: Store in redis + +############################# + +# # # # # # # +# # +# UTILS # +# # +# # # # # # # + +def is_valid_uuid_v4(UUID): + if not UUID: + return False + UUID = UUID.replace('-', '') + try: + uuid_test = uuid.UUID(hex=UUID, version=4) + return uuid_test.hex == UUID + except: + return False + +def unpack_path(path): + dict_path = {} + path = path.split('/') + if len(path) < 3: + raise Exception('Invalid url path') + dict_path['sync_mode'] = path[1] + dict_path['ail_uuid'] = path[2] + return dict_path + +# # # # # # # + + +# async def send_object(): +# if CONNECTED_CLIENT: +# message = 'new json object {"id": "test01"}' +# await asyncio.wait([user.send(message) for user in USERS]) + + +async def register(websocket): + CONNECTED_CLIENT.add(websocket) + print(CONNECTED_CLIENT) + +async def unregister(websocket): + CONNECTED_CLIENT.remove(websocket) + +# PULL: Send data to client +# # TODO: ADD TIMEOUT ??? +async def pull(websocket, ail_uuid): + + for queue_uuid in ail_2_ail.get_ail_instance_all_sync_queue(ail_uuid): + while True: + # get elem to send + Obj = ail_2_ail.get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=False) + if Obj: + obj_ail_stream = ail_2_ail.create_ail_stream(Obj) + Obj = json.dumps(obj_ail_stream) + print(Obj) + + # send objects + await websocket.send(Obj) + # END PULL + else: + break + + # END PULL + return None + + +# PUSH: receive data from client +# # TODO: optional queue_uuid +async def push(websocket, ail_uuid): + print(ail_uuid) + while True: + ail_stream = await websocket.recv() + + # # TODO: CHECK ail_stream + ail_stream = json.loads(ail_stream) + print(ail_stream) + + ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) + +async def ail_to_ail_serv(websocket, path): + + + # # TODO: check if it works + # # DEBUG: + print(websocket.ail_key) + print(websocket.ail_uuid) + + print(websocket.remote_address) + path = unpack_path(path) + sync_mode = path['sync_mode'] + print(f'sync mode: {sync_mode}') + + await register(websocket) + try: + if sync_mode == 'pull': + await pull(websocket, websocket.ail_uuid) + await websocket.close() + print('closed') + + elif sync_mode == 'push': + await push(websocket, websocket.ail_uuid) + + elif sync_mode == 'api': + await websocket.close() + + finally: + await unregister(websocket) + + +########################################### +# CHECK Authorization HEADER and URL PATH # + +# # TODO: check AIL UUID (optional header) + +class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): + """AIL_2_AIL_Protocol websockets server.""" + + async def process_request(self, path, request_headers): + + print(self.remote_address) + print(request_headers) + # API TOKEN + api_key = request_headers.get('Authorization', '') + print(api_key) + if api_key is None: + print('Missing token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n" + + if not ail_2_ail.is_allowed_ail_instance_key(api_key): + print('Invalid token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" + + # PATH + try: + dict_path = unpack_path(path) + except Exception as e: + print('Invalid path') + return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" + + + ail_uuid = ail_2_ail.get_ail_instance_by_key(api_key) + if ail_uuid != dict_path['ail_uuid']: + print('Invalid token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" + + + if not api_key != ail_2_ail.get_ail_instance_key(api_key): + print('Invalid token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" + + self.ail_key = api_key + self.ail_uuid = ail_uuid + + if dict_path['sync_mode'] == 'pull' or dict_path['sync_mode'] == 'push': + + # QUEUE UUID + # if dict_path['queue_uuid']: + # + # if not is_valid_uuid_v4(dict_path['queue_uuid']): + # print('Invalid UUID') + # return http.HTTPStatus.BAD_REQUEST, [], b"Invalid UUID\n" + # + # self.queue_uuid = dict_path['queue_uuid'] + # else: + # self.queue_uuid = None + # + # if not ail_2_ail.is_ail_instance_queue(ail_uuid, dict_path['queue_uuid']): + # print('UUID not found') + # return http.HTTPStatus.FORBIDDEN, [], b"UUID not found\n" + + # SYNC MODE + if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=dict_path['sync_mode']): + print('SYNC mode disabled') + return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" + + # # TODO: CHECK API + elif dict_path[sync_mode] == 'api': + pass + + else: + print('Invalid path') + return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" + + +########################################### + +# # TODO: logging +# # TODO: clean shutdown / kill all connections +# # TODO: API +# # TODO: Filter object +# # TODO: process_request check +# # TODO: IP/uuid to block + +if __name__ == '__main__': + + host = 'localhost' + port = 4443 + + print('Launching Server...') + + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + cert_dir = os.environ['AIL_FLASK'] + ssl_context.load_cert_chain(certfile=os.path.join(cert_dir, 'server.crt'), keyfile=os.path.join(cert_dir, 'server.key')) + + start_server = websockets.serve(ail_to_ail_serv, "localhost", 4443, ssl=ssl_context, create_protocol=AIL_2_AIL_Protocol) + + print(f'Server Launched: wss://{host}:{port}') + + asyncio.get_event_loop().run_until_complete(start_server) + asyncio.get_event_loop().run_forever() diff --git a/bin/lib/ail_objects.py b/bin/lib/ail_objects.py index 565661dc..97ec275d 100755 --- a/bin/lib/ail_objects.py +++ b/bin/lib/ail_objects.py @@ -9,9 +9,16 @@ import redis from abc import ABC from flask import url_for +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/')) +import Tag + sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader +config_loader = ConfigLoader.ConfigLoader() +r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata") +config_loader = None + class AbstractObject(ABC): """ Abstract Object @@ -22,7 +29,7 @@ class AbstractObject(ABC): # - handle + refactor coorelations # - creates others objects - def __init__(self, obj_type, id): + def __init__(self, obj_type, id, subtype=None): """ Abstract for all the AIL object :param obj_type: object type (item, ...) @@ -30,17 +37,43 @@ class AbstractObject(ABC): """ self.id = id self.type = obj_type - - def get_type(self): - return self.type + self.subtype = None def get_id(self): return self.id + def get_type(self): + return self.type + + def get_subtype(self, r_str=False): + if not self.subtype: + if r_str: + return '' + return self.subtype + + def get_default_meta(self): + dict_meta = {'id': self.get_id(), + 'type': self.get_type()} + if self.subtype: + dict_meta['subtype'] = self.subtype + return dict_meta + + def get_tags(self, r_set=False): + tags = Tag.get_obj_tag(self.id) + if r_set: + tags = set(tags) + return tags + + ## ADD TAGS ???? + #def add_tags(self): + + def _delete(self): + # DELETE TAGS + Tag.delete_obj_all_tags(self.id, self.type) + if self.type == 'item': + # delete tracker + pass -config_loader = ConfigLoader.ConfigLoader() -r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata") -config_loader = None def is_valid_object_type(object_type): if object_type in ['domain', 'item', 'image', 'decoded']: diff --git a/bin/modules/Categ.py b/bin/modules/Categ.py index 36e749a6..a071fb2f 100755 --- a/bin/modules/Categ.py +++ b/bin/modules/Categ.py @@ -102,6 +102,13 @@ class Categ(AbstractModule): if r_result: return categ_found + # DIRTY FIX AIL SYNC + # # FIXME: DIRTY FIX + message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' + print(message) + self.send_message_to_queue(message, 'SyncModule') + + if __name__ == '__main__': # SCRIPT PARSER # diff --git a/bin/modules/Tags.py b/bin/modules/Tags.py index 2a42bfca..2ed37b74 100755 --- a/bin/modules/Tags.py +++ b/bin/modules/Tags.py @@ -45,14 +45,17 @@ class Tags(AbstractModule): if len(mess_split) == 2: tag = mess_split[0] item = Item(mess_split[1]) - item_id = item.get_id() # Create a new tag Tag.add_tag('item', tag, item.get_id()) - print(f'{item_id}: Tagged {tag}') + print(f'{item.get_id()}: Tagged {tag}') # Forward message to channel self.send_message_to_queue(message, 'MISP_The_Hive_feeder') + + message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' + self.send_message_to_queue(message, 'Sync_module') + else: # Malformed message raise Exception(f'too many values to unpack (expected 2) given {len(mess_split)} with message {message}') diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index de38faa2..632f8e43 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -72,6 +72,7 @@ class AbstractModule(ABC): ex: send_to_queue(item_id, 'Global') """ self.process.populate_set_out(message, queue_name) + # add to new set_module def run(self): """ @@ -98,6 +99,8 @@ class AbstractModule(ABC): print('TRACEBACK:') for line in trace: print(line) + # remove from set_module + ## check if item process == completed else: self.computeNone() diff --git a/bin/packages/Item.py b/bin/packages/Item.py index 6ca7ba43..6f0a2859 100755 --- a/bin/packages/Item.py +++ b/bin/packages/Item.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* +import base64 import os import re import sys @@ -502,7 +503,6 @@ def delete_item(obj_id): if not exist_item(obj_id): return False else: - Tag.delete_obj_tags(obj_id, 'item', Tag.get_obj_tag(obj_id)) delete_item_duplicate(obj_id) # delete MISP event r_serv_metadata.delete('misp_events:{}'.format(obj_id)) @@ -532,6 +532,8 @@ def delete_item(obj_id): ### TODO in inport V2 # delete from tracked items + + # # # TODO: # FIXME: LATER # delete from queue ### return False @@ -594,6 +596,18 @@ class Item(AbstractObject): """ return item_basic.get_item_content(self.id) + def get_gzip_content(self, b64=False): + with open(self.get_filename(), 'rb') as f: + content = f.read() + if b64: + content = base64.b64encode(content) + return content.decode() + + def get_ail_2_ail_payload(self): + payload = {'raw': self.get_gzip_content(b64=True), + 'compress': 'gzip'} + return payload + # # TODO: def create(self): pass @@ -607,6 +621,22 @@ class Item(AbstractObject): except FileNotFoundError: return False + ############################################################################ + ############################################################################ + ############################################################################ + + def exist_correlation(self): + pass + + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + #if __name__ == '__main__': diff --git a/bin/packages/Tag.py b/bin/packages/Tag.py index ad521d0a..266d6a68 100755 --- a/bin/packages/Tag.py +++ b/bin/packages/Tag.py @@ -421,7 +421,7 @@ def add_tag(object_type, tag, object_id, obj_date=None): r_serv_tags.hincrby('daily_tags:{}'.format(datetime.date.today().strftime("%Y%m%d")), tag, 1) def delete_obj_tag(object_type, object_id, tag, obj_date): - if object_type=="item": # # TODO: # FIXME: # REVIEW: rename me !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + if object_type=="item": # # TODO: # FIXME: # REVIEW: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! obj_date = get_obj_date(object_type, object_id) r_serv_metadata.srem('tag:{}'.format(object_id), tag) r_serv_tags.srem('{}:{}'.format(tag, obj_date), object_id) @@ -455,7 +455,7 @@ def api_delete_obj_tags(tags=[], object_id=None, object_type="item"): if not tags: return ({'status': 'error', 'reason': 'No Tag(s) specified'}, 400) - res = delete_obj_tags(object_id, object_type, tags=tags) + res = delete_obj_tags(object_id, object_type, tags) if res: return res @@ -464,13 +464,16 @@ def api_delete_obj_tags(tags=[], object_id=None, object_type="item"): dict_res['id'] = object_id return (dict_res, 200) -def delete_obj_tags(object_id, object_type, tags=[]): +def delete_obj_tags(object_id, object_type, tags): obj_date = get_obj_date(object_type, object_id) for tag in tags: res = delete_tag(object_type, tag, object_id, obj_date=obj_date) if res: return res +def delete_obj_all_tags(obj_id, obj_type): + delete_obj_tags(obj_id, obj_type, get_obj_tag(obj_id)) + def sanitise_tags_date_range(l_tags, date_from=None, date_to=None): if date_from is None or date_to is None: date_from = get_tags_min_last_seen(l_tags, r_int=False) diff --git a/bin/packages/User.py b/bin/packages/User.py index ef6eba74..fe5197c7 100755 --- a/bin/packages/User.py +++ b/bin/packages/User.py @@ -11,6 +11,9 @@ import ConfigLoader from flask_login import UserMixin +def get_all_users(): + return r_serv_db.hkeys('user:all') + class User(UserMixin): def __init__(self, id): diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg index f1ec8ef7..4cc5ba6a 100644 --- a/bin/packages/modules.cfg +++ b/bin/packages/modules.cfg @@ -2,6 +2,9 @@ subscribe = ZMQ_Global publish = Redis_Mixer,Redis_preProcess1 +[Sync_importer] +publish = Redis_Mixer,Redis_Tags + [Importer_Json] publish = Redis_Mixer,Redis_Tags @@ -55,7 +58,7 @@ subscribe = Redis_Global [Categ] subscribe = Redis_Global -publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Urls,Redis_Credential,Redis_SourceCode,Redis_Cve,Redis_ApiKey +publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Urls,Redis_Credential,Redis_SourceCode,Redis_Cve,Redis_ApiKey,Redis_SyncModule [CreditCards] subscribe = Redis_CreditCards @@ -96,7 +99,11 @@ subscribe = Redis_ModuleStats [Tags] subscribe = Redis_Tags -publish = Redis_Tags_feed +publish = Redis_Tags_feed,Redis_SyncModule + +# dirty fix +[Sync_module] +subscribe = Redis_SyncModule [MISP_The_hive_feeder] subscribe = Redis_Tags_feed From b94411f71081a4715ed7d01d7cf7372567654507 Mon Sep 17 00:00:00 2001 From: Steve Clement Date: Tue, 9 Nov 2021 13:43:32 +0100 Subject: [PATCH 03/10] fix: [py] Minor python dependency change --- installing_deps.sh | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/installing_deps.sh b/installing_deps.sh index 4206d48c..977ed642 100755 --- a/installing_deps.sh +++ b/installing_deps.sh @@ -18,7 +18,7 @@ sudo apt-get install wget -qq sudo apt-get install tor -qq #Needed for bloom filters -sudo apt-get install libssl-dev libfreetype6-dev python-numpy -qq +sudo apt-get install libssl-dev libfreetype6-dev python3-numpy -qq #pyMISP #sudo apt-get -y install python3-pip @@ -43,8 +43,7 @@ sudo apt-get install build-essential libffi-dev automake autoconf libtool -qq sudo apt-get install p7zip-full -qq # SUBMODULES # -git submodule init -git submodule update +git submodule update --init # REDIS # test ! -d redis/ && git clone https://github.com/antirez/redis.git From 629877e1f360c1c370d3a59e75e41d8ff6f03712 Mon Sep 17 00:00:00 2001 From: Steve Clement Date: Tue, 9 Nov 2021 13:45:00 +0100 Subject: [PATCH 04/10] fix: [doc] Remove Travis --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index 9f866317..c7422a4e 100644 --- a/README.md +++ b/README.md @@ -9,10 +9,6 @@ AIL Latest Release - - Travis - - Gitter From fc8386e1b5a5b5d7f44faf1929d46f161193bc2b Mon Sep 17 00:00:00 2001 From: Steve Clement Date: Tue, 9 Nov 2021 13:53:15 +0100 Subject: [PATCH 05/10] chg: [doc] GI Badge --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index c7422a4e..ddc7f472 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,11 @@ AIL Latest Release + + + CI + + Gitter From 997a2c602a21ee2679207799e7185518b7b173cc Mon Sep 17 00:00:00 2001 From: Terrtia Date: Mon, 22 Nov 2021 23:45:41 +0100 Subject: [PATCH 06/10] chg: [v4.0 AIL SYNC / AIL 2 AIL] SYNC Manager + fixs + views --- bin/core/Sync_module.py | 6 +- bin/core/ail_2_ail.py | 341 ++++++++++++++++-- bin/core/ail_2_ail_server.py | 64 ++-- bin/packages/Tag.py | 138 ++++++- requirements.txt | 3 + update/v4.0/Update.py | 26 ++ update/v4.0/Update.sh | 29 ++ var/www/Flask_server.py | 2 + var/www/blueprints/ail_2_ail_sync.py | 247 +++++++++++++ var/www/blueprints/tags_ui.py | 28 ++ var/www/modules/Tags/Flask_Tags.py | 7 + .../templates/ail_2_ail/add_ail_server.html | 170 +++++++++ .../templates/ail_2_ail/add_sync_queue.html | 139 +++++++ .../ail_2_ail/ail_2_ail_dashboard.html | 100 +++++ var/www/templates/ail_2_ail/ail_servers.html | 106 ++++++ .../templates/ail_2_ail/register_queue.html | 110 ++++++ var/www/templates/ail_2_ail/sync_queues.html | 105 ++++++ .../templates/ail_2_ail/view_ail_server.html | 168 +++++++++ .../templates/ail_2_ail/view_sync_queue.html | 159 ++++++++ var/www/templates/settings/menu_sidebar.html | 23 ++ .../templates/tags/block_tags_selector.html | 108 ++++++ 21 files changed, 2006 insertions(+), 73 deletions(-) create mode 100755 update/v4.0/Update.py create mode 100755 update/v4.0/Update.sh create mode 100644 var/www/blueprints/ail_2_ail_sync.py create mode 100644 var/www/templates/ail_2_ail/add_ail_server.html create mode 100644 var/www/templates/ail_2_ail/add_sync_queue.html create mode 100644 var/www/templates/ail_2_ail/ail_2_ail_dashboard.html create mode 100644 var/www/templates/ail_2_ail/ail_servers.html create mode 100644 var/www/templates/ail_2_ail/register_queue.html create mode 100644 var/www/templates/ail_2_ail/sync_queues.html create mode 100644 var/www/templates/ail_2_ail/view_ail_server.html create mode 100644 var/www/templates/ail_2_ail/view_sync_queue.html create mode 100644 var/www/templates/tags/block_tags_selector.html diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py index 144cf37f..d8c01d83 100755 --- a/bin/core/Sync_module.py +++ b/bin/core/Sync_module.py @@ -51,9 +51,11 @@ class Sync_module(AbstractModule): print(message) ### REFRESH DICT - if self.last_refresh < ail_2_ail.get_last_updated_ail_instance(): - self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() + if self.last_refresh < ail_2_ail.get_last_updated_sync_config(): self.last_refresh = time.time() + self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() + print('sync queues refreshed') + print(self.dict_sync_queues) # Extract object from message # # TODO: USE JSON DICT ???? diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index 43f5c706..a927641a 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -4,10 +4,13 @@ import os import json import secrets +import re import sys import time import uuid +from flask import escape + sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader @@ -16,6 +19,7 @@ import screen sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/')) from Item import Item +import Tag config_loader = ConfigLoader.ConfigLoader() r_cache = config_loader.get_redis_conn("Redis_Cache") @@ -23,6 +27,20 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB") r_serv_sync = config_loader.get_redis_conn("ARDB_DB") config_loader = None +def is_valid_uuid_v4(UUID): + if not UUID: + return False + UUID = UUID.replace('-', '') + try: + uuid_test = uuid.UUID(hex=UUID, version=4) + return uuid_test.hex == UUID + except: + return False + +def sanityze_uuid(UUID): + sanityzed_uuid = uuid.UUID(hex=UUID, version=4) + return str(sanityzed_uuid) + def generate_uuid(): return str(uuid.uuid4()).replace('-', '') @@ -32,7 +50,31 @@ def generate_sync_api_key(): def get_ail_uuid(): return r_serv_db.get('ail:uuid') -# # TODO: # TODO: # TODO: # TODO: # TODO: ADD SYNC MODE == PUSH +def is_valid_websocket_url(websocket_url): + regex_websocket_url = r'^(wss:\/\/)([0-9]{1,3}(?:\.[0-9]{1,3}){3}|(?=[^\/]{1,254}(?![^\/]))(?:(?=[a-zA-Z0-9-]{1,63}\.?)(?:xn--+)?[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)*\.?)+[a-zA-Z]{2,63}):([0-9]{1,5})$' + if re.match(regex_websocket_url, websocket_url): + return True + return False + +def is_valid_websocket_key(ail_key): + regex_key = r'^[A-Za-z0-9-_]{56}$' + if re.match(regex_key, ail_key): + return True + return False + +#### HANDLE CONFIG UPDATE #### + +def get_last_updated_sync_config(): + epoch = r_serv_sync.get(f'ail:instance:queue:last_updated_sync_config') + if not epoch: + epoch = 0 + return float(epoch) + +def set_last_updated_sync_config(): + epoch = int(time.time()) + r_serv_sync.set(f'ail:instance:queue:last_updated_sync_config', epoch) + return epoch + # # TODO: get connection status # # TODO: get connection METADATA ############################# @@ -49,12 +91,21 @@ def get_all_sync_clients(r_set=False): def get_sync_client_ail_uuid(client_id): return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid') -def get_sync_client_queue_uuid(client_id): - return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'queue_uuid') +# current: only one push registred +def get_client_id_by_ail_uuid(ail_uuid): + res = r_cache.smembers(f'ail_2_ail:ail_uuid:{ail_uuid}') + if res: + return int(res.pop()) + +def get_all_running_sync_servers(): + running_ail_servers= [] + for client_id in get_all_sync_clients(): + ail_uuid = get_sync_client_ail_uuid(client_id) + running_ail_servers.append(ail_uuid) + return running_ail_servers def delete_sync_client_cache(client_id): ail_uuid = get_sync_client_ail_uuid(client_id) - queue_uuid = get_sync_client_queue_uuid(client_id) # map ail_uuid/queue_uuid r_cache.srem(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) r_cache.srem(f'ail_2_ail:queue_uuid:{queue_uuid}', client_id) @@ -71,9 +122,31 @@ def delete_all_sync_clients_cache(): # -kill # -relaunch ## TODO: check command -def send_command_to_manager(command, client_id=-1): +def send_command_to_manager(command, client_id=-1, ail_uuid=None): dict_action = {'command': command, 'client_id': client_id} - r_cache.sadd('ail_2_ail:client_manager:command', dict_action) + if ail_uuid: + dict_action['ail_uuid'] = ail_uuid + str_command = json.dumps(dict_action) + r_cache.sadd('ail_2_ail:client_manager:command', str_command) + + +def refresh_ail_instance_connection(ail_uuid): + client_id = get_client_id_by_ail_uuid(ail_uuid) + launch_required = is_ail_instance_push_enabled(ail_uuid) + + print(client_id) + print(launch_required) + + # relaunch + if client_id and launch_required: + send_command_to_manager('relaunch', client_id=client_id) + # kill + elif client_id: + send_command_to_manager('kill', client_id=client_id) + # launch + elif launch_required: + send_command_to_manager('launch', ail_uuid=ail_uuid) + class AIL2AILClientManager(object): """AIL2AILClientManager.""" @@ -93,19 +166,23 @@ class AIL2AILClientManager(object): # return new client id def get_new_sync_client_id(self): - for new_id in range(100000): + for new_id in range(1, 100000): + new_id = str(new_id) if new_id not in self.clients: return str(new_id) def get_sync_client_ail_uuid(self, client_id): return self.clients[client_id]['ail_uuid'] - def get_sync_client_queue_uuid(self, client_id): - return self.clients[client_id]['queue_uuid'] + # def get_sync_client_queue_uuid(self, client_id): + # return self.clients[client_id]['queue_uuid'] - # # TODO: check PUSH ACL def get_all_sync_clients_to_launch(self): - return get_all_ail_instance() + ail_instances_to_launch = [] + for ail_uuid in get_all_ail_instance(): + if is_ail_instance_push_enabled(ail_uuid): + ail_instances_to_launch.append(ail_uuid) + return ail_instances_to_launch def relaunch_all_sync_clients(self): delete_all_sync_clients_cache() @@ -127,6 +204,8 @@ class AIL2AILClientManager(object): r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid', ail_uuid) r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'launch_time', int(time.time())) + r_cache.sadd('ail_2_ail:all_sync_clients', client_id) + # create map ail_uuid/queue_uuid r_cache.sadd(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) @@ -135,7 +214,7 @@ class AIL2AILClientManager(object): # # TODO: FORCE KILL ???????????? # # TODO: check if exists def kill_sync_client(self, client_id): - if not kill_screen_window('AIL_2_AIL', client_id): + if not screen.kill_screen_window('AIL_2_AIL',client_id): # # TODO: log kill error pass @@ -147,17 +226,19 @@ class AIL2AILClientManager(object): def get_manager_command(self): res = r_cache.spop('ail_2_ail:client_manager:command') if res: - return json.dumps(res) + print(res) + print(type(res)) + return json.loads(res) else: return None def execute_manager_command(self, command_dict): + print(command_dict) command = command_dict.get('command') if command == 'launch': - ail_uuid = int(command_dict.get('ail_uuid')) - queue_uuid = int(command_dict.get('queue_uuid')) - self.launch_sync_client(ail_uuid, queue_uuid) - elif command == 'relaunch': + ail_uuid = command_dict.get('ail_uuid') + self.launch_sync_client(ail_uuid) + elif command == 'relaunch_all': self.relaunch_all_sync_clients() else: # only one sync client @@ -165,13 +246,13 @@ class AIL2AILClientManager(object): if client_id < 1: print('Invalid client id') return None + client_id = str(client_id) if command == 'kill': self.kill_sync_client(client_id) elif command == 'relaunch': ail_uuid = self.get_sync_client_ail_uuid(client_id) - queue_uuid = self.get_sync_client_queue_uuid(client_id) self.kill_sync_client(client_id) - self.launch_sync_client(ail_uuid, queue_uuid) + self.launch_sync_client(ail_uuid) ######################################## ######################################## @@ -181,7 +262,6 @@ class AIL2AILClientManager(object): def get_sync_client_status(client_id): dict_client = {'id': client_id} dict_client['ail_uuid'] = get_sync_client_ail_uuid(client_id) - dict_client['queue_uuid'] = get_sync_client_queue_uuid(client_id) return dict_client def get_all_sync_client_status(): @@ -232,12 +312,18 @@ def get_ail_instance_all_sync_queue(ail_uuid): def is_ail_instance_queue(ail_uuid, queue_uuid): return r_serv_sync.sismember(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) +def exists_ail_instance(ail_uuid): + return r_serv_sync.exists(f'ail:instance:{ail_uuid}') + def get_ail_instance_url(ail_uuid): return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'url') def get_ail_instance_description(ail_uuid): return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'description') +def exists_ail_instance(ail_uuid): + return r_serv_sync.sismember('ail:instance:all', ail_uuid) + def is_ail_instance_push_enabled(ail_uuid): res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'push') return res == 'True' @@ -268,27 +354,44 @@ def change_pull_push_state(ail_uuid, pull=False, push=False): push = False r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'push', push) r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'pull', pull) - + set_last_updated_sync_config() + refresh_ail_instance_connection(ail_uuid) # # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) -# # TODO: push/pull -def get_ail_instance_metadata(ail_uuid): +def get_ail_instance_metadata(ail_uuid, sync_queues=False): dict_meta = {} dict_meta['uuid'] = ail_uuid dict_meta['url'] = get_ail_instance_url(ail_uuid) dict_meta['description'] = get_ail_instance_description(ail_uuid) + dict_meta['pull'] = is_ail_instance_pull_enabled(ail_uuid) + dict_meta['push'] = is_ail_instance_pull_enabled(ail_uuid) # # TODO: HIDE dict_meta['api_key'] = get_ail_instance_key(ail_uuid) + if sync_queues: + dict_meta['sync_queues'] = get_ail_instance_all_sync_queue(ail_uuid) + # # TODO: # - set UUID sync_queue return dict_meta +def get_all_ail_instances_metadata(): + l_servers = [] + for ail_uuid in get_all_ail_instance(): + l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) + return l_servers + +def get_ail_instances_metadata(l_ail_servers): + l_servers = [] + for ail_uuid in l_ail_servers: + l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) + return l_servers + # # TODO: VALIDATE URL # API KEY -def create_ail_instance(ail_uuid, url, api_key=None, description=None): +def create_ail_instance(ail_uuid, url, api_key=None, description=None, pull=True, push=True): r_serv_sync.sadd('ail:instance:all', ail_uuid) r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'url', url) ## API KEY ## @@ -300,6 +403,9 @@ def create_ail_instance(ail_uuid, url, api_key=None, description=None): #- API KEY -# if description: r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'description', description) + change_pull_push_state(ail_uuid, pull=pull, push=push) + set_last_updated_sync_config() + refresh_ail_instance_connection(ail_uuid) return ail_uuid def delete_ail_instance(ail_uuid): @@ -311,12 +417,54 @@ def delete_ail_instance(ail_uuid): r_serv_sync.srem('ail:instance:key:all', ail_uuid) r_serv_sync.delete(f'ail:instance:key:{key}', ail_uuid) r_serv_sync.srem('ail:instance:all', ail_uuid) + set_last_updated_sync_config() + refresh_ail_instance_connection(ail_uuid) + return ail_uuid -def get_last_updated_ail_instance(): - epoch = r_serv_sync.get(f'ail:instance:queue:last_updated') - if not epoch: - epoch = 0 - return float(epoch) +## API ## + +def api_create_ail_instance(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL uuid already exists"}, 400 + + if json_dict.get('pull'): + pull = True + else: + pull = False + if json_dict.get('push'): + push = True + else: + push = False + description = json_dict.get('description') + + ail_url = json_dict.get('url').replace(' ', '') + if not is_valid_websocket_url(ail_url): + return {"status": "error", "reason": "Invalid websocket url"}, 400 + + ail_key = json_dict.get('key') + if ail_key: + ail_key = ail_key.replace(' ', '') + if not is_valid_websocket_key(ail_key): + return {"status": "error", "reason": "Invalid websocket key"}, 400 + + res = create_ail_instance(ail_uuid, ail_url, api_key=ail_key, description=description, + pull=pull, push=push) + return res, 200 + +def api_delete_ail_instance(json_dict): + ail_uuid = json_dict.get('uuid', '').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid AIL uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + res = delete_ail_instance(ail_uuid) + return res, 200 #################### # # @@ -334,32 +482,57 @@ def get_all_sync_queue(): def get_sync_queue_all_ail_instance(queue_uuid): return r_serv_sync.smembers(f'ail2ail:sync_queue:ail_instance:{queue_uuid}') +def exists_sync_queue(queue_uuid): + return r_serv_sync.exists(f'ail2ail:sync_queue:{queue_uuid}') + # # TODO: check if push or pull enabled ? -def is_queue_used_by_ail_instace(queue_uuid): +def is_queue_used_by_ail_instance(queue_uuid): return r_serv_sync.exists(f'ail2ail:sync_queue:ail_instance:{queue_uuid}') # # TODO: add others filter def get_sync_queue_filter(queue_uuid): return r_serv_sync.smembers(f'ail2ail:sync_queue:filter:tags:{queue_uuid}') +def get_sync_queue_name(queue_uuid): + return r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'name') + +def get_sync_queue_description(queue_uuid): + return r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'description') + +def get_sync_queue_max_size(queue_uuid): + return r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'max_size') + # # TODO: ADD FILTER def get_sync_queue_metadata(queue_uuid): dict_meta = {} dict_meta['uuid'] = queue_uuid - dict_meta['name'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'name') - dict_meta['description'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'description') - dict_meta['max_size'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'max_size') + dict_meta['name'] = get_sync_queue_name(queue_uuid) + dict_meta['description'] = get_sync_queue_description(queue_uuid) + dict_meta['max_size'] = get_sync_queue_max_size(queue_uuid) + dict_meta['tags'] = get_sync_queue_filter(queue_uuid) # # TODO: TO ADD: - # - set uuid instance + # - get uuid instance return dict_meta +def get_all_queues_metadata(): + l_queues = [] + for queue_uuid in get_all_sync_queue(): + l_queues.append(get_sync_queue_metadata(queue_uuid)) + return l_queues + +def get_queues_metadata(l_queues_uuid): + l_queues = [] + for queue_uuid in l_queues_uuid: + l_queues.append(get_sync_queue_metadata(queue_uuid)) + return l_queues + ##################################################### def get_all_sync_queue_dict(): dict_sync_queues = {} for queue_uuid in get_all_sync_queue(): - if is_queue_used_by_ail_instace(queue_uuid): + if is_queue_used_by_ail_instance(queue_uuid): dict_queue = {} dict_queue['filter'] = get_sync_queue_filter(queue_uuid) @@ -374,14 +547,22 @@ def get_all_sync_queue_dict(): dict_sync_queues[queue_uuid] = dict_queue return dict_sync_queues +def is_queue_registred_by_ail_instance(queue_uuid, ail_uuid): + return r_serv_sync.sismember(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) def register_ail_to_sync_queue(ail_uuid, queue_uuid): r_serv_sync.sadd(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid) r_serv_sync.sadd(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + set_last_updated_sync_config() +# # # FIXME: TODO: delete sync queue ???????????????????????????????????????????????????? def unregister_ail_to_sync_queue(ail_uuid, queue_uuid): r_serv_sync.srem(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid) r_serv_sync.srem(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + set_last_updated_sync_config() + +def get_all_unregistred_queue_by_ail_instance(ail_uuid): + return r_serv_sync.sdiff('ail2ail:sync_queue:all', f'ail:instance:sync_queue:{ail_uuid}') # # TODO: optionnal name ??? # # TODO: SANITYZE TAGS @@ -397,15 +578,96 @@ def create_sync_queue(name, tags=[], description=None, max_size=100): for tag in tags: r_serv_sync.sadd(f'ail2ail:sync_queue:filter:tags:{queue_uuid}', tag) + set_last_updated_sync_config() return queue_uuid def delete_sync_queue(queue_uuid): for ail_uuid in get_sync_queue_all_ail_instance(queue_uuid): unregister_ail_to_sync_queue(ail_uuid, queue_uuid) r_serv_sync.delete(f'ail2ail:sync_queue:{queue_uuid}') + r_serv_sync.delete(f'ail2ail:sync_queue:filter:tags:{queue_uuid}') r_serv_sync.srem('ail2ail:sync_queue:all', queue_uuid) + set_last_updated_sync_config() return queue_uuid +## API ## + +# # TODO: sanityze queue_name +def api_create_sync_queue(json_dict): + description = json_dict.get('description') + description = escape(description) + queue_name = json_dict.get('name') + if queue_name: ################################################# + queue_name = escape(queue_name) + + tags = json_dict.get('tags') + if not tags: + {"status": "error", "reason": "no tags provided"}, 400 + if not Tag.are_enabled_tags(tags): + {"status": "error", "reason": "Invalid/Disabled tags"}, 400 + + max_size = json_dict.get('max_size') + if not max_size: + max_size = 100 + try: + max_size = int(max_size) + except ValueError: + {"status": "error", "reason": "Invalid queue size value"}, 400 + if not max_size > 0: + return {"status": "error", "reason": "Invalid queue size value"}, 400 + + queue_uuid = create_sync_queue(queue_name, tags=tags, description=description, + max_size=max_size) + return queue_uuid, 200 + +def api_delete_sync_queue(json_dict): + queue_uuid = json_dict.get('uuid', '').replace(' ', '').replace('-', '') + if not is_valid_uuid_v4(queue_uuid): + return {"status": "error", "reason": "Invalid Queue uuid"}, 400 + if not exists_sync_queue(queue_uuid): + return {"status": "error", "reason": "Queue Sync not found"}, 404 + + res = delete_sync_queue(queue_uuid) + return res, 200 + +def api_register_ail_to_sync_queue(json_dict): + ail_uuid = json_dict.get('ail_uuid', '').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid AIL uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + queue_uuid = json_dict.get('queue_uuid', '').replace(' ', '').replace('-', '') + if not is_valid_uuid_v4(queue_uuid): + return {"status": "error", "reason": "Invalid Queue uuid"}, 400 + + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + if not exists_sync_queue(queue_uuid): + return {"status": "error", "reason": "Queue Sync not found"}, 404 + if is_queue_registred_by_ail_instance(queue_uuid, ail_uuid): + return {"status": "error", "reason": "Queue already registred"}, 400 + + res = register_ail_to_sync_queue(ail_uuid, queue_uuid) + return res, 200 + +def api_unregister_ail_to_sync_queue(json_dict): + ail_uuid = json_dict.get('ail_uuid', '').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + queue_uuid = json_dict.get('queue_uuid', '').replace(' ', '').replace('-', '') + if not is_valid_uuid_v4(queue_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + if not exists_sync_queue(queue_uuid): + return {"status": "error", "reason": "Queue Sync not found"}, 404 + if not is_queue_registred_by_ail_instance(queue_uuid, ail_uuid): + return {"status": "error", "reason": "Queue not registred"}, 400 + + res = unregister_ail_to_sync_queue(ail_uuid, queue_uuid) + return res, 200 + ############################# # # #### SYNC REDIS QUEUE ####### @@ -487,6 +749,13 @@ if __name__ == '__main__': #res = delete_sync_queue(queue_uuid) #res = register_ail_to_sync_queue(ail_uuid, queue_uuid) - res = change_pull_push_state(ail_uuid, push=True, pull=True) + #res = change_pull_push_state(ail_uuid, push=True, pull=True) + + # print(get_ail_instance_all_sync_queue(ail_uuid)) + # print(get_all_sync_queue()) + # res = get_all_unregistred_queue_by_ail_instance(ail_uuid) + + ail_uuid = 'd82d3e61-2438-4ede-93bf-37b6fd9d7510' + res = get_client_id_by_ail_uuid(ail_uuid) print(res) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index d4a6646d..5daf78ff 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -15,23 +15,14 @@ sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## +from pubsublogger import publisher from core import ail_2_ail - -obj_test = { - "format": "ail", - "version": 1, - "type": "item", - "meta": { - "ail:uuid": "03c51929-eeab-4d47-9dc0-c667f94c7d2c", - "ail:uuid_org": "28bc3db3-16da-461c-b20b-b944f4058708", - }, - "payload": { - "raw" : "MjhiYzNkYjMtMTZkYS00NjFjLWIyMGItYjk0NGY0MDU4NzA4Cg==", - "compress": "gzip", - "encoding": "base64" - } -} +# # TODO: refactor logging +#### LOGS #### +redis_logger = publisher +redis_logger.port = 6380 +redis_logger.channel = 'AIL_SYNC' ############################# @@ -92,7 +83,7 @@ async def pull(websocket, ail_uuid): if Obj: obj_ail_stream = ail_2_ail.create_ail_stream(Obj) Obj = json.dumps(obj_ail_stream) - print(Obj) + #print(Obj) # send objects await websocket.send(Obj) @@ -107,27 +98,29 @@ async def pull(websocket, ail_uuid): # PUSH: receive data from client # # TODO: optional queue_uuid async def push(websocket, ail_uuid): - print(ail_uuid) + #print(ail_uuid) while True: ail_stream = await websocket.recv() # # TODO: CHECK ail_stream ail_stream = json.loads(ail_stream) - print(ail_stream) + #print(ail_stream) ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) async def ail_to_ail_serv(websocket, path): + # # TODO: save in class + ail_uuid = websocket.ail_uuid + remote_address = websocket.remote_address + path = unpack_path(path) + sync_mode = path['sync_mode'] # # TODO: check if it works # # DEBUG: print(websocket.ail_key) print(websocket.ail_uuid) - print(websocket.remote_address) - path = unpack_path(path) - sync_mode = path['sync_mode'] print(f'sync mode: {sync_mode}') await register(websocket) @@ -135,7 +128,8 @@ async def ail_to_ail_serv(websocket, path): if sync_mode == 'pull': await pull(websocket, websocket.ail_uuid) await websocket.close() - print('closed') + redis_logger.info(f'Connection closed: {ail_uuid} {remote_address}') + print(f'Connection closed: {ail_uuid} {remote_address}') elif sync_mode == 'push': await push(websocket, websocket.ail_uuid) @@ -163,29 +157,34 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): api_key = request_headers.get('Authorization', '') print(api_key) if api_key is None: - print('Missing token') + redis_logger.warning(f'Missing token: {self.remote_address}') + print(f'Missing token: {self.remote_address}') return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n" if not ail_2_ail.is_allowed_ail_instance_key(api_key): - print('Invalid token') + redis_logger.warning(f'Invalid token: {self.remote_address}') + print(f'Invalid token: {self.remote_address}') return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" # PATH try: dict_path = unpack_path(path) except Exception as e: - print('Invalid path') + redis_logger.warning(f'Invalid path: {self.remote_address}') + print(f'Invalid path: {self.remote_address}') return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" ail_uuid = ail_2_ail.get_ail_instance_by_key(api_key) if ail_uuid != dict_path['ail_uuid']: - print('Invalid token') + redis_logger.warning(f'Invalid token: {self.remote_address} {ail_uuid}') + print(f'Invalid token: {self.remote_address} {ail_uuid}') return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" if not api_key != ail_2_ail.get_ail_instance_key(api_key): - print('Invalid token') + redis_logger.warning(f'Invalid token: {self.remote_address} {ail_uuid}') + print(f'Invalid token: {self.remote_address} {ail_uuid}') return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" self.ail_key = api_key @@ -210,7 +209,9 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): # SYNC MODE if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=dict_path['sync_mode']): - print('SYNC mode disabled') + sync_mode = dict_path['sync_mode'] + redis_logger.warning(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}') + print(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}') return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" # # TODO: CHECK API @@ -218,17 +219,16 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): pass else: - print('Invalid path') + print(f'Invalid path: {self.remote_address}') + redis_logger.info(f'Invalid path: {self.remote_address}') return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" ########################################### -# # TODO: logging # # TODO: clean shutdown / kill all connections # # TODO: API # # TODO: Filter object -# # TODO: process_request check # # TODO: IP/uuid to block if __name__ == '__main__': @@ -237,6 +237,7 @@ if __name__ == '__main__': port = 4443 print('Launching Server...') + redis_logger.info('Launching Server...') ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) cert_dir = os.environ['AIL_FLASK'] @@ -245,6 +246,7 @@ if __name__ == '__main__': start_server = websockets.serve(ail_to_ail_serv, "localhost", 4443, ssl=ssl_context, create_protocol=AIL_2_AIL_Protocol) print(f'Server Launched: wss://{host}:{port}') + redis_logger.info(f'Server Launched: wss://{host}:{port}') asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() diff --git a/bin/packages/Tag.py b/bin/packages/Tag.py index 266d6a68..bc044593 100755 --- a/bin/packages/Tag.py +++ b/bin/packages/Tag.py @@ -63,11 +63,17 @@ def get_galaxy_from_tag(tag): except IndexError: return None -def get_active_taxonomies(): - return r_serv_tags.smembers('active_taxonomies') +def get_active_taxonomies(r_set=False): + res = r_serv_tags.smembers('active_taxonomies') + if r_set: + return set(res) + return res -def get_active_galaxies(): - return r_serv_tags.smembers('active_galaxies') +def get_active_galaxies(r_set=False): + res = r_serv_tags.smembers('active_galaxies') + if r_set: + return set(res) + return res def get_all_taxonomies_tags(): # # TODO: add + REMOVE + Update return r_serv_tags.smembers('active_taxonomies_tags') @@ -75,6 +81,44 @@ def get_all_taxonomies_tags(): # # TODO: add + REMOVE + Update def get_all_galaxies_tags(): # # TODO: add + REMOVE + Update return r_serv_tags.smembers('active_galaxies_tags') +def get_taxonomies_enabled_tags(r_list=False): + l_tag_keys = [] + for taxonomie in get_active_taxonomies(): + l_tag_keys.append(f'active_tag_{taxonomie}') + if len(l_tag_keys) > 1: + res = r_serv_tags.sunion(l_tag_keys[0], *l_tag_keys[1:]) + elif l_tag_keys: + res = r_serv_tags.smembers(l_tag_keys[0]) + if r_list: + return list(res) + else: + return res + +def get_galaxies_enabled_tags(): + l_tag_keys = [] + for galaxy in get_active_galaxies(): + l_tag_keys.append(f'active_tag_galaxies_{galaxy}') + if len(l_tag_keys) > 1: + return r_serv_tags.sunion(l_tag_keys[0], *l_tag_keys[1:]) + elif l_tag_keys: + return r_serv_tags.smembers(l_tag_keys[0]) + else: + return [] + +def get_taxonomie_enabled_tags(taxonomie, r_list=False): + res = r_serv_tags.smembers(f'active_tag_{taxonomie}') + if r_list: + return list(res) + else: + return res + +def get_galaxy_enabled_tags(galaxy, r_list=False): + res = r_serv_tags.smembers(f'active_tag_galaxies_{galaxy}') + if r_list: + return list(res) + else: + return res + def is_taxonomie_tag_enabled(taxonomie, tag): if tag in r_serv_tags.smembers('active_tag_' + taxonomie): return True @@ -136,6 +180,67 @@ def is_valid_tags_taxonomies_galaxy(list_tags, list_tags_galaxy): return False return True +def is_taxonomie_tag(tag, namespace=None): + if not namespace: + namespace = tag.split(':')[0] + if namespace != 'misp-galaxy': + return True + else: + return False + +def is_galaxy_tag(tag, namespace=None): + if not namespace: + namespace = tag.split(':')[0] + if namespace == 'misp-galaxy': + return True + else: + return False + +# # TODO: +# def is_valid_tag(tag): +# pass + +def is_enabled_tag(tag, enabled_namespace=None): + if is_taxonomie_tag(tag): + return is_enabled_taxonomie_tag(tag, enabled_taxonomies=enabled_namespace) + else: + return is_enabled_galaxy_tag(tag, enabled_galaxies=enabled_namespace) + +def are_enabled_tags(tags): + enabled_taxonomies = get_active_taxonomies(r_set=True) + enabled_galaxies = get_active_galaxies(r_set=True) + for tag in tags: + if is_taxonomie_tag(tag): + res = is_enabled_taxonomie_tag(tag, enabled_taxonomies=enabled_taxonomies) + else: + res = is_enabled_galaxy_tag(tag, enabled_galaxies=enabled_galaxies) + if not res: + return False + return True + +def is_enabled_taxonomie_tag(tag, enabled_taxonomies=None): + if not enabled_taxonomies: + enabled_taxonomies = get_active_taxonomies() + taxonomie = get_taxonomie_from_tag(tag) + if taxonomie is None: + return False + if taxonomie not in enabled_taxonomies: + return False + if not is_taxonomie_tag_enabled(taxonomie, tag): + return False + +def is_enabled_galaxy_tag(tag, enabled_galaxies=None): + if not enabled_galaxies: + enabled_galaxies = get_active_galaxies() + galaxy = get_galaxy_from_tag(tag) + if galaxy is None: + return False + if galaxy not in enabled_galaxies: + return False + if not is_galaxy_tag_enabled(galaxy, tag): + return False + return True + #### #### def is_tag_in_all_tag(tag): @@ -144,6 +249,31 @@ def is_tag_in_all_tag(tag): else: return False +def get_tag_synonyms(tag): + return r_serv_tags.smembers(f'synonym_tag_{tag}') + +def get_tag_dislay_name(tag): + tag_synonyms = get_tag_synonyms(tag) + if not tag_synonyms: + return tag + else: + return tag + ', '.join(tag_synonyms) + +def get_tags_selector_dict(tags): + list_tags = [] + for tag in tags: + list_tags.append(get_tag_selector_dict(tag)) + return list_tags + +def get_tag_selector_dict(tag): + return {'name':get_tag_dislay_name(tag),'id':tag} + +def get_tags_selector_data(): + dict_selector = {} + dict_selector['active_taxonomies'] = get_active_taxonomies() + dict_selector['active_galaxies'] = get_active_galaxies() + return dict_selector + def get_min_tag(tag): tag = tag.split('=') if len(tag) > 1: diff --git a/requirements.txt b/requirements.txt index cb38df0b..0787a0ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,9 @@ redis==2.10.6 python-magic>0.4.15 yara-python>4.0.2 +# AIL Sync +websockets>9.0 + # Hashlib crcmod mmh3>2.5 diff --git a/update/v4.0/Update.py b/update/v4.0/Update.py new file mode 100755 index 00000000..bd6596b3 --- /dev/null +++ b/update/v4.0/Update.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import re +import sys +import time +import redis +import datetime + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader + +sys.path.append(os.path.join(os.environ['AIL_HOME'], 'update', 'bin')) +from ail_updater import AIL_Updater + +class Updater(AIL_Updater): + """default Updater.""" + + def __init__(self, version): + super(Updater, self).__init__(version) + +if __name__ == '__main__': + + updater = Updater('v4.0') + updater.run_update() diff --git a/update/v4.0/Update.sh b/update/v4.0/Update.sh new file mode 100755 index 00000000..09bc3f4f --- /dev/null +++ b/update/v4.0/Update.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +[ -z "$AIL_HOME" ] && echo "Needs the env var AIL_HOME. Run the script from the virtual environment." && exit 1; +[ -z "$AIL_REDIS" ] && echo "Needs the env var AIL_REDIS. Run the script from the virtual environment." && exit 1; +[ -z "$AIL_ARDB" ] && echo "Needs the env var AIL_ARDB. Run the script from the virtual environment." && exit 1; +[ -z "$AIL_BIN" ] && echo "Needs the env var AIL_ARDB. Run the script from the virtual environment." && exit 1; +[ -z "$AIL_FLASK" ] && echo "Needs the env var AIL_FLASK. Run the script from the virtual environment." && exit 1; + +export PATH=$AIL_HOME:$PATH +export PATH=$AIL_REDIS:$PATH +export PATH=$AIL_ARDB:$PATH +export PATH=$AIL_BIN:$PATH +export PATH=$AIL_FLASK:$PATH + +GREEN="\\033[1;32m" +DEFAULT="\\033[0;39m" + +echo -e $GREEN"Shutting down AIL ..."$DEFAULT +bash ${AIL_BIN}/LAUNCH.sh -ks +wait + +# SUBMODULES # +git submodule update + +echo "" +echo -e $GREEN"Installing nose ..."$DEFAULT +pip3 install -U websockets + +exit 0 diff --git a/var/www/Flask_server.py b/var/www/Flask_server.py index 5fa0fbfb..c8eea936 100755 --- a/var/www/Flask_server.py +++ b/var/www/Flask_server.py @@ -45,6 +45,7 @@ from blueprints.import_export import import_export from blueprints.objects_item import objects_item from blueprints.hunters import hunters from blueprints.old_endpoints import old_endpoints +from blueprints.ail_2_ail_sync import ail_2_ail_sync Flask_dir = os.environ['AIL_FLASK'] @@ -103,6 +104,7 @@ app.register_blueprint(import_export, url_prefix=baseUrl) app.register_blueprint(objects_item, url_prefix=baseUrl) app.register_blueprint(hunters, url_prefix=baseUrl) app.register_blueprint(old_endpoints, url_prefix=baseUrl) +app.register_blueprint(ail_2_ail_sync, url_prefix=baseUrl) # ========= =========# # ========= Cookie name ======== diff --git a/var/www/blueprints/ail_2_ail_sync.py b/var/www/blueprints/ail_2_ail_sync.py new file mode 100644 index 00000000..b9d35a42 --- /dev/null +++ b/var/www/blueprints/ail_2_ail_sync.py @@ -0,0 +1,247 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +''' + Blueprint Flask: crawler splash endpoints: dashboard, onion crawler ... +''' + +import os +import sys +import json +import random + +from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response, make_response +from flask_login import login_required, current_user, login_user, logout_user + +sys.path.append('modules') +import Flask_config + +# Import Role_Manager +from Role_Manager import create_user_db, check_password_strength, check_user_role_integrity +from Role_Manager import login_admin, login_analyst, login_read_only + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages')) +import Tag + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib')) +import item_basic +import Tracker + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'core')) +import ail_2_ail + +bootstrap_label = Flask_config.bootstrap_label + +# ============ BLUEPRINT ============ +ail_2_ail_sync = Blueprint('ail_2_ail_sync', __name__, template_folder=os.path.join(os.environ['AIL_FLASK'], 'templates/ail_2_ail')) + +# ============ VARIABLES ============ + + + +# ============ FUNCTIONS ============ +def api_validator(api_response): + if api_response: + return Response(json.dumps(api_response[0], indent=2, sort_keys=True), mimetype='application/json'), api_response[1] + +def create_json_response(data, status_code): + return Response(json.dumps(data, indent=2, sort_keys=True), mimetype='application/json'), status_code + +# ============= ROUTES ============== + +@ail_2_ail_sync.route('/settings/ail_2_ail', methods=['GET']) +@login_required +@login_admin +def ail_2_ail_dashboard(): + l_servers = ail_2_ail.get_all_running_sync_servers() + l_servers = ail_2_ail.get_ail_instances_metadata(l_servers) + return render_template("ail_2_ail_dashboard.html", l_servers=l_servers) + +###################### +# # +#### AIL INSTANCE #### + +# # TODO: add more metadata => queues + connections +@ail_2_ail_sync.route('/settings/ail_2_ail/servers', methods=['GET']) +@login_required +@login_admin +def ail_servers(): + l_servers = ail_2_ail.get_all_ail_instances_metadata() + return render_template("ail_servers.html", l_servers=l_servers) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/view', methods=['GET']) +@login_required +@login_admin +def ail_server_view(): + ail_uuid = request.args.get('uuid') + server_metadata = ail_2_ail.get_ail_instance_metadata(ail_uuid,sync_queues=True) + server_metadata['sync_queues'] = ail_2_ail.get_queues_metadata(server_metadata['sync_queues']) + + return render_template("view_ail_server.html", server_metadata=server_metadata, + bootstrap_label=bootstrap_label) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/add', methods=['GET', 'POST']) +@login_required +@login_admin +def ail_server_add(): + if request.method == 'POST': + register_key = request.form.get("register_key") + ail_uuid = request.form.get("ail_uuid") + url = request.form.get("ail_url") + description = request.form.get("ail_description") + pull = request.form.get("ail_pull") + push = request.form.get("ail_push") + + input_dict = {"uuid": ail_uuid, "url": url, + "description": description, + "pull": pull, "push": push} + + if register_key: + input_dict['key'] = request.form.get("ail_key") + + print(input_dict) + + res = ail_2_ail.api_create_ail_instance(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=res)) + else: + + return render_template("add_ail_server.html") + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/edit', methods=['GET', 'POST']) +@login_required +@login_admin +def ail_server_edit(): + ail_uuid = request.args.get('ail_uuid') + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/delete', methods=['GET']) +@login_required +@login_admin +def ail_server_delete(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_delete_ail_instance(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_servers')) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/sync_queues', methods=['GET']) +@login_required +@login_admin +def ail_server_sync_queues(): + ail_uuid = request.args.get('uuid') + sync_queues = ail_2_ail.get_all_unregistred_queue_by_ail_instance(ail_uuid) + sync_queues = ail_2_ail.get_queues_metadata(sync_queues) + + return render_template("register_queue.html", bootstrap_label=bootstrap_label, + ail_uuid=ail_uuid, sync_queues=sync_queues) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/sync_queues/register', methods=['GET']) +@login_required +@login_admin +def ail_server_sync_queues_register(): + + ail_uuid = request.args.get('ail_uuid') + queue_uuid = request.args.get('queue_uuid') + input_dict = {"ail_uuid": ail_uuid, "queue_uuid": queue_uuid} + res = ail_2_ail.api_register_ail_to_sync_queue(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/sync_queues/unregister', methods=['GET']) +@login_required +@login_admin +def ail_server_sync_queues_unregister(): + + ail_uuid = request.args.get('ail_uuid') + queue_uuid = request.args.get('queue_uuid') + input_dict = {"ail_uuid": ail_uuid, "queue_uuid": queue_uuid} + res = ail_2_ail.api_unregister_ail_to_sync_queue(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) + +#################### +# # +#### SYNC QUEUE #### + +@ail_2_ail_sync.route('/settings/ail_2_ail/sync_queues', methods=['GET']) +# @login_required +# @login_admin +def sync_queues(): + ail_uuid = request.args.get('ail_uuid') + l_queues = ail_2_ail.get_all_queues_metadata() + return render_template("sync_queues.html", bootstrap_label=bootstrap_label, + ail_uuid=ail_uuid, l_queues=l_queues) + +@ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/view', methods=['GET']) +# @login_required +# @login_admin +def sync_queue_view(): + queue_uuid = request.args.get('uuid') + queue_metadata = ail_2_ail.get_sync_queue_metadata(queue_uuid) + ail_servers = ail_2_ail.get_sync_queue_all_ail_instance(queue_uuid) + queue_metadata['ail_servers'] = ail_2_ail.get_ail_instances_metadata(ail_servers) + return render_template("view_sync_queue.html", queue_metadata=queue_metadata, + bootstrap_label=bootstrap_label) + +@ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/add', methods=['GET', 'POST']) +@login_required +@login_admin +def sync_queue_add(): + if request.method == 'POST': + queue_name = request.form.get("queue_name") + description = request.form.get("queue_description") + max_size = request.form.get("queue_max_size") + + taxonomies_tags = request.form.get('taxonomies_tags') + if taxonomies_tags: + try: + taxonomies_tags = json.loads(taxonomies_tags) + except Exception: + taxonomies_tags = [] + else: + taxonomies_tags = [] + galaxies_tags = request.form.get('galaxies_tags') + if galaxies_tags: + try: + galaxies_tags = json.loads(galaxies_tags) + except Exception: + galaxies_tags = [] + + tags = taxonomies_tags + galaxies_tags + input_dict = {"name": queue_name, "tags": tags, + "description": description, + "max_size": max_size} + + res = ail_2_ail.api_create_sync_queue(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + + return redirect(url_for('ail_2_ail_sync.sync_queue_view', uuid=res)) + else: + return render_template("add_sync_queue.html", tags_selector_data=Tag.get_tags_selector_data()) + +@ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/edit', methods=['GET', 'POST']) +# @login_required +# @login_admin +def sync_queue_edit(): + return '' + +@ail_2_ail_sync.route('/settings/ail_2_ail/sync_queue/delete', methods=['GET']) +# @login_required +# @login_admin +def sync_queue_delete(): + queue_uuid = request.args.get('uuid') + input_dict = {"uuid": queue_uuid} + res = ail_2_ail.api_delete_sync_queue(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.sync_queues')) + +#### JSON #### + +## - - ## diff --git a/var/www/blueprints/tags_ui.py b/var/www/blueprints/tags_ui.py index 45dde709..e91c4f0a 100644 --- a/var/www/blueprints/tags_ui.py +++ b/var/www/blueprints/tags_ui.py @@ -94,6 +94,34 @@ def get_all_obj_tags(): return jsonify(res) return jsonify(Tag.get_all_obj_tags(object_type)) +@tags_ui.route('/tag/taxonomies/tags/enabled/json') +@login_required +@login_read_only +def tag_taxonomies_tags_enabled_json(): + return jsonify(Tag.get_taxonomies_enabled_tags(r_list=True)) + +@tags_ui.route('/tag/galaxies/tags/enabled/json') +@login_required +@login_read_only +def tag_galaxies_tags_enabled_json(): + tags = Tag.get_galaxies_enabled_tags() + return jsonify(Tag.get_tags_selector_dict(tags)) + +@tags_ui.route('/tag/taxonomie/tags/enabled/json') +@login_required +@login_read_only +def tag_taxonomie_tags_enabled_json(): + taxonomie = request.args.get('taxonomie') + return jsonify(Tag.get_taxonomie_enabled_tags(taxonomie, r_list=True)) + +@tags_ui.route('/tag/galaxy/tags/enabled/json') +@login_required +@login_read_only +def tag_galaxy_tags_enabled_json(): + galaxy = request.args.get('galaxy') + tags = Tag.get_galaxy_enabled_tags(galaxy, r_list=True) + return jsonify(Tag.get_tags_selector_dict(tags)) + @tags_ui.route('/tag/search/item') @login_required @login_read_only diff --git a/var/www/modules/Tags/Flask_Tags.py b/var/www/modules/Tags/Flask_Tags.py index 010610e6..5db95e48 100644 --- a/var/www/modules/Tags/Flask_Tags.py +++ b/var/www/modules/Tags/Flask_Tags.py @@ -59,6 +59,13 @@ for name, tags in clusters.items(): #galaxie name + tags def one(): return 1 +# TODO: +# TODO: +# TODO: +# TODO: +# TODO: +# TODO: +# # TODO: replace me with get_tag_selector_dict() def get_tags_with_synonyms(tag): str_synonyms = ' - synonyms: ' synonyms = r_serv_tags.smembers('synonym_tag_' + tag) diff --git a/var/www/templates/ail_2_ail/add_ail_server.html b/var/www/templates/ail_2_ail/add_ail_server.html new file mode 100644 index 00000000..297fabac --- /dev/null +++ b/var/www/templates/ail_2_ail/add_ail_server.html @@ -0,0 +1,170 @@ + + + + + AIL-Framework + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ +
+
+
Create AIL Server
+
+
+ +
+ +
+
+
+
+
+
+ +
+ +
+
+
+
+ +
+ +
+
+
+
+ +
+ +
+ SYNC Modes: +
+
+
+
+ + +
+
+
+
+ + +
+
+
+ +
+ Server Key : +
+ +
+  Generate Server Key   +
+ + +
+
+ +
+ A new key will be generated for this AIL Server +
+
+
+
+
+
+ +
+
+ + +
+
+ SYNC: create a new AIL Server +
+
+ + +
+ + +
+ + + +
+
+ + +
+ +
+
+ + + + diff --git a/var/www/templates/ail_2_ail/add_sync_queue.html b/var/www/templates/ail_2_ail/add_sync_queue.html new file mode 100644 index 00000000..6d4507b4 --- /dev/null +++ b/var/www/templates/ail_2_ail/add_sync_queue.html @@ -0,0 +1,139 @@ + + + + + AIL-Framework + + + + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ +
+
+
Create SYNC Queue
+
+
+ +
+ +
+
+
+
+
+
+ +
+ +
+
+ +
+ +
+ Queue Max Size +
+
+ +
+
+
+
+ +
+ +
+
+ Tags Filter +
+
+ {% include 'tags/block_tags_selector.html' %} +
+
+ +
+
+ SYNC: create a new Sync Queue +
+
+ +
+ + +
+ + + +
+
+ + +
+ +
+
+ + + + diff --git a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html new file mode 100644 index 00000000..0efc7f82 --- /dev/null +++ b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html @@ -0,0 +1,100 @@ + + + + + AIL-SYNC + + + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ +

Connected Servers:

+ + + + + + + + + + + + {% for dict_server in l_servers %} + + + + + + + {% endfor %} + +
uuidurldescriptionsync queues
+ + {{ dict_server['uuid']}} + + {{ dict_server['url']}}{{ dict_server['description']}} + {% for queue_uuid in dict_server['sync_queues'] %} + + {{queue_uuid}}
+
+ {% endfor %} +
+ +
+ +
+
+ + + + diff --git a/var/www/templates/ail_2_ail/ail_servers.html b/var/www/templates/ail_2_ail/ail_servers.html new file mode 100644 index 00000000..88b3942b --- /dev/null +++ b/var/www/templates/ail_2_ail/ail_servers.html @@ -0,0 +1,106 @@ + + + + + AIL-SYNC + + + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ + + + + + + + + + + + + + {% for dict_server in l_servers %} + + + + + + + {% endfor %} + +
uuidurldescriptionsync queues
+ + {{ dict_server['uuid']}} + + {{ dict_server['url']}}{{ dict_server['description']}} + {% for queue_uuid in dict_server['sync_queues'] %} + + {{queue_uuid}}
+
+ {% endfor %} +
+ +
+ +
+
+ + + + diff --git a/var/www/templates/ail_2_ail/register_queue.html b/var/www/templates/ail_2_ail/register_queue.html new file mode 100644 index 00000000..068a53ee --- /dev/null +++ b/var/www/templates/ail_2_ail/register_queue.html @@ -0,0 +1,110 @@ + + + + + AIL-Framework + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ +
+
+
{{ail_uuid}} Register a SYNC Queue
+
+
+ + + + + + + + + + + + + {% for dict_queue in sync_queues %} + + + + + + + + {% endfor %} + +
nameuuiddescriptionmax sizeRegister Sync Queue
{{ dict_queue['name']}} + + {{ dict_queue['uuid']}} + +
+ {% for tag in dict_queue['tags'] %} + {{ tag }} + {% endfor %} +
+
{{ dict_queue['description']}}{{ dict_queue['max_size']}} + + + +
+ +
+
+ + +
+
+
+ + + + diff --git a/var/www/templates/ail_2_ail/sync_queues.html b/var/www/templates/ail_2_ail/sync_queues.html new file mode 100644 index 00000000..72b40844 --- /dev/null +++ b/var/www/templates/ail_2_ail/sync_queues.html @@ -0,0 +1,105 @@ + + + + + AIL-SYNC + + + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ + + + + + + + + + + + + + {% for dict_queue in l_queues %} + + + + + + + {% endfor %} + +
nameuuiddescriptionmax size
{{ dict_queue['name']}} + + {{ dict_queue['uuid']}} +
+ {% for tag in dict_queue['tags'] %} + {{ tag }} + {% endfor %} +
+
+
{{ dict_queue['description']}}{{ dict_queue['max_size']}}
+ +
+ +
+
+ + + + diff --git a/var/www/templates/ail_2_ail/view_ail_server.html b/var/www/templates/ail_2_ail/view_ail_server.html new file mode 100644 index 00000000..84487a73 --- /dev/null +++ b/var/www/templates/ail_2_ail/view_ail_server.html @@ -0,0 +1,168 @@ + + + + + AIL-Framework + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ +
+
+
{{server_metadata['uuid']}}
+
+
+ + + + + + + + + + + + + + + + + + + + + + + +
URL + {{server_metadata['url']}} +
Api Key + {{server_metadata['api_key']}} +
Description + {{server_metadata['description']}} +
Pull + {{server_metadata['pull']}} +
Push + {{server_metadata['push']}} +
+ + + +
+
+
SYNC QUEUES:
+
+
+ + + + + + + + + + + + + + + + + {% for dict_queue in server_metadata['sync_queues'] %} + + + + + + + + {% endfor %} + +
nameuuiddescriptionmax sizeUnregister Queue
{{ dict_queue['name']}} + + {{ dict_queue['uuid']}} + +
+ {% for tag in dict_queue['tags'] %} + {{ tag }} + {% endfor %} +
+
{{ dict_queue['description']}}{{ dict_queue['max_size']}} + + + +
+ + +
+ + +
+
+ + +
+ +
+ + + + diff --git a/var/www/templates/ail_2_ail/view_sync_queue.html b/var/www/templates/ail_2_ail/view_sync_queue.html new file mode 100644 index 00000000..44e51c05 --- /dev/null +++ b/var/www/templates/ail_2_ail/view_sync_queue.html @@ -0,0 +1,159 @@ + + + + + AIL-Framework + + + + + + + + + + + + + + + + + {% include 'nav_bar.html' %} + +
+
+ + {% include 'settings/menu_sidebar.html' %} + +
+ +
+
+

SYNC Queue: {{queue_metadata['uuid']}}

+
+
+ + + + + + + + + + + + + + + + + + + +
Name + {{queue_metadata['name']}} +
Tags Filter +
+ {% for tag in queue_metadata['tags'] %} + {{ tag }} + {% endfor %} +
+
Description + {{queue_metadata['description']}} +
Max Size + {{queue_metadata['max_size']}} +
+ + + +
+
+
AIL Server:
+
+
+ + + + + + + + + + + + + + + + + + {% for dict_server in queue_metadata['ail_servers'] %} + + + + + + + + + {% endfor %} + +
uuidurldescriptionpullpush
{{ dict_server['uuid']}}{{ dict_server['url']}}{{ dict_server['description']}}{{ dict_server['pull']}}{{ dict_server['push']}} + + + +
+ + +
+ + +
+
+ + +
+ +
+ + + + diff --git a/var/www/templates/settings/menu_sidebar.html b/var/www/templates/settings/menu_sidebar.html index b88f7c9e..18f24242 100644 --- a/var/www/templates/settings/menu_sidebar.html +++ b/var/www/templates/settings/menu_sidebar.html @@ -17,6 +17,29 @@ + + diff --git a/var/www/templates/tags/block_tags_selector.html b/var/www/templates/tags/block_tags_selector.html new file mode 100644 index 00000000..2b6df731 --- /dev/null +++ b/var/www/templates/tags/block_tags_selector.html @@ -0,0 +1,108 @@ +
+ +
+ + + +
+ +
+ + + + + + + + From 658cb73d4e03f649812b56f4f8f3cc1ac7a90372 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Fri, 26 Nov 2021 16:13:46 +0100 Subject: [PATCH 07/10] chg: [ail sync] add sync api (ping, version) + UI/client error handler --- bin/core/ail_2_ail.py | 140 ++++++++++++++++-- bin/core/ail_2_ail_client.py | 117 +++++++++++---- bin/core/ail_2_ail_server.py | 51 +++++-- var/www/blueprints/ail_2_ail_sync.py | 26 +++- .../ail_2_ail/ail_2_ail_dashboard.html | 16 +- var/www/templates/ail_2_ail/ail_servers.html | 18 ++- .../templates/ail_2_ail/view_ail_server.html | 49 ++++++ 7 files changed, 363 insertions(+), 54 deletions(-) diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index a927641a..285c0162 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -9,7 +9,10 @@ import sys import time import uuid +import subprocess + from flask import escape +from pubsublogger import publisher sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader @@ -27,6 +30,12 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB") r_serv_sync = config_loader.get_redis_conn("ARDB_DB") config_loader = None +#### LOGS #### +# redis_logger = publisher +# redis_logger.port = 6380 +# redis_logger.channel = 'AIL_SYNC' +##-- LOGS --## + def is_valid_uuid_v4(UUID): if not UUID: return False @@ -50,6 +59,9 @@ def generate_sync_api_key(): def get_ail_uuid(): return r_serv_db.get('ail:uuid') +def get_sync_server_version(): + return '0.1' + def is_valid_websocket_url(websocket_url): regex_websocket_url = r'^(wss:\/\/)([0-9]{1,3}(?:\.[0-9]{1,3}){3}|(?=[^\/]{1,254}(?![^\/]))(?:(?=[a-zA-Z0-9-]{1,63}\.?)(?:xn--+)?[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)*\.?)+[a-zA-Z]{2,63}):([0-9]{1,5})$' if re.match(regex_websocket_url, websocket_url): @@ -134,9 +146,6 @@ def refresh_ail_instance_connection(ail_uuid): client_id = get_client_id_by_ail_uuid(ail_uuid) launch_required = is_ail_instance_push_enabled(ail_uuid) - print(client_id) - print(launch_required) - # relaunch if client_id and launch_required: send_command_to_manager('relaunch', client_id=client_id) @@ -193,7 +202,7 @@ class AIL2AILClientManager(object): def launch_sync_client(self, ail_uuid): dir_project = os.environ['AIL_HOME'] client_id = self.get_new_sync_client_id() - script_options = f'-a {ail_uuid} -m push -i {client_id}' + script_options = f'-u {ail_uuid} -m push -i {client_id}' screen.create_screen(AIL2AILClientManager.SCREEN_NAME) screen.launch_uniq_windows_script(AIL2AILClientManager.SCREEN_NAME, client_id, dir_project, @@ -226,14 +235,11 @@ class AIL2AILClientManager(object): def get_manager_command(self): res = r_cache.spop('ail_2_ail:client_manager:command') if res: - print(res) - print(type(res)) return json.loads(res) else: return None def execute_manager_command(self, command_dict): - print(command_dict) command = command_dict.get('command') if command == 'launch': ail_uuid = command_dict.get('ail_uuid') @@ -357,6 +363,16 @@ def change_pull_push_state(ail_uuid, pull=False, push=False): set_last_updated_sync_config() refresh_ail_instance_connection(ail_uuid) +def get_ail_server_version(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'version') + +def get_ail_server_ping(ail_uuid): + res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'ping') + return res == 'True' + +def get_ail_server_error(ail_uuid): + return r_cache.hget(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') + # # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) def get_ail_instance_metadata(ail_uuid, sync_queues=False): dict_meta = {} @@ -365,6 +381,9 @@ def get_ail_instance_metadata(ail_uuid, sync_queues=False): dict_meta['description'] = get_ail_instance_description(ail_uuid) dict_meta['pull'] = is_ail_instance_pull_enabled(ail_uuid) dict_meta['push'] = is_ail_instance_pull_enabled(ail_uuid) + dict_meta['ping'] = get_ail_server_ping(ail_uuid) + dict_meta['version'] = get_ail_server_version(ail_uuid) + dict_meta['error'] = get_ail_server_error(ail_uuid) # # TODO: HIDE dict_meta['api_key'] = get_ail_instance_key(ail_uuid) @@ -421,8 +440,105 @@ def delete_ail_instance(ail_uuid): refresh_ail_instance_connection(ail_uuid) return ail_uuid +## WEBSOCKET API - ERRORS ## + +def set_ail_server_version(ail_uuid, version): + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'version', version) + +def set_ail_server_ping(ail_uuid, pong): + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'ping', bool(pong)) + +def save_ail_server_error(ail_uuid, error_message): + r_cache.hset(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error', error_message) + +def clear_save_ail_server_error(ail_uuid): + r_cache.hdel(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') + +def _get_remote_ail_server_response(ail_uuid, api_request): + websocket_client = os.path.join(os.environ['AIL_BIN'], 'core', 'ail_2_ail_client.py') + l_command = ['python', websocket_client, '-u', ail_uuid, '-m', 'api', '-a', api_request] + process = subprocess.Popen(l_command, stdout=subprocess.PIPE) + while process.poll() is None: + time.sleep(1) + + if process.returncode == 0: + # Scrapy-Splash ERRORS + if process.stderr: + stderr = process.stderr.read().decode() + if stderr: + print(f'stderr: {stderr}') + + if process.stdout: + output = process.stdout.read().decode() + #print(output) + if output: + try: + message = json.loads(output) + return message + except Exception as e: + print(e) + error = f'Error: {e}' + save_ail_server_error(ail_uuid, error) + return + # ERROR + else: + if process.stderr: + stderr = process.stderr.read().decode() + else: + stderr = '' + if process.stdout: + stdout = process.stdout.read().decode() + else: + stdout ='' + if stderr or stdout: + error = f'-stderr-\n{stderr}\n-stdout-\n{stdout}' + print(error) + save_ail_server_error(ail_uuid, error) + return + +def get_remote_ail_server_version(ail_uuid): + response = _get_remote_ail_server_response(ail_uuid, 'version') + if response: + version = response.get('version') + if version: + version = float(version) + if version >= 0.1: + set_ail_server_version(ail_uuid, version) + return version + +# # TODO: CATCH WEBSOCKETS RESPONSE CODE +def ping_remote_ail_server(ail_uuid): + response = _get_remote_ail_server_response(ail_uuid, 'ping') + if response: + response = response.get('message', False) + pong = response == 'pong' + set_ail_server_ping(ail_uuid, pong) + return pong + ## API ## +def api_ping_remote_ail_server(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + res = ping_remote_ail_server(ail_uuid) + return res, 200 + +def api_get_remote_ail_server_version(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + res = get_remote_ail_server_version(ail_uuid) + return res, 200 + def api_create_ail_instance(json_dict): ail_uuid = json_dict.get('uuid').replace(' ', '') if not is_valid_uuid_v4(ail_uuid): @@ -755,7 +871,13 @@ if __name__ == '__main__': # print(get_all_sync_queue()) # res = get_all_unregistred_queue_by_ail_instance(ail_uuid) - ail_uuid = 'd82d3e61-2438-4ede-93bf-37b6fd9d7510' - res = get_client_id_by_ail_uuid(ail_uuid) + ail_uuid = 'c3c2f3ef-ca53-4ff6-8317-51169b73f731' + ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + # res = ping_remote_ail_server(ail_uuid) + # print(res) + # + res = get_remote_ail_server_version(ail_uuid) + + #res = _get_remote_ail_server_response(ail_uuid, 'pin') print(res) diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py index 678999a3..797c5872 100755 --- a/bin/core/ail_2_ail_client.py +++ b/bin/core/ail_2_ail_client.py @@ -6,6 +6,7 @@ import json import os import sys import time +from pubsublogger import publisher from urllib.parse import urljoin import asyncio @@ -19,6 +20,12 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from core import ail_2_ail +#### LOGS #### +redis_logger = publisher +redis_logger.port = 6380 +redis_logger.channel = 'AIL_SYNC_client' +##-- LOGS --## + #################################################################### class AIL2AILClient(object): @@ -29,18 +36,21 @@ class AIL2AILClient(object): self.ail_uuid = ail_uuid self.sync_mode = sync_mode - # # TODO: - self.ail_url = "wss://localhost:4443" - self.uri = f"{ail_url}/{sync_mode}/{ail_uuid}" #################################################################### +# # TODO: ADD TIMEOUT => 30s +async def api_request(websocket, ail_uuid): + res = await websocket.recv() + # API OUTPUT + sys.stdout.write(res) + # # TODO: ADD TIMEOUT async def pull(websocket, ail_uuid): while True: obj = await websocket.recv() - print(obj) + sys.stdout.write(res) async def push(websocket, ail_uuid): @@ -60,51 +70,100 @@ async def push(websocket, ail_uuid): await asyncio.sleep(10) -async def ail_to_ail_client(ail_uuid, sync_mode, ail_key=None): +async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): + if not ail_2_ail.exists_ail_instance(ail_uuid): + print('AIL server not found') + return + if not ail_key: ail_key = ail_2_ail.get_ail_instance_key(ail_uuid) - ail_url = "wss://localhost:4443" - uri = f"{ail_url}/{sync_mode}/{ail_uuid}" - print(uri) + # # TODO: raise exception + ail_url = ail_2_ail.get_ail_instance_url(ail_uuid) + local_ail_uuid = ail_2_ail.get_ail_uuid() - async with websockets.connect( - uri, - ssl=ssl_context, - extra_headers={"Authorization": f"{ail_key}"} - ) as websocket: + if sync_mode == 'api': + uri = f"{ail_url}/{sync_mode}/{api}/{local_ail_uuid}" + else: + uri = f"{ail_url}/{sync_mode}/{local_ail_uuid}" + #print(uri) - if sync_mode == 'pull': - await pull(websocket, ail_uuid) + ail_2_ail.clear_save_ail_server_error(ail_uuid) + + try: + async with websockets.connect( + uri, + ssl=ssl_context, + extra_headers={"Authorization": f"{ail_key}"} + ) as websocket: + + if sync_mode == 'pull': + await pull(websocket, ail_uuid) + + elif sync_mode == 'push': + await push(websocket, ail_uuid) + await websocket.close() + + elif sync_mode == 'api': + await api_request(websocket, ail_uuid) + await websocket.close() + except websockets.exceptions.InvalidStatusCode as e: + status_code = e.status_code + error_message = '' + # success + if status_code == 1000: + print('connection closed') + elif status_code == 400: + error_message = 'BAD_REQUEST: Invalid path' + elif status_code == 401: + error_message = 'UNAUTHORIZED: Invalid Key' + elif status_code == 403: + error_message = 'FORBIDDEN: SYNC mode disabled' + else: + error_message = str(e) + if error_message: + sys.stderr.write(error_message) + redis_logger.warning(f'{error_message}: {ail_uuid}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except websockets.exceptions.InvalidURI as e: + error_message = f'Invalid AIL url: {e.uri}' + sys.stderr.write(error_message) + redis_logger.warning(f'{error_message}: {ail_uuid}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except ConnectionError as e: + error_message = str(e) + sys.stderr.write(error_message) + redis_logger.info(f'{error_message}: {ail_uuid}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except websockets.exceptions.ConnectionClosedOK as e: + print('connection closed') + # except Exception as e: + # print(e) - elif sync_mode == 'push': - await push(websocket, ail_uuid) - await websocket.close() - elif sync_mode == 'api': - await websocket.close() -##########################################################3 -# # TODO:manual key -########################################################## if __name__ == '__main__': parser = argparse.ArgumentParser(description='Websocket SYNC Client') - parser.add_argument('-a', '--ail', help='AIL UUID', type=str, dest='ail_uuid', required=True, default=None) + parser.add_argument('-u', '--uuid', help='AIL UUID', type=str, dest='ail_uuid', required=True, default=None) parser.add_argument('-i', '--client_id', help='Client ID', type=str, dest='client_id', default=None) - parser.add_argument('-m', '--mode', help='SYNC Mode, pull or push', type=str, dest='sync_mode', default='pull') + parser.add_argument('-m', '--mode', help='SYNC Mode, pull, push or api', type=str, dest='sync_mode', default='pull') + parser.add_argument('-a', '--api', help='API, ping or version', type=str, dest='api', default=None) #parser.add_argument('-k', '--key', type=str, default='', help='AIL Key') args = parser.parse_args() ail_uuid = args.ail_uuid sync_mode = args.sync_mode + api = args.api - if ail_uuid is None or sync_mode not in ['pull', 'push']: + if ail_uuid is None or sync_mode not in ['api', 'pull', 'push']: parser.print_help() sys.exit(0) - #ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' - #sync_mode = 'pull' + if api: + if api not in ['ping', 'version']: + parser.print_help() + sys.exit(0) # SELF SIGNED CERTIFICATES ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) @@ -112,4 +171,4 @@ if __name__ == '__main__': ssl_context.verify_mode = ssl.CERT_NONE # SELF SIGNED CERTIFICATES - asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode)) + asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode, api)) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index 5daf78ff..d4e98a1e 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -22,7 +22,7 @@ from core import ail_2_ail #### LOGS #### redis_logger = publisher redis_logger.port = 6380 -redis_logger.channel = 'AIL_SYNC' +redis_logger.channel = 'AIL_SYNC_Server' ############################# @@ -52,8 +52,13 @@ def unpack_path(path): path = path.split('/') if len(path) < 3: raise Exception('Invalid url path') + if not len(path[-1]): + path = path[:-1] + dict_path['sync_mode'] = path[1] - dict_path['ail_uuid'] = path[2] + dict_path['ail_uuid'] = path[-1] + dict_path['api'] = path[2:-1] + return dict_path # # # # # # # @@ -66,8 +71,12 @@ def unpack_path(path): async def register(websocket): + ail_uuid = websocket.ail_uuid + remote_address = websocket.remote_address + redis_logger.info(f'Client Connected: {ail_uuid} {remote_address}') + print(f'Client Connected: {ail_uuid} {remote_address}') CONNECTED_CLIENT.add(websocket) - print(CONNECTED_CLIENT) + #print(CONNECTED_CLIENT) async def unregister(websocket): CONNECTED_CLIENT.remove(websocket) @@ -108,6 +117,23 @@ async def push(websocket, ail_uuid): ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) +# API: server API +# # TODO: ADD TIMEOUT ??? +async def api(websocket, ail_uuid, api): + api = api[0] + if api == 'ping': + message = {'message':'pong'} + message = json.dumps(message) + await websocket.send(message) + elif api == 'version': + sync_version = ail_2_ail.get_sync_server_version() + message = {'version': sync_version} + message = json.dumps(message) + await websocket.send(message) + + # END API + return + async def ail_to_ail_serv(websocket, path): # # TODO: save in class @@ -118,10 +144,9 @@ async def ail_to_ail_serv(websocket, path): # # TODO: check if it works # # DEBUG: - print(websocket.ail_key) - print(websocket.ail_uuid) - print(websocket.remote_address) - print(f'sync mode: {sync_mode}') + # print(websocket.ail_uuid) + # print(websocket.remote_address) + # print(f'sync mode: {sync_mode}') await register(websocket) try: @@ -135,7 +160,10 @@ async def ail_to_ail_serv(websocket, path): await push(websocket, websocket.ail_uuid) elif sync_mode == 'api': + await api(websocket, websocket.ail_uuid, path['api']) await websocket.close() + redis_logger.info(f'Connection closed: {ail_uuid} {remote_address}') + print(f'Connection closed: {ail_uuid} {remote_address}') finally: await unregister(websocket) @@ -151,11 +179,12 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): async def process_request(self, path, request_headers): - print(self.remote_address) - print(request_headers) + # DEBUG: + # print(self.remote_address) + # print(request_headers) + # API TOKEN api_key = request_headers.get('Authorization', '') - print(api_key) if api_key is None: redis_logger.warning(f'Missing token: {self.remote_address}') print(f'Missing token: {self.remote_address}') @@ -215,7 +244,7 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" # # TODO: CHECK API - elif dict_path[sync_mode] == 'api': + elif dict_path['sync_mode'] == 'api': pass else: diff --git a/var/www/blueprints/ail_2_ail_sync.py b/var/www/blueprints/ail_2_ail_sync.py index b9d35a42..37643346 100644 --- a/var/www/blueprints/ail_2_ail_sync.py +++ b/var/www/blueprints/ail_2_ail_sync.py @@ -53,9 +53,11 @@ def create_json_response(data, status_code): @login_required @login_admin def ail_2_ail_dashboard(): + ail_uuid = ail_2_ail.get_ail_uuid() l_servers = ail_2_ail.get_all_running_sync_servers() l_servers = ail_2_ail.get_ail_instances_metadata(l_servers) - return render_template("ail_2_ail_dashboard.html", l_servers=l_servers) + return render_template("ail_2_ail_dashboard.html", ail_uuid=ail_uuid, + l_servers=l_servers) ###################### # # @@ -80,6 +82,28 @@ def ail_server_view(): return render_template("view_ail_server.html", server_metadata=server_metadata, bootstrap_label=bootstrap_label) +@ail_2_ail_sync.route('/settings/ail_2_ail/server/api/ping', methods=['GET']) +@login_required +@login_admin +def ail_server_api_ping(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_ping_remote_ail_server(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/api/version', methods=['GET']) +@login_required +@login_admin +def ail_server_api_version(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_get_remote_ail_server_version(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) + @ail_2_ail_sync.route('/settings/ail_2_ail/server/add', methods=['GET', 'POST']) @login_required @login_admin diff --git a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html index 0efc7f82..e5a4bfaf 100644 --- a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html +++ b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html @@ -30,6 +30,20 @@
+
+
+
+
AIL UUID:
+
+
+
+ {{ ail_uuid }} +
+
+
+
+ +

Connected Servers:

@@ -73,7 +87,7 @@