diff --git a/bin/DB_KVROCKS_MIGRATION.py b/bin/DB_KVROCKS_MIGRATION.py index d39fd108..9178c896 100755 --- a/bin/DB_KVROCKS_MIGRATION.py +++ b/bin/DB_KVROCKS_MIGRATION.py @@ -642,7 +642,7 @@ def domain_migration(): domain = Domains.Domain(dom) domain.update_daterange(first_seen) domain.update_daterange(last_check) - domain._set_ports(ports) # TODO ############################################################################ + # domain._set_ports(ports) if last_origin: domain.set_last_origin(last_origin) for language in languages: diff --git a/bin/importer/FileImporter.py b/bin/importer/FileImporter.py index ba5e54b9..410cc65d 100755 --- a/bin/importer/FileImporter.py +++ b/bin/importer/FileImporter.py @@ -24,37 +24,29 @@ from lib import ail_files # TODO RENAME ME logging.config.dictConfig(ail_logger.get_config(name='modules')) -# TODO Clean queue one object destruct - class FileImporter(AbstractImporter): def __init__(self, feeder='file_import'): - super().__init__() + super().__init__(queue=True) self.logger = logging.getLogger(f'{self.__class__.__name__}') self.feeder_name = feeder # TODO sanityze feeder name - # Setup the I/O queues - self.queue = AILQueue('FileImporter', 'manual') - def importer(self, path): if os.path.isfile(path): with open(path, 'rb') as f: content = f.read() - mimetype = ail_files.get_mimetype(content) - if ail_files.is_text(mimetype): + if content: + mimetype = ail_files.get_mimetype(content) item_id = ail_files.create_item_id(self.feeder_name, path) - content = ail_files.create_gzipped_b64(content) - if content: - message = f'dir_import {item_id} {content}' - self.logger.info(message) - self.queue.send_message(message) - elif mimetype == 'application/gzip': - item_id = ail_files.create_item_id(self.feeder_name, path) - content = ail_files.create_b64(content) - if content: - message = f'dir_import {item_id} {content}' - self.logger.info(message) - self.queue.send_message(message) + gzipped = False + if mimetype == 'application/gzip': + gzipped = True + elif not ail_files.is_text(mimetype): + return None + + message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import') + if message: + self.add_message_to_queue(message) class DirImporter(AbstractImporter): def __init__(self): diff --git a/bin/importer/PystemonImporter.py b/bin/importer/PystemonImporter.py index d17dfc8a..536801ba 100755 --- a/bin/importer/PystemonImporter.py +++ b/bin/importer/PystemonImporter.py @@ -10,9 +10,7 @@ # https://github.com/cvandeplas/pystemon/blob/master/pystemon.yaml#L52 # -import base64 import os -import gzip import sys import redis @@ -32,10 +30,6 @@ class PystemonImporter(AbstractImporter): self.r_pystemon = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True) self.dir_pystemon = pystemon_dir - # # TODO: add exception - def encode_and_compress_data(self, content): - return base64.b64encode(gzip.compress(content)).decode() - def importer(self): item_id = self.r_pystemon.lpop("pastes") print(item_id) @@ -53,9 +47,8 @@ class PystemonImporter(AbstractImporter): if not content: return None - b64_gzipped_content = self.encode_and_compress_data(content) - print(item_id, b64_gzipped_content) - return f'{item_id} {b64_gzipped_content}' + return self.create_message(item_id, content, source='pystemon') + except IOError as e: print(f'Error: {full_item_path}, IOError') return None @@ -81,8 +74,7 @@ class PystemonModuleImporter(AbstractModule): return self.importer.importer() def compute(self, message): - relay_message = f'pystemon {message}' - self.add_message_to_queue(relay_message) + self.add_message_to_queue(message) if __name__ == '__main__': diff --git a/bin/importer/abstract_importer.py b/bin/importer/abstract_importer.py index 2e344bc4..e5155775 100755 --- a/bin/importer/abstract_importer.py +++ b/bin/importer/abstract_importer.py @@ -7,26 +7,41 @@ Importer Class Import Content """ +import base64 +import gzip +import logging +import logging.config import os import sys from abc import ABC, abstractmethod -# sys.path.append(os.environ['AIL_BIN']) +sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## # from ConfigLoader import ConfigLoader +from lib import ail_logger +from lib.ail_queues import AILQueue -class AbstractImporter(ABC): - def __init__(self): +logging.config.dictConfig(ail_logger.get_config(name='modules')) + +# TODO Clean queue one object destruct + +class AbstractImporter(ABC): # TODO ail queues + def __init__(self, queue=False): """ - Init Module - importer_name: str; set the importer name if different from the instance ClassName + AIL Importer + :param queue: Allow to push messages to other modules """ # Module name if provided else instance className self.name = self._name() + self.logger = logging.getLogger(f'{self.__class__.__name__}') + + # Setup the I/O queues for one shot importers + if queue: + self.queue = AILQueue(self.name, 'importer_manual') @abstractmethod def importer(self, *args, **kwargs): @@ -39,4 +54,49 @@ class AbstractImporter(ABC): """ return self.__class__.__name__ + def add_message_to_queue(self, message, queue_name=None): + """ + Add message to queue + :param message: message to send in queue + :param queue_name: queue or module name + + ex: add_message_to_queue(item_id, 'Mail') + """ + if message: + self.queue.send_message(message, queue_name) + + @staticmethod + def b64(content): + if isinstance(content, str): + content = content.encode() + return base64.b64encode(content).decode() + + @staticmethod + def create_gzip(content): + if isinstance(content, str): + content = content.encode() + return gzip.compress(content) + + def b64_gzip(self, content): + try: + gziped = self.create_gzip(content) + return self.b64(gziped) + except Exception as e: + self.logger.warning(e) + return '' + + def create_message(self, obj_id, content, b64=False, gzipped=False, source=None): + if not gzipped: + content = self.b64_gzip(content) + elif not b64: + content = self.b64(gzipped) + if not content: + return None + if isinstance(content, bytes): + content = content.decode() + if not source: + source = self.name + self.logger.info(f'{source} {obj_id}') + # self.logger.debug(f'{source} {obj_id} {content}') + return f'{source} {obj_id} {content}' diff --git a/bin/lib/ail_files.py b/bin/lib/ail_files.py index 26929873..31a27669 100755 --- a/bin/lib/ail_files.py +++ b/bin/lib/ail_files.py @@ -1,9 +1,7 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* -import base64 import datetime -import gzip import logging.config import magic import os @@ -181,15 +179,3 @@ def create_item_id(feeder_name, path): item_id = os.path.join(feeder_name, date, basename) # TODO check if already exists return item_id - -def create_b64(b_content): - return base64.standard_b64encode(b_content).decode() - -def create_gzipped_b64(b_content): - try: - gzipencoded = gzip.compress(b_content) - gzip64encoded = create_b64(gzipencoded) - return gzip64encoded - except Exception as e: - logger.warning(e) - return '' diff --git a/bin/modules/abstract_module.py b/bin/modules/abstract_module.py index 38989bd9..0a1a12cd 100644 --- a/bin/modules/abstract_module.py +++ b/bin/modules/abstract_module.py @@ -33,10 +33,9 @@ class AbstractModule(ABC): def __init__(self, module_name=None, queue=True): """ - Init Module + AIL Module, module_name: str; set the module name if different from the instance ClassName - queue_name: str; set the queue name if different from the instance ClassName - logger_channel: str; set the logger channel name, 'Script' by default + :param queue: Allow to push messages to other modules """ self.logger = logging.getLogger(f'{self.__class__.__name__}')