mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-30 01:37:17 +00:00
544 lines
20 KiB
Python
544 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
# -*-coding:UTF-8 -*
|
|
|
|
'''
|
|
Flask functions and routes for the correlation pages (pgpdump, cryptocurrency and username objects)
|
|
'''
|
|
import os
|
|
import sys
|
|
import datetime
|
|
|
|
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, send_file
|
|
from Role_Manager import login_admin, login_analyst, login_read_only
|
|
from flask_login import login_required
|
|
|
|
sys.path.append(os.environ['AIL_BIN'])
|
|
##################################
|
|
# Import Project packages
|
|
##################################
|
|
from lib.objects import ail_objects
|
|
|
|
from packages.Date import Date
|
|
|
|
# ============ VARIABLES ============
|
|
import Flask_config
|
|
|
|
# Application object and shared settings, all taken from Flask_config.
app = Flask_config.app
baseUrl = Flask_config.baseUrl
# Client used below through hget / hkeys / hvals (presumably Redis - confirm in Flask_config).
r_serv_metadata = Flask_config.r_serv_metadata
vt_enabled = Flask_config.vt_enabled
vt_auth = Flask_config.vt_auth
PASTES_FOLDER = Flask_config.PASTES_FOLDER

# Blueprint holding every route declared in this module.
hashDecoded = Blueprint('hashDecoded', __name__, template_folder='templates')

## TODO: put me in option
# Subtypes handled for each correlation type (returned by get_all_types_id).
all_cryptocurrency = ['bitcoin', 'ethereum', 'bitcoin-cash', 'litecoin', 'monero', 'zcash', 'dash']
all_pgpdump = ['key', 'name', 'mail']
all_username = ['telegram', 'twitter', 'jabber']
|
|
|
|
# ============ FUNCTIONS ============
|
|
|
|
def get_date_range(num_day):
    '''
    Build the list of days covering today and the ``num_day`` days before it.

    Each entry is the value returned by ``Date.substract_day(i)`` for
    ``i`` in ``0..num_day``; the list is returned in reverse order of ``i``
    (largest offset first), so it holds ``num_day + 1`` entries.
    '''
    today = datetime.date.today()
    anchor = Date('{}{}{}'.format(today.year,
                                  str(today.month).zfill(2),
                                  str(today.day).zfill(2)))

    days = [anchor.substract_day(offset) for offset in range(num_day + 1)]
    days.reverse()
    return days
|
|
|
|
def substract_date(date_from, date_to):
    '''
    List every day from ``date_from`` to ``date_to``, both ends included.

    :param date_from: first day, 'YYYYMMDD' string
    :param date_to: last day, 'YYYYMMDD' string
    :return: list of 'YYYYMMDD' strings; empty when ``date_to`` is before ``date_from``
    :raises ValueError: when a string does not describe a valid date
    '''
    def _parse(raw):
        # Only the first 8 characters are read: YYYY, MM, DD.
        return datetime.date(int(raw[0:4]), int(raw[4:6]), int(raw[6:8]))

    first_day = _parse(date_from)
    last_day = _parse(date_to)
    nb_days = (last_day - first_day).days + 1

    return [(first_day + datetime.timedelta(offset)).strftime('%Y%m%d')
            for offset in range(nb_days)]
|
|
|
|
|
|
def get_icon(correlation_type, type_id):
    '''
    Return the icon class string for a correlation type / subtype pair.

    Unknown correlation types, and unknown username subtypes, get the
    default 'fas fa-sticky-note'. Unknown pgpdump and cryptocurrency
    subtypes get their own fallback value.
    '''
    default_icon = 'fas fa-sticky-note'
    # correlation type -> (icon per subtype, fallback for other subtypes)
    icons = {
        'pgpdump': ({'key': 'fas fa-key',
                     'name': 'fas fa-user-tag',
                     'mail': 'fas fa-at'},
                    'times'),
        'cryptocurrency': ({'bitcoin': 'fab fa-btc',
                            'monero': 'fab fa-monero',
                            'ethereum': 'fab fa-ethereum'},
                           'fas fa-coins'),
        'username': ({'telegram': 'fab fa-telegram-plane',
                      'twitter': 'fab fa-twitter',
                      'jabber': 'fas fa-user'},
                     default_icon),
    }

    entry = icons.get(correlation_type)
    if entry is None:
        return default_icon
    per_subtype, fallback = entry
    return per_subtype.get(type_id, fallback)
