diff --git a/bin/SentimentAnalysis.py b/bin/SentimentAnalysis.py index 09f59e40..d49dcb39 100755 --- a/bin/SentimentAnalysis.py +++ b/bin/SentimentAnalysis.py @@ -54,9 +54,9 @@ def Analyse(message, server): the_time = datetime.time(getattr(the_time, 'hour'), 0, 0) combined_datetime = datetime.datetime.combine(the_date, the_time) timestamp = calendar.timegm(combined_datetime.timetuple()) - + sentences = tokenize.sent_tokenize(p_content.decode('utf-8', 'ignore')) - + if len(sentences) > 0: avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0} neg_line = 0 @@ -74,8 +74,8 @@ def Analyse(message, server): pos_line += 1 else: avg_score[k] += ss[k] - - + + for k in avg_score: if k == 'compoundPos': avg_score[k] = avg_score[k] / (pos_line if pos_line > 0 else 1) @@ -83,15 +83,15 @@ def Analyse(message, server): avg_score[k] = avg_score[k] / (neg_line if neg_line > 0 else 1) else: avg_score[k] = avg_score[k] / len(sentences) - - + + # In redis-levelDB: {} = set, () = K-V # {Provider_set -> provider_i} # {Provider_TimestampInHour_i -> UniqID_i}_j # (UniqID_i -> PasteValue_i) - + server.sadd('Provider_set', provider) - + provider_timestamp = provider + '_' + str(timestamp) server.incr('UniqID') UniqID = server.get('UniqID') @@ -100,7 +100,7 @@ def Analyse(message, server): server.set(UniqID, avg_score) else: print 'Dropped:', p_MimeType - + def isJSON(content): try: @@ -110,6 +110,16 @@ def isJSON(content): except Exception,e: return False +import signal + +class TimeoutException(Exception): + pass + +def timeout_handler(signum, frame): + raise TimeoutException + +signal.signal(signal.SIGALRM, timeout_handler) + if __name__ == '__main__': # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) # Port of the redis instance used by pubsublogger @@ -138,6 +148,12 @@ if __name__ == '__main__': publisher.debug("{} queue is empty, waiting".format(config_section)) time.sleep(1) continue - - Analyse(message, server) + signal.alarm(60) + try: + Analyse(message, server) + except TimeoutException: + print ("{0} processing timeout".format(message)) + continue + else: + signal.alarm(0)