From b6bc102714d159c283abd7d2f57d045691d9416e Mon Sep 17 00:00:00 2001 From: Esa Jokinen Date: Sat, 18 Sep 2021 14:32:31 +0300 Subject: [PATCH] CPE guessing logic & database access as a class. --- bin/lookup.py | 46 +++++++++++++++++----------------------------- bin/server.py | 47 ++++++++++++++++++++--------------------------- lib/cpeguesser.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 56 deletions(-) create mode 100644 lib/cpeguesser.py diff --git a/bin/lookup.py b/bin/lookup.py index 6dc450b..de1e64f 100644 --- a/bin/lookup.py +++ b/bin/lookup.py @@ -1,36 +1,24 @@ -import redis +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + import argparse +import os import sys import json -rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) +runPath = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(runPath, "..")) +from lib.cpeguesser import CPEGuesser -parser = argparse.ArgumentParser(description='Find potential CPE names from a list of keyword(s) and return a JSON of the results') -parser.add_argument('--word', help='One or more keyword(s) to lookup', action='append') -args = parser.parse_args() +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Find potential CPE names from a list of keyword(s) and return a JSON of the results') + parser.add_argument('--word', help='One or more keyword(s) to lookup', action='append') + args = parser.parse_args() -if args.word is None: - print("Missing keyword(s)") - parser.print_help() - sys.exit(1) + if args.word is None: + print("Missing keyword(s)") + parser.print_help() + sys.exit(1) -k=[] -for keyword in args.word: - k.append('w:{}'.format(keyword.lower())) - -maxinter = len(k) -cpes = [] -for x in reversed(range(maxinter)): - ret = rdb.sinter(k[x]) - cpes.append(list(ret)) - - -result = set(cpes[0]).intersection(*cpes) - -ranked = [] - -for cpe in result: - rank = rdb.zrank('rank:cpe', cpe) - ranked.append((rank, cpe)) - -print(json.dumps(sorted(ranked))) + cpeGuesser = CPEGuesser() + print(json.dumps(cpeGuesser.guessCpe(args.word))) diff --git a/bin/server.py b/bin/server.py index 8274062..62f01cd 100644 --- a/bin/server.py +++ b/bin/server.py @@ -1,51 +1,44 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +import sys import falcon from wsgiref.simple_server import make_server import requests -import redis from datetime import datetime import json -rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) - +runPath = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(os.path.join(runPath, "..")) +from lib.cpeguesser import CPEGuesser class Search(): def on_post(self, req, resp): ret = [] data_post = req.bounded_stream.read() js = data_post.decode('utf-8') - q = json.loads(js) + try: + q = json.loads(js) + except ValueError: + resp.status = falcon.HTTP_400 + resp.media = "Missing query array or incorrect JSON format" + return if 'query' in q: pass else: - resp.status = falcon.HTTP_500 + resp.status = falcon.HTTP_400 resp.media = "Missing query array or incorrect JSON format" return - k=[] - for keyword in q['query']: - k.append('w:{}'.format(keyword.lower())) - - maxinter = len(k) - cpes = [] - for x in reversed(range(maxinter)): - ret = rdb.sinter(k[x]) - cpes.append(list(ret)) - result = set(cpes[0]).intersection(*cpes) - - ranked = [] - - for cpe in result: - rank = rdb.zrank('rank:cpe', cpe) - ranked.append((rank, cpe)) - - resp.media=sorted(ranked) - -app = falcon.App() -app.add_route('/search', Search()) + cpeGuesser = CPEGuesser() + resp.media=cpeGuesser.guessCpe(q['query']) if __name__ == '__main__': + app = falcon.App() + app.add_route('/search', Search()) + with make_server('', 8000, app) as httpd: print('Serving on port 8000...') httpd.serve_forever() - diff --git a/lib/cpeguesser.py b/lib/cpeguesser.py new file mode 100644 index 0000000..03c7783 --- /dev/null +++ b/lib/cpeguesser.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import redis + +class CPEGuesser(): + def __init__(self): + self.rdb = redis.Redis(host='127.0.0.1', port=6379, db=8, decode_responses=True) + + def guessCpe(self, words): + k=[] + for keyword in words: + k.append('w:{}'.format(keyword.lower())) + + maxinter = len(k) + cpes = [] + for x in reversed(range(maxinter)): + ret = self.rdb.sinter(k[x]) + cpes.append(list(ret)) + result = set(cpes[0]).intersection(*cpes) + + ranked = [] + + for cpe in result: + rank = self.rdb.zrank('rank:cpe', cpe) + ranked.append((rank, cpe)) + + return sorted(ranked)