chg: [importers obj_type] importers queues: add feeder source + object global ID

This commit is contained in:
terrtia 2023-10-05 16:24:28 +02:00
parent daf9f6fb5d
commit eae57fb813
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
10 changed files with 65 additions and 40 deletions

View file

@ -265,11 +265,11 @@ class Crawler(AbstractModule):
print(item_id) print(item_id)
gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html']) gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html'])
# send item to Global # send item to Global
relay_message = f'crawler {item_id} {gzip64encoded}' relay_message = f'crawler item::{item_id} {gzip64encoded}'
self.add_message_to_queue(relay_message, 'Importers') self.add_message_to_queue(relay_message, 'Importers')
# Tag # Tag
msg = f'infoleak:submission="crawler";{item_id}' msg = f'infoleak:submission="crawler";{item_id}' # TODO FIXME
self.add_message_to_queue(msg, 'Tags') self.add_message_to_queue(msg, 'Tags')
crawlers.create_item_metadata(item_id, last_url, parent_id) crawlers.create_item_metadata(item_id, last_url, parent_id)

View file

@ -87,16 +87,16 @@ class FeederImporter(AbstractImporter):
feeder_name = feeder.get_name() feeder_name = feeder.get_name()
print(f'importing: {feeder_name} feeder') print(f'importing: {feeder_name} feeder')
item_id = feeder.get_item_id() # TODO replace me with object global id obj = feeder.get_obj() # TODO replace by a list of objects to import ????
# process meta # process meta
if feeder.get_json_meta(): if feeder.get_json_meta():
feeder.process_meta() feeder.process_meta()
if feeder_name == 'telegram': if obj.type == 'item': # object save on disk as file (Items)
return item_id # TODO support UI dashboard
else:
gzip64_content = feeder.get_gzip64_content() gzip64_content = feeder.get_gzip64_content()
return f'{feeder_name} {item_id} {gzip64_content}' return f'{feeder_name} {obj.get_global_id()} {gzip64_content}'
else: # Messages save on DB
return f'{feeder_name} {obj.get_global_id()}'
class FeederModuleImporter(AbstractModule): class FeederModuleImporter(AbstractModule):

View file

@ -19,7 +19,7 @@ sys.path.append(os.environ['AIL_BIN'])
from importer.abstract_importer import AbstractImporter from importer.abstract_importer import AbstractImporter
# from modules.abstract_module import AbstractModule # from modules.abstract_module import AbstractModule
from lib import ail_logger from lib import ail_logger
from lib.ail_queues import AILQueue # from lib.ail_queues import AILQueue
from lib import ail_files # TODO RENAME ME from lib import ail_files # TODO RENAME ME
logging.config.dictConfig(ail_logger.get_config(name='modules')) logging.config.dictConfig(ail_logger.get_config(name='modules'))
@ -41,9 +41,10 @@ class FileImporter(AbstractImporter):
gzipped = False gzipped = False
if mimetype == 'application/gzip': if mimetype == 'application/gzip':
gzipped = True gzipped = True
elif not ail_files.is_text(mimetype): elif not ail_files.is_text(mimetype): # # # #
return None return None
# TODO handle multiple objects
message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import') message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import')
if message: if message:
self.add_message_to_queue(message=message) self.add_message_to_queue(message=message)

View file

@ -52,6 +52,7 @@ class PystemonImporter(AbstractImporter):
else: else:
gzipped = False gzipped = False
# TODO handle multiple objects
return self.create_message(item_id, content, gzipped=gzipped, source='pystemon') return self.create_message(item_id, content, gzipped=gzipped, source='pystemon')
except IOError as e: except IOError as e:

View file

