mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-10 08:38:28 +00:00
chg: [queues] timeout obj after 2 days
This commit is contained in:
parent
c05f4d7833
commit
0af5ea9d48
3 changed files with 35 additions and 3 deletions
|
@ -22,18 +22,18 @@ sys.path.append(os.environ['AIL_BIN'])
|
||||||
# Import Project packages
|
# Import Project packages
|
||||||
##################################
|
##################################
|
||||||
from core import ail_2_ail
|
from core import ail_2_ail
|
||||||
from lib.ail_queues import get_processed_end_obj
|
from lib.ail_queues import get_processed_end_obj, timeout_processed_objs
|
||||||
from lib.exceptions import ModuleQueueError
|
from lib.exceptions import ModuleQueueError
|
||||||
from lib.objects import ail_objects
|
from lib.objects import ail_objects
|
||||||
from modules.abstract_module import AbstractModule
|
from modules.abstract_module import AbstractModule
|
||||||
|
|
||||||
|
|
||||||
class Sync_module(AbstractModule): # TODO KEEP A QUEUE ???????????????????????????????????????????????
|
class Sync_module(AbstractModule):
|
||||||
"""
|
"""
|
||||||
Sync_module module for AIL framework
|
Sync_module module for AIL framework
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE
|
def __init__(self, queue=False): # FIXME MODIFY/ADD QUEUE
|
||||||
super(Sync_module, self).__init__(queue=queue)
|
super(Sync_module, self).__init__(queue=queue)
|
||||||
|
|
||||||
# Waiting time in seconds between to message processed
|
# Waiting time in seconds between to message processed
|
||||||
|
@ -81,6 +81,10 @@ class Sync_module(AbstractModule): # TODO KEEP A QUEUE ?????????????????????????
|
||||||
|
|
||||||
# Endless loop processing messages from the input queue
|
# Endless loop processing messages from the input queue
|
||||||
while self.proceed:
|
while self.proceed:
|
||||||
|
|
||||||
|
# Timeout queues
|
||||||
|
timeout_processed_objs()
|
||||||
|
|
||||||
# Get one message (paste) from the QueueIn (copy of Redis_Global publish)
|
# Get one message (paste) from the QueueIn (copy of Redis_Global publish)
|
||||||
global_id = get_processed_end_obj()
|
global_id = get_processed_end_obj()
|
||||||
if global_id:
|
if global_id:
|
||||||
|
|
|
@ -57,6 +57,9 @@ def get_object_all_subtypes(obj_type): # TODO Dynamic subtype
|
||||||
return r_object.smembers(f'all_chat:subtypes')
|
return r_object.smembers(f'all_chat:subtypes')
|
||||||
return []
|
return []
|
||||||
|
|
||||||
|
def get_obj_queued():
|
||||||
|
return ['item', 'image']
|
||||||
|
|
||||||
def get_objects_tracked():
|
def get_objects_tracked():
|
||||||
return ['decoded', 'item', 'pgp', 'title']
|
return ['decoded', 'item', 'pgp', 'title']
|
||||||
|
|
||||||
|
|
|
@ -14,10 +14,12 @@ sys.path.append(os.environ['AIL_BIN'])
|
||||||
##################################
|
##################################
|
||||||
from lib.exceptions import ModuleQueueError
|
from lib.exceptions import ModuleQueueError
|
||||||
from lib.ConfigLoader import ConfigLoader
|
from lib.ConfigLoader import ConfigLoader
|
||||||
|
from lib import ail_core
|
||||||
|
|
||||||
config_loader = ConfigLoader()
|
config_loader = ConfigLoader()
|
||||||
r_queues = config_loader.get_redis_conn("Redis_Queues")
|
r_queues = config_loader.get_redis_conn("Redis_Queues")
|
||||||
r_obj_process = config_loader.get_redis_conn("Redis_Process")
|
r_obj_process = config_loader.get_redis_conn("Redis_Process")
|
||||||
|
timeout_queue_obj = 172800
|
||||||
config_loader = None
|
config_loader = None
|
||||||
|
|
||||||
MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg')
|
MODULES_FILE = os.path.join(os.environ['AIL_HOME'], 'configs', 'modules.cfg')
|
||||||
|
@ -248,6 +250,29 @@ def rename_processed_obj(new_id, old_id):
|
||||||
r_obj_process.srem(f'objs:process', old_id)
|
r_obj_process.srem(f'objs:process', old_id)
|
||||||
add_processed_obj(new_id, x_hash, module=module)
|
add_processed_obj(new_id, x_hash, module=module)
|
||||||
|
|
||||||
|
def timeout_process_obj(obj_global_id):
|
||||||
|
for q in get_processed_obj_queues(obj_global_id):
|
||||||
|
queue, x_hash = q.split(':', 1)
|
||||||
|
r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{x_hash}')
|
||||||
|
for m in get_processed_obj_modules(obj_global_id):
|
||||||
|
module, x_hash = m.split(':', 1)
|
||||||
|
r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{x_hash}')
|
||||||
|
|
||||||
|
obj_type = obj_global_id.split(':', 1)[0]
|
||||||
|
r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id)
|
||||||
|
r_obj_process.srem(f'objs:process', obj_global_id)
|
||||||
|
|
||||||
|
r_obj_process.sadd(f'objs:processed', obj_global_id)
|
||||||
|
print(f'timeout: {obj_global_id}')
|
||||||
|
|
||||||
|
|
||||||
|
def timeout_processed_objs():
|
||||||
|
curr_time = int(time.time())
|
||||||
|
time_limit = curr_time - timeout_queue_obj
|
||||||
|
for obj_type in ail_core.get_obj_queued():
|
||||||
|
for obj_global_id in r_obj_process.zrangebyscore(f'objs:process:{obj_type}', 0, time_limit):
|
||||||
|
timeout_process_obj(obj_global_id)
|
||||||
|
|
||||||
def delete_processed_obj(obj_global_id):
|
def delete_processed_obj(obj_global_id):
|
||||||
for q in get_processed_obj_queues(obj_global_id):
|
for q in get_processed_obj_queues(obj_global_id):
|
||||||
queue, x_hash = q.split(':', 1)
|
queue, x_hash = q.split(':', 1)
|
||||||
|
|
Loading…
Reference in a new issue