SkillAegis/notification.py
2024-07-18 14:51:36 +02:00

222 lines
No EOL
6.7 KiB
Python

#!/usr/bin/env python3
import json
import re
from typing import Union
import db
import config
import appConfig
from urllib.parse import parse_qs
VERBOSE_MODE = False
APIQUERY_MODE = False
NOTIFICATION_COUNT = 1
def set_verbose_mode(enabled: bool):
global VERBOSE_MODE
VERBOSE_MODE = enabled
def set_apiquery_mode(enabled: bool):
global APIQUERY_MODE
APIQUERY_MODE = enabled
def get_notifications() -> list[dict]:
return list(db.NOTIFICATION_MESSAGES)
def get_notifications_history() -> dict:
return {
'history': list(db.NOTIFICATION_HISTORY),
'config': {
'buffer_resolution_per_minute': db.NOTIFICATION_HISTORY_BUFFER_RESOLUTION_PER_MIN,
'buffer_timestamp_min': db.NOTIFICATION_HISTORY_BUFFER_TIMESPAN_MIN,
'frequency': db.NOTIFICATION_HISTORY_FREQUENCY,
'notification_history_size': db.notification_history_buffer_size,
},
}
def get_users_activity() -> dict:
return {
'activity': {user_id: list(activity) for user_id, activity in db.USER_ACTIVITY.items()},
'config': {
'timestamp_min': db.USER_ACTIVITY_TIMESPAN_MIN,
'buffer_resolution_per_minute': db.USER_ACTIVITY_BUFFER_RESOLUTION_PER_MIN,
'frequency': db.USER_ACTIVITY_FREQUENCY,
'activity_buffer_size': db.user_activity_buffer_size,
},
}
def reset_notifications():
db.resetNotificationMessage()
def record_notification(notification: dict):
db.NOTIFICATION_MESSAGES.appendleft(notification)
def record_notification_history(message_count: int):
db.NOTIFICATION_HISTORY.append(message_count)
def record_user_activity(user_id: int, count: int):
db.addUserActivity(user_id, count)
def get_user_id(data: dict):
if 'user_id' in data:
return int(data['user_id'])
if 'Log' in data:
data = data['Log']
if 'user_id' in data:
return int(data['user_id'])
if 'AuditLog' in data:
data = data['AuditLog']
if 'user_id' in data:
return int(data['user_id'])
return None
def get_user_email_id_pair(data: dict):
if 'Log' in data:
data = data['Log']
if 'email' in data and 'user_id' in data:
return (int(data['user_id']), data['email'],)
return (None, None,)
def get_user_authkey_id_pair(data: dict):
authkey_title_regex = r".*API key.*\((\w+)\)"
if 'Log' in data:
data = data['Log']
if 'user_id' in data and 'title' in data :
if data['title'].startswith('Successful authentication using API key'):
authkey_search = re.search(authkey_title_regex, data['title'], re.IGNORECASE)
if authkey_search is not None:
authkey = authkey_search.group(1)
return (int(data['user_id']), authkey,)
return (None, None,)
def is_http_request(data: dict) -> bool:
if ('url' in data and
'request_method' in data and
'response_code' in data and
'user_id' in data):
return True
return False
def get_content_type(data: dict) -> Union[None, str]:
if 'request' not in data:
return None
request_content = data['request']
content_type, _ = request_content.split('\n\n')
return content_type
def clean_form_urlencoded_data(post_body_parsed: dict) -> dict:
cleaned = {}
for k, v in post_body_parsed.items():
if k.startswith('data[') and not k.startswith('data[_'):
clean_k = '.'.join([k for k in re.split(r'[\[\]]', k) if k != ''][1:])
clean_v = v[0] if type(v) is list and len(v) == 1 else v
cleaned[clean_k] = clean_v
return cleaned
def get_request_post_body(data: dict) -> dict:
if 'request' not in data:
return {}
request_content = data['request']
content_type, post_body = request_content.split('\n\n')
if content_type == 'application/json':
post_body_parsed = json.loads(post_body) if len(post_body) > 0 else {}
return post_body_parsed
elif content_type == 'application/x-www-form-urlencoded':
post_body_parsed = parse_qs(post_body)
post_body_clean = clean_form_urlencoded_data(post_body_parsed)
return post_body_clean
return {}
def is_api_request(data: dict) -> bool:
content_type = get_content_type(data)
return content_type == 'application/json'
def get_notification_message(data: dict) -> dict:
global NOTIFICATION_COUNT
id = NOTIFICATION_COUNT
NOTIFICATION_COUNT += 1
user = db.USER_ID_TO_EMAIL_MAPPING.get(int(data['user_id']), '?')
time = data['created'].split(' ')[1].split('.')[0]
url = data['url']
http_method = data.get('request_method', 'GET')
response_code = data.get('response_code', '?')
user_agent = data.get('user_agent', '?')
_, action = get_scope_action_from_url(url)
http_method = 'DELETE' if (http_method == 'POST' or http_method == 'PUT') and action == 'delete' else http_method # small override for UI
payload = get_request_post_body(data)
return {
'id': id,
'user': user,
'time': time,
'url': url,
'http_method': http_method,
'user_agent': user_agent,
'is_api_request': is_api_request(data),
'response_code': response_code,
'payload': payload,
}
def get_scope_action_from_url(url) -> Union[str, None]:
split = url.split('/')
if len(split) > 2:
return (split[1], split[2],)
else:
return (None, None,)
def is_accepted_notification(notification) -> bool:
global VERBOSE_MODE
if notification['user_agent'] == 'SkillAegis': # Ignore message generated from this app
return False
if VERBOSE_MODE:
return True
if APIQUERY_MODE and not notification['is_api_request']:
return False
if '@' not in notification['user']: # Ignore message from system
return False
scope, action = get_scope_action_from_url(notification['url'])
if scope in appConfig.live_logs_accepted_scope:
if appConfig.live_logs_accepted_scope == '*':
return True
elif action in appConfig.live_logs_accepted_scope[scope]:
return True
return False
def is_accepted_user_activity(notification) -> bool:
global VERBOSE_MODE
if notification['user_agent'] == 'SkillAegis': # Ignore message generated from this app
return False
if '@' not in notification['user']: # Ignore message from system
return False
scope, action = get_scope_action_from_url(notification['url'])
if scope in appConfig.user_activity_accepted_scope:
if appConfig.user_activity_accepted_scope == '*':
return True
elif action in appConfig.user_activity_accepted_scope[scope]:
return True
return False