Added sleep in helper for multiple listening interfaces (reduce useless work), Added a simple way to pre-process feed before sending it to the global feed.

This commit is contained in:
Mokaddem 2017-01-09 14:12:26 +01:00
parent 24c5621d29
commit 0e39f516a9
6 changed files with 94 additions and 21 deletions

View file

@ -98,6 +98,7 @@ class PubSub(object):
msg = sub.recv(zmq.NOBLOCK) msg = sub.recv(zmq.NOBLOCK)
yield msg.split(' ', 1)[1] yield msg.split(' ', 1)[1]
except zmq.error.Again as e: except zmq.error.Again as e:
time.sleep(0.2)
pass pass
else: else:
raise Exception('No subscribe function defined') raise Exception('No subscribe function defined')

View file

@ -19,7 +19,10 @@ Depending on the configuration, this module will process the feed as follow:
- Elseif, the saved content associated with the paste is not the same, process it - Elseif, the saved content associated with the paste is not the same, process it
- Else, do not process it but keep track for statistics on duplicate - Else, do not process it but keep track for statistics on duplicate
Note that the hash of the content is defined as the gzip64encoded Note that the hash of the content is defined as the gzip64encoded.
Every data coming from a named feed can be sent to a pre-processing module before going to the global module.
The mapping can be done via the variable feed_queue_mapping
Requirements Requirements
------------ ------------
@ -40,8 +43,7 @@ from Helper import Process
# CONFIG # # CONFIG #
refresh_time = 30 refresh_time = 30
feed_queue_mapping = { "feeder2": "preProcess1" } # Map a feeder name to a pre-processing module
if __name__ == '__main__': if __name__ == '__main__':
publisher.port = 6380 publisher.port = 6380
@ -62,9 +64,9 @@ if __name__ == '__main__':
# REDIS # # REDIS #
server = redis.StrictRedis( server = redis.StrictRedis(
host=cfg.get("Redis_Mixer", "host"), host=cfg.get("Redis_Mixer_Cache", "host"),
port=cfg.getint("Redis_Mixer", "port"), port=cfg.getint("Redis_Mixer_Cache", "port"),
db=cfg.getint("Redis_Mixer", "db")) db=cfg.getint("Redis_Mixer_Cache", "db"))
# LOGGING # # LOGGING #
publisher.info("Feed Script started to receive & publish.") publisher.info("Feed Script started to receive & publish.")
@ -86,13 +88,13 @@ if __name__ == '__main__':
if message is not None: if message is not None:
splitted = message.split() splitted = message.split()
if len(splitted) == 2: if len(splitted) == 2:
paste, gzip64encoded = splitted complete_paste, gzip64encoded = splitted
try: try:
feeder_name, paste_name = paste.split('>') feeder_name, paste_name = complete_paste.split('>')
feeder_name.replace(" ","") feeder_name.replace(" ","")
except ValueError as e: except ValueError as e:
feeder_name = "unnamed_feeder" feeder_name = "unnamed_feeder"
paste_name = paste paste_name = complete_paste
# Processed paste # Processed paste
processed_paste += 1 processed_paste += 1
@ -111,8 +113,13 @@ if __name__ == '__main__':
#STATS #STATS
duplicated_paste_per_feeder[feeder_name] += 1 duplicated_paste_per_feeder[feeder_name] += 1
else: # New content else: # New content
p.populate_set_out(relay_message)
# OR populate another set based on the feeder_name # populate Global OR populate another set based on the feeder_name
if feeder_name in feed_queue_mapping:
p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
server.sadd(gzip64encoded, feeder_name) server.sadd(gzip64encoded, feeder_name)
server.expire(gzip64encoded, ttl_key) server.expire(gzip64encoded, ttl_key)
@ -128,8 +135,13 @@ if __name__ == '__main__':
server.sadd(paste_name, feeder_name) server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key) server.expire(paste_name, ttl_key)
server.expire('HASH_'+paste_name, ttl_key) server.expire('HASH_'+paste_name, ttl_key)
p.populate_set_out(relay_message)
# OR populate another set based on the feeder_name # populate Global OR populate another set based on the feeder_name
if feeder_name in feed_queue_mapping:
p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else: else:
if gzip64encoded != content: if gzip64encoded != content:
# Same paste name but different content # Same paste name but different content
@ -137,8 +149,13 @@ if __name__ == '__main__':
duplicated_paste_per_feeder[feeder_name] += 1 duplicated_paste_per_feeder[feeder_name] += 1
server.sadd(paste_name, feeder_name) server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key) server.expire(paste_name, ttl_key)
p.populate_set_out(relay_message)
# OR populate another set based on the feeder_name # populate Global OR populate another set based on the feeder_name
if feeder_name in feed_queue_mapping:
p.populate_set_out(relay_message, feed_queue_mapping[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else: else:
# Already processed # Already processed
# Keep track of processed pastes # Keep track of processed pastes

View file

@ -72,11 +72,10 @@ host = localhost
port = 6379 port = 6379
db = 2 db = 2
[Redis_Mixer] [Redis_Mixer_Cache]
host = localhost host = localhost
port = 6381 port = 6381
db = 1 db = 1
channel = 102
##### LevelDB ##### ##### LevelDB #####
[Redis_Level_DB_Curve] [Redis_Level_DB_Curve]

View file

@ -1,11 +1,15 @@
[Mixer] [Mixer]
subscribe = ZMQ_Global subscribe = ZMQ_Global
publish = Redis_Mixer publish = Redis_Mixer,Redis_preProcess1
[Global] [Global]
subscribe = Redis_Mixer subscribe = Redis_Mixer
publish = Redis_Global,Redis_ModuleStats publish = Redis_Global,Redis_ModuleStats
[PreProcessFeed]
subscribe = Redis_preProcess1
publish = Redis_Mixer
[Duplicates] [Duplicates]
subscribe = Redis_Duplicate subscribe = Redis_Duplicate

50
bin/preProcessFeed.py Executable file
View file

@ -0,0 +1,50 @@
#!/usr/bin/env python2
# -*-coding:UTF-8 -*
import time
from pubsublogger import publisher
from Helper import Process
def do_something(message):
    """Pre-process one feed message of the form '<paste_name> <gzip64encoded>'.

    Example transformation: rewrites the paste name (tagging pastebin
    pastes as processed) and returns the rebuilt message string.

    Malformed messages (not exactly two whitespace-separated fields) are
    returned unchanged.  The original implementation fell through and
    returned None in that case, which the caller then pushed to the
    output queue via populate_set_out() -- that was a bug.
    """
    splitted = message.split()
    if len(splitted) == 2:
        paste_name, gzip64encoded = splitted
        # Example pre-processing step: mark the feeder in the paste name.
        paste_name = paste_name.replace("pastebin", "pastebinPROCESSED")
        to_send = "{0} {1}".format(paste_name, gzip64encoded)
        return to_send
    # BUG FIX: pass malformed messages through untouched instead of
    # implicitly returning None.
    return message
if __name__ == '__main__':
    # Entry point of the PreProcessFeed module: endlessly pulls messages
    # from its input queue, rewrites them with do_something(), and pushes
    # the result to the next queue (back to the Mixer, per modules.cfg).

    # If you wish to use another port or channel, do not forget to run a
    # subscriber accordingly (see launch_logs.sh).
    # Port of the redis instance used by pubsublogger.
    publisher.port = 6380
    # Script is the default channel used for the modules.
    publisher.channel = 'Script'

    # Section name in bin/packages/modules.cfg
    config_section = 'PreProcessFeed'

    # Setup the I/O queues (subscribe/publish sets from the config section).
    p = Process(config_section)

    # Send a short description of the module to the logger.
    publisher.info("<description of the module>")

    # Endless loop getting messages from the input queue.
    while True:
        # Get one message from the input queue (None when the queue is empty).
        message = p.get_from_set()
        if message is None:
            publisher.debug("{} queue is empty, waiting".format(config_section))
            print "queue empty"
            # Back off briefly so an empty queue does not busy-spin.
            time.sleep(1)
            continue

        # Do something with the message from the queue.
        # NOTE(review): if the message is malformed, do_something() returns
        # None here and None is forwarded downstream -- confirm intended.
        new_message = do_something(message)

        # (Optional) Send that thing to the next queue.
        p.populate_set_out(new_message)

View file

@ -95,12 +95,14 @@
<div class="col-lg-6"> <div class="col-lg-6">
<div class="panel panel-default"> <div class="panel panel-default">
<div class="panel-heading"> <div class="panel-heading">
<i class="fa fa-bar-chart-o fa-fw"></i> Feeder(s) Monitor: Processed pastes and filtered duplicated <i class="fa fa-bar-chart-o fa-fw"></i> Feeder(s) Monitor:
</div> </div>
<div id="panelbody" class="panel-body" style="height:420px;"> <div id="panelbody" class="panel-body" style="height:420px;">
<div id="Proc_feeder" style="height: 200px; padding: 0px; position: relative;"></div> <strong>Processed pastes</strong>
<div id="Dup_feeder" style="height: 200px; padding: 0px; position: relative;"></div> <div id="Proc_feeder" style="height: 250px; padding: 0px; position: relative;"></div>
<strong>Filtered duplicated</strong>
<div id="Dup_feeder" style="height: 100px; padding: 0px; position: relative;"></div>
</div> </div>
<!-- /.panel-body --> <!-- /.panel-body -->