mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-10 08:38:28 +00:00
fix: [url_prefix] add root blueprint, fix:#403
This commit is contained in:
parent
734c94453a
commit
9051c4081b
6 changed files with 166 additions and 121 deletions
|
@ -2,7 +2,6 @@
|
|||
# -*-coding:UTF-8 -*
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import ssl
|
||||
import json
|
||||
|
@ -17,15 +16,11 @@ import configparser
|
|||
from flask import Flask, render_template, jsonify, request, Request, Response, session, redirect, url_for
|
||||
from flask_login import LoginManager, current_user, login_user, logout_user, login_required
|
||||
|
||||
import bcrypt
|
||||
|
||||
import flask
|
||||
import importlib
|
||||
from os.path import join
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/'))
|
||||
sys.path.append('./modules/')
|
||||
import Paste
|
||||
from Date import Date
|
||||
|
||||
from User import User
|
||||
|
||||
|
@ -34,9 +29,8 @@ from pytaxonomies import Taxonomies
|
|||
# Import config
|
||||
import Flask_config
|
||||
|
||||
# Import Role_Manager
|
||||
from Role_Manager import create_user_db, check_password_strength, check_user_role_integrity
|
||||
from Role_Manager import login_admin, login_analyst
|
||||
# Import Blueprint
|
||||
from blueprints.root import root
|
||||
|
||||
Flask_dir = os.environ['AIL_FLASK']
|
||||
|
||||
|
@ -92,19 +86,25 @@ Flask_config.app = Flask(__name__, static_url_path=baseUrl+'/static/')
|
|||
app = Flask_config.app
|
||||
app.config['MAX_CONTENT_LENGTH'] = 900 * 1024 * 1024
|
||||
|
||||
# ========= BLUEPRINT =========#
|
||||
app.register_blueprint(root, url_prefix=baseUrl)
|
||||
# ========= =========#
|
||||
|
||||
# ========= session ========
|
||||
app.secret_key = str(random.getrandbits(256))
|
||||
login_manager = LoginManager()
|
||||
login_manager.login_view = 'login'
|
||||
login_manager.login_view = 'root.login'
|
||||
login_manager.init_app(app)
|
||||
|
||||
print()
|
||||
|
||||
# ========= LOGIN MANAGER ========
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
return User.get(user_id)
|
||||
|
||||
# ========= HEADER GENERATION ========
|
||||
# ========= HEADER GENERATION ======== DEPRECATED
|
||||
|
||||
# Get headers items that should be ignored (not displayed)
|
||||
toIgnoreModule = set()
|
||||
|
@ -182,119 +182,24 @@ def add_header(response):
|
|||
response.headers['Cache-Control'] = 'private, max-age=0'
|
||||
return response
|
||||
|
||||
# @app.route('/test', methods=['GET'])
|
||||
# def test():
|
||||
# for rule in app.url_map.iter_rules():
|
||||
# print(rule)
|
||||
# return 'o'
|
||||
|
||||
# ========== ROUTES ============
|
||||
@app.route('/login', methods=['POST', 'GET'])
|
||||
def login():
|
||||
|
||||
current_ip = request.remote_addr
|
||||
login_failed_ip = r_cache.get('failed_login_ip:{}'.format(current_ip))
|
||||
|
||||
# brute force by ip
|
||||
if login_failed_ip:
|
||||
login_failed_ip = int(login_failed_ip)
|
||||
if login_failed_ip >= 5:
|
||||
error = 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_ip:{}'.format(current_ip)))
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
if request.method == 'POST':
|
||||
username = request.form.get('username')
|
||||
password = request.form.get('password')
|
||||
#next_page = request.form.get('next_page')
|
||||
|
||||
if username is not None:
|
||||
user = User.get(username)
|
||||
login_failed_user_id = r_cache.get('failed_login_user_id:{}'.format(username))
|
||||
# brute force by user_id
|
||||
if login_failed_user_id:
|
||||
login_failed_user_id = int(login_failed_user_id)
|
||||
if login_failed_user_id >= 5:
|
||||
error = 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_user_id:{}'.format(username)))
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
if user and user.check_password(password):
|
||||
if not check_user_role_integrity(user.get_id()):
|
||||
error = 'Incorrect User ACL, Please contact your administrator'
|
||||
return render_template("login.html", error=error)
|
||||
login_user(user) ## TODO: use remember me ?
|
||||
if user.request_password_change():
|
||||
return redirect(url_for('change_password'))
|
||||
else:
|
||||
return redirect(url_for('dashboard.index'))
|
||||
# login failed
|
||||
else:
|
||||
# set brute force protection
|
||||
#logger.warning("Login failed, ip={}, username={}".format(current_ip, username))
|
||||
r_cache.incr('failed_login_ip:{}'.format(current_ip))
|
||||
r_cache.expire('failed_login_ip:{}'.format(current_ip), 300)
|
||||
r_cache.incr('failed_login_user_id:{}'.format(username))
|
||||
r_cache.expire('failed_login_user_id:{}'.format(username), 300)
|
||||
#
|
||||
|
||||
error = 'Password Incorrect'
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
return 'please provide a valid username'
|
||||
|
||||
else:
|
||||
#next_page = request.args.get('next')
|
||||
error = request.args.get('error')
|
||||
return render_template("login.html" , error=error)
|
||||
|
||||
@app.route('/change_password', methods=['POST', 'GET'])
|
||||
@login_required
|
||||
def change_password():
|
||||
password1 = request.form.get('password1')
|
||||
password2 = request.form.get('password2')
|
||||
error = request.args.get('error')
|
||||
|
||||
if error:
|
||||
return render_template("change_password.html", error=error)
|
||||
|
||||
if current_user.is_authenticated and password1!=None:
|
||||
if password1==password2:
|
||||
if check_password_strength(password1):
|
||||
user_id = current_user.get_id()
|
||||
create_user_db(user_id , password1, update=True)
|
||||
return redirect(url_for('dashboard.index'))
|
||||
else:
|
||||
error = 'Incorrect password'
|
||||
return render_template("change_password.html", error=error)
|
||||
else:
|
||||
error = "Passwords don't match"
|
||||
return render_template("change_password.html", error=error)
|
||||
else:
|
||||
error = 'Please choose a new password'
|
||||
return render_template("change_password.html", error=error)
|
||||
|
||||
@app.route('/logout')
|
||||
@login_required
|
||||
def logout():
|
||||
logout_user()
|
||||
return redirect(url_for('login'))
|
||||
|
||||
# role error template
|
||||
@app.route('/role', methods=['POST', 'GET'])
|
||||
@login_required
|
||||
def role():
|
||||
return render_template("error/403.html"), 403
|
||||
|
||||
@app.route('/searchbox/')
|
||||
@login_required
|
||||
@login_analyst
|
||||
def searchbox():
|
||||
return render_template("searchbox.html")
|
||||
#@app.route('/endpoints')
|
||||
#def endpoints():
|
||||
# for rule in app.url_map.iter_rules():
|
||||
# str_endpoint = str(rule)
|
||||
# if len(str_endpoint)>5:
|
||||
# if str_endpoint[0:5]=='/api/': ## add baseUrl ???
|
||||
# print(str_endpoint)
|
||||
# #print(rule.endpoint) #internal endpoint name
|
||||
# #print(rule.methods)
|
||||
# return 'ok'
|
||||
|
||||
# ========== ERROR HANDLER ============
|
||||
|
||||
@app.errorhandler(405)
|
||||
def _handle_client_error(e):
|
||||
if request.path.startswith('/api/'):
|
||||
if request.path.startswith('/api/'): ## # TODO: add baseUrl
|
||||
res_dict = {"status": "error", "reason": "Method Not Allowed: The method is not allowed for the requested URL"}
|
||||
anchor_id = request.path[8:]
|
||||
anchor_id = anchor_id.replace('/', '_')
|
||||
|
@ -306,7 +211,7 @@ def _handle_client_error(e):
|
|||
|
||||
@app.errorhandler(404)
|
||||
def error_page_not_found(e):
|
||||
if request.path.startswith('/api/'):
|
||||
if request.path.startswith('/api/'): ## # TODO: add baseUrl
|
||||
return Response(json.dumps({"status": "error", "reason": "404 Not Found"}, indent=2, sort_keys=True), mimetype='application/json'), 404
|
||||
else:
|
||||
# avoid endpoint enumeration
|
||||
|
|
140
var/www/blueprints/root.py
Normal file
140
var/www/blueprints/root.py
Normal file
|
@ -0,0 +1,140 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*-coding:UTF-8 -*
|
||||
|
||||
'''
|
||||
Blueprint Flask: root endpoints: login, ...
|
||||
'''
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response
|
||||
from flask_login import login_required, current_user, login_user, logout_user
|
||||
|
||||
sys.path.append('modules')
|
||||
import Flask_config
|
||||
|
||||
# Import Role_Manager
|
||||
from Role_Manager import create_user_db, check_password_strength, check_user_role_integrity
|
||||
from Role_Manager import login_admin, login_analyst
|
||||
|
||||
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages'))
|
||||
from User import User
|
||||
|
||||
r_cache = Flask_config.r_cache
|
||||
r_serv_db = Flask_config.r_serv_db
|
||||
r_serv_tags = Flask_config.r_serv_tags
|
||||
|
||||
# ============ BLUEPRINT ============
|
||||
|
||||
root = Blueprint('root', __name__, template_folder='templates')
|
||||
|
||||
# ============ VARIABLES ============
|
||||
|
||||
|
||||
|
||||
# ============ FUNCTIONS ============
|
||||
|
||||
|
||||
|
||||
# ============= ROUTES ==============
|
||||
@root.route('/login', methods=['POST', 'GET'])
|
||||
def login():
|
||||
|
||||
current_ip = request.remote_addr
|
||||
login_failed_ip = r_cache.get('failed_login_ip:{}'.format(current_ip))
|
||||
|
||||
# brute force by ip
|
||||
if login_failed_ip:
|
||||
login_failed_ip = int(login_failed_ip)
|
||||
if login_failed_ip >= 5:
|
||||
error = 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_ip:{}'.format(current_ip)))
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
if request.method == 'POST':
|
||||
username = request.form.get('username')
|
||||
password = request.form.get('password')
|
||||
#next_page = request.form.get('next_page')
|
||||
|
||||
if username is not None:
|
||||
user = User.get(username)
|
||||
login_failed_user_id = r_cache.get('failed_login_user_id:{}'.format(username))
|
||||
# brute force by user_id
|
||||
if login_failed_user_id:
|
||||
login_failed_user_id = int(login_failed_user_id)
|
||||
if login_failed_user_id >= 5:
|
||||
error = 'Max Connection Attempts reached, Please wait {}s'.format(r_cache.ttl('failed_login_user_id:{}'.format(username)))
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
if user and user.check_password(password):
|
||||
if not check_user_role_integrity(user.get_id()):
|
||||
error = 'Incorrect User ACL, Please contact your administrator'
|
||||
return render_template("login.html", error=error)
|
||||
login_user(user) ## TODO: use remember me ?
|
||||
if user.request_password_change():
|
||||
return redirect(url_for('root.change_password'))
|
||||
else:
|
||||
return redirect(url_for('dashboard.index'))
|
||||
# login failed
|
||||
else:
|
||||
# set brute force protection
|
||||
#logger.warning("Login failed, ip={}, username={}".format(current_ip, username))
|
||||
r_cache.incr('failed_login_ip:{}'.format(current_ip))
|
||||
r_cache.expire('failed_login_ip:{}'.format(current_ip), 300)
|
||||
r_cache.incr('failed_login_user_id:{}'.format(username))
|
||||
r_cache.expire('failed_login_user_id:{}'.format(username), 300)
|
||||
#
|
||||
|
||||
error = 'Password Incorrect'
|
||||
return render_template("login.html", error=error)
|
||||
|
||||
return 'please provide a valid username'
|
||||
|
||||
else:
|
||||
#next_page = request.args.get('next')
|
||||
error = request.args.get('error')
|
||||
return render_template("login.html" , error=error)
|
||||
|
||||
@root.route('/change_password', methods=['POST', 'GET'])
|
||||
@login_required
|
||||
def change_password():
|
||||
password1 = request.form.get('password1')
|
||||
password2 = request.form.get('password2')
|
||||
error = request.args.get('error')
|
||||
|
||||
if error:
|
||||
return render_template("change_password.html", error=error)
|
||||
|
||||
if current_user.is_authenticated and password1!=None:
|
||||
if password1==password2:
|
||||
if check_password_strength(password1):
|
||||
user_id = current_user.get_id()
|
||||
create_user_db(user_id , password1, update=True)
|
||||
return redirect(url_for('dashboard.index'))
|
||||
else:
|
||||
error = 'Incorrect password'
|
||||
return render_template("change_password.html", error=error)
|
||||
else:
|
||||
error = "Passwords don't match"
|
||||
return render_template("change_password.html", error=error)
|
||||
else:
|
||||
error = 'Please choose a new password'
|
||||
return render_template("change_password.html", error=error)
|
||||
|
||||
@root.route('/logout')
|
||||
@login_required
|
||||
def logout():
|
||||
logout_user()
|
||||
return redirect(url_for('root.login'))
|
||||
|
||||
# role error template
|
||||
@root.route('/role', methods=['POST', 'GET'])
|
||||
@login_required
|
||||
def role():
|
||||
return render_template("error/403.html"), 403
|
||||
|
||||
@root.route('/searchbox/')
|
||||
@login_required
|
||||
@login_analyst
|
||||
def searchbox():
|
||||
return render_template("searchbox.html")
|
|
@ -61,7 +61,7 @@
|
|||
<body class="text-center">
|
||||
|
||||
|
||||
<form class="form-signin" action="{{ url_for('change_password')}}" autocomplete="off" method="post">
|
||||
<form class="form-signin" action="{{ url_for('root.change_password')}}" autocomplete="off" method="post">
|
||||
<img class="mb-4" src="{{ url_for('static', filename='image/AIL-logo.png')}}" width="300">
|
||||
<h1 class="h3 mb-3 text-secondary">Change Password</h1>
|
||||
<label for="inputPassword1" class="sr-only">Password</label>
|
||||
|
|
|
@ -66,7 +66,7 @@
|
|||
<body class="text-center">
|
||||
|
||||
|
||||
<form class="form-signin" action="{{ url_for('login')}}" method="post">
|
||||
<form class="form-signin" action="{{ url_for('root.login')}}" method="post">
|
||||
<img class="mb-4" src="{{ url_for('static', filename='image/AIL-logo.png')}}" width="300">
|
||||
<h1 class="h3 mb-3 text-secondary">Please sign in</h1>
|
||||
<label for="inputEmail" class="sr-only">Email address</label>
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
<a class="nav-link" id="page-options" href="{{ url_for('settings.settings_page') }}" aria-disabled="true"><i class="fas fa-cog"></i> Server Management</a>
|
||||
</li>
|
||||
<li class="nav-item mr-3">
|
||||
<a class="nav-link" id="page-options" href="{{ url_for('logout') }}" aria-disabled="true"><i class="fas fa-sign-out-alt"></i> Log Out</a>
|
||||
<a class="nav-link" id="page-options" href="{{ url_for('root.logout') }}" aria-disabled="true"><i class="fas fa-sign-out-alt"></i> Log Out</a>
|
||||
</li>
|
||||
</ul>
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
</a>
|
||||
</li>
|
||||
<li class="nav-item">
|
||||
<a class="nav-link" href="{{url_for('change_password')}}" id="nav_dashboard">
|
||||
<a class="nav-link" href="{{url_for('root.change_password')}}" id="nav_dashboard">
|
||||
<i class="fas fa-key"></i>
|
||||
<span>Change Password</span>
|
||||
</a>
|
||||
|
|
Loading…
Reference in a new issue