Mirror of https://github.com/MISP/misp-galaxy.git, synced 2024-11-26 16:57:18 +00:00
Commit 5f1b2305cf
6 changed files with 6050 additions and 0 deletions
5736 clusters/intelligence-agencies.json Normal file
File diff suppressed because it is too large
9 galaxies/intelligence-agencies.json Normal file
@@ -0,0 +1,9 @@
{
  "description": "List of intelligence agencies",
  "icon": "ninja",
  "name": "Intelligence Agencies",
  "namespace": "intelligence-agency",
  "type": "intelligence-agency",
  "uuid": "3ef969e7-96cd-4048-aa83-191ac457d0db",
  "version": 1
}
157 tools/IntelAgencies/main.py Normal file
@@ -0,0 +1,157 @@
from modules.api import WikipediaAPI
from modules.intel import IntelAgency, Meta, Galaxy, Cluster
import os
import uuid
import json

from bs4 import BeautifulSoup
import pycountry

CLUSTER_PATH = '../../clusters'
GALAXY_PATH = '../../galaxies'
GALAXY_NAME = 'intelligence-agencies'
UUID = "3ef969e7-96cd-4048-aa83-191ac457d0db"
WIKIPEDIA_URL = "https://en.wikipedia.org"

COUNTRY_CODES = {
    "Brunei": "BN",
    "People's Republic of China": "CN",
    "Democratic Republic of the Congo": "CD",  # Note: This is for the Democratic Republic of the Congo, not to be confused with the Republic of the Congo (CG)
    "Czech Republic": "CZ",
    "Iran": "IR",
    "Moldova": "MD",  # Officially known as the Republic of Moldova
    "North Korea": "KP",  # Officially the Democratic People's Republic of Korea (DPRK)
    "Palestine": "PS",
    "Russia": "RU",  # Officially the Russian Federation
    "South Korea": "KR",  # Officially the Republic of Korea (ROK)
    "Syria": "SY",  # Officially the Syrian Arab Republic
    "Taiwan": "TW",  # ISO code is assigned as "Taiwan, Province of China"
    "Tanzania": "TZ",  # Officially the United Republic of Tanzania
    "Trinidad & Tobago": "TT",
    "Turkey": "TR",
    "Venezuela": "VE",  # Officially the Bolivarian Republic of Venezuela
    "Vietnam": "VN",  # Officially the Socialist Republic of Vietnam
    "European Union": None,  # Not a country, no ISO code
    "Shanghai Cooperation Organisation": None  # Not a country, no ISO code
}

def compute_uuid(value, namespace=UUID):
    return str(uuid.uuid5(uuid.UUID(namespace), value))

def get_notes_on_lower_level(content):
    notes = []
    for li in content.find_all('li', recursive=False):
        if li.find('ul'):
            notes.extend(get_notes_on_lower_level(li.find('ul')))
        else:
            a_tag = li.find('a')

            title = li.text
            link_href = None
            description = li.text

            i_tag = li.find_all('i')
            synonyms = [i.text for i in i_tag]

            if a_tag:
                title = a_tag.get('title', description)
                if a_tag.has_attr('href'):
                    link_href = f'{WIKIPEDIA_URL}{a_tag["href"]}'

            if len(synonyms) == 0 or synonyms[0] == title:
                synonyms = None

            notes.append((title, link_href, description, synonyms))
    return notes

def get_agencies_from_country(heading, current_country):
    agencies = []
    contents = []
    contents.append(heading.find_next('ul'))

    current_content = contents[0]
    while True:
        next_sibling = current_content.find_next_sibling()

        if next_sibling is None or next_sibling.name == 'h2':
            break

        if next_sibling.name == 'ul':
            contents.append(next_sibling)

        current_content = next_sibling

    for content in contents:
        agency_names = get_notes_on_lower_level(content)
        for name, links, description, synonyms in agency_names:
            country_code = pycountry.countries.get(name=current_country)

            # Set country
            country_name = current_country

            if country_code:
                country_code = country_code.alpha_2
            else:
                country_code = COUNTRY_CODES.get(current_country)

            if current_country in ["European Union", "Shanghai Cooperation Organisation"]:  # Not a country
                country_name = None

            # Set names for duplicates
            if name in ['Special Branch', 'Financial Intelligence Unit']:
                name = f'{name} ({current_country})'

            agencies.append(IntelAgency(value=name, uuid=compute_uuid(name), meta=Meta(country=country_code, country_name=country_name, refs=[links], synonyms=synonyms), description=description))

    return agencies

def extract_info(content):
    IGNORE = ["See also", "References", "External links", "Further reading"]
    soup = BeautifulSoup(content, 'html.parser')
    agencies = []
    current_country = None
    for h2 in soup.find_all('h2'):
        span = h2.find('span', {'class': 'mw-headline'})
        if span and span.text not in IGNORE:
            current_country = span.text.strip()
            agencies.extend(get_agencies_from_country(h2, current_country))
        else:
            continue
    return agencies

if __name__ == '__main__':
    wiki = WikipediaAPI()
    page_title = 'List of intelligence agencies'
    content = wiki.get_page_html(page_title)
    if content:
        agencies = extract_info(content)
    else:
        raise ValueError("Error: No content found: ", content)

    authors = [x['name'] for x in wiki.get_authors(page_title)]
    # Write to files
    galaxy = Galaxy(
        description="List of intelligence agencies",
        icon="ninja",
        name="Intelligence Agencies",
        namespace="intelligence-agency",
        type="intelligence-agency",
        uuid=UUID,
        version=1,
    )
    galaxy.save_to_file(os.path.join(GALAXY_PATH, f'{GALAXY_NAME}.json'))

    cluster = Cluster(
        authors=authors,
        category="Intelligence Agencies",
        description="List of intelligence agencies",
        name="Intelligence Agencies",
        source="https://en.wikipedia.org/wiki/List_of_intelligence_agencies",
        type="intelligence-agency",
        uuid=UUID,
        version=1,
    )
    for agency in agencies:
        cluster.add_value(agency)

    cluster.save_to_file(os.path.join(CLUSTER_PATH, f'{GALAXY_NAME}.json'))
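For reference, a minimal sketch of what the parser above does with the markup it expects: a country h2 heading followed by ul lists of agencies. The HTML fragment below is an invented example, not taken from Wikipedia or this repository; it assumes beautifulsoup4 and pycountry are installed and that the snippet is run from tools/IntelAgencies/ so main and modules.intel resolve.

# Hypothetical fragment mimicking the structure of the Wikipedia page;
# not part of the repository.
from main import extract_info

sample_html = """
<h2><span class="mw-headline">France</span></h2>
<ul>
  <li><a href="/wiki/Directorate-General_for_External_Security"
         title="Directorate-General for External Security">DGSE</a></li>
</ul>
"""

for agency in extract_info(sample_html):
    # Each entry is an IntelAgency whose UUID is derived from its name via
    # uuid5, with a Meta block carrying the ISO country code and the ref URL.
    print(agency.value, agency.meta.country, agency.meta.refs)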
0 tools/IntelAgencies/modules/__init__.py Normal file
72 tools/IntelAgencies/modules/api.py Normal file
@@ -0,0 +1,72 @@
import requests

class WikipediaAPI():
    def __init__(self):
        self.base_url = 'https://en.wikipedia.org/w/api.php'

    def get_page_summary(self, page_title):
        params = {
            'action': 'query',
            'format': 'json',
            'titles': page_title,
            'prop': 'extracts',
            'explaintext': True,
        }

        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()
            page_id = next(iter(data['query']['pages']))
            return data['query']['pages'][page_id]['extract']
        except Exception as e:
            print(f'Error: {e}')
            return None

    def get_page_content(self, page_title):
        params = {
            'action': 'query',
            'format': 'json',
            'titles': page_title,
            'prop': 'revisions',
            'rvprop': 'content',
        }
        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()
            page_id = next(iter(data['query']['pages']))
            return data['query']['pages'][page_id]['revisions'][0]['*']
        except Exception as e:
            print(f'Error: {e}')
            return None

    def get_page_html(self, page_title):
        params = {
            'action': 'parse',
            'format': 'json',
            'page': page_title,
            'prop': 'text',
            'disableeditsection': True,
        }
        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()
            return data['parse']['text']['*']
        except Exception as e:
            print(f'Error: {e}')
            return None

    def get_authors(self, page_title):
        params = {
            'action': 'query',
            'format': 'json',
            'titles': page_title,
            'prop': 'contributors',
        }
        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()
            page_id = next(iter(data['query']['pages']))
            return data['query']['pages'][page_id]['contributors']
        except Exception as e:
            print(f'Error: {e}')
            return None
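A short usage sketch for the wrapper above, assuming network access to the MediaWiki API and that it is run from tools/IntelAgencies/; the page title matches the one used by main.py.

from modules.api import WikipediaAPI

wiki = WikipediaAPI()
page_title = 'List of intelligence agencies'

# Rendered HTML of the page (what main.py feeds into extract_info);
# returns None and prints the error if the request fails.
html = wiki.get_page_html(page_title)

# Contributor names, used to fill the cluster's "authors" field.
contributors = wiki.get_authors(page_title) or []
authors = [c['name'] for c in contributors]

print(len(html or ''), 'characters of HTML,', len(authors), 'contributors')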
76 tools/IntelAgencies/modules/intel.py Normal file
@@ -0,0 +1,76 @@
from dataclasses import dataclass, field, asdict, is_dataclass
import json

@dataclass
class Meta:
    country: str = None
    country_name: str = None
    refs: list = field(default_factory=list)
    synonyms: list = field(default_factory=list)

def custom_asdict(obj):
    if is_dataclass(obj):
        result = {}
        for field_name, field_def in obj.__dataclass_fields__.items():
            value = getattr(obj, field_name)
            if field_name == 'meta':
                meta_value = custom_asdict(value)
                meta_value = {k: v for k, v in meta_value.items() if v is not None and not (k in ['refs', 'synonyms'] and (not v or all(e is None for e in v)))}
                value = meta_value
            elif isinstance(value, (list, tuple)) and all(is_dataclass(i) for i in value):
                value = [custom_asdict(i) for i in value]
            elif isinstance(value, list) and all(e is None for e in value):
                continue
            if value is None and field_name in ['country', 'country_name']:
                continue
            result[field_name] = value
        return result
    else:
        return obj

@dataclass
class IntelAgency:
    description: str = ""
    meta: Meta = field(default_factory=Meta)
    related: list = field(default_factory=list)
    uuid: str = None
    value: str = None

    def __post_init__(self):
        if not self.value:
            raise ValueError("IntelAgency 'value' cannot be empty.")
        if not self.uuid:
            raise ValueError("IntelAgency 'uuid' cannot be empty.")

@dataclass
class Galaxy:
    description: str
    icon: str
    name: str
    namespace: str
    type: str
    uuid: str
    version: int

    def save_to_file(self, path: str):
        with open(path, "w") as file:
            file.write(json.dumps(asdict(self), indent=4))

@dataclass
class Cluster:
    authors: str
    category: str
    description: str
    name: str
    source: str
    type: str
    uuid: str
    version: int
    values: list = field(default_factory=list)

    def add_value(self, value: IntelAgency):
        self.values.append(value)

    def save_to_file(self, path: str):
        with open(path, "w") as file:
            file.write(json.dumps(custom_asdict(self), indent=4, ensure_ascii=False))
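A small sketch of how custom_asdict trims empty metadata during serialization; the agency name and UUID below are made up for illustration, and the snippet assumes it is run from tools/IntelAgencies/.

import json
from modules.intel import IntelAgency, Meta, custom_asdict

# Hypothetical entry: refs holds only None (no Wikipedia link found) and
# synonyms is None, so custom_asdict drops both keys from "meta".
agency = IntelAgency(
    value="Example Directorate",
    uuid="00000000-0000-0000-0000-000000000000",
    meta=Meta(country="FR", country_name="France", refs=[None], synonyms=None),
    description="Example Directorate",
)

# "meta" keeps only "country" and "country_name"; the empty "related"
# list is skipped by the all-None check as well.
print(json.dumps(custom_asdict(agency), indent=2, ensure_ascii=False))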