Merge remote-tracking branch 'upstream/master' into production

Mokaddem 2017-02-14 09:31:18 +01:00
commit 303575b2a1


@@ -68,9 +68,9 @@ def Analyse(message, server):
         the_time = datetime.time(getattr(the_time, 'hour'), 0, 0)
         combined_datetime = datetime.datetime.combine(the_date, the_time)
         timestamp = calendar.timegm(combined_datetime.timetuple())
 
         sentences = tokenize.sent_tokenize(p_content.decode('utf-8', 'ignore'))
 
         if len(sentences) > 0:
             avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0}
             neg_line = 0
@@ -88,8 +88,8 @@ def Analyse(message, server):
                         pos_line += 1
                     else:
                         avg_score[k] += ss[k]
 
             for k in avg_score:
                 if k == 'compoundPos':
                     avg_score[k] = avg_score[k] / (pos_line if pos_line > 0 else 1)
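For context, the loop in this hunk accumulates per-sentence scores from nltk's VADER analyser. A minimal sketch of that accumulation, assuming each sentence's compound score is routed to compoundPos or compoundNeg by the sentence's dominant polarity (the routing condition and the sample sentences are illustrative, not taken from this commit; requires nltk.download('vader_lexicon')):

from nltk.sentiment.vader import SentimentIntensityAnalyzer

sia = SentimentIntensityAnalyzer()
avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0}
pos_line = 0
neg_line = 0

for sentence in ["I love this paste.", "This is terrible."]:   # illustrative input
    ss = sia.polarity_scores(sentence)  # {'neg': .., 'neu': .., 'pos': .., 'compound': ..}
    for k in ss:
        if k == 'compound':
            # Bucket the compound score by dominant polarity and count how
            # many sentences fell on each side (assumed routing; the diff
            # only shows the accumulation branch).
            if ss['pos'] >= ss['neg']:
                avg_score['compoundPos'] += ss[k]
                pos_line += 1
            else:
                avg_score['compoundNeg'] += ss[k]
                neg_line += 1
        else:
            avg_score[k] += ss[k]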
@@ -97,15 +97,15 @@ def Analyse(message, server):
                     avg_score[k] = avg_score[k] / (neg_line if neg_line > 0 else 1)
                 else:
                     avg_score[k] = avg_score[k] / len(sentences)
 
             # In redis-levelDB: {} = set, () = K-V
             # {Provider_set -> provider_i}
             # {Provider_TimestampInHour_i -> UniqID_i}_j
             # (UniqID_i -> PasteValue_i)
 
             server.sadd('Provider_set', provider)
             provider_timestamp = provider + '_' + str(timestamp)
 
             server.incr('UniqID')
             UniqID = server.get('UniqID')
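The key layout described by the comments in this hunk can be exercised directly with redis-py. A short sketch, assuming server is a redis.StrictRedis handle; the connection parameters, provider name, and timestamp are placeholders, not values from this commit:

import redis

server = redis.StrictRedis(host='localhost', port=6380, db=4)  # placeholder connection

provider = 'paste.example.org'   # hypothetical provider
timestamp = 1487059200           # hour-aligned UNIX timestamp
avg_score = {'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0}

server.sadd('Provider_set', provider)                 # {Provider_set -> provider_i}
provider_timestamp = provider + '_' + str(timestamp)
UniqID = server.incr('UniqID')                        # incr() returns the new value, so
                                                      # the separate get() above is equivalent
server.sadd(provider_timestamp, UniqID)               # {Provider_TimestampInHour_i -> UniqID_i}_j
server.set(UniqID, str(avg_score))                    # (UniqID_i -> PasteValue_i)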
@@ -114,7 +114,7 @@ def Analyse(message, server):
             server.set(UniqID, avg_score)
     else:
         print 'Dropped:', p_MimeType
 
 def isJSON(content):
     try:
@@ -124,6 +124,16 @@ def isJSON(content):
     except Exception,e:
         return False
 
+import signal
+
+class TimeoutException(Exception):
+    pass
+
+def timeout_handler(signum, frame):
+    raise TimeoutException
+
+signal.signal(signal.SIGALRM, timeout_handler)
+
 if __name__ == '__main__':
     # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
     # Port of the redis instance used by pubsublogger
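The scaffolding added in this hunk is the standard Unix SIGALRM timeout pattern. A self-contained sketch of how it behaves; the 2-second alarm and the sleep stand in for the real 60-second limit and the Analyse() call:

import signal
import time

class TimeoutException(Exception):
    pass

def timeout_handler(signum, frame):
    raise TimeoutException

# Must run in the main thread; SIGALRM does not exist on Windows.
signal.signal(signal.SIGALRM, timeout_handler)

signal.alarm(2)        # ask the kernel to deliver SIGALRM in 2 seconds
try:
    time.sleep(10)     # stand-in for a slow Analyse(message, server)
except TimeoutException:
    print("processing timeout")
else:
    signal.alarm(0)    # finished in time: cancel the pending alarm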
@@ -152,6 +162,12 @@ if __name__ == '__main__':
             publisher.debug("{} queue is empty, waiting".format(config_section))
             time.sleep(1)
             continue
 
-        Analyse(message, server)
+        signal.alarm(60)
+        try:
+            Analyse(message, server)
+        except TimeoutException:
+            print ("{0} processing timeout".format(message))
+            continue
+        else:
+            signal.alarm(0)
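Design note: signal.alarm(60) arms a one-shot alarm before each Analyse() call, and the else branch cancels it only when the call returns in time; if the alarm fires first, TimeoutException unwinds out of Analyse() and the loop continues to the next message, re-arming a fresh alarm on the next iteration. One caveat worth knowing: should Analyse() raise anything other than TimeoutException, the alarm stays armed, and signal.alarm() only supports whole-second granularity.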