From caade24f8c7b00105348e3156459c7e3bd0efd01 Mon Sep 17 00:00:00 2001 From: Sami Mokaddem Date: Mon, 15 Jul 2024 10:57:06 +0200 Subject: [PATCH] fix: [backend] Fixed bugs related to message parsing and issue with wait and debounce --- exercise.py | 6 +++- inject_evaluator.py | 2 +- server.py | 68 +++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/exercise.py b/exercise.py index 60523c6..340b5fd 100644 --- a/exercise.py +++ b/exercise.py @@ -94,6 +94,9 @@ def restore_exercices_progress(): db.USER_ID_TO_EMAIL_MAPPING = {} db.USER_ID_TO_AUTHKEY_MAPPING = {} + if len(db.EXERCISES_STATUS) == 0: + init_exercises_tasks() + def is_validate_exercises(exercises: list) -> bool: exercises_uuid = set() @@ -384,6 +387,7 @@ async def inject_checker_router(user_id: int, inject_evaluation: dict, data: dic return False if 'evaluation_strategy' not in inject_evaluation: + logger.warning('Evaluation strategy not specified in inject') return False data_to_validate = await get_data_to_validate(user_id, inject_evaluation, data) @@ -437,7 +441,7 @@ def parse_event_id_from_log(data: dict) -> Union[int, None]: if 'model' in log and 'model_id' in log and log['model'] == 'Event': return int(log['model_id']) if 'change' in log: - if 'event_id' in log: + if 'event_id' in log and log['event_id'] is not None: return int(log['event_id']) return None diff --git a/inject_evaluator.py b/inject_evaluator.py index 9057d03..9f94610 100644 --- a/inject_evaluator.py +++ b/inject_evaluator.py @@ -95,7 +95,7 @@ def eval_condition_list(evaluation_config: dict, data_to_validate: str, context: if comparison_type == 'contains-regex': regex = re.compile(values[0]) for candidate in data_to_validate: - if regex.match(candidate): + if regex.match(candidate) is not None: return True return False elif comparison_type == 'count': diff --git a/server.py b/server.py index 1dff2ef..32220b2 100755 --- a/server.py +++ b/server.py @@ -5,6 +5,7 @@ import functools import json import sys import time +import traceback import zmq import socketio from aiohttp import web @@ -43,6 +44,20 @@ def debounce(debounce_seconds: int = 1): return decorator +def timer(): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + t1 = time.time() + res = func(*args, **kwargs) + elapsed = time.time() - t1 + if elapsed > 0.1: + print(elapsed) + return res + return wrapper + return decorator + + # Initialize ZeroMQ context and subscriber socket context = zmq.asyncio.Context() @@ -164,9 +179,12 @@ async def handleMessage(topic, s, message): if user_id is not None: if exercise_model.is_accepted_query(data): context = get_context(topic, user_id, data) - succeeded_once = await exercise_model.check_active_tasks(user_id, data, context) - if succeeded_once: - await sendRefreshScore() + checking_task = exercise_model.check_active_tasks(user_id, data, context) + if checking_task is not None: # Make sure check_active_tasks was not debounced + succeeded_once = await checking_task + if succeeded_once: + sendRefreshScoreTask = sendRefreshScore() + await sendRefreshScoreTask if sendRefreshScoreTask is not None else None # Make sure check_active_tasks was not debounced @debounce(debounce_seconds=1) @@ -264,8 +282,48 @@ async def forward_zmq_to_socketio(): logger.error('Error handling message %s', e) +# Function to forward zmq messages to Socket.IO +async def forward_fake_zmq_to_socketio(): + global ZMQ_MESSAGE_COUNT, ZMQ_LAST_TIME + filename = sys.argv[1] + line_number = sum(1 for _ in open(filename)) + print(f'Preparing to feed {line_number} lines..') + await sio.sleep(2) + + print('Feeding started') + line_count = 0 + last_print = time.time() + with open(filename) as f: + for line in f: + line_count += 1 + now = time.time() + if line_count % (int(line_number/100)) == 0 or (now - last_print >= 5): + last_print = now + print(f'Feeding {line_count} / {line_number} - ({100* line_count / line_number:.1f}%)') + split = line.split(' ', 1) + topic = split[0] + s = '' + m = split[1] + if topic != 'misp_json_self': + await sio.sleep(0.01) + try: + ZMQ_MESSAGE_COUNT += 1 + ZMQ_LAST_TIME = time.time() + await handleMessage(topic, s, m) + except Exception as e: + print(e) + print(line) + print(traceback.format_exc()) + logger.error('Error handling message: %s', e) + await sio.sleep(5) + + async def init_app(): - sio.start_background_task(forward_zmq_to_socketio) + if len(sys.argv) == 2: + sio.start_background_task(forward_fake_zmq_to_socketio) + else: + exercise_model.restore_exercices_progress() + sio.start_background_task(forward_zmq_to_socketio) sio.start_background_task(keepalive) sio.start_background_task(notification_history) sio.start_background_task(record_users_activity) @@ -283,6 +341,4 @@ if __name__ == "__main__": logger.critical('Could not load exercises') sys.exit(1) - exercise_model.restore_exercices_progress() - web.run_app(init_app(), host=config.server_host, port=config.server_port)