chg: [queue] save module start time + pid

This commit is contained in:
terrtia 2024-12-06 14:08:47 +01:00
parent 2d25579e7f
commit 802cb35085
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
2 changed files with 46 additions and 32 deletions

View file

@ -38,7 +38,9 @@ class AILQueue:
self._set_subscriber() self._set_subscriber()
# Update queue stat # Update queue stat
r_queues.hset('queues', self.name, self.get_nb_messages()) r_queues.hset('queues', self.name, self.get_nb_messages())
r_queues.hset(f'modules', f'{self.pid}:{self.name}', -1)
r_queues.sadd('modules', self.name)
r_queues.hset(f'module:{self.name}', self.pid, -1)
def _set_subscriber(self): def _set_subscriber(self):
subscribers = {} subscribers = {}
@ -69,7 +71,7 @@ class AILQueue:
def get_message(self): def get_message(self):
# Update queues stats # Update queues stats
r_queues.hset('queues', self.name, self.get_nb_messages()) r_queues.hset('queues', self.name, self.get_nb_messages())
r_queues.hset(f'modules', f'{self.pid}:{self.name}', int(time.time())) r_queues.hset(f'module:{self.name}', self.pid, int(time.time()))
# Get Message # Get Message
message = r_queues.lpop(f'queue:{self.name}:in') message = r_queues.lpop(f'queue:{self.name}:in')
@ -96,8 +98,6 @@ class AILQueue:
# condition -> not in any queue # condition -> not in any queue
# TODO EDIT meta # TODO EDIT meta
def end_message(self, obj_global_id, m_hash): def end_message(self, obj_global_id, m_hash):
end_processed_obj(obj_global_id, m_hash, module=self.name) end_processed_obj(obj_global_id, m_hash, module=self.name)
@ -128,6 +128,9 @@ class AILQueue:
nb_mess = r_queues.llen(f'queue:{module_name}:in') nb_mess = r_queues.llen(f'queue:{module_name}:in')
r_queues.hset('queues', module_name, nb_mess) r_queues.hset('queues', module_name, nb_mess)
def start(self):
r_queues.hset(f'module:start:{self.name}', self.pid, int(time.time()))
# TODO # TODO
def refresh(self): def refresh(self):
# TODO check cache # TODO check cache
@ -136,12 +139,17 @@ class AILQueue:
def clear(self): def clear(self):
r_queues.delete(f'queue:{self.name}:in') r_queues.delete(f'queue:{self.name}:in')
def _stop_module(self):
r_queues.hdel(f'module:{self.name}', self.pid)
if r_queues.hlen(f'module:{self.name}') == 0:
r_queues.srem('modules', self.name)
def error(self): def error(self):
r_queues.hdel(f'modules', f'{self.pid}:{self.name}') self._stop_module
def end(self): def end(self):
self.clear() self.clear()
r_queues.hdel(f'modules', f'{self.pid}:{self.name}') self._stop_module()
def get_queues_modules(): def get_queues_modules():
@ -155,27 +163,37 @@ def get_nb_sorted_queues_modules():
res = sorted(res.items()) res = sorted(res.items())
return res return res
def get_modules_pid_last_mess(): def get_modules_names():
return r_queues.hgetall('modules') return r_queues.smembers('modules')
def get_modules_queues_stats(): def get_module_pids(name):
modules_queues_stats = [] return r_queues.hkeys(f'module:{name}')
def get_module_start_time(name, pid):
return r_queues.hget(f'module:start:{name}', pid)
def get_module_last_time(name, pid):
return r_queues.hget(f'module:{name}', pid)
def get_modules_queues_stats(): # TODO ADD OPTION TO PURGE QUEUES
stats = {}
modules_names = sorted(get_modules_names())
nb_queues_modules = get_nb_queues_modules() nb_queues_modules = get_nb_queues_modules()
modules_pid_last_mess = get_modules_pid_last_mess() for name in modules_names:
added_modules = set() modules = {}
for row_module in modules_pid_last_mess: for pid in get_module_pids(name):
pid, module = row_module.split(':', 1) modules[pid] = {'start': get_module_start_time(name, pid), 'last': get_module_last_time(name, pid)}
last_time = modules_pid_last_mess[row_module] stats[name] = {'in': nb_queues_modules[name], 'modules': modules}
last_time = datetime.datetime.fromtimestamp(int(last_time))
seconds = int((datetime.datetime.now() - last_time).total_seconds()) # Check if module not started
modules_queues_stats.append((module, nb_queues_modules[module], seconds, pid)) for name in nb_queues_modules:
added_modules.add(module) if name not in stats:
for module in nb_queues_modules: stats[name] = {'in': nb_queues_modules[name], 'modules': None}
if module not in added_modules: return stats
modules_queues_stats.append((module, nb_queues_modules[module], -1, 'Not Launched'))
return sorted(modules_queues_stats)
def clear_modules_queues_stats(): def clear_modules_queues_stats():
for name in get_modules_names():
r_queues.delete(f'module:{name}')
r_queues.delete('modules') r_queues.delete('modules')
@ -399,12 +417,5 @@ def save_queue_digraph():
if __name__ == '__main__': if __name__ == '__main__':
# clear_modules_queues_stats() # print(get_processed_end_objs())
# save_queue_digraph() print(get_modules_queues_stats())
oobj_global_id = 'item::submitted/2023/10/11/submitted_b5440009-05d5-4494-a807-a6d8e4a900cf.gz'
# print(get_processed_obj(oobj_global_id))
# delete_processed_obj(oobj_global_id)
# while True:
# print(get_processed_obj(oobj_global_id))
# time.sleep(0.5)
print(get_processed_end_objs())

View file

@ -73,6 +73,9 @@ class AbstractModule(ABC):
# Debug Mode # Debug Mode
self.debug = False self.debug = False
if queue:
self.queue.start()
def get_obj(self): def get_obj(self):
return self.obj return self.obj