diff --git a/bin/Helper.py b/bin/Helper.py index 66d7766a..e7338ceb 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -108,6 +108,7 @@ class Process(object): self.modules = ConfigParser.ConfigParser() self.modules.read(modulesfile) self.subscriber_name = conf_section + self.pubsub = None if self.modules.has_section(conf_section): self.pubsub = PubSub() @@ -118,6 +119,15 @@ class Process(object): port=self.config.get('RedisPubSub', 'port'), db=self.config.get('RedisPubSub', 'db')) + self.moduleNum = 1 + for i in range(1, 50): + curr_num = self.r_temp.get("MODULE_"+self.subscriber_name + "_" + str(i)) + if curr_num is None: + self.moduleNum = i + break + + + def populate_set_in(self): # monoproc src = self.modules.get(self.subscriber_name, 'subscribe') @@ -142,15 +152,18 @@ class Process(object): else: try: - path = message.split(".")[-2].split("/")[-1] + if ".gz" in message: + path = message.split(".")[-2].split("/")[-1] + else: + path = "?" value = str(timestamp) + ", " + path - self.r_temp.set("MODULE_"+self.subscriber_name, value) + self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) return message except: path = "?" value = str(timestamp) + ", " + path - self.r_temp.set("MODULE_"+self.subscriber_name, value) + self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) return message def populate_set_out(self, msg, channel=None): diff --git a/bin/ModuleInformation.py b/bin/ModuleInformation.py index 8fcd8c78..a3c0f0c2 100755 --- a/bin/ModuleInformation.py +++ b/bin/ModuleInformation.py @@ -29,6 +29,9 @@ def getPid(module): else: return None +def clearRedisModuleInfo(): + for k in server.keys("MODULE_*"): + server.delete(k) def kill_module(module): print '' @@ -62,6 +65,7 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description='Show info concerning running modules and log suspected stucked modules. May be use to automatically kill and restart stucked one.') parser.add_argument('-r', '--refresh', type=int, required=False, default=1, help='Refresh rate') parser.add_argument('-k', '--autokill', type=int, required=True, default=1, help='Enable auto kill option (1 for TRUE, anything else for FALSE)') + parser.add_argument('-c', '--clear', type=int, required=False, default=1, help='Clear the current module information (Used to clear data from old launched modules)') args = parser.parse_args() @@ -80,37 +84,46 @@ if __name__ == "__main__": port=cfg.getint("Redis_Queues", "port"), db=cfg.getint("Redis_Queues", "db")) + if args.clear == 1: + clearRedisModuleInfo() + while True: - num = 0 + curr_range = 50 printarray1 = [] printarray2 = [] for queue, card in server.hgetall("queues").iteritems(): - key = "MODULE_" + queue - value = server.get(key) - if value is not None: - timestamp, path = value.split(", ") - if timestamp is not None and path is not None: - num += 1 - startTime_readable = datetime.datetime.fromtimestamp(int(timestamp)) - processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0] + key = "MODULE_" + queue + "_" + for i in range(1, 50): + curr_num = server.get("MODULE_"+ queue + "_" + str(i)) + if curr_num is None: + curr_range = i + break - if int(card) > 0: - if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module: - log = open(log_filename, 'a') - log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n") - if args.autokill == 1: - kill_module(queue) + for moduleNum in range(1, curr_range): + value = server.get(key + str(moduleNum)) + if value is not None: + timestamp, path = value.split(", ") + if timestamp is not None and path is not None: + startTime_readable = datetime.datetime.fromtimestamp(int(timestamp)) + processed_time_readable = str((datetime.datetime.now() - startTime_readable)).split('.')[0] - printarray1.append([str(num), str(queue), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) + if int(card) > 0: + if int((datetime.datetime.now() - startTime_readable).total_seconds()) > threshold_stucked_module: + log = open(log_filename, 'a') + log.write(json.dumps([queue, card, str(startTime_readable), str(processed_time_readable), path]) + "\n") + if args.autokill == 1: + kill_module(queue) - else: - printarray2.append([str(num), str(queue), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) + printarray1.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) + + else: + printarray2.append([str(queue), str(moduleNum), str(card), str(startTime_readable), str(processed_time_readable), str(path)]) printarray1.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) printarray2.sort(lambda x,y: cmp(x[4], y[4]), reverse=True) - printarray1.insert(0,["#", "Queue", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"]) - printarray2.insert(0,["#", "Queue", "Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"]) + printarray1.insert(0,["Queue", "#", "Amount", "Paste start time", "Processing time for current paste (H:M:S)", "Paste hash"]) + printarray2.insert(0,["Queue", "#","Amount", "Paste start time", "Time since idle (H:M:S)", "Last paste hash"]) os.system('clear') t1 = AsciiTable(printarray1, title="Working queues")