diff --git a/bin/Helper.py b/bin/Helper.py index d6441b9a..1429904d 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -98,6 +98,7 @@ class PubSub(object): msg = sub.recv(zmq.NOBLOCK) yield msg.split(' ', 1)[1] except zmq.error.Again as e: + time.sleep(0.2) pass else: raise Exception('No subscribe function defined') diff --git a/bin/Mixer.py b/bin/Mixer.py index ca5cfc90..c6a7da2b 100755 --- a/bin/Mixer.py +++ b/bin/Mixer.py @@ -19,7 +19,10 @@ Depending on the configuration, this module will process the feed as follow: - Elseif, the saved content associated with the paste is not the same, process it - Else, do not process it but keep track for statistics on duplicate -Note that the hash of the content is defined as the gzip64encoded +Note that the hash of the content is defined as the gzip64encoded. + +Every data coming from a named feed can be sent to a pre-processing module before going to the global module. +The mapping can be done via the variable feed_queue_mapping Requirements ------------ @@ -40,8 +43,7 @@ from Helper import Process # CONFIG # refresh_time = 30 - - +feed_queue_mapping = { "feeder2": "preProcess1" } # Map a feeder name to a pre-processing module if __name__ == '__main__': publisher.port = 6380 @@ -62,9 +64,9 @@ if __name__ == '__main__': # REDIS # server = redis.StrictRedis( - host=cfg.get("Redis_Mixer", "host"), - port=cfg.getint("Redis_Mixer", "port"), - db=cfg.getint("Redis_Mixer", "db")) + host=cfg.get("Redis_Mixer_Cache", "host"), + port=cfg.getint("Redis_Mixer_Cache", "port"), + db=cfg.getint("Redis_Mixer_Cache", "db")) # LOGGING # publisher.info("Feed Script started to receive & publish.") @@ -86,13 +88,13 @@ if __name__ == '__main__': if message is not None: splitted = message.split() if len(splitted) == 2: - paste, gzip64encoded = splitted + complete_paste, gzip64encoded = splitted try: - feeder_name, paste_name = paste.split('>') + feeder_name, paste_name = complete_paste.split('>') feeder_name.replace(" ","") except ValueError as e: feeder_name = "unnamed_feeder" - paste_name = paste + paste_name = complete_paste # Processed paste processed_paste += 1 @@ -111,8 +113,13 @@ if __name__ == '__main__': #STATS duplicated_paste_per_feeder[feeder_name] += 1 else: # New content - p.populate_set_out(relay_message) - # OR populate another set based on the feeder_name + + # populate Global OR populate another set based on the feeder_name + if feeder_name in feed_queue_mapping: + p.populate_set_out(relay_message, feed_queue_mapping[feeder_name]) + else: + p.populate_set_out(relay_message, 'Mixer') + server.sadd(gzip64encoded, feeder_name) server.expire(gzip64encoded, ttl_key) @@ -128,8 +135,13 @@ if __name__ == '__main__': server.sadd(paste_name, feeder_name) server.expire(paste_name, ttl_key) server.expire('HASH_'+paste_name, ttl_key) - p.populate_set_out(relay_message) - # OR populate another set based on the feeder_name + + # populate Global OR populate another set based on the feeder_name + if feeder_name in feed_queue_mapping: + p.populate_set_out(relay_message, feed_queue_mapping[feeder_name]) + else: + p.populate_set_out(relay_message, 'Mixer') + else: if gzip64encoded != content: # Same paste name but different content @@ -137,8 +149,13 @@ if __name__ == '__main__': duplicated_paste_per_feeder[feeder_name] += 1 server.sadd(paste_name, feeder_name) server.expire(paste_name, ttl_key) - p.populate_set_out(relay_message) - # OR populate another set based on the feeder_name + + # populate Global OR populate another set based on the feeder_name + if feeder_name in feed_queue_mapping: + p.populate_set_out(relay_message, feed_queue_mapping[feeder_name]) + else: + p.populate_set_out(relay_message, 'Mixer') + else: # Already processed # Keep track of processed pastes diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample index 880bf169..4f2899a0 100644 --- a/bin/packages/config.cfg.sample +++ b/bin/packages/config.cfg.sample @@ -72,11 +72,10 @@ host = localhost port = 6379 db = 2 -[Redis_Mixer] +[Redis_Mixer_Cache] host = localhost port = 6381 db = 1 -channel = 102 ##### LevelDB ##### [Redis_Level_DB_Curve] diff --git a/bin/packages/modules.cfg b/bin/packages/modules.cfg index dee8baff..c82f5c6b 100644 --- a/bin/packages/modules.cfg +++ b/bin/packages/modules.cfg @@ -1,11 +1,15 @@ [Mixer] subscribe = ZMQ_Global -publish = Redis_Mixer +publish = Redis_Mixer,Redis_preProcess1 [Global] subscribe = Redis_Mixer publish = Redis_Global,Redis_ModuleStats +[PreProcessFeed] +subscribe = Redis_preProcess1 +publish = Redis_Mixer + [Duplicates] subscribe = Redis_Duplicate diff --git a/bin/preProcessFeed.py b/bin/preProcessFeed.py new file mode 100755 index 00000000..fe542647 --- /dev/null +++ b/bin/preProcessFeed.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* + +import time +from pubsublogger import publisher + +from Helper import Process + + +def do_something(message): + splitted = message.split() + if len(splitted) == 2: + paste_name, gzip64encoded = splitted + + paste_name = paste_name.replace("pastebin", "pastebinPROCESSED") + + to_send = "{0} {1}".format(paste_name, gzip64encoded) + return to_send + +if __name__ == '__main__': + # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) + # Port of the redis instance used by pubsublogger + publisher.port = 6380 + # Script is the default channel used for the modules. + publisher.channel = 'Script' + + # Section name in bin/packages/modules.cfg + config_section = 'PreProcessFeed' + + # Setup the I/O queues + p = Process(config_section) + + # Sent to the logging a description of the module + publisher.info("") + + # Endless loop getting messages from the input queue + while True: + # Get one message from the input queue + message = p.get_from_set() + if message is None: + publisher.debug("{} queue is empty, waiting".format(config_section)) + print "queue empty" + time.sleep(1) + continue + + # Do something with the message from the queue + new_message = do_something(message) + + # (Optional) Send that thing to the next queue + p.populate_set_out(new_message) diff --git a/var/www/templates/index.html b/var/www/templates/index.html index a943369e..f5dfb7f9 100644 --- a/var/www/templates/index.html +++ b/var/www/templates/index.html @@ -95,12 +95,14 @@
- Feeder(s) Monitor: Processed pastes and filtered duplicated + Feeder(s) Monitor:
-
-
+ Processed pastes +
+ Filtered duplicated +