@ -56,6 +56,8 @@ class ZMQModuleImporter(AbstractModule):
super().__init__() super().__init__()
config_loader = ConfigLoader() config_loader = ConfigLoader()
self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
addresses = config_loader.get_config_str('ZMQ_Global', 'address') addresses = config_loader.get_config_str('ZMQ_Global', 'address')
addresses = addresses.split(',') addresses = addresses.split(',')
channel = config_loader.get_config_str('ZMQ_Global', 'channel') channel = config_loader.get_config_str('ZMQ_Global', 'channel')
@ -63,7 +65,6 @@ class ZMQModuleImporter(AbstractModule):
for address in addresses: for address in addresses:
self.zmq_importer.add(address.strip(), channel) self.zmq_importer.add(address.strip(), channel)
# TODO MESSAGE SOURCE - UI
def get_message(self): def get_message(self):
for message in self.zmq_importer.importer(): for message in self.zmq_importer.importer():
# remove channel from message # remove channel from message
@ -72,8 +73,19 @@ class ZMQModuleImporter(AbstractModule):
def compute(self, messages): def compute(self, messages):
for message in messages: for message in messages:
message = message.decode() message = message.decode()
print(message.split(' ', 1)[0])
self.add_message_to_queue(message=message) obj_id, gzip64encoded = message.split(' ', 1) # TODO ADD LOGS
splitted = obj_id.split('>>', 1)
if splitted == 2:
feeder_name, obj_id = splitted
else:
feeder_name = self.default_feeder_name
# f'{source} item::{obj_id} {content}'
relay_message = f'{feeder_name} item::{obj_id} {gzip64encoded}'
print(f'feeder_name item::{obj_id}')
self.add_message_to_queue(message=relay_message)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -98,5 +98,7 @@ class AbstractImporter(ABC): # TODO ail queues
source = self.name source = self.name
self.logger.info(f'{source} {obj_id}') self.logger.info(f'{source} {obj_id}')
# self.logger.debug(f'{source} {obj_id} {content}') # self.logger.debug(f'{source} {obj_id} {content}')
return f'{source} {obj_id} {content}'
# TODO handle multiple objects
return f'{source} item::{obj_id} {content}'

View file

@ -9,14 +9,21 @@ Process Feeder Json (example: Twitter feeder)
""" """
import os import os
import datetime import datetime
import sys
import uuid import uuid
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib.objects import ail_objects
class DefaultFeeder: class DefaultFeeder:
"""Default Feeder""" """Default Feeder"""
def __init__(self, json_data): def __init__(self, json_data):
self.json_data = json_data self.json_data = json_data
self.item_id = None self.obj = None
self.name = None self.name = None
def get_name(self): def get_name(self):
@ -52,14 +59,17 @@ class DefaultFeeder:
return self.json_data.get('data') return self.json_data.get('data')
## OVERWRITE ME ## ## OVERWRITE ME ##
def get_item_id(self): def get_obj(self):
""" """
Return item id. define item id Return obj global id. define obj global id
Default == item object
""" """
date = datetime.date.today().strftime("%Y/%m/%d") date = datetime.date.today().strftime("%Y/%m/%d")
item_id = os.path.join(self.get_name(), date, str(uuid.uuid4())) obj_id = os.path.join(self.get_name(), date, str(uuid.uuid4()))
self.item_id = f'{item_id}.gz' obj_id = f'{obj_id}.gz'
return self.item_id obj_id = f'item::{obj_id}'
self.obj = ail_objects.get_obj_from_global_id(obj_id)
return self.obj
## OVERWRITE ME ## ## OVERWRITE ME ##
def process_meta(self): def process_meta(self):

View file

