From 1397db9691671f2be9b6a18cb575835378e3c7f9 Mon Sep 17 00:00:00 2001
From: Alexandre Dulaunoy
Date: Mon, 8 Sep 2014 11:07:45 +0200
Subject: [PATCH 1/4] Global queue for DomainClassifier

---
Review notes (text between "---" and the diff is ignored by git am):
  * The module docstring describes domain classification; the original text
    was the Indexer module's description.
  * The shutdown flag is "DomainClassifier" (same as subscriber_name) instead
    of "Indexer", so this module no longer consumes the Indexer's flag.
  * The idle log message states 1s, matching the time.sleep(1) call.

 bin/ZMQ_Sub_DomainClassifier.py | 87 +++++++++++++++++++++++++++++++++
 1 file changed, 87 insertions(+)
 create mode 100755 bin/ZMQ_Sub_DomainClassifier.py

diff --git a/bin/ZMQ_Sub_DomainClassifier.py b/bin/ZMQ_Sub_DomainClassifier.py
new file mode 100755
index 00000000..a0b65ec1
--- /dev/null
+++ b/bin/ZMQ_Sub_DomainClassifier.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python2
+# -*-coding:UTF-8 -*
+
+"""
+The ZMQ_Sub_DomainClassifier Module
+============================
+
+The ZMQ_Sub_DomainClassifier module fetches the list of files to be processed,
+extracts the domains of each text/plain paste and prints the localized ones.
+
+"""
+import redis
+import ConfigParser
+import time
+from packages import Paste
+from packages import ZMQ_PubSub
+from pubsublogger import publisher
+
+import DomainClassifier.domainclassifier
+import os
+
+configfile = './packages/config.cfg'
+
+
+def main():
+    """Main Function"""
+
+    # CONFIG #
+    cfg = ConfigParser.ConfigParser()
+    cfg.read(configfile)
+
+    # Redis
+    r_serv1 = redis.StrictRedis(
+        host=cfg.get("Redis_Queues", "host"),
+        port=cfg.getint("Redis_Queues", "port"),
+        db=cfg.getint("Redis_Queues", "db"))
+
+    # LOGGING #
+    publisher.channel = "Script"
+
+    # ZMQ #
+    # Subscriber
+    channel = cfg.get("PubSub_Global", "channel")
+    subscriber_name = "DomainClassifier"
+    subscriber_config_section = "PubSub_Global"
+
+    sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
+
+    # FUNCTIONS #
+    publisher.info("""ZMQ DomainClassifier is Running""")
+    c = DomainClassifier.domainclassifier.Extract(rawtext="")
+
+    while True:
+        try:
+            message = sub.get_msg_from_queue(r_serv1)
+
+            if message is not None:
+                PST = Paste.Paste(message.split(" ", -1)[-1])
+            else:
+                if r_serv1.sismember("SHUTDOWN_FLAGS", "DomainClassifier"):
+                    r_serv1.srem("SHUTDOWN_FLAGS", "DomainClassifier")
+                    publisher.warning("Shutdown Flag Up: Terminating.")
+                    break
+                publisher.debug("Script DomainClassifier is idling 1s")
+                time.sleep(1)
+                continue
+            docpath = message.split(" ", -1)[-1]
+            paste = PST.get_p_content()
+            mimetype = PST._get_p_encoding()
+            if mimetype == "text/plain":
+                c.text(rawtext=paste)
+                c.potentialdomain()
+                c.validdomain(rtype=['A'],extended=True)
+                localizeddomains = c.include(expression=r'\.lu$')
+                if localizeddomains:
+                    print (localizeddomains)
+                localizeddomains = c.localizedomain(cc='LU')
+                if localizeddomains:
+                    print (localizeddomains)
+        except IOError:
+            print "CRC Checksum Failed on :", PST.p_path
+            publisher.error('Duplicate;{};{};{};CRC Checksum Failed'.format(PST.p_source, PST.p_date, PST.p_name))
+            pass
+
+
+if __name__ == "__main__":
+    main()

From 3055b0deae9e44a2b3336ef64bcb56b1ef2839c9 Mon Sep 17 00:00:00 2001
From: Alexandre Dulaunoy
Date: Mon, 8 Sep 2014 11:52:34 +0200
Subject: [PATCH 2/4] DomainClassifier requirements added

---
 pip_packages_requirement.txt | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pip_packages_requirement.txt b/pip_packages_requirement.txt
index 75d5d866..c369d7c4 100644
--- a/pip_packages_requirement.txt
+++ b/pip_packages_requirement.txt
@@ -26,6 +26,8 @@ ipython
 flask
 texttable
 
+#DomainClassifier
+DomainClassifier
 #Indexer requirements
 whoosh
 

From 246621f6638f88b73d61e8a803e649d88a315a2f Mon Sep 17 00:00:00 2001
From: Alexandre Dulaunoy
Date: Mon, 8 Sep 2014 16:43:21 +0200
Subject: [PATCH 3/4] First version of the DomainClassifier

---
 bin/ZMQ_Sub_DomainClassifier.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/bin/ZMQ_Sub_DomainClassifier.py b/bin/ZMQ_Sub_DomainClassifier.py
index a0b65ec1..e246d011 100755
--- a/bin/ZMQ_Sub_DomainClassifier.py
+++ b/bin/ZMQ_Sub_DomainClassifier.py
@@ -44,6 +44,9 @@ def main():
     subscriber_name = "DomainClassifier"
     subscriber_config_section = "PubSub_Global"
 
+    cc = cfg.get("PubSub_DomainClassifier", "cc")
+    cc_tld = cfg.get("PubSub_DomainClassifier", "cc_tld")
+
     sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name)
 
     # FUNCTIONS #
@@ -71,10 +74,10 @@ def main():
                 c.text(rawtext=paste)
                 c.potentialdomain()
                 c.validdomain(rtype=['A'],extended=True)
-                localizeddomains = c.include(expression=r'\.lu$')
+                localizeddomains = c.include(expression=cc_tld)
                 if localizeddomains:
                     print (localizeddomains)
-                localizeddomains = c.localizedomain(cc='LU')
+                localizeddomains = c.localizedomain(cc=cc)
                 if localizeddomains:
                     print (localizeddomains)
         except IOError:

From de6e21d5a774cf9efd96492c5d9db3b1d2df6f71 Mon Sep 17 00:00:00 2001
From: Alexandre Dulaunoy
Date: Mon, 8 Sep 2014 16:44:05 +0200
Subject: [PATCH 4/4] DomainClassifier sample configuration added

---
Review notes (text between "---" and the diff is ignored by git am):
  * cc_tld is written as a bare regular expression. ConfigParser returns the
    option text as-is, so Python raw-string quoting (r'...') would end up
    inside the pattern and the expression would never match a domain.

 bin/packages/config.cfg.sample | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample
index 2483e6c9..f274fc55 100644
--- a/bin/packages/config.cfg.sample
+++ b/bin/packages/config.cfg.sample
@@ -67,6 +67,10 @@ channel = urls
 # country code logged as critical
 cc_critical = DE
 
+[PubSub_DomainClassifier]
+cc = DE
+cc_tld = \.de$
+
 # Indexer configuration
 [Indexer]
 type = whoosh