From 17af705231592951ac3190cd4999d7a7c7a9d427 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Wed, 5 Jun 2019 16:20:26 +0200 Subject: [PATCH] fix: [PgpDump] process large pgp blocks --- bin/PgpDump.py | 125 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 86 insertions(+), 39 deletions(-) diff --git a/bin/PgpDump.py b/bin/PgpDump.py index 0dc43621..21ffd263 100755 --- a/bin/PgpDump.py +++ b/bin/PgpDump.py @@ -9,6 +9,7 @@ import os import re import time +import uuid import redis import signal import datetime @@ -28,6 +29,15 @@ def timeout_handler(signum, frame): signal.signal(signal.SIGALRM, timeout_handler) +# save pgp message in directory, process one time each day +def save_in_file(message, pgp_content): + print('save in file') + UUID = str(uuid.uuid4()) + file_name = os.path.join(pgp_dump_dir_to_process, UUID) + with open(file_name, 'w') as f: + f.write(pgp_content) + r_serv_db.sadd('pgpdumb:uuid', '{};{}'.format(UUID, message)) + def remove_html(item_content): if bool(BeautifulSoup(item_content, "html.parser").find()): soup = BeautifulSoup(item_content, 'html.parser') @@ -41,33 +51,44 @@ def remove_html(item_content): else: return item_content -def extract_all_id(item_content, regex): - # max execution time on regex - signal.alarm(max_execution_time) - try: - pgp_extracted_block = re.findall(regex, item_content) - except TimeoutException: - pgp_extracted_block = [] - p.incr_module_timeout_statistic() # add encoder type - print ("{0} processing timeout".format(paste.p_rel_path)) - else: - signal.alarm(0) +def extract_all_id(message, item_content, regex=None, is_file=False): - for pgp_to_dump in pgp_extracted_block: - pgp_packet = get_pgp_packet(pgp_to_dump) + if is_file: + pgp_packet = get_pgp_packet_file(item_content) extract_id_from_output(pgp_packet) -def get_pgp_packet(save_path): - save_path = '{}'.format(save_path) - print (len(save_path)) - if len(save_path) > 131072: - save_path = save_path[:131071] - process1 = subprocess.Popen([ 'echo', '-e', save_path], stdout=subprocess.PIPE) - process2 = subprocess.Popen([ 'pgpdump'], stdin=process1.stdout, stdout=subprocess.PIPE) - process1.stdout.close() - output = process2.communicate()[0].decode() - return output + else: + # max execution time on regex + signal.alarm(max_execution_time) + try: + pgp_extracted_block = re.findall(regex, item_content) + except TimeoutException: + pgp_extracted_block = [] + p.incr_module_timeout_statistic() # add encoder type + print ("{0} processing timeout".format(paste.p_rel_path)) + else: + signal.alarm(0) + for pgp_to_dump in pgp_extracted_block: + pgp_packet = get_pgp_packet(message, pgp_to_dump) + extract_id_from_output(pgp_packet) + +def get_pgp_packet(message, save_path): + save_path = '{}'.format(save_path) + if len(save_path) > 131072: + save_in_file(message, save_path) + return '' + else: + process1 = subprocess.Popen([ 'echo', '-e', save_path], stdout=subprocess.PIPE) + process2 = subprocess.Popen([ 'pgpdump'], stdin=process1.stdout, stdout=subprocess.PIPE) + process1.stdout.close() + output = process2.communicate()[0].decode() + return output + +def get_pgp_packet_file(file): + process1 = subprocess.Popen([ 'pgpdump', file], stdout=subprocess.PIPE) + output = process1.communicate()[0].decode() + return output def extract_id_from_output(pgp_dump_outpout): all_user_id = set(re.findall(regex_user_id, pgp_dump_outpout)) @@ -131,6 +152,12 @@ if __name__ == '__main__': # Setup the I/O queues p = Process(config_section) + r_serv_db = redis.StrictRedis( + host=p.config.get("ARDB_DB", "host"), + port=p.config.getint("ARDB_DB", "port"), + db=p.config.getint("ARDB_DB", "db"), + decode_responses=True) + serv_metadata = redis.StrictRedis( host=p.config.get("ARDB_Metadata", "host"), port=p.config.getint("ARDB_Metadata", "port"), @@ -140,6 +167,11 @@ if __name__ == '__main__': # Sent to the logging a description of the module publisher.info("PgpDump started") + # check/create pgpdump queue directory (used for huge pgp blocks) + pgp_dump_dir_to_process = os.path.join(os.environ['AIL_HOME'], 'temp', 'pgpdump') + if not os.path.isdir(pgp_dump_dir_to_process): + os.makedirs(pgp_dump_dir_to_process) + user_id_str = 'User ID - ' regex_user_id= '{}.+'.format(user_id_str) @@ -159,28 +191,43 @@ if __name__ == '__main__': # Endless loop getting messages from the input queue while True: - # Get one message from the input queue - message = p.get_from_set() - - if message is None: - publisher.debug("{} queue is empty, waiting".format(config_section)) - time.sleep(1) - continue - + is_file = False set_key = set() set_name = set() set_mail = set() - paste = Paste.Paste(message) - # Do something with the message from the queue - date = str(paste._get_p_date()) - content = paste.get_p_content() - content = remove_html(content) + if r_serv_db.scard('pgpdumb:uuid') > 0: + res = r_serv_db.spop('pgpdumb:uuid') + file_to_process, message = res.split(';', 1) + file_to_process = os.path.join(pgp_dump_dir_to_process, file_to_process) + date = datetime.datetime.now().strftime("%Y/%m/%d") + paste = Paste.Paste(message) + date = str(paste._get_p_date()) + print(message) + extract_all_id(message, file_to_process, is_file=True) + os.remove(file_to_process) - extract_all_id(content, regex_pgp_public_blocs) - extract_all_id(content, regex_pgp_signature) - extract_all_id(content, regex_pgp_message) + else: + # Get one message from the input queue + message = p.get_from_set() + + if message is None: + publisher.debug("{} queue is empty, waiting".format(config_section)) + time.sleep(1) + continue + + paste = Paste.Paste(message) + + date = str(paste._get_p_date()) + content = paste.get_p_content() + content = remove_html(content) + + print(message) + + extract_all_id(message, content, regex_pgp_public_blocs) + extract_all_id(message, content, regex_pgp_signature) + extract_all_id(message, content, regex_pgp_message) for key_id in set_key: print(key_id)