From 347588fdec95384dd2d616e549d2c541a771298c Mon Sep 17 00:00:00 2001 From: Terrtia Date: Thu, 12 Apr 2018 17:06:57 +0200 Subject: [PATCH] python3.5 Mixer and Global module --- bin/Global.py | 15 +++++---- bin/Helper.py | 73 ++++++++++++++++++++++------------------- bin/LAUNCH.sh | 90 +++++++++++++++++++++++++-------------------------- bin/Mixer.py | 33 +++++++++++-------- 4 files changed, 112 insertions(+), 99 deletions(-) diff --git a/bin/Global.py b/bin/Global.py index bab45b47..c9daeef8 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3.5 # -*-coding:UTF-8 -* """ The ZMQ_Feed_Q Module @@ -44,6 +44,7 @@ if __name__ == '__main__': while True: message = p.get_from_set() + #print(message) # Recovering the streamed message informations. if message is not None: splitted = message.split() @@ -51,14 +52,14 @@ if __name__ == '__main__': paste, gzip64encoded = splitted else: # TODO Store the name of the empty paste inside a Redis-list. - print "Empty Paste: not processed" + print("Empty Paste: not processed") publisher.debug("Empty Paste: {0} not processed".format(message)) continue else: - print "Empty Queues: Waiting..." + print("Empty Queues: Waiting...") if int(time.time() - time_1) > 30: to_print = 'Global; ; ; ;glob Processed {0} paste(s)'.format(processed_paste) - print to_print + print(to_print) #publisher.info(to_print) time_1 = time.time() processed_paste = 0 @@ -66,12 +67,14 @@ if __name__ == '__main__': continue # Creating the full filepath filename = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "pastes"), paste) + p.config.get("Directories", "pastes"), paste.decode('utf8')) + #print(filename) dirname = os.path.dirname(filename) if not os.path.exists(dirname): os.makedirs(dirname) with open(filename, 'wb') as f: f.write(base64.standard_b64decode(gzip64encoded)) - p.populate_set_out(filename) + + p.populate_set_out(filename.encode('utf8')) processed_paste+=1 diff --git a/bin/Helper.py b/bin/Helper.py index 2560f340..38e1b85a 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3.5 # -*-coding:UTF-8 -* """ Queue helper module @@ -12,11 +12,7 @@ the same Subscriber name in both of them. """ import redis -try: # dirty to support python3 - import ConfigParser -except: - import configparser - ConfigParser = configparser +import configparser import os import zmq import time @@ -32,7 +28,7 @@ class PubSub(object): raise Exception('Unable to find the configuration file. \ Did you set environment variables? \ Or activate the virtualenv.') - self.config = ConfigParser.ConfigParser() + self.config = configparser.ConfigParser() self.config.read(configfile) self.redis_sub = False self.zmq_sub = False @@ -61,7 +57,7 @@ class PubSub(object): for address in addresses.split(','): new_sub = context.socket(zmq.SUB) new_sub.connect(address) - new_sub.setsockopt(zmq.SUBSCRIBE, channel) + new_sub.setsockopt_string(zmq.SUBSCRIBE, channel) self.subscribers.append(new_sub) def setup_publish(self, conn_name): @@ -81,7 +77,7 @@ class PubSub(object): self.publishers['ZMQ'].append((p, channel)) def publish(self, message): - m = json.loads(message) + m = json.loads(message.decode('utf8')) channel_message = m.get('channel') for p, channel in self.publishers['Redis']: if channel_message is None or channel_message == channel: @@ -100,7 +96,7 @@ class PubSub(object): for sub in self.subscribers: try: msg = sub.recv(zmq.NOBLOCK) - yield msg.split(' ', 1)[1] + yield msg.split(b" ", 1)[1] except zmq.error.Again as e: time.sleep(0.2) pass @@ -117,9 +113,9 @@ class Process(object): Did you set environment variables? \ Or activate the virtualenv.') modulesfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') - self.config = ConfigParser.ConfigParser() + self.config = configparser.ConfigParser() self.config.read(configfile) - self.modules = ConfigParser.ConfigParser() + self.modules = configparser.ConfigParser() self.modules.read(modulesfile) self.subscriber_name = conf_section @@ -135,7 +131,6 @@ class Process(object): self.moduleNum = os.getpid() - def populate_set_in(self): # monoproc src = self.modules.get(self.subscriber_name, 'subscribe') @@ -159,36 +154,46 @@ class Process(object): return None else: - try: - if ".gz" in message: - path = message.split(".")[-2].split("/")[-1] - #find start of path with AIL_HOME - index_s = message.find(os.environ['AIL_HOME']) - #Stop when .gz - index_e = message.find(".gz")+3 + #try: + if b'.gz' in message: + path = message.split(b".")[-2].split(b"/")[-1] + #find start of path with AIL_HOME + index_s = (message.decode('utf8')).find(os.environ['AIL_HOME']) + #Stop when .gz + index_e = message.find(b".gz")+3 + if(index_s == -1): + complete_path = message[0:index_e] + else: complete_path = message[index_s:index_e] - else: - path = "?" - value = str(timestamp) + ", " + path - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path) - self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) - return message - - except: + else: path = "?" - value = str(timestamp) + ", " + path - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) - self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", "?") - self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) - return message + + value = str(timestamp) + ", " + path.decode('utf8') + self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) + self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path) + self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) + return message + + #except: + #print('except') + #path = "?" + #value = str(timestamp) + ", " + path + #self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) + #self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", "?") + #self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) + #return message def populate_set_out(self, msg, channel=None): # multiproc + msg = msg.decode('utf8') msg = {'message': msg} if channel is not None: msg.update({'channel': channel}) + + # TODO use bytes here ? + #j = (json.dumps(msg)).encode('utf8') + j = json.dumps(msg) self.r_temp.sadd(self.subscriber_name + 'out', json.dumps(msg)) def publish(self): diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index d836d87e..84ce8e5c 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -122,57 +122,57 @@ function launching_scripts { sleep 0.1 screen -S "Script_AIL" -X screen -t "Mixer" bash -c 'python3 Mixer.py; read x' sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Global" bash -c './Global.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Duplicates" bash -c './Duplicates.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Attributes" bash -c './Attributes.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Lines" bash -c './Lines.py; read x' - #sleep 0.1 + screen -S "Script_AIL" -X screen -t "Global" bash -c 'python3 Global.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Duplicates" bash -c 'python3 Duplicates.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Attributes" bash -c 'python3 Attributes.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Lines" bash -c 'python3 Lines.py; read x' + sleep 0.1 #screen -S "Script_AIL" -X screen -t "DomClassifier" bash -c './DomClassifier.py; read x' #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Categ" bash -c './Categ.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Tokenize" bash -c './Tokenize.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "CreditCards" bash -c './CreditCards.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Onion" bash -c './Onion.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Mail" bash -c './Mail.py; read x' - #sleep 0.1 + screen -S "Script_AIL" -X screen -t "Categ" bash -c 'python3 Categ.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Tokenize" bash -c 'python3 Tokenize.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "CreditCards" bash -c 'python3 CreditCards.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Onion" bash -c './Onion.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Mail" bash -c './Mail.py; read x' + sleep 0.1 #screen -S "Script_AIL" -X screen -t "Web" bash -c './Web.py; read x' #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Credential" bash -c './Credential.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Curve" bash -c './Curve.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "CurveManageTopSets" bash -c './CurveManageTopSets.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "RegexForTermsFrequency" bash -c './RegexForTermsFrequency.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "SetForTermsFrequency" bash -c './SetForTermsFrequency.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Indexer" bash -c './Indexer.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Keys" bash -c './Keys.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Phone" bash -c './Phone.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Release" bash -c './Release.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "Cve" bash -c './Cve.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "WebStats" bash -c './WebStats.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "ModuleStats" bash -c './ModuleStats.py; read x' - #sleep 0.1 + screen -S "Script_AIL" -X screen -t "Credential" bash -c './Credential.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Curve" bash -c './Curve.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "CurveManageTopSets" bash -c './CurveManageTopSets.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "RegexForTermsFrequency" bash -c './RegexForTermsFrequency.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "SetForTermsFrequency" bash -c './SetForTermsFrequency.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Indexer" bash -c './Indexer.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Keys" bash -c './Keys.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Phone" bash -c './Phone.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Release" bash -c './Release.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "Cve" bash -c './Cve.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "WebStats" bash -c './WebStats.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "ModuleStats" bash -c './ModuleStats.py; read x' + sleep 0.1 #screen -S "Script_AIL" -X screen -t "SQLInjectionDetection" bash -c './SQLInjectionDetection.py; read x' #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "alertHandler" bash -c './alertHandler.py; read x' - #sleep 0.1 - #screen -S "Script_AIL" -X screen -t "SentimentAnalysis" bash -c './SentimentAnalysis.py; read x' + screen -S "Script_AIL" -X screen -t "alertHandler" bash -c './alertHandler.py; read x' + sleep 0.1 + screen -S "Script_AIL" -X screen -t "SentimentAnalysis" bash -c './SentimentAnalysis.py; read x' } diff --git a/bin/Mixer.py b/bin/Mixer.py index 1b3138d4..b9bb33c2 100755 --- a/bin/Mixer.py +++ b/bin/Mixer.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3.5 # -*-coding:UTF-8 -* """ The Mixer Module @@ -35,7 +35,7 @@ import os import time from pubsublogger import publisher import redis -import ConfigParser +import configparser from Helper import Process @@ -58,7 +58,7 @@ if __name__ == '__main__': Did you set environment variables? \ Or activate the virtualenv.') - cfg = ConfigParser.ConfigParser() + cfg = configparser.ConfigParser() cfg.read(configfile) # REDIS # @@ -89,9 +89,13 @@ if __name__ == '__main__': splitted = message.split() if len(splitted) == 2: complete_paste, gzip64encoded = splitted + try: - feeder_name, paste_name = complete_paste.split('>') - feeder_name.replace(" ","") + feeder_name = ( complete_paste.replace(b"archive/",b"") ).split(b"/")[0] + + # TODO take real name ? + paste_name = complete_paste + except ValueError as e: feeder_name = "unnamed_feeder" paste_name = complete_paste @@ -105,7 +109,8 @@ if __name__ == '__main__': processed_paste_per_feeder[feeder_name] = 1 duplicated_paste_per_feeder[feeder_name] = 0 - relay_message = "{0} {1}".format(paste_name, gzip64encoded) + relay_message = b" ".join( [paste_name, gzip64encoded] ) + digest = hashlib.sha1(gzip64encoded).hexdigest() # Avoid any duplicate coming from any sources @@ -173,26 +178,26 @@ if __name__ == '__main__': else: # TODO Store the name of the empty paste inside a Redis-list. - print "Empty Paste: not processed" + print("Empty Paste: not processed") publisher.debug("Empty Paste: {0} not processed".format(message)) else: - print "Empty Queues: Waiting..." + print("Empty Queues: Waiting...") if int(time.time() - time_1) > refresh_time: - print processed_paste_per_feeder + print(processed_paste_per_feeder) to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time) - print to_print + print(to_print) publisher.info(to_print) processed_paste = 0 - for feeder, count in processed_paste_per_feeder.iteritems(): + for feeder, count in processed_paste_per_feeder.items(): to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time) - print to_print + print(to_print) publisher.info(to_print) processed_paste_per_feeder[feeder] = 0 - for feeder, count in duplicated_paste_per_feeder.iteritems(): + for feeder, count in duplicated_paste_per_feeder.items(): to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time) - print to_print + print(to_print) publisher.info(to_print) duplicated_paste_per_feeder[feeder] = 0