mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-26 07:47:17 +00:00
Big refactoring, make the queues more flexible
This commit is contained in:
parent
623e876f3b
commit
abfe13436b
40 changed files with 412 additions and 734 deletions
|
@ -30,34 +30,28 @@ import time
|
|||
from packages import Paste
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'attributes'
|
||||
config_section = 'Attributes'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
p = Process(config_section)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""ZMQ Attribute is Running""")
|
||||
publisher.info("Attribute is Running")
|
||||
|
||||
while True:
|
||||
try:
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
|
||||
if message is not None:
|
||||
PST = Paste.Paste(message.split(" ", -1)[-1])
|
||||
PST = Paste.Paste(message)
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script Attribute is idling 10s")
|
||||
time.sleep(10)
|
||||
publisher.debug("Script Attribute is idling 1s")
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
# FIXME do it directly in the class
|
|
@ -42,21 +42,15 @@ import time
|
|||
from pubsublogger import publisher
|
||||
from packages import Paste
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Words'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'categ'
|
||||
config_section = 'Categ'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
|
||||
# Publisher
|
||||
pub_config_section = 'PubSub_Categ'
|
||||
h.zmq_pub(pub_config_section, None)
|
||||
p = Process(config_section)
|
||||
|
||||
# SCRIPT PARSER #
|
||||
parser = argparse.ArgumentParser(
|
||||
|
@ -71,11 +65,11 @@ if __name__ == "__main__":
|
|||
args = parser.parse_args()
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info(
|
||||
"Script Categ subscribed to channel {}".format(h.sub_channel))
|
||||
publisher.info("Script Categ started")
|
||||
|
||||
categories = ['CreditCards', 'Mail', 'Onion', 'Web']
|
||||
tmp_dict = {}
|
||||
for filename in os.listdir(args.d):
|
||||
for filename in categories:
|
||||
bname = os.path.basename(filename)
|
||||
tmp_dict[bname] = []
|
||||
with open(os.path.join(args.d, filename), 'r') as f:
|
||||
|
@ -85,9 +79,9 @@ if __name__ == "__main__":
|
|||
prec_filename = None
|
||||
|
||||
while True:
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
if message is not None:
|
||||
channel, filename, word, score = message.split()
|
||||
filename, word, score = message.split()
|
||||
|
||||
if prec_filename is None or filename != prec_filename:
|
||||
PST = Paste.Paste(filename)
|
||||
|
@ -96,17 +90,14 @@ if __name__ == "__main__":
|
|||
for categ, words_list in tmp_dict.items():
|
||||
|
||||
if word.lower() in words_list:
|
||||
h.pub_channel = categ
|
||||
h.zmq_pub_send('{} {} {}'.format(PST.p_path, word, score))
|
||||
msg = '{} {} {}'.format(PST.p_path, word, score)
|
||||
p.populate_set_out(msg, categ)
|
||||
|
||||
publisher.info(
|
||||
'Categ;{};{};{};Detected {} "{}"'.format(
|
||||
PST.p_source, PST.p_date, PST.p_name, score, word))
|
||||
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script Categ is Idling 10s")
|
||||
print 'Sleeping'
|
||||
time.sleep(10)
|
|
@ -6,26 +6,27 @@ from packages import Paste
|
|||
from packages import lib_refine
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'cards'
|
||||
config_section = 'CreditCards'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
p = Process(config_section)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("Creditcard script subscribed to channel creditcard_categ")
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
prec_filename = None
|
||||
|
||||
creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?"
|
||||
|
||||
# FIXME For retro compatibility
|
||||
channel = 'creditcard_categ'
|
||||
|
||||
# mastercard_regex = "5[1-5]\d{2}([\ \-]?)\d{4}\1\d{4}\1\d{4}"
|
||||
# visa_regex = "4\d{3}([\ \-]?)\d{4}\1\d{4}\1\d{4}"
|
||||
# discover_regex = "6(?:011\d\d|5\d{4}|4[4-9]\d{3}|22(?:1(?:2[6-9]|
|
||||
|
@ -37,7 +38,7 @@ if __name__ == "__main__":
|
|||
|
||||
while True:
|
||||
if message is not None:
|
||||
channel, filename, word, score = message.split()
|
||||
filename, word, score = message.split()
|
||||
|
||||
if prec_filename is None or filename != prec_filename:
|
||||
creditcard_set = set([])
|
||||
|
@ -62,11 +63,8 @@ if __name__ == "__main__":
|
|||
prec_filename = filename
|
||||
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script creditcard is idling 1m")
|
||||
print 'Sleeping'
|
||||
time.sleep(60)
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
|
@ -29,38 +29,35 @@ from pubsublogger import publisher
|
|||
from packages import lib_words
|
||||
import os
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Words'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = "curve"
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
config_section = 'Curve'
|
||||
p = Process(config_section)
|
||||
|
||||
# REDIS #
|
||||
r_serv1 = redis.StrictRedis(
|
||||
host=h.config.get("Redis_Level_DB", "host"),
|
||||
port=h.config.get("Redis_Level_DB", "port"),
|
||||
db=h.config.get("Redis_Level_DB", "db"))
|
||||
host=p.config.get("Redis_Level_DB", "host"),
|
||||
port=p.config.get("Redis_Level_DB", "port"),
|
||||
db=p.config.get("Redis_Level_DB", "db"))
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("Script Curve subscribed to {}".format(h.sub_channel))
|
||||
publisher.info("Script Curve started")
|
||||
|
||||
# FILE CURVE SECTION #
|
||||
csv_path = os.path.join(os.environ['AIL_HOME'],
|
||||
h.config.get("Directories", "wordtrending_csv"))
|
||||
p.config.get("Directories", "wordtrending_csv"))
|
||||
wordfile_path = os.path.join(os.environ['AIL_HOME'],
|
||||
h.config.get("Directories", "wordsfile"))
|
||||
p.config.get("Directories", "wordsfile"))
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
prec_filename = None
|
||||
while True:
|
||||
if message is not None:
|
||||
channel, filename, word, score = message.split()
|
||||
filename, word, score = message.split()
|
||||
if prec_filename is None or filename != prec_filename:
|
||||
PST = Paste.Paste(filename)
|
||||
lib_words.create_curve_with_word_file(
|
||||
|
@ -69,7 +66,6 @@ if __name__ == "__main__":
|
|||
|
||||
prec_filename = filename
|
||||
prev_score = r_serv1.hget(word.lower(), PST.p_date)
|
||||
print prev_score
|
||||
if prev_score is not None:
|
||||
r_serv1.hset(word.lower(), PST.p_date,
|
||||
int(prev_score) + int(score))
|
||||
|
@ -77,12 +73,7 @@ if __name__ == "__main__":
|
|||
r_serv1.hset(word.lower(), PST.p_date, score)
|
||||
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script Curve is Idling")
|
||||
print "sleepin"
|
||||
print "sleeping"
|
||||
time.sleep(1)
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
|
@ -19,17 +19,15 @@ from packages import Paste
|
|||
from pubsublogger import publisher
|
||||
from pybloomfilter import BloomFilter
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'duplicate'
|
||||
config_section = 'Duplicates'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
p = Process(config_section)
|
||||
|
||||
# REDIS #
|
||||
# DB OBJECT & HASHS ( DISK )
|
||||
|
@ -38,16 +36,15 @@ if __name__ == "__main__":
|
|||
for year in xrange(2013, 2015):
|
||||
for month in xrange(0, 16):
|
||||
dico_redis[str(year)+str(month).zfill(2)] = redis.StrictRedis(
|
||||
host=h.config.get("Redis_Level_DB", "host"), port=year,
|
||||
host=p.config.get("Redis_Level_DB", "host"), port=year,
|
||||
db=month)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("""Script duplicate subscribed to channel {0}""".format(
|
||||
h.config.get("PubSub_Global", "channel")))
|
||||
publisher.info("Script duplicate started")
|
||||
|
||||
set_limit = 100
|
||||
bloompath = os.path.join(os.environ['AIL_HOME'],
|
||||
h.config.get("Directories", "bloomfilters"))
|
||||
p.config.get("Directories", "bloomfilters"))
|
||||
|
||||
bloop_path_set = set()
|
||||
while True:
|
||||
|
@ -59,17 +56,13 @@ if __name__ == "__main__":
|
|||
|
||||
x = time.time()
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
if message is not None:
|
||||
path = message.split(" ", -1)[-1]
|
||||
path = message
|
||||
PST = Paste.Paste(path)
|
||||
else:
|
||||
publisher.debug("Script Attribute is idling 10s")
|
||||
time.sleep(10)
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
continue
|
||||
|
||||
PST._set_p_hash_kind("md5")
|
|
@ -25,56 +25,44 @@ import os
|
|||
import time
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
publisher.channel = 'Script'
|
||||
|
||||
config_section = 'Feed'
|
||||
config_channel = 'topicfilter'
|
||||
subscriber_name = 'feed'
|
||||
config_section = 'Global'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
|
||||
# Publisher
|
||||
pub_config_section = "PubSub_Global"
|
||||
pub_config_channel = 'channel'
|
||||
h.zmq_pub(pub_config_section, pub_config_channel)
|
||||
p = Process(config_section)
|
||||
|
||||
# LOGGING #
|
||||
publisher.info("Feed Script started to receive & publish.")
|
||||
|
||||
while True:
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
# Recovering the streamed message informations.
|
||||
if message is not None:
|
||||
if len(message.split()) == 3:
|
||||
topic, paste, gzip64encoded = message.split()
|
||||
print paste
|
||||
splitted = message.split()
|
||||
if len(splitted) == 2:
|
||||
paste, gzip64encoded = splitted
|
||||
else:
|
||||
# TODO Store the name of the empty paste inside a Redis-list.
|
||||
print "Empty Paste: not processed"
|
||||
publisher.debug("Empty Paste: {0} not processed".format(paste))
|
||||
continue
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
print "Empty Queues: Waiting..."
|
||||
time.sleep(10)
|
||||
time.sleep(1)
|
||||
continue
|
||||
# Creating the full filepath
|
||||
filename = os.path.join(os.environ['AIL_HOME'],
|
||||
h.config.get("Directories", "pastes"), paste)
|
||||
p.config.get("Directories", "pastes"), paste)
|
||||
dirname = os.path.dirname(filename)
|
||||
if not os.path.exists(dirname):
|
||||
os.makedirs(dirname)
|
||||
|
||||
with open(filename, 'wb') as f:
|
||||
f.write(base64.standard_b64decode(gzip64encoded))
|
||||
|
||||
h.zmq_pub_send(filename)
|
||||
p.populate_set_out(filename)
|
180
bin/Helper.py
180
bin/Helper.py
|
@ -15,74 +15,144 @@ import redis
|
|||
import ConfigParser
|
||||
import os
|
||||
import zmq
|
||||
import time
|
||||
import json
|
||||
|
||||
|
||||
class Redis_Queues(object):
|
||||
class PubSub(object):
|
||||
|
||||
def __init__(self, conf_section, conf_channel, subscriber_name):
|
||||
def __init__(self):
|
||||
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
|
||||
print configfile
|
||||
if not os.path.exists(configfile):
|
||||
raise Exception('Unable to find the configuration file. \
|
||||
Did you set environment variables? \
|
||||
Or activate the virtualenv.')
|
||||
self.config = ConfigParser.ConfigParser()
|
||||
self.config.read(configfile)
|
||||
self.subscriber_name = subscriber_name
|
||||
self.redis_sub = False
|
||||
self.zmq_sub = False
|
||||
self.subscriber = None
|
||||
self.publishers = {'Redis': [], 'ZMQ': []}
|
||||
|
||||
self.sub_channel = self.config.get(conf_section, conf_channel)
|
||||
self.sub_address = self.config.get(conf_section, 'address')
|
||||
self.redis_channel = self.sub_channel + self.subscriber_name
|
||||
|
||||
# Redis Queue
|
||||
config_section = "Redis_Queues"
|
||||
self.r_queues = redis.StrictRedis(
|
||||
host=self.config.get(config_section, "host"),
|
||||
port=self.config.getint(config_section, "port"),
|
||||
db=self.config.getint(config_section, "db"))
|
||||
|
||||
def zmq_sub(self):
|
||||
context = zmq.Context()
|
||||
self.sub_socket = context.socket(zmq.SUB)
|
||||
self.sub_socket.connect(self.sub_address)
|
||||
self.sub_socket.setsockopt(zmq.SUBSCRIBE, self.sub_channel)
|
||||
|
||||
def zmq_pub(self, config_section, config_channel):
|
||||
context = zmq.Context()
|
||||
self.pub_socket = context.socket(zmq.PUB)
|
||||
self.pub_socket.bind(self.config.get(config_section, 'adress'))
|
||||
if config_channel is not None:
|
||||
self.pub_channel = self.config.get(config_section, config_channel)
|
||||
def setup_subscribe(self, conn_name):
|
||||
if self.config.has_section(conn_name):
|
||||
channel = self.config.get(conn_name, 'channel')
|
||||
else:
|
||||
# The publishing channel is defined dynamically
|
||||
self.pub_channel = None
|
||||
channel = conn_name.split('_')[1]
|
||||
if conn_name.startswith('Redis'):
|
||||
self.redis_sub = True
|
||||
r = redis.StrictRedis(
|
||||
host=self.config.get('RedisPubSub', 'host'),
|
||||
port=self.config.get('RedisPubSub', 'port'),
|
||||
db=self.config.get('RedisPubSub', 'db'))
|
||||
self.subscriber = r.pubsub()
|
||||
self.subscriber.psubscribe(channel)
|
||||
self.subscriber.get_message()
|
||||
elif conn_name.startswith('ZMQ'):
|
||||
self.zmq_sub = True
|
||||
context = zmq.Context()
|
||||
self.subscriber = context.socket(zmq.SUB)
|
||||
self.subscriber.connect(self.config.get(conn_name, 'address'))
|
||||
self.subscriber.setsockopt(zmq.SUBSCRIBE, channel)
|
||||
|
||||
def zmq_pub_send(self, msg):
|
||||
if self.pub_channel is None:
|
||||
raise Exception('A channel is reqired to send a message.')
|
||||
self.pub_socket.send('{} {}'.format(self.pub_channel, msg))
|
||||
|
||||
def redis_rpop(self):
|
||||
return self.r_queues.rpop(self.redis_channel)
|
||||
|
||||
def redis_queue_shutdown(self, is_queue=False):
|
||||
if is_queue:
|
||||
flag = self.subscriber_name + '_Q'
|
||||
def setup_publish(self, conn_name):
|
||||
if self.config.has_section(conn_name):
|
||||
channel = self.config.get(conn_name, 'channel')
|
||||
else:
|
||||
flag = self.subscriber_name
|
||||
# srem returns False if the element does not exists
|
||||
return self.r_queues.srem('SHUTDOWN_FLAGS', flag)
|
||||
channel = conn_name.split('_')[1]
|
||||
if conn_name.startswith('Redis'):
|
||||
r = redis.StrictRedis(host=self.config.get('RedisPubSub', 'host'),
|
||||
port=self.config.get('RedisPubSub', 'port'),
|
||||
db=self.config.get('RedisPubSub', 'db'))
|
||||
self.publishers['Redis'].append((r, channel))
|
||||
elif conn_name.startswith('ZMQ'):
|
||||
context = zmq.Context()
|
||||
p = context.socket(zmq.PUB)
|
||||
p.bind(self.config.get(conn_name, 'address'))
|
||||
self.publishers['ZMQ'].append((p, channel))
|
||||
|
||||
def redis_queue_subscribe(self, publisher):
|
||||
self.zmq_sub()
|
||||
publisher.info("Suscribed to channel {}".format(self.sub_channel))
|
||||
def publish(self, message):
|
||||
m = json.loads(message)
|
||||
channel_message = m.get('channel')
|
||||
for p, channel in self.publishers['Redis']:
|
||||
if channel_message is None or channel_message == channel:
|
||||
p.publish(channel, m['message'])
|
||||
for p, channel in self.publishers['ZMQ']:
|
||||
if channel_message is None or channel_message == channel:
|
||||
p.send('{} {}'.format(channel, m['message']))
|
||||
|
||||
def subscribe(self):
|
||||
if self.redis_sub:
|
||||
for msg in self.subscriber.listen():
|
||||
if msg.get('data', None) is not None:
|
||||
yield msg['data']
|
||||
elif self.zmq_sub:
|
||||
while True:
|
||||
msg = self.subscriber.recv()
|
||||
yield msg.split(' ', 1)[1]
|
||||
else:
|
||||
raise Exception('No subscribe function defined')
|
||||
|
||||
|
||||
class Process(object):
|
||||
|
||||
def __init__(self, conf_section):
|
||||
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
|
||||
if not os.path.exists(configfile):
|
||||
raise Exception('Unable to find the configuration file. \
|
||||
Did you set environment variables? \
|
||||
Or activate the virtualenv.')
|
||||
modulesfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg')
|
||||
self.config = ConfigParser.ConfigParser()
|
||||
self.config.read(configfile)
|
||||
self.modules = ConfigParser.ConfigParser()
|
||||
self.modules.read(modulesfile)
|
||||
self.subscriber_name = conf_section
|
||||
self.pubsub = None
|
||||
if self.modules.has_section(conf_section):
|
||||
self.pubsub = PubSub()
|
||||
else:
|
||||
raise Exception('Your process has to listen to at least one feed.')
|
||||
self.r_temp = redis.StrictRedis(
|
||||
host=self.config.get('RedisPubSub', 'host'),
|
||||
port=self.config.get('RedisPubSub', 'port'),
|
||||
db=self.config.get('RedisPubSub', 'db'))
|
||||
|
||||
def populate_set_in(self):
|
||||
# monoproc
|
||||
src = self.modules.get(self.subscriber_name, 'subscribe')
|
||||
self.pubsub.setup_subscribe(src)
|
||||
for msg in self.pubsub.subscribe():
|
||||
in_set = self.subscriber_name + 'in'
|
||||
self.r_temp.sadd(in_set, msg)
|
||||
self.r_temp.hset('queues', self.subscriber_name,
|
||||
int(self.r_temp.scard(in_set)))
|
||||
|
||||
def get_from_set(self):
|
||||
# multiproc
|
||||
in_set = self.subscriber_name + 'in'
|
||||
self.r_temp.hset('queues', self.subscriber_name,
|
||||
int(self.r_temp.scard(in_set)))
|
||||
return self.r_temp.spop(in_set)
|
||||
|
||||
def populate_set_out(self, msg, channel=None):
|
||||
# multiproc
|
||||
msg = {'message': msg}
|
||||
if channel is not None:
|
||||
msg.update({'channel': channel})
|
||||
self.r_temp.sadd(self.subscriber_name + 'out', json.dumps(msg))
|
||||
|
||||
def publish(self):
|
||||
# monoproc
|
||||
if not self.modules.has_option(self.subscriber_name, 'publish'):
|
||||
return False
|
||||
dest = self.modules.get(self.subscriber_name, 'publish')
|
||||
# We can have multiple publisher
|
||||
for name in dest.split(','):
|
||||
self.pubsub.setup_publish(name)
|
||||
while True:
|
||||
msg = self.sub_socket.recv()
|
||||
p = self.r_queues.pipeline()
|
||||
p.sadd("queues", self.redis_channel)
|
||||
p.lpush(self.redis_channel, msg)
|
||||
p.execute()
|
||||
if self.redis_queue_shutdown(True):
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
message = self.r_temp.spop(self.subscriber_name + 'out')
|
||||
if message is None:
|
||||
time.sleep(1)
|
||||
continue
|
||||
self.pubsub.publish(message)
|
||||
|
|
|
@ -17,26 +17,21 @@ from whoosh.index import create_in, exists_in, open_dir
|
|||
from whoosh.fields import Schema, TEXT, ID
|
||||
import os
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
# Subscriber
|
||||
sub_config_section = 'PubSub_Global'
|
||||
sub_name = 'indexer'
|
||||
config_section = 'Indexer'
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'indexer'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
p = Process(config_section)
|
||||
|
||||
# Indexer configuration - index dir and schema setup
|
||||
indexpath = h.config.get("Indexer", "path")
|
||||
indexertype = h.config.get("Indexer", "type")
|
||||
indexpath = os.path.join(os.environ['AIL_HOME'],
|
||||
p.config.get("Indexer", "path"))
|
||||
indexertype = p.config.get("Indexer", "type")
|
||||
if indexertype == "whoosh":
|
||||
schema = Schema(title=TEXT(stored=True), path=ID(stored=True,
|
||||
unique=True),
|
||||
|
@ -49,18 +44,16 @@ if __name__ == "__main__":
|
|||
ix = open_dir(indexpath)
|
||||
|
||||
# LOGGING #
|
||||
publisher.info("""ZMQ Indexer is Running""")
|
||||
publisher.info("ZMQ Indexer is Running")
|
||||
|
||||
while True:
|
||||
try:
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
|
||||
if message is not None:
|
||||
PST = Paste.Paste(message.split(" ", -1)[-1])
|
||||
PST = Paste.Paste(message)
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
break
|
||||
publisher.debug("Script Indexer is idling 10s")
|
||||
publisher.debug("Script Indexer is idling 1s")
|
||||
time.sleep(1)
|
||||
continue
|
||||
docpath = message.split(" ", -1)[-1]
|
|
@ -32,26 +32,19 @@ import time
|
|||
from packages import Paste
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
if __name__ == '__main__':
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
publisher.channel = 'Script'
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'line'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
|
||||
# Publisher
|
||||
pub_config_section = 'PubSub_Longlines'
|
||||
h.zmq_pub(pub_config_section, None)
|
||||
config_section = 'Lines'
|
||||
p = Process(config_section)
|
||||
|
||||
# SCRIPT PARSER #
|
||||
parser = argparse.ArgumentParser(
|
||||
description='''This script is a part of the Analysis Information \
|
||||
Leak framework.''')
|
||||
description='This script is a part of the Analysis Information \
|
||||
Leak framework.')
|
||||
|
||||
parser.add_argument(
|
||||
'-max', type=int, default=500,
|
||||
|
@ -60,24 +53,18 @@ if __name__ == "__main__":
|
|||
|
||||
args = parser.parse_args()
|
||||
|
||||
channel_0 = h.config.get("PubSub_Longlines", "channel_0")
|
||||
channel_1 = h.config.get("PubSub_Longlines", "channel_1")
|
||||
|
||||
# FUNCTIONS #
|
||||
tmp_string = "Lines script Subscribed to channel {} and Start to publish \
|
||||
on channel {}, {}"
|
||||
publisher.info(tmp_string.format(h.sub_channel, channel_0, channel_1))
|
||||
on channel Longlines, Shortlines"
|
||||
publisher.info(tmp_string)
|
||||
|
||||
while True:
|
||||
try:
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
print message
|
||||
if message is not None:
|
||||
PST = Paste.Paste(message.split(" ", -1)[-1])
|
||||
PST = Paste.Paste(message)
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Tokeniser is idling 10s")
|
||||
time.sleep(10)
|
||||
continue
|
||||
|
@ -89,10 +76,9 @@ if __name__ == "__main__":
|
|||
|
||||
# FIXME Not used.
|
||||
PST.store.sadd("Pastes_Objects", PST.p_path)
|
||||
if lines_infos[1] >= args.max:
|
||||
h.pub_channel = channel_0
|
||||
if lines_infos[1] < args.max:
|
||||
p.populate_set_out(PST.p_path, 'LinesShort')
|
||||
else:
|
||||
h.pub_channel = channel_1
|
||||
h.zmq_pub_send(PST.p_path)
|
||||
p.populate_set_out(PST.p_path, 'LinesLong')
|
||||
except IOError:
|
||||
print "CRC Checksum Error on : ", PST.p_path
|
|
@ -9,28 +9,29 @@ from packages import Paste
|
|||
from packages import lib_refine
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_1'
|
||||
subscriber_name = 'emails'
|
||||
config_section = 'Mail'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
p = Process(config_section)
|
||||
|
||||
# REDIS #
|
||||
r_serv2 = redis.StrictRedis(
|
||||
host=h.config.get("Redis_Cache", "host"),
|
||||
port=h.config.getint("Redis_Cache", "port"),
|
||||
db=h.config.getint("Redis_Cache", "db"))
|
||||
host=p.config.get("Redis_Cache", "host"),
|
||||
port=p.config.getint("Redis_Cache", "port"),
|
||||
db=p.config.getint("Redis_Cache", "db"))
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("Suscribed to channel mails_categ")
|
||||
|
||||
message = h.redis_rpop()
|
||||
# FIXME For retro compatibility
|
||||
channel = 'mails_categ'
|
||||
|
||||
message = p.get_from_set()
|
||||
prec_filename = None
|
||||
|
||||
# Log as critical if there are more that that amout of valid emails
|
||||
|
@ -41,7 +42,8 @@ if __name__ == "__main__":
|
|||
while True:
|
||||
try:
|
||||
if message is not None:
|
||||
channel, filename, word, score = message.split()
|
||||
print message
|
||||
filename, word, score = message.split()
|
||||
|
||||
if prec_filename is None or filename != prec_filename:
|
||||
PST = Paste.Paste(filename)
|
||||
|
@ -65,14 +67,11 @@ if __name__ == "__main__":
|
|||
prec_filename = filename
|
||||
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script Mails is Idling 10s")
|
||||
print 'Sleeping'
|
||||
time.sleep(10)
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
except dns.exception.Timeout:
|
||||
# FIXME retry!
|
||||
print "dns.exception.Timeout"
|
|
@ -27,32 +27,34 @@ from packages import Paste
|
|||
from pubsublogger import publisher
|
||||
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_2'
|
||||
subscriber_name = 'tor'
|
||||
config_section = 'Onion'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
p = Process(config_section)
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("Script subscribed to channel onion_categ")
|
||||
|
||||
# FIXME For retro compatibility
|
||||
channel = 'onion_categ'
|
||||
|
||||
# Getting the first message from redis.
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
prec_filename = None
|
||||
|
||||
# Thanks to Faup project for this regex
|
||||
# https://github.com/stricaud/faup
|
||||
url_regex = "([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|onion|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
|
||||
url_regex = "([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.onion)(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
|
||||
|
||||
while True:
|
||||
if message is not None:
|
||||
channel, filename, word, score = message.split()
|
||||
print message
|
||||
filename, word, score = message.split()
|
||||
|
||||
# "For each new paste"
|
||||
if prec_filename is None or filename != prec_filename:
|
||||
|
@ -64,8 +66,7 @@ if __name__ == "__main__":
|
|||
credential, subdomain, domain, host, tld, port, \
|
||||
resource_path, query_string, f1, f2, f3, f4 = x
|
||||
|
||||
if f1 == "onion":
|
||||
domains_list.append(domain)
|
||||
domains_list.append(domain)
|
||||
|
||||
# Saving the list of extracted onion domains.
|
||||
PST.__setattr__(channel, domains_list)
|
||||
|
@ -83,11 +84,8 @@ if __name__ == "__main__":
|
|||
prec_filename = filename
|
||||
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script url is Idling 10s")
|
||||
print 'Sleeping'
|
||||
time.sleep(10)
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
24
bin/QueueIn.py
Executable file
24
bin/QueueIn.py
Executable file
|
@ -0,0 +1,24 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
|
||||
from pubsublogger import publisher
|
||||
from Helper import Process
|
||||
import argparse
|
||||
|
||||
|
||||
def run(config_section):
|
||||
p = Process(config_section)
|
||||
p.populate_set_in()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
publisher.port = 6380
|
||||
publisher.channel = 'Queuing'
|
||||
|
||||
parser = argparse.ArgumentParser(description='Entry queue for a module.')
|
||||
parser.add_argument("-c", "--config_section", type=str,
|
||||
help="Config section to use in the config file.")
|
||||
args = parser.parse_args()
|
||||
|
||||
run(args.config_section)
|
24
bin/QueueOut.py
Executable file
24
bin/QueueOut.py
Executable file
|
@ -0,0 +1,24 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
from pubsublogger import publisher
|
||||
from Helper import Process
|
||||
import argparse
|
||||
|
||||
|
||||
def run(config_section):
|
||||
p = Process(config_section)
|
||||
if not p.publish():
|
||||
print(config_section, 'has no publisher.')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
publisher.port = 6380
|
||||
publisher.channel = 'Queuing'
|
||||
|
||||
parser = argparse.ArgumentParser(description='Entry queue for a module.')
|
||||
parser.add_argument("-c", "--config_section", type=str,
|
||||
help="Config section to use in the config file.")
|
||||
args = parser.parse_args()
|
||||
|
||||
run(args.config_section)
|
|
@ -27,40 +27,28 @@ import time
|
|||
from packages import Paste
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Longlines'
|
||||
config_channel = 'channel_1'
|
||||
subscriber_name = 'tokenize'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
|
||||
# Publisher
|
||||
pub_config_section = 'PubSub_Words'
|
||||
pub_config_channel = 'channel_0'
|
||||
h.zmq_pub(pub_config_section, pub_config_channel)
|
||||
config_section = 'Tokenize'
|
||||
p = Process(config_section)
|
||||
|
||||
# LOGGING #
|
||||
publisher.info("Tokeniser subscribed to channel {}".format(h.sub_channel))
|
||||
publisher.info("Tokeniser started")
|
||||
|
||||
while True:
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
print message
|
||||
if message is not None:
|
||||
paste = Paste.Paste(message.split(" ", -1)[-1])
|
||||
paste = Paste.Paste(message)
|
||||
for word, score in paste._get_top_words().items():
|
||||
if len(word) >= 4:
|
||||
h.zmq_pub_send('{} {} {}'.format(paste.p_path, word,
|
||||
score))
|
||||
msg = '{} {} {}'.format(paste.p_path, word, score)
|
||||
p.populate_set_out(msg)
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Tokeniser is idling 10s")
|
||||
time.sleep(10)
|
||||
print "sleepin"
|
|
@ -14,36 +14,32 @@ import socket
|
|||
import pycountry
|
||||
import ipaddress
|
||||
|
||||
import Helper
|
||||
from Helper import Process
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Script"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_3'
|
||||
subscriber_name = "urls"
|
||||
config_section = 'Web'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
|
||||
# Publisher
|
||||
pub_config_section = "PubSub_Url"
|
||||
pub_config_channel = 'channel'
|
||||
h.zmq_pub(pub_config_section, pub_config_channel)
|
||||
p = Process(config_section)
|
||||
|
||||
# REDIS #
|
||||
r_serv2 = redis.StrictRedis(
|
||||
host=h.config.get("Redis_Cache", "host"),
|
||||
port=h.config.getint("Redis_Cache", "port"),
|
||||
db=h.config.getint("Redis_Cache", "db"))
|
||||
host=p.config.get("Redis_Cache", "host"),
|
||||
port=p.config.getint("Redis_Cache", "port"),
|
||||
db=p.config.getint("Redis_Cache", "db"))
|
||||
|
||||
# Country to log as critical
|
||||
cc_critical = h.config.get("PubSub_Url", "cc_critical")
|
||||
cc_critical = p.config.get("PubSub_Url", "cc_critical")
|
||||
|
||||
# FUNCTIONS #
|
||||
publisher.info("Script URL subscribed to channel web_categ")
|
||||
|
||||
message = h.redis_rpop()
|
||||
# FIXME For retro compatibility
|
||||
channel = 'web_categ'
|
||||
|
||||
message = p.get_from_set()
|
||||
prec_filename = None
|
||||
|
||||
url_regex = "(http|https|ftp)\://([a-zA-Z0-9\.\-]+(\:[a-zA-Z0-9\.&%\$\-]+)*@)*((25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])|localhost|([a-zA-Z0-9\-]+\.)*[a-zA-Z0-9\-]+\.(com|edu|gov|int|mil|net|org|biz|arpa|info|name|pro|aero|coop|museum|[a-zA-Z]{2}))(\:[0-9]+)*(/($|[a-zA-Z0-9\.\,\?\'\\\+&%\$#\=~_\-]+))*"
|
||||
|
@ -51,7 +47,7 @@ if __name__ == "__main__":
|
|||
while True:
|
||||
try:
|
||||
if message is not None:
|
||||
channel, filename, word, score = message.split()
|
||||
filename, word, score = message.split()
|
||||
|
||||
if prec_filename is None or filename != prec_filename:
|
||||
domains_list = []
|
||||
|
@ -62,7 +58,7 @@ if __name__ == "__main__":
|
|||
port, resource_path, query_string, f1, f2, f3, \
|
||||
f4 = x
|
||||
domains_list.append(domain)
|
||||
h.zmq_pub_send(str(x))
|
||||
p.populate_set_out(x, 'Url')
|
||||
publisher.debug('{} Published'.format(x))
|
||||
|
||||
if f1 == "onion":
|
||||
|
@ -110,13 +106,10 @@ if __name__ == "__main__":
|
|||
prec_filename = filename
|
||||
|
||||
else:
|
||||
if h.redis_queue_shutdown():
|
||||
print "Shutdown Flag Up: Terminating"
|
||||
publisher.warning("Shutdown Flag Up: Terminating.")
|
||||
break
|
||||
publisher.debug("Script url is Idling 10s")
|
||||
print 'Sleeping'
|
||||
time.sleep(10)
|
||||
|
||||
message = h.redis_rpop()
|
||||
message = p.get_from_set()
|
||||
except dns.exception.Timeout:
|
||||
print "dns.exception.Timeout", A_values
|
|
@ -1,37 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
"""
|
||||
The ZMQ_Feed_Q Module
|
||||
=====================
|
||||
|
||||
This module is the first of the ZMQ tree processing.
|
||||
It's subscribe to a data stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Need running Redis instances.
|
||||
*Messages from the stream should be formated as follow:
|
||||
"channel_name"+" "+/path/to/the/paste.gz+" "base64_data_encoded_paste"
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'Feed'
|
||||
config_channel = 'topicfilter'
|
||||
subscriber_name = 'feed'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,35 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_PubSub_Categ_Q Module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_PubSub_Tokenize"
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = 'Queuing'
|
||||
|
||||
config_section = 'PubSub_Words'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'categ'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,34 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_PubSub_Line_Q Module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_Feed"
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = "PubSub_Global"
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'line'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,35 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_PubSub_Tokenize_Q Module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_PubSub_Line" channel 1
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Longlines'
|
||||
config_channel = 'channel_1'
|
||||
subscriber_name = 'tokenize'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,54 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_Pub_Global Module
|
||||
=========================
|
||||
|
||||
This module is consuming the Redis-list created by the script ./Dir.py.
|
||||
This module is as the same level of the ZMQ tree than the Module ZMQ_Feed
|
||||
|
||||
Whereas the ZMQ_Feed is poping the list created in redis by ZMQ_Feed_Q which is
|
||||
listening a stream, ZMQ_Pub_Global is poping the list created in redis by
|
||||
./Dir.py.
|
||||
|
||||
Thanks to this Module there is now two way to Feed the ZMQ tree:
|
||||
*By a continuous stream ..seealso:: ZMQ_Feed Module
|
||||
*Manually with this module and ./Dir.py script.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Need running Redis instances. (Redis)
|
||||
|
||||
"""
|
||||
import time
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Global"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'global'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
|
||||
# Publisher
|
||||
pub_config_section = 'PubSub_Global'
|
||||
pub_config_channel = 'channel'
|
||||
h.zmq_pub(pub_config_section, pub_config_channel)
|
||||
|
||||
# LOGGING #
|
||||
publisher.info("Starting to publish.")
|
||||
|
||||
while True:
|
||||
filename = h.redis_rpop()
|
||||
|
||||
if filename is not None:
|
||||
h.zmq_pub_send(filename)
|
||||
else:
|
||||
time.sleep(10)
|
||||
publisher.debug("Nothing to publish")
|
|
@ -1,35 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_Sub_Attributes_Q Module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_Feed"
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'attributes'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,18 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'cards'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,35 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_Sub_Curve_Q Module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by others scripts.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them.
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_PubSub_Tokenize"
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Words'
|
||||
config_channel = 'channel_0'
|
||||
subscriber_name = 'curve'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,17 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = 'Queuing'
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'duplicate'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,29 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_Sub_Indexer_Q Module
|
||||
============================
|
||||
|
||||
The ZMQ_Sub_Indexer_Q module subscribes to PubSub_Global ZMQ channel
|
||||
and bufferizes the data in a Redis FIFO.
|
||||
|
||||
The FIFO will be then processed by the Indexer scripts (ZMQ_Sub_Indexer)
|
||||
handling the indexing process of the files seen.
|
||||
|
||||
"""
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Global'
|
||||
config_channel = 'channel'
|
||||
subscriber_name = 'indexer'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,17 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_1'
|
||||
subscriber_name = 'emails'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,34 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
"""
|
||||
The ZMQ_Sub_Onion_Q Module
|
||||
============================
|
||||
|
||||
This module subscribe to a Publisher stream and put the received messages
|
||||
into a Redis-list waiting to be popped later by ZMQ_Sub_Onion.
|
||||
|
||||
..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
|
||||
the same Subscriber name in both of them (here "tor")
|
||||
|
||||
Requirements
|
||||
------------
|
||||
|
||||
*Running Redis instances.
|
||||
*Should register to the Publisher "ZMQ_PubSub_Categ"
|
||||
|
||||
"""
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_2'
|
||||
subscriber_name = 'tor'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -1,18 +0,0 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
from pubsublogger import publisher
|
||||
|
||||
import Helper
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publisher.port = 6380
|
||||
publisher.channel = "Queuing"
|
||||
|
||||
config_section = 'PubSub_Categ'
|
||||
config_channel = 'channel_3'
|
||||
subscriber_name = 'urls'
|
||||
|
||||
h = Helper.Redis_Queues(config_section, config_channel, subscriber_name)
|
||||
h.redis_queue_subscribe(publisher)
|
|
@ -18,7 +18,8 @@ db = 0
|
|||
[Redis_Queues]
|
||||
host = localhost
|
||||
port = 6381
|
||||
db = 0
|
||||
db_sub = 0
|
||||
db_pub = 1
|
||||
|
||||
[Redis_Data_Merging]
|
||||
host = localhost
|
||||
|
@ -71,3 +72,18 @@ cc_critical = DE
|
|||
[Indexer]
|
||||
type = whoosh
|
||||
path = indexdir
|
||||
|
||||
###############################################################################
|
||||
|
||||
[ZMQ_Global]
|
||||
address = tcp://crf.circl.lu:5556
|
||||
channel = 102
|
||||
|
||||
[ZMQ_Url]
|
||||
address = tcp://127.0.0.1:5004
|
||||
channel = urls
|
||||
|
||||
[RedisPubSub]
|
||||
host = localhost
|
||||
port = 6381
|
||||
db = 0
|
||||
|
|
41
bin/packages/modules.cfg
Normal file
41
bin/packages/modules.cfg
Normal file
|
@ -0,0 +1,41 @@
|
|||
[Global]
|
||||
subscribe = ZMQ_Global
|
||||
publish = Redis_Global
|
||||
|
||||
[Duplicates]
|
||||
subscribe = Redis_Global
|
||||
|
||||
[Indexer]
|
||||
subscribe = Redis_Global
|
||||
|
||||
[Attributes]
|
||||
subscribe = Redis_Global
|
||||
|
||||
[Lines]
|
||||
subscribe = Redis_Global
|
||||
publish = Redis_LinesShort,Redis_LinesLong
|
||||
|
||||
[Tokenize]
|
||||
subscribe = Redis_LinesShort
|
||||
publish = Redis_Words
|
||||
|
||||
[Curve]
|
||||
subscribe = Redis_Words
|
||||
|
||||
[Categ]
|
||||
subscribe = Redis_Words
|
||||
publish = Redis_CreditCards,Redis_Mail,Redis_Onion,Redis_Web
|
||||
|
||||
[CreditCards]
|
||||
subscribe = Redis_CreditCards
|
||||
|
||||
[Mail]
|
||||
subscribe = Redis_Mail
|
||||
|
||||
[Onion]
|
||||
subscribe = Redis_Onion
|
||||
#publish = Redis_Global
|
||||
|
||||
[Web]
|
||||
subscribe = Redis_Web
|
||||
publish = Redis_Url,ZMQ_Url
|
22
bin/run_modules.py
Executable file
22
bin/run_modules.py
Executable file
|
@ -0,0 +1,22 @@
|
|||
#!/usr/bin/env python2
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
|
||||
import ConfigParser
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
if __name__ == '__main__':
|
||||
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg')
|
||||
if not os.path.exists(configfile):
|
||||
raise Exception('Unable to find the configuration file. \
|
||||
Did you set environment variables? \
|
||||
Or activate the virtualenv.')
|
||||
config = ConfigParser.ConfigParser()
|
||||
config.read(configfile)
|
||||
|
||||
modules = config.sections()
|
||||
for module in modules:
|
||||
subprocess.Popen(["python", './QueueIn.py', '-c', module])
|
||||
subprocess.Popen(["python", './QueueOut.py', '-c', module])
|
||||
#subprocess.Popen(["python", './{}.py'.format(module)])
|
|
@ -1,6 +0,0 @@
|
|||
isFinite
|
||||
isNaN
|
||||
eval
|
||||
var
|
||||
parseFloat
|
||||
javascript
|
|
@ -1,6 +0,0 @@
|
|||
../files/mails_categ
|
||||
../files/creditcard_categ
|
||||
../files/pass_categ
|
||||
../files/web_categ
|
||||
../files/javascript_categ
|
||||
../files/onion_categ
|
|
@ -1,23 +0,0 @@
|
|||
123456
|
||||
password
|
||||
12345678
|
||||
qwerty
|
||||
abc123
|
||||
123456789
|
||||
1234567
|
||||
iloveyou
|
||||
adobe123
|
||||
123123
|
||||
admin
|
||||
1234567890
|
||||
letmein
|
||||
photoshop
|
||||
1234
|
||||
monkey
|
||||
shadow
|
||||
sunshine
|
||||
12345
|
||||
password1
|
||||
princess
|
||||
azerty
|
||||
trustno1
|
|
@ -44,7 +44,8 @@ def event_stream():
|
|||
|
||||
def get_queues(r):
|
||||
# We may want to put the llen in a pipeline to do only one query.
|
||||
return [(queue, r.llen(queue)) for queue in r.smembers("queues")]
|
||||
return [(queue, int(card)) for queue, card in
|
||||
r.hgetall("queues").iteritems()]
|
||||
|
||||
|
||||
@app.route("/_logs")
|
||||
|
@ -54,6 +55,7 @@ def logs():
|
|||
|
||||
@app.route("/_stuff", methods=['GET'])
|
||||
def stuff():
|
||||
print get_queues(r_serv)
|
||||
return jsonify(row1=get_queues(r_serv))
|
||||
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ function create_log_table(obj_json) {
|
|||
tr.appendChild(msage);
|
||||
|
||||
if (tr.className == document.getElementById("checkbox_log_info").value && document.getElementById("checkbox_log_info").checked == true) {
|
||||
tableBody.appendChild(tr);
|
||||
tableBody.appendChild(tr);
|
||||
}
|
||||
if (tr.className == document.getElementById("checkbox_log_warning").value && document.getElementById("checkbox_log_warning").checked == true) {
|
||||
tableBody.appendChild(tr);
|
||||
|
@ -156,16 +156,16 @@ $(document).ready(function () {
|
|||
var curves_labels = [];
|
||||
var curves_labels2 = [];
|
||||
var x = new Date();
|
||||
|
||||
|
||||
for (i = 0; i < glob_tabvar.row1.length; i++){
|
||||
if (glob_tabvar.row1[i][0].substring(0,4) != "word"){
|
||||
tmp_tab.push(0);
|
||||
curves_labels.push(glob_tabvar.row1[i][0]);
|
||||
}
|
||||
else {
|
||||
if (glob_tabvar.row1[i][0] == 'Categ' || glob_tabvar.row1[i][0] == 'Curve'){
|
||||
tmp_tab2.push(0);
|
||||
curves_labels2.push(glob_tabvar.row1[i][0]);
|
||||
}
|
||||
else {
|
||||
tmp_tab.push(0);
|
||||
curves_labels.push(glob_tabvar.row1[i][0]);
|
||||
}
|
||||
}
|
||||
tmp_tab.unshift(x);
|
||||
tmp_tab2.unshift(x);
|
||||
|
@ -173,7 +173,7 @@ $(document).ready(function () {
|
|||
curves_labels2.unshift("date");
|
||||
data.push(tmp_tab);
|
||||
data2.push(tmp_tab2);
|
||||
|
||||
|
||||
var g = new Dygraph(document.getElementById("Graph"), data,
|
||||
{
|
||||
labels: curves_labels,
|
||||
|
@ -189,7 +189,7 @@ $(document).ready(function () {
|
|||
fillGraph: true,
|
||||
includeZero: true,
|
||||
});
|
||||
|
||||
|
||||
var g2 = new Dygraph(document.getElementById("Graph2"), data2,
|
||||
{
|
||||
labels: curves_labels2,
|
||||
|
@ -209,7 +209,7 @@ $(document).ready(function () {
|
|||
|
||||
var interval = 1000; //number of mili seconds between each call
|
||||
var refresh = function() {
|
||||
|
||||
|
||||
$.ajax({
|
||||
url: "",
|
||||
cache: false,
|
||||
|
@ -218,23 +218,23 @@ $(document).ready(function () {
|
|||
setTimeout(function() {
|
||||
var x = new Date();
|
||||
var tmp_values = [];
|
||||
var tmp_values2 = [];
|
||||
var tmp_values2 = [];
|
||||
refresh();
|
||||
update_values();
|
||||
create_queue_table();
|
||||
|
||||
|
||||
|
||||
|
||||
for (i = 0; i < (glob_tabvar.row1).length; i++){
|
||||
if (glob_tabvar.row1[i][0].substring(0,4) != "word"){
|
||||
tmp_values.push(glob_tabvar.row1[i][1]);
|
||||
if (glob_tabvar.row1[i][0] == 'Categ' || glob_tabvar.row1[i][0] == 'Curve'){
|
||||
tmp_values2.push(glob_tabvar.row1[i][1]);
|
||||
}
|
||||
else {
|
||||
tmp_values2.push(glob_tabvar.row1[i][1]);
|
||||
tmp_values.push(glob_tabvar.row1[i][1]);
|
||||
}
|
||||
}
|
||||
tmp_values.unshift(x);
|
||||
data.push(tmp_values);
|
||||
|
||||
|
||||
tmp_values2.unshift(x);
|
||||
data2.push(tmp_values2);
|
||||
|
||||
|
@ -244,8 +244,8 @@ $(document).ready(function () {
|
|||
}
|
||||
g.updateOptions( { 'file': data } );
|
||||
g2.updateOptions( { 'file': data2 } );
|
||||
|
||||
|
||||
|
||||
|
||||
// TagCanvas.Reload('myCanvas');
|
||||
|
||||
}, interval);
|
||||
|
|
Loading…
Reference in a new issue