mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-10 08:38:28 +00:00
fix: [PgpDump] process large pgp blocks
This commit is contained in:
parent
1929b2c6a7
commit
17af705231
1 changed files with 86 additions and 39 deletions
125
bin/PgpDump.py
125
bin/PgpDump.py
|
@ -9,6 +9,7 @@
|
|||
import os
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
import redis
|
||||
import signal
|
||||
import datetime
|
||||
|
@ -28,6 +29,15 @@ def timeout_handler(signum, frame):
|
|||
|
||||
signal.signal(signal.SIGALRM, timeout_handler)
|
||||
|
||||
# save pgp message in directory, process one time each day
|
||||
def save_in_file(message, pgp_content):
|
||||
print('save in file')
|
||||
UUID = str(uuid.uuid4())
|
||||
file_name = os.path.join(pgp_dump_dir_to_process, UUID)
|
||||
with open(file_name, 'w') as f:
|
||||
f.write(pgp_content)
|
||||
r_serv_db.sadd('pgpdumb:uuid', '{};{}'.format(UUID, message))
|
||||
|
||||
def remove_html(item_content):
|
||||
if bool(BeautifulSoup(item_content, "html.parser").find()):
|
||||
soup = BeautifulSoup(item_content, 'html.parser')
|
||||
|
@ -41,33 +51,44 @@ def remove_html(item_content):
|
|||
else:
|
||||
return item_content
|
||||
|
||||
def extract_all_id(item_content, regex):
|
||||
# max execution time on regex
|
||||
signal.alarm(max_execution_time)
|
||||
try:
|
||||
pgp_extracted_block = re.findall(regex, item_content)
|
||||
except TimeoutException:
|
||||
pgp_extracted_block = []
|
||||
p.incr_module_timeout_statistic() # add encoder type
|
||||
print ("{0} processing timeout".format(paste.p_rel_path))
|
||||
else:
|
||||
signal.alarm(0)
|
||||
def extract_all_id(message, item_content, regex=None, is_file=False):
|
||||
|
||||
for pgp_to_dump in pgp_extracted_block:
|
||||
pgp_packet = get_pgp_packet(pgp_to_dump)
|
||||
if is_file:
|
||||
pgp_packet = get_pgp_packet_file(item_content)
|
||||
extract_id_from_output(pgp_packet)
|
||||
|
||||
def get_pgp_packet(save_path):
|
||||
save_path = '{}'.format(save_path)
|
||||
print (len(save_path))
|
||||
if len(save_path) > 131072:
|
||||
save_path = save_path[:131071]
|
||||
process1 = subprocess.Popen([ 'echo', '-e', save_path], stdout=subprocess.PIPE)
|
||||
process2 = subprocess.Popen([ 'pgpdump'], stdin=process1.stdout, stdout=subprocess.PIPE)
|
||||
process1.stdout.close()
|
||||
output = process2.communicate()[0].decode()
|
||||
return output
|
||||
else:
|
||||
# max execution time on regex
|
||||
signal.alarm(max_execution_time)
|
||||
try:
|
||||
pgp_extracted_block = re.findall(regex, item_content)
|
||||
except TimeoutException:
|
||||
pgp_extracted_block = []
|
||||
p.incr_module_timeout_statistic() # add encoder type
|
||||
print ("{0} processing timeout".format(paste.p_rel_path))
|
||||
else:
|
||||
signal.alarm(0)
|
||||
|
||||
for pgp_to_dump in pgp_extracted_block:
|
||||
pgp_packet = get_pgp_packet(message, pgp_to_dump)
|
||||
extract_id_from_output(pgp_packet)
|
||||
|
||||
def get_pgp_packet(message, save_path):
|
||||
save_path = '{}'.format(save_path)
|
||||
if len(save_path) > 131072:
|
||||
save_in_file(message, save_path)
|
||||
return ''
|
||||
else:
|
||||
process1 = subprocess.Popen([ 'echo', '-e', save_path], stdout=subprocess.PIPE)
|
||||
process2 = subprocess.Popen([ 'pgpdump'], stdin=process1.stdout, stdout=subprocess.PIPE)
|
||||
process1.stdout.close()
|
||||
output = process2.communicate()[0].decode()
|
||||
return output
|
||||
|
||||
def get_pgp_packet_file(file):
|
||||
process1 = subprocess.Popen([ 'pgpdump', file], stdout=subprocess.PIPE)
|
||||
output = process1.communicate()[0].decode()
|
||||
return output
|
||||
|
||||
def extract_id_from_output(pgp_dump_outpout):
|
||||
all_user_id = set(re.findall(regex_user_id, pgp_dump_outpout))
|
||||
|
@ -131,6 +152,12 @@ if __name__ == '__main__':
|
|||
# Setup the I/O queues
|
||||
p = Process(config_section)
|
||||
|
||||
r_serv_db = redis.StrictRedis(
|
||||
host=p.config.get("ARDB_DB", "host"),
|
||||
port=p.config.getint("ARDB_DB", "port"),
|
||||
db=p.config.getint("ARDB_DB", "db"),
|
||||
decode_responses=True)
|
||||
|
||||
serv_metadata = redis.StrictRedis(
|
||||
host=p.config.get("ARDB_Metadata", "host"),
|
||||
port=p.config.getint("ARDB_Metadata", "port"),
|
||||
|
@ -140,6 +167,11 @@ if __name__ == '__main__':
|
|||
# Sent to the logging a description of the module
|
||||
publisher.info("PgpDump started")
|
||||
|
||||
# check/create pgpdump queue directory (used for huge pgp blocks)
|
||||
pgp_dump_dir_to_process = os.path.join(os.environ['AIL_HOME'], 'temp', 'pgpdump')
|
||||
if not os.path.isdir(pgp_dump_dir_to_process):
|
||||
os.makedirs(pgp_dump_dir_to_process)
|
||||
|
||||
user_id_str = 'User ID - '
|
||||
regex_user_id= '{}.+'.format(user_id_str)
|
||||
|
||||
|
@ -159,28 +191,43 @@ if __name__ == '__main__':
|
|||
|
||||
# Endless loop getting messages from the input queue
|
||||
while True:
|
||||
# Get one message from the input queue
|
||||
message = p.get_from_set()
|
||||
|
||||
if message is None:
|
||||
publisher.debug("{} queue is empty, waiting".format(config_section))
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
|
||||
is_file = False
|
||||
set_key = set()
|
||||
set_name = set()
|
||||
set_mail = set()
|
||||
paste = Paste.Paste(message)
|
||||
|
||||
# Do something with the message from the queue
|
||||
date = str(paste._get_p_date())
|
||||
content = paste.get_p_content()
|
||||
content = remove_html(content)
|
||||
if r_serv_db.scard('pgpdumb:uuid') > 0:
|
||||
res = r_serv_db.spop('pgpdumb:uuid')
|
||||
file_to_process, message = res.split(';', 1)
|
||||
file_to_process = os.path.join(pgp_dump_dir_to_process, file_to_process)
|
||||
date = datetime.datetime.now().strftime("%Y/%m/%d")
|
||||
paste = Paste.Paste(message)
|
||||
date = str(paste._get_p_date())
|
||||
print(message)
|
||||
extract_all_id(message, file_to_process, is_file=True)
|
||||
os.remove(file_to_process)
|
||||
|
||||
extract_all_id(content, regex_pgp_public_blocs)
|
||||
extract_all_id(content, regex_pgp_signature)
|
||||
extract_all_id(content, regex_pgp_message)
|
||||
else:
|
||||
# Get one message from the input queue
|
||||
message = p.get_from_set()
|
||||
|
||||
if message is None:
|
||||
publisher.debug("{} queue is empty, waiting".format(config_section))
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
paste = Paste.Paste(message)
|
||||
|
||||
date = str(paste._get_p_date())
|
||||
content = paste.get_p_content()
|
||||
content = remove_html(content)
|
||||
|
||||
print(message)
|
||||
|
||||
extract_all_id(message, content, regex_pgp_public_blocs)
|
||||
extract_all_id(message, content, regex_pgp_signature)
|
||||
extract_all_id(message, content, regex_pgp_message)
|
||||
|
||||
for key_id in set_key:
|
||||
print(key_id)
|
||||
|
|
Loading…
Reference in a new issue