chg: [users] manually logout users/kill user session + add users meta:creator, creation date, last_edit, last_login, last_seen, is_logged

This commit is contained in:
terrtia 2024-07-01 14:54:19 +02:00
parent 12e260a4d9
commit 2106a0a1a3
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
6 changed files with 246 additions and 77 deletions

View file

@ -11,6 +11,7 @@ import sys
import segno
from base64 import b64encode
from datetime import datetime
from flask_login import UserMixin
from io import BytesIO
from uuid import uuid4
@ -75,6 +76,11 @@ def kill_sessions():
r_cache.delete('ail:sessions')
r_cache.delete('ail:sessions:users')
def is_user_logged(user_id):
if get_user_session(user_id):
return True
else:
return False
#### PASSWORDS ####
@ -255,6 +261,111 @@ def get_users():
def get_user_role(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role')
## --USERS-- ##
#### USERS ####
def get_user_creator(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'creator')
def get_user_creation_date(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'created_at')
def get_user_last_edit(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'last_edit') # self edit or admin ???
def get_user_last_login(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'last_login')
def get_user_last_seen(user_id):
return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'last_seen')
def get_disabled_users():
return r_serv_db.smembers(f'ail:users:disabled')
def is_user_disabled(user_id):
return r_serv_db.sismember(f'ail:users:disabled', user_id)
def disable_user(user_id):
r_serv_db.sadd(f'ail:users:disabled', user_id)
def enable_user(user_id):
r_serv_db.srem(f'ail:users:disabled', user_id)
def create_user(user_id, password=None, admin_id=None, chg_passwd=True, role=None, otp=False): # TODO LOGS
# # TODO: check password strength
if password:
new_password = password
else:
new_password = gen_password()
password_hash = hashing_password(new_password)
# EDIT
if exists_user(user_id):
if password or chg_passwd:
edit_user(user_id, password_hash, chg_passwd=chg_passwd)
if role:
edit_user_role(user_id, role)
# CREATE USER
elif admin_id:
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'creator', admin_id)
date = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'created_at', date)
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'last_edit', date)
# Role
if not role:
role = get_default_role()
if role in get_all_roles():
for role_to_add in get_all_user_role(role):
r_serv_db.sadd(f'ail:users:role:{role_to_add}', user_id)
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role)
r_serv_db.hset('ail:users:all', user_id, password_hash)
if chg_passwd:
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
# create user token
generate_new_token(user_id)
if otp or is_2fa_enabled():
enable_user_2fa(user_id)
def edit_user(user_id, password_hash, chg_passwd=False, otp=True):
if chg_passwd:
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
r_serv_db.hset('ail:users:all', user_id, password_hash)
# create new token
generate_new_token(user_id)
else:
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'change_passwd')
# 2FA OTP
if otp or is_2fa_enabled():
enable_user_2fa(user_id)
else:
disable_user_2fa(user_id)
date = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'last_edit', date)
# Remove default user password file
if user_id == 'admin@admin.test':
default_passwd_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')
if os.path.isfile(default_passwd_file):
os.remove(default_passwd_file)
## --USER-- ##
########################################################################################################################
########################################################################################################################
@ -295,8 +406,24 @@ class AILUser(UserMixin):
def exists(self): # TODO CHECK USAGE
return r_serv_db.exists(f'ail:user:metadata:{self.user_id}')
def get_meta(self, options=set()): # TODO user creation date
def update_last_seen(self):
r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'last_seen', datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
def update_last_login(self):
r_serv_db.hset(f'ail:user:metadata:{self.user_id}', 'last_login', datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'))
def get_meta(self, options=set()):
meta = {'id': self.user_id}
if 'creator' in options:
meta['creator'] = get_user_creator(self.user_id)
if 'created_at' in options:
meta['created_at'] = get_user_creation_date(self.user_id)
if 'last_edit' in options:
meta['last_edit'] = get_user_last_edit(self.user_id)
if 'last_login' in options:
meta['last_login'] = get_user_last_login(self.user_id)
if 'last_seen' in options:
meta['last_seen'] = get_user_last_seen(self.user_id)
if 'api_key' in options: # TODO add option to censor key
meta['api_key'] = self.get_api_key()
if 'role' in options:
@ -305,12 +432,16 @@ class AILUser(UserMixin):
meta['2fa'] = self.is_2fa_enabled()
if 'otp_setup' in options:
meta['otp_setup'] = self.is_2fa_setup()
if 'is_disabled' in options:
meta['is_disabled'] = self.is_disabled()
if 'is_logged' in options:
meta['is_logged'] = is_user_logged(self.user_id)
return meta
## SESSION ##
def is_logged(self): #####################################################################################################
pass
def is_disabled(self):
return is_user_disabled(self.user_id)
def get_session(self):
return self.id
@ -426,7 +557,7 @@ class AILUser(UserMixin):
def api_get_users_meta():
meta = {'users': []}
options = {'api_key', 'role', '2fa', 'otp_setup'}
options = {'api_key', 'creator', 'created_at', 'is_logged', 'last_edit', 'last_login', 'last_seen', 'role', '2fa', 'otp_setup'}
for user_id in get_users():
user = AILUser(user_id)
meta['users'].append(user.get_meta(options=options))
@ -447,6 +578,35 @@ def api_get_user_hotp(user_id):
hotp = get_user_hotp_code(user_id)
return hotp, 200
def api_logout_user(admin_id, user_id): # TODO LOG ADMIN ID
user = AILUser(user_id)
if not user.exists():
return {'status': 'error', 'reason': 'User not found'}, 404
print(admin_id)
return user.kill_session(), 200
def api_logout_users(admin_id): # TODO LOG ADMIN ID
print(admin_id)
return kill_sessions(), 200
def api_disable_user(admin_id, user_id): # TODO LOG ADMIN ID
user = AILUser(user_id)
if not user.exists():
return {'status': 'error', 'reason': 'User not found'}, 404
if user.is_disabled():
return {'status': 'error', 'reason': 'User is already disabled'}, 400
print(admin_id)
disable_user(user_id)
def api_enable_user(admin_id, user_id): # TODO LOG ADMIN ID
user = AILUser(user_id)
if not user.exists():
return {'status': 'error', 'reason': 'User not found'}, 404
if not user.is_disabled():
return {'status': 'error', 'reason': 'User is not disabled'}, 400
print(admin_id)
enable_user(user_id)
def api_enable_user_otp(user_id):
user = AILUser(user_id)
if not user.exists():
@ -521,60 +681,6 @@ def get_users_metadata(list_users):
users.append(get_user_metadata(user))
return users
def create_user(user_id, password=None, chg_passwd=True, role=None, otp=False): # TODO ###############################################################
# # TODO: check password strength
if password:
new_password = password
else:
new_password = gen_password()
password_hash = hashing_password(new_password)
# EDIT
if exists_user(user_id):
if password or chg_passwd:
edit_user(user_id, password_hash, chg_passwd=chg_passwd)
if role:
edit_user_role(user_id, role)
# CREATE USER
else:
# Role
if not role:
role = get_default_role()
if role in get_all_roles():
for role_to_add in get_all_user_role(role):
r_serv_db.sadd(f'ail:users:role:{role_to_add}', user_id)
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role)
r_serv_db.hset('ail:users:all', user_id, password_hash)
if chg_passwd:
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
# create user token
generate_new_token(user_id)
if otp or is_2fa_enabled():
enable_user_2fa(user_id)
def edit_user(user_id, password_hash, chg_passwd=False, otp=False): # TODO ######################################################3333
if chg_passwd:
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
else:
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'change_passwd')
# remove default user password file
if user_id == 'admin@admin.test':
default_passwd_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')
if os.path.isfile(default_passwd_file):
os.remove(default_passwd_file)
r_serv_db.hset('ail:users:all', user_id, password_hash)
# create new token
generate_new_token(user_id)
if otp or is_2fa_enabled():
enable_user_2fa(user_id)
else:
disable_user_2fa(user_id)
# # TODO: solve edge_case self delete
def delete_user(user_id):
if exists_user(user_id):
@ -585,6 +691,7 @@ def delete_user(user_id):
r_serv_db.hdel('ail:users:tokens', user_token)
r_serv_db.delete(f'ail:user:metadata:{user_id}')
r_serv_db.hdel('ail:users:all', user_id)
r_serv_db.srem(f'ail:users:disabled', user_id)
# # TODO: raise Exception
else:
@ -662,9 +769,9 @@ def check_user_role_integrity(user_id):
## --ROLES-- ##
if __name__ == '__main__':
user_id = 'admin@admin.test'
instance_name = 'AIL TEST'
delete_user_otp(user_id)
# q = get_user_otp_qr_code(user_id, instance_name)
# print(q)
# if __name__ == '__main__':
# user_id = 'admin@admin.test'
# instance_name = 'AIL TEST'
# delete_user_otp(user_id)
# # q = get_user_otp_qr_code(user_id, instance_name)
# # print(q)

