ModuleInfo correctly handle CurveManageTopSets,

Changed queue coloring in dashboard,
ModuleInformation now have a history of executed command and better handle module killing.
This commit is contained in:
Mokaddem 2016-11-25 11:54:16 +01:00
parent 224fbc8084
commit a3255d168c
5 changed files with 100 additions and 15 deletions

View file

@ -17,6 +17,7 @@ Requirements
import redis import redis
import time import time
import datetime
import copy import copy
from pubsublogger import publisher from pubsublogger import publisher
from packages import lib_words from packages import lib_words
@ -87,6 +88,11 @@ def manage_top_set():
for elem in array_month: for elem in array_month:
server_term.zadd(top_termFreq_setName_month[0], float(elem[1]), elem[0]) server_term.zadd(top_termFreq_setName_month[0], float(elem[1]), elem[0])
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
value = str(timestamp) + ", " + "-"
r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value)
print "refreshed module"
if __name__ == '__main__': if __name__ == '__main__':
@ -105,6 +111,18 @@ if __name__ == '__main__':
cfg = ConfigParser.ConfigParser() cfg = ConfigParser.ConfigParser()
cfg.read(configfile) cfg.read(configfile)
# For Module Manager
r_temp = redis.StrictRedis(
host=cfg.get('RedisPubSub', 'host'),
port=cfg.getint('RedisPubSub', 'port'),
db=cfg.getint('RedisPubSub', 'db'))
timestamp = int(time.mktime(datetime.datetime.now().timetuple()))
value = str(timestamp) + ", " + "-"
r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value)
r_temp.sadd("MODULE_TYPE_"+ "CurveManageTopSets" , str(os.getpid()))
server_term = redis.StrictRedis( server_term = redis.StrictRedis(
host=cfg.get("Redis_Level_DB_TermFreq", "host"), host=cfg.get("Redis_Level_DB_TermFreq", "host"),
port=cfg.getint("Redis_Level_DB_TermFreq", "port"), port=cfg.getint("Redis_Level_DB_TermFreq", "port"),

View file

@ -26,12 +26,16 @@ from terminaltables import AsciiTable
import textwrap import textwrap
# CONFIG VARIABLES # CONFIG VARIABLES
threshold_stucked_module = 60*60*1 #1 hour threshold_stucked_module = 60*10*1 #1 hour
kill_retry_threshold = 60 #1m
log_filename = "../logs/moduleInfo.log" log_filename = "../logs/moduleInfo.log"
command_search_pid = "ps a -o pid,cmd | grep {}" command_search_pid = "ps a -o pid,cmd | grep {}"
command_search_name = "ps a -o pid,cmd | grep {}" command_search_name = "ps a -o pid,cmd | grep {}"
command_restart_module = "screen -S \"Script\" -X screen -t \"{}\" bash -c \"./{}.py; read x\"" command_restart_module = "screen -S \"Script\" -X screen -t \"{}\" bash -c \"./{}.py; read x\""
printarrayGlob = [None]*14
printarrayGlob.insert(0, ["Time", "Module", "PID", "Action"])
lastTimeKillCommand = {}
def getPid(module): def getPid(module):
p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
@ -45,6 +49,9 @@ def getPid(module):
def clearRedisModuleInfo(): def clearRedisModuleInfo():
for k in server.keys("MODULE_*"): for k in server.keys("MODULE_*"):
server.delete(k) server.delete(k)
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, "*", "-", "Cleared redis module info"])
printarrayGlob.pop()
def cleanRedis(): def cleanRedis():
for k in server.keys("MODULE_TYPE_*"): for k in server.keys("MODULE_TYPE_*"):
@ -60,36 +67,76 @@ def cleanRedis():
if not flag_pid_valid: if not flag_pid_valid:
print flag_pid_valid, 'cleaning', pid, 'in', k print flag_pid_valid, 'cleaning', pid, 'in', k
server.srem(k, pid) server.srem(k, pid)
time.sleep(5) inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, moduleName, pid, "Cleared invalid pid in " + k])
printarrayGlob.pop()
#time.sleep(5)
def kill_module(module): def kill_module(module, pid):
print '' print ''
print '-> trying to kill module:', module print '-> trying to kill module:', module
if pid is None:
print 'pid was None'
printarrayGlob.insert(1, [0, module, pid, "PID was None"])
printarrayGlob.pop()
pid = getPid(module) pid = getPid(module)
else: #Verify that the pid is at least in redis
if server.exists("MODULE_"+module+"_"+str(pid)) == 0:
return
lastTimeKillCommand[pid] = int(time.time())
if pid is not None: if pid is not None:
try:
os.kill(pid, signal.SIGUSR1) os.kill(pid, signal.SIGUSR1)
except OSError:
print pid, 'already killed'
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, module, pid, "Already killed"])
printarrayGlob.pop()
return
time.sleep(1) time.sleep(1)
if getPid(module) is None: if getPid(module) is None:
print module, 'has been killed' print module, 'has been killed'
print 'restarting', module, '...' print 'restarting', module, '...'
p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, module, pid, "Killed"])
printarrayGlob.insert(1, [inst_time, module, "?", "Restarted"])
printarrayGlob.pop()
printarrayGlob.pop()
else: else:
print 'killing failed, retrying...' print 'killing failed, retrying...'
time.sleep(3) inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, module, pid, "Killing #1 failed."])
printarrayGlob.pop()
time.sleep(1)
os.kill(pid, signal.SIGUSR1) os.kill(pid, signal.SIGUSR1)
time.sleep(1) time.sleep(1)
if getPid(module) is None: if getPid(module) is None:
print module, 'has been killed' print module, 'has been killed'
print 'restarting', module, '...' print 'restarting', module, '...'
p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, module, pid, "Killed"])
printarrayGlob.insert(1, [inst_time, module, "?", "Restarted"])
printarrayGlob.pop()
printarrayGlob.pop()
else: else:
print 'killing failed!' print 'killing failed!'
inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, module, pid, "Killing failed!"])
printarrayGlob.pop()
else: else:
print 'Module does not exist' print 'Module does not exist'
time.sleep(5) inst_time = datetime.datetime.fromtimestamp(int(time.time()))
printarrayGlob.insert(1, [inst_time, module, pid, "Killing failed, module not found"])
printarrayGlob.pop()
#time.sleep(5)
cleanRedis()
if __name__ == "__main__": if __name__ == "__main__":
@ -110,6 +157,8 @@ if __name__ == "__main__":
cfg = ConfigParser.ConfigParser() cfg = ConfigParser.ConfigParser()
cfg.read(configfile) cfg.read(configfile)
threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module")
# REDIS # # REDIS #
server = redis.StrictRedis( server = redis.StrictRedis(
host=cfg.get("Redis_Queues", "host"), host=cfg.get("Redis_Queues", "host"),
@ -128,6 +177,8 @@ if __name__ == "__main__":
for line in module_file: for line in module_file:
module_file_array.add(line[:-1]) module_file_array.add(line[:-1])
cleanRedis()
while True: while True:
all_queue = set() all_queue = set()
@ -151,8 +202,12 @@ if __name__ == "__main__":
if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module: if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module:
log = open(log_filename, 'a') log = open(log_filename, 'a')
log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n") log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n")
if args.autokill == 1: try:
kill_module(queue) last_kill_try = time.time() - lastTimeKillCommand[moduleNum]
except KeyError:
last_kill_try = kill_retry_threshold+1
if args.autokill == 1 and last_kill_try > kill_retry_threshold :
kill_module(queue, int(moduleNum))
printarray1.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) printarray1.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)])
@ -170,13 +225,13 @@ if __name__ == "__main__":
else: else:
#If no info since long time, try to kill #If no info since long time, try to kill
if int(time.time()) - no_info_modules[curr_queue] > threshold_stucked_module: if int(time.time()) - no_info_modules[curr_queue] > threshold_stucked_module:
kill_module(curr_queue) kill_module(curr_queue, None)
no_info_modules[curr_queue] = int(time.time()) no_info_modules[curr_queue] = int(time.time())
printarray3.append([curr_queue, "Stuck or idle, restarting in " + str(threshold_stucked_module - (int(time.time()) - no_info_modules[curr_queue])) + "s"]) printarray3.append([curr_queue, "Stuck or idle, restarting in " + str(threshold_stucked_module - (int(time.time()) - no_info_modules[curr_queue])) + "s"])
printarray1.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) printarray1.sort(lambda x,y: cmp(x[0], y[0]), reverse=False)
printarray2.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) printarray2.sort(lambda x,y: cmp(x[0], y[0]), reverse=False)
printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"]) printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"])
printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"]) printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"])
printarray3.insert(0,["Queue", "State"]) printarray3.insert(0,["Queue", "State"])
@ -219,11 +274,21 @@ if __name__ == "__main__":
t3 = AsciiTable(printarray3, title="Not running queues") t3 = AsciiTable(printarray3, title="Not running queues")
t3.column_max_width(1) t3.column_max_width(1)
printarray4 = []
for elem in printarrayGlob:
if elem is not None:
printarray4.append(elem)
t4 = AsciiTable(printarray4, title="Last actions")
t4.column_max_width(1)
print t1.table print t1.table
print '\n' print '\n'
print t2.table print t2.table
print '\n' print '\n'
print t3.table print t3.table
print '\n'
print t4.table
if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5: if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5:
lastTime = datetime.datetime.now() lastTime = datetime.datetime.now()

