mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-23 06:37:15 +00:00
fix: [Mail module] replace signal by multiprocessing (https://docs.python.org/3.4/library/signal.html#execution-of-python-signal-handlers)
This commit is contained in:
parent
ae5c7cdac1
commit
3f6efadcc4
1 changed files with 29 additions and 29 deletions
48
bin/Mail.py
48
bin/Mail.py
|
@ -21,25 +21,14 @@ import datetime
|
||||||
import dns.resolver
|
import dns.resolver
|
||||||
import dns.exception
|
import dns.exception
|
||||||
|
|
||||||
|
from multiprocessing import Process as Proc
|
||||||
|
from multiprocessing import Queue
|
||||||
|
|
||||||
from pubsublogger import publisher
|
from pubsublogger import publisher
|
||||||
from Helper import Process
|
from Helper import Process
|
||||||
|
|
||||||
from pyfaup.faup import Faup
|
from pyfaup.faup import Faup
|
||||||
|
|
||||||
## REGEX TIMEOUT ##
|
|
||||||
import signal
|
|
||||||
|
|
||||||
def timeout_handler(signum, frame):
|
|
||||||
raise TimeoutException()
|
|
||||||
|
|
||||||
class TimeoutException(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
signal.signal(signal.SIGALRM, timeout_handler)
|
|
||||||
max_execution_time = 20
|
|
||||||
## -- ##
|
|
||||||
|
|
||||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
|
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
|
||||||
import Item
|
import Item
|
||||||
|
|
||||||
|
@ -55,6 +44,7 @@ dns_server = config_loader.get_config_str('Mail', 'dns')
|
||||||
|
|
||||||
config_loader = None
|
config_loader = None
|
||||||
## -- ##
|
## -- ##
|
||||||
|
|
||||||
def is_mxdomain_in_cache(mxdomain):
|
def is_mxdomain_in_cache(mxdomain):
|
||||||
return r_serv_cache.exists('mxdomain:{}'.format(mxdomain))
|
return r_serv_cache.exists('mxdomain:{}'.format(mxdomain))
|
||||||
|
|
||||||
|
@ -120,6 +110,9 @@ def check_mx_record(set_mxdomains, dns_server):
|
||||||
print(e)
|
print(e)
|
||||||
return valid_mxdomain
|
return valid_mxdomain
|
||||||
|
|
||||||
|
def extract_all_emails(queue, item_content):
|
||||||
|
queue.put(re.findall(email_regex, item_content))
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
publisher.port = 6380
|
publisher.port = 6380
|
||||||
publisher.channel = "Script"
|
publisher.channel = "Script"
|
||||||
|
@ -135,8 +128,12 @@ if __name__ == "__main__":
|
||||||
# Numbers of Mails needed to Tags
|
# Numbers of Mails needed to Tags
|
||||||
mail_threshold = 10
|
mail_threshold = 10
|
||||||
|
|
||||||
|
max_execution_time = 30
|
||||||
|
|
||||||
email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
|
email_regex = "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}"
|
||||||
|
|
||||||
|
q = Queue()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
message = p.get_from_set()
|
message = p.get_from_set()
|
||||||
|
|
||||||
|
@ -144,23 +141,24 @@ if __name__ == "__main__":
|
||||||
item_id, score = message.split()
|
item_id, score = message.split()
|
||||||
|
|
||||||
item_content = Item.get_item_content(item_id)
|
item_content = Item.get_item_content(item_id)
|
||||||
item_date = Item.get_item_date(item_id)
|
|
||||||
|
|
||||||
#print(item_id)
|
proc = Proc(target=extract_all_emails, args=(q, item_content))
|
||||||
|
proc.start()
|
||||||
# Get all emails address
|
|
||||||
signal.alarm(30)
|
|
||||||
try:
|
try:
|
||||||
all_emails = re.findall(email_regex, item_content)
|
proc.join(max_execution_time)
|
||||||
except TimeoutException:
|
if proc.is_alive():
|
||||||
|
proc.terminate()
|
||||||
p.incr_module_timeout_statistic()
|
p.incr_module_timeout_statistic()
|
||||||
err_mess = "Mails: processing timeout: {}".format(item_id)
|
err_mess = "Mails: processing timeout: {}".format(item_id)
|
||||||
print(err_mess)
|
print(err_mess)
|
||||||
publisher.info(err_mess)
|
publisher.info(err_mess)
|
||||||
signal.signal(signal.SIGALRM, timeout_handler)
|
|
||||||
continue
|
continue
|
||||||
finally:
|
else:
|
||||||
signal.alarm(0)
|
all_emails = q.get()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Caught KeyboardInterrupt, terminating workers")
|
||||||
|
proc.terminate()
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
# filtering duplicate
|
# filtering duplicate
|
||||||
all_emails = set(all_emails)
|
all_emails = set(all_emails)
|
||||||
|
@ -179,6 +177,8 @@ if __name__ == "__main__":
|
||||||
|
|
||||||
valid_mx = check_mx_record(set_mxdomains, dns_server)
|
valid_mx = check_mx_record(set_mxdomains, dns_server)
|
||||||
|
|
||||||
|
item_date = Item.get_item_date(item_id)
|
||||||
|
|
||||||
num_valid_email = 0
|
num_valid_email = 0
|
||||||
for domain_mx in valid_mx:
|
for domain_mx in valid_mx:
|
||||||
num_valid_email += len(dict_mxdomains_email[domain_mx])
|
num_valid_email += len(dict_mxdomains_email[domain_mx])
|
||||||
|
|
Loading…
Reference in a new issue