#!/usr/bin/env python3

from __future__ import annotations

import logging

from datetime import date, datetime
from importlib.metadata import version
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urljoin, urlparse

import requests


def enable_full_debug() -> None:
    '''Enable verbose debugging output for the HTTP traffic.

    Sets the debug level of http.client connections to 1, configures the root
    logger at DEBUG level and lets the urllib3 logger bundled with requests
    propagate its DEBUG records.
    '''
    import http.client as http_client
    http_client.HTTPConnection.debuglevel = 1
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True


class PyVulnerabilityLookup():
    '''HTTP client for a vulnerability lookup instance.

    Every method issues one HTTP request through a shared requests session.
    Methods returning the decoded JSON body do not check the HTTP status code
    first; the delete methods return the status code itself.
    '''

    def __init__(self, root_url: str='https://vulnerability.circl.lu', useragent: str | None=None,
                 token: str | None=None,
                 *, proxies: dict[str, str] | None=None) -> None:
        '''Query a specific instance.

        :param root_url: URL of the instance to query.
        :param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
        :param token: The API key sent in the X-API-KEY header. An empty value is sent when no token is given.
        :param proxies: The proxies to use to connect to the vulnerability lookup instance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
        '''
        self.root_url = root_url

        # Default to plain HTTP when the URL has no scheme.
        if not urlparse(self.root_url).scheme:
            self.root_url = 'http://' + self.root_url
        # urljoin() replaces the last path segment unless the base ends with a slash.
        if not self.root_url.endswith('/'):
            self.root_url += '/'
        self.session = requests.session()
        self.session.headers['user-agent'] = useragent if useragent else f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}'
        self.session.headers['X-API-KEY'] = token if token else ''
        self.session.headers['Accept'] = 'application/json'
        self.session.headers['Content-Type'] = 'application/json'
        if proxies:
            self.session.proxies.update(proxies)

    def _url(self, *parts: str) -> str:
        '''Build the absolute URL of an endpoint from its path segments.

        :param parts: The path segments, relative to the root URL of the instance
        '''
        return urljoin(self.root_url, str(PurePosixPath(*parts)))

    def set_apikey(self, apikey: str) -> None:
        '''Set the API key to use for the requests

        :param apikey: The API key, sent in the X-API-KEY header
        '''
        self.session.headers['X-API-KEY'] = apikey

    @property
    def is_up(self) -> bool:
        '''Test if the given instance is accessible

        Returns False on a connection error, otherwise True only if the HEAD
        request on the root URL answers with a 200 status code.
        '''
        try:
            r = self.session.head(self.root_url)
        except requests.exceptions.ConnectionError:
            return False
        return r.status_code == 200

    def redis_up(self) -> bool:
        '''Check if redis is up and running'''
        r = self.session.get(self._url('redis_up'))
        return r.json()

    def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
        '''Get a vulnerability

        :param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
        '''
        r = self.session.get(self._url('vulnerability', vulnerability_id))
        return r.json()

    def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]:
        '''Create a vulnerability.

        :param vulnerability: The vulnerability
        '''
        r = self.session.post(self._url('vulnerability'), json=vulnerability)
        return r.json()

    def delete_vulnerability(self, vulnerability_id: str) -> int:
        '''Delete a vulnerability. Returns the HTTP status code of the response.

        :param vulnerability_id: The vulnerability ID
        '''
        r = self.session.delete(self._url('vulnerability', vulnerability_id))
        return r.status_code

    def get_info(self) -> dict[str, Any]:
        '''Get more information about the current databases in use and when it was updated'''
        r = self.session.get(self._url('info'))
        return r.json()

    def get_config_info(self) -> dict[str, Any]:
        '''Get the response of the configInfo endpoint of the instance'''
        r = self.session.get(self._url('configInfo'))
        return r.json()

    def get_last(self, number: int | None=None, source: str | None = None) -> list[dict[str, Any]]:
        '''Get the last vulnerabilities

        :param number: The number of vulnerabilities to get
        :param source: The source of the vulnerabilities
        '''
        # The optional segments are appended in this order: last[/<source>][/<number>]
        parts = ['last']
        if source:
            parts.append(source)
        if number is not None:
            parts.append(str(number))
        r = self.session.get(self._url(*parts))
        return r.json()

    def get_vendors(self) -> list[str]:
        '''Get the list of known vendors'''
        r = self.session.get(self._url('api', 'browse'))
        return r.json()

    def get_vendor_products(self, vendor: str) -> list[str]:
        '''Get the known products for a vendor

        :param vendor: A vendor owning products (must be in the known vendor list)
        '''
        r = self.session.get(self._url('api', 'browse', vendor))
        return r.json()

    def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
        '''Get the vulnerabilities per vendor and a specific product

        :param vendor: A vendor owning products (must be in the known vendor list)
        :param product: A product owned by that vendor
        '''
        r = self.session.get(self._url('api', 'search', vendor, product))
        return r.json()

    # NOTE: endpoints /api/cve/*, /api/dbInfo, /api/last are aliases for backward compat.

    def create_comment(self, comment: dict[str, Any]) -> dict[str, Any]:
        '''Create a comment.

        :param comment: The comment
        '''
        r = self.session.post(self._url('api', 'comment'), json=comment)
        return r.json()

    def get_comments(self, uuid: str | None = None, vuln_id: str | None = None, author: str | None = None) -> dict[str, Any]:
        '''Get comment(s)

        :param uuid: The UUID of a specific comment
        :param vuln_id: The vulnerability ID to get comments of
        :param author: The author of the comment(s)
        '''
        # requests leaves out the query parameters whose value is None.
        r = self.session.get(self._url('api', 'comment'),
                             params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
        return r.json()

    def delete_comment(self, comment_uuid: str) -> int:
        '''Delete a comment. Returns the HTTP status code of the response.

        :param comment_uuid: The comment UUID
        '''
        r = self.session.delete(self._url('api', 'comment', comment_uuid))
        return r.status_code

    def create_bundle(self, bundle: dict[str, Any]) -> dict[str, Any]:
        '''Create a bundle.

        :param bundle: The bundle
        '''
        r = self.session.post(self._url('api', 'bundle'), json=bundle)
        return r.json()

    def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None, author: str | None = None) -> dict[str, Any]:
        '''Get bundle(s)

        :param uuid: The UUID of a specific bundle
        :param vuln_id: The vulnerability ID to get bundles of
        :param author: The author of the bundle(s)
        '''
        # requests leaves out the query parameters whose value is None.
        r = self.session.get(self._url('api', 'bundle'),
                             params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
        return r.json()

    def delete_bundle(self, bundle_uuid: str) -> int:
        '''Delete a bundle. Returns the HTTP status code of the response.

        :param bundle_uuid: The bundle UUID
        '''
        r = self.session.delete(self._url('api', 'bundle', bundle_uuid))
        return r.status_code

    def create_user(self, login: str, name: str, organisation: str, email: str) -> dict[str, Any]:
        '''Create a user.

        :param login: The login of the user
        :param name: The name of the user
        :param organisation: The organisation of the user
        :param email: The email of the user
        '''
        r = self.session.post(self._url('api', 'user'),
                              json={'login': login, 'name': name, 'organisation': organisation, 'email': email})
        return r.json()

    def list_users(self) -> dict[str, Any]:
        '''List users'''
        r = self.session.get(self._url('api', 'user'))
        return r.json()

    def get_user_information(self) -> dict[str, Any]:
        '''Get user information'''
        r = self.session.get(self._url('api', 'user', 'me'))
        return r.json()

    # #### Sightings ####

    def get_sighting(self, sighting_uuid: str) -> dict[str, Any]:
        '''Get a sighting

        :param sighting_uuid: The UUID of the sighting
        '''
        r = self.session.get(self._url('api', 'sighting', sighting_uuid))
        return r.json()

    def get_sightings(self, /, *, sighting_uuid: str | None=None, sighting_type: str | None=None,
                      vuln_id: str | None = None, author: str | None = None,
                      date_from: date | datetime | None=None, date_to: date | datetime | None=None) -> dict[str, Any]:
        '''Get sightings

        :param sighting_uuid: The UUID of a specific sighting
        :param sighting_type: The type of sighting, can be one of: 'seen', 'exploited', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
        :param vuln_id: The vulnerability ID to get sightings of
        :param author: The author of the sighting(s)
        :param date_from: The date from which to get sightings
        :param date_to: The date to which to get sightings
        '''
        params: dict[str, str] = {}
        if sighting_uuid:
            params['uuid'] = sighting_uuid
        if sighting_type:
            params['type'] = sighting_type
        if vuln_id:
            params['vuln_id'] = vuln_id
        if author:
            params['author'] = author
        # Only the date part is sent: datetime objects are truncated to their date.
        if date_from:
            if isinstance(date_from, datetime):
                date_from = date_from.date()
            params['date_from'] = date_from.isoformat()
        if date_to:
            if isinstance(date_to, datetime):
                date_to = date_to.date()
            params['date_to'] = date_to.isoformat()
        r = self.session.get(self._url('api', 'sighting'), params=params)
        return r.json()

    def create_sighting(self, /, *, sighting: dict[str, Any] | None=None,
                        creation_timestamp: datetime | None=None,
                        source: str | None = None,
                        sighting_type: str | None=None,
                        vulnerability: str | None=None) -> dict[str, Any]:
        '''Create a sighting.

        The keyword arguments take precedence over the matching keys of the
        sighting object. The object passed by the caller is not modified.

        :param sighting: The sighting, as an object.
        :param creation_timestamp: The timestamp of the sighting. Nothing is sent when it is not provided (presumably the instance then uses the current time - to be confirmed against the server).
        :param source: The source of the sighting
        :param sighting_type: The type of sighting, can be one of: 'seen', 'exploited', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
        :param vulnerability: The vulnerability ID of the sighting
        '''
        # Work on a shallow copy so the dictionary of the caller is left untouched.
        payload: dict[str, Any] = dict(sighting) if sighting else {}

        if creation_timestamp:
            # This value may or may not have a TZ at this point
            payload['creation_timestamp'] = creation_timestamp
        if source:
            payload['source'] = source
        if sighting_type:
            payload['type'] = sighting_type
        if vulnerability:
            payload['vulnerability'] = vulnerability

        timestamp = payload.get('creation_timestamp')
        # Only datetime objects need a conversion; a value that is already
        # serialized (i.e. an ISO formatted string) is sent as it is.
        if isinstance(timestamp, datetime):
            # If the datetime object has no TZ, set it to localtime.
            if timestamp.tzinfo is None:
                timestamp = timestamp.astimezone()
            payload['creation_timestamp'] = timestamp.isoformat()

        r = self.session.post(self._url('api', 'sighting'), json=payload)
        return r.json()