mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-24 14:57:16 +00:00
407 lines
14 KiB
Python
407 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*-coding:UTF-8 -*
|
|
|
|
'''
|
|
Blueprint Flask: ail_investigations
|
|
'''
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
|
|
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response, abort, send_file
|
|
from flask_login import login_required, current_user
|
|
|
|
# Import Role_Manager
|
|
from Role_Manager import login_admin, login_user, login_read_only
|
|
|
|
sys.path.append(os.environ['AIL_BIN'])
|
|
##################################
|
|
# Import Project packages
|
|
##################################
|
|
from lib import ail_updates
|
|
from lib import ail_orgs
|
|
from lib import ail_users
|
|
from lib import d4
|
|
from packages import git_status
|
|
|
|
# ============ BLUEPRINT ============
|
|
settings_b = Blueprint('settings_b', __name__, template_folder=os.path.join(os.environ['AIL_FLASK'], 'templates/settings'))
|
|
|
|
# ============ VARIABLES ============
|
|
# bootstrap_label = Flask_config.bootstrap_label
|
|
|
|
# ============ FUNCTIONS ============
|
|
|
|
def create_json_response(data, status_code):
|
|
return Response(json.dumps(data, indent=2, sort_keys=True), mimetype='application/json'), status_code
|
|
|
|
# ============= ROUTES ==============
|
|
|
|
@settings_b.route("/settings", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def settings_page():
|
|
git_metadata = git_status.get_git_metadata()
|
|
ail_version = ail_updates.get_ail_version()
|
|
acl_admin = current_user.is_in_role('admin')
|
|
return render_template("settings_index.html", git_metadata=git_metadata,
|
|
ail_version=ail_version, acl_admin=acl_admin)
|
|
|
|
@settings_b.route("/settings/background_update/json", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def get_background_update_metadata_json():
|
|
return jsonify(ail_updates.get_update_background_meta(options={}))
|
|
|
|
@settings_b.route("/settings/modules", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def settings_modules():
|
|
acl_admin = current_user.is_in_role('admin')
|
|
return render_template("settings/modules.html", acl_admin=acl_admin)
|
|
|
|
@settings_b.route("/settings/user/profile", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def user_profile():
|
|
user_id = current_user.get_user_id()
|
|
acl_admin = current_user.is_in_role('admin')
|
|
r = ail_users.api_get_user_profile(user_id)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
meta = r[0]
|
|
global_2fa = ail_users.is_2fa_enabled()
|
|
return render_template("user_profile.html", meta=meta, global_2fa=global_2fa,acl_admin=acl_admin)
|
|
|
|
@settings_b.route("/settings/user/hotp", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def user_hotp():
|
|
# if not current_user.is_authenticated: # TODO CHECK IF FRESH LOGIN/SESSION -> check last loging time -> rerequest if expired
|
|
|
|
acl_admin = current_user.is_in_role('admin')
|
|
user_id = current_user.get_user_id()
|
|
r = ail_users.api_get_user_hotp(user_id)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
hotp = r[0]
|
|
return render_template("user_hotp.html", hotp=hotp, acl_admin=acl_admin)
|
|
|
|
@settings_b.route("/settings/user/otp/enable/self", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def user_otp_enable_self():
|
|
user_id = current_user.get_user_id()
|
|
r = ail_users.api_enable_user_otp(user_id, request.access_route[0])
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
current_user.kill_session()
|
|
return redirect(url_for('settings_b.user_profile'))
|
|
|
|
@settings_b.route("/settings/user/otp/disable/self", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def user_otp_disable_self():
|
|
user_id = current_user.get_user_id()
|
|
r = ail_users.api_disable_user_otp(user_id, request.access_route[0])
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
current_user.kill_session()
|
|
return redirect(url_for('settings_b.user_profile'))
|
|
|
|
@settings_b.route("/settings/user/otp/reset/self", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def user_otp_reset_self(): # TODO ask for password ?
|
|
user_id = current_user.get_user_id()
|
|
r = ail_users.api_reset_user_otp(user_id, user_id, request.access_route[0])
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
current_user.kill_session()
|
|
return redirect(url_for('settings_b.user_profile'))
|
|
|
|
@settings_b.route("/settings/user/otp/enable", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def user_otp_enable():
|
|
user_id = request.args.get('user_id')
|
|
r = ail_users.api_enable_user_otp(user_id, request.access_route[0])
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
user = ail_users.AILUser.get(user_id)
|
|
user.kill_session()
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/user/otp/disable", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def user_otp_disable():
|
|
user_id = request.args.get('user_id')
|
|
r = ail_users.api_disable_user_otp(user_id, request.access_route[0])
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
user = ail_users.AILUser.get(user_id)
|
|
user.kill_session()
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/user/otp/reset", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def user_otp_reset(): # TODO ask for password ?
|
|
user_id = request.args.get('user_id')
|
|
admin_id = current_user.get_user_id()
|
|
r = ail_users.api_reset_user_otp(admin_id, user_id, request.access_route[0])
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
user = ail_users.AILUser.get(user_id)
|
|
user.kill_session()
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/user/api_key/new", methods=['GET'])
|
|
@login_required
|
|
@login_user
|
|
def new_token_user_self():
|
|
user_id = current_user.get_user_id()
|
|
r = ail_users.api_create_user_api_key_self(user_id, request.access_route[0], request.user_agent)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.user_profile'))
|
|
|
|
@settings_b.route("/settings/new_user_api_key", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def new_token_user():
|
|
user_id = request.args.get('user_id')
|
|
admin_id = current_user.get_user_id()
|
|
r = ail_users.api_create_user_api_key(user_id, admin_id, request.access_route[0], request.user_agent)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/user/logout", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def user_logout():
|
|
user_id = request.args.get('user_id') # TODO LOGS
|
|
admin_id = current_user.get_user_id()
|
|
r = ail_users.api_logout_user(admin_id, user_id, request.access_route[0], request.user_agent)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/users/logout", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def users_logout():
|
|
admin_id = current_user.get_user_id() # TODO LOGS
|
|
r = ail_users.api_logout_users(admin_id, request.access_route[0], request.user_agent)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/create_user", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def create_user():
|
|
user_id = request.args.get('user_id')
|
|
error = request.args.get('error')
|
|
error_mail = request.args.get('error_mail')
|
|
meta = {}
|
|
if user_id:
|
|
r = ail_users.api_get_user_profile(user_id)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
meta = r[0]
|
|
all_roles = ail_users.get_roles()
|
|
orgs = ail_orgs.get_orgs_selector()
|
|
if meta:
|
|
selector_val = f"{meta['org']}: {meta['org_name']}"
|
|
else:
|
|
selector_val = None
|
|
return render_template("create_user.html", all_roles=all_roles, orgs=orgs, meta=meta,
|
|
error=error, error_mail=error_mail, selector_val=selector_val,
|
|
acl_admin=True)
|
|
|
|
@settings_b.route("/settings/edit_user", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def edit_user():
|
|
user_id = request.args.get('user_id')
|
|
return redirect(url_for('settings_b.create_user', user_id=user_id))
|
|
|
|
|
|
@settings_b.route("/settings/create_user_post", methods=['POST'])
|
|
@login_required
|
|
@login_admin
|
|
def create_user_post():
|
|
# Admin ID
|
|
admin_id = current_user.get_user_id()
|
|
|
|
email = request.form.get('username')
|
|
org_uuid = request.form.get('user_organisation')
|
|
role = request.form.get('user_role')
|
|
password1 = request.form.get('password1')
|
|
password2 = request.form.get('password2')
|
|
enable_2_fa = request.form.get('enable_2_fa')
|
|
if enable_2_fa or ail_users.is_2fa_enabled():
|
|
enable_2_fa = True
|
|
else:
|
|
enable_2_fa = False
|
|
|
|
if org_uuid:
|
|
org_uuid = org_uuid[2:].split(':', 1)[0]
|
|
|
|
all_roles = ail_users.get_roles()
|
|
|
|
if email and len(email) < 300 and ail_users.check_email(email) and role:
|
|
if role in all_roles:
|
|
# password set
|
|
if password1 and password2:
|
|
if password1 == password2:
|
|
if ail_users.check_password_strength(password1):
|
|
password = password1
|
|
else:
|
|
return render_template("create_user.html", all_roles=all_roles, error="Incorrect Password", acl_admin=True)
|
|
else:
|
|
return render_template("create_user.html", all_roles=all_roles, error="Passwords don't match", acl_admin=True)
|
|
# generate password
|
|
else:
|
|
password = ail_users.gen_password()
|
|
|
|
if current_user.is_admin():
|
|
str_password = password
|
|
if ail_users.exists_user(email):
|
|
if not password1 and not password2:
|
|
password = None
|
|
str_password = 'Password not changed'
|
|
edit = True
|
|
else:
|
|
edit = False
|
|
r = ail_users.api_create_user(admin_id, request.access_route[0], request.user_agent, email, password, org_uuid, role, enable_2_fa)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
|
|
new_user = {'email': email, 'password': str_password, 'org': org_uuid, 'otp': enable_2_fa, 'edited': edit}
|
|
# qr_code = ail_users.create_qr_code(f'{email} - {password}')
|
|
return render_template("create_user.html", new_user=new_user, meta={},
|
|
all_roles=all_roles, acl_admin=True)
|
|
|
|
else:
|
|
return render_template("create_user.html", all_roles=all_roles, meta={}, acl_admin=True)
|
|
else:
|
|
return render_template("create_user.html", all_roles=all_roles, meta={}, error_mail=True, acl_admin=True)
|
|
|
|
|
|
|
|
@settings_b.route("/settings/delete_user", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def delete_user():
|
|
user_id = request.args.get('user_id')
|
|
admin_id = current_user.get_user_id()
|
|
r = ail_users.api_delete_user(user_id, admin_id, request.access_route[0], request.user_agent)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.users_list'))
|
|
|
|
@settings_b.route("/settings/users", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def users_list():
|
|
meta = ail_users.api_get_users_meta()
|
|
return render_template("users_list.html", meta=meta, acl_admin=True)
|
|
|
|
#############################################
|
|
|
|
@settings_b.route("/settings/organisations", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def organisations_list():
|
|
meta = ail_orgs.api_get_orgs_meta()
|
|
return render_template("orgs_list.html", meta=meta, acl_admin=True)
|
|
|
|
@settings_b.route("/settings/organisation", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def organisation():
|
|
org_uuid = request.args.get('uuid')
|
|
meta, r = ail_orgs.api_get_org_meta(org_uuid)
|
|
if r != 200:
|
|
return create_json_response(meta, r)
|
|
if 'users' in meta:
|
|
meta['users'] = ail_users.get_users_meta(meta['users'])
|
|
return render_template("view_organisation.html", meta=meta, acl_admin=True)
|
|
|
|
@settings_b.route("/settings/create_organisation", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def create_organisation():
|
|
meta = {}
|
|
return render_template("create_org.html", meta=meta, error_mail=False, acl_admin=True)
|
|
|
|
@settings_b.route("/settings/create_org_post", methods=['POST'])
|
|
@login_required
|
|
@login_admin
|
|
def create_org_post():
|
|
# Admin ID
|
|
admin_id = current_user.get_user_id()
|
|
|
|
org_uuid = request.form.get('uuid')
|
|
name = request.form.get('name')
|
|
description = request.form.get('description')
|
|
|
|
r = ail_orgs.api_create_org(admin_id, org_uuid, name, request.access_route[0], request.user_agent, description=description)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.organisations_list'))
|
|
|
|
|
|
# TODO check if uuid4
|
|
# TODO check name format + length
|
|
|
|
@settings_b.route("/settings/delete_org", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def delete_org():
|
|
admin_id = current_user.get_user_id()
|
|
org_uuid = request.args.get('uuid')
|
|
r = ail_orgs.api_delete_org(org_uuid, admin_id, request.access_route[0], request.user_agent)
|
|
if r[1] != 200:
|
|
return create_json_response(r[0], r[1])
|
|
else:
|
|
return redirect(url_for('settings_b.organisations_list'))
|
|
|
|
|
|
#############################################
|
|
|
|
@settings_b.route("/settings/passivedns", methods=['GET'])
|
|
@login_required
|
|
@login_read_only
|
|
def passive_dns():
|
|
passivedns_enabled = d4.is_passive_dns_enabled()
|
|
return render_template("passive_dns.html", passivedns_enabled=passivedns_enabled)
|
|
|
|
|
|
@settings_b.route("/settings/passivedns/change_state", methods=['GET'])
|
|
@login_required
|
|
@login_admin
|
|
def passive_dns_change_state():
|
|
new_state = request.args.get('state') == 'enable'
|
|
passivedns_enabled = d4.change_passive_dns_state(new_state)
|
|
return redirect(url_for('settings_b.passive_dns'))
|
|
|
|
# @settings.route("/settings/ail", methods=['GET'])
|
|
# @login_required
|
|
# @login_admin
|
|
# def ail_configs():
|
|
# return render_template("ail_configs.html", passivedns_enabled=None)
|
|
|