2023-05-19 13:21:11 +00:00
|
|
|
|
#!/usr/bin/env python3
|
2024-01-17 11:41:24 +00:00
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
2023-05-19 13:21:11 +00:00
|
|
|
|
|
2024-07-30 15:20:54 +00:00
|
|
|
|
import logging
|
|
|
|
|
|
2024-11-18 18:26:52 +00:00
|
|
|
|
from datetime import date, datetime
|
2023-05-19 13:21:11 +00:00
|
|
|
|
from importlib.metadata import version
|
2024-07-23 13:01:35 +00:00
|
|
|
|
from pathlib import PurePosixPath
|
2024-07-30 15:20:54 +00:00
|
|
|
|
from typing import Any
|
2023-05-19 13:21:11 +00:00
|
|
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
|
|
|
|
|
|
import requests
|
|
|
|
|
|
|
|
|
|
|
2024-07-30 15:20:54 +00:00
|
|
|
|
def enable_full_debug() -> None:
    '''Turn on verbose HTTP debugging.

    Sets the debug level of ``http.client.HTTPConnection`` to 1, configures
    the root logger through ``logging.basicConfig()`` and lowers both the root
    logger and the "requests.packages.urllib3" logger to DEBUG.
    '''
    from http.client import HTTPConnection

    HTTPConnection.debuglevel = 1

    logging.basicConfig()
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    # Let the urllib3 records bubble up to the root logger handlers.
    urllib3_logger = logging.getLogger("requests.packages.urllib3")
    urllib3_logger.setLevel(logging.DEBUG)
    urllib3_logger.propagate = True
|
|
|
|
|
|
|
|
|
|
|
2023-05-19 13:21:11 +00:00
|
|
|
|
class PyVulnerabilityLookup():
|
|
|
|
|
|
2024-11-18 18:26:52 +00:00
|
|
|
|
def __init__(self, root_url: str = 'https://vulnerability.circl.lu', useragent: str | None = None,
             token: str | None = None, *, proxies: dict[str, str] | None = None) -> None:
    '''Query a specific instance.

    :param root_url: URL of the instance to query. ``http://`` is prepended when no scheme is given, and a trailing ``/`` is appended if missing.
    :param useragent: The User Agent used by requests to run the HTTP requests against the vulnerability lookup instance
    :param token: The API key sent in the ``X-API-KEY`` header (an empty string is sent when not provided)
    :param proxies: The proxies to use to connect to the vulnerability lookup instance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
    '''
    # Normalise the URL first so that urljoin() can later append API paths to it.
    if not urlparse(root_url).scheme:
        root_url = 'http://' + root_url
    if not root_url.endswith('/'):
        root_url = root_url + '/'
    self.root_url = root_url

    self.session = requests.session()
    self.session.headers.update({
        'user-agent': useragent or f'PyVulnerabilityLookup / {version("pyvulnerabilitylookup")}',
        'X-API-KEY': token or '',
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    })
    if proxies:
        self.session.proxies.update(proxies)
|
2023-05-19 13:21:11 +00:00
|
|
|
|
|
2024-07-29 16:36:29 +00:00
|
|
|
|
def set_apikey(self, apikey: str) -> None:
    '''Replace the API key sent with every request of this session.

    :param apikey: The value to store in the ``X-API-KEY`` session header
    '''
    self.session.headers.update({'X-API-KEY': apikey})
|
|
|
|
|
|
2023-05-19 13:21:11 +00:00
|
|
|
|
@property
def is_up(self) -> bool:
    '''Tell whether the instance answers a HEAD request on its root URL with a 200.

    A ``requests.exceptions.ConnectionError`` is reported as ``False``.
    '''
    try:
        response = self.session.head(self.root_url)
    except requests.exceptions.ConnectionError:
        return False
    else:
        return response.status_code == 200
|
|
|
|
|
|
2024-01-17 11:41:24 +00:00
|
|
|
|
def redis_up(self) -> bool:
    '''Ask the instance whether redis is up and running.

    Returns the decoded JSON body of ``api/system/redis_up``.
    '''
    endpoint = PurePosixPath('api') / 'system' / 'redis_up'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
