misp-galaxy/tools/mkdocs/modules/universe.py

92 lines
4.3 KiB
Python
Raw Normal View History

2024-03-01 15:30:49 +00:00
from modules.galaxy import Galaxy
from modules.cluster import Cluster
from collections import defaultdict, deque
class Universe:
2024-03-04 13:39:41 +00:00
def __init__(self, add_inbound_relationship=False):
2024-03-01 15:30:49 +00:00
self.galaxies = {} # Maps galaxy_name to Galaxy objects
2024-03-04 13:39:41 +00:00
self.add_inbound_relationship = add_inbound_relationship
2024-03-01 15:30:49 +00:00
def add_galaxy(self, galaxy_name, json_file_name, authors, description):
if galaxy_name not in self.galaxies:
self.galaxies[galaxy_name] = Galaxy(galaxy_name=galaxy_name, json_file_name=json_file_name, authors=authors, description=description)
def add_cluster(self, galaxy_name, uuid, description, value, meta):
if galaxy_name in self.galaxies:
self.galaxies[galaxy_name].add_cluster(uuid=uuid, description=description, value=value, meta=meta)
def define_relationship(self, cluster_a_id, cluster_b_id):
cluster_a = None
cluster_b = None
# Search for Cluster A and Cluster B in all galaxies
for galaxy in self.galaxies.values():
if cluster_a_id in galaxy.clusters:
cluster_a = galaxy.clusters[cluster_a_id]
if cluster_b_id in galaxy.clusters:
cluster_b = galaxy.clusters[cluster_b_id]
if cluster_a and cluster_b: # Both clusters found
break
# If both clusters are found, define the relationship
if cluster_a and cluster_b:
cluster_a.add_outbound_relationship(cluster_b)
cluster_b.add_inbound_relationship(cluster_a)
else:
# If Cluster B is not found, create a private cluster relationship for Cluster A
if cluster_a:
2024-03-04 13:39:41 +00:00
private_cluster = Cluster(uuid=cluster_b_id, galaxy=None, description=None, value="Private Cluster", meta=None)
2024-03-01 15:30:49 +00:00
cluster_a.add_outbound_relationship(private_cluster)
else:
2024-03-04 13:39:41 +00:00
raise ValueError(f"Cluster {cluster_a} not found in any galaxy")
2024-03-01 15:30:49 +00:00
def get_relationships_with_levels(self, start_cluster):
def bfs_with_undirected_relationships(start_cluster):
visited = set() # Tracks whether a cluster has been visited
relationships = defaultdict(lambda: float('inf')) # Tracks the lowest level for each cluster pair
queue = deque([(start_cluster, 0)]) # Queue of (cluster, level)
while queue:
current_cluster, level = queue.popleft()
if current_cluster not in visited:
visited.add(current_cluster)
# Process all relationships regardless of direction
2024-03-04 13:39:41 +00:00
if self.add_inbound_relationship:
neighbors = current_cluster.outbound_relationships.union(current_cluster.inbound_relationships)
else:
neighbors = current_cluster.outbound_relationships
2024-03-01 15:30:49 +00:00
for neighbor in neighbors:
link = frozenset([current_cluster, neighbor])
if level + 1 < relationships[link]:
relationships[link] = level + 1
2024-03-04 13:39:41 +00:00
if neighbor not in visited and neighbor.value != "Private Cluster":
2024-03-01 15:30:49 +00:00
queue.append((neighbor, level + 1))
2024-03-04 13:39:41 +00:00
# count = 0
2024-03-01 15:30:49 +00:00
# Convert the defaultdict to a list of tuples, ignoring direction
processed_relationships = []
for link, lvl in relationships.items():
# Extract clusters from the frozenset; direction is irrelevant
clusters = list(link)
2024-03-04 13:39:41 +00:00
if len(clusters) != 2:
# count += 1
continue
2024-03-01 15:30:49 +00:00
# Arbitrarily choose the first cluster as 'source' for consistency
2024-03-04 13:39:41 +00:00
if clusters[0].value == "Private Cluster":
processed_relationships.append((clusters[1], clusters[0], lvl))
else:
2024-03-01 15:30:49 +00:00
processed_relationships.append((clusters[0], clusters[1], lvl))
2024-03-04 13:39:41 +00:00
# except:
# processed_relationships.append((clusters[0], clusters[0], lvl)) # This is wrong just for testing!!!
2024-03-01 15:30:49 +00:00
2024-03-04 13:39:41 +00:00
# print(f"Count: {count}")
2024-03-01 15:30:49 +00:00
return processed_relationships
return bfs_with_undirected_relationships(start_cluster)