chg: [backend] Usage of aiohttp in place of eventlet

This commit is contained in:
Sami Mokaddem 2024-07-02 16:48:55 +02:00
parent f236da0055
commit f191af8573
2 changed files with 47 additions and 38 deletions

View file

@ -1,6 +1,6 @@
pyzmq pyzmq
python-socketio python-socketio
eventlet aiohttp
requests requests
requests-cache requests-cache
jq jq

View file

@ -6,8 +6,8 @@ import sys
import time import time
import zmq import zmq
import socketio import socketio
import eventlet from aiohttp import web
from eventlet.green import zmq as gzmq import zmq.asyncio
import exercise as exercise_model import exercise as exercise_model
import notification as notification_model import notification as notification_model
@ -41,77 +41,82 @@ def debounce(debounce_seconds: int = 1):
# Initialize ZeroMQ context and subscriber socket # Initialize ZeroMQ context and subscriber socket
context = gzmq.Context() context = zmq.asyncio.Context()
zsocket = context.socket(gzmq.SUB) zsocket = context.socket(zmq.SUB)
zmq_url = config.zmq_url zmq_url = config.zmq_url
zsocket.connect(zmq_url) zsocket.connect(zmq_url)
zsocket.setsockopt_string(gzmq.SUBSCRIBE, '') zsocket.setsockopt_string(zmq.SUBSCRIBE, '')
# Initialize Socket.IO server # Initialize Socket.IO server
sio = socketio.Server(cors_allowed_origins='*', async_mode='eventlet') # sio = socketio.Server(cors_allowed_origins='*', async_mode='eventlet')
app = socketio.WSGIApp(sio, static_files={ sio = socketio.AsyncServer(cors_allowed_origins='*', async_mode='aiohttp')
'/': {'content_type': 'text/html', 'filename': 'dist/index.html'}, app = web.Application()
'/assets': './dist/assets', sio.attach(app)
})
async def index(request):
with open('dist/index.html') as f:
return web.Response(text=f.read(), content_type='text/html')
@sio.event @sio.event
def connect(sid, environ): async def connect(sid, environ):
logger.debug("Client connected: %s", sid) logger.debug("Client connected: %s", sid)
@sio.event @sio.event
def disconnect(sid): async def disconnect(sid):
logger.debug("Client disconnected: %s", sid) logger.debug("Client disconnected: %s", sid)
@sio.event @sio.event
def get_exercises(sid): async def get_exercises(sid):
return exercise_model.get_exercises() return exercise_model.get_exercises()
@sio.event @sio.event
def get_selected_exercises(sid): async def get_selected_exercises(sid):
return exercise_model.get_selected_exercises() return exercise_model.get_selected_exercises()
@sio.event @sio.event
def change_exercise_selection(sid, payload): async def change_exercise_selection(sid, payload):
return exercise_model.change_exercise_selection(payload['exercise_uuid'], payload['selected']) return exercise_model.change_exercise_selection(payload['exercise_uuid'], payload['selected'])
@sio.event @sio.event
def get_progress(sid): async def get_progress(sid):
return exercise_model.get_progress() return exercise_model.get_progress()
@sio.event @sio.event
def get_notifications(sid): async def get_notifications(sid):
return notification_model.get_notifications() return notification_model.get_notifications()
@sio.event @sio.event
def mark_task_completed(sid, payload): async def mark_task_completed(sid, payload):
return exercise_model.mark_task_completed(int(payload['user_id']), payload['exercise_uuid'], payload['task_uuid']) return exercise_model.mark_task_completed(int(payload['user_id']), payload['exercise_uuid'], payload['task_uuid'])
@sio.event @sio.event
def mark_task_incomplete(sid, payload): async def mark_task_incomplete(sid, payload):
return exercise_model.mark_task_incomplete(int(payload['user_id']), payload['exercise_uuid'], payload['task_uuid']) return exercise_model.mark_task_incomplete(int(payload['user_id']), payload['exercise_uuid'], payload['task_uuid'])
@sio.event @sio.event
def reset_all_exercise_progress(sid): async def reset_all_exercise_progress(sid):
return exercise_model.resetAllExerciseProgress() return exercise_model.resetAllExerciseProgress()
@sio.event @sio.event
def reset_notifications(sid): async def reset_notifications(sid):
return notification_model.reset_notifications() return notification_model.reset_notifications()
@sio.event @sio.event
def get_diagnostic(sid): async def get_diagnostic(sid):
return getDiagnostic() return getDiagnostic()
@sio.event @sio.event
def toggle_verbose_mode(sid, payload): async def toggle_verbose_mode(sid, payload):
return notification_model.set_verbose_mode(payload['verbose']) return notification_model.set_verbose_mode(payload['verbose'])
@sio.on('*') @sio.on('*')
def any_event(event, sid, data={}): async def any_event(event, sid, data={}):
logger.info('>> Unhandled event %s', event) logger.info('>> Unhandled event %s', event)
def handleMessage(topic, s, message): async def handleMessage(topic, s, message):
data = json.loads(message) data = json.loads(message)
if topic == 'misp_json_audit': if topic == 'misp_json_audit':
@ -119,7 +124,7 @@ def handleMessage(topic, s, message):
if user_id is not None and '@' in email: if user_id is not None and '@' in email:
if user_id not in db.USER_ID_TO_EMAIL_MAPPING: if user_id not in db.USER_ID_TO_EMAIL_MAPPING:
db.USER_ID_TO_EMAIL_MAPPING[user_id] = email db.USER_ID_TO_EMAIL_MAPPING[user_id] = email
sio.emit('new_user', email) await sio.emit('new_user', email)
user_id, authkey = notification_model.get_user_authkey_id_pair(data) user_id, authkey = notification_model.get_user_authkey_id_pair(data)
if user_id is not None: if user_id is not None:
@ -131,7 +136,7 @@ def handleMessage(topic, s, message):
notification = notification_model.get_notification_message(data) notification = notification_model.get_notification_message(data)
if notification_model.is_accepted_notification(notification): if notification_model.is_accepted_notification(notification):
notification_model.record_notification(notification) notification_model.record_notification(notification)
sio.emit('notification', notification) await sio.emit('notification', notification)
user_id = notification_model.get_user_id(data) user_id = notification_model.get_user_id(data)
if user_id is not None: if user_id is not None:
@ -143,8 +148,8 @@ def handleMessage(topic, s, message):
@debounce(debounce_seconds=1) @debounce(debounce_seconds=1)
def sendRefreshScore(): async def sendRefreshScore():
sio.emit('refresh_score') await sio.emit('refresh_score')
def get_context(data: dict) -> dict: def get_context(data: dict) -> dict:
@ -174,19 +179,27 @@ def getDiagnostic() -> dict:
# Function to forward zmq messages to Socket.IO # Function to forward zmq messages to Socket.IO
def forward_zmq_to_socketio(): async def forward_zmq_to_socketio():
global ZMQ_MESSAGE_COUNT global ZMQ_MESSAGE_COUNT
while True: while True:
message = zsocket.recv_string() message = await zsocket.recv_string()
topic, s, m = message.partition(" ") topic, s, m = message.partition(" ")
try: try:
ZMQ_MESSAGE_COUNT += 1 ZMQ_MESSAGE_COUNT += 1
handleMessage(topic, s, m) await handleMessage(topic, s, m)
except Exception as e: except Exception as e:
logger.error('Error handling message %s', e) logger.error('Error handling message %s', e)
async def init_app():
sio.start_background_task(forward_zmq_to_socketio)
return app
app.router.add_static('/assets', 'dist/assets')
app.router.add_get('/', index)
if __name__ == "__main__": if __name__ == "__main__":
exercises_loaded = exercise_model.load_exercises() exercises_loaded = exercise_model.load_exercises()
@ -194,8 +207,4 @@ if __name__ == "__main__":
logger.critical('Could not load exercises') logger.critical('Could not load exercises')
sys.exit(1) sys.exit(1)
# Start the forwarding in a separate thread web.run_app(init_app(), host=config.server_host, port=config.server_port)
eventlet.spawn_n(forward_zmq_to_socketio)
# Run the Socket.IO server
eventlet.wsgi.server(eventlet.listen((config.server_host, config.server_port)), app)