@ -17,6 +17,7 @@ sys.path.append(os.environ['AIL_BIN'])
################################## ##################################
from importer.feeders.Default import DefaultFeeder from importer.feeders.Default import DefaultFeeder
from lib.ConfigLoader import ConfigLoader from lib.ConfigLoader import ConfigLoader
from lib.objects import ail_objects
from lib.objects.Chats import Chat from lib.objects.Chats import Chat
from lib.objects import Messages from lib.objects import Messages
from lib.objects import UsersAccount from lib.objects import UsersAccount
@ -25,6 +26,7 @@ from lib.objects.Usernames import Username
import base64 import base64
import io import io
import gzip import gzip
def gunzip_bytes_obj(bytes_obj): def gunzip_bytes_obj(bytes_obj):
gunzipped_bytes_obj = None gunzipped_bytes_obj = None
try: try:
@ -45,8 +47,7 @@ class TelegramFeeder(DefaultFeeder):
super().__init__(json_data) super().__init__(json_data)
self.name = 'telegram' self.name = 'telegram'
# define item id def get_obj(self): # TODO handle others objects -> images, pdf, ...
def get_item_id(self): # TODO rename self.item_id
# Get message date # Get message date
timestamp = self.json_data['meta']['date']['timestamp'] # TODO CREATE DEFAULT TIMESTAMP timestamp = self.json_data['meta']['date']['timestamp'] # TODO CREATE DEFAULT TIMESTAMP
# if self.json_data['meta'].get('date'): # if self.json_data['meta'].get('date'):
@ -56,8 +57,10 @@ class TelegramFeeder(DefaultFeeder):
# date = datetime.date.today().strftime("%Y/%m/%d") # date = datetime.date.today().strftime("%Y/%m/%d")
chat_id = str(self.json_data['meta']['chat']['id']) chat_id = str(self.json_data['meta']['chat']['id'])
message_id = str(self.json_data['meta']['id']) message_id = str(self.json_data['meta']['id'])
self.item_id = Messages.create_obj_id('telegram', chat_id, message_id, timestamp) # TODO rename self.item_id obj_id = Messages.create_obj_id('telegram', chat_id, message_id, timestamp)
return self.item_id obj_id = f'message:telegram:{obj_id}'
self.obj = ail_objects.get_obj_from_global_id(obj_id)
return self.obj
def process_meta(self): def process_meta(self):
""" """
@ -81,7 +84,7 @@ class TelegramFeeder(DefaultFeeder):
translation = None translation = None
decoded = base64.standard_b64decode(self.json_data['data']) decoded = base64.standard_b64decode(self.json_data['data'])
content = gunzip_bytes_obj(decoded) content = gunzip_bytes_obj(decoded)
message = Messages.create(self.item_id, content, translation=translation) message = Messages.create(self.obj.id, content, translation=translation)
if meta.get('chat'): if meta.get('chat'):
chat = Chat(meta['chat']['id'], 'telegram') chat = Chat(meta['chat']['id'], 'telegram')
@ -131,5 +134,4 @@ class TelegramFeeder(DefaultFeeder):
# TODO reply threads ???? # TODO reply threads ????
# message edit ???? # message edit ????
return None return None

View file

@ -139,22 +139,19 @@ class Mixer(AbstractModule):
def compute(self, message): def compute(self, message):
self.refresh_stats() self.refresh_stats()
splitted = message.split() splitted = message.split()
# Old Feeder name "feeder>>item_id gzip64encoded" # message -> # feeder_name - object - content
if len(splitted) == 2: # or # message -> # feeder_name - object
item_id, gzip64encoded = splitted
try: # feeder_name - object
feeder_name, item_id = item_id.split('>>') if len(splitted) == 2: # feeder_name - object (content already saved)
feeder_name.replace(" ", "") feeder_name, obj_id = splitted
if 'import_dir' in feeder_name:
feeder_name = feeder_name.split('/')[1] # Feeder name in message: "feeder obj_id gzip64encoded"
except ValueError: elif len(splitted) == 3: # gzip64encoded content
feeder_name = self.default_feeder_name feeder_name, obj_id, gzip64encoded = splitted
# Feeder name in message: "feeder item_id gzip64encoded"
elif len(splitted) == 3:
feeder_name, item_id, gzip64encoded = splitted
else: else:
print('Invalid message: not processed') print('Invalid message: not processed')
self.logger.debug(f'Invalid Item: {splitted[0]} not processed') self.logger.debug(f'Invalid Item: {splitted[0]} not processed') # TODO
return None return None
# remove absolute path # remove absolute path

View file

@ -277,7 +277,7 @@ class SubmitPaste(AbstractModule):
self.redis_logger.debug(f"relative path {rel_item_path}") self.redis_logger.debug(f"relative path {rel_item_path}")
# send paste to Global module # send paste to Global module
relay_message = f"submitted {rel_item_path} {gzip64encoded}" relay_message = f"submitted item::{rel_item_path} {gzip64encoded}"
self.add_message_to_queue(message=relay_message) self.add_message_to_queue(message=relay_message)
# add tags # add tags