diff --git a/bin/Global.py b/bin/Global.py
index 9cacbc88..bab45b47 100755
--- a/bin/Global.py
+++ b/bin/Global.py
@@ -59,7 +59,7 @@ if __name__ == '__main__':
         if int(time.time() - time_1) > 30:
             to_print = 'Global; ; ; ;glob Processed {0} paste(s)'.format(processed_paste)
             print to_print
-            publisher.info(to_print)
+            #publisher.info(to_print)
             time_1 = time.time()
             processed_paste = 0
         time.sleep(1)
diff --git a/bin/Helper.py b/bin/Helper.py
index 05b73bf3..1429904d 100755
--- a/bin/Helper.py
+++ b/bin/Helper.py
@@ -32,7 +32,7 @@ class PubSub(object):
         self.config.read(configfile)
         self.redis_sub = False
         self.zmq_sub = False
-        self.subscriber = None
+        self.subscribers = None
         self.publishers = {'Redis': [], 'ZMQ': []}

     def setup_subscribe(self, conn_name):
@@ -46,14 +46,19 @@ class PubSub(object):
                 host=self.config.get('RedisPubSub', 'host'),
                 port=self.config.get('RedisPubSub', 'port'),
                 db=self.config.get('RedisPubSub', 'db'))
-            self.subscriber = r.pubsub(ignore_subscribe_messages=True)
-            self.subscriber.psubscribe(channel)
+            self.subscribers = r.pubsub(ignore_subscribe_messages=True)
+            self.subscribers.psubscribe(channel)
         elif conn_name.startswith('ZMQ'):
             self.zmq_sub = True
             context = zmq.Context()
-            self.subscriber = context.socket(zmq.SUB)
-            self.subscriber.connect(self.config.get(conn_name, 'address'))
-            self.subscriber.setsockopt(zmq.SUBSCRIBE, channel)
+
+            # One SUB socket per configured address, so a module can listen
+            # to several feeds at once.
+            self.subscribers = []
+            addresses = self.config.get(conn_name, 'address')
+            for address in addresses.split(','):
+                new_sub = context.socket(zmq.SUB)
+                new_sub.connect(address)
+                new_sub.setsockopt(zmq.SUBSCRIBE, channel)
+                self.subscribers.append(new_sub)

     def setup_publish(self, conn_name):
         if self.config.has_section(conn_name):
@@ -83,13 +88,18 @@ class PubSub(object):

     def subscribe(self):
         if self.redis_sub:
-            for msg in self.subscriber.listen():
+            for msg in self.subscribers.listen():
                 if msg.get('data', None) is not None:
                     yield msg['data']
         elif self.zmq_sub:
             while True:
-                msg = self.subscriber.recv()
-                yield msg.split(' ', 1)[1]
+                # Poll every socket without blocking, so one silent feed
+                # cannot starve the others.
+                for sub in self.subscribers:
+                    try:
+                        msg = sub.recv(zmq.NOBLOCK)
+                        yield msg.split(' ', 1)[1]
+                    except zmq.error.Again:
+                        time.sleep(0.2)
         else:
             raise Exception('No subscribe function defined')
diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh
index d7c31472..f020f3a7 100755
--- a/bin/LAUNCH.sh
+++ b/bin/LAUNCH.sh
@@ -114,6 +114,8 @@ function launching_scripts {
     screen -S "Script" -X screen -t "ModuleInformation" bash -c './ModuleInformation.py -k 0 -c 1; read x'
     sleep 0.1
+    screen -S "Script" -X screen -t "Mixer" bash -c './Mixer.py; read x'
+    sleep 0.1
     screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x'
     sleep 0.1
     screen -S "Script" -X screen -t "Duplicates" bash -c './Duplicates.py; read x'
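Since Helper.py now opens one SUB socket per comma-separated address, a single [ZMQ_Global] entry can point at several feeds. A minimal sketch of such an entry, matching the split(',') logic above (the second address is illustrative):

    [ZMQ_Global]
    address = tcp://127.0.0.1:5556,tcp://127.0.0.1:5557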
diff --git a/bin/Mixer.py b/bin/Mixer.py
new file mode 100755
index 00000000..c6a7da2b
--- /dev/null
+++ b/bin/Mixer.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python
+# -*-coding:UTF-8 -*
+"""
+The Mixer Module
+================
+
+This module consumes the Redis-list created by the ZMQ_Feed_Q Module.
+
+It takes all the feeds provided in the config.
+Depending on the configuration, it processes each feed as follows:
+    operation_mode 1: "Avoid any duplicate from any source"
+        - The module maintains a list of content for each paste
+        - If the content is new, process it
+        - Else, do not process it but keep track of it for duplicate statistics
+
+    operation_mode 2: "Keep duplicates coming from different sources"
+        - The module maintains a list of names given to the paste by the feeders
+        - If the name has not been seen yet, process it
+        - Else, if the saved content associated with the paste is not the same, process it
+        - Else, do not process it but keep track of it for duplicate statistics
+
+Note that the hash of the content is defined as the gzip64encoded content itself.
+
+Every paste coming from a named feed can be sent to a pre-processing module before going to the Global module.
+The mapping is done via the variable feed_queue_mapping.
+
+Requirements
+------------
+
+*Need running Redis instances.
+*Need the ZMQ_Feed_Q Module running to be able to work properly.
+
+"""
+import base64
+import os
+import time
+from pubsublogger import publisher
+import redis
+import ConfigParser
+
+from Helper import Process
+
+
+# CONFIG #
+refresh_time = 30
+feed_queue_mapping = { "feeder2": "preProcess1" }  # Map a feeder name to a pre-processing module
+
+if __name__ == '__main__':
+    publisher.port = 6380
+    publisher.channel = 'Script'
+
+    config_section = 'Mixer'
+
+    p = Process(config_section)
+
+    configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
+    if not os.path.exists(configfile):
+        raise Exception('Unable to find the configuration file. \
+                        Did you set environment variables? \
+                        Or activate the virtualenv.')
+
+    cfg = ConfigParser.ConfigParser()
+    cfg.read(configfile)
+
+    # REDIS #
+    server = redis.StrictRedis(
+        host=cfg.get("Redis_Mixer_Cache", "host"),
+        port=cfg.getint("Redis_Mixer_Cache", "port"),
+        db=cfg.getint("Redis_Mixer_Cache", "db"))
+
+    # LOGGING #
+    publisher.info("Feed Script started to receive & publish.")
+
+    # OTHER CONFIG #
+    operation_mode = cfg.getint("Module_Mixer", "operation_mode")
+    ttl_key = cfg.getint("Module_Mixer", "ttl_duplicate")
+
+    # STATS #
+    processed_paste = 0
+    processed_paste_per_feeder = {}
+    duplicated_paste_per_feeder = {}
+    time_1 = time.time()
+
+
+    while True:
+
+        message = p.get_from_set()
+        if message is not None:
+            splitted = message.split()
+            if len(splitted) == 2:
+                complete_paste, gzip64encoded = splitted
+                try:
+                    feeder_name, paste_name = complete_paste.split('>')
+                    feeder_name = feeder_name.replace(" ", "")
+                except ValueError:
+                    feeder_name = "unnamed_feeder"
+                    paste_name = complete_paste
+
+                # Processed paste
+                processed_paste += 1
+                try:
+                    processed_paste_per_feeder[feeder_name] += 1
+                except KeyError:
+                    # new feeder
+                    processed_paste_per_feeder[feeder_name] = 1
+                    duplicated_paste_per_feeder[feeder_name] = 0
+
+                relay_message = "{0} {1}".format(paste_name, gzip64encoded)
+
+                # Avoid any duplicate coming from any source
+                if operation_mode == 1:
+                    if server.exists(gzip64encoded):  # Content already exists
+                        #STATS
+                        duplicated_paste_per_feeder[feeder_name] += 1
+                    else:  # New content
+
+                        # populate Global OR populate another set based on the feeder_name
+                        if feeder_name in feed_queue_mapping:
+                            p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
+                        else:
+                            p.populate_set_out(relay_message, 'Mixer')
+
+                        server.sadd(gzip64encoded, feeder_name)
+                        server.expire(gzip64encoded, ttl_key)
+
+
+                # Keep duplicates coming from different sources
+                else:
+                    # Filter to avoid duplicates
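+                    # 'HASH_<paste_name>' holds the last content stored under that
+                    # name; comparing it with the incoming gzip64encoded content
+                    # separates true duplicates from same-name resubmissions.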
+                    content = server.get('HASH_'+paste_name)
+                    if content is None:
+                        # New content
+                        # Store in redis for filtering
+                        server.set('HASH_'+paste_name, gzip64encoded)
+                        server.sadd(paste_name, feeder_name)
+                        server.expire(paste_name, ttl_key)
+                        server.expire('HASH_'+paste_name, ttl_key)
+
+                        # populate Global OR populate another set based on the feeder_name
+                        if feeder_name in feed_queue_mapping:
+                            p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
+                        else:
+                            p.populate_set_out(relay_message, 'Mixer')
+
+                    else:
+                        if gzip64encoded != content:
+                            # Same paste name but different content
+                            #STATS
+                            duplicated_paste_per_feeder[feeder_name] += 1
+                            server.sadd(paste_name, feeder_name)
+                            server.expire(paste_name, ttl_key)
+
+                            # populate Global OR populate another set based on the feeder_name
+                            if feeder_name in feed_queue_mapping:
+                                p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
+                            else:
+                                p.populate_set_out(relay_message, 'Mixer')
+
+                        else:
+                            # Already processed
+                            # Keep track of processed pastes
+                            #STATS
+                            duplicated_paste_per_feeder[feeder_name] += 1
+                            continue
+
+            else:
+                # TODO Store the name of the empty paste inside a Redis-list.
+                print "Empty Paste: not processed"
+                publisher.debug("Empty Paste: {0} not processed".format(message))
+        else:
+            print "Empty Queues: Waiting..."
+            if int(time.time() - time_1) > refresh_time:
+                print processed_paste_per_feeder
+                to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
+                print to_print
+                publisher.info(to_print)
+                processed_paste = 0
+
+                for feeder, count in processed_paste_per_feeder.iteritems():
+                    to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
+                    print to_print
+                    publisher.info(to_print)
+                    processed_paste_per_feeder[feeder] = 0
+
+                for feeder, count in duplicated_paste_per_feeder.iteritems():
+                    to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
+                    print to_print
+                    publisher.info(to_print)
+                    duplicated_paste_per_feeder[feeder] = 0
+
+                time_1 = time.time()
+            time.sleep(0.5)
+            continue
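For reference, the Mixer reads queue entries of the form feeder_name>paste_name gzip64encoded (entries without a feeder prefix fall back to "unnamed_feeder"), and the stats lines it publishes are what the dashboard JavaScript below parses. Illustrative values:

    feeder2>paste/2016/08/01/example.gz H4sIAAAAAAAA...
    Mixer; ; ; ;mixer_feeder2 feeder2 Processed 42 paste(s) in 30sec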
diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample
index 566cf22c..4f2899a0 100644
--- a/bin/packages/config.cfg.sample
+++ b/bin/packages/config.cfg.sample
@@ -40,6 +40,12 @@ min_paste_size = 0.3
 #Threshold to deduce if a module is stuck or not, in seconds.
 threshold_stucked_module=600

+[Module_Mixer]
+#Define the operation mode of the mixer, possible values: 1 or 2
+operation_mode = 1
+#Define how long a paste is considered a duplicate, in seconds (1 day = 86400)
+ttl_duplicate = 86400
+
 ##### Redis #####
 [Redis_Cache]
 host = localhost
@@ -66,6 +72,11 @@ host = localhost
 port = 6379
 db = 2

+[Redis_Mixer_Cache]
+host = localhost
+port = 6381
+db = 1
+
 ##### LevelDB #####
 [Redis_Level_DB_Curve]
 host = localhost
@@ -111,6 +122,8 @@ path = indexdir

 ###############################################################################

+# For multiple feeds, separate the addresses with "," (no spaces)
+# e.g.: tcp://127.0.0.1:5556,tcp://127.0.0.1:5557
 [ZMQ_Global]
 #address = tcp://crf.circl.lu:5556
 address = tcp://127.0.0.1:5556
diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg
index 53bbb2a6..c82f5c6b 100644
--- a/bin/packages/modules.cfg
+++ b/bin/packages/modules.cfg
@@ -1,7 +1,15 @@
-[Global]
+[Mixer]
 subscribe = ZMQ_Global
+publish = Redis_Mixer,Redis_preProcess1
+
+[Global]
+subscribe = Redis_Mixer
 publish = Redis_Global,Redis_ModuleStats

+[PreProcessFeed]
+subscribe = Redis_preProcess1
+publish = Redis_Mixer
+
 [Duplicates]
 subscribe = Redis_Duplicate
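The modules.cfg rewiring above places the Mixer in front of Global; data now flows roughly as follows (queue names as defined in modules.cfg, feeder2 being the feeder mapped in feed_queue_mapping):

    feeders --ZMQ_Global--> Mixer --Redis_Mixer--> Global --> ...
    feeder2 only: Mixer --Redis_preProcess1--> PreProcessFeed --Redis_Mixer--> Global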
diff --git a/bin/preProcessFeed.py b/bin/preProcessFeed.py
new file mode 100755
index 00000000..fe542647
--- /dev/null
+++ b/bin/preProcessFeed.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python2
+# -*-coding:UTF-8 -*
+
+import time
+from pubsublogger import publisher
+
+from Helper import Process
+
+
+def do_something(message):
+    splitted = message.split()
+    if len(splitted) == 2:
+        paste_name, gzip64encoded = splitted
+
+        paste_name = paste_name.replace("pastebin", "pastebinPROCESSED")
+
+        to_send = "{0} {1}".format(paste_name, gzip64encoded)
+        return to_send
+
+if __name__ == '__main__':
+    # If you wish to use another port or channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
+    # Port of the redis instance used by pubsublogger
+    publisher.port = 6380
+    # Script is the default channel used for the modules.
+    publisher.channel = 'Script'
+
+    # Section name in bin/packages/modules.cfg
+    config_section = 'PreProcessFeed'
+
+    # Setup the I/O queues
+    p = Process(config_section)
+
+    # Send to the logging a description of the module
+    publisher.info("PreProcessFeed started")
+
+    # Endless loop getting messages from the input queue
+    while True:
+        # Get one message from the input queue
+        message = p.get_from_set()
+        if message is None:
+            publisher.debug("{} queue is empty, waiting".format(config_section))
+            print "queue empty"
+            time.sleep(1)
+            continue
+
+        # Do something with the message from the queue
+        new_message = do_something(message)
+
+        # (Optional) Send that thing to the next queue
+        p.populate_set_out(new_message)
diff --git a/var/www/Flasks/Flask_trendingmodules.py b/var/www/Flasks/Flask_trendingmodules.py
index 73cef7f5..19895df9 100644
--- a/var/www/Flasks/Flask_trendingmodules.py
+++ b/var/www/Flasks/Flask_trendingmodules.py
@@ -6,6 +6,7 @@
 '''
 import redis
 import datetime
+from Date import Date
 import flask
 from flask import Flask, render_template, jsonify, request
@@ -62,6 +63,7 @@ def modulesCharts():
         else:
             member_set = get_top_relevant_data(r_serv_charts, module_name)

+        member_set = member_set if member_set is not None else []
         if len(member_set) == 0:
             member_set.append(("No relevant data", int(100)))
         return jsonify(member_set)
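In the dashboard script below, every chart series is keyed by a dataset name: "global" for the all-feeders total, plus "Proc"+feeder and "Dup"+feeder for each feeder's processed and duplicated counts. A sketch of the shared state after two hypothetical feeders have reported:

    // feeder names are illustrative
    // window.paste_num_tabvar_all = { "global": 12, "Procfeeder1": 7,
    //     "Dupfeeder1": 1, "Procfeeder2": 5, "Dupfeeder2": 0 };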
diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js
index 8d50ea9d..b85f62b0 100644
--- a/var/www/static/js/indexjavascript.js
+++ b/var/www/static/js/indexjavascript.js
@@ -1,13 +1,25 @@
-var time_since_last_pastes_num;
+var time_since_last_pastes_num = {};
+var data_for_processed_paste = { };
+var list_feeder = [];
+window.paste_num_tabvar_all = {};

-//If we do not received info from global, set pastes_num to 0
+//If we have not received info from the mixer, set pastes_num to 0
 function checkIfReceivedData(){
-    if ((new Date().getTime() - time_since_last_pastes_num) > 45*1000)
-        window.paste_num_tabvar = 0;
-    setTimeout(checkIfReceivedData, 45*1000);
+    for (var i in list_feeder) {
+        if(list_feeder[i] == "global"){
+            if ((new Date().getTime() - time_since_last_pastes_num[list_feeder[i]]) > 35*1000){
+                window.paste_num_tabvar_all[list_feeder[i]] = 0;
+            }
+        } else {
+            if ((new Date().getTime() - time_since_last_pastes_num["Proc"+list_feeder[i]]) > 35*1000){
+                window.paste_num_tabvar_all["Proc"+list_feeder[i]] = 0;
+                window.paste_num_tabvar_all["Dup"+list_feeder[i]] = 0;
+            }
+        }
+    }
+    setTimeout(checkIfReceivedData, 35*1000);
 }
-setTimeout(checkIfReceivedData, 45*1000);

 function initfunc( csvay, scroot) {
   window.csv = csvay;
@@ -24,54 +36,88 @@ function update_values() {

 // Plot and update the number of processed pastes
-$(function() {
-    var data = [];
+// BEGIN PROCESSED PASTES
     var default_minute = (typeof window.default_minute !== "undefined") ? parseInt(window.default_minute) : 10;
-    var totalPoints = 60*parseInt(default_minute); //60s*minute
-    var curr_max = 0;
-
-    function getData() {
-        if (data.length > 0){
-            var data_old = data[0];
-            data = data.slice(1);
-            curr_max = curr_max == data_old ? Math.max.apply(null, data) : curr_max;
+    var totalPoints = 2*parseInt(default_minute); // 2 points per minute (the Mixer publishes stats every 30s)
+    var curr_max = {"global": 0};
+
+    function fetch_data(dataset, curr_data, feeder_name) {
+        if (curr_data.length > 0){
+            var data_old = curr_data[0];
+            curr_data = curr_data.slice(1);
+            curr_max[dataset] = curr_max[dataset] == data_old ? Math.max.apply(null, curr_data) : curr_max[dataset];
         }
-        while (data.length < totalPoints) {
-            var y = (typeof window.paste_num_tabvar !== "undefined") ? parseInt(window.paste_num_tabvar) : 0;
-            curr_max = y > curr_max ? y : curr_max;
-            data.push(y);
+        while (curr_data.length < totalPoints) {
+            var y = (typeof window.paste_num_tabvar_all[dataset] !== "undefined") ? parseInt(window.paste_num_tabvar_all[dataset]) : 0;
+            curr_max[dataset] = y > curr_max[dataset] ? y : curr_max[dataset];
+            curr_data.push(y);
         }
         // Zip the generated y values with the x values
         var res = [];
-        for (var i = 0; i < data.length; ++i) {
-            res.push([i, data[i]])
-        }
-        return res;
+        for (var i = 0; i < curr_data.length; ++i) {
+            res.push([i, curr_data[i]])
+        }
+        data_for_processed_paste[dataset] = curr_data;
+        return { label: feeder_name, data: res };
     }

-    var updateInterval = 1000;
-    var options = {
-        series: { shadowSize: 1 },
-        lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }},
+    function getData(dataset_group, graph_type) {
+        var curr_data;
+
+        var all_res = [];
+        if (dataset_group == "global") {
+            if (data_for_processed_paste["global"] === undefined) { // create the feeder dataset if it does not exist yet
+                data_for_processed_paste["global"] = [];
+            }
+            curr_data = data_for_processed_paste["global"];
+            all_res.push(fetch_data("global", curr_data, "global"));
+        } else {
+
+            for(var d_i in list_feeder) {
+                if(list_feeder[d_i] == "global") {
+                    continue;
+                }
+
+                var dataset = graph_type+list_feeder[d_i];
+                if (data_for_processed_paste[dataset] === undefined) { // create the feeder dataset if it does not exist yet
+                    data_for_processed_paste[dataset] = [];
+                }
+                curr_data = data_for_processed_paste[dataset];
+                all_res.push(fetch_data(dataset, curr_data, list_feeder[d_i]));
+            }
+
+        }
+        return all_res;
+    }
+
+    var updateInterval = 10000;
+    var options_processed_pastes = {
+        series: { shadowSize: 0 ,
+            lines: { fill: true, fillColor: { colors: [ { opacity: 1 }, { opacity: 0.1 } ] }}
+        },
         yaxis: { min: 0, max: 40 },
-        colors: ["#a971ff"],
+        xaxis: { ticks: [[0, 0], [2, 1], [4, 2], [6, 3], [8, 4], [10, 5], [12, 6], [14, 7], [16, 8], [18, 9], [20, 10]] },
         grid: { tickColor: "#dddddd", borderWidth: 0 },
+        legend: {
+            show: true,
+            position: "nw",
+        }
     };

-    var plot = $.plot("#realtimechart", [ getData() ], options);
-
-    function update() {
-        plot.setData([getData()]);
-        plot.getOptions().yaxes[0].max = curr_max;
-        plot.setupGrid();
-        plot.draw();
-        setTimeout(update, updateInterval);
+
+    function update_processed_pastes(graph, dataset, graph_type) {
+        graph.setData(getData(dataset, graph_type));
+        graph.getOptions().yaxes[0].max = curr_max[dataset];
+        graph.setupGrid();
+        graph.draw();
+        setTimeout(function(){ update_processed_pastes(graph, dataset, graph_type); }, updateInterval);
     }
-    update();
-});
+
+
+// END PROCESSED PASTES

 function initfunc( csvay, scroot) {
   window.csv = csvay;
@@ -114,10 +160,44 @@ function create_log_table(obj_json) {
     var chansplit = obj_json.channel.split('.');
     var parsedmess = obj_json.data.split(';');

-    if (parsedmess[0] == "Global"){
-        var paste_processed = parsedmess[4].split(" ")[2];
-        window.paste_num_tabvar = paste_processed;
-        time_since_last_pastes_num = new Date().getTime();
+
+    if (parsedmess[0] == "Mixer"){
+        var feeder = parsedmess[4].split(" ")[1];
+        var paste_processed = parsedmess[4].split(" ")[3];
+        var msg_type = parsedmess[4].split(" ")[2];
+
+        if (feeder == "All_feeders"){
+            if(list_feeder.indexOf("global") == -1) {
+                list_feeder.push("global");
+
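+                // First "All_feeders" stat line: build the plots lazily, so the
+                // charts are only created once the Mixer actually publishes.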
+                options_processed_pastes.legend.show = false;
+                var total_proc = $.plot("#global", [ getData("global", null) ], options_processed_pastes);
+                options_processed_pastes.legend.show = true;
+                options_processed_pastes.series.lines = { show: true };
+                data_for_processed_paste["global"] = Array(totalPoints+1).join(0).split('');
+
+                var feederProc = $.plot("#Proc_feeder", [ getData(feeder, "Proc") ], options_processed_pastes);
+                var feederDup = $.plot("#Dup_feeder", [ getData(feeder, "Dup") ], options_processed_pastes);
+
+                update_processed_pastes(feederProc, "feeder", "Proc");
+                update_processed_pastes(feederDup, "feeder", "Dup");
+                update_processed_pastes(total_proc, "global");
+                setTimeout(checkIfReceivedData, 45*1000);
+            }
+            window.paste_num_tabvar_all["global"] = paste_processed;
+            time_since_last_pastes_num["global"] = new Date().getTime();
+        } else {
+
+            if (list_feeder.indexOf(feeder) == -1) {
+                list_feeder.push(feeder);
+                data_for_processed_paste["Proc"+feeder] = Array(totalPoints+1).join(0).split('');
+                data_for_processed_paste["Dup"+feeder] = Array(totalPoints+1).join(0).split('');
+            }
+
+            var feederName = msg_type == "Duplicated" ? "Dup"+feeder : "Proc"+feeder;
+            window.paste_num_tabvar_all[feederName] = paste_processed;
+            time_since_last_pastes_num[feederName] = new Date().getTime();
+        }
         return;
     }
diff --git a/var/www/templates/index.html b/var/www/templates/index.html
index 74b45c01..48f81a7d 100644
--- a/var/www/templates/index.html
+++ b/var/www/templates/index.html
@@ -17,6 +17,7 @@
+
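For context, the script above plots into elements selected by #global, #Proc_feeder and #Dup_feeder, so index.html needs matching placeholders; an illustrative sketch, not the committed markup:

    <!-- illustrative placeholders; ids must match the $.plot() selectors -->
    <div id="global"></div>
    <div id="Proc_feeder"></div>
    <div id="Dup_feeder"></div>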