fix: [tests] fix tests + global new file content

This commit is contained in:
terrtia 2024-02-27 10:15:40 +01:00
parent 9917d4212c
commit 775b7fa868
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
4 changed files with 64 additions and 46 deletions

View file

@ -105,6 +105,11 @@ class Global(AbstractModule):
filename = self.check_filename(filename, new_file_content) filename = self.check_filename(filename, new_file_content)
if filename: if filename:
new_obj_id = filename.replace(self.ITEMS_FOLDER, '', 1)
new_obj = Item(new_obj_id)
new_obj.sanitize_id()
self.set_obj(new_obj)
# create subdir # create subdir
dirname = os.path.dirname(filename) dirname = os.path.dirname(filename)
if not os.path.exists(dirname): if not os.path.exists(dirname):

View file

@ -56,7 +56,7 @@ class Pasties(AbstractModule):
with open(domains_pasties) as f: with open(domains_pasties) as f:
for line in f: for line in f:
url = line.strip() url = line.strip()
if url: # TODO validate line if url: # TODO validate line
self.faup.decode(url) self.faup.decode(url)
url_decoded = self.faup.get() url_decoded = self.faup.get()
host = url_decoded['host'] host = url_decoded['host']
@ -135,7 +135,7 @@ class Pasties(AbstractModule):
if path.startswith(url_path): if path.startswith(url_path):
if url_path != path and url_path != path_end: if url_path != path and url_path != path_end:
print('send to crawler', url_path, url) print('send to crawler', url_path, url)
self.send_to_crawler(url, self.obj.id)) self.send_to_crawler(url, self.obj.id)
break break

View file

@ -76,6 +76,14 @@ class AbstractModule(ABC):
def get_obj(self): def get_obj(self):
return self.obj return self.obj
def set_obj(self, new_obj):
if self.obj:
old_id = self.obj.id
self.obj = new_obj
self.queue.rename_message_obj(self.obj.id, old_id)
else:
self.obj = new_obj
def get_message(self): def get_message(self):
""" """
Get message from the Redis Queue (QueueIn) Get message from the Redis Queue (QueueIn)

View file

