diff --git a/.gitignore b/.gitignore
index fe57bf5c..716f2c33 100644
--- a/.gitignore
+++ b/.gitignore
@@ -68,4 +68,6 @@ doc/all_modules.txt
 # auto generated
 doc/module-data-flow.png
 doc/data-flow.png
+doc/ail_queues.dot
+doc/ail_queues.svg
 doc/statistics
diff --git a/bin/AIL_Init.py b/bin/AIL_Init.py
new file mode 100755
index 00000000..6c3946d5
--- /dev/null
+++ b/bin/AIL_Init.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python3
+# -*-coding:UTF-8 -*
+"""
+AIL Init
+============================
+
+Init DB + Clear Stats
+
+"""
+
+import os
+import sys
+
+sys.path.append(os.environ['AIL_BIN'])
+##################################
+# Import Project packages
+##################################
+from lib import ail_queues
+
+if __name__ == "__main__":
+    ail_queues.save_queue_digraph()
+    ail_queues.clear_modules_queues_stats()
diff --git a/bin/Helper.py b/bin/Helper.py
deleted file mode 100755
index 65a260d1..00000000
--- a/bin/Helper.py
+++ /dev/null
@@ -1,233 +0,0 @@
-#!/usr/bin/env python3
-# -*-coding:UTF-8 -*
-"""
-Queue helper module
-============================
-
-This module subscribe to a Publisher stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts.
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-"""
-import redis
-import configparser
-import os
-import zmq
-import time
-import datetime
-import json
-
-
-class PubSub(object): ## TODO: remove config, use ConfigLoader by default
-
-    def __init__(self):
-        configfile = os.path.join(os.environ['AIL_HOME'], 'configs/core.cfg')
-        if not os.path.exists(configfile):
-            raise Exception('Unable to find the configuration file. \
-                            Did you set environment variables? \
-                            Or activate the virtualenv.')
-        self.config = configparser.ConfigParser()
-        self.config.read(configfile)
-        self.redis_sub = False
-        self.zmq_sub = False
-        self.subscribers = None
-        self.publishers = {'Redis': [], 'ZMQ': []}
-
-    def setup_subscribe(self, conn_name):
-        if self.config.has_section(conn_name):
-            channel = self.config.get(conn_name, 'channel')
-        else:
-            channel = conn_name.split('_')[1]
-        if conn_name.startswith('Redis'):
-            self.redis_sub = True
-            r = redis.StrictRedis(
-                host=self.config.get('RedisPubSub', 'host'),
-                port=self.config.get('RedisPubSub', 'port'),
-                db=self.config.get('RedisPubSub', 'db'),
-                decode_responses=True)
-            self.subscribers = r.pubsub(ignore_subscribe_messages=True)
-            self.subscribers.psubscribe(channel)
-        elif conn_name.startswith('ZMQ'):
-            self.zmq_sub = True
-            context = zmq.Context()
-
-            # Get all feeds
-            self.subscribers = []
-            addresses = self.config.get(conn_name, 'address')
-            for address in addresses.split(','):
-                subscriber = context.socket(zmq.SUB)
-                subscriber.connect(address)
-                subscriber.setsockopt_string(zmq.SUBSCRIBE, channel)
-                self.subscribers.append(subscriber)
-
-    def setup_publish(self, conn_name):
-        if self.config.has_section(conn_name):
-            channel = self.config.get(conn_name, 'channel')
-        else:
-            channel = conn_name.split('_')[1]
-        if conn_name.startswith('Redis'):
-            r = redis.StrictRedis(host=self.config.get('RedisPubSub', 'host'),
-                                  port=self.config.get('RedisPubSub', 'port'),
-                                  db=self.config.get('RedisPubSub', 'db'),
-                                  decode_responses=True)
-            self.publishers['Redis'].append((r, channel))
-        elif conn_name.startswith('ZMQ'):
-            context = zmq.Context()
-            p = context.socket(zmq.PUB)
-            p.bind(self.config.get(conn_name, 'address'))
-            self.publishers['ZMQ'].append((p, channel))
-
-    def publish(self, message):
-
m = json.loads(message) - channel_message = m.get('channel') - for p, channel in self.publishers['Redis']: - if channel_message is None or channel_message == channel: - p.publish(channel, ( m['message']) ) - for p, channel in self.publishers['ZMQ']: - if channel_message is None or channel_message == channel: - p.send('{} {}'.format(channel, m['message'])) - #p.send(b' '.join( [channel, mess] ) ) - - - def subscribe(self): - if self.redis_sub: - for msg in self.subscribers.listen(): - if msg.get('data', None) is not None: - yield msg['data'] - elif self.zmq_sub: - # Initialize poll set - poller = zmq.Poller() - for subscriber in self.subscribers: - poller.register(subscriber, zmq.POLLIN) - - while True: - socks = dict(poller.poll()) - - for subscriber in self.subscribers: - if subscriber in socks: - message = subscriber.recv() - yield message.split(b' ', 1)[1] - else: - raise Exception('No subscribe function defined') - - -class Process(object): - - def __init__(self, conf_section, module=True): - configfile = os.path.join(os.environ['AIL_HOME'], 'configs/core.cfg') - if not os.path.exists(configfile): - raise Exception('Unable to find the configuration file. \ - Did you set environment variables? \ - Or activate the virtualenv.') - modulesfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') - self.config = configparser.ConfigParser() - self.config.read(configfile) - self.modules = configparser.ConfigParser() - self.modules.read(modulesfile) - self.subscriber_name = conf_section - - self.pubsub = None - if module: - if self.modules.has_section(conf_section): - self.pubsub = PubSub() - else: - raise Exception('Your process has to listen to at least one feed.') - self.r_temp = redis.StrictRedis( - host=self.config.get('RedisPubSub', 'host'), - port=self.config.get('RedisPubSub', 'port'), - db=self.config.get('RedisPubSub', 'db'), - decode_responses=True) - - self.moduleNum = os.getpid() - - def populate_set_in(self): - # monoproc - try: - src = self.modules.get(self.subscriber_name, 'subscribe') - except configparser.NoOptionError: #NoSectionError - src = None - if src != 'Redis' and src: - self.pubsub.setup_subscribe(src) - for msg in self.pubsub.subscribe(): - in_set = self.subscriber_name + 'in' - self.r_temp.sadd(in_set, msg) - self.r_temp.hset('queues', self.subscriber_name, - int(self.r_temp.scard(in_set))) - else: - print('{} has no subscriber'.format(self.subscriber_name)) - - def get_from_set(self): - # multiproc - in_set = self.subscriber_name + 'in' - self.r_temp.hset('queues', self.subscriber_name, - int(self.r_temp.scard(in_set))) - message = self.r_temp.spop(in_set) - - timestamp = int(time.mktime(datetime.datetime.now().timetuple())) - dir_name = os.environ['AIL_HOME']+self.config.get('Directories', 'pastes') - - if message is None: - return None - - else: - try: - if '.gz' in message: - path = message.split(".")[-2].split("/")[-1] - # find start of path with AIL_HOME - index_s = message.find(os.environ['AIL_HOME']) - # Stop when .gz - index_e = message.find(".gz")+3 - if(index_s == -1): - complete_path = message[0:index_e] - else: - complete_path = message[index_s:index_e] - - else: - path = "-" - complete_path = "?" 
- - value = str(timestamp) + ", " + path - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path) - self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) - - curr_date = datetime.date.today() - return message - - except: - print('except') - path = "?" - value = str(timestamp) + ", " + path - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", "?") - self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) - return message - - def populate_set_out(self, msg, channel=None): - # multiproc - msg = {'message': msg} - if channel is not None: - msg.update({'channel': channel}) - - # bytes64 encode bytes to ascii only bytes - j = json.dumps(msg) - self.r_temp.sadd(self.subscriber_name + 'out', j) - - def publish(self): - # monoproc - if not self.modules.has_option(self.subscriber_name, 'publish'): - return False - dest = self.modules.get(self.subscriber_name, 'publish') - # We can have multiple publisher - for name in dest.split(','): - self.pubsub.setup_publish(name) - while True: - message = self.r_temp.spop(self.subscriber_name + 'out') - - if message is None: - time.sleep(1) - continue - self.pubsub.publish(message) - diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index 6d1dd80a..52a211a4 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -35,7 +35,6 @@ isredis=`screen -ls | egrep '[0-9]+.Redis_AIL' | cut -d. -f1` isardb=`screen -ls | egrep '[0-9]+.ARDB_AIL' | cut -d. -f1` iskvrocks=`screen -ls | egrep '[0-9]+.KVROCKS_AIL' | cut -d. -f1` islogged=`screen -ls | egrep '[0-9]+.Logging_AIL' | cut -d. -f1` -isqueued=`screen -ls | egrep '[0-9]+.Queue_AIL' | cut -d. -f1` is_ail_core=`screen -ls | egrep '[0-9]+.Core_AIL' | cut -d. -f1` is_ail_2_ail=`screen -ls | egrep '[0-9]+.AIL_2_AIL' | cut -d. -f1` isscripted=`screen -ls | egrep '[0-9]+.Script_AIL' | cut -d. -f1` @@ -152,14 +151,6 @@ function launching_logs { screen -S "Logging_AIL" -X screen -t "LogCrawler" bash -c "cd ${AIL_BIN}; ${AIL_VENV}/bin/log_subscriber -p 6380 -c Crawler -l ../logs/ ${syslog_cmd}; read x" } -function launching_queues { - screen -dmS "Queue_AIL" - sleep 0.1 - - echo -e $GREEN"\t* Launching all the queues"$DEFAULT - screen -S "Queue_AIL" -X screen -t "Queues" bash -c "cd ${AIL_BIN}; ${ENV_PY} launch_queues.py; read x" -} - function checking_configuration { bin_dir=${AIL_HOME}/bin echo -e "\t* Checking configuration" @@ -185,13 +176,17 @@ function launching_scripts { # sleep 0.1 echo -e $GREEN"\t* Launching core scripts ..."$DEFAULT - # TODO: IMPORTER SCREEN ???? + # Clear Queue Stats + pushd ${AIL_BIN} + ${ENV_PY} ./AIL_Init.py + popd + # TODO: IMPORTER SCREEN ???? 
#### SYNC #### - screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x" - sleep 0.1 screen -S "Script_AIL" -X screen -t "ail_2_ail_server" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./ail_2_ail_server.py; read x" sleep 0.1 + screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x" + sleep 0.1 screen -S "Script_AIL" -X screen -t "Sync_manager" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_manager.py; read x" sleep 0.1 ##-- SYNC --## @@ -225,7 +220,7 @@ function launching_scripts { sleep 0.1 screen -S "Script_AIL" -X screen -t "Tags" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Tags.py; read x" sleep 0.1 - screen -S "Script_AIL" -X screen -t "SubmitPaste" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./submit_paste.py; read x" + screen -S "Script_AIL" -X screen -t "SubmitPaste" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./SubmitPaste.py; read x" sleep 0.1 screen -S "Script_AIL" -X screen -t "Crawler" bash -c "cd ${AIL_BIN}/crawlers; ${ENV_PY} ./Crawler.py; read x" @@ -448,14 +443,6 @@ function launch_logs { fi } -function launch_queues { - if [[ ! $isqueued ]]; then - launching_queues; - else - echo -e $RED"\t* A screen is already launched"$DEFAULT - fi -} - function launch_scripts { if [[ ! $isscripted ]]; then ############################# is core sleep 1 @@ -505,19 +492,19 @@ function launch_feeder { } function killscript { - if [[ $islogged || $isqueued || $is_ail_core || $isscripted || $isflasked || $isfeeded || $is_ail_2_ail ]]; then + if [[ $islogged || $is_ail_core || $isscripted || $isflasked || $isfeeded || $is_ail_2_ail ]]; then echo -e $GREEN"Killing Script"$DEFAULT - kill $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail + kill $islogged $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail sleep 0.2 echo -e $ROSE`screen -ls`$DEFAULT - echo -e $GREEN"\t* $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail killed."$DEFAULT + echo -e $GREEN"\t* $islogged $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail killed."$DEFAULT else echo -e $RED"\t* No script to kill"$DEFAULT fi } function killall { - if [[ $isredis || $isardb || $iskvrocks || $islogged || $isqueued || $is_ail_2_ail || $isscripted || $isflasked || $isfeeded || $is_ail_core || $is_ail_2_ail ]]; then + if [[ $isredis || $isardb || $iskvrocks || $islogged || $is_ail_2_ail || $isscripted || $isflasked || $isfeeded || $is_ail_core || $is_ail_2_ail ]]; then if [[ $isredis ]]; then echo -e $GREEN"Gracefully closing redis servers"$DEFAULT shutting_down_redis; @@ -532,10 +519,10 @@ function killall { shutting_down_kvrocks; fi echo -e $GREEN"Killing all"$DEFAULT - kill $isredis $isardb $iskvrocks $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail + kill $isredis $isardb $iskvrocks $islogged $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail sleep 0.2 echo -e $ROSE`screen -ls`$DEFAULT - echo -e $GREEN"\t* $isredis $isardb $iskvrocks $islogged $isqueued $isscripted $is_ail_2_ail $isflasked $isfeeded $is_ail_core killed."$DEFAULT + echo -e $GREEN"\t* $isredis $isardb $iskvrocks $islogged $isscripted $is_ail_2_ail $isflasked $isfeeded $is_ail_core killed."$DEFAULT else echo -e $RED"\t* No screen to kill"$DEFAULT fi @@ -612,14 +599,13 @@ function launch_all { launch_redis; launch_kvrocks; launch_logs; - launch_queues; launch_scripts; launch_flask; } function menu_display { - options=("Redis" "Ardb" "Kvrocks" 
"Logs" "Queues" "Scripts" "Flask" "Killall" "Update" "Update-config" "Update-thirdparty") + options=("Redis" "Ardb" "Kvrocks" "Logs" "Scripts" "Flask" "Killall" "Update" "Update-config" "Update-thirdparty") menu() { echo "What do you want to Launch?:" @@ -656,9 +642,6 @@ function menu_display { Logs) launch_logs; ;; - Queues) - launch_queues; - ;; Scripts) launch_scripts; ;; diff --git a/bin/PreProcessFeed.py.sample b/bin/PreProcessFeed.py.sample deleted file mode 100755 index a232a007..00000000 --- a/bin/PreProcessFeed.py.sample +++ /dev/null @@ -1,60 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - -''' -The preProcess Module -===================== - -This module is just an example of how we can pre-process a feed coming from the Mixer -module before seeding it to the Global module. - -''' - -import time -from pubsublogger import publisher - -from Helper import Process - - -def do_something(message): - splitted = message.split() - if len(splitted) == 2: - item_id, gzip64encoded = splitted - - item_id = item_id.replace("pastebin", "pastebinPROCESSED") - - to_send = "{0} {1}".format(item_id, gzip64encoded) - return to_send - - -if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - - # Section name in bin/packages/modules.cfg - config_section = 'PreProcessFeed' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("") - - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - if message is None: - publisher.debug("{} queue is empty, waiting".format(config_section)) - print("queue empty") - time.sleep(1) - continue - - # Do something with the message from the queue - new_message = do_something(message) - - # (Optional) Send that thing to the next queue - p.populate_set_out(new_message) diff --git a/bin/QueueIn.py b/bin/QueueIn.py deleted file mode 100755 index 4495e9c4..00000000 --- a/bin/QueueIn.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - - -from pubsublogger import publisher -from Helper import Process -import argparse - - -def run(config_section): - p = Process(config_section) - p.populate_set_in() - - -if __name__ == '__main__': - publisher.port = 6380 - publisher.channel = 'Queuing' - - parser = argparse.ArgumentParser(description='Entry queue for a module.') - parser.add_argument("-c", "--config_section", type=str, - help="Config section to use in the config file.") - args = parser.parse_args() - - run(args.config_section) diff --git a/bin/QueueOut.py b/bin/QueueOut.py deleted file mode 100755 index dbb36513..00000000 --- a/bin/QueueOut.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - -from pubsublogger import publisher -from Helper import Process -import argparse - - -def run(config_section): - p = Process(config_section) - if not p.publish(): - print(config_section, 'has no publisher.') - - -if __name__ == '__main__': - publisher.port = 6380 - publisher.channel = 'Queuing' - - parser = argparse.ArgumentParser(description='Entry queue for a module.') - parser.add_argument("-c", "--config_section", type=str, - help="Config section to use in the config file.") - args = parser.parse_args() - - 
run(args.config_section) diff --git a/bin/__init__.py b/bin/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/bin/core/Sync_importer.py b/bin/core/Sync_importer.py index 38854c15..d1f37ee5 100755 --- a/bin/core/Sync_importer.py +++ b/bin/core/Sync_importer.py @@ -79,7 +79,7 @@ class Sync_importer(AbstractModule): message = f'sync {item_id} {b64_gzip_content}' print(item_id) - self.send_message_to_queue(message, 'Mixer') + self.add_message_to_queue(message, 'Importers') if __name__ == '__main__': diff --git a/bin/crawlers/Crawler.py b/bin/crawlers/Crawler.py index 23c0410e..39b15359 100755 --- a/bin/crawlers/Crawler.py +++ b/bin/crawlers/Crawler.py @@ -232,11 +232,11 @@ class Crawler(AbstractModule): gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html']) # send item to Global relay_message = f'crawler {item_id} {gzip64encoded}' - self.send_message_to_queue(relay_message, 'Import') + self.add_message_to_queue(relay_message, 'Importers') # Tag msg = f'infoleak:submission="crawler";{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') crawlers.create_item_metadata(item_id, last_url, parent_id) if self.root_item is None: diff --git a/bin/importer/FeederImporter.py b/bin/importer/FeederImporter.py index f63eea36..e7a06132 100755 --- a/bin/importer/FeederImporter.py +++ b/bin/importer/FeederImporter.py @@ -113,7 +113,7 @@ class FeederModuleImporter(AbstractModule): # TODO HANDLE Invalid JSON json_data = json.loads(message) relay_message = self.importer.importer(json_data) - self.send_message_to_queue(relay_message) + self.add_message_to_queue(relay_message) # Launch Importer diff --git a/bin/importer/ZMQImporter.py b/bin/importer/ZMQImporter.py index 144af904..dd9986d2 100755 --- a/bin/importer/ZMQImporter.py +++ b/bin/importer/ZMQImporter.py @@ -73,7 +73,7 @@ class ZMQModuleImporter(AbstractModule): for message in messages: message = message.decode() print(message.split(' ', 1)[0]) - self.send_message_to_queue(message) + self.add_message_to_queue(message) if __name__ == '__main__': diff --git a/bin/launch_queues.py b/bin/launch_queues.py deleted file mode 100755 index 44e4e249..00000000 --- a/bin/launch_queues.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - - -import configparser -import os -import subprocess -import time - - -def check_pid(pid): - if pid is None: - # Already seen as finished. - return None - else: - if pid.poll() is not None: - return False - return True - - -if __name__ == '__main__': - configfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') - if not os.path.exists(configfile): - raise Exception('Unable to find the configuration file. \ - Did you set environment variables? 
\ - Or activate the virtualenv.') - config = configparser.ConfigParser() - config.read(configfile) - - modules = config.sections() - pids = {} - for module in modules: - pin = subprocess.Popen(["python3", './QueueIn.py', '-c', module]) - pout = subprocess.Popen(["python3", './QueueOut.py', '-c', module]) - pids[module] = (pin, pout) - is_running = True - try: - while is_running: - time.sleep(5) - is_running = False - for module, p in pids.items(): - pin, pout = p - if pin is None: - # already dead - pass - elif not check_pid(pin): - print(module, 'input queue finished.') - pin = None - else: - is_running = True - if pout is None: - # already dead - pass - elif not check_pid(pout): - print(module, 'output queue finished.') - pout = None - else: - is_running = True - pids[module] = (pin, pout) - except KeyboardInterrupt: - for module, p in pids.items(): - pin, pout = p - if pin is not None: - pin.kill() - if pout is not None: - pout.kill() diff --git a/bin/lib/ConfigLoader.py b/bin/lib/ConfigLoader.py index 04ae8ee8..5be8f492 100755 --- a/bin/lib/ConfigLoader.py +++ b/bin/lib/ConfigLoader.py @@ -56,6 +56,9 @@ class ConfigLoader(object): directory_path = os.path.join(os.environ['AIL_HOME'], directory_path) return directory_path + def get_config_sections(self): + return self.cfg.sections() + def get_config_str(self, section, key_name): return self.cfg.get(section, key_name) diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py new file mode 100755 index 00000000..a5950a5d --- /dev/null +++ b/bin/lib/ail_queues.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +# -*-coding:UTF-8 -* + +import os +import sys +import datetime +import time + +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from lib.exceptions import ModuleQueueError +from lib.ConfigLoader import ConfigLoader + +config_loader = ConfigLoader() +r_queues = config_loader.get_redis_conn("Redis_Queues") +config_loader = None + +MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg') + + +class AILQueue: + + def __init__(self, module_name, module_pid): + self.name = module_name + self.pid = module_pid + self._set_subscriber() + # Update queue stat + r_queues.hset('queues', self.name, self.get_nb_messages()) + r_queues.hset(f'modules', f'{self.pid}:{self.name}', -1) + + def _set_subscriber(self): + subscribers = {} + module_config_loader = ConfigLoader(config_file=MODULES_FILE) # TODO CHECK IF FILE EXISTS + if not module_config_loader.has_section(self.name): + raise ModuleQueueError(f'No Section defined for this module: {self.name}. 
Please add one in configs/module.cfg')
+
+        if module_config_loader.has_option(self.name, 'publish'):
+            subscribers_queues = module_config_loader.get_config_str(self.name, 'publish')
+            if subscribers_queues:
+                subscribers_queues = set(subscribers_queues.split(','))
+                for queue_name in subscribers_queues:
+                    subscribers[queue_name] = set()
+            if subscribers_queues:
+                for module in module_config_loader.get_config_sections():
+                    if module_config_loader.has_option(module, 'subscribe'):
+                        queue_name = module_config_loader.get_config_str(module, 'subscribe')
+                        if queue_name in subscribers:
+                            subscribers[queue_name].add(module)
+        self.subscribers_modules = subscribers
+
+    def get_nb_messages(self):
+        return r_queues.llen(f'queue:{self.name}:in')
+
+    def get_message(self):
+        # Update queues stats
+        r_queues.hset('queues', self.name, self.get_nb_messages())
+        r_queues.hset(f'modules', f'{self.pid}:{self.name}', int(time.time()))
+        # Get Message
+        message = r_queues.lpop(f'queue:{self.name}:in')
+        if not message:
+            return None
+        else:
+            # TODO SAVE CURRENT ITEMS (OLD Module information)
+
+            return message
+
+    def send_message(self, message, queue_name=None):
+        if not self.subscribers_modules:
+            raise ModuleQueueError('This Module don\'t have any subscriber')
+        if queue_name:
+            if queue_name not in self.subscribers_modules:
+                raise ModuleQueueError(f'send_message: Unknown queue_name {queue_name}')
+        else:
+            if len(self.subscribers_modules) > 1:
+                raise ModuleQueueError('Queue name required. This module push to multiple queues')
+            queue_name = list(self.subscribers_modules)[0]
+
+        # Add message to all modules
+        for module_name in self.subscribers_modules[queue_name]:
+            r_queues.rpush(f'queue:{module_name}:in', message)
+            # stats
+            nb_mess = r_queues.llen(f'queue:{module_name}:in')
+            r_queues.hset('queues', module_name, nb_mess)
+
+    # TODO
+    def refresh(self):
+        # TODO check cache
+        self._set_subscriber()
+
+    def clear(self):
+        r_queues.delete(f'queue:{self.name}:in')
+
+    def error(self):
+        r_queues.hdel(f'modules', f'{self.pid}:{self.name}')
+
+def get_queues_modules():
+    return r_queues.hkeys('queues')
+
+def get_nb_queues_modules():
+    return r_queues.hgetall('queues')
+
+def get_nb_sorted_queues_modules():
+    res = r_queues.hgetall('queues')
+    res = sorted(res.items())
+    return res
+
+def get_modules_pid_last_mess():
+    return r_queues.hgetall('modules')
+
+def get_modules_queues_stats():
+    modules_queues_stats = []
+    nb_queues_modules = get_nb_queues_modules()
+    modules_pid_last_mess = get_modules_pid_last_mess()
+    added_modules = set()
+    for row_module in modules_pid_last_mess:
+        pid, module = row_module.split(':', 1)
+        last_time = modules_pid_last_mess[row_module]
+        last_time = datetime.datetime.fromtimestamp(int(last_time))
+        seconds = int((datetime.datetime.now() - last_time).total_seconds())
+        modules_queues_stats.append((module, nb_queues_modules[module], seconds, pid))
+        added_modules.add(module)
+    for module in nb_queues_modules:
+        if module not in added_modules:
+            modules_queues_stats.append((module, nb_queues_modules[module], -1, 'Not Launched'))
+    return sorted(modules_queues_stats)
+
+def clear_modules_queues_stats():
+    r_queues.delete('modules')
+
+def get_queue_digraph():
+    queues_ail = {}
+    modules = {}
+    module_config_loader = ConfigLoader(config_file=MODULES_FILE)
+    for module in module_config_loader.get_config_sections():
+        if module_config_loader.has_option(module, 'subscribe'):
+            if module not in modules:
+                modules[module] = {'in': set(), 'out': set()}
+            queue =
module_config_loader.get_config_str(module, 'subscribe') + modules[module]['in'].add(queue) + if queue not in queues_ail: + queues_ail[queue] = [] + queues_ail[queue].append(module) + + if module_config_loader.has_option(module, 'publish'): + if module not in modules: + modules[module] = {'in': set(), 'out': set()} + queues = module_config_loader.get_config_str(module, 'publish') + for queue in queues.split(','): + modules[module]['out'].add(queue) + + # print(modules) + # print(queues_ail) + + mapped = set() + import_modules = set() + edges = '# Define edges between nodes\n' + for module in modules: + for queue_name in modules[module]['out']: + if queue_name == 'Importers': + import_modules.add(module) + if queue_name in queues_ail: + for module2 in queues_ail[queue_name]: + to_break = False + new_edge = None + cluster_out = f'cluster_{queue_name.lower()}' + queue_in = modules[module]['in'] + if queue_in: + queue_in = next(iter(queue_in)) + if len(queues_ail.get(queue_in, [])) == 1: + cluster_in = f'cluster_{queue_in.lower()}' + new_edge = f'{module} -> {module2} [ltail="{cluster_in}" lhead="{cluster_out}"];\n' + to_break = True + if not new_edge: + new_edge = f'{module} -> {module2} [lhead="{cluster_out}"];\n' + to_map = f'{module}:{cluster_out}' + if to_map not in mapped: + mapped.add(to_map) + edges = f'{edges}{new_edge}' + if to_break: + break + + subgraph = '# Define subgraphs for each queue\n' + for queue_name in queues_ail: + cluster_name = f'cluster_{queue_name.lower()}' + subgraph = f'{subgraph} subgraph {cluster_name} {{\n label="Queue {queue_name}";\n color=blue;\n' + for module in queues_ail[queue_name]: + subgraph = f'{subgraph} {module};\n' + subgraph = f'{subgraph}}}\n\n' + + cluster_name = f'cluster_importers' + subgraph = f'{subgraph} subgraph {cluster_name} {{\n label="AIL Importers";\n color=red;\n' + for module in import_modules: + subgraph = f'{subgraph} {module};\n' + subgraph = f'{subgraph}}}\n\n' + + digraph = 'digraph Modules {\ngraph [rankdir=LR splines=ortho];\nnode [shape=rectangle]\ncompound=true;\n' + digraph = f'{digraph}edge[arrowhead=open color=salmon]\n\n' + digraph = f'{digraph}{subgraph}{edges}\n}}\n' + return digraph + +def save_queue_digraph(): + import subprocess + digraph = get_queue_digraph() + dot_file = os.path.join(os.environ['AIL_HOME'], 'doc/ail_queues.dot') + svg_file = os.path.join(os.environ['AIL_HOME'], 'doc/ail_queues.svg') + with open(dot_file, 'w') as f: + f.write(digraph) + + print('dot', '-Tsvg', dot_file, '-o', svg_file) + process = subprocess.run(['dot', '-Tsvg', dot_file, '-o', svg_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if process.returncode == 0: + # modified_files = process.stdout + # print(process.stdout) + return True + else: + print(process.stderr.decode()) + sys.exit(1) + + +########################################################################################### +########################################################################################### +########################################################################################### +########################################################################################### +########################################################################################### + +# def get_all_queues_name(): +# return r_queues.hkeys('queues') +# +# def get_all_queues_dict_with_nb_elem(): +# return r_queues.hgetall('queues') +# +# def get_all_queues_with_sorted_nb_elem(): +# res = r_queues.hgetall('queues') +# res = sorted(res.items()) +# return res +# +# 
def get_module_pid_by_queue_name(queue_name): +# return r_queues.smembers('MODULE_TYPE_{}'.format(queue_name)) +# +# # # TODO: remove last msg part +# def get_module_last_process_start_time(queue_name, module_pid): +# res = r_queues.get('MODULE_{}_{}'.format(queue_name, module_pid)) +# if res: +# return res.split(',')[0] +# return None +# +# def get_module_last_msg(queue_name, module_pid): +# return r_queues.get('MODULE_{}_{}_PATH'.format(queue_name, module_pid)) +# +# def get_all_modules_queues_stats(): +# all_modules_queues_stats = [] +# for queue_name, nb_elem_queue in get_all_queues_with_sorted_nb_elem(): +# l_module_pid = get_module_pid_by_queue_name(queue_name) +# for module_pid in l_module_pid: +# last_process_start_time = get_module_last_process_start_time(queue_name, module_pid) +# if last_process_start_time: +# last_process_start_time = datetime.datetime.fromtimestamp(int(last_process_start_time)) +# seconds = int((datetime.datetime.now() - last_process_start_time).total_seconds()) +# else: +# seconds = 0 +# all_modules_queues_stats.append((queue_name, nb_elem_queue, seconds, module_pid)) +# return all_modules_queues_stats +# +# +# def _get_all_messages_from_queue(queue_name): +# #self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set))) +# return r_queues.smembers(f'queue:{queue_name}:in') +# +# # def is_message_in queue(queue_name): +# # pass +# +# def remove_message_from_queue(queue_name, message): +# queue_key = f'queue:{queue_name}:in' +# r_queues.srem(queue_key, message) +# r_queues.hset('queues', queue_name, int(r_queues.scard(queue_key))) + + +if __name__ == '__main__': + # clear_modules_queues_stats() + save_queue_digraph() diff --git a/bin/lib/exceptions.py b/bin/lib/exceptions.py index 3b8ab98d..864358fc 100755 --- a/bin/lib/exceptions.py +++ b/bin/lib/exceptions.py @@ -13,3 +13,6 @@ class UpdateInvestigationError(AIL_ERROR): class NewTagError(AIL_ERROR): pass + +class ModuleQueueError(AIL_ERROR): + pass diff --git a/bin/lib/module_extractor.py b/bin/lib/module_extractor.py index fe81f9cb..38f9a7e2 100755 --- a/bin/lib/module_extractor.py +++ b/bin/lib/module_extractor.py @@ -36,10 +36,10 @@ r_key = regex_helper.generate_redis_cache_key('extractor') # TODO UI Link MODULES = { - 'infoleak:automatic-detection="credit-card"': CreditCards(), - 'infoleak:automatic-detection="iban"': Iban(), - 'infoleak:automatic-detection="mail"': Mail(), - 'infoleak:automatic-detection="onion"': Onion(), + 'infoleak:automatic-detection="credit-card"': CreditCards(queue=False), + 'infoleak:automatic-detection="iban"': Iban(queue=False), + 'infoleak:automatic-detection="mail"': Mail(queue=False), + 'infoleak:automatic-detection="onion"': Onion(queue=False), # APIkey ??? # Credentials # Zerobins @@ -47,7 +47,7 @@ MODULES = { # SQL Injetction / Libinjection ??? 
} -tools = Tools() +tools = Tools(queue=False) for tool_name in tools.get_tools(): MODULES[f'infoleak:automatic-detection="{tool_name}-tool"'] = tools diff --git a/bin/lib/queues_modules.py b/bin/lib/queues_modules.py deleted file mode 100755 index 5a1e87c6..00000000 --- a/bin/lib/queues_modules.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - -import os -import sys -import datetime - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib import ConfigLoader - -config_loader = ConfigLoader.ConfigLoader() -r_serv_queues = config_loader.get_redis_conn("Redis_Queues") -config_loader = None - -def get_all_queues_name(): - return r_serv_queues.hkeys('queues') - -def get_all_queues_dict_with_nb_elem(): - return r_serv_queues.hgetall('queues') - -def get_all_queues_with_sorted_nb_elem(): - res = r_serv_queues.hgetall('queues') - res = sorted(res.items()) - return res - -def get_module_pid_by_queue_name(queue_name): - return r_serv_queues.smembers('MODULE_TYPE_{}'.format(queue_name)) - -# # TODO: remove last msg part -def get_module_last_process_start_time(queue_name, module_pid): - res = r_serv_queues.get('MODULE_{}_{}'.format(queue_name, module_pid)) - if res: - return res.split(',')[0] - return None - -def get_module_last_msg(queue_name, module_pid): - return r_serv_queues.get('MODULE_{}_{}_PATH'.format(queue_name, module_pid)) - -def get_all_modules_queues_stats(): - all_modules_queues_stats = [] - for queue_name, nb_elem_queue in get_all_queues_with_sorted_nb_elem(): - l_module_pid = get_module_pid_by_queue_name(queue_name) - for module_pid in l_module_pid: - last_process_start_time = get_module_last_process_start_time(queue_name, module_pid) - if last_process_start_time: - last_process_start_time = datetime.datetime.fromtimestamp(int(last_process_start_time)) - seconds = int((datetime.datetime.now() - last_process_start_time).total_seconds()) - else: - seconds = 0 - all_modules_queues_stats.append((queue_name, nb_elem_queue, seconds, module_pid)) - return all_modules_queues_stats - - -def _get_all_messages_from_queue(queue_name): - queue_in = f'{queue_name}in' - #self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set))) - return r_serv_queues.smembers(queue_in) - -# def is_message_in queue(queue_name): -# pass - -def remove_message_from_queue(queue_name, message, out=False): - if out: - queue_key = f'{queue_name}out' - else: - queue_key = f'{queue_name}in' - r_serv_queues.srem(queue_in, message) - if not out: - r_serv_queues.hset('queues', queue_name, int(r_serv_queues.scard(queue_key)) ) - -if __name__ == '__main__': - print(get_all_queues_with_sorted_nb_elem()) - queue_name = 'Tags' - res = _get_all_messages_from_queue(queue_name) - print(res) diff --git a/bin/modules/ApiKey.py b/bin/modules/ApiKey.py index 6b2cf9b3..92e0d2a5 100755 --- a/bin/modules/ApiKey.py +++ b/bin/modules/ApiKey.py @@ -64,7 +64,7 @@ class ApiKey(AbstractModule): self.redis_logger.warning(f'{to_print}Checked {len(google_api_key)} found Google API Key;{item.get_id()}') msg = f'infoleak:automatic-detection="google-api-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # # TODO: # FIXME: AWS regex/validate/sanitize KEY + SECRET KEY if aws_access_key: @@ -75,11 +75,11 @@ class ApiKey(AbstractModule): self.redis_logger.warning(f'{to_print}Checked {len(aws_secret_key)} found AWS secret Key;{item.get_id()}') msg = 
'infoleak:automatic-detection="aws-key";{}'.format(item.get_id()) - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # Tags msg = f'infoleak:automatic-detection="api-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if r_result: return google_api_key, aws_access_key, aws_secret_key diff --git a/bin/modules/Categ.py b/bin/modules/Categ.py index f163001d..f1cfe57f 100755 --- a/bin/modules/Categ.py +++ b/bin/modules/Categ.py @@ -43,6 +43,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader from lib.objects.Items import Item @@ -59,8 +60,10 @@ class Categ(AbstractModule): self.categ_files_dir = categ_files_dir + config_loader = ConfigLoader() + # default = 1 string - self.matchingThreshold = self.process.config.getint("Categ", "matchingThreshold") + self.matchingThreshold = config_loader.get_config_int("Categ", "matchingThreshold") self.reload_categ_words() self.redis_logger.info("Script Categ started") @@ -95,20 +98,13 @@ class Categ(AbstractModule): # Export message to categ queue print(msg, categ) - self.send_message_to_queue(msg, categ) + self.add_message_to_queue(msg, categ) self.redis_logger.info( f'Categ;{item.get_source()};{item.get_date()};{item.get_basename()};Detected {lenfound} as {categ};{item.get_id()}') if r_result: return categ_found - # DIRTY FIX AIL SYNC - # # FIXME: DIRTY FIX - message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' - print(message) - self.send_message_to_queue(message, 'SyncModule') - - if __name__ == '__main__': # SCRIPT PARSER # diff --git a/bin/modules/Credential.py b/bin/modules/Credential.py index 31c9a8f4..3c1efd9c 100755 --- a/bin/modules/Credential.py +++ b/bin/modules/Credential.py @@ -112,7 +112,7 @@ class Credential(AbstractModule): self.redis_logger.warning(to_print) msg = f'infoleak:automatic-detection="credential";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') site_occurrence = self.regex_findall(self.regex_site_for_stats, item.get_id(), item_content) @@ -138,11 +138,11 @@ class Credential(AbstractModule): else: creds_sites[domain] = 1 - for site, num in creds_sites.items(): # Send for each different site to moduleStats - - mssg = f'credential;{num};{site};{item.get_date()}' - print(mssg) - self.send_message_to_queue(mssg, 'ModuleStats') + # for site, num in creds_sites.items(): # Send for each different site to moduleStats + # + # mssg = f'credential;{num};{site};{item.get_date()}' + # print(mssg) + # self.add_message_to_queue(mssg, 'ModuleStats') if all_sites: discovered_sites = ', '.join(all_sites) diff --git a/bin/modules/CreditCards.py b/bin/modules/CreditCards.py index 33bdebe5..5757e1b2 100755 --- a/bin/modules/CreditCards.py +++ b/bin/modules/CreditCards.py @@ -31,8 +31,8 @@ class CreditCards(AbstractModule): CreditCards module for AIL framework """ - def __init__(self): - super(CreditCards, self).__init__() + def __init__(self, queue=True): + super(CreditCards, self).__init__(queue=queue) # Source: http://www.richardsramblings.com/regex/credit-card-numbers/ cards = [ @@ -90,7 +90,7 @@ class CreditCards(AbstractModule): self.redis_logger.warning(mess) msg = f'infoleak:automatic-detection="credit-card";{item.id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if r_result: return creditcard_set diff 
--git a/bin/modules/Cryptocurrencies.py b/bin/modules/Cryptocurrencies.py index f77551ff..dbb5d637 100755 --- a/bin/modules/Cryptocurrencies.py +++ b/bin/modules/Cryptocurrencies.py @@ -135,13 +135,13 @@ class Cryptocurrencies(AbstractModule, ABC): # Check private key if is_valid_address: msg = f'{currency["tag"]};{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if currency.get('private_key'): private_keys = self.regex_findall(currency['private_key']['regex'], item_id, content) if private_keys: msg = f'{currency["private_key"]["tag"]};{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # debug print(private_keys) diff --git a/bin/modules/CveModule.py b/bin/modules/CveModule.py index 93e2e17f..a3f9b436 100755 --- a/bin/modules/CveModule.py +++ b/bin/modules/CveModule.py @@ -63,7 +63,7 @@ class CveModule(AbstractModule): msg = f'infoleak:automatic-detection="cve";{item_id}' # Send to Tags Queue - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if __name__ == '__main__': diff --git a/bin/modules/Decoder.py b/bin/modules/Decoder.py index bcb9bfd2..41a3fae1 100755 --- a/bin/modules/Decoder.py +++ b/bin/modules/Decoder.py @@ -123,7 +123,7 @@ class Decoder(AbstractModule): # Send to Tags msg = f'infoleak:automatic-detection="{dname}";{item.id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if __name__ == '__main__': diff --git a/bin/modules/DomClassifier.py b/bin/modules/DomClassifier.py index da2cea14..57d7633c 100755 --- a/bin/modules/DomClassifier.py +++ b/bin/modules/DomClassifier.py @@ -23,6 +23,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from modules.abstract_module import AbstractModule from lib.objects.Items import Item +from lib.ConfigLoader import ConfigLoader from lib import d4 @@ -34,15 +35,17 @@ class DomClassifier(AbstractModule): def __init__(self): super(DomClassifier, self).__init__() + config_loader = ConfigLoader() + # Waiting time in seconds between to message processed self.pending_seconds = 1 - addr_dns = self.process.config.get("DomClassifier", "dns") + addr_dns = config_loader.get_config_str("DomClassifier", "dns") self.c = DomainClassifier.domainclassifier.Extract(rawtext="", nameservers=[addr_dns]) - self.cc = self.process.config.get("DomClassifier", "cc") - self.cc_tld = self.process.config.get("DomClassifier", "cc_tld") + self.cc = config_loader.get_config_str("DomClassifier", "cc") + self.cc_tld = config_loader.get_config_str("DomClassifier", "cc_tld") # Send module state to logs self.redis_logger.info(f"Module: {self.module_name} Launched") @@ -66,7 +69,7 @@ class DomClassifier(AbstractModule): if self.c.vdomain and d4.is_passive_dns_enabled(): for dns_record in self.c.vdomain: - self.send_message_to_queue(dns_record) + self.add_message_to_queue(dns_record) localizeddomains = self.c.include(expression=self.cc_tld) if localizeddomains: diff --git a/bin/modules/Global.py b/bin/modules/Global.py index 2db4a389..77d27086 100755 --- a/bin/modules/Global.py +++ b/bin/modules/Global.py @@ -39,8 +39,10 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from modules.abstract_module import AbstractModule from lib.ail_core import get_ail_uuid +from lib.ConfigLoader import ConfigLoader from lib.data_retention_engine import update_obj_date -from lib import item_basic +from lib.objects.Items import Item + # from lib import Statistics class Global(AbstractModule): @@ 
-54,11 +56,12 @@ class Global(AbstractModule): self.processed_item = 0 self.time_last_stats = time.time() + config_loader = ConfigLoader() + # Get and sanitize ITEM DIRECTORY # # TODO: rename PASTE => ITEM - self.PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], self.process.config.get("Directories", "pastes")) - self.PASTES_FOLDERS = self.PASTES_FOLDER + '/' - self.PASTES_FOLDERS = os.path.join(os.path.realpath(self.PASTES_FOLDERS), '') + self.ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/' + self.ITEMS_FOLDER = os.path.join(os.path.realpath(self.ITEMS_FOLDER), '') # Waiting time in seconds between to message processed self.pending_seconds = 0.5 @@ -85,9 +88,9 @@ class Global(AbstractModule): if len(splitted) == 2: item, gzip64encoded = splitted - # Remove PASTES_FOLDER from item path (crawled item + submitted) - if self.PASTES_FOLDERS in item: - item = item.replace(self.PASTES_FOLDERS, '', 1) + # Remove ITEMS_FOLDER from item path (crawled item + submitted) + if self.ITEMS_FOLDER in item: + item = item.replace(self.ITEMS_FOLDER, '', 1) file_name_item = item.split('/')[-1] if len(file_name_item) > 255: @@ -95,11 +98,11 @@ class Global(AbstractModule): item = self.rreplace(item, file_name_item, new_file_name_item, 1) # Creating the full filepath - filename = os.path.join(self.PASTES_FOLDER, item) + filename = os.path.join(self.ITEMS_FOLDER, item) filename = os.path.realpath(filename) # Incorrect filename - if not os.path.commonprefix([filename, self.PASTES_FOLDER]) == self.PASTES_FOLDER: + if not os.path.commonprefix([filename, self.ITEMS_FOLDER]) == self.ITEMS_FOLDER: self.redis_logger.warning(f'Global; Path traversal detected {filename}') print(f'Global; Path traversal detected {filename}') @@ -121,14 +124,23 @@ class Global(AbstractModule): f.write(decoded) item_id = filename - # remove self.PASTES_FOLDER from - if self.PASTES_FOLDERS in item_id: - item_id = item_id.replace(self.PASTES_FOLDERS, '', 1) + # remove self.ITEMS_FOLDER from + if self.ITEMS_FOLDER in item_id: + item_id = item_id.replace(self.ITEMS_FOLDER, '', 1) - update_obj_date(item_basic.get_item_date(item_id), 'item') + item = Item(item_id) - self.send_message_to_queue(item_id) + update_obj_date(item.get_date(), 'item') + + self.add_message_to_queue(item_id, 'Item') self.processed_item += 1 + + # DIRTY FIX AIL SYNC - SEND TO SYNC MODULE + # # FIXME: DIRTY FIX + message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' + print(message) + self.add_message_to_queue(message, 'Sync') + print(item_id) if r_result: return item_id diff --git a/bin/modules/Hosts.py b/bin/modules/Hosts.py index 782681ee..3509b5ea 100755 --- a/bin/modules/Hosts.py +++ b/bin/modules/Hosts.py @@ -62,7 +62,7 @@ class Hosts(AbstractModule): # print(host) msg = f'{host} {item.get_id()}' - self.send_message_to_queue(msg, 'Host') + self.add_message_to_queue(msg, 'Host') if __name__ == '__main__': diff --git a/bin/modules/Iban.py b/bin/modules/Iban.py index 0de3d256..091a85a5 100755 --- a/bin/modules/Iban.py +++ b/bin/modules/Iban.py @@ -37,8 +37,8 @@ class Iban(AbstractModule): enumerate(string.ascii_lowercase, 10)) LETTERS_IBAN = {ord(d): str(i) for i, d in _LETTERS_IBAN} - def __init__(self): - super(Iban, self).__init__() + def __init__(self, queue=True): + super(Iban, self).__init__(queue=queue) # Waiting time in secondes between to message proccessed self.pending_seconds = 10 @@ -98,7 +98,7 @@ class Iban(AbstractModule): self.redis_logger.warning(f'{to_print}Checked 
found {len(valid_ibans)} IBAN;{item_id}') # Tags msg = f'infoleak:automatic-detection="iban";{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if __name__ == '__main__': diff --git a/bin/modules/Indexer.py b/bin/modules/Indexer.py index f097c333..75052329 100755 --- a/bin/modules/Indexer.py +++ b/bin/modules/Indexer.py @@ -25,6 +25,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader from lib.objects.Items import Item @@ -42,14 +43,13 @@ class Indexer(AbstractModule): """ super(Indexer, self).__init__() + config_loader = ConfigLoader() + # Indexer configuration - index dir and schema setup - self.baseindexpath = join(os.environ['AIL_HOME'], - self.process.config.get("Indexer", "path")) - self.indexRegister_path = join(os.environ['AIL_HOME'], - self.process.config.get("Indexer", "register")) - self.indexertype = self.process.config.get("Indexer", "type") - self.INDEX_SIZE_THRESHOLD = self.process.config.getint( - "Indexer", "index_max_size") + self.baseindexpath = join(os.environ['AIL_HOME'], config_loader.get_config_str("Indexer", "path")) + self.indexRegister_path = join(os.environ['AIL_HOME'], config_loader.get_config_str("Indexer", "register")) + self.indexertype = config_loader.get_config_str("Indexer", "type") + self.INDEX_SIZE_THRESHOLD = config_loader.get_config_int("Indexer", "index_max_size") self.indexname = None self.schema = None diff --git a/bin/modules/Keys.py b/bin/modules/Keys.py index 91d3c7bf..7ed1845c 100755 --- a/bin/modules/Keys.py +++ b/bin/modules/Keys.py @@ -66,32 +66,32 @@ class Keys(AbstractModule): self.redis_logger.warning(f'{item.get_basename()} has a PGP enc message') msg = f'infoleak:automatic-detection="pgp-message";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') get_pgp_content = True # find = True if KeyEnum.PGP_PUBLIC_KEY_BLOCK.value in content: msg = f'infoleak:automatic-detection="pgp-public-key-block";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') get_pgp_content = True if KeyEnum.PGP_SIGNATURE.value in content: msg = f'infoleak:automatic-detection="pgp-signature";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') get_pgp_content = True if KeyEnum.PGP_PRIVATE_KEY_BLOCK.value in content: self.redis_logger.warning(f'{item.get_basename()} has a pgp private key block message') msg = f'infoleak:automatic-detection="pgp-private-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') get_pgp_content = True if KeyEnum.CERTIFICATE.value in content: self.redis_logger.warning(f'{item.get_basename()} has a certificate message') msg = f'infoleak:automatic-detection="certificate";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.RSA_PRIVATE_KEY.value in content: @@ -99,7 +99,7 @@ class Keys(AbstractModule): print('rsa private key message found') msg = f'infoleak:automatic-detection="rsa-private-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.PRIVATE_KEY.value in content: @@ -107,7 +107,7 @@ class Keys(AbstractModule): print('private key message found') msg = f'infoleak:automatic-detection="private-key";{item.get_id()}' - 
self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.ENCRYPTED_PRIVATE_KEY.value in content: @@ -115,7 +115,7 @@ class Keys(AbstractModule): print('encrypted private key message found') msg = f'infoleak:automatic-detection="encrypted-private-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.OPENSSH_PRIVATE_KEY.value in content: @@ -123,7 +123,7 @@ class Keys(AbstractModule): print('openssh private key message found') msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.SSH2_ENCRYPTED_PRIVATE_KEY.value in content: @@ -131,7 +131,7 @@ class Keys(AbstractModule): print('SSH2 private key message found') msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.OPENVPN_STATIC_KEY_V1.value in content: @@ -139,37 +139,37 @@ class Keys(AbstractModule): print('OpenVPN Static key message found') msg = f'infoleak:automatic-detection="vpn-static-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.DSA_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has a dsa private key message') msg = f'infoleak:automatic-detection="dsa-private-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.EC_PRIVATE_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has an ec private key message') msg = f'infoleak:automatic-detection="ec-private-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True if KeyEnum.PUBLIC_KEY.value in content: self.redis_logger.warning(f'{item.get_basename()} has a public key message') msg = f'infoleak:automatic-detection="public-key";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # find = True # pgp content if get_pgp_content: - self.send_message_to_queue(item.get_id(), 'PgpDump') + self.add_message_to_queue(item.get_id(), 'PgpDump') # if find : # # Send to duplicate - # self.send_message_to_queue(item.get_id(), 'Duplicate') + # self.add_message_to_queue(item.get_id(), 'Duplicate') # self.redis_logger.debug(f'{item.get_id()} has key(s)') # print(f'{item.get_id()} has key(s)') diff --git a/bin/modules/LibInjection.py b/bin/modules/LibInjection.py index a27c5964..9f357441 100755 --- a/bin/modules/LibInjection.py +++ b/bin/modules/LibInjection.py @@ -78,7 +78,7 @@ class LibInjection(AbstractModule): # Add tag msg = f'infoleak:automatic-detection="sql-injection";{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # statistics # # # TODO: # FIXME: remove me diff --git a/bin/modules/Mail.py b/bin/modules/Mail.py index 5785f1eb..10d55877 100755 --- a/bin/modules/Mail.py +++ b/bin/modules/Mail.py @@ -36,8 +36,8 @@ class Mail(AbstractModule): Module Mail module for AIL framework """ - def __init__(self): - super(Mail, self).__init__() + def __init__(self, queue=True): + super(Mail, self).__init__(queue=queue) config_loader = ConfigLoader() self.r_cache = config_loader.get_redis_conn("Redis_Cache") @@ -158,8 +158,8 @@ class Mail(AbstractModule): num_valid_email += nb_mails 
# Create domain_mail stats - msg = f'mail;{nb_mails};{domain_mx};{item_date}' - self.send_message_to_queue(msg, 'ModuleStats') + # msg = f'mail;{nb_mails};{domain_mx};{item_date}' + # self.add_message_to_queue(msg, 'ModuleStats') # Create country stats self.faup.decode(domain_mx) @@ -178,7 +178,7 @@ class Mail(AbstractModule): self.redis_logger.warning(msg) # Tags msg = f'infoleak:automatic-detection="mail";{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') else: self.redis_logger.info(msg) diff --git a/bin/modules/Mixer.py b/bin/modules/Mixer.py index f57aefa3..2fdb5228 100755 --- a/bin/modules/Mixer.py +++ b/bin/modules/Mixer.py @@ -173,7 +173,7 @@ class Mixer(AbstractModule): self.r_cache.expire(digest, self.ttl_key) self.increase_stat_processed(feeder_name) - self.send_message_to_queue(relay_message) + self.add_message_to_queue(relay_message) # Need To Be Fixed, Currently doesn't check the source (-> same as operation 1) # # Keep duplicate coming from different sources @@ -189,7 +189,7 @@ class Mixer(AbstractModule): # self.r_cache.expire(item_id, self.ttl_key) # self.r_cache.expire(f'HASH_{item_id}', self.ttl_key) # - # self.send_message_to_queue(relay_message) + # self.add_message_to_queue(relay_message) # # else: # if digest != older_digest: @@ -199,7 +199,7 @@ class Mixer(AbstractModule): # self.r_cache.sadd(item_id, feeder_name) # self.r_cache.expire(item_id, ttl_key) # - # self.send_message_to_queue(relay_message) + # self.add_message_to_queue(relay_message) # # else: # # Already processed @@ -210,7 +210,7 @@ class Mixer(AbstractModule): # No Filtering else: self.increase_stat_processed(feeder_name) - self.send_message_to_queue(relay_message) + self.add_message_to_queue(relay_message) if __name__ == "__main__": diff --git a/bin/modules/Onion.py b/bin/modules/Onion.py index e8bb5c7a..d5b25326 100755 --- a/bin/modules/Onion.py +++ b/bin/modules/Onion.py @@ -29,8 +29,8 @@ from lib import crawlers class Onion(AbstractModule): """docstring for Onion module.""" - def __init__(self): - super(Onion, self).__init__() + def __init__(self, queue=True): + super(Onion, self).__init__(queue=queue) config_loader = ConfigLoader() self.r_cache = config_loader.get_redis_conn("Redis_Cache") @@ -101,7 +101,7 @@ class Onion(AbstractModule): # TAG Item msg = f'infoleak:automatic-detection="onion";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if __name__ == "__main__": diff --git a/bin/modules/PgpDump.py b/bin/modules/PgpDump.py index c6f91740..ea9454cd 100755 --- a/bin/modules/PgpDump.py +++ b/bin/modules/PgpDump.py @@ -235,7 +235,7 @@ class PgpDump(AbstractModule): if self.symmetrically_encrypted: msg = f'infoleak:automatic-detection="pgp-symmetric";{self.item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if __name__ == '__main__': diff --git a/bin/modules/Phone.py b/bin/modules/Phone.py index 3a936105..997b5826 100755 --- a/bin/modules/Phone.py +++ b/bin/modules/Phone.py @@ -54,7 +54,7 @@ class Phone(AbstractModule): self.redis_logger.warning(f'{item.get_id()} contains PID (phone numbers)') msg = f'infoleak:automatic-detection="phone-number";{item.get_id()}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') stats = {} for phone_number in results: diff --git a/bin/modules/SQLInjectionDetection.py b/bin/modules/SQLInjectionDetection.py index 34e50810..e0827ce6 100755 --- a/bin/modules/SQLInjectionDetection.py +++ 
b/bin/modules/SQLInjectionDetection.py @@ -59,7 +59,7 @@ class SQLInjectionDetection(AbstractModule): # Tag msg = f'infoleak:automatic-detection="sql-injection";{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # statistics # tld = url_parsed['tld'] diff --git a/bin/modules/SentimentAnalysis.py b/bin/modules/SentimentAnalysis.py index 2f625809..5aea7e7f 100755 --- a/bin/modules/SentimentAnalysis.py +++ b/bin/modules/SentimentAnalysis.py @@ -78,7 +78,6 @@ class SentimentAnalysis(AbstractModule): try: self.analyse(message) except TimeoutException: - self.process.incr_module_timeout_statistic() self.redis_logger.debug(f"{message} processing timeout") else: signal.alarm(0) diff --git a/bin/modules/submit_paste.py b/bin/modules/SubmitPaste.py similarity index 98% rename from bin/modules/submit_paste.py rename to bin/modules/SubmitPaste.py index c1e13b34..92e1fcc9 100755 --- a/bin/modules/submit_paste.py +++ b/bin/modules/SubmitPaste.py @@ -24,6 +24,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule +from lib.objects.Items import ITEMS_FOLDER from lib import ConfigLoader from lib import Tag @@ -45,7 +46,7 @@ class SubmitPaste(AbstractModule): """ init """ - super(SubmitPaste, self).__init__(queue_name='submit_paste') + super(SubmitPaste, self).__init__() # TODO KVROCKS self.r_serv_db = ConfigLoader.ConfigLoader().get_db_conn("Kvrocks_DB") @@ -262,8 +263,7 @@ class SubmitPaste(AbstractModule): source = source if source else 'submitted' save_path = source + '/' + now.strftime("%Y") + '/' + now.strftime("%m") + '/' + now.strftime("%d") + '/submitted_' + name + '.gz' - full_path = os.path.join(os.environ['AIL_HOME'], - self.process.config.get("Directories", "pastes"), save_path) + full_path = os.path.join(ITEMS_FOLDER, save_path) self.redis_logger.debug(f'file path of the paste {full_path}') @@ -281,7 +281,7 @@ class SubmitPaste(AbstractModule): # send paste to Global module relay_message = f"submitted {rel_item_path} {gzip64encoded}" - self.process.populate_set_out(relay_message) + self.add_message_to_queue(relay_message) # add tags for tag in ltags: diff --git a/bin/modules/Tags.py b/bin/modules/Tags.py index 6300a1d1..eb8a4e8d 100755 --- a/bin/modules/Tags.py +++ b/bin/modules/Tags.py @@ -50,10 +50,10 @@ class Tags(AbstractModule): print(f'{item.get_id()}: Tagged {tag}') # Forward message to channel - self.send_message_to_queue(message, 'MISP_The_Hive_feeder') + self.add_message_to_queue(message, 'Tag_feed') message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}' - self.send_message_to_queue(message, 'SyncModule') + self.add_message_to_queue(message, 'Sync') else: # Malformed message diff --git a/bin/modules/Telegram.py b/bin/modules/Telegram.py index cb5538af..9cf4a2b6 100755 --- a/bin/modules/Telegram.py +++ b/bin/modules/Telegram.py @@ -86,7 +86,7 @@ class Telegram(AbstractModule): if invite_code_found: # tags msg = f'infoleak:automatic-detection="telegram-invite-hash";{item.id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if __name__ == "__main__": diff --git a/bin/modules/Tools.py b/bin/modules/Tools.py index 57dc6a23..7401c2be 100755 --- a/bin/modules/Tools.py +++ b/bin/modules/Tools.py @@ -395,8 +395,8 @@ class Tools(AbstractModule): Tools module for AIL framework """ - def __init__(self): - super(Tools, self).__init__() + def __init__(self, queue=True): + super(Tools, 
self).__init__(queue=queue) self.max_execution_time = 30 # Waiting time in seconds between to message processed @@ -426,7 +426,7 @@ class Tools(AbstractModule): print(f'{item.id} found: {tool_name}') # Tag Item msg = f"{tool['tag']};{item.id}" - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # TODO ADD LOGS diff --git a/bin/modules/Urls.py b/bin/modules/Urls.py index 636a00d6..b33f3250 100755 --- a/bin/modules/Urls.py +++ b/bin/modules/Urls.py @@ -22,6 +22,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader from lib.objects.Items import Item # # TODO: Faup packages: Add new binding: Check TLD @@ -37,11 +38,13 @@ class Urls(AbstractModule): """ super(Urls, self).__init__() + config_loader = ConfigLoader() + self.faup = Faup() # Protocol file path protocolsfile_path = os.path.join(os.environ['AIL_HOME'], - self.process.config.get("Directories", "protocolsfile")) + config_loader.get_config_str("Directories", "protocolsfile")) # Get all uri from protocolsfile (Used for Curve) uri_scheme = "" with open(protocolsfile_path, 'r') as scheme_file: @@ -78,7 +81,7 @@ class Urls(AbstractModule): to_send = f"{url} {item.get_id()}" print(to_send) - self.send_message_to_queue(to_send, 'Url') + self.add_message_to_queue(to_send, 'Url') self.redis_logger.debug(f"url_parsed: {to_send}") if len(l_urls) > 0: diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index 64f454a0..79b7bb0d 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -7,22 +7,26 @@ Base Class for AIL Modules # Import External packages ################################## from abc import ABC, abstractmethod +import os +import sys import time import traceback +sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## from pubsublogger import publisher -from Helper import Process +from lib.ail_queues import AILQueue from lib import regex_helper +from lib.exceptions import ModuleQueueError class AbstractModule(ABC): """ Abstract Module class """ - def __init__(self, module_name=None, queue_name=None, logger_channel='Script'): + def __init__(self, module_name=None, logger_channel='Script', queue=True): """ Init Module module_name: str; set the module name if different from the instance ClassName @@ -32,8 +36,11 @@ class AbstractModule(ABC): # Module name if provided else instance className self.module_name = module_name if module_name else self._module_name() - # Module name if provided else instance className - self.queue_name = queue_name if queue_name else self._module_name() + self.pid = os.getpid() + + # Setup the I/O queues + if queue: + self.queue = AILQueue(self.module_name, self.pid) # Init Redis Logger self.redis_logger = publisher @@ -46,19 +53,16 @@ class AbstractModule(ABC): # If provided could be a namespaced channel like script: self.redis_logger.channel = logger_channel - #Cache key + # Cache key self.r_cache_key = regex_helper.generate_redis_cache_key(self.module_name) self.max_execution_time = 30 # Run module endlessly self.proceed = True - # Waiting time in secondes between two proccessed messages + # Waiting time in seconds between two processed messages self.pending_seconds = 10 - # Setup the I/O queues - self.process = Process(self.queue_name) - # Debug Mode self.debug = False @@ -68,17 +72,17 @@ class 
AbstractModule(ABC): Input message can change between modules ex: '' """ - return self.process.get_from_set() + return self.queue.get_message() - def send_message_to_queue(self, message, queue_name=None): + def add_message_to_queue(self, message, queue_name=None): """ - Send message to queue + Add message to queue :param message: message to send in queue :param queue_name: queue or module name - ex: send_to_queue(item_id, 'Global') + ex: add_message_to_queue(item_id, 'Mail') """ - self.process.populate_set_out(message, queue_name) + self.queue.send_message(message, queue_name) # add to new set_module def regex_search(self, regex, obj_id, content): @@ -87,16 +91,16 @@ class AbstractModule(ABC): def regex_finditer(self, regex, obj_id, content): return regex_helper.regex_finditer(self.r_cache_key, regex, obj_id, content, max_time=self.max_execution_time) - def regex_findall(self, regex, id, content, r_set=False): + def regex_findall(self, regex, obj_id, content, r_set=False): """ regex findall helper (force timeout) :param regex: compiled regex - :param id: object id + :param obj_id: object id :param content: object content - - ex: send_to_queue(item_id, 'Global') + :param r_set: return result as set """ - return regex_helper.regex_findall(self.module_name, self.r_cache_key, regex, id, content, max_time=self.max_execution_time, r_set=r_set) + return regex_helper.regex_findall(self.module_name, self.r_cache_key, regex, obj_id, content, + max_time=self.max_execution_time, r_set=r_set) def run(self): """ @@ -114,7 +118,10 @@ class AbstractModule(ABC): self.compute(message) except Exception as err: if self.debug: + self.queue.error() raise err + + # LOG ERROR trace = traceback.format_tb(err.__traceback__) trace = ''.join(trace) self.redis_logger.critical(f"Error in module {self.module_name}: {err}") @@ -125,6 +132,10 @@ class AbstractModule(ABC): print(f'MESSAGE: {message}') print('TRACEBACK:') print(trace) + + if isinstance(err, ModuleQueueError): + self.queue.error() + raise err # remove from set_module ## check if item process == completed @@ -134,14 +145,12 @@ class AbstractModule(ABC): self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s") time.sleep(self.pending_seconds) - def _module_name(self): """ - Returns the instance class name (ie. 
the Module Name) + Returns the instance class name (ie the Module Name) """ return self.__class__.__name__ - @abstractmethod def compute(self, message): """ @@ -149,7 +158,6 @@ class AbstractModule(ABC): """ pass - def computeNone(self): """ Method of the Module when there is no message diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg deleted file mode 100644 index 8e408f1c..00000000 --- a/bin/packages/modules.cfg +++ /dev/null @@ -1,172 +0,0 @@ -[ZMQModuleImporter] -publish = Redis_Import - -[FeederModuleImporter] -publish = Redis_Import - -#################################################### - -[Mixer] -# subscribe = ZMQ_Global -subscribe = Redis_Import -publish = Redis_Mixer - -[Sync_importer] -publish = Redis_Import,Redis_Tags - -[Importer_Json] -publish = Redis_Import,Redis_Tags - -[Global] -subscribe = Redis_Mixer -publish = Redis_Global,Redis_ModuleStats - -[Duplicates] -subscribe = Redis_Duplicate - -[Indexer] -subscribe = Redis_Global - -[Hosts] -subscribe = Redis_Global -publish = Redis_Host - -[DomClassifier] -subscribe = Redis_Host -publish = Redis_D4_client - -[D4Client] -subscribe = Redis_D4_client - -[Retro_Hunt] -subscribe = Redis -publish = Redis_Tags - -[Tracker_Typo_Squatting] -subscribe = Redis_Host -publish = Redis_Tags - -[Tracker_Term] -subscribe = Redis_Global -publish = Redis_Tags - -[Tracker_Regex] -subscribe = Redis_Global -publish = Redis_Tags - -[Tracker_Yara] -subscribe = Redis_Global -publish = Redis_Tags - -[Tools] -subscribe = Redis_Global -publish = Redis_Tags - -[Telegram] -subscribe = Redis_Global -publish = Redis_Tags - -[Languages] -subscribe = Redis_Global - -[Categ] -subscribe = Redis_Global -publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Urls,Redis_Credential,Redis_Cve,Redis_ApiKey,Redis_SyncModule - -[CreditCards] -subscribe = Redis_CreditCards -publish = Redis_Tags - -[Iban] -subscribe = Redis_Global -publish = Redis_Tags - -[Mail] -subscribe = Redis_Mail -publish = Redis_ModuleStats,Redis_Tags - -[Onion] -subscribe = Redis_Onion -publish = Redis_Tags,Redis_Crawler -#publish = Redis_ValidOnion,ZMQ_FetchedOnion,Redis_Tags,Redis_Crawler - -[Urls] -subscribe = Redis_Urls -publish = Redis_Url -#publish = Redis_Url,ZMQ_Url - -[LibInjection] -subscribe = Redis_Url -publish = Redis_Tags - -[SQLInjectionDetection] -subscribe = Redis_Url -publish = Redis_Tags - -[ModuleStats] -subscribe = Redis_ModuleStats - -[Tags] -subscribe = Redis_Tags -publish = Redis_Tags_feed,Redis_SyncModule - -# dirty fix -[Sync_module] -subscribe = Redis_SyncModule - -[MISP_The_hive_feeder] -subscribe = Redis_Tags_feed - -#[SentimentAnalysis] -#subscribe = Redis_Global - -[Credential] -subscribe = Redis_Credential -publish = Redis_Duplicate,Redis_ModuleStats,Redis_Tags - -[CveModule] -subscribe = Redis_Cve -publish = Redis_Tags - -# Disabled -#[Phone] -#subscribe = Redis_Global -#publish = Redis_Tags - -[Keys] -subscribe = Redis_Global -publish = Redis_PgpDump,Redis_Tags - -[PgpDump] -subscribe = Redis_PgpDump -publish = Redis_Tags - -[ApiKey] -subscribe = Redis_ApiKey -publish = Redis_Tags - -[Decoder] -subscribe = Redis_Global -publish = Redis_Tags - -[Cryptocurrencies] -subscribe = Redis_Global -publish = Redis_Tags - -[submit_paste] -publish = Redis_Import - -[Crawler] -publish = Redis_Import,Redis_Tags - -[IP] -subscribe = Redis_Global -publish = Redis_Tags - -[Zerobins] -subscribe = Redis_Url - -# [My_Module] -# subscribe = Redis_Global -# publish = Redis_Tags - diff --git a/bin/trackers/Retro_Hunt.py b/bin/trackers/Retro_Hunt.py index 
434eaae0..a62cf19c 100755 --- a/bin/trackers/Retro_Hunt.py +++ b/bin/trackers/Retro_Hunt.py @@ -19,6 +19,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader from lib.objects.Items import Item from packages import Date from lib import Tracker @@ -34,9 +35,10 @@ class Retro_Hunt(AbstractModule): """ def __init__(self): super(Retro_Hunt, self).__init__() + config_loader = ConfigLoader() self.pending_seconds = 5 - self.full_item_url = self.process.config.get("Notifications", "ail_domain") + "/object/item?id=" + self.full_item_url = config_loader.get_config_str("Notifications", "ail_domain") + "/object/item?id=" # reset on each loop self.task_uuid = None @@ -149,7 +151,7 @@ class Retro_Hunt(AbstractModule): # Tags for tag in self.tags: msg = f'{tag};{id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # # Mails # mail_to_notify = Tracker.get_tracker_mails(tracker_uuid) diff --git a/bin/trackers/Tracker_Regex.py b/bin/trackers/Tracker_Regex.py index 66e6514f..11e00f46 100755 --- a/bin/trackers/Tracker_Regex.py +++ b/bin/trackers/Tracker_Regex.py @@ -18,6 +18,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from modules.abstract_module import AbstractModule from lib.objects.Items import Item +from lib.ConfigLoader import ConfigLoader from packages import Term from lib import Tracker @@ -31,9 +32,11 @@ class Tracker_Regex(AbstractModule): def __init__(self): super(Tracker_Regex, self).__init__() + config_loader = ConfigLoader() + self.pending_seconds = 5 - self.max_execution_time = self.process.config.getint(self.module_name, "max_execution_time") + self.max_execution_time = config_loader.get_config_int(self.module_name, "max_execution_time") # refresh Tracked Regex self.dict_regex_tracked = Term.get_regex_tracked_words_dict() @@ -85,7 +88,7 @@ class Tracker_Regex(AbstractModule): for tag in tracker.get_tags(): msg = f'{tag};{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if tracker.mail_export(): # TODO add matches + custom subjects diff --git a/bin/trackers/Tracker_Term.py b/bin/trackers/Tracker_Term.py index f3ee7ec7..b3d09783 100755 --- a/bin/trackers/Tracker_Term.py +++ b/bin/trackers/Tracker_Term.py @@ -20,6 +20,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from modules.abstract_module import AbstractModule +from lib.ConfigLoader import ConfigLoader from lib.objects.Items import Item from packages import Term from lib import Tracker @@ -46,9 +47,11 @@ class Tracker_Term(AbstractModule): def __init__(self): super(Tracker_Term, self).__init__() + config_loader = ConfigLoader() + self.pending_seconds = 5 - self.max_execution_time = self.process.config.getint('Tracker_Term', "max_execution_time") + self.max_execution_time = config_loader.get_config_int('Tracker_Term', "max_execution_time") # loads tracked words self.list_tracked_words = Term.get_tracked_words_list() @@ -137,7 +140,7 @@ class Tracker_Term(AbstractModule): # Tags for tag in tracker.get_tags(): msg = f'{tag};{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # Mail if tracker.mail_export(): diff --git a/bin/trackers/Tracker_Typo_Squatting.py b/bin/trackers/Tracker_Typo_Squatting.py index 898b137b..fa7f613b 100755 --- a/bin/trackers/Tracker_Typo_Squatting.py +++ 
b/bin/trackers/Tracker_Typo_Squatting.py @@ -79,7 +79,7 @@ class Tracker_Typo_Squatting(AbstractModule): for tag in tracker.get_tags(): msg = f'{tag};{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') if tracker.mail_export(): self.exporters['mail'].export(tracker, item) diff --git a/bin/trackers/Tracker_Yara.py b/bin/trackers/Tracker_Yara.py index b85deb55..25138054 100755 --- a/bin/trackers/Tracker_Yara.py +++ b/bin/trackers/Tracker_Yara.py @@ -88,7 +88,7 @@ class Tracker_Yara(AbstractModule): # Tags for tag in tracker.get_tags(): msg = f'{tag};{item_id}' - self.send_message_to_queue(msg, 'Tags') + self.add_message_to_queue(msg, 'Tags') # Mails if tracker.mail_export(): diff --git a/configs/modules.cfg b/configs/modules.cfg new file mode 100644 index 00000000..dc71dac8 --- /dev/null +++ b/configs/modules.cfg @@ -0,0 +1,166 @@ +[ZMQModuleImporter] +publish = Importers + +[FeederModuleImporter] +publish = Importers + +[Importer_Json] +publish = Importers,Tags + +#################################################### + +[Mixer] +subscribe = Importers +publish = SaveObj + +[Sync_importer] +publish = Importers,Tags + +[Global] +subscribe = SaveObj +publish = Item,Sync + +[Duplicates] +subscribe = Duplicate + +[Indexer] +subscribe = Item + +[Hosts] +subscribe = Item +publish = Host + +[DomClassifier] +subscribe = Host +publish = D4_client + +[D4Client] +subscribe = D4_client + +[Retro_Hunt] +publish = Tags + +[Tracker_Typo_Squatting] +subscribe = Host +publish = Tags + +[Tracker_Term] +subscribe = Item +publish = Tags + +[Tracker_Regex] +subscribe = Item +publish = Tags + +[Tracker_Yara] +subscribe = Item +publish = Tags + +[Tools] +subscribe = Item +publish = Tags + +[Telegram] +subscribe = Item +publish = Tags + +[Languages] +subscribe = Item + +[Categ] +subscribe = Item +publish = CreditCards,Mail,Onion,Urls,Credential,Cve,ApiKey + +[CreditCards] +subscribe = CreditCards +publish = Tags + +[Iban] +subscribe = Item +publish = Tags + +[Mail] +subscribe = Mail +publish = Tags +#publish = ModuleStats,Tags + +[Onion] +subscribe = Onion +publish = Tags + +[Urls] +subscribe = Urls +publish = Url + +[LibInjection] +subscribe = Url +publish = Tags + +[SQLInjectionDetection] +subscribe = Url +publish = Tags + +[Tags] +subscribe = Tags +publish = Tag_feed,Sync + +# dirty fix +[Sync_module] +subscribe = Sync + +[MISP_The_hive_feeder] +subscribe = Tag_feed + +#[SentimentAnalysis] +#subscribe = Item + +[Credential] +subscribe = Credential +publish = Duplicate,Tags + +[CveModule] +subscribe = Cve +publish = Tags + +# Disabled +#[Phone] +#subscribe = Item +#publish = Tags + +[Keys] +subscribe = Item +publish = PgpDump,Tags + +[PgpDump] +subscribe = PgpDump +publish = Tags + +[ApiKey] +subscribe = ApiKey +publish = Tags + +[Decoder] +subscribe = Item +publish = Tags + +[Cryptocurrencies] +subscribe = Item +publish = Tags + +[SubmitPaste] +publish = Importers + +[Crawler] +publish = Importers,Tags + +[IP] +subscribe = Item +publish = Tags + +[Zerobins] +subscribe = Url + +# [My_Module_Name] +# subscribe = Global # Queue name +# publish = Tags # Queue name + diff --git a/doc/generate_graph_data.py b/doc/generate_graph_data.py deleted file mode 100755 index 843fe110..00000000 --- a/doc/generate_graph_data.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - -import os -import argparse - -def main(): - - content = "" - modules = {} - all_modules = [] - curr_module = "" - streamingPub = {} - streamingSub = {} - - path = 
os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') # path to module config file - path2 = os.path.join(os.environ['AIL_HOME'], 'doc/all_modules.txt') # path and name of the output file, this file contain a list off all modules - - - parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak - framework. It create a graph that represent the flow between modules".''', - epilog='Example: ./generate_graph_data.py 0') - - parser.add_argument('type', type=int, default=0, - help='''The graph type (default 0), - 0: module graph, - 1: data graph''', - choices=[0, 1], action='store') - - parser.add_argument('spline', type=str, default="ortho", - help='''The graph splines type, spline:default , ortho: orthogonal''', - choices=["ortho", "spline"], action='store') - - args = parser.parse_args() - - with open(path, 'r') as f: - - # get all modules, subscriber and publisher for each module - for line in f: - if line[0] != '#': - # module name - if line[0] == '[': - curr_name = line.replace('[','').replace(']','').replace('\n', '').replace(' ', '') - all_modules.append(curr_name) - modules[curr_name] = {'sub': [], 'pub': []} - curr_module = curr_name - elif curr_module != "": # searching for sub or pub - # subscriber list - if line.startswith("subscribe"): - curr_subscribers = [w for w in line.replace('\n', '').replace(' ', '').split('=')[1].split(',')] - modules[curr_module]['sub'] = curr_subscribers - for sub in curr_subscribers: - streamingSub[sub] = curr_module - - # publisher list - elif line.startswith("publish"): - curr_publishers = [w for w in line.replace('\n', '').replace(' ', '').split('=')[1].split(',')] - modules[curr_module]['pub'] = curr_publishers - for pub in curr_publishers: - streamingPub[pub] = curr_module - else: - continue - - output_set_graph = set() - with open(path2, 'w') as f2: - for e in all_modules: - f2.write(e+"\n") - - output_text_graph = "" - - # flow between modules - if args.type == 0: - - for module in modules.keys(): - for stream_in in modules[module]['sub']: - if stream_in not in streamingPub.keys(): - output_set_graph.add("\"" + stream_in + "\" [color=darkorange1] ;\n") - output_set_graph.add("\"" + stream_in + "\"" + "->" + module + ";\n") - else: - output_set_graph.add("\"" + streamingPub[stream_in] + "\"" + "->" + module + ";\n") - - for stream_out in modules[module]['pub']: - if stream_out not in streamingSub.keys(): - #output_set_graph.add("\"" + stream_out + "\" [color=darkorange1] ;\n") - output_set_graph.add("\"" + module + "\"" + "->" + stream_out + ";\n") - else: - output_set_graph.add("\"" + module + "\"" + "->" + streamingSub[stream_out] + ";\n") - - # graph head - output_text_graph += "digraph unix {\n" - output_text_graph += "graph [pad=\"0.5\"];\n" - output_text_graph += "size=\"25,25\";\n" - output_text_graph += "splines=" - output_text_graph += args.spline - output_text_graph += ";\n" - output_text_graph += "node [color=lightblue2, style=filled];\n" - - - # flow between data - if args.type == 1: - - for module in modules.keys(): - for stream_in in modules[module]['sub']: - for stream_out in modules[module]['pub']: - - if stream_in not in streamingPub.keys(): - output_set_graph.add("\"" + stream_in + "\" [color=darkorange1] ;\n") - - output_set_graph.add("\"" + stream_in + "\"" + "->" + stream_out + ";\n") - - # graph head - output_text_graph += "digraph unix {\n" - output_text_graph += "graph [pad=\"0.5\"];\n" - output_text_graph += "size=\"25,25\";\n" - output_text_graph += "splines=" - 
output_text_graph += args.spline - output_text_graph += ";\n" - output_text_graph += "node [color=tan, style=filled];\n" - - - - # create final txt graph - for elem in output_set_graph: - output_text_graph += elem - - output_text_graph += "}" - print(output_text_graph) - -if __name__ == "__main__": - main() diff --git a/doc/generate_modules_data_flow_graph.sh b/doc/generate_modules_data_flow_graph.sh deleted file mode 100755 index 790d461a..00000000 --- a/doc/generate_modules_data_flow_graph.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -python3 $AIL_HOME/doc/generate_graph_data.py 0 ortho | dot -T png -o $AIL_HOME/doc/module-data-flow.png -python3 $AIL_HOME/doc/generate_graph_data.py 1 ortho | dot -T png -o $AIL_HOME/doc/data-flow.png diff --git a/installing_deps.sh b/installing_deps.sh index 77962dc2..db29c8d0 100755 --- a/installing_deps.sh +++ b/installing_deps.sh @@ -130,9 +130,6 @@ cp ${AIL_BIN}/helper/gen_cert/server.key ${AIL_FLASK}/server.key mkdir -p $AIL_HOME/PASTES -#Create the file all_module and update the graph in doc -$AIL_HOME/doc/generate_modules_data_flow_graph.sh - #### DB SETUP #### # init update version diff --git a/var/www/modules/dashboard/Flask_dashboard.py b/var/www/modules/dashboard/Flask_dashboard.py index 45be4f46..923390dd 100644 --- a/var/www/modules/dashboard/Flask_dashboard.py +++ b/var/www/modules/dashboard/Flask_dashboard.py @@ -20,7 +20,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## -from lib import queues_modules +from lib import ail_queues from lib import ail_updates from packages.Date import Date @@ -57,7 +57,7 @@ def event_stream(): def get_queues(): # We may want to put the llen in a pipeline to do only one query. - return queues_modules.get_all_modules_queues_stats() + return ail_queues.get_modules_queues_stats() def get_date_range(date_from, num_day): date = Date(str(date_from[0:4])+str(date_from[4:6]).zfill(2)+str(date_from[6:8]).zfill(2)) diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js index b2cc7e39..10db9265 100644 --- a/var/www/static/js/indexjavascript.js +++ b/var/www/static/js/indexjavascript.js @@ -346,9 +346,11 @@ function create_queue_table() { // - j=1: queueLength // - j=2: LastProcessedPasteTime // - j=3: Number of the module belonging in the same category - if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2) + if (glob_tabvar.row1[i][3]==="Not Launched") + tr.className += " bg-danger text-white"; + else if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2) tr.className += " table-danger"; - else if (parseInt(glob_tabvar.row1[i][1]) == 0) + else if (parseInt(glob_tabvar.row1[i][1]) === 0) tr.className += " table-disabled"; else tr.className += " table-success";
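A minimal sketch of a detection module written against the refactored AbstractModule shown above (AILQueue-backed I/O, add_message_to_queue instead of send_message_to_queue). This is an illustration, not part of the changeset: the module name "MyModule", its tag and its detection logic are hypothetical, and a matching section (subscribe = Item, publish = Tags) would also be expected in configs/modules.cfg, mirroring the commented "[My_Module_Name]" example at the end of that new file.

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
MyModule: hypothetical example of a module built on the refactored
AbstractModule. Illustration only, not part of this changeset.
"""
import os
import sys

sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.objects.Items import Item


class MyModule(AbstractModule):
    """Hypothetical detection module."""

    def __init__(self, queue=True):
        # Forward queue=True so AbstractModule builds the AILQueue for this module
        super(MyModule, self).__init__(queue=queue)
        # Waiting time in seconds between two processed messages
        self.pending_seconds = 10

    def compute(self, message):
        # Assumption: messages on the Item queue carry an item id,
        # as in the detection modules touched by this patch
        item = Item(message)

        # ... detection logic elided ...

        # Tag the item by pushing a message to the Tags queue (placeholder tag)
        msg = f'infoleak:automatic-detection="my-module";{item.get_id()}'
        self.add_message_to_queue(msg, 'Tags')


if __name__ == "__main__":
    module = MyModule()
    module.run()

With this layout the queue wiring lives entirely in configs/modules.cfg (which replaces the deleted bin/packages/modules.cfg), and the dashboard reads per-module queue statistics through lib.ail_queues.get_modules_queues_stats() rather than the old queues_modules helper.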