Merge pull request #95 from mokaddem/module_handling

Module handling
This commit is contained in:
Alexandre Dulaunoy 2016-12-22 22:18:31 +01:00 committed by GitHub
commit ddfe3cf66a

View file

@ -25,9 +25,9 @@ import json
from terminaltables import AsciiTable from terminaltables import AsciiTable
import textwrap import textwrap
from colorama import Fore, Back, Style, init from colorama import Fore, Back, Style, init
import curses
# CONFIG VARIABLES # CONFIG VARIABLES
threshold_stucked_module = 60*10*1 #1 hour
kill_retry_threshold = 60 #1m kill_retry_threshold = 60 #1m
log_filename = "../logs/moduleInfo.log" log_filename = "../logs/moduleInfo.log"
command_search_pid = "ps a -o pid,cmd | grep {}" command_search_pid = "ps a -o pid,cmd | grep {}"
@ -39,6 +39,15 @@ printarrayGlob = [None]*14
printarrayGlob.insert(0, ["Time", "Module", "PID", "Action"]) printarrayGlob.insert(0, ["Time", "Module", "PID", "Action"])
lastTimeKillCommand = {} lastTimeKillCommand = {}
#Curses init
#stdscr = curses.initscr()
#curses.cbreak()
#stdscr.keypad(1)
# GLOBAL
last_refresh = 0
def getPid(module): def getPid(module):
p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True) p = Popen([command_search_pid.format(module+".py")], stdin=PIPE, stdout=PIPE, bufsize=1, shell=True)
for line in p.stdout: for line in p.stdout:
@ -145,23 +154,33 @@ def get_color(time, idle):
temp = time.split(':') temp = time.split(':')
time = int(temp[0])*3600 + int(temp[1])*60 + int(temp[2]) time = int(temp[0])*3600 + int(temp[1])*60 + int(temp[2])
if time >= threshold_stucked_module: if time >= args.treshold:
if not idle: if not idle:
return Back.RED + Style.BRIGHT return Back.RED + Style.BRIGHT
else: else:
return Back.MAGENTA + Style.BRIGHT return Back.MAGENTA + Style.BRIGHT
elif time > threshold_stucked_module/2: elif time > args.treshold/2:
return Back.YELLOW + Style.BRIGHT return Back.YELLOW + Style.BRIGHT
else: else:
return Back.GREEN + Style.BRIGHT return Back.GREEN + Style.BRIGHT
else: else:
return Style.RESET_ALL return Style.RESET_ALL
def waiting_refresh():
global last_refresh
if time.time() - last_refresh < args.refresh:
return False
else:
last_refresh = time.time()
return True
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Show info concerning running modules and log suspected stucked modules. May be use to automatically kill and restart stucked one.') parser = argparse.ArgumentParser(description='Show info concerning running modules and log suspected stucked modules. May be use to automatically kill and restart stucked one.')
parser.add_argument('-r', '--refresh', type=int, required=False, default=1, help='Refresh rate') parser.add_argument('-r', '--refresh', type=int, required=False, default=1, help='Refresh rate')
parser.add_argument('-t', '--treshold', type=int, required=False, default=60*10*1, help='Refresh rate')
parser.add_argument('-k', '--autokill', type=int, required=False, default=0, help='Enable auto kill option (1 for TRUE, anything else for FALSE)') parser.add_argument('-k', '--autokill', type=int, required=False, default=0, help='Enable auto kill option (1 for TRUE, anything else for FALSE)')
parser.add_argument('-c', '--clear', type=int, required=False, default=0, help='Clear the current module information (Used to clear data from old launched modules)') parser.add_argument('-c', '--clear', type=int, required=False, default=0, help='Clear the current module information (Used to clear data from old launched modules)')
@ -176,8 +195,6 @@ if __name__ == "__main__":
cfg = ConfigParser.ConfigParser() cfg = ConfigParser.ConfigParser()
cfg.read(configfile) cfg.read(configfile)
threshold_stucked_module = cfg.getint("Module_ModuleInformation", "threshold_stucked_module")
# REDIS # # REDIS #
server = redis.StrictRedis( server = redis.StrictRedis(
host=cfg.get("Redis_Queues", "host"), host=cfg.get("Redis_Queues", "host"),
@ -199,123 +216,138 @@ if __name__ == "__main__":
cleanRedis() cleanRedis()
while True: while True:
if waiting_refresh():
all_queue = set() #key = ''
printarray1 = [] #while key != 'q':
printarray2 = [] # key = stdsrc.getch()
printarray3 = [] # stdscr.refresh()
for queue, card in server.hgetall("queues").iteritems():
all_queue.add(queue) all_queue = set()
key = "MODULE_" + queue + "_" printarray1 = []
keySet = "MODULE_TYPE_" + queue printarray2 = []
array_module_type = [] printarray3 = []
for queue, card in server.hgetall("queues").iteritems():
for moduleNum in server.smembers(keySet): all_queue.add(queue)
value = server.get(key + str(moduleNum)) key = "MODULE_" + queue + "_"
if value is not None: keySet = "MODULE_TYPE_" + queue
timestamp, path = value.split(", ") array_module_type = []
if timestamp is not None and path is not None:
startTime_readable = datetime.datetime.fromtimestamp(int(timestamp)) for moduleNum in server.smembers(keySet):
processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0] value = server.get(key + str(moduleNum))
if value is not None:
if int(card) > 0: timestamp, path = value.split(", ")
if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module: if timestamp is not None and path is not None:
log = open(log_filename, 'a') startTime_readable = datetime.datetime.fromtimestamp(int(timestamp))
log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n") processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0]
try:
last_kill_try = time.time() - lastTimeKillCommand[moduleNum] if int(card) > 0:
except KeyError: if int((datetime.datetime.now() - startTime_readable).total_seconds()) > args.treshold:
last_kill_try = kill_retry_threshold+1 log = open(log_filename, 'a')
if args.autokill == 1 and last_kill_try > kill_retry_threshold : log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n")
kill_module(queue, int(moduleNum)) try:
last_kill_try = time.time() - lastTimeKillCommand[moduleNum]
array_module_type.append([get_color(processed_time_readable, False) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, False)]) except KeyError:
last_kill_try = kill_retry_threshold+1
else: if args.autokill == 1 and last_kill_try > kill_retry_threshold :
printarray2.append([get_color(processed_time_readable, True) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, True)]) kill_module(queue, int(moduleNum))
array_module_type.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
for e in array_module_type: array_module_type.append([get_color(processed_time_readable, False) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, False)])
printarray1.append(e)
else:
for curr_queue in module_file_array: printarray2.append([get_color(processed_time_readable, True) + str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path) + get_color(None, True)])
if curr_queue not in all_queue: array_module_type.sort(lambda x,y: cmp(x[4], y[4]), reverse=True)
printarray3.append([curr_queue, "Not running"]) for e in array_module_type:
else: printarray1.append(e)
if len(list(server.smembers('MODULE_TYPE_'+curr_queue))) == 0:
if curr_queue not in no_info_modules: for curr_queue in module_file_array:
no_info_modules[curr_queue] = int(time.time()) if curr_queue not in all_queue:
printarray3.append([curr_queue, "No data"]) printarray3.append([curr_queue, "Not running"])
else: else:
#If no info since long time, try to kill if len(list(server.smembers('MODULE_TYPE_'+curr_queue))) == 0:
if args.autokill == 1 and int(time.time()) - no_info_modules[curr_queue] > threshold_stucked_module: if curr_queue not in no_info_modules:
kill_module(curr_queue, None)
no_info_modules[curr_queue] = int(time.time()) no_info_modules[curr_queue] = int(time.time())
printarray3.append([curr_queue, "Stuck or idle, restarting in " + str(threshold_stucked_module - (int(time.time()) - no_info_modules[curr_queue])) + "s"]) printarray3.append([curr_queue, "No data"])
else:
#If no info since long time, try to kill
#printarray1.sort(lambda x,y: cmp(x[0], y[0]), reverse=False) if args.autokill == 1:
printarray1.sort(key=lambda x: x[0][9:], reverse=False) if int(time.time()) - no_info_modules[curr_queue] > args.treshold:
#printarray2.sort(lambda x,y: cmp(x[0], y[0]), reverse=False) kill_module(curr_queue, None)
printarray2.sort(key=lambda x: x[0][9:], reverse=False) no_info_modules[curr_queue] = int(time.time())
printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"]) printarray3.append([curr_queue, "Stuck or idle, restarting in " + str(abs(args.treshold - (int(time.time()) - no_info_modules[curr_queue]))) + "s"])
printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"]) else:
printarray3.insert(0,["Queue", "State"]) printarray3.append([curr_queue, "Stuck or idle, restarting disabled"])
os.system('clear') ## FIXME To add:
t1 = AsciiTable(printarray1, title="Working queues") ## Button KILL Process using Curses
t1.column_max_width(1)
if not t1.ok: printarray1.sort(key=lambda x: x[0][9:], reverse=False)
longest_col = t1.column_widths.index(max(t1.column_widths)) printarray2.sort(key=lambda x: x[0][9:], reverse=False)
max_length_col = t1.column_max_width(longest_col) printarray1.insert(0,["Queue", "PID", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"])
if max_length_col > 0: printarray2.insert(0,["Queue", "PID","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"])
for i, content in enumerate(t1.table_data): printarray3.insert(0,["Queue", "State"])
if len(content[longest_col]) > max_length_col:
temp = '' os.system('clear')
for l in content[longest_col].splitlines(): t1 = AsciiTable(printarray1, title="Working queues")
if len(l) > max_length_col: t1.column_max_width(1)
temp += '\n'.join(textwrap.wrap(l, max_length_col)) + '\n' if not t1.ok:
else: longest_col = t1.column_widths.index(max(t1.column_widths))
temp += l + '\n' max_length_col = t1.column_max_width(longest_col)
content[longest_col] = temp.strip() if max_length_col > 0:
t1.table_data[i] = content for i, content in enumerate(t1.table_data):
if len(content[longest_col]) > max_length_col:
t2 = AsciiTable(printarray2, title="Idling queues") temp = ''
t2.column_max_width(1) for l in content[longest_col].splitlines():
if not t2.ok: if len(l) > max_length_col:
longest_col = t2.column_widths.index(max(t2.column_widths)) temp += '\n'.join(textwrap.wrap(l, max_length_col)) + '\n'
max_length_col = t2.column_max_width(longest_col) else:
if max_length_col > 0: temp += l + '\n'
for i, content in enumerate(t2.table_data): content[longest_col] = temp.strip()
if len(content[longest_col]) > max_length_col: t1.table_data[i] = content
temp = ''
for l in content[longest_col].splitlines(): t2 = AsciiTable(printarray2, title="Idling queues")
if len(l) > max_length_col: t2.column_max_width(1)
temp += '\n'.join(textwrap.wrap(l, max_length_col)) + '\n' if not t2.ok:
else: longest_col = t2.column_widths.index(max(t2.column_widths))
temp += l + '\n' max_length_col = t2.column_max_width(longest_col)
content[longest_col] = temp.strip() if max_length_col > 0:
t2.table_data[i] = content for i, content in enumerate(t2.table_data):
if len(content[longest_col]) > max_length_col:
t3 = AsciiTable(printarray3, title="Not running queues") temp = ''
t3.column_max_width(1) for l in content[longest_col].splitlines():
if len(l) > max_length_col:
printarray4 = [] temp += '\n'.join(textwrap.wrap(l, max_length_col)) + '\n'
for elem in printarrayGlob: else:
if elem is not None: temp += l + '\n'
printarray4.append(elem) content[longest_col] = temp.strip()
t2.table_data[i] = content
t4 = AsciiTable(printarray4, title="Last actions")
t4.column_max_width(1) t3 = AsciiTable(printarray3, title="Not running queues")
t3.column_max_width(1)
print t1.table
print '\n' printarray4 = []
print t2.table for elem in printarrayGlob:
print '\n' if elem is not None:
print t3.table printarray4.append(elem)
print '\n'
print t4.table t4 = AsciiTable(printarray4, title="Last actions")
t4.column_max_width(1)
if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5:
lastTime = datetime.datetime.now() legend_array = [["Color", "Meaning"], [Back.RED+Style.BRIGHT+" "*10+Style.RESET_ALL, "Time >=" +str(args.treshold)+Style.RESET_ALL], [Back.MAGENTA+Style.BRIGHT+" "*10+Style.RESET_ALL, "Time >=" +str(args.treshold)+" while idle"+Style.RESET_ALL], [Back.YELLOW+Style.BRIGHT+" "*10+Style.RESET_ALL, "Time >=" +str(args.treshold/2)+Style.RESET_ALL], [Back.GREEN+Style.BRIGHT+" "*10+Style.RESET_ALL, "Time <" +str(args.treshold)]]
cleanRedis() legend = AsciiTable(legend_array, title="Legend")
time.sleep(args.refresh) legend.column_max_width(1)
print legend.table
print '\n'
print t1.table
print '\n'
print t2.table
print '\n'
print t3.table
print '\n'
print t4.table
if (datetime.datetime.now() - lastTime).total_seconds() > args.refresh*5:
lastTime = datetime.datetime.now()
cleanRedis()
#time.sleep(args.refresh)