Merge pull request #11 from oh2fih/workflows

Add ShellCheck & Black formatting workflows
This commit is contained in:
Alexandre Dulaunoy 2024-04-15 08:06:15 +02:00 committed by GitHub
commit 6dbf87b4ff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 100 additions and 54 deletions

20
.github/workflows/black.yml vendored Normal file
View file

@ -0,0 +1,20 @@
name: Black formatting
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
black:
name: Black
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check Black formatting for Python scripts
uses: psf/black@stable
with:
options: --check --diff --verbose
src: .

22
.github/workflows/shell.yml vendored Normal file
View file

@ -0,0 +1,22 @@
name: ShellCheck
on:
push:
branches:
- main
pull_request:
branches:
- main
jobs:
shellcheck:
name: ShellCheck
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run ShellCheck for shell scripts
uses: ludeeus/action-shellcheck@master
with:
severity: style
scandir: .
format: gcc
version: stable

View file

@ -13,7 +13,7 @@ import time
from dynaconf import Dynaconf from dynaconf import Dynaconf
# Configuration # Configuration
settings = Dynaconf(settings_files=['../config/settings.yaml']) settings = Dynaconf(settings_files=["../config/settings.yaml"])
cpe_path = settings.cpe.path cpe_path = settings.cpe.path
cpe_source = settings.cpe.source cpe_source = settings.cpe.source
rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8) rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8)
@ -33,32 +33,32 @@ class CPEHandler(xml.sax.ContentHandler):
def startElement(self, tag, attributes): def startElement(self, tag, attributes):
self.CurrentData = tag self.CurrentData = tag
if tag == 'cpe-23:cpe23-item': if tag == "cpe-23:cpe23-item":
self.record['cpe-23'] = attributes['name'] self.record["cpe-23"] = attributes["name"]
if tag == 'title': if tag == "title":
self.title_seen = True self.title_seen = True
if tag == 'reference': if tag == "reference":
self.refs.append(attributes['href']) self.refs.append(attributes["href"])
def characters(self, data): def characters(self, data):
if self.title_seen: if self.title_seen:
self.title = self.title + data self.title = self.title + data
def endElement(self, tag): def endElement(self, tag):
if tag == 'title': if tag == "title":
self.record['title'] = self.title self.record["title"] = self.title
self.title = "" self.title = ""
self.title_seen = False self.title_seen = False
if tag == 'references': if tag == "references":
self.record['refs'] = self.refs self.record["refs"] = self.refs
self.refs = [] self.refs = []
if tag == 'cpe-item': if tag == "cpe-item":
to_insert = CPEExtractor(cpe=self.record['cpe-23']) to_insert = CPEExtractor(cpe=self.record["cpe-23"])
for word in canonize(to_insert['vendor']): for word in canonize(to_insert["vendor"]):
insert(word=word, cpe=to_insert['cpeline']) insert(word=word, cpe=to_insert["cpeline"])
self.wordcount += 1 self.wordcount += 1
for word in canonize(to_insert['product']): for word in canonize(to_insert["product"]):
insert(word=word, cpe=to_insert['cpeline']) insert(word=word, cpe=to_insert["cpeline"])
self.wordcount += 1 self.wordcount += 1
self.record = {} self.record = {}
self.itemcount += 1 self.itemcount += 1
@ -74,18 +74,18 @@ def CPEExtractor(cpe=None):
return False return False
record = {} record = {}
cpefield = cpe.split(":") cpefield = cpe.split(":")
record['vendor'] = cpefield[3] record["vendor"] = cpefield[3]
record['product'] = cpefield[4] record["product"] = cpefield[4]
cpeline = "" cpeline = ""
for cpeentry in cpefield[:5]: for cpeentry in cpefield[:5]:
cpeline = f"{cpeline}:{cpeentry}" cpeline = f"{cpeline}:{cpeentry}"
record['cpeline'] = cpeline[1:] record["cpeline"] = cpeline[1:]
return record return record
def canonize(value=None): def canonize(value=None):
value = value.lower() value = value.lower()
words = value.split('_') words = value.split("_")
return words return words
@ -97,30 +97,30 @@ def insert(word=None, cpe=None):
rdb.zadd("rank:cpe", {cpe: 1}, incr=True) rdb.zadd("rank:cpe", {cpe: 1}, incr=True)
if __name__ == '__main__': if __name__ == "__main__":
argparser = argparse.ArgumentParser( argparser = argparse.ArgumentParser(
description='Initializes the Redis database with CPE dictionary.' description="Initializes the Redis database with CPE dictionary."
) )
argparser.add_argument( argparser.add_argument(
'--download', "--download",
'-d', "-d",
action='count', action="count",
default=0, default=0,
help='Download the CPE dictionary even if it already exists.', help="Download the CPE dictionary even if it already exists.",
) )
argparser.add_argument( argparser.add_argument(
'--replace', "--replace",
'-r', "-r",
action='count', action="count",
default=0, default=0,
help='Flush and repopulated the CPE database.', help="Flush and repopulated the CPE database.",
) )
argparser.add_argument( argparser.add_argument(
'--update', "--update",
'-u', "-u",
action='store_true', action="store_true",
default=False, default=False,
help='Update the CPE database without flushing', help="Update the CPE database without flushing",
) )
args = argparser.parse_args() args = argparser.parse_args()
@ -144,8 +144,8 @@ if __name__ == '__main__':
print(f"Uncompressing {cpe_path}.gz ...") print(f"Uncompressing {cpe_path}.gz ...")
try: try:
with gzip.open(f"{cpe_path}.gz", 'rb') as cpe_gz: with gzip.open(f"{cpe_path}.gz", "rb") as cpe_gz:
with open(cpe_path, 'wb') as cpe_xml: with open(cpe_path, "wb") as cpe_xml:
shutil.copyfileobj(cpe_gz, cpe_xml) shutil.copyfileobj(cpe_gz, cpe_xml)
os.remove(f"{cpe_path}.gz") os.remove(f"{cpe_path}.gz")
except (FileNotFoundError, PermissionError) as e: except (FileNotFoundError, PermissionError) as e:

