chg: [ail sync server] add server controller + list connected clients ail_uuid->sync_modes

This commit is contained in:
Terrtia 2021-11-29 16:18:47 +01:00
parent a1768e50da
commit 57f5afe831
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
4 changed files with 162 additions and 24 deletions

View file

@ -89,10 +89,62 @@ def set_last_updated_sync_config():
# # TODO: get connection status # # TODO: get connection status
# # TODO: get connection METADATA # # TODO: get connection METADATA
# # TODO: client => reconnect on fails (with timeout)
# # TODO: API KEY change => trigger kill connection
############################# #############################
# # # #
#### SYNC CLIENT MANAGER #### #### SYNC CLIENT MANAGER ####
#### ail servers connected clients ####
#
# ail_uuid => WEBSOCKETS => ail_uuid
# AIL: clients:
# - 1 push
# - 1 pull
# - N api
def get_server_all_connected_clients():
return r_cache.smembers('ail_2_ail:server:all_clients')
def add_server_connected_client(ail_uuid, sync_mode):
r_cache.sadd('ail_2_ail:server:all_clients', ail_uuid)
r_cache.hset(f'ail_2_ail:server:client:{ail_uuid}', sync_mode, True)
def remove_server_connected_client(ail_uuid, sync_mode=None, is_connected=False):
if sync_mode:
r_cache.hdel(f'ail_2_ail:server:client:{ail_uuid}', sync_mode)
if not is_connected:
r_cache.srem('ail_2_ail:server:all_clients', ail_uuid)
def is_server_client_sync_mode_connected(ail_uuid, sync_mode):
res = r_cache.hexists(f'ail_2_ail:server:client:{ail_uuid}', sync_mode)
return res == 1
def clear_server_connected_clients():
for ail_uuid in get_server_all_connected_clients():
r_cache.delete(f'ail_2_ail:server:client:{ail_uuid}')
r_cache.delete('ail_2_ail:server:all_clients')
def get_server_controller_command():
res = r_cache.spop('ail_2_ail:server_controller:command')
if res:
return json.loads(res)
else:
return None
# command: -kill
# -killall or shutdown / restart ?
## TODO: ADD command
def send_command_to_server_controller(command, ail_uuid=None):
dict_action = {'command': command, 'ail_uuid': ail_uuid}
if ail_uuid:
dict_action['ail_uuid'] = ail_uuid
str_command = json.dumps(dict_action)
r_cache.sadd('ail_2_ail:server_controller:command', str_command)
##-- --##
def get_all_sync_clients(r_set=False): def get_all_sync_clients(r_set=False):
res = r_cache.smembers('ail_2_ail:all_sync_clients') res = r_cache.smembers('ail_2_ail:all_sync_clients')
if r_set: if r_set:
@ -387,7 +439,7 @@ def get_ail_server_error(ail_uuid):
return r_cache.hget(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') return r_cache.hget(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error')
# # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) # # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE)
def get_ail_instance_metadata(ail_uuid, sync_queues=False): def get_ail_instance_metadata(ail_uuid, client_sync_mode=False, sync_queues=False):
dict_meta = {} dict_meta = {}
dict_meta['uuid'] = ail_uuid dict_meta['uuid'] = ail_uuid
dict_meta['url'] = get_ail_instance_url(ail_uuid) dict_meta['url'] = get_ail_instance_url(ail_uuid)
@ -404,8 +456,11 @@ def get_ail_instance_metadata(ail_uuid, sync_queues=False):
if sync_queues: if sync_queues:
dict_meta['sync_queues'] = get_ail_instance_all_sync_queue(ail_uuid) dict_meta['sync_queues'] = get_ail_instance_all_sync_queue(ail_uuid)
# # TODO: if client_sync_mode:
# - set UUID sync_queue dict_meta['client_sync_mode'] = {}
dict_meta['client_sync_mode']['pull'] = is_server_client_sync_mode_connected(ail_uuid, 'pull')
dict_meta['client_sync_mode']['push'] = is_server_client_sync_mode_connected(ail_uuid, 'push')
dict_meta['client_sync_mode']['api'] = is_server_client_sync_mode_connected(ail_uuid, 'api')
return dict_meta return dict_meta
@ -415,10 +470,11 @@ def get_all_ail_instances_metadata():
l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True))
return l_servers return l_servers
def get_ail_instances_metadata(l_ail_servers): def get_ail_instances_metadata(l_ail_servers, sync_queues=True, client_sync_mode=False):
l_servers = [] l_servers = []
for ail_uuid in l_ail_servers: for ail_uuid in l_ail_servers:
l_servers.append(get_ail_instance_metadata(ail_uuid, sync_queues=True)) server_metadata = get_ail_instance_metadata(ail_uuid, sync_queues=sync_queues, client_sync_mode=client_sync_mode)
l_servers.append(server_metadata)
return l_servers return l_servers
def edit_ail_instance_key(ail_uuid, new_key): def edit_ail_instance_key(ail_uuid, new_key):
@ -1022,12 +1078,12 @@ if __name__ == '__main__':
# res = get_all_unregistred_queue_by_ail_instance(ail_uuid) # res = get_all_unregistred_queue_by_ail_instance(ail_uuid)
ail_uuid = 'c3c2f3ef-ca53-4ff6-8317-51169b73f731' ail_uuid = 'c3c2f3ef-ca53-4ff6-8317-51169b73f731'
ail_uuid = '2dfeff47-777d-4e70-8c30-07c059307e6a' #ail_uuid = '2dfeff47-777d-4e70-8c30-07c059307e6a'
# res = ping_remote_ail_server(ail_uuid) # res = ping_remote_ail_server(ail_uuid)
# print(res) # print(res)
# #
res = ping_remote_ail_server(ail_uuid) res = send_command_to_server_controller('kill', ail_uuid=ail_uuid)
#res = _get_remote_ail_server_response(ail_uuid, 'pin') #res = _get_remote_ail_server_response(ail_uuid, 'pin')
print(res) print(res)

