From f88e53925ed654453a063e249b6476acb0212d62 Mon Sep 17 00:00:00 2001 From: Olivier SAGIT Date: Wed, 28 Apr 2021 15:24:33 +0200 Subject: [PATCH] fix: stuck queues and submit paste --- bin/Credential.py | 299 +++++----- bin/CreditCards.py | 117 ++-- bin/DomClassifier.py | 93 +-- bin/Global.py | 286 +++++---- bin/Phone.py | 6 +- bin/SentimentAnalysis.py | 260 ++++---- bin/Tags.py | 71 +-- bin/Web.py | 112 ++-- bin/module/abstract_module.py | 26 +- bin/packages/Import_helper.py | 16 +- bin/submit_paste.py | 554 +++++++++++------- configs/core.cfg.sample | 9 + var/www/modules/Flask_config.py | 22 +- .../modules/PasteSubmit/Flask_PasteSubmit.py | 193 +++--- .../PasteSubmit/templates/submit_items.html | 500 +++++++++------- 15 files changed, 1458 insertions(+), 1106 deletions(-) diff --git a/bin/Credential.py b/bin/Credential.py index cffc4889..68fd24aa 100755 --- a/bin/Credential.py +++ b/bin/Credential.py @@ -23,6 +23,9 @@ Redis organization: """ +################################## +# Import External packages +################################## import time import os import sys @@ -30,169 +33,177 @@ import datetime import re import redis from pyfaup.faup import Faup - from pubsublogger import publisher -from Helper import Process - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages')) -import Item - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) -import ConfigLoader -import regex_helper - -## LOAD CONFIG ## -config_loader = ConfigLoader.ConfigLoader() -server_cred = config_loader.get_redis_conn("ARDB_TermCred") -server_statistics = config_loader.get_redis_conn("ARDB_Statistics") - -minimumLengthThreshold = config_loader.get_config_int("Credential", "minimumLengthThreshold") -criticalNumberToAlert = config_loader.get_config_int("Credential", "criticalNumberToAlert") -minTopPassList = config_loader.get_config_int("Credential", "minTopPassList") - -config_loader = None -## -- ## - +import lib.regex_helper as regex_helper import signal -max_execution_time = 30 -#split username with spec. char or with upper case, distinguish start with upper -REGEX_CRED = "[a-z]+|[A-Z]{3,}|[A-Z]{1,2}[a-z]+|[0-9]+" -REDIS_KEY_NUM_USERNAME = 'uniqNumForUsername' -REDIS_KEY_NUM_PATH = 'uniqNumForUsername' -REDIS_KEY_ALL_CRED_SET = 'AllCredentials' -REDIS_KEY_ALL_CRED_SET_REV = 'AllCredentialsRev' -REDIS_KEY_ALL_PATH_SET = 'AllPath' -REDIS_KEY_ALL_PATH_SET_REV = 'AllPathRev' -REDIS_KEY_MAP_CRED_TO_PATH = 'CredToPathMapping' +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +from Helper import Process +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages')) +import Item +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Script" - config_section = "Credential" - module_name = "Credential" - p = Process(config_section) - publisher.info("Find credentials") - faup = Faup() +class Credential(AbstractModule): + """ + Credential module for AIL framework + """ - regex_web = "((?:https?:\/\/)[\.-_0-9a-zA-Z]+\.[0-9a-zA-Z]+)" - #regex_cred = "[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}:[a-zA-Z0-9\_\-]+" - regex_cred = "[a-zA-Z0-9\\._-]+@[a-zA-Z0-9\\.-]+\.[a-zA-Z]{2,6}[\\rn :\_\-]{1,10}[a-zA-Z0-9\_\-]+" - regex_site_for_stats = "@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}:" + max_execution_time = 30 - redis_cache_key = regex_helper.generate_redis_cache_key(module_name) + # Split username with spec. char or with upper case, distinguish start with upper + REGEX_CRED = "[a-z]+|[A-Z]{3,}|[A-Z]{1,2}[a-z]+|[0-9]+" + REDIS_KEY_NUM_USERNAME = 'uniqNumForUsername' + REDIS_KEY_NUM_PATH = 'uniqNumForUsername' + REDIS_KEY_ALL_CRED_SET = 'AllCredentials' + REDIS_KEY_ALL_CRED_SET_REV = 'AllCredentialsRev' + REDIS_KEY_ALL_PATH_SET = 'AllPath' + REDIS_KEY_ALL_PATH_SET_REV = 'AllPathRev' + REDIS_KEY_MAP_CRED_TO_PATH = 'CredToPathMapping' - while True: - message = p.get_from_set() - if message is None: - publisher.debug("Script Credential is Idling 10s") - time.sleep(10) - continue + def __init__(self): + super(Credential, self).__init__() + + self.faup = Faup() + + self.regex_web = "((?:https?:\/\/)[\.-_0-9a-zA-Z]+\.[0-9a-zA-Z]+)" + self.regex_cred = "[a-zA-Z0-9\\._-]+@[a-zA-Z0-9\\.-]+\.[a-zA-Z]{2,6}[\\rn :\_\-]{1,10}[a-zA-Z0-9\_\-]+" + self.regex_site_for_stats = "@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}:" + + self.redis_cache_key = regex_helper.generate_redis_cache_key(self.module_name) + + # Database + self.server_cred = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_TermCred") + self.server_statistics = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Statistics") + + # Config values + self.minimumLengthThreshold = ConfigLoader.ConfigLoader().get_config_int("Credential", "minimumLengthThreshold") + self.criticalNumberToAlert = ConfigLoader.ConfigLoader().get_config_int("Credential", "criticalNumberToAlert") + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 + + # Send module state to logs + self.redis_logger.info(f"Module {self.module_name} initialized") + + + def compute(self, message): item_id, count = message.split() item_content = Item.get_item_content(item_id) # Extract all credentials - all_credentials = regex_helper.regex_findall(module_name, redis_cache_key, regex_cred, item_id, item_content, max_time=max_execution_time) - - if not all_credentials: - continue - - all_sites = regex_helper.regex_findall(module_name, redis_cache_key, regex_web, item_id, item_content, max_time=max_execution_time) - - message = 'Checked {} credentials found.'.format(len(all_credentials)) - if all_sites: - message += ' Related websites: {}'.format( (', '.join(all_sites)) ) - print(message) - - to_print = 'Credential;{};{};{};{};{}'.format(Item.get_source(item_id), Item.get_item_date(item_id), Item.get_item_basename(item_id), message, item_id) - - - #num of creds above tresh, publish an alert - if len(all_credentials) > criticalNumberToAlert: - print("========> Found more than 10 credentials in this file : {}".format( item_id )) - publisher.warning(to_print) - #Send to duplicate - p.populate_set_out(item_id, 'Duplicate') - - msg = 'infoleak:automatic-detection="credential";{}'.format(item_id) - p.populate_set_out(msg, 'Tags') - - site_occurence = regex_helper.regex_findall(module_name, redis_cache_key, regex_site_for_stats, item_id, item_content, max_time=max_execution_time, r_set=False) - - creds_sites = {} - - for site in site_occurence: - site_domain = site[1:-1].lower() - if site_domain in creds_sites.keys(): - creds_sites[site_domain] += 1 - else: - creds_sites[site_domain] = 1 - - for url in all_sites: - faup.decode(url) - domain = faup.get()['domain'] - ## TODO: # FIXME: remove me - try: - domain = domain.decode() - except: - pass - if domain in creds_sites.keys(): - creds_sites[domain] += 1 - else: - creds_sites[domain] = 1 - - for site, num in creds_sites.items(): # Send for each different site to moduleStats - - mssg = 'credential;{};{};{}'.format(num, site, Item.get_item_date(item_id)) - print(mssg) - p.populate_set_out(mssg, 'ModuleStats') + all_credentials = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.regex_cred, item_id, item_content, max_time=Credential.max_execution_time) + if all_credentials: + nb_cred = len(all_credentials) + message = f'Checked {nb_cred} credentials found.' + + all_sites = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.regex_web, item_id, item_content, max_time=Credential.max_execution_time) if all_sites: - print("=======> Probably on : {}".format(', '.join(all_sites))) + discovered_sites = ', '.join(all_sites) + message += f' Related websites: {discovered_sites}' + + self.redis_logger.debug(message) - date = datetime.datetime.now().strftime("%Y%m") + to_print = f'Credential;{Item.get_source(item_id)};{Item.get_item_date(item_id)};{Item.get_item_basename(item_id)};{message};{item_id}' + + #num of creds above tresh, publish an alert + if nb_cred > self.criticalNumberToAlert: + self.redis_logger.debug(f"========> Found more than 10 credentials in this file : {item_id}") + self.redis_logger.warning(to_print) + + # Send to duplicate + self.process.populate_set_out(item_id, 'Duplicate') + + msg = f'infoleak:automatic-detection="credential";{item_id}' + self.process.populate_set_out(msg, 'Tags') + + site_occurence = regex_helper.regex_findall(self.module_name, self.redis_cache_key, self.regex_site_for_stats, item_id, item_content, max_time=Credential.max_execution_time, r_set=False) + + creds_sites = {} + + for site in site_occurence: + site_domain = site[1:-1].lower() + if site_domain in creds_sites.keys(): + creds_sites[site_domain] += 1 + else: + creds_sites[site_domain] = 1 + + for url in all_sites: + self.faup.decode(url) + domain = self.faup.get()['domain'] + ## TODO: # FIXME: remove me + try: + domain = domain.decode() + except: + pass + if domain in creds_sites.keys(): + creds_sites[domain] += 1 + else: + creds_sites[domain] = 1 + + for site, num in creds_sites.items(): # Send for each different site to moduleStats + + mssg = f'credential;{num};{site};{Item.get_item_date(item_id)}' + self.redis_logger.debug(mssg) + self.process.populate_set_out(mssg, 'ModuleStats') + + if all_sites: + discovered_sites = ', '.join(all_sites) + self.redis_logger.debug(f"=======> Probably on : {discovered_sites}") + + date = datetime.datetime.now().strftime("%Y%m") + for cred in all_credentials: + maildomains = re.findall("@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", cred.lower())[0] + self.faup.decode(maildomains) + tld = self.faup.get()['tld'] + ## TODO: # FIXME: remove me + try: + tld = tld.decode() + except: + pass + self.server_statistics.hincrby('credential_by_tld:'+date, tld, 1) + else: + self.redis_logger.info(to_print) + self.redis_logger.debug(f'found {nb_cred} credentials') + + # For searching credential in termFreq for cred in all_credentials: - maildomains = re.findall("@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,20}", cred.lower())[0] - faup.decode(maildomains) - tld = faup.get()['tld'] - ## TODO: # FIXME: remove me - try: - tld = tld.decode() - except: - pass - server_statistics.hincrby('credential_by_tld:'+date, tld, 1) - else: - publisher.info(to_print) - print('found {} credentials'.format(len(all_credentials))) + cred = cred.split('@')[0] #Split to ignore mail address + + # unique number attached to unique path + uniq_num_path = self.server_cred.incr(Credential.REDIS_KEY_NUM_PATH) + self.server_cred.hmset(Credential.REDIS_KEY_ALL_PATH_SET, {item_id: uniq_num_path}) + self.server_cred.hmset(Credential.REDIS_KEY_ALL_PATH_SET_REV, {uniq_num_path: item_id}) + + # unique number attached to unique username + uniq_num_cred = self.server_cred.hget(Credential.REDIS_KEY_ALL_CRED_SET, cred) + if uniq_num_cred is None: + # cred do not exist, create new entries + uniq_num_cred = self.server_cred.incr(Credential.REDIS_KEY_NUM_USERNAME) + self.server_cred.hmset(Credential.REDIS_KEY_ALL_CRED_SET, {cred: uniq_num_cred}) + self.server_cred.hmset(Credential.REDIS_KEY_ALL_CRED_SET_REV, {uniq_num_cred: cred}) + + # Add the mapping between the credential and the path + self.server_cred.sadd(Credential.REDIS_KEY_MAP_CRED_TO_PATH+'_'+str(uniq_num_cred), uniq_num_path) + + # Split credentials on capital letters, numbers, dots and so on + # Add the split to redis, each split point towards its initial credential unique number + splitedCred = re.findall(Credential.REGEX_CRED, cred) + for partCred in splitedCred: + if len(partCred) > self.minimumLengthThreshold: + self.server_cred.sadd(partCred, uniq_num_cred) - #for searching credential in termFreq - for cred in all_credentials: - cred = cred.split('@')[0] #Split to ignore mail address - - #unique number attached to unique path - uniq_num_path = server_cred.incr(REDIS_KEY_NUM_PATH) - server_cred.hmset(REDIS_KEY_ALL_PATH_SET, {item_id: uniq_num_path}) - server_cred.hmset(REDIS_KEY_ALL_PATH_SET_REV, {uniq_num_path: item_id}) - - #unique number attached to unique username - uniq_num_cred = server_cred.hget(REDIS_KEY_ALL_CRED_SET, cred) - if uniq_num_cred is None: #cred do not exist, create new entries - uniq_num_cred = server_cred.incr(REDIS_KEY_NUM_USERNAME) - server_cred.hmset(REDIS_KEY_ALL_CRED_SET, {cred: uniq_num_cred}) - server_cred.hmset(REDIS_KEY_ALL_CRED_SET_REV, {uniq_num_cred: cred}) - - #Add the mapping between the credential and the path - server_cred.sadd(REDIS_KEY_MAP_CRED_TO_PATH+'_'+str(uniq_num_cred), uniq_num_path) - - #Split credentials on capital letters, numbers, dots and so on - #Add the split to redis, each split point towards its initial credential unique number - splitedCred = re.findall(REGEX_CRED, cred) - for partCred in splitedCred: - if len(partCred) > minimumLengthThreshold: - server_cred.sadd(partCred, uniq_num_cred) +if __name__ == '__main__': + + module = Credential() + module.run() diff --git a/bin/CreditCards.py b/bin/CreditCards.py index 456e474a..2e4b969f 100755 --- a/bin/CreditCards.py +++ b/bin/CreditCards.py @@ -11,75 +11,84 @@ It apply credit card regexes on paste content and warn if above a threshold. """ - +################################## +# Import External packages +################################## import pprint import time -from packages import Paste -from packages import lib_refine from pubsublogger import publisher import re import sys +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +from packages import Paste +from packages import lib_refine from Helper import Process -if __name__ == "__main__": - publisher.port = 6380 - publisher.channel = "Script" - config_section = 'CreditCards' +class CreditCards(AbstractModule): + """ + CreditCards module for AIL framework + """ - p = Process(config_section) + def __init__(self): + super(CreditCards, self).__init__() - # FUNCTIONS # - publisher.info("CreditCards script started") + # Source: http://www.richardsramblings.com/regex/credit-card-numbers/ + cards = [ + r'\b4\d{3}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # 16-digit VISA, with separators + r'\b5[1-5]\d{2}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # 16 digits MasterCard + r'\b6(?:011|22(?:(?=[\ \-]?(?:2[6-9]|[3-9]))|[2-8]|9(?=[\ \-]?(?:[01]|2[0-5])))|4[4-9]\d|5\d\d)(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # Discover Card + r'\b35(?:2[89]|[3-8]\d)(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # Japan Credit Bureau (JCB) + r'\b3[47]\d\d(?:[\ \-]?)\d{6}(?:[\ \-]?)\d{5}\b', # American Express + r'\b(?:5[0678]\d\d|6304|6390|67\d\d)\d{8,15}\b', # Maestro + ] - creditcard_regex = "4[0-9]{12}(?:[0-9]{3})?" + self.regex = re.compile('|'.join(cards)) - # Source: http://www.richardsramblings.com/regex/credit-card-numbers/ - cards = [ - r'\b4\d{3}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # 16-digit VISA, with separators - r'\b5[1-5]\d{2}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # 16 digits MasterCard - r'\b6(?:011|22(?:(?=[\ \-]?(?:2[6-9]|[3-9]))|[2-8]|9(?=[\ \-]?(?:[01]|2[0-5])))|4[4-9]\d|5\d\d)(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # Discover Card - r'\b35(?:2[89]|[3-8]\d)(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}(?:[\ \-]?)\d{4}\b', # Japan Credit Bureau (JCB) - r'\b3[47]\d\d(?:[\ \-]?)\d{6}(?:[\ \-]?)\d{5}\b', # American Express - r'\b(?:5[0678]\d\d|6304|6390|67\d\d)\d{8,15}\b', # Maestro - ] + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 - regex = re.compile('|'.join(cards)) + # Send module state to logs + self.redis_logger.info(f"Module {self.module_name} initialized") - while True: - message = p.get_from_set() - if message is not None: - filename, score = message.split() - paste = Paste.Paste(filename) - content = paste.get_p_content() - all_cards = re.findall(regex, content) - if len(all_cards) > 0: - print('All matching', all_cards) - creditcard_set = set([]) - for card in all_cards: - clean_card = re.sub('[^0-9]', '', card) - clean_card = clean_card - if lib_refine.is_luhn_valid(clean_card): - print(clean_card, 'is valid') - creditcard_set.add(clean_card) + def compute(self, message): + filename, score = message.split() + paste = Paste.Paste(filename) + content = paste.get_p_content() + all_cards = re.findall(self.regex, content) - pprint.pprint(creditcard_set) - to_print = 'CreditCard;{};{};{};'.format( - paste.p_source, paste.p_date, paste.p_name) - if (len(creditcard_set) > 0): - publisher.warning('{}Checked {} valid number(s);{}'.format( - to_print, len(creditcard_set), paste.p_rel_path)) - print('{}Checked {} valid number(s);{}'.format( - to_print, len(creditcard_set), paste.p_rel_path)) - #Send to duplicate - p.populate_set_out(filename, 'Duplicate') + if len(all_cards) > 0: + self.redis_logger.debug('All matching', all_cards) + creditcard_set = set([]) - msg = 'infoleak:automatic-detection="credit-card";{}'.format(filename) - p.populate_set_out(msg, 'Tags') - else: - publisher.info('{}CreditCard related;{}'.format(to_print, paste.p_rel_path)) - else: - publisher.debug("Script creditcard is idling 1m") - time.sleep(10) + for card in all_cards: + clean_card = re.sub('[^0-9]', '', card) + # TODO purpose of this assignation ? + clean_card = clean_card + if lib_refine.is_luhn_valid(clean_card): + self.redis_logger.debug(clean_card, 'is valid') + creditcard_set.add(clean_card) + + pprint.pprint(creditcard_set) + to_print = f'CreditCard;{paste.p_source};{paste.p_date};{paste.p_name};' + + if (len(creditcard_set) > 0): + self.redis_logger.warning(f'{to_print}Checked {len(creditcard_set)} valid number(s);{paste.p_rel_path}') + + #Send to duplicate + self.process.populate_set_out(filename, 'Duplicate') + + msg = f'infoleak:automatic-detection="credit-card";{filename}' + self.process.populate_set_out(msg, 'Tags') + else: + self.redis_logger.info(f'{to_print}CreditCard related;{paste.p_rel_path}') + +if __name__ == '__main__': + + module = CreditCards() + module.run() diff --git a/bin/DomClassifier.py b/bin/DomClassifier.py index 696c4ca8..f00428c9 100755 --- a/bin/DomClassifier.py +++ b/bin/DomClassifier.py @@ -9,12 +9,20 @@ The DomClassifier modules extract and classify Internet domains/hostnames/IP add the out output of the Global module. """ + +################################## +# Import External packages +################################## import os import sys import time from pubsublogger import publisher - import DomainClassifier.domainclassifier + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from Helper import Process sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib')) @@ -22,60 +30,63 @@ import d4 import item_basic -def main(): - publisher.port = 6380 - publisher.channel = "Script" +class DomClassifier(AbstractModule): + """ + DomClassifier module for AIL framework + """ - config_section = 'DomClassifier' + def __init__(self): + super(DomClassifier, self).__init__() - p = Process(config_section) - addr_dns = p.config.get("DomClassifier", "dns") + # Waiting time in secondes between to message proccessed + self.pending_seconds = 1 - publisher.info("""ZMQ DomainClassifier is Running""") + addr_dns = self.process.config.get("DomClassifier", "dns") - c = DomainClassifier.domainclassifier.Extract(rawtext="", nameservers=[addr_dns]) + self.redis_logger.info("""ZMQ DomainClassifier is Running""") - cc = p.config.get("DomClassifier", "cc") - cc_tld = p.config.get("DomClassifier", "cc_tld") + self.c = DomainClassifier.domainclassifier.Extract(rawtext="", nameservers=[addr_dns]) - while True: + self.cc = self.process.config.get("DomClassifier", "cc") + self.cc_tld = self.process.config.get("DomClassifier", "cc_tld") + + # Send module state to logs + self.redis_logger.info("Module %s initialized" % (self.module_name)) + + + def compute(self, message): try: - item_id = p.get_from_set() - - if item_id is None: - publisher.debug("Script DomClassifier is idling 1s") - time.sleep(1) - continue - - item_content = item_basic.get_item_content(item_id) - mimetype = item_basic.get_item_mimetype(item_id) - item_basename = item_basic.get_basename(item_id) - item_source = item_basic.get_source(item_id) - item_date = item_basic.get_item_date(item_id) + item_content = item_basic.get_item_content(message) + mimetype = item_basic.get_item_mimetype(message) + item_basename = item_basic.get_basename(message) + item_source = item_basic.get_source(message) + item_date = item_basic.get_item_date(message) if mimetype.split('/')[0] == "text": - c.text(rawtext=item_content) - c.potentialdomain() - c.validdomain(passive_dns=True, extended=False) - print(c.vdomain) + self.c.text(rawtext=item_content) + self.c.potentialdomain() + self.c.validdomain(passive_dns=True, extended=False) + self.redis_logger.debug(self.c.vdomain) - if c.vdomain and d4.is_passive_dns_enabled(): - for dns_record in c.vdomain: - p.populate_set_out(dns_record) + if self.c.vdomain and d4.is_passive_dns_enabled(): + for dns_record in self.c.vdomain: + self.process.populate_set_out(dns_record) - localizeddomains = c.include(expression=cc_tld) + localizeddomains = self.c.include(expression=self.cc_tld) if localizeddomains: - print(localizeddomains) - publisher.warning(f"DomainC;{item_source};{item_date};{item_basename};Checked {localizeddomains} located in {cc_tld};{item_id}") - localizeddomains = c.localizedomain(cc=cc) + self.redis_logger.debug(localizeddomains) + self.redis_logger.warning(f"DomainC;{item_source};{item_date};{item_basename};Checked {localizeddomains} located in {self.cc_tld};{message}") + localizeddomains = self.c.localizedomain(cc=self.cc) if localizeddomains: - print(localizeddomains) - publisher.warning(f"DomainC;{item_source};{item_date};{item_basename};Checked {localizeddomains} located in {cc};{item_id}") + self.redis_logger.debug(localizeddomains) + self.redis_logger.warning(f"DomainC;{item_source};{item_date};{item_basename};Checked {localizeddomains} located in {self.cc};{message}") + + except IOError as err: + self.redis_logger.error(f"Duplicate;{item_source};{item_date};{item_basename};CRC Checksum Failed") + raise Exception(f"CRC Checksum Failed on: {message}") - except IOError: - print("CRC Checksum Failed on :", item_id) - publisher.error(f"Duplicate;{item_source};{item_date};{item_basename};CRC Checksum Failed") if __name__ == "__main__": - main() + module = DomClassifier() + module.run() diff --git a/bin/Global.py b/bin/Global.py index 5244ecc8..1df73e78 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -20,6 +20,10 @@ Requirements *Need the ZMQ_Feed_Q Module running to be able to work properly. """ + +################################## +# Import External packages +################################## import base64 import hashlib import io @@ -28,159 +32,203 @@ import os import sys import time import uuid - import datetime import redis - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) -import ConfigLoader - from pubsublogger import publisher + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader from Helper import Process -config_loader = ConfigLoader.ConfigLoader() -r_stats = config_loader.get_redis_conn("ARDB_Statistics") -config_loader = None -def gunzip_bytes_obj(bytes_obj): - in_ = io.BytesIO() - in_.write(bytes_obj) - in_.seek(0) - with gzip.GzipFile(fileobj=in_, mode='rb') as fo: - gunzipped_bytes_obj = fo.read() - return gunzipped_bytes_obj +class Global(AbstractModule): + """ + Global module for AIL framework + """ + + def __init__(self): + super(Global, self).__init__() -def rreplace(s, old, new, occurrence): - li = s.rsplit(old, occurrence) - return new.join(li) + self.r_stats = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Statistics") + + self.processed_paste = 0 + # TODO rename time_1 explicitely + self.time_1 = time.time() + + # Get and sanityze PASTE DIRECTORY + self.PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], self.process.config.get("Directories", "pastes")) + self.PASTES_FOLDERS = self.PASTES_FOLDER + '/' + self.PASTES_FOLDERS = os.path.join(os.path.realpath(self.PASTES_FOLDERS), '') + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 0.5 + + # Send module state to logs + self.redis_logger.info(f"Module {self.module_name} initialized") -if __name__ == '__main__': - publisher.port = 6380 - publisher.channel = 'Script' - processed_paste = 0 - time_1 = time.time() + def computeNone(self): + difftime = time.time() - self.time_1 + if int(difftime) > 30: + to_print = f'Global; ; ; ;glob Processed {self.processed_paste} paste(s) in {difftime} s' + self.redis_logger.debug(to_print) - config_section = 'Global' + self.time_1 = time.time() + self.processed_paste = 0 - p = Process(config_section) - # get and sanityze PASTE DIRECTORY - PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], p.config.get("Directories", "pastes")) - PASTES_FOLDERS = PASTES_FOLDER + '/' - PASTES_FOLDERS = os.path.join(os.path.realpath(PASTES_FOLDERS), '') + def compute(self, message): + # Recovering the streamed message informations + splitted = message.split() - # LOGGING # - publisher.info("Feed Script started to receive & publish.") + if len(splitted) == 2: + paste, gzip64encoded = splitted - while True: + # Remove PASTES_FOLDER from item path (crawled item + submited) + if self.PASTES_FOLDERS in paste: + paste = paste.replace(self.PASTES_FOLDERS, '', 1) - message = p.get_from_set() - # Recovering the streamed message informations. - if message is not None: - splitted = message.split() - if len(splitted) == 2: - paste, gzip64encoded = splitted + file_name_paste = paste.split('/')[-1] + if len(file_name_paste) > 255: + new_file_name_paste = '{}{}.gz'.format(file_name_paste[:215], str(uuid.uuid4())) + paste = self.rreplace(paste, file_name_paste, new_file_name_paste, 1) + + # Creating the full filepath + filename = os.path.join(self.PASTES_FOLDER, paste) + filename = os.path.realpath(filename) + + # Incorrect filename + if not os.path.commonprefix([filename, self.PASTES_FOLDER]) == self.PASTES_FOLDER: + self.redis_logger.warning(f'Global; Path traversal detected {filename}') + else: - # TODO Store the name of the empty paste inside a Redis-list. - print("Empty Paste: not processed") - publisher.debug("Empty Paste: {0} not processed".format(message)) - continue + # Decode compressed base64 + decoded = base64.standard_b64decode(gzip64encoded) + new_file_content = self.gunzip_bytes_obj(decoded) + + if new_file_content: + + filename = self.check_filename(filename, new_file_content) + + if filename: + + # create subdir + dirname = os.path.dirname(filename) + if not os.path.exists(dirname): + os.makedirs(dirname) + + with open(filename, 'wb') as f: + f.write(decoded) + + paste = filename + # remove self.PASTES_FOLDER from + if self.PASTES_FOLDERS in paste: + paste = paste.replace(self.PASTES_FOLDERS, '', 1) + + self.process.populate_set_out(paste) + self.processed_paste+=1 + else: - #print("Empty Queues: Waiting...") - if int(time.time() - time_1) > 30: - to_print = 'Global; ; ; ;glob Processed {0} paste(s) in {1} s'.format(processed_paste, time.time() - time_1) - print(to_print) - #publisher.info(to_print) - time_1 = time.time() - processed_paste = 0 - time.sleep(0.5) - continue + # TODO Store the name of the empty paste inside a Redis-list + self.redis_logger.debug(f"Empty Paste: {message} not processed") - # remove PASTES_FOLDER from item path (crawled item + submited) - if PASTES_FOLDERS in paste: - paste = paste.replace(PASTES_FOLDERS, '', 1) - file_name_paste = paste.split('/')[-1] - if len(file_name_paste)>255: - new_file_name_paste = '{}{}.gz'.format(file_name_paste[:215], str(uuid.uuid4())) - paste = rreplace(paste, file_name_paste, new_file_name_paste, 1) + def check_filename(self, filename, new_file_content): + """ + Check if file is not a duplicated file + return the filename if new file, else None + """ - # Creating the full filepath - filename = os.path.join(PASTES_FOLDER, paste) - filename = os.path.realpath(filename) + # check if file exist + if os.path.isfile(filename): + self.redis_logger.warning(f'File already exist {filename}') - # incorrect filename - if not os.path.commonprefix([filename, PASTES_FOLDER]) == PASTES_FOLDER: - print('Path traversal detected {}'.format(filename)) - publisher.warning('Global; Path traversal detected') - else: - - # decode compressed base64 - decoded = base64.standard_b64decode(gzip64encoded) - try: - new_file_content = gunzip_bytes_obj(decoded) - except Exception as e: - print('{}, {}'.format(filename, e)) - publisher.warning('Global; Invalid Gzip file: {}, {}'.format(filename, e)) - continue - - # check if file exist - if os.path.isfile(filename): - print('File already exist {}'.format(filename)) - publisher.warning('Global; File already exist') - - try: - with gzip.open(filename, 'rb') as f: - curr_file_content = f.read() - except EOFError: - publisher.warning('Global; Incomplete file: {}'.format(filename)) - # save daily stats - r_stats.zincrby('module:Global:incomplete_file', datetime.datetime.now().strftime('%Y%m%d'), 1) - # discard item - continue - except OSError: - publisher.warning('Global; Not a gzipped file: {}'.format(filename)) - # save daily stats - r_stats.zincrby('module:Global:invalid_file', datetime.datetime.now().strftime('%Y%m%d'), 1) - # discard item - continue + # Check that file already exists but content differs + curr_file_content = self.gunzip_file(filename) + if curr_file_content: + # Compare file content with message content with MD5 checksums curr_file_md5 = hashlib.md5(curr_file_content).hexdigest() - new_file_md5 = hashlib.md5(new_file_content).hexdigest() if new_file_md5 != curr_file_md5: - + # MD5 are not equals, verify filename if filename.endswith('.gz'): - filename = '{}_{}.gz'.format(filename[:-3], new_file_md5) + filename = f'{filename[:-3]}_{new_file_md5}.gz' else: - filename = '{}_{}'.format(filename, new_file_md5) + filename = f'{filename}_{new_file_md5}' + + self.redis_logger.debug(f'new file to check: {filename}') - # continue if new file already exist if os.path.isfile(filename): - print('ignore duplicated file') - continue + # Ignore duplicate + self.redis_logger.debug(f'ignore duplicated file {filename}') + filename = None - print('new file: {}'.format(filename)) - # ignore duplicate else: - print('ignore duplicated file') - continue + # Ignore duplicate checksum equals + self.redis_logger.debug(f'ignore duplicated file {filename}') + filename = None + + else: + # File not unzipped + filename = None + + + return filename - # create subdir - dirname = os.path.dirname(filename) - if not os.path.exists(dirname): - os.makedirs(dirname) - with open(filename, 'wb') as f: - f.write(decoded) + def gunzip_file(self, filename): + """ + Unzip a file + publish stats if failure + """ + curr_file_content = None - paste = filename - # remove PASTES_FOLDER from - if PASTES_FOLDERS in paste: - paste = paste.replace(PASTES_FOLDERS, '', 1) + try: + with gzip.open(filename, 'rb') as f: + curr_file_content = f.read() + except EOFError: + self.redis_logger.warning(f'Global; Incomplete file: {filename}') + # save daily stats + self.r_stats.zincrby('module:Global:incomplete_file', datetime.datetime.now().strftime('%Y%m%d'), 1) + except OSError: + self.redis_logger.warning(f'Global; Not a gzipped file: {filename}') + # save daily stats + self.r_stats.zincrby('module:Global:invalid_file', datetime.datetime.now().strftime('%Y%m%d'), 1) - p.populate_set_out(paste) - processed_paste+=1 + return curr_file_content + + + def gunzip_bytes_obj(self, bytes_obj): + gunzipped_bytes_obj = None + + try: + in_ = io.BytesIO() + in_.write(bytes_obj) + in_.seek(0) + + with gzip.GzipFile(fileobj=in_, mode='rb') as fo: + gunzipped_bytes_obj = fo.read() + + except Exception as e: + self.redis_logger.warning(f'Global; Invalid Gzip file: {filename}, {e}') + + return gunzipped_bytes_obj + + + def rreplace(self, s, old, new, occurrence): + li = s.rsplit(old, occurrence) + + return new.join(li) + + +if __name__ == '__main__': + + module = Global() + module.run() diff --git a/bin/Phone.py b/bin/Phone.py index 0e2cff6a..74a5a7cc 100755 --- a/bin/Phone.py +++ b/bin/Phone.py @@ -54,9 +54,9 @@ class Phone(AbstractModule): # If the list is greater than 4, we consider the Paste may contain a list of phone numbers if len(results) > 4: self.redis_logger.debug(results) - self.redis_logger.warning('{} contains PID (phone numbers)'.format(paste.p_name)) + self.redis_logger.warning(f'{paste.p_name} contains PID (phone numbers)') - msg = 'infoleak:automatic-detection="phone-number";{}'.format(message) + msg = f'infoleak:automatic-detection="phone-number";{message}' self.process.populate_set_out(msg, 'Tags') # Send to duplicate @@ -75,7 +75,7 @@ class Phone(AbstractModule): pass for country_code in stats: if stats[country_code] > 4: - self.redis_logger.warning('{} contains Phone numbers with country code {}'.format(paste.p_name, country_code)) + self.redis_logger.warning(f'{paste.p_name} contains Phone numbers with country code {country_code}') if __name__ == '__main__': diff --git a/bin/SentimentAnalysis.py b/bin/SentimentAnalysis.py index a90a3a09..dcaebf00 100755 --- a/bin/SentimentAnalysis.py +++ b/bin/SentimentAnalysis.py @@ -14,115 +14,32 @@ Hutto, C.J. & Gilbert, E.E. (2014). VADER: A Parsimonious Rule-based Model for Sentiment Analysis of Social Media Text. Eighth International Conference on Weblogs and Social Media (ICWSM-14). Ann Arbor, MI, June 2014. """ + +################################## +# Import External packages +################################## import os import sys - import time import datetime import calendar import redis import json +import signal from pubsublogger import publisher -from Helper import Process -from packages import Paste - -sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) -import ConfigLoader - from nltk.sentiment.vader import SentimentIntensityAnalyzer from nltk import tokenize, download -# Config Variables -accepted_Mime_type = ['text/plain'] -size_threshold = 250 -line_max_length_threshold = 1000 -#time_clean_sentiment_db = 60*60 +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule +from Helper import Process +from packages import Paste +sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) +import ConfigLoader -def Analyse(message, server): - path = message - paste = Paste.Paste(path) - - # get content with removed line + number of them - num_line_removed, p_content = paste.get_p_content_with_removed_lines(line_max_length_threshold) - provider = paste.p_source - p_date = str(paste._get_p_date()) - p_MimeType = paste._get_p_encoding() - - # Perform further analysis - if p_MimeType == "text/plain": - if isJSON(p_content): - p_MimeType = "JSON" - - if p_MimeType in accepted_Mime_type: - - the_date = datetime.date(int(p_date[0:4]), int(p_date[4:6]), int(p_date[6:8])) - the_time = datetime.datetime.now() - the_time = datetime.time(getattr(the_time, 'hour'), 0, 0) - combined_datetime = datetime.datetime.combine(the_date, the_time) - timestamp = calendar.timegm(combined_datetime.timetuple()) - - try: - sentences = tokenize.sent_tokenize(p_content) - except: - # use the NLTK Downloader to obtain the resource - download('punkt') - sentences = tokenize.sent_tokenize(p_content) - - if len(sentences) > 0: - avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0} - neg_line = 0 - pos_line = 0 - sid = SentimentIntensityAnalyzer(sentiment_lexicon_file) - for sentence in sentences: - ss = sid.polarity_scores(sentence) - for k in sorted(ss): - if k == 'compound': - if ss['neg'] > ss['pos']: - avg_score['compoundNeg'] += ss[k] - neg_line += 1 - else: - avg_score['compoundPos'] += ss[k] - pos_line += 1 - else: - avg_score[k] += ss[k] - - - for k in avg_score: - if k == 'compoundPos': - avg_score[k] = avg_score[k] / (pos_line if pos_line > 0 else 1) - elif k == 'compoundNeg': - avg_score[k] = avg_score[k] / (neg_line if neg_line > 0 else 1) - else: - avg_score[k] = avg_score[k] / len(sentences) - - - # In redis-levelDB: {} = set, () = K-V - # {Provider_set -> provider_i} - # {Provider_TimestampInHour_i -> UniqID_i}_j - # (UniqID_i -> PasteValue_i) - - server.sadd('Provider_set', provider) - - provider_timestamp = provider + '_' + str(timestamp) - server.incr('UniqID') - UniqID = server.get('UniqID') - print(provider_timestamp, '->', UniqID, 'dropped', num_line_removed, 'lines') - server.sadd(provider_timestamp, UniqID) - server.set(UniqID, avg_score) - else: - print('Dropped:', p_MimeType) - - -def isJSON(content): - try: - json.loads(content) - return True - - except Exception: - return False - -import signal class TimeoutException(Exception): pass @@ -132,48 +49,133 @@ def timeout_handler(signum, frame): signal.signal(signal.SIGALRM, timeout_handler) -if __name__ == '__main__': - # If you wish to use an other port of channel, do not forget to run a subscriber accordingly (see launch_logs.sh) - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - # Section name in bin/packages/modules.cfg - config_section = 'SentimentAnalysis' +class SentimentAnalysis(AbstractModule): + """ + SentimentAnalysis module for AIL framework + """ - # Setup the I/O queues - p = Process(config_section) + + # Config Variables + accepted_Mime_type = ['text/plain'] + line_max_length_threshold = 1000 - # Sent to the logging a description of the module - publisher.info("") - config_loader = ConfigLoader.ConfigLoader() - sentiment_lexicon_file = config_loader.get_config_str("Directories", "sentiment_lexicon_file") + def __init__(self): + super(SentimentAnalysis, self).__init__() - # REDIS_LEVEL_DB # - server = config_loader.get_redis_conn("ARDB_Sentiment") - config_loader = None + self.sentiment_lexicon_file = ConfigLoader.ConfigLoader().get_config_str("Directories", "sentiment_lexicon_file") - time1 = time.time() + # REDIS_LEVEL_DB # + self.db = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Sentiment") - while True: - message = p.get_from_set() - if message is None: - #if int(time.time() - time1) > time_clean_sentiment_db: - # clean_db() - # time1 = time.time() - # continue - #else: - publisher.debug("{} queue is empty, waiting".format(config_section)) - time.sleep(1) - continue + self.time1 = time.time() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 1 + + # Send module state to logs + self.redis_logger.info(f"Module {self.module_name} initialized") + + + def compute(self, message): + # Max time to compute one entry signal.alarm(60) try: - Analyse(message, server) + self.analyse(message) except TimeoutException: - p.incr_module_timeout_statistic() - print ("{0} processing timeout".format(message)) - continue + self.process.incr_module_timeout_statistic() + self.redis_logger.debug(f"{message} processing timeout") else: signal.alarm(0) + + + def analyse(self, message): + + paste = Paste.Paste(message) + + # get content with removed line + number of them + num_line_removed, p_content = paste.get_p_content_with_removed_lines(SentimentAnalysis.line_max_length_threshold) + provider = paste.p_source + p_date = str(paste._get_p_date()) + p_MimeType = paste._get_p_encoding() + + # Perform further analysis + if p_MimeType == "text/plain": + if self.isJSON(p_content): + p_MimeType = "JSON" + + if p_MimeType in SentimentAnalysis.accepted_Mime_type: + self.redis_logger.debug(f'Accepted :{p_MimeType}') + + the_date = datetime.date(int(p_date[0:4]), int(p_date[4:6]), int(p_date[6:8])) + the_time = datetime.datetime.now() + the_time = datetime.time(getattr(the_time, 'hour'), 0, 0) + combined_datetime = datetime.datetime.combine(the_date, the_time) + timestamp = calendar.timegm(combined_datetime.timetuple()) + + try: + sentences = tokenize.sent_tokenize(p_content) + except: + # use the NLTK Downloader to obtain the resource + download('punkt') + sentences = tokenize.sent_tokenize(p_content) + + if len(sentences) > 0: + avg_score = {'neg': 0.0, 'neu': 0.0, 'pos': 0.0, 'compoundPos': 0.0, 'compoundNeg': 0.0} + neg_line = 0 + pos_line = 0 + sid = SentimentIntensityAnalyzer(sentiment_lexicon_file) + for sentence in sentences: + ss = sid.polarity_scores(sentence) + for k in sorted(ss): + if k == 'compound': + if ss['neg'] > ss['pos']: + avg_score['compoundNeg'] += ss[k] + neg_line += 1 + else: + avg_score['compoundPos'] += ss[k] + pos_line += 1 + else: + avg_score[k] += ss[k] + + + for k in avg_score: + if k == 'compoundPos': + avg_score[k] = avg_score[k] / (pos_line if pos_line > 0 else 1) + elif k == 'compoundNeg': + avg_score[k] = avg_score[k] / (neg_line if neg_line > 0 else 1) + else: + avg_score[k] = avg_score[k] / len(sentences) + + + # In redis-levelDB: {} = set, () = K-V + # {Provider_set -> provider_i} + # {Provider_TimestampInHour_i -> UniqID_i}_j + # (UniqID_i -> PasteValue_i) + + self.db.sadd('Provider_set', provider) + + provider_timestamp = provider + '_' + str(timestamp) + self.db.incr('UniqID') + UniqID = self.db.get('UniqID') + self.redis_logger.debug(f'{provider_timestamp}->{UniqID}dropped{num_line_removed}lines') + self.db.sadd(provider_timestamp, UniqID) + self.db.set(UniqID, avg_score) + else: + self.redis_logger.debug(f'Dropped:{p_MimeType}') + + + def isJSON(self, content): + try: + json.loads(content) + return True + + except Exception: + return False + + +if __name__ == '__main__': + + module = SentimentAnalysis() + module.run() diff --git a/bin/Tags.py b/bin/Tags.py index b38d6309..6f3b4a17 100755 --- a/bin/Tags.py +++ b/bin/Tags.py @@ -8,42 +8,47 @@ The Tags Module This module create tags. """ -import time +################################## +# Import External packages +################################## +import time from pubsublogger import publisher + + +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from Helper import Process from packages import Tag + +class Tags(AbstractModule): + """ + Tags module for AIL framework + """ + + def __init__(self): + super(Tags, self).__init__() + + # Waiting time in secondes between to message proccessed + self.pending_seconds = 10 + + # Send module state to logs + self.redis_logger.info(f"Module {self.module_name} initialized") + + + def compute(self, message): + self.redis_logger.debug(message) + + tag, item_id = message.split(';') + Tag.add_tag("item", tag, item_id) + + self.process.populate_set_out(message, 'MISP_The_Hive_feeder') + + if __name__ == '__main__': - - # Port of the redis instance used by pubsublogger - publisher.port = 6380 - # Script is the default channel used for the modules. - publisher.channel = 'Script' - - # Section name in bin/packages/modules.cfg - config_section = 'Tags' - - # Setup the I/O queues - p = Process(config_section) - - # Sent to the logging a description of the module - publisher.info("Tags module started") - - # Endless loop getting messages from the input queue - while True: - # Get one message from the input queue - message = p.get_from_set() - - if message is None: - publisher.debug("{} queue is empty, waiting 10s".format(config_section)) - time.sleep(10) - continue - - else: - print(message) - tag, item_id = message.split(';') - - Tag.add_tag("item", tag, item_id) - - p.populate_set_out(message, 'MISP_The_Hive_feeder') + + module = Tags() + module.run() diff --git a/bin/Web.py b/bin/Web.py index 473489a5..cbc07f2c 100755 --- a/bin/Web.py +++ b/bin/Web.py @@ -50,7 +50,7 @@ class Web(AbstractModule): """ Init Web """ - super(Web, self).__init__() + super(Web, self).__init__(logger_channel='script:web') # REDIS Cache self.r_serv2 = redis.StrictRedis( @@ -82,7 +82,8 @@ class Web(AbstractModule): self.prec_filename = None # Send module state to logs - self.redis_logger.info("Module %s initialized" % (self.module_name)) + self.redis_logger.info(f"Module {self.module_name} initialized") + def compute(self, message): """ @@ -91,82 +92,79 @@ class Web(AbstractModule): # Extract item filename, score = message.split() + domains_list = set() + hosts_list = set() + if self.prec_filename is None or filename != self.prec_filename: - domains_list = set() + domains_list.clear() + hosts_list.clear() + PST = Paste.Paste(filename) client = ip2asn() detected_urls = PST.get_regex(self.url_regex) if len(detected_urls) > 0: - to_print = 'Web;{};{};{};'.format( - PST.p_source, PST.p_date, PST.p_name) - self.redis_logger.info('{}Detected {} URL;{}'.format( - to_print, len(detected_urls), PST.p_rel_path)) + to_print = f'Web;{PST.p_source};{PST.p_date};{PST.p_name};' + self.redis_logger.info(f'{to_print}Detected {len(detected_urls)} URL;{PST.p_rel_path}') for url in detected_urls: - self.redis_logger.debug("match regex: %s" % (url)) + + if url.endswith(".on"): + # URL is an onion link skip + # TODO send to TOR crawler ? + # self.redis_logger.debug("Skip onion link") + continue - # self.redis_logger.debug("match regex search: %s"%(url)) + self.redis_logger.debug(f"match regex: {url}") - to_send = "{} {} {}".format(url, PST._get_p_date(), filename) + to_send = f"{url} {PST._get_p_date()} {filename}" self.process.populate_set_out(to_send, 'Url') - self.redis_logger.debug("url_parsed: %s" % (to_send)) + self.redis_logger.debug(f"url_parsed: {to_send}") self.faup.decode(url) domain = self.faup.get_domain() subdomain = self.faup.get_subdomain() - self.redis_logger.debug('{} Published'.format(url)) + self.redis_logger.debug(f'{url} Published') + + domains_list.add(domain) + + hostl = f'{subdomain}.{domain}' if subdomain else domain + + if hostl not in hosts_list: + # test host only once a host in a paste + hosts_list.add(hostl) - if subdomain is not None: - # TODO: # FIXME: remove me try: - subdomain = subdomain.decode() + socket.setdefaulttimeout(1) + ip = socket.gethostbyname(hostl) + # If the resolver is not giving any IPv4 address, + # ASN/CC lookup is skip. + l = client.lookup(ip, qType='IP') + except ipaddress.AddressValueError: + self.redis_logger.debug( + f'ASN/CC lookup failed for IP {ip}') + continue except: - pass + self.redis_logger.debug( + f'Resolver IPv4 address failed for host {hostl}') + continue - if domain is not None: - # TODO: # FIXME: remove me - try: - domain = domain.decode() - except: - pass - domains_list.add(domain) + cc = getattr(l, 'cc') + asn = '' + if getattr(l, 'asn') is not None: + asn = getattr(l, 'asn')[2:] # remobe b' - hostl = self.avoidNone(subdomain) + self.avoidNone(domain) - - try: - socket.setdefaulttimeout(1) - ip = socket.gethostbyname(hostl) - # If the resolver is not giving any IPv4 address, - # ASN/CC lookup is skip. - l = client.lookup(ip, qType='IP') - except ipaddress.AddressValueError: - self.redis_logger.debug( - f'ASN/CC lookup failed for IP {ip}') - continue - except: - self.redis_logger.debug( - f'Resolver IPv4 address failed for host {hostl}') - continue - - cc = getattr(l, 'cc') - asn = '' - if getattr(l, 'asn') is not None: - asn = getattr(l, 'asn')[2:] # remobe b' - - # EU is not an official ISO 3166 code (but used by RIPE - # IP allocation) - if cc is not None and cc != "EU": - self.redis_logger.debug('{};{};{};{}'.format(hostl, asn, cc, - pycountry.countries.get(alpha_2=cc).name)) - if cc == self.cc_critical: - to_print = 'Url;{};{};{};Detected {} {}'.format( - PST.p_source, PST.p_date, PST.p_name, - hostl, cc) - self.redis_logger.info(to_print) - else: - self.redis_logger.debug('{};{};{}'.format(hostl, asn, cc)) + # EU is not an official ISO 3166 code (but used by RIPE + # IP allocation) + if cc is not None and cc != "EU": + countryname = pycountry.countries.get(alpha_2=cc).name + self.redis_logger.debug(f'{hostl};{asn};{cc};{countryname}') + if cc == self.cc_critical: + to_print = f'Url;{PST.p_source};{PST.p_date};{PST.p_name};Detected {hostl} {cc}' + self.redis_logger.info(to_print) + else: + self.redis_logger.debug(f'{hostl};{asn};{cc}') A_values = lib_refine.checking_A_record(self.r_serv2, domains_list) diff --git a/bin/module/abstract_module.py b/bin/module/abstract_module.py index 7c853af6..cb378ff2 100644 --- a/bin/module/abstract_module.py +++ b/bin/module/abstract_module.py @@ -21,10 +21,12 @@ class AbstractModule(ABC): Abstract Module class """ - def __init__(self, module_name=None, queue_name=None): + def __init__(self, module_name=None, queue_name=None, logger_channel='Script'): """ Init Module module_name: str; set the module name if different from the instance ClassName + queue_name: str; set the queue name if different from the instance ClassName + logger_channel: str; set the logger channel name, 'Script' by default """ # Module name if provided else instance className self.module_name = module_name if module_name else self._module_name() @@ -34,12 +36,13 @@ class AbstractModule(ABC): # Init Redis Logger self.redis_logger = publisher + # Port of the redis instance used by pubsublogger self.redis_logger.port = 6380 + # Channel name to publish logs - self.redis_logger.channel = 'Script' - # TODO modify generic channel Script to a namespaced channel like: - # publish module logs to script: channel + # If provided could be a namespaced channel like script: + self.redis_logger.channel = logger_channel # self.redis_logger.channel = 'script:%s'%(self.module_name) # Run module endlessly @@ -62,18 +65,17 @@ class AbstractModule(ABC): # Get one message (paste) from the QueueIn (copy of Redis_Global publish) message = self.process.get_from_set() - if message is None: + if message: + try: + # Module processing with the message from the queue + self.compute(message) + except Exception as err: + self.redis_logger.critical(f"Error in module {self.module_name}: {err}") + else: self.computeNone() # Wait before next process self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s") time.sleep(self.pending_seconds) - continue - - try: - # Module processing with the message from the queue - self.compute(message) - except Exception as err: - self.redis_logger.critical(f"Error in module {self.module_name}: {err}") def _module_name(self): diff --git a/bin/packages/Import_helper.py b/bin/packages/Import_helper.py index ff15115b..8a1dd840 100755 --- a/bin/packages/Import_helper.py +++ b/bin/packages/Import_helper.py @@ -1,19 +1,27 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* +################################## +# Import External packages +################################## import os import sys import uuid import redis +################################## +# Import Project packages +################################## sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader + config_loader = ConfigLoader.ConfigLoader() r_serv_db = config_loader.get_redis_conn("ARDB_DB") r_serv_log_submit = config_loader.get_redis_conn("Redis_Log_submit") config_loader = None + def is_valid_uuid_v4(UUID): UUID = UUID.replace('-', '') try: @@ -22,7 +30,8 @@ def is_valid_uuid_v4(UUID): except: return False -def create_import_queue(tags, galaxy, paste_content, UUID, password=None, isfile = False): + +def create_import_queue(tags, galaxy, paste_content, UUID, password=None, isfile=False, source=None): # save temp value on disk for tag in tags: @@ -34,6 +43,9 @@ def create_import_queue(tags, galaxy, paste_content, UUID, password=None, isfil if password: r_serv_db.set(UUID + ':password', password) + + if source: + r_serv_db.set(UUID + ':source', source) r_serv_db.set(UUID + ':isfile', isfile) @@ -45,8 +57,10 @@ def create_import_queue(tags, galaxy, paste_content, UUID, password=None, isfil # save UUID on disk r_serv_db.sadd('submitted:uuid', UUID) + return UUID + def check_import_status(UUID): if not is_valid_uuid_v4(UUID): return ({'status': 'error', 'reason': 'Invalid uuid'}, 400) diff --git a/bin/submit_paste.py b/bin/submit_paste.py index 1aec936a..7ee009f2 100755 --- a/bin/submit_paste.py +++ b/bin/submit_paste.py @@ -1,6 +1,17 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* +""" +The Submit paste module +================ + +This module is taking paste in redis queue ARDB_DB and submit to global + +""" + +################################## +# Import External packages +################################## import os import sys import gzip @@ -9,250 +20,357 @@ import redis import base64 import datetime import time +# from sflock.main import unpack +# import sflock -from sflock.main import unpack -import sflock - +################################## +# Import Project packages +################################## +from module.abstract_module import AbstractModule from Helper import Process from pubsublogger import publisher - sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/')) import Tag - sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader -def create_paste(uuid, paste_content, ltags, ltagsgalaxies, name): - now = datetime.datetime.now() - save_path = 'submitted/' + now.strftime("%Y") + '/' + now.strftime("%m") + '/' + now.strftime("%d") + '/' + name + '.gz' - - full_path = filename = os.path.join(os.environ['AIL_HOME'], - p.config.get("Directories", "pastes"), save_path) - - if os.path.isfile(full_path): - addError(uuid, 'File: ' + save_path + ' already exist in submitted pastes') - return 1 - - try: - gzipencoded = gzip.compress(paste_content) - gzip64encoded = base64.standard_b64encode(gzipencoded).decode() - except: - abord_file_submission(uuid, "file error") - return 1 - - # use relative path - rel_item_path = save_path.replace(PASTES_FOLDER, '', 1) - - # send paste to Global module - relay_message = "{0} {1}".format(rel_item_path, gzip64encoded) - p.populate_set_out(relay_message, 'Mixer') - - # increase nb of paste by feeder name - r_serv_log_submit.hincrby("mixer_cache:list_feeder", "submitted", 1) - - # add tags - for tag in ltags: - Tag.add_tag('item', tag, rel_item_path) - - for tag in ltagsgalaxies: - Tag.add_tag('item', tag, rel_item_path) - - r_serv_log_submit.incr(uuid + ':nb_end') - r_serv_log_submit.incr(uuid + ':nb_sucess') - - if r_serv_log_submit.get(uuid + ':nb_end') == r_serv_log_submit.get(uuid + ':nb_total'): - r_serv_log_submit.set(uuid + ':end', 1) - - print(' {} send to Global'.format(rel_item_path)) - r_serv_log_submit.sadd(uuid + ':paste_submit_link', rel_item_path) - - curr_date = datetime.date.today() - serv_statistics.hincrby(curr_date.strftime("%Y%m%d"),'submit_paste', 1) - - return 0 - -def addError(uuid, errorMessage): - print(errorMessage) - error = r_serv_log_submit.get(uuid + ':error') - if error != None: - r_serv_log_submit.set(uuid + ':error', error + '

