#!/usr/bin/env python3
"""Rebuild the MISP MITRE ATT&CK galaxy clusters from a MITRE CTI checkout.

The script works in three steps:

1. MIGRATION: read the existing per-domain cluster files
   (``clusters/mitre-<domain>-<type>.json`` below ``misp_dir``) and index
   every value by its UUID.
2. Read ``<path>/<domain>/<domain>.json`` for every domain, create or update
   one value per ATT&CK object whose type is listed in ``types``, then attach
   the bundle's ``relationship`` objects to their source value.
3. Rewrite the ``values`` list of each per-type cluster file
   (``clusters/mitre-<type>.json``).
"""
import argparse
import json
import os
import re
import sys
import traceback

parser = argparse.ArgumentParser(description='Create a couple galaxy/cluster with cti\'s intrusion-sets\nMust be in the mitre/cti/enterprise-attack/intrusion-set folder')
parser.add_argument("-p", "--path", required=True, help="Path of the mitre/cti folder")

misp_dir = '../../../'

domains = ['enterprise-attack', 'mobile-attack', 'pre-attack']
types = ['attack-pattern', 'course-of-action', 'intrusion-set', 'malware', 'tool']

# Captures the UUID part of a STIX reference such as "malware--<uuid>".
STIX_REF_UUID_RE = re.compile(r'--([0-9a-f-]+)')


def extract_uuid(stix_id):
    """Return everything after the first ``--`` of the STIX id *stix_id*.

    Raises ``AttributeError`` when *stix_id* contains no ``--`` separator.
    """
    return re.search('--(.*)$', stix_id).group(1)


def merge_cluster_value(all_data_uuid, value, cluster_type=None):
    """Store *value* in *all_data_uuid* under its ``uuid``.

    When a value with the same UUID was stored before, the ``related``
    entries of that earlier copy which the new one lacks are appended to the
    new copy; all other fields come from *value*.

    When *cluster_type* is given it is recorded under the temporary ``type``
    key, so that a value which is absent from the ATT&CK bundles can still be
    assigned to an output file.
    """
    previous = all_data_uuid.get(value['uuid'])
    if previous is not None:
        # if it already exists we need to copy over all the data manually to merge it
        # on the other hand, from a manual analysis it looks like it's mostly the relations that are different
        # so now we will just copy over the relationships
        for related_item in previous.get('related', []):
            # The newer copy is not guaranteed to carry a 'related' list.
            related = value.setdefault('related', [])
            if related_item not in related:
                related.append(related_item)
    if cluster_type is not None:
        value['type'] = cluster_type  # removed again before dump to json
    all_data_uuid[value['uuid']] = value


def load_existing_clusters(base_dir, cluster_domains, cluster_types):
    """Index the values of the existing per-domain cluster files by UUID.

    THIS IS FOR MIGRATION - reading the data from the enterprise-attack,
    mobile-attack, pre-attack cluster files.  Files that do not exist are
    skipped.  Using the UUID as key speeds up lookups later on.

    Returns a dict mapping UUID to cluster value.
    """
    all_data_uuid = {}
    for domain in cluster_domains:
        for t in cluster_types:
            fname = os.path.join(base_dir, 'clusters', 'mitre-{}-{}.json'.format(domain, t))
            if not os.path.exists(fname):
                continue
            with open(fname, encoding='utf-8') as f:
                file_data = json.load(f)
            for value in file_data['values']:
                merge_cluster_value(all_data_uuid, value, t)
    return all_data_uuid


def build_cluster_value(item, value=None):
    """Fill a cluster value from the ATT&CK object *item* and return it.

    *value* is the already known value for this object, if any; it is updated
    in place (its ``meta`` is rebuilt from scratch, other existing keys are
    kept).  Without it a new dict is created.

    Raises ``KeyError``/``IndexError``/``AttributeError`` when *item* lacks
    the expected fields; in that case *value* may be partially updated.
    """
    if value is None:
        value = {}

    if 'description' in item:
        value['description'] = item['description']
    value['value'] = item['name'] + ' - ' + item['external_references'][0]['external_id']
    meta = {'refs': []}
    value['meta'] = meta
    value['uuid'] = extract_uuid(item['id'])

    if 'aliases' in item:
        meta['synonyms'] = item['aliases']
    if 'x_mitre_aliases' in item:
        # takes precedence over 'aliases' when both are present
        meta['synonyms'] = item['x_mitre_aliases']

    for reference in item['external_references']:
        if 'url' in reference and reference['url'] not in meta['refs']:
            meta['refs'].append(reference['url'])
        if 'external_id' in reference:
            # the last reference carrying an external_id wins
            meta['external_id'] = reference['external_id']

    if 'kill_chain_phases' in item:  # many (but not all) attack-patterns have this
        # NOTE(review): ':enterprise-attack:' is hard-coded for every domain,
        # including mobile-attack and pre-attack - confirm this is intended.
        meta['kill_chain'] = [
            killchain['kill_chain_name'] + ':enterprise-attack:' + killchain['phase_name']
            for killchain in item['kill_chain_phases']
        ]
    if 'x_mitre_data_sources' in item:
        meta['mitre_data_sources'] = item['x_mitre_data_sources']
    if 'x_mitre_platforms' in item:
        meta['mitre_platforms'] = item['x_mitre_platforms']

    # relationships will be build separately afterwards
    value['type'] = item['type']  # remove this before dump to json
    return value


