Merge pull request #96 from mokaddem/more-feed

Handling of multiple feeders
This commit is contained in:
Alexandre Dulaunoy 2017-01-09 15:29:28 +01:00 committed by GitHub
commit a96ba40d94
10 changed files with 451 additions and 72 deletions

View file

@ -59,7 +59,7 @@ if __name__ == '__main__':
if int(time.time() - time_1) > 30:
to_print = 'Global; ; ; ;glob Processed {0} paste(s)'.format(processed_paste)
print to_print
publisher.info(to_print)
#publisher.info(to_print)
time_1 = time.time()
processed_paste = 0
time.sleep(1)

View file

@ -32,7 +32,7 @@ class PubSub(object):
self.config.read(configfile)
self.redis_sub = False
self.zmq_sub = False
self.subscriber = None
self.subscribers = None
self.publishers = {'Redis': [], 'ZMQ': []}
def setup_subscribe(self, conn_name):
@ -46,14 +46,19 @@ class PubSub(object):
host=self.config.get('RedisPubSub', 'host'),
port=self.config.get('RedisPubSub', 'port'),
db=self.config.get('RedisPubSub', 'db'))
self.subscriber = r.pubsub(ignore_subscribe_messages=True)
self.subscriber.psubscribe(channel)
self.subscribers = r.pubsub(ignore_subscribe_messages=True)
self.subscribers.psubscribe(channel)
elif conn_name.startswith('ZMQ'):
self.zmq_sub = True
context = zmq.Context()
self.subscriber = context.socket(zmq.SUB)
self.subscriber.connect(self.config.get(conn_name, 'address'))
self.subscriber.setsockopt(zmq.SUBSCRIBE, channel)
self.subscribers = []
addresses = self.config.get(conn_name, 'address')
for address in addresses.split(','):
new_sub = context.socket(zmq.SUB)
new_sub.connect(address)
new_sub.setsockopt(zmq.SUBSCRIBE, channel)
self.subscribers.append(new_sub)
def setup_publish(self, conn_name):
if self.config.has_section(conn_name):
@ -83,13 +88,18 @@ class PubSub(object):
def subscribe(self):
if self.redis_sub:
for msg in self.subscriber.listen():
for msg in self.subscribers.listen():
if msg.get('data', None) is not None:
yield msg['data']
elif self.zmq_sub:
while True:
msg = self.subscriber.recv()
yield msg.split(' ', 1)[1]
for sub in self.subscribers:
try:
msg = sub.recv(zmq.NOBLOCK)
yield msg.split(' ', 1)[1]
except zmq.error.Again as e:
time.sleep(0.2)
pass
else:
raise Exception('No subscribe function defined')

View file

