mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-14 02:28:23 +00:00
Added basic mixer with confirugable behavior. It handles muliple feeders and performs some basic stats on them.
This commit is contained in:
parent
9758579753
commit
e70b9cd15c
6 changed files with 213 additions and 10 deletions
|
@ -72,6 +72,8 @@ if __name__ == '__main__':
|
||||||
os.makedirs(dirname)
|
os.makedirs(dirname)
|
||||||
|
|
||||||
with open(filename, 'wb') as f:
|
with open(filename, 'wb') as f:
|
||||||
|
print gzip64encoded
|
||||||
|
print base64.standard_b64decode(gzip64encoded)
|
||||||
f.write(base64.standard_b64decode(gzip64encoded))
|
f.write(base64.standard_b64decode(gzip64encoded))
|
||||||
p.populate_set_out(filename)
|
p.populate_set_out(filename)
|
||||||
processed_paste+=1
|
processed_paste+=1
|
||||||
|
|
|
@ -32,7 +32,7 @@ class PubSub(object):
|
||||||
self.config.read(configfile)
|
self.config.read(configfile)
|
||||||
self.redis_sub = False
|
self.redis_sub = False
|
||||||
self.zmq_sub = False
|
self.zmq_sub = False
|
||||||
self.subscriber = None
|
self.subscribers = None
|
||||||
self.publishers = {'Redis': [], 'ZMQ': []}
|
self.publishers = {'Redis': [], 'ZMQ': []}
|
||||||
|
|
||||||
def setup_subscribe(self, conn_name):
|
def setup_subscribe(self, conn_name):
|
||||||
|
@ -46,14 +46,19 @@ class PubSub(object):
|
||||||
host=self.config.get('RedisPubSub', 'host'),
|
host=self.config.get('RedisPubSub', 'host'),
|
||||||
port=self.config.get('RedisPubSub', 'port'),
|
port=self.config.get('RedisPubSub', 'port'),
|
||||||
db=self.config.get('RedisPubSub', 'db'))
|
db=self.config.get('RedisPubSub', 'db'))
|
||||||
self.subscriber = r.pubsub(ignore_subscribe_messages=True)
|
self.subscribers = r.pubsub(ignore_subscribe_messages=True)
|
||||||
self.subscriber.psubscribe(channel)
|
self.subscribers.psubscribe(channel)
|
||||||
elif conn_name.startswith('ZMQ'):
|
elif conn_name.startswith('ZMQ'):
|
||||||
self.zmq_sub = True
|
self.zmq_sub = True
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
self.subscriber = context.socket(zmq.SUB)
|
|
||||||
self.subscriber.connect(self.config.get(conn_name, 'address'))
|
self.subscribers = []
|
||||||
self.subscriber.setsockopt(zmq.SUBSCRIBE, channel)
|
addresses = self.config.get(conn_name, 'address')
|
||||||
|
for address in addresses.split(','):
|
||||||
|
new_sub = context.socket(zmq.SUB)
|
||||||
|
new_sub.connect(address)
|
||||||
|
new_sub.setsockopt(zmq.SUBSCRIBE, channel)
|
||||||
|
self.subscribers.append(new_sub)
|
||||||
|
|
||||||
def setup_publish(self, conn_name):
|
def setup_publish(self, conn_name):
|
||||||
if self.config.has_section(conn_name):
|
if self.config.has_section(conn_name):
|
||||||
|
@ -83,13 +88,17 @@ class PubSub(object):
|
||||||
|
|
||||||
def subscribe(self):
|
def subscribe(self):
|
||||||
if self.redis_sub:
|
if self.redis_sub:
|
||||||
for msg in self.subscriber.listen():
|
for msg in self.subscribers.listen():
|
||||||
if msg.get('data', None) is not None:
|
if msg.get('data', None) is not None:
|
||||||
yield msg['data']
|
yield msg['data']
|
||||||
elif self.zmq_sub:
|
elif self.zmq_sub:
|
||||||
while True:
|
while True:
|
||||||
msg = self.subscriber.recv()
|
for sub in self.subscribers:
|
||||||
yield msg.split(' ', 1)[1]
|
try:
|
||||||
|
msg = sub.recv(zmq.NOBLOCK)
|
||||||
|
yield msg.split(' ', 1)[1]
|
||||||
|
except zmq.error.Again as e:
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
raise Exception('No subscribe function defined')
|
raise Exception('No subscribe function defined')
|
||||||
|
|
||||||
|
|
|
@ -114,6 +114,8 @@ function launching_scripts {
|
||||||
|
|
||||||
screen -S "Script" -X screen -t "ModuleInformation" bash -c './ModuleInformation.py -k 0 -c 1; read x'
|
screen -S "Script" -X screen -t "ModuleInformation" bash -c './ModuleInformation.py -k 0 -c 1; read x'
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
|
screen -S "Script" -X screen -t "Mixer" bash -c './Mixer.py; read x'
|
||||||
|
sleep 0.1
|
||||||
screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x'
|
screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x'
|
||||||
sleep 0.1
|
sleep 0.1
|
||||||
screen -S "Script" -X screen -t "Duplicates" bash -c './Duplicates.py; read x'
|
screen -S "Script" -X screen -t "Duplicates" bash -c './Duplicates.py; read x'
|
||||||
|
|
172
bin/Mixer.py
Executable file
172
bin/Mixer.py
Executable file
|
@ -0,0 +1,172 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
# -*-coding:UTF-8 -*
|
||||||
|
"""
|
||||||
|
The ZMQ_Feed_Q Module
|
||||||
|
=====================
|
||||||
|
|
||||||
|
This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
|
||||||
|
|
||||||
|
This module take all the feeds provided in the config.
|
||||||
|
Depending on the configuration, this module will process the feed as follow:
|
||||||
|
operation_mode 1: "Avoid any duplicate from any sources"
|
||||||
|
- The module maintain a list of content for each paste
|
||||||
|
- If the content is new, process it
|
||||||
|
- Else, do not process it but keep track for statistics on duplicate
|
||||||
|
|
||||||
|
operation_mode 2: "Keep duplicate coming from different sources"
|
||||||
|
- The module maintain a list of name given to the paste by the feeder
|
||||||
|
- If the name has not yet been seen, process it
|
||||||
|
- Elseif, the saved content associated with the paste is not the same, process it
|
||||||
|
- Else, do not process it but keep track for statistics on duplicate
|
||||||
|
|
||||||
|
Note that the hash of the content is defined as the gzip64encoded
|
||||||
|
|
||||||
|
Requirements
|
||||||
|
------------
|
||||||
|
|
||||||
|
*Need running Redis instances.
|
||||||
|
*Need the ZMQ_Feed_Q Module running to be able to work properly.
|
||||||
|
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from pubsublogger import publisher
|
||||||
|
import redis
|
||||||
|
import ConfigParser
|
||||||
|
|
||||||
|
from Helper import Process
|
||||||
|
|
||||||
|
|
||||||
|
# CONFIG #
|
||||||
|
refresh_time = 30
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
publisher.port = 6380
|
||||||
|
publisher.channel = 'Script'
|
||||||
|
|
||||||
|
config_section = 'Mixer'
|
||||||
|
|
||||||
|
p = Process(config_section)
|
||||||
|
|
||||||
|
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
|
||||||
|
if not os.path.exists(configfile):
|
||||||
|
raise Exception('Unable to find the configuration file. \
|
||||||
|
Did you set environment variables? \
|
||||||
|
Or activate the virtualenv.')
|
||||||
|
|
||||||
|
cfg = ConfigParser.ConfigParser()
|
||||||
|
cfg.read(configfile)
|
||||||
|
|
||||||
|
# REDIS #
|
||||||
|
server = redis.StrictRedis(
|
||||||
|
host=cfg.get("Redis_Mixer", "host"),
|
||||||
|
port=cfg.getint("Redis_Mixer", "port"),
|
||||||
|
db=cfg.getint("Redis_Mixer", "db"))
|
||||||
|
|
||||||
|
# LOGGING #
|
||||||
|
publisher.info("Feed Script started to receive & publish.")
|
||||||
|
|
||||||
|
# OTHER CONFIG #
|
||||||
|
operation_mode = cfg.getint("Module_Mixer", "operation_mode")
|
||||||
|
ttl_key = cfg.getint("Module_Mixer", "ttl_duplicate")
|
||||||
|
|
||||||
|
# STATS #
|
||||||
|
processed_paste = 0
|
||||||
|
processed_paste_per_feeder = {}
|
||||||
|
duplicated_paste_per_feeder = {}
|
||||||
|
time_1 = time.time()
|
||||||
|
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
message = p.get_from_set()
|
||||||
|
if message is not None:
|
||||||
|
splitted = message.split()
|
||||||
|
if len(splitted) == 2:
|
||||||
|
paste, gzip64encoded = splitted
|
||||||
|
try:
|
||||||
|
feeder_name, paste_name = paste.split('>')
|
||||||
|
feeder_name.replace(" ","")
|
||||||
|
except ValueError as e:
|
||||||
|
feeder_name = "unnamed_feeder"
|
||||||
|
paste_name = paste
|
||||||
|
|
||||||
|
# Processed paste
|
||||||
|
processed_paste += 1
|
||||||
|
try:
|
||||||
|
processed_paste_per_feeder[feeder_name] += 1
|
||||||
|
except KeyError as e:
|
||||||
|
# new feeder
|
||||||
|
processed_paste_per_feeder[feeder_name] = 1
|
||||||
|
duplicated_paste_per_feeder[feeder_name] = 0
|
||||||
|
|
||||||
|
relay_message = "{0} {1}".format(paste_name, gzip64encoded)
|
||||||
|
|
||||||
|
# Avoid any duplicate coming from any sources
|
||||||
|
if operation_mode == 1:
|
||||||
|
if server.exists(gzip64encoded): # Content already exists
|
||||||
|
#STATS
|
||||||
|
duplicated_paste_per_feeder[feeder_name] += 1
|
||||||
|
else: # New content
|
||||||
|
p.populate_set_out(relay_message)
|
||||||
|
server.sadd(gzip64encoded, feeder_name)
|
||||||
|
server.expire(gzip64encoded, ttl_key)
|
||||||
|
|
||||||
|
|
||||||
|
# Keep duplicate coming from different sources
|
||||||
|
else:
|
||||||
|
# Filter to avoid duplicate
|
||||||
|
content = server.get('HASH_'+paste_name)
|
||||||
|
if content is None:
|
||||||
|
# New content
|
||||||
|
# Store in redis for filtering
|
||||||
|
server.set('HASH_'+paste_name, content)
|
||||||
|
server.sadd(paste_name, feeder_name)
|
||||||
|
server.expire(paste_name, ttl_key)
|
||||||
|
server.expire('HASH_'+paste_name, ttl_key)
|
||||||
|
p.populate_set_out(relay_message)
|
||||||
|
else:
|
||||||
|
if gzip64encoded != content:
|
||||||
|
# Same paste name but different content
|
||||||
|
#STATS
|
||||||
|
duplicated_paste_per_feeder[feeder_name] += 1
|
||||||
|
server.sadd(paste_name, feeder_name)
|
||||||
|
server.expire(paste_name, ttl_key)
|
||||||
|
p.populate_set_out(relay_message)
|
||||||
|
else:
|
||||||
|
# Already processed
|
||||||
|
# Keep track of processed pastes
|
||||||
|
#STATS
|
||||||
|
duplicated_paste_per_feeder[feeder_name] += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
else:
|
||||||
|
# TODO Store the name of the empty paste inside a Redis-list.
|
||||||
|
print "Empty Paste: not processed"
|
||||||
|
publisher.debug("Empty Paste: {0} not processed".format(message))
|
||||||
|
else:
|
||||||
|
print "Empty Queues: Waiting..."
|
||||||
|
if int(time.time() - time_1) > refresh_time:
|
||||||
|
to_print = 'Mixer; ; ; ;mixer_all Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
|
||||||
|
print to_print
|
||||||
|
publisher.info(to_print)
|
||||||
|
processed_paste = 0
|
||||||
|
|
||||||
|
for feeder, count in processed_paste_per_feeder.iteritems():
|
||||||
|
to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
|
||||||
|
print to_print
|
||||||
|
publisher.info(to_print)
|
||||||
|
processed_paste_per_feeder[feeder] = 0
|
||||||
|
|
||||||
|
for feeder, count in duplicated_paste_per_feeder.iteritems():
|
||||||
|
to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
|
||||||
|
print to_print
|
||||||
|
publisher.info(to_print)
|
||||||
|
duplicated_paste_per_feeder[feeder] = 0
|
||||||
|
|
||||||
|
time_1 = time.time()
|
||||||
|
time.sleep(0.5)
|
||||||
|
continue
|
|
@ -40,6 +40,12 @@ min_paste_size = 0.3
|
||||||
#Threshold to deduce if a module is stuck or not, in seconds.
|
#Threshold to deduce if a module is stuck or not, in seconds.
|
||||||
threshold_stucked_module=600
|
threshold_stucked_module=600
|
||||||
|
|
||||||
|
[Module_Mixer]
|
||||||
|
#Define the configuration of the mixer, possible value: 1 or 2
|
||||||
|
operation_mode = 1
|
||||||
|
#Define the time that a paste will be considerate duplicate. in seconds (1day = 86400)
|
||||||
|
ttl_duplicate = 86400
|
||||||
|
|
||||||
##### Redis #####
|
##### Redis #####
|
||||||
[Redis_Cache]
|
[Redis_Cache]
|
||||||
host = localhost
|
host = localhost
|
||||||
|
@ -66,6 +72,12 @@ host = localhost
|
||||||
port = 6379
|
port = 6379
|
||||||
db = 2
|
db = 2
|
||||||
|
|
||||||
|
[Redis_Mixer]
|
||||||
|
host = localhost
|
||||||
|
port = 6381
|
||||||
|
db = 1
|
||||||
|
channel = 102
|
||||||
|
|
||||||
##### LevelDB #####
|
##### LevelDB #####
|
||||||
[Redis_Level_DB_Curve]
|
[Redis_Level_DB_Curve]
|
||||||
host = localhost
|
host = localhost
|
||||||
|
@ -111,6 +123,8 @@ path = indexdir
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
|
||||||
|
# For multiple feed, add them with "," without space
|
||||||
|
# e.g.: tcp://127.0.0.1:5556,tcp://127.0.0.1:5557
|
||||||
[ZMQ_Global]
|
[ZMQ_Global]
|
||||||
#address = tcp://crf.circl.lu:5556
|
#address = tcp://crf.circl.lu:5556
|
||||||
address = tcp://127.0.0.1:5556
|
address = tcp://127.0.0.1:5556
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
[Global]
|
[Mixer]
|
||||||
subscribe = ZMQ_Global
|
subscribe = ZMQ_Global
|
||||||
|
publish = Redis_Mixer
|
||||||
|
|
||||||
|
[Global]
|
||||||
|
subscribe = Redis_Mixer
|
||||||
publish = Redis_Global,Redis_ModuleStats
|
publish = Redis_Global,Redis_ModuleStats
|
||||||
|
|
||||||
[Duplicates]
|
[Duplicates]
|
||||||
|
|
Loading…
Reference in a new issue