def apply_attack_object(all_data_uuid, item):
    """Create or update the value for the ATT&CK object *item* in *all_data_uuid*.

    A new value is only stored when it could be built completely; an existing
    value is updated in place.
    """
    uuid = extract_uuid(item['id'])
    all_data_uuid[uuid] = build_cluster_value(item, all_data_uuid.get(uuid))


def add_relationship(all_data_uuid, item):
    """Attach the ATT&CK ``relationship`` object *item* to its source value.

    The relation is appended to the ``related`` list of the value identified
    by ``source_ref`` unless an identical entry is already present.

    Returns ``True`` when the source value is known, ``False`` when it is not
    (nothing is modified in that case).
    """
    rel_type = item['relationship_type']
    dest_uuid = STIX_REF_UUID_RE.findall(item['target_ref']).pop()
    source_uuid = STIX_REF_UUID_RE.findall(item['source_ref']).pop()

    source = all_data_uuid.get(source_uuid)
    if source is None:
        return False

    # add the relation in the defined way
    rel_source = {
        "dest-uuid": dest_uuid,
        "tags": [
            "estimative-language:likelihood-probability=\"almost-certain\""
        ],
        "type": rel_type
    }
    # 'related' is the key the merge step reads back from the cluster files.
    related = source.setdefault('related', [])
    if rel_source not in related:
        related.append(rel_source)
    # LATER find the opposite word of "rel_type" and build the relation in the opposite direction
    return True


def select_values_of_type(all_data_uuid, cluster_type):
    """Return the values tagged with *cluster_type*, stripped of the tag.

    The temporary ``type`` key is removed from every returned value, so a
    value is returned at most once.
    """
    selected = []
    for item in all_data_uuid.values():
        if item.get('type') != cluster_type:
            continue
        item.pop('type', None)
        selected.append(item)
    return selected


def main(argv=None):
    """Run the conversion; *argv* defaults to the process arguments."""
    args = parser.parse_args(argv)

    # read in existing data
    # at the end we will convert everything again to separate datasets
    all_data_uuid = load_existing_clusters(misp_dir, domains, types)

    # THIS IS FOR NORMAL OPERATIONS - reading from the very old and new models - one model per type
    # FIXME implement this (copy paste above or put above in function and call function)

    # now load the MITRE ATT&CK
    for domain in domains:
        attack_dir = os.path.join(args.path, domain)
        if not os.path.exists(attack_dir):
            sys.exit("ERROR: MITRE ATT&CK folder incorrect")

        with open(os.path.join(attack_dir, domain + '.json'), encoding='utf-8') as f:
            attack_data = json.load(f)

        for item in attack_data['objects']:
            if item['type'] not in types:
                continue
            try:
                apply_attack_object(all_data_uuid, item)
            except Exception:
                # Deliberately best-effort: report the offending object and
                # carry on with the remaining ones.
                print(json.dumps(item, sort_keys=True, indent=2))
                traceback.print_exc()

        # process the 'relationship' type as we now know the existence of all ATT&CK uuids
        for item in attack_data['objects']:
            if item['type'] != 'relationship':
                continue
            if not add_relationship(all_data_uuid, item):
                print("WARNING: skipping relationship with unknown source {}".format(item['source_ref']),
                      file=sys.stderr)

    # dump all_data to their respective file
    for t in types:
        fname = os.path.join(misp_dir, 'clusters', 'mitre-{}.json'.format(t))
        if not os.path.exists(fname):
            sys.exit("File {} does not exist, this is unexpected.".format(fname))
        with open(fname, encoding='utf-8') as f:
            file_data = json.load(f)

        file_data['values'] = select_values_of_type(all_data_uuid, t)

        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(file_data, f, indent=2, sort_keys=True, ensure_ascii=False)
            f.write('\n')  # only needed for the beauty and to be compliant with jq_all_the_things


if __name__ == '__main__':
    main()