diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index 815440d7..607bf9dd 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -121,6 +121,9 @@ def is_server_client_sync_mode_connected(ail_uuid, sync_mode): res = r_cache.hexists(f'ail_2_ail:server:client:{ail_uuid}', sync_mode) return res == 1 +def is_server_client_connected(ail_uuid): + return r_cache.sismember('ail_2_ail:server:all_clients', ail_uuid) + def clear_server_connected_clients(): for ail_uuid in get_server_all_connected_clients(): r_cache.delete(f'ail_2_ail:server:client:{ail_uuid}') @@ -144,6 +147,11 @@ def send_command_to_server_controller(command, ail_uuid=None): r_cache.sadd('ail_2_ail:server_controller:command', str_command) ##-- --## +def get_new_sync_client_id(): + for new_id in range(120000, 100000, -1): + new_id = str(new_id) + if not r_cache.exists(f'ail_2_ail:sync_client:{new_id}'): + return str(new_id) def get_all_sync_clients(r_set=False): res = r_cache.smembers('ail_2_ail:all_sync_clients') @@ -155,11 +163,38 @@ def get_all_sync_clients(r_set=False): def get_sync_client_ail_uuid(client_id): return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid') +def get_sync_client_sync_mode(client_id): + return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'sync_mode') + +def set_sync_client_sync_mode(client_id, sync_mode): + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'sync_mode', sync_mode) + +def create_sync_client_cache(ail_uuid, sync_mode, client_id=None): + if client_id is None: + client_id = get_new_sync_client_id() + # save sync client status + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid', ail_uuid) + r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'launch_time', int(time.time())) + set_sync_client_sync_mode(client_id, sync_mode) + + r_cache.sadd('ail_2_ail:all_sync_clients', client_id) + + # create map ail_uuid/queue_uuid + r_cache.sadd(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) + return client_id + # current: only one push registred -def get_client_id_by_ail_uuid(ail_uuid): +def get_client_id_by_ail_uuid(ail_uuid, filter_push=True): res = r_cache.smembers(f'ail_2_ail:ail_uuid:{ail_uuid}') - if res: - return int(res.pop()) + if not filter_push: + return res + else: + clients_id = [] + for client_id in res: + client_id = int(client_id) + if client_id <= 100000: + clients_id.append(client_id) + return clients_id def get_all_running_sync_servers(): running_ail_servers= [] @@ -168,11 +203,18 @@ def get_all_running_sync_servers(): running_ail_servers.append(ail_uuid) return running_ail_servers +def get_ail_instance_all_running_sync_mode(ail_uuid): + clients_id = get_client_id_by_ail_uuid(ail_uuid, filter_push=False) + running_sync_mode = {'api': False, 'pull': False, 'push': False} + for client_id in clients_id: + sync_mode = get_sync_client_sync_mode(client_id) + running_sync_mode[sync_mode] = True + return running_sync_mode + def delete_sync_client_cache(client_id): ail_uuid = get_sync_client_ail_uuid(client_id) - # map ail_uuid/queue_uuid + # map ail_uuid r_cache.srem(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) - r_cache.srem(f'ail_2_ail:queue_uuid:{queue_uuid}', client_id) r_cache.delete(f'ail_2_ail:sync_client:{client_id}') r_cache.srem('ail_2_ail:all_sync_clients', client_id) @@ -193,9 +235,10 @@ def send_command_to_manager(command, client_id=-1, ail_uuid=None): str_command = json.dumps(dict_action) r_cache.sadd('ail_2_ail:client_manager:command', str_command) - def refresh_ail_instance_connection(ail_uuid): - client_id = get_client_id_by_ail_uuid(ail_uuid) + clients_id = get_client_id_by_ail_uuid(ail_uuid) + if clients_id: + clients_id = clients_id[0] launch_required = is_ail_instance_push_enabled(ail_uuid) # relaunch @@ -253,6 +296,7 @@ class AIL2AILClientManager(object): def launch_sync_client(self, ail_uuid): dir_project = os.environ['AIL_HOME'] + sync_mode = 'push' client_id = self.get_new_sync_client_id() script_options = f'-u {ail_uuid} -m push -i {client_id}' screen.create_screen(AIL2AILClientManager.SCREEN_NAME) @@ -262,13 +306,7 @@ class AIL2AILClientManager(object): AIL2AILClientManager.SCRIPT_NAME, script_options=script_options, kill_previous_windows=True) # save sync client status - r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid', ail_uuid) - r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'launch_time', int(time.time())) - - r_cache.sadd('ail_2_ail:all_sync_clients', client_id) - - # create map ail_uuid/queue_uuid - r_cache.sadd(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id) + create_sync_client_cache(ail_uuid, sync_mode, client_id=client_id) self.clients[client_id] = {'ail_uuid': ail_uuid} @@ -439,7 +477,7 @@ def get_ail_server_error(ail_uuid): return r_cache.hget(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') # # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) -def get_ail_instance_metadata(ail_uuid, client_sync_mode=False, sync_queues=False): +def get_ail_instance_metadata(ail_uuid, client_sync_mode=False, server_sync_mode=False, sync_queues=False): dict_meta = {} dict_meta['uuid'] = ail_uuid dict_meta['url'] = get_ail_instance_url(ail_uuid) @@ -462,6 +500,9 @@ def get_ail_instance_metadata(ail_uuid, client_sync_mode=False, sync_queues=Fals dict_meta['client_sync_mode']['push'] = is_server_client_sync_mode_connected(ail_uuid, 'push') dict_meta['client_sync_mode']['api'] = is_server_client_sync_mode_connected(ail_uuid, 'api') + if server_sync_mode: + dict_meta['server_sync_mode'] = get_ail_instance_all_running_sync_mode(ail_uuid) + return dict_meta def get_all_ail_instances_metadata(): @@ -470,10 +511,11 @@ def get_all_ail_instances_metadata(): l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) return l_servers -def get_ail_instances_metadata(l_ail_servers, sync_queues=True, client_sync_mode=False): +def get_ail_instances_metadata(l_ail_servers, sync_queues=True, client_sync_mode=False, server_sync_mode=False): l_servers = [] for ail_uuid in l_ail_servers: - server_metadata = get_ail_instance_metadata(ail_uuid, sync_queues=sync_queues, client_sync_mode=client_sync_mode) + server_metadata = get_ail_instance_metadata(ail_uuid, sync_queues=sync_queues, + client_sync_mode=client_sync_mode, server_sync_mode=server_sync_mode) l_servers.append(server_metadata) return l_servers @@ -644,6 +686,65 @@ def api_get_remote_ail_server_version(json_dict): res = get_remote_ail_server_version(ail_uuid) return res, 200 +def api_kill_server_connected_clients(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + if not is_server_client_connected(ail_uuid): + return {"status": "error", "reason": "Client not connected"}, 400 + + res = send_command_to_server_controller('kill', ail_uuid=ail_uuid) + return res, 200 + +def api_kill_sync_client(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + clients_id = get_client_id_by_ail_uuid(ail_uuid) + if not clients_id: + return {"status": "error", "reason": "Client not connected"}, 400 + + for client_id in clients_id: + res = send_command_to_manager('kill', client_id=client_id, ail_uuid=ail_uuid) + return res, 200 + +def api_launch_sync_client(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + clients_id = get_client_id_by_ail_uuid(ail_uuid) + if clients_id: + return {"status": "error", "reason": "Client already connected"}, 400 + + res = send_command_to_manager('launch', ail_uuid=ail_uuid) + return res, 200 + +def api_relaunch_sync_client(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + clients_id = get_client_id_by_ail_uuid(ail_uuid) + if not clients_id: + return {"status": "error", "reason": "Client not connected"}, 400 + for client_id in clients_id: + res = send_command_to_manager('relaunch', client_id=client_id, ail_uuid=ail_uuid) + return res, 200 + def api_create_ail_instance(json_dict): ail_uuid = json_dict.get('uuid').replace(' ', '') if not is_valid_uuid_v4(ail_uuid): diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py index 6308cab4..3e7e5267 100755 --- a/bin/core/ail_2_ail_client.py +++ b/bin/core/ail_2_ail_client.py @@ -70,7 +70,7 @@ async def push(websocket, ail_uuid): else: await asyncio.sleep(10) -async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): +async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None, client_id=None): if not ail_2_ail.exists_ail_instance(ail_uuid): print('AIL server not found') return @@ -88,7 +88,8 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): uri = f"{ail_url}/{sync_mode}/{local_ail_uuid}" #print(uri) - ail_2_ail.clear_save_ail_server_error(ail_uuid) + if client_id is None: + client_id = ail_2_ail.create_sync_client_cache(ail_uuid, sync_mode) try: async with websockets.client.connect( @@ -97,6 +98,8 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): #open_timeout=10, websockers 10.0 /!\ python>=3.7 extra_headers={"Authorization": f"{ail_key}"} ) as websocket: + # success + ail_2_ail.clear_save_ail_server_error(ail_uuid) if sync_mode == 'pull': await pull(websocket, ail_uuid) @@ -155,6 +158,8 @@ async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): redis_logger.critical(f'{ail_uuid}: {error_message}') ail_2_ail.save_ail_server_error(ail_uuid, error_message) + ail_2_ail.delete_sync_client_cache(client_id) + if __name__ == '__main__': parser = argparse.ArgumentParser(description='Websocket SYNC Client') @@ -168,6 +173,7 @@ if __name__ == '__main__': ail_uuid = args.ail_uuid sync_mode = args.sync_mode api = args.api + client_id = args.client_id if ail_uuid is None or sync_mode not in ['api', 'pull', 'push']: parser.print_help() @@ -184,4 +190,4 @@ if __name__ == '__main__': ssl_context.verify_mode = ssl.CERT_NONE # SELF SIGNED CERTIFICATES - asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode, api)) + asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode, api, client_id=client_id)) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index 6ebbda6e..2ac8e546 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -73,7 +73,7 @@ async def server_controller(): ail_uuid = command_dict.get('ail_uuid') connected_clients = CONNECTED_CLIENTS[ail_uuid].copy() for c_websocket in connected_clients: - await c_websocket.close() + await c_websocket.close(code=1000) redis_logger.info(f'Server Command Connection closed: {ail_uuid}') print(f'Server Command Connection closed: {ail_uuid}') diff --git a/var/www/blueprints/ail_2_ail_sync.py b/var/www/blueprints/ail_2_ail_sync.py index c85230d9..037f8e5f 100644 --- a/var/www/blueprints/ail_2_ail_sync.py +++ b/var/www/blueprints/ail_2_ail_sync.py @@ -55,7 +55,7 @@ def create_json_response(data, status_code): def ail_2_ail_dashboard(): ail_uuid = ail_2_ail.get_ail_uuid() l_servers = ail_2_ail.get_all_running_sync_servers() - l_servers = ail_2_ail.get_ail_instances_metadata(l_servers) + l_servers = ail_2_ail.get_ail_instances_metadata(l_servers, server_sync_mode= True) l_clients = ail_2_ail.get_server_all_connected_clients() l_clients = ail_2_ail.get_ail_instances_metadata(l_clients, sync_queues=False, client_sync_mode=True) return render_template("ail_2_ail_dashboard.html", ail_uuid=ail_uuid, @@ -79,7 +79,7 @@ def ail_servers(): @login_admin def ail_server_view(): ail_uuid = request.args.get('uuid') - server_metadata = ail_2_ail.get_ail_instance_metadata(ail_uuid,sync_queues=True) + server_metadata = ail_2_ail.get_ail_instance_metadata(ail_uuid, client_sync_mode=True, server_sync_mode=True, sync_queues=True) server_metadata['sync_queues'] = ail_2_ail.get_queues_metadata(server_metadata['sync_queues']) return render_template("view_ail_server.html", server_metadata=server_metadata, @@ -107,6 +107,50 @@ def ail_server_api_version(): return create_json_response(res[0], res[1]) return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) +@ail_2_ail_sync.route('/settings/ail_2_ail/server/client/kill', methods=['GET']) +@login_required +@login_admin +def ail_server_client_kill(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_kill_server_connected_clients(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_2_ail_dashboard')) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/sync_client/kill', methods=['GET']) +@login_required +@login_admin +def ail_server_sync_client_kill(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_kill_sync_client(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_2_ail_dashboard')) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/sync_client/relaunch', methods=['GET']) +@login_required +@login_admin +def ail_server_sync_client_relaunch(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_relaunch_sync_client(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_2_ail_dashboard')) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/sync_client/launch', methods=['GET']) +@login_required +@login_admin +def ail_server_sync_client_launch(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_launch_sync_client(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_2_ail_dashboard')) + @ail_2_ail_sync.route('/settings/ail_2_ail/server/add', methods=['GET', 'POST']) @login_required @login_admin diff --git a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html index eca4d874..036441dc 100644 --- a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html +++ b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html @@ -51,7 +51,7 @@ -

