ail-framework/bin/Mail.py

216 lines
6.8 KiB
Python
Raw Permalink Normal View History

2018-05-04 11:53:29 +00:00
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Mails Module
======================
This module consumes the Redis list created by the Categ module.
It applies mail regexes to the item content and warns if the number of
e-mail addresses with a valid MX domain is above a threshold.
"""
2020-05-04 12:08:12 +00:00
import os
2020-05-04 12:09:14 +00:00
import re
2020-05-04 12:08:12 +00:00
import sys
2014-08-14 15:55:18 +00:00
import redis
import time
2018-07-30 07:21:22 +00:00
import datetime
2020-05-11 09:11:05 +00:00
import dns.resolver
import dns.exception
from multiprocessing import Process as Proc
from multiprocessing import Queue
2020-05-11 09:11:05 +00:00
from pubsublogger import publisher
from Helper import Process
2020-05-11 09:11:05 +00:00
from pyfaup.faup import Faup
2020-05-04 12:05:04 +00:00
2020-05-11 09:11:05 +00:00
# Extend the import path with AIL's 'packages' directory before importing
# Item (presumably where the Item module lives -- confirm against the repo).
# os.environ['AIL_BIN'] raises KeyError if AIL_BIN is not set.
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
import Item
# Same for 'lib/', appended right before ConfigLoader is imported.
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
## LOAD CONFIG ##
config_loader = ConfigLoader.ConfigLoader()
# Connection used for the per-TLD mail counters (hincrby in the main loop).
server_statistics = config_loader.get_redis_conn("ARDB_Statistics")
# Connection used by the MX-domain cache helpers below.
r_serv_cache = config_loader.get_redis_conn("Redis_Cache")
# 'dns' option of the 'Mail' section; passed to check_mx_record() as the
# nameserver for MX queries.
dns_server = config_loader.get_config_str('Mail', 'dns')
# Drop the loader reference once every value has been read.
config_loader = None
## -- ##
2020-05-11 09:11:05 +00:00
def is_mxdomain_in_cache(mxdomain):
    """Tell whether the cache holds an entry for this mail domain.

    :param mxdomain: -- mail domain to look up
    :return: result of the cache ``exists`` call on the key
             ``mxdomain:<mxdomain>``
    """
    cache_key = 'mxdomain:{}'.format(mxdomain)
    return r_serv_cache.exists(cache_key)
def save_mxdomain_in_cache(mxdomain):
    """Record in the cache, for one day, that this mail domain is valid.

    The entry is written under ``mxdomain:<mxdomain>``, the same key that
    is_mxdomain_in_cache() checks; writing the bare domain name would never
    produce a cache hit.

    :param mxdomain: -- mail domain to remember
    """
    # Keyword arguments keep expiry and value in the right slots whichever
    # positional order the client class uses for setex().
    r_serv_cache.setex('mxdomain:{}'.format(mxdomain),
                       time=datetime.timedelta(days=1),
                       value=1)
def check_mx_record(set_mxdomains, dns_server):
    """Return the mail domains that have a usable MX record.

    A domain already present in the cache is accepted directly. Otherwise an
    MX query is sent through ``dns_server``; a domain with a non-empty answer
    is cached and accepted. Any DNS error is reported and the domain is left
    out of the result.

    :param set_mxdomains: -- (iterable) mail domains to check
    :param dns_server: -- nameserver placed in the resolver's nameserver list
    :return: (list) domains found in the cache or answering the MX query
    """
    def report(text):
        # Same text goes to the pubsub logger (debug level) and to stdout.
        publisher.debug(text)
        print(text)

    mx_resolver = dns.resolver.Resolver()
    mx_resolver.nameservers = [dns_server]
    mx_resolver.timeout = 5.0
    mx_resolver.lifetime = 2.0

    accepted_domains = []
    for domain in set_mxdomains:
        # Cache hit: no DNS traffic needed.
        if is_mxdomain_in_cache(domain):
            accepted_domains.append(domain)
            continue

        # Cache miss: DNS resolution. The cache write stays inside the try so
        # that a failure there is handled by the generic handler below.
        try:
            mx_answers = mx_resolver.query(domain, rdtype=dns.rdatatype.MX)
            if mx_answers:
                save_mxdomain_in_cache(domain)
                accepted_domains.append(domain)
        except dns.resolver.NoNameservers:
            report('NoNameserver, No non-broken nameservers are available to answer the query.')
        except dns.resolver.NoAnswer:
            report('NoAnswer, The response did not contain an answer to the question.')
        except dns.name.EmptyLabel:
            report('SyntaxError: EmptyLabel')
        except dns.resolver.NXDOMAIN:
            # Non-existing domains are not cached (the cache call is disabled).
            report('The query name does not exist.')
        except dns.name.LabelTooLong:
            report('The Label is too long')
        except dns.exception.Timeout:
            # Timeouts are only printed, not sent to the publisher, and not cached.
            print('dns timeout')
        except Exception as e:
            print(e)
    return accepted_domains
2020-05-04 12:05:04 +00:00
def extract_all_emails(queue, item_content):
    """Collect every e-mail match in the item content and hand it back.

    Meant to be run as the target of a worker process: the list of matches of
    the module-level ``email_regex`` is put on ``queue`` instead of returned.

    :param queue: -- queue object receiving the list of matches
    :param item_content: -- (str) text to scan
    """
    found_emails = re.findall(email_regex, item_content)
    queue.put(found_emails)
2014-08-20 13:14:57 +00:00
if __name__ == "__main__":
    # Script entry point: pull items from the module input set, extract their
    # e-mail addresses in a time-limited worker process, keep those whose
    # domain has a valid MX record, update statistics and tag the item when
    # more than `mail_threshold` valid addresses were found.

    # Raised by Queue.get() when its timeout expires; only needed here.
    from queue import Empty as QueueEmpty

    publisher.port = 6380
    publisher.channel = "Script"

    config_section = 'Mail'

    faup = Faup()

    p = Process(config_section)

    publisher.info("Mails module started")

    # Numbers of Mails needed to Tags
    mail_threshold = 10

    # Seconds granted to the regex worker for one item
    max_execution_time = 30

    # Raw string: "\." must reach the regex engine as written instead of
    # relying on Python keeping an unknown string escape untouched.
    email_regex = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"

    while True:
        message = p.get_from_set()

        if message is None:
            # Nothing to process yet
            time.sleep(10)
            continue

        item_id, score = message.split()
        print(item_id)

        item_content = Item.get_item_content(item_id)

        # One queue per item: a queue whose writer was terminated may hold
        # corrupted data and must not be reused for the next item.
        q = Queue()
        proc = Proc(target=extract_all_emails, args=(q, item_content))
        proc.start()

        try:
            try:
                # Read the result BEFORE joining: a worker that has put a
                # large result on the queue does not exit until the data is
                # consumed, so joining first reports a false timeout.
                # The timeout also covers a worker that died without
                # producing a result.
                all_emails = q.get(timeout=max_execution_time)
            except QueueEmpty:
                proc.terminate()
                proc.join()
                p.incr_module_timeout_statistic()
                err_mess = "Mails: processing timeout: {}".format(item_id)
                print(err_mess)
                publisher.info(err_mess)
                continue
            proc.join()
        except KeyboardInterrupt:
            print("Caught KeyboardInterrupt, terminating workers")
            proc.terminate()
            sys.exit(0)

        # filtering duplicate
        all_emails = set(all_emails)

        # get MXdomains: map each lower-cased domain to its e-mail addresses
        set_mxdomains = set()
        dict_mxdomains_email = {}
        for email in all_emails:
            mxdomain = email.split('@')[1].lower()
            set_mxdomains.add(mxdomain)
            dict_mxdomains_email.setdefault(mxdomain, []).append(email)

        ## TODO: add MAIL trackers

        valid_mx = check_mx_record(set_mxdomains, dns_server)

        item_date = Item.get_item_date(item_id)

        num_valid_email = 0
        for domain_mx in valid_mx:
            num_valid_email += len(dict_mxdomains_email[domain_mx])

            for email in dict_mxdomains_email[domain_mx]:
                msg = 'mail;{};{};{}'.format(1, email, item_date)
                p.populate_set_out(msg, 'ModuleStats')

                # Create country stats
                faup.decode(email)
                tld = faup.get()['tld']
                try:
                    tld = tld.decode()
                except (AttributeError, UnicodeDecodeError):
                    # Not bytes (no decode method) or undecodable bytes:
                    # keep the value exactly as faup returned it.
                    pass
                server_statistics.hincrby('mail_by_tld:{}'.format(item_date), tld, 1)

        msg = 'Mails;{};{};{};Checked {} e-mail(s);{}'.format(Item.get_source(item_id), item_date, Item.get_item_basename(item_id), num_valid_email, item_id)

        if num_valid_email > mail_threshold:
            print('{} Checked {} e-mail(s)'.format(item_id, num_valid_email))
            publisher.warning(msg)
            #Send to duplicate
            p.populate_set_out(item_id, 'Duplicate')
            #tags
            msg = 'infoleak:automatic-detection="mail";{}'.format(item_id)
            p.populate_set_out(msg, 'Tags')
        else:
            publisher.info(msg)