Merge pull request #4 from oh2fih/main

Implement the import process fully in Python
This commit is contained in:
Alexandre Dulaunoy 2021-09-21 20:17:48 +02:00 committed by GitHub
commit da7f755f72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 15 deletions

4
.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
bin/__pycache__/
lib/__pycache__/
data/*.xml
data/*.gz

View file

@ -16,12 +16,9 @@ the software with `lookup.py` to find the most probable CPE matching the keyword
### Installation ### Installation
- `git clone https://github.com/cve-search/cpe-guesser.git` - `git clone https://github.com/cve-search/cpe-guesser.git`
- `cd cpe-guesser/data` - Download the CPE dictionary & populate the database with `python3 cpe-guesser/bin/import.py`.
- `chmod +x dump.sh` - Take a cup of black or green tea.
- `cd ../bin` - `python3 cpe-guesser/bin/server.py` to run the local HTTP server.
- `python3 import.py`
- Take a cup of black or green tea
- `python3 server.py` to run the local HTTP server
If you don't want to install it locally, there is a public online version. Check below. If you don't want to install it locally, there is a public online version. Check below.

View file

@ -1,6 +1,18 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
import os
import urllib.request
import gzip
import shutil
import xml.sax import xml.sax
import redis import redis
# Configuration
cpe_path = '../data/official-cpe-dictionary_v2.3.xml'
cpe_source = 'https://nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz'
rdb = redis.Redis(host='127.0.0.1', port=6379, db=8) rdb = redis.Redis(host='127.0.0.1', port=6379, db=8)
class CPEHandler( xml.sax.ContentHandler ): class CPEHandler( xml.sax.ContentHandler ):
@ -11,6 +23,8 @@ class CPEHandler( xml.sax.ContentHandler ):
self.cpe = "" self.cpe = ""
self.record = {} self.record = {}
self.refs = [] self.refs = []
self.itemcount = 0
self.wordcount = 0
def startElement(self, tag, attributes): def startElement(self, tag, attributes):
self.CurrentData = tag self.CurrentData = tag
@ -37,9 +51,14 @@ class CPEHandler( xml.sax.ContentHandler ):
to_insert = CPEExtractor(cpe=self.record['cpe-23']) to_insert = CPEExtractor(cpe=self.record['cpe-23'])
for word in canonize(to_insert['vendor']): for word in canonize(to_insert['vendor']):
insert( word=word, cpe=to_insert['cpeline'] ) insert( word=word, cpe=to_insert['cpeline'] )
self.wordcount += 1
for word in canonize(to_insert['product']): for word in canonize(to_insert['product']):
insert( word=word, cpe=to_insert['cpeline'] ) insert( word=word, cpe=to_insert['cpeline'] )
self.wordcount += 1
self.record = {} self.record = {}
self.itemcount += 1
if self.itemcount % 5000 == 0:
print ("... " + str(self.itemcount) + " items processed (" + str(self.wordcount) + " words)")
def CPEExtractor( cpe=None ): def CPEExtractor( cpe=None ):
@ -67,10 +86,46 @@ def insert( word=None, cpe=None):
rdb.zadd('s:{}'.format(word), {cpe: 1}, incr=True) rdb.zadd('s:{}'.format(word), {cpe: 1}, incr=True)
rdb.zadd('rank:cpe', {cpe: 1}, incr=True) rdb.zadd('rank:cpe', {cpe: 1}, incr=True)
cpe_path = '../data/official-cpe-dictionary_v2.3.xml'
parser = xml.sax.make_parser() if __name__ == '__main__':
argparser = argparse.ArgumentParser(description='Initializes the Redis database with CPE dictionary.')
argparser.add_argument('--download', '-d', action='count', default=0, help='Download the CPE dictionary even if it already exists.')
argparser.add_argument('--replace', '-r', action='count', default=0, help='Flush and repopulated the CPE database.')
args = argparser.parse_args()
Handler = CPEHandler() if args.replace == 0 and rdb.dbsize() > 0:
parser.setContentHandler( Handler ) print("Warning! The Redis database already has " + str(rdb.dbsize()) + " keys.")
parser.parse(cpe_path) print("Use --replace if you want to flush the database and repopulate it.")
sys.exit(1)
if args.download > 0 or not os.path.isfile(cpe_path):
print("Downloading CPE data from " + cpe_source + " ...")
try:
urllib.request.urlretrieve(cpe_source, cpe_path + ".gz")
except (urllib.error.HTTPError, urllib.error.URLError, FileNotFoundError, PermissionError) as e:
print(e)
sys.exit(1)
print("Uncompressing " + cpe_path + ".gz ...")
try:
with gzip.open(cpe_path + ".gz", 'rb') as cpe_gz:
with open(cpe_path, 'wb') as cpe_xml:
shutil.copyfileobj(cpe_gz, cpe_xml)
os.remove(cpe_path + ".gz")
except (FileNotFoundError, PermissionError) as e:
print(e)
sys.exit(1)
elif os.path.isfile(cpe_path):
print("Using existing file " + cpe_path + " ...")
if rdb.dbsize() > 0:
print("Flushing " + str(rdb.dbsize()) + " keys from the database...")
rdb.flushdb()
print("Populating the database (please be patient)...")
parser = xml.sax.make_parser()
Handler = CPEHandler()
parser.setContentHandler( Handler )
parser.parse(cpe_path)
print("Done! " + str(rdb.dbsize()) + " keys inserted.")

0
data/.gitkeep Normal file
View file

View file

@ -1,4 +0,0 @@
#!/bin/sh
wget https://nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz
gzip -d official-cpe-dictionary_v2.3.xml.gz