ail-framework/bin/packages/lib_refine.py

153 lines
5.1 KiB
Python
Raw Normal View History

2018-05-04 11:53:29 +00:00
#!/usr/bin/python3
import os
import re
import sys
import dns.resolver
from pubsublogger import publisher
from datetime import timedelta
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
config_loader = ConfigLoader.ConfigLoader()
dns_server = config_loader.get_config_str("Web", "dns")
config_loader = None
def is_luhn_valid(card_number):
"""Apply the Luhn algorithm to validate credit card.
:param card_number: -- (int) card number
"""
r = [int(ch) for ch in str(card_number)][::-1]
return (sum(r[0::2]) + sum(sum(divmod(d*2, 10)) for d in r[1::2])) % 10 == 0
2020-05-04 12:05:04 +00:00
def checking_MX_record(r_serv, MXdomains, addr_dns):
"""Check if emails MX domains are responding.
:param r_serv: -- Redis connexion database
:param adress_set: -- (set) This is a set of emails adress
:return: (int) Number of adress with a responding and valid MX domains
This function will split the email adress and try to resolve their domains
names: on example@gmail.com it will try to resolve gmail.com
"""
score = 0
WalidMX = set([])
2018-07-30 07:21:22 +00:00
validMX = {}
2020-05-04 12:05:04 +00:00
num = len(MXdomains)
2014-09-04 09:46:07 +00:00
resolver = dns.resolver.Resolver()
resolver.nameservers = [addr_dns]
2014-09-04 09:46:07 +00:00
resolver.timeout = 5
resolver.lifetime = 2
if MXdomains != []:
2018-07-30 07:21:22 +00:00
for MXdomain in MXdomains:
try:
2018-07-30 07:21:22 +00:00
MXdomain = MXdomain[1:]
# Already in Redis living.
2018-07-30 07:21:22 +00:00
if r_serv.exists(MXdomain):
score += 1
2018-07-30 07:21:22 +00:00
WalidMX.add(MXdomain)
validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
# Not already in Redis
else:
# If I'm Walid MX domain
2018-07-30 07:21:22 +00:00
if resolver.query(MXdomain, rdtype=dns.rdatatype.MX):
# Gonna be added in redis.
2018-07-30 07:21:22 +00:00
r_serv.setex(MXdomain, 1, timedelta(days=1))
score += 1
2018-07-30 07:21:22 +00:00
WalidMX.add(MXdomain)
validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
else:
pass
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
print('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
print('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
print('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
2014-09-04 09:46:07 +00:00
r_serv.setex(MXdomain[1:], 1, timedelta(days=1))
publisher.debug('The query name does not exist.')
print('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
print('The Label is too long')
2014-09-04 09:46:07 +00:00
except dns.resolver.Timeout:
print('timeout')
2018-07-30 07:21:22 +00:00
r_serv.setex(MXdomain, 1, timedelta(days=1))
2014-09-04 09:46:07 +00:00
except Exception as e:
2018-04-16 12:50:04 +00:00
print(e)
publisher.debug("emails before: {0} after: {1} (valid)".format(num, score))
2018-07-30 07:21:22 +00:00
#return (num, WalidMX)
return (num, validMX)
def checking_A_record(r_serv, domains_set):
score = 0
num = len(domains_set)
WalidA = set([])
2014-09-04 09:46:07 +00:00
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
2014-09-04 09:46:07 +00:00
resolver.timeout = 5
resolver.lifetime = 2
for Adomain in domains_set:
try:
# Already in Redis living.
if r_serv.exists(Adomain):
score += 1
WalidA.add(Adomain)
# Not already in Redis
else:
# If I'm Walid domain
2014-09-04 09:46:07 +00:00
if resolver.query(Adomain, rdtype=dns.rdatatype.A):
# Gonna be added in redis.
2014-09-04 09:46:07 +00:00
r_serv.setex(Adomain, 1, timedelta(days=1))
score += 1
WalidA.add(Adomain)
else:
pass
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
2014-09-04 09:46:07 +00:00
r_serv.setex(Adomain[1:], 1, timedelta(days=1))
publisher.debug('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
2014-09-04 09:46:07 +00:00
except Exception as e:
2018-04-16 12:50:04 +00:00
print(e)
publisher.debug("URLs before: {0} after: {1} (valid)".format(num, score))
return (num, WalidA)