2023-05-19 13:21:11 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
|
|
from importlib.metadata import version
|
|
|
|
from pathlib import Path
|
|
|
|
from typing import Dict, Optional, Any
|
|
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
|
|
class PyVulnerabilityLookup():
    '''Thin HTTP client for a vulnerability lookup instance.

    All requests go through a single ``requests`` session stored in
    ``self.session`` and are resolved against ``self.root_url``.
    '''

    def __init__(self, root_url: str, useragent: Optional[str]=None,
                 *, proxies: Optional[Dict[str, str]]=None):
        '''Query a specific instance.

        :param root_url: URL of the instance to query.
        :param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
        :param proxies: The proxies to use to connect to the vulnerability lookup instance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
        '''
        self.root_url = root_url

        # Fall back to plain HTTP when the caller did not give a scheme.
        if not urlparse(self.root_url).scheme:
            self.root_url = 'http://' + self.root_url
        # urljoin() replaces the last path segment of the base unless the
        # base ends with '/', so make sure it always does.
        if not self.root_url.endswith('/'):
            self.root_url += '/'

        # requests.Session() is the documented constructor; the lowercase
        # requests.session() helper is only kept for backwards compatibility.
        self.session = requests.Session()
        # The package version is only looked up when no user agent was given.
        self.session.headers['user-agent'] = useragent if useragent else f'PyProject / {version("pyvulnerabilitylookup")}'
        if proxies:
            self.session.proxies.update(proxies)

    @property
    def is_up(self) -> bool:
        '''Test if the given instance is accessible.

        Sends a HEAD request to the root URL.

        :return: True if the response status code is 200, False if the status
                 differs or a ``requests`` ConnectionError was raised.
        '''
        try:
            r = self.session.head(self.root_url)
        except requests.exceptions.ConnectionError:
            return False
        return r.status_code == 200

    def redis_up(self) -> Dict:
        '''Check if redis is up and running.

        :return: The decoded JSON body of the ``redis_up`` endpoint.
        '''
        r = self.session.get(urljoin(self.root_url, 'redis_up'))
        return r.json()

    def get_vulnerability(self, vulnerability_id: str) -> Dict[str, Any]:
        '''Fetch a vulnerability by its identifier.

        :param vulnerability_id: Identifier appended to the ``vulnerability/`` endpoint.
        :return: The decoded JSON body of the response.
        '''
        # as_posix() guarantees forward slashes: str(Path(...)) would yield
        # 'vulnerability\\<id>' on Windows and produce a broken URL.
        endpoint = Path('vulnerability', vulnerability_id).as_posix()
        r = self.session.get(urljoin(self.root_url, endpoint))
        return r.json()
|