diff --git a/bin/ZMQ_Sub_Indexer.py b/bin/ZMQ_Sub_Indexer.py new file mode 100755 index 00000000..2fd91a88 --- /dev/null +++ b/bin/ZMQ_Sub_Indexer.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* + +""" +The ZMQ_Sub_Indexer Module +============================ + +The ZMQ_Sub_Indexer modules is fetching the list of files to be processed +and index each file with a full-text indexer (Whoosh until now). + +""" +import redis, zmq, ConfigParser, time +from packages import Paste as P +from packages import ZMQ_PubSub +from pubsublogger import publisher + +from whoosh.index import create_in, exists_in, open_dir +from whoosh.fields import * +import os + +configfile = './packages/config.cfg' + +def main(): + """Main Function""" + + # CONFIG # + cfg = ConfigParser.ConfigParser() + cfg.read(configfile) + + # Redis + r_serv1 = redis.StrictRedis( + host = cfg.get("Redis_Queues", "host"), + port = cfg.getint("Redis_Queues", "port"), + db = cfg.getint("Redis_Queues", "db")) + + # Indexer configuration - index dir and schema setup + indexpath = cfg.get("Indexer", "path") + indexertype = cfg.get("Indexer", "type") + if indexertype == "whoosh": + schema = Schema(title=TEXT(stored=True), path=ID(stored=True,unique=True), content=TEXT) + + if not os.path.exists(indexpath): + os.mkdir(indexpath) + + if not exists_in(indexpath): + ix = create_in(indexpath, schema) + else: + ix = open_dir(indexpath) + + # LOGGING # + publisher.channel = "Script" + + # ZMQ # + #Subscriber + channel = cfg.get("PubSub_Global", "channel") + subscriber_name = "indexer" + subscriber_config_section = "PubSub_Global" + + Sub = ZMQ_PubSub.ZMQSub(configfile, subscriber_config_section, channel, subscriber_name) + + # FUNCTIONS # + publisher.info("""ZMQ Indexer is Running""") + + while True: + try: + message = Sub.get_msg_from_queue(r_serv1) + + if message != None: + PST = P.Paste(message.split(" ",-1)[-1]) + else: + if r_serv1.sismember("SHUTDOWN_FLAGS", "Indexer"): + r_serv1.srem("SHUTDOWN_FLAGS", "Indexer") + publisher.warning("Shutdown Flag Up: Terminating.") + break + publisher.debug("Script Indexer is idling 10s") + time.sleep(1) + continue + docpath = message.split(" ",-1)[-1] + paste = PST.get_p_content() + print "Indexing :", docpath + if indexertype == "whoosh": + indexwriter = ix.writer() + indexwriter.update_document(title=unicode(docpath, errors='ignore'),path=unicode(docpath, errors='ignore'),content=unicode(paste, errors='ignore')) + indexwriter.commit() + except IOError: + print "CRC Checksum Failed on :", PST.p_path + publisher.error('{0};{1};{2};{3};{4}'.format("Duplicate", PST.p_source, PST.p_date, PST.p_name, "CRC Checksum Failed" )) + pass + + +if __name__ == "__main__": + main() diff --git a/bin/ZMQ_Sub_Indexer_Q.py b/bin/ZMQ_Sub_Indexer_Q.py new file mode 100755 index 00000000..a23f8ad0 --- /dev/null +++ b/bin/ZMQ_Sub_Indexer_Q.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python2 +# -*-coding:UTF-8 -* +""" +The ZMQ_Sub_Indexer_Q Module +============================ + +The ZMQ_Sub_Indexer_Q module subscribes to PubSub_Global ZMQ channel +and bufferizes the data in a Redis FIFO. + +The FIFO will be then processed by the Indexer scripts (ZMQ_Sub_Indexer) +handling the indexing process of the files seen. + +""" + +import redis, zmq, ConfigParser +from pubsublogger import publisher +from packages import ZMQ_PubSub + +configfile = './packages/config.cfg' + +def main(): + """Main Function""" + + # CONFIG # + cfg = ConfigParser.ConfigParser() + cfg.read(configfile) + + # REDIS # + r_serv = redis.StrictRedis( + host = cfg.get("Redis_Queues", "host"), + port = cfg.getint("Redis_Queues", "port"), + db = cfg.getint("Redis_Queues", "db")) + + # LOGGING # + publisher.channel = "Queuing" + + # ZMQ # + channel = cfg.get("PubSub_Global", "channel") + subscriber_name = "indexer" + + Sub = ZMQ_PubSub.ZMQSub(configfile, "PubSub_Global", channel, subscriber_name) + + publisher.info("""Suscribed to channel {0}""".format(channel)) + + # Until the service is requested to be shutdown, the service + # will get the data from the global ZMQ queue and buffer it in Redis. + + while True: + Sub.get_and_lpush(r_serv) + + if r_serv.sismember("SHUTDOWN_FLAGS", "Indexer_Q"): + r_serv.srem("SHUTDOWN_FLAGS", "Indexer_Q") + print "Shutdown Flag Up: Terminating" + publisher.warning("Shutdown Flag Up: Terminating.") + break + +if __name__ == "__main__": + main() diff --git a/bin/packages/config.cfg.sample b/bin/packages/config.cfg.sample index fd43b98b..0d5881e0 100644 --- a/bin/packages/config.cfg.sample +++ b/bin/packages/config.cfg.sample @@ -59,3 +59,8 @@ adress = tcp://127.0.0.1:5003 [PubSub_Url] adress = tcp://127.0.0.1:5004 channel = urls + +# Indexer configuration +[Indexer] +type = whoosh +path = /home/user/indexdir