mirror of
https://github.com/MISP/misp-galaxy.git
synced 2024-11-29 18:27:19 +00:00
Merge pull request #290 from cvandeplas/master
tool: experimental graphing tool
This commit is contained in:
commit
1276d834dd
1 changed files with 195 additions and 0 deletions
195
tools/graph.py
Executable file
195
tools/graph.py
Executable file
|
@ -0,0 +1,195 @@
|
|||
#!/usr/bin/env python3
|
||||
# TODO
|
||||
# - define strength between relations based on 'type' - similar should be closer than the others
|
||||
# - use different colors / shapes
|
||||
|
||||
import json
|
||||
import os
|
||||
import argparse
|
||||
from graphviz import Digraph
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description='Generate a DOT file to graph a Galaxy cluster and its relations.')
|
||||
parser.add_argument("-u", "--uuid", help="Start UUID of a cluster.")
|
||||
parser.add_argument("-a", "--all", action='store_true', help='generate all graphs as PNGs')
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
def gen_galaxy_tag(galaxy_name, cluster_name):
|
||||
# return 'misp-galaxy:{}="{}"'.format(galaxy_name, cluster_name)
|
||||
return '{}={}'.format(galaxy_name, cluster_name)
|
||||
|
||||
files_to_ignore = ['mitre-attack-pattern.json', 'mitre-course-of-action.json', 'mitre-intrusion-set.json',
|
||||
'mitre-malware.json', 'mitre-tool.json']
|
||||
|
||||
galaxies_fnames = []
|
||||
pathClusters = '../clusters'
|
||||
for f in os.listdir(pathClusters):
|
||||
if '.json' in f and f not in files_to_ignore:
|
||||
galaxies_fnames.append(f)
|
||||
galaxies_fnames.sort()
|
||||
|
||||
cluster_uuids = {}
|
||||
galaxies = []
|
||||
for galaxy_fname in galaxies_fnames:
|
||||
fullPathClusters = os.path.join(pathClusters, galaxy_fname)
|
||||
with open(fullPathClusters) as fp:
|
||||
json_data = json.load(fp)
|
||||
galaxies.append(json_data)
|
||||
for cluster in json_data['values']:
|
||||
if 'uuid' not in cluster:
|
||||
continue
|
||||
cluster_uuids[cluster['uuid']] = {
|
||||
'tag': gen_galaxy_tag(json_data['type'], cluster['value']),
|
||||
'galaxy': json_data['type'],
|
||||
'value': cluster['value'],
|
||||
'synonyms': cluster.get('synonyms')
|
||||
}
|
||||
|
||||
|
||||
|
||||
# for k, v in cluster_uuids.items():
|
||||
# print("{}\t{}".format(k, v))
|
||||
|
||||
|
||||
type_mapping = {
|
||||
'ransomware': 'tool',
|
||||
# 'mitre-pre-attack-relationship': '',
|
||||
# 'mitre-enterprise-attack-course-of-action': '',
|
||||
'mitre-enterprise-attack-intrusion-set': 'actor',
|
||||
'mitre-intrusion-set': 'actor',
|
||||
'rat': 'tool',
|
||||
'stealer': 'tool',
|
||||
'mitre-enterprise-attack-malware': 'tool',
|
||||
# 'mitre-attack-pattern': '',
|
||||
# 'mitre-mobile-attack-relationship': '',
|
||||
# 'mitre-enterprise-attack-attack-pattern': '',
|
||||
'microsoft-activity-group': 'actor',
|
||||
# 'mitre-course-of-action': '',
|
||||
'exploit-kit': 'tool',
|
||||
'mitre-mobile-attack-tool': 'tool',
|
||||
'backdoor': 'tool',
|
||||
# 'mitre-pre-attack-attack-pattern': '',
|
||||
'mitre-mobile-attack-intrusion-set': 'actor',
|
||||
'mitre-tool': 'tool',
|
||||
# 'mitre-mobile-attack-attack-pattern': '',
|
||||
'mitre-mobile-attack-malware': 'tool',
|
||||
'tool': 'tool',
|
||||
# 'preventive-measure': '',
|
||||
# 'sector': '',
|
||||
'mitre-malware': 'tool',
|
||||
'banker': 'tool',
|
||||
# 'branded-vulnerability': '',
|
||||
'botnet': 'tool',
|
||||
# 'cert-eu-govsector': '',
|
||||
'threat-actor': 'actor',
|
||||
'mitre-enterprise-attack-tool': 'tool',
|
||||
'android': 'tool',
|
||||
# 'mitre-mobile-attack-course-of-action': '',
|
||||
'mitre-pre-attack-intrusion-set': 'actor',
|
||||
# 'mitre-enterprise-attack-relationship': '',
|
||||
'tds': 'tool',
|
||||
'malpedia': 'tool'
|
||||
}
|
||||
|
||||
|
||||
def gen_dot(uuid):
|
||||
things_to_keep = [uuid] # '5b4ee3ea-eee3-4c8e-8323-85ae32658754' = threat-actor=Sofacy
|
||||
# ' 5e0a7cf2-6107-4d5f-9dd0-9df38b1fcba8' = APT30
|
||||
things_seen = things_to_keep.copy()
|
||||
|
||||
dot = []
|
||||
while len(things_to_keep) > 0:
|
||||
new_things_to_keep = []
|
||||
for galaxy in galaxies:
|
||||
for cluster in galaxy['values']:
|
||||
if 'related' not in cluster:
|
||||
continue
|
||||
src_tag = gen_galaxy_tag(galaxy['type'], cluster['value'])
|
||||
if cluster['uuid'] not in things_to_keep:
|
||||
continue
|
||||
node_params = []
|
||||
node_params.append('label="{}\n{}"'.format(galaxy['type'], cluster['value']))
|
||||
if type_mapping.get(galaxy['type']) == 'actor':
|
||||
node_params.append('shape=octagon')
|
||||
node_params.append('style=filled,color=indianred1')
|
||||
elif type_mapping.get(galaxy['type']) == 'tool':
|
||||
node_params.append('shape=box')
|
||||
node_params.append('style=filled,color=deepskyblue')
|
||||
else:
|
||||
node_params.append('shape=ellipse')
|
||||
dot.append('"{src}" [{params}];'.format(
|
||||
src=src_tag,
|
||||
params=','.join(node_params)
|
||||
))
|
||||
for relation in cluster['related']:
|
||||
try:
|
||||
dest_tag = cluster_uuids[relation['dest-uuid']]['tag']
|
||||
extra = []
|
||||
if relation['type'] == 'similar':
|
||||
# make arrow bidirectional
|
||||
extra.append('dir="both"')
|
||||
# prevent double links for 'similar' types
|
||||
if relation['dest-uuid'] in things_seen:
|
||||
continue
|
||||
dot.append('"{src}" -> "{dst}" [label="{lbl}",{extra}];'.format(
|
||||
# dot.append('"{src}" -> "{dst}" [{extra}];'.format(
|
||||
src=src_tag,
|
||||
dst=dest_tag,
|
||||
lbl=relation['type'],
|
||||
extra=','.join(extra)
|
||||
))
|
||||
# FIXME - add a separate node with the color, type, format of the source-node
|
||||
|
||||
# prevent something to be processed twice
|
||||
if relation['dest-uuid'] not in things_seen:
|
||||
new_things_to_keep.append(relation['dest-uuid'])
|
||||
things_seen.append(relation['dest-uuid'])
|
||||
except KeyError:
|
||||
# skip uuids not found
|
||||
pass
|
||||
# print(new_things_to_keep)
|
||||
things_to_keep = new_things_to_keep.copy()
|
||||
|
||||
|
||||
return dot
|
||||
|
||||
if args.uuid:
|
||||
uuid = args.uuid
|
||||
dot = []
|
||||
# dot.append('digraph G {')
|
||||
dot.append('concentrate=true;')
|
||||
dot.append('overlap=scale;')
|
||||
generated_dot = gen_dot(uuid)
|
||||
if len(generated_dot) == 0:
|
||||
print("Empty graph for uuid: {}".format(uuid))
|
||||
exit()
|
||||
print("Generating graph for uuid: {}".format(uuid))
|
||||
dot += generated_dot
|
||||
# dot.append('}')
|
||||
# dg.source = '\n'.join(dot)
|
||||
dg = Digraph(engine='neato', format='png', body=dot)
|
||||
# print(dg.source)
|
||||
dg.render(filename='graphs/{}'.format(uuid), cleanup=False)
|
||||
|
||||
elif args.all:
|
||||
for uuid in cluster_uuids.keys():
|
||||
dot = []
|
||||
# dot.append('digraph G {')
|
||||
dot.append('concentrate=true;')
|
||||
dot.append('overlap=scale;')
|
||||
generated_dot = gen_dot(uuid)
|
||||
if len(generated_dot) == 0:
|
||||
print("Empty graph for uuid: {}".format(uuid))
|
||||
continue
|
||||
|
||||
print("Generating graph for uuid: {}".format(uuid))
|
||||
dot += generated_dot
|
||||
# dot.append('}')
|
||||
# dg.source = '\n'.join(dot)
|
||||
|
||||
dg = Digraph(format='png', body=dot)
|
||||
# print(dg.source)
|
||||
dg.render(engine='dot', filename='graphs/{}'.format(uuid), cleanup=False)
|
||||
else:
|
||||
exit("No parameters given, use --help for more info.")
|
Loading…
Reference in a new issue