ail-framework/bin/SentimentAnalysis.py

183 lines
5.8 KiB
Python
Raw Normal View History

2018-05-04 13:53:29 +02:00
#!/usr/bin/env python3
2016-08-11 09:40:42 +02:00
# -*-coding:UTF-8 -*
"""
Sentiment analyser module.
It takes its inputs from 'global'.
2016-08-11 09:40:42 +02:00
The content is analysed if the length of the line is
above a defined threshold (get_p_content_with_removed_lines).
This is done because NLTK sentences tokemnizer (sent_tokenize) seems to crash
for long lines (function _slices_from_text line#1276).
nltk.sentiment.vader module credit:
Hutto, C.J. & Gilbert, E.E. (2014). VADER: A Parsimonious Rule-based Model for Sentiment Analysis of Social Media Text. Eighth International Conference on Weblogs and Social Media (ICWSM-14). Ann Arbor, MI, June 2014.
2016-08-11 09:40:42 +02:00
"""
import time
import datetime
import calendar
import redis
import json
2016-08-11 09:40:42 +02:00
from pubsublogger import publisher
from Helper import Process
from packages import Paste
2016-08-11 09:40:42 +02:00
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from nltk import tokenize
# Config Variables
accepted_Mime_type = ['text/plain']
size_threshold = 250
line_max_length_threshold = 1000
2016-08-11 09:40:42 +02:00
import os
2018-04-16 14:50:04 +02:00
import configparser
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
2018-04-16 14:50:04 +02:00
cfg = configparser.ConfigParser()
cfg.read(configfile)
sentiment_lexicon_file = cfg.get("Directories", "sentiment_lexicon_file")
2018-11-20 14:39:45 +01:00
#time_clean_sentiment_db = 60*60
def Analyse(message, server):
2016-08-11 09:40:42 +02:00
path = message
paste = Paste.Paste(path)
# get content with removed line + number of them
num_line_removed, p_content = paste.get_p_content_with_removed_lines(line_max_length_threshold)
provider = paste.p_source
p_date = str(paste._get_p_date())
p_MimeType = paste._get_p_encoding()
# Perform further analysis
if p_MimeType == "text/plain":
if isJSON(p_content):
p_MimeType = "JSON"
if p_MimeType in accepted_Mime_type:
the_date = datetime.date(int(p_date[0:4]), int(p_date[4:6]), int(p_date[6:8]))
the_time = datetime.datetime.now()
the_time = datetime.time(getattr(the_time, 'hour'), 0, 0)
combined_datetime = datetime.datetime.combine(the_date, the_time)
timestamp = calendar.timegm(combined_datetime.timetuple())
2018-04-16 14:50:04 +02:00
sentences = tokenize.sent_tokenize(p_content)
if len(sentences) > 0:
avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0}
neg_line = 0
pos_line = 0
sid = SentimentIntensityAnalyzer(sentiment_lexicon_file)
for sentence in sentences:
ss = sid.polarity_scores(sentence)
for k in sorted(ss):
if k == 'compound':
if ss['neg'] > ss['pos']:
avg_score['compoundNeg'] += ss[k]
neg_line += 1
else:
avg_score['compoundPos'] += ss[k]
pos_line += 1
else:
avg_score[k] += ss[k]
for k in avg_score:
if k == 'compoundPos':
avg_score[k] = avg_score[k] / (pos_line if pos_line > 0 else 1)
elif k == 'compoundNeg':
avg_score[k] = avg_score[k] / (neg_line if neg_line > 0 else 1)
else:
avg_score[k] = avg_score[k] / len(sentences)
2018-04-16 14:50:04 +02:00
# In redis-levelDB: {} = set, () = K-V
# {Provider_set -> provider_i}
# {Provider_TimestampInHour_i -> UniqID_i}_j
# (UniqID_i -> PasteValue_i)
server.sadd('Provider_set', provider)
provider_timestamp = provider + '_' + str(timestamp)
server.incr('UniqID')
UniqID = server.get('UniqID')
2018-05-04 13:53:29 +02:00
print(provider_timestamp, '->', UniqID, 'dropped', num_line_removed, 'lines')
server.sadd(provider_timestamp, UniqID)
server.set(UniqID, avg_score)
else:
2018-04-16 14:50:04 +02:00
print('Dropped:', p_MimeType)
def isJSON(content):
try:
json.loads(content)
return True
2018-04-16 14:50:04 +02:00
except Exception:
return False
2016-08-11 09:40:42 +02:00
import signal
class TimeoutException(Exception):
pass
def timeout_handler(signum, frame):
raise TimeoutException
signal.signal(signal.SIGALRM, timeout_handler)
2016-08-11 09:40:42 +02:00
if __name__ == '__main__':
# If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh)
# Port of the redis instance used by pubsublogger
publisher.port = 6380
# Script is the default channel used for the modules.
publisher.channel = 'Script'
# Section name in bin/packages/modules.cfg
config_section = 'SentimentAnalysis'
2016-08-11 09:40:42 +02:00
# Setup the I/O queues
p = Process(config_section)
# Sent to the logging a description of the module
publisher.info("<description of the module>")
# REDIS_LEVEL_DB #
server = redis.StrictRedis(
2018-05-07 14:50:40 +02:00
host=p.config.get("ARDB_Sentiment", "host"),
port=p.config.get("ARDB_Sentiment", "port"),
db=p.config.get("ARDB_Sentiment", "db"),
2018-05-04 13:53:29 +02:00
decode_responses=True)
2018-11-15 10:39:41 +01:00
time1 = time.time()
2016-08-11 09:40:42 +02:00
while True:
message = p.get_from_set()
if message is None:
2018-11-20 14:39:45 +01:00
#if int(time.time() - time1) > time_clean_sentiment_db:
# clean_db()
# time1 = time.time()
# continue
#else:
publisher.debug("{} queue is empty, waiting".format(config_section))
time.sleep(1)
continue
signal.alarm(60)
try:
Analyse(message, server)
except TimeoutException:
p.incr_module_timeout_statistic()
print ("{0} processing timeout".format(message))
continue
else:
signal.alarm(0)