PyVulnerabilityLookup/pyvulnerabilitylookup/api.py

300 lines
12 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import logging
from datetime import date, datetime
from importlib.metadata import version
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import urljoin, urlparse
import requests
def enable_full_debug() -> None:
    '''Turn on verbose debugging output for the HTTP layer.

    Sets the debug level of http.client connections to 1, initialises the
    logging module with its basic configuration, and switches both the root
    logger and the "requests.packages.urllib3" logger to DEBUG (the latter
    propagating its records to its ancestors).
    '''
    from http.client import HTTPConnection

    # Root logger first: basicConfig() attaches a default handler if none exists.
    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Wire-level tracing of the HTTP connections.
    HTTPConnection.debuglevel = 1

    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.propagate = True
    urllib3_logger.setLevel(logging.DEBUG)
class PyVulnerabilityLookup:
    '''Client for the HTTP API of a vulnerability lookup instance.

    Every call goes through a single requests session that carries the user
    agent, the API key (X-API-KEY header) and JSON Accept / Content-Type headers.
    '''

    def __init__(self, root_url: str='https://vulnerability.circl.lu', useragent: str | None=None, token: str | None=None,
                 *, proxies: dict[str, str] | None=None) -> None:
        '''Query a specific instance.

        :param root_url: URL of the instance to query. If it has no scheme, http:// is assumed.
        :param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
        :param token: The API key sent in the X-API-KEY header (an empty value is sent when not provided)
        :param proxies: The proxies to use to connect to the vulnerability lookup instance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
        '''
        self.root_url = root_url

        if not urlparse(self.root_url).scheme:
            self.root_url = 'http://' + self.root_url
        # The trailing slash is required for urljoin to append (not replace) the last path segment.
        if not self.root_url.endswith('/'):
            self.root_url += '/'
        self.session = requests.session()
        self.session.headers['user-agent'] = useragent if useragent else f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}'
        self.session.headers['X-API-KEY'] = token if token else ''
        self.session.headers['Accept'] = 'application/json'
        self.session.headers['Content-Type'] = 'application/json'
        if proxies:
            self.session.proxies.update(proxies)

    def _url(self, *segments: str) -> str:
        '''Build the absolute URL of an endpoint of the instance.

        :param segments: The path segments of the endpoint, relative to the root URL
        '''
        return urljoin(self.root_url, str(PurePosixPath(*segments)))

    @staticmethod
    def _to_iso_date(value: date | datetime) -> str:
        '''Turn a date or a datetime into an ISO formatted date (the time part is dropped).

        :param value: The date or datetime to format
        '''
        if isinstance(value, datetime):
            value = value.date()
        return value.isoformat()

    def set_apikey(self, apikey: str) -> None:
        '''Set the API key to use for the requests'''
        self.session.headers['X-API-KEY'] = apikey

    @property
    def is_up(self) -> bool:
        '''Test if the given instance is accessible.

        Returns False on a connection error, otherwise True only if a HEAD
        request on the root URL answers with a status code 200.
        '''
        try:
            r = self.session.head(self.root_url)
        except requests.exceptions.ConnectionError:
            return False
        return r.status_code == 200

    def redis_up(self) -> bool:
        '''Check if redis is up and running'''
        r = self.session.get(self._url('redis_up'))
        return r.json()

    def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
        '''Get a vulnerability

        :param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
        '''
        r = self.session.get(self._url('vulnerability', vulnerability_id))
        return r.json()

    def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]:
        '''Create a vulnerability.

        :param vulnerability: The vulnerability
        '''
        r = self.session.post(self._url('vulnerability'), json=vulnerability)
        return r.json()

    def delete_vulnerability(self, vulnerability_id: str) -> int:
        '''Delete a vulnerability and return the HTTP status code of the response.

        :param vulnerability_id: The vulnerability ID
        '''
        r = self.session.delete(self._url('vulnerability', vulnerability_id))
        return r.status_code

    def get_info(self) -> dict[str, Any]:
        '''Get more information about the current databases in use and when they were updated'''
        r = self.session.get(self._url('info'))
        return r.json()

    def get_config_info(self) -> dict[str, Any]:
        '''Get the response of the configInfo endpoint of the instance'''
        r = self.session.get(self._url('configInfo'))
        return r.json()

    def get_last(self, number: int | None=None, source: str | None = None) -> list[dict[str, Any]]:
        '''Get the last vulnerabilities

        :param number: The number of vulnerabilities to get
        :param source: The source of the vulnerabilities
        '''
        # The endpoint is last[/<source>][/<number>]
        segments = ['last']
        if source:
            segments.append(source)
        if number is not None:
            segments.append(str(number))
        r = self.session.get(self._url(*segments))
        return r.json()

    def get_vendors(self) -> list[str]:
        '''Get the list of known vendors'''
        r = self.session.get(self._url('api', 'browse'))
        return r.json()

    def get_vendor_products(self, vendor: str) -> list[str]:
        '''Get the known products for a vendor

        :param vendor: A vendor owning products (must be in the known vendor list)
        '''
        r = self.session.get(self._url('api', 'browse', vendor))
        return r.json()

    def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
        '''Get the vulnerabilities per vendor and a specific product

        :param vendor: A vendor owning products (must be in the known vendor list)
        :param product: A product owned by that vendor
        '''
        r = self.session.get(self._url('api', 'search', vendor, product))
        return r.json()

    # NOTE: endpoints /api/cve/*, /api/dbInfo, /api/last are aliases for backward compat.

    def create_comment(self, comment: dict[str, Any]) -> dict[str, Any]:
        '''Create a comment.

        :param comment: The comment
        '''
        r = self.session.post(self._url('api', 'comment'), json=comment)
        return r.json()

    def get_comments(self, uuid: str | None = None, vuln_id: str | None = None,
                     author: str | None = None) -> dict[str, Any]:
        '''Get comment(s)

        :param uuid: The UUID of a specific comment
        :param vuln_id: The vulnerability ID to get comments of
        :param author: The author of the comment(s)
        '''
        # Filters left to None are handed to requests as-is.
        r = self.session.get(self._url('api', 'comment'),
                             params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
        return r.json()

    def delete_comment(self, comment_uuid: str) -> int:
        '''Delete a comment and return the HTTP status code of the response.

        :param comment_uuid: The comment UUID
        '''
        r = self.session.delete(self._url('api', 'comment', comment_uuid))
        return r.status_code

    def create_bundle(self, bundle: dict[str, Any]) -> dict[str, Any]:
        '''Create a bundle.

        :param bundle: The bundle
        '''
        r = self.session.post(self._url('api', 'bundle'), json=bundle)
        return r.json()

    def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None,
                    author: str | None = None) -> dict[str, Any]:
        '''Get bundle(s)

        :param uuid: The UUID of a specific bundle
        :param vuln_id: The vulnerability ID to get bundles of
        :param author: The author of the bundle(s)
        '''
        # Filters left to None are handed to requests as-is.
        r = self.session.get(self._url('api', 'bundle'),
                             params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
        return r.json()

    def delete_bundle(self, bundle_uuid: str) -> int:
        '''Delete a bundle and return the HTTP status code of the response.

        :param bundle_uuid: The bundle UUID
        '''
        r = self.session.delete(self._url('api', 'bundle', bundle_uuid))
        return r.status_code

    def create_user(self, login: str, name: str, organisation: str, email: str) -> dict[str, Any]:
        '''Create a user.

        :param login: The login of the user
        :param name: The name of the user
        :param organisation: The organisation of the user
        :param email: The email of the user
        '''
        r = self.session.post(self._url('api', 'user'),
                              json={'login': login, 'name': name, 'organisation': organisation, 'email': email})
        return r.json()

    def list_users(self) -> dict[str, Any]:
        '''List users'''
        r = self.session.get(self._url('api', 'user'))
        return r.json()

    def get_user_information(self) -> dict[str, Any]:
        '''Get user information'''
        r = self.session.get(self._url('api', 'user', 'me'))
        return r.json()

    # #### Sightings ####

    def get_sighting(self, sighting_uuid: str) -> dict[str, Any]:
        '''Get a sighting

        :param sighting_uuid: The UUID of the sighting
        '''
        r = self.session.get(self._url('api', 'sighting', sighting_uuid))
        return r.json()

    def get_sightings(self, /, *, sighting_uuid: str | None=None,
                      sighting_type: str | None=None, vuln_id: str | None = None,
                      author: str | None = None,
                      date_from: date | datetime | None=None,
                      date_to: date | datetime | None=None) -> dict[str, Any]:
        '''Get sightings

        :param sighting_uuid: The UUID of a specific sighting
        :param sighting_type: The type of sighting, can be one of: 'seen', 'exploited', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
        :param vuln_id: The vulnerability ID to get sightings of
        :param author: The author of the sighting(s)
        :param date_from: The date from which to get sightings (only the date part of a datetime is used)
        :param date_to: The date to which to get sightings (only the date part of a datetime is used)
        '''
        # Only the filters that were actually provided end up in the query string.
        params: dict[str, str] = {}
        if sighting_uuid:
            params['uuid'] = sighting_uuid
        if sighting_type:
            params['type'] = sighting_type
        if vuln_id:
            params['vuln_id'] = vuln_id
        if author:
            params['author'] = author
        if date_from:
            params['date_from'] = self._to_iso_date(date_from)
        if date_to:
            params['date_to'] = self._to_iso_date(date_to)
        r = self.session.get(self._url('api', 'sighting'), params=params)
        return r.json()

    def create_sighting(self, /, *, sighting: dict[str, Any] | None=None,
                        creation_timestamp: datetime | None=None,
                        source: str | None = None,
                        sighting_type: str | None=None,
                        vulnerability: str | None=None) -> dict[str, Any]:
        '''Create a sighting.

        The explicit keyword arguments take precedence over the matching keys
        of the sighting object. The sighting object passed by the caller is
        never modified.

        :param sighting: The sighting, as an object.
        :param creation_timestamp: The timestamp of the sighting. When omitted, no timestamp is sent (presumably the instance sets it to now - to be confirmed).
        :param source: The source of the sighting
        :param sighting_type: The type of sighting, can be one of: 'seen', 'exploited', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
        :param vulnerability: The vulnerability ID of the sighting
        '''
        # Work on a shallow copy so the caller's dictionary is left untouched.
        payload: dict[str, Any] = dict(sighting) if sighting else {}

        if creation_timestamp is not None:
            # This value may or may not have a TZ at this point
            payload['creation_timestamp'] = creation_timestamp
        if source:
            payload['source'] = source
        if sighting_type:
            payload['type'] = sighting_type
        if vulnerability:
            payload['vulnerability'] = vulnerability

        timestamp = payload.get('creation_timestamp')
        # Only datetime objects need a conversion; anything else (e.g. an
        # already formatted string) is sent as provided.
        if isinstance(timestamp, datetime):
            if timestamp.tzinfo is None:
                # No TZ: consider the value as local time
                timestamp = timestamp.astimezone()
            payload['creation_timestamp'] = timestamp.isoformat()

        r = self.session.post(self._url('api', 'sighting'), json=payload)
        return r.json()