diff --git a/LICENSE b/LICENSE index c512c98..071cf78 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,8 @@ BSD 3-Clause License -Copyright (c) 2020, Lookyloo +Copyright (c) 2024, Raphaël Vinot +Copyright (c) 2024, CVE Search +Copyright (c) 2024, CIRCL - Computer Incident Response Center Luxembourg All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/README.md b/README.md index 367ad8c..8f9aaf1 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,18 @@ -# Python client and module for Project Template +# Python client and module to query the VARIoT IoT vulnerabilities and exploits databases -This is a simple template used in all the web based APIs in this repository, and a few others. +This is a Python client and module to query the [VARIoT IoT vulnerabilities and exploits databases](https://www.variotdbs.pl/api/). ## Installation ```bash -pip install pyproject +pip install pyvariot ``` ## Usage ### Command line -You can use the `client` command to do a thing: +You can use the `variot` command to query the database: ```bash ``` diff --git a/docs/source/api_reference.rst b/docs/source/api_reference.rst index 5a467c9..3549772 100644 --- a/docs/source/api_reference.rst +++ b/docs/source/api_reference.rst @@ -4,11 +4,11 @@ API reference .. toctree:: :maxdepth: 2 -.. automodule:: pyproject +.. automodule:: pyvariot :members: PyProject --------- -.. autoclass:: PyProject +.. autoclass:: PyVARIoT :members: diff --git a/docs/source/conf.py b/docs/source/conf.py index 226aa0b..5d878bd 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -17,9 +17,9 @@ # -- Project information ----------------------------------------------------- -project = 'PyProject' -copyright = '2021, Lookyloo team' -author = 'Lookyloo team' +project = 'PyVARIoT' +copyright = '2021, CVE Search' +author = 'CVE Search team' # The full version, including alpha/beta/rc tags release = 'v0.0.1' diff --git a/docs/source/index.rst b/docs/source/index.rst index 74293ba..bc1d440 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,12 +1,7 @@ -.. PyLookyloo documentation master file, created by - sphinx-quickstart on Tue Mar 23 12:28:17 2021. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +Welcome to PyVARIoT's documentation! +==================================== -Welcome to PyProject's documentation! -============================================= - -This is the client API for `PyProject `_: +This is the client API for `PyVARIoT `_: foo @@ -16,15 +11,15 @@ Installation The package is available on PyPi, so you can install it with:: - pip install pyproject + pip install pyvariot Usage ----- -You can use `client` as a python script:: +You can use `variot` as a python script:: - $ client -h + $ variot -h Or as a library: diff --git a/pyproject.toml b/pyproject.toml index 92b94c6..ce07d83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] -name = "pyproject" +name = "pyvariot" version = "0.0.1" -description = "Python CLI and module for project" +description = "Python CLI and module to query the VARIoT IoT vulnerabilities and exploits databases" authors = ["Raphaël Vinot "] license = "BSD-3-Clause" @@ -24,7 +24,7 @@ classifiers = [ ] [tool.poetry.scripts] -client = 'pyproject:main' +pyvariot = 'pyvariot:main' [tool.poetry.dependencies] python = "^3.10" diff --git a/pyproject/__init__.py b/pyproject/__init__.py deleted file mode 100644 index 9d86ad4..0000000 --- a/pyproject/__init__.py +++ /dev/null @@ -1,26 +0,0 @@ -from __future__ import annotations - -import argparse -import json -import sys - -from .api import PyProject - -__all__ = ['PyProject'] - - -def main() -> None: - parser = argparse.ArgumentParser(description='Query a thing.') - parser.add_argument('--url', type=str, required=True, help='URL of the instance.') - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument('--redis_up', action='store_true', help='Check if redis is up.') - args = parser.parse_args() - - client = PyProject(args.url) - - if not client.is_up: - print(f'Unable to reach {client.root_url}. Is the server up?') - sys.exit(1) - if args.redis_up: - response = client.redis_up() - print(json.dumps(response, indent=2)) diff --git a/pyproject/api.py b/pyproject/api.py deleted file mode 100644 index 92470cd..0000000 --- a/pyproject/api.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python3 - -from __future__ import annotations - -from importlib.metadata import version -from typing import Any -from urllib.parse import urljoin, urlparse - -import requests - - -class PyProject(): - - def __init__(self, root_url: str, useragent: str | None=None, - *, proxies: dict[str, str] | None=None): - '''Query a specific instance. - - :param root_url: URL of the instance to query. - :param useragent: The User Agent used by requests to run the HTTP requests against the instance. - :param proxies: The proxies to use to connect to theinstance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies - ''' - self.root_url = root_url - - if not urlparse(self.root_url).scheme: - self.root_url = 'http://' + self.root_url - if not self.root_url.endswith('/'): - self.root_url += '/' - self.session = requests.session() - self.session.headers['user-agent'] = useragent if useragent else f'PyProject / {version("pyproject")}' - if proxies: - self.session.proxies.update(proxies) - - @property - def is_up(self) -> bool: - '''Test if the given instance is accessible''' - try: - r = self.session.head(self.root_url) - except requests.exceptions.ConnectionError: - return False - return r.status_code == 200 - - def redis_up(self) -> dict[str, Any]: - '''Check if redis is up and running''' - r = self.session.get(urljoin(self.root_url, 'redis_up')) - return r.json() diff --git a/pyvariot/__init__.py b/pyvariot/__init__.py new file mode 100644 index 0000000..843143c --- /dev/null +++ b/pyvariot/__init__.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import argparse +import json +import sys + +from .api import PyVARIoT + +__all__ = ['PyVARIoT'] + + +def main() -> None: + parser = argparse.ArgumentParser(description='Get a vulnerability or an exploit by ID.') + parser.add_argument('--url', type=str, help='URL of the instance.') + parser.add_argument('--apikey', type=str, help='Your personal API key.') + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument('--vulnerability_id', type=str, help='ID of the vulnerability.') + group.add_argument('--exploit_id', type=str, help='ID of the exploit.') + args = parser.parse_args() + + if args.url: + client = PyVARIoT(args.url) + else: + client = PyVARIoT() + + if args.apikey: + client.apikey = args.apikey + + if not client.is_up: + print(f'Unable to reach {client.root_url}. Is the server up?') + sys.exit(1) + + if args.vulnerability_id: + vulnerability = client.get_vulnerability(args.vulnerability_id) + print(json.dumps(vulnerability, indent=2)) + + if args.exploit_id: + exploit = client.get_exploit(args.exploit_id) + print(json.dumps(exploit, indent=2)) diff --git a/pyvariot/api.py b/pyvariot/api.py new file mode 100644 index 0000000..0e09548 --- /dev/null +++ b/pyvariot/api.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +from datetime import datetime +from importlib.metadata import version +from typing import Any, Generator +from urllib.parse import urljoin, urlparse, parse_qsl +from pathlib import PurePosixPath + +import requests + + +class PyVARIoT(): + + def __init__(self, root_url: str='https://www.variotdbs.pl/', useragent: str | None=None, + *, proxies: dict[str, str] | None=None): + '''Query a specific instance. + + :param root_url: URL of the instance to query. + :param useragent: The User Agent used by requests to run the HTTP requests against the instance. + :param proxies: The proxies to use to connect to theinstance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies + ''' + self.root_url = root_url + + if not urlparse(self.root_url).scheme: + self.root_url = 'http://' + self.root_url + if not self.root_url.endswith('/'): + self.root_url += '/' + self.session = requests.session() + self.session.headers['user-agent'] = useragent if useragent else f'PyVARIoT / {version("pyvariot")}' + if proxies: + self.session.proxies.update(proxies) + + self._apikey: str | None = None + + @property + def apikey(self) -> str | None: + return self._apikey + + @apikey.setter + def apikey(self, apikey: str) -> None: + '''Set the API key to use for the requests. + :params apikey: The API key to use for the requests. + ''' + self._apikey = apikey + self.session.headers['Authorization'] = f'Token {self._apikey}' + + @property + def is_up(self) -> bool: + '''Test if the given instance is accessible''' + try: + r = self.session.head(self.root_url) + except requests.exceptions.ConnectionError: + return False + return r.status_code == 200 + + def get_vulnerability(self, vulnerability_id: str, /, *, jsonld: bool=False) -> dict[str, Any]: + '''Get a vulnerability by its ID. + :param vulnerability_id: The ID of the vulnerability to get. + :param jsonld: Whether to return the JSON-LD representation of the vulnerability. + ''' + r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vuln', vulnerability_id))), + params={'jsonld': jsonld}) + return r.json() + + def get_exploit(self, exploit_id: str, /, *, jsonld: bool=False) -> dict[str, Any]: + '''Get an exploit by its ID. + :param exploit_id: The ID of the exploit to get. + :param jsonld: Whether to return the JSON-LD representation of the exploit. + ''' + r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'exploit', exploit_id))), + params={'jsonld': jsonld}) + return r.json() + + def __prepare_params(self, jsonld: bool=False, since: datetime | None=None, before: datetime | None=None, + limit: int | None=None, offset: int | None=None) -> dict[str, bool | str | int]: + '''Prepare the parameters for the requests.''' + params: dict[str, bool | str | int] = {'jsonld': jsonld} + if since: + params['since'] = since.isoformat() + if before: + params['before'] = before.isoformat() + if limit: + params['limit'] = limit + if offset: + params['offset'] = offset + return params + + def get_vulnerabilities(self, /, *, jsonld: bool=False, + since: datetime | None=None, before: datetime | None, + limit: int | None=None, offset: int | None=None) -> dict[str, Any]: + '''Get vulnerabilities on an interval. + :param jsonld: Whether to return the JSON-LD representation of the vulnerabilities. + :param since: The date from which to get the vulnerabilities. + :param before: The date until which to get the vulnerabilities. + :param limit: The maximum number of vulnerabilities to get in one call. + :param offset: The offset to start getting the vulnerabilities. + ''' + params = self.__prepare_params(jsonld, since, before, limit, offset) + r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vulns'))), + params=params) + return r.json() + + def get_vulnerabilities_iter(self, /, *, jsonld: bool=False, + since: datetime | None=None, before: datetime | None, + limit: int | None=None, offset: int | None=None) -> Generator[dict[str, Any], None, None]: + '''Get vulnerabilities on an interval, automatically iterates over all the matching vulerabilities. + :param jsonld: Whether to return the JSON-LD representation of the vulnerabilities. + :param since: The date from which to get the vulnerabilities. + :param before: The date until which to get the vulnerabilities. + :param limit: The maximum number of vulnerabilities to get in one call. + :param offset: The offset to start getting the vulnerabilities. + ''' + while True: + r = self.get_vulnerabilities(jsonld=jsonld, since=since, before=before, limit=limit, offset=offset) + if not r: + break + for vuln in r['results']: + yield vuln + if not r['next']: + break + next_params = dict(parse_qsl(urlparse(r['next']).query)) + since = datetime.fromisoformat(next_params['since']) + before = datetime.fromisoformat(next_params['before']) + limit = int(next_params['limit']) + offset = int(next_params['offset']) + jsonld = False if next_params['offset'] == 'False' else True + + def get_exploits(self, /, *, jsonld: bool=False, + since: datetime | None=None, before: datetime | None, + limit: int | None=None, offset: int | None=None) -> dict[str, Any]: + '''Get exploits on an interval. + :param jsonld: Whether to return the JSON-LD representation of the exploits. + :param since: The date from which to get the exploits. + :param before: The date until which to get the exploits. + :param limit: The maximum number of exploits to get in one call. + :param offset: The offset to start getting the exploits. + ''' + params = self.__prepare_params(jsonld, since, before, limit, offset) + r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'exploits'))), + params=params) + return r.json() + + def get_exploits_iter(self, /, *, jsonld: bool=False, + since: datetime | None=None, before: datetime | None, + limit: int | None=None, offset: int | None=None) -> Generator[dict[str, Any], None, None]: + '''Get exploits on an interval, automatically iterates over all the matching exploits. + :param jsonld: Whether to return the JSON-LD representation of the exploits. + :param since: The date from which to get the exploits. + :param before: The date until which to get the exploits. + :param limit: The maximum number of exploits to get in one call. + :param offset: The offset to start getting the exploits. + ''' + while True: + r = self.get_exploits(jsonld=jsonld, since=since, before=before, limit=limit, offset=offset) + if not r: + break + for exploit in r['results']: + yield exploit + if not r['next']: + break + next_params = dict(parse_qsl(urlparse(r['next']).query)) + since = datetime.fromisoformat(next_params['since']) + before = datetime.fromisoformat(next_params['before']) + limit = int(next_params['limit']) + offset = int(next_params['offset']) + jsonld = False if next_params['offset'] == 'False' else True diff --git a/pyproject/py.typed b/pyvariot/py.typed similarity index 100% rename from pyproject/py.typed rename to pyvariot/py.typed diff --git a/tests/test_web.py b/tests/test_web.py index 17891e7..a5a9a87 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -2,14 +2,56 @@ import unittest -from pyproject import PyProject +from datetime import datetime, timezone + +from pyvariot import PyVARIoT class TestBasic(unittest.TestCase): def setUp(self) -> None: - self.client = PyProject(root_url="http://127.0.0.1:9999") + self.client = PyVARIoT() + # self.client.apikey = '' def test_up(self) -> None: self.assertTrue(self.client.is_up) - self.assertTrue(self.client.redis_up()) + + def test_get_vulnerability(self) -> None: + vuln = self.client.get_vulnerability('VAR-202405-2633') + self.assertEqual(vuln['id'], 'VAR-202405-2633') + vuln = self.client.get_vulnerability('VAR-202405-2633', jsonld=True) + self.assertEqual(vuln['id'], 'VAR-202405-2633') + self.assertEqual(vuln['affected_products']['@context']['@vocab'], 'https://www.variotdbs.pl/ref/affected_products#') + + def test_get_exploit(self) -> None: + exploit = self.client.get_exploit('VAR-E-202403-0059') + self.assertEqual(exploit['id'], 'VAR-E-202403-0059') + exploit = self.client.get_exploit('VAR-E-202403-0059', jsonld=True) + self.assertEqual(exploit['id'], 'VAR-E-202403-0059') + self.assertEqual(exploit['affected_products']['@context']['@vocab'], 'https://www.variotdbs.pl/ref/affected_products#') + + def test_get_vulnerabilities(self) -> None: + since = datetime(2024, 6, 2, 22, tzinfo=timezone.utc) + before = datetime(2024, 6, 2, 23, tzinfo=timezone.utc) + limit = 1 + offset = 0 + vulns = self.client.get_vulnerabilities(since=since, before=before, limit=limit, offset=offset) + self.assertEqual(len(vulns['results']), 1) + + def test_get_vulnerabilities_iter(self) -> None: + since = datetime(2024, 6, 2, 22, tzinfo=timezone.utc) + before = datetime(2024, 6, 2, 23, tzinfo=timezone.utc) + limit = 20 + vulns_ids = [] + for vuln in self.client.get_vulnerabilities_iter(since=since, before=before, limit=limit): + vulns_ids.append(vuln['id']) + self.assertEqual(len(vulns_ids), 29) + + def test_get_exploits_iter(self) -> None: + since = datetime(2023, 12, 13, 13, tzinfo=timezone.utc) + before = datetime(2023, 12, 13, 14, tzinfo=timezone.utc) + limit = 5 + exploits_ids = [] + for exploit in self.client.get_exploits_iter(since=since, before=before, limit=limit): + exploits_ids.append(exploit['id']) + self.assertEqual(len(exploits_ids), 11)