fix: [backend] Fixed bugs related to message parsing and issue with wait and debounce

This commit is contained in:
Sami Mokaddem 2024-07-15 10:57:06 +02:00
parent e3df5d715e
commit caade24f8c
3 changed files with 68 additions and 8 deletions

View file

@ -94,6 +94,9 @@ def restore_exercices_progress():
db.USER_ID_TO_EMAIL_MAPPING = {} db.USER_ID_TO_EMAIL_MAPPING = {}
db.USER_ID_TO_AUTHKEY_MAPPING = {} db.USER_ID_TO_AUTHKEY_MAPPING = {}
if len(db.EXERCISES_STATUS) == 0:
init_exercises_tasks()
def is_validate_exercises(exercises: list) -> bool: def is_validate_exercises(exercises: list) -> bool:
exercises_uuid = set() exercises_uuid = set()
@ -384,6 +387,7 @@ async def inject_checker_router(user_id: int, inject_evaluation: dict, data: dic
return False return False
if 'evaluation_strategy' not in inject_evaluation: if 'evaluation_strategy' not in inject_evaluation:
logger.warning('Evaluation strategy not specified in inject')
return False return False
data_to_validate = await get_data_to_validate(user_id, inject_evaluation, data) data_to_validate = await get_data_to_validate(user_id, inject_evaluation, data)
@ -437,7 +441,7 @@ def parse_event_id_from_log(data: dict) -> Union[int, None]:
if 'model' in log and 'model_id' in log and log['model'] == 'Event': if 'model' in log and 'model_id' in log and log['model'] == 'Event':
return int(log['model_id']) return int(log['model_id'])
if 'change' in log: if 'change' in log:
if 'event_id' in log: if 'event_id' in log and log['event_id'] is not None:
return int(log['event_id']) return int(log['event_id'])
return None return None

View file

@ -95,7 +95,7 @@ def eval_condition_list(evaluation_config: dict, data_to_validate: str, context:
if comparison_type == 'contains-regex': if comparison_type == 'contains-regex':
regex = re.compile(values[0]) regex = re.compile(values[0])
for candidate in data_to_validate: for candidate in data_to_validate:
if regex.match(candidate): if regex.match(candidate) is not None:
return True return True
return False return False
elif comparison_type == 'count': elif comparison_type == 'count':

View file

@ -5,6 +5,7 @@ import functools
import json import json
import sys import sys
import time import time
import traceback
import zmq import zmq
import socketio import socketio
from aiohttp import web from aiohttp import web
@ -43,6 +44,20 @@ def debounce(debounce_seconds: int = 1):
return decorator return decorator
def timer():
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t1 = time.time()
res = func(*args, **kwargs)
elapsed = time.time() - t1
if elapsed > 0.1:
print(elapsed)
return res
return wrapper
return decorator
# Initialize ZeroMQ context and subscriber socket # Initialize ZeroMQ context and subscriber socket
context = zmq.asyncio.Context() context = zmq.asyncio.Context()
@ -164,9 +179,12 @@ async def handleMessage(topic, s, message):
if user_id is not None: if user_id is not None:
if exercise_model.is_accepted_query(data): if exercise_model.is_accepted_query(data):
context = get_context(topic, user_id, data) context = get_context(topic, user_id, data)
succeeded_once = await exercise_model.check_active_tasks(user_id, data, context) checking_task = exercise_model.check_active_tasks(user_id, data, context)
if checking_task is not None: # Make sure check_active_tasks was not debounced
succeeded_once = await checking_task
if succeeded_once: if succeeded_once:
await sendRefreshScore() sendRefreshScoreTask = sendRefreshScore()
await sendRefreshScoreTask if sendRefreshScoreTask is not None else None # Make sure check_active_tasks was not debounced
@debounce(debounce_seconds=1) @debounce(debounce_seconds=1)
@ -264,7 +282,47 @@ async def forward_zmq_to_socketio():
logger.error('Error handling message %s', e) logger.error('Error handling message %s', e)
# Function to forward zmq messages to Socket.IO
async def forward_fake_zmq_to_socketio():
global ZMQ_MESSAGE_COUNT, ZMQ_LAST_TIME
filename = sys.argv[1]
line_number = sum(1 for _ in open(filename))
print(f'Preparing to feed {line_number} lines..')
await sio.sleep(2)
print('Feeding started')
line_count = 0
last_print = time.time()
with open(filename) as f:
for line in f:
line_count += 1
now = time.time()
if line_count % (int(line_number/100)) == 0 or (now - last_print >= 5):
last_print = now
print(f'Feeding {line_count} / {line_number} - ({100* line_count / line_number:.1f}%)')
split = line.split(' ', 1)
topic = split[0]
s = ''
m = split[1]
if topic != 'misp_json_self':
await sio.sleep(0.01)
try:
ZMQ_MESSAGE_COUNT += 1
ZMQ_LAST_TIME = time.time()
await handleMessage(topic, s, m)
except Exception as e:
print(e)
print(line)
print(traceback.format_exc())
logger.error('Error handling message: %s', e)
await sio.sleep(5)
async def init_app(): async def init_app():
if len(sys.argv) == 2:
sio.start_background_task(forward_fake_zmq_to_socketio)
else:
exercise_model.restore_exercices_progress()
sio.start_background_task(forward_zmq_to_socketio) sio.start_background_task(forward_zmq_to_socketio)
sio.start_background_task(keepalive) sio.start_background_task(keepalive)
sio.start_background_task(notification_history) sio.start_background_task(notification_history)
@ -283,6 +341,4 @@ if __name__ == "__main__":
logger.critical('Could not load exercises') logger.critical('Could not load exercises')
sys.exit(1) sys.exit(1)
exercise_model.restore_exercices_progress()
web.run_app(init_app(), host=config.server_host, port=config.server_port) web.run_app(init_app(), host=config.server_host, port=config.server_port)