new: Support all public API calls

This commit is contained in:
Raphaël Vinot 2024-07-23 15:01:35 +02:00
parent 2e26d60150
commit 0df80fb8f3
2 changed files with 176 additions and 4 deletions

View file

@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
from importlib.metadata import version from importlib.metadata import version
from pathlib import Path from pathlib import PurePosixPath
from typing import Any from typing import Any
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
@ -46,5 +46,76 @@ class PyVulnerabilityLookup():
return r.json() return r.json()
def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]: def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
r = self.session.get(urljoin(self.root_url, str(Path('vulnerability', vulnerability_id)))) '''Get a vulnerability
:param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id))))
return r.json()
def get_info(self) -> dict[str, Any]:
'''Get more information about the current databases in use and when it was updated'''
r = self.session.get(urljoin(self.root_url, 'info'))
return r.json()
def get_last(self, number: int | None=None, source: str | None = None) -> dict[str, Any]:
'''Get the last vulnerabilities
:param number: The number of vulnerabilities to get
:param source: The source of the vulnerabilities
'''
path = PurePosixPath('last')
if source:
path /= source
if number is not None:
path /= str(number)
r = self.session.get(urljoin(self.root_url, str(path)))
return r.json()
def get_vendors(self) -> list[str]:
'''Get the list of known vendors'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'browse'))))
return r.json()
def get_vendor_products(self, vendor: str) -> list[str]:
'''Get the known products for a vendor
:params vendor: A vendor owning products (must be in the known vendor list)
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'browse', vendor))))
return r.json()
def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
'''Get the the vulnerabilities per vendor and a specific product
:param vendor: A vendor owning products (must be in the known vendor list)
:param product: A product owned by that vendor
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'search', vendor, product))))
return r.json()
# NOTE: endpoints /api/cve/*, /api/dbInfo, /api/last are alises for backward compat.
def get_comments(self, uuid: str | None = None, vuln_id: str | None = None,
author: str | None = None) -> dict[str, Any]:
'''Get comment(s)
:param uuid: The UUID a specific comment
:param vuln_id: The vulnerability ID to get comments of
:param author: The author of the comment(s)
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'comment'))),
params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
return r.json()
def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None,
author: str | None = None) -> dict[str, Any]:
'''Get bundle(s)
:param uuid: The UUID a specific bundle
:param vuln_id: The vulnerability ID to get bundles of
:param author: The author of the bundle(s)
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'bundle'))),
params={'uuid': uuid, 'vuln_id': vuln_id, 'author': author})
return r.json() return r.json()

View file

@ -6,10 +6,12 @@ import time
from pyvulnerabilitylookup import PyVulnerabilityLookup from pyvulnerabilitylookup import PyVulnerabilityLookup
class TestBasic(unittest.TestCase): class TestPublic(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
self.client = PyVulnerabilityLookup(root_url="http://127.0.0.1:10001") self.client = PyVulnerabilityLookup(root_url="https://vulnerability.circl.lu")
# Test default
def test_up(self) -> None: def test_up(self) -> None:
self.assertTrue(self.client.is_up) self.assertTrue(self.client.is_up)
@ -22,3 +24,102 @@ class TestBasic(unittest.TestCase):
break break
print('waiting for pysec to be imported') print('waiting for pysec to be imported')
time.sleep(1) time.sleep(1)
def test_get_info(self) -> None:
info = self.client.get_info()
self.assertTrue(info['last_updates'])
self.assertTrue(info['db_sizes'])
def test_get_last(self) -> None:
last = self.client.get_last()
self.assertTrue(last)
self.assertTrue(isinstance(last, list))
last = self.client.get_last(number=1)
self.assertTrue(isinstance(last, list))
self.assertEqual(len(last), 1)
last = self.client.get_last(source='pysec')
for vuln in last:
self.assertTrue(vuln['id'].startswith('PYSEC'))
last = self.client.get_last(source='pysec', number=1)
self.assertEqual(len(last), 1)
self.assertTrue(last[-1]['id'].startswith('PYSEC'))
# TODO: POST Vulnerability / Delete vulnerability
# Test API
def test_get_vendors(self) -> None:
vendors = self.client.get_vendors()
self.assertTrue(isinstance(vendors, list))
def test_get_vendor_products(self) -> None:
products = self.client.get_vendor_products('misp')
self.assertTrue(isinstance(products, list))
self.assertTrue('misp' in products)
def test_get_vendor_product_vulnerabilities(self) -> None:
vulns = self.client.get_vendor_product_vulnerabilities('misp', 'misp')
self.assertTrue(isinstance(vulns, dict))
self.assertTrue('cvelistv5' in vulns)
# Test comments
def test_get_comments(self) -> None:
comments = self.client.get_comments()
self.assertTrue('metadata' in comments)
self.assertTrue('data' in comments)
self.assertTrue(len(comments['data']) > 0)
comments = self.client.get_comments(uuid='a309d024-2714-4a81-a425-60f83f6d5740')
self.assertTrue(len(comments['data']) == 1)
self.assertEqual(comments['data'][0]['uuid'], 'a309d024-2714-4a81-a425-60f83f6d5740')
comments = self.client.get_comments(vuln_id='CVE-2024-20401')
self.assertTrue(len(comments['data']) >= 1)
for comment in comments['data']:
self.assertEqual(comment['vulnerability'], 'CVE-2024-20401')
comments = self.client.get_comments(author='admin')
self.assertTrue(len(comments['data']) >= 1)
for comment in comments['data']:
self.assertEqual(comment['author']['login'], 'admin')
comments = self.client.get_comments(uuid='a309d024-2714-4a81-a425-60f83f6d5740',
vuln_id='CVE-2024-20401',
author='admin')
self.assertTrue(len(comments['data']) == 1)
self.assertEqual(comments['data'][0]['uuid'], 'a309d024-2714-4a81-a425-60f83f6d5740')
self.assertEqual(comments['data'][0]['vulnerability'], 'CVE-2024-20401')
self.assertEqual(comments['data'][0]['author']['login'], 'admin')
# TODO: POST / Delete Comment
# TODO: POST / Get user
# Test bundles
def test_get_bundles(self) -> None:
bundles = self.client.get_bundles()
self.assertTrue('metadata' in bundles)
self.assertTrue('data' in bundles)
self.assertTrue(len(bundles['data']) > 0)
bundles = self.client.get_bundles(uuid='a23cbcad-e890-4df8-8736-9332ed4c3d47')
self.assertTrue(len(bundles['data']) == 1)
self.assertEqual(bundles['data'][0]['uuid'], 'a23cbcad-e890-4df8-8736-9332ed4c3d47')
bundles = self.client.get_bundles(vuln_id='CVE-2024-39573')
self.assertTrue(len(bundles['data']) >= 1)
for bundle in bundles['data']:
self.assertTrue('CVE-2024-39573' in bundle['related_vulnerabilities'])
bundles = self.client.get_bundles(author='admin')
self.assertTrue(len(bundles['data']) >= 1)
for bundle in bundles['data']:
self.assertEqual(bundle['author']['login'], 'admin')
bundles = self.client.get_bundles(uuid='a23cbcad-e890-4df8-8736-9332ed4c3d47',
vuln_id='CVE-2024-39573',
author='admin')
self.assertTrue(len(bundles['data']) == 1)
self.assertEqual(bundles['data'][0]['uuid'], 'a23cbcad-e890-4df8-8736-9332ed4c3d47')
self.assertTrue('CVE-2024-39573' in bundles['data'][0]['related_vulnerabilities'])
self.assertEqual(bundles['data'][0]['author']['login'], 'admin')