mirror of
https://github.com/ail-project/ail-framework.git
synced 2025-01-18 16:36:13 +00:00
Cleanup the queues.
This commit is contained in:
parent
078c8ea836
commit
f1753d67c6
16 changed files with 193 additions and 474 deletions
6
.gitignore
vendored
Normal file
6
.gitignore
vendored
Normal file
|
@ -0,0 +1,6 @@
|
|||
*.swp
|
||||
|
||||
# Install Dirs
|
||||
AILENV
|
||||
redis-leveldb
|
||||
redis
|
61
bin/Helper.py
Executable file
61
bin/Helper.py
Executable file
|
@ -0,0 +1,61 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
Queue helper module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_PubSub_Line" channel 1
|
||||
|
||||
"""
|
||||
import redis
|
||||
import ConfigParser
|
||||
import os
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
|
||||
class Queues(object):
|
||||
|
||||
def __init__(self):
|
||||
configfile = os.join(os.environ('AIL_BIN'), 'packages/config.cfg')
|
||||
if not os.exists(configfile):
|
||||
raise Exception('Unable to find the configuration file. Did you set environment variables? Or activate the virtualenv.')
|
||||
self.config = ConfigParser.ConfigParser()
|
||||
self.config.read(self.configfile)
|
||||
|
||||
def _queue_init_redis(self):
|
||||
config_section = "Redis_Queues"
|
||||
self.r_queues = redis.StrictRedis(
|
||||
host=self.config.get(config_section, "host"),
|
||||
port=self.config.getint(config_section, "port"),
|
||||
db=self.config.getint(config_section, "db"))
|
||||
|
||||
def _queue_shutdown(self):
|
||||
# FIXME: Why not just a key?
|
||||
if self.r_queues.sismember("SHUTDOWN_FLAGS", "Feed_Q"):
|
||||
self.r_queues.srem("SHUTDOWN_FLAGS", "Feed_Q")
|
||||
return True
|
||||
return False
|
||||
|
||||
def queue_subscribe(self, publisher, config_section, channel,
|
||||
subscriber_name):
|
||||
channel = self.config.get(config_section, channel)
|
||||
zmq_sub = ZMQ_PubSub.ZMQSub(self.config, config_section,
|
||||
channel, subscriber_name)
|
||||
publisher.info("""Suscribed to channel {}""".format(channel))
|
||||
self._queue_init_redis()
|
||||
while True:
|
||||
zmq_sub.get_and_lpush(self.r_queues)
|
||||
if self._queues_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
|
@ -20,45 +20,18 @@ Requirements
|
|||
"channel_name"+" "+/path/to/the/paste.gz+" "base64_data_encoded_paste"
|
||||
|
||||
"""
|
||||
import redis
|
||||
import ConfigParser
|
||||
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("Feed", "topicfilter")
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "Feed", channel, "feed")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Feed_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Feed_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'Feed'
|
||||
config_channel = 'topicfilter'
|
||||
subscriber_name = 'feed'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -17,47 +17,18 @@ Requirements
|
|||
*Should register to the Publisher "ZMQ_PubSub_Tokenize"
|
||||
|
||||
"""
|
||||
import redis
|
||||
import ConfigParser
|
||||
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Words", "channel_0")
|
||||
subscriber_name = "categ"
|
||||
subscriber_config_section = "PubSub_Words"
|
||||
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Categ_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Categ_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = 'Queuing'
|
||||
|
||||
config_section = 'PubSub_Words'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'categ'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -18,47 +18,16 @@ Requirements
|
|||
|
||||
"""
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Global", "channel")
|
||||
subscriber_name = "line"
|
||||
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Lines_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Lines_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = "PubSub_Global"
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'line'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -17,48 +17,18 @@ Requirements
|
|||
*Should register to the Publisher "ZMQ_PubSub_Line" channel 1
|
||||
|
||||
"""
|
||||
import redis
|
||||
import ConfigParser
|
||||
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Longlines", "channel_1")
|
||||
subscriber_name = "tokenize"
|
||||
subscriber_config_section = "PubSub_Longlines"
|
||||
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Tokenize_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Tokenize_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Longlines'
|
||||
config_channel = 'channel_1'
|
||||
subscriber_name = 'tokenize'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -18,47 +18,17 @@ Requirements
|
|||
|
||||
"""
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Global", "channel")
|
||||
subscriber_name = "attributes"
|
||||
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Attributes_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Attributes_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'attributes'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -1,44 +1,17 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from packages import ZMQ_PubSub
|
||||
from pubsublogger import publisher
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "creditcard_categ", "cards")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format("creditcard_categ"))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Creditcards_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Creditcards_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'creditcard_categ'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -17,47 +17,18 @@ Requirements
|
|||
*Should register to the Publisher "ZMQ_PubSub_Tokenize"
|
||||
|
||||
"""
|
||||
import redis
|
||||
import ConfigParser
|
||||
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Words", "channel_0")
|
||||
subscriber_name = "curve"
|
||||
subscriber_config_section = "PubSub_Words"
|
||||
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Curve_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Curve_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Words'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'curve'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -1,45 +1,16 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from packages import ZMQ_PubSub
|
||||
from pubsublogger import publisher
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Global", "channel")
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, "duplicate")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Duplicate_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Duplicate_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = 'Queuing'
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'duplicate'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -12,49 +12,17 @@ handling the indexing process of the files seen.
|
|||
|
||||
"""
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from pubsublogger import publisher
|
||||
from packages import ZMQ_PubSub
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
channel = cfg.get("PubSub_Global", "channel")
|
||||
subscriber_name = "indexer"
|
||||
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name)
|
||||
|
||||
publisher.info("""Suscribed to channel {0}""".format(channel))
|
||||
|
||||
# Until the service is requested to be shutdown, the service
|
||||
# will get the data from the global ZMQ queue and buffer it in Redis.
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Indexer_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Indexer_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'indexer'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -1,44 +1,16 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from packages import ZMQ_PubSub
|
||||
from pubsublogger import publisher
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "mails_categ", "emails")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format("mails_categ"))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Mails_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Mails_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_1'
|
||||
subscriber_name = 'mails_categ'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -17,44 +17,17 @@ Requirements
|
|||
*Should register to the Publisher "ZMQ_PubSub_Categ"
|
||||
|
||||
"""
|
||||
import redis
|
||||
import ConfigParser
|
||||
from packages import ZMQ_PubSub
|
||||
from pubsublogger import publisher
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "onion_categ", "tor")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format("onion_categ"))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Onion_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Onion_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_2'
|
||||
subscriber_name = 'onion_categ'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -1,44 +1,17 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
import redis
|
||||
import ConfigParser
|
||||
from packages import ZMQ_PubSub
|
||||
from pubsublogger import publisher
|
||||
|
||||
configfile = './packages/config.cfg'
|
||||
import Helper
|
||||
|
||||
|
||||
def main():
|
||||
"""Main Function"""
|
||||
|
||||
# CONFIG #
|
||||
cfg = ConfigParser.ConfigParser()
|
||||
cfg.read(configfile)
|
||||
|
||||
# REDIS #
|
||||
r_serv = redis.StrictRedis(
|
||||
host=cfg.get("Redis_Queues", "host"),
|
||||
port=cfg.getint("Redis_Queues", "port"),
|
||||
db=cfg.getint("Redis_Queues", "db"))
|
||||
|
||||
# LOGGING #
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
# ZMQ #
|
||||
sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Categ", "web_categ", "urls")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Suscribed to channel {0}""".format("web_categ"))
|
||||
|
||||
while True:
|
||||
sub.get_and_lpush(r_serv)
|
||||
|
||||
if r_serv.sismember("SHUTDOWN_FLAGS", "Urls_Q"):
|
||||
r_serv.srem("SHUTDOWN_FLAGS", "Urls_Q")
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_3'
|
||||
subscriber_name = 'web_categ'
|
||||
|
||||
h = Helper.Queues()
|
||||
h.queue_subscribe(publisher, config_section, config_channel, subscriber_name)
|
||||
|
|
|
@ -6,7 +6,6 @@ The ``ZMQ PubSub`` Modules
|
|||
"""
|
||||
|
||||
import zmq
|
||||
import ConfigParser
|
||||
|
||||
|
||||
class PubSub(object):
|
||||
|
@ -14,7 +13,7 @@ class PubSub(object):
|
|||
The PubSub class is a ``Virtual Class`` which regroup the shared attribute
|
||||
of a Publisher ZeroMQ and a Subcriber ZeroMQ
|
||||
|
||||
:param file_conf: -- (str) The filepath of the configuration file used (.cfg)
|
||||
:param config: -- (ConfigParser) Handle on the parsed config file
|
||||
:param log_channel: -- (str) The channel used as a log channel
|
||||
:param ps_name: -- (str) The "ID" of the Publisher/Subcriber
|
||||
|
||||
|
@ -27,18 +26,13 @@ class PubSub(object):
|
|||
..todo:: Create Implementing a log channel as an attribute of this virtual class.
|
||||
|
||||
"""
|
||||
def __init__(self, file_conf, log_channel, ps_name):
|
||||
def __init__(self, config, log_channel, ps_name):
|
||||
self._ps_name = ps_name
|
||||
self._config_parser = ConfigParser.ConfigParser()
|
||||
self._config_file = file_conf # "./packages/config.cfg"
|
||||
|
||||
self._config_parser.read(self._config_file)
|
||||
self._config_parser = config
|
||||
|
||||
self._context_zmq = zmq.Context()
|
||||
|
||||
# self._logging_publisher_channel = log_channel # "Default"
|
||||
# publisher.channel(self._logging_publisher_channel)
|
||||
|
||||
|
||||
class ZMQPub(PubSub):
|
||||
"""
|
||||
|
@ -63,14 +57,14 @@ class ZMQPub(PubSub):
|
|||
instantiated correctly.
|
||||
|
||||
"""
|
||||
def __init__(self, file_conf, pub_config_section, ps_name):
|
||||
super(ZMQPub, self).__init__(file_conf, "Default", ps_name)
|
||||
def __init__(self, config, pub_config_section, ps_name):
|
||||
super(ZMQPub, self).__init__(config, "Default", ps_name)
|
||||
|
||||
self._pub_config_section = pub_config_section
|
||||
self._pubsocket = self._context_zmq.socket(zmq.PUB)
|
||||
self._pub_adress = self._config_parser.get(self._pub_config_section, "adress")
|
||||
|
||||
self._pubsocket.bind(self._config_parser.get(self._pub_config_section, "adress"))
|
||||
self._pubsocket.bind(self._pub_adress)
|
||||
|
||||
def send_message(self, message):
|
||||
"""Send a message throught the publisher socket"""
|
||||
|
@ -120,14 +114,14 @@ class ZMQSub(PubSub):
|
|||
..note:: If you don't want any redis buffering simply use the "get_message" method
|
||||
|
||||
"""
|
||||
def __init__(self, file_conf, sub_config_section, channel, ps_name):
|
||||
super(ZMQSub, self).__init__(file_conf, "Default", ps_name)
|
||||
def __init__(self, config, sub_config_section, channel, ps_name):
|
||||
super(ZMQSub, self).__init__(config, "Default", ps_name)
|
||||
|
||||
self._sub_config_section = sub_config_section
|
||||
self._subsocket = self._context_zmq.socket(zmq.SUB)
|
||||
self._sub_adress = self._config_parser.get(self._sub_config_section, "adress")
|
||||
|
||||
self._subsocket.connect(self._config_parser.get(self._sub_config_section, "adress"))
|
||||
self._subsocket.connect(self._sub_adress)
|
||||
|
||||
self._channel = channel
|
||||
self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel)
|
||||
|
|
|
@ -54,7 +54,11 @@ channel_0 = words
|
|||
|
||||
[PubSub_Categ]
|
||||
adress = tcp://127.0.0.1:5003
|
||||
#Channels are dynamic (1 channel per categ)
|
||||
channel_0 = cards
|
||||
channel_1 = emails
|
||||
channel_2 = tor
|
||||
channel_3 = urls
|
||||
#Channels are dynamic (1 channel per categ) <= FIXME: no it's not.
|
||||
|
||||
[PubSub_Url]
|
||||
adress = tcp://127.0.0.1:5004
|
||||
|
|
Loading…
Add table
Reference in a new issue