' + errorMessage) - r_serv_log_submit.incr(uuid + ':nb_end') - -def abord_file_submission(uuid, errorMessage): - addError(uuid, errorMessage) - r_serv_log_submit.set(uuid + ':end', 1) - curr_date = datetime.date.today() - serv_statistics.hincrby(curr_date.strftime("%Y%m%d"),'submit_abord', 1) - remove_submit_uuid(uuid) - - -def remove_submit_uuid(uuid): - # save temp value on disk - r_serv_db.delete(uuid + ':ltags') - r_serv_db.delete(uuid + ':ltagsgalaxies') - r_serv_db.delete(uuid + ':paste_content') - r_serv_db.delete(uuid + ':isfile') - r_serv_db.delete(uuid + ':password') - - r_serv_log_submit.expire(uuid + ':end', expire_time) - r_serv_log_submit.expire(uuid + ':processing', expire_time) - r_serv_log_submit.expire(uuid + ':nb_total', expire_time) - r_serv_log_submit.expire(uuid + ':nb_sucess', expire_time) - r_serv_log_submit.expire(uuid + ':nb_end', expire_time) - r_serv_log_submit.expire(uuid + ':error', expire_time) - r_serv_log_submit.expire(uuid + ':paste_submit_link', expire_time) - - # delete uuid - r_serv_db.srem('submitted:uuid', uuid) - print('{} all file submitted'.format(uuid)) - -def get_item_date(item_filename): - l_directory = item_filename.split('/') - return '{}{}{}'.format(l_directory[-4], l_directory[-3], l_directory[-2]) - -def verify_extention_filename(filename): - if not '.' in filename: - return True - else: - file_type = filename.rsplit('.', 1)[1] - - #txt file - if file_type in ALLOWED_EXTENSIONS: - return True - else: - return False - -if __name__ == "__main__": - - publisher.port = 6380 - publisher.channel = "Script" - - config_loader = ConfigLoader.ConfigLoader() - - r_serv_db = config_loader.get_redis_conn("ARDB_DB") - r_serv_log_submit = config_loader.get_redis_conn("Redis_Log_submit") - r_serv_tags = config_loader.get_redis_conn("ARDB_Tags") - r_serv_metadata = config_loader.get_redis_conn("ARDB_Metadata") - serv_statistics = config_loader.get_redis_conn("ARDB_Statistics") +class SubmitPaste(AbstractModule): + """ + Company Credentials module for AIL framework + """ expire_time = 120 - MAX_FILE_SIZE = 1000000000 - ALLOWED_EXTENSIONS = ['txt', 'sh', 'pdf'] + # Text max size + TEXT_MAX_SIZE = ConfigLoader.ConfigLoader().get_config_int("SubmitPaste", "TEXT_MAX_SIZE") + # File max size + FILE_MAX_SIZE = ConfigLoader.ConfigLoader().get_config_int("SubmitPaste", "FILE_MAX_SIZE") + # Allowed file type + ALLOWED_EXTENSIONS = ConfigLoader.ConfigLoader().get_config_str("SubmitPaste", "FILE_ALLOWED_EXTENSIONS").split(',') - config_section = 'submit_paste' - p = Process(config_section) + def __init__(self): + """ + init + """ + super(SubmitPaste, self).__init__(queue_name='submit_paste') - PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], config_loader.get_config_str("Directories", "pastes")) + '/' + self.r_serv_db = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_DB") + self.r_serv_log_submit = ConfigLoader.ConfigLoader().get_redis_conn("Redis_Log_submit") + self.r_serv_tags = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Tags") + self.r_serv_metadata = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Metadata") + self.serv_statistics = ConfigLoader.ConfigLoader().get_redis_conn("ARDB_Statistics") - config_loader = None + self.pending_seconds = 3 - while True: - - # paste submitted - if r_serv_db.scard('submitted:uuid') > 0: - uuid = r_serv_db.srandmember('submitted:uuid') - - # get temp value save on disk - ltags = r_serv_db.smembers(uuid + ':ltags') - ltagsgalaxies = r_serv_db.smembers(uuid + ':ltagsgalaxies') - paste_content = r_serv_db.get(uuid + ':paste_content') - isfile = r_serv_db.get(uuid + ':isfile') - password = r_serv_db.get(uuid + ':password') - - # needed if redis is restarted - r_serv_log_submit.set(uuid + ':end', 0) - r_serv_log_submit.set(uuid + ':processing', 0) - r_serv_log_submit.set(uuid + ':nb_total', -1) - r_serv_log_submit.set(uuid + ':nb_end', 0) - r_serv_log_submit.set(uuid + ':nb_sucess', 0) + self.PASTES_FOLDER = os.path.join(os.environ['AIL_HOME'], ConfigLoader.ConfigLoader().get_config_str("Directories", "pastes")) + '/' - r_serv_log_submit.set(uuid + ':processing', 1) + def compute(self, uuid): + """ + Main method of the Module to implement + """ + self.redis_logger.debug(f'compute UUID {uuid}') - if isfile == 'True': - file_full_path = paste_content + # get temp value save on disk + ltags = self.r_serv_db.smembers(f'{uuid}:ltags') + ltagsgalaxies = self.r_serv_db.smembers(f'{uuid}:ltagsgalaxies') + paste_content = self.r_serv_db.get(f'{uuid}:paste_content') + isfile = self.r_serv_db.get(f'{uuid}:isfile') + password = self.r_serv_db.get(f'{uuid}:password') + source = self.r_serv_db.get(f'{uuid}:source') - if not os.path.exists(file_full_path): - abord_file_submission(uuid, "Server Error, the archive can't be found") - continue + self.redis_logger.debug(f'isfile UUID {isfile}') + self.redis_logger.debug(f'source UUID {source}') + self.redis_logger.debug(f'paste_content UUID {paste_content}') - #verify file lengh - if os.stat(file_full_path).st_size > MAX_FILE_SIZE: - abord_file_submission(uuid, 'File :{} too large'.format(file_full_path)) + # needed if redis is restarted + self.r_serv_log_submit.set(f'{uuid}:end', 0) + self.r_serv_log_submit.set(f'{uuid}:processing', 0) + self.r_serv_log_submit.set(f'{uuid}:nb_total', -1) + self.r_serv_log_submit.set(f'{uuid}:nb_end', 0) + self.r_serv_log_submit.set(f'{uuid}:nb_sucess', 0) + + self.r_serv_log_submit.set(f'{uuid}:processing', 1) + + if isfile == 'True': + # file input + self._manage_file(uuid, paste_content, ltags, ltagsgalaxies, source) + + else: + # textarea input paste + self._manage_text(uuid, paste_content, ltags, ltagsgalaxies, source) + + # new paste created from file, remove uuid ref + self.remove_submit_uuid(uuid) + + + def run(self): + """ + Run Module endless process + """ + + # Endless loop processing messages from the input queue + while self.proceed: + # Get one message (paste) from the QueueIn (copy of Redis_Global publish) + nb_submit = self.r_serv_db.scard('submitted:uuid') + + if nb_submit > 0: + try: + uuid = self.r_serv_db.srandmember('submitted:uuid') + # Module processing with the message from the queue + self.redis_logger.debug(uuid) + self.compute(uuid) + except Exception as err: + self.redis_logger.error(f'Error in module {self.module_name}: {err}') + # Remove uuid ref + self.remove_submit_uuid(uuid) + else: + # Wait before next process + self.redis_logger.debug(f'{self.module_name}, waiting for new message, Idling {self.pending_seconds}s') + time.sleep(self.pending_seconds) + + + def _manage_text(self, uuid, paste_content, ltags, ltagsgalaxies, source): + """ + Create a paste for given text + """ + if sys.getsizeof(paste_content) < SubmitPaste.TEXT_MAX_SIZE: + self.r_serv_log_submit.set(f'{uuid}:nb_total', 1) + self.create_paste(uuid, paste_content.encode(), ltags, ltagsgalaxies, uuid, source) + time.sleep(0.5) + else: + self.abord_file_submission(uuid, f'Text size is over {SubmitPaste.TEXT_MAX_SIZE} bytes') + + + def _manage_file(self, uuid, file_full_path, ltags, ltagsgalaxies, source): + """ + Create a paste for given file + """ + self.redis_logger.debug('manage') + + if os.path.exists(file_full_path): + self.redis_logger.debug(f'file exists {file_full_path}') + + file_size = os.stat(file_full_path).st_size + self.redis_logger.debug(f'file size {file_size}') + # Verify file length + if file_size < SubmitPaste.FILE_MAX_SIZE: + # TODO sanitize filename + filename = file_full_path.split('/')[-1] + self.redis_logger.debug(f'sanitize filename {filename}') + self.redis_logger.debug('file size allowed') + + if not '.' in filename: + self.redis_logger.debug('no extension for filename') + try: + # Read file + with open(file_full_path,'r') as f: + content = f.read() + self.r_serv_log_submit.set(uuid + ':nb_total', 1) + self.create_paste(uuid, content.encode(), ltags, ltagsgalaxies, uuid, source) + self.remove_submit_uuid(uuid) + except: + self.abord_file_submission(uuid, "file error") else: - filename = file_full_path.split('/')[-1] - if not '.' in filename: - # read file - try: + file_type = filename.rsplit('.', 1)[1] + file_type = file_type.lower() + self.redis_logger.debug(f'file ext {file_type}') + + if file_type in SubmitPaste.ALLOWED_EXTENSIONS: + self.redis_logger.debug('Extension allowed') + # TODO enum of possible file extension ? + # TODO verify file hash with virus total ? + if not self._is_compressed_type(file_type): + self.redis_logger.debug('Plain text file') + # plain txt file with open(file_full_path,'r') as f: content = f.read() - except: - abord_file_submission(uuid, "file error") - continue - r_serv_log_submit.set(uuid + ':nb_total', 1) - create_paste(uuid, content.encode(), ltags, ltagsgalaxies, uuid) - remove_submit_uuid(uuid) - - else: - file_type = filename.rsplit('.', 1)[1] - - #txt file - if file_type in ALLOWED_EXTENSIONS: - with open(file_full_path,'r') as f: - content = f.read() - r_serv_log_submit.set(uuid + ':nb_total', 1) - create_paste(uuid, content.encode(), ltags, ltagsgalaxies, uuid) - remove_submit_uuid(uuid) - #compressed file + self.r_serv_log_submit.set(uuid + ':nb_total', 1) + self.create_paste(uuid, content.encode(), ltags, ltagsgalaxies, uuid, source) else: - #decompress file - try: - if password == None: - files = unpack(file_full_path.encode()) - #print(files.children) - else: - try: - files = unpack(file_full_path.encode(), password=password.encode()) - #print(files.children) - except sflock.exception.IncorrectUsageException: - abord_file_submission(uuid, "Wrong Password") - continue - except: - abord_file_submission(uuid, "file decompression error") - continue - print('unpacking {} file'.format(files.unpacker)) - if(not files.children): - abord_file_submission(uuid, "Empty compressed file") - continue - # set number of files to submit - r_serv_log_submit.set(uuid + ':nb_total', len(files.children)) - n = 1 - for child in files.children: - if verify_extention_filename(child.filename.decode()): - create_paste(uuid, child.contents, ltags, ltagsgalaxies, uuid+'_'+ str(n) ) - n = n + 1 - else: - print('bad extention') - addError(uuid, 'Bad file extension: {}'.format(child.filename.decode()) ) + # Compressed file + self.abord_file_submission(uuid, "file decompression should be implemented") + # TODO add compress file management + # #decompress file + # try: + # if password == None: + # files = unpack(file_full_path.encode()) + # #print(files.children) + # else: + # try: + # files = unpack(file_full_path.encode(), password=password.encode()) + # #print(files.children) + # except sflock.exception.IncorrectUsageException: + # self.abord_file_submission(uuid, "Wrong Password") + # raise + # except: + # self.abord_file_submission(uuid, "file decompression error") + # raise + # self.redis_logger.debug('unpacking {} file'.format(files.unpacker)) + # if(not files.children): + # self.abord_file_submission(uuid, "Empty compressed file") + # raise + # # set number of files to submit + # self.r_serv_log_submit.set(uuid + ':nb_total', len(files.children)) + # n = 1 + # for child in files.children: + # if self.verify_extention_filename(child.filename.decode()): + # self.create_paste(uuid, child.contents, ltags, ltagsgalaxies, uuid+'_'+ str(n) , source) + # n = n + 1 + # else: + # self.redis_logger.error("Error in module %s: bad extention"%(self.module_name)) + # self.addError(uuid, 'Bad file extension: {}'.format(child.filename.decode()) ) - except FileNotFoundError: - print('file not found') - addError(uuid, 'File not found: {}'.format(file_full_path), uuid ) + # except FileNotFoundError: + # self.redis_logger.error("Error in module %s: file not found"%(self.module_name)) + # self.addError(uuid, 'File not found: {}'.format(file_full_path), uuid ) - remove_submit_uuid(uuid) - - - - # textarea input paste else: - r_serv_log_submit.set(uuid + ':nb_total', 1) - create_paste(uuid, paste_content.encode(), ltags, ltagsgalaxies, uuid) - remove_submit_uuid(uuid) - time.sleep(0.5) - - # wait for paste + self.abord_file_submission(uuid, f'File :{file_full_path} too large, over {SubmitPaste.FILE_MAX_SIZE} bytes') else: - publisher.debug("Script submit_paste is Idling 10s") - time.sleep(3) + self.abord_file_submission(uuid, "Server Error, the archive can't be found") + + + def _is_compressed_type(self, file_type): + """ + Check if file type is in the list of compressed file extensions format + """ + compressed_type = ['zip', 'gz', 'tar.gz'] + + return file_type in compressed_type + + + def remove_submit_uuid(self, uuid): + # save temp value on disk + self.r_serv_db.delete(f'{uuid}:ltags') + self.r_serv_db.delete(f'{uuid}:ltagsgalaxies') + self.r_serv_db.delete(f'{uuid}:paste_content') + self.r_serv_db.delete(f'{uuid}:isfile') + self.r_serv_db.delete(f'{uuid}:password') + self.r_serv_db.delete(f'{uuid}:source') + + self.r_serv_log_submit.expire(f'{uuid}:end', SubmitPaste.expire_time) + self.r_serv_log_submit.expire(f'{uuid}:processing', SubmitPaste.expire_time) + self.r_serv_log_submit.expire(f'{uuid}:nb_total', SubmitPaste.expire_time) + self.r_serv_log_submit.expire(f'{uuid}:nb_sucess', SubmitPaste.expire_time) + self.r_serv_log_submit.expire(f'{uuid}:nb_end', SubmitPaste.expire_time) + self.r_serv_log_submit.expire(f'{uuid}:error', SubmitPaste.expire_time) + self.r_serv_log_submit.expire(f'{uuid}:paste_submit_link', SubmitPaste.expire_time) + + # delete uuid + self.r_serv_db.srem('submitted:uuid', uuid) + self.redis_logger.debug(f'{uuid} all file submitted') + + + def create_paste(self, uuid, paste_content, ltags, ltagsgalaxies, name, source=None): + + result = False + + now = datetime.datetime.now() + source = source if source else 'submitted' + save_path = source + '/' + now.strftime("%Y") + '/' + now.strftime("%m") + '/' + now.strftime("%d") + '/' + name + '.gz' + + full_path = filename = os.path.join(os.environ['AIL_HOME'], + self.process.config.get("Directories", "pastes"), save_path) + + self.redis_logger.debug(f'file path of the paste {full_path}') + + if not os.path.isfile(full_path): + # file not exists in AIL paste directory + self.redis_logger.debug(f"new paste {paste_content}") + + gzip64encoded = self._compress_encode_content(paste_content) + + if gzip64encoded: + + # use relative path + rel_item_path = save_path.replace(self.PASTES_FOLDER, '', 1) + self.redis_logger.debug(f"relative path {rel_item_path}") + + # send paste to Global module + relay_message = f"{rel_item_path} {gzip64encoded}" + self.process.populate_set_out(relay_message, 'Mixer') + + # increase nb of paste by feeder name + self.r_serv_log_submit.hincrby("mixer_cache:list_feeder", source, 1) + + # add tags + for tag in ltags: + Tag.add_tag('item', tag, rel_item_path) + + for tag in ltagsgalaxies: + Tag.add_tag('item', tag, rel_item_path) + + self.r_serv_log_submit.incr(f'{uuid}:nb_end') + self.r_serv_log_submit.incr(f'{uuid}:nb_sucess') + + if self.r_serv_log_submit.get(f'{uuid}:nb_end') == self.r_serv_log_submit.get(f'{uuid}:nb_total'): + self.r_serv_log_submit.set(f'{uuid}:end', 1) + + self.redis_logger.debug(f' {rel_item_path} send to Global') + self.r_serv_log_submit.sadd(f'{uuid}:paste_submit_link', rel_item_path) + + curr_date = datetime.date.today() + self.serv_statistics.hincrby(curr_date.strftime("%Y%m%d"),'submit_paste', 1) + self.redis_logger.debug("paste submitted") + else: + self.addError(uuid, f'File: {save_path} already exist in submitted pastes') + + return result + + + def _compress_encode_content(self, content): + gzip64encoded = None + + try: + gzipencoded = gzip.compress(content) + gzip64encoded = base64.standard_b64encode(gzipencoded).decode() + except: + self.abord_file_submission(uuid, "file error") + + return gzip64encoded + + + def addError(self, uuid, errorMessage): + self.redis_logger.debug(errorMessage) + + error = self.r_serv_log_submit.get(f'{uuid}:error') + if error != None: + self.r_serv_log_submit.set(f'{uuid}:error', error + '

' + errorMessage) + + self.r_serv_log_submit.incr(f'{uuid}:nb_end') + + + def abord_file_submission(self, uuid, errorMessage): + self.redis_logger.debug(f'abord {uuid}, {errorMessage}') + + self.addError(uuid, errorMessage) + self.r_serv_log_submit.set(f'{uuid}:end', 1) + curr_date = datetime.date.today() + self.serv_statistics.hincrby(curr_date.strftime("%Y%m%d"),'submit_abord', 1) + self.remove_submit_uuid(uuid) + + + def get_item_date(self, item_filename): + l_directory = item_filename.split('/') + return f'{l_directory[-4]}{l_directory[-3]}{l_directory[-2]}' + + + def verify_extention_filename(self, filename): + if not '.' in filename: + return True + else: + file_type = filename.rsplit('.', 1)[1] + + #txt file + if file_type in SubmitPaste.ALLOWED_EXTENSIONS: + return True + else: + return False + + +if __name__ == '__main__': + + module = SubmitPaste() + module.run() diff --git a/configs/core.cfg.sample b/configs/core.cfg.sample index df6fed66..23b1f9fe 100644 --- a/configs/core.cfg.sample +++ b/configs/core.cfg.sample @@ -280,3 +280,12 @@ domain_proxy = onion.foundation # list of comma-separated CIDR that you wish to be alerted for. e.g: #networks = 192.168.34.0/24,10.0.0.0/8,192.168.33.0/24 networks = + +[SubmitPaste] +# 1 Mb Max text paste size for text submission +TEXT_MAX_SIZE = 1000000 +# 1 Gb Max file size for file submission +FILE_MAX_SIZE = 1000000000 +# Managed file extenions for file submission, comma separated +# TODO add 'zip', 'gz' and 'tar.gz' +FILE_ALLOWED_EXTENSIONS = 'txt','sh','pdf' diff --git a/var/www/modules/Flask_config.py b/var/www/modules/Flask_config.py index 574c9950..e7227888 100644 --- a/var/www/modules/Flask_config.py +++ b/var/www/modules/Flask_config.py @@ -4,12 +4,19 @@ ''' Flask global variables shared accross modules ''' +################################## +# Import External packages +################################## import os import re import sys +################################## +# Import Project packages +################################## sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader +from pubsublogger import publisher # FLASK # app = None @@ -32,6 +39,15 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB") r_serv_statistics = config_loader.get_redis_conn("ARDB_Statistics") r_serv_onion = config_loader.get_redis_conn("ARDB_Onion") + +# Logger (Redis) +redis_logger = publisher +# Port of the redis instance used by pubsublogger +redis_logger.port = 6380 +# Channel name to publish logs +redis_logger.channel = 'front' + + sys.path.append('../../configs/keys') # MISP # try: @@ -110,7 +126,11 @@ crawler_enabled = config_loader.get_config_boolean("Crawler", "activate_crawler" email_regex = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}' email_regex = re.compile(email_regex) -IMPORT_MAX_TEXT_SIZE = 900000 # size in bytes +# SubmitPaste vars +SUBMIT_PASTE_TEXT_MAX_SIZE = int(config_loader.get_config_str("SubmitPaste", "TEXT_MAX_SIZE")) +SUBMIT_PASTE_FILE_MAX_SIZE = int(config_loader.get_config_str("SubmitPaste", "FILE_MAX_SIZE")) +SUBMIT_PASTE_FILE_ALLOWED_EXTENSIONS = [item.strip() for item in config_loader.get_config_str("SubmitPaste", "FILE_ALLOWED_EXTENSIONS").split(',')] + # VT try: diff --git a/var/www/modules/PasteSubmit/Flask_PasteSubmit.py b/var/www/modules/PasteSubmit/Flask_PasteSubmit.py index 0a10c251..85bd1cb2 100644 --- a/var/www/modules/PasteSubmit/Flask_PasteSubmit.py +++ b/var/www/modules/PasteSubmit/Flask_PasteSubmit.py @@ -4,28 +4,33 @@ ''' Flask functions and routes for the trending modules page ''' -import redis -from flask import Flask, render_template, jsonify, request, Blueprint, url_for, redirect - -from Role_Manager import login_admin, login_analyst -from flask_login import login_required - -import unicodedata -import string -import subprocess +################################## +# Import External packages +################################## import os import sys +import json +import string +import subprocess import datetime +import redis +import unicodedata import uuid from io import BytesIO from Date import Date -import json +from flask import Flask, render_template, jsonify, request, Blueprint, url_for, redirect, abort +from functools import wraps +from Role_Manager import login_admin, login_analyst +from flask_login import login_required + + +################################## +# Import Project packages +################################## import Paste - import Import_helper import Tag - from pytaxonomies import Taxonomies from pymispgalaxies import Galaxies, Clusters @@ -49,6 +54,8 @@ r_serv_metadata = Flask_config.r_serv_metadata r_serv_db = Flask_config.r_serv_db r_serv_log_submit = Flask_config.r_serv_log_submit +logger = Flask_config.redis_logger + pymisp = Flask_config.pymisp if pymisp is False: flag_misp = False @@ -61,12 +68,32 @@ PasteSubmit = Blueprint('PasteSubmit', __name__, template_folder='templates') valid_filename_chars = "-_ %s%s" % (string.ascii_letters, string.digits) -ALLOWED_EXTENSIONS = set(['txt', 'sh', 'pdf', 'zip', 'gz', 'tar.gz']) UPLOAD_FOLDER = Flask_config.UPLOAD_FOLDER misp_event_url = Flask_config.misp_event_url hive_case_url = Flask_config.hive_case_url +text_max_size = int(Flask_config.SUBMIT_PASTE_TEXT_MAX_SIZE) / (1000*1000) +file_max_size = int(Flask_config.SUBMIT_PASTE_FILE_MAX_SIZE) / (1000*1000*1000) +allowed_extensions = ", ". join(Flask_config.SUBMIT_PASTE_FILE_ALLOWED_EXTENSIONS) + + +# ============ Validators ============ +def limit_content_length(): + def decorator(f): + @wraps(f) + def wrapper(*args, **kwargs): + logger.debug('decorator') + cl = request.content_length + if cl is not None: + if cl > Flask_config.SUBMIT_PASTE_FILE_MAX_SIZE or ('file' not in request.files and cl > Flask_config.SUBMIT_PASTE_TEXT_MAX_SIZE): + logger.debug('abort') + abort(413) + return f(*args, **kwargs) + return wrapper + return decorator + + # ============ FUNCTIONS ============ def one(): return 1 @@ -75,7 +102,10 @@ def allowed_file(filename): if not '.' in filename: return True else: - return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS + file_ext = filename.rsplit('.', 1)[1].lower() + logger.debug(file_ext) + logger.debug(Flask_config.SUBMIT_PASTE_FILE_ALLOWED_EXTENSIONS) + return file_ext in Flask_config.SUBMIT_PASTE_FILE_ALLOWED_EXTENSIONS def clean_filename(filename, whitelist=valid_filename_chars, replace=' '): # replace characters @@ -197,22 +227,22 @@ def hive_create_case(hive_tlp, threat_level, hive_description, hive_case_title, res = HiveApi.create_case_observable(id,observ_sensor) if res.status_code != 201: - print('ko: {}/{}'.format(res.status_code, res.text)) + logger.info(f'ko sensor: {res.status_code}/{res.text}') res = HiveApi.create_case_observable(id, observ_source) if res.status_code != 201: - print('ko: {}/{}'.format(res.status_code, res.text)) + logger.info(f'ko source: {res.status_code}/{res.text}') res = HiveApi.create_case_observable(id, observ_file) if res.status_code != 201: - print('ko: {}/{}'.format(res.status_code, res.text)) + logger.info(f'ko file: {res.status_code}/{res.text}') res = HiveApi.create_case_observable(id, observ_last_seen) if res.status_code != 201: - print('ko: {}/{}'.format(res.status_code, res.text)) + logger.info(f'ko last_seen: {res.status_code}/{res.text}') r_serv_metadata.set('hive_cases:'+path, id) return hive_case_url.replace('id_here', id) else: - print('ko: {}/{}'.format(response.status_code, response.text)) + logger.info(f'ko: {response.status_code}/{response.text}') return False # ============= ROUTES ============== @@ -227,27 +257,35 @@ def PasteSubmit_page(): return render_template("submit_items.html", active_taxonomies = active_taxonomies, - active_galaxies = active_galaxies) + active_galaxies = active_galaxies, + text_max_size = text_max_size, + file_max_size = file_max_size, + allowed_extensions = allowed_extensions) @PasteSubmit.route("/PasteSubmit/submit", methods=['POST']) @login_required @login_analyst +@limit_content_length() def submit(): #paste_name = request.form['paste_name'] + logger.debug('submit') password = request.form['password'] ltags = request.form['tags_taxonomies'] ltagsgalaxies = request.form['tags_galaxies'] paste_content = request.form['paste_content'] + paste_source = request.form['paste_source'] is_file = False if 'file' in request.files: - file = request.files['file'] - if file: - if file.filename: + file_import = request.files['file'] + if file_import: + if file_import.filename: is_file = True + logger.debug(f'is file ? {is_file}') + submitted_tag = 'infoleak:submission="manual"' #active taxonomies @@ -256,13 +294,13 @@ def submit(): active_galaxies = Tag.get_active_galaxies() if ltags or ltagsgalaxies: - + logger.debug(f'ltags ? {ltags} {ltagsgalaxies}') ltags = Tag.unpack_str_tags_list(ltags) ltagsgalaxies = Tag.unpack_str_tags_list(ltagsgalaxies) if not Tag.is_valid_tags_taxonomies_galaxy(ltags, ltagsgalaxies): content = 'INVALID TAGS' - print(content) + logger.info(content) return content, 400 # add submitted tags @@ -271,55 +309,42 @@ def submit(): ltags.append(submitted_tag) if is_file: - if file: + logger.debug('file management') - if file and allowed_file(file.filename): + if allowed_file(file_import.filename): + logger.debug('file extension allowed') - # get UUID - UUID = str(uuid.uuid4()) - - '''if paste_name: - # clean file name - UUID = clean_filename(paste_name)''' - - # create submitted dir - if not os.path.exists(UPLOAD_FOLDER): - os.makedirs(UPLOAD_FOLDER) - - if not '.' in file.filename: - full_path = os.path.join(UPLOAD_FOLDER, UUID) - else: - if file.filename[-6:] == 'tar.gz': - file_type = 'tar.gz' - else: - file_type = file.filename.rsplit('.', 1)[1] - name = UUID + '.' + file_type - full_path = os.path.join(UPLOAD_FOLDER, name) - - #Flask verify the file size - file.save(full_path) - - paste_content = full_path - - Import_helper.create_import_queue(ltags, ltagsgalaxies, paste_content, UUID, password ,True) - - return render_template("submit_items.html", - active_taxonomies = active_taxonomies, - active_galaxies = active_galaxies, - UUID = UUID) - - else: - content = 'wrong file type, allowed_extensions: sh, pdf, zip, gz, tar.gz or remove the extension' - print(content) - return content, 400 - - - elif paste_content != '': - if sys.getsizeof(paste_content) < 900000: - - # get id + # get UUID UUID = str(uuid.uuid4()) - Import_helper.create_import_queue(ltags, ltagsgalaxies, paste_content, UUID, password) + + '''if paste_name: + # clean file name + UUID = clean_filename(paste_name)''' + + # create submitted dir + if not os.path.exists(UPLOAD_FOLDER): + logger.debug('create folder') + os.makedirs(UPLOAD_FOLDER) + + if not '.' in file_import.filename: + logger.debug('add UUID to path') + full_path = os.path.join(UPLOAD_FOLDER, UUID) + else: + if file_import.filename[-6:] == 'tar.gz': + logger.debug('file extension is tar.gz') + file_type = 'tar.gz' + else: + file_type = file_import.filename.rsplit('.', 1)[1] + logger.debug(f'file type {file_type}') + name = UUID + '.' + file_type + full_path = os.path.join(UPLOAD_FOLDER, name) + logger.debug(f'full path {full_path}') + + #Flask verify the file size + file_import.save(full_path) + logger.debug('file saved') + + Import_helper.create_import_queue(ltags, ltagsgalaxies, full_path, UUID, password, True) return render_template("submit_items.html", active_taxonomies = active_taxonomies, @@ -327,12 +352,32 @@ def submit(): UUID = UUID) else: - content = 'size error' - print(content) + content = f'wrong file type, allowed_extensions: {allowed_extensions} or remove the extension' + logger.info(content) + return content, 400 + + + elif paste_content != '': + logger.debug(f'entering text paste management') + if sys.getsizeof(paste_content) < Flask_config.SUBMIT_PASTE_TEXT_MAX_SIZE: + logger.debug(f'size {sys.getsizeof(paste_content)}') + # get id + UUID = str(uuid.uuid4()) + logger.debug('create import') + Import_helper.create_import_queue(ltags, ltagsgalaxies, paste_content, UUID, password, source=paste_source) + logger.debug('import OK') + return render_template("submit_items.html", + active_taxonomies = active_taxonomies, + active_galaxies = active_galaxies, + UUID = UUID) + + else: + content = f'text paste size is over {Flask_config.SUBMIT_PASTE_TEXT_MAX_SIZE} bytes limit' + logger.info(content) return content, 400 content = 'submit aborded' - print(content) + logger.error(content) return content, 400 diff --git a/var/www/modules/PasteSubmit/templates/submit_items.html b/var/www/modules/PasteSubmit/templates/submit_items.html index 18b62c2a..1a8d4f75 100644 --- a/var/www/modules/PasteSubmit/templates/submit_items.html +++ b/var/www/modules/PasteSubmit/templates/submit_items.html @@ -17,68 +17,76 @@ + {% if UUID %} -