chg: [module + queues] track + rename object global ID by module

terrtia 2023-10-11 12:06:01 +02:00
parent eae57fb813
commit 676b0f84ef
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
18 changed files with 267 additions and 140 deletions

View file

@@ -23,7 +23,7 @@ sys.path.append(os.environ['AIL_BIN'])
 ##################################
 from core import ail_2_ail
 from modules.abstract_module import AbstractModule
-# from lib.ConfigLoader import ConfigLoader
+from lib.objects.Items import Item

 #### CONFIG ####
 # config_loader = ConfigLoader()
@@ -76,10 +76,11 @@ class Sync_importer(AbstractModule):
         # # TODO: create default id
         item_id = ail_stream['meta']['ail:id']
+        item = Item(item_id)

-        message = f'sync {item_id} {b64_gzip_content}'
-        print(item_id)
-        self.add_message_to_queue(message, 'Importers')
+        message = f'sync {b64_gzip_content}'
+        print(item.id)
+        self.add_message_to_queue(obj=item, message=message, queue='Importers')

 if __name__ == '__main__':

View file

@@ -15,17 +15,20 @@ This module .
 import os
 import sys
 import time
+import traceback

 sys.path.append(os.environ['AIL_BIN'])
 ##################################
 # Import Project packages
 ##################################
 from core import ail_2_ail
-from lib.objects.Items import Item
+from lib.ail_queues import get_processed_end_obj
+from lib.exceptions import ModuleQueueError
+from lib.objects import ail_objects
 from modules.abstract_module import AbstractModule

-class Sync_module(AbstractModule):
+class Sync_module(AbstractModule):  # TODO KEEP A QUEUE ???????????????????????????????????????????????
     """
     Sync_module module for AIL framework
     """
@@ -53,7 +56,7 @@ class Sync_module(AbstractModule):
         print('sync queues refreshed')
         print(self.dict_sync_queues)

-        obj = self.get_obj()
+        obj = ail_objects.get_obj_from_global_id(message)

         tags = obj.get_tags()
@@ -67,10 +70,52 @@ class Sync_module(AbstractModule):
             obj_dict = obj.get_default_meta()
             # send to queue push and/or pull
             for dict_ail in self.dict_sync_queues[queue_uuid]['ail_instances']:
-                print(f'ail_uuid: {dict_ail["ail_uuid"]} obj: {message}')
+                print(f'ail_uuid: {dict_ail["ail_uuid"]} obj: {obj.type}:{obj.get_subtype(r_str=True)}:{obj.id}')
                 ail_2_ail.add_object_to_sync_queue(queue_uuid, dict_ail['ail_uuid'], obj_dict,
                                                    push=dict_ail['push'], pull=dict_ail['pull'])

+    def run(self):
+        """
+        Run Module endless process
+        """
+        # Endless loop processing messages from the input queue
+        while self.proceed:
+            # Get one message (paste) from the QueueIn (copy of Redis_Global publish)
+            global_id = get_processed_end_obj()
+            if global_id:
+                try:
+                    # Module processing with the message from the queue
+                    self.compute(global_id)
+                except Exception as err:
+                    if self.debug:
+                        self.queue.error()
+                        raise err
+                    # LOG ERROR
+                    trace = traceback.format_tb(err.__traceback__)
+                    trace = ''.join(trace)
+                    self.logger.critical(f"Error in module {self.module_name}: {__name__} : {err}")
+                    self.logger.critical(f"Module {self.module_name} input message: {global_id}")
+                    self.logger.critical(trace)
+                    if isinstance(err, ModuleQueueError):
+                        self.queue.error()
+                        raise err
+                # remove from set_module
+                ## check if item process == completed
+                if self.obj:
+                    self.queue.end_message(self.obj.get_global_id(), self.sha256_mess)
+                    self.obj = None
+                    self.sha256_mess = None
+            else:
+                self.computeNone()
+                # Wait before next process
+                self.logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
+                time.sleep(self.pending_seconds)

 if __name__ == '__main__':

View file

@@ -262,23 +262,24 @@ class Crawler(AbstractModule):
         if 'html' in entries and entries.get('html'):
             item_id = crawlers.create_item_id(self.items_dir, self.domain.id)
-            print(item_id)
-            gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html'])
+            item = Item(item_id)
+            print(item.id)
+            gzip64encoded = crawlers.get_gzipped_b64_item(item.id, entries['html'])
             # send item to Global
-            relay_message = f'crawler item::{item_id} {gzip64encoded}'
-            self.add_message_to_queue(relay_message, 'Importers')
+            relay_message = f'crawler {gzip64encoded}'
+            self.add_message_to_queue(obj=item, message=relay_message, queue='Importers')

-            # Tag
-            msg = f'infoleak:submission="crawler";{item_id}'  # TODO FIXME
-            self.add_message_to_queue(msg, 'Tags')
+            # Tag  # TODO replace me with metadata to tags
+            msg = f'infoleak:submission="crawler"'  # TODO FIXME
+            self.add_message_to_queue(obj=item, message=msg, queue='Tags')

+            # TODO replace me with metadata to add
             crawlers.create_item_metadata(item_id, last_url, parent_id)
             if self.root_item is None:
                 self.root_item = item_id
             parent_id = item_id

-            item = Item(item_id)
             title_content = crawlers.extract_title_from_html(entries['html'])
             if title_content:
                 title = Titles.create_title(title_content)

View file

@@ -94,9 +94,9 @@ class FeederImporter(AbstractImporter):
         if obj.type == 'item':  # object save on disk as file (Items)
             gzip64_content = feeder.get_gzip64_content()
-            return f'{feeder_name} {obj.get_global_id()} {gzip64_content}'
+            return obj, f'{feeder_name} {gzip64_content}'
         else:  # Messages save on DB
-            return f'{feeder_name} {obj.get_global_id()}'
+            return obj, f'{feeder_name}'

 class FeederModuleImporter(AbstractModule):
@@ -115,8 +115,10 @@ class FeederModuleImporter(AbstractModule):
     def compute(self, message):
         # TODO HANDLE Invalid JSON
         json_data = json.loads(message)
-        relay_message = self.importer.importer(json_data)
-        self.add_message_to_queue(message=relay_message)
+        # TODO multiple objs + messages
+        obj, relay_message = self.importer.importer(json_data)
+        ####
+        self.add_message_to_queue(obj=obj, message=relay_message)

 # Launch Importer

View file

@@ -22,6 +22,8 @@ from lib import ail_logger
 # from lib.ail_queues import AILQueue
 from lib import ail_files  # TODO RENAME ME
+from lib.objects.Items import Item

 logging.config.dictConfig(ail_logger.get_config(name='modules'))

 class FileImporter(AbstractImporter):
@@ -44,10 +46,12 @@ class FileImporter(AbstractImporter):
         elif not ail_files.is_text(mimetype):  # # # #
             return None

-        # TODO handle multiple objects
-        message = self.create_message(item_id, content, gzipped=gzipped, source='dir_import')
+        source = 'dir_import'
+        message = self.create_message(content, gzipped=gzipped, source=source)
+        self.logger.info(f'{source} {item_id}')
+        obj = Item(item_id)
         if message:
-            self.add_message_to_queue(message=message)
+            self.add_message_to_queue(obj, message=message)

 class DirImporter(AbstractImporter):
     def __init__(self):

View file

@@ -22,6 +22,8 @@ from importer.abstract_importer import AbstractImporter
 from modules.abstract_module import AbstractModule
 from lib.ConfigLoader import ConfigLoader
+from lib.objects.Items import Item

 class PystemonImporter(AbstractImporter):
     def __init__(self, pystemon_dir, host='localhost', port=6379, db=10):
         super().__init__()
@@ -53,10 +55,13 @@ class PystemonImporter(AbstractImporter):
                 gzipped = False

             # TODO handle multiple objects
-            return self.create_message(item_id, content, gzipped=gzipped, source='pystemon')
+            source = 'pystemon'
+            message = self.create_message(content, gzipped=gzipped, source=source)
+            self.logger.info(f'{source} {item_id}')
+            return item_id, message

         except IOError as e:
-            print(f'Error: {full_item_path}, IOError')
+            self.logger.error(f'Error {e}: {full_item_path}, IOError')
             return None
@@ -80,7 +85,10 @@ class PystemonModuleImporter(AbstractModule):
         return self.importer.importer()

     def compute(self, message):
-        self.add_message_to_queue(message=message)
+        if message:
+            item_id, message = message
+            item = Item(item_id)
+            self.add_message_to_queue(obj=item, message=message)

 if __name__ == '__main__':

View file

@@ -4,15 +4,13 @@
 Importer Class
 ================

-Import Content
+ZMQ Importer

 """
 import os
 import sys

 import zmq

 sys.path.append(os.environ['AIL_BIN'])
 ##################################
 # Import Project packages
@@ -21,6 +19,8 @@ from importer.abstract_importer import AbstractImporter
 from modules.abstract_module import AbstractModule
 from lib.ConfigLoader import ConfigLoader
+from lib.objects.Items import Item

 class ZMQImporters(AbstractImporter):
     def __init__(self):
         super().__init__()
@@ -74,18 +74,19 @@ class ZMQModuleImporter(AbstractModule):
         for message in messages:
             message = message.decode()

             obj_id, gzip64encoded = message.split(' ', 1)  # TODO ADD LOGS
             splitted = obj_id.split('>>', 1)
             if splitted == 2:
                 feeder_name, obj_id = splitted
             else:
                 feeder_name = self.default_feeder_name

-            # f'{source} item::{obj_id} {content}'
-            relay_message = f'{feeder_name} item::{obj_id} {gzip64encoded}'
+            obj = Item(obj_id)
+            # f'{source} {content}'
+            relay_message = f'{feeder_name} {gzip64encoded}'
             print(f'feeder_name item::{obj_id}')
-            self.add_message_to_queue(message=relay_message)
+            self.add_message_to_queue(obj=obj, message=relay_message)

 if __name__ == '__main__':

View file

@@ -54,16 +54,22 @@ class AbstractImporter(ABC):  # TODO ail queues
         """
         return self.__class__.__name__

-    def add_message_to_queue(self, message, queue_name=None):
+    def add_message_to_queue(self, obj, message='', queue=None):
         """
         Add message to queue
+        :param obj: AILObject
         :param message: message to send in queue
-        :param queue_name: queue or module name
+        :param queue: queue name or module name

         ex: add_message_to_queue(item_id, 'Mail')
         """
-        if message:
-            self.queue.send_message(message, queue_name)
+        if not obj:
+            raise Exception(f'Invalid AIL object, {obj}')
+        obj_global_id = obj.get_global_id()
+        self.queue.send_message(obj_global_id, message, queue)
+
+    def get_available_queues(self):
+        return self.queue.get_out_queues()

     @staticmethod
     def b64(content):
@@ -85,20 +91,20 @@ class AbstractImporter(ABC):  # TODO ail queues
             self.logger.warning(e)
             return ''

-    def create_message(self, obj_id, content, b64=False, gzipped=False, source=None):
-        if not gzipped:
-            content = self.b64_gzip(content)
-        elif not b64:
-            content = self.b64(content)
-
-        if not content:
-            return None
-        if isinstance(content, bytes):
-            content = content.decode()
-
+    def create_message(self, content, b64=False, gzipped=False, source=None):
         if not source:
             source = self.name
-        self.logger.info(f'{source} {obj_id}')
-        # self.logger.debug(f'{source} {obj_id} {content}')
-        # TODO handle multiple objects
-        return f'{source} item::{obj_id} {content}'
+        if content:
+            if not gzipped:
+                content = self.b64_gzip(content)
+            elif not b64:
+                content = self.b64(content)
+            if not content:
+                return None
+            if isinstance(content, bytes):
+                content = content.decode()
+            return f'{source} {content}'
+        else:
+            return f'{source}'
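
Taken together, the new importer contract is: the AIL object rides alongside the message, and `create_message` no longer embeds the object ID in the payload. A minimal usage sketch (illustrative only, not part of this commit; the item ID and feeder name are made up):

    # inside an AbstractImporter subclass -- hypothetical example
    item = Item('submitted/2023/10/11/example.gz')               # illustrative id
    message = self.create_message(content, source='my_feeder')   # -> 'my_feeder <b64_gzip(content)>'
    if message:
        # the object's global id is resolved internally via obj.get_global_id()
        self.add_message_to_queue(item, message=message, queue='Importers')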

View file

@@ -27,6 +27,7 @@ import base64
 import io
 import gzip

+# TODO remove compression ???
 def gunzip_bytes_obj(bytes_obj):
     gunzipped_bytes_obj = None
     try:

View file

@@ -93,6 +93,10 @@ def zscan_iter(r_redis, name):  # count ???
 ## -- Redis -- ##

+def rreplace(s, old, new, occurrence):
+    li = s.rsplit(old, occurrence)
+    return new.join(li)
+
 def paginate_iterator(iter_elems, nb_obj=50, page=1):
     dict_page = {'nb_all_elem': len(iter_elems)}
     nb_pages = dict_page['nb_all_elem'] / nb_obj
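
`rreplace` is a right-to-left replace: it splits on the last `occurrence` matches of `old` and rejoins with `new`. For example (illustrative values):

    >>> rreplace('submitted/2023/10/11/aaa.gz', 'aaa.gz', 'bbb.gz', 1)
    'submitted/2023/10/11/bbb.gz'
    >>> rreplace('a-b-a-b', 'b', 'X', 1)   # only the rightmost match is replaced
    'a-b-a-X'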

View file

@@ -84,6 +84,18 @@ class AILQueue:
             add_processed_obj(obj_global_id, m_hash, module=self.name)
             return obj_global_id, m_hash, mess

+    def rename_message_obj(self, new_id, old_id):
+        # restrict rename function
+        if self.name == 'Mixer' or self.name == 'Global':
+            rename_processed_obj(new_id, old_id)
+        else:
+            raise ModuleQueueError('This Module can\'t rename an object ID')
+
+    # condition -> not in any queue
+    # TODO EDIT meta
     def end_message(self, obj_global_id, m_hash):
         end_processed_obj(obj_global_id, m_hash, module=self.name)
@@ -171,6 +183,12 @@ def clear_modules_queues_stats():
 def get_processed_objs():
     return r_obj_process.smembers(f'objs:process')

+def get_processed_end_objs():
+    return r_obj_process.smembers(f'objs:processed')
+
+def get_processed_end_obj():
+    return r_obj_process.spop(f'objs:processed')
+
 def get_processed_objs_by_type(obj_type):
     return r_obj_process.zrange(f'objs:process:{obj_type}', 0, -1)
@@ -219,6 +237,28 @@ def end_processed_obj(obj_global_id, m_hash, module=None, queue=None):
         r_obj_process.sadd(f'objs:processed', obj_global_id)  # TODO use list ??????

+def rename_processed_obj(new_id, old_id):
+    module = get_processed_obj_modules(old_id)
+    # currently in a module
+    if len(module) == 1:
+        module, x_hash = module[0].split(':', 1)
+        obj_type = old_id.split(':', 1)[0]
+        r_obj_process.zrem(f'obj:modules:{old_id}', f'{module}:{x_hash}')
+        r_obj_process.zrem(f'objs:process:{obj_type}', old_id)
+        r_obj_process.srem(f'objs:process', old_id)
+        add_processed_obj(new_id, x_hash, module=module)
+
+def delete_processed_obj(obj_global_id):
+    for q in get_processed_obj_queues(obj_global_id):
+        queue, x_hash = q.split(':', 1)
+        r_obj_process.zrem(f'obj:queues:{obj_global_id}', f'{queue}:{x_hash}')
+    for m in get_processed_obj_modules(obj_global_id):
+        module, x_hash = m.split(':', 1)
+        r_obj_process.zrem(f'obj:modules:{obj_global_id}', f'{module}:{x_hash}')
+    obj_type = obj_global_id.split(':', 1)[0]
+    r_obj_process.zrem(f'objs:process:{obj_type}', obj_global_id)
+    r_obj_process.srem(f'objs:process', obj_global_id)

 ###################################################################################
@@ -322,7 +362,10 @@ def save_queue_digraph():
 if __name__ == '__main__':
     # clear_modules_queues_stats()
     # save_queue_digraph()
-    oobj_global_id = 'item::submitted/2023/09/06/submitted_75fb9ff2-8c91-409d-8bd6-31769d73db8f.gz'
-    while True:
-        print(get_processed_obj(oobj_global_id))
-        time.sleep(0.5)
+    oobj_global_id = 'item::submitted/2023/10/11/submitted_b5440009-05d5-4494-a807-a6d8e4a900cf.gz'
+    # print(get_processed_obj(oobj_global_id))
+    # delete_processed_obj(oobj_global_id)
+    # while True:
+    #     print(get_processed_obj(oobj_global_id))
+    #     time.sleep(0.5)
+    print(get_processed_end_objs())
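
The rename path keeps the processing state consistent when Mixer sanitizes an object ID: the old global ID is dropped from the per-type and global tracking sets and the message hash is re-attached to the new ID. A rough sketch of the lifecycle, assuming the Redis keys defined above (IDs and hash are made up):

    add_processed_obj('item::old/id.gz', m_hash, module='Mixer')   # message enters a module
    rename_processed_obj('item::new/id.gz', 'item::old/id.gz')     # Mixer sanitized the ID
    end_processed_obj('item::new/id.gz', m_hash, module='Mixer')   # -> lands in 'objs:processed'
    get_processed_end_obj()                                        # Sync_module pops 'item::new/id.gz'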

View file

@@ -11,6 +11,7 @@ import cld3
 import html2text

 from io import BytesIO
+from uuid import uuid4

 from pymisp import MISPObject
@@ -18,7 +19,7 @@ sys.path.append(os.environ['AIL_BIN'])
 ##################################
 # Import Project packages
 ##################################
-from lib.ail_core import get_ail_uuid
+from lib.ail_core import get_ail_uuid, rreplace
 from lib.objects.abstract_object import AbstractObject
 from lib.ConfigLoader import ConfigLoader
 from lib import item_basic
@@ -137,9 +138,23 @@ class Item(AbstractObject):
     ####################################################################################
     ####################################################################################

-    def sanitize_id(self):
-        pass
+    # TODO ADD function to check if ITEM (content + file) already exists
+
+    def sanitize_id(self):
+        if ITEMS_FOLDER in self.id:
+            self.id = self.id.replace(ITEMS_FOLDER, '', 1)
+        # limit filename length
+        basename = self.get_basename()
+        if len(basename) > 255:
+            new_basename = f'{basename[:215]}{str(uuid4())}.gz'
+            self.id = rreplace(self.id, basename, new_basename, 1)
+        return self.id

     # # TODO: sanitize_id
     # # TODO: check if already exists ?
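
The net effect of `sanitize_id` is that Mixer can normalize an item ID in place: strip a leaked `ITEMS_FOLDER` prefix and cap over-long basenames at 215 characters plus a UUID suffix. Roughly (values illustrative):

    item = Item('submitted/2023/10/11/' + 'a' * 300 + '.gz')
    item.sanitize_id()
    # basename was > 255 chars -> now f'{first 215 chars}{uuid4()}.gz' (254 chars)
    assert len(item.get_basename()) <= 255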

View file

@@ -84,6 +84,8 @@ def get_object(obj_type, subtype, obj_id):
         return UserAccount(obj_id, subtype)
     elif obj_type == 'username':
         return Usernames.Username(obj_id, subtype)
+    else:
+        raise Exception(f'Unknown AIL object: {obj_type} {subtype} {obj_id}')

 def get_objects(objects):
     objs = set()

View file

@@ -79,73 +79,56 @@ class Global(AbstractModule):
         self.time_last_stats = time.time()
         self.processed_item = 0

-    def compute(self, message, r_result=False):
-        # Recovering the streamed message informations
-        splitted = message.split()
-        if len(splitted) == 2:
-            item, gzip64encoded = splitted
-
-            # Remove ITEMS_FOLDER from item path (crawled item + submitted)
-            if self.ITEMS_FOLDER in item:
-                item = item.replace(self.ITEMS_FOLDER, '', 1)
-            file_name_item = item.split('/')[-1]
-            if len(file_name_item) > 255:
-                new_file_name_item = '{}{}.gz'.format(file_name_item[:215], str(uuid4()))
-                item = self.rreplace(item, file_name_item, new_file_name_item, 1)
-
-            # Creating the full filepath
-            filename = os.path.join(self.ITEMS_FOLDER, item)
-            filename = os.path.realpath(filename)
-
-            # Incorrect filename
-            if not os.path.commonprefix([filename, self.ITEMS_FOLDER]) == self.ITEMS_FOLDER:
-                self.logger.warning(f'Global; Path traversal detected {filename}')
-                print(f'Global; Path traversal detected {filename}')
-
-            else:
-                # Decode compressed base64
-                decoded = base64.standard_b64decode(gzip64encoded)
-                new_file_content = self.gunzip_bytes_obj(filename, decoded)
-
-                if new_file_content:
-                    filename = self.check_filename(filename, new_file_content)
-
-                    if filename:
-                        # create subdir
-                        dirname = os.path.dirname(filename)
-                        if not os.path.exists(dirname):
-                            os.makedirs(dirname)
-                        with open(filename, 'wb') as f:
-                            f.write(decoded)
-
-                        item_id = filename
-                        # remove self.ITEMS_FOLDER from
-                        if self.ITEMS_FOLDER in item_id:
-                            item_id = item_id.replace(self.ITEMS_FOLDER, '', 1)
-                        item = Item(item_id)
-
-                        update_obj_date(item.get_date(), 'item')
-                        self.add_message_to_queue(obj=item, queue='Item')
-                        self.processed_item += 1
-
-                        # DIRTY FIX AIL SYNC - SEND TO SYNC MODULE
-                        # # FIXME: DIRTY FIX
-                        message = f'{item.get_type()};{item.get_subtype(r_str=True)};{item.get_id()}'
-                        print(message)
-                        self.add_message_to_queue(obj=item, queue='Sync')
-
-                        print(item_id)
-                        if r_result:
-                            return item_id
-        else:
-            self.logger.debug(f"Empty Item: {message} not processed")
-            print(f"Empty Item: {message} not processed")
+    def compute(self, message, r_result=False):  # TODO move OBJ ID sanitization to importer
+        # Recovering the streamed message infos
+        gzip64encoded = message
+        if self.obj.type == 'item':
+            if gzip64encoded:
+                # Creating the full filepath
+                filename = os.path.join(self.ITEMS_FOLDER, self.obj.id)
+                filename = os.path.realpath(filename)
+                # Incorrect filename
+                if not os.path.commonprefix([filename, self.ITEMS_FOLDER]) == self.ITEMS_FOLDER:
+                    self.logger.warning(f'Global; Path traversal detected {filename}')
+                    print(f'Global; Path traversal detected {filename}')
+                else:
+                    # Decode compressed base64
+                    decoded = base64.standard_b64decode(gzip64encoded)
+                    new_file_content = self.gunzip_bytes_obj(filename, decoded)
+                    # TODO REWRITE ME
+                    if new_file_content:
+                        filename = self.check_filename(filename, new_file_content)
+                        if filename:
+                            # create subdir
+                            dirname = os.path.dirname(filename)
+                            if not os.path.exists(dirname):
+                                os.makedirs(dirname)
+                            with open(filename, 'wb') as f:
+                                f.write(decoded)
+                            update_obj_date(self.obj.get_date(), 'item')
+                            self.add_message_to_queue(obj=self.obj, queue='Item')
+                            self.processed_item += 1
+                            print(self.obj.id)
+                            if r_result:
+                                return self.obj.id
+            else:
+                self.logger.info(f"Empty Item: {message} not processed")
+        elif self.obj:
+            # TODO send to specific object queue => image, ...
+            self.add_message_to_queue(obj=self.obj, queue='Item')
+        else:
+            self.logger.critical(f"Empty obj: {self.obj} {message} not processed")

     def check_filename(self, filename, new_file_content):
         """

View file

@@ -9,7 +9,7 @@ This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
 This module take all the feeds provided in the config.

-Depending on the configuration, this module will process the feed as follow:
+Depending on the configuration, this module will process the feed as follows:
     operation_mode 1: "Avoid any duplicate from any sources"
         - The module maintain a list of content for each item
         - If the content is new, process it
@@ -64,9 +64,6 @@ class Mixer(AbstractModule):
         self.ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
         self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")

-        self.ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
-        self.ITEMS_FOLDER = os.path.join(os.path.realpath(self.ITEMS_FOLDER), '')
-
         self.nb_processed_items = 0
         self.feeders_processed = {}
         self.feeders_duplicate = {}
@@ -138,27 +135,38 @@ class Mixer(AbstractModule):
     def compute(self, message):
         self.refresh_stats()
+        # obj = self.obj
+        # TODO CHECK IF NOT self.object -> get object global ID from message
         splitted = message.split()
-        # message -> # feeder_name - object - content
-        # or # message -> # feeder_name - object
+        # message -> feeder_name - content
+        # or message -> feeder_name

         # feeder_name - object
-        if len(splitted) == 2:  # feeder_name - object (content already saved)
-            feeder_name, obj_id = splitted
+        if len(splitted) == 1:  # feeder_name - object (content already saved)
+            feeder_name = message
+            gzip64encoded = None

         # Feeder name in message: "feeder obj_id gzip64encoded"
-        elif len(splitted) == 3:  # gzip64encoded content
-            feeder_name, obj_id, gzip64encoded = splitted
+        elif len(splitted) == 2:  # gzip64encoded content
+            feeder_name, gzip64encoded = splitted
         else:
-            print('Invalid message: not processed')
-            self.logger.debug(f'Invalid Item: {splitted[0]} not processed')
+            self.logger.warning(f'Invalid Message: {splitted} not processed')
+            # TODO
             return None

-        # remove absolute path
-        item_id = item_id.replace(self.ITEMS_FOLDER, '', 1)
+        if self.obj.type == 'item':
+            # Remove ITEMS_FOLDER from item path (crawled item + submitted)
+            # Limit basename length
+            obj_id = self.obj.id
+            self.obj.sanitize_id()
+            if self.obj.id != obj_id:
+                self.queue.rename_message_obj(self.obj.id, obj_id)

-        relay_message = f'{item_id} {gzip64encoded}'
+        relay_message = gzip64encoded
+        # print(relay_message)

+        # TODO only work for item object
         # Avoid any duplicate coming from any sources
         if self.operation_mode == 1:
             digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
@@ -207,7 +215,10 @@ class Mixer(AbstractModule):
         # No Filtering
         else:
             self.increase_stat_processed(feeder_name)
-            self.add_message_to_queue(relay_message)
+            if self.obj.type == 'item':
+                self.add_message_to_queue(obj=self.obj, message=gzip64encoded)
+            else:
+                self.add_message_to_queue(obj=self.obj)

 if __name__ == "__main__":

View file

@@ -25,7 +25,7 @@ from modules.abstract_module import AbstractModule
 from lib.objects.Items import ITEMS_FOLDER
 from lib import ConfigLoader
 from lib import Tag
+from lib.objects.Items import Item

 class SubmitPaste(AbstractModule):
     """
@@ -276,9 +276,11 @@ class SubmitPaste(AbstractModule):
         rel_item_path = save_path.replace(self.PASTES_FOLDER, '', 1)
         self.redis_logger.debug(f"relative path {rel_item_path}")

+        item = Item(rel_item_path)
         # send paste to Global module
-        relay_message = f"submitted item::{rel_item_path} {gzip64encoded}"
-        self.add_message_to_queue(message=relay_message)
+        relay_message = f"submitted {gzip64encoded}"
+        self.add_message_to_queue(obj=item, message=relay_message)

         # add tags
         for tag in ltags:

View file

@@ -46,9 +46,6 @@ class Tags(AbstractModule):
             # Forward message to channel
             self.add_message_to_queue(message=tag, queue='Tag_feed')

-            self.add_message_to_queue(queue='Sync')

 if __name__ == '__main__':
     module = Tags()
     module.run()

View file

@@ -96,7 +96,8 @@ class AbstractModule(ABC):
             self.obj = None
             return None

-    def add_message_to_queue(self, message='', obj=None, queue=None):
+    # TODO ADD META OBJ ????
+    def add_message_to_queue(self, obj=None, message='', queue=None):
         """
         Add message to queue
         :param obj: AILObject