From e752f59a0ad46b413af5a9a1e3515d6c57117757 Mon Sep 17 00:00:00 2001 From: terrtia Date: Tue, 13 Aug 2024 14:10:17 +0200 Subject: [PATCH] chg: [ail users] remove old User lib + improve API test --- bin/lib/Tracker.py | 8 +- bin/lib/Users.py | 295 ---------------------------- bin/lib/ail_api.py | 8 +- bin/lib/ail_users.py | 59 +----- tests/test_api.py | 15 +- update/v5.0/DB_KVROCKS_MIGRATION.py | 9 +- var/www/blueprints/root.py | 19 +- 7 files changed, 36 insertions(+), 377 deletions(-) delete mode 100755 bin/lib/Users.py diff --git a/bin/lib/Tracker.py b/bin/lib/Tracker.py index 954cda50..5bd1c47b 100755 --- a/bin/lib/Tracker.py +++ b/bin/lib/Tracker.py @@ -29,7 +29,7 @@ from lib import ail_logger from lib import ConfigLoader from lib import item_basic from lib import Tag -from lib.Users import User +from lib.ail_users import AILUser # LOGS logging.config.dictConfig(ail_logger.get_config(name='modules')) @@ -795,7 +795,7 @@ def api_check_tracker_acl(tracker_uuid, user_id): return res tracker = Tracker(tracker_uuid) if tracker.is_level_user(): - if tracker.get_user() != user_id or not User(user_id).is_in_role('admin'): + if tracker.get_user() != user_id or not AILUser(user_id).is_in_role('admin'): return {"status": "error", "reason": "Access Denied"}, 403 return None @@ -805,7 +805,7 @@ def api_is_allowed_to_edit_tracker(tracker_uuid, user_id): tracker_creator = r_tracker.hget('tracker:{}'.format(tracker_uuid), 'user_id') if not tracker_creator: return {"status": "error", "reason": "Unknown uuid"}, 404 - user = User(user_id) + user = AILUser(user_id) if not user.is_in_role('admin') and user_id != tracker_creator: return {"status": "error", "reason": "Access Denied"}, 403 return {"uuid": tracker_uuid}, 200 @@ -817,7 +817,7 @@ def api_is_allowed_to_access_tracker(tracker_uuid, user_id): tracker_creator = r_tracker.hget('tracker:{}'.format(tracker_uuid), 'user_id') if not tracker_creator: return {"status": "error", "reason": "Unknown uuid"}, 404 - user = User(user_id) + user = AILUser(user_id) if not is_tracker_global_level(tracker_uuid): if not user.is_in_role('admin') and user_id != tracker_creator: return {"status": "error", "reason": "Access Denied"}, 403 diff --git a/bin/lib/Users.py b/bin/lib/Users.py deleted file mode 100755 index 765b1360..00000000 --- a/bin/lib/Users.py +++ /dev/null @@ -1,295 +0,0 @@ -#!/usr/bin/env python3 -# -*-coding:UTF-8 -* - -import bcrypt -import os -import re -import secrets -import sys - -from flask_login import UserMixin - -sys.path.append(os.environ['AIL_BIN']) -################################## -# Import Project packages -################################## -from lib.ConfigLoader import ConfigLoader - -# Config -config_loader = ConfigLoader() -r_serv_db = config_loader.get_db_conn("Kvrocks_DB") -config_loader = None - -regex_password = r'^(?=(.*\d){2})(?=.*[a-z])(?=.*[A-Z]).{10,100}$' -regex_password = re.compile(regex_password) - -# # TODO: ADD FUNCTIONS PASSWORD RESET + API KEY RESET + CREATE USER - -# # TODO: migrate Role_Manager - -#### PASSWORDS + TOKENS #### - -def check_password_strength(password): - result = regex_password.match(password) - if result: - return True - else: - return False - -def gen_password(): - return secrets.token_urlsafe(30) - -def hashing_password(password): - password = password.encode() - return bcrypt.hashpw(password, bcrypt.gensalt()) - -def gen_token(): - return secrets.token_urlsafe(41) - -def _delete_user_token(user_id): - current_token = get_user_token(user_id) - if current_token: - r_serv_db.hdel('ail:users:tokens', current_token) - -def _set_user_token(user_id, token): - r_serv_db.hset('ail:users:tokens', token, user_id) - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'token', token) - -def generate_new_token(user_id): - # create user token - _delete_user_token(user_id) - token = gen_token() - _set_user_token(user_id, token) - -def get_default_admin_token(): - if r_serv_db.exists('ail:user:metadata:admin@admin.test'): - return r_serv_db.hget('ail:user:metadata:admin@admin.test', 'token') - else: - return '' - -##-- PASSWORDS + TOKENS --## - -#### USERS #### - -def get_all_users(): - return r_serv_db.hkeys('ail:users:all') - -def get_user_role(user_id): - return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role') - -def get_user_passwd_hash(user_id): - return r_serv_db.hget('ail:users:all', user_id) - -def get_user_token(user_id): - return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token') - -def get_token_user(token): - return r_serv_db.hget('ail:users:tokens', token) - -def exists_token(token): - return r_serv_db.hexists('ail:users:tokens', token) - -def exists_user(user_id): - return r_serv_db.exists(f'ail:user:metadata:{user_id}') - -def get_user_metadata(user_id): - user_metadata = {'email': user_id, - 'role': r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role'), - 'api_key': r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token')} - return user_metadata - -def get_users_metadata(list_users): - users = [] - for user in list_users: - users.append(get_user_metadata(user)) - return users - -def create_user(user_id, password=None, chg_passwd=True, role=None): - # # TODO: check password strength - if password: - new_password = password - else: - new_password = gen_password() - password_hash = hashing_password(new_password) - - # EDIT - if exists_user(user_id): - if password or chg_passwd: - edit_user_password(user_id, password_hash, chg_passwd=chg_passwd) - if role: - edit_user_role(user_id, role) - # CREATE USER - else: - # Role - if not role: - role = get_default_role() - - if role in get_all_roles(): - for role_to_add in get_all_user_role(role): - r_serv_db.sadd(f'ail:users:role:{role_to_add}', user_id) - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role) - - r_serv_db.hset('ail:users:all', user_id, password_hash) - if chg_passwd: - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True') - - # create user token - generate_new_token(user_id) - -def edit_user_password(user_id, password_hash, chg_passwd=False): - if chg_passwd: - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True') - else: - r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'change_passwd') - # remove default user password file - if user_id == 'admin@admin.test': - default_passwd_file = os.path.join(os.environ['AIL_HOME'], 'DEFAULT_PASSWORD') - if os.path.isfile(default_passwd_file): - os.remove(default_passwd_file) - r_serv_db.hset('ail:users:all', user_id, password_hash) - # create new token - generate_new_token(user_id) - -# # TODO: solve edge_case self delete -def delete_user(user_id): - if exists_user(user_id): - for role_id in get_all_roles(): - r_serv_db.srem(f'ail:users:role:{role_id}', user_id) - user_token = get_user_token(user_id) - if user_token: - r_serv_db.hdel('ail:users:tokens', user_token) - r_serv_db.delete(f'ail:user:metadata:{user_id}') - r_serv_db.hdel('ail:users:all', user_id) - - # # TODO: raise Exception - else: - print(f'Error: user {user_id} do not exist') - -##-- USERS --## - -#### ROLES #### - -def get_all_roles(): - return r_serv_db.zrange('ail:roles:all', 0, -1) - -# create role_list -def _create_roles_list(): - if not r_serv_db.exists('ail:roles:all'): - r_serv_db.zadd('ail:roles:all', {'admin': 1}) - r_serv_db.zadd('ail:roles:all', {'analyst': 2}) - r_serv_db.zadd('ail:roles:all', {'user': 3}) - r_serv_db.zadd('ail:roles:all', {'user_no_api': 4}) - r_serv_db.zadd('ail:roles:all', {'read_only': 5}) - -def get_role_level(role): - return int(r_serv_db.zscore('ail:roles:all', role)) - -def get_user_role_by_range(inf, sup): - return r_serv_db.zrange('ail:roles:all', inf, sup) - -def get_all_user_role(user_role): - current_role_val = get_role_level(user_role) - return r_serv_db.zrange('ail:roles:all', current_role_val - 1, -1) - -def get_all_user_upper_role(user_role): - current_role_val = get_role_level(user_role) - # remove one rank - if current_role_val > 1: - return r_serv_db.zrange('ail:roles:all', 0, current_role_val -2) - else: - return [] - -def get_default_role(): - return 'read_only' - -def is_in_role(user_id, role): - return r_serv_db.sismember(f'ail:users:role:{role}', user_id) - -def edit_user_role(user_id, role): - current_role = get_user_role(user_id) - if role != current_role: - request_level = get_role_level(role) - current_role = get_role_level(current_role) - - if current_role < request_level: - role_to_remove = get_user_role_by_range(current_role - 1, request_level - 2) - for role_id in role_to_remove: - r_serv_db.srem(f'ail:users:role:{role_id}', user_id) - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role) - else: - role_to_add = get_user_role_by_range(request_level - 1, current_role) - for role_id in role_to_add: - r_serv_db.sadd(f'ail:users:role:{role_id}', user_id) - r_serv_db.hset(f'ail:user:metadata:{user_id}', 'role', role) - -def check_user_role_integrity(user_id): - user_role = get_user_role(user_id) - all_user_role = get_all_user_role(user_role) - res = True - for role in all_user_role: - if not r_serv_db.sismember(f'ail:users:role:{role}', user_id): - res = False - upper_role = get_all_user_upper_role(user_role) - for role in upper_role: - if r_serv_db.sismember(f'ail:users:role:{role}', user_id): - res = False - return res - -##-- ROLES --## - -class User(UserMixin): - - def __init__(self, id): - - if r_serv_db.hexists('ail:users:all', id): - self.id = id - else: - self.id = "__anonymous__" - - def exists(self): - if self.id == "__anonymous__": - return False - else: - return r_serv_db.exists(f'ail:user:metadata:{self.id}') - - # return True or False - # def is_authenticated(): - - # return True or False - # def is_anonymous(): - - @classmethod - def get(self_class, id): - return self_class(id) - - def user_is_anonymous(self): - if self.id == "__anonymous__": - return True - else: - return False - - def check_password(self, password): - if self.user_is_anonymous(): - return False - - password = password.encode() - hashed_password = r_serv_db.hget('ail:users:all', self.id).encode() - if bcrypt.checkpw(password, hashed_password): - return True - else: - return False - - def request_password_change(self): - if r_serv_db.hget(f'ail:user:metadata:{self.id}', 'change_passwd') == 'True': - return True - else: - return False - - def is_in_role(self, role): - if r_serv_db.sismember(f'ail:users:role:{role}', self.id): - return True - else: - return False - - def get_role(self): - return r_serv_db.hget(f'ail:user:metadata:{self.id}', 'role') diff --git a/bin/lib/ail_api.py b/bin/lib/ail_api.py index 1e05b1cc..56918924 100755 --- a/bin/lib/ail_api.py +++ b/bin/lib/ail_api.py @@ -10,7 +10,7 @@ sys.path.append(os.environ['AIL_BIN']) # Import Project packages ################################## from lib.ConfigLoader import ConfigLoader -from lib import Users +from lib import ail_users config_loader = ConfigLoader() r_cache = config_loader.get_redis_conn("Redis_Cache") @@ -21,10 +21,10 @@ def check_token_format(token, search=re.compile(r'[^a-zA-Z0-9_-]').search): #### return not bool(search(token)) def is_valid_token(token): - return Users.exists_token(token) + return ail_users.exists_token(token) def get_user_from_token(token): - return Users.get_token_user(token) + return ail_users.get_token_user(token) def is_user_in_role(role, token): # verify_user_role # User without API @@ -33,7 +33,7 @@ def is_user_in_role(role, token): # verify_user_role user_id = get_user_from_token(token) if user_id: - return Users.is_in_role(user_id, role) + return ail_users.is_in_role(user_id, role) else: return False diff --git a/bin/lib/ail_users.py b/bin/lib/ail_users.py index 3c8c48a1..398cb8bd 100755 --- a/bin/lib/ail_users.py +++ b/bin/lib/ail_users.py @@ -263,7 +263,10 @@ def get_user_role(user_id): ## --USERS-- ## -#### USERS #### +#### USER #### + +def exists_user(user_id): + return r_serv_db.exists(f'ail:user:metadata:{user_id}') def get_user_creator(user_id): return r_serv_db.hget(f'ail:user:metadata:{user_id}', 'creator') @@ -358,24 +361,12 @@ def edit_user(user_id, password_hash, chg_passwd=False, otp=True): if os.path.isfile(default_passwd_file): os.remove(default_passwd_file) - - - - - - - ## --USER-- ## ######################################################################################################################## ######################################################################################################################## -# TODO USER LAST LOGIN TIME -# TODO Check if logged - # TODO USER: - Creation Date -# - Last Login -# - Last Request # - Last API Usage # - Organisation ??? # - Disabled / Lock @@ -423,7 +414,7 @@ class AILUser(UserMixin): if 'last_login' in options: meta['last_login'] = get_user_last_login(self.user_id) if 'last_seen' in options: - meta['last_seen'] = get_user_last_seen(self.user_id) + meta['last_seen'] = get_user_last_seen(self.user_id) if 'api_key' in options: # TODO add option to censor key meta['api_key'] = self.get_api_key() if 'role' in options: @@ -548,8 +539,6 @@ class AILUser(UserMixin): r_serv_db.hdel('ail:users:all', self.user_id) -# def create_user(user_id): - #### API #### def api_get_users_meta(): @@ -662,40 +651,6 @@ def api_delete_user(user_id, admin_id): # TODO LOG ADMIN ID ######################################################################################################################## ######################################################################################################################## - -def exists_user(user_id): - return r_serv_db.exists(f'ail:user:metadata:{user_id}') - -def get_user_metadata(user_id): - user_metadata = {'email': user_id, - 'role': r_serv_db.hget(f'ail:user:metadata:{user_id}', 'role'), - 'api_key': r_serv_db.hget(f'ail:user:metadata:{user_id}', 'token')} - return user_metadata - -def get_users_metadata(list_users): - users = [] - for user in list_users: - users.append(get_user_metadata(user)) - return users - -# # TODO: solve edge_case self delete -def delete_user(user_id): - if exists_user(user_id): - for role_id in get_all_roles(): - r_serv_db.srem(f'ail:users:role:{role_id}', user_id) - user_token = get_user_token(user_id) - if user_token: - r_serv_db.hdel('ail:users:tokens', user_token) - r_serv_db.delete(f'ail:user:metadata:{user_id}') - r_serv_db.hdel('ail:users:all', user_id) - r_serv_db.srem(f'ail:users:disabled', user_id) - - # # TODO: raise Exception - else: - print(f'Error: user {user_id} do not exist') - -## --USERS-- ## - #### ROLES #### def get_all_roles(): @@ -724,7 +679,7 @@ def get_all_user_upper_role(user_role): current_role_val = get_role_level(user_role) # remove one rank if current_role_val > 1: - return r_serv_db.zrange('ail:roles:all', 0, current_role_val -2) + return r_serv_db.zrange('ail:roles:all', 0, current_role_val - 2) else: return [] @@ -771,4 +726,4 @@ def check_user_role_integrity(user_id): # instance_name = 'AIL TEST' # delete_user_otp(user_id) # # q = get_user_otp_qr_code(user_id, instance_name) -# # print(q) \ No newline at end of file +# # print(q) diff --git a/tests/test_api.py b/tests/test_api.py index dc1a6860..5f8a7a67 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -11,21 +11,26 @@ sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## -from lib import Users +from lib import ail_users -sys.path.append(os.environ['AIL_FLASK']) -sys.path.append(os.path.join(os.environ['AIL_FLASK'], 'modules')) +from lib.ConfigLoader import ConfigLoader class TestApiV1(unittest.TestCase): def setUp(self): - # TODO GET HOST + PORT - self.ail = PyAIL('https://localhost:7000', Users.get_user_token('admin@admin.test'), ssl=False) + config = ConfigLoader() + port = config.get_config_str('Flask', 'port') + self.ail = PyAIL(f'https://localhost:{port}', ail_users.get_user_token('admin@admin.test'), ssl=False) # GET /api/v1/ping def test_0001_api_ping(self): r = self.ail.ping_ail() self.assertEqual(r.get('status'), 'pong') + print() + print('----------------------------------------------------') + print(' AIL successfully reached Flask / Web interface') + print('----------------------------------------------------') + print() # # GET /api/v1/uuid # def test_0001_api_uuid(self): diff --git a/update/v5.0/DB_KVROCKS_MIGRATION.py b/update/v5.0/DB_KVROCKS_MIGRATION.py index c33247a0..81cf8484 100755 --- a/update/v5.0/DB_KVROCKS_MIGRATION.py +++ b/update/v5.0/DB_KVROCKS_MIGRATION.py @@ -15,7 +15,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from lib.ConfigLoader import ConfigLoader from lib import Tag -from lib import Users +from lib import ail_users from lib.objects import Decodeds from lib.objects import Domains from lib.objects import Items @@ -111,7 +111,7 @@ def user_migration(): print('USER MIGRATION...') # create role_list - Users._create_roles_list() + ail_users._create_roles_list() for user_id in r_serv_db.hkeys('user:all'): role = r_serv_db.hget(f'user_metadata:{user_id}', 'role') @@ -121,11 +121,8 @@ def user_migration(): if not chg_passwd: chg_passwd = None - Users.create_user(user_id, password=None, chg_passwd=chg_passwd, role=role) - Users.edit_user_password(user_id, password_hash, chg_passwd=chg_passwd) - Users._delete_user_token(user_id) + ail_users.create_user(user_id, password=password_hash, chg_passwd=chg_passwd, role=role) print(user_id, token) - Users._set_user_token(user_id, token) for invite_row in r_crawler.smembers('telegram:invite_code'): r_obj.sadd('telegram:invite_code', invite_row) diff --git a/var/www/blueprints/root.py b/var/www/blueprints/root.py index ec1e5fad..df3c0161 100644 --- a/var/www/blueprints/root.py +++ b/var/www/blueprints/root.py @@ -1,15 +1,15 @@ #!/usr/bin/env python3 # -*-coding:UTF-8 -* -''' +""" Blueprint Flask: root endpoints: login, ... -''' +""" import os import sys import time -from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response +from flask import render_template, jsonify, request, Blueprint, redirect, url_for, Response from flask import session from flask_login import login_required, current_user, login_user, logout_user @@ -22,8 +22,7 @@ sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## -from lib import Users # TODO ########################################################3 -from lib.ail_users import AILUser, kill_sessions +from lib.ail_users import AILUser, kill_sessions, create_user, check_password_strength, check_user_role_integrity from lib.ConfigLoader import ConfigLoader @@ -43,11 +42,9 @@ root = Blueprint('root', __name__, template_folder='templates') # ============ VARIABLES ============ - # ============ FUNCTIONS ============ - # ============= ROUTES ============== @root.route('/login', methods=['POST', 'GET']) # TODO LOG BRUTEFORCE ATTEMPT def login(): @@ -83,7 +80,7 @@ def login(): return render_template("login.html", error=logging_error) if user.exists() and user.check_password(password): - if not Users.check_user_role_integrity(user.get_user_id()): + if not check_user_role_integrity(user.get_user_id()): logging_error = 'Incorrect User ACL, Please contact your administrator' return render_template("login.html", error=logging_error) @@ -250,11 +247,11 @@ def change_password(): if error: return render_template("change_password.html", error=error) - if current_user.is_authenticated and password1 != None: + if current_user.is_authenticated and password1 is not None: if password1 == password2: - if Users.check_password_strength(password1): + if check_password_strength(password1): user_id = current_user.get_user_id() - Users.create_user(user_id, password=password1, chg_passwd=False) + create_user(user_id, password=password1, chg_passwd=False) # TODO RENAME ME # update Note # dashboard return redirect(url_for('dashboard.index', update_note=True))