#!/usr/bin/env python3 # # A simple convertor of the GSMA Mobile Threat Intelligence Framework (MoTIF) Principles to a MISP Galaxy datastructure. # https://www.gsma.com/security/resources/fs-57-mobile-threat-intelligence-framework-motif-principles/ # Copyright (c) 2024 MISP Project # Copyright (c) 2024 Christophe Vandeplas # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import pdfplumber import requests import json import tempfile import os import uuid pdf_url = 'https://www.gsma.com/solutions-and-impact/technologies/security/wp-content/uploads/2024/04/FS.57-MoTIF-Principles-v1.0.pdf' uuid_seed = '5022ff98-cf0d-45d2-89b5-5c63104197cc' def sub_table_to_list(table: list) -> list: if len(table) == 0: return [] try: result = [] # FIXME use header row to know column names for row in table: result.append({ 'ID': row[2].replace('\n', ''), 'Name': row[4]. replace('\n', ' ').strip(), 'Description': row[5] }) return result except IndexError: return [] def table_to_technique(table: list) -> dict: ''' Convert a table to a technique dictionary ''' result = {} row_index = 0 while row_index < len(table): row = table[row_index] # row[1] is None : sub-table in table field = cleanup_field(row[0]) try: if result['ID'] == 'MOT1036.301': pass except KeyError: pass if field == 'Procedure Examples': # extract sub-table in the next rows sub_table = [] try: while table[row_index + 1][0] is None: sub_table.append(table[row_index + 1]) row_index += 1 except IndexError: # just the end of the page, will be handled in the next page pass value = sub_table_to_list(sub_table) elif field == 'Analogous technique in other frameworks': # column index is not always the same... so figure out the first non-empty cell i = 1 value = '' while i < len(row): try: if row[i] is not None: value = row[i] break except IndexError: pass i += 1 elif not field: # annoyingly a sub-table might have been parsed differently from previous page. So bad luck. There's not much we can do about it except even worse code than we have here. row_index += 1 continue else: value = row[1].replace('\n', ' ').strip() result[field] = value row_index += 1 return result def cleanup_field(field: str) -> str: ''' Cleanup a field name ''' try: return field.strip().replace(':', '').replace('\n', ' ').replace('- ', '-').strip() except AttributeError: return '' def is_end_of_table(table: list) -> bool: ''' Check if this is the end of the table, by checking the last row in the table. ''' try: # Techniques if table['ID'].startswith('MOT') and 'Analogous technique in other frameworks' in table: return True # Mitigations if table['ID'].startswith('MOS') and 'References' in table: return True except KeyError: pass return False def parse_pdf(pdf_file_name: str) -> dict: table_settings = { "vertical_strategy": "lines", "horizontal_strategy": "lines", # "explicit_vertical_lines": [], # "explicit_horizontal_lines": [], # "snap_tolerance": 6, "snap_x_tolerance": 6, # pg49: must be 6 "snap_y_tolerance": 3, # max 14 # "join_tolerance": 3, # "join_x_tolerance": 3, # "join_y_tolerance": 3, # "edge_min_length": 3, # "min_words_vertical": 3, # "min_words_horizontal": 1, # "intersection_tolerance": 3, # "intersection_x_tolerance": 3, # "intersection_y_tolerance": 3, # "text_tolerance": 3, # "text_x_tolerance": 3, # "text_y_tolerance": 3, } entries = {} with pdfplumber.open(pdf_file_name) as pdfp: page_index = 0 title_seen = False curr_table = None while page_index < len(pdfp.pages): page = pdfp.pages[page_index] # skip to section 4.1 Techniques and Sub-techniques Definition if not title_seen: page_text = page.extract_text() if '4.1 Techniques and Sub-techniques Definition' not in page_text or 'Table of Contents' in page_text: # print(f"Skipping page {page_index}") page_index += 1 continue title_seen = True # parse technique tables for table in page.extract_tables(table_settings=table_settings): if curr_table: # merge tables if continuation # if first row does not have a first column, then it's the continuation of the previous row if table[0][0] == '' and table[0][1] != '': curr_table[-1][1] += ' ' + table[0][1] # add description of new row to previous row table.pop(0) # remove the first new row of the table # annoyingly a sub-table might have been parsed differently from previous page. So bad luck. There's not much we can do about it except even worse code than we have here. # handle rest of merging case table = curr_table + table curr_table = None # reset for clean start parsed_table = table_to_technique(table) if is_end_of_table(parsed_table): # valid table parsed_table['page'] = page_index + 1 # minor bug: we document the page where the table ends, not where it starts entries[parsed_table['ID']] = parsed_table else: # incomplete table, store in curr_table and continue next row curr_table = table page_index += 1 return entries print(f"Downloading PDF: {pdf_url}") r = requests.get(pdf_url, allow_redirects=True) with tempfile.TemporaryFile() as tmp_f: tmp_f.write(r.content) print("Parsing PDF ... this takes time") items = parse_pdf(tmp_f) print("Converting to MISP Galaxy ...") # now convert and extract data to have something clean and usable kill_chain_tactics = { 'Techniques': [], } techniques = [] for item in items.values(): if item['ID'].startswith('MOT'): kill_chain_root = 'Techniques' else: # TODO skip these MOS softwares for now continue if ',' in item['Tactic']: tactics = [t.strip().replace(' ', '-') for t in item['Tactic'].split(',')] else: tactics = [item['Tactic'].replace(' ', '-')] kill_chain = [] for tactic in tactics: kill_chain_tactics[kill_chain_root].append(tactic) kill_chain.append(f"{kill_chain_root}:{tactic}") technique = { 'value': item['Name'], 'description': item['Description'], 'uuid': str(uuid.uuid5(uuid.UUID(uuid_seed), item['ID'])), 'meta': { 'kill_chain': kill_chain, 'refs': [ f"page {item['page']} of {pdf_url}" ], 'external_id': item['ID'], } } if item['References']: technique['meta']['refs'].append(item['References']) if item['Analogous technique in other frameworks']: technique['meta']['refs'].append(item['Analogous technique in other frameworks']) techniques.append(technique) # TODO relations + refs as subtechniques # make entries unique kill_chain_tactics['Techniques'] = list(set(kill_chain_tactics['Techniques'])) galaxy_fname = 'gsma-motif.json' galaxy_type = "gsma-motif" galaxy_name = "GSMA MoTIF" galaxy_description = 'Mobile Threat Intelligence Framework (MoTIF) Principles. ' galaxy_source = 'https://www.gsma.com/solutions-and-impact/technologies/security/latest-news/establishing-motif-the-mobile-threat-intelligence-framework/' json_galaxy = { 'description': galaxy_description, 'icon': "user-shield", 'kill_chain_order': kill_chain_tactics, 'name': galaxy_name, 'namespace': "gsma", 'type': galaxy_type, 'uuid': "57cf3a17-e186-407a-b58b-d53887ce4950", 'version': 1 } json_cluster = { 'authors': ["GSMA"], 'category': 'attack-pattern', 'name': galaxy_name, 'description': galaxy_description, 'source': galaxy_source, 'type': galaxy_type, 'uuid': "02cb3863-ecb2-4a93-a5ed-18bb6dfd5c89", 'values': list(techniques), 'version': 1 } # save the Galaxy and Cluster file # with open(os.path.join('..', 'galaxies', galaxy_fname), 'w') as f: # # sort_keys, even if it breaks the kill_chain_order , but jq_all_the_things requires sorted keys # json.dump(json_galaxy, f, indent=2, sort_keys=True, ensure_ascii=False) # f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things with open(os.path.join('..', 'clusters', galaxy_fname), 'w') as f: json.dump(json_cluster, f, indent=2, sort_keys=True, ensure_ascii=False) f.write('\n') # only needed for the beauty and to be compliant with jq_all_the_things print("All done, please don't forget to ./jq_all_the_things.sh, commit, and then ./validate_all.sh.")