ail-framework/bin/modules/Mail.py

189 lines
6.5 KiB
Python
Raw Normal View History

2022-09-08 08:31:57 +00:00
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Mails Module
======================
This module is consuming the Redis-list created by the Categ module.
It apply mail regexes on item content and warn if above a threshold.
"""
import os
import re
import sys
import datetime
import dns.resolver
import dns.exception
from pyfaup.faup import Faup
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages #
##################################
from modules.abstract_module import AbstractModule
from lib.objects.Items import Item
from lib.ConfigLoader import ConfigLoader
2023-03-30 13:23:41 +00:00
# from lib import Statistics
2022-09-08 08:31:57 +00:00
class Mail(AbstractModule):
"""
Module Mail module for AIL framework
"""
def __init__(self):
super(Mail, self).__init__()
config_loader = ConfigLoader()
self.r_cache = config_loader.get_redis_conn("Redis_Cache")
self.dns_server = config_loader.get_config_str('Mail', 'dns')
self.faup = Faup()
# Numbers of Mails needed to Tags
self.mail_threshold = 10
self.regex_timeout = 30
self.email_regex = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
2022-09-08 08:31:57 +00:00
re.compile(self.email_regex)
def is_mxdomain_in_cache(self, mxdomain):
return self.r_cache.exists(f'mxdomain:{mxdomain}')
def save_mxdomain_in_cache(self, mxdomain):
self.r_cache.setex(f'mxdomain:{mxdomain}', datetime.timedelta(days=1), 1)
2022-09-08 08:31:57 +00:00
def check_mx_record(self, set_mxdomains):
"""Check if emails MX domains are responding.
:param set_mxdomains: -- (set) This is a set of emails domains
:return: (int) Number of address with a responding and valid MX domains
2022-09-08 08:31:57 +00:00
"""
resolver = dns.resolver.Resolver()
resolver.nameservers = [self.dns_server]
resolver.timeout = 5.0
resolver.lifetime = 2.0
valid_mxdomain = []
for mxdomain in set_mxdomains:
# check if is in cache
# # TODO:
if self.is_mxdomain_in_cache(mxdomain):
valid_mxdomain.append(mxdomain)
else:
# DNS resolution
try:
answers = resolver.query(mxdomain, rdtype=dns.rdatatype.MX)
if answers:
self.save_mxdomain_in_cache(mxdomain)
valid_mxdomain.append(mxdomain)
# DEBUG
# print('---')
# print(answers.response)
# print(answers.qname)
# print(answers.rdtype)
# print(answers.rdclass)
# print(answers.nameserver)
# print()
except dns.resolver.NoNameservers:
self.redis_logger.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
print('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
self.redis_logger.debug('NoAnswer, The response did not contain an answer to the question.')
print('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
self.redis_logger.debug('SyntaxError: EmptyLabel')
print('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
# save_mxdomain_in_cache(mxdomain)
2022-09-08 08:31:57 +00:00
self.redis_logger.debug('The query name does not exist.')
print('The query name does not exist.')
except dns.name.LabelTooLong:
self.redis_logger.debug('The Label is too long')
print('The Label is too long')
except dns.exception.Timeout:
print('dns timeout')
# save_mxdomain_in_cache(mxdomain)
2022-09-08 08:31:57 +00:00
except Exception as e:
print(e)
return valid_mxdomain
def extract(self, obj_id, content, tag):
extracted = []
mxdomains = {}
mails = self.regex_finditer(self.email_regex, obj_id, content)
for mail in mails:
start, end, value = mail
mxdomain = value.rsplit('@', 1)[1].lower()
if mxdomain not in mxdomains:
mxdomains[mxdomain] = []
mxdomains[mxdomain].append(mail)
for mx in self.check_mx_record(mxdomains.keys()):
for row in mxdomains[mx]:
extracted.append([row[0], row[1], row[2], f'tag:{tag}'])
return extracted
# # TODO: sanitize mails
2022-09-08 08:31:57 +00:00
def compute(self, message):
item_id, score = message.split()
item = Item(item_id)
item_date = item.get_date()
mails = self.regex_findall(self.email_regex, item_id, item.get_content())
mxdomains_email = {}
for mail in mails:
mxdomain = mail.rsplit('@', 1)[1].lower()
if not mxdomain in mxdomains_email:
mxdomains_email[mxdomain] = set()
mxdomains_email[mxdomain].add(mail)
# # TODO: add MAIL trackers
2022-09-08 08:31:57 +00:00
valid_mx = self.check_mx_record(mxdomains_email.keys())
print(f'valid_mx: {valid_mx}')
mx_tlds = {}
num_valid_email = 0
for domain_mx in valid_mx:
nb_mails = len(mxdomains_email[domain_mx])
num_valid_email += nb_mails
# Create domain_mail stats
2022-09-08 08:31:57 +00:00
msg = f'mail;{nb_mails};{domain_mx};{item_date}'
self.send_message_to_queue(msg, 'ModuleStats')
# Create country stats
self.faup.decode(domain_mx)
tld = self.faup.get()['tld']
try:
tld = tld.decode()
except:
pass
mx_tlds[tld] = mx_tlds.get(tld, 0) + nb_mails
2023-03-30 13:23:41 +00:00
# for tld in mx_tlds:
# Statistics.add_module_tld_stats_by_date('mail', item_date, tld, mx_tlds[tld])
2022-09-08 08:31:57 +00:00
msg = f'Mails;{item.get_source()};{item_date};{item.get_basename()};Checked {num_valid_email} e-mail(s);{item_id}'
2022-09-08 08:31:57 +00:00
if num_valid_email > self.mail_threshold:
print(f'{item_id} Checked {num_valid_email} e-mail(s)')
self.redis_logger.warning(msg)
# Tags
msg = f'infoleak:automatic-detection="mail";{item_id}'
self.send_message_to_queue(msg, 'Tags')
else:
self.redis_logger.info(msg)
if __name__ == '__main__':
module = Mail()
module.run()