From 99c8cc7941b1277b9a4c0bb629ab11183ff9bd92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Wed, 20 Aug 2014 15:14:57 +0200 Subject: [PATCH] completely remove ZMQ_PubSub.py --- bin/Helper.py | 15 ++++- bin/ZMQ_Feed.py | 8 +-- bin/ZMQ_PubSub_Categ.py | 14 ++-- bin/ZMQ_PubSub_Lines.py | 100 +++++++++++----------------- bin/ZMQ_PubSub_Tokenize.py | 16 ++--- bin/ZMQ_Pub_Global.py | 8 +-- bin/ZMQ_Sub_Attributes.py | 54 +++++---------- bin/ZMQ_Sub_CreditCards.py | 62 +++++++---------- bin/ZMQ_Sub_Curve.py | 73 ++++++++------------ bin/ZMQ_Sub_Duplicate.py | 104 ++++++++++++----------------- bin/ZMQ_Sub_Indexer.py | 2 +- bin/ZMQ_Sub_Mails.py | 84 +++++++++++------------ bin/ZMQ_Sub_Onion.py | 64 ++++++++---------- bin/ZMQ_Sub_Urls.py | 100 +++++++++++++--------------- bin/indexer_lookup.py | 3 +- bin/packages/ZMQ_PubSub.py | 133 ------------------------------------- bin/packages/lib_words.py | 6 +- 17 files changed, 307 insertions(+), 539 deletions(-) delete mode 100755 bin/packages/ZMQ_PubSub.py diff --git a/bin/Helper.py b/bin/Helper.py index 1be5e1e4..8e3bade2 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -45,10 +45,23 @@ class Redis_Queues(object): self.sub_socket.connect(sub_address) self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) - def zmq_pub(self, config_section): + def zmq_pub(self, config_section, config_channel): context = zmq.Context() self.pub_socket = context.socket(zmq.PUB) self.pub_socket.bind(self.config.get(config_section, 'adress')) + if config_channel is not None: + self.pub_channel = self.config.get(config_section, config_channel) + else: + # The publishing channel is defined dynamically + self.pub_channel = None + + def zmq_pub_send(self, msg): + if self.pub_channel is None: + raise Exception('A channel is reqired to send a message.') + self.pub_socket.send('{} {}'.format(self.pub_channel, msg)) + + def redis_rpop(self): + return self.r_queues.rpop(self.sub_channel + self.subscriber_name) def redis_queue_shutdown(self, is_queue=False): if is_queue: diff --git a/bin/ZMQ_Feed.py b/bin/ZMQ_Feed.py index 75f62d41..eeee987e 100755 --- a/bin/ZMQ_Feed.py +++ b/bin/ZMQ_Feed.py @@ -39,15 +39,15 @@ if __name__ == "__main__": # Publisher pub_config_section = "PubSub_Global" - h.zmq_pub(pub_config_section) - pub_channel = h.config.get(pub_config_section, "channel") + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) # LOGGING # publisher.info("Feed Script started to receive & publish.") while True: - message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) + message = h.redis_rpop() # Recovering the streamed message informations. 
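        # Sketch (not part of the patch) of the framing contract the new
        # helper enforces: everything published via zmq_pub_send() goes out
        # as "<channel> <payload>". With zmq_pub(section, None) the channel
        # is deferred, so h.pub_channel must be set before the first send or
        # the helper raises. The _framed helper below is hypothetical.
        def _framed(pub_channel, msg):
            if pub_channel is None:
                raise Exception('A channel is required to send a message.')
            return '{} {}'.format(pub_channel, msg)
        assert _framed('filelist', '/tmp/paste.gz') == 'filelist /tmp/paste.gz'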
if message is not None: if len(message.split()) == 3: @@ -76,4 +76,4 @@ if __name__ == "__main__": with open(filename, 'wb') as f: f.write(base64.standard_b64decode(gzip64encoded)) - h.pub_socket.send('{} {}'.format(pub_channel, filename)) + h.zmq_pub_send(filename) diff --git a/bin/ZMQ_PubSub_Categ.py b/bin/ZMQ_PubSub_Categ.py index 4456553e..fa0ef533 100755 --- a/bin/ZMQ_PubSub_Categ.py +++ b/bin/ZMQ_PubSub_Categ.py @@ -48,16 +48,16 @@ import Helper if __name__ == "__main__": publisher.channel = "Script" - # Publisher - pub_config_section = 'PubSub_Categ' - config_section = 'PubSub_Words' config_channel = 'channel_0' subscriber_name = 'pubcateg' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - h.zmq_pub(pub_config_section) + # Publisher + pub_config_section = 'PubSub_Categ' + + h.zmq_pub(pub_config_section, None) # SCRIPT PARSER # parser = argparse.ArgumentParser( @@ -86,7 +86,7 @@ if __name__ == "__main__": prec_filename = None while True: - message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) + message = h.redis_rpop() if message is not None: channel, filename, word, score = message.split() @@ -97,8 +97,8 @@ if __name__ == "__main__": for categ, words_list in tmp_dict.items(): if word.lower() in words_list: - h.pub_socket.send('{} {} {} {}'.format( - categ, PST.p_path, word, score)) + h.pub_channel = categ + h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score)) publisher.info( 'Categ;{};{};{};Detected {} "{}"'.format( diff --git a/bin/ZMQ_PubSub_Lines.py b/bin/ZMQ_PubSub_Lines.py index d13103c6..1cb38fe5 100755 --- a/bin/ZMQ_PubSub_Lines.py +++ b/bin/ZMQ_PubSub_Lines.py @@ -5,10 +5,11 @@ The ZMQ_PubSub_Lines Module ============================ -This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module. +This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q +Module. 
-It perform a sorting on the line's length and publish/forward them to differents -channels: +It perform a sorting on the line's length and publish/forward them to +differents channels: *Channel 1 if max length(line) < max *Channel 2 if max length(line) > max @@ -28,79 +29,62 @@ Requirements """ import redis import argparse -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'line' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Longlines' + h.zmq_pub(pub_config_section, None) + + # Subscriber + h.zmq_sub(config_section) # SCRIPT PARSER # parser = argparse.ArgumentParser( - description='''This script is a part of the Analysis Information Leak framework.''', - epilog='''''') + description='''This script is a part of the Analysis Information \ + Leak framework.''') - parser.add_argument('-max', type=int, default=500, - help='The limit between "short lines" and "long lines" (500)', - action='store') + parser.add_argument( + '-max', type=int, default=500, + help='The limit between "short lines" and "long lines"', + action='store') args = parser.parse_args() # REDIS # + # FIXME move it in the Paste object r_serv = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "line" - subscriber_config_section = "PubSub_Global" - - # Publisher - publisher_config_section = "PubSub_Longlines" - publisher_name = "publine" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) - - channel_0 = cfg.get("PubSub_Longlines", "channel_0") - channel_1 = cfg.get("PubSub_Longlines", "channel_1") + channel_0 = h.config.get("PubSub_Longlines", "channel_0") + channel_1 = h.config.get("PubSub_Longlines", "channel_1") # FUNCTIONS # - tmp_string = "Lines script Subscribed to channel {} and Start to publish on channel {}, {}" - publisher.info(tmp_string.format( - cfg.get("PubSub_Global", "channel"), - cfg.get("PubSub_Longlines", "channel_0"), - cfg.get("PubSub_Longlines", "channel_1"))) + tmp_string = "Lines script Subscribed to channel {} and Start to publish \ + on channel {}, {}" + publisher.info(tmp_string.format(h.sub_channel, channel_0, channel_1)) while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Lines"): - r_serv1.srem("SHUTDOWN_FLAGS", "Lines") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: 
Terminating.") break @@ -111,18 +95,14 @@ def main(): lines_infos = PST.get_lines_info() PST.save_attribute_redis(r_serv, "p_nb_lines", lines_infos[0]) - PST.save_attribute_redis(r_serv, "p_max_length_line", lines_infos[1]) + PST.save_attribute_redis(r_serv, "p_max_length_line", + lines_infos[1]) r_serv.sadd("Pastes_Objects", PST.p_path) if lines_infos[1] >= args.max: - msg = channel_0+" "+PST.p_path + h.pub_channel = channel_0 else: - msg = channel_1+" "+PST.p_path - - pub.send_message(msg) + h.pub_channel = channel_1 + h.zmq_pub_send(PST.p_path) except IOError: print "CRC Checksum Error on : ", PST.p_path - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_PubSub_Tokenize.py b/bin/ZMQ_PubSub_Tokenize.py index 68c5e720..2fe3de5b 100755 --- a/bin/ZMQ_PubSub_Tokenize.py +++ b/bin/ZMQ_PubSub_Tokenize.py @@ -32,31 +32,29 @@ import Helper if __name__ == "__main__": publisher.channel = "Script" - # Publisher - pub_config_section = 'PubSub_Words' - config_section = 'PubSub_Longlines' config_channel = 'channel_1' subscriber_name = 'tokenize' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) - h.zmq_pub(pub_config_section) - pub_channel = h.config.get(pub_config_section, "channel_0") + # Publisher + pub_config_section = 'PubSub_Words' + pub_config_channel = 'channel_0' + h.zmq_pub(pub_config_section, pub_config_channel) # LOGGING # publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel)) while True: - message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) + message = h.redis_rpop() print message if message is not None: paste = Paste.Paste(message.split(" ", -1)[-1]) for word, score in paste._get_top_words().items(): if len(word) >= 4: - h.pub_socket.send( - '{} {} {} {}'.format(pub_channel, paste.p_path, - word, score)) + h.zmq_pub_send('{} {} {}'.format(paste.p_path, word, + score)) else: if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" diff --git a/bin/ZMQ_Pub_Global.py b/bin/ZMQ_Pub_Global.py index 56057737..7ac5ede7 100755 --- a/bin/ZMQ_Pub_Global.py +++ b/bin/ZMQ_Pub_Global.py @@ -37,17 +37,17 @@ if __name__ == "__main__": # Publisher pub_config_section = 'PubSub_Global' - h.zmq_pub(pub_config_section) - pub_channel = h.config.get(pub_config_section, "channel") + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) # LOGGING # publisher.info("Starting to publish.") while True: - filename = h.r_queues.lpop(h.sub_channel) + filename = h.redis_rpop() if filename is not None: - h.pub_socket.send('{} {}'.format(pub_channel, filename)) + h.zmq_pub_send(filename) else: time.sleep(10) publisher.debug("Nothing to publish") diff --git a/bin/ZMQ_Sub_Attributes.py b/bin/ZMQ_Sub_Attributes.py index e8a59071..4e36a7bc 100755 --- a/bin/ZMQ_Sub_Attributes.py +++ b/bin/ZMQ_Sub_Attributes.py @@ -27,56 +27,41 @@ Requirements """ import redis -import ConfigParser import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'attributes' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # REDIS # r_serv = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - 
port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - # Subscriber - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "attributes" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # FUNCTIONS # publisher.info("""ZMQ Attribute is Running""") while True: try: - message = sub.get_msg_from_queue(r_serv1) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) else: - if r_serv1.sismember("SHUTDOWN_FLAGS", "Attributes"): - r_serv1.srem("SHUTDOWN_FLAGS", "Attributes") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -95,9 +80,6 @@ def main(): PST.save_all_attributes_redis(r_serv) except IOError: print "CRC Checksum Failed on :", PST.p_path - publisher.error('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "CRC Checksum Failed")) - pass - - -if __name__ == "__main__": - main() + publisher.error('{0};{1};{2};{3};{4}'.format( + "Duplicate", PST.p_source, PST.p_date, PST.p_name, + "CRC Checksum Failed")) diff --git a/bin/ZMQ_Sub_CreditCards.py b/bin/ZMQ_Sub_CreditCards.py index 54021144..084a7b64 100755 --- a/bin/ZMQ_Sub_CreditCards.py +++ b/bin/ZMQ_Sub_CreditCards.py @@ -1,53 +1,44 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher +import Helper -configfile = './packages/config.cfg' - - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards") + config_section = 'PubSub_Categ' + config_channel = 'channel_0' + subscriber_name = 'cards' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # FUNCTIONS # publisher.info("Creditcard script subscribed to channel creditcard_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?" 
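    # Illustration (not from the patch): the Visa pattern matches 13- or
    # 16-digit numbers starting with 4, and findall() yields whole matches
    # because its only group is non-capturing:
    #
    #   >>> import re
    #   >>> re.findall("4[0-9]{12}(?:[0-9]{3})?", "paid with 4111111111111111")
    #   ['4111111111111111']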
# mastercard_regex = "5[1-5]\d{2}([\ \-]?)\d{4}\1\d{4}\1\d{4}" # visa_regex = "4\d{3}([\ \-]?)\d{4}\1\d{4}\1\d{4}" - # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|[3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}" + # discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]| + # [3-9]\d)|[2-8]\d\d|9(?:[01]\d|2[0-5])))\d{10}" # jcb_regex = "35(?:2[89]|[3-8]\d)([\ \-]?)\d{4}\1\d{4}\1\d{4}" # amex_regex = "3[47]\d\d([\ \-]?)\d{6}\1\d{5}" # chinaUP_regex = "62[0-5]\d{13,16}" @@ -69,25 +60,22 @@ def main(): PST.save_attribute_redis(r_serv1, channel, creditcard_set) pprint.pprint(creditcard_set) - to_print = 'CreditCard;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'CreditCard;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) if (len(creditcard_set) > 0): - publisher.critical('{}Checked {} valid number(s)'.format(to_print, len(creditcard_set))) + publisher.critical('{}Checked {} valid number(s)'.format( + to_print, len(creditcard_set))) else: publisher.info('{}CreditCard related'.format(to_print)) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards"): - r_serv.srem("SHUTDOWN_FLAGS", "Creditcards") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script creditcard is idling 1m") time.sleep(60) - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_Curve.py b/bin/ZMQ_Sub_Curve.py index 7a065afb..5a0b94e1 100755 --- a/bin/ZMQ_Sub_Curve.py +++ b/bin/ZMQ_Sub_Curve.py @@ -6,7 +6,8 @@ The ZMQ_Sub_Curve Module This module is consuming the Redis-list created by the ZMQ_Sub_Curve_Q Module. -This modules update a .csv file used to draw curves representing selected words and their occurency per day. +This modules update a .csv file used to draw curves representing selected +words and their occurency per day. ..note:: The channel will have the name of the file created. 
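The per-word update this module performs is a read-modify-write on a Redis
hash keyed by the word, with the paste date as field. A minimal self-contained
sketch of that logic (function name invented; redis-py's HINCRBY would do the
same update in one atomic command):

import redis

def bump_word(r, word, day, score):
    # Same effect as the hget/hset pair in the module; r.hincrby(word.lower(),
    # day, int(score)) would be the atomic one-liner.
    prev = r.hget(word.lower(), day)
    if prev is not None:
        r.hset(word.lower(), day, int(prev) + int(score))
    else:
        r.hset(word.lower(), day, score)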
@@ -22,72 +23,60 @@ Requirements """ import redis -import ConfigParser import time -from packages import Paste as P -from packages import ZMQ_PubSub +from packages import Paste from pubsublogger import publisher from packages import lib_words -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Level_DB", "host"), - port=cfg.get("Redis_Level_DB", "port"), - db=0) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - channel = cfg.get("PubSub_Words", "channel_0") + config_section = 'PubSub_Words' + config_channel = 'channel_0' subscriber_name = "curve" - subscriber_config_section = "PubSub_Words" - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Level_DB", "host"), + port=h.config.get("Redis_Level_DB", "port"), + db=h.config.get("Redis_Level_DB", "db")) # FUNCTIONS # - publisher.info("Script Curve subscribed to channel {0}".format(cfg.get("PubSub_Words", "channel_0"))) + publisher.info("Script Curve subscribed to {}".format(h.sub_channel)) # FILE CURVE SECTION # - csv_path = cfg.get("Directories", "wordtrending_csv") - wordfile_path = cfg.get("Directories", "wordsfile") + csv_path = h.config.get("Directories", "wordtrending_csv") + wordfile_path = h.config.get("Directories", "wordsfile") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None while True: if message is not None: channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: - PST = P.Paste(filename) - lib_words.create_curve_with_word_file(r_serv1, csv_path, wordfile_path, int(PST.p_date.year), int(PST.p_date.month)) + PST = Paste.Paste(filename) + lib_words.create_curve_with_word_file( + r_serv1, csv_path, wordfile_path, int(PST.p_date.year), + int(PST.p_date.month)) prec_filename = filename prev_score = r_serv1.hget(word.lower(), PST.p_date) print prev_score if prev_score is not None: - r_serv1.hset(word.lower(), PST.p_date, int(prev_score) + int(score)) + r_serv1.hset(word.lower(), PST.p_date, + int(prev_score) + int(score)) else: r_serv1.hset(word.lower(), PST.p_date, score) - # r_serv.expire(word,86400) #1day else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Curve"): - r_serv.srem("SHUTDOWN_FLAGS", "Curve") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -95,8 +84,4 @@ def main(): print "sleepin" time.sleep(1) - message = sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_Duplicate.py b/bin/ZMQ_Sub_Duplicate.py index 5bcaf4c7..ccaf6f7d 100755 --- a/bin/ZMQ_Sub_Duplicate.py +++ b/bin/ZMQ_Sub_Duplicate.py @@ -13,61 +13,51 @@ Requirements: """ import redis -import ConfigParser import os import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher from pybloomfilter import BloomFilter -configfile = './packages/config.cfg' +import Helper +if __name__ == "__main__": + 
publisher.channel = "Script" -def main(): - """Main Function""" + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'duplicate' - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) # REDIS # - # DB QUEUE ( MEMORY ) - r_Q_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - r_serv_merge = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # REDIS # # DB OBJECT & HASHS ( DISK ) + # FIXME increase flexibility dico_redis = {} for year in xrange(2013, 2015): for month in xrange(0, 16): dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis( - host=cfg.get("Redis_Level_DB", "host"), - port=year, + host=h.config.get("Redis_Level_DB", "host"), port=year, db=month) - # LOGGING # - publisher.channel = "Script" - - # ZMQ # - channel = cfg.get("PubSub_Global", "channel") - subscriber_name = "duplicate" - subscriber_config_section = "PubSub_Global" - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) - # FUNCTIONS # - publisher.info("""Script duplicate subscribed to channel {0}""".format(cfg.get("PubSub_Global", "channel"))) + publisher.info("""Script duplicate subscribed to channel {0}""".format( + h.config.get("PubSub_Global", "channel"))) set_limit = 100 + bloompath = os.path.join(os.environ('AIL_BIN'), + h.config.get("Directories", "bloomfilters")) + bloop_path_set = set() while True: try: super_dico = {} @@ -77,15 +67,14 @@ def main(): x = time.time() - message = sub.get_msg_from_queue(r_Q_serv) + message = h.redis_rpop() if message is not None: path = message.split(" ", -1)[-1] PST = Paste.Paste(path) else: publisher.debug("Script Attribute is idling 10s") time.sleep(10) - if r_Q_serv.sismember("SHUTDOWN_FLAGS", "Duplicate"): - r_Q_serv.srem("SHUTDOWN_FLAGS", "Duplicate") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break @@ -97,19 +86,14 @@ def main(): r_serv1 = dico_redis[PST.p_date.year + PST.p_date.month] # Creating the bloom filter name: bloomyyyymm - bloomname = 'bloom' + PST.p_date.year + PST.p_date.month - - bloompath = cfg.get("Directories", "bloomfilters") - - filebloompath = bloompath + bloomname - - # datetime.date(int(PST.p_date.year),int(PST.p_date.month),int(PST.p_date.day)).timetuple().tm_yday % 7 + filebloompath = os.path.join(bloompath, 'bloom' + PST.p_date.year + + PST.p_date.month) if os.path.exists(filebloompath): bloom = BloomFilter.open(filebloompath) else: bloom = BloomFilter(100000000, 0.01, filebloompath) - r_Q_serv.sadd("bloomlist", filebloompath) + bloop_path_set.add(filebloompath) # UNIQUE INDEX HASHS TABLE r_serv0 = dico_redis["201300"] @@ -121,45 +105,43 @@ def main(): # For each bloom filter opened_bloom = [] - for bloo in r_Q_serv.smembers("bloomlist"): + for bloo in bloop_path_set: # Opening blooms opened_bloom.append(BloomFilter.open(bloo)) # For each hash of the paste - for hash in PST._get_hash_lines(min=5, start=1, jump=0): + for line_hash in PST._get_hash_lines(min=5, start=1, jump=0): nb_hash_current += 1 # Adding 
the hash in Redis & limiting the set - if r_serv1.scard(hash) <= set_limit: - r_serv1.sadd(hash, index) - r_serv1.sadd("HASHS", hash) + if r_serv1.scard(line_hash) <= set_limit: + r_serv1.sadd(line_hash, index) + r_serv1.sadd("HASHS", line_hash) # Adding the hash in the bloom of the month - bloom.add(hash) + bloom.add(line_hash) # Go throught the Database of the bloom filter (of the month) for bloo in opened_bloom: - if hash in bloo: + if line_hash in bloo: db = bloo.name[-6:] - # Go throught the Database of the bloom filter (of the month) + # Go throught the Database of the bloom filter (month) r_serv_bloom = dico_redis[db] # set of index paste: set([1,2,4,65]) - hash_current = r_serv_bloom.smembers(hash) + hash_current = r_serv_bloom.smembers(line_hash) # removing itself from the list hash_current = hash_current - set([index]) - # if the hash is present at least in 1 files (already processed) + # if the hash is present at least in 1 files + # (already processed) if len(hash_current) != 0: - hash_dico[hash] = hash_current + hash_dico[line_hash] = hash_current # if there is data in this dictionnary if len(hash_dico) != 0: super_dico[index] = hash_dico - else: - # The hash is not in this bloom - pass - ########################################################################################### + ########################################################################### # if there is data in this dictionnary if len(super_dico) != 0: @@ -171,7 +153,8 @@ def main(): for p_fname in pset: occur_dico.setdefault(p_fname, 0) - # Count how much hash is similar per file occuring in the dictionnary + # Count how much hash is similar per file occuring + # in the dictionnary if occur_dico[p_fname] >= 0: occur_dico[p_fname] = occur_dico[p_fname] + 1 @@ -181,7 +164,8 @@ def main(): dupl.append((paste, percentage)) # Creating the object attribute and save it. 
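            # Sketch of the score appended just below (values and formula
            # assumed; the computation itself sits outside this hunk): the
            # percentage is presumably the share of this paste's hashed lines
            # already seen in a given earlier paste, e.g.
            #
            #   >>> count, nb_hash_current = 10, 40
            #   >>> round((count / float(nb_hash_current)) * 100, 2)
            #   25.0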
- to_print = 'Duplicate;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'Duplicate;{};{};{};'.format( + PST.p_source, PST.p_date, PST.p_name) if dupl != []: PST.__setattr__("p_duplicate", dupl) PST.save_attribute_redis(r_serv_merge, "p_duplicate", dupl) @@ -193,7 +177,3 @@ def main(): except IOError: print "CRC Checksum Failed on :", PST.p_path publisher.error('{}CRC Checksum Failed'.format(to_print)) - pass - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Indexer.py b/bin/ZMQ_Sub_Indexer.py index 377f6943..6fa4b233 100755 --- a/bin/ZMQ_Sub_Indexer.py +++ b/bin/ZMQ_Sub_Indexer.py @@ -52,7 +52,7 @@ if __name__ == "__main__": while True: try: - message = h.r_queues.rpop(h.sub_channel + h.subscriber_name) + message = h.redis_rpop() if message is not None: PST = Paste.Paste(message.split(" ", -1)[-1]) diff --git a/bin/ZMQ_Sub_Mails.py b/bin/ZMQ_Sub_Mails.py index b406a152..ee7c7f92 100755 --- a/bin/ZMQ_Sub_Mails.py +++ b/bin/ZMQ_Sub_Mails.py @@ -2,53 +2,47 @@ # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time import dns.exception -from packages import Paste as P +from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv2 = redis.StrictRedis( - host=cfg.get("Redis_Cache", "host"), - port=cfg.getint("Redis_Cache", "port"), - db=cfg.getint("Redis_Cache", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails") + config_section = 'PubSub_Categ' + config_channel = 'channel_1' + subscriber_name = 'emails' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) + + r_serv2 = redis.StrictRedis( + host=h.config.get("Redis_Cache", "host"), + port=h.config.getint("Redis_Cache", "port"), + db=h.config.getint("Redis_Cache", "db")) # FUNCTIONS # publisher.info("Suscribed to channel mails_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None + # Log as critical if there are more that that amout of valid emails + is_critical = 10 + email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}" while True: @@ -57,36 +51,36 @@ def main(): channel, filename, word, score = message.split() if prec_filename is None or filename != prec_filename: - PST = P.Paste(filename) - MX_values = lib_refine.checking_MX_record(r_serv2, PST.get_regex(email_regex)) + PST = Paste.Paste(filename) + MX_values = lib_refine.checking_MX_record( + r_serv2, PST.get_regex(email_regex)) if MX_values[0] >= 1: PST.__setattr__(channel, MX_values) - PST.save_attribute_redis(r_serv1, channel, (MX_values[0], list(MX_values[1]))) + PST.save_attribute_redis(r_serv1, channel, + (MX_values[0], + 
list(MX_values[1]))) pprint.pprint(MX_values) - to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.format(PST.p_source, PST.p_date, PST.p_name, MX_values[0]) - if MX_values[0] > 10: + to_print = 'Mails;{};{};{};Checked {} e-mail(s)'.\ + format(PST.p_source, PST.p_date, PST.p_name, + MX_values[0]) + if MX_values[0] > is_critical: publisher.warning(to_print) else: publisher.info(to_print) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Mails"): - r_serv.srem("SHUTDOWN_FLAGS", "Mails") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script Mails is Idling 10s") time.sleep(10) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() except dns.exception.Timeout: + # FIXME retry! print "dns.exception.Timeout" - pass - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Onion.py b/bin/ZMQ_Sub_Onion.py index 57d8e17f..c8ad9149 100755 --- a/bin/ZMQ_Sub_Onion.py +++ b/bin/ZMQ_Sub_Onion.py @@ -6,8 +6,8 @@ The ZMQ_Sub_Onion Module This module is consuming the Redis-list created by the ZMQ_Sub_Onion_Q Module. -It trying to extract url from paste and returning only ones which are tor related -(.onion) +It trying to extract url from paste and returning only ones which are tor +related (.onion) ..seealso:: Paste method (get_regex) @@ -22,45 +22,37 @@ Requirements """ import redis -import ConfigParser import pprint import time from packages import Paste -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - Sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor") + config_section = 'PubSub_Categ' + config_channel = 'channel_2' + subscriber_name = 'tor' + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) # FUNCTIONS # publisher.info("Script subscribed to channel onion_categ") # Getting the first message from redis. 
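    # Toy illustration (not from the patch) of this module's filter: of all
    # URLs extracted from a paste, only Tor hidden services are kept. The
    # real check uses the Faup-derived regex fields below; _is_onion is a
    # hypothetical stand-in.
    def _is_onion(host):
        return host.endswith('.onion')
    assert _is_onion('3g2upl4pq6kufc4m.onion')
    assert not _is_onion('example.com')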
- message = Sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None # Thanks to Faup project for this regex @@ -78,7 +70,8 @@ def main(): for x in PST.get_regex(url_regex): # Extracting url with regex - credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x + credential, subdomain, domain, host, tld, port, \ + resource_path, query_string, f1, f2, f3, f4 = x if f1 == "onion": domains_list.append(domain) @@ -88,25 +81,22 @@ def main(): PST.save_attribute_redis(r_serv1, channel, domains_list) pprint.pprint(domains_list) print PST.p_path - to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, PST.p_name) + to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date, + PST.p_name) if len(domains_list) > 0: - publisher.warning('{}Detected {} .onion(s)'.format(to_print, len(domains_list))) + publisher.warning('{}Detected {} .onion(s)'.format( + to_print, len(domains_list))) else: publisher.info('{}Onion related'.format(to_print)) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Onion"): - r_serv.srem("SHUTDOWN_FLAGS", "Onion") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script url is Idling 10s") time.sleep(10) - message = Sub.get_msg_from_queue(r_serv) - - -if __name__ == "__main__": - main() + message = h.redis_rpop() diff --git a/bin/ZMQ_Sub_Urls.py b/bin/ZMQ_Sub_Urls.py index 1ba8f892..963a75b3 100755 --- a/bin/ZMQ_Sub_Urls.py +++ b/bin/ZMQ_Sub_Urls.py @@ -1,13 +1,11 @@ #!/usr/bin/env python2 # -*-coding:UTF-8 -* import redis -import ConfigParser import pprint import time import dns.exception from packages import Paste from packages import lib_refine -from packages import ZMQ_PubSub from pubsublogger import publisher # Country and ASN lookup @@ -16,55 +14,43 @@ import socket import pycountry import ipaddress -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read(configfile) - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - r_serv1 = redis.StrictRedis( - host=cfg.get("Redis_Data_Merging", "host"), - port=cfg.getint("Redis_Data_Merging", "port"), - db=cfg.getint("Redis_Data_Merging", "db")) - - r_serv2 = redis.StrictRedis( - host=cfg.get("Redis_Cache", "host"), - port=cfg.getint("Redis_Cache", "port"), - db=cfg.getint("Redis_Cache", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Script" - # ZMQ # - # Subscriber + config_section = 'PubSub_Categ' + config_channel = 'channel_3' subscriber_name = "urls" - subscriber_config_section = "PubSub_Categ" + + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) # Publisher - publisher_config_section = "PubSub_Url" - publisher_name = "adress" - pubchannel = cfg.get("PubSub_Url", "channel") + pub_config_section = "PubSub_Url" + pub_config_channel = 'channel' + h.zmq_pub(pub_config_section, pub_config_channel) + + # Subscriber + h.zmq_sub(config_section) + + # REDIS # + r_serv1 = redis.StrictRedis( + host=h.config.get("Redis_Data_Merging", "host"), + port=h.config.getint("Redis_Data_Merging", "port"), + db=h.config.getint("Redis_Data_Merging", "db")) + + r_serv2 = redis.StrictRedis( + host=h.config.get("Redis_Cache", "host"), + port=h.config.getint("Redis_Cache", "port"), + db=h.config.getint("Redis_Cache", 
"db")) # Country to log as critical - cc_critical = cfg.get("PubSub_Url", "cc_critical") - - sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, "web_categ", subscriber_name) - pub = ZMQ_PubSub.ZMQPub(configfile, publisher_config_section, publisher_name) + cc_critical = h.config.get("PubSub_Url", "cc_critical") # FUNCTIONS # publisher.info("Script URL subscribed to channel web_categ") - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() prec_filename = None url_regex = "(http|https|ftp)\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*" @@ -79,11 +65,12 @@ def main(): PST = Paste.Paste(filename) client = ip2asn() for x in PST.get_regex(url_regex): - scheme, credential, subdomain, domain, host, tld, port, resource_path, query_string, f1, f2, f3, f4 = x + scheme, credential, subdomain, domain, host, tld, \ + port, resource_path, query_string, f1, f2, f3, \ + f4 = x domains_list.append(domain) - msg = pubchannel + " " + str(x) - pub.send_message(msg) - publisher.debug('{0} Published'.format(x)) + h.zmq_pub_send(str(x)) + publisher.debug('{} Published'.format(x)) if f1 == "onion": print domain @@ -107,35 +94,38 @@ def main(): # EU is not an official ISO 3166 code (but used by RIPE # IP allocation) if cc is not None and cc != "EU": - print hostl, asn, cc, pycountry.countries.get(alpha2=cc).name + print hostl, asn, cc, \ + pycountry.countries.get(alpha2=cc).name if cc == cc_critical: # FIXME: That's going to fail. 
- publisher.warning('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Detected " + str(A_values[0]) + " " + hostl + " " + cc)) + publisher.warning( + 'Url;{};{};{};Detected {} {} {}'.format( + PST.p_source, PST.p_date, PST.p_name, + A_values[0], hostl, cc)) else: print hostl, asn, cc - A_values = lib_refine.checking_A_record(r_serv2, domains_list) + A_values = lib_refine.checking_A_record(r_serv2, + domains_list) if A_values[0] >= 1: PST.__setattr__(channel, A_values) - PST.save_attribute_redis(r_serv1, channel, (A_values[0], list(A_values[1]))) + PST.save_attribute_redis(r_serv1, channel, + (A_values[0], + list(A_values[1]))) pprint.pprint(A_values) - publisher.info('{0};{1};{2};{3};{4}'.format("Url", PST.p_source, PST.p_date, PST.p_name, "Checked " + str(A_values[0]) + " URL")) + publisher.info('Url;{};{};{};Checked {} URL'.format( + PST.p_source, PST.p_date, PST.p_name, A_values[0])) prec_filename = filename else: - if r_serv.sismember("SHUTDOWN_FLAGS", "Urls"): - r_serv.srem("SHUTDOWN_FLAGS", "Urls") + if h.redis_queue_shutdown(): print "Shutdown Flag Up: Terminating" publisher.warning("Shutdown Flag Up: Terminating.") break publisher.debug("Script url is Idling 10s") time.sleep(10) - message = sub.get_msg_from_queue(r_serv) + message = h.redis_rpop() except dns.exception.Timeout: print "dns.exception.Timeout", A_values - pass - -if __name__ == "__main__": - main() diff --git a/bin/indexer_lookup.py b/bin/indexer_lookup.py index 1c019c72..d89b0155 100644 --- a/bin/indexer_lookup.py +++ b/bin/indexer_lookup.py @@ -13,6 +13,7 @@ import ConfigParser import argparse import gzip +import os def readdoc(path=None): @@ -21,7 +22,7 @@ def readdoc(path=None): f = gzip.open(path, 'r') return f.read() -configfile = '../packages/config.cfg' +configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg') cfg = ConfigParser.ConfigParser() cfg.read(configfile) diff --git a/bin/packages/ZMQ_PubSub.py b/bin/packages/ZMQ_PubSub.py deleted file mode 100755 index fbf6372d..00000000 --- a/bin/packages/ZMQ_PubSub.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/python2.7 -""" -The ``ZMQ PubSub`` Modules -========================== - -""" - -import zmq - - -class PubSub(object): - """ - The PubSub class is a ``Virtual Class`` which regroup the shared attribute - of a Publisher ZeroMQ and a Subcriber ZeroMQ - - :param config: -- (ConfigParser) Handle on the parsed config file - :param log_channel: -- (str) The channel used as a log channel - :param ps_name: -- (str) The "ID" of the Publisher/Subcriber - - :return: PubSub Object - - ..note:: The ps_name was implemented to separate subscriber queues in redis - when they are listening on a same "stream" - ..seealso:: Method of the ZMQSub class - - ..todo:: Create Implementing a log channel as an attribute of this virtual class. - - """ - def __init__(self, config, log_channel, ps_name): - self._ps_name = ps_name - self._config_parser = config - - self._context_zmq = zmq.Context() - - -class ZMQPub(PubSub): - """ - This class create a ZMQ Publisher which is able to send_message to a choosen socket. 
- - :param pub_config_section: -- (str) The name of the section in the config file to get the settings - - :return: ZMQPub Object - - :Example: - Extract of the config file: - [PubSub_Categ] - adress = tcp://127.0.0.1:5003 - - Creating the object and sending message: - MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg') - - msg = "categ1"+" "+"Im the data sent on the categ1 channel" - MyPublisher.send_message(msg) - - ..note:: The ps_name attribute for a publisher is "optionnal" but still required to be - instantiated correctly. - - """ - def __init__(self, config, pub_config_section, ps_name): - super(ZMQPub, self).__init__(config, "Default", ps_name) - - self._pubsocket = self._context_zmq.socket(zmq.PUB) - self._pub_adress = self._config_parser.get(pub_config_section, "adress") - - self._pubsocket.bind(self._pub_adress) - - def send_message(self, message): - """Send a message throught the publisher socket""" - self._pubsocket.send(message) - - -class ZMQSub(PubSub): - """ - This class create a ZMQ Subcriber which is able to receive message directly or - throught redis as a buffer. - - The redis buffer is usefull when the subcriber do a time consuming job which is - desynchronising it from the stream of data received. - The redis buffer ensures that no data will be loss waiting to be processed. - - :param sub_config_section: -- (str) The name of the section in the config file to get the settings - :param channel: -- (str) The listening channel of the Subcriber. - - :return: ZMQSub Object - - :Example: - Extract of the config file: - [PubSub_Global] - adress = tcp://127.0.0.1:5000 - channel = filelist - - Creating the object and receiving data + pushing to redis (redis buffering): - - r_serv = redis.StrictRedis( - host = 127.0.0.1, - port = 6387, - db = 0) - - channel = cfg.get("PubSub_Global", "channel") - MySubscriber = ZMQSub('./packages/config.cfg',"PubSub_Global", channel, "duplicate") - - while True: - MySubscriber.get_and_lpush(r_serv) - - - Inside another script use this line to retrive the data from redis. - ... - while True: - MySubscriber.get_msg_from_queue(r_serv) - ... 
- - ..note:: If you don't want any redis buffering simply use the "get_message" method - - """ - def __init__(self, config, sub_config_section, channel, ps_name): - super(ZMQSub, self).__init__(config, "Default", ps_name) - - self._subsocket = self._context_zmq.socket(zmq.SUB) - self._sub_adress = self._config_parser.get(sub_config_section, "adress") - - self._subsocket.connect(self._sub_adress) - - self._channel = channel - self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel) - - def get_msg_from_queue(self, r_serv): - """ - Get the first sent message from a Redis List - - :return: (str) Message from Publisher - - """ - return r_serv.rpop(self._channel+self._ps_name) diff --git a/bin/packages/lib_words.py b/bin/packages/lib_words.py index 0acea7c8..9446a8ec 100644 --- a/bin/packages/lib_words.py +++ b/bin/packages/lib_words.py @@ -45,7 +45,7 @@ def create_dirfile(r_serv, directory, overwrite): r_serv.delete("filelist") for x in listdirectory(directory): - r_serv.rpush("filelist", x) + r_serv.lpush("filelist", x) publisher.info("The list was overwritten") @@ -53,13 +53,13 @@ def create_dirfile(r_serv, directory, overwrite): if r_serv.llen("filelist") == 0: for x in listdirectory(directory): - r_serv.rpush("filelist", x) + r_serv.lpush("filelist", x) publisher.info("New list created") else: for x in listdirectory(directory): - r_serv.rpush("filelist", x) + r_serv.lpush("filelist", x) publisher.info("The list was updated with new elements")
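
For reference, every module touched by this patch now converges on the same
worker skeleton around Helper.Redis_Queues. A condensed sketch of that shape
(config section and channel names are taken from the patch; the subscriber
name 'demo' is invented):

import time

from pubsublogger import publisher

import Helper

if __name__ == "__main__":
    publisher.channel = "Script"

    # Queue this worker consumes; sections/channels live in packages/config.cfg
    h = Helper.Redis_Queues('PubSub_Global', 'channel', 'demo')
    h.zmq_sub('PubSub_Global')

    # Queue it republishes to; pass None to choose h.pub_channel per message
    h.zmq_pub('PubSub_Longlines', None)

    while True:
        message = h.redis_rpop()
        if message is None:
            if h.redis_queue_shutdown():
                print "Shutdown Flag Up: Terminating"
                break
            time.sleep(10)
            continue
        h.pub_channel = h.config.get('PubSub_Longlines', 'channel_0')
        h.zmq_pub_send(message.split(' ', -1)[-1])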