diff --git a/bin/Helper.py b/bin/Helper.py index 8f4913c2..1be5e1e4 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -19,7 +19,7 @@ import zmq class Redis_Queues(object): - def __init__(self, zmq_conf_section, zmq_conf_channel, subscriber_name): + def __init__(self, conf_section, conf_channel, subscriber_name): configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg') if not os.path.exists(configfile): raise Exception('Unable to find the configuration file. \ @@ -29,13 +29,7 @@ class Redis_Queues(object): self.config.read(configfile) self.subscriber_name = subscriber_name - # ZMQ subscriber - self.sub_channel = self.config.get(zmq_conf_section, zmq_conf_channel) - sub_address = self.config.get(zmq_conf_section, 'adress') - context = zmq.Context() - self.sub_socket = context.socket(zmq.SUB) - self.sub_socket.connect(sub_address) - self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) + self.sub_channel = self.config.get(conf_section, conf_channel) # Redis Queue config_section = "Redis_Queues" @@ -43,10 +37,15 @@ class Redis_Queues(object): host=self.config.get(config_section, "host"), port=self.config.getint(config_section, "port"), db=self.config.getint(config_section, "db")) - self.redis_channel = self.sub_channel + subscriber_name + + def zmq_sub(self, conf_section): + sub_address = self.config.get(conf_section, 'adress') + context = zmq.Context() + self.sub_socket = context.socket(zmq.SUB) + self.sub_socket.connect(sub_address) + self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel) def zmq_pub(self, config_section): - # FIXME: should probably go somewhere else context = zmq.Context() self.pub_socket = context.socket(zmq.PUB) self.pub_socket.bind(self.config.get(config_section, 'adress')) @@ -60,6 +59,7 @@ class Redis_Queues(object): return self.r_queues.srem('SHUTDOWN_FLAGS', flag) def redis_queue_subscribe(self, publisher): + self.redis_channel = self.sub_channel + self.subscriber_name publisher.info("Suscribed to channel {}".format(self.sub_channel)) while True: msg = self.sub_socket.recv() diff --git a/bin/Shutdown.py b/bin/Shutdown.py index e2474c32..f197e5c8 100755 --- a/bin/Shutdown.py +++ b/bin/Shutdown.py @@ -38,6 +38,7 @@ def main(): port=cfg.getint("Redis_Queues", "port"), db=cfg.getint("Redis_Queues", "db")) + # FIXME: automatic based on the queue name. # ### SCRIPTS #### r_serv.sadd("SHUTDOWN_FLAGS", "Feed") r_serv.sadd("SHUTDOWN_FLAGS", "Categ") diff --git a/bin/ZMQ_Feed_Q.py b/bin/ZMQ_Feed_Q.py index 2bcdd238..ba7c4a2d 100755 --- a/bin/ZMQ_Feed_Q.py +++ b/bin/ZMQ_Feed_Q.py @@ -33,5 +33,6 @@ if __name__ == "__main__": config_channel = 'topicfilter' subscriber_name = 'feed' - h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h = Helper.Redis_Queues(subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Categ_Q.py b/bin/ZMQ_PubSub_Categ_Q.py index 44f06b17..7780dd2f 100755 --- a/bin/ZMQ_PubSub_Categ_Q.py +++ b/bin/ZMQ_PubSub_Categ_Q.py @@ -31,4 +31,5 @@ if __name__ == "__main__": subscriber_name = 'categ' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Lines_Q.py b/bin/ZMQ_PubSub_Lines_Q.py index a9967d1a..abf1fd35 100755 --- a/bin/ZMQ_PubSub_Lines_Q.py +++ b/bin/ZMQ_PubSub_Lines_Q.py @@ -30,4 +30,5 @@ if __name__ == "__main__": subscriber_name = 'line' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_PubSub_Tokenize_Q.py b/bin/ZMQ_PubSub_Tokenize_Q.py index 5135144d..7594c37a 100755 --- a/bin/ZMQ_PubSub_Tokenize_Q.py +++ b/bin/ZMQ_PubSub_Tokenize_Q.py @@ -31,4 +31,5 @@ if __name__ == "__main__": subscriber_name = 'tokenize' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Pub_Global.py b/bin/ZMQ_Pub_Global.py index d47ee730..56057737 100755 --- a/bin/ZMQ_Pub_Global.py +++ b/bin/ZMQ_Pub_Global.py @@ -21,49 +21,33 @@ Requirements *Need running Redis instances. (Redis) """ -import redis -import ConfigParser import time -from packages import ZMQ_PubSub from pubsublogger import publisher -configfile = './packages/config.cfg' +import Helper - -def main(): - """Main Function""" - - # CONFIG # - cfg = ConfigParser.ConfigParser() - cfg.read('./packages/config.cfg') - - # REDIS # - r_serv = redis.StrictRedis( - host=cfg.get("Redis_Queues", "host"), - port=cfg.getint("Redis_Queues", "port"), - db=cfg.getint("Redis_Queues", "db")) - - # LOGGING # +if __name__ == "__main__": publisher.channel = "Global" - # ZMQ # - pub_glob = ZMQ_PubSub.ZMQPub(configfile, "PubSub_Global", "global") + config_section = 'PubSub_Global' + config_channel = 'channel' + subscriber_name = 'global' - # FONCTIONS # + h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + + # Publisher + pub_config_section = 'PubSub_Global' + h.zmq_pub(pub_config_section) + pub_channel = h.config.get(pub_config_section, "channel") + + # LOGGING # publisher.info("Starting to publish.") while True: - filename = r_serv.lpop("filelist") + filename = h.r_queues.lpop(h.sub_channel) if filename is not None: - - msg = cfg.get("PubSub_Global", "channel")+" "+filename - pub_glob.send_message(msg) - publisher.debug("{0} Published".format(msg)) + h.pub_socket.send('{} {}'.format(pub_channel, filename)) else: time.sleep(10) publisher.debug("Nothing to publish") - - -if __name__ == "__main__": - main() diff --git a/bin/ZMQ_Sub_Attributes_Q.py b/bin/ZMQ_Sub_Attributes_Q.py index c0202e61..6117174d 100755 --- a/bin/ZMQ_Sub_Attributes_Q.py +++ b/bin/ZMQ_Sub_Attributes_Q.py @@ -31,4 +31,5 @@ if __name__ == "__main__": subscriber_name = 'attributes' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_CreditCards_Q.py b/bin/ZMQ_Sub_CreditCards_Q.py index 8da06df1..46bc33e9 100755 --- a/bin/ZMQ_Sub_CreditCards_Q.py +++ b/bin/ZMQ_Sub_CreditCards_Q.py @@ -14,4 +14,5 @@ if __name__ == "__main__": subscriber_name = 'creditcard_categ' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Curve_Q.py b/bin/ZMQ_Sub_Curve_Q.py index 55333bb7..572c9c79 100755 --- a/bin/ZMQ_Sub_Curve_Q.py +++ b/bin/ZMQ_Sub_Curve_Q.py @@ -31,4 +31,5 @@ if __name__ == "__main__": subscriber_name = 'curve' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Duplicate_Q.py b/bin/ZMQ_Sub_Duplicate_Q.py index 8901b377..24af023e 100755 --- a/bin/ZMQ_Sub_Duplicate_Q.py +++ b/bin/ZMQ_Sub_Duplicate_Q.py @@ -13,4 +13,5 @@ if __name__ == "__main__": subscriber_name = 'duplicate' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Indexer_Q.py b/bin/ZMQ_Sub_Indexer_Q.py index 5433663f..cbdb4a05 100755 --- a/bin/ZMQ_Sub_Indexer_Q.py +++ b/bin/ZMQ_Sub_Indexer_Q.py @@ -25,4 +25,5 @@ if __name__ == "__main__": subscriber_name = 'indexer' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Mails_Q.py b/bin/ZMQ_Sub_Mails_Q.py index 0cd21971..4fe630b5 100755 --- a/bin/ZMQ_Sub_Mails_Q.py +++ b/bin/ZMQ_Sub_Mails_Q.py @@ -13,4 +13,5 @@ if __name__ == "__main__": subscriber_name = 'mails_categ' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Onion_Q.py b/bin/ZMQ_Sub_Onion_Q.py index d637452a..6d239203 100755 --- a/bin/ZMQ_Sub_Onion_Q.py +++ b/bin/ZMQ_Sub_Onion_Q.py @@ -30,4 +30,5 @@ if __name__ == "__main__": subscriber_name = 'onion_categ' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher) diff --git a/bin/ZMQ_Sub_Urls_Q.py b/bin/ZMQ_Sub_Urls_Q.py index bf8ad14f..b083090e 100755 --- a/bin/ZMQ_Sub_Urls_Q.py +++ b/bin/ZMQ_Sub_Urls_Q.py @@ -14,4 +14,5 @@ if __name__ == "__main__": subscriber_name = 'web_categ' h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) + h.zmq_sub(config_section) h.redis_queue_subscribe(publisher)