ail-framework/bin/Tokenize.py
Alexandre Dulaunoy 3b101ea8f5 (partially) Fix #91 using a simple Alarm (SIGNAL) when exec-timeout
Introduce a timer (currently 5 seconds) to ensure that tokenizing a
single paste never runs for more than 5 seconds. This is a simple,
standard POSIX signal handler (SIGALRM).

This approach fixes the specific issues we have currently
with some inputs where the tokenization takes too much time. This
fix should be improved and be more generic:

 - Introduce statistics on content that times out.
 - Keep a list/queue to further process those files using a different
   tokenizer approach. Maybe a set of "dirty" processes to handle the edge cases
   without impacting the overall processing and analysis.
 - Make the timer configurable per module (at least for this one).
2017-01-12 07:32:55 +00:00
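
The follow-ups listed in the commit message (a configurable timer, statistics on timeouts, and a queue of items to reprocess) could be sketched roughly as below. This is only an illustration, not the project's implementation: `time_limit`, `process`, `stats` and `retry_queue` are hypothetical names, the limit is passed in as a plain argument rather than read from the module configuration, and the approach is POSIX-only and must run in the main thread.

```python
import signal
from collections import Counter, deque
from contextlib import contextmanager


class TimeoutException(Exception):
    pass


@contextmanager
def time_limit(seconds):
    """Raise TimeoutException if the with-body runs longer than `seconds`."""
    def handler(signum, frame):
        raise TimeoutException()

    previous = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)  # whole seconds only
    try:
        yield
    finally:
        signal.alarm(0)  # cancel the pending alarm on every exit path
        signal.signal(signal.SIGALRM, previous)  # restore the old handler


# Hypothetical bookkeeping: outcome counters and items to retry later.
stats = Counter()
retry_queue = deque()


def process(item, work, max_seconds):
    """Run work(item) under a time limit; record and queue items that time out."""
    try:
        with time_limit(max_seconds):
            result = work(item)
    except TimeoutException:
        stats["timeout"] += 1
        retry_queue.append(item)  # left for a slower, separate worker
        return None
    stats["ok"] += 1
    return result
```

Cancelling the alarm in a `finally` clause means a pending alarm cannot fire later in unrelated code, even when `work` fails with some other exception. In the tokenizer loop, `work` would be the function that extracts the top words from a paste, and `max_seconds` would come from the module's configuration.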

#!/usr/bin/env python
# -*-coding:UTF-8 -*
"""
The ZMQ_PubSub_Lines Module
============================

This module consumes the Redis list created by the ZMQ_PubSub_Tokenize_Q
Module.

It tokenizes the content of the paste and publishes the result in the following
format:
    channel_name+' '+/path/of/the/paste.gz+' '+tokenized_word+' '+scoring

.. seealso:: Paste method (_get_top_words)

.. note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put
    the same Subscriber name in both of them.

Requirements
------------

* Need running Redis instances. (Redis)
* Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly.

"""
import time
from packages import Paste
from pubsublogger import publisher

from Helper import Process
import signal


class TimeoutException(Exception):
    pass


def timeout_handler(signum, frame):
    raise TimeoutException


signal.signal(signal.SIGALRM, timeout_handler)

if __name__ == "__main__":
    publisher.port = 6380
    publisher.channel = "Script"

    config_section = 'Tokenize'
    p = Process(config_section)

    # LOGGING #
    publisher.info("Tokeniser started")

    while True:
        message = p.get_from_set()
        print(message)
        if message is not None:
            paste = Paste.Paste(message)
            signal.alarm(5)
            try:
                for word, score in paste._get_top_words().items():
                    if len(word) >= 4:
                        msg = '{} {} {}'.format(paste.p_path, word, score)
                        p.populate_set_out(msg)
            except TimeoutException:
                print("{0} processing timeout".format(paste.p_path))
                continue
            else:
                signal.alarm(0)
        else:
            publisher.debug("Tokeniser is idling 10s")
            time.sleep(10)
            print("sleepin")