@ -114,6 +114,8 @@ function launching_scripts {
screen -S "Script" -X screen -t "ModuleInformation" bash -c './ModuleInformation.py -k 0 -c 1; read x'
sleep 0.1
screen -S "Script" -X screen -t "Mixer" bash -c './Mixer.py; read x'
sleep 0.1
screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x'
sleep 0.1
screen -S "Script" -X screen -t "Duplicates" bash -c './Duplicates.py; read x'

193
bin/Mixer.py Executable file
View file

@ -0,0 +1,193 @@
#!/usr/bin/env python
# -*-coding:UTF-8 -*
"""
The ZMQ_Feed_Q Module
=====================
This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
This module take all the feeds provided in the config.
Depending on the configuration, this module will process the feed as follow:
operation_mode 1: "Avoid any duplicate from any sources"
- The module maintain a list of content for each paste
- If the content is new, process it
- Else, do not process it but keep track for statistics on duplicate
operation_mode 2: "Keep duplicate coming from different sources"
- The module maintain a list of name given to the paste by the feeder
- If the name has not yet been seen, process it
- Elseif, the saved content associated with the paste is not the same, process it
- Else, do not process it but keep track for statistics on duplicate
Note that the hash of the content is defined as the gzip64encoded.
Every data coming from a named feed can be sent to a pre-processing module before going to the global module.
The mapping can be done via the variable feed_queue_mapping
Requirements
------------
*Need running Redis instances.
*Need the ZMQ_Feed_Q Module running to be able to work properly.
"""
import base64
import os
import time
from pubsublogger import publisher
import redis
import ConfigParser
from Helper import Process
# CONFIG #
refresh_time = 30
feed_queue_mapping = { "feeder2": "preProcess1" } # Map a feeder name to a pre-processing module
if __name__ == '__main__':
publisher.port = 6380
publisher.channel = 'Script'
config_section = 'Mixer'
p = Process(config_section)
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
cfg = ConfigParser.ConfigParser()
cfg.read(configfile)
# REDIS #
server = redis.StrictRedis(
host=cfg.get("Redis_Mixer_Cache", "host"),
port=cfg.getint("Redis_Mixer_Cache", "port"),
db=cfg.getint("Redis_Mixer_Cache", "db"))
# LOGGING #
publisher.info("Feed Script started to receive & publish.")
# OTHER CONFIG #
operation_mode = cfg.getint("Module_Mixer", "operation_mode")
ttl_key = cfg.getint("Module_Mixer", "ttl_duplicate")
# STATS #
processed_paste = 0
processed_paste_per_feeder = {}
duplicated_paste_per_feeder = {}
time_1 = time.time()
while True:
message = p.get_from_set()
if message is not None:
splitted = message.split()
if len(splitted) == 2:
complete_paste, gzip64encoded = splitted
try:
feeder_name, paste_name = complete_paste.split('>')
feeder_name.replace(" ","")
except ValueError as e:
feeder_name = "unnamed_feeder"
paste_name = complete_paste
# Processed paste
processed_paste += 1
try:
processed_paste_per_feeder[feeder_name] += 1
except KeyError as e:
# new feeder
processed_paste_per_feeder[feeder_name] = 1
duplicated_paste_per_feeder[feeder_name] = 0
relay_message = "{0} {1}".format(paste_name, gzip64encoded)
# Avoid any duplicate coming from any sources
if operation_mode == 1:
if server.exists(gzip64encoded): # Content already exists
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
else: # New content
# populate Global OR populate another set based on the feeder_name
if feeder_name in feed_queue_mapping:
p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
server.sadd(gzip64encoded, feeder_name)
server.expire(gzip64encoded, ttl_key)
# Keep duplicate coming from different sources
else:
# Filter to avoid duplicate
content = server.get('HASH_'+paste_name)
if content is None:
# New content
# Store in redis for filtering
server.set('HASH_'+paste_name, content)
server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key)
server.expire('HASH_'+paste_name, ttl_key)
# populate Global OR populate another set based on the feeder_name
if feeder_name in feed_queue_mapping:
p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
if gzip64encoded != content:
# Same paste name but different content
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key)
# populate Global OR populate another set based on the feeder_name
if feeder_name in feed_queue_mapping:
p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
# Already processed
# Keep track of processed pastes
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
continue
else:
# TODO Store the name of the empty paste inside a Redis-list.
print "Empty Paste: not processed"
publisher.debug("Empty Paste: {0} not processed".format(message))
else:
print "Empty Queues: Waiting..."
if int(time.time() - time_1) > refresh_time:
print processed_paste_per_feeder
to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
print to_print
publisher.info(to_print)
processed_paste = 0
for feeder, count in processed_paste_per_feeder.iteritems():
to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
print to_print
publisher.info(to_print)
processed_paste_per_feeder[feeder] = 0
for feeder, count in duplicated_paste_per_feeder.iteritems():
to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
print to_print
publisher.info(to_print)
duplicated_paste_per_feeder[feeder] = 0
time_1 = time.time()
time.sleep(0.5)
continue

View file

@ -40,6 +40,12 @@ min_paste_size = 0.3
#Threshold to deduce if a module is stuck or not, in seconds.
threshold_stucked_module=600
[Module_Mixer]
#Define the configuration of the mixer, possible value: 1 or 2
operation_mode = 1
#Define the time that a paste will be considerate duplicate. in seconds (1day = 86400)
ttl_duplicate = 86400
##### Redis #####
[Redis_Cache]
host = localhost
@ -66,6 +72,11 @@ host = localhost
port = 6379
db = 2
[Redis_Mixer_Cache]
host = localhost
port = 6381
db = 1
##### LevelDB #####
[Redis_Level_DB_Curve]
host = localhost
@ -111,6 +122,8 @@ path = indexdir
###############################################################################
# For multiple feed, add them with "," without space
# e.g.: tcp://127.0.0.1:5556,tcp://127.0.0.1:5557
[ZMQ_Global]
#address = tcp://crf.circl.lu:5556
address = tcp://127.0.0.1:5556

