mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-10 08:38:28 +00:00
Merge branch 'module_handling' into production
This commit is contained in:
commit
c2e5d69e21
6 changed files with 145 additions and 19 deletions
|
@ -17,6 +17,7 @@ Requirements
|
||||||
|
|
||||||
import redis
|
import redis
|
||||||
import time
|
import time
|
||||||
|
import datetime
|
||||||
import copy
|
import copy
|
||||||
from pubsublogger import publisher
|
from pubsublogger import publisher
|
||||||
from packages import lib_words
|
from packages import lib_words
|
||||||
|
@ -87,6 +88,11 @@ def manage_top_set():
|
||||||
for elem in array_month:
|
for elem in array_month:
|
||||||
server_term.zadd(top_termFreq_setName_month[0], float(elem[1]), elem[0])
|
server_term.zadd(top_termFreq_setName_month[0], float(elem[1]), elem[0])
|
||||||
|
|
||||||
|
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
|
||||||
|
value = str(timestamp) + ", " + "-"
|
||||||
|
r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value)
|
||||||
|
print "refreshed module"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -105,6 +111,18 @@ if __name__ == '__main__':
|
||||||
cfg = ConfigParser.ConfigParser()
|
cfg = ConfigParser.ConfigParser()
|
||||||
cfg.read(configfile)
|
cfg.read(configfile)
|
||||||
|
|
||||||
|
|
||||||
|
# For Module Manager
|
||||||
|
r_temp = redis.StrictRedis(
|
||||||
|
host=cfg.get('RedisPubSub', 'host'),
|
||||||
|
port=cfg.getint('RedisPubSub', 'port'),
|
||||||
|
db=cfg.getint('RedisPubSub', 'db'))
|
||||||
|
|
||||||
|
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
|
||||||
|
value = str(timestamp) + ", " + "-"
|
||||||
|
r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value)
|
||||||
|
r_temp.sadd("MODULE_TYPE_"+ "CurveManageTopSets" , str(os.getpid()))
|
||||||
|
|
||||||
server_term = redis.StrictRedis(
|
server_term = redis.StrictRedis(
|
||||||
host=cfg.get("Redis_Level_DB_TermFreq", "host"),
|
host=cfg.get("Redis_Level_DB_TermFreq", "host"),
|
||||||
port=cfg.getint("Redis_Level_DB_TermFreq", "port"),
|
port=cfg.getint("Redis_Level_DB_TermFreq", "port"),
|
||||||
|
|
|
@ -24,27 +24,36 @@ import ConfigParser
|
||||||
import json
|
import json
|
||||||
from terminaltables import AsciiTable
|
from terminaltables import AsciiTable
|
||||||
import textwrap
|
import textwrap
|
||||||
|
from colorama import Fore, Back, Style, init
|
||||||
|
|
||||||
# CONFIG VARIABLES
|
# CONFIG VARIABLES
|
||||||
threshold_stucked_module = 60*60*1 #1 hour
|
threshold_stucked_module = 60*10*1 #1 hour
|
||||||
|
kill_retry_threshold = 60 #1m
|
||||||
log_filename = "../logs/moduleInfo.log"
|
log_filename = "../logs/moduleInfo.log"
|
||||||
command_search_pid = "ps a -o pid,cmd | grep {}"
|
command_search_pid = "ps a -o pid,cmd | grep {}"
|
||||||
command_search_name = "ps a -o pid,cmd | grep {}"
|
command_search_name = "ps a -o pid,cmd | grep {}"
|
||||||
command_restart_module = "screen -S \"Script\" -X screen -t \"{}\" bash -c \"./{}.py; read x\""
|
command_restart_module = "screen -S \"Script\" -X screen -t \"{}\" bash -c \"./{}.py; read x\""
|
||||||
|
|
||||||
|
init() #Necesary for colorama
|
||||||
|
printarrayGlob = [None]*14
|
||||||
|
printarrayGlob.insert(0, ["Time", "Module", "PID", "Action"])
|
||||||
|
lastTimeKillCommand = {}
|
||||||
|
|
||||||
def getPid(module):
|
def getPid(module):
|
||||||
p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
|
p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
|
||||||
for line in p.stdout:
|
for line in p.stdout:
|
||||||
|
print line
|
||||||
splittedLine = line.split()
|
splittedLine = line.split()
|
||||||
if 'python2' in splittedLine:
|
if 'python2' in splittedLine:
|
||||||
return int(splittedLine[0])
|
return int(splittedLine[0])
|
||||||
else:
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def clearRedisModuleInfo():
|
def clearRedisModuleInfo():
|
||||||
for k in server.keys("MODULE_*"):
|
for k in server.keys("MODULE_*"):
|
||||||
server.delete(k)
|
server.delete(k)
|
||||||
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, "*", "-", "Cleared redis module info"])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
|
||||||
def cleanRedis():
|
def cleanRedis():
|
||||||
for k in server.keys("MODULE_TYPE_*"):
|
for k in server.keys("MODULE_TYPE_*"):
|
||||||
|
@ -60,34 +69,93 @@ def cleanRedis():
|
||||||
if not flag_pid_valid:
|
if not flag_pid_valid:
|
||||||
print flag_pid_valid, 'cleaning', pid, 'in', k
|
print flag_pid_valid, 'cleaning', pid, 'in', k
|
||||||
server.srem(k, pid)
|
server.srem(k, pid)
|
||||||
time.sleep(5)
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, moduleName, pid, "Cleared invalid pid in " + k])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
#time.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
def kill_module(module):
|
def kill_module(module, pid):
|
||||||
print ''
|
print ''
|
||||||
print '-> trying to kill module:', module
|
print '-> trying to kill module:', module
|
||||||
|
|
||||||
|
if pid is None:
|
||||||
|
print 'pid was None'
|
||||||
|
printarrayGlob.insert(1, [0, module, pid, "PID was None"])
|
||||||
|
printarrayGlob.pop()
|
||||||
pid = getPid(module)
|
pid = getPid(module)
|
||||||
|
else: #Verify that the pid is at least in redis
|
||||||
|
if server.exists("MODULE_"+module+"_"+str(pid)) == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
lastTimeKillCommand[pid] = int(time.time())
|
||||||
if pid is not None:
|
if pid is not None:
|
||||||
|
try:
|
||||||
os.kill(pid, signal.SIGUSR1)
|
os.kill(pid, signal.SIGUSR1)
|
||||||
|
except OSError:
|
||||||
|
print pid, 'already killed'
|
||||||
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, pid, "Already killed"])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
return
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
if getPid(module) is None:
|
if getPid(module) is None:
|
||||||
print module, 'has been killed'
|
print module, 'has been killed'
|
||||||
print 'restarting', module, '...'
|
print 'restarting', module, '...'
|
||||||
p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
|
p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
|
||||||
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, pid, "Killed"])
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, "?", "Restarted"])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
printarrayGlob.pop()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
print 'killing failed, retrying...'
|
print 'killing failed, retrying...'
|
||||||
time.sleep(3)
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, pid, "Killing #1 failed."])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
|
||||||
|
time.sleep(1)
|
||||||
os.kill(pid, signal.SIGUSR1)
|
os.kill(pid, signal.SIGUSR1)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
if getPid(module) is None:
|
if getPid(module) is None:
|
||||||
print module, 'has been killed'
|
print module, 'has been killed'
|
||||||
print 'restarting', module, '...'
|
print 'restarting', module, '...'
|
||||||
p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
|
p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
|
||||||
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, pid, "Killed"])
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, "?", "Restarted"])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
printarrayGlob.pop()
|
||||||
else:
|
else:
|
||||||
print 'killing failed!'
|
print 'killing failed!'
|
||||||
time.sleep(7)
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, pid, "Killing failed!"])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
else:
|
||||||
|
print 'Module does not exist'
|
||||||
|
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
|
||||||
|
printarrayGlob.insert(1, [inst_time, module, pid, "Killing failed, module not found"])
|
||||||
|
printarrayGlob.pop()
|
||||||
|
#time.sleep(5)
|
||||||
|
cleanRedis()
|
||||||
|
|
||||||
|
def get_color(time, idle):
|
||||||
|
if time is not None:
|
||||||
|
temp = time.split(':')
|
||||||
|
time = int(temp[0])*3600 + int(temp[1])*60 + int(temp[2])
|
||||||
|
|
||||||
|
if time >= threshold_stucked_module:
|
||||||
|
if not idle:
|
||||||
|
return Back.RED + Style.BRIGHT
|
||||||
|
else:
|
||||||
|
return Back.MAGENTA + Style.BRIGHT
|
||||||
|
elif time > threshold_stucked_module/2:
|
||||||
|
return Back.YELLOW + Style.BRIGHT
|
||||||
|
else:
|
||||||
|
return Back.GREEN + Style.BRIGHT
|
||||||
|
else:
|
||||||
|
return Style.RESET_ALL
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -108,6 +176,8 @@ if __name__ == "__main__":
|
||||||
cfg = ConfigParser.ConfigParser()
|
cfg = ConfigParser.ConfigParser()
|
||||||
cfg.read(configfile)
|
cfg.read(configfile)
|
||||||
|
|
||||||
|
threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module")
|
||||||
|
|
||||||
# REDIS #
|
# REDIS #
|
||||||
server = redis.StrictRedis(
|
server = redis.StrictRedis(
|
||||||
host=cfg.get("Redis_Queues", "host"),
|
host=cfg.get("Redis_Queues", "host"),
|
||||||
|
@ -120,11 +190,14 @@ if __name__ == "__main__":
|
||||||
lastTime = datetime.datetime.now()
|
lastTime = datetime.datetime.now()
|
||||||
|
|
||||||
module_file_array = set()
|
module_file_array = set()
|
||||||
|
no_info_modules = {}
|
||||||
path_allmod = os.path.join(os.environ['AIL_HOME'], 'doc/all_modules.txt')
|
path_allmod = os.path.join(os.environ['AIL_HOME'], 'doc/all_modules.txt')
|
||||||
with open(path_allmod, 'r') as module_file:
|
with open(path_allmod, 'r') as module_file:
|
||||||
for line in module_file:
|
for line in module_file:
|
||||||
module_file_array.add(line[:-1])
|
module_file_array.add(line[:-1])
|
||||||
|
|
||||||
|
cleanRedis()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
all_queue = set()
|
all_queue = set()
|
||||||
|
@ -135,6 +208,7 @@ if __name__ == "__main__":
|
||||||
all_queue.add(queue)
|
all_queue.add(queue)
|
||||||
key = "MODULE_" + queue + "_"
|
key = "MODULE_" + queue + "_"
|
||||||
keySet = "MODULE_TYPE_" + queue
|
keySet = "MODULE_TYPE_" + queue
|
||||||
|
array_module_type = []
|
||||||
|
|
||||||
for moduleNum in server.smembers(keySet):
|
for moduleNum in server.smembers(keySet):
|
||||||
value = server.get(key + str(moduleNum))
|
value = server.get(key + str(moduleNum))
|
||||||
|
@ -148,20 +222,41 @@ if __name__ == "__main__":
|
||||||
if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module:
|
if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module:
|
||||||
log = open(log_filename, 'a')
|
log = open(log_filename, 'a')
|
||||||
log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n")
|
log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n")
|
||||||
if args.autokill == 1:
|
try:
|
||||||
kill_module(queue)
|
last_kill_try = time.time() - lastTimeKillCommand[moduleNum]
|
||||||
|
except KeyError:
|
||||||
|
last_kill_try = kill_retry_threshold+1
|
||||||
|
if args.autokill == 1 and last_kill_try > kill_retry_threshold :
|
||||||
|
kill_module(queue, int(moduleNum))
|
||||||
|
|
||||||
printarray1.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
|
array_module_type.append([get_color(processed_time_readable, False) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, False)])
|
||||||
|
|
||||||
else:
|
else:
|
||||||
printarray2.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
|
printarray2.append([get_color(processed_time_readable, True) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, True)])
|
||||||
|
array_module_type.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
|
||||||
|
for e in array_module_type:
|
||||||
|
printarray1.append(e)
|
||||||
|
|
||||||
for curr_queue in module_file_array:
|
for curr_queue in module_file_array:
|
||||||
if curr_queue not in all_queue:
|
if curr_queue not in all_queue:
|
||||||
printarray3.append([curr_queue, "Not running"])
|
printarray3.append([curr_queue, "Not running"])
|
||||||
|
else:
|
||||||
|
if len(list(server.smembers('MODULE_TYPE_'+curr_queue))) == 0:
|
||||||
|
if curr_queue not in no_info_modules:
|
||||||
|
no_info_modules[curr_queue] = int(time.time())
|
||||||
|
printarray3.append([curr_queue, "No data"])
|
||||||
|
else:
|
||||||
|
#If no info since long time, try to kill
|
||||||
|
if args.autokill == 1 and int(time.time()) - no_info_modules[curr_queue] > threshold_stucked_module:
|
||||||
|
kill_module(curr_queue, None)
|
||||||
|
no_info_modules[curr_queue] = int(time.time())
|
||||||
|
printarray3.append([curr_queue, "Stuck or idle, restarting in " + str(threshold_stucked_module - (int(time.time()) - no_info_modules[curr_queue])) + "s"])
|
||||||
|
|
||||||
printarray1.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
|
|
||||||
printarray2.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
|
#printarray1.sort(lambda x,y: cmp(x[0], y[0]), reverse=False)
|
||||||
|
printarray1.sort(key=lambda x: x[0][9:], reverse=False)
|
||||||
|
#printarray2.sort(lambda x,y: cmp(x[0], y[0]), reverse=False)
|
||||||
|
printarray2.sort(key=lambda x: x[0][9:], reverse=False)
|
||||||
printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"])
|
printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"])
|
||||||
printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"])
|
printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"])
|
||||||
printarray3.insert(0,["Queue", "State"])
|
printarray3.insert(0,["Queue", "State"])
|
||||||
|
@ -204,11 +299,21 @@ if __name__ == "__main__":
|
||||||
t3 = AsciiTable(printarray3, title="Not running queues")
|
t3 = AsciiTable(printarray3, title="Not running queues")
|
||||||
t3.column_max_width(1)
|
t3.column_max_width(1)
|
||||||
|
|
||||||
|
printarray4 = []
|
||||||
|
for elem in printarrayGlob:
|
||||||
|
if elem is not None:
|
||||||
|
printarray4.append(elem)
|
||||||
|
|
||||||
|
t4 = AsciiTable(printarray4, title="Last actions")
|
||||||
|
t4.column_max_width(1)
|
||||||
|
|
||||||
print t1.table
|
print t1.table
|
||||||
print '\n'
|
print '\n'
|
||||||
print t2.table
|
print t2.table
|
||||||
print '\n'
|
print '\n'
|
||||||
print t3.table
|
print t3.table
|
||||||
|
print '\n'
|
||||||
|
print t4.table
|
||||||
|
|
||||||
if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5:
|
if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5:
|
||||||
lastTime = datetime.datetime.now()
|
lastTime = datetime.datetime.now()
|
||||||
|
|
|
@ -11,6 +11,7 @@ numpy
|
||||||
matplotlib
|
matplotlib
|
||||||
networkx
|
networkx
|
||||||
terminaltables
|
terminaltables
|
||||||
|
colorama
|
||||||
|
|
||||||
#Tokeniser
|
#Tokeniser
|
||||||
nltk
|
nltk
|
||||||
|
|
|
@ -448,7 +448,8 @@ def get_more_search_result():
|
||||||
@app.route("/")
|
@app.route("/")
|
||||||
def index():
|
def index():
|
||||||
default_minute = cfg.get("Flask", "minute_processed_paste")
|
default_minute = cfg.get("Flask", "minute_processed_paste")
|
||||||
return render_template("index.html", default_minute = default_minute)
|
threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module")
|
||||||
|
return render_template("index.html", default_minute = default_minute, threshold_stucked_module=threshold_stucked_module)
|
||||||
|
|
||||||
|
|
||||||
@app.route("/monitoring/")
|
@app.route("/monitoring/")
|
||||||
|
|
|
@ -259,9 +259,9 @@ function create_queue_table() {
|
||||||
// - j=1: queueLength
|
// - j=1: queueLength
|
||||||
// - j=2: LastProcessedPasteTime
|
// - j=2: LastProcessedPasteTime
|
||||||
// - j=3: Number of the module belonging in the same category
|
// - j=3: Number of the module belonging in the same category
|
||||||
if (parseInt(glob_tabvar.row1[i][2]) > 60*2 && parseInt(glob_tabvar.row1[i][1]) > 2)
|
if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2)
|
||||||
tr.className += " danger";
|
tr.className += " danger";
|
||||||
else if (parseInt(glob_tabvar.row1[i][2]) > 60*1)
|
else if (parseInt(glob_tabvar.row1[i][1]) == 0)
|
||||||
tr.className += " warning";
|
tr.className += " warning";
|
||||||
else
|
else
|
||||||
tr.className += " success";
|
tr.className += " success";
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
<script>
|
<script>
|
||||||
window.default_minute = {{ default_minute }};
|
window.default_minute = {{ default_minute }};
|
||||||
window.glob_tabvar = []; // Avoid undefined
|
window.glob_tabvar = []; // Avoid undefined
|
||||||
|
window.threshold_stucked_module = {{ threshold_stucked_module }}
|
||||||
function update_values() {
|
function update_values() {
|
||||||
$SCRIPT_ROOT = {{ request.script_root|tojson|safe }};
|
$SCRIPT_ROOT = {{ request.script_root|tojson|safe }};
|
||||||
$.getJSON($SCRIPT_ROOT+"/_stuff",
|
$.getJSON($SCRIPT_ROOT+"/_stuff",
|
||||||
|
|
Loading…
Reference in a new issue