|
|
|
|
def get_icon_text(correlation_type, type_id):
    '''
    Return the icon character for a correlation type / subtype pair.

    Unknown correlation types, and unknown username subtypes, get the
    default '\\uf249'. Unknown pgpdump and cryptocurrency subtypes get
    their own fallback value.
    '''
    default_text = '\uf249'
    # correlation type -> (character per subtype, fallback for other subtypes)
    glyphs = {
        'pgpdump': ({'key': '\uf084',
                     'name': '\uf507',
                     'mail': '\uf1fa'},
                    'times'),
        'cryptocurrency': ({'bitcoin': '\uf15a',
                            'monero': '\uf3d0',
                            'ethereum': '\uf42e'},
                           '\uf51e'),
        'username': ({'telegram': '\uf2c6',
                      'twitter': '\uf099',
                      'jabber': '\uf007'},
                     default_text),
    }

    entry = glyphs.get(correlation_type)
    if entry is None:
        return default_text
    per_subtype, fallback = entry
    return per_subtype.get(type_id, fallback)
|
|
|
|
def get_all_types_id(correlation_type):
    '''
    Return the module-level list of subtypes for ``correlation_type``.

    An unknown correlation type yields a new empty list.
    '''
    subtypes_by_type = {
        'pgpdump': all_pgpdump,
        'cryptocurrency': all_cryptocurrency,
        'username': all_username,
    }
    return subtypes_by_type.get(correlation_type, [])
|
|
|
|
def get_key_id_metadata(obj_type, subtype, obj_id):
    '''
    Return ``ail_objects.get_object_meta(obj_type, subtype, obj_id)`` unchanged.
    '''
    return ail_objects.get_object_meta(obj_type, subtype, obj_id)
|
|
|
|
def list_sparkline_type_id_values(date_range_sparkline, correlation_type, type_id, key_id):
    '''
    Return one integer per day of ``date_range_sparkline`` for ``key_id``.

    Each value is the ``key_id`` field of the hash
    '<correlation_type>:<type_id>:<day>' read from ``r_serv_metadata``;
    a missing field counts as 0.
    '''
    counts = []
    for day in date_range_sparkline:
        hash_name = '{}:{}:{}'.format(correlation_type, type_id, day)
        raw_count = r_serv_metadata.hget(hash_name, key_id)
        counts.append(0 if raw_count is None else int(raw_count))
    return counts
|
|
|
|
def get_correlation_type_search_endpoint(correlation_type):
    '''
    Return the name of the search endpoint for ``correlation_type``.

    Unknown types fall back to 'hashDecoded.hashDecoded_page'.
    NOTE(review): no route with that name is declared in this module - confirm it exists.
    '''
    search_endpoints = {
        'pgpdump': 'hashDecoded.all_pgpdump_search',
        'cryptocurrency': 'hashDecoded.all_cryptocurrency_search',
        'username': 'hashDecoded.all_username_search',
    }
    return search_endpoints.get(correlation_type, 'hashDecoded.hashDecoded_page')
|
|
|
|
def get_correlation_type_page_endpoint(correlation_type):
    '''
    Return the name of the page endpoint for ``correlation_type``.

    Unknown types fall back to 'hashDecoded.hashDecoded_page'.
    NOTE(review): no route with that name is declared in this module - confirm it exists.
    '''
    page_endpoints = {
        'pgpdump': 'hashDecoded.pgpdump_page',
        'cryptocurrency': 'hashDecoded.cryptocurrency_page',
        'username': 'hashDecoded.username_page',
    }
    return page_endpoints.get(correlation_type, 'hashDecoded.hashDecoded_page')
|
|
|
|
def get_show_key_id_endpoint(correlation_type):
    '''
    Return the endpoint used to display one object.

    The same endpoint is returned for every ``correlation_type``; the
    argument is not read.
    '''
    show_endpoint = 'correlation.show_correlation'
    return show_endpoint