View file

@ -1,7 +1,15 @@
[Global]
[Mixer]
subscribe = ZMQ_Global
publish = Redis_Mixer,Redis_preProcess1
[Global]
subscribe = Redis_Mixer
publish = Redis_Global,Redis_ModuleStats
[PreProcessFeed]
subscribe = Redis_preProcess1
publish = Redis_Mixer
[Duplicates]
subscribe = Redis_Duplicate

50
bin/preProcessFeed.py Executable file
View file

@ -0,0 +1,50 @@
#!/usr/bin/env python2
# -*-coding:UTF-8 -*
import time
from pubsublogger import publisher
from Helper import Process
def do_something(message):
splitted = message.split()
if len(splitted) == 2:
paste_name, gzip64encoded = splitted
paste_name = paste_name.replace("pastebin", "pastebinPROCESSED")
to_send = "{0} {1}".format(paste_name, gzip64encoded)
return to_send
if __name__ == '__main__':
# If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
# Port of the redis instance used by pubsublogger
publisher.port = 6380
# Script is the default channel used for the modules.
publisher.channel = 'Script'
# Section name in bin/packages/modules.cfg
config_section = 'PreProcessFeed'
# Setup the I/O queues
p = Process(config_section)
# Sent to the logging a description of the module
publisher.info("<description of the module>")
# Endless loop getting messages from the input queue
while True:
# Get one message from the input queue
message = p.get_from_set()
if message is None:
publisher.debug("{} queue is empty, waiting".format(config_section))
print "queue empty"
time.sleep(1)
continue
# Do something with the message from the queue
new_message = do_something(message)
# (Optional) Send that thing to the next queue
p.populate_set_out(new_message)

View file

