cpe-guesser/bin/import.py

168 lines
4.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
import os
import urllib.request
import gzip
import shutil
import xml.sax
import valkey
import time
2023-07-08 18:50:49 +00:00
from dynaconf import Dynaconf
# Configuration: everything is read from the Dynaconf settings file. The
# ``cpe`` section provides the local dictionary path and its download source;
# the ``valkey`` section provides the database host and port.
settings = Dynaconf(settings_files=["../config/settings.yaml"])
cpe_source = settings.cpe.source
cpe_path = settings.cpe.path
# Shared database client used by insert() and by the command-line entry point.
rdb = valkey.Valkey(
    host=settings.valkey.host,
    port=settings.valkey.port,
    db=8,
)
2021-10-19 16:30:29 +00:00
2023-08-09 13:21:05 +00:00
2021-10-19 16:30:29 +00:00
class CPEHandler(xml.sax.ContentHandler):
    """SAX content handler that indexes the entries of a CPE dictionary.

    While a ``cpe-item`` element is open, its title, its reference URLs and
    the ``name`` attribute of its ``cpe-23:cpe23-item`` child are gathered in
    ``self.record``.  When the item closes, the CPE 2.3 name is handed to
    ``CPEExtractor`` and every word of the vendor and product fields is
    stored through ``insert``.  The title and the references are collected
    but not written to the database.
    """

    def __init__(self):
        super().__init__()
        self.cpe = ""
        self.title = ""
        self.title_seen = False
        # Name of the most recently opened element.
        self.CurrentData = ""
        # Data gathered for the cpe-item currently being parsed.
        self.record = {}
        self.refs = []
        # Progress counters reported every 5000 items.
        self.itemcount = 0
        self.wordcount = 0
        self.start_time = time.time()

    def startElement(self, tag, attributes):
        """Record the CPE 2.3 name, title start or reference URL of *tag*."""
        self.CurrentData = tag
        if tag == "cpe-23:cpe23-item":
            self.record["cpe-23"] = attributes["name"]
        elif tag == "title":
            self.title_seen = True
        elif tag == "reference":
            self.refs.append(attributes["href"])

    def characters(self, data):
        """Accumulate text content, but only while inside a ``title``."""
        # The parser may deliver one text node in several chunks.
        if self.title_seen:
            self.title = self.title + data

    def endElement(self, tag):
        """Finish a title or reference list, or index a completed item."""
        if tag == "title":
            self.record["title"] = self.title
            self.title = ""
            self.title_seen = False
        elif tag == "references":
            self.record["refs"] = self.refs
            self.refs = []
        elif tag == "cpe-item":
            self._index_record()
            self.record = {}
            self.itemcount += 1
            if self.itemcount % 5000 == 0:
                time_elapsed = round(time.time() - self.start_time)
                print(
                    f"... {self.itemcount} items processed ({self.wordcount} words) in {time_elapsed} seconds"
                )

    def _index_record(self):
        """Store every vendor and product word of the current record."""
        cpe_name = self.record.get("cpe-23")
        if cpe_name is None:
            # An item without a CPE 2.3 name cannot be indexed; skip it
            # instead of aborting the whole import with a KeyError.
            print("Warning: skipping cpe-item without a cpe-23:cpe23-item name")
            return
        to_insert = CPEExtractor(cpe=cpe_name)
        for field in ("vendor", "product"):
            for word in canonize(to_insert[field]):
                insert(word=word, cpe=to_insert["cpeline"])
                self.wordcount += 1
2021-10-19 16:30:29 +00:00
def _split_cpe_fields(cpe):
    """Split a CPE formatted string on its unescaped colons."""
    fields = []
    current = []
    escaped = False
    for char in cpe:
        if escaped:
            # Character following a backslash: always part of the field.
            current.append(char)
            escaped = False
        elif char == "\\":
            current.append(char)
            escaped = True
        elif char == ":":
            fields.append("".join(current))
            current = []
        else:
            current.append(char)
    fields.append("".join(current))
    return fields


