fix: [MISP exporter] fix MISP exporter CIRCL/AIL-framework#590

This commit is contained in:
Terrtia 2023-04-20 10:44:17 +02:00
parent ab7b2bdbab
commit 86b1fda59b
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
3 changed files with 39 additions and 19 deletions

View file

@ -13,7 +13,7 @@ import uuid
from abc import ABC from abc import ABC
from pymisp import MISPEvent, PyMISP from pymisp import MISPEvent, PyMISP, PyMISPError
from urllib3 import disable_warnings as urllib3_disable_warnings from urllib3 import disable_warnings as urllib3_disable_warnings
sys.path.append('../../configs/keys') sys.path.append('../../configs/keys')
@ -22,9 +22,11 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages # Import Project packages
################################# #################################
from exporter.abstract_exporter import AbstractExporter from exporter.abstract_exporter import AbstractExporter
from lib.exceptions import MISPConnectionError
from lib.ConfigLoader import ConfigLoader from lib.ConfigLoader import ConfigLoader
from lib.Investigations import Investigation from lib.Investigations import Investigation
from lib.objects.abstract_object import AbstractObject from lib.objects.abstract_object import AbstractObject
from lib.objects import ail_objects
# from lib.Tracker import Tracker # from lib.Tracker import Tracker
@ -61,7 +63,7 @@ def delete_user_misp_objects_to_export(user_id):
# --- FUNCTIONS --- # # --- FUNCTIONS --- #
# MISPExporter -> return correct exporter by type ???? # MISPExporter -> return correct exporter by type ????
class MISPExporter(AbstractExporter, ABC): # <- AbstractMISPExporter ??????? class MISPExporter(AbstractExporter, ABC):
"""MISP Exporter """MISP Exporter
:param url: URL of the MISP instance you want to connect to :param url: URL of the MISP instance you want to connect to
@ -97,7 +99,11 @@ class MISPExporter(AbstractExporter, ABC): # <- AbstractMISPExporter ???????
self.ssl = None self.ssl = None
def get_misp(self): def get_misp(self):
return PyMISP(self.url, self.key, self.ssl) try:
misp = PyMISP(self.url, self.key, self.ssl)
except PyMISPError as e:
raise MISPConnectionError(e.message)
return misp
# TODO catch exception # TODO catch exception
def get_misp_uuid(self): def get_misp_uuid(self):
@ -151,6 +157,9 @@ class MISPExporter(AbstractExporter, ABC): # <- AbstractMISPExporter ???????
# TODO EVENT REPORT ??????? # TODO EVENT REPORT ???????
def create_event(self, objs, export=False, event_uuid=None, date=None, publish=False, info=None, tags=None, def create_event(self, objs, export=False, event_uuid=None, date=None, publish=False, info=None, tags=None,
analysis=0, distribution=0, threat_level=4): analysis=0, distribution=0, threat_level=4):
# Test Connection
if export:
self.get_misp()
if tags is None: if tags is None:
tags = [] tags = []
event = MISPEvent() event = MISPEvent()
@ -197,9 +206,9 @@ class MISPExporter(AbstractExporter, ABC): # <- AbstractMISPExporter ???????
class MISPExporterAILObjects(MISPExporter): class MISPExporterAILObjects(MISPExporter):
"""MISPExporter AILObjects """MISPExporter AILObjects
:param url: URL of the MISP instance you want to connect to :param url: URL of the MISP instance you want to connect to :param key: API key of the user you want to use
:param key: API key of the user you want to use :param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in
:param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain) case of self signed or other certificate (the concatenation of all the crt of the chain)
""" """
def __init__(self, url='', key='', ssl=False): def __init__(self, url='', key='', ssl=False):
@ -222,9 +231,9 @@ class MISPExporterAILObjects(MISPExporter):
class MISPExporterInvestigation(MISPExporter): class MISPExporterInvestigation(MISPExporter):
"""MISPExporter Investigation """MISPExporter Investigation
:param url: URL of the MISP instance you want to connect to :param url: URL of the MISP instance you want to connect to :param key: API key of the user you want to use
:param key: API key of the user you want to use :param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in
:param ssl: can be True or False (to check or to not check the validity of the certificate. Or a CA_BUNDLE in case of self signed or other certificate (the concatenation of all the crt of the chain) case of self signed or other certificate (the concatenation of all the crt of the chain)
""" """
def __init__(self, url='', key='', ssl=False): def __init__(self, url='', key='', ssl=False):

View file

@ -1,18 +1,21 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*-coding:UTF-8 -* # -*-coding:UTF-8 -*
class AIL_ERROR(Exception): from pymisp import PyMISPError
"""docstring for AIL_ERROR."""
class AILError(Exception):
def __init__(self, message): def __init__(self, message):
super(AIL_ERROR, self).__init__(message) super(AILError, self).__init__(message)
self.message = message self.message = message
class UpdateInvestigationError(AIL_ERROR): class UpdateInvestigationError(AILError):
pass pass
class NewTagError(AIL_ERROR): class NewTagError(AILError):
pass pass
class ModuleQueueError(AIL_ERROR): class ModuleQueueError(AILError):
pass
class MISPConnectionError(AILError):
pass pass

View file

@ -23,6 +23,7 @@ sys.path.append(os.environ['AIL_BIN'])
################################## ##################################
from exporter import MISPExporter from exporter import MISPExporter
from exporter import TheHiveExporter from exporter import TheHiveExporter
from lib.exceptions import MISPConnectionError
from lib.objects import ail_objects from lib.objects import ail_objects
from lib.Investigations import Investigation from lib.Investigations import Investigation
@ -44,6 +45,9 @@ thehive_exporter_item = TheHiveExporter.TheHiveExporterItem()
# ============ FUNCTIONS ============ # ============ FUNCTIONS ============
def create_json_response(data, status_code):
return Response(json.dumps(data, indent=2, sort_keys=True), mimetype='application/json'), status_code
# ============= ROUTES ============== # ============= ROUTES ==============
@import_export.route('/import_export/import') @import_export.route('/import_export/import')
@ -151,14 +155,18 @@ def objects_misp_export_post():
publish = request.form.get('misp_event_info', False) publish = request.form.get('misp_event_info', False)
objs = ail_objects.get_objects(objects) objs = ail_objects.get_objects(objects)
try:
event = misp_exporter_objects.create_event(objs, distribution=distribution, threat_level=threat_level, event = misp_exporter_objects.create_event(objs, distribution=distribution, threat_level=threat_level,
analysis=analysis, info=info, export=export, publish=publish) analysis=analysis, info=info, export=export, publish=publish)
except MISPConnectionError as e:
return create_json_response({"error": e.message}, 400)
MISPExporter.delete_user_misp_objects_to_export(user_id) MISPExporter.delete_user_misp_objects_to_export(user_id)
if not export: if not export:
return send_file(io.BytesIO(event['event'].encode()), as_attachment=True, event_uuid = event[10:46]
download_name=f'ail_export_{event["uuid"]}.json') # TODO ADD JAVASCRIPT REFRESH PAGE IF RESP == 200
return send_file(io.BytesIO(event.encode()), as_attachment=True,
download_name=f'ail_export_{event_uuid}.json')
else: else:
object_types = ail_objects.get_all_objects_with_subtypes_tuple() object_types = ail_objects.get_all_objects_with_subtypes_tuple()
return render_template("export_object.html", object_types=object_types, return render_template("export_object.html", object_types=object_types,