new: [d3fend] initial conversion script for MITRE D3FEND #975

This commit is contained in:
Christophe Vandeplas 2024-05-27 21:57:08 +02:00
parent f3838f4550
commit a4afac9a97
No known key found for this signature in database
GPG key ID: BDC48619FFDC5A5B

211
tools/gen_mitre_d3fend.py Executable file
View file

@ -0,0 +1,211 @@
#!/usr/bin/env python3
import json
import os
import requests
import uuid
d3fend_url = 'https://d3fend.mitre.org/ontologies/d3fend.json'
d3fend_full_mappings_url = 'https://d3fend.mitre.org/api/ontology/inference/d3fend-full-mappings.json'
try:
with open('d3fend.json', 'r') as f:
d3fend_json = json.load(f)
except Exception:
r = requests.get(d3fend_url)
with open('d3fend.json', 'w') as f:
f.write(r.text)
d3fend_json = r.json()
uuid_seed = '35527064-12b4-4b73-952b-6d76b9f1b1e3'
tactics = {} # key = tactic, value = phases
phases_ids = []
techniques_ids = []
techniques = []
def get_as_list(item):
if isinstance(item, dict):
return item.values()
elif isinstance(item, list):
result = []
for i in item:
if isinstance(i, dict):
result += i.values()
if isinstance(i, str):
result.append(i)
return result
elif isinstance(item, str):
return [item]
else:
raise ValueError(f'Unexpected type: {type(item)}')
def is_val_in_element(val, element):
result = False
if isinstance(element, dict): # only one entry
if val == element['@id']:
return True
elif isinstance(element, list): # multiple entries
for e in element:
if val == e['@id']:
return True
elif not element:
pass
else:
raise ValueError(f'Unexpected type: {type(element)}')
return result
def is_element_in_list(element, lst):
if isinstance(element, dict): # only one entry
if element['@id'] in lst:
return True
elif isinstance(element, list): # multiple entries
for e in element:
if e['@id'] in lst:
return True
else:
raise ValueError(f'Unexpected type: {type(element)}')
def id_to_label(id):
return data[id]['rdfs:label']
def get_parent(item):
# value of subClassOf starts with d3f
if 'rdfs:subClassOf' in item:
# if 'd3f:enables' in item:
# parent_classes = get_as_list(item['d3f:enables'])
# else:
parent_classes = get_as_list(item['rdfs:subClassOf'])
for parent_class in parent_classes:
if parent_class.startswith('d3f'):
return parent_class
return None
def find_kill_chain_of(original_item):
# find if back in the kill chain_tactics list we built before
parent_classes = get_as_list(original_item['rdfs:subClassOf'])
for parent_class in parent_classes:
if parent_class.startswith('d3f'):
parent_class_name = id_to_label(parent_class).replace(' ', '-')
for tactic, phases in kill_chain_tactics.items():
if parent_class_name in phases:
return f"{tactic}:{parent_class_name}"
# child with one more parent in between
for parent_class in parent_classes:
if parent_class.startswith('d3f'):
return find_kill_chain_of(data[parent_class])
# first convert as dict with key = @id
data = {}
for item in d3fend_json['@graph']:
data[item['@id']] = item
# tactic
for item in d3fend_json['@graph']:
if is_val_in_element('d3f:DefensiveTactic', item.get('rdfs:subClassOf')):
tactics[item['rdfs:label']] = {
'order': item['d3f:display-order'],
'phases': []
}
print(f"Tactic: {item['rdfs:label']}")
# phases
for item in d3fend_json['@graph']:
if 'rdfs:subClassOf' in item:
if is_val_in_element('d3f:DefensiveTechnique', item['rdfs:subClassOf']):
phases_ids.append(item['@id'])
parent = id_to_label(item['d3f:enables']['@id'])
tactics[parent]['phases'].append(item['rdfs:label'].replace(' ', '-'))
# print(f"Tactic: {parent} \tPhase: {item['rdfs:label']}")
# sort the tactics based on the order
tactics = dict(sorted(tactics.items(), key=lambda item: item[1]['order']))
# sort the values
kill_chain_tactics = {}
for tactic, value in tactics.items():
kill_chain_tactics[tactic] = sorted(value['phases'])
# extract all parent, child and ... techniques
seen_new = True
while seen_new:
seen_new = False
for item in d3fend_json['@graph']:
if 'rdfs:subClassOf' in item:
element = item['rdfs:subClassOf']
if is_element_in_list(element, phases_ids) or is_element_in_list(element, techniques_ids):
if item['@id'] in techniques_ids:
continue
seen_new = True
techniques_ids.append(item['@id'])
if 'Memory Boundary Tracking' in item['rdfs:label']:
print(f"Technique: {item['rdfs:label']}")
kill_chain = find_kill_chain_of(item)
technique = {
'value': item['rdfs:label'],
'description': item['d3f:definition'],
'uuid': str(uuid.uuid5(uuid.UUID(uuid_seed), item['d3f:d3fend-id'])),
'meta': {
'kill_chain': [kill_chain],
'refs': [f"https://d3fend.mitre.org/technique/{item['@id']}"],
'external_id': item['d3f:d3fend-id']
}
}
# synonyms
if 'd3f:synonym' in item:
technique['meta']['synonyms'] = get_as_list(item['d3f:synonym'])
# TODO relations
techniques.append(technique)
print(f"Technique: {item['rdfs:label']} - {item['d3f:d3fend-id']}")
galaxy_fname = 'mitre-d3fend.json'
galaxy_type = "mitre-d3fend"
galaxy_name = "MITRE D3FEND"
galaxy_description = 'A knowledge graph of cybersecurity countermeasures.'
galaxy_source = 'https://d3fend.mitre.org/'
json_galaxy = {
'description': galaxy_description,
'icon': "map",
'kill_chain_order': kill_chain_tactics,
'name': galaxy_name,
'namespace': "mitre",
'type': galaxy_type,
'uuid': "77d1bbfa-2982-4e0a-9238-1dae4a48c5b4",
'version': 1
}
json_cluster = {
'authors': ["MITRE"],
'category': 'd3fend',
'name': galaxy_name,
'description': galaxy_description,
'source': galaxy_source,
'type': galaxy_type,
'uuid': "b8bd7e45-63bf-4c44-8ab1-c81c82547380",
'values': list(techniques),
'version': 1
}
# save the Galaxy and Cluster file
with open(os.path.join('..', 'galaxies', galaxy_fname), 'w') as f:
# do not sort_keys as it would break the kill_chain_order
json.dump(json_galaxy, f, indent=2, ensure_ascii=False)
f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things
with open(os.path.join('..', 'clusters', galaxy_fname), 'w') as f:
json.dump(json_cluster, f, indent=2, sort_keys=True, ensure_ascii=False)
f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things
print("All done, please don't forget to ./jq_all_the_things.sh, commit, and then ./validate_all.sh.")