Merge pull request #2 from oh2fih/main

CPE guessing logic & database access as a class.
This commit is contained in:
Alexandre Dulaunoy 2021-09-18 23:34:57 +02:00 committed by GitHub
commit f40a5630ad
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 56 deletions

View file

@ -1,10 +1,16 @@
import redis #!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse import argparse
import os
import sys import sys
import json import json
rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) runPath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(runPath, ".."))
from lib.cpeguesser import CPEGuesser
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Find potential CPE names from a list of keyword(s) and return a JSON of the results') parser = argparse.ArgumentParser(description='Find potential CPE names from a list of keyword(s) and return a JSON of the results')
parser.add_argument('--word', help='One or more keyword(s) to lookup', action='append') parser.add_argument('--word', help='One or more keyword(s) to lookup', action='append')
args = parser.parse_args() args = parser.parse_args()
@ -14,23 +20,5 @@ if args.word is None:
parser.print_help() parser.print_help()
sys.exit(1) sys.exit(1)
k=[] cpeGuesser = CPEGuesser()
for keyword in args.word: print(json.dumps(cpeGuesser.guessCpe(args.word)))
k.append('w:{}'.format(keyword.lower()))
maxinter = len(k)
cpes = []
for x in reversed(range(maxinter)):
ret = rdb.sinter(k[x])
cpes.append(list(ret))
result = set(cpes[0]).intersection(*cpes)
ranked = []
for cpe in result:
rank = rdb.zrank('rank:cpe', cpe)
ranked.append((rank, cpe))
print(json.dumps(sorted(ranked)))

View file

@ -1,51 +1,44 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import falcon import falcon
from wsgiref.simple_server import make_server from wsgiref.simple_server import make_server
import requests import requests
import redis
from datetime import datetime from datetime import datetime
import json import json
rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) runPath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(runPath, ".."))
from lib.cpeguesser import CPEGuesser
class Search(): class Search():
def on_post(self, req, resp): def on_post(self, req, resp):
ret = [] ret = []
data_post = req.bounded_stream.read() data_post = req.bounded_stream.read()
js = data_post.decode('utf-8') js = data_post.decode('utf-8')
try:
q = json.loads(js) q = json.loads(js)
except ValueError:
resp.status = falcon.HTTP_400
resp.media = "Missing query array or incorrect JSON format"
return
if 'query' in q: if 'query' in q:
pass pass
else: else:
resp.status = falcon.HTTP_500 resp.status = falcon.HTTP_400
resp.media = "Missing query array or incorrect JSON format" resp.media = "Missing query array or incorrect JSON format"
return return
k=[] cpeGuesser = CPEGuesser()
for keyword in q['query']: resp.media=cpeGuesser.guessCpe(q['query'])
k.append('w:{}'.format(keyword.lower()))
maxinter = len(k)
cpes = []
for x in reversed(range(maxinter)):
ret = rdb.sinter(k[x])
cpes.append(list(ret))
result = set(cpes[0]).intersection(*cpes)
ranked = []
for cpe in result:
rank = rdb.zrank('rank:cpe', cpe)
ranked.append((rank, cpe))
resp.media=sorted(ranked)
if __name__ == '__main__':
app = falcon.App() app = falcon.App()
app.add_route('/search', Search()) app.add_route('/search', Search())
if __name__ == '__main__':
with make_server('', 8000, app) as httpd: with make_server('', 8000, app) as httpd:
print('Serving on port 8000...') print('Serving on port 8000...')
httpd.serve_forever() httpd.serve_forever()

28
lib/cpeguesser.py Normal file
View file

@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import redis
class CPEGuesser():
def __init__(self):
self.rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True)
def guessCpe(self, words):
k=[]
for keyword in words:
k.append('w:{}'.format(keyword.lower()))
maxinter = len(k)
cpes = []
for x in reversed(range(maxinter)):
ret = self.rdb.sinter(k[x])
cpes.append(list(ret))
result = set(cpes[0]).intersection(*cpes)
ranked = []
for cpe in result:
rank = self.rdb.zrank('rank:cpe', cpe)
ranked.append((rank, cpe))
return sorted(ranked)