View file

@ -448,7 +448,8 @@ def get_more_search_result():
@app.route("/") @app.route("/")
def index(): def index():
default_minute = cfg.get("Flask", "minute_processed_paste") default_minute = cfg.get("Flask", "minute_processed_paste")
return render_template("index.html", default_minute = default_minute) threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module")
return render_template("index.html", default_minute = default_minute, threshold_stucked_module=threshold_stucked_module)
@app.route("/monitoring/") @app.route("/monitoring/")

View file

@ -259,9 +259,9 @@ function create_queue_table() {
// - j=1: queueLength // - j=1: queueLength
// - j=2: LastProcessedPasteTime // - j=2: LastProcessedPasteTime
// - j=3: Number of the module belonging in the same category // - j=3: Number of the module belonging in the same category
if (parseInt(glob_tabvar.row1[i][2]) > 60*2 && parseInt(glob_tabvar.row1[i][1]) > 2) if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2)
tr.className += " danger"; tr.className += " danger";
else if (parseInt(glob_tabvar.row1[i][2]) > 60*1) else if (parseInt(glob_tabvar.row1[i][1]) == 0)
tr.className += " warning"; tr.className += " warning";
else else
tr.className += " success"; tr.className += " success";

View file

@ -20,6 +20,7 @@
<script> <script>
window.default_minute = {{ default_minute }}; window.default_minute = {{ default_minute }};
window.glob_tabvar = []; // Avoid undefined window.glob_tabvar = []; // Avoid undefined
window.threshold_stucked_module = {{ threshold_stucked_module }}
function update_values() { function update_values() {
$SCRIPT_ROOT = {{ request.script_root|tojson|safe }}; $SCRIPT_ROOT = {{ request.script_root|tojson|safe }};
$.getJSON($SCRIPT_ROOT+"/_stuff", $.getJSON($SCRIPT_ROOT+"/_stuff",