new: Initial commit

This commit is contained in:
Raphaël Vinot 2024-06-07 15:54:49 +02:00
parent cc1d36fb8e
commit af60692c0c
12 changed files with 273 additions and 98 deletions

View file

@ -1,6 +1,8 @@
BSD 3-Clause License BSD 3-Clause License
Copyright (c) 2020, Lookyloo Copyright (c) 2024, Raphaël Vinot
Copyright (c) 2024, CVE Search
Copyright (c) 2024, CIRCL - Computer Incident Response Center Luxembourg
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without

View file

@ -1,18 +1,18 @@
# Python client and module for Project Template # Python client and module to query the VARIoT IoT vulnerabilities and exploits databases
This is a simple template used in all the web based APIs in this repository, and a few others. This is a Python client and module to query the [VARIoT IoT vulnerabilities and exploits databases](https://www.variotdbs.pl/api/).
## Installation ## Installation
```bash ```bash
pip install pyproject pip install pyvariot
``` ```
## Usage ## Usage
### Command line ### Command line
You can use the `client` command to do a thing: You can use the `variot` command to query the database:
```bash ```bash
``` ```

View file

@ -4,11 +4,11 @@ API reference
.. toctree:: .. toctree::
:maxdepth: 2 :maxdepth: 2
.. automodule:: pyproject .. automodule:: pyvariot
:members: :members:
PyProject PyProject
--------- ---------
.. autoclass:: PyProject .. autoclass:: PyVARIoT
:members: :members:

View file

@ -17,9 +17,9 @@
# -- Project information ----------------------------------------------------- # -- Project information -----------------------------------------------------
project = 'PyProject' project = 'PyVARIoT'
copyright = '2021, Lookyloo team' copyright = '2021, CVE Search'
author = 'Lookyloo team' author = 'CVE Search team'
# The full version, including alpha/beta/rc tags # The full version, including alpha/beta/rc tags
release = 'v0.0.1' release = 'v0.0.1'

View file

@ -1,12 +1,7 @@
.. PyLookyloo documentation master file, created by Welcome to PyVARIoT's documentation!
sphinx-quickstart on Tue Mar 23 12:28:17 2021. ====================================
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to PyProject's documentation! This is the client API for `PyVARIoT <https://github.com/cve-search/variot>`_:
=============================================
This is the client API for `PyProject <https://github.com/Lookyloo/project_template>`_:
foo foo
@ -16,15 +11,15 @@ Installation
The package is available on PyPi, so you can install it with:: The package is available on PyPi, so you can install it with::
pip install pyproject pip install pyvariot
Usage Usage
----- -----
You can use `client` as a python script:: You can use `variot` as a python script::
$ client -h $ variot -h
Or as a library: Or as a library:

View file

@ -1,7 +1,7 @@
[tool.poetry] [tool.poetry]
name = "pyproject" name = "pyvariot"
version = "0.0.1" version = "0.0.1"
description = "Python CLI and module for project" description = "Python CLI and module to query the VARIoT IoT vulnerabilities and exploits databases"
authors = ["Raphaël Vinot <raphael.vinot@circl.lu>"] authors = ["Raphaël Vinot <raphael.vinot@circl.lu>"]
license = "BSD-3-Clause" license = "BSD-3-Clause"
@ -24,7 +24,7 @@ classifiers = [
] ]
[tool.poetry.scripts] [tool.poetry.scripts]
client = 'pyproject:main' pyvariot = 'pyvariot:main'
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.10" python = "^3.10"

View file

@ -1,26 +0,0 @@
from __future__ import annotations
import argparse
import json
import sys
from .api import PyProject
__all__ = ['PyProject']
def main() -> None:
parser = argparse.ArgumentParser(description='Query a thing.')
parser.add_argument('--url', type=str, required=True, help='URL of the instance.')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--redis_up', action='store_true', help='Check if redis is up.')
args = parser.parse_args()
client = PyProject(args.url)
if not client.is_up:
print(f'Unable to reach {client.root_url}. Is the server up?')
sys.exit(1)
if args.redis_up:
response = client.redis_up()
print(json.dumps(response, indent=2))

