PyVulnerabilityLookup/pyvulnerabilitylookup/api.py

434 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from __future__ import annotations
import logging
from datetime import date, datetime
from importlib.metadata import version
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urljoin, urlparse
import requests
from urllib3.util import Retry
from requests.adapters import HTTPAdapter
def enable_full_debug() -> None:
import http.client as http_client
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
class PyVulnerabilityLookup():
def __init__(self, root_url: str='https://vulnerability.circl.lu', useragent: str | None=None, token: str | None=None,
*, proxies: dict[str, str] | None=None) -> None:
'''Query a specific instance.
:param root_url: URL of the instance to query.
:param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
:param proxies: The proxies to use to connect to the vulnerability lookup instance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
'''
self.root_url = root_url
if not urlparse(self.root_url).scheme:
self.root_url = 'http://' + self.root_url
if not self.root_url.endswith('/'):
self.root_url += '/'
self.session = requests.session()
self.session.headers['user-agent'] = useragent if useragent else f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}'
self.session.headers['X-API-KEY'] = token if token else ''
self.session.headers['Accept'] = 'application/json'
self.session.headers['Content-Type'] = 'application/json'
if proxies:
self.session.proxies.update(proxies)
retries = Retry(total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
self.session.mount('https://', HTTPAdapter(max_retries=retries))
def set_apikey(self, apikey: str) -> None:
'''Set the API key to use for the requests'''
self.session.headers['X-API-KEY'] = apikey
@property
def is_up(self) -> bool:
'''Test if the given instance is accessible'''
try:
r = self.session.head(self.root_url)
except requests.exceptions.ConnectionError:
return False
return r.status_code == 200
def redis_up(self) -> bool:
'''Check if redis is up and running'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'system', 'redis_up'))))
return r.json()
# #### DB status ####
def get_info(self) -> dict[str, Any]:
'''Get more information about the current databases in use and when it was updated'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'system', 'dbInfo'))))
return r.json()
def get_config_info(self) -> dict[str, Any]:
'''Get more information about the current databases in use and when it was updated'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'system', 'configInfo'))))
return r.json()
# #### Vulnerabilities ####
def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
'''Get a vulnerability
:param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vulnerability', vulnerability_id))))
return r.json()
def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]:
'''Create a vulnerability.
:param vulnerability: The vulnerability
'''
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'vulnerability'))),
json=vulnerability)
return r.json()
def delete_vulnerability(self, vulnerability_id: str) -> int:
'''Delete a vulnerability.
:param vulnerability_id: The vulnerability ID
'''
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('api', 'vulnerability', vulnerability_id))))
return r.status_code
def get_last(self, number: int | None=None, source: str | None = None) -> list[dict[str, Any]]:
'''Get the last vulnerabilities
:param number: The number of vulnerabilities to get
:param source: The source of the vulnerabilities
'''
path = PurePosixPath('last')
if source:
path /= source
if number is not None:
path /= str(number)
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vulnerability', path))))
return r.json()
def get_vendors(self) -> list[str]:
'''Get the known vendors'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vulnerability', 'browse'))))
return r.json()
def get_vendor_products(self, vendor: str) -> list[str]:
'''Get the known products for a vendor
:params vendor: A vendor owning products (must be in the known vendor list)
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'browse', vendor))))
return r.json()
def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
'''Get the the vulnerabilities per vendor and a specific product
:param vendor: A vendor owning products (must be in the known vendor list)
:param product: A product owned by that vendor
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vulnerability', 'search', vendor, product))))
return r.json()
# #### Comments ####
def create_comment(self, /, *, comment: dict[str, Any] | None=None, description: str | None=None,
description_format: str | None = None, meta: dict[str, str] | None = None,
related_vulnerabilities: list[str] | None=None, title: str | None=None,
uuid: str | None=None, vulnerability: str | None = None) -> dict[str, Any]:
'''Create a comment.
:param comment: The comment
:param description: The description of the comment
:param description_format: Description format (markdown or text).
:param meta: Zero or more meta-fields.
:param related_vulnerabilities: Zero or more related vulnerabilities.
:param title: The title of the comment
:param uuid: The UUID of the comment
:param vulnerability: The vulnerability ID of the comment
'''
if not comment:
comment = {}
if description:
comment['description'] = description
if description_format:
comment['description_format'] = description_format
if meta:
comment['meta'] = meta
if related_vulnerabilities:
comment['related_vulnerabilities'] = related_vulnerabilities
if title:
comment['title'] = title
if uuid:
comment['uuid'] = uuid
if vulnerability:
comment['vulnerability'] = vulnerability
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'comment'))), json=comment)
return r.json()
def get_comments(self, uuid: str | None = None, vuln_id: str | None = None,
author: str | None = None) -> dict[str, Any]:
'''Get comment(s)
:param uuid: The UUID of a specific comment
:param vuln_id: The vulnerability ID to get comments of
:param author: The author of the comment(s)
'''
params = {}
if uuid:
params['uuid'] = uuid
if vuln_id:
params['vuln_id'] = vuln_id
if author:
params['author'] = author
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'comment'))),
params=params)
return r.json()
def get_comment(self, comment_uuid: str) -> dict[str, Any]:
'''Get a comment
:param comment_uuid: The UUID of the comment
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'comment', comment_uuid)))
)
return r.json()
def delete_comment(self, comment_uuid: str) -> int:
'''Delete a comment.
:param comment_uuid: The comment UUID
'''
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('api', 'comment', comment_uuid))))
return r.status_code
# #### Bundles ####
def create_bundle(self, /, *, bundle: dict[str, Any] | None=None, description: str | None=None,
meta: dict[str, str] | None=None, name: str | None=None, related_vulnerabilities: list[str] | None=None,
uuid: str | None=None) -> dict[str, Any]:
'''Create a bundle.
:param bundle: The bundle
'''
if not bundle:
bundle = {}
if description:
bundle['description'] = description
if meta:
bundle['meta'] = meta
if name:
bundle['name'] = name
if related_vulnerabilities:
bundle['related_vulnerabilities'] = related_vulnerabilities
if uuid:
bundle['uuid'] = uuid
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'bundle'))),
json=bundle)
return r.json()
def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None,
author: str | None = None, per_page: int | None=None,
meta: list[dict[str, str]] | None=None) -> dict[str, Any]:
'''Get bundle(s)
:param uuid: The UUID a specific bundle
:param vuln_id: The vulnerability ID to get bundles of
:param author: The author of the bundle(s)
:param per_page: The number of bundles to get per page
:param meta: Query for the meta JSON field. Example: meta=[{tags: [tcp]}]
'''
params: dict[str, Any] = {}
if uuid:
params['uuid'] = uuid
if vuln_id:
params['vuln_id'] = vuln_id
if author:
params['author'] = author
if per_page is not None:
params['per_page'] = per_page
if meta:
params['meta'] = meta
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'bundle'))), params=params)
return r.json()
def get_bundle(self, bundle_uuid: str) -> dict[str, Any]:
'''Get a bundle
:param bundle_uuid: The UUID of the bundle
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'bundle', bundle_uuid))))
return r.json()
def delete_bundle(self, bundle_uuid: str) -> int:
'''Delete a bundle.
:param bundle_uuid: The bundle UUID
'''
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('api', 'bundle', bundle_uuid))))
return r.status_code
# #### Users ####
def create_user(self, /, *, user: dict[str, Any] | None=None,
login: str | None=None, name: str | None=None,
organisation: str | None=None, email: str | None=None) -> dict[str, Any]:
'''Create a user.
:param login: The login of the user
:param name: The name of the user
:param organisation: The organisation of the user
:param email: The email of the user
'''
if not user:
user = {}
if login:
user['login'] = login
if name:
user['name'] = name
if organisation:
user['organisation'] = organisation
if email:
user['email'] = email
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'user'))), json=user)
return r.json()
def list_users(self) -> dict[str, Any]:
# Alias this one to get_users for consistency
return self.get_users()
def get_users(self) -> dict[str, Any]:
'''List users'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'user'))))
return r.json()
def get_user_information(self) -> dict[str, Any]:
'''Get user information'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'user', 'me'))))
return r.json()
def reset_api_key(self) -> dict[str, Any]:
'''Reset the API key'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'user', 'api_key'))))
return r.json()
def delete_user(self, user_id: str) -> int:
'''Delete a user.
:param user_id: The user ID
'''
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('api', 'user', user_id))))
return r.status_code
# #### Sightings ####
def get_sighting(self, sighting_uuid: str) -> dict[str, Any]:
'''Get a sighting
:param sighting_uuid: The UUID of the sighting
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'sighting', sighting_uuid))))
return r.json()
def get_sightings(self, /, *, sighting_uuid: str | None=None,
sighting_type: str | None=None, vuln_id: str | None = None,
author: str | None = None,
date_from: date | datetime | None=None,
date_to: date | datetime | None=None) -> dict[str, Any]:
'''Get sightings
:param sighting_uuid: The UUID of a specific sighting
:param sighting_type: The type of sighting, can be one of: 'seen', 'exploided', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
:param vuln_id: The vulnerability ID to get sightings of
:param author: The author of the sighting(s)
:param date_from: The date from which to get sightings
:param date_to: The date to which to get sightings
'''
params = {}
if sighting_uuid:
params['uuid'] = sighting_uuid
if sighting_type:
params['type'] = sighting_type
if vuln_id:
params['vuln_id'] = vuln_id
if author:
params['author'] = author
if date_from:
if isinstance(date_from, datetime):
date_from = date_from.date()
params['date_from'] = date_from.isoformat()
if date_to:
if isinstance(date_to, datetime):
date_to = date_to.date()
params['date_to'] = date_to.isoformat()
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'sighting'))), params=params)
return r.json()
def create_sighting(self, /, *, sighting: dict[str, Any] | None=None,
creation_timestamp: datetime | None=None,
source: str | None = None,
sighting_type: str | None=None,
vulnerability: str | None=None) -> dict[str, Any]:
'''Create a sighting.
:param sighting: The sighting, as an object.
:param creation_timestamp: The timestamp of the sighting - set to now if not provided
:param source: The source of the sighting
:param sighting_type: The type of sighting, can be one of: 'seen', 'exploided', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
:param vulnerability: The vulnerability ID of the sighting
'''
if not sighting:
sighting = {}
if creation_timestamp:
# This calue may or may not have a TZ at this point
sighting['creation_timestamp'] = creation_timestamp
if source:
sighting['source'] = source
if sighting_type:
sighting['type'] = sighting_type
if vulnerability:
sighting['vulnerability'] = vulnerability
if 'creation_timestamp' in sighting:
# check if the datetime object has a TZ, if it doesn't, set it to localtime, make it a string
if sighting['creation_timestamp'].tzinfo is None:
sighting['creation_timestamp'] = sighting['creation_timestamp'].astimezone()
sighting['creation_timestamp'] = sighting['creation_timestamp'].isoformat()
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'sighting'))),
json=sighting)
return r.json()
# #### EPSS ####
def get_epss(self, vulnerability: str) -> dict[str, Any]:
'''Get the EPSS for a vulnerability
:param vulnerability: The vulnerability ID
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'epss', vulnerability))))
return r.json()