diff --git a/pyvulnerabilitylookup/api.py b/pyvulnerabilitylookup/api.py index 7313784..d7b7a43 100644 --- a/pyvulnerabilitylookup/api.py +++ b/pyvulnerabilitylookup/api.py @@ -2,14 +2,26 @@ from __future__ import annotations +import logging + from importlib.metadata import version from pathlib import PurePosixPath -from typing import Any, Dict +from typing import Any from urllib.parse import urljoin, urlparse import requests +def enable_full_debug() -> None: + import http.client as http_client + http_client.HTTPConnection.debuglevel = 1 + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + requests_log = logging.getLogger("requests.packages.urllib3") + requests_log.setLevel(logging.DEBUG) + requests_log.propagate = True + + class PyVulnerabilityLookup(): def __init__(self, root_url: str, useragent: str | None=None, token: str | None=None, @@ -27,7 +39,7 @@ class PyVulnerabilityLookup(): if not self.root_url.endswith('/'): self.root_url += '/' self.session = requests.session() - self.session.headers['user-agent'] = useragent if useragent else f'PyProject / {version("pyvulnerabilitylookup")}' + self.session.headers['user-agent'] = useragent if useragent else f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}' self.session.headers['X-API-KEY'] = token if token else '' self.session.headers['Accept'] = 'application/json' self.session.headers['Content-Type'] = 'application/json' @@ -60,6 +72,23 @@ class PyVulnerabilityLookup(): r = self.session.get(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id)))) return r.json() + def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]: + '''Create a vulnerability. + + :param vulnerability: The vulnerability + ''' + r = self.session.post(urljoin(self.root_url, str(PurePosixPath('vulnerability'))), + json=vulnerability) + return r.json() + + def delete_vulnerability(self, vulnerability_id: str) -> int: + '''Delete a vulnerability. + + :param vulnerability_id: The vulnerability ID + ''' + r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id)))) + return r.status_code + def get_info(self) -> dict[str, Any]: '''Get more information about the current databases in use and when it was updated''' r = self.session.get(urljoin(self.root_url, 'info')) @@ -177,3 +206,8 @@ class PyVulnerabilityLookup(): r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'user'))), json={'login': login, 'name': name, 'organisation': organisation, 'email': email}) return r.json() + + def list_users(self) -> dict[str, Any]: + '''List users''' + r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'user')))) + return r.json() diff --git a/tests/test_web.py b/tests/test_web.py index c41a3eb..12b1c7c 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -6,14 +6,20 @@ import os from pyvulnerabilitylookup import PyVulnerabilityLookup +# NOTE: +# * to run the tests with a pre-configured admin key: +# API_KEY="" pytest tests/test_web.py +# * to run the test against a local instance (fallback to public): +# INSTANCE_URL="http://0.0.0.0:10001" pytest tests/test_web.py + class TestPublic(unittest.TestCase): def setUp(self) -> None: - token = os.getenv("API_KEY", "") - self.client = PyVulnerabilityLookup(root_url="https://vulnerability.circl.lu", token=token) - - # Test default + self.admin_token = os.getenv("API_KEY", '').strip() + instance_url = os.getenv("INSTANCE_URL", 'https://vulnerability.circl.lu').strip() + self.public_test = instance_url == 'https://vulnerability.circl.lu' + self.client = PyVulnerabilityLookup(root_url=instance_url, token=self.admin_token) def test_up(self) -> None: self.assertTrue(self.client.is_up) @@ -54,18 +60,24 @@ class TestPublic(unittest.TestCase): self.assertTrue(isinstance(vendors, list)) def test_get_vendor_products(self) -> None: + if not self.public_test: + return None products = self.client.get_vendor_products('misp') self.assertTrue(isinstance(products, list)) self.assertTrue('misp' in products) def test_get_vendor_product_vulnerabilities(self) -> None: + if not self.public_test: + return None vulns = self.client.get_vendor_product_vulnerabilities('misp', 'misp') self.assertTrue(isinstance(vulns, dict)) self.assertTrue('cvelistv5' in vulns) # Test comments - def test_get_comments(self) -> None: + def test_get_comments_public(self) -> None: + if not self.public_test: + return None comments = self.client.get_comments() self.assertTrue('metadata' in comments) self.assertTrue('data' in comments) @@ -93,12 +105,27 @@ class TestPublic(unittest.TestCase): self.assertEqual(comments['data'][0]['vulnerability'], 'CVE-2024-20401') self.assertEqual(comments['data'][0]['author']['login'], 'admin') - # TODO: POST / Delete Comment - # TODO: POST / Get user + def test_comments_local(self) -> None: + if not self.admin_token: + # this test is only working if the admin token is set + return None + # Makes sure the userkey is set to the right one + self.client.set_apikey(self.admin_token) + comment = {'title': 'test', 'description': 'test', + 'vulnerability': 'CVE-2024-20401', + 'related_vulnerabilities': ['CVE-2024-20402']} + created_comment = self.client.create_comment(comment) + new_comment_uuid = created_comment['data'][0]['uuid'] + comments = self.client.get_comments(uuid=new_comment_uuid) + self.assertTrue(len(comments['data']) == 1) + deleted_comment = self.client.delete_comment(new_comment_uuid) + self.assertTrue(deleted_comment < 300) # Test bundles - def test_get_bundles(self) -> None: + def test_get_bundles_public(self) -> None: + if not self.public_test: + return None bundles = self.client.get_bundles() self.assertTrue('metadata' in bundles) self.assertTrue('data' in bundles) @@ -125,3 +152,62 @@ class TestPublic(unittest.TestCase): self.assertEqual(bundles['data'][0]['uuid'], 'a23cbcad-e890-4df8-8736-9332ed4c3d47') self.assertTrue('CVE-2024-39573' in bundles['data'][0]['related_vulnerabilities']) self.assertEqual(bundles['data'][0]['author']['login'], 'admin') + + def test_bundles_local(self) -> None: + if not self.admin_token: + # this test is only working if the admin token is set + return None + # Makes sure the userkey is set to the right one + self.client.set_apikey(self.admin_token) + bundle = {'name': 'test', 'description': 'test', + 'related_vulnerabilities': ['CVE-2024-20402', 'CVE-2024-20403']} + created_bundle = self.client.create_bundle(bundle) + new_bundle_uuid = created_bundle['data'][0]['uuid'] + bundles = self.client.get_bundles(uuid=new_bundle_uuid) + self.assertTrue(len(bundles['data']) == 1) + deleted_bundle = self.client.delete_bundle(new_bundle_uuid) + self.assertTrue(deleted_bundle < 300) + + # Test User + def test_list_users(self) -> None: + if not self.admin_token: + # this test is only working if the admin token is set + return None + # Makes sure the userkey is set to the right one + self.client.set_apikey(self.admin_token) + users = self.client.list_users() + self.assertTrue(isinstance(users, dict)) + self.assertTrue(len(users['data']) > 0) + got_admin = False + for user in users['data']: + self.assertTrue('login' in user) + self.assertTrue('apikey' in user) + if user['apikey'] == self.admin_token: + self.assertTrue(user['is_admin']) + got_admin = True + self.assertTrue(got_admin) + + def test_create_user_comment(self) -> None: + if self.public_test: + # Do not run that test against the public instance, it would create users. + return None + instance_config = self.client.get_config_info() + if not instance_config.get('registration'): + return None + + user = self.client.create_user('test Name', 'test Login', 'test Organization', 'test@testorg.lu') + self.assertTrue(user) + self.assertTrue('login' in user, user) + self.assertTrue('apikey' in user, user) + self.assertTrue('is_commenter' in user, user) + self.assertTrue(user['is_commenter']) + self.client.set_apikey(user['apikey']) + comment = {'title': 'test', 'description': 'test', + 'vulnerability': 'CVE-2024-20401', + 'related_vulnerabilities': ['CVE-2024-20402']} + created_comment = self.client.create_comment(comment) + new_comment_uuid = created_comment['data'][0]['uuid'] + comments = self.client.get_comments(uuid=new_comment_uuid) + self.assertTrue(len(comments['data']) == 0, comments) + deleted_comment = self.client.delete_comment(new_comment_uuid) + self.assertTrue(deleted_comment < 300)