View file

@ -221,6 +221,13 @@ def add_header(response):
response.headers['Cache-Control'] = 'private, max-age=0'
return response
# ========== USERS ============
@app.before_request
def before_request():
if current_user.is_authenticated:
current_user.update_last_seen()
# ========== ROUTES ============
#@app.route('/endpoints')

View file

@ -14,7 +14,6 @@ from flask import session
from flask_login import login_required, current_user, login_user, logout_user
sys.path.append('modules')
import Flask_config
# Import Role_Manager
from Role_Manager import login_admin, login_analyst
@ -24,9 +23,18 @@ sys.path.append(os.environ['AIL_BIN'])
# Import Project packages
##################################
from lib import Users
from lib.ail_users import AILUser
from lib.ail_users import AILUser, kill_sessions
from lib.ConfigLoader import ConfigLoader
# Config
config_loader = ConfigLoader()
r_cache = config_loader.get_redis_conn("Redis_Cache")
config_loader = None
# Kill previous sessions
kill_sessions()
r_cache = Flask_config.r_cache
# ============ BLUEPRINT ============
@ -96,6 +104,7 @@ def login():
# Login User
user.rotate_session()
login_user(user)
user.update_last_login()
if user.request_password_change():
return redirect(url_for('root.change_password'))
@ -163,6 +172,7 @@ def verify_2fa():
# Login User
user.rotate_session()
login_user(user)
user.update_last_login()
if user.request_password_change():
return redirect(url_for('root.change_password'))
@ -213,6 +223,7 @@ def setup_2fa():
# Login User
user.rotate_session()
login_user(user)
user.update_last_login()
if user.request_password_change():
return redirect(url_for('root.change_password'))