|
|
|
|
def get_range_type_json_endpoint(correlation_type):
    '''
    Return the name of the range/type JSON endpoint for ``correlation_type``.

    Unknown types fall back to 'hashDecoded.hashDecoded_page'.
    NOTE(review): no route with that name is declared in this module - confirm it exists.
    '''
    json_endpoints = {
        'pgpdump': 'hashDecoded.pgpdump_range_type_json',
        'cryptocurrency': 'hashDecoded.cryptocurrency_range_type_json',
        'username': 'hashDecoded.username_range_type_json',
    }
    return json_endpoints.get(correlation_type, 'hashDecoded.hashDecoded_page')
|
|
|
|
############ CORE CORRELATION ############
|
|
|
|
def main_correlation_page(correlation_type, type_id, date_from, date_to, show_decoded_files):
    '''
    Render "DaysCorrelation.html" for one correlation type.

    :param correlation_type: correlation type; used as key prefix and to pick endpoints
    :param type_id: requested subtype. 'All types', None, or a value rejected by
        ``ail_objects.is_valid_object_subtype`` all mean "no subtype filter"
    :param date_from: first day, either 8 characters ('YYYYMMDD') or a longer form
        whose characters at index 4 and 7 are separators (e.g. 'YYYY-MM-DD')
    :param date_to: last day, same formats as ``date_from``
    :param show_decoded_files: when truthy, the objects seen in the date range are listed
    :return: the rendered template

    Missing, malformed or empty date ranges fall back to today.
    '''
    # 'All types' is treated as "no subtype filter"
    if type_id == 'All types':
        type_id = None

    # verify type input
    if type_id is not None:
        type_id = type_id.replace(' ', '')
        if not ail_objects.is_valid_object_subtype(correlation_type, type_id):
            type_id = None

    date_range = []
    if date_from is not None and date_to is not None:
        try:
            # change format: drop the separators at index 4 and 7
            if len(date_from) != 8:
                date_from = date_from[0:4] + date_from[5:7] + date_from[8:10]
                date_to = date_to[0:4] + date_to[5:7] + date_to[8:10]
            date_range = substract_date(date_from, date_to)
        except (ValueError, TypeError):
            # malformed dates: use the "today" fallback below
            date_range = []

    if not date_range:
        date_range.append(datetime.date.today().strftime("%Y%m%d"))
        date_from = date_range[0][0:4] + '-' + date_range[0][4:6] + '-' + date_range[0][6:8]
        date_to = date_from
    else:
        date_from = date_from[0:4] + '-' + date_from[4:6] + '-' + date_from[6:8]
        date_to = date_to[0:4] + '-' + date_to[4:6] + '-' + date_to[6:8]

    # display day type bar chart: single day and no subtype selected.
    # The test must be on type_id; the builtin `type` is never None.
    if len(date_range) == 1 and type_id is None:
        daily_type_chart = True
        daily_date = date_range[0]
    else:
        daily_type_chart = False
        daily_date = None

    l_type = get_all_types_id(correlation_type)
    # Subtypes to scan: every known one, or only the requested one.
    if type_id is None:
        scanned_types = l_type
    else:
        scanned_types = [type_id]

    # Distinct (object id, subtype) pairs seen in the date range.
    l_keys_id_dump = set()
    if show_decoded_files:
        for date in date_range:
            for typ_id in scanned_types:
                l_dump = r_serv_metadata.hkeys('{}:{}:{}'.format(correlation_type, typ_id, date))
                if l_dump:
                    l_keys_id_dump.update((dump, typ_id) for dump in l_dump)

    num_day_sparkline = 6
    date_range_sparkline = get_date_range(num_day_sparkline)

    sparkline_id = 0
    keys_id_metadata = {}
    for new_key_id, typ_id in l_keys_id_dump:
        metadata = get_key_id_metadata(correlation_type, typ_id, new_key_id)
        keys_id_metadata[new_key_id] = metadata

        # Only objects with metadata are decorated and get a sparkline id.
        if metadata:
            metadata['type_id'] = typ_id
            metadata['type_icon'] = get_icon(correlation_type, typ_id)
            metadata['sparklines_data'] = list_sparkline_type_id_values(date_range_sparkline, correlation_type, typ_id, new_key_id)
            metadata['sparklines_id'] = sparkline_id
            sparkline_id += 1

    # Name passed to the template as correlation_type_n: 'pgpdump' becomes 'pgp'.
    correlation_type_n = correlation_type
    if correlation_type_n == 'pgpdump':
        correlation_type_n = 'pgp'

    return render_template("DaysCorrelation.html", all_metadata=keys_id_metadata,
                           correlation_type=correlation_type,
                           correlation_type_n=correlation_type_n,
                           correlation_type_endpoint=get_correlation_type_page_endpoint(correlation_type),
                           correlation_type_search_endpoint=get_correlation_type_search_endpoint(correlation_type),
                           show_key_id_endpoint=get_show_key_id_endpoint(correlation_type),
                           range_type_json_endpoint=get_range_type_json_endpoint(correlation_type),
                           l_type=l_type, type_id=type_id,
                           daily_type_chart=daily_type_chart, daily_date=daily_date,
                           date_from=date_from, date_to=date_to,
                           show_decoded_files=show_decoded_files)
