new: List users, add/delete vulns. Many more tests.

This commit is contained in:
Raphaël Vinot 2024-07-30 17:20:54 +02:00
parent 7a93e8ad8c
commit fc871a83fd
2 changed files with 130 additions and 10 deletions

View file

@ -2,14 +2,26 @@
from __future__ import annotations from __future__ import annotations
import logging
from importlib.metadata import version from importlib.metadata import version
from pathlib import PurePosixPath from pathlib import PurePosixPath
from typing import Any, Dict from typing import Any
from urllib.parse import urljoin, urlparse from urllib.parse import urljoin, urlparse
import requests import requests
def enable_full_debug() -> None:
import http.client as http_client
http_client.HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
class PyVulnerabilityLookup(): class PyVulnerabilityLookup():
def __init__(self, root_url: str, useragent: str | None=None, token: str | None=None, def __init__(self, root_url: str, useragent: str | None=None, token: str | None=None,
@ -27,7 +39,7 @@ class PyVulnerabilityLookup():
if not self.root_url.endswith('/'): if not self.root_url.endswith('/'):
self.root_url += '/' self.root_url += '/'
self.session = requests.session() self.session = requests.session()
self.session.headers['user-agent'] = useragent if useragent else f'PyProject / {version("pyvulnerabilitylookup")}' self.session.headers['user-agent'] = useragent if useragent else f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}'
self.session.headers['X-API-KEY'] = token if token else '' self.session.headers['X-API-KEY'] = token if token else ''
self.session.headers['Accept'] = 'application/json' self.session.headers['Accept'] = 'application/json'
self.session.headers['Content-Type'] = 'application/json' self.session.headers['Content-Type'] = 'application/json'
@ -60,6 +72,23 @@ class PyVulnerabilityLookup():
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id)))) r = self.session.get(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id))))
return r.json() return r.json()
def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]:
'''Create a vulnerability.
:param vulnerability: The vulnerability
'''
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('vulnerability'))),
json=vulnerability)
return r.json()
def delete_vulnerability(self, vulnerability_id: str) -> int:
'''Delete a vulnerability.
:param vulnerability_id: The vulnerability ID
'''
r = self.session.delete(urljoin(self.root_url, str(PurePosixPath('vulnerability', vulnerability_id))))
return r.status_code
def get_info(self) -> dict[str, Any]: def get_info(self) -> dict[str, Any]:
'''Get more information about the current databases in use and when it was updated''' '''Get more information about the current databases in use and when it was updated'''
r = self.session.get(urljoin(self.root_url, 'info')) r = self.session.get(urljoin(self.root_url, 'info'))
@ -177,3 +206,8 @@ class PyVulnerabilityLookup():
r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'user'))), r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'user'))),
json={'login': login, 'name': name, 'organisation': organisation, 'email': email}) json={'login': login, 'name': name, 'organisation': organisation, 'email': email})
return r.json() return r.json()
def list_users(self) -> dict[str, Any]:
'''List users'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'user'))))
return r.json()

View file

@ -6,14 +6,20 @@ import os
from pyvulnerabilitylookup import PyVulnerabilityLookup from pyvulnerabilitylookup import PyVulnerabilityLookup
# NOTE:
# * to run the tests with a pre-configured admin key:
# API_KEY="<APIKEY>" pytest tests/test_web.py
# * to run the test against a local instance (fallback to public):
# INSTANCE_URL="http://0.0.0.0:10001" pytest tests/test_web.py
class TestPublic(unittest.TestCase): class TestPublic(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
token = os.getenv("API_KEY", "") self.admin_token = os.getenv("API_KEY", '').strip()
self.client = PyVulnerabilityLookup(root_url="https://vulnerability.circl.lu", token=token) instance_url = os.getenv("INSTANCE_URL", 'https://vulnerability.circl.lu').strip()
self.public_test = instance_url == 'https://vulnerability.circl.lu'
# Test default self.client = PyVulnerabilityLookup(root_url=instance_url, token=self.admin_token)
def test_up(self) -> None: def test_up(self) -> None:
self.assertTrue(self.client.is_up) self.assertTrue(self.client.is_up)
@ -54,18 +60,24 @@ class TestPublic(unittest.TestCase):
self.assertTrue(isinstance(vendors, list)) self.assertTrue(isinstance(vendors, list))
def test_get_vendor_products(self) -> None: def test_get_vendor_products(self) -> None:
if not self.public_test:
return None
products = self.client.get_vendor_products('misp') products = self.client.get_vendor_products('misp')
self.assertTrue(isinstance(products, list)) self.assertTrue(isinstance(products, list))
self.assertTrue('misp' in products) self.assertTrue('misp' in products)
def test_get_vendor_product_vulnerabilities(self) -> None: def test_get_vendor_product_vulnerabilities(self) -> None:
if not self.public_test:
return None
vulns = self.client.get_vendor_product_vulnerabilities('misp', 'misp') vulns = self.client.get_vendor_product_vulnerabilities('misp', 'misp')
self.assertTrue(isinstance(vulns, dict)) self.assertTrue(isinstance(vulns, dict))
self.assertTrue('cvelistv5' in vulns) self.assertTrue('cvelistv5' in vulns)
# Test comments # Test comments
def test_get_comments(self) -> None: def test_get_comments_public(self) -> None:
if not self.public_test:
return None
comments = self.client.get_comments() comments = self.client.get_comments()
self.assertTrue('metadata' in comments) self.assertTrue('metadata' in comments)
self.assertTrue('data' in comments) self.assertTrue('data' in comments)
@ -93,12 +105,27 @@ class TestPublic(unittest.TestCase):
self.assertEqual(comments['data'][0]['vulnerability'], 'CVE-2024-20401') self.assertEqual(comments['data'][0]['vulnerability'], 'CVE-2024-20401')
self.assertEqual(comments['data'][0]['author']['login'], 'admin') self.assertEqual(comments['data'][0]['author']['login'], 'admin')
# TODO: POST / Delete Comment def test_comments_local(self) -> None:
# TODO: POST / Get user if not self.admin_token:
# this test is only working if the admin token is set
return None
# Makes sure the userkey is set to the right one
self.client.set_apikey(self.admin_token)
comment = {'title': 'test', 'description': 'test',
'vulnerability': 'CVE-2024-20401',
'related_vulnerabilities': ['CVE-2024-20402']}
created_comment = self.client.create_comment(comment)
new_comment_uuid = created_comment['data'][0]['uuid']
comments = self.client.get_comments(uuid=new_comment_uuid)
self.assertTrue(len(comments['data']) == 1)
deleted_comment = self.client.delete_comment(new_comment_uuid)
self.assertTrue(deleted_comment < 300)
# Test bundles # Test bundles
def test_get_bundles(self) -> None: def test_get_bundles_public(self) -> None:
if not self.public_test:
return None
bundles = self.client.get_bundles() bundles = self.client.get_bundles()
self.assertTrue('metadata' in bundles) self.assertTrue('metadata' in bundles)
self.assertTrue('data' in bundles) self.assertTrue('data' in bundles)
@ -125,3 +152,62 @@ class TestPublic(unittest.TestCase):
self.assertEqual(bundles['data'][0]['uuid'], 'a23cbcad-e890-4df8-8736-9332ed4c3d47') self.assertEqual(bundles['data'][0]['uuid'], 'a23cbcad-e890-4df8-8736-9332ed4c3d47')
self.assertTrue('CVE-2024-39573' in bundles['data'][0]['related_vulnerabilities']) self.assertTrue('CVE-2024-39573' in bundles['data'][0]['related_vulnerabilities'])
self.assertEqual(bundles['data'][0]['author']['login'], 'admin') self.assertEqual(bundles['data'][0]['author']['login'], 'admin')
def test_bundles_local(self) -> None:
if not self.admin_token:
# this test is only working if the admin token is set
return None
# Makes sure the userkey is set to the right one
self.client.set_apikey(self.admin_token)
bundle = {'name': 'test', 'description': 'test',
'related_vulnerabilities': ['CVE-2024-20402', 'CVE-2024-20403']}
created_bundle = self.client.create_bundle(bundle)
new_bundle_uuid = created_bundle['data'][0]['uuid']
bundles = self.client.get_bundles(uuid=new_bundle_uuid)
self.assertTrue(len(bundles['data']) == 1)
deleted_bundle = self.client.delete_bundle(new_bundle_uuid)
self.assertTrue(deleted_bundle < 300)
# Test User
def test_list_users(self) -> None:
if not self.admin_token:
# this test is only working if the admin token is set
return None
# Makes sure the userkey is set to the right one
self.client.set_apikey(self.admin_token)
users = self.client.list_users()
self.assertTrue(isinstance(users, dict))
self.assertTrue(len(users['data']) > 0)
got_admin = False
for user in users['data']:
self.assertTrue('login' in user)
self.assertTrue('apikey' in user)
if user['apikey'] == self.admin_token:
self.assertTrue(user['is_admin'])
got_admin = True
self.assertTrue(got_admin)
def test_create_user_comment(self) -> None:
if self.public_test:
# Do not run that test against the public instance, it would create users.
return None
instance_config = self.client.get_config_info()
if not instance_config.get('registration'):
return None
user = self.client.create_user('test Name', 'test Login', 'test Organization', 'test@testorg.lu')
self.assertTrue(user)
self.assertTrue('login' in user, user)
self.assertTrue('apikey' in user, user)
self.assertTrue('is_commenter' in user, user)
self.assertTrue(user['is_commenter'])
self.client.set_apikey(user['apikey'])
comment = {'title': 'test', 'description': 'test',
'vulnerability': 'CVE-2024-20401',
'related_vulnerabilities': ['CVE-2024-20402']}
created_comment = self.client.create_comment(comment)
new_comment_uuid = created_comment['data'][0]['uuid']
comments = self.client.get_comments(uuid=new_comment_uuid)
self.assertTrue(len(comments['data']) == 0, comments)
deleted_comment = self.client.delete_comment(new_comment_uuid)
self.assertTrue(deleted_comment < 300)