ail-framework/bin/packages/lib_refine.py
2023-04-04 14:12:23 +02:00

155 lines
5.2 KiB
Python

#!/usr/bin/python3
import os
import sys
import dns.resolver
import dns.exception
from pubsublogger import publisher
from datetime import timedelta
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import ConfigLoader
config_loader = ConfigLoader.ConfigLoader()
dns_server = config_loader.get_config_str("DomClassifier", "dns")
config_loader = None
def is_luhn_valid(card_number):
"""Apply the Luhn algorithm to validate credit card.
:param card_number: -- (int) card number
"""
r = [int(ch) for ch in str(card_number)][::-1]
return (sum(r[0::2]) + sum(sum(divmod(d*2, 10)) for d in r[1::2])) % 10 == 0
def checking_MX_record(r_serv, MXdomains, addr_dns):
"""Check if emails MX domains are responding.
:param r_serv: -- Redis connexion database
:param adress_set: -- (set) This is a set of emails adress
:return: (int) Number of adress with a responding and valid MX domains
This function will split the email adress and try to resolve their domains
names: on example@gmail.com it will try to resolve gmail.com
"""
score = 0
WalidMX = set([])
validMX = {}
num = len(MXdomains)
resolver = dns.resolver.Resolver()
resolver.nameservers = [addr_dns]
resolver.timeout = 5.0
resolver.lifetime = 2.0
if MXdomains != []:
for MXdomain in MXdomains:
try:
MXdomain = MXdomain[1:]
# Already in Redis living.
if r_serv.exists(MXdomain):
score += 1
WalidMX.add(MXdomain)
validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
# Not already in Redis
else:
# If I'm Walid MX domain
if resolver.query(MXdomain, rdtype=dns.rdatatype.MX):
# Gonna be added in redis.
r_serv.setex(MXdomain, 1, timedelta(days=1))
score += 1
WalidMX.add(MXdomain)
validMX[MXdomain] = validMX.get(MXdomain, 0) + 1
else:
pass
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
print('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
print('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
print('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
r_serv.setex(MXdomain[1:], 1, timedelta(days=1))
publisher.debug('The query name does not exist.')
print('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
print('The Label is too long')
except dns.exception.Timeout:
print('dns timeout')
r_serv.setex(MXdomain, 1, timedelta(days=1))
except Exception as e:
print(e)
publisher.debug("emails before: {0} after: {1} (valid)".format(num, score))
#return (num, WalidMX)
return (num, validMX)
def checking_A_record(r_serv, domains_set):
score = 0
num = len(domains_set)
WalidA = set([])
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
resolver.timeout = 5.0
resolver.lifetime = 2.0
for Adomain in domains_set:
try:
# Already in Redis living.
if r_serv.exists(Adomain):
score += 1
WalidA.add(Adomain)
# Not already in Redis
else:
# If I'm Walid domain
if resolver.query(Adomain, rdtype=dns.rdatatype.A):
# Gonna be added in redis.
r_serv.setex(Adomain, 1, timedelta(days=1))
score += 1
WalidA.add(Adomain)
else:
pass
except dns.resolver.NoNameservers:
publisher.debug('NoNameserver, No non-broken nameservers are available to answer the query.')
except dns.resolver.NoAnswer:
publisher.debug('NoAnswer, The response did not contain an answer to the question.')
except dns.name.EmptyLabel:
publisher.debug('SyntaxError: EmptyLabel')
except dns.resolver.NXDOMAIN:
r_serv.setex(Adomain[1:], 1, timedelta(days=1))
publisher.debug('The query name does not exist.')
except dns.name.LabelTooLong:
publisher.debug('The Label is too long')
except Exception as e:
print(e)
publisher.debug("URLs before: {0} after: {1} (valid)".format(num, score))
return (num, WalidA)