|
|
|
|
|
|
|
|
def correlation_type_range_type_json(correlation_type, date_from, date_to):
    '''
    Return, as JSON, the number of objects seen per subtype over a date range.

    :param correlation_type: correlation type; used as key prefix
    :param date_from: first day, either 8 characters ('YYYYMMDD') or a longer form
        whose characters at index 4 and 7 are separators (e.g. 'YYYY-MM-DD')
    :param date_to: last day, same formats as ``date_from``
    :return: ``jsonify`` response holding a list of dicts

    Single-day range: one dict per subtype, with every subtype initialised to 0,
    'date' set to the subtype name and the subtype's own total filled in.
    Longer range: one dict per day, with 'date' as 'YYYY-MM-DD' and one total per subtype.

    Missing, malformed or empty date ranges fall back to today, as
    main_correlation_page does.
    '''
    date_range = []
    if date_from is not None and date_to is not None:
        try:
            # change format: drop the separators at index 4 and 7
            if len(date_from) != 8:
                date_from = date_from[0:4] + date_from[5:7] + date_from[8:10]
                date_to = date_to[0:4] + date_to[5:7] + date_to[8:10]
            date_range = substract_date(date_from, date_to)
        except (ValueError, TypeError):
            # malformed dates: use the "today" fallback below instead of failing the request
            date_range = []

    if not date_range:
        date_range.append(datetime.date.today().strftime("%Y%m%d"))

    range_type = []
    all_types_id = get_all_types_id(correlation_type)

    # one day
    if len(date_range) == 1:
        for type_id in all_types_id:
            # init 0
            day_type = {typ_id: 0 for typ_id in all_types_id}
            # here 'date' carries the subtype name, not a day
            day_type['date'] = type_id
            all_keys = r_serv_metadata.hvals('{}:{}:{}'.format(correlation_type, type_id, date_range[0]))
            day_type[type_id] = sum(int(val) for val in (all_keys or ()))
            range_type.append(day_type)

    else:
        # display type_id
        for date in date_range:
            day_type = {'date': date[0:4] + '-' + date[4:6] + '-' + date[6:8]}
            for type_id in all_types_id:
                all_keys = r_serv_metadata.hvals('{}:{}:{}'.format(correlation_type, type_id, date))
                day_type[type_id] = sum(int(val) for val in (all_keys or ()))
            range_type.append(day_type)

    return jsonify(range_type)
|
|
|
|
# ============= ROUTES ==============
|
|
|
|
|
|
############################ PGPDump ############################
|
|
|
|
@hashDecoded.route('/decoded/pgp_by_type_json') ## TODO: REFACTOR
@login_required
@login_read_only
def pgp_by_type_json():
    '''
    Return, as JSON, per-day totals for PGP subtypes over the days given by get_date_range(30).

    Query parameter ``type_id``: a subtype listed in ``all_pgpdump``. When it is
    absent, every subtype of ``all_pgpdump`` is reported. Any other value gets
    an empty JSON response.

    :return: ``jsonify`` response holding one dict per day, with 'date' as
        'YYYY-MM-DD' and one total per reported subtype
    '''
    type_id = request.args.get('type_id')

    if type_id is None:
        # default: report every subtype
        all_type = list(all_pgpdump)
    else:
        # retrieve + char
        type_id = type_id.replace(' ', '+')
        # verify input
        if type_id not in all_pgpdump:
            return jsonify()
        all_type = [type_id]

    num_day_type = 30
    date_range = get_date_range(num_day_type)

    range_decoder = []
    for date in date_range:
        day_type_id = {'date': date[0:4] + '-' + date[4:6] + '-' + date[6:8]}
        for type_pgp in all_type:
            # NOTE(review): the other endpoints of this module read keys prefixed
            # with 'pgpdump:'; confirm whether 'pgp:' is still the right prefix here.
            all_vals_key = r_serv_metadata.hvals('pgp:{}:{}'.format(type_pgp, date))
            day_type_id[type_pgp] = sum(int(val_key) for val_key in (all_vals_key or ()))
        range_decoder.append(day_type_id)

    return jsonify(range_decoder)