@ -6,6 +6,7 @@
'''
import redis
import datetime
from Date import Date
import flask
from flask import Flask, render_template, jsonify, request
@ -62,6 +63,7 @@ def modulesCharts():
else:
member_set = get_top_relevant_data(r_serv_charts, module_name)
member_set = member_set if member_set is not None else []
if len(member_set) == 0:
member_set.append(("No relevant data", int(100)))
return jsonify(member_set)

View file

@ -1,13 +1,25 @@
var time_since_last_pastes_num;
var time_since_last_pastes_num = {};
var data_for_processed_paste = { };
var list_feeder = [];
window.paste_num_tabvar_all = {};
//If we do not received info from global, set pastes_num to 0
//If we do not received info from mixer, set pastes_num to 0
function checkIfReceivedData(){
if ((new Date().getTime() - time_since_last_pastes_num) > 45*1000)
window.paste_num_tabvar = 0;
setTimeout(checkIfReceivedData, 45*1000);
for (i in list_feeder) {
if(list_feeder[i] == "global"){
if ((new Date().getTime() - time_since_last_pastes_num[list_feeder[i]]) > 35*1000){
window.paste_num_tabvar_all[list_feeder[i]] = 0;
}
} else {
if ((new Date().getTime() - time_since_last_pastes_num["Proc"+list_feeder[i]]) > 35*1000){
window.paste_num_tabvar_all["Proc"+list_feeder[i]] = 0;
window.paste_num_tabvar_all["Dup"+list_feeder[i]] = 0;
}
}
}
setTimeout(checkIfReceivedData, 35*1000);
}
setTimeout(checkIfReceivedData, 45*1000);
function initfunc( csvay, scroot) {
window.csv = csvay;
@ -24,54 +36,88 @@ function update_values() {
// Plot and update the number of processed pastes
$(function() {
var data = [];
// BEGIN PROCESSED PASTES
var default_minute = (typeof window.default_minute !== "undefined") ? parseInt(window.default_minute) : 10;
var totalPoints = 60*parseInt(default_minute); //60s*minute
var curr_max = 0;
function getData() {
if (data.length > 0){
var data_old = data[0];
data = data.slice(1);
curr_max = curr_max == data_old ? Math.max.apply(null, data) : curr_max;
var totalPoints = 2*parseInt(default_minute); //60s*minute
var curr_max = {"global": 0};
function fetch_data(dataset, curr_data, feeder_name) {
if (curr_data.length > 0){
var data_old = curr_data[0];
curr_data = curr_data.slice(1);
curr_max[dataset] = curr_max[dataset] == data_old ? Math.max.apply(null, curr_data) : curr_max[dataset];
}
while (data.length < totalPoints) {
var y = (typeof window.paste_num_tabvar !== "undefined") ? parseInt(window.paste_num_tabvar) : 0;
curr_max = y > curr_max ? y : curr_max;
data.push(y);
while (curr_data.length < totalPoints) {
var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0;
curr_max[dataset] = y > curr_max[dataset] ? y : curr_max[dataset];
curr_data.push(y);
}
// Zip the generated y values with the x values
var res = [];
for (var i = 0; i < data.length; ++i) {
res.push([i, data[i]])
}
return res;
for (var i = 0; i < curr_data.length; ++i) {
res.push([i, curr_data[i]])
}
data_for_processed_paste[dataset] = curr_data;
return { label: feeder_name, data: res };
}
var updateInterval = 1000;
var options = {
series: { shadowSize: 1 },
lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }},
function getData(dataset_group, graph_type) {
var curr_data;
var all_res = [];
if (dataset_group == "global") {
if (data_for_processed_paste["global"] === undefined) { // create feeder dataset if not exists yet
data_for_processed_paste["global"] = [];
}
curr_data = data_for_processed_paste["global"];
all_res.push(fetch_data("global", curr_data, "global"));
} else {
for(d_i in list_feeder) {
if(list_feeder[d_i] == "global") {
continue;
}
dataset = graph_type+list_feeder[d_i];
if (data_for_processed_paste[dataset] === undefined) { // create feeder dataset if not exists yet
data_for_processed_paste[dataset] = [];
}
curr_data = data_for_processed_paste[dataset];
all_res.push(fetch_data(dataset, curr_data, list_feeder[d_i]));
}
}
return all_res;
}
var updateInterval = 10000;
var options_processed_pastes = {
series: { shadowSize: 0 ,
lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }}
},
yaxis: { min: 0, max: 40 },
colors: ["#a971ff"],
xaxis: { ticks: [[0, 0], [2, 1], [4, 2], [6, 3], [8, 4], [10, 5], [12, 6], [14, 7], [16, 8], [18, 9], [20, 10]] },
grid: {
tickColor: "#dddddd",
borderWidth: 0
},
legend: {
show: true,
position: "nw",
}
};
var plot = $.plot("#realtimechart", [ getData() ], options);
function update() {
plot.setData([getData()]);
plot.getOptions().yaxes[0].max = curr_max;
plot.setupGrid();
plot.draw();
setTimeout(update, updateInterval);
function update_processed_pastes(graph, dataset, graph_type) {
graph.setData(getData(dataset, graph_type));
graph.getOptions().yaxes[0].max = curr_max[dataset];
graph.setupGrid();
graph.draw();
setTimeout(function(){ update_processed_pastes(graph, dataset, graph_type); }, updateInterval);
}
update();
});
// END PROCESSED PASTES
function initfunc( csvay, scroot) {
window.csv = csvay;
@ -114,10 +160,44 @@ function create_log_table(obj_json) {
var chansplit = obj_json.channel.split('.');
var parsedmess = obj_json.data.split(';');
if (parsedmess[0] == "Global"){
var paste_processed = parsedmess[4].split(" ")[2];
window.paste_num_tabvar = paste_processed;
time_since_last_pastes_num = new Date().getTime();
if (parsedmess[0] == "Mixer"){
var feeder = parsedmess[4].split(" ")[1];
var paste_processed = parsedmess[4].split(" ")[3];
var msg_type = parsedmess[4].split(" ")[2];
if (feeder == "All_feeders"){
if(list_feeder.indexOf("global") == -1) {
list_feeder.push("global");
options_processed_pastes.legend.show = false;
var total_proc = $.plot("#global", [ getData("global", null) ], options_processed_pastes);
options_processed_pastes.legend.show = true;
options_processed_pastes.series.lines = { show: true };
data_for_processed_paste["global"] = Array(totalPoints+1).join(0).split('');
var feederProc = $.plot("#Proc_feeder", [ getData(feeder, "Proc") ], options_processed_pastes);
var feederDup = $.plot("#Dup_feeder", [ getData(feeder, "Dup") ], options_processed_pastes);
update_processed_pastes(feederProc, "feeder", "Proc");
update_processed_pastes(feederDup, "feeder", "Dup");
update_processed_pastes(total_proc, "global");
setTimeout(checkIfReceivedData, 45*1000);
}
window.paste_num_tabvar_all["global"] = paste_processed;
time_since_last_pastes_num["global"] = new Date().getTime();
} else {
if (list_feeder.indexOf(feeder) == -1) {
list_feeder.push(feeder);
data_for_processed_paste["Proc"+feeder] = Array(totalPoints+1).join(0).split('');
data_for_processed_paste["Dup"+feeder] = Array(totalPoints+1).join(0).split('');
}
var feederName = msg_type == "Duplicated" ? "Dup"+feeder : "Proc"+feeder;
window.paste_num_tabvar_all[feederName] = paste_processed;
time_since_last_pastes_num[feederName] = new Date().getTime();
}
return;
}

View file

@ -17,6 +17,7 @@
<script type="text/javascript" src="{{ url_for('static', filename='js/dygraph-combined.js') }}"></script>
<script src="{{ url_for('static', filename='js/jquery.js') }}"></script>
<script src="{{ url_for('static', filename='js/jquery.flot.js') }}"></script>
<script src="{{ url_for('static', filename='js/jquery.flot.time.js') }}"></script>
<script>
window.default_minute = {{ default_minute }};
window.glob_tabvar = []; // Avoid undefined
@ -52,10 +53,10 @@
<!-- /.sidebar-collapse -->
<div class="panel panel-default">
<div class="panel-heading">
<i class="fa fa-dashboard fa-fw"></i> Pastes since {{ default_minute }} min
<i class="fa fa-dashboard fa-fw"></i> Total pastes since {{ default_minute }} min
</div>
<div class="panel-body">
<div id="realtimechart" style="height: 90px; padding: 0px; position: relative;"></div>
<div id="global" style="height: 90px; padding: 0px; position: relative;"></div>
</div>
</div>
<!-- <div id="Graph_paste_num" style="height:90px; width:100%;"></div> -->
@ -91,23 +92,43 @@
</div>
<div class="row">
<div class="col-lg-12">
<div class="panel panel-default">
<div class="panel-heading">
<i class="fa fa-bar-chart-o fa-fw"></i> Queues Monitor
</div>
<div class="panel-body">
<div class="row">
<div class="col-lg-6" id="Graph" style="height:400px; width:48%;"></div>
<div class="col-lg-6" id="Graph2" style="height:400px; width:48%;"></div>
</div>
<!-- /.row -->
<div class="col-lg-6">
<div class="panel panel-default">
<div class="panel-heading">
<i class="fa fa-bar-chart-o fa-fw"></i> Feeder(s) Monitor:
</div>
<!-- /.panel-body -->
</div>
<!-- /.panel -->
</div>
<!-- /.col-lg-8 -->
<div id="panelbody" class="panel-body" style="height:420px;">
<strong>Processed pastes</strong>
<div id="Proc_feeder" style="height: 230px; padding: 0px; position: relative;"></div>
<hr style="border-top: 2px solid #eee; margin-top: 7px; margin-bottom: 7px;">
<strong>Filtered duplicated</strong>
<div id="Dup_feeder" style="height: 100px; padding: 0px; position: relative;"></div>
</div>
<!-- /.panel-body -->
</div>
<!-- /.panel -->
</div>
<!-- /.col-lg-6 -->
<div class="col-lg-6">
<div class="panel panel-default">
<div class="panel-heading">
<i class="fa fa-bar-chart-o fa-fw"></i> Queues Monitor
</div>
<div class="panel-body">
<div class="row">
<div class="col-lg-6" id="Graph" style="height:195px; width:88%;"></div>
<div style="height:10px;"></div>
<div class="col-lg-6" id="Graph2" style="height:195px; width:88%;"></div>
</div>
<!-- /.row -->
</div>
<!-- /.panel-body -->
</div>
<!-- /.panel -->
</div>
<!-- /.col-lg-6 -->
<div class="col-lg-12">
<div class="panel panel-default">
<div class="panel-heading">