ail-framework/bin/packages/ZMQ_PubSub.py
Starow 1379ef705a Initial import of AIL framework - Analysis Information Leak framework
AIL is a modular framework to analyse potential information leaks from unstructured data sources like pastes from
Pastebin or similar services. The AIL framework is flexible and can be extended to support other functionalities to
mine sensitive information.
2014-08-06 11:43:40 +02:00

159 lines
No EOL
5.1 KiB
Python
Executable file

#!/usr/bin/python2.7
"""
The ``ZMQ PubSub`` Modules
==========================
"""
import zmq, ConfigParser, redis, pprint, os, sys
#from pubsublogger import publisher
class PubSub(object):
    """
    Common base of the ZeroMQ publisher and subscriber wrappers.

    The class is meant to be used as a ``Virtual Class``: it only holds the
    state shared by both sides (the identifier, the parsed configuration
    file and the ZeroMQ context).

    :param file_conf: -- (str) The filepath of the configuration file used (.cfg)
    :param log_channel: -- (str) The channel used as a log channel
        (accepted for interface stability, currently not used)
    :param ps_name: -- (str) The "ID" of the Publisher/Subscriber

    :return: PubSub Object

    .. note:: The ps_name was implemented to separate subscriber queues in
        redis when they are listening on the same "stream"

    .. seealso:: Methods of the ZMQSub class

    .. todo:: Implement a log channel as an attribute of this virtual class.
    """

    def __init__(self, file_conf, log_channel, ps_name):
        # Path of the configuration file, e.g. "./packages/config.cfg".
        self._config_file = file_conf

        # Parse the configuration once; subclasses read their own section
        # from this parser.
        parser = ConfigParser.ConfigParser()
        parser.read(file_conf)
        self._config_parser = parser

        self._ps_name = ps_name
        self._context_zmq = zmq.Context()

        # log_channel is intentionally left unused: wiring it to a
        # pubsublogger publisher channel is still pending (see the todo in
        # the class docstring).
class ZMQPub(PubSub):
    """
    This class creates a ZMQ Publisher which is able to send messages on the
    socket configured for it.

    :param file_conf: -- (str) The filepath of the configuration file used (.cfg)
    :param pub_config_section: -- (str) The name of the section in the config
        file to get the settings from
    :param ps_name: -- (str) The "ID" of the Publisher

    :return: ZMQPub Object

    :Example:

    Extract of the config file:
        [PubSub_Categ]
        adress = tcp://127.0.0.1:5003

    Creating the object and sending a message:
        MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg')

        msg = "categ1"+" "+"Im the data sent on the categ1 channel"
        MyPublisher.send_message(msg)

    .. note:: The ps_name attribute is "optional" for a publisher, but it is
        still required for the object to be instantiated correctly.
    """

    def __init__(self, file_conf, pub_config_section, ps_name):
        super(ZMQPub, self).__init__(file_conf, "Default", ps_name)
        self._pub_config_section = pub_config_section

        # Read the endpoint exactly once, and before creating the socket, so
        # that a missing section/option does not leave an unused socket
        # behind and the stored address is always the one actually bound.
        self._pub_adress = self._config_parser.get(pub_config_section, "adress")

        self._pubsocket = self._context_zmq.socket(zmq.PUB)
        self._pubsocket.bind(self._pub_adress)

    def send_message(self, message):
        """
        Send a message through the publisher socket.

        :param message: -- (str) The data to publish; in the class example it
            starts with the channel name followed by a space
        """
        self._pubsocket.send(message)
class ZMQSub(PubSub):
    """
    This class creates a ZMQ Subscriber which is able to receive messages
    directly or through redis used as a buffer.

    The redis buffer is useful when the subscriber does a time consuming job
    which desynchronises it from the stream of data received.
    The redis buffer ensures that no data is lost while waiting to be
    processed.

    :param file_conf: -- (str) The filepath of the configuration file used (.cfg)
    :param sub_config_section: -- (str) The name of the section in the config
        file to get the settings from
    :param channel: -- (str) The listening channel of the Subscriber.
    :param ps_name: -- (str) The "ID" of the Subscriber, appended to the
        channel to build the name of its redis queue

    :return: ZMQSub Object

    :Example:

    Extract of the config file:
        [PubSub_Global]
        adress = tcp://127.0.0.1:5000
        channel = filelist

    Creating the object and receiving data + pushing to redis (redis buffering):

        r_serv = redis.StrictRedis(
            host = "127.0.0.1",
            port = 6387,
            db = 0)

        channel = cfg.get("PubSub_Global", "channel")
        MySubscriber = ZMQSub('./packages/config.cfg', "PubSub_Global", channel, "duplicate")

        while True:
            MySubscriber.get_and_lpush(r_serv)

    Inside another script use this line to retrieve the data from redis.
        ...
        while True:
            MySubscriber.get_msg_from_queue(r_serv)
        ...

    .. note:: If you don't want any redis buffering simply use the
        "get_message" method
    """

    def __init__(self, file_conf, sub_config_section, channel, ps_name):
        super(ZMQSub, self).__init__(file_conf, "Default", ps_name)
        self._sub_config_section = sub_config_section
        self._channel = channel

        # Read the endpoint exactly once, and before creating the socket, so
        # that a missing section/option does not leave an unused socket
        # behind and the stored address is always the one connected to.
        self._sub_adress = self._config_parser.get(sub_config_section, "adress")

        self._subsocket = self._context_zmq.socket(zmq.SUB)
        self._subsocket.connect(self._sub_adress)
        self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel)

    def _queue_name(self):
        """Return the redis key of this subscriber's queue (channel + ps_name)."""
        return self._channel + self._ps_name

    def get_message(self):
        """
        Get the first sent message from a Publisher.

        :return: (str) Message from Publisher
        """
        return self._subsocket.recv()

    def get_and_lpush(self, r_serv):
        """
        Get the first sent message from a Publisher and store it in redis.

        :param r_serv: -- The redis connection used as a buffer

        .. note:: This function also adds the queue name to a redis set named
            "queues" for monitoring needs
        """
        queue = self._queue_name()
        # Register the queue before waiting on the socket, so that it is
        # listed in "queues" even while no message has arrived yet.
        r_serv.sadd("queues", queue)
        r_serv.lpush(queue, self._subsocket.recv())

    def get_msg_from_queue(self, r_serv):
        """
        Get the oldest buffered message from the Redis List.

        Messages are pushed on the left by ``get_and_lpush`` and popped from
        the right here, so they come out in the order they were received.

        :param r_serv: -- The redis connection used as a buffer

        :return: (str) Message from Publisher, or whatever ``rpop`` returns
            when the list is empty (``None`` with redis-py)
        """
        return r_serv.rpop(self._queue_name())