From 7542eaf739dbe0def96a525445bbfee16c24ceb6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vinot?= Date: Tue, 2 Sep 2014 15:21:36 +0200 Subject: [PATCH] Update starting script. --- bin/{Attributes.py => Attribute.py} | 0 bin/{CreditCards.py => CreditCard.py} | 0 bin/{Duplicates.py => Duplicate.py} | 0 bin/Helper.py | 3 +- bin/LAUNCH.sh | 50 ++++++--------------- bin/{Lines.py => Line.py} | 0 bin/Onion.py | 6 ++- bin/{Web.py => Url.py} | 0 bin/launch_queues.py | 65 +++++++++++++++++++++++++++ bin/run_modules.py | 22 --------- var/www/Flask_server.py | 1 - 11 files changed, 84 insertions(+), 63 deletions(-) rename bin/{Attributes.py => Attribute.py} (100%) rename bin/{CreditCards.py => CreditCard.py} (100%) rename bin/{Duplicates.py => Duplicate.py} (100%) rename bin/{Lines.py => Line.py} (100%) rename bin/{Web.py => Url.py} (100%) create mode 100755 bin/launch_queues.py delete mode 100755 bin/run_modules.py diff --git a/bin/Attributes.py b/bin/Attribute.py similarity index 100% rename from bin/Attributes.py rename to bin/Attribute.py diff --git a/bin/CreditCards.py b/bin/CreditCard.py similarity index 100% rename from bin/CreditCards.py rename to bin/CreditCard.py diff --git a/bin/Duplicates.py b/bin/Duplicate.py similarity index 100% rename from bin/Duplicates.py rename to bin/Duplicate.py diff --git a/bin/Helper.py b/bin/Helper.py index f04cd4c7..78a1c94f 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -45,9 +45,8 @@ class PubSub(object): host=self.config.get('RedisPubSub', 'host'), port=self.config.get('RedisPubSub', 'port'), db=self.config.get('RedisPubSub', 'db')) - self.subscriber = r.pubsub() + self.subscriber = r.pubsub(ignore_subscribe_messages=True) self.subscriber.psubscribe(channel) - self.subscriber.get_message() elif conn_name.startswith('ZMQ'): self.zmq_sub = True context = zmq.Context() diff --git a/bin/LAUNCH.sh b/bin/LAUNCH.sh index d7424603..981d32e6 100755 --- a/bin/LAUNCH.sh +++ b/bin/LAUNCH.sh @@ -90,30 +90,8 @@ function launching_queues { screen -dmS "Queue" sleep 0.1 - echo -e $GREEN"\t* Launching redis ZMQ queues"$DEFAULT - screen -S "Queue" -X screen -t "QFeed" bash -c './ZMQ_Feed_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QDuplicate" bash -c './ZMQ_Sub_Duplicate_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QAttributes" bash -c './ZMQ_Sub_Attributes_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "Qlines" bash -c './ZMQ_PubSub_Lines_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QCateg" bash -c './ZMQ_PubSub_Categ_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QTokenize" bash -c './ZMQ_PubSub_Tokenize_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "Qcreditcard" bash -c './ZMQ_Sub_CreditCards_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QOnion" bash -c './ZMQ_Sub_Onion_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "Qmails" bash -c './ZMQ_Sub_Mails_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "Qurls" bash -c './ZMQ_Sub_Urls_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QCurve" bash -c './ZMQ_Sub_Curve_Q.py; read x' - sleep 0.1 - screen -S "Queue" -X screen -t "QIndexer" bash -c './ZMQ_Sub_Indexer_Q.py; read x' + echo -e $GREEN"\t* Launching all the queues"$DEFAULT + screen -S "Queue" -X screen -t "Queues" bash -c './launch_queues.py; read x' } function launching_scripts { @@ -122,29 +100,29 @@ function launching_scripts { echo -e $GREEN"\t* Launching ZMQ scripts"$DEFAULT - screen -S "Script" -X screen -t "Feed" bash -c './ZMQ_Feed.py; read x' + screen -S "Script" -X screen -t "Global" bash -c './Global.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Duplicate" bash -c './ZMQ_Sub_Duplicate.py; read x' + screen -S "Script" -X screen -t "Duplicate" bash -c './Duplicate.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Attributes" bash -c './ZMQ_Sub_Attributes.py; read x' + screen -S "Script" -X screen -t "Attribute" bash -c './Attribute.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Lines" bash -c './ZMQ_PubSub_Lines.py; read x' + screen -S "Script" -X screen -t "Line" bash -c './Line.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Categ" bash -c './ZMQ_PubSub_Categ.py; read x' + screen -S "Script" -X screen -t "Categ" bash -c './Categ.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Tokenize" bash -c './ZMQ_PubSub_Tokenize.py; read x' + screen -S "Script" -X screen -t "Tokenize" bash -c './Tokenize.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Creditcard" bash -c './ZMQ_Sub_CreditCards.py; read x' + screen -S "Script" -X screen -t "CreditCard" bash -c './CreditCard.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Onion" bash -c './ZMQ_Sub_Onion.py; read x' + screen -S "Script" -X screen -t "Onion" bash -c './Onion.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Mails" bash -c './ZMQ_Sub_Mails.py; read x' + screen -S "Script" -X screen -t "Mail" bash -c './Mail.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Urls" bash -c './ZMQ_Sub_Urls.py; read x' + screen -S "Script" -X screen -t "Url" bash -c './Url.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Curve" bash -c './ZMQ_Sub_Curve.py; read x' + screen -S "Script" -X screen -t "Curve" bash -c './Curve.py; read x' sleep 0.1 - screen -S "Script" -X screen -t "Indexer" bash -c './ZMQ_Sub_Indexer.py; read x' + screen -S "Script" -X screen -t "Indexer" bash -c './Indexer.py; read x' } #If no params, display the help diff --git a/bin/Lines.py b/bin/Line.py similarity index 100% rename from bin/Lines.py rename to bin/Line.py diff --git a/bin/Onion.py b/bin/Onion.py index 525507e8..b9166abb 100755 --- a/bin/Onion.py +++ b/bin/Onion.py @@ -35,8 +35,9 @@ from Helper import Process def fetch(p, r_cache, urls, domains, path): + failed = [] for url, domain in zip(urls, domains): - if r_cache.exists(url): + if r_cache.exists(url) or url in failed: continue to_fetch = base64.standard_b64encode(url) process = subprocess.Popen(["python", './tor_fetcher.py', to_fetch], @@ -64,6 +65,7 @@ def fetch(p, r_cache, urls, domains, path): yield url os.unlink(tempfile) else: + failed.append(url) print 'Failed at downloading', url print process.stdout.read() @@ -136,7 +138,7 @@ if __name__ == "__main__": PST.p_date, PST.p_name) for url in fetch(p, r_cache, urls, domains_list, path): - publisher.warning('{}Valid: {}'.format(to_print, url)) + publisher.warning('{}Checked {}'.format(to_print, url)) else: publisher.info('{}Onion related'.format(to_print)) diff --git a/bin/Web.py b/bin/Url.py similarity index 100% rename from bin/Web.py rename to bin/Url.py diff --git a/bin/launch_queues.py b/bin/launch_queues.py new file mode 100755 index 00000000..05adf774 --- /dev/null +++ b/bin/launch_queues.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* + + +import ConfigParser +import os +import subprocess +import time + + +def check_pid(pid): + if pid is None: + # Already seen as finished. + return None + else: + if pid.poll() is not None: + return False + return True + +if __name__ == '__main__': + configfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') + if not os.path.exists(configfile): + raise Exception('Unable to find the configuration file. \ + Did you set environment variables? \ + Or activate the virtualenv.') + config = ConfigParser.ConfigParser() + config.read(configfile) + + modules = config.sections() + pids = {} + for module in modules: + pin = subprocess.Popen(["python", './QueueIn.py', '-c', module]) + pout = subprocess.Popen(["python", './QueueOut.py', '-c', module]) + pids[module] = (pin, pout) + is_running = True + try: + while is_running: + time.sleep(5) + is_running = False + for module, p in pids.iteritems(): + pin, pout = p + if pin is None: + # already dead + pass + elif not check_pid(pin): + print(module, 'input queue finished.') + pin = None + else: + is_running = True + if pout is None: + # already dead + pass + elif not check_pid(pout): + print(module, 'output queue finished.') + pout = None + else: + is_running = True + pids[module] = (pin, pout) + except KeyboardInterrupt: + for module, p in pids.iteritems(): + pin, pout = p + if pin is not None: + pin.kill() + if pout is not None: + pout.kill() diff --git a/bin/run_modules.py b/bin/run_modules.py deleted file mode 100755 index 7792c82d..00000000 --- a/bin/run_modules.py +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env python2 -# -*-coding:UTF-8 -* - - -import ConfigParser -import os -import subprocess - -if __name__ == '__main__': - configfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') - if not os.path.exists(configfile): - raise Exception('Unable to find the configuration file. \ - Did you set environment variables? \ - Or activate the virtualenv.') - config = ConfigParser.ConfigParser() - config.read(configfile) - - modules = config.sections() - for module in modules: - subprocess.Popen(["python", './QueueIn.py', '-c', module]) - subprocess.Popen(["python", './QueueOut.py', '-c', module]) - #subprocess.Popen(["python", './{}.py'.format(module)]) diff --git a/var/www/Flask_server.py b/var/www/Flask_server.py index 01d54480..5171ba3b 100755 --- a/var/www/Flask_server.py +++ b/var/www/Flask_server.py @@ -55,7 +55,6 @@ def logs(): @app.route("/_stuff", methods=['GET']) def stuff(): - print get_queues(r_serv) return jsonify(row1=get_queues(r_serv))