chg: [Mixer] refactor Mixer + fix ZMQImporter config

This commit is contained in:
Terrtia 2023-03-31 14:53:20 +02:00
parent 47da4aa62c
commit d01780dd95
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
8 changed files with 234 additions and 260 deletions

View file

@ -1,224 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Mixer Module
================
This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
This module take all the feeds provided in the config.
Depending on the configuration, this module will process the feed as follow:
operation_mode 1: "Avoid any duplicate from any sources"
- The module maintain a list of content for each paste
- If the content is new, process it
- Else, do not process it but keep track for statistics on duplicate
operation_mode 2: "Keep duplicate coming from different sources"
- The module maintain a list of name given to the paste by the feeder
- If the name has not yet been seen, process it
- Elseif, the saved content associated with the paste is not the same, process it
- Else, do not process it but keep track for statistics on duplicate
operation_mode 3: "Don't look if duplicated content"
- SImply do not bother to check if it is a duplicate
- Simply do not bother to check if it is a duplicate
Note that the hash of the content is defined as the sha1(gzip64encoded).
Every data coming from a named feed can be sent to a pre-processing module before going to the global module.
The mapping can be done via the variable FEED_QUEUE_MAPPING
"""
import os
import sys
import base64
import hashlib
import time
from pubsublogger import publisher
import redis
from Helper import Process
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
# CONFIG #
refresh_time = 30
FEED_QUEUE_MAPPING = { "feeder2": "preProcess1" } # Map a feeder name to a pre-processing module
if __name__ == '__main__':
publisher.port = 6380
publisher.channel = 'Script'
config_section = 'Mixer'
p = Process(config_section)
config_loader = ConfigLoader.ConfigLoader()
# REDIS #
server = config_loader.get_redis_conn("Redis_Mixer_Cache")
server_cache = config_loader.get_redis_conn("Redis_Log_submit")
# LOGGING #
publisher.info("Feed Script started to receive & publish.")
# OTHER CONFIG #
operation_mode = config_loader.get_config_int("Module_Mixer", "operation_mode")
ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
default_unnamed_feed_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
config_loader = None
# STATS #
processed_paste = 0
processed_paste_per_feeder = {}
duplicated_paste_per_feeder = {}
time_1 = time.time()
print('Operation mode ' + str(operation_mode))
while True:
message = p.get_from_set()
if message is not None:
print(message)
splitted = message.split()
if len(splitted) == 2:
complete_paste, gzip64encoded = splitted # NEW: source, item_id, gzip64 source if len==3 ???
try:
#feeder_name = ( complete_paste.replace("archive/","") ).split("/")[0]
feeder_name, paste_name = complete_paste.split('>>')
feeder_name.replace(" ","")
if 'import_dir' in feeder_name:
feeder_name = feeder_name.split('/')[1]
except ValueError as e:
feeder_name = default_unnamed_feed_name
paste_name = complete_paste
# remove absolute path
paste_name = paste_name.replace(PASTES_FOLDER, '', 1)
# Processed paste
processed_paste += 1
try:
processed_paste_per_feeder[feeder_name] += 1
except KeyError as e:
# new feeder
processed_paste_per_feeder[feeder_name] = 1
duplicated_paste_per_feeder[feeder_name] = 0
relay_message = "{0} {1}".format(paste_name, gzip64encoded)
#relay_message = b" ".join( [paste_name, gzip64encoded] )
digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
# Avoid any duplicate coming from any sources
if operation_mode == 1:
if server.exists(digest): # Content already exists
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
else: # New content
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
server.sadd(digest, feeder_name)
server.expire(digest, ttl_key)
# Keep duplicate coming from different sources
elif operation_mode == 2:
# Filter to avoid duplicate
content = server.get('HASH_'+paste_name)
if content is None:
# New content
# Store in redis for filtering
server.set('HASH_'+paste_name, digest)
server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key)
server.expire('HASH_'+paste_name, ttl_key)
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
if digest != content:
# Same paste name but different content
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
server.sadd(paste_name, feeder_name)
server.expire(paste_name, ttl_key)
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
# Already processed
# Keep track of processed pastes
#STATS
duplicated_paste_per_feeder[feeder_name] += 1
continue
else:
# populate Global OR populate another set based on the feeder_name
if feeder_name in FEED_QUEUE_MAPPING:
p.populate_set_out(relay_message, FEED_QUEUE_MAPPING[feeder_name])
else:
p.populate_set_out(relay_message, 'Mixer')
else:
# TODO Store the name of the empty paste inside a Redis-list.
print("Empty Paste: not processed")
publisher.debug("Empty Paste: {0} not processed".format(message))
else:
if int(time.time() - time_1) > refresh_time:
# update internal feeder
list_feeder = server_cache.hkeys("mixer_cache:list_feeder")
if list_feeder:
for feeder in list_feeder:
count = int(server_cache.hget("mixer_cache:list_feeder", feeder))
if count is None:
count = 0
processed_paste_per_feeder[feeder] = processed_paste_per_feeder.get(feeder, 0) + count
processed_paste = processed_paste + count
print(processed_paste_per_feeder)
to_print = 'Mixer; ; ; ;mixer_all All_feeders Processed {0} paste(s) in {1}sec'.format(processed_paste, refresh_time)
print(to_print)
publisher.info(to_print)
processed_paste = 0
for feeder, count in processed_paste_per_feeder.items():
to_print = 'Mixer; ; ; ;mixer_{0} {0} Processed {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
print(to_print)
publisher.info(to_print)
processed_paste_per_feeder[feeder] = 0
for feeder, count in duplicated_paste_per_feeder.items():
to_print = 'Mixer; ; ; ;mixer_{0} {0} Duplicated {1} paste(s) in {2}sec'.format(feeder, count, refresh_time)
print(to_print)
publisher.info(to_print)
duplicated_paste_per_feeder[feeder] = 0
time_1 = time.time()
# delete internal feeder list
server_cache.delete("mixer_cache:list_feeder")
time.sleep(0.5)
continue

View file

@ -22,24 +22,24 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from core import ail_2_ail
from lib.ConfigLoader import ConfigLoader
from modules.abstract_module import AbstractModule
# from lib.ConfigLoader import ConfigLoader
#### CONFIG ####
config_loader = ConfigLoader()
server_cache = config_loader.get_redis_conn("Redis_Log_submit")
config_loader = None
# config_loader = ConfigLoader()
#
# config_loader = None
#### ------ ####
class Sync_importer(AbstractModule):
"""
Tags module for AIL framework
Sync_importer module for AIL framework
"""
def __init__(self):
super(Sync_importer, self).__init__()
# Waiting time in secondes between to message proccessed
# Waiting time in seconds between to message processed
self.pending_seconds = 10
# self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
@ -48,7 +48,6 @@ class Sync_importer(AbstractModule):
# Send module state to logs
self.redis_logger.info(f'Module {self.module_name} Launched')
def run(self):
while self.proceed:
### REFRESH DICT
@ -67,7 +66,6 @@ class Sync_importer(AbstractModule):
self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
time.sleep(self.pending_seconds)
def compute(self, ail_stream):
# # TODO: SANITYZE AIL STREAM
@ -79,15 +77,11 @@ class Sync_importer(AbstractModule):
# # TODO: create default id
item_id = ail_stream['meta']['ail:id']
message = f'{item_id} {b64_gzip_content}'
message = f'sync {item_id} {b64_gzip_content}'
print(item_id)
self.send_message_to_queue(message, 'Mixer')
# increase nb of item by ail sync
server_cache.hincrby("mixer_cache:list_feeder", 'AIL_Sync', 1)
if __name__ == '__main__':
module = Sync_importer()
module.run()

View file

@ -27,7 +27,6 @@ class Crawler(AbstractModule):
self.pending_seconds = 1
config_loader = ConfigLoader()
self.r_log_submit = config_loader.get_redis_conn('Redis_Log_submit')
self.default_har = config_loader.get_config_boolean('Crawler', 'default_har')
self.default_screenshot = config_loader.get_config_boolean('Crawler', 'default_screenshot')
@ -228,10 +227,8 @@ class Crawler(AbstractModule):
print(item_id)
gzip64encoded = crawlers.get_gzipped_b64_item(item_id, entries['html'])
# send item to Global
relay_message = f'{item_id} {gzip64encoded}'
relay_message = f'crawler {item_id} {gzip64encoded}'
self.send_message_to_queue(relay_message, 'Mixer')
# increase nb of paste by feeder name
self.r_log_submit.hincrby('mixer_cache:list_feeder', 'crawler', 1)
# Tag
msg = f'infoleak:submission="crawler";{item_id}'

View file

@ -93,7 +93,7 @@ class FeederImporter(AbstractImporter):
feeder.process_meta()
gzip64_content = feeder.get_gzip64_content()
return f'{item_id} {gzip64_content}'
return f'{feeder_name} {item_id} {gzip64_content}'
class FeederModuleImporter(AbstractModule):
@ -115,10 +115,6 @@ class FeederModuleImporter(AbstractModule):
relay_message = self.importer.importer(json_data)
self.send_message_to_queue(relay_message)
# TODO IN MIXER
# increase nb of paste by feeder name
# server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1)
# Launch Importer
if __name__ == '__main__':

View file

@ -56,10 +56,11 @@ class ZMQModuleImporter(AbstractModule):
super().__init__()
config_loader = ConfigLoader()
address = config_loader.get_config_str('ZMQ_Global', 'address')
addresses = config_loader.get_config_str('ZMQ_Global', 'address')
addresses = addresses.split(',').strip()
channel = config_loader.get_config_str('ZMQ_Global', 'channel')
self.zmq_importer = ZMQImporters()
# TODO register all Importers
for address in addresses:
self.zmq_importer.add(address, channel)
# TODO MESSAGE SOURCE - UI

218
bin/modules/Mixer.py Executable file
View file

@ -0,0 +1,218 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Mixer Module
================
This module is consuming the Redis-list created by the ZMQ_Feed_Q Module.
This module take all the feeds provided in the config.
Depending on the configuration, this module will process the feed as follow:
operation_mode 1: "Avoid any duplicate from any sources"
- The module maintain a list of content for each item
- If the content is new, process it
- Else, do not process it but keep track for statistics on duplicate
DISABLED
operation_mode 2: "Keep duplicate coming from different sources"
- The module maintain a list of name given to the item by the feeder
- If the name has not yet been seen, process it
- Elseif, the saved content associated with the item is not the same, process it
- Else, do not process it but keep track for statistics on duplicate
operation_mode 3: "Don't look if duplicated content"
- SImply do not bother to check if it is a duplicate
- Simply do not bother to check if it is a duplicate
Note that the hash of the content is defined as the sha1(gzip64encoded).
"""
import os
import sys
import hashlib
import time
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
class Mixer(AbstractModule):
"""docstring for Mixer module."""
def __init__(self):
super(Mixer, self).__init__()
config_loader = ConfigLoader()
self.r_cache = config_loader.get_redis_conn("Redis_Mixer_Cache")
# self.r_cache_s = config_loader.get_redis_conn("Redis_Log_submit")
self.pending_seconds = 5
self.refresh_time = 30
self.last_refresh = time.time()
self.operation_mode = config_loader.get_config_int("Module_Mixer", "operation_mode")
print(f'Operation mode {self.operation_mode}')
self.ttl_key = config_loader.get_config_int("Module_Mixer", "ttl_duplicate")
self.default_feeder_name = config_loader.get_config_str("Module_Mixer", "default_unnamed_feed_name")
self.ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
self.ITEMS_FOLDER = os.path.join(os.path.realpath(self.ITEMS_FOLDER), '')
self.nb_processed_items = 0
self.feeders_processed = {}
self.feeders_duplicate = {}
self.redis_logger.info(f"Module: {self.module_name} Launched")
# TODO Save stats in cache
# def get_feeders(self):
# return self.r_cache_s.smembers("mixer_cache:feeders")
#
# def get_feeder_nb_last_processed(self, feeder):
# nb = self.r_cache_s.hget("mixer_cache:feeders:last_processed", feeder)
# if nb:
# return int(nb)
# else:
# return 0
#
# def get_cache_feeders_nb_last_processed(self):
# feeders = {}
# for feeder in self.get_feeders():
# feeders[feeder] = self.get_feeder_nb_last_processed(feeder)
# return feeders
def clear_feeders_stat(self):
pass
# self.r_cache_s.delete("mixer_cache:feeders:last_processed")
def increase_stat_processed(self, feeder):
self.nb_processed_items += 1
try:
self.feeders_processed[feeder] += 1
except KeyError:
self.feeders_processed[feeder] = 1
def increase_stat_duplicate(self, feeder):
self.nb_processed_items += 1
try:
self.feeders_duplicate[feeder] += 1
except KeyError:
self.feeders_duplicate[feeder] = 1
# TODO Save stats in cache
def refresh_stats(self):
if int(time.time() - self.last_refresh) > self.refresh_time:
# update internal feeder
to_print = f'Mixer; ; ; ;mixer_all All_feeders Processed {self.nb_processed_items} item(s) in {self.refresh_time}sec'
print(to_print)
self.redis_logger.info(to_print)
self.nb_processed_items = 0
for feeder in self.feeders_processed:
to_print = f'Mixer; ; ; ;mixer_{feeder} {feeder} Processed {self.feeders_processed[feeder]} item(s) in {self.refresh_time}sec'
print(to_print)
self.redis_logger.info(to_print)
self.feeders_processed[feeder] = 0
for feeder in self.feeders_duplicate:
to_print = f'Mixer; ; ; ;mixer_{feeder} {feeder} Duplicated {self.feeders_duplicate[feeder]} item(s) in {self.refresh_time}sec'
print(to_print)
self.redis_logger.info(to_print)
self.feeders_duplicate[feeder] = 0
self.last_refresh = time.time()
self.clear_feeders_stat()
time.sleep(0.5)
def computeNone(self):
self.refresh_stats()
def compute(self, message):
self.refresh_stats()
splitted = message.split()
# Old Feeder name "feeder>>item_id gzip64encoded"
if len(splitted) == 2:
item_id, gzip64encoded = splitted
try:
feeder_name, item_id = item_id.split('>>')
feeder_name.replace(" ", "")
if 'import_dir' in feeder_name:
feeder_name = feeder_name.split('/')[1]
except ValueError:
feeder_name = self.default_feeder_name
# Feeder name in message: "feeder item_id gzip64encoded"
elif len(splitted) == 3:
feeder_name, item_id, gzip64encoded = splitted
else:
print('Invalid message: not processed')
self.redis_logger.debug('Invalid Item: {message} not processed')
return None
# remove absolute path
item_id = item_id.replace(self.ITEMS_FOLDER, '', 1)
relay_message = f'{item_id} {gzip64encoded}'
# Avoid any duplicate coming from any sources
if self.operation_mode == 1:
digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
if self.r_cache.exists(digest): # Content already exists
# STATS
self.increase_stat_duplicate(feeder_name)
else: # New content
self.r_cache.sadd(digest, feeder_name)
self.r_cache.expire(digest, self.ttl_key)
self.increase_stat_processed(feeder_name)
self.send_message_to_queue(relay_message)
# Need To Be Fixed, Currently doesn't check the source (-> same as operation 1)
# # Keep duplicate coming from different sources
# elif self.operation_mode == 2:
# digest = hashlib.sha1(gzip64encoded.encode('utf8')).hexdigest()
# # Filter to avoid duplicate
# older_digest = self.r_cache.get(f'HASH_{item_id}')
# if older_digest is None:
# # New content
# # Store in redis for filtering
# self.r_cache.set(f'HASH_{item_id}', digest)
# self.r_cache.sadd(item_id, feeder_name)
# self.r_cache.expire(item_id, self.ttl_key)
# self.r_cache.expire(f'HASH_{item_id}', self.ttl_key)
#
# self.send_message_to_queue(relay_message)
#
# else:
# if digest != older_digest:
# # Same item name but different content
# # STATS
# self.increase_stat_duplicate(feeder_name)
# self.r_cache.sadd(item_id, feeder_name)
# self.r_cache.expire(item_id, ttl_key)
#
# self.send_message_to_queue(relay_message)
#
# else:
# # Already processed
# # Keep track of processed items
# # STATS
# self.increase_stat_duplicate(feeder_name)
# No Filtering
else:
self.increase_stat_processed(feeder_name)
self.send_message_to_queue(relay_message)
if __name__ == "__main__":
module = Mixer()
module.run()

View file

@ -284,12 +284,9 @@ class SubmitPaste(AbstractModule):
self.redis_logger.debug(f"relative path {rel_item_path}")
# send paste to Global module
relay_message = f"{rel_item_path} {gzip64encoded}"
relay_message = f"submitted {rel_item_path} {gzip64encoded}"
self.process.populate_set_out(relay_message, 'Mixer')
# increase nb of paste by feeder name
self.r_serv_log_submit.hincrby("mixer_cache:list_feeder", source, 1)
# add tags
for tag in ltags:
Tag.add_object_tag(tag, 'item', rel_item_path)

View file

@ -10,7 +10,6 @@ publish = Redis_Import
# subscribe = ZMQ_Global
subscribe = Redis_Import
publish = Redis_Mixer
#publish = Redis_Mixer,Redis_preProcess1
[Sync_importer]
publish = Redis_Mixer,Redis_Tags
@ -168,10 +167,6 @@ publish = Redis_Tags
[Zerobins]
subscribe = Redis_Url
#[PreProcessFeed]
#subscribe = Redis_preProcess1
#publish = Redis_Mixer
# [My_Module]
# subscribe = Redis_Global
# publish = Redis_Tags