From bbfba0d6e48de6ae6422af580efabd3a511b6a0b Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Tue, 2 Jul 2024 11:41:17 +0200 Subject: [PATCH] new: [backend] Added debounce function to throttle exercise checks --- exercise.py | 39 +++++++++++++++++++++++++++++++++++++-- misp_api.py | 4 ++-- server.py | 34 +++++++++++++++++++++++++++++++--- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/exercise.py b/exercise.py index 2ff8e05..8e5984b 100644 --- a/exercise.py +++ b/exercise.py @@ -1,10 +1,14 @@ #!/usr/bin/env python3 + +import functools +import time from collections import defaultdict from pathlib import Path import json import re from typing import Union +import jq import db from inject_evaluator import eval_data_filtering, eval_query_comparison import misp_api @@ -12,6 +16,26 @@ import config ACTIVE_EXERCISES_DIR = "active_exercises" +def debounce_check_active_tasks(debounce_seconds: int = 1): + func_last_execution_time = {} + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + user_id = args[0] + now = time.time() + key = f"{user_id}_{func.__name__}" + if key not in func_last_execution_time: + func_last_execution_time[key] = now + return func(*args, **kwargs) + elif now >= func_last_execution_time[key] + debounce_seconds: + func_last_execution_time[key] = now + return func(*args, **kwargs) + else: + print(f">> Debounced for `{user_id}`") + return None + return wrapper + return decorator + def load_exercises() -> bool: db.ALL_EXERCISES = read_exercise_dir() @@ -54,6 +78,17 @@ def is_validate_exercises(exercises: list) -> bool: return False tasks_uuid.add(t_uuid) task_by_uuid[t_uuid] = inject + + for inject_evaluation in inject.get('inject_evaluation', []): + if inject_evaluation.get('evaluation_strategy', None) == 'data_filtering': + for evaluation in inject_evaluation.get('parameters', []): + jq_path = list(evaluation.keys())[0] + try: + jq.compile(jq_path) + except ValueError as e: + print(f"[{t_uuid} :: {inject['name']}] Could not compile jq path `{jq_path}`\n", e) + return False + return True @@ -382,15 +417,15 @@ def fetch_data_for_query_comparison(user_id: int, inject_evaluation: dict, perfo return data +@debounce_check_active_tasks(debounce_seconds=5) def check_active_tasks(user_id: int, data: dict, context: dict) -> bool: succeeded_once = False available_tasks = get_available_tasks_for_user(user_id) for task_uuid in available_tasks: inject = db.INJECT_BY_UUID[task_uuid] if inject['exercise_uuid'] not in db.SELECTED_EXERCISES: - print(f"exercise not active for this inject {inject['name']}") continue - print(f"checking: {inject['name']}") + print(f"[{task_uuid}] :: checking: {inject['name']}") completed = check_inject(user_id, inject, data, context) if completed: succeeded_once = True diff --git a/misp_api.py b/misp_api.py index 457f188..3c8dee2 100644 --- a/misp_api.py +++ b/misp_api.py @@ -22,7 +22,7 @@ def get(url, data={}, api_key=misp_apikey): try: response = requests.get(full_url, data=data, headers=headers, verify=not misp_skipssl) except requests.exceptions.ConnectionError as e: - print(e) + print('Could not perform request on MISP.', e) return None return response.json() if response.headers['content-type'].startswith('application/json') else response.text @@ -38,7 +38,7 @@ def post(url, data={}, api_key=misp_apikey): try: response = requests.post(full_url, data=json.dumps(data), headers=headers, verify=not misp_skipssl) except requests.exceptions.ConnectionError as e: - print(e) + print('Could not perform request on MISP.', e) return None return response.json() if response.headers['content-type'].startswith('application/json') else response.text diff --git a/server.py b/server.py index cc9cfe6..1ff756c 100755 --- a/server.py +++ b/server.py @@ -1,7 +1,9 @@ #!/usr/bin/env python3 +import functools import json import sys +import time import zmq import socketio import eventlet @@ -18,6 +20,26 @@ import misp_api ZMQ_MESSAGE_COUNT = 0 +def debounce(debounce_seconds: int = 1): + func_last_execution_time = {} + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + now = time.time() + key = func.__name__ + if key not in func_last_execution_time: + func_last_execution_time[key] = now + return func(*args, **kwargs) + elif now >= func_last_execution_time[key] + debounce_seconds: + func_last_execution_time[key] = now + return func(*args, **kwargs) + else: + return None + return wrapper + return decorator + + + # Initialize ZeroMQ context and subscriber socket context = gzmq.Context() zsocket = context.socket(gzmq.SUB) @@ -117,7 +139,12 @@ def handleMessage(topic, s, message): context = get_context(data) succeeded_once = exercise_model.check_active_tasks(user_id, data, context) if succeeded_once: - sio.emit('refresh_score') + sendRefreshScore() + + +@debounce(debounce_seconds=1) +def sendRefreshScore(): + sio.emit('refresh_score') def get_context(data: dict) -> dict: @@ -153,11 +180,12 @@ def forward_zmq_to_socketio(): while True: message = zsocket.recv_string() topic, s, m = message.partition(" ") + handleMessage(topic, s, m) try: ZMQ_MESSAGE_COUNT += 1 - handleMessage(topic, s, m) + # handleMessage(topic, s, m) except Exception as e: - print(e) + print('Error handling message', e) if __name__ == "__main__":