diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 2ca79326..f3b3a71f 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -44,6 +44,8 @@ isredis=`screen -ls | egrep '[0-9]+.Redis_AIL' | cut -d. -f1` isardb=`screen -ls | egrep '[0-9]+.ARDB_AIL' | cut -d. -f1` islogged=`screen -ls | egrep '[0-9]+.Logging_AIL' | cut -d. -f1` isqueued=`screen -ls | egrep '[0-9]+.Queue_AIL' | cut -d. -f1` +is_ail_core=`screen -ls | egrep '[0-9]+.Core_AIL' | cut -d. -f1` +is_ail_2_ail=`screen -ls | egrep '[0-9]+.AIL_2_AIL' | cut -d. -f1` isscripted=`screen -ls | egrep '[0-9]+.Script_AIL' | cut -d. -f1` isflasked=`screen -ls | egrep '[0-9]+.Flask_AIL' | cut -d. -f1` iscrawler=`screen -ls | egrep '[0-9]+.Crawler_AIL' | cut -d. -f1` @@ -145,11 +147,25 @@ function launching_scripts { screen -dmS "Script_AIL" sleep 0.1 - echo -e $GREEN"\t* Launching scripts"$DEFAULT ################################## # CORE MODULES # ################################## + # screen -dmS "Core_AIL" + # sleep 0.1 + echo -e $GREEN"\t* Launching core scripts ..."$DEFAULT + + # TODO: MOOVE IMPORTER ???? => multiple scripts + + #### SYNC #### + screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "ail_2_ail_server" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./ail_2_ail_server.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Sync_manager" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_manager.py; read x" + sleep 0.1 + ##-- SYNC --## + screen -S "Script_AIL" -X screen -t "JSON_importer" bash -c "cd ${AIL_BIN}/import; ${ENV_PY} ./JSON_importer.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Crawler_manager" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Crawler_manager.py; read x" @@ -165,6 +181,10 @@ function launching_scripts { ################################## # MODULES # ################################## + # screen -dmS "Script_AIL" + # sleep 0.1 + echo -e $GREEN"\t* Launching scripts"$DEFAULT + screen -S "Script_AIL" -X screen -t "Global" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Global.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Categ" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Categ.py; read x" @@ -176,6 +196,9 @@ function launching_scripts { screen -S "Script_AIL" -X screen -t "SubmitPaste" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./submit_paste.py; read x" sleep 0.1 + screen -S "Script_AIL" -X screen -t "Sync_module" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_module.py; read x" + sleep 0.1 + screen -S "Script_AIL" -X screen -t "ApiKey" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./ApiKey.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Credential" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Credential.py; read x" @@ -366,7 +389,7 @@ function launch_queues { } function launch_scripts { - if [[ ! $isscripted ]]; then + if [[ ! $isscripted ]]; then ############################# is core sleep 1 if checking_ardb && checking_redis; then launching_scripts; @@ -414,19 +437,19 @@ function launch_feeder { } function killscript { - if [[ $islogged || $isqueued || $isscripted || $isflasked || $isfeeded || $iscrawler ]]; then + if [[ $islogged || $isqueued || $is_ail_core || $isscripted || $isflasked || $isfeeded || $iscrawler ]]; then echo -e $GREEN"Killing Script"$DEFAULT - kill $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler + kill $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $iscrawler sleep 0.2 echo -e $ROSE`screen -ls`$DEFAULT - echo -e $GREEN"\t* $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler killed."$DEFAULT + echo -e $GREEN"\t* $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $iscrawler killed."$DEFAULT else echo -e $RED"\t* No script to kill"$DEFAULT fi } function killall { - if [[ $isredis || $isardb || $islogged || $isqueued || $isscripted || $isflasked || $isfeeded || $iscrawler ]]; then + if [[ $isredis || $isardb || $islogged || $isqueued || $is_ail_2_ail || $isscripted || $isflasked || $isfeeded || $iscrawler || $is_ail_core ]]; then if [[ $isredis ]]; then echo -e $GREEN"Gracefully closing redis servers"$DEFAULT shutting_down_redis; @@ -437,10 +460,10 @@ function killall { shutting_down_ardb; fi echo -e $GREEN"Killing all"$DEFAULT - kill $isredis $isardb $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler + kill $isredis $isardb $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $iscrawler $is_ail_2_ail sleep 0.2 echo -e $ROSE`screen -ls`$DEFAULT - echo -e $GREEN"\t* $isredis $isardb $islogged $isqueued $isscripted $isflasked $isfeeded $iscrawler killed."$DEFAULT + echo -e $GREEN"\t* $isredis $isardb $islogged $isqueued $isscripted $is_ail_2_ail $isflasked $isfeeded $iscrawler $is_ail_core killed."$DEFAULT else echo -e $RED"\t* No screen to kill"$DEFAULT fi diff --git a/bin/core/Sync_importer.py b/bin/core/Sync_importer.py new file mode 100755 index 00000000..7ed7a8f0 --- /dev/null +++ b/bin/core/Sync_importer.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +""" +The SYNC Module +================================ + +This module . + +""" + +################################## +# Import External packages +################################## +import json +import os +import sys +import time + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail +from modules.abstract_module import AbstractModule +from packages.Item import Item +from packages import Tag + + +class Sync_importer(AbstractModule): + """ + Tags module for AIL framework + """ + + def __init__(self): + super(Sync_importer, self).__init__() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 + + #self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict() + #self.last_refresh = time.time() + + # Send module state to logs + self.redis_logger.info(f'Module {self.module_name} Launched') + + + def run(self): + while self.proceed: + ### REFRESH DICT + # if self.last_refresh < ail_2_ail.get_last_updated_ail_instance(): + # self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict() + # self.last_refresh = time.time() + + ail_stream = ail_2_ail.get_sync_importer_ail_stream() + if ail_stream: + ail_stream = json.loads(ail_stream) + self.compute(ail_stream) + + else: + self.computeNone() + # Wait before next process + self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s") + time.sleep(self.pending_seconds) + + + def compute(self, ail_stream): + + # # TODO: SANITYZE AIL STREAM + # # TODO: CHECK FILTER + + # import Object + b64_gzip_content = ail_stream['payload']['raw'] + + # # TODO: create default id + item_id = ail_stream['meta']['ail:id'] + 'test' + + message = f'{item_id} {b64_gzip_content}' + print(message) + self.send_message_to_queue(message, 'Mixer') + + # # increase nb of paste by feeder name + # server_cache.hincrby("mixer_cache:list_feeder", Sync, 1) + + +if __name__ == '__main__': + + module = Sync_importer() + module.run() diff --git a/bin/core/Sync_manager.py b/bin/core/Sync_manager.py new file mode 100755 index 00000000..1ee97bcb --- /dev/null +++ b/bin/core/Sync_manager.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import sys +import time + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib')) +import ail_2_ail + +# # TODO: lauch me in core screen + +if __name__ == '__main__': + + Client_Manager = ail_2_ail.AIL2AILClientManager() + + while True: + command = Client_Manager.get_manager_command() + if command: + Client_Manager.execute_manager_command(command) + else: + time.sleep(5) diff --git a/bin/core/Sync_module.py b/bin/core/Sync_module.py new file mode 100755 index 00000000..144cf37f --- /dev/null +++ b/bin/core/Sync_module.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +""" +The SYNC Module +================================ + +This module . + +""" + +################################## +# Import External packages +################################## +import os +import sys +import time + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail +from modules.abstract_module import AbstractModule +from packages.Item import Item +from packages import Tag + + +class Sync_module(AbstractModule): + """ + Sync_module module for AIL framework + """ + + def __init__(self): + super(Sync_module, self).__init__() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 + + self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() + self.last_refresh = time.time() + + print(self.dict_sync_queues) + + # Send module state to logs + self.redis_logger.info(f'Module {self.module_name} Launched') + + + def compute(self, message): + + print(message) + + ### REFRESH DICT + if self.last_refresh < ail_2_ail.get_last_updated_ail_instance(): + self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict() + self.last_refresh = time.time() + + # Extract object from message + # # TODO: USE JSON DICT ???? + mess_split = message.split(';') + if len(mess_split) == 3: + obj_type = mess_split[0] + obj_subtype = mess_split[1] + obj_id = mess_split[2] + + # OBJECT => Item + if obj_type == 'item': + obj = Item(obj_id) + tags = obj.get_tags(r_set=True) + + # check filter + tags + for queue_uuid in self.dict_sync_queues: + filter_tags = self.dict_sync_queues[queue_uuid]['filter'] + print(tags) + print(filter_tags) + print(tags.issubset(filter_tags)) + if filter_tags and tags: + if tags.issubset(filter_tags): + obj_dict = obj.get_default_meta() + # send to queue push and/or pull + for dict_ail in self.dict_sync_queues[queue_uuid]['ail_instances']: + + ail_2_ail.add_object_to_sync_queue(queue_uuid, dict_ail['ail_uuid'], obj_dict, + push=dict_ail['push'], pull=dict_ail['pull']) + + else: + # Malformed message + raise Exception(f'too many values to unpack (expected 3) given {len(mess_split)} with message {message}') + + +if __name__ == '__main__': + + module = Sync_module() + module.run() diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py new file mode 100755 index 00000000..43f5c706 --- /dev/null +++ b/bin/core/ail_2_ail.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import json +import secrets +import sys +import time +import uuid + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'core/')) +import screen + +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/')) +from Item import Item + +config_loader = ConfigLoader.ConfigLoader() +r_cache = config_loader.get_redis_conn("Redis_Cache") +r_serv_db = config_loader.get_redis_conn("ARDB_DB") +r_serv_sync = config_loader.get_redis_conn("ARDB_DB") +config_loader = None + +def generate_uuid(): + return str(uuid.uuid4()).replace('-', '') + +def generate_sync_api_key(): + return secrets.token_urlsafe(42) + +def get_ail_uuid(): + return r_serv_db.get('ail:uuid') + +# # TODO: # TODO: # TODO: # TODO: # TODO: ADD SYNC MODE == PUSH +# # TODO: get connection status +# # TODO: get connection METADATA +############################# +# # +#### SYNC CLIENT MANAGER #### + +def get_all_sync_clients(r_set=False): + res = r_cache.smembers('ail_2_ail:all_sync_clients') + if r_set: + return set(res) + else: + return res + +def get_sync_client_ail_uuid(client_id): + return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid') + +def get_sync_client_queue_uuid(client_id): + return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'queue_uuid') + +def delete_sync_client_cache(client_id): + ail_uuid = get_sync_client_ail_uuid(client_id) + queue_uuid = get_sync_client_queue_uuid(client_id) + # map ail_uuid/queue_uuid + r_cache.srem(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) + r_cache.srem(f'ail_2_ail:queue_uuid:{queue_uuid}', client_id) + + r_cache.delete(f'ail_2_ail:sync_client:{client_id}') + r_cache.srem('ail_2_ail:all_sync_clients', client_id) + +def delete_all_sync_clients_cache(): + for client_id in get_all_sync_clients(): + delete_sync_client_cache(client_id) + r_cache.delete('ail_2_ail:all_sync_clients') + +# command: -launch +# -kill +# -relaunch +## TODO: check command +def send_command_to_manager(command, client_id=-1): + dict_action = {'command': command, 'client_id': client_id} + r_cache.sadd('ail_2_ail:client_manager:command', dict_action) + +class AIL2AILClientManager(object): + """AIL2AILClientManager.""" + + SCREEN_NAME = 'AIL_2_AIL' + SCRIPT_NAME = 'ail_2_ail_client.py' + SCRIPT_DIR = os.path.join(os.environ['AIL_BIN'], 'core') + + def __init__(self): + # dict client_id: AIL2AILCLIENT or websocket + self.clients = {} + # launch all sync clients + self.relaunch_all_sync_clients() + + def get_all_clients(self): + return self.clients + + # return new client id + def get_new_sync_client_id(self): + for new_id in range(100000): + if new_id not in self.clients: + return str(new_id) + + def get_sync_client_ail_uuid(self, client_id): + return self.clients[client_id]['ail_uuid'] + + def get_sync_client_queue_uuid(self, client_id): + return self.clients[client_id]['queue_uuid'] + + # # TODO: check PUSH ACL + def get_all_sync_clients_to_launch(self): + return get_all_ail_instance() + + def relaunch_all_sync_clients(self): + delete_all_sync_clients_cache() + self.clients = {} + for ail_uuid in self.get_all_sync_clients_to_launch(): + self.launch_sync_client(ail_uuid) + + def launch_sync_client(self, ail_uuid): + dir_project = os.environ['AIL_HOME'] + client_id = self.get_new_sync_client_id() + script_options = f'-a {ail_uuid} -m push -i {client_id}' + screen.create_screen(AIL2AILClientManager.SCREEN_NAME) + screen.launch_uniq_windows_script(AIL2AILClientManager.SCREEN_NAME, + client_id, dir_project, + AIL2AILClientManager.SCRIPT_DIR, + AIL2AILClientManager.SCRIPT_NAME, + script_options=script_options, kill_previous_windows=True) + # save sync client status + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid', ail_uuid) + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'launch_time', int(time.time())) + + # create map ail_uuid/queue_uuid + r_cache.sadd(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) + + self.clients[client_id] = {'ail_uuid': ail_uuid} + + # # TODO: FORCE KILL ???????????? + # # TODO: check if exists + def kill_sync_client(self, client_id): + if not kill_screen_window('AIL_2_AIL', client_id): + # # TODO: log kill error + pass + + delete_sync_client_cache(client_id) + self.clients.pop(client_id) + + ## COMMANDS ## + + def get_manager_command(self): + res = r_cache.spop('ail_2_ail:client_manager:command') + if res: + return json.dumps(res) + else: + return None + + def execute_manager_command(self, command_dict): + command = command_dict.get('command') + if command == 'launch': + ail_uuid = int(command_dict.get('ail_uuid')) + queue_uuid = int(command_dict.get('queue_uuid')) + self.launch_sync_client(ail_uuid, queue_uuid) + elif command == 'relaunch': + self.relaunch_all_sync_clients() + else: + # only one sync client + client_id = int(command_dict.get('client_id')) + if client_id < 1: + print('Invalid client id') + return None + if command == 'kill': + self.kill_sync_client(client_id) + elif command == 'relaunch': + ail_uuid = self.get_sync_client_ail_uuid(client_id) + queue_uuid = self.get_sync_client_queue_uuid(client_id) + self.kill_sync_client(client_id) + self.launch_sync_client(ail_uuid, queue_uuid) + +######################################## +######################################## +######################################## + +# # TODO: ADD METADATA +def get_sync_client_status(client_id): + dict_client = {'id': client_id} + dict_client['ail_uuid'] = get_sync_client_ail_uuid(client_id) + dict_client['queue_uuid'] = get_sync_client_queue_uuid(client_id) + return dict_client + +def get_all_sync_client_status(): + sync_clients = [] + all_sync_clients = r_cache.smembers('ail_2_ail:all_sync_clients') + for client_id in all_sync_clients: + sync_clients.append(get_sync_client_status(client_id)) + return sync_clients + +###################### +# # +#### AIL INSTANCE #### + +## AIL KEYS ## + +def get_all_ail_instance_keys(): + return r_serv_sync.smembers(f'ail:instance:key:all') + +def is_allowed_ail_instance_key(key): + return r_serv_sync.sismember(f'ail:instance:key:all', key) + +def get_ail_instance_key(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'api_key') + +def get_ail_instance_by_key(key): + return r_serv_sync.get(f'ail:instance:key:{key}') + +# def check_acl_sync_queue_ail(ail_uuid, queue_uuid, key): +# return is_ail_instance_queue(ail_uuid, queue_uuid) + +def update_ail_instance_key(ail_uuid, new_key): + old_key = get_ail_instance_key(ail_uuid) + r_serv_sync.srem(f'ail:instance:key:all', old_key) + r_serv_sync.delete(f'ail:instance:key:{old_key}') + + r_serv_sync.sadd(f'ail:instance:key:all', new_key) + r_serv_sync.delete(f'ail:instance:key:{new_key}', ail_uuid) + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', new_key) + +#- AIL KEYS -# + +def get_all_ail_instance(): + return r_serv_sync.smembers('ail:instance:all') + +def get_ail_instance_all_sync_queue(ail_uuid): + return r_serv_sync.smembers(f'ail:instance:sync_queue:{ail_uuid}') + +def is_ail_instance_queue(ail_uuid, queue_uuid): + return r_serv_sync.sismember(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + +def get_ail_instance_url(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'url') + +def get_ail_instance_description(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'description') + +def is_ail_instance_push_enabled(ail_uuid): + res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'push') + return res == 'True' + +def is_ail_instance_pull_enabled(ail_uuid): + res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'pull') + return res == 'True' + +def is_ail_instance_sync_enabled(ail_uuid, sync_mode=None): + if sync_mode is None: + return is_ail_instance_push_enabled(ail_uuid) or is_ail_instance_pull_enabled(ail_uuid) + elif sync_mode == 'pull': + return is_ail_instance_pull_enabled(ail_uuid) + elif sync_mode == 'push': + return is_ail_instance_push_enabled(ail_uuid) + else: + return False + +def change_pull_push_state(ail_uuid, pull=False, push=False): + # sanityze pull/push + if pull: + pull = True + else: + pull = False + if push: + push = True + else: + push = False + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'push', push) + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'pull', pull) + + +# # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) +# # TODO: push/pull +def get_ail_instance_metadata(ail_uuid): + dict_meta = {} + dict_meta['uuid'] = ail_uuid + dict_meta['url'] = get_ail_instance_url(ail_uuid) + dict_meta['description'] = get_ail_instance_description(ail_uuid) + + # # TODO: HIDE + dict_meta['api_key'] = get_ail_instance_key(ail_uuid) + + # # TODO: + # - set UUID sync_queue + + return dict_meta + +# # TODO: VALIDATE URL +# API KEY +def create_ail_instance(ail_uuid, url, api_key=None, description=None): + r_serv_sync.sadd('ail:instance:all', ail_uuid) + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'url', url) + ## API KEY ## + if not api_key: + api_key = generate_sync_api_key() + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'api_key', api_key) + r_serv_sync.sadd('ail:instance:key:all', api_key) + r_serv_sync.set(f'ail:instance:key:{api_key}', ail_uuid) + #- API KEY -# + if description: + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'description', description) + return ail_uuid + +def delete_ail_instance(ail_uuid): + for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid): + unregister_ail_to_sync_queue(ail_uuid, queue_uuid) + r_serv_sync.delete(f'ail:instance:sync_queue:{ail_uuid}') + key = get_ail_instance_by_key(ail_uuid) + r_serv_sync.delete(f'ail:instance:{ail_uuid}') + r_serv_sync.srem('ail:instance:key:all', ail_uuid) + r_serv_sync.delete(f'ail:instance:key:{key}', ail_uuid) + r_serv_sync.srem('ail:instance:all', ail_uuid) + +def get_last_updated_ail_instance(): + epoch = r_serv_sync.get(f'ail:instance:queue:last_updated') + if not epoch: + epoch = 0 + return float(epoch) + +#################### +# # +#### SYNC QUEUE #### + +class Sync_Queue(object): # # TODO: use for edit + """Sync_Queue.""" + + def __init__(self, uuid): + self.uuid = uuid + +def get_all_sync_queue(): + return r_serv_sync.smembers('ail2ail:sync_queue:all') + +def get_sync_queue_all_ail_instance(queue_uuid): + return r_serv_sync.smembers(f'ail2ail:sync_queue:ail_instance:{queue_uuid}') + +# # TODO: check if push or pull enabled ? +def is_queue_used_by_ail_instace(queue_uuid): + return r_serv_sync.exists(f'ail2ail:sync_queue:ail_instance:{queue_uuid}') + +# # TODO: add others filter +def get_sync_queue_filter(queue_uuid): + return r_serv_sync.smembers(f'ail2ail:sync_queue:filter:tags:{queue_uuid}') + +# # TODO: ADD FILTER +def get_sync_queue_metadata(queue_uuid): + dict_meta = {} + dict_meta['uuid'] = queue_uuid + dict_meta['name'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'name') + dict_meta['description'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'description') + dict_meta['max_size'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'max_size') + + # # TODO: TO ADD: + # - set uuid instance + + return dict_meta + +##################################################### +def get_all_sync_queue_dict(): + dict_sync_queues = {} + for queue_uuid in get_all_sync_queue(): + if is_queue_used_by_ail_instace(queue_uuid): + dict_queue = {} + dict_queue['filter'] = get_sync_queue_filter(queue_uuid) + + dict_queue['ail_instances'] = [] ############ USE DICT ????????? + for ail_uuid in get_sync_queue_all_ail_instance(queue_uuid): + dict_ail = {'ail_uuid': ail_uuid, + 'pull': is_ail_instance_pull_enabled(ail_uuid), + 'push': is_ail_instance_push_enabled(ail_uuid)} + if dict_ail['pull'] or dict_ail['push']: + dict_queue['ail_instances'].append(dict_ail) + if dict_queue['ail_instances']: + dict_sync_queues[queue_uuid] = dict_queue + return dict_sync_queues + + +def register_ail_to_sync_queue(ail_uuid, queue_uuid): + r_serv_sync.sadd(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid) + r_serv_sync.sadd(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + +def unregister_ail_to_sync_queue(ail_uuid, queue_uuid): + r_serv_sync.srem(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid) + r_serv_sync.srem(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid) + +# # TODO: optionnal name ??? +# # TODO: SANITYZE TAGS +def create_sync_queue(name, tags=[], description=None, max_size=100): + queue_uuid = generate_uuid() + r_serv_sync.sadd('ail2ail:sync_queue:all', queue_uuid) + + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'name', name) + if description: + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'description', description) + r_serv_sync.hset(f'ail2ail:sync_queue:{queue_uuid}', 'max_size', max_size) + + for tag in tags: + r_serv_sync.sadd(f'ail2ail:sync_queue:filter:tags:{queue_uuid}', tag) + + return queue_uuid + +def delete_sync_queue(queue_uuid): + for ail_uuid in get_sync_queue_all_ail_instance(queue_uuid): + unregister_ail_to_sync_queue(ail_uuid, queue_uuid) + r_serv_sync.delete(f'ail2ail:sync_queue:{queue_uuid}') + r_serv_sync.srem('ail2ail:sync_queue:all', queue_uuid) + return queue_uuid + +############################# +# # +#### SYNC REDIS QUEUE ####### + +def get_sync_queue_object(ail_uuid, push=True): + for queue_uuid in get_ail_instance_all_sync_queue(ail_uuid): + obj_dict = get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=push) + if obj_dict: + return obj_dict + return None + +def get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=True): + if push: + sync_mode = 'push' + else: + sync_mode = 'pull' + obj_dict = r_serv_sync.lpop(f'sync:queue:{sync_mode}:{queue_uuid}:{ail_uuid}') + if obj_dict: + obj_dict = json.loads(obj_dict) + # # REVIEW: # TODO: create by obj type + return Item(obj_dict['id']) + +def add_object_to_sync_queue(queue_uuid, ail_uuid, obj_dict, push=True, pull=True): + obj = json.dumps(obj_dict) + + # # TODO: # FIXME: USE CACHE ?????? + if push: + r_serv_sync.lpush(f'sync:queue:push:{queue_uuid}:{ail_uuid}', obj) + r_serv_sync.ltrim(f'sync:queue:push:{queue_uuid}:{ail_uuid}', 0, 200) + + if pull: + r_serv_sync.lpush(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', obj) + r_serv_sync.ltrim(f'sync:queue:pull:{queue_uuid}:{ail_uuid}', 0, 200) + +# # TODO: # REVIEW: USE CACHE ????? USE QUEUE FACTORY ????? +def get_sync_importer_ail_stream(): + return r_serv_sync.spop('sync:queue:importer') + +def add_ail_stream_to_sync_importer(ail_stream): + ail_stream = json.dumps(ail_stream) + r_serv_sync.sadd('sync:queue:importer', ail_stream) + +############################# +# # +#### AIL EXCHANGE FORMAT #### + +def create_ail_stream(Object): + ail_stream = {'format': 'ail', + 'version': 1, + 'type': Object.get_type()} + + # OBJECT META + ail_stream['meta'] = {'ail_mime-type': 'text/plain'} + ail_stream['meta']['ail:id'] = Object.get_id() + ail_stream['meta']['ail:tags'] = Object.get_tags() + # GLOBAL PAYLOAD + ail_stream['meta']['ail:uuid'] = get_ail_uuid() + + # OBJECT PAYLOAD + ail_stream['payload'] = Object.get_ail_2_ail_payload() + + return ail_stream + +if __name__ == '__main__': + + ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + url = "wss://localhost:4443" + api_key = 'secret' + #description = 'first test instance' + queue_uuid = '79bcafc0a6d644deb2c75fb5a83d7caa' + tags = ['infoleak:submission="manual"'] + name = 'submitted queue' + description = 'first test queue, all submitted items' + #queue_uuid = '' + + #res = create_ail_instance(ail_uuid, url, api_key=api_key, description=description) + + #res = create_sync_queue(name, tags=tags, description=description, max_size=100) + #res = delete_sync_queue(queue_uuid) + + #res = register_ail_to_sync_queue(ail_uuid, queue_uuid) + res = change_pull_push_state(ail_uuid, push=True, pull=True) + + print(res) diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py new file mode 100755 index 00000000..678999a3 --- /dev/null +++ b/bin/core/ail_2_ail_client.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import argparse +import json +import os +import sys +import time +from urllib.parse import urljoin + +import asyncio +import http +import ssl +import websockets + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail + +#################################################################### + +class AIL2AILClient(object): + """AIL2AILClient.""" + + def __init__(self, client_id, ail_uuid, sync_mode): + self.client_id + self.ail_uuid = ail_uuid + self.sync_mode = sync_mode + + # # TODO: + self.ail_url = "wss://localhost:4443" + + self.uri = f"{ail_url}/{sync_mode}/{ail_uuid}" + +#################################################################### + +# # TODO: ADD TIMEOUT +async def pull(websocket, ail_uuid): + while True: + obj = await websocket.recv() + print(obj) + +async def push(websocket, ail_uuid): + + while True: + # get elem to send + Obj = ail_2_ail.get_sync_queue_object(ail_uuid) + if Obj: + obj_ail_stream = ail_2_ail.create_ail_stream(Obj) + obj_ail_stream = json.dumps(obj_ail_stream) + print(obj_ail_stream) + + # send objects + await websocket.send(obj_ail_stream) + # DEBUG: + await asyncio.sleep(0.1) + else: + await asyncio.sleep(10) + + +async def ail_to_ail_client(ail_uuid, sync_mode, ail_key=None): + if not ail_key: + ail_key = ail_2_ail.get_ail_instance_key(ail_uuid) + ail_url = "wss://localhost:4443" + + uri = f"{ail_url}/{sync_mode}/{ail_uuid}" + print(uri) + + async with websockets.connect( + uri, + ssl=ssl_context, + extra_headers={"Authorization": f"{ail_key}"} + ) as websocket: + + if sync_mode == 'pull': + await pull(websocket, ail_uuid) + + elif sync_mode == 'push': + await push(websocket, ail_uuid) + await websocket.close() + + elif sync_mode == 'api': + await websocket.close() + +##########################################################3 +# # TODO:manual key +########################################################## +if __name__ == '__main__': + + parser = argparse.ArgumentParser(description='Websocket SYNC Client') + parser.add_argument('-a', '--ail', help='AIL UUID', type=str, dest='ail_uuid', required=True, default=None) + parser.add_argument('-i', '--client_id', help='Client ID', type=str, dest='client_id', default=None) + parser.add_argument('-m', '--mode', help='SYNC Mode, pull or push', type=str, dest='sync_mode', default='pull') + #parser.add_argument('-k', '--key', type=str, default='', help='AIL Key') + args = parser.parse_args() + + ail_uuid = args.ail_uuid + sync_mode = args.sync_mode + + if ail_uuid is None or sync_mode not in ['pull', 'push']: + parser.print_help() + sys.exit(0) + + #ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + #sync_mode = 'pull' + + # SELF SIGNED CERTIFICATES + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + # SELF SIGNED CERTIFICATES + + asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode)) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py new file mode 100755 index 00000000..d4a6646d --- /dev/null +++ b/bin/core/ail_2_ail_server.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import json +import os +import sys +import uuid + +import asyncio +import http +import ssl +import websockets + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from core import ail_2_ail + + +obj_test = { + "format": "ail", + "version": 1, + "type": "item", + "meta": { + "ail:uuid": "03c51929-eeab-4d47-9dc0-c667f94c7d2c", + "ail:uuid_org": "28bc3db3-16da-461c-b20b-b944f4058708", + }, + "payload": { + "raw" : "MjhiYzNkYjMtMTZkYS00NjFjLWIyMGItYjk0NGY0MDU4NzA4Cg==", + "compress": "gzip", + "encoding": "base64" + } +} + +############################# + +CONNECTED_CLIENT = set() +# # TODO: Store in redis + +############################# + +# # # # # # # +# # +# UTILS # +# # +# # # # # # # + +def is_valid_uuid_v4(UUID): + if not UUID: + return False + UUID = UUID.replace('-', '') + try: + uuid_test = uuid.UUID(hex=UUID, version=4) + return uuid_test.hex == UUID + except: + return False + +def unpack_path(path): + dict_path = {} + path = path.split('/') + if len(path) < 3: + raise Exception('Invalid url path') + dict_path['sync_mode'] = path[1] + dict_path['ail_uuid'] = path[2] + return dict_path + +# # # # # # # + + +# async def send_object(): +# if CONNECTED_CLIENT: +# message = 'new json object {"id": "test01"}' +# await asyncio.wait([user.send(message) for user in USERS]) + + +async def register(websocket): + CONNECTED_CLIENT.add(websocket) + print(CONNECTED_CLIENT) + +async def unregister(websocket): + CONNECTED_CLIENT.remove(websocket) + +# PULL: Send data to client +# # TODO: ADD TIMEOUT ??? +async def pull(websocket, ail_uuid): + + for queue_uuid in ail_2_ail.get_ail_instance_all_sync_queue(ail_uuid): + while True: + # get elem to send + Obj = ail_2_ail.get_sync_queue_object_by_queue_uuid(queue_uuid, ail_uuid, push=False) + if Obj: + obj_ail_stream = ail_2_ail.create_ail_stream(Obj) + Obj = json.dumps(obj_ail_stream) + print(Obj) + + # send objects + await websocket.send(Obj) + # END PULL + else: + break + + # END PULL + return None + + +# PUSH: receive data from client +# # TODO: optional queue_uuid +async def push(websocket, ail_uuid): + print(ail_uuid) + while True: + ail_stream = await websocket.recv() + + # # TODO: CHECK ail_stream + ail_stream = json.loads(ail_stream) + print(ail_stream) + + ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) + +async def ail_to_ail_serv(websocket, path): + + + # # TODO: check if it works + # # DEBUG: + print(websocket.ail_key) + print(websocket.ail_uuid) + + print(websocket.remote_address) + path = unpack_path(path) + sync_mode = path['sync_mode'] + print(f'sync mode: {sync_mode}') + + await register(websocket) + try: + if sync_mode == 'pull': + await pull(websocket, websocket.ail_uuid) + await websocket.close() + print('closed') + + elif sync_mode == 'push': + await push(websocket, websocket.ail_uuid) + + elif sync_mode == 'api': + await websocket.close() + + finally: + await unregister(websocket) + + +########################################### +# CHECK Authorization HEADER and URL PATH # + +# # TODO: check AIL UUID (optional header) + +class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): + """AIL_2_AIL_Protocol websockets server.""" + + async def process_request(self, path, request_headers): + + print(self.remote_address) + print(request_headers) + # API TOKEN + api_key = request_headers.get('Authorization', '') + print(api_key) + if api_key is None: + print('Missing token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n" + + if not ail_2_ail.is_allowed_ail_instance_key(api_key): + print('Invalid token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" + + # PATH + try: + dict_path = unpack_path(path) + except Exception as e: + print('Invalid path') + return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" + + + ail_uuid = ail_2_ail.get_ail_instance_by_key(api_key) + if ail_uuid != dict_path['ail_uuid']: + print('Invalid token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" + + + if not api_key != ail_2_ail.get_ail_instance_key(api_key): + print('Invalid token') + return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n" + + self.ail_key = api_key + self.ail_uuid = ail_uuid + + if dict_path['sync_mode'] == 'pull' or dict_path['sync_mode'] == 'push': + + # QUEUE UUID + # if dict_path['queue_uuid']: + # + # if not is_valid_uuid_v4(dict_path['queue_uuid']): + # print('Invalid UUID') + # return http.HTTPStatus.BAD_REQUEST, [], b"Invalid UUID\n" + # + # self.queue_uuid = dict_path['queue_uuid'] + # else: + # self.queue_uuid = None + # + # if not ail_2_ail.is_ail_instance_queue(ail_uuid, dict_path['queue_uuid']): + # print('UUID not found') + # return http.HTTPStatus.FORBIDDEN, [], b"UUID not found\n" + + # SYNC MODE + if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=dict_path['sync_mode']): + print('SYNC mode disabled') + return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" + + # # TODO: CHECK API + elif dict_path[sync_mode] == 'api': + pass + + else: + print('Invalid path') + return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" + + +########################################### + +# # TODO: logging +# # TODO: clean shutdown / kill all connections +# # TODO: API +# # TODO: Filter object +# # TODO: process_request check +# # TODO: IP/uuid to block + +if __name__ == '__main__': + + host = 'localhost' + port = 4443 + + print('Launching Server...') + + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + cert_dir = os.environ['AIL_FLASK'] + ssl_context.load_cert_chain(certfile=os.path.join(cert_dir, 'server.crt'), keyfile=os.path.join(cert_dir, 'server.key')) + + start_server = websockets.serve(ail_to_ail_serv, "localhost", 4443, ssl=ssl_context, create_protocol=AIL_2_AIL_Protocol) + + print(f'Server Launched: wss://{host}:{port}') + + asyncio.get_event_loop().run_until_complete(start_server) + asyncio.get_event_loop().run_forever() diff --git a/bin/lib/ail_objects.py b/bin/lib/ail_objects.py index 565661dc..97ec275d 100755 --- a/bin/lib/ail_objects.py +++ b/bin/lib/ail_objects.py @@ -9,9 +9,16 @@ import redis from abc import ABC from flask import url_for +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/')) +import Tag + sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader +config_loader = ConfigLoader.ConfigLoader() +r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata") +config_loader = None + class AbstractObject(ABC): """ Abstract Object @@ -22,7 +29,7 @@ class AbstractObject(ABC): # - handle + refactor coorelations # - creates others objects - def __init__(self, obj_type, id): + def __init__(self, obj_type, id, subtype=None): """ Abstract for all the AIL object :param obj_type: object type (item, ...) @@ -30,17 +37,43 @@ class AbstractObject(ABC): """ self.id = id self.type = obj_type - - def get_type(self): - return self.type + self.subtype = None def get_id(self): return self.id + def get_type(self): + return self.type + + def get_subtype(self, r_str=False): + if not self.subtype: + if r_str: + return '' + return self.subtype + + def get_default_meta(self): + dict_meta = {'id': self.get_id(), + 'type': self.get_type()} + if self.subtype: + dict_meta['subtype'] = self.subtype + return dict_meta + + def get_tags(self, r_set=False): + tags = Tag.get_obj_tag(self.id) + if r_set: + tags = set(tags) + return tags + + ## ADD TAGS ???? + #def add_tags(self): + + def _delete(self): + # DELETE TAGS + Tag.delete_obj_all_tags(self.id, self.type) + if self.type == 'item': + # delete tracker + pass -config_loader = ConfigLoader.ConfigLoader() -r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata") -config_loader = None def is_valid_object_type(object_type): if object_type in ['domain', 'item', 'image', 'decoded']: diff --git a/bin/modules/Categ.py b/bin/modules/Categ.py index 36e749a6..a071fb2f 100755 --- a/bin/modules/Categ.py +++ b/bin/modules/Categ.py @@ -102,6 +102,13 @@ class Categ(AbstractModule): if r_result: return categ_found + # DIRTY FIX AIL SYNC + # # FIXME: DIRTY FIX + message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' + print(message) + self.send_message_to_queue(message, 'SyncModule') + + if __name__ == '__main__': # SCRIPT PARSER # diff --git a/bin/modules/Tags.py b/bin/modules/Tags.py index 2a42bfca..2ed37b74 100755 --- a/bin/modules/Tags.py +++ b/bin/modules/Tags.py @@ -45,14 +45,17 @@ class Tags(AbstractModule): if len(mess_split) == 2: tag = mess_split[0] item = Item(mess_split[1]) - item_id = item.get_id() # Create a new tag Tag.add_tag('item', tag, item.get_id()) - print(f'{item_id}: Tagged {tag}') + print(f'{item.get_id()}: Tagged {tag}') # Forward message to channel self.send_message_to_queue(message, 'MISP_The_Hive_feeder') + + message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' + self.send_message_to_queue(message, 'Sync_module') + else: # Malformed message raise Exception(f'too many values to unpack (expected 2) given {len(mess_split)} with message {message}') diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index de38faa2..632f8e43 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -72,6 +72,7 @@ class AbstractModule(ABC): ex: send_to_queue(item_id, 'Global') """ self.process.populate_set_out(message, queue_name) + # add to new set_module def run(self): """ @@ -98,6 +99,8 @@ class AbstractModule(ABC): print('TRACEBACK:') for line in trace: print(line) + # remove from set_module + ## check if item process == completed else: self.computeNone() diff --git a/bin/packages/Item.py b/bin/packages/Item.py index 6ca7ba43..6f0a2859 100755 --- a/bin/packages/Item.py +++ b/bin/packages/Item.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* +import base64 import os import re import sys @@ -502,7 +503,6 @@ def delete_item(obj_id): if not exist_item(obj_id): return False else: - Tag.delete_obj_tags(obj_id, 'item', Tag.get_obj_tag(obj_id)) delete_item_duplicate(obj_id) # delete MISP event r_serv_metadata.delete('misp_events:{}'.format(obj_id)) @@ -532,6 +532,8 @@ def delete_item(obj_id): ### TODO in inport V2 # delete from tracked items + + # # # TODO: # FIXME: LATER # delete from queue ### return False @@ -594,6 +596,18 @@ class Item(AbstractObject): """ return item_basic.get_item_content(self.id) + def get_gzip_content(self, b64=False): + with open(self.get_filename(), 'rb') as f: + content = f.read() + if b64: + content = base64.b64encode(content) + return content.decode() + + def get_ail_2_ail_payload(self): + payload = {'raw': self.get_gzip_content(b64=True), + 'compress': 'gzip'} + return payload + # # TODO: def create(self): pass @@ -607,6 +621,22 @@ class Item(AbstractObject): except FileNotFoundError: return False + ############################################################################ + ############################################################################ + ############################################################################ + + def exist_correlation(self): + pass + + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + ############################################################################ + #if __name__ == '__main__': diff --git a/bin/packages/Tag.py b/bin/packages/Tag.py index ad521d0a..266d6a68 100755 --- a/bin/packages/Tag.py +++ b/bin/packages/Tag.py @@ -421,7 +421,7 @@ def add_tag(object_type, tag, object_id, obj_date=None): r_serv_tags.hincrby('daily_tags:{}'.format(datetime.date.today().strftime("%Y%m%d")), tag, 1) def delete_obj_tag(object_type, object_id, tag, obj_date): - if object_type=="item": # # TODO: # FIXME: # REVIEW: rename me !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + if object_type=="item": # # TODO: # FIXME: # REVIEW: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! obj_date = get_obj_date(object_type, object_id) r_serv_metadata.srem('tag:{}'.format(object_id), tag) r_serv_tags.srem('{}:{}'.format(tag, obj_date), object_id) @@ -455,7 +455,7 @@ def api_delete_obj_tags(tags=[], object_id=None, object_type="item"): if not tags: return ({'status': 'error', 'reason': 'No Tag(s) specified'}, 400) - res = delete_obj_tags(object_id, object_type, tags=tags) + res = delete_obj_tags(object_id, object_type, tags) if res: return res @@ -464,13 +464,16 @@ def api_delete_obj_tags(tags=[], object_id=None, object_type="item"): dict_res['id'] = object_id return (dict_res, 200) -def delete_obj_tags(object_id, object_type, tags=[]): +def delete_obj_tags(object_id, object_type, tags): obj_date = get_obj_date(object_type, object_id) for tag in tags: res = delete_tag(object_type, tag, object_id, obj_date=obj_date) if res: return res +def delete_obj_all_tags(obj_id, obj_type): + delete_obj_tags(obj_id, obj_type, get_obj_tag(obj_id)) + def sanitise_tags_date_range(l_tags, date_from=None, date_to=None): if date_from is None or date_to is None: date_from = get_tags_min_last_seen(l_tags, r_int=False) diff --git a/bin/packages/User.py b/bin/packages/User.py index ef6eba74..fe5197c7 100755 --- a/bin/packages/User.py +++ b/bin/packages/User.py @@ -11,6 +11,9 @@ import ConfigLoader from flask_login import UserMixin +def get_all_users(): + return r_serv_db.hkeys('user:all') + class User(UserMixin): def __init__(self, id): diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg index f1ec8ef7..4cc5ba6a 100644 --- a/bin/packages/modules.cfg +++ b/bin/packages/modules.cfg @@ -2,6 +2,9 @@ subscribe = ZMQ_Global publish = Redis_Mixer,Redis_preProcess1 +[Sync_importer] +publish = Redis_Mixer,Redis_Tags + [Importer_Json] publish = Redis_Mixer,Redis_Tags @@ -55,7 +58,7 @@ subscribe = Redis_Global [Categ] subscribe = Redis_Global -publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Urls,Redis_Credential,Redis_SourceCode,Redis_Cve,Redis_ApiKey +publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Urls,Redis_Credential,Redis_SourceCode,Redis_Cve,Redis_ApiKey,Redis_SyncModule [CreditCards] subscribe = Redis_CreditCards @@ -96,7 +99,11 @@ subscribe = Redis_ModuleStats [Tags] subscribe = Redis_Tags -publish = Redis_Tags_feed +publish = Redis_Tags_feed,Redis_SyncModule + +# dirty fix +[Sync_module] +subscribe = Redis_SyncModule [MISP_The_hive_feeder] subscribe = Redis_Tags_feed