mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-23 06:37:15 +00:00
1379ef705a
AIL is a modular framework to analyse potential information leak from unstructured data source like pastes from Past ebin or similar services. AIL framework is flexible and can be extended to support other functionalities to mine sen sitive information
159 lines
No EOL
5.1 KiB
Python
Executable file
159 lines
No EOL
5.1 KiB
Python
Executable file
#!/usr/bin/python2.7
|
|
"""
|
|
The ``ZMQ PubSub`` Modules
|
|
==========================
|
|
|
|
"""
|
|
|
|
import zmq, ConfigParser, redis, pprint, os, sys
|
|
#from pubsublogger import publisher
|
|
|
|
class PubSub(object):
|
|
"""
|
|
The PubSub class is a ``Virtual Class`` which regroup the shared attribute
|
|
of a Publisher ZeroMQ and a Subcriber ZeroMQ
|
|
|
|
:param file_conf: -- (str) The filepath of the configuration file used (.cfg)
|
|
:param log_channel: -- (str) The channel used as a log channel
|
|
:param ps_name: -- (str) The "ID" of the Publisher/Subcriber
|
|
|
|
:return: PubSub Object
|
|
|
|
..note:: The ps_name was implemented to separate subscriber queues in redis
|
|
when they are listening on a same "stream"
|
|
..seealso:: Method of the ZMQSub class
|
|
|
|
..todo:: Create Implementing a log channel as an attribute of this virtual class.
|
|
|
|
"""
|
|
def __init__(self, file_conf, log_channel, ps_name):
|
|
self._ps_name = ps_name
|
|
self._config_parser = ConfigParser.ConfigParser()
|
|
self._config_file = file_conf # "./packages/config.cfg"
|
|
|
|
self._config_parser.read(self._config_file)
|
|
|
|
self._context_zmq = zmq.Context()
|
|
|
|
#self._logging_publisher_channel = log_channel # "Default"
|
|
#publisher.channel(self._logging_publisher_channel)
|
|
|
|
|
|
class ZMQPub(PubSub):
|
|
"""
|
|
This class create a ZMQ Publisher which is able to send_message to a choosen socket.
|
|
|
|
:param pub_config_section: -- (str) The name of the section in the config file to get the settings
|
|
|
|
:return: ZMQPub Object
|
|
|
|
:Example:
|
|
Extract of the config file:
|
|
[PubSub_Categ]
|
|
adress = tcp://127.0.0.1:5003
|
|
|
|
Creating the object and sending message:
|
|
MyPublisher = ZMQPub('./packages/config.cfg', 'PubSub_Categ', 'pubcateg')
|
|
|
|
msg = "categ1"+" "+"Im the data sent on the categ1 channel"
|
|
MyPublisher.send_message(msg)
|
|
|
|
..note:: The ps_name attribute for a publisher is "optionnal" but still required to be
|
|
instantiated correctly.
|
|
|
|
"""
|
|
def __init__(self, file_conf, pub_config_section, ps_name):
|
|
super(ZMQPub, self).__init__(file_conf, "Default", ps_name)
|
|
|
|
self._pub_config_section = pub_config_section
|
|
self._pubsocket = self._context_zmq.socket(zmq.PUB)
|
|
self._pub_adress = self._config_parser.get(self._pub_config_section, "adress")
|
|
|
|
self._pubsocket.bind(self._config_parser.get(self._pub_config_section, "adress"))
|
|
|
|
def send_message(self, message):
|
|
"""Send a message throught the publisher socket"""
|
|
self._pubsocket.send(message)
|
|
|
|
|
|
class ZMQSub(PubSub):
|
|
"""
|
|
This class create a ZMQ Subcriber which is able to receive message directly or
|
|
throught redis as a buffer.
|
|
|
|
The redis buffer is usefull when the subcriber do a time consuming job which is
|
|
desynchronising it from the stream of data received.
|
|
The redis buffer ensures that no data will be loss waiting to be processed.
|
|
|
|
:param sub_config_section: -- (str) The name of the section in the config file to get the settings
|
|
:param channel: -- (str) The listening channel of the Subcriber.
|
|
|
|
:return: ZMQSub Object
|
|
|
|
:Example:
|
|
Extract of the config file:
|
|
[PubSub_Global]
|
|
adress = tcp://127.0.0.1:5000
|
|
channel = filelist
|
|
|
|
Creating the object and receiving data + pushing to redis (redis buffering):
|
|
|
|
r_serv = redis.StrictRedis(
|
|
host = 127.0.0.1,
|
|
port = 6387,
|
|
db = 0)
|
|
|
|
channel = cfg.get("PubSub_Global", "channel")
|
|
MySubscriber = ZMQSub('./packages/config.cfg',"PubSub_Global", channel, "duplicate")
|
|
|
|
while True:
|
|
MySubscriber.get_and_lpush(r_serv)
|
|
|
|
|
|
Inside another script use this line to retrive the data from redis.
|
|
...
|
|
while True:
|
|
MySubscriber.get_msg_from_queue(r_serv)
|
|
...
|
|
|
|
..note:: If you don't want any redis buffering simply use the "get_message" method
|
|
|
|
"""
|
|
def __init__(self, file_conf, sub_config_section, channel, ps_name):
|
|
super(ZMQSub, self).__init__(file_conf, "Default", ps_name)
|
|
|
|
self._sub_config_section = sub_config_section
|
|
self._subsocket = self._context_zmq.socket(zmq.SUB)
|
|
self._sub_adress = self._config_parser.get(self._sub_config_section, "adress")
|
|
|
|
self._subsocket.connect(self._config_parser.get(self._sub_config_section, "adress"))
|
|
|
|
self._channel = channel
|
|
self._subsocket.setsockopt(zmq.SUBSCRIBE, self._channel)
|
|
|
|
def get_message(self):
|
|
"""
|
|
Get the first sent message from a Publisher.
|
|
:return: (str) Message from Publisher
|
|
|
|
"""
|
|
return self._subsocket.recv()
|
|
|
|
def get_and_lpush(self, r_serv):
|
|
"""
|
|
Get the first sent message from a Publisher and storing it in redis
|
|
|
|
..note:: This function also create a set named "queue" for monitoring needs
|
|
|
|
"""
|
|
r_serv.sadd("queues",self._channel+self._ps_name)
|
|
r_serv.lpush(self._channel+self._ps_name, self._subsocket.recv())
|
|
|
|
def get_msg_from_queue(self, r_serv):
|
|
"""
|
|
Get the first sent message from a Redis List
|
|
|
|
:return: (str) Message from Publisher
|
|
|
|
"""
|
|
return r_serv.rpop(self._channel+self._ps_name) |