diff --git a/bin/Global.py b/bin/Global.py index 9cacbc88..a29c3b86 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -72,6 +72,8 @@ if __name__ == '__main__': os.makedirs(dirname) with open(filename, 'wb') as f: + print gzip64encoded + print base64.standard_b64decode(gzip64encoded) f.write(base64.standard_b64decode(gzip64encoded)) p.populate_set_out(filename) processed_paste+=1 diff --git a/bin/Helper.py b/bin/Helper.py index 05b73bf3..d6441b9a 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -32,7 +32,7 @@ class PubSub(object): self.config.read(configfile) self.redis_sub = False self.zmq_sub = False - self.subscriber = None + self.subscribers = None self.publishers = {'Redis': [], 'ZMQ': []} def setup_subscribe(self, conn_name): @@ -46,14 +46,19 @@ class PubSub(object): host=self.config.get('RedisPubSub', 'host'), port=self.config.get('RedisPubSub', 'port'), db=self.config.get('RedisPubSub', 'db')) - self.subscriber = r.pubsub(ignore_subscribe_messages=True) - self.subscriber.psubscribe(channel) + self.subscribers = r.pubsub(ignore_subscribe_messages=True) + self.subscribers.psubscribe(channel) elif conn_name.startswith('ZMQ'): self.zmq_sub = True context = zmq.Context() - self.subscriber = context.socket(zmq.SUB) - self.subscriber.connect(self.config.get(conn_name, 'address')) - self.subscriber.setsockopt(zmq.SUBSCRIBE, channel) + + self.subscribers = [] + addresses = self.config.get(conn_name, 'address') + for address in addresses.split(','): + new_sub = context.socket(zmq.SUB) + new_sub.connect(address) + new_sub.setsockopt(zmq.SUBSCRIBE, channel) + self.subscribers.append(new_sub) def setup_publish(self, conn_name): if self.config.has_section(conn_name): @@ -83,13 +88,17 @@ class PubSub(object): def subscribe(self): if self.redis_sub: - for msg in self.subscriber.listen(): + for msg in self.subscribers.listen(): if msg.get('data', None) is not None: yield msg['data'] elif self.zmq_sub: while True: - msg = self.subscriber.recv() - yield msg.split(' ', 1)[1] + for sub in self.subscribers: + try: + msg = sub.recv(zmq.NOBLOCK) + yield msg.split(' ', 1)[1] + except zmq.error.Again as e: + pass else: raise Exception('No subscribe function defined') diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index d7c31472..f020f3a7 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -114,6 +114,8 @@ function launching_scripts { screen -S "Script" -X screen -t "ModuleInformation" bash -c './ModuleInformation.py -k 0 -c 1; read x' sleep 0.1 + screen -S "Script" -X screen -t "Mixer" bash -c './Mixer.py; read x' + sleep 0.1 screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x' sleep 0.1 screen -S "Script" -X screen -t "Duplicates" bash -c './Duplicates.py; read x' diff --git a/bin/Mixer.py b/bin/Mixer.py new file mode 100755 index 00000000..e1804263 --- /dev/null +++ b/bin/Mixer.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# -*-coding:UTF-8 -* +""" +The ZMQ_Feed_Q Module +===================== + +This module is consuming the Redis-list created by the ZMQ_Feed_Q Module. + +This module take all the feeds provided in the config. +Depending on the configuration, this module will process the feed as follow: + operation_mode 1: "Avoid any duplicate from any sources" + - The module maintain a list of content for each paste + - If the content is new, process it + - Else, do not process it but keep track for statistics on duplicate + + operation_mode 2: "Keep duplicate coming from different sources" + - The module maintain a list of name given to the paste by the feeder + - If the name has not yet been seen, process it + - Elseif, the saved content associated with the paste is not the same, process it + - Else, do not process it but keep track for statistics on duplicate + +Note that the hash of the content is defined as the gzip64encoded + +Requirements +------------ + +*Need running Redis instances. +*Need the ZMQ_Feed_Q Module running to be able to work properly. + +""" +import base64 +import os +import time +from pubsublogger import publisher +import redis +import ConfigParser + +from Helper import Process + + +# CONFIG # +refresh_time = 30 + + + +if __name__ == '__main__': + publisher.port = 6380 + publisher.channel = 'Script' + + config_section = 'Mixer' + + p = Process(config_section) + + configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') + if not os.path.exists(configfile): + raise Exception('Unable to find the configuration file. \ + Did you set environment variables? \ + Or activate the virtualenv.') + + cfg = ConfigParser.ConfigParser() + cfg.read(configfile) + + # REDIS # + server = redis.StrictRedis( + host=cfg.get("Redis_Mixer", "host"), + port=cfg.getint("Redis_Mixer", "port"), + db=cfg.getint("Redis_Mixer", "db")) + + # LOGGING # + publisher.info("Feed Script started to receive & publish.") + + # OTHER CONFIG # + operation_mode = cfg.getint("Module_Mixer", "operation_mode") + ttl_key = cfg.getint("Module_Mixer", "ttl_duplicate") + + # STATS # + processed_paste = 0 + processed_paste_per_feeder = {} + duplicated_paste_per_feeder = {} + time_1 = time.time() + + + while True: + + message = p.get_from_set() + if message is not None: + splitted = message.split() + if len(splitted) == 2: + paste, gzip64encoded = splitted + try: + feeder_name, paste_name = paste.split('>') + feeder_name.replace(" ","") + except ValueError as e: + feeder_name = "unnamed_feeder" + paste_name = paste + + # Processed paste + processed_paste += 1 + try: + processed_paste_per_feeder[feeder_name] += 1 + except KeyError as e: + # new feeder + processed_paste_per_feeder[feeder_name] = 1 + duplicated_paste_per_feeder[feeder_name] = 0 + + relay_message = "{0} {1}".format(paste_name, gzip64encoded) + + # Avoid any duplicate coming from any sources + if operation_mode == 1: + if server.exists(gzip64encoded): # Content already exists + #STATS + duplicated_paste_per_feeder[feeder_name] += 1 + else: # New content + p.populate_set_out(relay_message) + server.sadd(gzip64encoded, feeder_name) + server.expire(gzip64encoded, ttl_key) + + + # Keep duplicate coming from different sources + else: + # Filter to avoid duplicate + content = server.get('HASH_'+paste_name) + if content is None: + # New content + # Store in redis for filtering + server.set('HASH_'+paste_name, content) + server.sadd(paste_name, feeder_name) + server.expire(paste_name, ttl_key) + server.expire('HASH_'+paste_name, ttl_key) + p.populate_set_out(relay_message) + else: + if gzip64encoded != content: + # Same paste name but different content + #STATS + duplicated_paste_per_feeder[feeder_name] += 1 + server.sadd(paste_name, feeder_name) + server.expire(paste_name, ttl_key) + p.populate_set_out(relay_message) + else: + # Already processed + # Keep track of processed pastes + #STATS + duplicated_paste_per_feeder[feeder_name] += 1 + continue + + else: + # TODO Store the name of the empty paste inside a Redis-list. + print "Empty Paste: not processed" + publisher.debug("Empty Paste: {0} not processed".format(message)) + else: + print "Empty Queues: Waiting..." + if int(time.time() - time_1) > refresh_time: + to_print = 'Mixer; ; ; ;mixer_all Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time) + print to_print + publisher.info(to_print) + processed_paste = 0 + + for feeder, count in processed_paste_per_feeder.iteritems(): + to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time) + print to_print + publisher.info(to_print) + processed_paste_per_feeder[feeder] = 0 + + for feeder, count in duplicated_paste_per_feeder.iteritems(): + to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time) + print to_print + publisher.info(to_print) + duplicated_paste_per_feeder[feeder] = 0 + + time_1 = time.time() + time.sleep(0.5) + continue diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample index 566cf22c..880bf169 100644 --- a/bin/packages/config.cfg.sample +++ b/bin/packages/config.cfg.sample @@ -40,6 +40,12 @@ min_paste_size = 0.3 #Threshold to deduce if a module is stuck or not, in seconds. threshold_stucked_module=600 +[Module_Mixer] +#Define the configuration of the mixer, possible value: 1 or 2 +operation_mode = 1 +#Define the time that a paste will be considerate duplicate. in seconds (1day = 86400) +ttl_duplicate = 86400 + ##### Redis ##### [Redis_Cache] host = localhost @@ -66,6 +72,12 @@ host = localhost port = 6379 db = 2 +[Redis_Mixer] +host = localhost +port = 6381 +db = 1 +channel = 102 + ##### LevelDB ##### [Redis_Level_DB_Curve] host = localhost @@ -111,6 +123,8 @@ path = indexdir ############################################################################### +# For multiple feed, add them with "," without space +# e.g.: tcp://127.0.0.1:5556,tcp://127.0.0.1:5557 [ZMQ_Global] #address = tcp://crf.circl.lu:5556 address = tcp://127.0.0.1:5556 diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg index 53bbb2a6..dee8baff 100644 --- a/bin/packages/modules.cfg +++ b/bin/packages/modules.cfg @@ -1,5 +1,9 @@ -[Global] +[Mixer] subscribe = ZMQ_Global +publish = Redis_Mixer + +[Global] +subscribe = Redis_Mixer publish = Redis_Global,Redis_ModuleStats [Duplicates]