ail-framework/bin/modules/abstract_module.py

147 lines
4.6 KiB
Python
Raw Permalink Normal View History

2021-04-02 07:52:05 +00:00
# coding: utf-8
"""
Base Class for AIL Modules
"""
##################################
# Import External packages
##################################
from abc import ABC, abstractmethod
import time
import traceback
2021-04-02 07:52:05 +00:00
##################################
# Import Project packages
##################################
from pubsublogger import publisher
from Helper import Process
from lib import regex_helper
2021-04-02 07:52:05 +00:00
class AbstractModule(ABC):
    """
    Abstract base class for AIL modules.

    A module repeatedly pulls a message from its input queue and hands it to
    ``compute()``. When no message is available it calls ``computeNone()``
    and sleeps for ``pending_seconds``. Subclasses must implement
    ``compute()``.
    """

    def __init__(self, module_name=None, queue_name=None, logger_channel='Script'):
        """
        Init Module

        module_name: str; set the module name if different from the instance ClassName
        queue_name: str; set the queue name if different from the instance ClassName
        logger_channel: str; set the logger channel name, 'Script' by default
        """
        # Module name if provided else instance class name
        self.module_name = module_name or self._module_name()
        # Queue name if provided else instance class name
        self.queue_name = queue_name or self._module_name()

        # Init Redis Logger
        # NOTE: this binds the imported `publisher` object itself, so the
        # port and channel assigned below are set on that shared object.
        self.redis_logger = publisher
        # Port of the redis instance used by pubsublogger
        self.redis_logger.port = 6380
        # Channel name to publish logs
        # TODO: refactor logging
        # If provided could be a namespaced channel like script:<ModuleName>
        self.redis_logger.channel = logger_channel

        # Cache key handed to regex_helper by regex_findall()
        self.redis_cache_key = regex_helper.generate_redis_cache_key(self.module_name)
        # Passed as `max_time` to regex_helper.regex_findall
        # (presumably seconds - confirm in regex_helper)
        self.max_execution_time = 30

        # Run module endlessly; run() stops once this becomes falsy
        self.proceed = True
        # Waiting time in seconds before polling again when the queue is empty
        self.pending_seconds = 10
        # Setup the I/O queues
        self.process = Process(self.queue_name)

    def get_message(self):
        """
        Get message from the Redis Queue (QueueIn)

        Input message can change between modules
        ex: '<item id>'
        """
        return self.process.get_from_set()

    def send_message_to_queue(self, message, queue_name=None):
        """
        Send message to queue

        :param message: message to send in queue
        :param queue_name: queue or module name

        ex: send_message_to_queue(item_id, 'Global')
        """
        self.process.populate_set_out(message, queue_name)
        # TODO: add to new set_module

    def regex_findall(self, regex, id, content):
        """
        regex findall helper (force timeout)

        Delegates to regex_helper.regex_findall with this module's name,
        cache key and ``max_execution_time`` and returns its result.

        :param regex: compiled regex
        :param id: object id
        :param content: object content
        """
        return regex_helper.regex_findall(
            self.module_name,
            self.redis_cache_key,
            regex,
            id,
            content,
            max_time=self.max_execution_time,
        )

    def run(self):
        """
        Run Module endless process

        Loops while ``self.proceed`` is truthy. An exception raised by
        ``compute()`` is logged and printed, then the loop continues with the
        next message, so one bad message does not stop the module.
        """
        # Endless loop processing messages from the input queue
        while self.proceed:
            # Get one message (ex: item id) from the Redis Queue (QueueIn)
            message = self.get_message()

            if message:
                try:
                    # Module processing with the message from the queue
                    self.compute(message)
                except Exception as err:
                    # Stack frames only; the exception text is reported separately
                    trace = ''.join(traceback.format_tb(err.__traceback__))

                    self.redis_logger.critical(f"Error in module {self.module_name}: {err}")
                    self.redis_logger.critical(f"Module {self.module_name} input message: {message}")
                    self.redis_logger.critical(trace)
                    print()
                    print(f"ERROR: {err}")
                    print(f'MESSAGE: {message}')
                    print('TRACEBACK:')
                    print(trace)
                # TODO: remove from set_module
                # TODO: check if item process == completed
            else:
                self.computeNone()
                # Wait before next process
                self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
                time.sleep(self.pending_seconds)

    def _module_name(self):
        """
        Returns the instance class name (ie. the Module Name)
        """
        return self.__class__.__name__

    @abstractmethod
    def compute(self, message):
        """
        Main method of the Module to implement

        :param message: message taken from the input queue by run()
        """
        pass

    def computeNone(self):
        """
        Method of the Module when there is no message

        Called by run() before it idles; does nothing unless overridden.
        """
        pass