# #### DB status ####
|
|
|
|
|
|
|
|
|
|
def get_info(self) -> dict[str, Any]:
    '''Fetch information about the databases in use and when they were updated.

    Returns the decoded JSON body of ``api/system/dbInfo``.
    '''
    endpoint = PurePosixPath('api') / 'system' / 'dbInfo'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
|
|
|
|
def get_config_info(self) -> dict[str, Any]:
    '''Fetch the configuration information exposed by the instance.

    Returns the decoded JSON body of ``api/system/configInfo``.
    '''
    endpoint = PurePosixPath('api') / 'system' / 'configInfo'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
|
|
|
|
# #### Vulnerabilities ####
|
|
|
|
|
|
2024-01-17 11:41:24 +00:00
|
|
|
|
def get_vulnerability(self, vulnerability_id: str) -> dict[str, Any]:
    '''Fetch a single vulnerability.

    :param vulnerability_id: The ID of the vulnerability to get (can be from any source, as long as it is a valid ID)
    '''
    endpoint = PurePosixPath('api', 'vulnerability').joinpath(vulnerability_id)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
2024-07-30 15:20:54 +00:00
|
|
|
|
def create_vulnerability(self, vulnerability: dict[str, Any]) -> dict[str, Any]:
    '''Submit a new vulnerability to the instance.

    :param vulnerability: The vulnerability, sent as the JSON body of the POST request
    '''
    target = urljoin(self.root_url, str(PurePosixPath('api') / 'vulnerability'))
    response = self.session.post(target, json=vulnerability)
    return response.json()
|
|
|
|
|
|
|
|
|
|
def delete_vulnerability(self, vulnerability_id: str) -> int:
    '''Delete a vulnerability and return the HTTP status code of the response.

    :param vulnerability_id: The vulnerability ID
    '''
    endpoint = PurePosixPath('api', 'vulnerability').joinpath(vulnerability_id)
    response = self.session.delete(urljoin(self.root_url, str(endpoint)))
    return response.status_code
|
|
|
|
|
|
2024-07-23 13:06:19 +00:00
|
|
|
|
def get_last(self, number: int | None = None, source: str | None = None) -> list[dict[str, Any]]:
    '''Get the last vulnerabilities

    The request goes to ``api/vulnerability/last``, followed by the source
    and then the number when they are provided.

    :param number: The number of vulnerabilities to get
    :param source: The source of the vulnerabilities
    '''
    # The optional segments must be appended to the full API path: building
    # them on a separate path and then requesting the bare 'last' endpoint
    # silently ignores both arguments.
    path = PurePosixPath('api', 'vulnerability', 'last')
    if source:
        path /= source
    if number is not None:
        path /= str(number)
    r = self.session.get(urljoin(self.root_url, str(path)))
    return r.json()
|
|
|
|
|
|
|
|
|
|
def get_vendors(self) -> list[str]:
    '''List the known vendors.

    Returns the decoded JSON body of ``api/vulnerability/browse``.
    '''
    endpoint = PurePosixPath('api') / 'vulnerability' / 'browse'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
|
|
|
|
def get_vendor_products(self, vendor: str) -> list[str]:
    '''List the known products of a vendor.

    :param vendor: A vendor owning products (must be in the known vendor list)
    '''
    endpoint = PurePosixPath('api', 'vulnerability', 'browse').joinpath(vendor)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
|
|
|
|
def get_vendor_product_vulnerabilities(self, vendor: str, product: str) -> list[str]:
    '''List the vulnerabilities of a specific product of a vendor.

    :param vendor: A vendor owning products (must be in the known vendor list)
    :param product: A product owned by that vendor
    '''
    endpoint = PurePosixPath('api', 'vulnerability', 'browse').joinpath(vendor, product)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