def CPEExtractor(cpe=None):
    """Extract vendor, product and the short CPE prefix from a CPE 2.3 name.

    The name is split into its colon-separated fields.  A colon escaped with
    a backslash (``\\:``) belongs to the field value and is not treated as a
    separator, so vendors or products containing an escaped colon are kept
    intact.

    Args:
        cpe: CPE 2.3 formatted string, e.g.
            ``cpe:2.3:a:vendor:product:version:...``.

    Returns:
        ``False`` when *cpe* is ``None``; otherwise a dict with the keys
        ``"vendor"`` (4th field), ``"product"`` (5th field) and ``"cpeline"``
        (the first five fields joined with ``:``).

    Raises:
        IndexError: if *cpe* has fewer than five fields.
    """
    if cpe is None:
        return False
    cpefield = _split_cpe_fields(cpe)
    return {
        "vendor": cpefield[3],
        "product": cpefield[4],
        "cpeline": ":".join(cpefield[:5]),
    }
2021-10-19 16:30:29 +00:00
def canonize(value=None):
    """Split a CPE vendor or product field into lower-case words.

    Args:
        value: field value whose words are separated by underscores.

    Returns:
        The list of lower-cased words, or an empty list when *value* is
        ``None`` (the declared default), so callers can always iterate over
        the result.
    """
    if value is None:
        return []
    return value.lower().split("_")
2021-10-19 16:30:29 +00:00
def insert(word=None, cpe=None):
    """Register *cpe* under *word* in the database.

    The CPE is added to the set ``w:<word>``, and its score is incremented
    by one in both the per-word sorted set ``s:<word>`` and the global
    sorted set ``rank:cpe``.

    Returns:
        ``False`` when either argument is ``None`` (nothing is written);
        ``None`` otherwise.
    """
    if word is None or cpe is None:
        return False
    rdb.sadd(f"w:{word}", cpe)
    for ranking_key in (f"s:{word}", "rank:cpe"):
        rdb.zadd(ranking_key, {cpe: 1}, incr=True)
2024-04-05 14:03:23 +00:00
if __name__ == "__main__":
    # Command-line entry point: fetch the CPE dictionary when needed, then
    # populate the database from it.
    argparser = argparse.ArgumentParser(
        description="Initializes the Redis database with CPE dictionary."
    )
    argparser.add_argument(
        "--download",
        "-d",
        action="count",
        default=0,
        help="Download the CPE dictionary even if it already exists.",
    )
    argparser.add_argument(
        "--replace",
        "-r",
        action="count",
        default=0,
        help="Flush and repopulated the CPE database.",
    )
    argparser.add_argument(
        "--update",
        "-u",
        action="store_true",
        default=False,
        help="Update the CPE database without flushing",
    )
    args = argparser.parse_args()

    # Never touch a populated database unless the user asked for it.
    if args.replace == 0 and rdb.dbsize() > 0 and not args.update:
        print(f"Warning! The Redis database already has {rdb.dbsize()} keys.")
        print("Use --replace if you want to flush the database and repopulate it.")
        sys.exit(0)

    if args.download > 0 or not os.path.isfile(cpe_path):
        compressed_path = f"{cpe_path}.gz"
        # Decompress into a scratch file first: a corrupt or truncated
        # archive must not leave a partial dictionary at cpe_path, which a
        # later run would silently reuse.
        partial_path = f"{cpe_path}.part"

        print(f"Downloading CPE data from {cpe_source} ...")
        try:
            urllib.request.urlretrieve(cpe_source, compressed_path)
        except (
            urllib.error.HTTPError,
            urllib.error.URLError,
            FileNotFoundError,
            PermissionError,
        ) as e:
            print(e)
            sys.exit(1)

        print(f"Uncompressing {compressed_path} ...")
        try:
            with gzip.open(compressed_path, "rb") as cpe_gz:
                with open(partial_path, "wb") as cpe_xml:
                    shutil.copyfileobj(cpe_gz, cpe_xml)
            # Only a fully decompressed file replaces the dictionary.
            os.replace(partial_path, cpe_path)
            os.remove(compressed_path)
        except (OSError, EOFError) as e:
            # OSError covers missing files, permission problems and
            # gzip.BadGzipFile; EOFError is raised for a truncated archive.
            print(e)
            if os.path.exists(partial_path):
                os.remove(partial_path)
            sys.exit(1)
    else:
        print(f"Using existing file {cpe_path} ...")

    # With --update the existing keys are kept and only added to.
    if rdb.dbsize() > 0 and not args.update:
        print(f"Flushing {rdb.dbsize()} keys from the database...")
        rdb.flushdb()

    print("Populating the database (please be patient)...")
    parser = xml.sax.make_parser()
    handler = CPEHandler()
    parser.setContentHandler(handler)
    parser.parse(cpe_path)
    print(f"Done! {rdb.dbsize()} keys inserted.")