From 45b0bf3983dab2378b3be95e3d2bcec0398be9b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Tue, 19 Aug 2014 19:07:07 +0200 Subject: [PATCH] Improve the cleanup. Still some to do. --- .gitignore | 3 + bin/Helper.py | 70 +++++++++++++---------- bin/ZMQ_Feed.py | 68 +++++++--------------- bin/ZMQ_Feed_Q.py | 4 +- bin/ZMQ_PubSub_Categ.py | 101 +++++++++++---------------------- bin/ZMQ_PubSub_Categ_Q.py | 4 +- bin/ZMQ_PubSub_Lines_Q.py | 4 +- bin/ZMQ_PubSub_Tokenize.py | 73 ++++++++---------------- bin/ZMQ_PubSub_Tokenize_Q.py | 4 +- bin/ZMQ_Sub_Attributes_Q.py | 4 +- bin/ZMQ_Sub_CreditCards_Q.py | 4 +- bin/ZMQ_Sub_Curve_Q.py | 4 +- bin/ZMQ_Sub_Duplicate_Q.py | 4 +- bin/ZMQ_Sub_Indexer.py | 58 +++++++------------ bin/ZMQ_Sub_Indexer_Q.py | 4 +- bin/ZMQ_Sub_Mails_Q.py | 4 +- bin/ZMQ_Sub_Onion_Q.py | 4 +- bin/ZMQ_Sub_Urls_Q.py | 4 +- bin/packages/ZMQ_PubSub.py | 25 +------- bin/packages/config.cfg | 65 --------------------- bin/packages/config.cfg.sample | 2 +- 21 files changed, 169 insertions(+), 344 deletions(-) delete mode 100644 bin/packages/config.cfg diff --git a/.gitignore b/.gitignore index a27512e0..6c5564af 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,6 @@ AILENV redis-leveldb redis + +# Local config +bin/packages/config.cfg diff --git a/bin/Helper.py b/bin/Helper.py index ab8a896f..8f4913c2 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -10,52 +10,64 @@ into a Redis-list waiting to be popped later by others scripts. ..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put the same Subscriber name in both of them. -Requirements ------------- - -*Running Redis instances. -*Should register to the Publisher "ZMQ_PubSub_Line" channel 1 - """ import redis import ConfigParser import os -from packages import ZMQ_PubSub +import zmq -class Queues(object): +class Redis_Queues(object): - def __init__(self): - configfile = os.join(os.environ('AIL_BIN'), 'packages/config.cfg') - if not os.exists(configfile): - raise Exception('Unable to find the configuration file. Did you set environment variables? Or activate the virtualenv.') + def __init__(self, zmq_conf_section, zmq_conf_channel, subscriber_name): + configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg') + if not os.path.exists(configfile): + raise Exception('Unable to find the configuration file. \ + Did you set environment variables? \ + Or activate the virtualenv.') self.config = ConfigParser.ConfigParser() - self.config.read(self.configfile) + self.config.read(configfile) + self.subscriber_name = subscriber_name - def _queue_init_redis(self): + # ZMQ subscriber + self.sub_channel = self.config.get(zmq_conf_section, zmq_conf_channel) + sub_address = self.config.get(zmq_conf_section, 'adress') + context = zmq.Context() + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(sub_address) + self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) + + # Redis Queue config_section = "Redis_Queues" self.r_queues = redis.StrictRedis( host=self.config.get(config_section, "host"), port=self.config.getint(config_section, "port"), db=self.config.getint(config_section, "db")) + self.redis_channel = self.sub_channel + subscriber_name - def _queue_shutdown(self): - # FIXME: Why not just a key? - if self.r_queues.sismember("SHUTDOWN_FLAGS", "Feed_Q"): - self.r_queues.srem("SHUTDOWN_FLAGS", "Feed_Q") - return True - return False + def zmq_pub(self, config_section): + # FIXME: should probably go somewhere else + context = zmq.Context() + self.pub_socket = context.socket(zmq.PUB) + self.pub_socket.bind(self.config.get(config_section, 'adress')) - def queue_subscribe(self, publisher, config_section, channel, - subscriber_name): - channel = self.config.get(config_section, channel) - zmq_sub = ZMQ_PubSub.ZMQSub(self.config, config_section, - channel, subscriber_name) - publisher.info("""Suscribed to channel {}""".format(channel)) - self._queue_init_redis() + def redis_queue_shutdown(self, is_queue=False): + if is_queue: + flag = self.subscriber_name + '_Q' + else: + flag = self.subscriber_name + # srem returns False if the element does not exists + return self.r_queues.srem('SHUTDOWN_FLAGS', flag) + + def redis_queue_subscribe(self, publisher): + publisher.info("Suscribed to channel {}".format(self.sub_channel)) while True: - zmq_sub.get_and_lpush(self.r_queues) - if self._queues_shutdown(): + msg = self.sub_socket.recv() + p = self.r_queues.pipeline() + p.sadd("queues", self.redis_channel) + p.lpush(self.redis_channel, msg) + p.execute() + if self.redis_queue_shutdown(True): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break diff --git a/bin/ZMQ_Feed.py b/bin/ZMQ_Feed.py index be1e3eff..75f62d41 100755 --- a/bin/ZMQ_Feed.py +++ b/bin/ZMQ_Feed.py @@ -20,50 +20,34 @@ Requirements *Need the ZMQ_Feed_Q Module running to be able to work properly. """ -import redis -import ConfigParser import base64 import os import time from pubsublogger import publisher -from packages import ZMQ_PubSub -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" +if __name__ == "__main__": + publisher.channel = "Script" - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + config_section = 'Feed' + config_channel = 'topicfilter' + subscriber_name = 'feed' - # REDIS - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - # ZMQ # - channel = cfg.get("Feed", "topicfilter") - - # Subscriber - subscriber_name = "feed" - subscriber_config_section = "Feed" # Publisher - publisher_name = "pubfed" - publisher_config_section = "PubSub_Global" - - Sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - PubGlob = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) + pub_config_section = "PubSub_Global" + h.zmq_pub(pub_config_section) + pub_channel = h.config.get(pub_config_section, "channel") # LOGGING # - publisher.channel = "Script" publisher.info("Feed Script started to receive & publish.") while True: - message = Sub.get_msg_from_queue(r_serv) + message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) # Recovering the streamed message informations. if message is not None: if len(message.split()) == 3: @@ -75,8 +59,7 @@ def main(): publisher.debug("Empty Paste: {0} not processed".format(paste)) continue else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Feed"): - r_serv.srem("SHUTDOWN_FLAGS", "Feed") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -84,24 +67,13 @@ def main(): time.sleep(10) continue # Creating the full filepath - filename = cfg.get("Directories", "pastes") + paste + filename = os.path.join(os.environ('AIL_BIN'), + h.config.get("Directories", "pastes"), paste) + dirname = os.path.dirname(filename) + if not os.path.exists(dirname): + os.makedirs(dirname) - if not os.path.exists(filename.rsplit("/", 1)[0]): - os.makedirs(filename.rsplit("/", 1)[0]) - else: - # Path already existing - pass + with open(filename, 'wb') as f: + f.write(base64.standard_b64decode(gzip64encoded)) - decoded_gzip = base64.standard_b64decode(gzip64encoded) - # paste, zlib.decompress(decoded_gzip, zlib.MAX_WBITS|16) - - with open(filename, 'wb') as F: - F.write(decoded_gzip) - - msg = cfg.get("PubSub_Global", "channel")+" "+filename - PubGlob.send_message(msg) - publisher.debug("{0} Published".format(msg)) - - -if __name__ == "__main__": - main() + h.pub_socket.send('{} {}'.format(pub_channel, filename)) diff --git a/bin/ZMQ_Feed_Q.py b/bin/ZMQ_Feed_Q.py index a81d1d57..2bcdd238 100755 --- a/bin/ZMQ_Feed_Q.py +++ b/bin/ZMQ_Feed_Q.py @@ -33,5 +33,5 @@ if __name__ == "__main__": config_channel = 'topicfilter' subscriber_name = 'feed' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Categ.py b/bin/ZMQ_PubSub_Categ.py index 71477b9a..4456553e 100755 --- a/bin/ZMQ_PubSub_Categ.py +++ b/bin/ZMQ_PubSub_Categ.py @@ -36,111 +36,78 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly. """ -import redis +import glob +import os import argparse -import ConfigParser import time -from packages import ZMQ_PubSub from pubsublogger import publisher from packages import Paste -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + # Publisher + pub_config_section = 'PubSub_Categ' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + config_section = 'PubSub_Words' + config_channel = 'channel_0' + subscriber_name = 'pubcateg' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + h.zmq_pub(pub_config_section) # SCRIPT PARSER # parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak framework.''', - epilog='''''') + description='This script is a part of the Analysis Information \ + Leak framework.') parser.add_argument( - '-l', type=str, default="../files/list_categ_files", - help='Path to the list_categ_files (../files/list_categ_files)', + '-d', type=str, default="../files/", + help='Path to the directory containing the category files.', action='store') args = parser.parse_args() - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") - subscriber_name = "categ" - subscriber_config_section = "PubSub_Words" - - publisher_name = "pubcateg" - publisher_config_section = "PubSub_Categ" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, - subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, - publisher_name) - # FUNCTIONS # - publisher.info("Script Categ subscribed to channel {0}".format( - cfg.get("PubSub_Words", "channel_0"))) + publisher.info( + "Script Categ subscribed to channel {}".format(h.sub_channel)) - with open(args.l, 'rb') as L: - tmp_dict = {} + tmp_dict = {} + for filename in glob.glob(args.d): + bname = os.path.basename(filename) + tmp_dict[bname] = [] + with open(filename, 'r') as f: + for l in f: + tmp_dict[bname].append(l.strip()) - for num, fname in enumerate(L): - # keywords temp list - tmp_list = [] - - with open(fname[:-1], 'rb') as LS: - - for num, kword in enumerate(LS): - tmp_list.append(kword[:-1]) - - tmp_dict[fname.split('/')[-1][:-1]] = tmp_list - - message = sub.get_msg_from_queue(r_serv) prec_filename = None while True: + message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) if message is not None: channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: PST = Paste.Paste(filename) + prec_filename = filename - prec_filename = filename + for categ, words_list in tmp_dict.items(): - for categ, list in tmp_dict.items(): - - if word.lower() in list: - channel = categ - msg = channel+" "+PST.p_path+" "+word+" "+score - pub.send_message(msg) - # dico_categ.add(categ) + if word.lower() in words_list: + h.pub_socket.send('{} {} {} {}'.format( + categ, PST.p_path, word, score)) publisher.info( 'Categ;{};{};{};Detected {} "{}"'.format( PST.p_source, PST.p_date, PST.p_name, score, word)) else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Categ"): - r_serv.srem("SHUTDOWN_FLAGS", "Categ") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Categ is Idling 10s") time.sleep(10) - - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Categ_Q.py b/bin/ZMQ_PubSub_Categ_Q.py index c0651565..44f06b17 100755 --- a/bin/ZMQ_PubSub_Categ_Q.py +++ b/bin/ZMQ_PubSub_Categ_Q.py @@ -30,5 +30,5 @@ if __name__ == "__main__": config_channel = 'channel_0' subscriber_name = 'categ' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Lines_Q.py b/bin/ZMQ_PubSub_Lines_Q.py index fe7cd606..a9967d1a 100755 --- a/bin/ZMQ_PubSub_Lines_Q.py +++ b/bin/ZMQ_PubSub_Lines_Q.py @@ -29,5 +29,5 @@ if __name__ == "__main__": config_channel = 'channel' subscriber_name = 'line' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Tokenize.py b/bin/ZMQ_PubSub_Tokenize.py index 701968d0..68c5e720 100755 --- a/bin/ZMQ_PubSub_Tokenize.py +++ b/bin/ZMQ_PubSub_Tokenize.py @@ -4,9 +4,11 @@ The ZMQ_PubSub_Lines Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Tokenize_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Tokenize_Q +Module. -It tokenize the content of the paste and publish the result in the following format: +It tokenize the content of the paste and publish the result in the following +format: channel_name+' '+/path/of/the/paste.gz+' '+tokenized_word+' '+scoring ..seealso:: Paste method (_get_top_words) @@ -21,72 +23,45 @@ Requirements *Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly. """ -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - channel = cfg.get("PubSub_Longlines", "channel_1") - subscriber_name = "tokenize" - subscriber_config_section = "PubSub_Longlines" - # Publisher - publisher_config_section = "PubSub_Words" - publisher_name = "pubtokenize" + pub_config_section = 'PubSub_Words' - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) + config_section = 'PubSub_Longlines' + config_channel = 'channel_1' + subscriber_name = 'tokenize' - channel_0 = cfg.get("PubSub_Words", "channel_0") + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - # FUNCTIONS # - publisher.info("Tokeniser subscribed to channel {0}".format(cfg.get("PubSub_Longlines", "channel_1"))) + h.zmq_pub(pub_config_section) + pub_channel = h.config.get(pub_config_section, "channel_0") + + # LOGGING # + publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel)) while True: - message = sub.get_msg_from_queue(r_serv) + message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) print message if message is not None: - PST = Paste.Paste(message.split(" ", -1)[-1]) + paste = Paste.Paste(message.split(" ", -1)[-1]) + for word, score in paste._get_top_words().items(): + if len(word) >= 4: + h.pub_socket.send( + '{} {} {} {}'.format(pub_channel, paste.p_path, + word, score)) else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize"): - r_serv.srem("SHUTDOWN_FLAGS", "Tokenize") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Tokeniser is idling 10s") time.sleep(10) print "sleepin" - continue - - for word, score in PST._get_top_words().items(): - if len(word) >= 4: - msg = channel_0+' '+PST.p_path+' '+str(word)+' '+str(score) - pub.send_message(msg) - print msg - else: - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Tokenize_Q.py b/bin/ZMQ_PubSub_Tokenize_Q.py index 73459e49..5135144d 100755 --- a/bin/ZMQ_PubSub_Tokenize_Q.py +++ b/bin/ZMQ_PubSub_Tokenize_Q.py @@ -30,5 +30,5 @@ if __name__ == "__main__": config_channel = 'channel_1' subscriber_name = 'tokenize' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Attributes_Q.py b/bin/ZMQ_Sub_Attributes_Q.py index 5d0c2cd5..c0202e61 100755 --- a/bin/ZMQ_Sub_Attributes_Q.py +++ b/bin/ZMQ_Sub_Attributes_Q.py @@ -30,5 +30,5 @@ if __name__ == "__main__": config_channel = 'channel' subscriber_name = 'attributes' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_CreditCards_Q.py b/bin/ZMQ_Sub_CreditCards_Q.py index 5df01cf8..8da06df1 100755 --- a/bin/ZMQ_Sub_CreditCards_Q.py +++ b/bin/ZMQ_Sub_CreditCards_Q.py @@ -13,5 +13,5 @@ if __name__ == "__main__": config_channel = 'channel_0' subscriber_name = 'creditcard_categ' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Curve_Q.py b/bin/ZMQ_Sub_Curve_Q.py index aed0caad..55333bb7 100755 --- a/bin/ZMQ_Sub_Curve_Q.py +++ b/bin/ZMQ_Sub_Curve_Q.py @@ -30,5 +30,5 @@ if __name__ == "__main__": config_channel = 'channel_0' subscriber_name = 'curve' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Duplicate_Q.py b/bin/ZMQ_Sub_Duplicate_Q.py index 2a9a4212..8901b377 100755 --- a/bin/ZMQ_Sub_Duplicate_Q.py +++ b/bin/ZMQ_Sub_Duplicate_Q.py @@ -12,5 +12,5 @@ if __name__ == "__main__": config_channel = 'channel' subscriber_name = 'duplicate' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Indexer.py b/bin/ZMQ_Sub_Indexer.py index 3fc3f56e..377f6943 100755 --- a/bin/ZMQ_Sub_Indexer.py +++ b/bin/ZMQ_Sub_Indexer.py @@ -9,38 +9,37 @@ The ZMQ_Sub_Indexer modules is fetching the list of files to be processed and index each file with a full-text indexer (Whoosh until now). """ -import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher from whoosh.index import create_in, exists_in, open_dir from whoosh.fields import Schema, TEXT, ID import os -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" +if __name__ == "__main__": + publisher.channel = "Script" - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + # Subscriber + sub_config_section = 'PubSub_Global' + sub_name = 'indexer' - # Redis - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'indexer' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Indexer configuration - index dir and schema setup - indexpath = cfg.get("Indexer", "path") - indexertype = cfg.get("Indexer", "type") + indexpath = h.config.get("Indexer", "path") + indexertype = h.config.get("Indexer", "type") if indexertype == "whoosh": - schema = Schema(title=TEXT(stored=True), path=ID(stored=True, unique=True), content=TEXT) + schema = Schema(title=TEXT(stored=True), path=ID(stored=True, + unique=True), + content=TEXT) if not os.path.exists(indexpath): os.mkdir(indexpath) if not exists_in(indexpath): @@ -49,29 +48,16 @@ def main(): ix = open_dir(indexpath) # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "indexer" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - # FUNCTIONS # publisher.info("""ZMQ Indexer is Running""") while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Indexer"): - r_serv1.srem("SHUTDOWN_FLAGS", "Indexer") - publisher.warning("Shutdown Flag Up: Terminating.") + if h.redis_queue_shutdown(): break publisher.debug("Script Indexer is idling 10s") time.sleep(1) @@ -88,9 +74,5 @@ def main(): indexwriter.commit() except IOError: print "CRC Checksum Failed on :", PST.p_path - publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format(PST.p_source, PST.p_date, PST.p_name)) - pass - - -if __name__ == "__main__": - main() + publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format( + PST.p_source, PST.p_date, PST.p_name)) diff --git a/bin/ZMQ_Sub_Indexer_Q.py b/bin/ZMQ_Sub_Indexer_Q.py index 65d951e7..5433663f 100755 --- a/bin/ZMQ_Sub_Indexer_Q.py +++ b/bin/ZMQ_Sub_Indexer_Q.py @@ -24,5 +24,5 @@ if __name__ == "__main__": config_channel = 'channel' subscriber_name = 'indexer' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Mails_Q.py b/bin/ZMQ_Sub_Mails_Q.py index 044fc352..0cd21971 100755 --- a/bin/ZMQ_Sub_Mails_Q.py +++ b/bin/ZMQ_Sub_Mails_Q.py @@ -12,5 +12,5 @@ if __name__ == "__main__": config_channel = 'channel_1' subscriber_name = 'mails_categ' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Onion_Q.py b/bin/ZMQ_Sub_Onion_Q.py index 5668240e..d637452a 100755 --- a/bin/ZMQ_Sub_Onion_Q.py +++ b/bin/ZMQ_Sub_Onion_Q.py @@ -29,5 +29,5 @@ if __name__ == "__main__": config_channel = 'channel_2' subscriber_name = 'onion_categ' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Urls_Q.py b/bin/ZMQ_Sub_Urls_Q.py index 0bdeed66..bf8ad14f 100755 --- a/bin/ZMQ_Sub_Urls_Q.py +++ b/bin/ZMQ_Sub_Urls_Q.py @@ -13,5 +13,5 @@ if __name__ == "__main__": config_channel = 'channel_3' subscriber_name = 'web_categ' - h = Helper.Queues() - h.queue_subscribe(publisher, config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.redis_queue_subscribe(publisher) diff --git a/bin/packages/ZMQ_PubSub.py b/bin/packages/ZMQ_PubSub.py index 14b20440..fbf6372d 100755 --- a/bin/packages/ZMQ_PubSub.py +++ b/bin/packages/ZMQ_PubSub.py @@ -28,7 +28,6 @@ class PubSub(object): """ def __init__(self, config, log_channel, ps_name): self._ps_name = ps_name - self._config_parser = config self._context_zmq = zmq.Context() @@ -60,9 +59,8 @@ class ZMQPub(PubSub): def __init__(self, config, pub_config_section, ps_name): super(ZMQPub, self).__init__(config, "Default", ps_name) - self._pub_config_section = pub_config_section self._pubsocket = self._context_zmq.socket(zmq.PUB) - self._pub_adress = self._config_parser.get(self._pub_config_section, "adress") + self._pub_adress = self._config_parser.get(pub_config_section, "adress") self._pubsocket.bind(self._pub_adress) @@ -117,33 +115,14 @@ class ZMQSub(PubSub): def __init__(self, config, sub_config_section, channel, ps_name): super(ZMQSub, self).__init__(config, "Default", ps_name) - self._sub_config_section = sub_config_section self._subsocket = self._context_zmq.socket(zmq.SUB) - self._sub_adress = self._config_parser.get(self._sub_config_section, "adress") + self._sub_adress = self._config_parser.get(sub_config_section, "adress") self._subsocket.connect(self._sub_adress) self._channel = channel self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel) - def get_message(self): - """ - Get the first sent message from a Publisher. - :return: (str) Message from Publisher - - """ - return self._subsocket.recv() - - def get_and_lpush(self, r_serv): - """ - Get the first sent message from a Publisher and storing it in redis - - ..note:: This function also create a set named "queue" for monitoring needs - - """ - r_serv.sadd("queues", self._channel+self._ps_name) - r_serv.lpush(self._channel+self._ps_name, self._subsocket.recv()) - def get_msg_from_queue(self, r_serv): """ Get the first sent message from a Redis List diff --git a/bin/packages/config.cfg b/bin/packages/config.cfg deleted file mode 100644 index 5ef8a9a5..00000000 --- a/bin/packages/config.cfg +++ /dev/null @@ -1,65 +0,0 @@ -[Directories] -bloomfilters = /home/user/Blooms/ -pastes = /home/user/PASTES/ - -##### Redis ##### -[Redis_Cache] -host = localhost -port = 6379 -db = 0 - -[Redis_Log] -host = localhost -port = 6380 -db = 0 - -[Redis_Queues] -host = localhost -port = 6381 -db = 0 - -[Redis_Data_Merging] -host = localhost -port = 6379 -db = 1 - -##### LevelDB ##### -[Redis_Level_DB] -host = localhost -port = 2013 -db = 0 - -[Redis_Level_DB_Hashs] -host = localhost -port = 2013 -db = 1 - -# PUB / SUB : ZMQ -[Feed] -adress = tcp://crf.circl.lu:5556 -topicfilter = 102 - -[PubSub_Global] -adress = tcp://127.0.0.1:5000 -channel = filelist - -[PubSub_Longlines] -adress = tcp://127.0.0.1:5001 -channel_0 = Longlines -channel_1 = Shortlines - -[PubSub_Words] -adress = tcp://127.0.0.1:5002 -channel_0 = words - -[PubSub_Categ] -adress = tcp://127.0.0.1:5003 -channel_0 = cards -channel_1 = emails -channel_2 = tor -channel_3 = urls -#Channels are dynamic (1 channel per categ) <= FIXME: no it's not. - -[PubSub_Url] -adress = tcp://127.0.0.1:5004 -channel = urls diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample index 76a3ee20..daeabd83 100644 --- a/bin/packages/config.cfg.sample +++ b/bin/packages/config.cfg.sample @@ -1,6 +1,6 @@ [Directories] bloomfilters = /home/user/Blooms/ -pastes = /home/user/PASTES/ +pastes = PASTES wordtrending_csv = /home/user/AIL/var/www/static/csv/wordstrendingdata wordsfile = /home/user/AIL/files/wordfile