|
|
|
|
############################ DateRange ############################
|
|
@hashDecoded.route("/correlation/pgpdump", methods=['GET'])
@login_required
@login_read_only
def pgpdump_page():
    '''
    Correlation page for 'pgpdump'.

    Reads date_from, date_to, type_id and show_decoded_files from the query
    string and hands them to main_correlation_page.
    '''
    query = request.args
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    type_id = query.get('type_id')
    show_decoded_files = query.get('show_decoded_files')

    return main_correlation_page('pgpdump', type_id, date_from, date_to, show_decoded_files)
|
|
|
|
@hashDecoded.route("/correlation/cryptocurrency", methods=['GET'])
@login_required
@login_read_only
def cryptocurrency_page():
    '''
    Correlation page for 'cryptocurrency'.

    Reads date_from, date_to, type_id and show_decoded_files from the query
    string and hands them to main_correlation_page.
    '''
    query = request.args
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    type_id = query.get('type_id')
    show_decoded_files = query.get('show_decoded_files')

    return main_correlation_page('cryptocurrency', type_id, date_from, date_to, show_decoded_files)
|
|
|
|
@hashDecoded.route("/correlation/username", methods=['GET'])
@login_required
@login_read_only
def username_page():
    '''
    Correlation page for 'username'.

    Reads date_from, date_to, type_id and show_decoded_files from the query
    string and hands them to main_correlation_page.
    '''
    query = request.args
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    type_id = query.get('type_id')
    show_decoded_files = query.get('show_decoded_files')

    return main_correlation_page('username', type_id, date_from, date_to, show_decoded_files)
|
|
|
|
@hashDecoded.route("/correlation/all_pgpdump_search", methods=['POST'])
@login_required
@login_read_only
def all_pgpdump_search():
    '''
    Turn the posted search form into a redirect to the pgpdump page.

    The form field 'type' is forwarded as the 'type_id' query parameter.
    '''
    form = request.form
    target = url_for('hashDecoded.pgpdump_page',
                     date_from=form.get('date_from'),
                     date_to=form.get('date_to'),
                     type_id=form.get('type'),
                     show_decoded_files=form.get('show_decoded_files'))
    return redirect(target)
|
|
|
|
@hashDecoded.route("/correlation/all_cryptocurrency_search", methods=['POST'])
@login_required
@login_read_only
def all_cryptocurrency_search():
    '''
    Turn the posted search form into a redirect to the cryptocurrency page.

    The form field 'type' is forwarded as the 'type_id' query parameter.
    '''
    form = request.form
    target = url_for('hashDecoded.cryptocurrency_page',
                     date_from=form.get('date_from'),
                     date_to=form.get('date_to'),
                     type_id=form.get('type'),
                     show_decoded_files=form.get('show_decoded_files'))
    return redirect(target)
|
|
|
|
@hashDecoded.route("/correlation/all_username_search", methods=['POST'])
@login_required
@login_read_only
def all_username_search():
    '''
    Turn the posted search form into a redirect to the username page.

    The form field 'type' is forwarded as the 'type_id' query parameter.
    '''
    form = request.form
    target = url_for('hashDecoded.username_page',
                     date_from=form.get('date_from'),
                     date_to=form.get('date_to'),
                     type_id=form.get('type'),
                     show_decoded_files=form.get('show_decoded_files'))
    return redirect(target)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@hashDecoded.route('/correlation/cryptocurrency_range_type_json')