Connected Servers:

+

Connected - AIL Sync:

@@ -60,6 +60,8 @@ + + @@ -77,15 +79,40 @@ {{queue_uuid}}
- {% endfor %} + {% endfor %} + + {% endfor %}
url description sync queuessync mode
+ {% if dict_server['server_sync_mode']['api'] %} + API + {% endif %} + {% if dict_server['server_sync_mode']['pull'] %} + PULL + {% endif %} + {% if dict_server['server_sync_mode']['push'] %} + PUSH + {% endif %} + + {% if dict_server['server_sync_mode']['push'] %} + + + + + + + {% endif %} +
-

Connected Clients:

+

Connected - Remote Sync:

@@ -94,6 +121,7 @@ + @@ -109,14 +137,21 @@ + {% endfor %} diff --git a/var/www/templates/ail_2_ail/view_ail_server.html b/var/www/templates/ail_2_ail/view_ail_server.html index d4d75e67..31461acf 100644 --- a/var/www/templates/ail_2_ail/view_ail_server.html +++ b/var/www/templates/ail_2_ail/view_ail_server.html @@ -125,6 +125,61 @@ + + + + + + + +
url description sync mode
{% if dict_server['client_sync_mode']['api'] %} API - {% endif %} + {% endif %} {% if dict_server['client_sync_mode']['pull'] %} PULL - {% endif %} + {% endif %} {% if dict_server['client_sync_mode']['push'] %} PUSH - {% endif %} + {% endif %} + + + +
AIL Sync + {% if server_metadata['server_sync_mode']['api'] %} + API + {% endif %} + {% if server_metadata['server_sync_mode']['pull'] %} + PULL + {% endif %} + {% if server_metadata['server_sync_mode']['push'] %} + PUSH + + + + + + + + {% else %} + {% if (server_metadata['pull'] or server_metadata['push']) and server_metadata['sync_queues']%} + + + + {% endif %} + {% endif %} +
Remote Sync + {% if server_metadata['client_sync_mode']['api'] %} + API + {% endif %} + {% if server_metadata['client_sync_mode']['pull'] %} + PULL + {% endif %} + {% if server_metadata['client_sync_mode']['push'] %} + PUSH + {% endif %} + + {% if server_metadata['client_sync_mode']['api'] or server_metadata['client_sync_mode']['pull'] or server_metadata['client_sync_mode']['push'] %} + + + + {% endif %} +