View file

@ -1,45 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
from importlib.metadata import version
from typing import Any
from urllib.parse import urljoin, urlparse
import requests
class PyProject():
def __init__(self, root_url: str, useragent: str | None=None,
*, proxies: dict[str, str] | None=None):
'''Query a specific instance.
:param root_url: URL of the instance to query.
:param useragent: The User Agent used by requests to run the HTTP requests against the instance.
:param proxies: The proxies to use to connect to theinstance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
'''
self.root_url = root_url
if not urlparse(self.root_url).scheme:
self.root_url = 'http://' + self.root_url
if not self.root_url.endswith('/'):
self.root_url += '/'
self.session = requests.session()
self.session.headers['user-agent'] = useragent if useragent else f'PyProject / {version("pyproject")}'
if proxies:
self.session.proxies.update(proxies)
@property
def is_up(self) -> bool:
'''Test if the given instance is accessible'''
try:
r = self.session.head(self.root_url)
except requests.exceptions.ConnectionError:
return False
return r.status_code == 200
def redis_up(self) -> dict[str, Any]:
'''Check if redis is up and running'''
r = self.session.get(urljoin(self.root_url, 'redis_up'))
return r.json()

39
pyvariot/__init__.py Normal file
View file

@ -0,0 +1,39 @@
from __future__ import annotations
import argparse
import json
import sys
from .api import PyVARIoT
__all__ = ['PyVARIoT']
def main() -> None:
parser = argparse.ArgumentParser(description='Get a vulnerability or an exploit by ID.')
parser.add_argument('--url', type=str, help='URL of the instance.')
parser.add_argument('--apikey', type=str, help='Your personal API key.')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--vulnerability_id', type=str, help='ID of the vulnerability.')
group.add_argument('--exploit_id', type=str, help='ID of the exploit.')
args = parser.parse_args()
if args.url:
client = PyVARIoT(args.url)
else:
client = PyVARIoT()
if args.apikey:
client.apikey = args.apikey
if not client.is_up:
print(f'Unable to reach {client.root_url}. Is the server up?')
sys.exit(1)
if args.vulnerability_id:
vulnerability = client.get_vulnerability(args.vulnerability_id)
print(json.dumps(vulnerability, indent=2))
if args.exploit_id:
exploit = client.get_exploit(args.exploit_id)
print(json.dumps(exploit, indent=2))

168
pyvariot/api.py Normal file
View file

