diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml new file mode 100644 index 0000000..5183a45 --- /dev/null +++ b/.github/workflows/black.yml @@ -0,0 +1,20 @@ +name: Black formatting +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + black: + name: Black + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check Black formatting for Python scripts + uses: psf/black@stable + with: + options: --check --diff --verbose + src: . diff --git a/.github/workflows/shell.yml b/.github/workflows/shell.yml new file mode 100644 index 0000000..58c5370 --- /dev/null +++ b/.github/workflows/shell.yml @@ -0,0 +1,22 @@ +name: ShellCheck +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + shellcheck: + name: ShellCheck + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Run ShellCheck for shell scripts + uses: ludeeus/action-shellcheck@master + with: + severity: style + scandir: . + format: gcc + version: stable diff --git a/bin/import.py b/bin/import.py index ed947e6..3e8d417 100644 --- a/bin/import.py +++ b/bin/import.py @@ -13,7 +13,7 @@ import time from dynaconf import Dynaconf # Configuration -settings = Dynaconf(settings_files=['../config/settings.yaml']) +settings = Dynaconf(settings_files=["../config/settings.yaml"]) cpe_path = settings.cpe.path cpe_source = settings.cpe.source rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8) @@ -33,32 +33,32 @@ class CPEHandler(xml.sax.ContentHandler): def startElement(self, tag, attributes): self.CurrentData = tag - if tag == 'cpe-23:cpe23-item': - self.record['cpe-23'] = attributes['name'] - if tag == 'title': + if tag == "cpe-23:cpe23-item": + self.record["cpe-23"] = attributes["name"] + if tag == "title": self.title_seen = True - if tag == 'reference': - self.refs.append(attributes['href']) + if tag == "reference": + self.refs.append(attributes["href"]) def characters(self, data): if self.title_seen: self.title = self.title + data def endElement(self, tag): - if tag == 'title': - self.record['title'] = self.title + if tag == "title": + self.record["title"] = self.title self.title = "" self.title_seen = False - if tag == 'references': - self.record['refs'] = self.refs + if tag == "references": + self.record["refs"] = self.refs self.refs = [] - if tag == 'cpe-item': - to_insert = CPEExtractor(cpe=self.record['cpe-23']) - for word in canonize(to_insert['vendor']): - insert(word=word, cpe=to_insert['cpeline']) + if tag == "cpe-item": + to_insert = CPEExtractor(cpe=self.record["cpe-23"]) + for word in canonize(to_insert["vendor"]): + insert(word=word, cpe=to_insert["cpeline"]) self.wordcount += 1 - for word in canonize(to_insert['product']): - insert(word=word, cpe=to_insert['cpeline']) + for word in canonize(to_insert["product"]): + insert(word=word, cpe=to_insert["cpeline"]) self.wordcount += 1 self.record = {} self.itemcount += 1 @@ -74,18 +74,18 @@ def CPEExtractor(cpe=None): return False record = {} cpefield = cpe.split(":") - record['vendor'] = cpefield[3] - record['product'] = cpefield[4] + record["vendor"] = cpefield[3] + record["product"] = cpefield[4] cpeline = "" for cpeentry in cpefield[:5]: cpeline = f"{cpeline}:{cpeentry}" - record['cpeline'] = cpeline[1:] + record["cpeline"] = cpeline[1:] return record def canonize(value=None): value = value.lower() - words = value.split('_') + words = value.split("_") return words @@ -97,30 +97,30 @@ def insert(word=None, cpe=None): rdb.zadd("rank:cpe", {cpe: 1}, incr=True) -if __name__ == '__main__': +if __name__ == "__main__": argparser = argparse.ArgumentParser( - description='Initializes the Redis database with CPE dictionary.' + description="Initializes the Redis database with CPE dictionary." ) argparser.add_argument( - '--download', - '-d', - action='count', + "--download", + "-d", + action="count", default=0, - help='Download the CPE dictionary even if it already exists.', + help="Download the CPE dictionary even if it already exists.", ) argparser.add_argument( - '--replace', - '-r', - action='count', + "--replace", + "-r", + action="count", default=0, - help='Flush and repopulated the CPE database.', + help="Flush and repopulated the CPE database.", ) argparser.add_argument( - '--update', - '-u', - action='store_true', + "--update", + "-u", + action="store_true", default=False, - help='Update the CPE database without flushing', + help="Update the CPE database without flushing", ) args = argparser.parse_args() @@ -144,8 +144,8 @@ if __name__ == '__main__': print(f"Uncompressing {cpe_path}.gz ...") try: - with gzip.open(f"{cpe_path}.gz", 'rb') as cpe_gz: - with open(cpe_path, 'wb') as cpe_xml: + with gzip.open(f"{cpe_path}.gz", "rb") as cpe_gz: + with open(cpe_path, "wb") as cpe_xml: shutil.copyfileobj(cpe_gz, cpe_xml) os.remove(f"{cpe_path}.gz") except (FileNotFoundError, PermissionError) as e: diff --git a/bin/lookup.py b/bin/lookup.py index 49a17dc..ddbef29 100644 --- a/bin/lookup.py +++ b/bin/lookup.py @@ -10,16 +10,16 @@ runPath = os.path.dirname(os.path.realpath(__file__)) sys.path.append(os.path.join(runPath, "..")) from lib.cpeguesser import CPEGuesser -if __name__ == '__main__': +if __name__ == "__main__": parser = argparse.ArgumentParser( - description='Find potential CPE names from a list of keyword(s) and return a JSON of the results' + description="Find potential CPE names from a list of keyword(s) and return a JSON of the results" ) parser.add_argument( - 'word', - metavar='WORD', + "word", + metavar="WORD", type=str, - nargs='+', - help='One or more keyword(s) to lookup', + nargs="+", + help="One or more keyword(s) to lookup", ) args = parser.parse_args() diff --git a/bin/server.py b/bin/server.py index 09bdffb..5a0459f 100644 --- a/bin/server.py +++ b/bin/server.py @@ -9,7 +9,7 @@ import json from dynaconf import Dynaconf # Configuration -settings = Dynaconf(settings_files=['../config/settings.yaml']) +settings = Dynaconf(settings_files=["../config/settings.yaml"]) port = settings.server.port runPath = os.path.dirname(os.path.realpath(__file__)) @@ -20,7 +20,7 @@ from lib.cpeguesser import CPEGuesser class Search: def on_post(self, req, resp): data_post = req.bounded_stream.read() - js = data_post.decode('utf-8') + js = data_post.decode("utf-8") try: q = json.loads(js) except ValueError: @@ -28,7 +28,7 @@ class Search: resp.media = "Missing query array or incorrect JSON format" return - if 'query' in q: + if "query" in q: pass else: resp.status = falcon.HTTP_400 @@ -36,15 +36,15 @@ class Search: return cpeGuesser = CPEGuesser() - resp.media = cpeGuesser.guessCpe(q['query']) + resp.media = cpeGuesser.guessCpe(q["query"]) -if __name__ == '__main__': +if __name__ == "__main__": app = falcon.App() - app.add_route('/search', Search()) + app.add_route("/search", Search()) try: - with make_server('', port, app) as httpd: + with make_server("", port, app) as httpd: print(f"Serving on port {port}...") httpd.serve_forever() except OSError as e: diff --git a/lib/cpeguesser.py b/lib/cpeguesser.py index cbc9789..b917dc2 100644 --- a/lib/cpeguesser.py +++ b/lib/cpeguesser.py @@ -5,13 +5,17 @@ import redis from dynaconf import Dynaconf # Configuration -settings = Dynaconf( - settings_files=['../config/settings.yaml'] -) +settings = Dynaconf(settings_files=["../config/settings.yaml"]) + class CPEGuesser: def __init__(self): - self.rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8, decode_responses=True) + self.rdb = redis.Redis( + host=settings.redis.host, + port=settings.redis.port, + db=8, + decode_responses=True, + ) def guessCpe(self, words): k = [] @@ -28,7 +32,7 @@ class CPEGuesser: ranked = [] for cpe in result: - rank = self.rdb.zrank('rank:cpe', cpe) + rank = self.rdb.zrank("rank:cpe", cpe) ranked.append((rank, cpe)) return sorted(ranked)