Do not create a ZMQ sub if it is not required.

This commit is contained in:
Raphaël Vinot 2014-08-19 19:53:33 +02:00
parent 45b0bf3983
commit 8d9ffbaa53
15 changed files with 39 additions and 42 deletions

View file

@ -19,7 +19,7 @@ import zmq
class Redis_Queues(object): class Redis_Queues(object):
def __init__(self, zmq_conf_section, zmq_conf_channel, subscriber_name): def __init__(self, conf_section, conf_channel, subscriber_name):
configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg') configfile = os.path.join(os.environ('AIL_BIN'), 'packages/config.cfg')
if not os.path.exists(configfile): if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \ raise Exception('Unable to find the configuration file. \
@ -29,13 +29,7 @@ class Redis_Queues(object):
self.config.read(configfile) self.config.read(configfile)
self.subscriber_name = subscriber_name self.subscriber_name = subscriber_name
# ZMQ subscriber self.sub_channel = self.config.get(conf_section, conf_channel)
self.sub_channel = self.config.get(zmq_conf_section, zmq_conf_channel)
sub_address = self.config.get(zmq_conf_section, 'adress')
context = zmq.Context()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(sub_address)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel)
# Redis Queue # Redis Queue
config_section = "Redis_Queues" config_section = "Redis_Queues"
@ -43,10 +37,15 @@ class Redis_Queues(object):
host=self.config.get(config_section, "host"), host=self.config.get(config_section, "host"),
port=self.config.getint(config_section, "port"), port=self.config.getint(config_section, "port"),
db=self.config.getint(config_section, "db")) db=self.config.getint(config_section, "db"))
self.redis_channel = self.sub_channel + subscriber_name
def zmq_sub(self, conf_section):
sub_address = self.config.get(conf_section, 'adress')
context = zmq.Context()
self.sub_socket = context.socket(zmq.SUB)
self.sub_socket.connect(sub_address)
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel)
def zmq_pub(self, config_section): def zmq_pub(self, config_section):
# FIXME: should probably go somewhere else
context = zmq.Context() context = zmq.Context()
self.pub_socket = context.socket(zmq.PUB) self.pub_socket = context.socket(zmq.PUB)
self.pub_socket.bind(self.config.get(config_section, 'adress')) self.pub_socket.bind(self.config.get(config_section, 'adress'))
@ -60,6 +59,7 @@ class Redis_Queues(object):
return self.r_queues.srem('SHUTDOWN_FLAGS', flag) return self.r_queues.srem('SHUTDOWN_FLAGS', flag)
def redis_queue_subscribe(self, publisher): def redis_queue_subscribe(self, publisher):
self.redis_channel = self.sub_channel + self.subscriber_name
publisher.info("Suscribed to channel {}".format(self.sub_channel)) publisher.info("Suscribed to channel {}".format(self.sub_channel))
while True: while True:
msg = self.sub_socket.recv() msg = self.sub_socket.recv()

View file

@ -38,6 +38,7 @@ def main():
port=cfg.getint("Redis_Queues", "port"), port=cfg.getint("Redis_Queues", "port"),
db=cfg.getint("Redis_Queues", "db")) db=cfg.getint("Redis_Queues", "db"))
# FIXME: automatic based on the queue name.
# ### SCRIPTS #### # ### SCRIPTS ####
r_serv.sadd("SHUTDOWN_FLAGS", "Feed") r_serv.sadd("SHUTDOWN_FLAGS", "Feed")
r_serv.sadd("SHUTDOWN_FLAGS", "Categ") r_serv.sadd("SHUTDOWN_FLAGS", "Categ")

View file

@ -33,5 +33,6 @@ if __name__ == "__main__":
config_channel = 'topicfilter' config_channel = 'topicfilter'
subscriber_name = 'feed' subscriber_name = 'feed'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -31,4 +31,5 @@ if __name__ == "__main__":
subscriber_name = 'categ' subscriber_name = 'categ'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -30,4 +30,5 @@ if __name__ == "__main__":
subscriber_name = 'line' subscriber_name = 'line'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -31,4 +31,5 @@ if __name__ == "__main__":
subscriber_name = 'tokenize' subscriber_name = 'tokenize'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -21,49 +21,33 @@ Requirements
*Need running Redis instances. (Redis) *Need running Redis instances. (Redis)
""" """
import redis
import ConfigParser
import time import time
from packages import ZMQ_PubSub
from pubsublogger import publisher from pubsublogger import publisher
configfile = './packages/config.cfg' import Helper
if __name__ == "__main__":
def main():
"""Main Function"""
# CONFIG #
cfg = ConfigParser.ConfigParser()
cfg.read('./packages/config.cfg')
# REDIS #
r_serv = redis.StrictRedis(
host=cfg.get("Redis_Queues", "host"),
port=cfg.getint("Redis_Queues", "port"),
db=cfg.getint("Redis_Queues", "db"))
# LOGGING #
publisher.channel = "Global" publisher.channel = "Global"
# ZMQ # config_section = 'PubSub_Global'
pub_glob = ZMQ_PubSub.ZMQPub(configfile, "PubSub_Global", "global") config_channel = 'channel'
subscriber_name = 'global'
# FONCTIONS # h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
# Publisher
pub_config_section = 'PubSub_Global'
h.zmq_pub(pub_config_section)
pub_channel = h.config.get(pub_config_section, "channel")
# LOGGING #
publisher.info("Starting to publish.") publisher.info("Starting to publish.")
while True: while True:
filename = r_serv.lpop("filelist") filename = h.r_queues.lpop(h.sub_channel)
if filename is not None: if filename is not None:
h.pub_socket.send('{} {}'.format(pub_channel, filename))
msg = cfg.get("PubSub_Global", "channel")+" "+filename
pub_glob.send_message(msg)
publisher.debug("{0} Published".format(msg))
else: else:
time.sleep(10) time.sleep(10)
publisher.debug("Nothing to publish") publisher.debug("Nothing to publish")
if __name__ == "__main__":
main()

View file

@ -31,4 +31,5 @@ if __name__ == "__main__":
subscriber_name = 'attributes' subscriber_name = 'attributes'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -14,4 +14,5 @@ if __name__ == "__main__":
subscriber_name = 'creditcard_categ' subscriber_name = 'creditcard_categ'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -31,4 +31,5 @@ if __name__ == "__main__":
subscriber_name = 'curve' subscriber_name = 'curve'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -13,4 +13,5 @@ if __name__ == "__main__":
subscriber_name = 'duplicate' subscriber_name = 'duplicate'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -25,4 +25,5 @@ if __name__ == "__main__":
subscriber_name = 'indexer' subscriber_name = 'indexer'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -13,4 +13,5 @@ if __name__ == "__main__":
subscriber_name = 'mails_categ' subscriber_name = 'mails_categ'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -30,4 +30,5 @@ if __name__ == "__main__":
subscriber_name = 'onion_categ' subscriber_name = 'onion_categ'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)

View file

@ -14,4 +14,5 @@ if __name__ == "__main__":
subscriber_name = 'web_categ' subscriber_name = 'web_categ'
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name) h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
h.zmq_sub(config_section)
h.redis_queue_subscribe(publisher) h.redis_queue_subscribe(publisher)