chg: [Keys] add test

This commit is contained in:
Terrtia 2021-05-28 17:23:51 +02:00
parent 0c29e1e4fa
commit 75bc585242
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
6 changed files with 67 additions and 91 deletions

View file

@ -17,15 +17,12 @@ RSA private key, certificate messages
##################################
import time
from enum import Enum
from pubsublogger import publisher
##################################
# Import Project packages
##################################
from module.abstract_module import AbstractModule
from packages import Paste
from Helper import Process
from packages.Item import Item
class KeyEnum(Enum):
@ -49,7 +46,7 @@ class Keys(AbstractModule):
"""
Keys module for AIL framework
"""
def __init__(self):
super(Keys, self).__init__()
@ -58,124 +55,124 @@ class Keys(AbstractModule):
def compute(self, message):
paste = Paste.Paste(message)
content = paste.get_p_content()
item = Item(message)
content = item.get_content()
find = False
get_pgp_content = False
if KeyEnum.PGP_MESSAGE.value in content:
self.redis_logger.warning('{} has a PGP enc message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a PGP enc message')
msg = 'infoleak:automatic-detection="pgp-message";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="pgp-message";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
get_pgp_content = True
find = True
if KeyEnum.PGP_PUBLIC_KEY_BLOCK.value in content:
msg = 'infoleak:automatic-detection="pgp-public-key-block";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="pgp-public-key-block";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
get_pgp_content = True
if KeyEnum.PGP_SIGNATURE.value in content:
msg = 'infoleak:automatic-detection="pgp-signature";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="pgp-signature";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
get_pgp_content = True
if KeyEnum.CERTIFICATE.value in content:
self.redis_logger.warning('{} has a certificate message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a certificate message')
msg = 'infoleak:automatic-detection="certificate";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="certificate";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.RSA_PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has a RSA private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a RSA private key message')
print('rsa private key message found')
msg = 'infoleak:automatic-detection="rsa-private-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="rsa-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has a private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a private key message')
print('private key message found')
msg = 'infoleak:automatic-detection="private-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.ENCRYPTED_PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has an encrypted private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has an encrypted private key message')
print('encrypted private key message found')
msg = 'infoleak:automatic-detection="encrypted-private-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="encrypted-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.OPENSSH_PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has an openssh private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has an openssh private key message')
print('openssh private key message found')
msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.SSH2_ENCRYPTED_PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has an ssh2 private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has an ssh2 private key message')
print('SSH2 private key message found')
msg = 'infoleak:automatic-detection="private-ssh-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="private-ssh-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.OPENVPN_STATIC_KEY_V1.value in content:
self.redis_logger.warning('{} has an openssh private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has an openssh private key message')
print('OpenVPN Static key message found')
msg = 'infoleak:automatic-detection="vpn-static-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="vpn-static-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.DSA_PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has a dsa private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a dsa private key message')
msg = 'infoleak:automatic-detection="dsa-private-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="dsa-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.EC_PRIVATE_KEY.value in content:
self.redis_logger.warning('{} has an ec private key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has an ec private key message')
msg = 'infoleak:automatic-detection="ec-private-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="ec-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.PGP_PRIVATE_KEY_BLOCK.value in content:
self.redis_logger.warning('{} has a pgp private key block message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a pgp private key block message')
msg = 'infoleak:automatic-detection="pgp-private-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="pgp-private-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
if KeyEnum.PUBLIC_KEY.value in content:
self.redis_logger.warning('{} has a public key message'.format(paste.p_name))
self.redis_logger.warning(f'{item.get_basename()} has a public key message')
msg = 'infoleak:automatic-detection="public-key";{}'.format(message)
self.process.populate_set_out(msg, 'Tags')
msg = f'infoleak:automatic-detection="public-key";{item.get_id()}'
self.send_message_to_queue(msg, 'Tags')
find = True
# pgp content
if get_pgp_content:
self.process.populate_set_out(message, 'PgpDump')
self.send_message_to_queue(item.get_id(), 'PgpDump')
if find :
#Send to duplicate
self.process.populate_set_out(message, 'Duplicate')
self.redis_logger.debug(message)
self.send_message_to_queue(item.get_id(), 'Duplicate')
self.redis_logger.debug(f'{item.get_id()} has key(s)')
print(f'{item.get_id()} has key(s)')
if __name__ == '__main__':
module = Keys()
module.run()

View file

@ -493,7 +493,7 @@ function update_thirdparty {
function launch_tests() {
tests_dir=${AIL_HOME}/tests
bin_dir=${AIL_BIN}
python3 `which nosetests` -w $tests_dir --with-coverage --cover-package=$bin_dir -d
python3 `which nosetests` -w $tests_dir --with-coverage --cover-package=$bin_dir -d #--cover-erase
}
function reset_password() {

View file

@ -599,7 +599,11 @@ class Item(AbstractObject):
# # WARNING: UNCLEAN DELETE /!\ TEST ONLY /!\
# TODO: DELETE ITEM CORRELATION + TAGS + METADATA + ...
def delete(self):
os.remove(self.get_filename())
try:
os.remove(self.get_filename())
return True
except FileNotFoundError:
return False
# if __name__ == '__main__':
#

View file

@ -1,36 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys,os
import unittest
import magic
sys.path.append(os.environ['AIL_BIN'])
from packages.Paste import Paste
import Keys as Keys
from Helper import Process
from pubsublogger import publisher
class TestKeysModule(unittest.TestCase):
def setUp(self):
self.paste = Paste('../samples/2018/01/01/keys_certificat_sample.gz')
# Section name in bin/packages/modules.cfg
self.config_section = 'Keys'
# Setup the I/O queues
p = Process(self.config_section)
def test_search_key(self):
with self.assertRaises(pubsublogger.exceptions.NoChannelError):
Keys.search_key(self.paste)
def test_search_key(self):
with self.assertRaises(NameError):
publisher.port = 6380
publisher.channel = 'Script'
Keys.search_key(self.paste)

View file

@ -16,6 +16,7 @@ from Categ import Categ
from CreditCards import CreditCards
from DomClassifier import DomClassifier
from Global import Global
from Keys import Keys
from Onion import Onion
# project packages
@ -107,7 +108,7 @@ class Test_Module_Global(unittest.TestCase):
message = f'{item_id} {item_content_2}'
result = self.module_obj.compute(message, r_result=True)
print(result)
self.assertIn(result, item_id)
self.assertIn(item_id[:-3], result)
self.assertNotEqual(result, item_id)
# cleanup
@ -115,6 +116,16 @@ class Test_Module_Global(unittest.TestCase):
item.delete()
# # TODO: remove from queue
class Test_Module_Keys(unittest.TestCase):
def setUp(self):
self.module_obj = Keys()
def test_module(self):
item_id = 'tests/2021/01/01/keys.gz'
# # TODO: check results
result = self.module_obj.compute(item_id)
class Test_Module_Onion(unittest.TestCase):
def setUp(self):