#!/usr/bin/env python3
'''Python client for the HTTP API of a vulnerability lookup instance.'''

from __future__ import annotations

import logging

from importlib.metadata import version
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urljoin, urlparse

import requests


def enable_full_debug() -> None:
    '''Turn on verbose debugging output for every HTTP exchange.

    Sets the ``http.client`` connection debug level to 1, initialises the
    root logger at DEBUG level and raises the urllib3 loggers to DEBUG.
    This changes process-wide logging state.
    '''
    import http.client as http_client

    http_client.HTTPConnection.debuglevel = 1
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    # urllib3 logs under the "urllib3" namespace; "requests.packages.urllib3"
    # is the legacy name from when requests vendored it. Configure both so the
    # output shows up whatever the installed versions are.
    for logger_name in ('requests.packages.urllib3', 'urllib3'):
        requests_log = logging.getLogger(logger_name)
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True


class PyVulnerabilityLookup:
    '''Thin wrapper around a :class:`requests.Session` bound to one instance.

    Unless stated otherwise, methods return the decoded JSON body of the
    response and do not check the HTTP status code.
    '''

    def __init__(self, root_url: str, useragent: str | None=None, token: str | None=None,
                 *, proxies: dict[str, str] | None=None) -> None:
        '''Query a specific instance.

        :param root_url: URL of the instance to query. ``http://`` is prepended when no scheme is given,
                         and a trailing ``/`` is appended when missing.
        :param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
        :param token: The API key sent in the ``X-API-KEY`` header. When omitted, the header is sent with an empty value.
        :param proxies: The proxies to use to connect to the vulnerability lookup instance
                        - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
        '''
        self.root_url = root_url

        if not urlparse(self.root_url).scheme:
            self.root_url = 'http://' + self.root_url
        # urljoin() replaces the last path segment unless the base ends with a slash.
        if not self.root_url.endswith('/'):
            self.root_url += '/'
        self.session = requests.session()
        self.session.headers['user-agent'] = useragent if useragent else f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}'
        self.session.headers['X-API-KEY'] = token if token else ''
        self.session.headers['Accept'] = 'application/json'
        self.session.headers['Content-Type'] = 'application/json'
        if proxies:
            self.session.proxies.update(proxies)

    def _endpoint(self, *segments: str) -> str:
        '''Build the absolute URL of an endpoint from its path segments.'''
        return urljoin(self.root_url, str(PurePosixPath(*segments)))

    def set_apikey(self, apikey: str) -> None:
        '''Set the API key to use for the requests

        :param apikey: The new value of the ``X-API-KEY`` header
        '''
        self.session.headers['X-API-KEY'] = apikey

    @property
    def is_up(self) -> bool:
        '''Test if the given instance is accessible

        True when a HEAD request on the root URL answers with HTTP 200,
        False on a connection error or any other status code.
        '''
        try:
            r = self.session.head(self.root_url)
        except requests.exceptions.ConnectionError:
            return False
        return r.status_code == 200

    def redis_up(self) -> bool:
        '''Check if redis is up and running

        Returns the decoded JSON body of the ``redis_up`` endpoint.
        '''
        r = self.session.get(self._endpoint('redis_up'))
        return r.json()

    def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
        '''Get a vulnerability

        :param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
        '''
        r = self.session.get(self._endpoint('vulnerability', vulnerability_id))
        return r.json()

    def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]:
        '''Create a vulnerability.

        :param vulnerability: The vulnerability, sent as the JSON body of the request
        '''
        r = self.session.post(self._endpoint('vulnerability'), json=vulnerability)
        return r.json()

    def delete_vulnerability(self, vulnerability_id: str) -> int:
        '''Delete a vulnerability and return the HTTP status code of the response.

        :param vulnerability_id: The vulnerability ID
        '''
        r = self.session.delete(self._endpoint('vulnerability', vulnerability_id))
        return r.status_code

    def get_info(self) -> dict[str, Any]:
        '''Get more information about the current databases in use and when it was updated'''
        r = self.session.get(self._endpoint('info'))
        return r.json()

    def get_config_info(self) -> dict[str, Any]:
        '''Get the configuration information of the instance (``configInfo`` endpoint)'''
        r = self.session.get(self._endpoint('configInfo'))
        return r.json()

    def get_last(self, number: int | None=None, source: str | None = None) -> list[dict[str, Any]]:
        '''Get the last vulnerabilities

        :param number: The number of vulnerabilities to get
        :param source: The source of the vulnerabilities
        '''
        # The path is last[/<source>][/<number>]; an empty source is skipped,
        # while a number of 0 is kept.
        segments = ['last']
        if source:
            segments.append(source)
        if number is not None:
            segments.append(str(number))
        r = self.session.get(self._endpoint(*segments))
        return r.json()

    def get_vendors(self) -> list[str]:
        '''Get the list of known vendors'''
        r = self.session.get(self._endpoint('api', 'browse'))
        return r.json()

    def get_vendor_products(self, vendor: str) -> list[str]:
        '''Get the known products for a vendor

        :param vendor: A vendor owning products (must be in the known vendor list)
        '''
        r = self.session.get(self._endpoint('api', 'browse', vendor))
        return r.json()

    def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
        '''Get the vulnerabilities per vendor and a specific product

        :param vendor: A vendor owning products (must be in the known vendor list)
        :param product: A product owned by that vendor
        '''
        r = self.session.get(self._endpoint('api', 'search', vendor, product))
        return r.json()

    # NOTE: endpoints /api/cve/*, /api/dbInfo, /api/last are aliases for backward compat.

    def create_comment(self, comment: dict[str, Any]) -> dict[str, Any]:
        '''Create a comment.

        :param comment: The comment, sent as the JSON body of the request
        '''
        r = self.session.post(self._endpoint('api', 'comment'), json=comment)
        return r.json()

    def get_comments(self, uuid: str | None = None, vuln_id: str | None = None,
                     author: str | None = None) -> dict[str, Any]:
        '''Get comment(s)

        :param uuid: The UUID of a specific comment
        :param vuln_id: The vulnerability ID to get comments of
        :param author: The author of the comment(s)
        '''
        r = self.session.get(self._endpoint('api', 'comment'),
                             params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
        return r.json()

    def delete_comment(self, comment_uuid: str) -> int:
        '''Delete a comment and return the HTTP status code of the response.

        :param comment_uuid: The comment UUID
        '''
        r = self.session.delete(self._endpoint('api', 'comment', comment_uuid))
        return r.status_code

    def create_bundle(self, bundle: dict[str, Any]) -> dict[str, Any]:
        '''Create a bundle.

        :param bundle: The bundle, sent as the JSON body of the request
        '''
        r = self.session.post(self._endpoint('api', 'bundle'), json=bundle)
        return r.json()

    def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None,
                    author: str | None = None) -> dict[str, Any]:
        '''Get bundle(s)

        :param uuid: The UUID of a specific bundle
        :param vuln_id: The vulnerability ID to get bundles of
        :param author: The author of the bundle(s)
        '''
        r = self.session.get(self._endpoint('api', 'bundle'),
                             params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
        return r.json()

    def delete_bundle(self, bundle_uuid: str) -> int:
        '''Delete a bundle and return the HTTP status code of the response.

        :param bundle_uuid: The bundle UUID
        '''
        r = self.session.delete(self._endpoint('api', 'bundle', bundle_uuid))
        return r.status_code

    def create_user(self, login: str, name: str, organisation: str, email: str) -> dict[str, Any]:
        '''Create a user.

        :param login: The login of the user
        :param name: The name of the user
        :param organisation: The organisation of the user
        :param email: The email of the user
        '''
        r = self.session.post(self._endpoint('api', 'user'),
                              json={'login': login, 'name': name, 'organisation': organisation, 'email': email})
        return r.json()

    def list_users(self) -> dict[str, Any]:
        '''List users'''
        r = self.session.get(self._endpoint('api', 'user'))
        return r.json()