ail-framework/var/www/blueprints/settings_b.py

407 lines
14 KiB
Python

#!/usr/bin/env python3
# -*-coding:UTF-8 -*
'''
Blueprint Flask: ail_investigations
'''
import os
import sys
import json
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response, abort, send_file
from flask_login import login_required, current_user
# Import Role_Manager
from Role_Manager import login_admin, login_user, login_read_only
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import ail_updates
from lib import ail_orgs
from lib import ail_users
from lib import d4
from packages import git_status
# ============ BLUEPRINT ============
settings_b = Blueprint('settings_b', __name__, template_folder=os.path.join(os.environ['AIL_FLASK'], 'templates/settings'))
# ============ VARIABLES ============
# bootstrap_label = Flask_config.bootstrap_label
# ============ FUNCTIONS ============
def create_json_response(data, status_code):
return Response(json.dumps(data, indent=2, sort_keys=True), mimetype='application/json'), status_code
# ============= ROUTES ==============
@settings_b.route("/settings", methods=['GET'])
@login_required
@login_read_only
def settings_page():
git_metadata = git_status.get_git_metadata()
ail_version = ail_updates.get_ail_version()
acl_admin = current_user.is_in_role('admin')
return render_template("settings_index.html", git_metadata=git_metadata,
ail_version=ail_version, acl_admin=acl_admin)
@settings_b.route("/settings/background_update/json", methods=['GET'])
@login_required
@login_read_only
def get_background_update_metadata_json():
return jsonify(ail_updates.get_update_background_meta(options={}))
@settings_b.route("/settings/modules", methods=['GET'])
@login_required
@login_read_only
def settings_modules():
acl_admin = current_user.is_in_role('admin')
return render_template("settings/modules.html", acl_admin=acl_admin)
@settings_b.route("/settings/user/profile", methods=['GET'])
@login_required
@login_read_only
def user_profile():
user_id = current_user.get_user_id()
acl_admin = current_user.is_in_role('admin')
r = ail_users.api_get_user_profile(user_id)
if r[1] != 200:
return create_json_response(r[0], r[1])
meta = r[0]
global_2fa = ail_users.is_2fa_enabled()
return render_template("user_profile.html", meta=meta, global_2fa=global_2fa,acl_admin=acl_admin)
@settings_b.route("/settings/user/hotp", methods=['GET'])
@login_required
@login_read_only
def user_hotp():
# if not current_user.is_authenticated: # TODO CHECK IF FRESH LOGIN/SESSION -> check last loging time -> rerequest if expired
acl_admin = current_user.is_in_role('admin')
user_id = current_user.get_user_id()
r = ail_users.api_get_user_hotp(user_id)
if r[1] != 200:
return create_json_response(r[0], r[1])
hotp = r[0]
return render_template("user_hotp.html", hotp=hotp, acl_admin=acl_admin)
@settings_b.route("/settings/user/otp/enable/self", methods=['GET'])
@login_required
@login_read_only
def user_otp_enable_self():
user_id = current_user.get_user_id()
r = ail_users.api_enable_user_otp(user_id, request.access_route[0])
if r[1] != 200:
return create_json_response(r[0], r[1])
current_user.kill_session()
return redirect(url_for('settings_b.user_profile'))
@settings_b.route("/settings/user/otp/disable/self", methods=['GET'])
@login_required
@login_read_only
def user_otp_disable_self():
user_id = current_user.get_user_id()
r = ail_users.api_disable_user_otp(user_id, request.access_route[0])
if r[1] != 200:
return create_json_response(r[0], r[1])
current_user.kill_session()
return redirect(url_for('settings_b.user_profile'))
@settings_b.route("/settings/user/otp/reset/self", methods=['GET'])
@login_required
@login_admin
def user_otp_reset_self(): # TODO ask for password ?
user_id = current_user.get_user_id()
r = ail_users.api_reset_user_otp(user_id, user_id, request.access_route[0])
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
current_user.kill_session()
return redirect(url_for('settings_b.user_profile'))
@settings_b.route("/settings/user/otp/enable", methods=['GET'])
@login_required
@login_admin
def user_otp_enable():
user_id = request.args.get('user_id')
r = ail_users.api_enable_user_otp(user_id, request.access_route[0])
if r[1] != 200:
return create_json_response(r[0], r[1])
user = ail_users.AILUser.get(user_id)
user.kill_session()
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/user/otp/disable", methods=['GET'])
@login_required
@login_admin
def user_otp_disable():
user_id = request.args.get('user_id')
r = ail_users.api_disable_user_otp(user_id, request.access_route[0])
if r[1] != 200:
return create_json_response(r[0], r[1])
user = ail_users.AILUser.get(user_id)
user.kill_session()
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/user/otp/reset", methods=['GET'])
@login_required
@login_admin
def user_otp_reset(): # TODO ask for password ?
user_id = request.args.get('user_id')
admin_id = current_user.get_user_id()
r = ail_users.api_reset_user_otp(admin_id, user_id, request.access_route[0])
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
user = ail_users.AILUser.get(user_id)
user.kill_session()
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/user/api_key/new", methods=['GET'])
@login_required
@login_user
def new_token_user_self():
user_id = current_user.get_user_id()
r = ail_users.api_create_user_api_key_self(user_id, request.access_route[0], request.user_agent)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.user_profile'))
@settings_b.route("/settings/new_user_api_key", methods=['GET'])
@login_required
@login_admin
def new_token_user():
user_id = request.args.get('user_id')
admin_id = current_user.get_user_id()
r = ail_users.api_create_user_api_key(user_id, admin_id, request.access_route[0], request.user_agent)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/user/logout", methods=['GET'])
@login_required
@login_admin
def user_logout():
user_id = request.args.get('user_id') # TODO LOGS
admin_id = current_user.get_user_id()
r = ail_users.api_logout_user(admin_id, user_id, request.access_route[0], request.user_agent)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/users/logout", methods=['GET'])
@login_required
@login_admin
def users_logout():
admin_id = current_user.get_user_id() # TODO LOGS
r = ail_users.api_logout_users(admin_id, request.access_route[0], request.user_agent)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/create_user", methods=['GET'])
@login_required
@login_admin
def create_user():
user_id = request.args.get('user_id')
error = request.args.get('error')
error_mail = request.args.get('error_mail')
meta = {}
if user_id:
r = ail_users.api_get_user_profile(user_id)
if r[1] != 200:
return create_json_response(r[0], r[1])
meta = r[0]
all_roles = ail_users.get_roles()
orgs = ail_orgs.get_orgs_selector()
if meta:
selector_val = f"{meta['org']}: {meta['org_name']}"
else:
selector_val = None
return render_template("create_user.html", all_roles=all_roles, orgs=orgs, meta=meta,
error=error, error_mail=error_mail, selector_val=selector_val,
acl_admin=True)
@settings_b.route("/settings/edit_user", methods=['GET'])
@login_required
@login_admin
def edit_user():
user_id = request.args.get('user_id')
return redirect(url_for('settings_b.create_user', user_id=user_id))
@settings_b.route("/settings/create_user_post", methods=['POST'])
@login_required
@login_admin
def create_user_post():
# Admin ID
admin_id = current_user.get_user_id()
email = request.form.get('username')
org_uuid = request.form.get('user_organisation')
role = request.form.get('user_role')
password1 = request.form.get('password1')
password2 = request.form.get('password2')
enable_2_fa = request.form.get('enable_2_fa')
if enable_2_fa or ail_users.is_2fa_enabled():
enable_2_fa = True
else:
enable_2_fa = False
if org_uuid:
org_uuid = org_uuid[2:].split(':', 1)[0]
all_roles = ail_users.get_roles()
if email and len(email) < 300 and ail_users.check_email(email) and role:
if role in all_roles:
# password set
if password1 and password2:
if password1 == password2:
if ail_users.check_password_strength(password1):
password = password1
else:
return render_template("create_user.html", all_roles=all_roles, error="Incorrect Password", acl_admin=True)
else:
return render_template("create_user.html", all_roles=all_roles, error="Passwords don't match", acl_admin=True)
# generate password
else:
password = ail_users.gen_password()
if current_user.is_admin():
str_password = password
if ail_users.exists_user(email):
if not password1 and not password2:
password = None
str_password = 'Password not changed'
edit = True
else:
edit = False
r = ail_users.api_create_user(admin_id, request.access_route[0], request.user_agent, email, password, org_uuid, role, enable_2_fa)
if r[1] != 200:
return create_json_response(r[0], r[1])
new_user = {'email': email, 'password': str_password, 'org': org_uuid, 'otp': enable_2_fa, 'edited': edit}
# qr_code = ail_users.create_qr_code(f'{email} - {password}')
return render_template("create_user.html", new_user=new_user, meta={},
all_roles=all_roles, acl_admin=True)
else:
return render_template("create_user.html", all_roles=all_roles, meta={}, acl_admin=True)
else:
return render_template("create_user.html", all_roles=all_roles, meta={}, error_mail=True, acl_admin=True)
@settings_b.route("/settings/delete_user", methods=['GET'])
@login_required
@login_admin
def delete_user():
user_id = request.args.get('user_id')
admin_id = current_user.get_user_id()
r = ail_users.api_delete_user(user_id, admin_id, request.access_route[0], request.user_agent)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/users", methods=['GET'])
@login_required
@login_admin
def users_list():
meta = ail_users.api_get_users_meta()
return render_template("users_list.html", meta=meta, acl_admin=True)
#############################################
@settings_b.route("/settings/organisations", methods=['GET'])
@login_required
@login_admin
def organisations_list():
meta = ail_orgs.api_get_orgs_meta()
return render_template("orgs_list.html", meta=meta, acl_admin=True)
@settings_b.route("/settings/organisation", methods=['GET'])
@login_required
@login_admin
def organisation():
org_uuid = request.args.get('uuid')
meta, r = ail_orgs.api_get_org_meta(org_uuid)
if r != 200:
return create_json_response(meta, r)
if 'users' in meta:
meta['users'] = ail_users.get_users_meta(meta['users'])
return render_template("view_organisation.html", meta=meta, acl_admin=True)
@settings_b.route("/settings/create_organisation", methods=['GET'])
@login_required
@login_admin
def create_organisation():
meta = {}
return render_template("create_org.html", meta=meta, error_mail=False, acl_admin=True)
@settings_b.route("/settings/create_org_post", methods=['POST'])
@login_required
@login_admin
def create_org_post():
# Admin ID
admin_id = current_user.get_user_id()
org_uuid = request.form.get('uuid')
name = request.form.get('name')
description = request.form.get('description')
r = ail_orgs.api_create_org(admin_id, org_uuid, name, request.access_route[0], request.user_agent, description=description)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.organisations_list'))
# TODO check if uuid4
# TODO check name format + length
@settings_b.route("/settings/delete_org", methods=['GET'])
@login_required
@login_admin
def delete_org():
admin_id = current_user.get_user_id()
org_uuid = request.args.get('uuid')
r = ail_orgs.api_delete_org(org_uuid, admin_id, request.access_route[0], request.user_agent)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.organisations_list'))
#############################################
@settings_b.route("/settings/passivedns", methods=['GET'])
@login_required
@login_read_only
def passive_dns():
passivedns_enabled = d4.is_passive_dns_enabled()
return render_template("passive_dns.html", passivedns_enabled=passivedns_enabled)
@settings_b.route("/settings/passivedns/change_state", methods=['GET'])
@login_required
@login_admin
def passive_dns_change_state():
new_state = request.args.get('state') == 'enable'
passivedns_enabled = d4.change_passive_dns_state(new_state)
return redirect(url_for('settings_b.passive_dns'))
# @settings.route("/settings/ail", methods=['GET'])
# @login_required
# @login_admin
# def ail_configs():
# return render_template("ail_configs.html", passivedns_enabled=None)