diff --git a/bin/lib/ail_users.py b/bin/lib/ail_users.py index f7f3ffe0..e3eb4434 100755 --- a/bin/lib/ail_users.py +++ b/bin/lib/ail_users.py @@ -8,7 +8,11 @@ import re import secrets import sys +import segno + +from base64 import b64encode from flask_login import UserMixin +from io import BytesIO from uuid import uuid4 sys.path.append(os.environ['AIL_BIN']) @@ -21,6 +25,11 @@ from lib.ConfigLoader import ConfigLoader config_loader = ConfigLoader() r_serv_db = config_loader.get_db_conn("Kvrocks_DB") r_cache = config_loader.get_redis_conn("Redis_Cache") + +if config_loader.get_config_boolean('Users', 'force_2fa'): + r_serv_db.hset('ail:2fa', '2fa', 1) +else: + r_serv_db.hset('ail:2fa', '2fa', 0) config_loader = None regex_password = r'^(?=(.*\d){2})(?=.*[a-z])(?=.*[A-Z]).{10,100}$' @@ -83,6 +92,9 @@ def hashing_password(password): password = password.encode() return bcrypt.hashpw(password, bcrypt.gensalt()) +def get_user_passwd_hash(user_id): + return r_serv_db.hget('ail:users:all', user_id) + ## --PASSWORDS-- ## def check_email(email): @@ -95,6 +107,15 @@ def check_email(email): #### TOKENS #### +def get_user_token(user_id): + return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token') + +def get_token_user(token): + return r_serv_db.hget('ail:users:tokens', token) + +def exists_token(token): + return r_serv_db.hexists('ail:users:tokens', token) + def gen_token(): return secrets.token_urlsafe(41) @@ -136,8 +157,93 @@ def _get_hotp(secret): def _verify_hotp(hotp, counter, code): return hotp.verify(code, counter) +def get_user_otp_secret(user_id): + return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_secret') + +def get_user_hotp_counter(user_id): + return int(r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_counter')) + +def verify_user_totp(user_id, code): + totp = _get_totp(get_user_otp_secret(user_id)) + return _verify_totp(totp, code) + +def verify_user_hotp(user_id, code): # TODO IF valid increase counter + hotp = _get_hotp(get_user_otp_secret(user_id)) + counter = get_user_hotp_counter(user_id) + return _verify_hotp(hotp, counter, code) + +def verify_user_otp(user_id, code): + if verify_user_totp(user_id, code): + return True + elif verify_user_hotp(user_id, code): + return True + return False + +def create_user_otp(user_id): + secret = pyotp.random_base32() + r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_secret', secret) + r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_counter', 0) + enable_user_2fa(user_id) + +def delete_user_otp(user_id): + r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'otp_secret') + r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'otp_counter') + r_serv_db.hset(f'ail:user:metadata:{user_id}', 'otp_setup', 0) + disable_user_2fa(user_id) + +def get_user_otp_uri(user_id, instance_name): + return pyotp.totp.TOTP(get_user_otp_secret(user_id)).provisioning_uri(name=user_id, issuer_name=instance_name) + +def get_user_otp_qr_code(user_id, instance_name): + uri = get_user_otp_uri(user_id, instance_name) + qrcode = segno.make_qr(uri) + buff = BytesIO() + qrcode.save(buff, kind='png', scale=10) + return b64encode(buff.getvalue()).decode() + # qrcode.save('qrcode.png', scale=10) + +def get_user_hotp_code(user_id): + hotp = _get_hotp(get_user_otp_secret(user_id)) + counter = get_user_hotp_counter(user_id) + codes = [] + for i in range(counter, counter + 20): + codes.append(f'{i}: {hotp.at(i)}') + return codes + +# TODO GET USER HOTP LISTS +# TODO RESET OTP + +def is_user_otp_setup(user_id): + otp_setup = r_serv_db.hget(f'ail:user:metadata:{user_id}', 'otp_setup') + if otp_setup: + return int(otp_setup) == 1 + return False + ## --OTP-- ## +#### 2FA #### + +# Global 2fa option +def is_2fa_enabled(): + fa2 = r_serv_db.hget('ail:2fa', '2fa') + if fa2: + return int(fa2) == 1 + return False + +def is_user_2fa_enabled(user_id): + fa2 = r_serv_db.hget(f'ail:user:metadata:{user_id}', '2fa') + if fa2: + return int(fa2) == 1 + return False + +def enable_user_2fa(user_id): + return r_serv_db.hset(f'ail:user:metadata:{user_id}', '2fa', 1) + +def disable_user_2fa(user_id): + return r_serv_db.hset(f'ail:user:metadata:{user_id}', '2fa', 0) + +## --2FA-- ## + #### USERS #### def get_users(): @@ -146,35 +252,6 @@ def get_users(): def get_user_role(user_id): return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role') -def get_user_passwd_hash(user_id): - return r_serv_db.hget('ail:users:all', user_id) - -def get_user_token(user_id): - return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token') - -def get_token_user(token): - return r_serv_db.hget('ail:users:tokens', token) - -def exists_token(token): - return r_serv_db.hexists('ail:users:tokens', token) - -# def _get_user_otp(user_id): -# -# -# def get_user_hotps(user_id): -# -# -# def _get_user_hotp(user_id): -# -# -# def verify_user_otp(user_id, code): -# -# -# def get_user_hotp_counter(user_id): -# return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'hotp:counter') -# -# def verify_user_hotp(user_id, code): -# counter ######################################################################################################################## ######################################################################################################################## @@ -284,6 +361,31 @@ class AILUser(UserMixin): _set_user_token(self.user_id, new_api_key) return new_api_key + ## OTP ## + + def is_2fa_setup(self): + return is_user_otp_setup(self.user_id) + + def is_2fa_enabled(self): + if is_2fa_enabled(): + return True + else: + return is_user_2fa_enabled(self.user_id) + + def get_htop_counter(self): + return get_user_hotp_counter(self.user_id) + + def is_valid_otp(self, code): + return verify_user_otp(self.user_id, code) + + def init_setup_2fa(self, create=True): + if create: + create_user_otp(self.user_id) + return get_user_otp_qr_code(self.user_id, 'AIL TEST'), get_user_hotp_code(self.user_id) + + def setup_2fa(self): + r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'otp_setup', 1) + ## ROLE ## def is_in_role(self, role): # TODO Get role via user alternative ID @@ -501,3 +603,10 @@ def check_user_role_integrity(user_id): return res ## --ROLES-- ## + +if __name__ == '__main__': + user_id = 'admin@admin.test' + instance_name = 'AIL TEST' + delete_user_otp(user_id) + # q = get_user_otp_qr_code(user_id, instance_name) + # print(q) \ No newline at end of file diff --git a/configs/core.cfg.sample b/configs/core.cfg.sample index 26aac94d..1bb77e4a 100644 --- a/configs/core.cfg.sample +++ b/configs/core.cfg.sample @@ -79,6 +79,10 @@ minute_processed_paste = 10 #Maximum line length authorized to make a diff between duplicates DiffMaxLineLength = 10000 +##### Users ##### +[Users] +force_2fa = False + [AIL_2_AIL] server_host = 0.0.0.0 server_port = 4443 diff --git a/var/www/blueprints/root.py b/var/www/blueprints/root.py index 829e326d..a705fc38 100644 --- a/var/www/blueprints/root.py +++ b/var/www/blueprints/root.py @@ -7,8 +7,10 @@ import os import sys +import time from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response +from flask import session from flask_login import login_required, current_user, login_user, logout_user sys.path.append('modules') @@ -62,7 +64,6 @@ def login(): if username is not None: user = AILUser.get(username) # TODO ANONYMOUS USER - print(user.is_anonymous) # brute force by user_id login_failed_user_id = r_cache.get(f'failed_login_user_id:{username}') @@ -78,20 +79,32 @@ def login(): logging_error = 'Incorrect User ACL, Please contact your administrator' return render_template("login.html", error=logging_error) - # Login User - user.rotate_session() - login_user(user) + if user.is_2fa_enabled(): - if user.request_password_change(): - return redirect(url_for('root.change_password')) - else: - # update note - # next page - if next_page and next_page != 'None' and next_page != '/': - return redirect(next_page) - # dashboard + session['user_id'] = user.get_user_id() + session['otp_expire'] = int(time.time()) + 10800 + + if not user.is_2fa_setup(): + return redirect(url_for('root.setup_2fa')) else: - return redirect(url_for('dashboard.index')) + htop_counter = user.get_htop_counter() + return redirect(url_for('root.verify_2fa', htop_counter=htop_counter)) + + else: + # Login User + user.rotate_session() + login_user(user) + + if user.request_password_change(): + return redirect(url_for('root.change_password')) + else: + # update note + # next page + if next_page and next_page != 'None' and next_page != '/': + return redirect(next_page) + # dashboard + else: + return redirect(url_for('dashboard.index')) # LOGIN FAILED else: @@ -117,6 +130,103 @@ def login(): error = request.args.get('error') return render_template("login.html", next_page=next_page, error=error) +@root.route('/2fa', methods=['POST', 'GET']) # TODO CHECK IF user_id exists +def verify_2fa(): + user_id = session.get('user_id', None) + otp_expire = session.get('otp_expire', None) + + if not user_id or not otp_expire: # TODO LOG + return redirect(url_for('root.login')) + + # Check if Login is expired + if otp_expire < int(time.time()): # TODO LOG + session.pop('user_id', None) + session.pop('otp_expire', None) + error = "First Login Expired" + return redirect(url_for('root.login', error=error)) + + user = AILUser.get(user_id) + if not user.is_2fa_setup(): + return redirect(url_for('root.setup_2fa')) + + if request.method == 'POST': + + code = request.form.get('otp') + + if user.is_valid_otp(code): + session.pop('user_id', None) + session.pop('otp_expire', None) + + # Login User + user.rotate_session() + login_user(user) + + if user.request_password_change(): + return redirect(url_for('root.change_password')) + else: + # # next page + # if next_page and next_page != 'None' and next_page != '/': + # return redirect(next_page) + # dashboard + # else: + return redirect(url_for('dashboard.index')) + else: + htop_counter = user.get_htop_counter() + error = "The OTP is incorrect or has expired" + return render_template("verify_otp.html", htop_counter=htop_counter, error=error) + + else: + htop_counter = user.get_htop_counter() + return render_template("verify_otp.html", htop_counter=htop_counter) + +@root.route('/2fa/setup', methods=['POST', 'GET']) +def setup_2fa(): + user_id = session.get('user_id', None) + otp_expire = session.get('otp_expire', None) + + if not user_id or not otp_expire: # TODO LOG + return redirect(url_for('root.login')) + + # Check if Login is expired + if otp_expire < int(time.time()): # TODO LOG + session.pop('user_id', None) + session.pop('otp_expire', None) + error = "First Login Expired" + return redirect(url_for('root.login', error=error)) + + user = AILUser.get(user_id) + + if user.is_2fa_setup(): + return redirect(url_for('root.verify_2fa')) + + if request.method == 'POST': + code = request.form.get('otp') + + if user.is_valid_otp(code): + user.setup_2fa() + + session.pop('user_id', None) + session.pop('otp_expire', None) + + # Login User + user.rotate_session() + login_user(user) + + if user.request_password_change(): + return redirect(url_for('root.change_password')) + else: + return redirect(url_for('dashboard.index')) + else: + error = "The OTP is incorrect or has expired" + return redirect(url_for('root.setup_2fa', error=error)) + else: + error = request.args.get('error') + if error: + qr_code, hotp_codes = user.init_setup_2fa(create=False) + else: + qr_code, hotp_codes = user.init_setup_2fa() + return render_template("setup_otp.html", qr_code=qr_code, hotp_codes=hotp_codes, error=error) + @root.route('/change_password', methods=['POST', 'GET']) @login_required def change_password(): diff --git a/var/www/templates/setup_otp.html b/var/www/templates/setup_otp.html new file mode 100644 index 00000000..e2548b60 --- /dev/null +++ b/var/www/templates/setup_otp.html @@ -0,0 +1,52 @@ + + + +
+ +