ail-framework/bin/modules/Decoder.py

132 lines
4.8 KiB
Python
Raw Normal View History

2018-07-19 14:52:09 +00:00
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
Decoder module
Dectect Binary and decode it
"""
##################################
# Import External packages
##################################
2018-07-19 14:52:09 +00:00
import os
import base64
from hashlib import sha1
import re
import sys
2018-07-19 14:52:09 +00:00
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from modules.abstract_module import AbstractModule
2022-08-19 14:53:31 +00:00
from lib.ConfigLoader import ConfigLoader
from lib.objects.Items import Item
from lib.objects.Decodeds import Decoded
2022-08-19 14:53:31 +00:00
config_loader = ConfigLoader()
2023-04-04 12:12:23 +00:00
hex_max_execution_time = config_loader.get_config_int("Decoder", "max_execution_time_hexadecimal")
binary_max_execution_time = config_loader.get_config_int("Decoder", "max_execution_time_binary")
base64_max_execution_time = config_loader.get_config_int("Decoder", "max_execution_time_base64")
2022-08-19 14:53:31 +00:00
config_loader = None
class Decoder(AbstractModule):
"""
Decoder module for AIL framework
"""
2018-07-19 14:52:09 +00:00
def hex_decoder(self, hexStr):
# hexStr = ''.join( hex_string.split(" ") )
return bytes(bytearray([int(hexStr[i:i+2], 16) for i in range(0, len(hexStr), 2)]))
2018-07-19 14:52:09 +00:00
def binary_decoder(self, binary_string):
return bytes(bytearray([int(binary_string[i:i+8], 2) for i in range(0, len(binary_string), 8)]))
2018-07-19 14:52:09 +00:00
def base64_decoder(self, base64_string):
return base64.b64decode(base64_string)
2018-07-19 14:52:09 +00:00
def __init__(self):
super(Decoder, self).__init__()
2018-07-19 14:52:09 +00:00
regex_binary = r'[0-1]{40,}'
# regex_hex = r'(0[xX])?[A-Fa-f0-9]{40,}'
regex_hex = r'[A-Fa-f0-9]{40,}'
regex_base64 = r'(?:[A-Za-z0-9+/]{4}){2,}(?:[A-Za-z0-9+/]{2}[AEIMQUYcgkosw048]=|[A-Za-z0-9+/][AQgw]==)'
2018-07-19 14:52:09 +00:00
cmp_regex_binary = re.compile(regex_binary)
cmp_regex_hex = re.compile(regex_hex)
cmp_regex_base64 = re.compile(regex_base64)
2018-07-19 14:52:09 +00:00
# map decoder function
self.decoder_function = {'binary': self.binary_decoder,
'hexadecimal': self.hex_decoder,
'base64': self.base64_decoder}
2018-07-19 14:52:09 +00:00
# list all decoder with regex,
decoder_binary = {'name': 'binary', 'regex': cmp_regex_binary,
'encoded_min_size': 300, 'max_execution_time': binary_max_execution_time}
decoder_hexadecimal = {'name': 'hexadecimal', 'regex': cmp_regex_hex,
'encoded_min_size': 300, 'max_execution_time': hex_max_execution_time}
decoder_base64 = {'name': 'base64', 'regex': cmp_regex_base64,
'encoded_min_size': 40, 'max_execution_time': base64_max_execution_time}
2018-07-19 14:52:09 +00:00
self.decoder_order = [decoder_base64, decoder_binary, decoder_hexadecimal, decoder_base64]
2018-07-19 14:52:09 +00:00
# Waiting time in seconds between to message processed
self.pending_seconds = 1
2018-07-19 14:52:09 +00:00
# Send module state to logs
self.redis_logger.info(f'Module {self.module_name} initialized')
2018-07-19 14:52:09 +00:00
def compute(self, message):
2018-07-19 14:52:09 +00:00
2022-08-19 14:53:31 +00:00
item = Item(message)
content = item.get_content()
date = item.get_date()
2018-07-19 14:52:09 +00:00
for decoder in self.decoder_order:
find = False
dname = decoder['name']
encodeds = self.regex_findall(decoder['regex'], item.id, content)
# PERF remove encoded from item content
for encoded in encodeds:
content = content.replace(encoded, '', 1)
encodeds = set(encodeds)
for encoded in encodeds:
find = False
if len(encoded) >= decoder['encoded_min_size']:
decoded_file = self.decoder_function[dname](encoded)
sha1_string = sha1(decoded_file).hexdigest()
decoded = Decoded(sha1_string)
if not decoded.exists():
mimetype = decoded.guess_mimetype(decoded_file)
if not mimetype:
print(sha1_string, item.id)
raise Exception(f'Invalid mimetype: {decoded.id} {item.id}')
decoded.save_file(decoded_file, mimetype)
else:
mimetype = decoded.get_mimetype()
decoded.add(dname, date, item.id, mimetype=mimetype)
# DEBUG
self.redis_logger.debug(f'{item.id} : {dname} - {decoded.id} - {mimetype}')
print(f'{item.id} : {dname} - {decoded.id} - {mimetype}')
if find:
self.redis_logger.info(f'{item.id} - {dname}')
print(f'{item.id} - {dname}')
# Send to Tags
msg = f'infoleak:automatic-detection="{dname}";{item.id}'
self.send_message_to_queue(msg, 'Tags')
if __name__ == '__main__':
module = Decoder()
module.run()