View file

@ -26,7 +26,7 @@ redis_logger.channel = 'Sync'
############################# #############################
CONNECTED_CLIENT = set() CONNECTED_CLIENTS = {}
# # TODO: Store in redis # # TODO: Store in redis
############################# #############################
@ -63,23 +63,59 @@ def unpack_path(path):
# # # # # # # # # # # # # #
# # TODO: ADD more commands
async def server_controller():
while True:
command_dict = ail_2_ail.get_server_controller_command()
if command_dict:
command = command_dict.get('command')
if command == 'kill':
ail_uuid = command_dict.get('ail_uuid')
connected_clients = CONNECTED_CLIENTS[ail_uuid].copy()
for c_websocket in connected_clients:
await c_websocket.close()
redis_logger.info(f'Server Command Connection closed: {ail_uuid}')
print(f'Server Command Connection closed: {ail_uuid}')
# async def send_object(): await asyncio.sleep(10)
# if CONNECTED_CLIENT:
# message = 'new json object {"id": "test01"}'
# await asyncio.wait([user.send(message) for user in USERS])
# # # # # # #
async def register(websocket): async def register(websocket):
ail_uuid = websocket.ail_uuid ail_uuid = websocket.ail_uuid
remote_address = websocket.remote_address remote_address = websocket.remote_address
sync_mode = websocket.sync_mode
redis_logger.info(f'Client Connected: {ail_uuid} {remote_address}') redis_logger.info(f'Client Connected: {ail_uuid} {remote_address}')
print(f'Client Connected: {ail_uuid} {remote_address}') print(f'Client Connected: {ail_uuid} {remote_address}')
CONNECTED_CLIENT.add(websocket)
#print(CONNECTED_CLIENT) if not ail_uuid in CONNECTED_CLIENTS:
CONNECTED_CLIENTS[ail_uuid] = set()
CONNECTED_CLIENTS[ail_uuid].add(websocket)
ail_2_ail.add_server_connected_client(ail_uuid, sync_mode)
print('Register client')
print(CONNECTED_CLIENTS)
print()
async def unregister(websocket): async def unregister(websocket):
CONNECTED_CLIENT.remove(websocket) ail_uuid = websocket.ail_uuid
sync_mode = websocket.sync_mode
CONNECTED_CLIENTS[ail_uuid].remove(websocket)
connected_clients = CONNECTED_CLIENTS[ail_uuid].copy()
for c_websocket in connected_clients:
if c_websocket.sync_mode == sync_mode:
sync_mode = None
break
if not CONNECTED_CLIENTS[ail_uuid]:
is_connected = False
CONNECTED_CLIENTS.pop(ail_uuid)
else:
is_connected = True
ail_2_ail.remove_server_connected_client(ail_uuid, sync_mode=sync_mode, is_connected=is_connected)
print('Unregister client')
print(CONNECTED_CLIENTS)
print()
# PULL: Send data to client # PULL: Send data to client
# # TODO: ADD TIMEOUT ??? # # TODO: ADD TIMEOUT ???
@ -218,8 +254,9 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol):
self.ail_key = api_key self.ail_key = api_key
self.ail_uuid = ail_uuid self.ail_uuid = ail_uuid
self.sync_mode = dict_path['sync_mode']
if dict_path['sync_mode'] == 'pull' or dict_path['sync_mode'] == 'push': if self.sync_mode == 'pull' or self.sync_mode == 'push':
# QUEUE UUID # QUEUE UUID
# if dict_path['queue_uuid']: # if dict_path['queue_uuid']:
@ -237,14 +274,14 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol):
# return http.HTTPStatus.FORBIDDEN, [], b"UUID not found\n" # return http.HTTPStatus.FORBIDDEN, [], b"UUID not found\n"
# SYNC MODE # SYNC MODE
if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=dict_path['sync_mode']): if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=self.sync_mode):
sync_mode = dict_path['sync_mode'] sync_mode = self.sync_mode
redis_logger.warning(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}') redis_logger.warning(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}')
print(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}') print(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}')
return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n"
# # TODO: CHECK API # # TODO: CHECK API
elif dict_path['sync_mode'] == 'api': elif self.sync_mode == 'api':
pass pass
else: else:
@ -252,11 +289,9 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol):
redis_logger.info(f'Invalid path: {self.remote_address}') redis_logger.info(f'Invalid path: {self.remote_address}')
return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n" return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n"
########################################### ###########################################
# # TODO: clean shutdown / kill all connections # # TODO: clean shutdown / kill all connections
# # TODO: API
# # TODO: Filter object # # TODO: Filter object
# # TODO: IP/uuid to block # # TODO: IP/uuid to block
@ -268,6 +303,8 @@ if __name__ == '__main__':
print('Launching Server...') print('Launching Server...')
redis_logger.info('Launching Server...') redis_logger.info('Launching Server...')
ail_2_ail.clear_server_connected_clients()
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
cert_dir = os.environ['AIL_FLASK'] cert_dir = os.environ['AIL_FLASK']
ssl_context.load_cert_chain(certfile=os.path.join(cert_dir, 'server.crt'), keyfile=os.path.join(cert_dir, 'server.key')) ssl_context.load_cert_chain(certfile=os.path.join(cert_dir, 'server.crt'), keyfile=os.path.join(cert_dir, 'server.key'))
@ -277,5 +314,9 @@ if __name__ == '__main__':
print(f'Server Launched: wss://{host}:{port}') print(f'Server Launched: wss://{host}:{port}')
redis_logger.info(f'Server Launched: wss://{host}:{port}') redis_logger.info(f'Server Launched: wss://{host}:{port}')
asyncio.get_event_loop().run_until_complete(start_server) loop = asyncio.get_event_loop()
asyncio.get_event_loop().run_forever() # server command
loop.create_task(server_controller())
# websockets server
loop.run_until_complete(start_server)
loop.run_forever()

