mirror of
https://github.com/MISP/misp-galaxy.git
synced 2024-11-22 23:07:19 +00:00
MITRE galaxy - initial conversion and migration script
this is not fully working yet !
This commit is contained in:
parent
ec7dd3b123
commit
bdfefb4499
1 changed files with 156 additions and 0 deletions
156
tools/mitre-cti/v2.0/create_mitre-galaxy.py
Executable file
156
tools/mitre-cti/v2.0/create_mitre-galaxy.py
Executable file
|
@ -0,0 +1,156 @@
|
|||
#!/usr/bin/env python3
"""Convert MITRE CTI STIX 2.0 bundles into MISP galaxy cluster files.

Migration/conversion script: merges the existing per-domain MISP clusters
with a fresh MITRE ATT&CK export given via --path.
"""
import json
import re
import os
import argparse

# NOTE(review): argparse's default HelpFormatter collapses the embedded '\n';
# the second sentence documents where the script historically had to run from.
parser = argparse.ArgumentParser(description='Create a couple galaxy/cluster with cti\'s intrusion-sets\nMust be in the mitre/cti/enterprise-attack/intrusion-set folder')
# Path to a local checkout of https://github.com/mitre/cti (contains one
# folder per ATT&CK domain, each with a <domain>.json STIX bundle).
parser.add_argument("-p", "--path", required=True, help="Path of the mitre/cti folder")

args = parser.parse_args()
|
||||
|
||||
# Folder layout: this script lives in tools/mitre-cti/v2.0/, so the root of
# the MISP galaxy repository (containing the 'clusters' folder) is three
# levels up.
misp_dir = '../../../'

# ATT&CK STIX domains to load, and the STIX object types that are converted
# into MISP galaxy cluster values.
# (The unused placeholders `values = []` and `all_data = {}` from the first
# draft were removed: the script accumulates everything in `all_data_uuid`.)
domains = ['enterprise-attack', 'mobile-attack', 'pre-attack']
types = ['attack-pattern', 'course-of-action', 'intrusion-set', 'malware', 'tool']
|
||||
|
||||
# read in existing data
# THIS IS FOR MIGRATION - reading the data from the enterprise-attack, mobile-attack, pre-attack
# first build a data set of the MISP Galaxy ATT&CK elements by using the UUID as reference, this speeds up lookups later on.
# at the end we will convert everything again to separate datasets
all_data_uuid = {}  # uuid (str) -> cluster value dict, merged across all domain/type files
for domain in domains:
    for t in types:
        fname = os.path.join(misp_dir, 'clusters', 'mitre-{}-{}.json'.format(domain, t))
        if not os.path.exists(fname):
            # old per-domain cluster file absent (already migrated or never existed)
            continue
        with open(fname) as f:
            file_data = json.load(f)
        for value in file_data['values']:
            if value['uuid'] in all_data_uuid:
                # The same UUID can occur in several domain files; merge by
                # copying over the previously seen relationships.  From manual
                # analysis the later item always contained a superset of the
                # other fields, so only 'related' needs merging.
                value_orig = all_data_uuid[value['uuid']]
                for related_item in value_orig.get('related', []):
                    # BUGFIX: the incoming value may not carry a 'related'
                    # list of its own; the original code indexed
                    # value['related'] directly and raised KeyError then.
                    if related_item not in value.setdefault('related', []):
                        value['related'].append(related_item)
            all_data_uuid[value['uuid']] = value