@login_required
@login_read_only
def cryptocurrency_range_type_json():
    '''
    JSON totals per subtype for 'cryptocurrency', over the date_from / date_to
    query parameters (see correlation_type_range_type_json).
    '''
    query = request.args
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    return correlation_type_range_type_json('cryptocurrency', date_from, date_to)
|
|
|
|
@hashDecoded.route('/correlation/pgpdump_range_type_json')
@login_required
@login_read_only
def pgpdump_range_type_json():
    '''
    JSON totals per subtype for 'pgpdump', over the date_from / date_to
    query parameters (see correlation_type_range_type_json).
    '''
    query = request.args
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    return correlation_type_range_type_json('pgpdump', date_from, date_to)
|
|
|
|
@hashDecoded.route('/correlation/username_range_type_json')
@login_required
@login_read_only
def username_range_type_json():
    '''
    JSON totals per subtype for 'username', over the date_from / date_to
    query parameters (see correlation_type_range_type_json).
    '''
    query = request.args
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    return correlation_type_range_type_json('username', date_from, date_to)
|
|
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
##########################################################################################
|
|
|
|
|
|
# TODO: REFACTOR
|
|
@hashDecoded.route('/correlation/pgpdump_graph_line_json')
@login_required
@login_read_only
def pgpdump_graph_line_json():
    '''
    JSON daily counts for one 'pgpdump' object.

    Reads type_id, key_id, date_from and date_to from the query string and
    hands them to correlation_graph_line_json.
    '''
    query = request.args
    type_id = query.get('type_id')
    key_id = query.get('key_id')
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    return correlation_graph_line_json('pgpdump', type_id, key_id, date_from, date_to)
|
|
|
|
def correlation_graph_line_json(correlation_type, type_id, key_id, date_from, date_to):
    '''
    Return, as JSON, the daily count of one object over the days given by get_date_range(30).

    :param correlation_type: correlation type; used as key prefix
    :param type_id: object subtype
    :param key_id: object id
    :param date_from: currently has no effect on the reported window
    :param date_to: currently has no effect on the reported window
    :return: ``jsonify`` response holding a list of {'date': 'YYYY-MM-DD', 'value': int};
        an empty JSON response when key_id is None, the subtype is invalid or
        the object does not exist
    '''
    # verify input
    if key_id is None:
        return jsonify()
    if not ail_objects.is_valid_object_subtype(correlation_type, type_id):
        return jsonify()
    if not ail_objects.exists_obj(correlation_type, type_id, key_id):
        return jsonify()

    # # TODO: # FIXME: the window is 30 days whether or not date_from / date_to are given
    nb_days_seen_in_pastes = 30

    json_seen_in_paste = []
    for day in get_date_range(nb_days_seen_in_pastes):
        hash_name = '{}:{}:{}'.format(correlation_type, type_id, day)
        raw_count = r_serv_metadata.hget(hash_name, key_id)
        json_seen_in_paste.append({
            'date': day[0:4] + '-' + day[4:6] + '-' + day[6:8],
            'value': 0 if raw_count is None else int(raw_count),
        })

    return jsonify(json_seen_in_paste)
|
|
|
|
@hashDecoded.route('/correlation/cryptocurrency_graph_line_json')
@login_required
@login_read_only
def cryptocurrency_graph_line_json():
    '''
    JSON daily counts for one 'cryptocurrency' object.

    Reads type_id, key_id, date_from and date_to from the query string and
    hands them to correlation_graph_line_json.
    '''
    query = request.args
    type_id = query.get('type_id')
    key_id = query.get('key_id')
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    return correlation_graph_line_json('cryptocurrency', type_id, key_id, date_from, date_to)
|
|
|
|
@hashDecoded.route('/correlation/username_graph_line_json')
@login_required
@login_read_only
def username_graph_line_json():
    '''
    JSON daily counts for one 'username' object.

    Reads type_id, key_id, date_from and date_to from the query string and
    hands them to correlation_graph_line_json.
    '''
    query = request.args
    type_id = query.get('type_id')
    key_id = query.get('key_id')
    date_from = query.get('date_from')
    date_to = query.get('date_to')
    return correlation_graph_line_json('username', type_id, key_id, date_from, date_to)
|
|
|
|
# ========= REGISTRATION =========
|
|
# Attach the routes of this module to the application, under the configured base URL.
app.register_blueprint(hashDecoded, url_prefix=baseUrl)
|