diff --git a/bin/lib/ail_queues.py b/bin/lib/ail_queues.py index 8c706445..9d064837 100755 --- a/bin/lib/ail_queues.py +++ b/bin/lib/ail_queues.py @@ -38,7 +38,9 @@ class AILQueue: self._set_subscriber() # Update queue stat r_queues.hset('queues', self.name, self.get_nb_messages()) - r_queues.hset(f'modules', f'{self.pid}:{self.name}', -1) + + r_queues.sadd('modules', self.name) + r_queues.hset(f'module:{self.name}', self.pid, -1) def _set_subscriber(self): subscribers = {} @@ -69,7 +71,7 @@ class AILQueue: def get_message(self): # Update queues stats r_queues.hset('queues', self.name, self.get_nb_messages()) - r_queues.hset(f'modules', f'{self.pid}:{self.name}', int(time.time())) + r_queues.hset(f'module:{self.name}', self.pid, int(time.time())) # Get Message message = r_queues.lpop(f'queue:{self.name}:in') @@ -96,8 +98,6 @@ class AILQueue: # condition -> not in any queue # TODO EDIT meta - - def end_message(self, obj_global_id, m_hash): end_processed_obj(obj_global_id, m_hash, module=self.name) @@ -128,6 +128,9 @@ class AILQueue: nb_mess = r_queues.llen(f'queue:{module_name}:in') r_queues.hset('queues', module_name, nb_mess) + def start(self): + r_queues.hset(f'module:start:{self.name}', self.pid, int(time.time())) + # TODO def refresh(self): # TODO check cache @@ -136,12 +139,17 @@ class AILQueue: def clear(self): r_queues.delete(f'queue:{self.name}:in') + def _stop_module(self): + r_queues.hdel(f'module:{self.name}', self.pid) + if r_queues.hlen(f'module:{self.name}') == 0: + r_queues.srem('modules', self.name) + def error(self): - r_queues.hdel(f'modules', f'{self.pid}:{self.name}') + self._stop_module def end(self): self.clear() - r_queues.hdel(f'modules', f'{self.pid}:{self.name}') + self._stop_module() def get_queues_modules(): @@ -155,27 +163,37 @@ def get_nb_sorted_queues_modules(): res = sorted(res.items()) return res -def get_modules_pid_last_mess(): - return r_queues.hgetall('modules') +def get_modules_names(): + return r_queues.smembers('modules') -def get_modules_queues_stats(): - modules_queues_stats = [] +def get_module_pids(name): + return r_queues.hkeys(f'module:{name}') + +def get_module_start_time(name, pid): + return r_queues.hget(f'module:start:{name}', pid) + +def get_module_last_time(name, pid): + return r_queues.hget(f'module:{name}', pid) + +def get_modules_queues_stats(): # TODO ADD OPTION TO PURGE QUEUES + stats = {} + modules_names = sorted(get_modules_names()) nb_queues_modules = get_nb_queues_modules() - modules_pid_last_mess = get_modules_pid_last_mess() - added_modules = set() - for row_module in modules_pid_last_mess: - pid, module = row_module.split(':', 1) - last_time = modules_pid_last_mess[row_module] - last_time = datetime.datetime.fromtimestamp(int(last_time)) - seconds = int((datetime.datetime.now() - last_time).total_seconds()) - modules_queues_stats.append((module, nb_queues_modules[module], seconds, pid)) - added_modules.add(module) - for module in nb_queues_modules: - if module not in added_modules: - modules_queues_stats.append((module, nb_queues_modules[module], -1, 'Not Launched')) - return sorted(modules_queues_stats) + for name in modules_names: + modules = {} + for pid in get_module_pids(name): + modules[pid] = {'start': get_module_start_time(name, pid), 'last': get_module_last_time(name, pid)} + stats[name] = {'in': nb_queues_modules[name], 'modules': modules} + + # Check if module not started + for name in nb_queues_modules: + if name not in stats: + stats[name] = {'in': nb_queues_modules[name], 'modules': None} + return stats def clear_modules_queues_stats(): + for name in get_modules_names(): + r_queues.delete(f'module:{name}') r_queues.delete('modules') @@ -399,12 +417,5 @@ def save_queue_digraph(): if __name__ == '__main__': - # clear_modules_queues_stats() - # save_queue_digraph() - oobj_global_id = 'item::submitted/2023/10/11/submitted_b5440009-05d5-4494-a807-a6d8e4a900cf.gz' - # print(get_processed_obj(oobj_global_id)) - # delete_processed_obj(oobj_global_id) - # while True: - # print(get_processed_obj(oobj_global_id)) - # time.sleep(0.5) - print(get_processed_end_objs()) + # print(get_processed_end_objs()) + print(get_modules_queues_stats()) diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index 273f7580..947775df 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -73,6 +73,9 @@ class AbstractModule(ABC): # Debug Mode self.debug = False + if queue: + self.queue.start() + def get_obj(self): return self.obj