|
||||
|
||||
# THIS IS FOR NORMAL OPERATIONS - reading from the very old and new models - one model per type
|
||||
# FIXME implement this (copy paste above or put above in function and call function)
|
||||
|
||||
|
||||
# now load the MITRE ATT&CK STIX bundles, one per domain, converting every
# object of a supported type into a MISP cluster value in all_data_uuid
for domain in domains:
    attack_dir = os.path.join(args.path, domain)
    if not os.path.exists(attack_dir):
        exit("ERROR: MITRE ATT&CK folder incorrect")

    with open(os.path.join(attack_dir, domain + '.json')) as f:
        attack_data = json.load(f)

    for item in attack_data['objects']:
        if item['type'] not in types:
            continue

        try:
            # build the new data structure
            value = {}
            # STIX ids look like '<type>--<uuid>'; keep only the uuid part.
            # (group(1) is equivalent to the old group(0)[2:] but clearer,
            # and the value is computed once instead of twice.)
            uuid = re.search('--(.*)$', item['id']).group(1)
            # item exists already in the migrated data set: update it in place
            if uuid in all_data_uuid:
                value = all_data_uuid[uuid]

            if 'description' in item:
                value['description'] = item['description']
            value['value'] = item['name'] + ' - ' + item['external_references'][0]['external_id']
            value['meta'] = {}
            value['meta']['refs'] = []
            value['uuid'] = uuid

            if 'aliases' in item:
                value['meta']['synonyms'] = item['aliases']
            if 'x_mitre_aliases' in item:
                value['meta']['synonyms'] = item['x_mitre_aliases']

            for reference in item['external_references']:
                if 'url' in reference and reference['url'] not in value['meta']['refs']:
                    value['meta']['refs'].append(reference['url'])
                if 'external_id' in reference:
                    value['meta']['external_id'] = reference['external_id']

            if 'kill_chain_phases' in item:  # many (but not all) attack-patterns have this
                # NOTE(review): ':enterprise-attack:' is hard-coded even for the
                # mobile-attack and pre-attack domains — confirm this is intended.
                value['meta']['kill_chain'] = []
                for killchain in item['kill_chain_phases']:
                    value['meta']['kill_chain'].append(killchain['kill_chain_name'] + ':enterprise-attack:' + killchain['phase_name'])
            if 'x_mitre_data_sources' in item:
                value['meta']['mitre_data_sources'] = item['x_mitre_data_sources']
            if 'x_mitre_platforms' in item:
                value['meta']['mitre_platforms'] = item['x_mitre_platforms']

            # relationships will be built separately afterwards
            value['type'] = item['type']  # internal bookkeeping, removed before the json dump

            all_data_uuid[uuid] = value

        except Exception:
            # best-effort conversion: print the offending object for debugging
            # and carry on with the next one
            print(json.dumps(item, sort_keys=True, indent=2))
            import traceback
            traceback.print_exc()

    # process the 'relationship' type as we now know the existence of all ATT&CK uuids
    for item in attack_data['objects']:
        if item['type'] != 'relationship':
            continue

        rel_type = item['relationship_type']
        dest_uuid = re.findall(r'--([0-9a-f-]+)', item['target_ref']).pop()
        source_uuid = re.findall(r'--([0-9a-f-]+)', item['source_ref']).pop()

        # the source object may be of a type we did not convert (or one that
        # failed conversion above); skip rather than KeyError on the lookup
        if source_uuid not in all_data_uuid:
            continue

        # add the relation in the defined way
        rel_source = {
            "dest-uuid": dest_uuid,
            "tags": [
                "estimative-language:likelihood-probability=\"almost-certain\""
            ],
            "type": rel_type
        }
        if 'relation' not in all_data_uuid[source_uuid]:
            all_data_uuid[source_uuid]['relation'] = []
        if rel_source not in all_data_uuid[source_uuid]['relation']:
            all_data_uuid[source_uuid]['relation'].append(rel_source)

    # LATER find the opposite word of "rel_type" and build the relation in the opposite direction
||||
# dump all_data_uuid to the respective consolidated file, one per type
for t in types:
    fname = os.path.join(misp_dir, 'clusters', 'mitre-{}.json'.format(t))
    if not os.path.exists(fname):
        exit("File {} does not exist, this is unexpected.".format(fname))
    # keep the existing galaxy metadata of the file, only replace the values
    with open(fname) as f:
        file_data = json.load(f)

    file_data['values'] = []
    for item in all_data_uuid.values():
        # BUGFIX: 'type' is stashed on each value during conversion but popped
        # once the value is written out, so on the next iteration of `t` the
        # original item['type'] lookup raised KeyError (likewise for values
        # that were only loaded from the old files and never re-converted).
        if item.get('type') != t:
            continue
        item.pop('type', None)  # internal bookkeeping, must not end up in the JSON
        file_data['values'].append(item)

    with open(fname, 'w') as f:
        json.dump(file_data, f, indent=2, sort_keys=True, ensure_ascii=False)
        f.write('\n')  # only needed for the beauty and to be compliant with jq_all_the_things
|
Loading…
Reference in a new issue