chg: [access logs] add user_agent

This commit is contained in:
terrtia 2024-09-06 15:04:28 +02:00
parent 8e502e299b
commit ca13a33472
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
2 changed files with 15 additions and 15 deletions

View file

@ -34,7 +34,7 @@ def get_access_config(create=False):
logger = logging.getLogger('access.log') logger = logging.getLogger('access.log')
if create: if create:
formatter = logging.Formatter('%(asctime)s - %(ip_address)s - %(levelname)s - %(user_id)s - %(message)s') formatter = logging.Formatter('%(asctime)s - %(ip_address)s - %(user_agent)s - %(levelname)s - %(user_id)s - %(message)s')
# STDOUT # STDOUT
handler = logging.StreamHandler() handler = logging.StreamHandler()

View file

@ -65,7 +65,7 @@ def login():
username = request.form.get('username') username = request.form.get('username')
if not username: if not username:
username = '' username = ''
access_logger.warning(f'Brute Force', extra={'user_id': username, 'ip_address': current_ip}) access_logger.warning(f'Brute Force', extra={'user_id': username, 'ip_address': current_ip, 'user_agent': request.user_agent})
logging_error = f'Max Connection Attempts reached, Please wait {wait_time}s' logging_error = f'Max Connection Attempts reached, Please wait {wait_time}s'
return render_template("login.html", error=logging_error) return render_template("login.html", error=logging_error)
@ -86,14 +86,14 @@ def login():
login_failed_user_id = int(login_failed_user_id) login_failed_user_id = int(login_failed_user_id)
if login_failed_user_id >= 5: if login_failed_user_id >= 5:
wait_time = r_cache.ttl(f'failed_login_user_id:{username}') wait_time = r_cache.ttl(f'failed_login_user_id:{username}')
access_logger.warning(f'Max login attempts reached', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) access_logger.warning(f'Max login attempts reached', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent})
logging_error = f'Max Connection Attempts reached, Please wait {wait_time}s' logging_error = f'Max Connection Attempts reached, Please wait {wait_time}s'
return render_template("login.html", error=logging_error) return render_template("login.html", error=logging_error)
if user.exists() and user.check_password(password): if user.exists() and user.check_password(password):
if not check_user_role_integrity(user.get_user_id()): if not check_user_role_integrity(user.get_user_id()):
logging_error = 'Incorrect User ACL, Please contact your administrator' logging_error = 'Incorrect User ACL, Please contact your administrator'
access_logger.info(f'Login fail: Invalid ACL', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) access_logger.info(f'Login fail: Invalid ACL', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent})
return render_template("login.html", error=logging_error) return render_template("login.html", error=logging_error)
if user.is_2fa_enabled(): if user.is_2fa_enabled():
@ -104,7 +104,7 @@ def login():
if not user.is_2fa_setup(): if not user.is_2fa_setup():
return redirect(url_for('root.setup_2fa')) return redirect(url_for('root.setup_2fa'))
else: else:
access_logger.info(f'First Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) access_logger.info(f'First Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent})
if next_page and next_page != 'None' and next_page != '/': if next_page and next_page != 'None' and next_page != '/':
return redirect(url_for('root.verify_2fa', next=next_page)) return redirect(url_for('root.verify_2fa', next=next_page))
else: else:
@ -115,7 +115,7 @@ def login():
user.rotate_session() user.rotate_session()
login_user(user) login_user(user)
user.update_last_login() user.update_last_login()
access_logger.info(f'Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip}) access_logger.info(f'Login', extra={'user_id': user.get_user_id(), 'ip_address': current_ip, 'user_agent': request.user_agent})
if user.request_password_change(): if user.request_password_change():
return redirect(url_for('root.change_password')) return redirect(url_for('root.change_password'))
@ -138,7 +138,7 @@ def login():
r_cache.expire(f'failed_login_user_id:{username}', 300) r_cache.expire(f'failed_login_user_id:{username}', 300)
# #
access_logger.info(f'Login Failed', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) access_logger.info(f'Login Failed', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
logging_error = 'Login/Password Incorrect' logging_error = 'Login/Password Incorrect'
return render_template("login.html", error=logging_error) return render_template("login.html", error=logging_error)
@ -166,7 +166,7 @@ def verify_2fa():
if otp_expire < int(time.time()): # TODO LOG if otp_expire < int(time.time()): # TODO LOG
session.pop('user_id', None) session.pop('user_id', None)
session.pop('otp_expire', None) session.pop('otp_expire', None)
access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0]}) access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
error = "First Login Expired" error = "First Login Expired"
return redirect(url_for('root.login', error=error)) return redirect(url_for('root.login', error=error))
@ -188,7 +188,7 @@ def verify_2fa():
login_user(user) login_user(user)
user.update_last_login() user.update_last_login()
access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
if user.request_password_change(): if user.request_password_change():
return redirect(url_for('root.change_password')) return redirect(url_for('root.change_password'))
@ -199,7 +199,7 @@ def verify_2fa():
return redirect(url_for('dashboard.index')) return redirect(url_for('dashboard.index'))
else: else:
htop_counter = user.get_htop_counter() htop_counter = user.get_htop_counter()
access_logger.info(f'Invalid OTP', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) access_logger.info(f'Invalid OTP', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
error = "The OTP is incorrect or has expired" error = "The OTP is incorrect or has expired"
return render_template("verify_otp.html", htop_counter=htop_counter, next_page=next_page, error=error) return render_template("verify_otp.html", htop_counter=htop_counter, next_page=next_page, error=error)
@ -220,7 +220,7 @@ def setup_2fa():
if otp_expire < int(time.time()): # TODO LOG if otp_expire < int(time.time()): # TODO LOG
session.pop('user_id', None) session.pop('user_id', None)
session.pop('otp_expire', None) session.pop('otp_expire', None)
access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0]}) access_logger.info(f'First Login Expired', extra={'user_id': user_id, 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
error = "First Login Expired" error = "First Login Expired"
return redirect(url_for('root.login', error=error)) return redirect(url_for('root.login', error=error))
@ -243,14 +243,14 @@ def setup_2fa():
login_user(user) login_user(user)
user.update_last_login() user.update_last_login()
access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) access_logger.info(f'2FA login', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
if user.request_password_change(): if user.request_password_change():
return redirect(url_for('root.change_password')) return redirect(url_for('root.change_password'))
else: else:
return redirect(url_for('dashboard.index')) return redirect(url_for('dashboard.index'))
else: else:
access_logger.info(f'OTP Invalid', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0]}) access_logger.info(f'OTP Invalid', extra={'user_id': user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
error = "The OTP is incorrect or has expired" error = "The OTP is incorrect or has expired"
return redirect(url_for('root.setup_2fa', error=error)) return redirect(url_for('root.setup_2fa', error=error))
else: else:
@ -278,7 +278,7 @@ def change_password():
res = api_change_user_self_password(user_id, password1) res = api_change_user_self_password(user_id, password1)
if res[1] != 200: if res[1] != 200:
return create_json_response(res[0], res[1]) return create_json_response(res[0], res[1])
access_logger.info(f'Password change', extra={'user_id': user_id, 'ip_address': request.access_route[0]}) access_logger.info(f'Password change', extra={'user_id': user_id, 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
# update Note # update Note
# dashboard # dashboard
return redirect(url_for('dashboard.index', update_note=True)) return redirect(url_for('dashboard.index', update_note=True))
@ -295,7 +295,7 @@ def change_password():
@root.route('/logout') @root.route('/logout')
@login_required @login_required
def logout(): def logout():
access_logger.info(f'Logout', extra={'user_id': current_user.get_user_id(), 'ip_address': request.access_route[0]}) access_logger.info(f'Logout', extra={'user_id': current_user.get_user_id(), 'ip_address': request.access_route[0], 'user_agent': request.user_agent})
current_user.kill_session() current_user.kill_session()
logout_user() logout_user()
return redirect(url_for('root.login')) return redirect(url_for('root.login'))