# #### Comments ####
|
2024-07-23 13:01:35 +00:00
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
def create_comment(self, /, *, comment: dict[str, Any] | None = None, description: str | None = None,
                   description_format: str | None = None, meta: dict[str, str] | None = None,
                   related_vulnerabilities: list[str] | None = None, title: str | None = None,
                   uuid: str | None = None, vulnerability: str | None = None) -> dict[str, Any]:
    '''Create a comment.

    Every keyword with a truthy value is written into ``comment`` (a new dict
    when none, or an empty one, is given) before it is POSTed as JSON.

    :param comment: The comment; a non-empty dict is updated in place
    :param description: The description of the comment
    :param description_format: Description format (markdown or text).
    :param meta: Zero or more meta-fields.
    :param related_vulnerabilities: Zero or more related vulnerabilities.
    :param title: The title of the comment
    :param uuid: The UUID of the comment
    :param vulnerability: The vulnerability ID of the comment
    '''
    payload = comment or {}
    fields = (
        ('description', description),
        ('description_format', description_format),
        ('meta', meta),
        ('related_vulnerabilities', related_vulnerabilities),
        ('title', title),
        ('uuid', uuid),
        ('vulnerability', vulnerability),
    )
    for key, value in fields:
        if value:
            payload[key] = value

    target = urljoin(self.root_url, str(PurePosixPath('api') / 'comment'))
    response = self.session.post(target, json=payload)
    return response.json()
|
|
|
|
|
|
2024-07-23 13:01:35 +00:00
|
|
|
|
def get_comments(self, uuid: str | None = None, vuln_id: str | None = None,
                 author: str | None = None) -> dict[str, Any]:
    '''Get comment(s)

    The three filters are always passed as query parameters, including the
    ones left to ``None``.

    :param uuid: The UUID of a specific comment
    :param vuln_id: The vulnerability ID to get comments of
    :param author: The author of the comment(s)
    '''
    filters = {'uuid': uuid, 'vuln_id': vuln_id, 'author': author}
    target = urljoin(self.root_url, str(PurePosixPath('api') / 'comment'))
    response = self.session.get(target, params=filters)
    return response.json()
|
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
def get_comment(self, comment_uuid: str) -> dict[str, Any]:
    '''Fetch a single comment.

    :param comment_uuid: The UUID of the comment
    '''
    endpoint = PurePosixPath('api', 'comment').joinpath(comment_uuid)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
2024-07-25 08:44:22 +00:00
|
|
|
|
def delete_comment(self, comment_uuid: str) -> int:
    '''Delete a comment and return the HTTP status code of the response.

    :param comment_uuid: The comment UUID
    '''
    endpoint = PurePosixPath('api', 'comment').joinpath(comment_uuid)
    response = self.session.delete(urljoin(self.root_url, str(endpoint)))
    return response.status_code
|
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
# #### Bundles ####
|
|
|
|
|
|
|
|
|
|
def create_bundle(self, /, *, bundle: dict[str, Any] | None = None, description: str | None = None,
                  meta: dict[str, str] | None = None, name: str | None = None,
                  related_vulnerabilities: list[str] | None = None,
                  uuid: str | None = None) -> dict[str, Any]:
    '''Create a bundle.

    Every keyword with a truthy value is written into ``bundle`` (a new dict
    when none, or an empty one, is given) before it is POSTed as JSON.

    :param bundle: The bundle; a non-empty dict is updated in place
    :param description: Stored under the ``description`` key of the bundle
    :param meta: Stored under the ``meta`` key of the bundle
    :param name: Stored under the ``name`` key of the bundle
    :param related_vulnerabilities: Stored under the ``related_vulnerabilities`` key of the bundle
    :param uuid: Stored under the ``uuid`` key of the bundle
    '''
    payload = bundle or {}
    fields = (
        ('description', description),
        ('meta', meta),
        ('name', name),
        ('related_vulnerabilities', related_vulnerabilities),
        ('uuid', uuid),
    )
    for key, value in fields:
        if value:
            payload[key] = value

    target = urljoin(self.root_url, str(PurePosixPath('api') / 'bundle'))
    response = self.session.post(target, json=payload)
    return response.json()