@ -0,0 +1,168 @@
#!/usr/bin/env python3
from __future__ import annotations
from datetime import datetime
from importlib.metadata import version
from typing import Any, Generator
from urllib.parse import urljoin, urlparse, parse_qsl
from pathlib import PurePosixPath
import requests
class PyVARIoT():
def __init__(self, root_url: str='https://www.variotdbs.pl/', useragent: str | None=None,
*, proxies: dict[str, str] | None=None):
'''Query a specific instance.
:param root_url: URL of the instance to query.
:param useragent: The User Agent used by requests to run the HTTP requests against the instance.
:param proxies: The proxies to use to connect to theinstance - More details: https://requests.readthedocs.io/en/latest/user/advanced/#proxies
'''
self.root_url = root_url
if not urlparse(self.root_url).scheme:
self.root_url = 'http://' + self.root_url
if not self.root_url.endswith('/'):
self.root_url += '/'
self.session = requests.session()
self.session.headers['user-agent'] = useragent if useragent else f'PyVARIoT / {version("pyvariot")}'
if proxies:
self.session.proxies.update(proxies)
self._apikey: str | None = None
@property
def apikey(self) -> str | None:
return self._apikey
@apikey.setter
def apikey(self, apikey: str) -> None:
'''Set the API key to use for the requests.
:params apikey: The API key to use for the requests.
'''
self._apikey = apikey
self.session.headers['Authorization'] = f'Token {self._apikey}'
@property
def is_up(self) -> bool:
'''Test if the given instance is accessible'''
try:
r = self.session.head(self.root_url)
except requests.exceptions.ConnectionError:
return False
return r.status_code == 200
def get_vulnerability(self, vulnerability_id: str, /, *, jsonld: bool=False) -> dict[str, Any]:
'''Get a vulnerability by its ID.
:param vulnerability_id: The ID of the vulnerability to get.
:param jsonld: Whether to return the JSON-LD representation of the vulnerability.
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vuln', vulnerability_id))),
params={'jsonld': jsonld})
return r.json()
def get_exploit(self, exploit_id: str, /, *, jsonld: bool=False) -> dict[str, Any]:
'''Get an exploit by its ID.
:param exploit_id: The ID of the exploit to get.
:param jsonld: Whether to return the JSON-LD representation of the exploit.
'''
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'exploit', exploit_id))),
params={'jsonld': jsonld})
return r.json()
def __prepare_params(self, jsonld: bool=False, since: datetime | None=None, before: datetime | None=None,
limit: int | None=None, offset: int | None=None) -> dict[str, bool | str | int]:
'''Prepare the parameters for the requests.'''
params: dict[str, bool | str | int] = {'jsonld': jsonld}
if since:
params['since'] = since.isoformat()
if before:
params['before'] = before.isoformat()
if limit:
params['limit'] = limit
if offset:
params['offset'] = offset
return params
def get_vulnerabilities(self, /, *, jsonld: bool=False,
since: datetime | None=None, before: datetime | None,
limit: int | None=None, offset: int | None=None) -> dict[str, Any]:
'''Get vulnerabilities on an interval.
:param jsonld: Whether to return the JSON-LD representation of the vulnerabilities.
:param since: The date from which to get the vulnerabilities.
:param before: The date until which to get the vulnerabilities.
:param limit: The maximum number of vulnerabilities to get in one call.
:param offset: The offset to start getting the vulnerabilities.
'''
params = self.__prepare_params(jsonld, since, before, limit, offset)
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'vulns'))),
params=params)
return r.json()
def get_vulnerabilities_iter(self, /, *, jsonld: bool=False,
since: datetime | None=None, before: datetime | None,
limit: int | None=None, offset: int | None=None) -> Generator[dict[str, Any], None, None]:
'''Get vulnerabilities on an interval, automatically iterates over all the matching vulerabilities.
:param jsonld: Whether to return the JSON-LD representation of the vulnerabilities.
:param since: The date from which to get the vulnerabilities.
:param before: The date until which to get the vulnerabilities.
:param limit: The maximum number of vulnerabilities to get in one call.
:param offset: The offset to start getting the vulnerabilities.
'''
while True:
r = self.get_vulnerabilities(jsonld=jsonld, since=since, before=before, limit=limit, offset=offset)
if not r:
break
for vuln in r['results']:
yield vuln
if not r['next']:
break
next_params = dict(parse_qsl(urlparse(r['next']).query))
since = datetime.fromisoformat(next_params['since'])
before = datetime.fromisoformat(next_params['before'])
limit = int(next_params['limit'])
offset = int(next_params['offset'])
jsonld = False if next_params['offset'] == 'False' else True
def get_exploits(self, /, *, jsonld: bool=False,
since: datetime | None=None, before: datetime | None,
limit: int | None=None, offset: int | None=None) -> dict[str, Any]:
'''Get exploits on an interval.
:param jsonld: Whether to return the JSON-LD representation of the exploits.
:param since: The date from which to get the exploits.
:param before: The date until which to get the exploits.
:param limit: The maximum number of exploits to get in one call.
:param offset: The offset to start getting the exploits.
'''
params = self.__prepare_params(jsonld, since, before, limit, offset)
r = self.session.get(urljoin(self.root_url, str(PurePosixPath('api', 'exploits'))),
params=params)
return r.json()
def get_exploits_iter(self, /, *, jsonld: bool=False,
since: datetime | None=None, before: datetime | None,
limit: int | None=None, offset: int | None=None) -> Generator[dict[str, Any], None, None]:
'''Get exploits on an interval, automatically iterates over all the matching exploits.
:param jsonld: Whether to return the JSON-LD representation of the exploits.
:param since: The date from which to get the exploits.
:param before: The date until which to get the exploits.
:param limit: The maximum number of exploits to get in one call.
:param offset: The offset to start getting the exploits.
'''
while True:
r = self.get_exploits(jsonld=jsonld, since=since, before=before, limit=limit, offset=offset)
if not r:
break
for exploit in r['results']:
yield exploit
if not r['next']:
break
next_params = dict(parse_qsl(urlparse(r['next']).query))
since = datetime.fromisoformat(next_params['since'])
before = datetime.fromisoformat(next_params['before'])
limit = int(next_params['limit'])
offset = int(next_params['offset'])
jsonld = False if next_params['offset'] == 'False' else True

View file

@ -2,14 +2,56 @@
import unittest import unittest
from pyproject import PyProject from datetime import datetime, timezone
from pyvariot import PyVARIoT
class TestBasic(unittest.TestCase): class TestBasic(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
self.client = PyProject(root_url="http://127.0.0.1:9999") self.client = PyVARIoT()
# self.client.apikey = ''
def test_up(self) -> None: def test_up(self) -> None:
self.assertTrue(self.client.is_up) self.assertTrue(self.client.is_up)
self.assertTrue(self.client.redis_up())
def test_get_vulnerability(self) -> None:
vuln = self.client.get_vulnerability('VAR-202405-2633')
self.assertEqual(vuln['id'], 'VAR-202405-2633')
vuln = self.client.get_vulnerability('VAR-202405-2633', jsonld=True)
self.assertEqual(vuln['id'], 'VAR-202405-2633')
self.assertEqual(vuln['affected_products']['@context']['@vocab'], 'https://www.variotdbs.pl/ref/affected_products#')
def test_get_exploit(self) -> None:
exploit = self.client.get_exploit('VAR-E-202403-0059')
self.assertEqual(exploit['id'], 'VAR-E-202403-0059')
exploit = self.client.get_exploit('VAR-E-202403-0059', jsonld=True)
self.assertEqual(exploit['id'], 'VAR-E-202403-0059')
self.assertEqual(exploit['affected_products']['@context']['@vocab'], 'https://www.variotdbs.pl/ref/affected_products#')
def test_get_vulnerabilities(self) -> None:
since = datetime(2024, 6, 2, 22, tzinfo=timezone.utc)
before = datetime(2024, 6, 2, 23, tzinfo=timezone.utc)
limit = 1
offset = 0
vulns = self.client.get_vulnerabilities(since=since, before=before, limit=limit, offset=offset)
self.assertEqual(len(vulns['results']), 1)
def test_get_vulnerabilities_iter(self) -> None:
since = datetime(2024, 6, 2, 22, tzinfo=timezone.utc)
before = datetime(2024, 6, 2, 23, tzinfo=timezone.utc)
limit = 20
vulns_ids = []
for vuln in self.client.get_vulnerabilities_iter(since=since, before=before, limit=limit):
vulns_ids.append(vuln['id'])
self.assertEqual(len(vulns_ids), 29)
def test_get_exploits_iter(self) -> None:
since = datetime(2023, 12, 13, 13, tzinfo=timezone.utc)
before = datetime(2023, 12, 13, 14, tzinfo=timezone.utc)
limit = 5
exploits_ids = []
for exploit in self.client.get_exploits_iter(since=since, before=before, limit=limit):
exploits_ids.append(exploit['id'])
self.assertEqual(len(exploits_ids), 11)