From 96a30170e329fe1690688f4b03fd6d92bb378595 Mon Sep 17 00:00:00 2001 From: lpwm9803 Date: Fri, 2 Apr 2021 09:52:05 +0200 Subject: [PATCH] feat: module factorization --- bin/Categ.py | 108 +++++++---- bin/Indexer.py | 223 +++++++++++----------- bin/Keys.py | 250 +++++++++++++------------ bin/ModuleStats.py | 250 +++++++++++++------------ bin/Phone.py | 112 +++++------ bin/TermTrackerMod.py | 168 +++++++++-------- bin/Web.py | 294 ++++++++++++++++------------- bin/WebStats.py | 338 ++++++++++++++++++---------------- bin/module/__init__.py | 0 bin/module/abstract_module.py | 98 ++++++++++ bin/template.py | 76 ++++---- 11 files changed, 1076 insertions(+), 841 deletions(-) create mode 100644 bin/module/__init__.py create mode 100644 bin/module/abstract_module.py diff --git a/bin/Categ.py b/bin/Categ.py index 3ebc42ea..a8efd12b 100755 --- a/bin/Categ.py +++ b/bin/Categ.py @@ -36,68 +36,96 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly. """ + +################################## +# Import External packages +################################## import os import argparse import time import re + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from pubsublogger import publisher from packages import Paste - from Helper import Process -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Script" - config_section = 'Categ' +class Categ(AbstractModule): + """ + Categ module for AIL framework + """ - p = Process(config_section) - matchingThreshold = p.config.getint("Categ", "matchingThreshold") + def __init__(self): + """ + Init Categ + """ + super(Categ, self).__init__() - # SCRIPT PARSER # - parser = argparse.ArgumentParser(description='Start Categ module on files.') + self.matchingThreshold = self.process.config.getint("Categ", "matchingThreshold") - parser.add_argument( - '-d', type=str, default="../files/", - help='Path to the directory containing the category files.', - action='store') + # SCRIPT PARSER # + parser = argparse.ArgumentParser(description='Start Categ module on files.') - args = parser.parse_args() + parser.add_argument( + '-d', type=str, default="../files/", + help='Path to the directory containing the category files.', + action='store') - # FUNCTIONS # - publisher.info("Script Categ started") + args = parser.parse_args() - categories = ['CreditCards', 'Mail', 'Onion', 'Web', 'Credential', 'Cve', 'ApiKey'] - tmp_dict = {} - for filename in categories: - bname = os.path.basename(filename) - tmp_dict[bname] = [] - with open(os.path.join(args.d, filename), 'r') as f: - patterns = [r'%s' % ( re.escape(s.strip()) ) for s in f] - tmp_dict[bname] = re.compile('|'.join(patterns), re.IGNORECASE) + self.redis_logger.info("Script Categ started") - prec_filename = None + categories = ['CreditCards', 'Mail', 'Onion', 'Web', 'Credential', 'Cve', 'ApiKey'] + tmp_dict = {} + for filename in categories: + bname = os.path.basename(filename) + tmp_dict[bname] = [] + with open(os.path.join(args.d, filename), 'r') as f: + patterns = [r'%s' % ( re.escape(s.strip()) ) for s in f] + tmp_dict[bname] = re.compile('|'.join(patterns), re.IGNORECASE) - while True: - filename = p.get_from_set() - if filename is None: - publisher.debug("Script Categ is Idling 10s") - print('Sleeping') - time.sleep(10) - continue + self.categ_items = tmp_dict.items() - paste = Paste.Paste(filename) + prec_filename = None + + + def compute(self, message): + # Cast message as paste + paste = Paste.Paste(message) + # Get paste content content = paste.get_p_content() - for categ, pattern in tmp_dict.items(): + # init categories found + is_categ_found = False + + # Search for pattern categories in paste content + for categ, pattern in self.categ_items: + found = set(re.findall(pattern, content)) - if len(found) >= matchingThreshold: - msg = '{} {}'.format(paste.p_rel_path, len(found)) + lenfound = len(found) + if lenfound >= self.matchingThreshold: + is_categ_found = True + msg = '{} {}'.format(paste.p_rel_path, lenfound) - print(msg, categ) - p.populate_set_out(msg, categ) + self.redis_logger.debug('%s;%s %s'%(self.module_name, msg, categ)) + + # Export message to categ queue + self.process.populate_set_out(msg, categ) - publisher.info( + self.redis_logger.info( 'Categ;{};{};{};Detected {} as {};{}'.format( paste.p_source, paste.p_date, paste.p_name, - len(found), categ, paste.p_rel_path)) + lenfound, categ, paste.p_rel_path)) + + if not is_categ_found: + self.redis_logger.debug('No %s found in this paste: %s'%(self.module_name, paste.p_name)) + + +if __name__ == '__main__': + + module = Categ() + module.run() diff --git a/bin/Indexer.py b/bin/Indexer.py index 1d1ece4b..f86ad7f2 100755 --- a/bin/Indexer.py +++ b/bin/Indexer.py @@ -9,131 +9,148 @@ The ZMQ_Sub_Indexer modules is fetching the list of files to be processed and index each file with a full-text indexer (Whoosh until now). """ +################################## +# Import External packages +################################## import time -from packages import Paste -from pubsublogger import publisher - -from whoosh.index import create_in, exists_in, open_dir -from whoosh.fields import Schema, TEXT, ID import shutil import os from os.path import join, getsize +from whoosh.index import create_in, exists_in, open_dir +from whoosh.fields import Schema, TEXT, ID + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +from packages import Paste from Helper import Process -# Config variable -TIME_WAIT = 60*15 #sec -# return in bytes -def check_index_size(baseindexpath, indexname): - the_index_name = join(baseindexpath, indexname) - cur_sum = 0 - for root, dirs, files in os.walk(the_index_name): - cur_sum += sum(getsize(join(root, name)) for name in files) - return cur_sum +class Indexer(AbstractModule): + """ + Indexer module for AIL framework + """ -def move_index_into_old_index_folder(baseindexpath): - for cur_file in os.listdir(baseindexpath): - if not cur_file == "old_index": - shutil.move(join(baseindexpath, cur_file), join(join(baseindexpath, "old_index"), cur_file)) + # Time to wait in seconds between two index's size variable compute + TIME_WAIT = 60*15 # sec + + def __init__(self): + """ + Init Instance + """ + super(Indexer, self).__init__() + + # Indexer configuration - index dir and schema setup + self.baseindexpath = join(os.environ['AIL_HOME'], + self.process.config.get("Indexer", "path")) + self.indexRegister_path = join(os.environ['AIL_HOME'], + self.process.config.get("Indexer", "register")) + self.indexertype = self.process.config.get("Indexer", "type") + self.INDEX_SIZE_THRESHOLD = self.process.config.getint( + "Indexer", "index_max_size") + + self.indexname = None + self.schema = None + self.ix = None + + if self.indexertype == "whoosh": + self.schema = Schema(title=TEXT(stored=True), path=ID(stored=True, + unique=True), + content=TEXT) + if not os.path.exists(self.baseindexpath): + os.mkdir(self.baseindexpath) + + # create the index register if not present + time_now = int(time.time()) + if not os.path.isfile(self.indexRegister_path): # index are not organised + self.redis_logger.debug("Indexes are not organized") + self.redis_logger.debug( + "moving all files in folder 'old_index' ") + # move all files to old_index folder + self.move_index_into_old_index_folder() + self.redis_logger.debug("Creating new index") + # create all_index.txt + with open(self.indexRegister_path, 'w') as f: + f.write(str(time_now)) + # create dir + os.mkdir(join(self.baseindexpath, str(time_now))) + + with open(self.indexRegister_path, "r") as f: + allIndex = f.read() + allIndex = allIndex.split() # format [time1\ntime2] + allIndex.sort() + + try: + self.indexname = allIndex[-1].strip('\n\r') + except IndexError as e: + self.indexname = time_now + + self.indexpath = join(self.baseindexpath, str(self.indexname)) + if not exists_in(self.indexpath): + self.ix = create_in(self.indexpath, self.schema) + else: + self.ix = open_dir(self.indexpath) + + self.last_refresh = time_now -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Script" - - config_section = 'Indexer' - - p = Process(config_section) - - # Indexer configuration - index dir and schema setup - baseindexpath = join(os.environ['AIL_HOME'], - p.config.get("Indexer", "path")) - indexRegister_path = join(os.environ['AIL_HOME'], - p.config.get("Indexer", "register")) - indexertype = p.config.get("Indexer", "type") - INDEX_SIZE_THRESHOLD = int(p.config.get("Indexer", "index_max_size")) - if indexertype == "whoosh": - schema = Schema(title=TEXT(stored=True), path=ID(stored=True, - unique=True), - content=TEXT) - if not os.path.exists(baseindexpath): - os.mkdir(baseindexpath) - - # create the index register if not present - time_now = int(time.time()) - if not os.path.isfile(indexRegister_path): #index are not organised - print("Indexes are not organized") - print("moving all files in folder 'old_index' ") - #move all files to old_index folder - move_index_into_old_index_folder(baseindexpath) - print("Creating new index") - #create all_index.txt - with open(indexRegister_path, 'w') as f: - f.write(str(time_now)) - #create dir - os.mkdir(join(baseindexpath, str(time_now))) - - with open(indexRegister_path, "r") as f: - allIndex = f.read() - allIndex = allIndex.split() # format [time1\ntime2] - allIndex.sort() - - try: - indexname = allIndex[-1].strip('\n\r') - except IndexError as e: - indexname = time_now - - indexpath = join(baseindexpath, str(indexname)) - if not exists_in(indexpath): - ix = create_in(indexpath, schema) - else: - ix = open_dir(indexpath) - - last_refresh = time_now - - # LOGGING # - publisher.info("ZMQ Indexer is Running") - - while True: + def compute(self, message): try: - message = p.get_from_set() - - if message is not None: - PST = Paste.Paste(message) - else: - publisher.debug("Script Indexer is idling 1s") - time.sleep(1) - continue + PST = Paste.Paste(message) docpath = message.split(" ", -1)[-1] paste = PST.get_p_content() - print("Indexing - " + indexname + " :", docpath) + self.redis_logger.debug("Indexing - " + self.indexname + " :", docpath) - - #avoid calculating the index's size at each message - if( time.time() - last_refresh > TIME_WAIT): - last_refresh = time.time() - if check_index_size(baseindexpath, indexname) >= INDEX_SIZE_THRESHOLD*(1000*1000): + # Avoid calculating the index's size at each message + if(time.time() - self.last_refresh > self.TIME_WAIT): + self.last_refresh = time.time() + if self.check_index_size() >= self.INDEX_SIZE_THRESHOLD*(1000*1000): timestamp = int(time.time()) - print("Creating new index", timestamp) - indexpath = join(baseindexpath, str(timestamp)) - indexname = str(timestamp) - #update all_index - with open(indexRegister_path, "a") as f: + self.redis_logger.debug("Creating new index", timestamp) + self.indexpath = join(self.baseindexpath, str(timestamp)) + self.indexname = str(timestamp) + # update all_index + with open(self.indexRegister_path, "a") as f: f.write('\n'+str(timestamp)) - #create new dir - os.mkdir(indexpath) - ix = create_in(indexpath, schema) + # create new dir + os.mkdir(self.indexpath) + self.ix = create_in(self.indexpath, self.schema) - - if indexertype == "whoosh": - indexwriter = ix.writer() + if self.indexertype == "whoosh": + indexwriter = self.ix.writer() indexwriter.update_document( title=docpath, path=docpath, content=paste) indexwriter.commit() + except IOError: - print("CRC Checksum Failed on :", PST.p_path) - publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format( + self.redis_logger.debug("CRC Checksum Failed on :", PST.p_path) + self.redis_logger.error('Duplicate;{};{};{};CRC Checksum Failed'.format( PST.p_source, PST.p_date, PST.p_name)) + + def check_index_size(self): + """ + return in bytes + """ + the_index_name = join(self.baseindexpath, self.indexname) + cur_sum = 0 + for root, dirs, files in os.walk(the_index_name): + cur_sum += sum(getsize(join(root, name)) for name in files) + + return cur_sum + + + def move_index_into_old_index_folder(self): + for cur_file in os.listdir(self.baseindexpath): + if not cur_file == "old_index": + shutil.move(join(self.baseindexpath, cur_file), join( + join(self.baseindexpath, "old_index"), cur_file)) + + +if __name__ == '__main__': + + module = Indexer() + module.run() diff --git a/bin/Keys.py b/bin/Keys.py index 237f807c..4d576591 100755 --- a/bin/Keys.py +++ b/bin/Keys.py @@ -12,160 +12,170 @@ RSA private key, certificate messages """ +################################## +# Import External packages +################################## import time +from enum import Enum from pubsublogger import publisher -#from bin.packages import Paste -#from bin.Helper import Process +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from packages import Paste from Helper import Process -def search_key(paste): - content = paste.get_p_content() - find = False - get_pgp_content = False - if '-----BEGIN PGP MESSAGE-----' in content: - publisher.warning('{} has a PGP enc message'.format(paste.p_name)) - - msg = 'infoleak:automatic-detection="pgp-message";{}'.format(message) - p.populate_set_out(msg, 'Tags') - get_pgp_content = True - find = True - - if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in content: - msg = 'infoleak:automatic-detection="pgp-public-key-block";{}'.format(message) - p.populate_set_out(msg, 'Tags') - get_pgp_content = True - - if '-----BEGIN PGP SIGNATURE-----' in content: - msg = 'infoleak:automatic-detection="pgp-signature";{}'.format(message) - p.populate_set_out(msg, 'Tags') - get_pgp_content = True +class KeyEnum(Enum): + PGP_MESSAGE = '-----BEGIN PGP MESSAGE-----' + PGP_PUBLIC_KEY_BLOCK = '-----BEGIN PGP PUBLIC KEY BLOCK-----' + PGP_PRIVATE_KEY_BLOCK = '-----BEGIN PGP PRIVATE KEY BLOCK-----' + PGP_SIGNATURE = '-----BEGIN PGP SIGNATURE-----' + CERTIFICATE = '-----BEGIN CERTIFICATE-----' + PUBLIC_KEY = '-----BEGIN PUBLIC KEY-----' + PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----' + ENCRYPTED_PRIVATE_KEY = '-----BEGIN ENCRYPTED PRIVATE KEY-----' + OPENSSH_PRIVATE_KEY = '-----BEGIN OPENSSH PRIVATE KEY-----' + SSH2_ENCRYPTED_PRIVATE_KEY = '---- BEGIN SSH2 ENCRYPTED PRIVATE KEY ----' + OPENVPN_STATIC_KEY_V1 = '-----BEGIN OpenVPN Static key V1-----' + RSA_PRIVATE_KEY = '-----BEGIN RSA PRIVATE KEY-----' + DSA_PRIVATE_KEY = '-----BEGIN DSA PRIVATE KEY-----' + EC_PRIVATE_KEY = '-----BEGIN EC PRIVATE KEY-----' - if '-----BEGIN CERTIFICATE-----' in content: - publisher.warning('{} has a certificate message'.format(paste.p_name)) +class Keys(AbstractModule): + """ + Keys module for AIL framework + """ + + def __init__(self): + super(Keys, self).__init__() - msg = 'infoleak:automatic-detection="certificate";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + # Waiting time in secondes between to message proccessed + self.pending_seconds = 1 - if '-----BEGIN RSA PRIVATE KEY-----' in content: - publisher.warning('{} has a RSA private key message'.format(paste.p_name)) - print('rsa private key message found') - msg = 'infoleak:automatic-detection="rsa-private-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + def compute(self, message): + paste = Paste.Paste(message) + content = paste.get_p_content() - if '-----BEGIN PRIVATE KEY-----' in content: - publisher.warning('{} has a private key message'.format(paste.p_name)) - print('private key message found') + find = False + get_pgp_content = False - msg = 'infoleak:automatic-detection="private-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.PGP_MESSAGE.value in content: + self.redis_logger.warning('{} has a PGP enc message'.format(paste.p_name)) - if '-----BEGIN ENCRYPTED PRIVATE KEY-----' in content: - publisher.warning('{} has an encrypted private key message'.format(paste.p_name)) - print('encrypted private key message found') + msg = 'infoleak:automatic-detection="pgp-message";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + get_pgp_content = True + find = True - msg = 'infoleak:automatic-detection="encrypted-private-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.PGP_PUBLIC_KEY_BLOCK.value in content: + msg = 'infoleak:automatic-detection="pgp-public-key-block";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + get_pgp_content = True - if '-----BEGIN OPENSSH PRIVATE KEY-----' in content: - publisher.warning('{} has an openssh private key message'.format(paste.p_name)) - print('openssh private key message found') + if KeyEnum.PGP_SIGNATURE.value in content: + msg = 'infoleak:automatic-detection="pgp-signature";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + get_pgp_content = True - msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.CERTIFICATE.value in content: + self.redis_logger.warning('{} has a certificate message'.format(paste.p_name)) - if '---- BEGIN SSH2 ENCRYPTED PRIVATE KEY ----' in content: - publisher.warning('{} has an ssh2 private key message'.format(paste.p_name)) - print('SSH2 private key message found') + msg = 'infoleak:automatic-detection="certificate";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.RSA_PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has a RSA private key message'.format(paste.p_name)) + print('rsa private key message found') - if '-----BEGIN OpenVPN Static key V1-----' in content: - publisher.warning('{} has an openssh private key message'.format(paste.p_name)) - print('OpenVPN Static key message found') + msg = 'infoleak:automatic-detection="rsa-private-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - msg = 'infoleak:automatic-detection="vpn-static-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has a private key message'.format(paste.p_name)) + print('private key message found') - if '-----BEGIN DSA PRIVATE KEY-----' in content: - publisher.warning('{} has a dsa private key message'.format(paste.p_name)) + msg = 'infoleak:automatic-detection="private-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - msg = 'infoleak:automatic-detection="dsa-private-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.ENCRYPTED_PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has an encrypted private key message'.format(paste.p_name)) + print('encrypted private key message found') - if '-----BEGIN EC PRIVATE KEY-----' in content: - publisher.warning('{} has an ec private key message'.format(paste.p_name)) + msg = 'infoleak:automatic-detection="encrypted-private-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - msg = 'infoleak:automatic-detection="ec-private-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.OPENSSH_PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has an openssh private key message'.format(paste.p_name)) + print('openssh private key message found') - if '-----BEGIN PGP PRIVATE KEY BLOCK-----' in content: - publisher.warning('{} has a pgp private key block message'.format(paste.p_name)) + msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - msg = 'infoleak:automatic-detection="pgp-private-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.SSH2_ENCRYPTED_PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has an ssh2 private key message'.format(paste.p_name)) + print('SSH2 private key message found') - if '-----BEGIN PUBLIC KEY-----' in content: - publisher.warning('{} has a public key message'.format(paste.p_name)) + msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - msg = 'infoleak:automatic-detection="public-key";{}'.format(message) - p.populate_set_out(msg, 'Tags') - find = True + if KeyEnum.OPENVPN_STATIC_KEY_V1.value in content: + self.redis_logger.warning('{} has an openssh private key message'.format(paste.p_name)) + print('OpenVPN Static key message found') - # pgp content - if get_pgp_content: - p.populate_set_out(message, 'PgpDump') + msg = 'infoleak:automatic-detection="vpn-static-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True - if find : + if KeyEnum.DSA_PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has a dsa private key message'.format(paste.p_name)) - #Send to duplicate - p.populate_set_out(message, 'Duplicate') - print(message) + msg = 'infoleak:automatic-detection="dsa-private-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True + + if KeyEnum.EC_PRIVATE_KEY.value in content: + self.redis_logger.warning('{} has an ec private key message'.format(paste.p_name)) + + msg = 'infoleak:automatic-detection="ec-private-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True + + if KeyEnum.PGP_PRIVATE_KEY_BLOCK.value in content: + self.redis_logger.warning('{} has a pgp private key block message'.format(paste.p_name)) + + msg = 'infoleak:automatic-detection="pgp-private-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True + + if KeyEnum.PUBLIC_KEY.value in content: + self.redis_logger.warning('{} has a public key message'.format(paste.p_name)) + + msg = 'infoleak:automatic-detection="public-key";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + find = True + + # pgp content + if get_pgp_content: + self.process.populate_set_out(message, 'PgpDump') + + if find : + #Send to duplicate + self.process.populate_set_out(message, 'Duplicate') + self.redis_logger.debug(message) if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - - # Section name in bin/packages/modules.cfg - config_section = 'Keys' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("Run Keys module ") - - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - if message is None: - publisher.debug("{} queue is empty, waiting".format(config_section)) - time.sleep(1) - continue - - # Do something with the message from the queue - paste = Paste.Paste(message) - search_key(paste) - - # (Optional) Send that thing to the next queue + + module = Keys() + module.run() diff --git a/bin/ModuleStats.py b/bin/ModuleStats.py index 7cf67299..fa1c754c 100755 --- a/bin/ModuleStats.py +++ b/bin/ModuleStats.py @@ -5,151 +5,157 @@ """ +################################## +# Import External packages +################################## import time import datetime import redis import os + + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from packages.Date import Date from pubsublogger import publisher from Helper import Process from packages import Paste - -# Config Var -max_set_cardinality = 8 - -def get_date_range(num_day): - curr_date = datetime.date.today() - date = Date(str(curr_date.year)+str(curr_date.month).zfill(2)+str(curr_date.day).zfill(2)) - date_list = [] - - for i in range(0, num_day+1): - date_list.append(date.substract_day(i)) - return date_list +import ConfigLoader -def compute_most_posted(server, message): - module, num, keyword, paste_date = message.split(';') +class ModuleStats(AbstractModule): + """ + Module Statistics module for AIL framework + """ - redis_progression_name_set = 'top_'+ module +'_set_' + paste_date - - # Add/Update in Redis - server.hincrby(paste_date, module+'-'+keyword, int(num)) - - # Compute Most Posted - date = get_date_range(0)[0] - # check if this keyword is eligible for progression - keyword_total_sum = 0 - - curr_value = server.hget(date, module+'-'+keyword) - keyword_total_sum += int(curr_value) if curr_value is not None else 0 - - if server.zcard(redis_progression_name_set) < max_set_cardinality: - server.zadd(redis_progression_name_set, float(keyword_total_sum), keyword) - - else: # not in set - member_set = server.zrangebyscore(redis_progression_name_set, '-inf', '+inf', withscores=True, start=0, num=1) - # Member set is a list of (value, score) pairs - if int(member_set[0][1]) < keyword_total_sum: - #remove min from set and add the new one - print(module + ': adding ' +keyword+ '(' +str(keyword_total_sum)+') in set and removing '+member_set[0][0]+'('+str(member_set[0][1])+')') - server.zrem(redis_progression_name_set, member_set[0][0]) - server.zadd(redis_progression_name_set, float(keyword_total_sum), keyword) - print(redis_progression_name_set) + # Config Var + MAX_SET_CARDINALITY = 8 -def compute_provider_info(server_trend, path): - redis_all_provider = 'all_provider_set' + def __init__(self): - paste = Paste.Paste(path) + super(ModuleStats, self).__init__() - paste_baseName = paste.p_name.split('.')[0] - paste_size = paste._get_p_size() - paste_provider = paste.p_source - paste_date = str(paste._get_p_date()) - redis_sum_size_set = 'top_size_set_' + paste_date - redis_avg_size_name_set = 'top_avg_size_set_' + paste_date - redis_providers_name_set = 'providers_set_' + paste_date + # Waiting time in secondes between to message proccessed + self.pending_seconds = 20 - # Add/Update in Redis - server_trend.sadd(redis_all_provider, paste_provider) + # Sent to the logging a description of the module + self.redis_logger.info("Makes statistics about valid URL") - num_paste = int(server_trend.hincrby(paste_provider+'_num', paste_date, 1)) - sum_size = float(server_trend.hincrbyfloat(paste_provider+'_size', paste_date, paste_size)) - new_avg = float(sum_size) / float(num_paste) - server_trend.hset(paste_provider +'_avg', paste_date, new_avg) + # REDIS # + self.r_serv_trend = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Trending") + + self.r_serv_pasteName = ConfigLoader.ConfigLoader().get_redis_conn("Redis_Paste_Name") - # - # Compute Most Posted - # + def compute(self, message): - # Size - if server_trend.zcard(redis_sum_size_set) < max_set_cardinality or server_trend.zscore(redis_sum_size_set, paste_provider) != "nil": - server_trend.zadd(redis_sum_size_set, float(num_paste), paste_provider) - server_trend.zadd(redis_avg_size_name_set, float(new_avg), paste_provider) - else: #set full capacity - member_set = server_trend.zrangebyscore(redis_sum_size_set, '-inf', '+inf', withscores=True, start=0, num=1) - # Member set is a list of (value, score) pairs - if float(member_set[0][1]) < new_avg: - #remove min from set and add the new one - print('Size - adding ' +paste_provider+ '(' +str(new_avg)+') in set and removing '+member_set[0][0]+'('+str(member_set[0][1])+')') - server_trend.zrem(redis_sum_size_set, member_set[0][0]) - server_trend.zadd(redis_sum_size_set, float(sum_size), paste_provider) - server_trend.zrem(redis_avg_size_name_set, member_set[0][0]) - server_trend.zadd(redis_avg_size_name_set, float(new_avg), paste_provider) + if len(message.split(';')) > 1: + self.compute_most_posted(message) + else: + self.compute_provider_info(message) - # Num - # if set not full or provider already present - if server_trend.zcard(redis_providers_name_set) < max_set_cardinality or server_trend.zscore(redis_providers_name_set, paste_provider) != "nil": - server_trend.zadd(redis_providers_name_set, float(num_paste), paste_provider) - else: #set at full capacity - member_set = server_trend.zrangebyscore(redis_providers_name_set, '-inf', '+inf', withscores=True, start=0, num=1) - # Member set is a list of (value, score) pairs - if int(member_set[0][1]) < num_paste: - #remove min from set and add the new one - print('Num - adding ' +paste_provider+ '(' +str(num_paste)+') in set and removing '+member_set[0][0]+'('+str(member_set[0][1])+')') - server_trend.zrem(member_set[0][0]) - server_trend.zadd(redis_providers_name_set, float(num_paste), paste_provider) + def get_date_range(self, num_day): + curr_date = datetime.date.today() + date = Date(str(curr_date.year)+str(curr_date.month).zfill(2)+str(curr_date.day).zfill(2)) + date_list = [] + + for i in range(0, num_day+1): + date_list.append(date.substract_day(i)) + return date_list + + + def compute_most_posted(self, message): + module, num, keyword, paste_date = message.split(';') + + redis_progression_name_set = 'top_'+ module +'_set_' + paste_date + + # Add/Update in Redis + self.r_serv_trend.hincrby(paste_date, module+'-'+keyword, int(num)) + + # Compute Most Posted + date = self.get_date_range(0)[0] + # check if this keyword is eligible for progression + keyword_total_sum = 0 + + curr_value = self.r_serv_trend.hget(date, module+'-'+keyword) + keyword_total_sum += int(curr_value) if curr_value is not None else 0 + + if self.r_serv_trend.zcard(redis_progression_name_set) < self.MAX_SET_CARDINALITY: + self.r_serv_trend.zadd(redis_progression_name_set, float(keyword_total_sum), keyword) + + else: # not in set + member_set = self.r_serv_trend.zrangebyscore(redis_progression_name_set, '-inf', '+inf', withscores=True, start=0, num=1) + # Member set is a list of (value, score) pairs + if int(member_set[0][1]) < keyword_total_sum: + #remove min from set and add the new one + self.redis_logger.debug(module + ': adding ' +keyword+ '(' +str(keyword_total_sum)+') in set and removing '+member_set[0][0]+'('+str(member_set[0][1])+')') + self.r_serv_trend.zrem(redis_progression_name_set, member_set[0][0]) + self.r_serv_trend.zadd(redis_progression_name_set, float(keyword_total_sum), keyword) + self.redis_logger.debug(redis_progression_name_set) + + + def compute_provider_info(self, message): + redis_all_provider = 'all_provider_set' + + paste = Paste.Paste(message) + + paste_baseName = paste.p_name.split('.')[0] + paste_size = paste._get_p_size() + paste_provider = paste.p_source + paste_date = str(paste._get_p_date()) + redis_sum_size_set = 'top_size_set_' + paste_date + redis_avg_size_name_set = 'top_avg_size_set_' + paste_date + redis_providers_name_set = 'providers_set_' + paste_date + + # Add/Update in Redis + self.r_serv_pasteName.sadd(paste_baseName, message) + self.r_serv_trend.sadd(redis_all_provider, paste_provider) + + num_paste = int(self.r_serv_trend.hincrby(paste_provider+'_num', paste_date, 1)) + sum_size = float(self.r_serv_trend.hincrbyfloat(paste_provider+'_size', paste_date, paste_size)) + new_avg = float(sum_size) / float(num_paste) + self.r_serv_trend.hset(paste_provider +'_avg', paste_date, new_avg) + + + # + # Compute Most Posted + # + + # Size + if self.r_serv_trend.zcard(redis_sum_size_set) < self.MAX_SET_CARDINALITY or self.r_serv_trend.zscore(redis_sum_size_set, paste_provider) != "nil": + self.r_serv_trend.zadd(redis_sum_size_set, float(num_paste), paste_provider) + self.r_serv_trend.zadd(redis_avg_size_name_set, float(new_avg), paste_provider) + else: #set full capacity + member_set = self.r_serv_trend.zrangebyscore(redis_sum_size_set, '-inf', '+inf', withscores=True, start=0, num=1) + # Member set is a list of (value, score) pairs + if float(member_set[0][1]) < new_avg: + #remove min from set and add the new one + self.redis_logger.debug('Size - adding ' +paste_provider+ '(' +str(new_avg)+') in set and removing '+member_set[0][0]+'('+str(member_set[0][1])+')') + self.r_serv_trend.zrem(redis_sum_size_set, member_set[0][0]) + self.r_serv_trend.zadd(redis_sum_size_set, float(sum_size), paste_provider) + self.r_serv_trend.zrem(redis_avg_size_name_set, member_set[0][0]) + self.r_serv_trend.zadd(redis_avg_size_name_set, float(new_avg), paste_provider) + + + # Num + # if set not full or provider already present + if self.r_serv_trend.zcard(redis_providers_name_set) < self.MAX_SET_CARDINALITY or self.r_serv_trend.zscore(redis_providers_name_set, paste_provider) != "nil": + self.r_serv_trend.zadd(redis_providers_name_set, float(num_paste), paste_provider) + else: #set at full capacity + member_set = self.r_serv_trend.zrangebyscore(redis_providers_name_set, '-inf', '+inf', withscores=True, start=0, num=1) + # Member set is a list of (value, score) pairs + if int(member_set[0][1]) < num_paste: + #remove min from set and add the new one + self.redis_logger.debug('Num - adding ' +paste_provider+ '(' +str(num_paste)+') in set and removing '+member_set[0][0]+'('+str(member_set[0][1])+')') + self.r_serv_trend.zrem(member_set[0][0]) + self.r_serv_trend.zadd(redis_providers_name_set, float(num_paste), paste_provider) if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - # Section name in bin/packages/modules.cfg - config_section = 'ModuleStats' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("Makes statistics about valid URL") - - # REDIS # - r_serv_trend = redis.StrictRedis( - host=p.config.get("ARDB_Trending", "host"), - port=p.config.get("ARDB_Trending", "port"), - db=p.config.get("ARDB_Trending", "db"), - decode_responses=True) - - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - - if message is None: - publisher.debug("{} queue is empty, waiting".format(config_section)) - print('sleeping') - time.sleep(20) - continue - - else: - # Do something with the message from the queue - if len(message.split(';')) > 1: - compute_most_posted(r_serv_trend, message) - else: - compute_provider_info(r_serv_trend, message) + module = ModuleStats() + module.run() diff --git a/bin/Phone.py b/bin/Phone.py index c633a199..0e2cff6a 100755 --- a/bin/Phone.py +++ b/bin/Phone.py @@ -11,72 +11,74 @@ It apply phone number regexes on paste content and warn if above a threshold. """ +################################## +# Import External packages +################################## import time import re import phonenumbers + + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from packages import Paste from pubsublogger import publisher from Helper import Process -def search_phone(message): - paste = Paste.Paste(message) - content = paste.get_p_content() +class Phone(AbstractModule): + """ + Phone module for AIL framework + """ + # regex to find phone numbers, may raise many false positives (shalt thou seek optimization, upgrading is required) - reg_phone = re.compile(r'(\+\d{1,4}(\(\d\))?\d?|0\d?)(\d{6,8}|([-/\. ]{1}\d{2,3}){3,4})') - reg_phone = re.compile(r'(\+\d{1,4}(\(\d\))?\d?|0\d?)(\d{6,8}|([-/\. ]{1}\(?\d{2,4}\)?){3,4})') - # list of the regex results in the Paste, may be null - results = reg_phone.findall(content) + # reg_phone = re.compile(r'(\+\d{1,4}(\(\d\))?\d?|0\d?)(\d{6,8}|([-/\. ]{1}\d{2,3}){3,4})') + REG_PHONE = re.compile(r'(\+\d{1,4}(\(\d\))?\d?|0\d?)(\d{6,8}|([-/\. ]{1}\(?\d{2,4}\)?){3,4})') - # if the list is greater than 4, we consider the Paste may contain a list of phone numbers - if len(results) > 4: - print(results) - publisher.warning('{} contains PID (phone numbers)'.format(paste.p_name)) - msg = 'infoleak:automatic-detection="phone-number";{}'.format(message) - p.populate_set_out(msg, 'Tags') + def __init__(self): + super(Phone, self).__init__() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 1 + + + def compute(self, message): + paste = Paste.Paste(message) + content = paste.get_p_content() + # List of the regex results in the Paste, may be null + results = self.REG_PHONE.findall(content) + + # If the list is greater than 4, we consider the Paste may contain a list of phone numbers + if len(results) > 4: + self.redis_logger.debug(results) + self.redis_logger.warning('{} contains PID (phone numbers)'.format(paste.p_name)) + + msg = 'infoleak:automatic-detection="phone-number";{}'.format(message) + self.process.populate_set_out(msg, 'Tags') + + # Send to duplicate + self.process.populate_set_out(message, 'Duplicate') + + stats = {} + for phone_number in results: + try: + x = phonenumbers.parse(phone_number, None) + country_code = x.country_code + if stats.get(country_code) is None: + stats[country_code] = 1 + else: + stats[country_code] = stats[country_code] + 1 + except: + pass + for country_code in stats: + if stats[country_code] > 4: + self.redis_logger.warning('{} contains Phone numbers with country code {}'.format(paste.p_name, country_code)) - #Send to duplicate - p.populate_set_out(message, 'Duplicate') - stats = {} - for phone_number in results: - try: - x = phonenumbers.parse(phone_number, None) - country_code = x.country_code - if stats.get(country_code) is None: - stats[country_code] = 1 - else: - stats[country_code] = stats[country_code] + 1 - except: - pass - for country_code in stats: - if stats[country_code] > 4: - publisher.warning('{} contains Phone numbers with country code {}'.format(paste.p_name, country_code)) if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - - # Section name in bin/packages/modules.cfg - config_section = 'Phone' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("Run Phone module") - - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - if message is None: - publisher.debug("{} queue is empty, waiting".format(config_section)) - time.sleep(1) - continue - - # Do something with the message from the queue - search_phone(message) + + module = Phone() + module.run() diff --git a/bin/TermTrackerMod.py b/bin/TermTrackerMod.py index f3b333fc..cd32e961 100755 --- a/bin/TermTrackerMod.py +++ b/bin/TermTrackerMod.py @@ -5,30 +5,26 @@ The TermTracker Module =================== """ + +################################## +# Import External packages +################################## import os import sys import time import signal +################################## +# Import Project packages +################################## from Helper import Process from pubsublogger import publisher - +from module.abstract_module import AbstractModule import NotificationHelper - from packages import Item from packages import Term - from lib import Tracker -full_item_url = "/object/item?id=" - -mail_body_template = "AIL Framework,\nNew occurrence for term tracked term: {}\nitem id: {}\nurl: {}{}" - -# loads tracked words -list_tracked_words = Term.get_tracked_words_list() -last_refresh_word = time.time() -set_tracked_words_list = Term.get_set_tracked_words_list() -last_refresh_set = time.time() class TimeoutException(Exception): pass @@ -36,89 +32,105 @@ def timeout_handler(signum, frame): raise TimeoutException signal.signal(signal.SIGALRM, timeout_handler) -def new_term_found(term, term_type, item_id, item_date): - uuid_list = Term.get_term_uuid_list(term, term_type) - print('new tracked term found: {} in {}'.format(term, item_id)) - for term_uuid in uuid_list: - Term.add_tracked_item(term_uuid, item_id, item_date) +class TermTrackerMod(AbstractModule): - tags_to_add = Term.get_term_tags(term_uuid) - for tag in tags_to_add: - msg = '{};{}'.format(tag, item_id) - p.populate_set_out(msg, 'Tags') + mail_body_template = "AIL Framework,\nNew occurrence for term tracked term: {}\nitem id: {}\nurl: {}{}" - mail_to_notify = Term.get_term_mails(term_uuid) - if mail_to_notify: - mail_subject = Tracker.get_email_subject(term_uuid) - mail_body = mail_body_template.format(term, item_id, full_item_url, item_id) - for mail in mail_to_notify: - NotificationHelper.sendEmailNotification(mail, mail_subject, mail_body) + """ + TermTrackerMod module for AIL framework + """ + def __init__(self): + super(TermTrackerMod, self).__init__() + + self.pending_seconds = 5 + + self.max_execution_time = self.process.config.getint('TermTrackerMod', "max_execution_time") + + self.full_item_url = self.process.config.get("Notifications", "ail_domain") + "/object/item?id=" + + # loads tracked words + self.list_tracked_words = Term.get_tracked_words_list() + self.last_refresh_word = time.time() + self.set_tracked_words_list = Term.get_set_tracked_words_list() + self.last_refresh_set = time.time() + + # Send module state to logs + self.redis_logger.info("Module %s initialized"%(self._module_name())) -if __name__ == "__main__": + def compute(self, item_id): + # Cast message as Item + item_date = Item.get_item_date(item_id) + item_content = Item.get_item_content(item_id) - publisher.port = 6380 - publisher.channel = "Script" - publisher.info("Script TermTrackerMod started") + signal.alarm(self.max_execution_time) - config_section = 'TermTrackerMod' - p = Process(config_section) - max_execution_time = p.config.getint(config_section, "max_execution_time") - - full_item_url = p.config.get("Notifications", "ail_domain") + full_item_url - - while True: - - item_id = p.get_from_set() - - if item_id is not None: - - item_date = Item.get_item_date(item_id) - item_content = Item.get_item_content(item_id) - - signal.alarm(max_execution_time) - try: - dict_words_freq = Term.get_text_word_frequency(item_content) - except TimeoutException: - print ("{0} processing timeout".format(item_id)) - continue - else: - signal.alarm(0) + dict_words_freq = None + try: + dict_words_freq = Term.get_text_word_frequency(item_content) + except TimeoutException: + self.redis_logger.warning("{0} processing timeout".format(item_id)) + else: + signal.alarm(0) + if dict_words_freq: # create token statistics #for word in dict_words_freq: # Term.create_token_statistics(item_date, word, dict_words_freq[word]) # check solo words - for word in list_tracked_words: + for word in self.list_tracked_words: if word in dict_words_freq: - new_term_found(word, 'word', item_id, item_date) + self.new_term_found(word, 'word', item_id, item_date) - # check words set - for elem in set_tracked_words_list: - list_words = elem[0] - nb_words_threshold = elem[1] - word_set = elem[2] - nb_uniq_word = 0 + # check words set + for elem in self.set_tracked_words_list: + list_words = elem[0] + nb_words_threshold = elem[1] + word_set = elem[2] + nb_uniq_word = 0 - for word in list_words: - if word in dict_words_freq: - nb_uniq_word += 1 - if nb_uniq_word >= nb_words_threshold: - new_term_found(word_set, 'set', item_id, item_date) + for word in list_words: + if word in dict_words_freq: + nb_uniq_word += 1 + if nb_uniq_word >= nb_words_threshold: + self.new_term_found(word_set, 'set', item_id, item_date) - else: - time.sleep(5) + # refresh Tracked term + if self.last_refresh_word < Term.get_tracked_term_last_updated_by_type('word'): + self.list_tracked_words = Term.get_tracked_words_list() + self.last_refresh_word = time.time() + self.redis_logger.debug('Tracked word refreshed') + + if self.last_refresh_set < Term.get_tracked_term_last_updated_by_type('set'): + self.set_tracked_words_list = Term.get_set_tracked_words_list() + self.last_refresh_set = time.time() + self.redis_logger.debug('Tracked set refreshed') - # refresh Tracked term - if last_refresh_word < Term.get_tracked_term_last_updated_by_type('word'): - list_tracked_words = Term.get_tracked_words_list() - last_refresh_word = time.time() - print('Tracked word refreshed') + def new_term_found(self, term, term_type, item_id, item_date): + uuid_list = Term.get_term_uuid_list(term, term_type) + self.redis_logger.info('new tracked term found: {} in {}'.format(term, item_id)) - if last_refresh_set < Term.get_tracked_term_last_updated_by_type('set'): - set_tracked_words_list = Term.get_set_tracked_words_list() - last_refresh_set = time.time() - print('Tracked set refreshed') + for term_uuid in uuid_list: + Term.add_tracked_item(term_uuid, item_id, item_date) + + tags_to_add = Term.get_term_tags(term_uuid) + for tag in tags_to_add: + msg = '{};{}'.format(tag, item_id) + self.process.populate_set_out(msg, 'Tags') + + mail_to_notify = Term.get_term_mails(term_uuid) + if mail_to_notify: + mail_subject = Tracker.get_email_subject(term_uuid) + mail_body = TermTrackerMod.mail_body_template.format(term, item_id, self.full_item_url, item_id) + for mail in mail_to_notify: + self.redis_logger.debug('Send Mail {}'.format(mail_subject)) + NotificationHelper.sendEmailNotification(mail, mail_subject, mail_body) + + +if __name__ == '__main__': + + module = TermTrackerMod() + module.run() diff --git a/bin/Web.py b/bin/Web.py index 68e37c25..9c362580 100755 --- a/bin/Web.py +++ b/bin/Web.py @@ -9,152 +9,186 @@ This module tries to parse URLs and warns if some defined contry code are presen """ +################################## +# Import External packages +################################## import redis import pprint import time import os import dns.exception -from packages import Paste -from packages import lib_refine -from pubsublogger import publisher from pyfaup.faup import Faup import re - # Country and ASN lookup from cymru.ip2asn.dns import DNSClient as ip2asn import socket import pycountry import ipaddress +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +from packages import Paste +from packages import lib_refine +from pubsublogger import publisher from Helper import Process -# Used to prevent concat with empty fields due to url parsing -def avoidNone(a_string): - if a_string is None: - return "" - else: - return a_string -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Script" - - config_section = 'Web' - - p = Process(config_section) - - # REDIS # - r_serv2 = redis.StrictRedis( - host=p.config.get("Redis_Cache", "host"), - port=p.config.getint("Redis_Cache", "port"), - db=p.config.getint("Redis_Cache", "db"), - decode_responses=True) - - # Protocol file path - protocolsfile_path = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "protocolsfile")) - - # Country to log as critical - cc_critical = p.config.get("Url", "cc_critical") - - # FUNCTIONS # - publisher.info("Script URL Started") - - message = p.get_from_set() - prec_filename = None - faup = Faup() - - # Get all uri from protocolsfile (Used for Curve) - uri_scheme = "" - with open(protocolsfile_path, 'r') as scheme_file: - for scheme in scheme_file: - uri_scheme += scheme[:-1]+"|" - uri_scheme = uri_scheme[:-1] - - url_regex = "("+uri_scheme+")\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*" - - while True: - if message is not None: - filename, score = message.split() - - if prec_filename is None or filename != prec_filename: - domains_list = [] - PST = Paste.Paste(filename) - client = ip2asn() - for x in PST.get_regex(url_regex): - matching_url = re.search(url_regex, PST.get_p_content()) - url = matching_url.group(0) - - to_send = "{} {} {}".format(url, PST._get_p_date(), filename) - p.populate_set_out(to_send, 'Url') - - faup.decode(url) - domain = faup.get_domain() - subdomain = faup.get_subdomain() - - publisher.debug('{} Published'.format(url)) - - if subdomain is not None: - ## TODO: # FIXME: remove me - try: - subdomain = subdomain.decode() - except: - pass - - if domain is not None: - ## TODO: # FIXME: remove me - try: - domain = domain.decode() - except: - pass - domains_list.append(domain) - - hostl = avoidNone(subdomain) + avoidNone(domain) - - try: - socket.setdefaulttimeout(1) - ip = socket.gethostbyname(hostl) - except: - # If the resolver is not giving any IPv4 address, - # ASN/CC lookup is skip. - continue - - try: - l = client.lookup(ip, qType='IP') - - except ipaddress.AddressValueError: - continue - cc = getattr(l, 'cc') - asn = '' - if getattr(l, 'asn') is not None: - asn = getattr(l, 'asn')[2:] #remobe b' - - # EU is not an official ISO 3166 code (but used by RIPE - # IP allocation) - if cc is not None and cc != "EU": - print(hostl, asn, cc, \ - pycountry.countries.get(alpha_2=cc).name) - if cc == cc_critical: - to_print = 'Url;{};{};{};Detected {} {}'.format( - PST.p_source, PST.p_date, PST.p_name, - hostl, cc) - #publisher.warning(to_print) - print(to_print) - else: - print(hostl, asn, cc) - - A_values = lib_refine.checking_A_record(r_serv2, - domains_list) - - if A_values[0] >= 1: - pprint.pprint(A_values) - publisher.info('Url;{};{};{};Checked {} URL;{}'.format( - PST.p_source, PST.p_date, PST.p_name, A_values[0], PST.p_rel_path)) - prec_filename = filename +class Web(AbstractModule): + """ + Web module for AIL framework + """ + # Used to prevent concat with empty fields due to url parsing + def avoidNone(self, a_string): + if a_string is None: + return "" else: - publisher.debug("Script url is Idling 10s") - print('Sleeping') - time.sleep(10) + return a_string - message = p.get_from_set() + def __init__(self): + """ + Init Web + """ + super(Web, self).__init__() + + # REDIS Cache + self.r_serv2 = redis.StrictRedis( + host=self.process.config.get("Redis_Cache", "host"), + port=self.process.config.getint("Redis_Cache", "port"), + db=self.process.config.getint("Redis_Cache", "db"), + decode_responses=True) + + # Country to log as critical + self.cc_critical = self.process.config.get("Url", "cc_critical") + + # FUNCTIONS # + self.redis_logger.info("Script URL subscribed to channel web_categ") + + # FIXME For retro compatibility + self.channel = 'web_categ' + + self.faup = Faup() + + # Protocol file path + protocolsfile_path = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "protocolsfile")) + # Get all uri from protocolsfile (Used for Curve) + uri_scheme = "" + with open(protocolsfile_path, 'r') as scheme_file: + for scheme in scheme_file: + uri_scheme += scheme[:-1]+"|" + uri_scheme = uri_scheme[:-1] + + self.url_regex = "((?i:"+uri_scheme + \ + ")\://(?:[a-zA-Z0-9\.\-]+(?:\:[a-zA-Z0-9\.&%\$\-]+)*@)*(?:(?:25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(?:25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(?:25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(?:25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|(?:[a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(?:com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(?:\:[0-9]+)*(?:/(?:$|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*)" + + self.prec_filename = None + + # Send module state to logs + self.redis_logger.info("Module %s initialized" % (self.module_name)) + + def compute(self, message): + """ + Search for Web links from given message + """ + # Extract item + filename, score = message.split() + + if self.prec_filename is None or filename != self.prec_filename: + domains_list = set() + PST = Paste.Paste(filename) + client = ip2asn() + + detected_urls = PST.get_regex(self.url_regex) + if len(detected_urls) > 0: + to_print = 'Web;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) + self.redis_logger.info('{}Detected {} URL;{}'.format( + to_print, len(detected_urls), PST.p_rel_path)) + + for url in detected_urls: + self.redis_logger.debug("match regex: %s" % (url)) + + # self.redis_logger.debug("match regex search: %s"%(url)) + + to_send = "{} {} {}".format(url, PST._get_p_date(), filename) + self.process.populate_set_out(to_send, 'Url') + self.redis_logger.debug("url_parsed: %s" % (to_send)) + + self.faup.decode(url) + domain = self.faup.get_domain() + subdomain = self.faup.get_subdomain() + + self.redis_logger.debug('{} Published'.format(url)) + + if subdomain is not None: + # TODO: # FIXME: remove me + try: + subdomain = subdomain.decode() + except: + pass + + if domain is not None: + # TODO: # FIXME: remove me + try: + domain = domain.decode() + except: + pass + domains_list.add(domain) + + hostl = self.avoidNone(subdomain) + self.avoidNone(domain) + + try: + socket.setdefaulttimeout(1) + ip = socket.gethostbyname(hostl) + # If the resolver is not giving any IPv4 address, + # ASN/CC lookup is skip. + l = client.lookup(ip, qType='IP') + except ipaddress.AddressValueError: + self.redis_logger.error( + 'ASN/CC lookup failed for IP {}'.format(ip)) + continue + except: + self.redis_logger.error( + 'Resolver IPv4 address failed for host {}'.format(hostl)) + continue + + cc = getattr(l, 'cc') + asn = '' + if getattr(l, 'asn') is not None: + asn = getattr(l, 'asn')[2:] # remobe b' + + # EU is not an official ISO 3166 code (but used by RIPE + # IP allocation) + if cc is not None and cc != "EU": + self.redis_logger.debug('{};{};{};{}'.format(hostl, asn, cc, + pycountry.countries.get(alpha_2=cc).name)) + if cc == self.cc_critical: + to_print = 'Url;{};{};{};Detected {} {}'.format( + PST.p_source, PST.p_date, PST.p_name, + hostl, cc) + self.redis_logger.info(to_print) + else: + self.redis_logger.debug('{};{};{}'.format(hostl, asn, cc)) + + A_values = lib_refine.checking_A_record(self.r_serv2, + domains_list) + + if A_values[0] >= 1: + PST.__setattr__(self.channel, A_values) + PST.save_attribute_redis(self.channel, (A_values[0], + list(A_values[1]))) + + pprint.pprint(A_values) + # self.redis_logger.info('Url;{};{};{};Checked {} URL;{}'.format( + # PST.p_source, PST.p_date, PST.p_name, A_values[0], PST.p_rel_path)) + + self.prec_filename = filename + + +if __name__ == '__main__': + + module = Web() + module.run() diff --git a/bin/WebStats.py b/bin/WebStats.py index 10aba917..3a7cc472 100755 --- a/bin/WebStats.py +++ b/bin/WebStats.py @@ -10,182 +10,198 @@ It consider the TLD, Domain and protocol. """ +################################## +# Import External packages +################################## import time import datetime import redis import os -from packages import lib_words -from packages.Date import Date from pubsublogger import publisher -from Helper import Process from pyfaup.faup import Faup -# Config Var -threshold_total_sum = 200 # Above this value, a keyword is eligible for a progression -threshold_increase = 1.0 # The percentage representing the keyword occurence since num_day_to_look -max_set_cardinality = 10 # The cardinality of the progression set -num_day_to_look = 5 # the detection of the progression start num_day_to_look in the past -def analyse(server, field_name, date, url_parsed): - field = url_parsed[field_name] - if field is not None: - try: # faup version - field = field.decode() - except: - pass - server.hincrby(field, date, 1) - if field_name == "domain": #save domain in a set for the monthly plot - domain_set_name = "domain_set_" + date[0:6] - server.sadd(domain_set_name, field) - print("added in " + domain_set_name +": "+ field) - -def get_date_range(num_day): - curr_date = datetime.date.today() - date = Date(str(curr_date.year)+str(curr_date.month).zfill(2)+str(curr_date.day).zfill(2)) - date_list = [] - - for i in range(0, num_day+1): - date_list.append(date.substract_day(i)) - return date_list - -# Compute the progression for one keyword -def compute_progression_word(server, num_day, keyword): - date_range = get_date_range(num_day) - # check if this keyword is eligible for progression - keyword_total_sum = 0 - value_list = [] - for date in date_range: # get value up to date_range - curr_value = server.hget(keyword, date) - value_list.append(int(curr_value if curr_value is not None else 0)) - keyword_total_sum += int(curr_value) if curr_value is not None else 0 - oldest_value = value_list[-1] if value_list[-1] != 0 else 1 #Avoid zero division - - # The progression is based on the ratio: value[i] / value[i-1] - keyword_increase = 0 - value_list_reversed = value_list[:] - value_list_reversed.reverse() - for i in range(1, len(value_list_reversed)): - divisor = value_list_reversed[i-1] if value_list_reversed[i-1] != 0 else 1 - keyword_increase += value_list_reversed[i] / divisor - - return (keyword_increase, keyword_total_sum) +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +from packages import lib_words +from packages.Date import Date +from Helper import Process -''' - recompute the set top_progression zset - - Compute the current field progression - - re-compute the current progression for each first 2*max_set_cardinality fields in the top_progression_zset -''' -def compute_progression(server, field_name, num_day, url_parsed): - redis_progression_name_set = "z_top_progression_"+field_name +class WebStats(AbstractModule): + """ + WebStats module for AIL framework + """ - keyword = url_parsed[field_name] - if keyword is not None: + # Config Var + THRESHOLD_TOTAL_SUM = 200 # Above this value, a keyword is eligible for a progression + THRESHOLD_INCREASE = 1.0 # The percentage representing the keyword occurence since num_day_to_look + MAX_SET_CARDINALITY = 10 # The cardinality of the progression set + NUM_DAY_TO_LOOK = 5 # the detection of the progression start num_day_to_look in the past - #compute the progression of the current word - keyword_increase, keyword_total_sum = compute_progression_word(server, num_day, keyword) - #re-compute the progression of 2*max_set_cardinality - current_top = server.zrevrangebyscore(redis_progression_name_set, '+inf', '-inf', withscores=True, start=0, num=2*max_set_cardinality) - for word, value in current_top: - word_inc, word_tot_sum = compute_progression_word(server, num_day, word) - server.zrem(redis_progression_name_set, word) - if (word_tot_sum > threshold_total_sum) and (word_inc > threshold_increase): - server.zadd(redis_progression_name_set, float(word_inc), word) + def __init__(self): + super(WebStats, self).__init__() - # filter before adding - if (keyword_total_sum > threshold_total_sum) and (keyword_increase > threshold_increase): - server.zadd(redis_progression_name_set, float(keyword_increase), keyword) + # Send module state to logs + self.redis_logger.info("Module %s initialized"%(self.module_name)) + # Sent to the logging a description of the module + self.redis_logger.info("Makes statistics about valid URL") + self.pending_seconds = 5*60 + + # REDIS # + self.r_serv_trend = redis.StrictRedis( + host=self.process.config.get("ARDB_Trending", "host"), + port=self.process.config.get("ARDB_Trending", "port"), + db=self.process.config.get("ARDB_Trending", "db"), + decode_responses=True) + + # FILE CURVE SECTION # + self.csv_path_proto = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "protocolstrending_csv")) + self.protocolsfile_path = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "protocolsfile")) + + self.csv_path_tld = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "tldstrending_csv")) + self.tldsfile_path = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "tldsfile")) + + self.csv_path_domain = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "domainstrending_csv")) + + self.faup = Faup() + self.generate_new_graph = False + + + def computeNone(self): + if self.generate_new_graph: + self.generate_new_graph = False + + today = datetime.date.today() + year = today.year + month = today.month + + self.redis_logger.debug('Building protocol graph') + lib_words.create_curve_with_word_file(self.r_serv_trend, csv_path_proto, + protocolsfile_path, year, + month) + + self.redis_logger.debug('Building tld graph') + lib_words.create_curve_with_word_file(self.r_serv_trend, csv_path_tld, + tldsfile_path, year, + month) + + self.redis_logger.debug('Building domain graph') + lib_words.create_curve_from_redis_set(self.r_serv_trend, csv_path_domain, + "domain", year, + month) + self.redis_logger.debug('end building') + + + def compute(self, message): + self.generate_new_graph = True + + # Do something with the message from the queue + url, date, path = message.split() + self.faup.decode(url) + url_parsed = self.faup.get() + + # Scheme analysis + self.analyse('scheme', date, url_parsed) + # Tld analysis + self.analyse('tld', date, url_parsed) + # Domain analysis + self.analyse('domain', date, url_parsed) + + self.compute_progression('scheme', self.NUM_DAY_TO_LOOK, url_parsed) + self.compute_progression('tld', self.NUM_DAY_TO_LOOK, url_parsed) + self.compute_progression('domain', self.NUM_DAY_TO_LOOK, url_parsed) + + + def analyse(self, field_name, date, url_parsed): + field = url_parsed[field_name] + + if field is not None: + try: # faup version + field = field.decode() + except: + pass + + self.r_serv_trend.hincrby(field, date, 1) + + if field_name == "domain": #save domain in a set for the monthly plot + domain_set_name = "domain_set_" + date[0:6] + self.r_serv_trend.sadd(domain_set_name, field) + self.redis_logger.debug("added in " + domain_set_name +": "+ field) + + + def get_date_range(self, num_day): + curr_date = datetime.date.today() + date = Date(str(curr_date.year)+str(curr_date.month).zfill(2)+str(curr_date.day).zfill(2)) + date_list = [] + + for i in range(0, num_day+1): + date_list.append(date.substract_day(i)) + return date_list + + + def compute_progression_word(self, num_day, keyword): + """ + Compute the progression for one keyword + """ + date_range = self.get_date_range(num_day) + # check if this keyword is eligible for progression + keyword_total_sum = 0 + value_list = [] + for date in date_range: # get value up to date_range + curr_value = self.r_serv_trend.hget(keyword, date) + value_list.append(int(curr_value if curr_value is not None else 0)) + keyword_total_sum += int(curr_value) if curr_value is not None else 0 + oldest_value = value_list[-1] if value_list[-1] != 0 else 1 #Avoid zero division + + # The progression is based on the ratio: value[i] / value[i-1] + keyword_increase = 0 + value_list_reversed = value_list[:] + value_list_reversed.reverse() + for i in range(1, len(value_list_reversed)): + divisor = value_list_reversed[i-1] if value_list_reversed[i-1] != 0 else 1 + keyword_increase += value_list_reversed[i] / divisor + + return (keyword_increase, keyword_total_sum) + + + def compute_progression(self, field_name, num_day, url_parsed): + """ + recompute the set top_progression zset + - Compute the current field progression + - re-compute the current progression for each first 2*self.MAX_SET_CARDINALITY fields in the top_progression_zset + """ + redis_progression_name_set = "z_top_progression_"+field_name + + keyword = url_parsed[field_name] + if keyword is not None: + + #compute the progression of the current word + keyword_increase, keyword_total_sum = self.compute_progression_word(num_day, keyword) + + #re-compute the progression of 2*self.MAX_SET_CARDINALITY + current_top = self.r_serv_trend.zrevrangebyscore(redis_progression_name_set, '+inf', '-inf', withscores=True, start=0, num=2*self.MAX_SET_CARDINALITY) + for word, value in current_top: + word_inc, word_tot_sum = self.compute_progression_word(num_day, word) + self.r_serv_trend.zrem(redis_progression_name_set, word) + if (word_tot_sum > self.THRESHOLD_TOTAL_SUM) and (word_inc > self.THRESHOLD_INCREASE): + self.r_serv_trend.zadd(redis_progression_name_set, float(word_inc), word) + + # filter before adding + if (keyword_total_sum > self.THRESHOLD_TOTAL_SUM) and (keyword_increase > self.THRESHOLD_INCREASE): + self.r_serv_trend.zadd(redis_progression_name_set, float(keyword_increase), keyword) if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - - # Section name in bin/packages/modules.cfg - config_section = 'WebStats' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("Makes statistics about valid URL") - - # REDIS # - r_serv_trend = redis.StrictRedis( - host=p.config.get("ARDB_Trending", "host"), - port=p.config.get("ARDB_Trending", "port"), - db=p.config.get("ARDB_Trending", "db"), - decode_responses=True) - - # FILE CURVE SECTION # - csv_path_proto = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "protocolstrending_csv")) - protocolsfile_path = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "protocolsfile")) - - csv_path_tld = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "tldstrending_csv")) - tldsfile_path = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "tldsfile")) - - csv_path_domain = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "domainstrending_csv")) - - faup = Faup() - generate_new_graph = False - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - - if message is None: - if generate_new_graph: - generate_new_graph = False - today = datetime.date.today() - year = today.year - month = today.month - - print('Building protocol graph') - lib_words.create_curve_with_word_file(r_serv_trend, csv_path_proto, - protocolsfile_path, year, - month) - - print('Building tld graph') - lib_words.create_curve_with_word_file(r_serv_trend, csv_path_tld, - tldsfile_path, year, - month) - - print('Building domain graph') - lib_words.create_curve_from_redis_set(r_serv_trend, csv_path_domain, - "domain", year, - month) - print('end building') - - - publisher.debug("{} queue is empty, waiting".format(config_section)) - print('sleeping') - time.sleep(5*60) - continue - - else: - generate_new_graph = True - # Do something with the message from the queue - url, date, path = message.split() - faup.decode(url) - url_parsed = faup.get() - - # Scheme analysis - analyse(r_serv_trend, 'scheme', date, url_parsed) - # Tld analysis - analyse(r_serv_trend, 'tld', date, url_parsed) - # Domain analysis - analyse(r_serv_trend, 'domain', date, url_parsed) - - compute_progression(r_serv_trend, 'scheme', num_day_to_look, url_parsed) - compute_progression(r_serv_trend, 'tld', num_day_to_look, url_parsed) - compute_progression(r_serv_trend, 'domain', num_day_to_look, url_parsed) + + module = WebStats() + module.run() diff --git a/bin/module/__init__.py b/bin/module/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/bin/module/abstract_module.py b/bin/module/abstract_module.py new file mode 100644 index 00000000..aafe28a6 --- /dev/null +++ b/bin/module/abstract_module.py @@ -0,0 +1,98 @@ +# coding: utf-8 +""" +Base Class for AIL Modules +""" + +################################## +# Import External packages +################################## +from abc import ABC, abstractmethod +import time + +################################## +# Import Project packages +################################## +from pubsublogger import publisher +from Helper import Process + + +class AbstractModule(ABC): + """ + Abstract Module class + """ + + def __init__(self, module_name=None, queue_name=None): + """ + Init Module + module_name: str; set the module name if different from the instance ClassName + """ + # Module name if provided else instance className + self.module_name = module_name if module_name else self._module_name() + + # Module name if provided else instance className + self.queue_name = queue_name if queue_name else self._module_name() + + # Init Redis Logger + self.redis_logger = publisher + # Port of the redis instance used by pubsublogger + self.redis_logger.port = 6380 + # Channel name to publish logs + self.redis_logger.channel = 'Script' + # TODO modify generic channel Script to a namespaced channel like: + # publish module logs to script: channel + # self.redis_logger.channel = 'script:%s'%(self.module_name) + + # Run module endlessly + self.proceed = True + + # Waiting time in secondes between two proccessed messages + self.pending_seconds = 10 + + # Setup the I/O queues + self.process = Process(self.queue_name) + + + def run(self): + """ + Run Module endless process + """ + + # Endless loop processing messages from the input queue + while self.proceed: + # Get one message (paste) from the QueueIn (copy of Redis_Global publish) + message = self.process.get_from_set() + + if message is None: + self.computeNone() + # Wait before next process + self.redis_logger.debug('%s, waiting for new message, Idling %ds'%(self.module_name, self.pending_seconds)) + time.sleep(self.pending_seconds) + continue + + try: + # Module processing with the message from the queue + self.compute(message) + except Exception as err: + self.redis_logger.error("Error in module %s: %s"%(self.module_name, err)) + + + def _module_name(self): + """ + Returns the instance class name (ie. the Module Name) + """ + return self.__class__.__name__ + + + @abstractmethod + def compute(self, message): + """ + Main method of the Module to implement + """ + pass + + + def computeNone(self): + """ + Method of the Module when there is no message + """ + pass diff --git a/bin/template.py b/bin/template.py index f311d439..0e1a0a8f 100755 --- a/bin/template.py +++ b/bin/template.py @@ -1,45 +1,57 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* """ - Template for new modules +The Template Module +====================== + +This module is a template for Template for new modules + """ +################################## +# Import External packages +################################## import time from pubsublogger import publisher + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from Helper import Process -def do_something(message): - return None +class Template(AbstractModule): + """ + Template module for AIL framework + """ + + def __init__(self): + super(Template, self).__init__() + + # Send module state to logs + self.redis_logger.info("Module %s initialized"%(self.module_name)) + + # Pending time between two computation in seconds + self.pending_seconds = 10 + + + def computeNone(self): + """ + Compute when no message in queue + """ + self.redis_logger.debug("No message in queue") + + + def compute(self, message): + """ + Compute a message in queue + """ + self.redis_logger.debug("Compute message in queue") + if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - - # Section name in bin/packages/modules.cfg - config_section = '
' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("") - - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - if message is None: - publisher.debug("{} queue is empty, waiting".format(config_section)) - time.sleep(1) - continue - - # Do something with the message from the queue - something_has_been_done = do_something(message) - - # (Optional) Send that thing to the next queue - p.populate_set_out(something_has_been_done) + + module = Template() + module.run()