|
|
|
|
|
|
2024-07-23 13:01:35 +00:00
|
|
|
|
def get_bundles(self, uuid: str | None = None, vuln_id: str | None = None,
                author: str | None = None, per_page: int | None = None,
                meta: list[dict[str, str]] | None = None) -> dict[str, Any]:
    '''Get bundle(s)

    Only the filters that were provided are sent as query parameters.

    :param uuid: The UUID of a specific bundle
    :param vuln_id: The vulnerability ID to get bundles of
    :param author: The author of the bundle(s)
    :param per_page: The number of bundles to get per page
    :param meta: Query for the meta JSON field. Example: meta=[{'tags': ['tcp']}]
    '''
    params: dict[str, Any] = {
        key: value
        for key, value in (('uuid', uuid), ('vuln_id', vuln_id), ('author', author))
        if value
    }
    # 0 is a value worth sending, so only None means "not provided" here.
    if per_page is not None:
        params['per_page'] = per_page
    if meta:
        params['meta'] = meta

    target = urljoin(self.root_url, str(PurePosixPath('api') / 'bundle'))
    response = self.session.get(target, params=params)
    return response.json()
|
|
|
|
|
|
|
|
|
|
def get_bundle(self, bundle_uuid: str) -> dict[str, Any]:
    '''Fetch a single bundle.

    :param bundle_uuid: The UUID of the bundle
    '''
    endpoint = PurePosixPath('api', 'bundle').joinpath(bundle_uuid)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
2024-07-25 08:44:22 +00:00
|
|
|
|
|
|
|
|
|
def delete_bundle(self, bundle_uuid: str) -> int:
    '''Delete a bundle and return the HTTP status code of the response.

    :param bundle_uuid: The bundle UUID
    '''
    endpoint = PurePosixPath('api', 'bundle').joinpath(bundle_uuid)
    response = self.session.delete(urljoin(self.root_url, str(endpoint)))
    return response.status_code
|
2024-07-29 16:36:29 +00:00
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
# #### Users ####
|
|
|
|
|
|
|
|
|
|
def create_user(self, /, *, user: dict[str, Any] | None = None,
                login: str | None = None, name: str | None = None,
                organisation: str | None = None, email: str | None = None) -> dict[str, Any]:
    '''Create a user.

    Every keyword with a truthy value is written into ``user`` (a new dict
    when none, or an empty one, is given) before it is POSTed as JSON.

    :param user: The user; a non-empty dict is updated in place
    :param login: The login of the user
    :param name: The name of the user
    :param organisation: The organisation of the user
    :param email: The email of the user
    '''
    payload = user or {}
    fields = (
        ('login', login),
        ('name', name),
        ('organisation', organisation),
        ('email', email),
    )
    for key, value in fields:
        if value:
            payload[key] = value

    target = urljoin(self.root_url, str(PurePosixPath('api') / 'user'))
    response = self.session.post(target, json=payload)
    return response.json()
|
2024-07-30 15:20:54 +00:00
|
|
|
|
|
|
|
|
|
def list_users(self) -> dict[str, Any]:
    '''List users (alias of get_users, kept for naming consistency).'''
    users = self.get_users()
    return users
|
|
|
|
|
|
|
|
|
|
def get_users(self) -> dict[str, Any]:
    '''List users.

    Returns the decoded JSON body of ``api/user``.
    '''
    endpoint = PurePosixPath('api') / 'user'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
2024-07-31 14:29:34 +00:00
|
|
|
|
|
|
|
|
|
def get_user_information(self) -> dict[str, Any]:
    '''Get user information.

    Returns the decoded JSON body of ``api/user/me``.
    '''
    endpoint = PurePosixPath('api') / 'user' / 'me'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
2024-11-18 18:26:52 +00:00
|
|
|
|
|
2024-11-19 12:15:36 +00:00
|
|
|
|
def reset_api_key(self) -> dict[str, Any]:
    '''Reset the API key.

    Issues a GET on ``api/user/api_key`` and returns the decoded JSON body.
    '''
    endpoint = PurePosixPath('api') / 'user' / 'api_key'
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
|
|
|
|
def delete_user(self, user_id: str) -> int:
    '''Delete a user and return the HTTP status code of the response.

    :param user_id: The user ID
    '''
    endpoint = PurePosixPath('api', 'user').joinpath(user_id)
    response = self.session.delete(urljoin(self.root_url, str(endpoint)))
    return response.status_code
