diff --git a/bin/CurveManageTopSets.py b/bin/CurveManageTopSets.py index 8f316333..f4aacd94 100755 --- a/bin/CurveManageTopSets.py +++ b/bin/CurveManageTopSets.py @@ -17,6 +17,7 @@ Requirements import redis import time +import datetime import copy from pubsublogger import publisher from packages import lib_words @@ -87,6 +88,11 @@ def manage_top_set(): for elem in array_month: server_term.zadd(top_termFreq_setName_month[0], float(elem[1]), elem[0]) + timestamp = int(time.mktime(datetime.datetime.now().timetuple())) + value = str(timestamp) + ", " + "-" + r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value) + print "refreshed module" + if __name__ == '__main__': @@ -105,6 +111,18 @@ if __name__ == '__main__': cfg = ConfigParser.ConfigParser() cfg.read(configfile) + + # For Module Manager + r_temp = redis.StrictRedis( + host=cfg.get('RedisPubSub', 'host'), + port=cfg.getint('RedisPubSub', 'port'), + db=cfg.getint('RedisPubSub', 'db')) + + timestamp = int(time.mktime(datetime.datetime.now().timetuple())) + value = str(timestamp) + ", " + "-" + r_temp.set("MODULE_"+ "CurveManageTopSets" + "_" + str(os.getpid()), value) + r_temp.sadd("MODULE_TYPE_"+ "CurveManageTopSets" , str(os.getpid())) + server_term = redis.StrictRedis( host=cfg.get("Redis_Level_DB_TermFreq", "host"), port=cfg.getint("Redis_Level_DB_TermFreq", "port"), diff --git a/bin/ModuleInformation.py b/bin/ModuleInformation.py index df07bf14..5eb81e84 100755 --- a/bin/ModuleInformation.py +++ b/bin/ModuleInformation.py @@ -24,27 +24,36 @@ import ConfigParser import json from terminaltables import AsciiTable import textwrap +from colorama import Fore, Back, Style, init # CONFIG VARIABLES -threshold_stucked_module = 60*60*1 #1 hour +threshold_stucked_module = 60*10*1 #1 hour +kill_retry_threshold = 60 #1m log_filename = "../logs/moduleInfo.log" command_search_pid = "ps a -o pid,cmd | grep {}" command_search_name = "ps a -o pid,cmd | grep {}" command_restart_module = "screen -S \"Script\" -X screen -t \"{}\" bash -c \"./{}.py; read x\"" +init() #Necesary for colorama +printarrayGlob = [None]*14 +printarrayGlob.insert(0, ["Time", "Module", "PID", "Action"]) +lastTimeKillCommand = {} def getPid(module): p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) for line in p.stdout: + print line splittedLine = line.split() if 'python2' in splittedLine: return int(splittedLine[0]) - else: - return None + return None def clearRedisModuleInfo(): for k in server.keys("MODULE_*"): server.delete(k) + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, "*", "-", "Cleared redis module info"]) + printarrayGlob.pop() def cleanRedis(): for k in server.keys("MODULE_TYPE_*"): @@ -60,34 +69,93 @@ def cleanRedis(): if not flag_pid_valid: print flag_pid_valid, 'cleaning', pid, 'in', k server.srem(k, pid) - time.sleep(5) + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, moduleName, pid, "Cleared invalid pid in " + k]) + printarrayGlob.pop() + #time.sleep(5) -def kill_module(module): +def kill_module(module, pid): print '' print '-> trying to kill module:', module - pid = getPid(module) + if pid is None: + print 'pid was None' + printarrayGlob.insert(1, [0, module, pid, "PID was None"]) + printarrayGlob.pop() + pid = getPid(module) + else: #Verify that the pid is at least in redis + if server.exists("MODULE_"+module+"_"+str(pid)) == 0: + return + + lastTimeKillCommand[pid] = int(time.time()) if pid is not None: - os.kill(pid, signal.SIGUSR1) + try: + os.kill(pid, signal.SIGUSR1) + except OSError: + print pid, 'already killed' + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, module, pid, "Already killed"]) + printarrayGlob.pop() + return time.sleep(1) if getPid(module) is None: print module, 'has been killed' print 'restarting', module, '...' p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, module, pid, "Killed"]) + printarrayGlob.insert(1, [inst_time, module, "?", "Restarted"]) + printarrayGlob.pop() + printarrayGlob.pop() else: print 'killing failed, retrying...' - time.sleep(3) + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, module, pid, "Killing #1 failed."]) + printarrayGlob.pop() + + time.sleep(1) os.kill(pid, signal.SIGUSR1) time.sleep(1) if getPid(module) is None: print module, 'has been killed' print 'restarting', module, '...' p2 = Popen([command_restart_module.format(module, module)], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, module, pid, "Killed"]) + printarrayGlob.insert(1, [inst_time, module, "?", "Restarted"]) + printarrayGlob.pop() + printarrayGlob.pop() else: print 'killing failed!' - time.sleep(7) + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, module, pid, "Killing failed!"]) + printarrayGlob.pop() + else: + print 'Module does not exist' + inst_time = datetime.datetime.fromtimestamp(int(time.time())) + printarrayGlob.insert(1, [inst_time, module, pid, "Killing failed, module not found"]) + printarrayGlob.pop() + #time.sleep(5) + cleanRedis() + +def get_color(time, idle): + if time is not None: + temp = time.split(':') + time = int(temp[0])*3600 + int(temp[1])*60 + int(temp[2]) + + if time >= threshold_stucked_module: + if not idle: + return Back.RED + Style.BRIGHT + else: + return Back.MAGENTA + Style.BRIGHT + elif time > threshold_stucked_module/2: + return Back.YELLOW + Style.BRIGHT + else: + return Back.GREEN + Style.BRIGHT + else: + return Style.RESET_ALL if __name__ == "__main__": @@ -108,6 +176,8 @@ if __name__ == "__main__": cfg = ConfigParser.ConfigParser() cfg.read(configfile) + threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module") + # REDIS # server = redis.StrictRedis( host=cfg.get("Redis_Queues", "host"), @@ -120,11 +190,14 @@ if __name__ == "__main__": lastTime = datetime.datetime.now() module_file_array = set() + no_info_modules = {} path_allmod = os.path.join(os.environ['AIL_HOME'], 'doc/all_modules.txt') with open(path_allmod, 'r') as module_file: for line in module_file: module_file_array.add(line[:-1]) + cleanRedis() + while True: all_queue = set() @@ -135,6 +208,7 @@ if __name__ == "__main__": all_queue.add(queue) key = "MODULE_" + queue + "_" keySet = "MODULE_TYPE_" + queue + array_module_type = [] for moduleNum in server.smembers(keySet): value = server.get(key + str(moduleNum)) @@ -148,20 +222,41 @@ if __name__ == "__main__": if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module: log = open(log_filename, 'a') log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n") - if args.autokill == 1: - kill_module(queue) + try: + last_kill_try = time.time() - lastTimeKillCommand[moduleNum] + except KeyError: + last_kill_try = kill_retry_threshold+1 + if args.autokill == 1 and last_kill_try > kill_retry_threshold : + kill_module(queue, int(moduleNum)) - printarray1.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) + array_module_type.append([get_color(processed_time_readable, False) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, False)]) else: - printarray2.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) + printarray2.append([get_color(processed_time_readable, True) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, True)]) + array_module_type.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) + for e in array_module_type: + printarray1.append(e) for curr_queue in module_file_array: if curr_queue not in all_queue: - printarray3.append([curr_queue, "Not running"]) + printarray3.append([curr_queue, "Not running"]) + else: + if len(list(server.smembers('MODULE_TYPE_'+curr_queue))) == 0: + if curr_queue not in no_info_modules: + no_info_modules[curr_queue] = int(time.time()) + printarray3.append([curr_queue, "No data"]) + else: + #If no info since long time, try to kill + if args.autokill == 1 and int(time.time()) - no_info_modules[curr_queue] > threshold_stucked_module: + kill_module(curr_queue, None) + no_info_modules[curr_queue] = int(time.time()) + printarray3.append([curr_queue, "Stuck or idle, restarting in " + str(threshold_stucked_module - (int(time.time()) - no_info_modules[curr_queue])) + "s"]) - printarray1.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) - printarray2.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) + + #printarray1.sort(lambda x,y: cmp(x[0], y[0]), reverse=False) + printarray1.sort(key=lambda x: x[0][9:], reverse=False) + #printarray2.sort(lambda x,y: cmp(x[0], y[0]), reverse=False) + printarray2.sort(key=lambda x: x[0][9:], reverse=False) printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"]) printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"]) printarray3.insert(0,["Queue", "State"]) @@ -204,11 +299,21 @@ if __name__ == "__main__": t3 = AsciiTable(printarray3, title="Not running queues") t3.column_max_width(1) + printarray4 = [] + for elem in printarrayGlob: + if elem is not None: + printarray4.append(elem) + + t4 = AsciiTable(printarray4, title="Last actions") + t4.column_max_width(1) + print t1.table print '\n' print t2.table print '\n' print t3.table + print '\n' + print t4.table if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5: lastTime = datetime.datetime.now() diff --git a/pip_packages_requirement.txt b/pip_packages_requirement.txt index f6602653..e95df123 100644 --- a/pip_packages_requirement.txt +++ b/pip_packages_requirement.txt @@ -11,6 +11,7 @@ numpy matplotlib networkx terminaltables +colorama #Tokeniser nltk diff --git a/var/www/Flask_server.py b/var/www/Flask_server.py index fcd67a21..4746117e 100755 --- a/var/www/Flask_server.py +++ b/var/www/Flask_server.py @@ -448,7 +448,8 @@ def get_more_search_result(): @app.route("/") def index(): default_minute = cfg.get("Flask", "minute_processed_paste") - return render_template("index.html", default_minute = default_minute) + threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module") + return render_template("index.html", default_minute = default_minute, threshold_stucked_module=threshold_stucked_module) @app.route("/monitoring/") diff --git a/var/www/static/js/indexjavascript.js b/var/www/static/js/indexjavascript.js index a289f5ae..8d50ea9d 100644 --- a/var/www/static/js/indexjavascript.js +++ b/var/www/static/js/indexjavascript.js @@ -259,9 +259,9 @@ function create_queue_table() { // - j=1: queueLength // - j=2: LastProcessedPasteTime // - j=3: Number of the module belonging in the same category - if (parseInt(glob_tabvar.row1[i][2]) > 60*2 && parseInt(glob_tabvar.row1[i][1]) > 2) + if (parseInt(glob_tabvar.row1[i][2]) > window.threshold_stucked_module && parseInt(glob_tabvar.row1[i][1]) > 2) tr.className += " danger"; - else if (parseInt(glob_tabvar.row1[i][2]) > 60*1) + else if (parseInt(glob_tabvar.row1[i][1]) == 0) tr.className += " warning"; else tr.className += " success"; diff --git a/var/www/templates/index.html b/var/www/templates/index.html index 66c38a2c..74b45c01 100644 --- a/var/www/templates/index.html +++ b/var/www/templates/index.html @@ -20,6 +20,7 @@