From abfe13436bf3b8f173d82875ff4ecd7b723d4269 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?=
Date: Fri, 29 Aug 2014 19:37:56 +0200
Subject: [PATCH] Big refactoring, make the queues more flexible

---
 bin/{ZMQ_Sub_Attributes.py => Attributes.py}  |  22 +--
 bin/{ZMQ_PubSub_Categ.py => Categ.py}         |  31 ++-
 ...{ZMQ_Sub_CreditCards.py => CreditCards.py} |  22 +--
 bin/{ZMQ_Sub_Curve.py => Curve.py}            |  35 ++--
 bin/{ZMQ_Sub_Duplicate.py => Duplicates.py}   |  23 +--
 bin/{ZMQ_Feed.py => Global.py}                |  36 ++--
 bin/Helper.py                                 | 180 ++++++++++++------
 bin/{ZMQ_Sub_Indexer.py => Indexer.py}        |  27 +--
 bin/{ZMQ_PubSub_Lines.py => Lines.py}         |  44 ++---
 bin/{ZMQ_Sub_Mails.py => Mail.py}             |  29 ++-
 bin/{ZMQ_Sub_Onion.py => Onion.py}            |  28 ++-
 bin/QueueIn.py                                |  24 +++
 bin/QueueOut.py                               |  24 +++
 bin/{ZMQ_PubSub_Tokenize.py => Tokenize.py}   |  28 +--
 bin/{ZMQ_Sub_Urls.py => Web.py}               |  37 ++--
 bin/ZMQ_Feed_Q.py                             |  37 ----
 bin/ZMQ_PubSub_Categ_Q.py                     |  35 ----
 bin/ZMQ_PubSub_Lines_Q.py                     |  34 ----
 bin/ZMQ_PubSub_Tokenize_Q.py                  |  35 ----
 bin/ZMQ_Pub_Global.py                         |  54 ------
 bin/ZMQ_Sub_Attributes_Q.py                   |  35 ----
 bin/ZMQ_Sub_CreditCards_Q.py                  |  18 --
 bin/ZMQ_Sub_Curve_Q.py                        |  35 ----
 bin/ZMQ_Sub_Duplicate_Q.py                    |  17 --
 bin/ZMQ_Sub_Indexer_Q.py                      |  29 ---
 bin/ZMQ_Sub_Mails_Q.py                        |  17 --
 bin/ZMQ_Sub_Onion_Q.py                        |  34 ----
 bin/ZMQ_Sub_Urls_Q.py                         |  18 --
 bin/packages/config.cfg.sample                |  18 +-
 bin/packages/modules.cfg                      |  41 ++++
 bin/run_modules.py                            |  22 +++
 files/{creditcard_categ => CreditCards}       |   0
 files/{mails_categ => Mail}                   |   0
 files/{onion_categ => Onion}                  |   0
 files/{web_categ => Web}                      |   0
 files/javascript_categ                        |   6 -
 files/list_categ_files                        |   6 -
 files/pass_categ                              |  23 ---
 var/www/Flask_server.py                       |   4 +-
 var/www/static/js/indexjavascript.js          |  38 ++--
 40 files changed, 412 insertions(+), 734 deletions(-)
 rename bin/{ZMQ_Sub_Attributes.py => Attributes.py} (71%)
 rename bin/{ZMQ_PubSub_Categ.py => Categ.py} (77%)
 rename bin/{ZMQ_Sub_CreditCards.py => CreditCards.py} (79%)
 rename bin/{ZMQ_Sub_Curve.py => Curve.py} (67%)
 rename bin/{ZMQ_Sub_Duplicate.py => Duplicates.py} (88%)
 rename bin/{ZMQ_Feed.py => Global.py} (64%)
 rename bin/{ZMQ_Sub_Indexer.py => Indexer.py} (73%)
 rename bin/{ZMQ_PubSub_Lines.py => Lines.py} (61%)
 rename bin/{ZMQ_Sub_Mails.py => Mail.py} (73%)
 rename bin/{ZMQ_Sub_Onion.py => Onion.py} (77%)
 create mode 100755 bin/QueueIn.py
 create mode 100755 bin/QueueOut.py
 rename bin/{ZMQ_PubSub_Tokenize.py => Tokenize.py} (57%)
 rename bin/{ZMQ_Sub_Urls.py => Web.py} (81%)
 delete mode 100755 bin/ZMQ_Feed_Q.py
 delete mode 100755 bin/ZMQ_PubSub_Categ_Q.py
 delete mode 100755 bin/ZMQ_PubSub_Lines_Q.py
 delete mode 100755 bin/ZMQ_PubSub_Tokenize_Q.py
 delete mode 100755 bin/ZMQ_Pub_Global.py
 delete mode 100755 bin/ZMQ_Sub_Attributes_Q.py
 delete mode 100755 bin/ZMQ_Sub_CreditCards_Q.py
 delete mode 100755 bin/ZMQ_Sub_Curve_Q.py
 delete mode 100755 bin/ZMQ_Sub_Duplicate_Q.py
 delete mode 100755 bin/ZMQ_Sub_Indexer_Q.py
 delete mode 100755 bin/ZMQ_Sub_Mails_Q.py
 delete mode 100755 bin/ZMQ_Sub_Onion_Q.py
 delete mode 100755 bin/ZMQ_Sub_Urls_Q.py
 create mode 100644 bin/packages/modules.cfg
 create mode 100755 bin/run_modules.py
 rename files/{creditcard_categ => CreditCards} (100%)
 rename files/{mails_categ => Mail} (100%)
 rename files/{onion_categ => Onion} (100%)
 rename files/{web_categ => Web} (100%)
 delete mode 100644 files/javascript_categ
 delete mode 100644 files/list_categ_files
 delete mode 100644 files/pass_categ
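For orientation before the per-file diffs: every module now drains a Redis set named '<module>in' and fills '<module>out', with the new generic QueueIn/QueueOut scripts doing the pub/sub glue. A minimal sketch of that contract, assuming only a local Redis on port 6381 (the RedisPubSub instance from config.cfg.sample below); the module name and paste path are illustrative:

    import json
    import redis

    r = redis.StrictRedis(host='localhost', port=6381, db=0)

    # QueueIn side: messages from the feed land in '<module>in'.
    r.sadd('Linesin', '/path/to/paste.gz')

    # Module side: pop a message, process it, push the result to
    # '<module>out' wrapped in the JSON envelope PubSub.publish() expects.
    msg = r.spop('Linesin')
    envelope = {'message': msg, 'channel': 'LinesShort'}
    r.sadd('Linesout', json.dumps(envelope))

    # QueueOut side: drain '<module>out' and hand each envelope to a publisher.
    print(r.spop('Linesout'))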
diff --git a/bin/ZMQ_Sub_Attributes.py b/bin/Attributes.py
similarity index 71%
rename from bin/ZMQ_Sub_Attributes.py
rename to bin/Attributes.py
index ce7ddb75..4235bb4e 100755
--- a/bin/ZMQ_Sub_Attributes.py
+++ b/bin/Attributes.py
@@ -30,34 +30,28 @@ import time
 from packages import Paste
 from pubsublogger import publisher
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'attributes'
+    config_section = 'Attributes'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    p = Process(config_section)
 
     # FUNCTIONS #
-    publisher.info("""ZMQ Attribute is Running""")
+    publisher.info("Attribute is Running")
 
     while True:
         try:
-            message = h.redis_rpop()
+            message = p.get_from_set()
 
             if message is not None:
-                PST = Paste.Paste(message.split(" ", -1)[-1])
+                PST = Paste.Paste(message)
             else:
-                if h.redis_queue_shutdown():
-                    print "Shutdown Flag Up: Terminating"
-                    publisher.warning("Shutdown Flag Up: Terminating.")
-                    break
-                publisher.debug("Script Attribute is idling 10s")
-                time.sleep(10)
+                publisher.debug("Script Attribute is idling 1s")
+                time.sleep(1)
                 continue
 
             # FIXME do it directly in the class
diff --git a/bin/ZMQ_PubSub_Categ.py b/bin/Categ.py
similarity index 77%
rename from bin/ZMQ_PubSub_Categ.py
rename to bin/Categ.py
index 71c4c164..d9b052bd 100755
--- a/bin/ZMQ_PubSub_Categ.py
+++ b/bin/Categ.py
@@ -42,21 +42,15 @@ import time
 from pubsublogger import publisher
 from packages import Paste
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Words'
-    config_channel = 'channel_0'
-    subscriber_name = 'categ'
+    config_section = 'Categ'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-
-    # Publisher
-    pub_config_section = 'PubSub_Categ'
-    h.zmq_pub(pub_config_section, None)
+    p = Process(config_section)
 
     # SCRIPT PARSER #
     parser = argparse.ArgumentParser(
@@ -71,11 +65,11 @@ if __name__ == "__main__":
     args = parser.parse_args()
 
     # FUNCTIONS #
-    publisher.info(
-        "Script Categ subscribed to channel {}".format(h.sub_channel))
+    publisher.info("Script Categ started")
 
+    categories = ['CreditCards', 'Mail', 'Onion', 'Web']
     tmp_dict = {}
-    for filename in os.listdir(args.d):
+    for filename in categories:
         bname = os.path.basename(filename)
         tmp_dict[bname] = []
         with open(os.path.join(args.d, filename), 'r') as f:
@@ -85,9 +79,9 @@ if __name__ == "__main__":
     prec_filename = None
 
     while True:
-        message = h.redis_rpop()
+        message = p.get_from_set()
        if message is not None:
-            channel, filename, word, score = message.split()
+            filename, word, score = message.split()
 
            if prec_filename is None or filename != prec_filename:
                PST = Paste.Paste(filename)
@@ -96,17 +90,14 @@ if __name__ == "__main__":
 
             for categ, words_list in tmp_dict.items():
                 if word.lower() in words_list:
-                    h.pub_channel = categ
-                    h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score))
+                    msg = '{} {} {}'.format(PST.p_path, word, score)
+                    p.populate_set_out(msg, categ)
 
                     publisher.info(
                         'Categ;{};{};{};Detected {} "{}"'.format(
                             PST.p_source, PST.p_date, PST.p_name,
                             score, word))
 
         else:
-            if h.redis_queue_shutdown():
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
             publisher.debug("Script Categ is Idling 10s")
+            print 'Sleeping'
             time.sleep(10)
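Categ now routes a matched word to the queue named after its category via populate_set_out(msg, categ). A self-contained sketch of that dispatch, with a stand-in for Helper.Process and made-up word lists (not the real files/ content):

    import json

    class FakeProcess(object):
        # Stand-in for Helper.Process, just to show the envelope produced.
        def populate_set_out(self, msg, channel=None):
            envelope = {'message': msg}
            if channel is not None:
                envelope['channel'] = channel
            print(json.dumps(envelope))

    words_by_categ = {'CreditCards': ['visa', 'mastercard'],
                      'Onion': ['onion', 'tor']}

    def route(p, paste_path, word, score):
        # Each matching category name doubles as the output channel that
        # QueueOut later matches against the envelope's 'channel' field.
        for categ, words in words_by_categ.items():
            if word.lower() in words:
                p.populate_set_out('{} {} {}'.format(paste_path, word, score),
                                   categ)

    route(FakeProcess(), '/tmp/paste.gz', 'onion', 3)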
diff --git a/bin/ZMQ_Sub_CreditCards.py b/bin/CreditCards.py
similarity index 79%
rename from bin/ZMQ_Sub_CreditCards.py
rename to bin/CreditCards.py
index 2f21056a..37e8bf67 100755
--- a/bin/ZMQ_Sub_CreditCards.py
+++ b/bin/CreditCards.py
@@ -6,26 +6,27 @@ from packages import Paste
 from packages import lib_refine
 from pubsublogger import publisher
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Categ'
-    config_channel = 'channel_0'
-    subscriber_name = 'cards'
+    config_section = 'CreditCards'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    p = Process(config_section)
 
     # FUNCTIONS #
     publisher.info("Creditcard script subscribed to channel creditcard_categ")
 
-    message = h.redis_rpop()
+    message = p.get_from_set()
     prec_filename = None
     creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?"
 
+    # FIXME For retro compatibility
+    channel = 'creditcard_categ'
+
     # mastercard_regex = "5[1-5]\d{2}([\ \-]?)\d{4}\1\d{4}\1\d{4}"
     # visa_regex = "4\d{3}([\ \-]?)\d{4}\1\d{4}\1\d{4}"
     # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|
@@ -37,7 +38,7 @@ if __name__ == "__main__":
 
     while True:
         if message is not None:
-            channel, filename, word, score = message.split()
+            filename, word, score = message.split()
 
             if prec_filename is None or filename != prec_filename:
                 creditcard_set = set([])
@@ -62,11 +63,8 @@ if __name__ == "__main__":
 
             prec_filename = filename
         else:
-            if h.redis_queue_shutdown():
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
             publisher.debug("Script creditcard is idling 1m")
+            print 'Sleeping'
             time.sleep(60)
 
-        message = h.redis_rpop()
+        message = p.get_from_set()
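CreditCards keeps filtering candidates through lib_refine. As a standalone illustration, the Visa regex from this diff can be paired with a standard Luhn checksum; the assumption that lib_refine's validation amounts to a comparable check is mine, this is not code from the patch:

    import re

    creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?"

    def luhn_ok(number):
        # Luhn checksum: double every second digit from the right,
        # sum the digit-sums, and require a multiple of ten.
        digits = [int(d) for d in number]
        odd = digits[-1::-2]
        even = [sum(divmod(2 * d, 10)) for d in digits[-2::-2]]
        return (sum(odd) + sum(even)) % 10 == 0

    text = "test 4532015112830366 test"
    for candidate in re.findall(creditcard_regex, text):
        print(candidate, luhn_ok(candidate))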
diff --git a/bin/ZMQ_Sub_Curve.py b/bin/Curve.py
similarity index 67%
rename from bin/ZMQ_Sub_Curve.py
rename to bin/Curve.py
index df06e6ea..9226300e 100755
--- a/bin/ZMQ_Sub_Curve.py
+++ b/bin/Curve.py
@@ -29,38 +29,35 @@ from pubsublogger import publisher
 from packages import lib_words
 import os
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Words'
-    config_channel = 'channel_0'
-    subscriber_name = "curve"
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    config_section = 'Curve'
+    p = Process(config_section)
 
     # REDIS #
     r_serv1 = redis.StrictRedis(
-        host=h.config.get("Redis_Level_DB", "host"),
-        port=h.config.get("Redis_Level_DB", "port"),
-        db=h.config.get("Redis_Level_DB", "db"))
+        host=p.config.get("Redis_Level_DB", "host"),
+        port=p.config.get("Redis_Level_DB", "port"),
+        db=p.config.get("Redis_Level_DB", "db"))
 
     # FUNCTIONS #
-    publisher.info("Script Curve subscribed to {}".format(h.sub_channel))
+    publisher.info("Script Curve started")
 
     # FILE CURVE SECTION #
     csv_path = os.path.join(os.environ['AIL_HOME'],
-                            h.config.get("Directories", "wordtrending_csv"))
+                            p.config.get("Directories", "wordtrending_csv"))
     wordfile_path = os.path.join(os.environ['AIL_HOME'],
-                                 h.config.get("Directories", "wordsfile"))
+                                 p.config.get("Directories", "wordsfile"))
 
-    message = h.redis_rpop()
+    message = p.get_from_set()
     prec_filename = None
 
     while True:
         if message is not None:
-            channel, filename, word, score = message.split()
+            filename, word, score = message.split()
             if prec_filename is None or filename != prec_filename:
                 PST = Paste.Paste(filename)
                 lib_words.create_curve_with_word_file(
@@ -69,7 +66,6 @@ if __name__ == "__main__":
 
             prec_filename = filename
             prev_score = r_serv1.hget(word.lower(), PST.p_date)
-            print prev_score
             if prev_score is not None:
                 r_serv1.hset(word.lower(), PST.p_date,
                              int(prev_score) + int(score))
@@ -77,12 +73,7 @@ if __name__ == "__main__":
                 r_serv1.hset(word.lower(), PST.p_date, score)
 
         else:
-            if h.redis_queue_shutdown():
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
             publisher.debug("Script Curve is Idling")
-            print "sleepin"
+            print "sleeping"
             time.sleep(1)
-
-        message = h.redis_rpop()
+        message = p.get_from_set()
diff --git a/bin/ZMQ_Sub_Duplicate.py b/bin/Duplicates.py
similarity index 88%
rename from bin/ZMQ_Sub_Duplicate.py
rename to bin/Duplicates.py
index 8ebbb8b7..ed62be0d 100755
--- a/bin/ZMQ_Sub_Duplicate.py
+++ b/bin/Duplicates.py
@@ -19,17 +19,15 @@ from packages import Paste
 from pubsublogger import publisher
 from pybloomfilter import BloomFilter
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'duplicate'
+    config_section = 'Duplicates'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    p = Process(config_section)
 
     # REDIS #
     # DB OBJECT & HASHS ( DISK )
@@ -38,16 +36,15 @@ if __name__ == "__main__":
     for year in xrange(2013, 2015):
         for month in xrange(0, 16):
             dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis(
-                host=h.config.get("Redis_Level_DB", "host"), port=year,
+                host=p.config.get("Redis_Level_DB", "host"), port=year,
                 db=month)
 
     # FUNCTIONS #
-    publisher.info("""Script duplicate subscribed to channel {0}""".format(
-        h.config.get("PubSub_Global", "channel")))
+    publisher.info("Script duplicate started")
 
     set_limit = 100
     bloompath = os.path.join(os.environ['AIL_HOME'],
-                             h.config.get("Directories", "bloomfilters"))
+                             p.config.get("Directories", "bloomfilters"))
 
     bloop_path_set = set()
     while True:
@@ -59,17 +56,13 @@ if __name__ == "__main__":
 
         x = time.time()
 
-        message = h.redis_rpop()
+        message = p.get_from_set()
         if message is not None:
-            path = message.split(" ", -1)[-1]
+            path = message
             PST = Paste.Paste(path)
         else:
             publisher.debug("Script Attribute is idling 10s")
             time.sleep(10)
-            if h.redis_queue_shutdown():
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
             continue
 
         PST._set_p_hash_kind("md5")
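Duplicates keeps one on-disk Bloom filter per month under the configured 'bloomfilters' directory. The underlying pattern, assuming the pybloomfiltermmap API that `from pybloomfilter import BloomFilter` provides; capacity, error rate and paths here are illustrative, not the project's tuning:

    import os
    from pybloomfilter import BloomFilter

    bloompath = '/tmp/blooms'
    if not os.path.exists(bloompath):
        os.makedirs(bloompath)

    # One memory-mapped filter file per month.
    bf = BloomFilter(10000000, 0.01, os.path.join(bloompath, '201408.bloom'))

    digest = 'd41d8cd98f00b204e9800998ecf8427e'  # e.g. the md5 of a paste
    if digest in bf:
        print 'probable duplicate'
    else:
        bf.add(digest)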
print "Empty Paste: not processed" publisher.debug("Empty Paste: {0} not processed".format(paste)) continue else: - if h.redis_queue_shutdown(): - print "Shutdown Flag Up: Terminating" - publisher.warning("Shutdown Flag Up: Terminating.") - break print "Empty Queues: Waiting..." - time.sleep(10) + time.sleep(1) continue # Creating the full filepath filename = os.path.join(os.environ['AIL_HOME'], - h.config.get("Directories", "pastes"), paste) + p.config.get("Directories", "pastes"), paste) dirname = os.path.dirname(filename) if not os.path.exists(dirname): os.makedirs(dirname) with open(filename, 'wb') as f: f.write(base64.standard_b64decode(gzip64encoded)) - - h.zmq_pub_send(filename) + p.populate_set_out(filename) diff --git a/bin/Helper.py b/bin/Helper.py index 549e3e7d..f04cd4c7 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -15,74 +15,144 @@ import redis import ConfigParser import os import zmq +import time +import json -class Redis_Queues(object): +class PubSub(object): - def __init__(self, conf_section, conf_channel, subscriber_name): + def __init__(self): configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') - print configfile if not os.path.exists(configfile): raise Exception('Unable to find the configuration file. \ Did you set environment variables? \ Or activate the virtualenv.') self.config = ConfigParser.ConfigParser() self.config.read(configfile) - self.subscriber_name = subscriber_name + self.redis_sub = False + self.zmq_sub = False + self.subscriber = None + self.publishers = {'Redis': [], 'ZMQ': []} - self.sub_channel = self.config.get(conf_section, conf_channel) - self.sub_address = self.config.get(conf_section, 'address') - self.redis_channel = self.sub_channel + self.subscriber_name - - # Redis Queue - config_section = "Redis_Queues" - self.r_queues = redis.StrictRedis( - host=self.config.get(config_section, "host"), - port=self.config.getint(config_section, "port"), - db=self.config.getint(config_section, "db")) - - def zmq_sub(self): - context = zmq.Context() - self.sub_socket = context.socket(zmq.SUB) - self.sub_socket.connect(self.sub_address) - self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) - - def zmq_pub(self, config_section, config_channel): - context = zmq.Context() - self.pub_socket = context.socket(zmq.PUB) - self.pub_socket.bind(self.config.get(config_section, 'adress')) - if config_channel is not None: - self.pub_channel = self.config.get(config_section, config_channel) + def setup_subscribe(self, conn_name): + if self.config.has_section(conn_name): + channel = self.config.get(conn_name, 'channel') else: - # The publishing channel is defined dynamically - self.pub_channel = None + channel = conn_name.split('_')[1] + if conn_name.startswith('Redis'): + self.redis_sub = True + r = redis.StrictRedis( + host=self.config.get('RedisPubSub', 'host'), + port=self.config.get('RedisPubSub', 'port'), + db=self.config.get('RedisPubSub', 'db')) + self.subscriber = r.pubsub() + self.subscriber.psubscribe(channel) + self.subscriber.get_message() + elif conn_name.startswith('ZMQ'): + self.zmq_sub = True + context = zmq.Context() + self.subscriber = context.socket(zmq.SUB) + self.subscriber.connect(self.config.get(conn_name, 'address')) + self.subscriber.setsockopt(zmq.SUBSCRIBE, channel) - def zmq_pub_send(self, msg): - if self.pub_channel is None: - raise Exception('A channel is reqired to send a message.') - self.pub_socket.send('{} {}'.format(self.pub_channel, msg)) - - def redis_rpop(self): - return 
-        return self.r_queues.rpop(self.redis_channel)
-
-    def redis_queue_shutdown(self, is_queue=False):
-        if is_queue:
-            flag = self.subscriber_name + '_Q'
+    def setup_publish(self, conn_name):
+        if self.config.has_section(conn_name):
+            channel = self.config.get(conn_name, 'channel')
         else:
-            flag = self.subscriber_name
-        # srem returns False if the element does not exists
-        return self.r_queues.srem('SHUTDOWN_FLAGS', flag)
+            channel = conn_name.split('_')[1]
+        if conn_name.startswith('Redis'):
+            r = redis.StrictRedis(host=self.config.get('RedisPubSub', 'host'),
+                                  port=self.config.get('RedisPubSub', 'port'),
+                                  db=self.config.get('RedisPubSub', 'db'))
+            self.publishers['Redis'].append((r, channel))
+        elif conn_name.startswith('ZMQ'):
+            context = zmq.Context()
+            p = context.socket(zmq.PUB)
+            p.bind(self.config.get(conn_name, 'address'))
+            self.publishers['ZMQ'].append((p, channel))
 
-    def redis_queue_subscribe(self, publisher):
-        self.zmq_sub()
-        publisher.info("Suscribed to channel {}".format(self.sub_channel))
+    def publish(self, message):
+        m = json.loads(message)
+        channel_message = m.get('channel')
+        for p, channel in self.publishers['Redis']:
+            if channel_message is None or channel_message == channel:
+                p.publish(channel, m['message'])
+        for p, channel in self.publishers['ZMQ']:
+            if channel_message is None or channel_message == channel:
+                p.send('{} {}'.format(channel, m['message']))
+
+    def subscribe(self):
+        if self.redis_sub:
+            for msg in self.subscriber.listen():
+                if msg.get('data', None) is not None:
+                    yield msg['data']
+        elif self.zmq_sub:
+            while True:
+                msg = self.subscriber.recv()
+                yield msg.split(' ', 1)[1]
+        else:
+            raise Exception('No subscribe function defined')
+
+
+class Process(object):
+
+    def __init__(self, conf_section):
+        configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
+        if not os.path.exists(configfile):
+            raise Exception('Unable to find the configuration file. \
+                            Did you set environment variables? \
+                            Or activate the virtualenv.')
+        modulesfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg')
+        self.config = ConfigParser.ConfigParser()
+        self.config.read(configfile)
+        self.modules = ConfigParser.ConfigParser()
+        self.modules.read(modulesfile)
+        self.subscriber_name = conf_section
+        self.pubsub = None
+        if self.modules.has_section(conf_section):
+            self.pubsub = PubSub()
+        else:
+            raise Exception('Your process has to listen to at least one feed.')
+        self.r_temp = redis.StrictRedis(
+            host=self.config.get('RedisPubSub', 'host'),
+            port=self.config.get('RedisPubSub', 'port'),
+            db=self.config.get('RedisPubSub', 'db'))
+
+    def populate_set_in(self):
+        # monoproc
+        src = self.modules.get(self.subscriber_name, 'subscribe')
+        self.pubsub.setup_subscribe(src)
+        for msg in self.pubsub.subscribe():
+            in_set = self.subscriber_name + 'in'
+            self.r_temp.sadd(in_set, msg)
+            self.r_temp.hset('queues', self.subscriber_name,
+                             int(self.r_temp.scard(in_set)))
+
+    def get_from_set(self):
+        # multiproc
+        in_set = self.subscriber_name + 'in'
+        self.r_temp.hset('queues', self.subscriber_name,
+                         int(self.r_temp.scard(in_set)))
+        return self.r_temp.spop(in_set)
+
+    def populate_set_out(self, msg, channel=None):
+        # multiproc
+        msg = {'message': msg}
+        if channel is not None:
+            msg.update({'channel': channel})
+        self.r_temp.sadd(self.subscriber_name + 'out', json.dumps(msg))
+
+    def publish(self):
+        # monoproc
+        if not self.modules.has_option(self.subscriber_name, 'publish'):
+            return False
+        dest = self.modules.get(self.subscriber_name, 'publish')
+        # We can have multiple publisher
+        for name in dest.split(','):
+            self.pubsub.setup_publish(name)
         while True:
-            msg = self.sub_socket.recv()
-            p = self.r_queues.pipeline()
-            p.sadd("queues", self.redis_channel)
-            p.lpush(self.redis_channel, msg)
-            p.execute()
-            if self.redis_queue_shutdown(True):
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
+            message = self.r_temp.spop(self.subscriber_name + 'out')
+            if message is None:
+                time.sleep(1)
+                continue
+            self.pubsub.publish(message)
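Helper.py above is the whole new plumbing: PubSub speaks Redis or ZMQ depending on the connection-name prefix, and Process wraps it behind four calls. The resulting module skeleton looks like the renamed scripts in this patch; this one is hypothetical (it needs AIL_BIN set and a matching [Demo] section in modules.cfg):

    import time

    from pubsublogger import publisher
    from Helper import Process

    if __name__ == '__main__':
        publisher.port = 6380
        publisher.channel = 'Script'

        p = Process('Demo')

        while True:
            message = p.get_from_set()   # non-blocking SPOP on 'Demoin'
            if message is None:
                time.sleep(1)
                continue
            # ... process the paste path held in `message` ...
            p.populate_set_out(message)  # queued for QueueOut to publish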
diff --git a/bin/ZMQ_Sub_Indexer.py b/bin/Indexer.py
similarity index 73%
rename from bin/ZMQ_Sub_Indexer.py
rename to bin/Indexer.py
index 0eb6dcce..679ac8e0 100755
--- a/bin/ZMQ_Sub_Indexer.py
+++ b/bin/Indexer.py
@@ -17,26 +17,21 @@ from whoosh.index import create_in, exists_in, open_dir
 from whoosh.fields import Schema, TEXT, ID
 import os
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    # Subscriber
-    sub_config_section = 'PubSub_Global'
-    sub_name = 'indexer'
+    config_section = 'Indexer'
 
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'indexer'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    p = Process(config_section)
 
     # Indexer configuration - index dir and schema setup
-    indexpath = h.config.get("Indexer", "path")
-    indexertype = h.config.get("Indexer", "type")
+    indexpath = os.path.join(os.environ['AIL_HOME'],
+                             p.config.get("Indexer", "path"))
+    indexertype = p.config.get("Indexer", "type")
     if indexertype == "whoosh":
         schema = Schema(title=TEXT(stored=True), path=ID(stored=True,
                                                          unique=True),
@@ -49,18 +44,16 @@ if __name__ == "__main__":
         ix = open_dir(indexpath)
 
     # LOGGING #
-    publisher.info("""ZMQ Indexer is Running""")
+    publisher.info("ZMQ Indexer is Running")
 
     while True:
         try:
-            message = h.redis_rpop()
+            message = p.get_from_set()
 
             if message is not None:
-                PST = Paste.Paste(message.split(" ", -1)[-1])
+                PST = Paste.Paste(message)
             else:
-                if h.redis_queue_shutdown():
-                    break
-                publisher.debug("Script Indexer is idling 10s")
+                publisher.debug("Script Indexer is idling 1s")
                 time.sleep(1)
                 continue
             docpath = message.split(" ", -1)[-1]
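The Indexer keeps its Whoosh usage unchanged; for reference, the underlying pattern is an on-disk index plus one writer transaction per document. Paths and field values here are illustrative:

    import os

    from whoosh.index import create_in, exists_in, open_dir
    from whoosh.fields import Schema, TEXT, ID

    schema = Schema(title=TEXT(stored=True),
                    path=ID(stored=True, unique=True),
                    content=TEXT)

    indexpath = '/tmp/indexdir'
    if not os.path.exists(indexpath):
        os.mkdir(indexpath)
    if exists_in(indexpath):
        ix = open_dir(indexpath)
    else:
        ix = create_in(indexpath, schema)

    indexer = ix.writer()
    indexer.add_document(title=u'paste', path=u'/tmp/paste.gz',
                         content=u'decoded paste content')
    indexer.commit()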
diff --git a/bin/ZMQ_PubSub_Lines.py b/bin/Lines.py
similarity index 61%
rename from bin/ZMQ_PubSub_Lines.py
rename to bin/Lines.py
index f7dd6da2..81dc9483 100755
--- a/bin/ZMQ_PubSub_Lines.py
+++ b/bin/Lines.py
@@ -32,26 +32,19 @@ import time
 from packages import Paste
 from pubsublogger import publisher
 
-import Helper
+from Helper import Process
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     publisher.port = 6380
-    publisher.channel = "Script"
+    publisher.channel = 'Script'
 
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'line'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-
-    # Publisher
-    pub_config_section = 'PubSub_Longlines'
-    h.zmq_pub(pub_config_section, None)
+    config_section = 'Lines'
+    p = Process(config_section)
 
     # SCRIPT PARSER #
     parser = argparse.ArgumentParser(
-        description='''This script is a part of the Analysis Information \
-                    Leak framework.''')
+        description='This script is a part of the Analysis Information \
+                    Leak framework.')
     parser.add_argument(
         '-max', type=int, default=500,
@@ -60,24 +53,18 @@ if __name__ == "__main__":
 
     args = parser.parse_args()
 
-    channel_0 = h.config.get("PubSub_Longlines", "channel_0")
-    channel_1 = h.config.get("PubSub_Longlines", "channel_1")
-
     # FUNCTIONS #
     tmp_string = "Lines script Subscribed to channel {} and Start to publish \
-        on channel {}, {}"
-    publisher.info(tmp_string.format(h.sub_channel, channel_0, channel_1))
+        on channel Longlines, Shortlines"
+    publisher.info(tmp_string)
 
     while True:
         try:
-            message = h.redis_rpop()
+            message = p.get_from_set()
+            print message
 
             if message is not None:
-                PST = Paste.Paste(message.split(" ", -1)[-1])
+                PST = Paste.Paste(message)
             else:
-                if h.redis_queue_shutdown():
-                    print "Shutdown Flag Up: Terminating"
-                    publisher.warning("Shutdown Flag Up: Terminating.")
-                    break
                 publisher.debug("Tokeniser is idling 10s")
                 time.sleep(10)
                 continue
@@ -89,10 +76,9 @@ if __name__ == "__main__":
 
             # FIXME Not used.
             PST.store.sadd("Pastes_Objects", PST.p_path)
-            if lines_infos[1] >= args.max:
-                h.pub_channel = channel_0
+            if lines_infos[1] < args.max:
+                p.populate_set_out(PST.p_path, 'LinesShort')
             else:
-                h.pub_channel = channel_1
-            h.zmq_pub_send(PST.p_path)
+                p.populate_set_out(PST.p_path, 'LinesLong')
         except IOError:
             print "CRC Checksum Error on : ", PST.p_path
diff --git a/bin/ZMQ_Sub_Mails.py b/bin/Mail.py
similarity index 73%
rename from bin/ZMQ_Sub_Mails.py
rename to bin/Mail.py
index 33feef3c..1632b7a0 100755
--- a/bin/ZMQ_Sub_Mails.py
+++ b/bin/Mail.py
@@ -9,28 +9,29 @@ from packages import Paste
 from packages import lib_refine
 from pubsublogger import publisher
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Categ'
-    config_channel = 'channel_1'
-    subscriber_name = 'emails'
+    config_section = 'Mail'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    p = Process(config_section)
 
     # REDIS #
     r_serv2 = redis.StrictRedis(
-        host=h.config.get("Redis_Cache", "host"),
-        port=h.config.getint("Redis_Cache", "port"),
-        db=h.config.getint("Redis_Cache", "db"))
+        host=p.config.get("Redis_Cache", "host"),
+        port=p.config.getint("Redis_Cache", "port"),
+        db=p.config.getint("Redis_Cache", "db"))
 
     # FUNCTIONS #
     publisher.info("Suscribed to channel mails_categ")
 
-    message = h.redis_rpop()
+    # FIXME For retro compatibility
+    channel = 'mails_categ'
+
+    message = p.get_from_set()
     prec_filename = None
 
     # Log as critical if there are more that that amout of valid emails
@@ -41,7 +42,8 @@ if __name__ == "__main__":
     while True:
         try:
             if message is not None:
-                channel, filename, word, score = message.split()
+                print message
+                filename, word, score = message.split()
 
                 if prec_filename is None or filename != prec_filename:
                     PST = Paste.Paste(filename)
@@ -65,14 +67,11 @@ if __name__ == "__main__":
 
                 prec_filename = filename
 
             else:
-                if h.redis_queue_shutdown():
-                    print "Shutdown Flag Up: Terminating"
-                    publisher.warning("Shutdown Flag Up: Terminating.")
-                    break
                 publisher.debug("Script Mails is Idling 10s")
+                print 'Sleeping'
                 time.sleep(10)
 
-            message = h.redis_rpop()
+            message = p.get_from_set()
         except dns.exception.Timeout:
             # FIXME retry!
             print "dns.exception.Timeout"
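Mail counts "valid" addresses through lib_refine. The check presumably boils down to an MX lookup, which with dnspython (already imported above for dns.exception.Timeout) would look like this; a hedged sketch, not lib_refine's actual code:

    import dns.resolver
    import dns.exception

    def has_mx(domain):
        # True when the mail domain publishes at least one MX record.
        try:
            return len(dns.resolver.query(domain, 'MX')) > 0
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
                dns.exception.Timeout):
            return False

    print(has_mx('circl.lu'))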
diff --git a/bin/ZMQ_Sub_Onion.py b/bin/Onion.py
similarity index 77%
rename from bin/ZMQ_Sub_Onion.py
rename to bin/Onion.py
index 99857520..c14a5485 100755
--- a/bin/ZMQ_Sub_Onion.py
+++ b/bin/Onion.py
@@ -27,32 +27,34 @@ from packages import Paste
 from pubsublogger import publisher
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Categ'
-    config_channel = 'channel_2'
-    subscriber_name = 'tor'
+    config_section = 'Onion'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
+    p = Process(config_section)
 
     # FUNCTIONS #
     publisher.info("Script subscribed to channel onion_categ")
 
+    # FIXME For retro compatibility
+    channel = 'onion_categ'
+
     # Getting the first message from redis.
-    message = h.redis_rpop()
+    message = p.get_from_set()
     prec_filename = None
 
     # Thanks to Faup project for this regex
     # https://github.com/stricaud/faup
-    url_regex = "([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|onion|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
+    url_regex = "([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.onion)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
 
     while True:
         if message is not None:
-            channel, filename, word, score = message.split()
+            print message
+            filename, word, score = message.split()
 
             # "For each new paste"
             if prec_filename is None or filename != prec_filename:
@@ -64,8 +66,7 @@ if __name__ == "__main__":
                     credential, subdomain, domain, host, tld, port, \
                         resource_path, query_string, f1, f2, f3, f4 = x
 
-                    if f1 == "onion":
-                        domains_list.append(domain)
+                    domains_list.append(domain)
 
             # Saving the list of extracted onion domains.
             PST.__setattr__(channel, domains_list)
@@ -83,11 +84,8 @@ if __name__ == "__main__":
 
             prec_filename = filename
 
         else:
-            if h.redis_queue_shutdown():
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
             publisher.debug("Script url is Idling 10s")
+            print 'Sleeping'
             time.sleep(10)
 
-        message = h.redis_rpop()
+        message = p.get_from_set()
diff --git a/bin/QueueIn.py b/bin/QueueIn.py
new file mode 100755
index 00000000..683a50ef
--- /dev/null
+++ b/bin/QueueIn.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python2
+# -*-coding:UTF-8 -*
+
+
+from pubsublogger import publisher
+from Helper import Process
+import argparse
+
+
+def run(config_section):
+    p = Process(config_section)
+    p.populate_set_in()
+
+
+if __name__ == '__main__':
+    publisher.port = 6380
+    publisher.channel = 'Queuing'
+
+    parser = argparse.ArgumentParser(description='Entry queue for a module.')
+    parser.add_argument("-c", "--config_section", type=str,
+                        help="Config section to use in the config file.")
+    args = parser.parse_args()
+
+    run(args.config_section)
diff --git a/bin/QueueOut.py b/bin/QueueOut.py
new file mode 100755
index 00000000..d2853274
--- /dev/null
+++ b/bin/QueueOut.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python2
+# -*-coding:UTF-8 -*
+
+from pubsublogger import publisher
+from Helper import Process
+import argparse
+
+
+def run(config_section):
+    p = Process(config_section)
+    if not p.publish():
+        print(config_section, 'has no publisher.')
+
+
+if __name__ == '__main__':
+    publisher.port = 6380
+    publisher.channel = 'Queuing'
+
+    parser = argparse.ArgumentParser(description='Entry queue for a module.')
+    parser.add_argument("-c", "--config_section", type=str,
+                        help="Config section to use in the config file.")
+    args = parser.parse_args()
+
+    run(args.config_section)
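QueueIn and QueueOut are generic: one pair per module, selected with -c <section>. This is the trio that run_modules.py (added later in this patch) spawns for each modules.cfg section; the module name here is illustrative:

    import subprocess

    module = 'Tokenize'
    subprocess.Popen(['python', './QueueIn.py', '-c', module])   # feed -> 'Tokenizein'
    subprocess.Popen(['python', './QueueOut.py', '-c', module])  # 'Tokenizeout' -> subscribers
    subprocess.Popen(['python', './{}.py'.format(module)])       # 'Tokenizein' -> 'Tokenizeout'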
index 7e1f45e1..b1a75495 100755
--- a/bin/ZMQ_PubSub_Tokenize.py
+++ b/bin/Tokenize.py
@@ -27,40 +27,28 @@ import time
 from packages import Paste
 from pubsublogger import publisher
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Longlines'
-    config_channel = 'channel_1'
-    subscriber_name = 'tokenize'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-
-    # Publisher
-    pub_config_section = 'PubSub_Words'
-    pub_config_channel = 'channel_0'
-    h.zmq_pub(pub_config_section, pub_config_channel)
+    config_section = 'Tokenize'
+    p = Process(config_section)
 
     # LOGGING #
-    publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel))
+    publisher.info("Tokeniser started")
 
     while True:
-        message = h.redis_rpop()
+        message = p.get_from_set()
         print message
         if message is not None:
-            paste = Paste.Paste(message.split(" ", -1)[-1])
+            paste = Paste.Paste(message)
             for word, score in paste._get_top_words().items():
                 if len(word) >= 4:
-                    h.zmq_pub_send('{} {} {}'.format(paste.p_path, word,
-                                                     score))
+                    msg = '{} {} {}'.format(paste.p_path, word, score)
+                    p.populate_set_out(msg)
         else:
-            if h.redis_queue_shutdown():
-                print "Shutdown Flag Up: Terminating"
-                publisher.warning("Shutdown Flag Up: Terminating.")
-                break
             publisher.debug("Tokeniser is idling 10s")
             time.sleep(10)
             print "sleepin"
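Note the slimmer wire format used across all these modules: messages are now '<paste_path> <word> <score>' with no leading channel token, so consumers unpack three fields instead of four. The paste path below is illustrative:

    message = '/home/ail/PASTES/archive/2014/08/29/a.gz credit 3'
    filename, word, score = message.split()
    print(filename, word, int(score))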
diff --git a/bin/ZMQ_Sub_Urls.py b/bin/Web.py
similarity index 81%
rename from bin/ZMQ_Sub_Urls.py
rename to bin/Web.py
index a339a543..e7a1bc8d 100755
--- a/bin/ZMQ_Sub_Urls.py
+++ b/bin/Web.py
@@ -14,36 +14,32 @@ import socket
 import pycountry
 import ipaddress
 
-import Helper
+from Helper import Process
 
 if __name__ == "__main__":
     publisher.port = 6380
     publisher.channel = "Script"
 
-    config_section = 'PubSub_Categ'
-    config_channel = 'channel_3'
-    subscriber_name = "urls"
+    config_section = 'Web'
 
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-
-    # Publisher
-    pub_config_section = "PubSub_Url"
-    pub_config_channel = 'channel'
-    h.zmq_pub(pub_config_section, pub_config_channel)
+    p = Process(config_section)
 
     # REDIS #
     r_serv2 = redis.StrictRedis(
-        host=h.config.get("Redis_Cache", "host"),
-        port=h.config.getint("Redis_Cache", "port"),
-        db=h.config.getint("Redis_Cache", "db"))
+        host=p.config.get("Redis_Cache", "host"),
+        port=p.config.getint("Redis_Cache", "port"),
+        db=p.config.getint("Redis_Cache", "db"))
 
     # Country to log as critical
-    cc_critical = h.config.get("PubSub_Url", "cc_critical")
+    cc_critical = p.config.get("PubSub_Url", "cc_critical")
 
     # FUNCTIONS #
     publisher.info("Script URL subscribed to channel web_categ")
 
-    message = h.redis_rpop()
+    # FIXME For retro compatibility
+    channel = 'web_categ'
+
+    message = p.get_from_set()
     prec_filename = None
 
     url_regex = "(http|https|ftp)\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
 
@@ -51,7 +47,7 @@ if __name__ == "__main__":
     while True:
         try:
             if message is not None:
-                channel, filename, word, score = message.split()
+                filename, word, score = message.split()
 
                 if prec_filename is None or filename != prec_filename:
                     domains_list = []
@@ -62,7 +58,7 @@ if __name__ == "__main__":
                         port, resource_path, query_string, f1, f2, f3, \
                         f4 = x
                     domains_list.append(domain)
-                    h.zmq_pub_send(str(x))
+                    p.populate_set_out(x, 'Url')
                     publisher.debug('{} Published'.format(x))
 
                     if f1 == "onion":
@@ -110,13 +106,10 @@ if __name__ == "__main__":
 
                 prec_filename = filename
             else:
-                if h.redis_queue_shutdown():
-                    print "Shutdown Flag Up: Terminating"
-                    publisher.warning("Shutdown Flag Up: Terminating.")
-                    break
                 publisher.debug("Script url is Idling 10s")
+                print 'Sleeping'
                 time.sleep(10)
 
-            message = h.redis_rpop()
+            message = p.get_from_set()
         except dns.exception.Timeout:
             print "dns.exception.Timeout", A_values
diff --git a/bin/ZMQ_Feed_Q.py b/bin/ZMQ_Feed_Q.py
deleted file mode 100755
index 2bcdd238..00000000
--- a/bin/ZMQ_Feed_Q.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-
-"""
-The ZMQ_Feed_Q Module
-=====================
-
-This module is the first of the ZMQ tree processing.
-It's subscribe to a data stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-Requirements
-------------
-
-*Need running Redis instances.
-*Messages from the stream should be formated as follow:
- "channel_name"+" "+/path/to/the/paste.gz+" "base64_data_encoded_paste"
-
-"""
-
-from pubsublogger import publisher
-
-import Helper
-
-
-if __name__ == "__main__":
-    publisher.channel = "Queuing"
-
-    config_section = 'Feed'
-    config_channel = 'topicfilter'
-    subscriber_name = 'feed'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_PubSub_Categ_Q.py b/bin/ZMQ_PubSub_Categ_Q.py
deleted file mode 100755
index 2b836752..00000000
--- a/bin/ZMQ_PubSub_Categ_Q.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-"""
-The ZMQ_PubSub_Categ_Q Module
-============================
-
-This module subscribe to a Publisher stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts.
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-Requirements
-------------
-
-*Running Redis instances.
-*Should register to the Publisher "ZMQ_PubSub_Tokenize"
-
-"""
-
-from pubsublogger import publisher
-
-import Helper
-
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = 'Queuing'
-
-    config_section = 'PubSub_Words'
-    config_channel = 'channel_0'
-    subscriber_name = 'categ'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_PubSub_Lines_Q.py b/bin/ZMQ_PubSub_Lines_Q.py
deleted file mode 100755
index 6c1d315f..00000000
--- a/bin/ZMQ_PubSub_Lines_Q.py
+++ /dev/null
@@ -1,34 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-"""
-The ZMQ_PubSub_Line_Q Module
-============================
-
-This module subscribe to a Publisher stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts.
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-Requirements
-------------
-
-*Running Redis instances.
-*Should register to the Publisher "ZMQ_Feed"
-
-"""
-
-from pubsublogger import publisher
-
-import Helper
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = "Queuing"
-
-    config_section = "PubSub_Global"
-    config_channel = 'channel'
-    subscriber_name = 'line'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_PubSub_Tokenize_Q.py b/bin/ZMQ_PubSub_Tokenize_Q.py
deleted file mode 100755
index c3e92dde..00000000
--- a/bin/ZMQ_PubSub_Tokenize_Q.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-"""
-The ZMQ_PubSub_Tokenize_Q Module
-============================
-
-This module subscribe to a Publisher stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts.
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-Requirements
-------------
-
-*Running Redis instances.
-*Should register to the Publisher "ZMQ_PubSub_Line" channel 1
-
-"""
-
-from pubsublogger import publisher
-
-import Helper
-
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = "Queuing"
-
-    config_section = 'PubSub_Longlines'
-    config_channel = 'channel_1'
-    subscriber_name = 'tokenize'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_Pub_Global.py b/bin/ZMQ_Pub_Global.py
deleted file mode 100755
index 2e687432..00000000
--- a/bin/ZMQ_Pub_Global.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-"""
-The ZMQ_Pub_Global Module
-=========================
-
-This module is consuming the Redis-list created by the script ./Dir.py.
-This module is as the same level of the ZMQ tree than the Module ZMQ_Feed
-
-Whereas the ZMQ_Feed is poping the list created in redis by ZMQ_Feed_Q which is
-listening a stream, ZMQ_Pub_Global is poping the list created in redis by
-./Dir.py.
-
-Thanks to this Module there is now two way to Feed the ZMQ tree:
-*By a continuous stream ..seealso:: ZMQ_Feed Module
-*Manually with this module and ./Dir.py script.
-
-Requirements
-------------
-
-*Need running Redis instances. (Redis)
-
-"""
-import time
-from pubsublogger import publisher
-
-import Helper
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = "Global"
-
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'global'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-
-    # Publisher
-    pub_config_section = 'PubSub_Global'
-    pub_config_channel = 'channel'
-    h.zmq_pub(pub_config_section, pub_config_channel)
-
-    # LOGGING #
-    publisher.info("Starting to publish.")
-
-    while True:
-        filename = h.redis_rpop()
-
-        if filename is not None:
-            h.zmq_pub_send(filename)
-        else:
-            time.sleep(10)
-            publisher.debug("Nothing to publish")
diff --git a/bin/ZMQ_Sub_Attributes_Q.py b/bin/ZMQ_Sub_Attributes_Q.py
deleted file mode 100755
index 7047c01e..00000000
--- a/bin/ZMQ_Sub_Attributes_Q.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-"""
-The ZMQ_Sub_Attributes_Q Module
-============================
-
-This module subscribe to a Publisher stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts.
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-Requirements
-------------
-
-*Running Redis instances.
-*Should register to the Publisher "ZMQ_Feed"
-
-"""
-
-from pubsublogger import publisher
-
-import Helper
-
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = "Queuing"
-
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'attributes'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_Sub_CreditCards_Q.py b/bin/ZMQ_Sub_CreditCards_Q.py
deleted file mode 100755
index 80992dad..00000000
--- a/bin/ZMQ_Sub_CreditCards_Q.py
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-
-from pubsublogger import publisher
-
-import Helper
-
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = "Queuing"
-
-    config_section = 'PubSub_Categ'
-    config_channel = 'channel_0'
-    subscriber_name = 'cards'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_Sub_Curve_Q.py b/bin/ZMQ_Sub_Curve_Q.py
deleted file mode 100755
index dde959bf..00000000
--- a/bin/ZMQ_Sub_Curve_Q.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-"""
-The ZMQ_Sub_Curve_Q Module
-============================
-
-This module subscribe to a Publisher stream and put the received messages
-into a Redis-list waiting to be popped later by others scripts.
-
-..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
-the same Subscriber name in both of them.
-
-Requirements
-------------
-
-*Running Redis instances.
-*Should register to the Publisher "ZMQ_PubSub_Tokenize"
-
-"""
-
-from pubsublogger import publisher
-
-import Helper
-
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = "Queuing"
-
-    config_section = 'PubSub_Words'
-    config_channel = 'channel_0'
-    subscriber_name = 'curve'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
diff --git a/bin/ZMQ_Sub_Duplicate_Q.py b/bin/ZMQ_Sub_Duplicate_Q.py
deleted file mode 100755
index e66c3ec4..00000000
--- a/bin/ZMQ_Sub_Duplicate_Q.py
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/usr/bin/env python2
-# -*-coding:UTF-8 -*
-
-from pubsublogger import publisher
-
-import Helper
-
-if __name__ == "__main__":
-    publisher.port = 6380
-    publisher.channel = 'Queuing'
-
-    config_section = 'PubSub_Global'
-    config_channel = 'channel'
-    subscriber_name = 'duplicate'
-
-    h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
-    h.redis_queue_subscribe(publisher)
- -""" - -from pubsublogger import publisher - -import Helper - - -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Queuing" - - config_section = 'PubSub_Global' - config_channel = 'channel' - subscriber_name = 'indexer' - - h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Mails_Q.py b/bin/ZMQ_Sub_Mails_Q.py deleted file mode 100755 index 1ffed251..00000000 --- a/bin/ZMQ_Sub_Mails_Q.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python2 -# -*-coding:UTF-8 -* - -from pubsublogger import publisher - -import Helper - -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Queuing" - - config_section = 'PubSub_Categ' - config_channel = 'channel_1' - subscriber_name = 'emails' - - h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Onion_Q.py b/bin/ZMQ_Sub_Onion_Q.py deleted file mode 100755 index 833edf61..00000000 --- a/bin/ZMQ_Sub_Onion_Q.py +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python2 -# -*-coding:UTF-8 -* -""" -The ZMQ_Sub_Onion_Q Module -============================ - -This module subscribe to a Publisher stream and put the received messages -into a Redis-list waiting to be popped later by ZMQ_Sub_Onion. - -..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put -the same Subscriber name in both of them (here "tor") - -Requirements ------------- - -*Running Redis instances. -*Should register to the Publisher "ZMQ_PubSub_Categ" - -""" -from pubsublogger import publisher - -import Helper - - -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Queuing" - - config_section = 'PubSub_Categ' - config_channel = 'channel_2' - subscriber_name = 'tor' - - h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Urls_Q.py b/bin/ZMQ_Sub_Urls_Q.py deleted file mode 100755 index 346ca9ec..00000000 --- a/bin/ZMQ_Sub_Urls_Q.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python2 -# -*-coding:UTF-8 -* - -from pubsublogger import publisher - -import Helper - - -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Queuing" - - config_section = 'PubSub_Categ' - config_channel = 'channel_3' - subscriber_name = 'urls' - - h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - h.redis_queue_subscribe(publisher) diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample index b543ee0d..8c1bf8fd 100644 --- a/bin/packages/config.cfg.sample +++ b/bin/packages/config.cfg.sample @@ -18,7 +18,8 @@ db = 0 [Redis_Queues] host = localhost port = 6381 -db = 0 +db_sub = 0 +db_pub = 1 [Redis_Data_Merging] host = localhost @@ -71,3 +72,18 @@ cc_critical = DE [Indexer] type = whoosh path = indexdir + +############################################################################### + +[ZMQ_Global] +address = tcp://crf.circl.lu:5556 +channel = 102 + +[ZMQ_Url] +address = tcp://127.0.0.1:5004 +channel = urls + +[RedisPubSub] +host = localhost +port = 6381 +db = 0 diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg new file mode 100644 index 00000000..b58e25f6 --- /dev/null +++ b/bin/packages/modules.cfg @@ -0,0 +1,41 @@ +[Global] +subscribe = ZMQ_Global +publish = Redis_Global + +[Duplicates] +subscribe = Redis_Global + +[Indexer] +subscribe = Redis_Global + +[Attributes] +subscribe = Redis_Global + +[Lines] 
diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg
new file mode 100644
index 00000000..b58e25f6
--- /dev/null
+++ b/bin/packages/modules.cfg
@@ -0,0 +1,41 @@
+[Global]
+subscribe = ZMQ_Global
+publish = Redis_Global
+
+[Duplicates]
+subscribe = Redis_Global
+
+[Indexer]
+subscribe = Redis_Global
+
+[Attributes]
+subscribe = Redis_Global
+
+[Lines]
+subscribe = Redis_Global
+publish = Redis_LinesShort,Redis_LinesLong
+
+[Tokenize]
+subscribe = Redis_LinesShort
+publish = Redis_Words
+
+[Curve]
+subscribe = Redis_Words
+
+[Categ]
+subscribe = Redis_Words
+publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Web
+
+[CreditCards]
+subscribe = Redis_CreditCards
+
+[Mail]
+subscribe = Redis_Mail
+
+[Onion]
+subscribe = Redis_Onion
+#publish = Redis_Global
+
+[Web]
+subscribe = Redis_Web
+publish = Redis_Url,ZMQ_Url
diff --git a/bin/run_modules.py b/bin/run_modules.py
new file mode 100755
index 00000000..7792c82d
--- /dev/null
+++ b/bin/run_modules.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python2
+# -*-coding:UTF-8 -*
+
+
+import ConfigParser
+import os
+import subprocess
+
+if __name__ == '__main__':
+    configfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg')
+    if not os.path.exists(configfile):
+        raise Exception('Unable to find the configuration file. \
+                        Did you set environment variables? \
+                        Or activate the virtualenv.')
+    config = ConfigParser.ConfigParser()
+    config.read(configfile)
+
+    modules = config.sections()
+    for module in modules:
+        subprocess.Popen(["python", './QueueIn.py', '-c', module])
+        subprocess.Popen(["python", './QueueOut.py', '-c', module])
+        #subprocess.Popen(["python", './{}.py'.format(module)])
diff --git a/files/creditcard_categ b/files/CreditCards
similarity index 100%
rename from files/creditcard_categ
rename to files/CreditCards
diff --git a/files/mails_categ b/files/Mail
similarity index 100%
rename from files/mails_categ
rename to files/Mail
diff --git a/files/onion_categ b/files/Onion
similarity index 100%
rename from files/onion_categ
rename to files/Onion
diff --git a/files/web_categ b/files/Web
similarity index 100%
rename from files/web_categ
rename to files/Web
diff --git a/files/javascript_categ b/files/javascript_categ
deleted file mode 100644
index 1512b681..00000000
--- a/files/javascript_categ
+++ /dev/null
@@ -1,6 +0,0 @@
-isFinite
-isNaN
-eval
-var
-parseFloat
-javascript
diff --git a/files/list_categ_files b/files/list_categ_files
deleted file mode 100644
index 286fa36d..00000000
--- a/files/list_categ_files
+++ /dev/null
@@ -1,6 +0,0 @@
-../files/mails_categ
-../files/creditcard_categ
-../files/pass_categ
-../files/web_categ
-../files/javascript_categ
-../files/onion_categ
diff --git a/files/pass_categ b/files/pass_categ
deleted file mode 100644
index 20d2f3f3..00000000
--- a/files/pass_categ
+++ /dev/null
@@ -1,23 +0,0 @@
-123456
-password
-12345678
-qwerty
-abc123
-123456789
-1234567
-iloveyou
-adobe123
-123123
-admin
-1234567890
-letmein
-photoshop
-1234
-monkey
-shadow
-sunshine
-12345
-password1
-princess
-azerty
-trustno1
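The Flask change below reads queue depths from the 'queues' hash that Process keeps updated, instead of LLEN on per-channel lists. Inspecting it by hand against the RedisPubSub instance (localhost:6381 per config.cfg.sample):

    import redis

    r = redis.StrictRedis(host='localhost', port=6381, db=0)
    # One field per module, value = current cardinality of '<module>in'.
    for module, size in r.hgetall('queues').iteritems():
        print module, size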
diff --git a/var/www/Flask_server.py b/var/www/Flask_server.py
index 214cb22f..01d54480 100755
--- a/var/www/Flask_server.py
+++ b/var/www/Flask_server.py
@@ -44,7 +44,8 @@ def event_stream():
 
 def get_queues(r):
     # We may want to put the llen in a pipeline to do only one query.
-    return [(queue, r.llen(queue)) for queue in r.smembers("queues")]
+    return [(queue, int(card)) for queue, card in
+            r.hgetall("queues").iteritems()]
 
 
 @app.route("/_logs")
@@ -54,6 +55,7 @@ def logs():
 
 @app.route("/_stuff", methods=['GET'])
 def stuff():
+    print get_queues(r_serv)
     return jsonify(row1=get_queues(r_serv))
 
 
diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js
index 1622f978..1a8fbaa2 100644
--- a/var/www/static/js/indexjavascript.js
+++ b/var/www/static/js/indexjavascript.js
@@ -96,7 +96,7 @@ function create_log_table(obj_json) {
     tr.appendChild(msage);
 
     if (tr.className == document.getElementById("checkbox_log_info").value && document.getElementById("checkbox_log_info").checked == true) {
-       tableBody.appendChild(tr);
+        tableBody.appendChild(tr);
     }
     if (tr.className == document.getElementById("checkbox_log_warning").value && document.getElementById("checkbox_log_warning").checked == true) {
         tableBody.appendChild(tr);
@@ -156,16 +156,16 @@ $(document).ready(function () {
     var curves_labels = [];
     var curves_labels2 = [];
     var x = new Date();
-    
+
     for (i = 0; i < glob_tabvar.row1.length; i++){
-        if (glob_tabvar.row1[i][0].substring(0,4) != "word"){
-            tmp_tab.push(0);
-            curves_labels.push(glob_tabvar.row1[i][0]);
-        }
-        else {
+        if (glob_tabvar.row1[i][0] == 'Categ' || glob_tabvar.row1[i][0] == 'Curve'){
             tmp_tab2.push(0);
             curves_labels2.push(glob_tabvar.row1[i][0]);
         }
+        else {
+            tmp_tab.push(0);
+            curves_labels.push(glob_tabvar.row1[i][0]);
+        }
     }
     tmp_tab.unshift(x);
     tmp_tab2.unshift(x);
@@ -173,7 +173,7 @@ $(document).ready(function () {
     curves_labels2.unshift("date");
     data.push(tmp_tab);
     data2.push(tmp_tab2);
-    
+
     var g = new Dygraph(document.getElementById("Graph"), data,
                         {
     labels: curves_labels,
@@ -189,7 +189,7 @@ $(document).ready(function () {
     fillGraph: true,
     includeZero: true,
     });
-    
+
     var g2 = new Dygraph(document.getElementById("Graph2"), data2,
                         {
     labels: curves_labels2,
@@ -209,7 +209,7 @@ $(document).ready(function () {
     var interval = 1000; //number of mili seconds between each call
     var refresh = function() {
-    
+
         $.ajax({
             url: "",
             cache: false,
@@ -218,23 +218,23 @@ $(document).ready(function () {
             setTimeout(function() {
                 var x = new Date();
                 var tmp_values = [];
-                var tmp_values2 = [];
+                var tmp_values2 = [];
                 refresh();
                 update_values();
                 create_queue_table();
-                
-                
+
+
                 for (i = 0; i < (glob_tabvar.row1).length; i++){
-                    if (glob_tabvar.row1[i][0].substring(0,4) != "word"){
-                        tmp_values.push(glob_tabvar.row1[i][1]);
+                    if (glob_tabvar.row1[i][0] == 'Categ' || glob_tabvar.row1[i][0] == 'Curve'){
+                        tmp_values2.push(glob_tabvar.row1[i][1]);
                     }
                     else {
-                        tmp_values2.push(glob_tabvar.row1[i][1]);
+                        tmp_values.push(glob_tabvar.row1[i][1]);
                    }
                 }
                 tmp_values.unshift(x);
                 data.push(tmp_values);
-                
+
                 tmp_values2.unshift(x);
                 data2.push(tmp_values2);
 
@@ -244,8 +244,8 @@ $(document).ready(function () {
                 }
                 g.updateOptions( { 'file': data } );
                 g2.updateOptions( { 'file': data2 } );
-                
-                
+
+
                 // TagCanvas.Reload('myCanvas');
             }, interval);