mirror of https://github.com/ail-project/ail-framework.git, synced 2024-11-10 08:38:28 +00:00
Alexandre Dulaunoy
3b101ea8f5
Introducing a timer (in this case 5 seconds) to ensure that the tokenizer finishes within 5 seconds. This is a simple and standard POSIX signal handler. This approach fixes the specific issue we currently have with some inputs whose tokenization takes too much time. The fix should be improved and made more generic:

- Introduce statistics on content that times out.
- Keep a list/queue of the affected files so they can be reprocessed later with a different tokenizer approach, perhaps a set of "dirty" processes that handle the edge cases without impacting the overall processing and analysis.
- Make the timer configurable per module (at least for this one), as sketched below.
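For that last point, a minimal sketch of what a configurable timer could look like. The `seconds` parameter stands in for a hypothetical per-module `timeout` option read from the module's config section, and `tokenize_with_timeout` is an illustrative helper, not part of the AIL codebase:

import signal

class TimeoutException(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutException

# One process-wide handler; signal.alarm() only works in the main thread.
signal.signal(signal.SIGALRM, timeout_handler)

def tokenize_with_timeout(paste, seconds):
    # Arm a POSIX alarm: SIGALRM aborts the call if it runs past the deadline.
    signal.alarm(seconds)
    try:
        return paste._get_top_words()
    except TimeoutException:
        return None  # caller can queue the paste for a "dirty" reprocessing pass
    finally:
        signal.alarm(0)  # always disarm so a stale alarm cannot fire later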
70 lines
1.8 KiB
Python
Executable file
#!/usr/bin/env python
# -*-coding:UTF-8 -*
"""
The ZMQ_PubSub_Tokenize Module
==============================

This module consumes the Redis list created by the ZMQ_PubSub_Tokenize_Q
module.

It tokenizes the content of the paste and publishes the result in the
following format:
channel_name+' '+/path/of/the/paste.gz+' '+tokenized_word+' '+scoring

..seealso:: Paste method (_get_top_words)

..note:: The ZMQ_Something_Q and ZMQ_Something modules are closely bound:
always put the same Subscriber name in both of them.

Requirements
------------

*Need running Redis instances. (Redis)
*Need the ZMQ_PubSub_Tokenize_Q Module running to be able to work properly.

"""
|
|
import time
|
|
from packages import Paste
|
|
from pubsublogger import publisher
|
|
|
|
from Helper import Process
|
|
import signal
|
|
|
|
class TimeoutException(Exception):
|
|
pass
|
|
|
|
def timeout_handler(signum, frame):
|
|
raise TimeoutException
|
|
|
|
signal.signal(signal.SIGALRM, timeout_handler)
|
|
|
|
if __name__ == "__main__":
    publisher.port = 6380
    publisher.channel = "Script"

    config_section = 'Tokenize'
    p = Process(config_section)

    # LOGGING #
    publisher.info("Tokeniser started")

    while True:
        message = p.get_from_set()
        print message
        if message is not None:
            paste = Paste.Paste(message)
            # Arm a 5 second alarm: if tokenization runs past it, SIGALRM
            # fires and the handler raises TimeoutException.
            signal.alarm(5)
            try:
                for word, score in paste._get_top_words().items():
                    if len(word) >= 4:
                        msg = '{} {} {}'.format(paste.p_path, word, score)
                        p.populate_set_out(msg)
            except TimeoutException:
                print("{0} processing timeout".format(paste.p_path))
                continue
            else:
                # Tokenization finished in time: disarm the pending alarm.
                signal.alarm(0)
        else:
            publisher.debug("Tokeniser is idling 10s")
            time.sleep(10)
            print "sleeping"
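Note that the loop above only disarms the alarm on the success path (the `else` branch); any other exception would leave it armed. A more generic variant, in line with the improvements listed in the commit message, could package the guard as a context manager. This is a sketch under the same assumptions SIGALRM imposes (POSIX platform, main thread), and `time_limit` is a hypothetical helper, not part of the codebase:

import signal
from contextlib import contextmanager

class TimeoutException(Exception):
    pass

@contextmanager
def time_limit(seconds):
    def _handler(signum, frame):
        raise TimeoutException
    previous = signal.signal(signal.SIGALRM, _handler)  # install, keep old handler
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)                            # disarm on success, timeout or error
        signal.signal(signal.SIGALRM, previous)    # restore the prior handler

# Usage sketch:
#     with time_limit(5):
#         for word, score in paste._get_top_words().items():
#             ...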