View file

@ -10,16 +10,16 @@ runPath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(runPath, "..")) sys.path.append(os.path.join(runPath, ".."))
from lib.cpeguesser import CPEGuesser from lib.cpeguesser import CPEGuesser
if __name__ == '__main__': if __name__ == "__main__":
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Find potential CPE names from a list of keyword(s) and return a JSON of the results' description="Find potential CPE names from a list of keyword(s) and return a JSON of the results"
) )
parser.add_argument( parser.add_argument(
'word', "word",
metavar='WORD', metavar="WORD",
type=str, type=str,
nargs='+', nargs="+",
help='One or more keyword(s) to lookup', help="One or more keyword(s) to lookup",
) )
args = parser.parse_args() args = parser.parse_args()

View file

@ -9,7 +9,7 @@ import json
from dynaconf import Dynaconf from dynaconf import Dynaconf
# Configuration # Configuration
settings = Dynaconf(settings_files=['../config/settings.yaml']) settings = Dynaconf(settings_files=["../config/settings.yaml"])
port = settings.server.port port = settings.server.port
runPath = os.path.dirname(os.path.realpath(__file__)) runPath = os.path.dirname(os.path.realpath(__file__))
@ -20,7 +20,7 @@ from lib.cpeguesser import CPEGuesser
class Search: class Search:
def on_post(self, req, resp): def on_post(self, req, resp):
data_post = req.bounded_stream.read() data_post = req.bounded_stream.read()
js = data_post.decode('utf-8') js = data_post.decode("utf-8")
try: try:
q = json.loads(js) q = json.loads(js)
except ValueError: except ValueError:
@ -28,7 +28,7 @@ class Search:
resp.media = "Missing query array or incorrect JSON format" resp.media = "Missing query array or incorrect JSON format"
return return
if 'query' in q: if "query" in q:
pass pass
else: else:
resp.status = falcon.HTTP_400 resp.status = falcon.HTTP_400
@ -36,15 +36,15 @@ class Search:
return return
cpeGuesser = CPEGuesser() cpeGuesser = CPEGuesser()
resp.media = cpeGuesser.guessCpe(q['query']) resp.media = cpeGuesser.guessCpe(q["query"])
if __name__ == '__main__': if __name__ == "__main__":
app = falcon.App() app = falcon.App()
app.add_route('/search', Search()) app.add_route("/search", Search())
try: try:
with make_server('', port, app) as httpd: with make_server("", port, app) as httpd:
print(f"Serving on port {port}...") print(f"Serving on port {port}...")
httpd.serve_forever() httpd.serve_forever()
except OSError as e: except OSError as e:

View file

@ -5,13 +5,17 @@ import redis
from dynaconf import Dynaconf from dynaconf import Dynaconf
# Configuration # Configuration
settings = Dynaconf( settings = Dynaconf(settings_files=["../config/settings.yaml"])
settings_files=['../config/settings.yaml']
)
class CPEGuesser: class CPEGuesser:
def __init__(self): def __init__(self):
self.rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8, decode_responses=True) self.rdb = redis.Redis(
host=settings.redis.host,
port=settings.redis.port,
db=8,
decode_responses=True,
)
def guessCpe(self, words): def guessCpe(self, words):
k = [] k = []
@ -28,7 +32,7 @@ class CPEGuesser:
ranked = [] ranked = []
for cpe in result: for cpe in result:
rank = self.rdb.zrank('rank:cpe', cpe) rank = self.rdb.zrank("rank:cpe", cpe)
ranked.append((rank, cpe)) ranked.append((rank, cpe))
return sorted(ranked) return sorted(ranked)