View file

@ -157,7 +157,7 @@ def user_otp_reset(): # TODO ask for password ?
else:
user = ail_users.AILUser.get(user_id)
user.kill_session()
return redirect(url_for('settings_b.user_profile'))
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/user/api_key/new", methods=['GET'])
@login_required
@ -182,6 +182,29 @@ def new_token_user():
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/user/logout", methods=['GET'])
@login_required
@login_admin
def user_logout():
user_id = request.args.get('user_id') # TODO LOGS
admin_id = current_user.get_user_id()
r = ail_users.api_logout_user(admin_id, user_id)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/users/logout", methods=['GET'])
@login_required
@login_admin
def users_logout():
admin_id = current_user.get_user_id() # TODO LOGS
r = ail_users.api_logout_users(admin_id)
if r[1] != 200:
return create_json_response(r[0], r[1])
else:
return redirect(url_for('settings_b.users_list'))
@settings_b.route("/settings/create_user", methods=['GET'])
@login_required
@login_admin
@ -248,7 +271,7 @@ def create_user_post():
if not password1 and not password2:
password = None
str_password = 'Password not changed'
ail_users.create_user(email, password=password, role=role, otp=enable_2_fa)
ail_users.create_user(email, password=password, admin_id=admin_id, role=role, otp=enable_2_fa)
new_user = {'email': email, 'password': str_password, 'otp': enable_2_fa}
return render_template("create_user.html", new_user=new_user, meta={}, all_roles=all_roles, acl_admin=True)

View file

@ -8,18 +8,18 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import Users
from lib import ail_users
if __name__ == "__main__":
# create role_list
Users._create_roles_list()
ail_users._create_roles_list()
user_id = 'admin@admin.test'
password = Users.gen_password()
password = ail_users.gen_password()
Users.create_user(user_id, password=password, role='admin')
token = Users.get_default_admin_token()
ail_users.create_user(user_id, password=password, admin_id='admin@admin.test', role='admin')
token = ail_users.get_default_admin_token()
default_passwd_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD')
to_write_str = '# Password Generated by default\n# This file is deleted after the first login\n#\nemail=admin@admin.test\npassword='
@ -27,6 +27,6 @@ if __name__ == "__main__":
with open(default_passwd_file, 'w') as f:
f.write(to_write_str)
print('new user created: {}'.format(user_id))
print('password: {}'.format(password))
print('token: {}'.format(token))
print(f'new user created: {user_id}')
print(f'password: {password}')
print(f'token: {token}')

View file

@ -32,6 +32,9 @@
<thead class="thead-dark">
<tr>
<th>User</th>
<th>Last Edit</th>
<th>Last Login</th>
<th>Last Seen</th>
<th>Role</th>
<th>Api Key</th>
<th></th>
@ -43,6 +46,20 @@
{% for user in meta['users'] %}
<tr>
<td>{{user['id']}}</td>
<td>{{user['last_edit']}}</td>
<td>
{% if user['last_login'] %}
{{user['last_login']}}
{% else %}-{% endif %}
{{ meta['is_logged'] }}
{% if user['is_logged'] %}
<i class="fas fa-plug text-success"></i>
<a class="btn btn-outline-danger px-1 py-0" href="{{ url_for('settings_b.user_logout', user_id=user['id']) }}">
<i class="fas fa-sign-out-alt"></i>
</a>
{% endif %}
</td>
<td>{% if user['last_seen'] %}{{user['last_seen']}}{% else %}-{% endif %}</td>
<td>{{user['role']}}</td>
<td>
<span id="censored_key_{{loop.index0}}">
@ -96,6 +113,10 @@
</tbody>
</table>
<a class="btn btn-danger my-4" href="{{ url_for('settings_b.users_logout') }}">
<i class="fas fa-sign-out-alt"></i> Logout All Users
</a>
</div>
</div>
</div>