mirror of
https://github.com/cve-search/PyVulnerabilityLookup.git
synced 2024-11-25 16:27:23 +00:00
158 lines
6.2 KiB
Python
158 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
from importlib.metadata import version
|
|
from pathlib import PurePosixPath
|
|
from typing import Any, Dict
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import requests
|
|
|
|
|
|
class PyVulnerabilityLookup():
|
|
|
|
def __init__(self, root_url: str, useragent: str | None=None, token: str | None=None,
|
|
*, proxies: dict[str, str] | None=None) -> None:
|
|
'''Query a specific instance.
|
|
|
|
:param root_url: URL of the instance to query.
|
|
:param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
|
|
:param proxies: The proxies to use to connect to the vulnerability lookup instance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
|
|
'''
|
|
self.root_url = root_url
|
|
|
|
if not urlparse(self.root_url).scheme:
|
|
self.root_url = 'http://' + self.root_url
|
|
if not self.root_url.endswith('/'):
|
|
self.root_url += '/'
|
|
self.session = requests.session()
|
|
self.session.headers['user-agent'] = useragent if useragent else f'PyProject / {version("pyvulnerabilitylookup")}'
|
|
self.session.headers['X-API-KEY'] = token if token else ''
|
|
self.session.headers['Accept'] = 'application/json'
|
|
self.session.headers['Content-Type'] = 'application/json'
|
|
if proxies:
|
|
self.session.proxies.update(proxies)
|
|
|
|
@property
|
|
def is_up(self) -> bool:
|
|
'''Test if the given instance is accessible'''
|
|
try:
|
|
r = self.session.head(self.root_url)
|
|
except requests.exceptions.ConnectionError:
|
|
return False
|
|
return r.status_code == 200
|
|
|
|
def redis_up(self) -> bool:
|
|
'''Check if redis is up and running'''
|
|
r = self.session.get(urljoin(self.root_url, 'redis_up'))
|
|
return r.json()
|
|
|
|
def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
|
|
'''Get a vulnerability
|
|
|
|
:param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
|
|
'''
|
|
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id))))
|
|
return r.json()
|
|
|
|
def get_info(self) -> dict[str, Any]:
|
|
'''Get more information about the current databases in use and when it was updated'''
|
|
r = self.session.get(urljoin(self.root_url, 'info'))
|
|
return r.json()
|
|
|
|
def get_last(self, number: int | None=None, source: str | None = None) -> list[dict[str, Any]]:
|
|
'''Get the last vulnerabilities
|
|
|
|
:param number: The number of vulnerabilities to get
|
|
:param source: The source of the vulnerabilities
|
|
'''
|
|
path = PurePosixPath('last')
|
|
if source:
|
|
path /= source
|
|
if number is not None:
|
|
path /= str(number)
|
|
r = self.session.get(urljoin(self.root_url, str(path)))
|
|
return r.json()
|
|
|
|
def get_vendors(self) -> list[str]:
|
|
'''Get the list of known vendors'''
|
|
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'browse'))))
|
|
return r.json()
|
|
|
|
def get_vendor_products(self, vendor: str) -> list[str]:
|
|
'''Get the known products for a vendor
|
|
|
|
:params vendor: A vendor owning products (must be in the known vendor list)
|
|
'''
|
|
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'browse', vendor))))
|
|
return r.json()
|
|
|
|
def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
|
|
'''Get the the vulnerabilities per vendor and a specific product
|
|
|
|
:param vendor: A vendor owning products (must be in the known vendor list)
|
|
:param product: A product owned by that vendor
|
|
'''
|
|
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'search', vendor, product))))
|
|
return r.json()
|
|
|
|
# NOTE: endpoints /api/cve/*, /api/dbInfo, /api/last are alises for backward compat.
|
|
|
|
def create_comment(self, comment: Dict[str, Any]) -> Dict[str, Any]:
|
|
'''Create a comment.
|
|
|
|
:param comment: The comment
|
|
'''
|
|
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'comment'))),
|
|
json=comment)
|
|
return r.json()
|
|
|
|
def get_comments(self, uuid: str | None = None, vuln_id: str | None = None,
|
|
author: str | None = None) -> dict[str, Any]:
|
|
'''Get comment(s)
|
|
|
|
:param uuid: The UUID of a specific comment
|
|
:param vuln_id: The vulnerability ID to get comments of
|
|
:param author: The author of the comment(s)
|
|
'''
|
|
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'comment'))),
|
|
params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
|
|
return r.json()
|
|
|
|
def delete_comment(self, comment_uuid: str) -> int:
|
|
'''Delete a comment.
|
|
|
|
:param comment_uuid: The comment UUID
|
|
'''
|
|
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('api', 'comment', comment_uuid))))
|
|
return r.status_code
|
|
|
|
def create_bundle(self, bundle: Dict[str, Any]) -> Dict[str, Any]:
|
|
'''Create a bundle.
|
|
|
|
:param bundle: The bundle
|
|
'''
|
|
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'bundle'))),
|
|
json=bundle)
|
|
return r.json()
|
|
|
|
def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None,
|
|
author: str | None = None) -> dict[str, Any]:
|
|
'''Get bundle(s)
|
|
|
|
:param uuid: The UUID a specific bundle
|
|
:param vuln_id: The vulnerability ID to get bundles of
|
|
:param author: The author of the bundle(s)
|
|
'''
|
|
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'bundle'))),
|
|
params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
|
|
return r.json()
|
|
|
|
def delete_bundle(self, bundle_uuid: str) -> int:
|
|
'''Delete a bundle.
|
|
|
|
:param bundle_uuid: The bundle UUID
|
|
'''
|
|
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('api', 'bundle', bundle_uuid))))
|
|
return r.status_code
|