mirror of
https://github.com/cve-search/cpe-guesser.git
synced 2024-11-15 03:18:28 +00:00
Merge pull request #5 from oh2fih/main
Optimization: replace format() with f-strings
This commit is contained in:
commit
93fbc1a0da
4 changed files with 37 additions and 28 deletions
|
@ -9,6 +9,7 @@ import gzip
|
||||||
import shutil
|
import shutil
|
||||||
import xml.sax
|
import xml.sax
|
||||||
import redis
|
import redis
|
||||||
|
import time
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
cpe_path = '../data/official-cpe-dictionary_v2.3.xml'
|
cpe_path = '../data/official-cpe-dictionary_v2.3.xml'
|
||||||
|
@ -25,6 +26,7 @@ class CPEHandler( xml.sax.ContentHandler ):
|
||||||
self.refs = []
|
self.refs = []
|
||||||
self.itemcount = 0
|
self.itemcount = 0
|
||||||
self.wordcount = 0
|
self.wordcount = 0
|
||||||
|
self.start_time = time.time()
|
||||||
|
|
||||||
def startElement(self, tag, attributes):
|
def startElement(self, tag, attributes):
|
||||||
self.CurrentData = tag
|
self.CurrentData = tag
|
||||||
|
@ -58,7 +60,8 @@ class CPEHandler( xml.sax.ContentHandler ):
|
||||||
self.record = {}
|
self.record = {}
|
||||||
self.itemcount += 1
|
self.itemcount += 1
|
||||||
if self.itemcount % 5000 == 0:
|
if self.itemcount % 5000 == 0:
|
||||||
print ("... {} items processed ({} words)".format(str(self.itemcount), str(self.wordcount)))
|
time_elapsed = round( time.time() - self.start_time )
|
||||||
|
print (f"... {self.itemcount} items processed ({self.wordcount} words) in {time_elapsed} seconds")
|
||||||
|
|
||||||
|
|
||||||
def CPEExtractor( cpe=None ):
|
def CPEExtractor( cpe=None ):
|
||||||
|
@ -70,7 +73,7 @@ def CPEExtractor( cpe=None ):
|
||||||
record['product'] = cpefield[4]
|
record['product'] = cpefield[4]
|
||||||
cpeline = ""
|
cpeline = ""
|
||||||
for cpeentry in cpefield[:5]:
|
for cpeentry in cpefield[:5]:
|
||||||
cpeline = "{}:{}".format(cpeline, cpeentry)
|
cpeline = f"{cpeline}:{cpeentry}"
|
||||||
record['cpeline'] = cpeline[1:]
|
record['cpeline'] = cpeline[1:]
|
||||||
return record
|
return record
|
||||||
|
|
||||||
|
@ -82,9 +85,9 @@ def canonize( value=None ):
|
||||||
def insert( word=None, cpe=None):
|
def insert( word=None, cpe=None):
|
||||||
if cpe is None or word is None:
|
if cpe is None or word is None:
|
||||||
return False
|
return False
|
||||||
rdb.sadd('w:{}'.format(word), cpe)
|
rdb.sadd(f"w:{word}", cpe)
|
||||||
rdb.zadd('s:{}'.format(word), {cpe: 1}, incr=True)
|
rdb.zadd(f"s:{word}", {cpe: 1}, incr=True)
|
||||||
rdb.zadd('rank:cpe', {cpe: 1}, incr=True)
|
rdb.zadd("rank:cpe", {cpe: 1}, incr=True)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -94,33 +97,33 @@ if __name__ == '__main__':
|
||||||
args = argparser.parse_args()
|
args = argparser.parse_args()
|
||||||
|
|
||||||
if args.replace == 0 and rdb.dbsize() > 0:
|
if args.replace == 0 and rdb.dbsize() > 0:
|
||||||
print("Warning! The Redis database already has " + str(rdb.dbsize()) + " keys.")
|
print(f"Warning! The Redis database already has {rdb.dbsize()} keys.")
|
||||||
print("Use --replace if you want to flush the database and repopulate it.")
|
print("Use --replace if you want to flush the database and repopulate it.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
if args.download > 0 or not os.path.isfile(cpe_path):
|
if args.download > 0 or not os.path.isfile(cpe_path):
|
||||||
print("Downloading CPE data from " + cpe_source + " ...")
|
print(f"Downloading CPE data from {cpe_source} ...")
|
||||||
try:
|
try:
|
||||||
urllib.request.urlretrieve(cpe_source, cpe_path + ".gz")
|
urllib.request.urlretrieve(cpe_source, f"{cpe_path}.gz")
|
||||||
except (urllib.error.HTTPError, urllib.error.URLError, FileNotFoundError, PermissionError) as e:
|
except (urllib.error.HTTPError, urllib.error.URLError, FileNotFoundError, PermissionError) as e:
|
||||||
print(e)
|
print(e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
print("Uncompressing {}.gz ...".format(cpe_path))
|
print(f"Uncompressing {cpe_path}.gz ...")
|
||||||
try:
|
try:
|
||||||
with gzip.open(cpe_path + ".gz", 'rb') as cpe_gz:
|
with gzip.open(f"{cpe_path}.gz", 'rb') as cpe_gz:
|
||||||
with open(cpe_path, 'wb') as cpe_xml:
|
with open(cpe_path, 'wb') as cpe_xml:
|
||||||
shutil.copyfileobj(cpe_gz, cpe_xml)
|
shutil.copyfileobj(cpe_gz, cpe_xml)
|
||||||
os.remove(cpe_path + ".gz")
|
os.remove(f"{cpe_path}.gz")
|
||||||
except (FileNotFoundError, PermissionError) as e:
|
except (FileNotFoundError, PermissionError) as e:
|
||||||
print(e)
|
print(e)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
elif os.path.isfile(cpe_path):
|
elif os.path.isfile(cpe_path):
|
||||||
print("Using existing file {} ...".format(cpe_path))
|
print(f"Using existing file {cpe_path} ...")
|
||||||
|
|
||||||
if rdb.dbsize() > 0:
|
if rdb.dbsize() > 0:
|
||||||
print("Flushing {} keys from the database...".format(str(rdb.dbsize())))
|
print(f"Flushing {rdb.dbsize()} keys from the database...")
|
||||||
rdb.flushdb()
|
rdb.flushdb()
|
||||||
|
|
||||||
print("Populating the database (please be patient)...")
|
print("Populating the database (please be patient)...")
|
||||||
|
@ -128,4 +131,4 @@ if __name__ == '__main__':
|
||||||
Handler = CPEHandler()
|
Handler = CPEHandler()
|
||||||
parser.setContentHandler( Handler )
|
parser.setContentHandler( Handler )
|
||||||
parser.parse(cpe_path)
|
parser.parse(cpe_path)
|
||||||
print("Done! {} keys inserted.".format(str(rdb.dbsize())))
|
print(f"Done! {rdb.dbsize()} keys inserted.")
|
||||||
|
|
|
@ -5,17 +5,17 @@ import os
|
||||||
import sys
|
import sys
|
||||||
import falcon
|
import falcon
|
||||||
from wsgiref.simple_server import make_server
|
from wsgiref.simple_server import make_server
|
||||||
import requests
|
|
||||||
from datetime import datetime
|
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
# Configuration
|
||||||
|
port = 8000
|
||||||
|
|
||||||
runPath = os.path.dirname(os.path.realpath(__file__))
|
runPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
sys.path.append(os.path.join(runPath, ".."))
|
sys.path.append(os.path.join(runPath, ".."))
|
||||||
from lib.cpeguesser import CPEGuesser
|
from lib.cpeguesser import CPEGuesser
|
||||||
|
|
||||||
class Search():
|
class Search():
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
ret = []
|
|
||||||
data_post = req.bounded_stream.read()
|
data_post = req.bounded_stream.read()
|
||||||
js = data_post.decode('utf-8')
|
js = data_post.decode('utf-8')
|
||||||
try:
|
try:
|
||||||
|
@ -39,6 +39,12 @@ if __name__ == '__main__':
|
||||||
app = falcon.App()
|
app = falcon.App()
|
||||||
app.add_route('/search', Search())
|
app.add_route('/search', Search())
|
||||||
|
|
||||||
with make_server('', 8000, app) as httpd:
|
try:
|
||||||
print('Serving on port 8000...')
|
with make_server('', port, app) as httpd:
|
||||||
|
print(f"Serving on port {port}...")
|
||||||
httpd.serve_forever()
|
httpd.serve_forever()
|
||||||
|
except OSError as e:
|
||||||
|
print (e)
|
||||||
|
sys.exit(1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
sys.exit(0)
|
||||||
|
|
|
@ -10,7 +10,7 @@ class CPEGuesser():
|
||||||
def guessCpe(self, words):
|
def guessCpe(self, words):
|
||||||
k=[]
|
k=[]
|
||||||
for keyword in words:
|
for keyword in words:
|
||||||
k.append('w:{}'.format(keyword.lower()))
|
k.append(f"w:{keyword.lower()}")
|
||||||
|
|
||||||
maxinter = len(k)
|
maxinter = len(k)
|
||||||
cpes = []
|
cpes = []
|
||||||
|
|
Loading…
Reference in a new issue