chg: [format] black -S on all files

This commit is contained in:
Alexandre Dulaunoy 2021-10-19 18:30:29 +02:00
parent 3d40abfd5c
commit 80f0542d22
Signed by: adulau
GPG key ID: 09E2CD4944E6CBCD
4 changed files with 66 additions and 23 deletions

View file

@ -13,9 +13,12 @@ import time
# Configuration # Configuration
cpe_path = '../data/official-cpe-dictionary_v2.3.xml' cpe_path = '../data/official-cpe-dictionary_v2.3.xml'
cpe_source = 'https://nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz' cpe_source = (
'https://nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz'
)
rdb = redis.Redis(host='127.0.0.1', port=6379, db=8) rdb = redis.Redis(host='127.0.0.1', port=6379, db=8)
class CPEHandler(xml.sax.ContentHandler): class CPEHandler(xml.sax.ContentHandler):
def __init__(self): def __init__(self):
self.cpe = "" self.cpe = ""
@ -61,7 +64,9 @@ class CPEHandler( xml.sax.ContentHandler ):
self.itemcount += 1 self.itemcount += 1
if self.itemcount % 5000 == 0: if self.itemcount % 5000 == 0:
time_elapsed = round(time.time() - self.start_time) time_elapsed = round(time.time() - self.start_time)
print (f"... {self.itemcount} items processed ({self.wordcount} words) in {time_elapsed} seconds") print(
f"... {self.itemcount} items processed ({self.wordcount} words) in {time_elapsed} seconds"
)
def CPEExtractor(cpe=None): def CPEExtractor(cpe=None):
@ -77,11 +82,13 @@ def CPEExtractor( cpe=None ):
record['cpeline'] = cpeline[1:] record['cpeline'] = cpeline[1:]
return record return record
def canonize(value=None): def canonize(value=None):
value = value.lower() value = value.lower()
words = value.split('_') words = value.split('_')
return words return words
def insert(word=None, cpe=None): def insert(word=None, cpe=None):
if cpe is None or word is None: if cpe is None or word is None:
return False return False
@ -91,10 +98,30 @@ def insert( word=None, cpe=None):
if __name__ == '__main__': if __name__ == '__main__':
argparser = argparse.ArgumentParser(description='Initializes the Redis database with CPE dictionary.') argparser = argparse.ArgumentParser(
argparser.add_argument('--download', '-d', action='count', default=0, help='Download the CPE dictionary even if it already exists.') description='Initializes the Redis database with CPE dictionary.'
argparser.add_argument('--replace', '-r', action='count', default=0, help='Flush and repopulated the CPE database.') )
argparser.add_argument('--update', '-u', action='store_true', default=False, help='Update the CPE database without flushing') argparser.add_argument(
'--download',
'-d',
action='count',
default=0,
help='Download the CPE dictionary even if it already exists.',
)
argparser.add_argument(
'--replace',
'-r',
action='count',
default=0,
help='Flush and repopulated the CPE database.',
)
argparser.add_argument(
'--update',
'-u',
action='store_true',
default=False,
help='Update the CPE database without flushing',
)
args = argparser.parse_args() args = argparser.parse_args()
if args.replace == 0 and rdb.dbsize() > 0 and not args.update: if args.replace == 0 and rdb.dbsize() > 0 and not args.update:
@ -106,7 +133,12 @@ if __name__ == '__main__':
print(f"Downloading CPE data from {cpe_source} ...") print(f"Downloading CPE data from {cpe_source} ...")
try: try:
urllib.request.urlretrieve(cpe_source, f"{cpe_path}.gz") urllib.request.urlretrieve(cpe_source, f"{cpe_path}.gz")
except (urllib.error.HTTPError, urllib.error.URLError, FileNotFoundError, PermissionError) as e: except (
urllib.error.HTTPError,
urllib.error.URLError,
FileNotFoundError,
PermissionError,
) as e:
print(e) print(e)
sys.exit(1) sys.exit(1)

View file

@ -11,8 +11,16 @@ sys.path.append(os.path.join(runPath, ".."))
from lib.cpeguesser import CPEGuesser from lib.cpeguesser import CPEGuesser
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Find potential CPE names from a list of keyword(s) and return a JSON of the results') parser = argparse.ArgumentParser(
parser.add_argument('word', metavar='WORD', type=str, nargs='+', help='One or more keyword(s) to lookup') description='Find potential CPE names from a list of keyword(s) and return a JSON of the results'
)
parser.add_argument(
'word',
metavar='WORD',
type=str,
nargs='+',
help='One or more keyword(s) to lookup',
)
args = parser.parse_args() args = parser.parse_args()
cpeGuesser = CPEGuesser() cpeGuesser = CPEGuesser()

View file

@ -14,7 +14,8 @@ runPath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(runPath, "..")) sys.path.append(os.path.join(runPath, ".."))
from lib.cpeguesser import CPEGuesser from lib.cpeguesser import CPEGuesser
class Search():
class Search:
def on_post(self, req, resp): def on_post(self, req, resp):
data_post = req.bounded_stream.read() data_post = req.bounded_stream.read()
js = data_post.decode('utf-8') js = data_post.decode('utf-8')
@ -35,6 +36,7 @@ class Search():
cpeGuesser = CPEGuesser() cpeGuesser = CPEGuesser()
resp.media = cpeGuesser.guessCpe(q['query']) resp.media = cpeGuesser.guessCpe(q['query'])
if __name__ == '__main__': if __name__ == '__main__':
app = falcon.App() app = falcon.App()
app.add_route('/search', Search()) app.add_route('/search', Search())

View file

@ -3,7 +3,8 @@
import redis import redis
class CPEGuesser():
class CPEGuesser:
def __init__(self): def __init__(self):
self.rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) self.rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True)