diff --git a/bin/import.py b/bin/import.py index 9fc3e1d..2165063 100644 --- a/bin/import.py +++ b/bin/import.py @@ -13,16 +13,19 @@ import time # Configuration cpe_path = '../data/official-cpe-dictionary_v2.3.xml' -cpe_source = 'https://nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz' +cpe_source = ( + 'https://nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz' +) rdb = redis.Redis(host='127.0.0.1', port=6379, db=8) -class CPEHandler( xml.sax.ContentHandler ): + +class CPEHandler(xml.sax.ContentHandler): def __init__(self): self.cpe = "" self.title = "" self.title_seen = False self.cpe = "" - self.record = {} + self.record = {} self.refs = [] self.itemcount = 0 self.wordcount = 0 @@ -52,19 +55,21 @@ class CPEHandler( xml.sax.ContentHandler ): if tag == 'cpe-item': to_insert = CPEExtractor(cpe=self.record['cpe-23']) for word in canonize(to_insert['vendor']): - insert( word=word, cpe=to_insert['cpeline'] ) + insert(word=word, cpe=to_insert['cpeline']) self.wordcount += 1 for word in canonize(to_insert['product']): - insert( word=word, cpe=to_insert['cpeline'] ) + insert(word=word, cpe=to_insert['cpeline']) self.wordcount += 1 self.record = {} self.itemcount += 1 if self.itemcount % 5000 == 0: - time_elapsed = round( time.time() - self.start_time ) - print (f"... {self.itemcount} items processed ({self.wordcount} words) in {time_elapsed} seconds") + time_elapsed = round(time.time() - self.start_time) + print( + f"... {self.itemcount} items processed ({self.wordcount} words) in {time_elapsed} seconds" + ) -def CPEExtractor( cpe=None ): +def CPEExtractor(cpe=None): if cpe is None: return False record = {} @@ -77,12 +82,14 @@ def CPEExtractor( cpe=None ): record['cpeline'] = cpeline[1:] return record -def canonize( value=None ): + +def canonize(value=None): value = value.lower() words = value.split('_') return words -def insert( word=None, cpe=None): + +def insert(word=None, cpe=None): if cpe is None or word is None: return False rdb.sadd(f"w:{word}", cpe) @@ -91,10 +98,30 @@ def insert( word=None, cpe=None): if __name__ == '__main__': - argparser = argparse.ArgumentParser(description='Initializes the Redis database with CPE dictionary.') - argparser.add_argument('--download', '-d', action='count', default=0, help='Download the CPE dictionary even if it already exists.') - argparser.add_argument('--replace', '-r', action='count', default=0, help='Flush and repopulated the CPE database.') - argparser.add_argument('--update', '-u', action='store_true', default=False, help='Update the CPE database without flushing') + argparser = argparse.ArgumentParser( + description='Initializes the Redis database with CPE dictionary.' + ) + argparser.add_argument( + '--download', + '-d', + action='count', + default=0, + help='Download the CPE dictionary even if it already exists.', + ) + argparser.add_argument( + '--replace', + '-r', + action='count', + default=0, + help='Flush and repopulated the CPE database.', + ) + argparser.add_argument( + '--update', + '-u', + action='store_true', + default=False, + help='Update the CPE database without flushing', + ) args = argparser.parse_args() if args.replace == 0 and rdb.dbsize() > 0 and not args.update: @@ -106,7 +133,12 @@ if __name__ == '__main__': print(f"Downloading CPE data from {cpe_source} ...") try: urllib.request.urlretrieve(cpe_source, f"{cpe_path}.gz") - except (urllib.error.HTTPError, urllib.error.URLError, FileNotFoundError, PermissionError) as e: + except ( + urllib.error.HTTPError, + urllib.error.URLError, + FileNotFoundError, + PermissionError, + ) as e: print(e) sys.exit(1) @@ -130,6 +162,6 @@ if __name__ == '__main__': print("Populating the database (please be patient)...") parser = xml.sax.make_parser() Handler = CPEHandler() - parser.setContentHandler( Handler ) + parser.setContentHandler(Handler) parser.parse(cpe_path) print(f"Done! {rdb.dbsize()} keys inserted.") diff --git a/bin/lookup.py b/bin/lookup.py index e5ec5dd..49a17dc 100644 --- a/bin/lookup.py +++ b/bin/lookup.py @@ -11,8 +11,16 @@ sys.path.append(os.path.join(runPath, "..")) from lib.cpeguesser import CPEGuesser if __name__ == '__main__': - parser = argparse.ArgumentParser(description='Find potential CPE names from a list of keyword(s) and return a JSON of the results') - parser.add_argument('word', metavar='WORD', type=str, nargs='+', help='One or more keyword(s) to lookup') + parser = argparse.ArgumentParser( + description='Find potential CPE names from a list of keyword(s) and return a JSON of the results' + ) + parser.add_argument( + 'word', + metavar='WORD', + type=str, + nargs='+', + help='One or more keyword(s) to lookup', + ) args = parser.parse_args() cpeGuesser = CPEGuesser() diff --git a/bin/server.py b/bin/server.py index d256c6a..9e7cf57 100644 --- a/bin/server.py +++ b/bin/server.py @@ -14,7 +14,8 @@ runPath = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(runPath, "..")) from lib.cpeguesser import CPEGuesser -class Search(): + +class Search: def on_post(self, req, resp): data_post = req.bounded_stream.read() js = data_post.decode('utf-8') @@ -33,7 +34,8 @@ class Search(): return cpeGuesser = CPEGuesser() - resp.media=cpeGuesser.guessCpe(q['query']) + resp.media = cpeGuesser.guessCpe(q['query']) + if __name__ == '__main__': app = falcon.App() @@ -44,7 +46,7 @@ if __name__ == '__main__': print(f"Serving on port {port}...") httpd.serve_forever() except OSError as e: - print (e) + print(e) sys.exit(1) except KeyboardInterrupt: sys.exit(0) diff --git a/lib/cpeguesser.py b/lib/cpeguesser.py index aa32b0d..45d20e2 100644 --- a/lib/cpeguesser.py +++ b/lib/cpeguesser.py @@ -3,12 +3,13 @@ import redis -class CPEGuesser(): + +class CPEGuesser: def __init__(self): self.rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) def guessCpe(self, words): - k=[] + k = [] for keyword in words: k.append(f"w:{keyword.lower()}")