From eae57fb813b3664bd8bd1fb5df151ee80fc212c8 Mon Sep 17 00:00:00 2001 From: terrtia Date: Thu, 5 Oct 2023 16:24:28 +0200 Subject: [PATCH] chg: [importers obj_type] importers queues: add feeder source + object global ID --- bin/crawlers/Crawler.py | 4 ++-- bin/importer/FeederImporter.py | 10 +++++----- bin/importer/FileImporter.py | 5 +++-- bin/importer/PystemonImporter.py | 1 + bin/importer/ZMQImporter.py | 18 +++++++++++++++--- bin/importer/abstract_importer.py | 4 +++- bin/importer/feeders/Default.py | 22 ++++++++++++++++------ bin/importer/feeders/Telegram.py | 14 ++++++++------ bin/modules/Mixer.py | 25 +++++++++++-------------- bin/modules/SubmitPaste.py | 2 +- 10 files changed, 65 insertions(+), 40 deletions(-) diff --git a/bin/crawlers/Crawler.py b/bin/crawlers/Crawler.py index d16ad9f7..adaf7bbf 100755 --- a/bin/crawlers/Crawler.py +++ b/bin/crawlers/Crawler.py @@ -265,11 +265,11 @@ class Crawler(AbstractModule): print(item_id) gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html']) # send item to Global - relay_message = f'crawler {item_id} {gzip64encoded}' + relay_message = f'crawler item::{item_id} {gzip64encoded}' self.add_message_to_queue(relay_message, 'Importers') # Tag - msg = f'infoleak:submission="crawler";{item_id}' + msg = f'infoleak:submission="crawler";{item_id}' # TODO FIXME self.add_message_to_queue(msg, 'Tags') crawlers.create_item_metadata(item_id, last_url, parent_id) diff --git a/bin/importer/FeederImporter.py b/bin/importer/FeederImporter.py index dc2dfb7d..8c8b08cb 100755 --- a/bin/importer/FeederImporter.py +++ b/bin/importer/FeederImporter.py @@ -87,16 +87,16 @@ class FeederImporter(AbstractImporter): feeder_name = feeder.get_name() print(f'importing: {feeder_name} feeder') - item_id = feeder.get_item_id() # TODO replace me with object global id + obj = feeder.get_obj() # TODO replace by a list of objects to import ???? # process meta if feeder.get_json_meta(): feeder.process_meta() - if feeder_name == 'telegram': - return item_id # TODO support UI dashboard - else: + if obj.type == 'item': # object save on disk as file (Items) gzip64_content = feeder.get_gzip64_content() - return f'{feeder_name} {item_id} {gzip64_content}' + return f'{feeder_name} {obj.get_global_id()} {gzip64_content}' + else: # Messages save on DB + return f'{feeder_name} {obj.get_global_id()}' class FeederModuleImporter(AbstractModule): diff --git a/bin/importer/FileImporter.py b/bin/importer/FileImporter.py index 4a926a41..820e7f53 100755 --- a/bin/importer/FileImporter.py +++ b/bin/importer/FileImporter.py @@ -19,7 +19,7 @@ sys.path.append(os.environ['AIL_BIN']) from importer.abstract_importer import AbstractImporter # from modules.abstract_module import AbstractModule from lib import ail_logger -from lib.ail_queues import AILQueue +# from lib.ail_queues import AILQueue from lib import ail_files # TODO RENAME ME logging.config.dictConfig(ail_logger.get_config(name='modules')) @@ -41,9 +41,10 @@ class FileImporter(AbstractImporter): gzipped = False if mimetype == 'application/gzip': gzipped = True - elif not ail_files.is_text(mimetype): + elif not ail_files.is_text(mimetype): # # # # return None + # TODO handle multiple objects message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import') if message: self.add_message_to_queue(message=message) diff --git a/bin/importer/PystemonImporter.py b/bin/importer/PystemonImporter.py index 1a0e68d8..69733ed0 100755 --- a/bin/importer/PystemonImporter.py +++ b/bin/importer/PystemonImporter.py @@ -52,6 +52,7 @@ class PystemonImporter(AbstractImporter): else: gzipped = False + # TODO handle multiple objects return self.create_message(item_id, content, gzipped=gzipped, source='pystemon') except IOError as e: diff --git a/bin/importer/ZMQImporter.py b/bin/importer/ZMQImporter.py index 91728cf9..bb86880f 100755 --- a/bin/importer/ZMQImporter.py +++ b/bin/importer/ZMQImporter.py @@ -56,6 +56,8 @@ class ZMQModuleImporter(AbstractModule): super().__init__() config_loader = ConfigLoader() + self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name") + addresses = config_loader.get_config_str('ZMQ_Global', 'address') addresses = addresses.split(',') channel = config_loader.get_config_str('ZMQ_Global', 'channel') @@ -63,7 +65,6 @@ class ZMQModuleImporter(AbstractModule): for address in addresses: self.zmq_importer.add(address.strip(), channel) - # TODO MESSAGE SOURCE - UI def get_message(self): for message in self.zmq_importer.importer(): # remove channel from message @@ -72,8 +73,19 @@ class ZMQModuleImporter(AbstractModule): def compute(self, messages): for message in messages: message = message.decode() - print(message.split(' ', 1)[0]) - self.add_message_to_queue(message=message) + + obj_id, gzip64encoded = message.split(' ', 1) # TODO ADD LOGS + splitted = obj_id.split('>>', 1) + if splitted == 2: + feeder_name, obj_id = splitted + else: + feeder_name = self.default_feeder_name + + # f'{source} item::{obj_id} {content}' + relay_message = f'{feeder_name} item::{obj_id} {gzip64encoded}' + + print(f'feeder_name item::{obj_id}') + self.add_message_to_queue(message=relay_message) if __name__ == '__main__': diff --git a/bin/importer/abstract_importer.py b/bin/importer/abstract_importer.py index 1c4b458d..ebe32c63 100755 --- a/bin/importer/abstract_importer.py +++ b/bin/importer/abstract_importer.py @@ -98,5 +98,7 @@ class AbstractImporter(ABC): # TODO ail queues source = self.name self.logger.info(f'{source} {obj_id}') # self.logger.debug(f'{source} {obj_id} {content}') - return f'{source} {obj_id} {content}' + + # TODO handle multiple objects + return f'{source} item::{obj_id} {content}' diff --git a/bin/importer/feeders/Default.py b/bin/importer/feeders/Default.py index 482d06b4..f4313707 100755 --- a/bin/importer/feeders/Default.py +++ b/bin/importer/feeders/Default.py @@ -9,14 +9,21 @@ Process Feeder Json (example: Twitter feeder) """ import os import datetime +import sys import uuid +sys.path.append(os.environ['AIL_BIN']) +################################## +# Import Project packages +################################## +from lib.objects import ail_objects + class DefaultFeeder: """Default Feeder""" def __init__(self, json_data): self.json_data = json_data - self.item_id = None + self.obj = None self.name = None def get_name(self): @@ -52,14 +59,17 @@ class DefaultFeeder: return self.json_data.get('data') ## OVERWRITE ME ## - def get_item_id(self): + def get_obj(self): """ - Return item id. define item id + Return obj global id. define obj global id + Default == item object """ date = datetime.date.today().strftime("%Y/%m/%d") - item_id = os.path.join(self.get_name(), date, str(uuid.uuid4())) - self.item_id = f'{item_id}.gz' - return self.item_id + obj_id = os.path.join(self.get_name(), date, str(uuid.uuid4())) + obj_id = f'{obj_id}.gz' + obj_id = f'item::{obj_id}' + self.obj = ail_objects.get_obj_from_global_id(obj_id) + return self.obj ## OVERWRITE ME ## def process_meta(self): diff --git a/bin/importer/feeders/Telegram.py b/bin/importer/feeders/Telegram.py index 313a8c9b..4eea63da 100755 --- a/bin/importer/feeders/Telegram.py +++ b/bin/importer/feeders/Telegram.py @@ -17,6 +17,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from importer.feeders.Default import DefaultFeeder from lib.ConfigLoader import ConfigLoader +from lib.objects import ail_objects from lib.objects.Chats import Chat from lib.objects import Messages from lib.objects import UsersAccount @@ -25,6 +26,7 @@ from lib.objects.Usernames import Username import base64 import io import gzip + def gunzip_bytes_obj(bytes_obj): gunzipped_bytes_obj = None try: @@ -45,8 +47,7 @@ class TelegramFeeder(DefaultFeeder): super().__init__(json_data) self.name = 'telegram' - # define item id - def get_item_id(self): # TODO rename self.item_id + def get_obj(self): # TODO handle others objects -> images, pdf, ... # Get message date timestamp = self.json_data['meta']['date']['timestamp'] # TODO CREATE DEFAULT TIMESTAMP # if self.json_data['meta'].get('date'): @@ -56,8 +57,10 @@ class TelegramFeeder(DefaultFeeder): # date = datetime.date.today().strftime("%Y/%m/%d") chat_id = str(self.json_data['meta']['chat']['id']) message_id = str(self.json_data['meta']['id']) - self.item_id = Messages.create_obj_id('telegram', chat_id, message_id, timestamp) # TODO rename self.item_id - return self.item_id + obj_id = Messages.create_obj_id('telegram', chat_id, message_id, timestamp) + obj_id = f'message:telegram:{obj_id}' + self.obj = ail_objects.get_obj_from_global_id(obj_id) + return self.obj def process_meta(self): """ @@ -81,7 +84,7 @@ class TelegramFeeder(DefaultFeeder): translation = None decoded = base64.standard_b64decode(self.json_data['data']) content = gunzip_bytes_obj(decoded) - message = Messages.create(self.item_id, content, translation=translation) + message = Messages.create(self.obj.id, content, translation=translation) if meta.get('chat'): chat = Chat(meta['chat']['id'], 'telegram') @@ -131,5 +134,4 @@ class TelegramFeeder(DefaultFeeder): # TODO reply threads ???? # message edit ???? - return None diff --git a/bin/modules/Mixer.py b/bin/modules/Mixer.py index 62c427e3..7ca0985d 100755 --- a/bin/modules/Mixer.py +++ b/bin/modules/Mixer.py @@ -139,22 +139,19 @@ class Mixer(AbstractModule): def compute(self, message): self.refresh_stats() splitted = message.split() - # Old Feeder name "feeder>>item_id gzip64encoded" - if len(splitted) == 2: - item_id, gzip64encoded = splitted - try: - feeder_name, item_id = item_id.split('>>') - feeder_name.replace(" ", "") - if 'import_dir' in feeder_name: - feeder_name = feeder_name.split('/')[1] - except ValueError: - feeder_name = self.default_feeder_name - # Feeder name in message: "feeder item_id gzip64encoded" - elif len(splitted) == 3: - feeder_name, item_id, gzip64encoded = splitted + # message -> # feeder_name - object - content + # or # message -> # feeder_name - object + + # feeder_name - object + if len(splitted) == 2: # feeder_name - object (content already saved) + feeder_name, obj_id = splitted + + # Feeder name in message: "feeder obj_id gzip64encoded" + elif len(splitted) == 3: # gzip64encoded content + feeder_name, obj_id, gzip64encoded = splitted else: print('Invalid message: not processed') - self.logger.debug(f'Invalid Item: {splitted[0]} not processed') + self.logger.debug(f'Invalid Item: {splitted[0]} not processed') # TODO return None # remove absolute path diff --git a/bin/modules/SubmitPaste.py b/bin/modules/SubmitPaste.py index 740090ea..e83a0856 100755 --- a/bin/modules/SubmitPaste.py +++ b/bin/modules/SubmitPaste.py @@ -277,7 +277,7 @@ class SubmitPaste(AbstractModule): self.redis_logger.debug(f"relative path {rel_item_path}") # send paste to Global module - relay_message = f"submitted {rel_item_path} {gzip64encoded}" + relay_message = f"submitted item::{rel_item_path} {gzip64encoded}" self.add_message_to_queue(message=relay_message) # add tags