chg: [AIL queues] rewrite module queues. remove PUBSUB

This commit is contained in:
Terrtia 2023-04-13 14:25:02 +02:00
parent bc73b0ca27
commit 6f9e0c2f66
GPG key ID: 1E1B1F50D84613D0
57 changed files with 668 additions and 971 deletions

2
.gitignore vendored
View file

@ -68,4 +68,6 @@ doc/all_modules.txt
# auto generated
doc/module-data-flow.png
doc/data-flow.png
doc/ail_queues.dot
doc/ail_queues.svg
doc/statistics

22
bin/AIL_Init.py Executable file
View file

@ -0,0 +1,22 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
AIL Init
============================
Init DB + Clear Stats
"""
import os
import sys
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import ail_queues
if __name__ == "__main__":
ail_queues.save_queue_digraph()
ail_queues.clear_modules_queues_stats()
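For context, AIL_Init.py is now run from LAUNCH.sh just before the module scripts start (see the launching_scripts change below): it regenerates the queue graph and clears the per-module stats. A minimal sketch of re-running only the graph step by hand, assuming the virtualenv is active and graphviz's dot binary is on the PATH (save_queue_digraph() shells out to it):

# hypothetical one-off run from bin/, with AIL_BIN set as in LAUNCH.sh
from lib import ail_queues
ail_queues.save_queue_digraph()  # writes doc/ail_queues.dot, then renders doc/ail_queues.svg via 'dot -Tsvg'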

View file

@ -1,233 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
Queue helper module
============================
This module subscribes to a Publisher stream and puts the received messages
into a Redis list, waiting to be popped later by other scripts.
.. note:: Modules ZMQ_Something_Q and ZMQ_Something are closely bound; always put
the same Subscriber name in both of them.
"""
import redis
import configparser
import os
import zmq
import time
import datetime
import json
class PubSub(object): ## TODO: remove config, use ConfigLoader by default
def __init__(self):
configfile = os.path.join(os.environ['AIL_HOME'], 'configs/core.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
self.config = configparser.ConfigParser()
self.config.read(configfile)
self.redis_sub = False
self.zmq_sub = False
self.subscribers = None
self.publishers = {'Redis': [], 'ZMQ': []}
def setup_subscribe(self, conn_name):
if self.config.has_section(conn_name):
channel = self.config.get(conn_name, 'channel')
else:
channel = conn_name.split('_')[1]
if conn_name.startswith('Redis'):
self.redis_sub = True
r = redis.StrictRedis(
host=self.config.get('RedisPubSub', 'host'),
port=self.config.get('RedisPubSub', 'port'),
db=self.config.get('RedisPubSub', 'db'),
decode_responses=True)
self.subscribers = r.pubsub(ignore_subscribe_messages=True)
self.subscribers.psubscribe(channel)
elif conn_name.startswith('ZMQ'):
self.zmq_sub = True
context = zmq.Context()
# Get all feeds
self.subscribers = []
addresses = self.config.get(conn_name, 'address')
for address in addresses.split(','):
subscriber = context.socket(zmq.SUB)
subscriber.connect(address)
subscriber.setsockopt_string(zmq.SUBSCRIBE, channel)
self.subscribers.append(subscriber)
def setup_publish(self, conn_name):
if self.config.has_section(conn_name):
channel = self.config.get(conn_name, 'channel')
else:
channel = conn_name.split('_')[1]
if conn_name.startswith('Redis'):
r = redis.StrictRedis(host=self.config.get('RedisPubSub', 'host'),
port=self.config.get('RedisPubSub', 'port'),
db=self.config.get('RedisPubSub', 'db'),
decode_responses=True)
self.publishers['Redis'].append((r, channel))
elif conn_name.startswith('ZMQ'):
context = zmq.Context()
p = context.socket(zmq.PUB)
p.bind(self.config.get(conn_name, 'address'))
self.publishers['ZMQ'].append((p, channel))
def publish(self, message):
m = json.loads(message)
channel_message = m.get('channel')
for p, channel in self.publishers['Redis']:
if channel_message is None or channel_message == channel:
p.publish(channel, ( m['message']) )
for p, channel in self.publishers['ZMQ']:
if channel_message is None or channel_message == channel:
p.send('{} {}'.format(channel, m['message']))
#p.send(b' '.join( [channel, mess] ) )
def subscribe(self):
if self.redis_sub:
for msg in self.subscribers.listen():
if msg.get('data', None) is not None:
yield msg['data']
elif self.zmq_sub:
# Initialize poll set
poller = zmq.Poller()
for subscriber in self.subscribers:
poller.register(subscriber, zmq.POLLIN)
while True:
socks = dict(poller.poll())
for subscriber in self.subscribers:
if subscriber in socks:
message = subscriber.recv()
yield message.split(b' ', 1)[1]
else:
raise Exception('No subscribe function defined')
class Process(object):
def __init__(self, conf_section, module=True):
configfile = os.path.join(os.environ['AIL_HOME'], 'configs/core.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
modulesfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg')
self.config = configparser.ConfigParser()
self.config.read(configfile)
self.modules = configparser.ConfigParser()
self.modules.read(modulesfile)
self.subscriber_name = conf_section
self.pubsub = None
if module:
if self.modules.has_section(conf_section):
self.pubsub = PubSub()
else:
raise Exception('Your process has to listen to at least one feed.')
self.r_temp = redis.StrictRedis(
host=self.config.get('RedisPubSub', 'host'),
port=self.config.get('RedisPubSub', 'port'),
db=self.config.get('RedisPubSub', 'db'),
decode_responses=True)
self.moduleNum = os.getpid()
def populate_set_in(self):
# monoproc
try:
src = self.modules.get(self.subscriber_name, 'subscribe')
except configparser.NoOptionError: #NoSectionError
src = None
if src != 'Redis' and src:
self.pubsub.setup_subscribe(src)
for msg in self.pubsub.subscribe():
in_set = self.subscriber_name + 'in'
self.r_temp.sadd(in_set, msg)
self.r_temp.hset('queues', self.subscriber_name,
int(self.r_temp.scard(in_set)))
else:
print('{} has no subscriber'.format(self.subscriber_name))
def get_from_set(self):
# multiproc
in_set = self.subscriber_name + 'in'
self.r_temp.hset('queues', self.subscriber_name,
int(self.r_temp.scard(in_set)))
message = self.r_temp.spop(in_set)
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
dir_name = os.environ['AIL_HOME']+self.config.get('Directories', 'pastes')
if message is None:
return None
else:
try:
if '.gz' in message:
path = message.split(".")[-2].split("/")[-1]
# find start of path with AIL_HOME
index_s = message.find(os.environ['AIL_HOME'])
# Stop when .gz
index_e = message.find(".gz")+3
if(index_s == -1):
complete_path = message[0:index_e]
else:
complete_path = message[index_s:index_e]
else:
path = "-"
complete_path = "?"
value = str(timestamp) + ", " + path
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value)
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path)
self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum))
curr_date = datetime.date.today()
return message
except:
print('except')
path = "?"
value = str(timestamp) + ", " + path
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value)
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", "?")
self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum))
return message
def populate_set_out(self, msg, channel=None):
# multiproc
msg = {'message': msg}
if channel is not None:
msg.update({'channel': channel})
# base64-encode bytes to ASCII-only bytes
j = json.dumps(msg)
self.r_temp.sadd(self.subscriber_name + 'out', j)
def publish(self):
# monoproc
if not self.modules.has_option(self.subscriber_name, 'publish'):
return False
dest = self.modules.get(self.subscriber_name, 'publish')
# We can have multiple publishers
for name in dest.split(','):
self.pubsub.setup_publish(name)
while True:
message = self.r_temp.spop(self.subscriber_name + 'out')
if message is None:
time.sleep(1)
continue
self.pubsub.publish(message)

View file

@ -35,7 +35,6 @@ isredis=`screen -ls | egrep '[0-9]+.Redis_AIL' | cut -d. -f1`
isardb=`screen -ls | egrep '[0-9]+.ARDB_AIL' | cut -d. -f1`
iskvrocks=`screen -ls | egrep '[0-9]+.KVROCKS_AIL' | cut -d. -f1`
islogged=`screen -ls | egrep '[0-9]+.Logging_AIL' | cut -d. -f1`
isqueued=`screen -ls | egrep '[0-9]+.Queue_AIL' | cut -d. -f1`
is_ail_core=`screen -ls | egrep '[0-9]+.Core_AIL' | cut -d. -f1`
is_ail_2_ail=`screen -ls | egrep '[0-9]+.AIL_2_AIL' | cut -d. -f1`
isscripted=`screen -ls | egrep '[0-9]+.Script_AIL' | cut -d. -f1`
@ -152,14 +151,6 @@ function launching_logs {
screen -S "Logging_AIL" -X screen -t "LogCrawler" bash -c "cd ${AIL_BIN}; ${AIL_VENV}/bin/log_subscriber -p 6380 -c Crawler -l ../logs/ ${syslog_cmd}; read x"
}
function launching_queues {
screen -dmS "Queue_AIL"
sleep 0.1
echo -e $GREEN"\t* Launching all the queues"$DEFAULT
screen -S "Queue_AIL" -X screen -t "Queues" bash -c "cd ${AIL_BIN}; ${ENV_PY} launch_queues.py; read x"
}
function checking_configuration {
bin_dir=${AIL_HOME}/bin
echo -e "\t* Checking configuration"
@ -185,13 +176,17 @@ function launching_scripts {
# sleep 0.1
echo -e $GREEN"\t* Launching core scripts ..."$DEFAULT
# TODO: IMPORTER SCREEN ????
# Clear Queue Stats
pushd ${AIL_BIN}
${ENV_PY} ./AIL_Init.py
popd
# TODO: IMPORTER SCREEN ????
#### SYNC ####
screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "ail_2_ail_server" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./ail_2_ail_server.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "Sync_manager" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_manager.py; read x"
sleep 0.1
##-- SYNC --##
@ -225,7 +220,7 @@ function launching_scripts {
sleep 0.1
screen -S "Script_AIL" -X screen -t "Tags" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./Tags.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "SubmitPaste" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./submit_paste.py; read x"
screen -S "Script_AIL" -X screen -t "SubmitPaste" bash -c "cd ${AIL_BIN}/modules; ${ENV_PY} ./SubmitPaste.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "Crawler" bash -c "cd ${AIL_BIN}/crawlers; ${ENV_PY} ./Crawler.py; read x"
@ -448,14 +443,6 @@ function launch_logs {
fi
}
function launch_queues {
if [[ ! $isqueued ]]; then
launching_queues;
else
echo -e $RED"\t* A screen is already launched"$DEFAULT
fi
}
function launch_scripts {
if [[ ! $isscripted ]]; then ############################# is core
sleep 1
@ -505,19 +492,19 @@ function launch_feeder {
}
function killscript {
if [[ $islogged || $isqueued || $is_ail_core || $isscripted || $isflasked || $isfeeded || $is_ail_2_ail ]]; then
if [[ $islogged || $is_ail_core || $isscripted || $isflasked || $isfeeded || $is_ail_2_ail ]]; then
echo -e $GREEN"Killing Script"$DEFAULT
kill $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail
kill $islogged $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail
sleep 0.2
echo -e $ROSE`screen -ls`$DEFAULT
echo -e $GREEN"\t* $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail killed."$DEFAULT
echo -e $GREEN"\t* $islogged $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail killed."$DEFAULT
else
echo -e $RED"\t* No script to kill"$DEFAULT
fi
}
function killall {
if [[ $isredis || $isardb || $iskvrocks || $islogged || $isqueued || $is_ail_2_ail || $isscripted || $isflasked || $isfeeded || $is_ail_core || $is_ail_2_ail ]]; then
if [[ $isredis || $isardb || $iskvrocks || $islogged || $is_ail_2_ail || $isscripted || $isflasked || $isfeeded || $is_ail_core || $is_ail_2_ail ]]; then
if [[ $isredis ]]; then
echo -e $GREEN"Gracefully closing redis servers"$DEFAULT
shutting_down_redis;
@ -532,10 +519,10 @@ function killall {
shutting_down_kvrocks;
fi
echo -e $GREEN"Killing all"$DEFAULT
kill $isredis $isardb $iskvrocks $islogged $isqueued $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail
kill $isredis $isardb $iskvrocks $islogged $is_ail_core $isscripted $isflasked $isfeeded $is_ail_2_ail
sleep 0.2
echo -e $ROSE`screen -ls`$DEFAULT
echo -e $GREEN"\t* $isredis $isardb $iskvrocks $islogged $isqueued $isscripted $is_ail_2_ail $isflasked $isfeeded $is_ail_core killed."$DEFAULT
echo -e $GREEN"\t* $isredis $isardb $iskvrocks $islogged $isscripted $is_ail_2_ail $isflasked $isfeeded $is_ail_core killed."$DEFAULT
else
echo -e $RED"\t* No screen to kill"$DEFAULT
fi
@ -612,14 +599,13 @@ function launch_all {
launch_redis;
launch_kvrocks;
launch_logs;
launch_queues;
launch_scripts;
launch_flask;
}
function menu_display {
options=("Redis" "Ardb" "Kvrocks" "Logs" "Queues" "Scripts" "Flask" "Killall" "Update" "Update-config" "Update-thirdparty")
options=("Redis" "Ardb" "Kvrocks" "Logs" "Scripts" "Flask" "Killall" "Update" "Update-config" "Update-thirdparty")
menu() {
echo "What do you want to Launch?:"
@ -656,9 +642,6 @@ function menu_display {
Logs)
launch_logs;
;;
Queues)
launch_queues;
;;
Scripts)
launch_scripts;
;;

View file

@ -1,60 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
'''
The preProcess Module
=====================
This module is just an example of how we can pre-process a feed coming from the Mixer
module before seeding it to the Global module.
'''
import time
from pubsublogger import publisher
from Helper import Process
def do_something(message):
splitted = message.split()
if len(splitted) == 2:
item_id, gzip64encoded = splitted
item_id = item_id.replace("pastebin", "pastebinPROCESSED")
to_send = "{0} {1}".format(item_id, gzip64encoded)
return to_send
if __name__ == '__main__':
# If you wish to use another port or channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
# Port of the redis instance used by pubsublogger
publisher.port = 6380
# Script is the default channel used for the modules.
publisher.channel = 'Script'
# Section name in bin/packages/modules.cfg
config_section = 'PreProcessFeed'
# Setup the I/O queues
p = Process(config_section)
# Sent to the logging a description of the module
publisher.info("<description of the module>")
# Endless loop getting messages from the input queue
while True:
# Get one message from the input queue
message = p.get_from_set()
if message is None:
publisher.debug("{} queue is empty, waiting".format(config_section))
print("queue empty")
time.sleep(1)
continue
# Do something with the message from the queue
new_message = do_something(message)
# (Optional) Send that thing to the next queue
p.populate_set_out(new_message)

View file

@ -1,24 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
from pubsublogger import publisher
from Helper import Process
import argparse
def run(config_section):
p = Process(config_section)
p.populate_set_in()
if __name__ == '__main__':
publisher.port = 6380
publisher.channel = 'Queuing'
parser = argparse.ArgumentParser(description='Entry queue for a module.')
parser.add_argument("-c", "--config_section", type=str,
help="Config section to use in the config file.")
args = parser.parse_args()
run(args.config_section)

View file

@ -1,24 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
from pubsublogger import publisher
from Helper import Process
import argparse
def run(config_section):
p = Process(config_section)
if not p.publish():
print(config_section, 'has no publisher.')
if __name__ == '__main__':
publisher.port = 6380
publisher.channel = 'Queuing'
parser = argparse.ArgumentParser(description='Entry queue for a module.')
parser.add_argument("-c", "--config_section", type=str,
help="Config section to use in the config file.")
args = parser.parse_args()
run(args.config_section)

View file

View file

@ -79,7 +79,7 @@ class Sync_importer(AbstractModule):
message = f'sync {item_id} {b64_gzip_content}'
print(item_id)
self.send_message_to_queue(message, 'Mixer')
self.add_message_to_queue(message, 'Importers')
if __name__ == '__main__':

View file

@ -232,11 +232,11 @@ class Crawler(AbstractModule):
gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html'])
# send item to Global
relay_message = f'crawler {item_id} {gzip64encoded}'
self.send_message_to_queue(relay_message, 'Import')
self.add_message_to_queue(relay_message, 'Importers')
# Tag
msg = f'infoleak:submission="crawler";{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
crawlers.create_item_metadata(item_id, last_url, parent_id)
if self.root_item is None:

View file

@ -113,7 +113,7 @@ class FeederModuleImporter(AbstractModule):
# TODO HANDLE Invalid JSON
json_data = json.loads(message)
relay_message = self.importer.importer(json_data)
self.send_message_to_queue(relay_message)
self.add_message_to_queue(relay_message)
# Launch Importer

View file

@ -73,7 +73,7 @@ class ZMQModuleImporter(AbstractModule):
for message in messages:
message = message.decode()
print(message.split(' ', 1)[0])
self.send_message_to_queue(message)
self.add_message_to_queue(message)
if __name__ == '__main__':

View file

@ -1,66 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import configparser
import os
import subprocess
import time
def check_pid(pid):
if pid is None:
# Already seen as finished.
return None
else:
if pid.poll() is not None:
return False
return True
if __name__ == '__main__':
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
config = configparser.ConfigParser()
config.read(configfile)
modules = config.sections()
pids = {}
for module in modules:
pin = subprocess.Popen(["python3", './QueueIn.py', '-c', module])
pout = subprocess.Popen(["python3", './QueueOut.py', '-c', module])
pids[module] = (pin, pout)
is_running = True
try:
while is_running:
time.sleep(5)
is_running = False
for module, p in pids.items():
pin, pout = p
if pin is None:
# already dead
pass
elif not check_pid(pin):
print(module, 'input queue finished.')
pin = None
else:
is_running = True
if pout is None:
# already dead
pass
elif not check_pid(pout):
print(module, 'output queue finished.')
pout = None
else:
is_running = True
pids[module] = (pin, pout)
except KeyboardInterrupt:
for module, p in pids.items():
pin, pout = p
if pin is not None:
pin.kill()
if pout is not None:
pout.kill()

View file

@ -56,6 +56,9 @@ class ConfigLoader(object):
directory_path = os.path.join(os.environ['AIL_HOME'], directory_path)
return directory_path
def get_config_sections(self):
return self.cfg.sections()
def get_config_str(self, section, key_name):
return self.cfg.get(section, key_name)

282
bin/lib/ail_queues.py Executable file
View file

@ -0,0 +1,282 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import os
import sys
import datetime
import time
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib.exceptions import ModuleQueueError
from lib.ConfigLoader import ConfigLoader
config_loader = ConfigLoader()
r_queues = config_loader.get_redis_conn("Redis_Queues")
config_loader = None
MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg')
class AILQueue:
def __init__(self, module_name, module_pid):
self.name = module_name
self.pid = module_pid
self._set_subscriber()
# Update queue stat
r_queues.hset('queues', self.name, self.get_nb_messages())
r_queues.hset(f'modules', f'{self.pid}:{self.name}', -1)
def _set_subscriber(self):
subscribers = {}
module_config_loader = ConfigLoader(config_file=MODULES_FILE) # TODO CHECK IF FILE EXISTS
if not module_config_loader.has_section(self.name):
raise ModuleQueueError(f'No section defined for this module: {self.name}. Please add one in configs/modules.cfg')
if module_config_loader.has_option(self.name, 'publish'):
subscribers_queues = module_config_loader.get_config_str(self.name, 'publish')
if subscribers_queues:
subscribers_queues = set(subscribers_queues.split(','))
for queue_name in subscribers_queues:
subscribers[queue_name] = set()
if subscribers_queues:
for module in module_config_loader.get_config_sections():
if module_config_loader.has_option(module, 'subscribe'):
queue_name = module_config_loader.get_config_str(module, 'subscribe')
if queue_name in subscribers:
subscribers[queue_name].add(module)
self.subscribers_modules = subscribers
def get_nb_messages(self):
return r_queues.llen(f'queue:{self.name}:in')
def get_message(self):
# Update queues stats
r_queues.hset('queues', self.name, self.get_nb_messages())
r_queues.hset(f'modules', f'{self.pid}:{self.name}', int(time.time()))
# Get Message
message = r_queues.lpop(f'queue:{self.name}:in')
if not message:
return None
else:
# TODO SAVE CURRENT ITEMS (OLD Module information)
return message
def send_message(self, message, queue_name=None):
if not self.subscribers_modules:
raise ModuleQueueError('This module does not have any subscriber')
if queue_name:
if queue_name not in self.subscribers_modules:
raise ModuleQueueError(f'send_message: Unknown queue_name {queue_name}')
else:
if len(self.subscribers_modules) > 1:
raise ModuleQueueError('Queue name required: this module pushes to multiple queues')
queue_name = list(self.subscribers_modules)[0]
# Add message to all modules
for module_name in self.subscribers_modules[queue_name]:
r_queues.rpush(f'queue:{module_name}:in', message)
# stats
nb_mess = r_queues.llen(f'queue:{module_name}:in')
r_queues.hset('queues', module_name, nb_mess)
# TODO
def refresh(self):
# TODO check cache
self._set_subscriber()
def clear(self):
r_queues.delete(f'queue:{self.name}:in')
def error(self):
r_queues.hdel(f'modules', f'{self.pid}:{self.name}')
def get_queues_modules():
return r_queues.hkeys('queues')
def get_nb_queues_modules():
return r_queues.hgetall('queues')
def get_nb_sorted_queues_modules():
res = r_queues.hgetall('queues')
res = sorted(res.items())
return res
def get_modules_pid_last_mess():
return r_queues.hgetall('modules')
def get_modules_queues_stats():
modules_queues_stats = []
nb_queues_modules = get_nb_queues_modules()
modules_pid_last_mess = get_modules_pid_last_mess()
added_modules = set()
for row_module in modules_pid_last_mess:
pid, module = row_module.split(':', 1)
last_time = modules_pid_last_mess[row_module]
last_time = datetime.datetime.fromtimestamp(int(last_time))
seconds = int((datetime.datetime.now() - last_time).total_seconds())
modules_queues_stats.append((module, nb_queues_modules[module], seconds, pid))
added_modules.add(module)
for module in nb_queues_modules:
if module not in added_modules:
modules_queues_stats.append((module, nb_queues_modules[module], -1, 'Not Launched'))
return sorted(modules_queues_stats)
def clear_modules_queues_stats():
r_queues.delete('modules')
def get_queue_digraph():
queues_ail = {}
modules = {}
module_config_loader = ConfigLoader(config_file=MODULES_FILE)
for module in module_config_loader.get_config_sections():
if module_config_loader.has_option(module, 'subscribe'):
if module not in modules:
modules[module] = {'in': set(), 'out': set()}
queue = module_config_loader.get_config_str(module, 'subscribe')
modules[module]['in'].add(queue)
if queue not in queues_ail:
queues_ail[queue] = []
queues_ail[queue].append(module)
if module_config_loader.has_option(module, 'publish'):
if module not in modules:
modules[module] = {'in': set(), 'out': set()}
queues = module_config_loader.get_config_str(module, 'publish')
for queue in queues.split(','):
modules[module]['out'].add(queue)
# print(modules)
# print(queues_ail)
mapped = set()
import_modules = set()
edges = '# Define edges between nodes\n'
for module in modules:
for queue_name in modules[module]['out']:
if queue_name == 'Importers':
import_modules.add(module)
if queue_name in queues_ail:
for module2 in queues_ail[queue_name]:
to_break = False
new_edge = None
cluster_out = f'cluster_{queue_name.lower()}'
queue_in = modules[module]['in']
if queue_in:
queue_in = next(iter(queue_in))
if len(queues_ail.get(queue_in, [])) == 1:
cluster_in = f'cluster_{queue_in.lower()}'
new_edge = f'{module} -> {module2} [ltail="{cluster_in}" lhead="{cluster_out}"];\n'
to_break = True
if not new_edge:
new_edge = f'{module} -> {module2} [lhead="{cluster_out}"];\n'
to_map = f'{module}:{cluster_out}'
if to_map not in mapped:
mapped.add(to_map)
edges = f'{edges}{new_edge}'
if to_break:
break
subgraph = '# Define subgraphs for each queue\n'
for queue_name in queues_ail:
cluster_name = f'cluster_{queue_name.lower()}'
subgraph = f'{subgraph} subgraph {cluster_name} {{\n label="Queue {queue_name}";\n color=blue;\n'
for module in queues_ail[queue_name]:
subgraph = f'{subgraph} {module};\n'
subgraph = f'{subgraph}}}\n\n'
cluster_name = f'cluster_importers'
subgraph = f'{subgraph} subgraph {cluster_name} {{\n label="AIL Importers";\n color=red;\n'
for module in import_modules:
subgraph = f'{subgraph} {module};\n'
subgraph = f'{subgraph}}}\n\n'
digraph = 'digraph Modules {\ngraph [rankdir=LR splines=ortho];\nnode [shape=rectangle]\ncompound=true;\n'
digraph = f'{digraph}edge[arrowhead=open color=salmon]\n\n'
digraph = f'{digraph}{subgraph}{edges}\n}}\n'
return digraph
def save_queue_digraph():
import subprocess
digraph = get_queue_digraph()
dot_file = os.path.join(os.environ['AIL_HOME'], 'doc/ail_queues.dot')
svg_file = os.path.join(os.environ['AIL_HOME'], 'doc/ail_queues.svg')
with open(dot_file, 'w') as f:
f.write(digraph)
print('dot', '-Tsvg', dot_file, '-o', svg_file)
process = subprocess.run(['dot', '-Tsvg', dot_file, '-o', svg_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if process.returncode == 0:
# modified_files = process.stdout
# print(process.stdout)
return True
else:
print(process.stderr.decode())
sys.exit(1)
###########################################################################################
###########################################################################################
###########################################################################################
###########################################################################################
###########################################################################################
# def get_all_queues_name():
# return r_queues.hkeys('queues')
#
# def get_all_queues_dict_with_nb_elem():
# return r_queues.hgetall('queues')
#
# def get_all_queues_with_sorted_nb_elem():
# res = r_queues.hgetall('queues')
# res = sorted(res.items())
# return res
#
# def get_module_pid_by_queue_name(queue_name):
# return r_queues.smembers('MODULE_TYPE_{}'.format(queue_name))
#
# # # TODO: remove last msg part
# def get_module_last_process_start_time(queue_name, module_pid):
# res = r_queues.get('MODULE_{}_{}'.format(queue_name, module_pid))
# if res:
# return res.split(',')[0]
# return None
#
# def get_module_last_msg(queue_name, module_pid):
# return r_queues.get('MODULE_{}_{}_PATH'.format(queue_name, module_pid))
#
# def get_all_modules_queues_stats():
# all_modules_queues_stats = []
# for queue_name, nb_elem_queue in get_all_queues_with_sorted_nb_elem():
# l_module_pid = get_module_pid_by_queue_name(queue_name)
# for module_pid in l_module_pid:
# last_process_start_time = get_module_last_process_start_time(queue_name, module_pid)
# if last_process_start_time:
# last_process_start_time = datetime.datetime.fromtimestamp(int(last_process_start_time))
# seconds = int((datetime.datetime.now() - last_process_start_time).total_seconds())
# else:
# seconds = 0
# all_modules_queues_stats.append((queue_name, nb_elem_queue, seconds, module_pid))
# return all_modules_queues_stats
#
#
# def _get_all_messages_from_queue(queue_name):
# #self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set)))
# return r_queues.smembers(f'queue:{queue_name}:in')
#
# # def is_message_in queue(queue_name):
# # pass
#
# def remove_message_from_queue(queue_name, message):
# queue_key = f'queue:{queue_name}:in'
# r_queues.srem(queue_key, message)
# r_queues.hset('queues', queue_name, int(r_queues.scard(queue_key)))
if __name__ == '__main__':
# clear_modules_queues_stats()
save_queue_digraph()
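To make the new queue layer concrete, here is a hedged usage sketch of AILQueue from a module's point of view; the module and queue names below are illustrative and assume matching entries in configs/modules.cfg:

import os
from lib.ail_queues import AILQueue, get_modules_queues_stats

# every module owns one inbound Redis list, queue:<ModuleName>:in
queue = AILQueue('Hosts', os.getpid())  # raises ModuleQueueError if [Hosts] is missing from configs/modules.cfg

message = queue.get_message()           # LPOP queue:Hosts:in and refresh the 'queues'/'modules' stats hashes
if message:
    # RPUSH onto queue:<subscriber>:in for every module subscribed to the 'Host' queue
    queue.send_message(message, 'Host')

# aggregate view: one (module, nb_messages, seconds_since_last_message, pid) tuple per module
for module, nb_messages, seconds, pid in get_modules_queues_stats():
    print(module, nb_messages, seconds, pid)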

View file

@ -13,3 +13,6 @@ class UpdateInvestigationError(AIL_ERROR):
class NewTagError(AIL_ERROR):
pass
class ModuleQueueError(AIL_ERROR):
pass

View file

@ -36,10 +36,10 @@ r_key = regex_helper.generate_redis_cache_key('extractor')
# TODO UI Link
MODULES = {
'infoleak:automatic-detection="credit-card"': CreditCards(),
'infoleak:automatic-detection="iban"': Iban(),
'infoleak:automatic-detection="mail"': Mail(),
'infoleak:automatic-detection="onion"': Onion(),
'infoleak:automatic-detection="credit-card"': CreditCards(queue=False),
'infoleak:automatic-detection="iban"': Iban(queue=False),
'infoleak:automatic-detection="mail"': Mail(queue=False),
'infoleak:automatic-detection="onion"': Onion(queue=False),
# APIkey ???
# Credentials
# Zerobins
@ -47,7 +47,7 @@ MODULES = {
# SQL Injection / Libinjection ???
}
tools = Tools()
tools = Tools(queue=False)
for tool_name in tools.get_tools():
MODULES[f'infoleak:automatic-detection="{tool_name}-tool"'] = tools
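The queue=False constructor flag introduced above lets the extractor reuse detection modules as plain library objects: no AILQueue is created, so nothing is read from queue:<Module>:in and no [section] lookup in configs/modules.cfg is required. A hedged illustration (the import path follows the bin/modules layout and is an assumption, not shown in this diff):

from modules.CreditCards import CreditCards

cards = CreditCards(queue=False)  # no I/O queue is set up for this instance
# the extractor keeps such instances in MODULES and runs their detection logic
# directly on item content instead of feeding them through Redis queues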

View file

@ -1,78 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import os
import sys
import datetime
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import ConfigLoader
config_loader = ConfigLoader.ConfigLoader()
r_serv_queues = config_loader.get_redis_conn("Redis_Queues")
config_loader = None
def get_all_queues_name():
return r_serv_queues.hkeys('queues')
def get_all_queues_dict_with_nb_elem():
return r_serv_queues.hgetall('queues')
def get_all_queues_with_sorted_nb_elem():
res = r_serv_queues.hgetall('queues')
res = sorted(res.items())
return res
def get_module_pid_by_queue_name(queue_name):
return r_serv_queues.smembers('MODULE_TYPE_{}'.format(queue_name))
# # TODO: remove last msg part
def get_module_last_process_start_time(queue_name, module_pid):
res = r_serv_queues.get('MODULE_{}_{}'.format(queue_name, module_pid))
if res:
return res.split(',')[0]
return None
def get_module_last_msg(queue_name, module_pid):
return r_serv_queues.get('MODULE_{}_{}_PATH'.format(queue_name, module_pid))
def get_all_modules_queues_stats():
all_modules_queues_stats = []
for queue_name, nb_elem_queue in get_all_queues_with_sorted_nb_elem():
l_module_pid = get_module_pid_by_queue_name(queue_name)
for module_pid in l_module_pid:
last_process_start_time = get_module_last_process_start_time(queue_name, module_pid)
if last_process_start_time:
last_process_start_time = datetime.datetime.fromtimestamp(int(last_process_start_time))
seconds = int((datetime.datetime.now() - last_process_start_time).total_seconds())
else:
seconds = 0
all_modules_queues_stats.append((queue_name, nb_elem_queue, seconds, module_pid))
return all_modules_queues_stats
def _get_all_messages_from_queue(queue_name):
queue_in = f'{queue_name}in'
#self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set)))
return r_serv_queues.smembers(queue_in)
# def is_message_in queue(queue_name):
# pass
def remove_message_from_queue(queue_name, message, out=False):
if out:
queue_key = f'{queue_name}out'
else:
queue_key = f'{queue_name}in'
r_serv_queues.srem(queue_key, message)
if not out:
r_serv_queues.hset('queues', queue_name, int(r_serv_queues.scard(queue_key)) )
if __name__ == '__main__':
print(get_all_queues_with_sorted_nb_elem())
queue_name = 'Tags'
res = _get_all_messages_from_queue(queue_name)
print(res)

View file

@ -64,7 +64,7 @@ class ApiKey(AbstractModule):
self.redis_logger.warning(f'{to_print}Checked {len(google_api_key)} found Google API Key;{item.get_id()}')
msg = f'infoleak:automatic-detection="google-api-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# # TODO: # FIXME: AWS regex/validate/sanitize KEY + SECRET KEY
if aws_access_key:
@ -75,11 +75,11 @@ class ApiKey(AbstractModule):
self.redis_logger.warning(f'{to_print}Checked {len(aws_secret_key)} found AWS secret Key;{item.get_id()}')
msg = 'infoleak:automatic-detection="aws-key";{}'.format(item.get_id())
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# Tags
msg = f'infoleak:automatic-detection="api-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if r_result:
return google_api_key, aws_access_key, aws_secret_key

View file

@ -43,6 +43,7 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib.objects.Items import Item
@ -59,8 +60,10 @@ class Categ(AbstractModule):
self.categ_files_dir = categ_files_dir
config_loader = ConfigLoader()
# default = 1 string
self.matchingThreshold = self.process.config.getint("Categ", "matchingThreshold")
self.matchingThreshold = config_loader.get_config_int("Categ", "matchingThreshold")
self.reload_categ_words()
self.redis_logger.info("Script Categ started")
@ -95,20 +98,13 @@ class Categ(AbstractModule):
# Export message to categ queue
print(msg, categ)
self.send_message_to_queue(msg, categ)
self.add_message_to_queue(msg, categ)
self.redis_logger.info(
f'Categ;{item.get_source()};{item.get_date()};{item.get_basename()};Detected {lenfound} as {categ};{item.get_id()}')
if r_result:
return categ_found
# DIRTY FIX AIL SYNC
# # FIXME: DIRTY FIX
message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}'
print(message)
self.send_message_to_queue(message, 'SyncModule')
if __name__ == '__main__':
# SCRIPT PARSER #

View file

@ -112,7 +112,7 @@ class Credential(AbstractModule):
self.redis_logger.warning(to_print)
msg = f'infoleak:automatic-detection="credential";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
site_occurrence = self.regex_findall(self.regex_site_for_stats, item.get_id(), item_content)
@ -138,11 +138,11 @@ class Credential(AbstractModule):
else:
creds_sites[domain] = 1
for site, num in creds_sites.items(): # Send for each different site to moduleStats
mssg = f'credential;{num};{site};{item.get_date()}'
print(mssg)
self.send_message_to_queue(mssg, 'ModuleStats')
# for site, num in creds_sites.items(): # Send for each different site to moduleStats
#
# mssg = f'credential;{num};{site};{item.get_date()}'
# print(mssg)
# self.add_message_to_queue(mssg, 'ModuleStats')
if all_sites:
discovered_sites = ', '.join(all_sites)

View file

@ -31,8 +31,8 @@ class CreditCards(AbstractModule):
CreditCards module for AIL framework
"""
def __init__(self):
super(CreditCards, self).__init__()
def __init__(self, queue=True):
super(CreditCards, self).__init__(queue=queue)
# Source: http://www.richardsramblings.com/regex/credit-card-numbers/
cards = [
@ -90,7 +90,7 @@ class CreditCards(AbstractModule):
self.redis_logger.warning(mess)
msg = f'infoleak:automatic-detection="credit-card";{item.id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if r_result:
return creditcard_set

View file

@ -135,13 +135,13 @@ class Cryptocurrencies(AbstractModule, ABC):
# Check private key
if is_valid_address:
msg = f'{currency["tag"]};{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if currency.get('private_key'):
private_keys = self.regex_findall(currency['private_key']['regex'], item_id, content)
if private_keys:
msg = f'{currency["private_key"]["tag"]};{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# debug
print(private_keys)

View file

@ -63,7 +63,7 @@ class CveModule(AbstractModule):
msg = f'infoleak:automatic-detection="cve";{item_id}'
# Send to Tags Queue
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if __name__ == '__main__':

View file

@ -123,7 +123,7 @@ class Decoder(AbstractModule):
# Send to Tags
msg = f'infoleak:automatic-detection="{dname}";{item.id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if __name__ == '__main__':

View file

@ -23,6 +23,7 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
from modules.abstract_module import AbstractModule
from lib.objects.Items import Item
from lib.ConfigLoader import ConfigLoader
from lib import d4
@ -34,15 +35,17 @@ class DomClassifier(AbstractModule):
def __init__(self):
super(DomClassifier, self).__init__()
config_loader = ConfigLoader()
# Waiting time in seconds between two processed messages
self.pending_seconds = 1
addr_dns = self.process.config.get("DomClassifier", "dns")
addr_dns = config_loader.get_config_str("DomClassifier", "dns")
self.c = DomainClassifier.domainclassifier.Extract(rawtext="", nameservers=[addr_dns])
self.cc = self.process.config.get("DomClassifier", "cc")
self.cc_tld = self.process.config.get("DomClassifier", "cc_tld")
self.cc = config_loader.get_config_str("DomClassifier", "cc")
self.cc_tld = config_loader.get_config_str("DomClassifier", "cc_tld")
# Send module state to logs
self.redis_logger.info(f"Module: {self.module_name} Launched")
@ -66,7 +69,7 @@ class DomClassifier(AbstractModule):
if self.c.vdomain and d4.is_passive_dns_enabled():
for dns_record in self.c.vdomain:
self.send_message_to_queue(dns_record)
self.add_message_to_queue(dns_record)
localizeddomains = self.c.include(expression=self.cc_tld)
if localizeddomains:

View file

@ -39,8 +39,10 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
from modules.abstract_module import AbstractModule
from lib.ail_core import get_ail_uuid
from lib.ConfigLoader import ConfigLoader
from lib.data_retention_engine import update_obj_date
from lib import item_basic
from lib.objects.Items import Item
# from lib import Statistics
class Global(AbstractModule):
@ -54,11 +56,12 @@ class Global(AbstractModule):
self.processed_item = 0
self.time_last_stats = time.time()
config_loader = ConfigLoader()
# Get and sanitize ITEM DIRECTORY
# # TODO: rename PASTE => ITEM
self.PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], self.process.config.get("Directories", "pastes"))
self.PASTES_FOLDERS = self.PASTES_FOLDER + '/'
self.PASTES_FOLDERS = os.path.join(os.path.realpath(self.PASTES_FOLDERS), '')
self.ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
self.ITEMS_FOLDER = os.path.join(os.path.realpath(self.ITEMS_FOLDER), '')
# Waiting time in seconds between two processed messages
self.pending_seconds = 0.5
@ -85,9 +88,9 @@ class Global(AbstractModule):
if len(splitted) == 2:
item, gzip64encoded = splitted
# Remove PASTES_FOLDER from item path (crawled item + submitted)
if self.PASTES_FOLDERS in item:
item = item.replace(self.PASTES_FOLDERS, '', 1)
# Remove ITEMS_FOLDER from item path (crawled item + submitted)
if self.ITEMS_FOLDER in item:
item = item.replace(self.ITEMS_FOLDER, '', 1)
file_name_item = item.split('/')[-1]
if len(file_name_item) > 255:
@ -95,11 +98,11 @@ class Global(AbstractModule):
item = self.rreplace(item, file_name_item, new_file_name_item, 1)
# Creating the full filepath
filename = os.path.join(self.PASTES_FOLDER, item)
filename = os.path.join(self.ITEMS_FOLDER, item)
filename = os.path.realpath(filename)
# Incorrect filename
if not os.path.commonprefix([filename, self.PASTES_FOLDER]) == self.PASTES_FOLDER:
if not os.path.commonprefix([filename, self.ITEMS_FOLDER]) == self.ITEMS_FOLDER:
self.redis_logger.warning(f'Global; Path traversal detected {filename}')
print(f'Global; Path traversal detected {filename}')
@ -121,14 +124,23 @@ class Global(AbstractModule):
f.write(decoded)
item_id = filename
# remove self.PASTES_FOLDER from
if self.PASTES_FOLDERS in item_id:
item_id = item_id.replace(self.PASTES_FOLDERS, '', 1)
# remove self.ITEMS_FOLDER from
if self.ITEMS_FOLDER in item_id:
item_id = item_id.replace(self.ITEMS_FOLDER, '', 1)
update_obj_date(item_basic.get_item_date(item_id), 'item')
item = Item(item_id)
self.send_message_to_queue(item_id)
update_obj_date(item.get_date(), 'item')
self.add_message_to_queue(item_id, 'Item')
self.processed_item += 1
# DIRTY FIX AIL SYNC - SEND TO SYNC MODULE
# # FIXME: DIRTY FIX
message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}'
print(message)
self.add_message_to_queue(message, 'Sync')
print(item_id)
if r_result:
return item_id

View file

@ -62,7 +62,7 @@ class Hosts(AbstractModule):
# print(host)
msg = f'{host} {item.get_id()}'
self.send_message_to_queue(msg, 'Host')
self.add_message_to_queue(msg, 'Host')
if __name__ == '__main__':

View file

@ -37,8 +37,8 @@ class Iban(AbstractModule):
enumerate(string.ascii_lowercase, 10))
LETTERS_IBAN = {ord(d): str(i) for i, d in _LETTERS_IBAN}
def __init__(self):
super(Iban, self).__init__()
def __init__(self, queue=True):
super(Iban, self).__init__(queue=queue)
# Waiting time in seconds between two processed messages
self.pending_seconds = 10
@ -98,7 +98,7 @@ class Iban(AbstractModule):
self.redis_logger.warning(f'{to_print}Checked found {len(valid_ibans)} IBAN;{item_id}')
# Tags
msg = f'infoleak:automatic-detection="iban";{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if __name__ == '__main__':

View file

@ -25,6 +25,7 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib.objects.Items import Item
@ -42,14 +43,13 @@ class Indexer(AbstractModule):
"""
super(Indexer, self).__init__()
config_loader = ConfigLoader()
# Indexer configuration - index dir and schema setup
self.baseindexpath = join(os.environ['AIL_HOME'],
self.process.config.get("Indexer", "path"))
self.indexRegister_path = join(os.environ['AIL_HOME'],
self.process.config.get("Indexer", "register"))
self.indexertype = self.process.config.get("Indexer", "type")
self.INDEX_SIZE_THRESHOLD = self.process.config.getint(
"Indexer", "index_max_size")
self.baseindexpath = join(os.environ['AIL_HOME'], config_loader.get_config_str("Indexer", "path"))
self.indexRegister_path = join(os.environ['AIL_HOME'], config_loader.get_config_str("Indexer", "register"))
self.indexertype = config_loader.get_config_str("Indexer", "type")
self.INDEX_SIZE_THRESHOLD = config_loader.get_config_int("Indexer", "index_max_size")
self.indexname = None
self.schema = None

View file

@ -66,32 +66,32 @@ class Keys(AbstractModule):
self.redis_logger.warning(f'{item.get_basename()} has a PGP enc message')
msg = f'infoleak:automatic-detection="pgp-message";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
get_pgp_content = True
# find = True
if KeyEnum.PGP_PUBLIC_KEY_BLOCK.value in content:
msg = f'infoleak:automatic-detection="pgp-public-key-block";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
get_pgp_content = True
if KeyEnum.PGP_SIGNATURE.value in content:
msg = f'infoleak:automatic-detection="pgp-signature";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
get_pgp_content = True
if KeyEnum.PGP_PRIVATE_KEY_BLOCK.value in content:
self.redis_logger.warning(f'{item.get_basename()} has a pgp private key block message')
msg = f'infoleak:automatic-detection="pgp-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
get_pgp_content = True
if KeyEnum.CERTIFICATE.value in content:
self.redis_logger.warning(f'{item.get_basename()} has a certificate message')
msg = f'infoleak:automatic-detection="certificate";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.RSA_PRIVATE_KEY.value in content:
@ -99,7 +99,7 @@ class Keys(AbstractModule):
print('rsa private key message found')
msg = f'infoleak:automatic-detection="rsa-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.PRIVATE_KEY.value in content:
@ -107,7 +107,7 @@ class Keys(AbstractModule):
print('private key message found')
msg = f'infoleak:automatic-detection="private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.ENCRYPTED_PRIVATE_KEY.value in content:
@ -115,7 +115,7 @@ class Keys(AbstractModule):
print('encrypted private key message found')
msg = f'infoleak:automatic-detection="encrypted-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.OPENSSH_PRIVATE_KEY.value in content:
@ -123,7 +123,7 @@ class Keys(AbstractModule):
print('openssh private key message found')
msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.SSH2_ENCRYPTED_PRIVATE_KEY.value in content:
@ -131,7 +131,7 @@ class Keys(AbstractModule):
print('SSH2 private key message found')
msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.OPENVPN_STATIC_KEY_V1.value in content:
@ -139,37 +139,37 @@ class Keys(AbstractModule):
print('OpenVPN Static key message found')
msg = f'infoleak:automatic-detection="vpn-static-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.DSA_PRIVATE_KEY.value in content:
self.redis_logger.warning(f'{item.get_basename()} has a dsa private key message')
msg = f'infoleak:automatic-detection="dsa-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.EC_PRIVATE_KEY.value in content:
self.redis_logger.warning(f'{item.get_basename()} has an ec private key message')
msg = f'infoleak:automatic-detection="ec-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
if KeyEnum.PUBLIC_KEY.value in content:
self.redis_logger.warning(f'{item.get_basename()} has a public key message')
msg = f'infoleak:automatic-detection="public-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# find = True
# pgp content
if get_pgp_content:
self.send_message_to_queue(item.get_id(), 'PgpDump')
self.add_message_to_queue(item.get_id(), 'PgpDump')
# if find :
# # Send to duplicate
# self.send_message_to_queue(item.get_id(), 'Duplicate')
# self.add_message_to_queue(item.get_id(), 'Duplicate')
# self.redis_logger.debug(f'{item.get_id()} has key(s)')
# print(f'{item.get_id()} has key(s)')

View file

@ -78,7 +78,7 @@ class LibInjection(AbstractModule):
# Add tag
msg = f'infoleak:automatic-detection="sql-injection";{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# statistics
# # # TODO: # FIXME: remove me

View file

@ -36,8 +36,8 @@ class Mail(AbstractModule):
Mail module for AIL framework
"""
def __init__(self):
super(Mail, self).__init__()
def __init__(self, queue=True):
super(Mail, self).__init__(queue=queue)
config_loader = ConfigLoader()
self.r_cache = config_loader.get_redis_conn("Redis_Cache")
@ -158,8 +158,8 @@ class Mail(AbstractModule):
num_valid_email += nb_mails
# Create domain_mail stats
msg = f'mail;{nb_mails};{domain_mx};{item_date}'
self.send_message_to_queue(msg, 'ModuleStats')
# msg = f'mail;{nb_mails};{domain_mx};{item_date}'
# self.add_message_to_queue(msg, 'ModuleStats')
# Create country stats
self.faup.decode(domain_mx)
@ -178,7 +178,7 @@ class Mail(AbstractModule):
self.redis_logger.warning(msg)
# Tags
msg = f'infoleak:automatic-detection="mail";{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
else:
self.redis_logger.info(msg)

View file

@ -173,7 +173,7 @@ class Mixer(AbstractModule):
self.r_cache.expire(digest, self.ttl_key)
self.increase_stat_processed(feeder_name)
self.send_message_to_queue(relay_message)
self.add_message_to_queue(relay_message)
# Need To Be Fixed, Currently doesn't check the source (-> same as operation 1)
# # Keep duplicate coming from different sources
@ -189,7 +189,7 @@ class Mixer(AbstractModule):
# self.r_cache.expire(item_id, self.ttl_key)
# self.r_cache.expire(f'HASH_{item_id}', self.ttl_key)
#
# self.send_message_to_queue(relay_message)
# self.add_message_to_queue(relay_message)
#
# else:
# if digest != older_digest:
@ -199,7 +199,7 @@ class Mixer(AbstractModule):
# self.r_cache.sadd(item_id, feeder_name)
# self.r_cache.expire(item_id, ttl_key)
#
# self.send_message_to_queue(relay_message)
# self.add_message_to_queue(relay_message)
#
# else:
# # Already processed
@ -210,7 +210,7 @@ class Mixer(AbstractModule):
# No Filtering
else:
self.increase_stat_processed(feeder_name)
self.send_message_to_queue(relay_message)
self.add_message_to_queue(relay_message)
if __name__ == "__main__":

View file

@ -29,8 +29,8 @@ from lib import crawlers
class Onion(AbstractModule):
"""docstring for Onion module."""
def __init__(self):
super(Onion, self).__init__()
def __init__(self, queue=True):
super(Onion, self).__init__(queue=queue)
config_loader = ConfigLoader()
self.r_cache = config_loader.get_redis_conn("Redis_Cache")
@ -101,7 +101,7 @@ class Onion(AbstractModule):
# TAG Item
msg = f'infoleak:automatic-detection="onion";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if __name__ == "__main__":

View file

@ -235,7 +235,7 @@ class PgpDump(AbstractModule):
if self.symmetrically_encrypted:
msg = f'infoleak:automatic-detection="pgp-symmetric";{self.item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if __name__ == '__main__':

View file

@ -54,7 +54,7 @@ class Phone(AbstractModule):
self.redis_logger.warning(f'{item.get_id()} contains PID (phone numbers)')
msg = f'infoleak:automatic-detection="phone-number";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
stats = {}
for phone_number in results:

View file

@ -59,7 +59,7 @@ class SQLInjectionDetection(AbstractModule):
# Tag
msg = f'infoleak:automatic-detection="sql-injection";{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# statistics
# tld = url_parsed['tld']

View file

@ -78,7 +78,6 @@ class SentimentAnalysis(AbstractModule):
try:
self.analyse(message)
except TimeoutException:
self.process.incr_module_timeout_statistic()
self.redis_logger.debug(f"{message} processing timeout")
else:
signal.alarm(0)

View file

@ -24,6 +24,7 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.objects.Items import ITEMS_FOLDER
from lib import ConfigLoader
from lib import Tag
@ -45,7 +46,7 @@ class SubmitPaste(AbstractModule):
"""
init
"""
super(SubmitPaste, self).__init__(queue_name='submit_paste')
super(SubmitPaste, self).__init__()
# TODO KVROCKS
self.r_serv_db = ConfigLoader.ConfigLoader().get_db_conn("Kvrocks_DB")
@ -262,8 +263,7 @@ class SubmitPaste(AbstractModule):
source = source if source else 'submitted'
save_path = source + '/' + now.strftime("%Y") + '/' + now.strftime("%m") + '/' + now.strftime("%d") + '/submitted_' + name + '.gz'
full_path = os.path.join(os.environ['AIL_HOME'],
self.process.config.get("Directories", "pastes"), save_path)
full_path = os.path.join(ITEMS_FOLDER, save_path)
self.redis_logger.debug(f'file path of the paste {full_path}')
@ -281,7 +281,7 @@ class SubmitPaste(AbstractModule):
# send paste to Global module
relay_message = f"submitted {rel_item_path} {gzip64encoded}"
self.process.populate_set_out(relay_message)
self.add_message_to_queue(relay_message)
# add tags
for tag in ltags:

View file

@ -50,10 +50,10 @@ class Tags(AbstractModule):
print(f'{item.get_id()}: Tagged {tag}')
# Forward message to channel
self.send_message_to_queue(message, 'MISP_The_Hive_feeder')
self.add_message_to_queue(message, 'Tag_feed')
message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}'
self.send_message_to_queue(message, 'SyncModule')
self.add_message_to_queue(message, 'Sync')
else:
# Malformed message

View file

@ -86,7 +86,7 @@ class Telegram(AbstractModule):
if invite_code_found:
# tags
msg = f'infoleak:automatic-detection="telegram-invite-hash";{item.id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if __name__ == "__main__":

View file

@ -395,8 +395,8 @@ class Tools(AbstractModule):
Tools module for AIL framework
"""
def __init__(self):
super(Tools, self).__init__()
def __init__(self, queue=True):
super(Tools, self).__init__(queue=queue)
self.max_execution_time = 30
# Waiting time in seconds between two processed messages
@ -426,7 +426,7 @@ class Tools(AbstractModule):
print(f'{item.id} found: {tool_name}')
# Tag Item
msg = f"{tool['tag']};{item.id}"
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# TODO ADD LOGS

View file

@ -22,6 +22,7 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib.objects.Items import Item
# # TODO: Faup packages: Add new binding: Check TLD
@ -37,11 +38,13 @@ class Urls(AbstractModule):
"""
super(Urls, self).__init__()
config_loader = ConfigLoader()
self.faup = Faup()
# Protocol file path
protocolsfile_path = os.path.join(os.environ['AIL_HOME'],
self.process.config.get("Directories", "protocolsfile"))
config_loader.get_config_str("Directories", "protocolsfile"))
# Get all uri from protocolsfile (Used for Curve)
uri_scheme = ""
with open(protocolsfile_path, 'r') as scheme_file:
@ -78,7 +81,7 @@ class Urls(AbstractModule):
to_send = f"{url} {item.get_id()}"
print(to_send)
self.send_message_to_queue(to_send, 'Url')
self.add_message_to_queue(to_send, 'Url')
self.redis_logger.debug(f"url_parsed: {to_send}")
if len(l_urls) > 0:

View file

@ -7,22 +7,26 @@ Base Class for AIL Modules
# Import External packages
##################################
from abc import ABC, abstractmethod
import os
import sys
import time
import traceback
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from pubsublogger import publisher
from Helper import Process
from lib.ail_queues import AILQueue
from lib import regex_helper
from lib.exceptions import ModuleQueueError
class AbstractModule(ABC):
"""
Abstract Module class
"""
def __init__(self, module_name=None, queue_name=None, logger_channel='Script'):
def __init__(self, module_name=None, logger_channel='Script', queue=True):
"""
Init Module
module_name: str; set the module name if different from the instance ClassName
@ -32,8 +36,11 @@ class AbstractModule(ABC):
# Module name if provided else instance className
self.module_name = module_name if module_name else self._module_name()
# Module name if provided else instance className
self.queue_name = queue_name if queue_name else self._module_name()
self.pid = os.getpid()
# Setup the I/O queues
if queue:
self.queue = AILQueue(self.module_name, self.pid)
# Init Redis Logger
self.redis_logger = publisher
@ -46,19 +53,16 @@ class AbstractModule(ABC):
# If provided could be a namespaced channel like script:<ModuleName>
self.redis_logger.channel = logger_channel
#Cache key
# Cache key
self.r_cache_key = regex_helper.generate_redis_cache_key(self.module_name)
self.max_execution_time = 30
# Run module endlessly
self.proceed = True
# Waiting time in secondes between two proccessed messages
# Waiting time in seconds between two processed messages
self.pending_seconds = 10
# Setup the I/O queues
self.process = Process(self.queue_name)
# Debug Mode
self.debug = False
@ -68,17 +72,17 @@ class AbstractModule(ABC):
Input message can change between modules
ex: '<item id>'
"""
return self.process.get_from_set()
return self.queue.get_message()
def send_message_to_queue(self, message, queue_name=None):
def add_message_to_queue(self, message, queue_name=None):
"""
Send message to queue
Add message to queue
:param message: message to send in queue
:param queue_name: queue or module name
ex: send_to_queue(item_id, 'Global')
ex: add_message_to_queue(item_id, 'Mail')
"""
self.process.populate_set_out(message, queue_name)
self.queue.send_message(message, queue_name)
# add to new set_module
def regex_search(self, regex, obj_id, content):
@ -87,16 +91,16 @@ class AbstractModule(ABC):
def regex_finditer(self, regex, obj_id, content):
return regex_helper.regex_finditer(self.r_cache_key, regex, obj_id, content, max_time=self.max_execution_time)
def regex_findall(self, regex, id, content, r_set=False):
def regex_findall(self, regex, obj_id, content, r_set=False):
"""
regex findall helper (force timeout)
:param regex: compiled regex
:param id: object id
:param obj_id: object id
:param content: object content
ex: send_to_queue(item_id, 'Global')
:param r_set: return result as set
"""
return regex_helper.regex_findall(self.module_name, self.r_cache_key, regex, id, content, max_time=self.max_execution_time, r_set=r_set)
return regex_helper.regex_findall(self.module_name, self.r_cache_key, regex, obj_id, content,
max_time=self.max_execution_time, r_set=r_set)
def run(self):
"""
@@ -114,7 +118,10 @@
self.compute(message)
except Exception as err:
if self.debug:
self.queue.error()
raise err
# LOG ERROR
trace = traceback.format_tb(err.__traceback__)
trace = ''.join(trace)
self.redis_logger.critical(f"Error in module {self.module_name}: {err}")
@@ -125,6 +132,10 @@ class AbstractModule(ABC):
print(f'MESSAGE: {message}')
print('TRACEBACK:')
print(trace)
if isinstance(err, ModuleQueueError):
self.queue.error()
raise err
# remove from set_module
## check if item process == completed
@@ -134,14 +145,12 @@
self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
time.sleep(self.pending_seconds)
def _module_name(self):
"""
Returns the instance class name (ie. the Module Name)
Returns the instance class name (i.e. the Module Name)
"""
return self.__class__.__name__
@abstractmethod
def compute(self, message):
"""
@@ -149,7 +158,6 @@
"""
pass
def computeNone(self):
"""
Method of the Module when there is no message

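A minimal sketch of a module written against the reworked base class above, assuming only the interface shown in this hunk (constructor without queue_name, compute(), add_message_to_queue()); the MyModule name, its output tag string and the assumption that the incoming message is an item id are illustrative, not part of this commit:
import os
import sys
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule

class MyModule(AbstractModule):
    """
    Illustrative module: tag every received item.
    """
    def __init__(self):
        super(MyModule, self).__init__()
        # waiting time in seconds between two queue polls
        self.pending_seconds = 5

    def compute(self, message):
        # message format depends on the input queue, assumed here to be an item id
        item_id = message
        msg = f'my-tag;{item_id}'
        self.add_message_to_queue(msg, 'Tags')

if __name__ == '__main__':
    module = MyModule()
    module.run()
Such a module would also need a matching section in the new configs/modules.cfg (see the commented [My_Module_Name] example further down) so that its input and output queues are declared.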
View file

@@ -1,172 +0,0 @@
[ZMQModuleImporter]
publish = Redis_Import
[FeederModuleImporter]
publish = Redis_Import
####################################################
[Mixer]
# subscribe = ZMQ_Global
subscribe = Redis_Import
publish = Redis_Mixer
[Sync_importer]
publish = Redis_Import,Redis_Tags
[Importer_Json]
publish = Redis_Import,Redis_Tags
[Global]
subscribe = Redis_Mixer
publish = Redis_Global,Redis_ModuleStats
[Duplicates]
subscribe = Redis_Duplicate
[Indexer]
subscribe = Redis_Global
[Hosts]
subscribe = Redis_Global
publish = Redis_Host
[DomClassifier]
subscribe = Redis_Host
publish = Redis_D4_client
[D4Client]
subscribe = Redis_D4_client
[Retro_Hunt]
subscribe = Redis
publish = Redis_Tags
[Tracker_Typo_Squatting]
subscribe = Redis_Host
publish = Redis_Tags
[Tracker_Term]
subscribe = Redis_Global
publish = Redis_Tags
[Tracker_Regex]
subscribe = Redis_Global
publish = Redis_Tags
[Tracker_Yara]
subscribe = Redis_Global
publish = Redis_Tags
[Tools]
subscribe = Redis_Global
publish = Redis_Tags
[Telegram]
subscribe = Redis_Global
publish = Redis_Tags
[Languages]
subscribe = Redis_Global
[Categ]
subscribe = Redis_Global
publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Urls,Redis_Credential,Redis_Cve,Redis_ApiKey,Redis_SyncModule
[CreditCards]
subscribe = Redis_CreditCards
publish = Redis_Tags
[Iban]
subscribe = Redis_Global
publish = Redis_Tags
[Mail]
subscribe = Redis_Mail
publish = Redis_ModuleStats,Redis_Tags
[Onion]
subscribe = Redis_Onion
publish = Redis_Tags,Redis_Crawler
#publish = Redis_ValidOnion,ZMQ_FetchedOnion,Redis_Tags,Redis_Crawler
[Urls]
subscribe = Redis_Urls
publish = Redis_Url
#publish = Redis_Url,ZMQ_Url
[LibInjection]
subscribe = Redis_Url
publish = Redis_Tags
[SQLInjectionDetection]
subscribe = Redis_Url
publish = Redis_Tags
[ModuleStats]
subscribe = Redis_ModuleStats
[Tags]
subscribe = Redis_Tags
publish = Redis_Tags_feed,Redis_SyncModule
# dirty fix
[Sync_module]
subscribe = Redis_SyncModule
[MISP_The_hive_feeder]
subscribe = Redis_Tags_feed
#[SentimentAnalysis]
#subscribe = Redis_Global
[Credential]
subscribe = Redis_Credential
publish = Redis_Duplicate,Redis_ModuleStats,Redis_Tags
[CveModule]
subscribe = Redis_Cve
publish = Redis_Tags
# Disabled
#[Phone]
#subscribe = Redis_Global
#publish = Redis_Tags
[Keys]
subscribe = Redis_Global
publish = Redis_PgpDump,Redis_Tags
[PgpDump]
subscribe = Redis_PgpDump
publish = Redis_Tags
[ApiKey]
subscribe = Redis_ApiKey
publish = Redis_Tags
[Decoder]
subscribe = Redis_Global
publish = Redis_Tags
[Cryptocurrencies]
subscribe = Redis_Global
publish = Redis_Tags
[submit_paste]
publish = Redis_Import
[Crawler]
publish = Redis_Import,Redis_Tags
[IP]
subscribe = Redis_Global
publish = Redis_Tags
[Zerobins]
subscribe = Redis_Url
# [My_Module]
# subscribe = Redis_Global
# publish = Redis_Tags

View file

@@ -19,6 +19,7 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib.objects.Items import Item
from packages import Date
from lib import Tracker
@@ -34,9 +35,10 @@ class Retro_Hunt(AbstractModule):
"""
def __init__(self):
super(Retro_Hunt, self).__init__()
config_loader = ConfigLoader()
self.pending_seconds = 5
self.full_item_url = self.process.config.get("Notifications", "ail_domain") + "/object/item?id="
self.full_item_url = config_loader.get_config_str("Notifications", "ail_domain") + "/object/item?id="
# reset on each loop
self.task_uuid = None
@@ -149,7 +151,7 @@ class Retro_Hunt(AbstractModule):
# Tags
for tag in self.tags:
msg = f'{tag};{id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# # Mails
# mail_to_notify = Tracker.get_tracker_mails(tracker_uuid)

View file

@@ -18,6 +18,7 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
from modules.abstract_module import AbstractModule
from lib.objects.Items import Item
from lib.ConfigLoader import ConfigLoader
from packages import Term
from lib import Tracker
@@ -31,9 +32,11 @@ class Tracker_Regex(AbstractModule):
def __init__(self):
super(Tracker_Regex, self).__init__()
config_loader = ConfigLoader()
self.pending_seconds = 5
self.max_execution_time = self.process.config.getint(self.module_name, "max_execution_time")
self.max_execution_time = config_loader.get_config_int(self.module_name, "max_execution_time")
# refresh Tracked Regex
self.dict_regex_tracked = Term.get_regex_tracked_words_dict()
@@ -85,7 +88,7 @@
for tag in tracker.get_tags():
msg = f'{tag};{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if tracker.mail_export():
# TODO add matches + custom subjects

View file

@@ -20,6 +20,7 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
from lib.objects.Items import Item
from packages import Term
from lib import Tracker
@@ -46,9 +47,11 @@ class Tracker_Term(AbstractModule):
def __init__(self):
super(Tracker_Term, self).__init__()
config_loader = ConfigLoader()
self.pending_seconds = 5
self.max_execution_time = self.process.config.getint('Tracker_Term', "max_execution_time")
self.max_execution_time = config_loader.get_config_int('Tracker_Term', "max_execution_time")
# loads tracked words
self.list_tracked_words = Term.get_tracked_words_list()
@@ -137,7 +140,7 @@
# Tags
for tag in tracker.get_tags():
msg = f'{tag};{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# Mail
if tracker.mail_export():

View file

@@ -79,7 +79,7 @@ class Tracker_Typo_Squatting(AbstractModule):
for tag in tracker.get_tags():
msg = f'{tag};{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
if tracker.mail_export():
self.exporters['mail'].export(tracker, item)

View file

@@ -88,7 +88,7 @@ class Tracker_Yara(AbstractModule):
# Tags
for tag in tracker.get_tags():
msg = f'{tag};{item_id}'
self.send_message_to_queue(msg, 'Tags')
self.add_message_to_queue(msg, 'Tags')
# Mails
if tracker.mail_export():

166
configs/modules.cfg Normal file
View file

@@ -0,0 +1,166 @@
[ZMQModuleImporter]
publish = Importers
[FeederModuleImporter]
publish = Importers
[Importer_Json]
publish = Importers,Tags
####################################################
[Mixer]
subscribe = Importers
publish = SaveObj
[Sync_importer]
publish = Importers,Tags
[Global]
subscribe = SaveObj
publish = Item,Sync
[Duplicates]
subscribe = Duplicate
[Indexer]
subscribe = Item
[Hosts]
subscribe = Item
publish = Host
[DomClassifier]
subscribe = Host
publish = D4_client
[D4Client]
subscribe = D4_client
[Retro_Hunt]
publish = Tags
[Tracker_Typo_Squatting]
subscribe = Host
publish = Tags
[Tracker_Term]
subscribe = Item
publish = Tags
[Tracker_Regex]
subscribe = Item
publish = Tags
[Tracker_Yara]
subscribe = Item
publish = Tags
[Tools]
subscribe = Item
publish = Tags
[Telegram]
subscribe = Item
publish = Tags
[Languages]
subscribe = Item
[Categ]
subscribe = Item
publish = CreditCards,Mail,Onion,Urls,Credential,Cve,ApiKey
[CreditCards]
subscribe = CreditCards
publish = Tags
[Iban]
subscribe = Item
publish = Tags
[Mail]
subscribe = Mail
publish = Tags
#publish = ModuleStats,Tags
[Onion]
subscribe = Onion
publish = Tags
[Urls]
subscribe = Urls
publish = Url
[LibInjection]
subscribe = Url
publish = Tags
[SQLInjectionDetection]
subscribe = Url
publish = Tags
[Tags]
subscribe = Tags
publish = Tag_feed,Sync
# dirty fix
[Sync_module]
subscribe = Sync
[MISP_The_hive_feeder]
subscribe = Tag_feed
#[SentimentAnalysis]
#subscribe = Item
[Credential]
subscribe = Credential
publish = Duplicate,Tags
[CveModule]
subscribe = Cve
publish = Tags
# Disabled
#[Phone]
#subscribe = Item
#publish = Tags
[Keys]
subscribe = Item
publish = PgpDump,Tags
[PgpDump]
subscribe = PgpDump
publish = Tags
[ApiKey]
subscribe = ApiKey
publish = Tags
[Decoder]
subscribe = Item
publish = Tags
[Cryptocurrencies]
subscribe = Item
publish = Tags
[SubmitPaste]
publish = Importers
[Crawler]
publish = Importers,Tags
[IP]
subscribe = Item
publish = Tags
[Zerobins]
subscribe = Url
# [My_Module_Name]
# subscribe = Global # Queue name
# publish = Tags # Queue name

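Each section in the new configs/modules.cfg names a module; subscribe gives its single input queue and publish a comma-separated list of output queues. A minimal sketch, assuming only the file layout above, of how that mapping can be read back with the standard configparser (the script itself is illustrative and not part of this commit):
#!/usr/bin/env python3
import configparser
import os

config = configparser.ConfigParser()
config.read(os.path.join(os.environ['AIL_HOME'], 'configs/modules.cfg'))

for module in config.sections():
    subscribe = config.get(module, 'subscribe', fallback=None)  # input queue, if any
    publish = config.get(module, 'publish', fallback=None)      # output queues, comma separated
    queues_out = publish.split(',') if publish else []
    print(f'{module}: in={subscribe} out={queues_out}')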
View file

@@ -1,131 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import os
import argparse
def main():
content = ""
modules = {}
all_modules = []
curr_module = ""
streamingPub = {}
streamingSub = {}
path = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') # path to module config file
path2 = os.path.join(os.environ['AIL_HOME'], 'doc/all_modules.txt') # path and name of the output file, this file contain a list off all modules
parser = argparse.ArgumentParser(
description='''This script is a part of the Analysis Information Leak
framework. It create a graph that represent the flow between modules".''',
epilog='Example: ./generate_graph_data.py 0')
parser.add_argument('type', type=int, default=0,
help='''The graph type (default 0),
0: module graph,
1: data graph''',
choices=[0, 1], action='store')
parser.add_argument('spline', type=str, default="ortho",
help='''The graph splines type, spline:default , ortho: orthogonal''',
choices=["ortho", "spline"], action='store')
args = parser.parse_args()
with open(path, 'r') as f:
# get all modules, subscriber and publisher for each module
for line in f:
if line[0] != '#':
# module name
if line[0] == '[':
curr_name = line.replace('[','').replace(']','').replace('\n', '').replace(' ', '')
all_modules.append(curr_name)
modules[curr_name] = {'sub': [], 'pub': []}
curr_module = curr_name
elif curr_module != "": # searching for sub or pub
# subscriber list
if line.startswith("subscribe"):
curr_subscribers = [w for w in line.replace('\n', '').replace(' ', '').split('=')[1].split(',')]
modules[curr_module]['sub'] = curr_subscribers
for sub in curr_subscribers:
streamingSub[sub] = curr_module
# publisher list
elif line.startswith("publish"):
curr_publishers = [w for w in line.replace('\n', '').replace(' ', '').split('=')[1].split(',')]
modules[curr_module]['pub'] = curr_publishers
for pub in curr_publishers:
streamingPub[pub] = curr_module
else:
continue
output_set_graph = set()
with open(path2, 'w') as f2:
for e in all_modules:
f2.write(e+"\n")
output_text_graph = ""
# flow between modules
if args.type == 0:
for module in modules.keys():
for stream_in in modules[module]['sub']:
if stream_in not in streamingPub.keys():
output_set_graph.add("\"" + stream_in + "\" [color=darkorange1] ;\n")
output_set_graph.add("\"" + stream_in + "\"" + "->" + module + ";\n")
else:
output_set_graph.add("\"" + streamingPub[stream_in] + "\"" + "->" + module + ";\n")
for stream_out in modules[module]['pub']:
if stream_out not in streamingSub.keys():
#output_set_graph.add("\"" + stream_out + "\" [color=darkorange1] ;\n")
output_set_graph.add("\"" + module + "\"" + "->" + stream_out + ";\n")
else:
output_set_graph.add("\"" + module + "\"" + "->" + streamingSub[stream_out] + ";\n")
# graph head
output_text_graph += "digraph unix {\n"
output_text_graph += "graph [pad=\"0.5\"];\n"
output_text_graph += "size=\"25,25\";\n"
output_text_graph += "splines="
output_text_graph += args.spline
output_text_graph += ";\n"
output_text_graph += "node [color=lightblue2, style=filled];\n"
# flow between data
if args.type == 1:
for module in modules.keys():
for stream_in in modules[module]['sub']:
for stream_out in modules[module]['pub']:
if stream_in not in streamingPub.keys():
output_set_graph.add("\"" + stream_in + "\" [color=darkorange1] ;\n")
output_set_graph.add("\"" + stream_in + "\"" + "->" + stream_out + ";\n")
# graph head
output_text_graph += "digraph unix {\n"
output_text_graph += "graph [pad=\"0.5\"];\n"
output_text_graph += "size=\"25,25\";\n"
output_text_graph += "splines="
output_text_graph += args.spline
output_text_graph += ";\n"
output_text_graph += "node [color=tan, style=filled];\n"
# create final txt graph
for elem in output_set_graph:
output_text_graph += elem
output_text_graph += "}"
print(output_text_graph)
if __name__ == "__main__":
main()

View file

@@ -1,4 +0,0 @@
#!/bin/bash
python3 $AIL_HOME/doc/generate_graph_data.py 0 ortho | dot -T png -o $AIL_HOME/doc/module-data-flow.png
python3 $AIL_HOME/doc/generate_graph_data.py 1 ortho | dot -T png -o $AIL_HOME/doc/data-flow.png

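The two removed doc scripts above rendered the module and data flow graphs from the old packages/modules.cfg. A hypothetical sketch of how an equivalent Graphviz digraph could be derived from the new configs/modules.cfg instead; everything except the config path and its subscribe/publish keys is an assumption for illustration:
#!/usr/bin/env python3
import configparser
import os

config = configparser.ConfigParser()
config.read(os.path.join(os.environ['AIL_HOME'], 'configs/modules.cfg'))

publishers = {}   # queue name -> modules publishing to it
subscribers = {}  # queue name -> modules subscribed to it
for module in config.sections():
    for queue in config.get(module, 'publish', fallback='').split(','):
        if queue:
            publishers.setdefault(queue, []).append(module)
    sub = config.get(module, 'subscribe', fallback='')
    if sub:
        subscribers.setdefault(sub, []).append(module)

# one edge per publisher/subscriber pair, labelled with the queue name
lines = ['digraph ail_queues {']
for queue, pubs in publishers.items():
    for pub in pubs:
        for sub in subscribers.get(queue, []):
            lines.append(f'    "{pub}" -> "{sub}" [label="{queue}"];')
lines.append('}')
print('\n'.join(lines))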
View file

@@ -130,9 +130,6 @@ cp ${AIL_BIN}/helper/gen_cert/server.key ${AIL_FLASK}/server.key
mkdir -p $AIL_HOME/PASTES
#Create the file all_module and update the graph in doc
$AIL_HOME/doc/generate_modules_data_flow_graph.sh
#### DB SETUP ####
# init update version

View file

@@ -20,7 +20,7 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import queues_modules
from lib import ail_queues
from lib import ail_updates
from packages.Date import Date
@@ -57,7 +57,7 @@ def event_stream():
def get_queues():
# We may want to put the llen in a pipeline to do only one query.
return queues_modules.get_all_modules_queues_stats()
return ail_queues.get_modules_queues_stats()
def get_date_range(date_from, num_day):
date = Date(str(date_from[0:4])+str(date_from[4:6]).zfill(2)+str(date_from[6:8]).zfill(2))

View file

@@ -346,9 +346,11 @@ function create_queue_table() {
// - j=1: queueLength
// - j=2: LastProcessedPasteTime
// - j=3: Number of the module belonging in the same category
if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2)
if (glob_tabvar.row1[i][3]==="Not Launched")
tr.className += " bg-danger text-white";
else if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2)
tr.className += " table-danger";
else if (parseInt(glob_tabvar.row1[i][1]) == 0)
else if (parseInt(glob_tabvar.row1[i][1]) === 0)
tr.className += " table-disabled";
else
tr.className += " table-success";