chg: [trackers] refactor trackers

This commit is contained in:
Terrtia 2023-05-11 16:21:43 +02:00
parent 6b60041db2
commit 4473086f89
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
14 changed files with 871 additions and 875 deletions

View file

@ -9,7 +9,7 @@ import os
import sys
import json
from flask import render_template, jsonify, request, Blueprint, redirect, url_for, Response, escape
from flask import render_template, jsonify, request, Blueprint, redirect, url_for, Response, escape, abort
from flask_login import login_required, current_user, login_user, logout_user
sys.path.append('modules')
@ -27,6 +27,7 @@ from lib.objects import ail_objects
from lib import item_basic
from lib import Tracker
from lib import Tag
from packages import Date
bootstrap_label = Flask_config.bootstrap_label
@ -47,6 +48,14 @@ def create_json_response(data, status_code):
# ============= ROUTES ==============
@hunters.route("/yara/rule/default/content", methods=['GET'])
@login_required
@login_read_only
def get_default_yara_rule_content():
default_yara_rule = request.args.get('rule')
res = Tracker.api_get_default_rule_content(default_yara_rule)
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
##################
# TRACKERS #
##################
@ -120,98 +129,160 @@ def tracked_menu_typosquatting():
return render_template("trackersManagement.html", user_trackers=user_trackers, global_trackers=global_trackers,
bootstrap_label=bootstrap_label, tracker_type=tracker_type)
@hunters.route("/tracker/show")
@login_required
@login_read_only
def show_tracker():
user_id = current_user.get_id()
tracker_uuid = request.args.get('uuid', None)
res = Tracker.api_is_allowed_to_edit_tracker(tracker_uuid, user_id)
if res[1] != 200: # invalid access
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
date_from = request.args.get('date_from')
date_to = request.args.get('date_to')
if date_from:
date_from = date_from.replace('-', '')
if date_to:
date_to = date_to.replace('-', '')
tracker = Tracker.Tracker(tracker_uuid)
meta = tracker.get_meta(options={'description', 'level', 'mails', 'filters', 'sparkline', 'tags',
'user', 'webhook', 'nb_objs'})
if meta['type'] == 'yara':
yara_rule_content = Tracker.get_yara_rule_content(meta['tracked'])
else:
yara_rule_content = None
if meta['type'] == 'typosquatting':
typo_squatting = Tracker.get_tracked_typosquatting_domains(meta['tracked'])
sorted(typo_squatting)
else:
typo_squatting = set()
if date_from:
date_from, date_to = Date.sanitise_daterange(meta['first_seen'], meta['last_seen'])
objs = tracker.get_objs_by_daterange(date_from, date_to)
meta['objs'] = ail_objects.get_objects_meta(objs, flask_context=True)
else:
date_from = ''
date_to = ''
meta['objs'] = []
meta['date_from'] = date_from
meta['date_to'] = date_to
meta['item_sources'] = sorted(meta['filters'].get('item', {}).get('sources', []))
if meta['filters']:
meta['filters'] = json.dumps(meta['filters'], indent=4)
return render_template("tracker_show.html", meta=meta,
rule_content=yara_rule_content,
typo_squatting=typo_squatting,
bootstrap_label=bootstrap_label)
def parse_add_edit_request(request_form):
to_track = request_form.get("tracker")
tracker_uuid = request_form.get("tracker_uuid")
tracker_type = request_form.get("tracker_type")
nb_words = request_form.get("nb_word", 1)
description = request.form.get("description", '')
webhook = request_form.get("webhook", '')
level = request_form.get("level", 0)
mails = request_form.get("mails", [])
# TAGS
tags = request_form.get("tags", [])
taxonomies_tags = request_form.get('taxonomies_tags')
if taxonomies_tags:
try:
taxonomies_tags = json.loads(taxonomies_tags)
except:
taxonomies_tags = []
else:
taxonomies_tags = []
galaxies_tags = request_form.get('galaxies_tags')
if galaxies_tags:
try:
galaxies_tags = json.loads(galaxies_tags)
except:
galaxies_tags = []
else:
galaxies_tags = []
# custom tags
if tags:
tags = tags.split()
else:
tags = []
escaped = []
for tag in tags:
escaped.append(tag)
tags = escaped + taxonomies_tags + galaxies_tags
# YARA #
if tracker_type == 'yara':
yara_default_rule = request_form.get("yara_default_rule")
yara_custom_rule = request_form.get("yara_custom_rule")
if yara_custom_rule:
to_track = yara_custom_rule
tracker_type = 'yara_custom'
else:
to_track = yara_default_rule
tracker_type = 'yara_default'
if level == 'on':
level = 1
else:
level = 0
if mails:
mails = mails.split()
else:
mails = []
# FILTERS
filters = {}
for obj_type in Tracker.get_objects_tracked():
new_filter = request_form.get(f'{obj_type}_obj')
if new_filter == 'on':
filters[obj_type] = {}
# Mimetypes
mimetypes = request_form.get(f'mimetypes_{obj_type}', [])
if mimetypes:
mimetypes = json.loads(mimetypes)
filters[obj_type]['mimetypes'] = mimetypes
# Sources
sources = request_form.get(f'sources_{obj_type}', [])
if sources:
sources = json.loads(sources)
filters[obj_type]['sources'] = sources
# Subtypes
for obj_subtype in ail_core.get_object_all_subtypes(obj_type):
subtype = request_form.get(f'filter_{obj_type}_{obj_subtype}')
if subtype == 'on':
if 'subtypes' not in filters[obj_type]:
filters[obj_type]['subtypes'] = []
filters[obj_type]['subtypes'].append(obj_subtype)
input_dict = {"tracked": to_track, "type": tracker_type,
"tags": tags, "mails": mails, "filters": filters,
"level": level, "description": description, "webhook": webhook}
if tracker_uuid:
input_dict['uuid'] = tracker_uuid
if tracker_type == 'set':
try:
input_dict['nb_words'] = int(nb_words)
except (ValueError, TypeError):
input_dict['nb_words'] = 1
return input_dict
@hunters.route("/tracker/add", methods=['GET', 'POST'])
@login_required
@login_analyst
def add_tracked_menu():
if request.method == 'POST':
to_track = request.form.get("tracker")
tracker_uuid = request.form.get("tracker_uuid")
tracker_type = request.form.get("tracker_type")
nb_words = request.form.get("nb_word", 1)
description = request.form.get("description", '')
webhook = request.form.get("webhook", '')
level = request.form.get("level", 0)
mails = request.form.get("mails", [])
# TAGS
tags = request.form.get("tags", [])
taxonomies_tags = request.form.get('taxonomies_tags')
if taxonomies_tags:
try:
taxonomies_tags = json.loads(taxonomies_tags)
except:
taxonomies_tags = []
else:
taxonomies_tags = []
galaxies_tags = request.form.get('galaxies_tags')
if galaxies_tags:
try:
galaxies_tags = json.loads(galaxies_tags)
except:
galaxies_tags = []
else:
galaxies_tags = []
# custom tags
if tags:
tags = tags.split()
else:
tags = []
tags = tags + taxonomies_tags + galaxies_tags
# YARA #
if tracker_type == 'yara':
yara_default_rule = request.form.get("yara_default_rule")
yara_custom_rule = request.form.get("yara_custom_rule")
if yara_custom_rule:
to_track = yara_custom_rule
tracker_type = 'yara_custom'
else:
to_track = yara_default_rule
tracker_type = 'yara_default'
if level == 'on':
level = 1
else:
level = 0
if mails:
mails = mails.split()
else:
tags = []
# FILTERS
filters = {}
for obj_type in Tracker.get_objects_tracked():
new_filter = request.form.get(f'{obj_type}_obj')
if new_filter == 'on':
filters[obj_type] = {}
# Mimetypes
mimetypes = request.form.get(f'mimetypes_{obj_type}', [])
if mimetypes:
mimetypes = json.loads(mimetypes)
filters[obj_type]['mimetypes'] = mimetypes
# Sources
sources = request.form.get(f'sources_{obj_type}', [])
if sources:
sources = json.loads(sources)
filters[obj_type]['sources'] = sources
# Subtypes
for obj_subtype in ail_core.get_object_all_subtypes(obj_type):
subtype = request.form.get(f'filter_{obj_type}_{obj_subtype}')
if subtype == 'on':
if 'subtypes' not in filters[obj_type]:
filters[obj_type]['subtypes'] = []
filters[obj_type]['subtypes'].append(obj_subtype)
input_dict = {"tracked": to_track, "type": tracker_type,
"tags": tags, "mails": mails, "filters": filters,
"level": level, "description": description, "webhook": webhook}
if tracker_type == 'set':
try:
input_dict['nb_words'] = int(nb_words)
except TypeError:
input_dict['nb_words'] = 1
input_dict = parse_add_edit_request(request.form)
user_id = current_user.get_id()
res = Tracker.api_add_tracker(input_dict, user_id)
if res[1] == 200:
@ -220,9 +291,42 @@ def add_tracked_menu():
return create_json_response(res[0], res[1])
else:
return render_template("tracker_add.html",
all_sources=item_basic.get_all_items_sources(r_list=True),
tags_selector_data=Tag.get_tags_selector_data(),
all_yara_files=Tracker.get_all_default_yara_files())
all_sources=item_basic.get_all_items_sources(r_list=True),
tags_selector_data=Tag.get_tags_selector_data(),
all_yara_files=Tracker.get_all_default_yara_files())
@hunters.route("/tracker/edit", methods=['GET', 'POST'])
@login_required
@login_analyst
def tracker_edit():
if request.method == 'POST':
input_dict = parse_add_edit_request(request.form)
user_id = current_user.get_id()
res = Tracker.api_edit_tracker(input_dict, user_id)
if res[1] == 200:
return redirect(url_for('hunters.show_tracker', uuid=res[0].get('uuid')))
else:
user_id = current_user.get_id()
tracker_uuid = request.args.get('uuid', None)
res = Tracker.api_is_allowed_to_edit_tracker(tracker_uuid, user_id)
if res[1] != 200: # invalid access
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
tracker = Tracker.Tracker(tracker_uuid)
dict_tracker = tracker.get_meta(options={'description', 'level', 'mails', 'filters', 'tags', 'webhook'})
if dict_tracker['type'] == 'yara':
if not Tracker.is_default_yara_rule(dict_tracker['tracked']):
dict_tracker['content'] = Tracker.get_yara_rule_content(dict_tracker['tracked'])
taxonomies_tags, galaxies_tags, custom_tags = Tag.sort_tags_taxonomies_galaxies_customs(dict_tracker['tags'])
tags_selector_data = Tag.get_tags_selector_data()
tags_selector_data['taxonomies_tags'] = taxonomies_tags
tags_selector_data['galaxies_tags'] = galaxies_tags
dict_tracker['tags'] = custom_tags
return render_template("tracker_add.html",
dict_tracker=dict_tracker,
all_sources=item_basic.get_all_items_sources(r_list=True),
tags_selector_data=tags_selector_data,
all_yara_files=Tracker.get_all_default_yara_files())
@hunters.route('/tracker/delete', methods=['GET'])
@login_required
@ -234,7 +338,31 @@ def tracker_delete():
if res[1] != 200:
return create_json_response(res[0], res[1])
else:
return redirect(url_for('hunter.tracked_menu'))
return redirect(url_for('hunters.trackers_dashboard'))
@hunters.route("/tracker/graph/json", methods=['GET'])
@login_required
@login_read_only
def get_json_tracker_graph():
user_id = current_user.get_id()
tracker_uuid = request.args.get('uuid')
res = Tracker.api_check_tracker_acl(tracker_uuid, user_id)
if res:
return create_json_response(res[0], res[1])
date_from = request.args.get('date_from')
date_to = request.args.get('date_to')
if date_from:
date_from = date_from.replace('-', '')
if date_to:
date_to = date_to.replace('-', '')
if date_from and date_to:
res = Tracker.get_trackers_graph_by_day([tracker_uuid], date_from=date_from, date_to=date_to)
else:
res = Tracker.get_trackers_graph_by_day([tracker_uuid])
return jsonify(res)
####################