chg: [login] add 2FA TOTP and HOTP

This commit is contained in:
terrtia 2024-06-26 13:55:39 +02:00
parent 8d4721d703
commit 073181fbd8
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
5 changed files with 401 additions and 42 deletions

View file

@ -8,7 +8,11 @@ import re
import secrets import secrets
import sys import sys
import segno
from base64 import b64encode
from flask_login import UserMixin from flask_login import UserMixin
from io import BytesIO
from uuid import uuid4 from uuid import uuid4
sys.path.append(os.environ['AIL_BIN']) sys.path.append(os.environ['AIL_BIN'])
@ -21,6 +25,11 @@ from lib.ConfigLoader import ConfigLoader
config_loader = ConfigLoader() config_loader = ConfigLoader()
r_serv_db = config_loader.get_db_conn("Kvrocks_DB") r_serv_db = config_loader.get_db_conn("Kvrocks_DB")
r_cache = config_loader.get_redis_conn("Redis_Cache") r_cache = config_loader.get_redis_conn("Redis_Cache")
if config_loader.get_config_boolean('Users', 'force_2fa'):
r_serv_db.hset('ail:2fa', '2fa', 1)
else:
r_serv_db.hset('ail:2fa', '2fa', 0)
config_loader = None config_loader = None
regex_password = r'^(?=(.*\d){2})(?=.*[a-z])(?=.*[A-Z]).{10,100}$' regex_password = r'^(?=(.*\d){2})(?=.*[a-z])(?=.*[A-Z]).{10,100}$'
@ -83,6 +92,9 @@ def hashing_password(password):
password = password.encode() password = password.encode()
return bcrypt.hashpw(password, bcrypt.gensalt()) return bcrypt.hashpw(password, bcrypt.gensalt())
def get_user_passwd_hash(user_id):
return r_serv_db.hget('ail:users:all', user_id)
## --PASSWORDS-- ## ## --PASSWORDS-- ##
def check_email(email): def check_email(email):
@ -95,6 +107,15 @@ def check_email(email):
#### TOKENS #### #### TOKENS ####
def get_user_token(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token')
def get_token_user(token):
return r_serv_db.hget('ail:users:tokens', token)
def exists_token(token):
return r_serv_db.hexists('ail:users:tokens', token)
def gen_token(): def gen_token():
return secrets.token_urlsafe(41) return secrets.token_urlsafe(41)
@ -136,8 +157,93 @@ def _get_hotp(secret):
def _verify_hotp(hotp, counter, code): def _verify_hotp(hotp, counter, code):
return hotp.verify(code, counter) return hotp.verify(code, counter)
def get_user_otp_secret(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_secret')
def get_user_hotp_counter(user_id):
return int(r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_counter'))
def verify_user_totp(user_id, code):
totp = _get_totp(get_user_otp_secret(user_id))
return _verify_totp(totp, code)
def verify_user_hotp(user_id, code): # TODO IF valid increase counter
hotp = _get_hotp(get_user_otp_secret(user_id))
counter = get_user_hotp_counter(user_id)
return _verify_hotp(hotp, counter, code)
def verify_user_otp(user_id, code):
if verify_user_totp(user_id, code):
return True
elif verify_user_hotp(user_id, code):
return True
return False
def create_user_otp(user_id):
secret = pyotp.random_base32()
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_secret', secret)
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_counter', 0)
enable_user_2fa(user_id)
def delete_user_otp(user_id):
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'otp_secret')
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'otp_counter')
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_setup', 0)
disable_user_2fa(user_id)
def get_user_otp_uri(user_id, instance_name):
return pyotp.totp.TOTP(get_user_otp_secret(user_id)).provisioning_uri(name=user_id, issuer_name=instance_name)
def get_user_otp_qr_code(user_id, instance_name):
uri = get_user_otp_uri(user_id, instance_name)
qrcode = segno.make_qr(uri)
buff = BytesIO()
qrcode.save(buff, kind='png', scale=10)
return b64encode(buff.getvalue()).decode()
# qrcode.save('qrcode.png', scale=10)
def get_user_hotp_code(user_id):
hotp = _get_hotp(get_user_otp_secret(user_id))
counter = get_user_hotp_counter(user_id)
codes = []
for i in range(counter, counter + 20):
codes.append(f'{i}: {hotp.at(i)}')
return codes
# TODO GET USER HOTP LISTS
# TODO RESET OTP
def is_user_otp_setup(user_id):
otp_setup = r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_setup')
if otp_setup:
return int(otp_setup) == 1
return False
## --OTP-- ## ## --OTP-- ##
#### 2FA ####
# Global 2fa option
def is_2fa_enabled():
fa2 = r_serv_db.hget('ail:2fa', '2fa')
if fa2:
return int(fa2) == 1
return False
def is_user_2fa_enabled(user_id):
fa2 = r_serv_db.hget(f'ail:user:metadata:{user_id}', '2fa')
if fa2:
return int(fa2) == 1
return False
def enable_user_2fa(user_id):
return r_serv_db.hset(f'ail:user:metadata:{user_id}', '2fa', 1)
def disable_user_2fa(user_id):
return r_serv_db.hset(f'ail:user:metadata:{user_id}', '2fa', 0)
## --2FA-- ##
#### USERS #### #### USERS ####
def get_users(): def get_users():
@ -146,35 +252,6 @@ def get_users():
def get_user_role(user_id): def get_user_role(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role') return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role')
def get_user_passwd_hash(user_id):
return r_serv_db.hget('ail:users:all', user_id)
def get_user_token(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token')
def get_token_user(token):
return r_serv_db.hget('ail:users:tokens', token)
def exists_token(token):
return r_serv_db.hexists('ail:users:tokens', token)
# def _get_user_otp(user_id):
#
#
# def get_user_hotps(user_id):
#
#
# def _get_user_hotp(user_id):
#
#
# def verify_user_otp(user_id, code):
#
#
# def get_user_hotp_counter(user_id):
# return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'hotp:counter')
#
# def verify_user_hotp(user_id, code):
# counter
######################################################################################################################## ########################################################################################################################
######################################################################################################################## ########################################################################################################################
@ -284,6 +361,31 @@ class AILUser(UserMixin):
_set_user_token(self.user_id, new_api_key) _set_user_token(self.user_id, new_api_key)
return new_api_key return new_api_key
## OTP ##
def is_2fa_setup(self):
return is_user_otp_setup(self.user_id)
def is_2fa_enabled(self):
if is_2fa_enabled():
return True
else:
return is_user_2fa_enabled(self.user_id)
def get_htop_counter(self):
return get_user_hotp_counter(self.user_id)
def is_valid_otp(self, code):
return verify_user_otp(self.user_id, code)
def init_setup_2fa(self, create=True):
if create:
create_user_otp(self.user_id)
return get_user_otp_qr_code(self.user_id, 'AIL TEST'), get_user_hotp_code(self.user_id)
def setup_2fa(self):
r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'otp_setup', 1)
## ROLE ## ## ROLE ##
def is_in_role(self, role): # TODO Get role via user alternative ID def is_in_role(self, role): # TODO Get role via user alternative ID
@ -501,3 +603,10 @@ def check_user_role_integrity(user_id):
return res return res
## --ROLES-- ## ## --ROLES-- ##
if __name__ == '__main__':
user_id = 'admin@admin.test'
instance_name = 'AIL TEST'
delete_user_otp(user_id)
# q = get_user_otp_qr_code(user_id, instance_name)
# print(q)

View file

@ -79,6 +79,10 @@ minute_processed_paste = 10
#Maximum line length authorized to make a diff between duplicates #Maximum line length authorized to make a diff between duplicates
DiffMaxLineLength = 10000 DiffMaxLineLength = 10000
##### Users #####
[Users]
force_2fa = False
[AIL_2_AIL] [AIL_2_AIL]
server_host = 0.0.0.0 server_host = 0.0.0.0
server_port = 4443 server_port = 4443

View file

@ -7,8 +7,10 @@
import os import os
import sys import sys
import time
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response
from flask import session
from flask_login import login_required, current_user, login_user, logout_user from flask_login import login_required, current_user, login_user, logout_user
sys.path.append('modules') sys.path.append('modules')
@ -62,7 +64,6 @@ def login():
if username is not None: if username is not None:
user = AILUser.get(username) # TODO ANONYMOUS USER user = AILUser.get(username) # TODO ANONYMOUS USER
print(user.is_anonymous)
# brute force by user_id # brute force by user_id
login_failed_user_id = r_cache.get(f'failed_login_user_id:{username}') login_failed_user_id = r_cache.get(f'failed_login_user_id:{username}')
@ -78,6 +79,18 @@ def login():
logging_error = 'Incorrect User ACL, Please contact your administrator' logging_error = 'Incorrect User ACL, Please contact your administrator'
return render_template("login.html", error=logging_error) return render_template("login.html", error=logging_error)
if user.is_2fa_enabled():
session['user_id'] = user.get_user_id()
session['otp_expire'] = int(time.time()) + 10800
if not user.is_2fa_setup():
return redirect(url_for('root.setup_2fa'))
else:
htop_counter = user.get_htop_counter()
return redirect(url_for('root.verify_2fa', htop_counter=htop_counter))
else:
# Login User # Login User
user.rotate_session() user.rotate_session()
login_user(user) login_user(user)
@ -117,6 +130,103 @@ def login():
error = request.args.get('error') error = request.args.get('error')
return render_template("login.html", next_page=next_page, error=error) return render_template("login.html", next_page=next_page, error=error)
@root.route('/2fa', methods=['POST', 'GET']) # TODO CHECK IF user_id exists
def verify_2fa():
user_id = session.get('user_id', None)
otp_expire = session.get('otp_expire', None)
if not user_id or not otp_expire: # TODO LOG
return redirect(url_for('root.login'))
# Check if Login is expired
if otp_expire < int(time.time()): # TODO LOG
session.pop('user_id', None)
session.pop('otp_expire', None)
error = "First Login Expired"
return redirect(url_for('root.login', error=error))
user = AILUser.get(user_id)
if not user.is_2fa_setup():
return redirect(url_for('root.setup_2fa'))
if request.method == 'POST':
code = request.form.get('otp')
if user.is_valid_otp(code):
session.pop('user_id', None)
session.pop('otp_expire', None)
# Login User
user.rotate_session()
login_user(user)
if user.request_password_change():
return redirect(url_for('root.change_password'))
else:
# # next page
# if next_page and next_page != 'None' and next_page != '/':
# return redirect(next_page)
# dashboard
# else:
return redirect(url_for('dashboard.index'))
else:
htop_counter = user.get_htop_counter()
error = "The OTP is incorrect or has expired"
return render_template("verify_otp.html", htop_counter=htop_counter, error=error)
else:
htop_counter = user.get_htop_counter()
return render_template("verify_otp.html", htop_counter=htop_counter)
@root.route('/2fa/setup', methods=['POST', 'GET'])
def setup_2fa():
user_id = session.get('user_id', None)
otp_expire = session.get('otp_expire', None)
if not user_id or not otp_expire: # TODO LOG
return redirect(url_for('root.login'))
# Check if Login is expired
if otp_expire < int(time.time()): # TODO LOG
session.pop('user_id', None)
session.pop('otp_expire', None)
error = "First Login Expired"
return redirect(url_for('root.login', error=error))
user = AILUser.get(user_id)
if user.is_2fa_setup():
return redirect(url_for('root.verify_2fa'))
if request.method == 'POST':
code = request.form.get('otp')
if user.is_valid_otp(code):
user.setup_2fa()
session.pop('user_id', None)
session.pop('otp_expire', None)
# Login User
user.rotate_session()
login_user(user)
if user.request_password_change():
return redirect(url_for('root.change_password'))
else:
return redirect(url_for('dashboard.index'))
else:
error = "The OTP is incorrect or has expired"
return redirect(url_for('root.setup_2fa', error=error))
else:
error = request.args.get('error')
if error:
qr_code, hotp_codes = user.init_setup_2fa(create=False)
else:
qr_code, hotp_codes = user.init_setup_2fa()
return render_template("setup_otp.html", qr_code=qr_code, hotp_codes=hotp_codes, error=error)
@root.route('/change_password', methods=['POST', 'GET']) @root.route('/change_password', methods=['POST', 'GET'])
@login_required @login_required
def change_password(): def change_password():

View file

@ -0,0 +1,52 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>AIL-Framework</title>
<link rel="icon" href="{{ url_for('static', filename='image/ail-icon.png')}}">
<!-- Core CSS -->
<link href="{{ url_for('static', filename='css/bootstrap4.min.css') }}" rel="stylesheet">
<link href="{{ url_for('static', filename='css/font-awesome.min.css') }}" rel="stylesheet">
<!-- JS -->
<script src="{{ url_for('static', filename='js/jquery.js')}}"></script>
<script src="{{ url_for('static', filename='js/bootstrap4.min.js')}}"></script>
</head>
<body class="text-center">
<img class="mb-4" src="{{ url_for('static', filename='image/ail-project.png')}}" width="200">
<h1 class="text-secondary">2FA: OTP Setup</h1>
<div class="row">
<div class="col-lg-6">
<h3 class="text-secondary">TOTP</h3>
<img src="data:image/png;base64, {{ qr_code }}">
<div style="font-size: 20px;"> - Install an <b>authenticator application</b> on your mobile.</div>
<div style="font-size: 20px;"> - <b>Scan</b> the QRCode</div>
</div>
<div class="col-lg-6">
<h3 class="text-secondary">HOTP</h3>
{% for code in hotp_codes %}
<div><i>{{ code[:-6] }}</i> <b>{{ code[-6:] }}</b></div>
{% endfor %}
</div>
</div>
<form class="form-signin" action="{{ url_for('root.setup_2fa')}}" method="post">
{# <h1 class="h3 mb-3 text-secondary">2FA: Please Enter your TOTP or HOTP</h1>#}
<div class="mx-4">
<input type="text" id="otp" name="otp" class="form-control {% if error %}is-invalid{% endif %}" placeholder="OTP Code" autocomplete="off" required autofocus>
{% if error %}
<div class="invalid-feedback">
{{error}}
</div>
{% endif %}
<button class="btn btn-lg btn-primary btn-block mb-4" type="submit">Once you have scanned the QRCode or copied the HOTP codes, Please Enter your TOTP or HOTP</button>
</div>
</form>
</body>

View file

@ -0,0 +1,84 @@
<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>AIL-Framework</title>
<link rel="icon" href="{{ url_for('static', filename='image/ail-icon.png')}}">
<!-- Core CSS -->
<link href="{{ url_for('static', filename='css/bootstrap4.min.css') }}" rel="stylesheet">
<link href="{{ url_for('static', filename='css/font-awesome.min.css') }}" rel="stylesheet">
<!-- JS -->
<script src="{{ url_for('static', filename='js/jquery.js')}}"></script>
<script src="{{ url_for('static', filename='js/bootstrap4.min.js')}}"></script>
<style>
html,
body {
height: 100%;
}
body {
display: -ms-flexbox;
display: flex;
-ms-flex-align: center;
align-items: center;
padding-top: 40px;
padding-bottom: 40px;
background-color: #f5f5f5;
}
.form-signin {
width: 100%;
max-width: 330px;
padding: 15px;
margin: auto;
}
.form-signin .checkbox {
font-weight: 400;
}
.form-signin .form-control {
position: relative;
box-sizing: border-box;
height: auto;
padding: 10px;
font-size: 16px;
}
.form-signin .form-control:focus {
z-index: 2;
}
.form-signin input[type="email"] {
margin-bottom: -1px;
border-bottom-right-radius: 0;
border-bottom-left-radius: 0;
}
.form-signin input[type="password"] {
margin-bottom: 10px;
border-top-left-radius: 0;
border-top-right-radius: 0; 102025
}
</style>
</head>
<body class="text-center">
<form class="form-signin" action="{{ url_for('root.verify_2fa')}}" method="post">
<img class="mb-4" src="{{ url_for('static', filename='image/ail-project.png')}}" width="300">
<h1 class="h3 mb-3 text-secondary">2FA: Please Enter your TOTP or HOTP</h1>
{# <label for="inputEmail" class="sr-only">Email address</label>#}
<input type="text" id="otp" name="otp" class="form-control {% if error %}is-invalid{% endif %}" placeholder="OTP Code" autocomplete="off" required autofocus>
{# <input type="text" id="next_page" name="next_page" value="{{next_page}}" hidden>#}
{% if error %}
<div class="invalid-feedback">
{{error}}
</div>
{% endif %}
<button class="btn btn-lg btn-primary btn-block" type="submit">Sign in</button>
</form>
</body>