From a4afac9a9775775ab846c5235921c0cf997ba055 Mon Sep 17 00:00:00 2001 From: Christophe Vandeplas Date: Mon, 27 May 2024 21:57:08 +0200 Subject: [PATCH] new: [d3fend] initial conversion script for MITRE D3FEND #975 --- tools/gen_mitre_d3fend.py | 211 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100755 tools/gen_mitre_d3fend.py diff --git a/tools/gen_mitre_d3fend.py b/tools/gen_mitre_d3fend.py new file mode 100755 index 0000000..3bb0160 --- /dev/null +++ b/tools/gen_mitre_d3fend.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +import json +import os +import requests +import uuid + +d3fend_url = 'https://d3fend.mitre.org/ontologies/d3fend.json' +d3fend_full_mappings_url = 'https://d3fend.mitre.org/api/ontology/inference/d3fend-full-mappings.json' + + +try: + with open('d3fend.json', 'r') as f: + d3fend_json = json.load(f) +except Exception: + r = requests.get(d3fend_url) + with open('d3fend.json', 'w') as f: + f.write(r.text) + d3fend_json = r.json() + +uuid_seed = '35527064-12b4-4b73-952b-6d76b9f1b1e3' + +tactics = {} # key = tactic, value = phases +phases_ids = [] +techniques_ids = [] +techniques = [] + + +def get_as_list(item): + if isinstance(item, dict): + return item.values() + elif isinstance(item, list): + result = [] + for i in item: + if isinstance(i, dict): + result += i.values() + if isinstance(i, str): + result.append(i) + return result + elif isinstance(item, str): + return [item] + else: + raise ValueError(f'Unexpected type: {type(item)}') + + +def is_val_in_element(val, element): + result = False + if isinstance(element, dict): # only one entry + if val == element['@id']: + return True + elif isinstance(element, list): # multiple entries + for e in element: + if val == e['@id']: + return True + elif not element: + pass + else: + raise ValueError(f'Unexpected type: {type(element)}') + return result + + +def is_element_in_list(element, lst): + if isinstance(element, dict): # only one entry + if element['@id'] in lst: + return True + + elif isinstance(element, list): # multiple entries + for e in element: + if e['@id'] in lst: + return True + else: + raise ValueError(f'Unexpected type: {type(element)}') + + +def id_to_label(id): + return data[id]['rdfs:label'] + + +def get_parent(item): + # value of subClassOf starts with d3f + if 'rdfs:subClassOf' in item: + # if 'd3f:enables' in item: + # parent_classes = get_as_list(item['d3f:enables']) + # else: + parent_classes = get_as_list(item['rdfs:subClassOf']) + for parent_class in parent_classes: + if parent_class.startswith('d3f'): + return parent_class + return None + + +def find_kill_chain_of(original_item): + # find if back in the kill chain_tactics list we built before + parent_classes = get_as_list(original_item['rdfs:subClassOf']) + for parent_class in parent_classes: + if parent_class.startswith('d3f'): + parent_class_name = id_to_label(parent_class).replace(' ', '-') + for tactic, phases in kill_chain_tactics.items(): + if parent_class_name in phases: + return f"{tactic}:{parent_class_name}" + # child with one more parent in between + for parent_class in parent_classes: + if parent_class.startswith('d3f'): + return find_kill_chain_of(data[parent_class]) + + +# first convert as dict with key = @id +data = {} +for item in d3fend_json['@graph']: + data[item['@id']] = item + +# tactic +for item in d3fend_json['@graph']: + if is_val_in_element('d3f:DefensiveTactic', item.get('rdfs:subClassOf')): + tactics[item['rdfs:label']] = { + 'order': item['d3f:display-order'], + 'phases': [] + } + print(f"Tactic: {item['rdfs:label']}") + +# phases +for item in d3fend_json['@graph']: + if 'rdfs:subClassOf' in item: + if is_val_in_element('d3f:DefensiveTechnique', item['rdfs:subClassOf']): + phases_ids.append(item['@id']) + parent = id_to_label(item['d3f:enables']['@id']) + tactics[parent]['phases'].append(item['rdfs:label'].replace(' ', '-')) + # print(f"Tactic: {parent} \tPhase: {item['rdfs:label']}") + +# sort the tactics based on the order +tactics = dict(sorted(tactics.items(), key=lambda item: item[1]['order'])) +# sort the values +kill_chain_tactics = {} +for tactic, value in tactics.items(): + kill_chain_tactics[tactic] = sorted(value['phases']) + + +# extract all parent, child and ... techniques +seen_new = True +while seen_new: + seen_new = False + for item in d3fend_json['@graph']: + if 'rdfs:subClassOf' in item: + element = item['rdfs:subClassOf'] + if is_element_in_list(element, phases_ids) or is_element_in_list(element, techniques_ids): + if item['@id'] in techniques_ids: + continue + seen_new = True + techniques_ids.append(item['@id']) + if 'Memory Boundary Tracking' in item['rdfs:label']: + print(f"Technique: {item['rdfs:label']}") + kill_chain = find_kill_chain_of(item) + technique = { + 'value': item['rdfs:label'], + 'description': item['d3f:definition'], + 'uuid': str(uuid.uuid5(uuid.UUID(uuid_seed), item['d3f:d3fend-id'])), + 'meta': { + 'kill_chain': [kill_chain], + 'refs': [f"https://d3fend.mitre.org/technique/{item['@id']}"], + 'external_id': item['d3f:d3fend-id'] + } + } + # synonyms + if 'd3f:synonym' in item: + technique['meta']['synonyms'] = get_as_list(item['d3f:synonym']) + # TODO relations + + techniques.append(technique) + print(f"Technique: {item['rdfs:label']} - {item['d3f:d3fend-id']}") + + +galaxy_fname = 'mitre-d3fend.json' +galaxy_type = "mitre-d3fend" +galaxy_name = "MITRE D3FEND" +galaxy_description = 'A knowledge graph of cybersecurity countermeasures.' +galaxy_source = 'https://d3fend.mitre.org/' +json_galaxy = { + 'description': galaxy_description, + 'icon': "map", + 'kill_chain_order': kill_chain_tactics, + 'name': galaxy_name, + 'namespace': "mitre", + 'type': galaxy_type, + 'uuid': "77d1bbfa-2982-4e0a-9238-1dae4a48c5b4", + 'version': 1 +} + +json_cluster = { + 'authors': ["MITRE"], + 'category': 'd3fend', + 'name': galaxy_name, + 'description': galaxy_description, + 'source': galaxy_source, + 'type': galaxy_type, + 'uuid': "b8bd7e45-63bf-4c44-8ab1-c81c82547380", + 'values': list(techniques), + 'version': 1 +} + + +# save the Galaxy and Cluster file +with open(os.path.join('..', 'galaxies', galaxy_fname), 'w') as f: + # do not sort_keys as it would break the kill_chain_order + json.dump(json_galaxy, f, indent=2, ensure_ascii=False) + f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things + +with open(os.path.join('..', 'clusters', galaxy_fname), 'w') as f: + json.dump(json_cluster, f, indent=2, sort_keys=True, ensure_ascii=False) + f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things + +print("All done, please don't forget to ./jq_all_the_things.sh, commit, and then ./validate_all.sh.") +