new: [mitre] MITRE Data Sources and Data Components fixes #914

Christophe Vandeplas 2024-01-12 17:08:06 +01:00
parent 6a325420bf
commit 6ea968588a
No known key found for this signature in database
GPG key ID: BDC48619FFDC5A5B
10 changed files with 11399 additions and 46552 deletions

7 file diffs suppressed because they are too large.

@@ -0,0 +1,9 @@
+{
+  "description": "Data components are parts of data sources. ",
+  "icon": "sitemap",
+  "name": "mitre-data-component",
+  "namespace": "mitre-attack",
+  "type": "mitre-data-component",
+  "uuid": "afff2d74-5d4a-4aa7-995a-3701a2dbe593",
+  "version": 1
+}


@@ -0,0 +1,9 @@
+{
+  "description": "Data sources represent the various subjects/topics of information that can be collected by sensors/logs. ",
+  "icon": "sitemap",
+  "name": "mitre-data-source",
+  "namespace": "mitre-attack",
+  "type": "mitre-data-source",
+  "uuid": "dca5da28-fdc0-4b37-91cd-989d139d96cf",
+  "version": 1
+}
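
The two files above only declare the galaxies; the entries themselves go into clusters/mitre-data-source.json and clusters/mitre-data-component.json, which the updated generator script below writes. As a purely illustrative sketch (made-up names and UUIDs, field layout taken from what the generator populates), a generated pair of cluster values could look roughly like this:

# Illustrative only: made-up names/UUIDs, field layout mirroring what the
# generator script below writes into the cluster 'values' lists.
data_source_value = {
    "value": "Example Data Source - DS0000",          # item name + ' - ' + external_id
    "uuid": "11111111-1111-1111-1111-111111111111",   # UUID part of the STIX id
    "meta": {
        "external_id": "DS0000",
        "refs": ["https://attack.mitre.org/datasources/DS0000"],
    },
    "related": [
        {"dest-uuid": "22222222-2222-2222-2222-222222222222", "type": "includes"},
    ],
}

data_component_value = {
    "value": "Example Data Component",   # falls back to the plain name when no external_references exist
    "uuid": "22222222-2222-2222-2222-222222222222",
    "meta": {"refs": []},
}

print(data_source_value["value"], "includes", data_component_value["value"])
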


@@ -8,13 +8,21 @@ parser = argparse.ArgumentParser(description='Create a couple galaxy/cluster wit
 parser.add_argument("-p", "--path", required=True, help="Path of the mitre/cti folder")
 args = parser.parse_args()
+root_folder = args.path
 values = []
 misp_dir = '../'
 domains = ['enterprise-attack', 'mobile-attack', 'pre-attack']
-types = ['attack-pattern', 'course-of-action', 'intrusion-set', 'malware', 'tool']
+types = {'data-source': 'x-mitre-data-source',
+         'attack-pattern': 'attack-pattern',
+         'course-of-action': 'course-of-action',
+         'intrusion-set': 'intrusion-set',
+         'malware': 'malware',
+         'tool': 'tool',
+         'data-component': 'x-mitre-data-component'
+         }
 mitre_sources = ['mitre-attack', 'mitre-ics-attack', 'mitre-pre-attack', 'mitre-mobile-attack']
 all_data = {}  # variable that will contain everything
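
For reference, the generator reads the per-domain STIX 2.x bundles from the mitre/cti repository; the two object types newly added to the mapping above look roughly like the following (heavily abridged, illustrative identifiers only):

# Heavily abridged sketches of the two newly handled STIX object types;
# real objects in the mitre/cti bundles carry many more fields.
example_data_source = {
    "type": "x-mitre-data-source",
    "id": "x-mitre-data-source--11111111-1111-1111-1111-111111111111",
    "name": "Example Data Source",
    "external_references": [
        {"source_name": "mitre-attack", "external_id": "DS0000",
         "url": "https://attack.mitre.org/datasources/DS0000"},
    ],
}

example_data_component = {
    "type": "x-mitre-data-component",
    "id": "x-mitre-data-component--22222222-2222-2222-2222-222222222222",
    "name": "Example Data Component",
    # points back to the parent data source; used further down to build relations
    "x_mitre_data_source_ref": "x-mitre-data-source--11111111-1111-1111-1111-111111111111",
}
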
@@ -71,7 +79,7 @@ for t in types:
 # now load the MITRE ATT&CK
 for domain in domains:
-    attack_dir = os.path.join(args.path, domain)
+    attack_dir = os.path.join(root_folder, domain)
     if not os.path.exists(attack_dir):
         exit("ERROR: MITRE ATT&CK folder incorrect")
@@ -79,7 +87,7 @@ for domain in domains:
         attack_data = json.load(f)
     for item in attack_data['objects']:
-        if item['type'] not in types:
+        if item['type'] not in types.values():
             continue
         # print(json.dumps(item, indent=2, sort_keys=True, ensure_ascii=False))
@@ -94,7 +102,10 @@ for domain in domains:
             if 'description' in item:
                 value['description'] = item['description']
-            value['value'] = item['name'] + ' - ' + item['external_references'][0]['external_id']
+            if 'external_references' in item:
+                value['value'] = item['name'] + ' - ' + item['external_references'][0]['external_id']
+            else:
+                value['value'] = item['name']
             value['meta'] = {}
             value['meta']['refs'] = []
             value['uuid'] = re.search('--(.*)$', item['id']).group(0)[2:]
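
A minimal standalone check of the two expressions in this hunk, i.e. the new naming fallback and the existing UUID extraction, using a made-up data component object that has no external_references:

import re

# Made-up x-mitre-data-component without external_references, to exercise
# the fallback naming and the UUID extraction from the STIX id.
item = {
    "type": "x-mitre-data-component",
    "id": "x-mitre-data-component--22222222-2222-2222-2222-222222222222",
    "name": "Example Data Component",
}

value = {}
if 'external_references' in item:
    value['value'] = item['name'] + ' - ' + item['external_references'][0]['external_id']
else:
    value['value'] = item['name']
value['uuid'] = re.search('--(.*)$', item['id']).group(0)[2:]

print(value['value'])  # Example Data Component
print(value['uuid'])   # 22222222-2222-2222-2222-222222222222
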
@@ -104,16 +115,17 @@ for domain in domains:
             if 'x_mitre_aliases' in item:
                 value['meta']['synonyms'] = item['x_mitre_aliases']
-            for reference in item['external_references']:
-                if 'url' in reference and reference['url'] not in value['meta']['refs']:
-                    value['meta']['refs'].append(reference['url'])
-                # Find Mitre external IDs from allowed sources
-                if 'external_id' in reference and reference.get("source_name", None) in mitre_sources:
-                    value['meta']['external_id'] = reference['external_id']
-            if not value['meta'].get('external_id', None):
-                exit("Entry is missing an external ID, please update mitre_sources. Available references: {}".format(
-                    json.dumps(item['external_references'])
-                ))
+            if 'external_references' in item:
+                for reference in item['external_references']:
+                    if 'url' in reference and reference['url'] not in value['meta']['refs']:
+                        value['meta']['refs'].append(reference['url'])
+                    # Find Mitre external IDs from allowed sources
+                    if 'external_id' in reference and reference.get("source_name", None) in mitre_sources:
+                        value['meta']['external_id'] = reference['external_id']
+                if not value['meta'].get('external_id', None):
+                    exit("Entry is missing an external ID, please update mitre_sources. Available references: {}".format(
+                        json.dumps(item['external_references'])
+                    ))
             if 'kill_chain_phases' in item:  # many (but not all) attack-patterns have this
                 value['meta']['kill_chain'] = []
@@ -129,6 +141,7 @@ for domain in domains:
             value['type'] = item['type']  # remove this before dump to json
             # print(json.dumps(value, sort_keys=True, indent=2))
+            # FIXME if 'x_mitre_deprecated' , 'revoked'
             all_data_uuid[uuid] = value
         except Exception as e:
@@ -152,10 +165,6 @@ for domain in domains:
             "dest-uuid": dest_uuid,
             "type": rel_type
         }
-        if rel_type != 'subtechnique-of':
-            rel_source['tags'] = [
-                "estimative-language:likelihood-probability=\"almost-certain\""
-            ]
         try:
             if 'related' not in all_data_uuid[source_uuid]:
                 all_data_uuid[source_uuid]['related'] = []
@@ -166,9 +175,39 @@ for domain in domains:
     # LATER find the opposite word of "rel_type" and build the relation in the opposite direction
+    # process (again) the data-component, as they create relationships using 'x_mitre_data_source_ref' instead...
+    for item in attack_data['objects']:
+        if item['type'] != 'x-mitre-data-component':
+            continue
+        data_source_uuid = re.findall(r'--([0-9a-f-]+)', item['x_mitre_data_source_ref']).pop()
+        data_component_uuid = re.findall(r'--([0-9a-f-]+)', item['id']).pop()
+        # create relationship bidirectionally
+        rel_data_source = {
+            "dest-uuid": data_component_uuid,
+            "type": 'includes'  # FIXME use a valid type
+        }
+        try:
+            if 'related' not in all_data_uuid[data_source_uuid]:
+                all_data_uuid[data_source_uuid]['related'] = []
+            if rel_data_source not in all_data_uuid[data_source_uuid]['related']:
+                all_data_uuid[data_source_uuid]['related'].append(rel_data_source)
+        except KeyError:
+            pass  # ignore relations from which we do not know the source
+        rel_data_component = {
+            "dest-uuid": data_component_uuid,
+            "type": 'included-in'  # FIXME use a valid type
+        }
+        try:
+            if 'related' not in all_data_uuid[data_component_uuid]:
+                all_data_uuid[data_component_uuid]['related'] = []
+            if rel_data_component not in all_data_uuid[data_component_uuid]['related']:
+                all_data_uuid[data_component_uuid]['related'].append(rel_data_component)
+        except KeyError:
+            pass  # ignore relations from which we do not know the source
+
 # dump all_data to their respective file
-for t in types:
+for t, meta_t in types.items():
     fname = os.path.join(misp_dir, 'clusters', 'mitre-{}.json'.format(t))
     if not os.path.exists(fname):
         exit("File {} does not exist, this is unexpected.".format(fname))
@@ -178,7 +217,7 @@ for t in types:
     file_data['values'] = []
     for item in all_data_uuid.values():
         # print(json.dumps(item, sort_keys=True, indent=2))
-        if 'type' not in item or item['type'] != t:  # drop old data or not from the right type
+        if 'type' not in item or item['type'] != meta_t:  # drop old data or not from the right type
             continue
         item_2 = item.copy()
         item_2.pop('type', None)
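
After regenerating, a quick sanity check is to load one of the rewritten cluster files and count how many entries carry relations. The path below mirrors the script's misp_dir = '../' and assumes the check is run from the same directory as the generator:

import json
import os

# Sanity check after a regeneration run: how many data components were written,
# and how many of them carry 'related' entries (path assumes the generator's layout).
fname = os.path.join('../', 'clusters', 'mitre-data-component.json')
with open(fname) as f:
    cluster = json.load(f)

values = cluster['values']
with_related = [v for v in values if v.get('related')]
print("{} data components, {} with relations".format(len(values), len(with_related)))
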