mirror of
https://github.com/cve-search/cpe-guesser.git
synced 2024-11-24 15:57:22 +00:00
Black 24.3.0 formatting
This commit is contained in:
parent
51f35e2566
commit
2fa90cb3f8
4 changed files with 58 additions and 54 deletions
|
@ -13,7 +13,7 @@ import time
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
settings = Dynaconf(settings_files=['../config/settings.yaml'])
|
settings = Dynaconf(settings_files=["../config/settings.yaml"])
|
||||||
cpe_path = settings.cpe.path
|
cpe_path = settings.cpe.path
|
||||||
cpe_source = settings.cpe.source
|
cpe_source = settings.cpe.source
|
||||||
rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8)
|
rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8)
|
||||||
|
@ -33,32 +33,32 @@ class CPEHandler(xml.sax.ContentHandler):
|
||||||
|
|
||||||
def startElement(self, tag, attributes):
|
def startElement(self, tag, attributes):
|
||||||
self.CurrentData = tag
|
self.CurrentData = tag
|
||||||
if tag == 'cpe-23:cpe23-item':
|
if tag == "cpe-23:cpe23-item":
|
||||||
self.record['cpe-23'] = attributes['name']
|
self.record["cpe-23"] = attributes["name"]
|
||||||
if tag == 'title':
|
if tag == "title":
|
||||||
self.title_seen = True
|
self.title_seen = True
|
||||||
if tag == 'reference':
|
if tag == "reference":
|
||||||
self.refs.append(attributes['href'])
|
self.refs.append(attributes["href"])
|
||||||
|
|
||||||
def characters(self, data):
|
def characters(self, data):
|
||||||
if self.title_seen:
|
if self.title_seen:
|
||||||
self.title = self.title + data
|
self.title = self.title + data
|
||||||
|
|
||||||
def endElement(self, tag):
|
def endElement(self, tag):
|
||||||
if tag == 'title':
|
if tag == "title":
|
||||||
self.record['title'] = self.title
|
self.record["title"] = self.title
|
||||||
self.title = ""
|
self.title = ""
|
||||||
self.title_seen = False
|
self.title_seen = False
|
||||||
if tag == 'references':
|
if tag == "references":
|
||||||
self.record['refs'] = self.refs
|
self.record["refs"] = self.refs
|
||||||
self.refs = []
|
self.refs = []
|
||||||
if tag == 'cpe-item':
|
if tag == "cpe-item":
|
||||||
to_insert = CPEExtractor(cpe=self.record['cpe-23'])
|
to_insert = CPEExtractor(cpe=self.record["cpe-23"])
|
||||||
for word in canonize(to_insert['vendor']):
|
for word in canonize(to_insert["vendor"]):
|
||||||
insert(word=word, cpe=to_insert['cpeline'])
|
insert(word=word, cpe=to_insert["cpeline"])
|
||||||
self.wordcount += 1
|
self.wordcount += 1
|
||||||
for word in canonize(to_insert['product']):
|
for word in canonize(to_insert["product"]):
|
||||||
insert(word=word, cpe=to_insert['cpeline'])
|
insert(word=word, cpe=to_insert["cpeline"])
|
||||||
self.wordcount += 1
|
self.wordcount += 1
|
||||||
self.record = {}
|
self.record = {}
|
||||||
self.itemcount += 1
|
self.itemcount += 1
|
||||||
|
@ -74,18 +74,18 @@ def CPEExtractor(cpe=None):
|
||||||
return False
|
return False
|
||||||
record = {}
|
record = {}
|
||||||
cpefield = cpe.split(":")
|
cpefield = cpe.split(":")
|
||||||
record['vendor'] = cpefield[3]
|
record["vendor"] = cpefield[3]
|
||||||
record['product'] = cpefield[4]
|
record["product"] = cpefield[4]
|
||||||
cpeline = ""
|
cpeline = ""
|
||||||
for cpeentry in cpefield[:5]:
|
for cpeentry in cpefield[:5]:
|
||||||
cpeline = f"{cpeline}:{cpeentry}"
|
cpeline = f"{cpeline}:{cpeentry}"
|
||||||
record['cpeline'] = cpeline[1:]
|
record["cpeline"] = cpeline[1:]
|
||||||
return record
|
return record
|
||||||
|
|
||||||
|
|
||||||
def canonize(value=None):
|
def canonize(value=None):
|
||||||
value = value.lower()
|
value = value.lower()
|
||||||
words = value.split('_')
|
words = value.split("_")
|
||||||
return words
|
return words
|
||||||
|
|
||||||
|
|
||||||
|
@ -97,30 +97,30 @@ def insert(word=None, cpe=None):
|
||||||
rdb.zadd("rank:cpe", {cpe: 1}, incr=True)
|
rdb.zadd("rank:cpe", {cpe: 1}, incr=True)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
argparser = argparse.ArgumentParser(
|
argparser = argparse.ArgumentParser(
|
||||||
description='Initializes the Redis database with CPE dictionary.'
|
description="Initializes the Redis database with CPE dictionary."
|
||||||
)
|
)
|
||||||
argparser.add_argument(
|
argparser.add_argument(
|
||||||
'--download',
|
"--download",
|
||||||
'-d',
|
"-d",
|
||||||
action='count',
|
action="count",
|
||||||
default=0,
|
default=0,
|
||||||
help='Download the CPE dictionary even if it already exists.',
|
help="Download the CPE dictionary even if it already exists.",
|
||||||
)
|
)
|
||||||
argparser.add_argument(
|
argparser.add_argument(
|
||||||
'--replace',
|
"--replace",
|
||||||
'-r',
|
"-r",
|
||||||
action='count',
|
action="count",
|
||||||
default=0,
|
default=0,
|
||||||
help='Flush and repopulated the CPE database.',
|
help="Flush and repopulated the CPE database.",
|
||||||
)
|
)
|
||||||
argparser.add_argument(
|
argparser.add_argument(
|
||||||
'--update',
|
"--update",
|
||||||
'-u',
|
"-u",
|
||||||
action='store_true',
|
action="store_true",
|
||||||
default=False,
|
default=False,
|
||||||
help='Update the CPE database without flushing',
|
help="Update the CPE database without flushing",
|
||||||
)
|
)
|
||||||
args = argparser.parse_args()
|
args = argparser.parse_args()
|
||||||
|
|
||||||
|
@ -144,8 +144,8 @@ if __name__ == '__main__':
|
||||||
|
|
||||||
print(f"Uncompressing {cpe_path}.gz ...")
|
print(f"Uncompressing {cpe_path}.gz ...")
|
||||||
try:
|
try:
|
||||||
with gzip.open(f"{cpe_path}.gz", 'rb') as cpe_gz:
|
with gzip.open(f"{cpe_path}.gz", "rb") as cpe_gz:
|
||||||
with open(cpe_path, 'wb') as cpe_xml:
|
with open(cpe_path, "wb") as cpe_xml:
|
||||||
shutil.copyfileobj(cpe_gz, cpe_xml)
|
shutil.copyfileobj(cpe_gz, cpe_xml)
|
||||||
os.remove(f"{cpe_path}.gz")
|
os.remove(f"{cpe_path}.gz")
|
||||||
except (FileNotFoundError, PermissionError) as e:
|
except (FileNotFoundError, PermissionError) as e:
|
||||||
|
|
|
@ -10,16 +10,16 @@ runPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
sys.path.append(os.path.join(runPath, ".."))
|
sys.path.append(os.path.join(runPath, ".."))
|
||||||
from lib.cpeguesser import CPEGuesser
|
from lib.cpeguesser import CPEGuesser
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description='Find potential CPE names from a list of keyword(s) and return a JSON of the results'
|
description="Find potential CPE names from a list of keyword(s) and return a JSON of the results"
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
'word',
|
"word",
|
||||||
metavar='WORD',
|
metavar="WORD",
|
||||||
type=str,
|
type=str,
|
||||||
nargs='+',
|
nargs="+",
|
||||||
help='One or more keyword(s) to lookup',
|
help="One or more keyword(s) to lookup",
|
||||||
)
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ import json
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
settings = Dynaconf(settings_files=['../config/settings.yaml'])
|
settings = Dynaconf(settings_files=["../config/settings.yaml"])
|
||||||
port = settings.server.port
|
port = settings.server.port
|
||||||
|
|
||||||
runPath = os.path.dirname(os.path.realpath(__file__))
|
runPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
@ -20,7 +20,7 @@ from lib.cpeguesser import CPEGuesser
|
||||||
class Search:
|
class Search:
|
||||||
def on_post(self, req, resp):
|
def on_post(self, req, resp):
|
||||||
data_post = req.bounded_stream.read()
|
data_post = req.bounded_stream.read()
|
||||||
js = data_post.decode('utf-8')
|
js = data_post.decode("utf-8")
|
||||||
try:
|
try:
|
||||||
q = json.loads(js)
|
q = json.loads(js)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
@ -28,7 +28,7 @@ class Search:
|
||||||
resp.media = "Missing query array or incorrect JSON format"
|
resp.media = "Missing query array or incorrect JSON format"
|
||||||
return
|
return
|
||||||
|
|
||||||
if 'query' in q:
|
if "query" in q:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_400
|
||||||
|
@ -36,15 +36,15 @@ class Search:
|
||||||
return
|
return
|
||||||
|
|
||||||
cpeGuesser = CPEGuesser()
|
cpeGuesser = CPEGuesser()
|
||||||
resp.media = cpeGuesser.guessCpe(q['query'])
|
resp.media = cpeGuesser.guessCpe(q["query"])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
app = falcon.App()
|
app = falcon.App()
|
||||||
app.add_route('/search', Search())
|
app.add_route("/search", Search())
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with make_server('', port, app) as httpd:
|
with make_server("", port, app) as httpd:
|
||||||
print(f"Serving on port {port}...")
|
print(f"Serving on port {port}...")
|
||||||
httpd.serve_forever()
|
httpd.serve_forever()
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
|
|
|
@ -5,13 +5,17 @@ import redis
|
||||||
from dynaconf import Dynaconf
|
from dynaconf import Dynaconf
|
||||||
|
|
||||||
# Configuration
|
# Configuration
|
||||||
settings = Dynaconf(
|
settings = Dynaconf(settings_files=["../config/settings.yaml"])
|
||||||
settings_files=['../config/settings.yaml']
|
|
||||||
)
|
|
||||||
|
|
||||||
class CPEGuesser:
|
class CPEGuesser:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8, decode_responses=True)
|
self.rdb = redis.Redis(
|
||||||
|
host=settings.redis.host,
|
||||||
|
port=settings.redis.port,
|
||||||
|
db=8,
|
||||||
|
decode_responses=True,
|
||||||
|
)
|
||||||
|
|
||||||
def guessCpe(self, words):
|
def guessCpe(self, words):
|
||||||
k = []
|
k = []
|
||||||
|
@ -28,7 +32,7 @@ class CPEGuesser:
|
||||||
ranked = []
|
ranked = []
|
||||||
|
|
||||||
for cpe in result:
|
for cpe in result:
|
||||||
rank = self.rdb.zrank('rank:cpe', cpe)
|
rank = self.rdb.zrank("rank:cpe", cpe)
|
||||||
ranked.append((rank, cpe))
|
ranked.append((rank, cpe))
|
||||||
|
|
||||||
return sorted(ranked)
|
return sorted(ranked)
|
||||||
|
|
Loading…
Reference in a new issue