diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index 13ea781d..815440d7 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -89,10 +89,62 @@ def set_last_updated_sync_config(): # # TODO: get connection status # # TODO: get connection METADATA +# # TODO: client => reconnect on fails (with timeout) + +# # TODO: API KEY change => trigger kill connection ############################# # # #### SYNC CLIENT MANAGER #### +#### ail servers connected clients #### +# +# ail_uuid => WEBSOCKETS => ail_uuid +# AIL: clients: +# - 1 push +# - 1 pull +# - N api + +def get_server_all_connected_clients(): + return r_cache.smembers('ail_2_ail:server:all_clients') + +def add_server_connected_client(ail_uuid, sync_mode): + r_cache.sadd('ail_2_ail:server:all_clients', ail_uuid) + r_cache.hset(f'ail_2_ail:server:client:{ail_uuid}', sync_mode, True) + +def remove_server_connected_client(ail_uuid, sync_mode=None, is_connected=False): + if sync_mode: + r_cache.hdel(f'ail_2_ail:server:client:{ail_uuid}', sync_mode) + if not is_connected: + r_cache.srem('ail_2_ail:server:all_clients', ail_uuid) + +def is_server_client_sync_mode_connected(ail_uuid, sync_mode): + res = r_cache.hexists(f'ail_2_ail:server:client:{ail_uuid}', sync_mode) + return res == 1 + +def clear_server_connected_clients(): + for ail_uuid in get_server_all_connected_clients(): + r_cache.delete(f'ail_2_ail:server:client:{ail_uuid}') + r_cache.delete('ail_2_ail:server:all_clients') + +def get_server_controller_command(): + res = r_cache.spop('ail_2_ail:server_controller:command') + if res: + return json.loads(res) + else: + return None + +# command: -kill +# -killall or shutdown / restart ? +## TODO: ADD command +def send_command_to_server_controller(command, ail_uuid=None): + dict_action = {'command': command, 'ail_uuid': ail_uuid} + if ail_uuid: + dict_action['ail_uuid'] = ail_uuid + str_command = json.dumps(dict_action) + r_cache.sadd('ail_2_ail:server_controller:command', str_command) + +##-- --## + def get_all_sync_clients(r_set=False): res = r_cache.smembers('ail_2_ail:all_sync_clients') if r_set: @@ -387,7 +439,7 @@ def get_ail_server_error(ail_uuid): return r_cache.hget(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') # # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) -def get_ail_instance_metadata(ail_uuid, sync_queues=False): +def get_ail_instance_metadata(ail_uuid, client_sync_mode=False, sync_queues=False): dict_meta = {} dict_meta['uuid'] = ail_uuid dict_meta['url'] = get_ail_instance_url(ail_uuid) @@ -404,8 +456,11 @@ def get_ail_instance_metadata(ail_uuid, sync_queues=False): if sync_queues: dict_meta['sync_queues'] = get_ail_instance_all_sync_queue(ail_uuid) - # # TODO: - # - set UUID sync_queue + if client_sync_mode: + dict_meta['client_sync_mode'] = {} + dict_meta['client_sync_mode']['pull'] = is_server_client_sync_mode_connected(ail_uuid, 'pull') + dict_meta['client_sync_mode']['push'] = is_server_client_sync_mode_connected(ail_uuid, 'push') + dict_meta['client_sync_mode']['api'] = is_server_client_sync_mode_connected(ail_uuid, 'api') return dict_meta @@ -415,10 +470,11 @@ def get_all_ail_instances_metadata(): l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) return l_servers -def get_ail_instances_metadata(l_ail_servers): +def get_ail_instances_metadata(l_ail_servers, sync_queues=True, client_sync_mode=False): l_servers = [] for ail_uuid in l_ail_servers: - l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) + server_metadata = get_ail_instance_metadata(ail_uuid, sync_queues=sync_queues, client_sync_mode=client_sync_mode) + l_servers.append(server_metadata) return l_servers def edit_ail_instance_key(ail_uuid, new_key): @@ -1022,12 +1078,12 @@ if __name__ == '__main__': # res = get_all_unregistred_queue_by_ail_instance(ail_uuid) ail_uuid = 'c3c2f3ef-ca53-4ff6-8317-51169b73f731' - ail_uuid = '2dfeff47-777d-4e70-8c30-07c059307e6a' + #ail_uuid = '2dfeff47-777d-4e70-8c30-07c059307e6a' # res = ping_remote_ail_server(ail_uuid) # print(res) # - res = ping_remote_ail_server(ail_uuid) + res = send_command_to_server_controller('kill', ail_uuid=ail_uuid) #res = _get_remote_ail_server_response(ail_uuid, 'pin') print(res) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index c0a0dd21..6ebbda6e 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -26,7 +26,7 @@ redis_logger.channel = 'Sync' ############################# -CONNECTED_CLIENT = set() +CONNECTED_CLIENTS = {} # # TODO: Store in redis ############################# @@ -63,23 +63,59 @@ def unpack_path(path): # # # # # # # +# # TODO: ADD more commands +async def server_controller(): + while True: + command_dict = ail_2_ail.get_server_controller_command() + if command_dict: + command = command_dict.get('command') + if command == 'kill': + ail_uuid = command_dict.get('ail_uuid') + connected_clients = CONNECTED_CLIENTS[ail_uuid].copy() + for c_websocket in connected_clients: + await c_websocket.close() + redis_logger.info(f'Server Command Connection closed: {ail_uuid}') + print(f'Server Command Connection closed: {ail_uuid}') -# async def send_object(): -# if CONNECTED_CLIENT: -# message = 'new json object {"id": "test01"}' -# await asyncio.wait([user.send(message) for user in USERS]) + await asyncio.sleep(10) +# # # # # # # async def register(websocket): ail_uuid = websocket.ail_uuid remote_address = websocket.remote_address + sync_mode = websocket.sync_mode redis_logger.info(f'Client Connected: {ail_uuid} {remote_address}') print(f'Client Connected: {ail_uuid} {remote_address}') - CONNECTED_CLIENT.add(websocket) - #print(CONNECTED_CLIENT) + + if not ail_uuid in CONNECTED_CLIENTS: + CONNECTED_CLIENTS[ail_uuid] = set() + CONNECTED_CLIENTS[ail_uuid].add(websocket) + ail_2_ail.add_server_connected_client(ail_uuid, sync_mode) + + print('Register client') + print(CONNECTED_CLIENTS) + print() async def unregister(websocket): - CONNECTED_CLIENT.remove(websocket) + ail_uuid = websocket.ail_uuid + sync_mode = websocket.sync_mode + CONNECTED_CLIENTS[ail_uuid].remove(websocket) + connected_clients = CONNECTED_CLIENTS[ail_uuid].copy() + for c_websocket in connected_clients: + if c_websocket.sync_mode == sync_mode: + sync_mode = None + break + if not CONNECTED_CLIENTS[ail_uuid]: + is_connected = False + CONNECTED_CLIENTS.pop(ail_uuid) + else: + is_connected = True + ail_2_ail.remove_server_connected_client(ail_uuid, sync_mode=sync_mode, is_connected=is_connected) + + print('Unregister client') + print(CONNECTED_CLIENTS) + print() # PULL: Send data to client # # TODO: ADD TIMEOUT ??? @@ -218,8 +254,9 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): self.ail_key = api_key self.ail_uuid = ail_uuid + self.sync_mode = dict_path['sync_mode'] - if dict_path['sync_mode'] == 'pull' or dict_path['sync_mode'] == 'push': + if self.sync_mode == 'pull' or self.sync_mode == 'push': # QUEUE UUID # if dict_path['queue_uuid']: @@ -237,14 +274,14 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): # return http.HTTPStatus.FORBIDDEN, [], b"UUID not found\n" # SYNC MODE - if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=dict_path['sync_mode']): - sync_mode = dict_path['sync_mode'] + if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=self.sync_mode): + sync_mode = self.sync_mode redis_logger.warning(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}') print(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}') return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" # # TODO: CHECK API - elif dict_path['sync_mode'] == 'api': + elif self.sync_mode == 'api': pass else: @@ -252,11 +289,9 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): redis_logger.info(f'Invalid path: {self.remote_address}') return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" - ########################################### # # TODO: clean shutdown / kill all connections -# # TODO: API # # TODO: Filter object # # TODO: IP/uuid to block @@ -268,6 +303,8 @@ if __name__ == '__main__': print('Launching Server...') redis_logger.info('Launching Server...') + ail_2_ail.clear_server_connected_clients() + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) cert_dir = os.environ['AIL_FLASK'] ssl_context.load_cert_chain(certfile=os.path.join(cert_dir, 'server.crt'), keyfile=os.path.join(cert_dir, 'server.key')) @@ -277,5 +314,9 @@ if __name__ == '__main__': print(f'Server Launched: wss://{host}:{port}') redis_logger.info(f'Server Launched: wss://{host}:{port}') - asyncio.get_event_loop().run_until_complete(start_server) - asyncio.get_event_loop().run_forever() + loop = asyncio.get_event_loop() + # server command + loop.create_task(server_controller()) + # websockets server + loop.run_until_complete(start_server) + loop.run_forever() diff --git a/var/www/blueprints/ail_2_ail_sync.py b/var/www/blueprints/ail_2_ail_sync.py index cd285018..c85230d9 100644 --- a/var/www/blueprints/ail_2_ail_sync.py +++ b/var/www/blueprints/ail_2_ail_sync.py @@ -56,8 +56,11 @@ def ail_2_ail_dashboard(): ail_uuid = ail_2_ail.get_ail_uuid() l_servers = ail_2_ail.get_all_running_sync_servers() l_servers = ail_2_ail.get_ail_instances_metadata(l_servers) + l_clients = ail_2_ail.get_server_all_connected_clients() + l_clients = ail_2_ail.get_ail_instances_metadata(l_clients, sync_queues=False, client_sync_mode=True) return render_template("ail_2_ail_dashboard.html", ail_uuid=ail_uuid, - l_servers=l_servers) + l_servers=l_servers, + l_clients=l_clients) ###################### # # diff --git a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html index d407bc91..eca4d874 100644 --- a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html +++ b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html @@ -84,6 +84,44 @@ + +

Connected Clients:

+ + + + + + + + + + + + {% for dict_server in l_clients %} + + + + + + + {% endfor %} + +
uuidurldescriptionsync mode
+ + {{ dict_server['uuid']}} + + {{ dict_server['url']}}{{ dict_server['description']}} + {% if dict_server['client_sync_mode']['api'] %} + API + {% endif %} + {% if dict_server['client_sync_mode']['pull'] %} + PULL + {% endif %} + {% if dict_server['client_sync_mode']['push'] %} + PUSH + {% endif %} +
+