mirror of
https://github.com/MISP/misp-galaxy.git
synced 2024-11-26 16:57:18 +00:00
Add [technique] subtechnique
This commit is contained in:
parent
35b8192208
commit
a311ce6a1c
2 changed files with 179 additions and 51 deletions
|
@ -1,6 +1,13 @@
|
||||||
from api.api import TidalAPI
|
from api.api import TidalAPI
|
||||||
from models.galaxy import Galaxy
|
from models.galaxy import Galaxy
|
||||||
from models.cluster import GroupCluster, SoftwareCluster, CampaignsCluster, TechniqueCluster, TacticCluster, ReferencesCluster
|
from models.cluster import (
|
||||||
|
GroupCluster,
|
||||||
|
SoftwareCluster,
|
||||||
|
CampaignsCluster,
|
||||||
|
TechniqueCluster,
|
||||||
|
TacticCluster,
|
||||||
|
ReferencesCluster,
|
||||||
|
)
|
||||||
import argparse
|
import argparse
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
@ -9,6 +16,7 @@ CONFIG = "./config"
|
||||||
GALAXY_PATH = "../../galaxies"
|
GALAXY_PATH = "../../galaxies"
|
||||||
CLUSTER_PATH = "../../clusters"
|
CLUSTER_PATH = "../../clusters"
|
||||||
|
|
||||||
|
|
||||||
def create_galaxy(endpoint: str, version: int):
|
def create_galaxy(endpoint: str, version: int):
|
||||||
api = TidalAPI()
|
api = TidalAPI()
|
||||||
data = api.get_data(endpoint)
|
data = api.get_data(endpoint)
|
||||||
|
@ -17,7 +25,7 @@ def create_galaxy(endpoint: str, version: int):
|
||||||
|
|
||||||
galaxy = Galaxy(**config["galaxy"], version=version)
|
galaxy = Galaxy(**config["galaxy"], version=version)
|
||||||
galaxy.save_to_file(f"{GALAXY_PATH}/tidal-{endpoint}.json")
|
galaxy.save_to_file(f"{GALAXY_PATH}/tidal-{endpoint}.json")
|
||||||
|
|
||||||
match endpoint:
|
match endpoint:
|
||||||
case "groups":
|
case "groups":
|
||||||
cluster = GroupCluster(**config["cluster"], uuid=galaxy.uuid)
|
cluster = GroupCluster(**config["cluster"], uuid=galaxy.uuid)
|
||||||
|
@ -44,13 +52,14 @@ def create_galaxy(endpoint: str, version: int):
|
||||||
cluster.save_to_file(f"{CLUSTER_PATH}/tidal-{endpoint}.json")
|
cluster.save_to_file(f"{CLUSTER_PATH}/tidal-{endpoint}.json")
|
||||||
print(f"Galaxy tidal-{endpoint} created")
|
print(f"Galaxy tidal-{endpoint} created")
|
||||||
|
|
||||||
|
|
||||||
def main(args, galaxies):
|
def main(args, galaxies):
|
||||||
if args.all:
|
if args.all:
|
||||||
for galaxy in galaxies:
|
for galaxy in galaxies:
|
||||||
create_galaxy(galaxy, args.version)
|
create_galaxy(galaxy, args.version)
|
||||||
else:
|
else:
|
||||||
create_galaxy(args.type, args.version)
|
create_galaxy(args.type, args.version)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
@ -59,7 +68,6 @@ if __name__ == "__main__":
|
||||||
if f.endswith(".json"):
|
if f.endswith(".json"):
|
||||||
galaxies.append(f.split(".")[0])
|
galaxies.append(f.split(".")[0])
|
||||||
|
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="Create galaxy and cluster json files from Tidal API"
|
description="Create galaxy and cluster json files from Tidal API"
|
||||||
)
|
)
|
||||||
|
@ -86,4 +94,4 @@ if __name__ == "__main__":
|
||||||
if hasattr(args, "func"):
|
if hasattr(args, "func"):
|
||||||
args.func(args, galaxies=galaxies)
|
args.func(args, galaxies=galaxies)
|
||||||
else:
|
else:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
from dataclasses import dataclass, field, asdict
|
from dataclasses import dataclass, field, asdict
|
||||||
import json
|
import json
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Meta:
|
class Meta:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class GroupsMeta():
|
class GroupsMeta(Meta):
|
||||||
source: str = None
|
source: str = None
|
||||||
group_attack_id: str = None
|
group_attack_id: str = None
|
||||||
country: str = None
|
country: str = None
|
||||||
|
@ -16,8 +18,9 @@ class GroupsMeta():
|
||||||
tags: list = None
|
tags: list = None
|
||||||
owner: str = None
|
owner: str = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SoftwareMeta():
|
class SoftwareMeta(Meta):
|
||||||
source: str = None
|
source: str = None
|
||||||
type: str = None
|
type: str = None
|
||||||
software_attack_id: str = None
|
software_attack_id: str = None
|
||||||
|
@ -25,23 +28,32 @@ class SoftwareMeta():
|
||||||
tags: list = None
|
tags: list = None
|
||||||
owner: str = None
|
owner: str = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TechniqueMeta():
|
class TechniqueMeta(Meta):
|
||||||
source: str = None
|
source: str = None
|
||||||
platforms: list = None
|
platforms: list = None
|
||||||
tags: list = None
|
tags: list = None
|
||||||
owner: str = None
|
owner: str = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TacticMeta():
|
class SubTechniqueMeta(Meta):
|
||||||
|
source: str = None
|
||||||
|
technique_attack_id: str = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TacticMeta(Meta):
|
||||||
source: str = None
|
source: str = None
|
||||||
tactic_attack_id: str = None
|
tactic_attack_id: str = None
|
||||||
ordinal_position: int = None
|
ordinal_position: int = None
|
||||||
tags: list = None
|
tags: list = None
|
||||||
owner: str = None
|
owner: str = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ReferencesMeta():
|
class ReferencesMeta(Meta):
|
||||||
source: str = None
|
source: str = None
|
||||||
refs: list = None
|
refs: list = None
|
||||||
title: str = None
|
title: str = None
|
||||||
|
@ -50,8 +62,9 @@ class ReferencesMeta():
|
||||||
date_published: str = None
|
date_published: str = None
|
||||||
owner: str = None
|
owner: str = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class CampaignsMeta():
|
class CampaignsMeta(Meta):
|
||||||
source: str = None
|
source: str = None
|
||||||
campaign_attack_id: str = None
|
campaign_attack_id: str = None
|
||||||
first_seen: str = None
|
first_seen: str = None
|
||||||
|
@ -59,6 +72,7 @@ class CampaignsMeta():
|
||||||
tags: list = None
|
tags: list = None
|
||||||
owner: str = None
|
owner: str = None
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class ClusterValue:
|
class ClusterValue:
|
||||||
description: str = ""
|
description: str = ""
|
||||||
|
@ -69,11 +83,23 @@ class ClusterValue:
|
||||||
|
|
||||||
def return_value(self):
|
def return_value(self):
|
||||||
value_dict = asdict(self)
|
value_dict = asdict(self)
|
||||||
value_dict['meta'] = {k: v for k, v in asdict(self.meta).items() if v is not None}
|
value_dict["meta"] = {
|
||||||
|
k: v for k, v in asdict(self.meta).items() if v is not None
|
||||||
|
}
|
||||||
return value_dict
|
return value_dict
|
||||||
|
|
||||||
class Cluster():
|
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
class Cluster:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
self.authors = authors
|
self.authors = authors
|
||||||
self.category = category
|
self.category = category
|
||||||
self.description = description
|
self.description = description
|
||||||
|
@ -85,7 +111,7 @@ class Cluster():
|
||||||
|
|
||||||
def add_values(self):
|
def add_values(self):
|
||||||
print("This method should be implemented in the child class")
|
print("This method should be implemented in the child class")
|
||||||
|
|
||||||
def save_to_file(self, path):
|
def save_to_file(self, path):
|
||||||
with open(path, "w") as file:
|
with open(path, "w") as file:
|
||||||
file.write(json.dumps(self.__dict__(), indent=4))
|
file.write(json.dumps(self.__dict__(), indent=4))
|
||||||
|
@ -104,29 +130,48 @@ class Cluster():
|
||||||
"uuid": self.uuid,
|
"uuid": self.uuid,
|
||||||
"values": self.values,
|
"values": self.values,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class GroupCluster(Cluster):
|
class GroupCluster(Cluster):
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
super().__init__(authors, category, description, name, source, type, uuid)
|
super().__init__(authors, category, description, name, source, type, uuid)
|
||||||
|
|
||||||
def add_values(self, data):
|
def add_values(self, data):
|
||||||
for entry in data["data"]:
|
for entry in data["data"]:
|
||||||
meta = GroupsMeta(
|
meta = GroupsMeta(
|
||||||
source=entry.get("source"),
|
source=entry.get("source"),
|
||||||
group_attack_id=entry.get("group_attack_id"),
|
group_attack_id=entry.get("group_attack_id"),
|
||||||
country=entry.get("country")[0].get("country_code") if entry.get("country") else None,
|
country=(
|
||||||
observed_countries=[x.get("country_code") for x in entry.get("observed_country")],
|
entry.get("country")[0].get("country_code")
|
||||||
observed_motivations=[x.get("name") for x in entry.get("observed_motivation")],
|
if entry.get("country")
|
||||||
|
else None
|
||||||
|
),
|
||||||
|
observed_countries=[
|
||||||
|
x.get("country_code") for x in entry.get("observed_country")
|
||||||
|
],
|
||||||
|
observed_motivations=[
|
||||||
|
x.get("name") for x in entry.get("observed_motivation")
|
||||||
|
],
|
||||||
target_categories=[x.get("name") for x in entry.get("observed_sector")],
|
target_categories=[x.get("name") for x in entry.get("observed_sector")],
|
||||||
tags=[x.get("tag") for x in entry.get("tags")],
|
tags=[x.get("tag") for x in entry.get("tags")],
|
||||||
owner=entry.get("owner_name"),
|
owner=entry.get("owner_name"),
|
||||||
)
|
)
|
||||||
related = []
|
related = []
|
||||||
for relation in entry.get("associated_groups"):
|
for relation in entry.get("associated_groups"):
|
||||||
related.append({
|
related.append(
|
||||||
"dest-uuid": relation.get("id"),
|
{
|
||||||
"type": "related-to",
|
"dest-uuid": relation.get("id"),
|
||||||
}
|
"type": "related-to",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
value = ClusterValue(
|
value = ClusterValue(
|
||||||
description=entry.get("description"),
|
description=entry.get("description"),
|
||||||
|
@ -139,9 +184,18 @@ class GroupCluster(Cluster):
|
||||||
|
|
||||||
|
|
||||||
class SoftwareCluster(Cluster):
|
class SoftwareCluster(Cluster):
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
super().__init__(authors, category, description, name, source, type, uuid)
|
super().__init__(authors, category, description, name, source, type, uuid)
|
||||||
|
|
||||||
def add_values(self, data):
|
def add_values(self, data):
|
||||||
for entry in data["data"]:
|
for entry in data["data"]:
|
||||||
meta = SoftwareMeta(
|
meta = SoftwareMeta(
|
||||||
|
@ -154,16 +208,18 @@ class SoftwareCluster(Cluster):
|
||||||
)
|
)
|
||||||
related = []
|
related = []
|
||||||
for relation in entry.get("groups"):
|
for relation in entry.get("groups"):
|
||||||
related.append({
|
related.append(
|
||||||
"dest-uuid": relation.get("group_id"),
|
{
|
||||||
"type": "used-by",
|
"dest-uuid": relation.get("group_id"),
|
||||||
}
|
"type": "used-by",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
for relation in entry.get("associated_software"):
|
for relation in entry.get("associated_software"):
|
||||||
related.append({
|
related.append(
|
||||||
"dest-uuid": relation.get("id"),
|
{
|
||||||
"type": "related-to",
|
"dest-uuid": relation.get("id"),
|
||||||
}
|
"type": "related-to",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
value = ClusterValue(
|
value = ClusterValue(
|
||||||
description=entry.get("description"),
|
description=entry.get("description"),
|
||||||
|
@ -174,10 +230,20 @@ class SoftwareCluster(Cluster):
|
||||||
)
|
)
|
||||||
self.values.append(value.return_value())
|
self.values.append(value.return_value())
|
||||||
|
|
||||||
|
|
||||||
class TechniqueCluster(Cluster):
|
class TechniqueCluster(Cluster):
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
super().__init__(authors, category, description, name, source, type, uuid)
|
super().__init__(authors, category, description, name, source, type, uuid)
|
||||||
|
|
||||||
def add_values(self, data):
|
def add_values(self, data):
|
||||||
for entry in data["data"]:
|
for entry in data["data"]:
|
||||||
meta = TechniqueMeta(
|
meta = TechniqueMeta(
|
||||||
|
@ -188,10 +254,11 @@ class TechniqueCluster(Cluster):
|
||||||
)
|
)
|
||||||
related = []
|
related = []
|
||||||
for relation in entry.get("tactic"):
|
for relation in entry.get("tactic"):
|
||||||
related.append({
|
related.append(
|
||||||
"dest-uuid": relation.get("tactic_id"),
|
{
|
||||||
"type": "uses",
|
"dest-uuid": relation.get("tactic_id"),
|
||||||
}
|
"type": "uses",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
value = ClusterValue(
|
value = ClusterValue(
|
||||||
description=entry.get("description"),
|
description=entry.get("description"),
|
||||||
|
@ -202,10 +269,42 @@ class TechniqueCluster(Cluster):
|
||||||
)
|
)
|
||||||
self.values.append(value.return_value())
|
self.values.append(value.return_value())
|
||||||
|
|
||||||
|
for sub_technique in entry.get("sub_technique"):
|
||||||
|
meta = SubTechniqueMeta(
|
||||||
|
source=sub_technique.get("source"),
|
||||||
|
technique_attack_id=sub_technique.get("technique_attack_id"),
|
||||||
|
)
|
||||||
|
related = []
|
||||||
|
for relation in sub_technique.get("tactic"):
|
||||||
|
related.append(
|
||||||
|
{
|
||||||
|
"dest-uuid": relation.get("tactic_id"),
|
||||||
|
"type": "uses",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
value = ClusterValue(
|
||||||
|
description=sub_technique.get("description"),
|
||||||
|
meta=meta,
|
||||||
|
related=related,
|
||||||
|
uuid=sub_technique.get("id"),
|
||||||
|
value=sub_technique.get("name"),
|
||||||
|
)
|
||||||
|
self.values.append(value.return_value())
|
||||||
|
|
||||||
|
|
||||||
class TacticCluster(Cluster):
|
class TacticCluster(Cluster):
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
super().__init__(authors, category, description, name, source, type, uuid)
|
super().__init__(authors, category, description, name, source, type, uuid)
|
||||||
|
|
||||||
def add_values(self, data):
|
def add_values(self, data):
|
||||||
for entry in data["data"]:
|
for entry in data["data"]:
|
||||||
meta = TacticMeta(
|
meta = TacticMeta(
|
||||||
|
@ -217,10 +316,11 @@ class TacticCluster(Cluster):
|
||||||
)
|
)
|
||||||
related = []
|
related = []
|
||||||
for relation in entry.get("techniques"):
|
for relation in entry.get("techniques"):
|
||||||
related.append({
|
related.append(
|
||||||
"dest-uuid": relation.get("technique_id"),
|
{
|
||||||
"type": "uses",
|
"dest-uuid": relation.get("technique_id"),
|
||||||
}
|
"type": "uses",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
value = ClusterValue(
|
value = ClusterValue(
|
||||||
description=entry.get("description"),
|
description=entry.get("description"),
|
||||||
|
@ -231,10 +331,20 @@ class TacticCluster(Cluster):
|
||||||
)
|
)
|
||||||
self.values.append(value.return_value())
|
self.values.append(value.return_value())
|
||||||
|
|
||||||
|
|
||||||
class ReferencesCluster(Cluster):
|
class ReferencesCluster(Cluster):
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
super().__init__(authors, category, description, name, source, type, uuid)
|
super().__init__(authors, category, description, name, source, type, uuid)
|
||||||
|
|
||||||
def add_values(self, data):
|
def add_values(self, data):
|
||||||
for entry in data["data"]:
|
for entry in data["data"]:
|
||||||
meta = ReferencesMeta(
|
meta = ReferencesMeta(
|
||||||
|
@ -255,8 +365,18 @@ class ReferencesCluster(Cluster):
|
||||||
)
|
)
|
||||||
self.values.append(value.return_value())
|
self.values.append(value.return_value())
|
||||||
|
|
||||||
|
|
||||||
class CampaignsCluster(Cluster):
|
class CampaignsCluster(Cluster):
|
||||||
def __init__(self, authors: str, category: str, description: str, name: str, source: str, type: str, uuid: str):
|
def __init__(
|
||||||
|
self,
|
||||||
|
authors: str,
|
||||||
|
category: str,
|
||||||
|
description: str,
|
||||||
|
name: str,
|
||||||
|
source: str,
|
||||||
|
type: str,
|
||||||
|
uuid: str,
|
||||||
|
):
|
||||||
super().__init__(authors, category, description, name, source, type, uuid)
|
super().__init__(authors, category, description, name, source, type, uuid)
|
||||||
|
|
||||||
def add_values(self, data):
|
def add_values(self, data):
|
||||||
|
@ -277,4 +397,4 @@ class CampaignsCluster(Cluster):
|
||||||
uuid=entry.get("id"),
|
uuid=entry.get("id"),
|
||||||
value=entry.get("name"),
|
value=entry.get("name"),
|
||||||
)
|
)
|
||||||
self.values.append(value.return_value())
|
self.values.append(value.return_value())
|
||||||
|
|
Loading…
Reference in a new issue