@ -10,7 +10,10 @@ from base64 import b64encode
from distutils.dir_util import copy_tree from distutils.dir_util import copy_tree
sys.path.append(os.environ['AIL_BIN']) sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib.ConfigLoader import ConfigLoader
# Modules Classes # Modules Classes
from modules.ApiKey import ApiKey from modules.ApiKey import ApiKey
from modules.Categ import Categ from modules.Categ import Categ
@ -22,59 +25,58 @@ from modules.Onion import Onion
from modules.Telegram import Telegram from modules.Telegram import Telegram
# project packages # project packages
from lib.ConfigLoader import ConfigLoader
import lib.crawlers as crawlers
import lib.objects.Items as Items import lib.objects.Items as Items
#### COPY SAMPLES #### #### COPY SAMPLES ####
config_loader = ConfigLoader() config_loader = ConfigLoader()
# # TODO:move me in new Item package ITEMS_FOLDER = Items.ITEMS_FOLDER
ITEMS_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/'
ITEMS_FOLDER = os.path.join(os.path.realpath(ITEMS_FOLDER), '')
TESTS_ITEMS_FOLDER = os.path.join(ITEMS_FOLDER, 'tests') TESTS_ITEMS_FOLDER = os.path.join(ITEMS_FOLDER, 'tests')
sample_dir = os.path.join(os.environ['AIL_HOME'], 'samples') sample_dir = os.path.join(os.environ['AIL_HOME'], 'samples')
copy_tree(sample_dir, TESTS_ITEMS_FOLDER) copy_tree(sample_dir, TESTS_ITEMS_FOLDER)
#### ---- #### #### ---- ####
class Test_Module_ApiKey(unittest.TestCase): class TestModuleApiKey(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = ApiKey() self.module = ApiKey()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
item_id = 'tests/2021/01/01/api_keys.gz' item_id = 'tests/2021/01/01/api_keys.gz'
self.module.obj = Items.Item(item_id)
google_api_key = 'AIza00000000000000000000000_example-KEY' google_api_key = 'AIza00000000000000000000000_example-KEY'
aws_access_key = 'AKIAIOSFODNN7EXAMPLE' aws_access_key = 'AKIAIOSFODNN7EXAMPLE'
aws_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' aws_secret_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
matches = self.module_obj.compute(f'{item_id} 3', r_result=True) matches = self.module.compute('3', r_result=True)
self.assertCountEqual(matches[0], {google_api_key}) self.assertCountEqual(matches[0], {google_api_key})
self.assertCountEqual(matches[1], {aws_access_key}) self.assertCountEqual(matches[1], {aws_access_key})
self.assertCountEqual(matches[2], {aws_secret_key}) self.assertCountEqual(matches[2], {aws_secret_key})
class Test_Module_Categ(unittest.TestCase): class TestModuleCateg(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = Categ() self.module = Categ()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
item_id = 'tests/2021/01/01/categ.gz' item_id = 'tests/2021/01/01/categ.gz'
self.module.obj = Items.Item(item_id)
test_categ = ['CreditCards', 'Mail', 'Onion', 'Urls', 'Credential', 'Cve'] test_categ = ['CreditCards', 'Mail', 'Onion', 'Urls', 'Credential', 'Cve']
result = self.module_obj.compute(item_id, r_result=True) result = self.module.compute(None, r_result=True)
print(result) print(result)
self.assertCountEqual(result, test_categ) self.assertCountEqual(result, test_categ)
class Test_Module_CreditCards(unittest.TestCase): class TestModuleCreditCards(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = CreditCards() self.module = CreditCards()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
item_id = 'tests/2021/01/01/credit_cards.gz 7' item_id = 'tests/2021/01/01/credit_cards.gz'
self.module.obj = Items.Item(item_id)
test_cards = ['341039324930797', # American Express test_cards = ['341039324930797', # American Express
'6011613905509166', # Discover Card '6011613905509166', # Discover Card
'3547151714018657', # Japan Credit Bureau (JCB) '3547151714018657', # Japan Credit Bureau (JCB)
@ -82,27 +84,27 @@ class Test_Module_CreditCards(unittest.TestCase):
'4024007132849695', # '4532525919781' # 16-digit VISA, with separators '4024007132849695', # '4532525919781' # 16-digit VISA, with separators
] ]
result = self.module_obj.compute(item_id, r_result=True) result = self.module.compute('7', r_result=True)
self.assertCountEqual(result, test_cards) self.assertCountEqual(result, test_cards)
class Test_Module_DomClassifier(unittest.TestCase): class TestModuleDomClassifier(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = DomClassifier() self.module = DomClassifier()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
test_host = 'foo.be' test_host = 'foo.be'
item_id = 'tests/2021/01/01/domain_classifier.gz' item_id = 'tests/2021/01/01/domain_classifier.gz'
msg = f'{test_host} {item_id}' self.module.obj = Items.Item(item_id)
result = self.module_obj.compute(msg, r_result=True) result = self.module.compute(f'{test_host}', r_result=True)
self.assertTrue(len(result)) self.assertTrue(len(result))
class Test_Module_Global(unittest.TestCase): class TestModuleGlobal(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = Global() self.module = Global()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
# # TODO: delete item # # TODO: delete item
@ -113,25 +115,25 @@ class Test_Module_Global(unittest.TestCase):
item_content = b'Lorem ipsum dolor sit amet, consectetur adipiscing elit' item_content = b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'
item_content_1 = b64encode(gzip.compress(item_content)).decode() item_content_1 = b64encode(gzip.compress(item_content)).decode()
item_content_2 = b64encode(gzip.compress(item_content + b' more text ...')).decode() item_content_2 = b64encode(gzip.compress(item_content + b' more text ...')).decode()
message = f'{item_id} {item_content_1}'
self.module.obj = Items.Item(item_id)
# Test new item # Test new item
result = self.module_obj.compute(message, r_result=True) result = self.module.compute(item_content_1, r_result=True)
print(f'test new item: {result}') print(f'test new item: {result}')
self.assertEqual(result, item_id) self.assertEqual(result, item_id)
# Test duplicate # Test duplicate
result = self.module_obj.compute(message, r_result=True) result = self.module.compute(item_content_1, r_result=True)
print(f'test duplicate {result}') print(f'test duplicate {result}')
self.assertIsNone(result) self.assertIsNone(result)
# Test same id with != content # Test same id with != content
item = Items.Item('tests/2021/01/01/global_831875da824fc86ab5cc0e835755b520.gz') item = Items.Item('tests/2021/01/01/global_831875da824fc86ab5cc0e835755b520.gz')
item.delete() item.delete()
message = f'{item_id} {item_content_2}' result = self.module.compute(item_content_2, r_result=True)
result = self.module_obj.compute(message, r_result=True)
print(f'test same id with != content: {result}') print(f'test same id with != content: {result}')
self.assertIn(item_id[:-3], result) self.assertIn(item_id[:-3], result)
print(result)
self.assertNotEqual(result, item_id) self.assertNotEqual(result, item_id)
# cleanup # cleanup
@ -139,40 +141,43 @@ class Test_Module_Global(unittest.TestCase):
# item.delete() # item.delete()
# # TODO: remove from queue # # TODO: remove from queue
class Test_Module_Keys(unittest.TestCase): class TestModuleKeys(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = Keys() self.module = Keys()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
item_id = 'tests/2021/01/01/keys.gz' item_id = 'tests/2021/01/01/keys.gz'
self.module.obj = Items.Item(item_id)
# # TODO: check results # # TODO: check results
result = self.module_obj.compute(item_id) result = self.module.compute(None)
class Test_Module_Onion(unittest.TestCase): class TestModuleOnion(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = Onion() self.module = Onion()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
item_id = 'tests/2021/01/01/onion.gz' item_id = 'tests/2021/01/01/onion.gz'
self.module.obj = Items.Item(item_id)
domain_1 = 'eswpccgr5xyovsahffkehgleqthrasfpfdblwbs4lstd345dwq5qumqd.onion' domain_1 = 'eswpccgr5xyovsahffkehgleqthrasfpfdblwbs4lstd345dwq5qumqd.onion'
domain_2 = 'www.facebookcorewwwi.onion' domain_2 = 'www.facebookcorewwwi.onion'
self.module_obj.compute(f'{item_id} 3') self.module.compute(f'3')
class Test_Module_Telegram(unittest.TestCase): class TestModuleTelegram(unittest.TestCase):
def setUp(self): def setUp(self):
self.module_obj = Telegram() self.module = Telegram()
self.module_obj.debug = True self.module.debug = True
def test_module(self): def test_module(self):
item_id = 'tests/2021/01/01/keys.gz' item_id = 'tests/2021/01/01/keys.gz'
self.module.obj = Items.Item(item_id)
# # TODO: check results # # TODO: check results
result = self.module_obj.compute(item_id) result = self.module.compute(None)
if __name__ == '__main__': if __name__ == '__main__':