View file

@ -56,8 +56,11 @@ def ail_2_ail_dashboard():
ail_uuid = ail_2_ail.get_ail_uuid() ail_uuid = ail_2_ail.get_ail_uuid()
l_servers = ail_2_ail.get_all_running_sync_servers() l_servers = ail_2_ail.get_all_running_sync_servers()
l_servers = ail_2_ail.get_ail_instances_metadata(l_servers) l_servers = ail_2_ail.get_ail_instances_metadata(l_servers)
l_clients = ail_2_ail.get_server_all_connected_clients()
l_clients = ail_2_ail.get_ail_instances_metadata(l_clients, sync_queues=False, client_sync_mode=True)
return render_template("ail_2_ail_dashboard.html", ail_uuid=ail_uuid, return render_template("ail_2_ail_dashboard.html", ail_uuid=ail_uuid,
l_servers=l_servers) l_servers=l_servers,
l_clients=l_clients)
###################### ######################
# # # #

View file

@ -84,6 +84,44 @@
</tbody> </tbody>
</table> </table>
<h1>Connected Clients:</h3>
<table id="table_servers" class="table table-striped border-primary">
<thead class="bg-dark text-white">
<tr>
<th>uuid</th>
<th>url</th>
<th>description</th>
<th>sync mode</th>
</tr>
</thead>
<tbody style="font-size: 15px;">
{% for dict_server in l_clients %}
<tr class="border-color: blue;">
<td>
<a href="{{ url_for('ail_2_ail_sync.ail_server_view') }}?uuid={{ dict_server['uuid'] }}">
{{ dict_server['uuid']}}
</a>
</td>
<td>{{ dict_server['url']}}</td>
<td>{{ dict_server['description']}}</td>
<td class="text-center">
{% if dict_server['client_sync_mode']['api'] %}
API
{% endif %}
{% if dict_server['client_sync_mode']['pull'] %}
PULL
{% endif %}
{% if dict_server['client_sync_mode']['push'] %}
PUSH
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div> </div>
</div> </div>