|
|
|
|
|
|
2024-11-18 18:26:52 +00:00
|
|
|
|
# #### Sightings ####
|
|
|
|
|
|
|
|
|
|
def get_sighting(self, sighting_uuid: str) -> dict[str, Any]:
    '''Fetch a single sighting.

    :param sighting_uuid: The UUID of the sighting
    '''
    endpoint = PurePosixPath('api', 'sighting').joinpath(sighting_uuid)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|
|
|
|
|
|
|
|
|
|
def get_sightings(self, /, *, sighting_uuid: str | None = None,
                  sighting_type: str | None = None, vuln_id: str | None = None,
                  author: str | None = None,
                  date_from: date | datetime | None = None,
                  date_to: date | datetime | None = None) -> dict[str, Any]:
    '''Get sightings

    Only the filters that were provided are sent as query parameters.

    :param sighting_uuid: The UUID of a specific sighting
    :param sighting_type: The type of sighting, can be one of: 'seen', 'exploited', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
    :param vuln_id: The vulnerability ID to get sightings of
    :param author: The author of the sighting(s)
    :param date_from: The date from which to get sightings
    :param date_to: The date to which to get sightings
    '''
    params = {}
    text_filters = (
        ('uuid', sighting_uuid),
        ('type', sighting_type),
        ('vuln_id', vuln_id),
        ('author', author),
    )
    for key, value in text_filters:
        if value:
            params[key] = value

    # Date bounds are sent as ISO dates: a datetime is reduced to its date part.
    for key, bound in (('date_from', date_from), ('date_to', date_to)):
        if not bound:
            continue
        day = bound.date() if isinstance(bound, datetime) else bound
        params[key] = day.isoformat()

    target = urljoin(self.root_url, str(PurePosixPath('api') / 'sighting'))
    response = self.session.get(target, params=params)
    return response.json()
|
|
|
|
|
|
|
|
|
|
def create_sighting(self, /, *, sighting: dict[str, Any] | None = None,
                    creation_timestamp: datetime | None = None,
                    source: str | None = None,
                    sighting_type: str | None = None,
                    vulnerability: str | None = None) -> dict[str, Any]:
    '''Create a sighting.

    The payload is a copy of ``sighting`` completed with every keyword that
    has a truthy value; the dict given by the caller is left untouched.

    :param sighting: The sighting, as an object.
    :param creation_timestamp: The timestamp of the sighting. A naive datetime is interpreted as local time. Nothing is sent for this field when it is not provided.
    :param source: The source of the sighting
    :param sighting_type: The type of sighting, can be one of: 'seen', 'exploited', 'not-exploited', 'confirmed', 'not-confirmed', 'patched', 'not-patched'.
    :param vulnerability: The vulnerability ID of the sighting
    '''
    # Work on a shallow copy: the timestamp is converted to a string below and
    # that conversion must not leak into the caller's object.
    sighting = dict(sighting) if sighting else {}
    fields = (
        ('creation_timestamp', creation_timestamp),  # may or may not have a TZ at this point
        ('source', source),
        ('type', sighting_type),
        ('vulnerability', vulnerability),
    )
    for key, value in fields:
        if value:
            sighting[key] = value

    # Only datetime objects need serialising; a timestamp that already comes
    # as a string (e.g. from a previously exported sighting) is sent as is.
    timestamp = sighting.get('creation_timestamp')
    if isinstance(timestamp, datetime):
        if timestamp.tzinfo is None:
            # No TZ: attach the local timezone.
            timestamp = timestamp.astimezone()
        sighting['creation_timestamp'] = timestamp.isoformat()

    r = self.session.post(urljoin(self.root_url, str(PurePosixPath('api', 'sighting'))),
                          json=sighting)
    return r.json()
|
2024-11-19 12:15:36 +00:00
|
|
|
|
|
|
|
|
|
# #### EPSS ####
|
|
|
|
|
|
|
|
|
|
def get_epss(self, vulnerability: str) -> dict[str, Any]:
    '''Fetch the EPSS of a vulnerability.

    :param vulnerability: The vulnerability ID
    '''
    endpoint = PurePosixPath('api', 'epss').joinpath(vulnerability)
    response = self.session.get(urljoin(self.root_url, str(endpoint)))
    return response.json()
|