diff --git a/bin/core/ail_2_ail.py b/bin/core/ail_2_ail.py index a927641a..285c0162 100755 --- a/bin/core/ail_2_ail.py +++ b/bin/core/ail_2_ail.py @@ -9,7 +9,10 @@ import sys import time import uuid +import subprocess + from flask import escape +from pubsublogger import publisher sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/')) import ConfigLoader @@ -27,6 +30,12 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB") r_serv_sync = config_loader.get_redis_conn("ARDB_DB") config_loader = None +#### LOGS #### +# redis_logger = publisher +# redis_logger.port = 6380 +# redis_logger.channel = 'AIL_SYNC' +##-- LOGS --## + def is_valid_uuid_v4(UUID): if not UUID: return False @@ -50,6 +59,9 @@ def generate_sync_api_key(): def get_ail_uuid(): return r_serv_db.get('ail:uuid') +def get_sync_server_version(): + return '0.1' + def is_valid_websocket_url(websocket_url): regex_websocket_url = r'^(wss:\/\/)([0-9]{1,3}(?:\.[0-9]{1,3}){3}|(?=[^\/]{1,254}(?![^\/]))(?:(?=[a-zA-Z0-9-]{1,63}\.?)(?:xn--+)?[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)*\.?)+[a-zA-Z]{2,63}):([0-9]{1,5})$' if re.match(regex_websocket_url, websocket_url): @@ -134,9 +146,6 @@ def refresh_ail_instance_connection(ail_uuid): client_id = get_client_id_by_ail_uuid(ail_uuid) launch_required = is_ail_instance_push_enabled(ail_uuid) - print(client_id) - print(launch_required) - # relaunch if client_id and launch_required: send_command_to_manager('relaunch', client_id=client_id) @@ -193,7 +202,7 @@ class AIL2AILClientManager(object): def launch_sync_client(self, ail_uuid): dir_project = os.environ['AIL_HOME'] client_id = self.get_new_sync_client_id() - script_options = f'-a {ail_uuid} -m push -i {client_id}' + script_options = f'-u {ail_uuid} -m push -i {client_id}' screen.create_screen(AIL2AILClientManager.SCREEN_NAME) screen.launch_uniq_windows_script(AIL2AILClientManager.SCREEN_NAME, client_id, dir_project, @@ -226,14 +235,11 @@ class AIL2AILClientManager(object): def get_manager_command(self): res = r_cache.spop('ail_2_ail:client_manager:command') if res: - print(res) - print(type(res)) return json.loads(res) else: return None def execute_manager_command(self, command_dict): - print(command_dict) command = command_dict.get('command') if command == 'launch': ail_uuid = command_dict.get('ail_uuid') @@ -357,6 +363,16 @@ def change_pull_push_state(ail_uuid, pull=False, push=False): set_last_updated_sync_config() refresh_ail_instance_connection(ail_uuid) +def get_ail_server_version(ail_uuid): + return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'version') + +def get_ail_server_ping(ail_uuid): + res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'ping') + return res == 'True' + +def get_ail_server_error(ail_uuid): + return r_cache.hget(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') + # # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE) def get_ail_instance_metadata(ail_uuid, sync_queues=False): dict_meta = {} @@ -365,6 +381,9 @@ def get_ail_instance_metadata(ail_uuid, sync_queues=False): dict_meta['description'] = get_ail_instance_description(ail_uuid) dict_meta['pull'] = is_ail_instance_pull_enabled(ail_uuid) dict_meta['push'] = is_ail_instance_pull_enabled(ail_uuid) + dict_meta['ping'] = get_ail_server_ping(ail_uuid) + dict_meta['version'] = get_ail_server_version(ail_uuid) + dict_meta['error'] = get_ail_server_error(ail_uuid) # # TODO: HIDE dict_meta['api_key'] = get_ail_instance_key(ail_uuid) @@ -421,8 +440,105 @@ def delete_ail_instance(ail_uuid): refresh_ail_instance_connection(ail_uuid) return ail_uuid +## WEBSOCKET API - ERRORS ## + +def set_ail_server_version(ail_uuid, version): + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'version', version) + +def set_ail_server_ping(ail_uuid, pong): + r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'ping', bool(pong)) + +def save_ail_server_error(ail_uuid, error_message): + r_cache.hset(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error', error_message) + +def clear_save_ail_server_error(ail_uuid): + r_cache.hdel(f'ail_2_ail:all_servers:metadata:{ail_uuid}', 'error') + +def _get_remote_ail_server_response(ail_uuid, api_request): + websocket_client = os.path.join(os.environ['AIL_BIN'], 'core', 'ail_2_ail_client.py') + l_command = ['python', websocket_client, '-u', ail_uuid, '-m', 'api', '-a', api_request] + process = subprocess.Popen(l_command, stdout=subprocess.PIPE) + while process.poll() is None: + time.sleep(1) + + if process.returncode == 0: + # Scrapy-Splash ERRORS + if process.stderr: + stderr = process.stderr.read().decode() + if stderr: + print(f'stderr: {stderr}') + + if process.stdout: + output = process.stdout.read().decode() + #print(output) + if output: + try: + message = json.loads(output) + return message + except Exception as e: + print(e) + error = f'Error: {e}' + save_ail_server_error(ail_uuid, error) + return + # ERROR + else: + if process.stderr: + stderr = process.stderr.read().decode() + else: + stderr = '' + if process.stdout: + stdout = process.stdout.read().decode() + else: + stdout ='' + if stderr or stdout: + error = f'-stderr-\n{stderr}\n-stdout-\n{stdout}' + print(error) + save_ail_server_error(ail_uuid, error) + return + +def get_remote_ail_server_version(ail_uuid): + response = _get_remote_ail_server_response(ail_uuid, 'version') + if response: + version = response.get('version') + if version: + version = float(version) + if version >= 0.1: + set_ail_server_version(ail_uuid, version) + return version + +# # TODO: CATCH WEBSOCKETS RESPONSE CODE +def ping_remote_ail_server(ail_uuid): + response = _get_remote_ail_server_response(ail_uuid, 'ping') + if response: + response = response.get('message', False) + pong = response == 'pong' + set_ail_server_ping(ail_uuid, pong) + return pong + ## API ## +def api_ping_remote_ail_server(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + res = ping_remote_ail_server(ail_uuid) + return res, 200 + +def api_get_remote_ail_server_version(json_dict): + ail_uuid = json_dict.get('uuid').replace(' ', '') + if not is_valid_uuid_v4(ail_uuid): + return {"status": "error", "reason": "Invalid ail uuid"}, 400 + ail_uuid = sanityze_uuid(ail_uuid) + if not exists_ail_instance(ail_uuid): + return {"status": "error", "reason": "AIL server not found"}, 404 + + res = get_remote_ail_server_version(ail_uuid) + return res, 200 + def api_create_ail_instance(json_dict): ail_uuid = json_dict.get('uuid').replace(' ', '') if not is_valid_uuid_v4(ail_uuid): @@ -755,7 +871,13 @@ if __name__ == '__main__': # print(get_all_sync_queue()) # res = get_all_unregistred_queue_by_ail_instance(ail_uuid) - ail_uuid = 'd82d3e61-2438-4ede-93bf-37b6fd9d7510' - res = get_client_id_by_ail_uuid(ail_uuid) + ail_uuid = 'c3c2f3ef-ca53-4ff6-8317-51169b73f731' + ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' + # res = ping_remote_ail_server(ail_uuid) + # print(res) + # + res = get_remote_ail_server_version(ail_uuid) + + #res = _get_remote_ail_server_response(ail_uuid, 'pin') print(res) diff --git a/bin/core/ail_2_ail_client.py b/bin/core/ail_2_ail_client.py index 678999a3..797c5872 100755 --- a/bin/core/ail_2_ail_client.py +++ b/bin/core/ail_2_ail_client.py @@ -6,6 +6,7 @@ import json import os import sys import time +from pubsublogger import publisher from urllib.parse import urljoin import asyncio @@ -19,6 +20,12 @@ sys.path.append(os.environ['AIL_BIN']) ################################## from core import ail_2_ail +#### LOGS #### +redis_logger = publisher +redis_logger.port = 6380 +redis_logger.channel = 'AIL_SYNC_client' +##-- LOGS --## + #################################################################### class AIL2AILClient(object): @@ -29,18 +36,21 @@ class AIL2AILClient(object): self.ail_uuid = ail_uuid self.sync_mode = sync_mode - # # TODO: - self.ail_url = "wss://localhost:4443" - self.uri = f"{ail_url}/{sync_mode}/{ail_uuid}" #################################################################### +# # TODO: ADD TIMEOUT => 30s +async def api_request(websocket, ail_uuid): + res = await websocket.recv() + # API OUTPUT + sys.stdout.write(res) + # # TODO: ADD TIMEOUT async def pull(websocket, ail_uuid): while True: obj = await websocket.recv() - print(obj) + sys.stdout.write(res) async def push(websocket, ail_uuid): @@ -60,51 +70,100 @@ async def push(websocket, ail_uuid): await asyncio.sleep(10) -async def ail_to_ail_client(ail_uuid, sync_mode, ail_key=None): +async def ail_to_ail_client(ail_uuid, sync_mode, api, ail_key=None): + if not ail_2_ail.exists_ail_instance(ail_uuid): + print('AIL server not found') + return + if not ail_key: ail_key = ail_2_ail.get_ail_instance_key(ail_uuid) - ail_url = "wss://localhost:4443" - uri = f"{ail_url}/{sync_mode}/{ail_uuid}" - print(uri) + # # TODO: raise exception + ail_url = ail_2_ail.get_ail_instance_url(ail_uuid) + local_ail_uuid = ail_2_ail.get_ail_uuid() - async with websockets.connect( - uri, - ssl=ssl_context, - extra_headers={"Authorization": f"{ail_key}"} - ) as websocket: + if sync_mode == 'api': + uri = f"{ail_url}/{sync_mode}/{api}/{local_ail_uuid}" + else: + uri = f"{ail_url}/{sync_mode}/{local_ail_uuid}" + #print(uri) - if sync_mode == 'pull': - await pull(websocket, ail_uuid) + ail_2_ail.clear_save_ail_server_error(ail_uuid) + + try: + async with websockets.connect( + uri, + ssl=ssl_context, + extra_headers={"Authorization": f"{ail_key}"} + ) as websocket: + + if sync_mode == 'pull': + await pull(websocket, ail_uuid) + + elif sync_mode == 'push': + await push(websocket, ail_uuid) + await websocket.close() + + elif sync_mode == 'api': + await api_request(websocket, ail_uuid) + await websocket.close() + except websockets.exceptions.InvalidStatusCode as e: + status_code = e.status_code + error_message = '' + # success + if status_code == 1000: + print('connection closed') + elif status_code == 400: + error_message = 'BAD_REQUEST: Invalid path' + elif status_code == 401: + error_message = 'UNAUTHORIZED: Invalid Key' + elif status_code == 403: + error_message = 'FORBIDDEN: SYNC mode disabled' + else: + error_message = str(e) + if error_message: + sys.stderr.write(error_message) + redis_logger.warning(f'{error_message}: {ail_uuid}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except websockets.exceptions.InvalidURI as e: + error_message = f'Invalid AIL url: {e.uri}' + sys.stderr.write(error_message) + redis_logger.warning(f'{error_message}: {ail_uuid}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except ConnectionError as e: + error_message = str(e) + sys.stderr.write(error_message) + redis_logger.info(f'{error_message}: {ail_uuid}') + ail_2_ail.save_ail_server_error(ail_uuid, error_message) + except websockets.exceptions.ConnectionClosedOK as e: + print('connection closed') + # except Exception as e: + # print(e) - elif sync_mode == 'push': - await push(websocket, ail_uuid) - await websocket.close() - elif sync_mode == 'api': - await websocket.close() -##########################################################3 -# # TODO:manual key -########################################################## if __name__ == '__main__': parser = argparse.ArgumentParser(description='Websocket SYNC Client') - parser.add_argument('-a', '--ail', help='AIL UUID', type=str, dest='ail_uuid', required=True, default=None) + parser.add_argument('-u', '--uuid', help='AIL UUID', type=str, dest='ail_uuid', required=True, default=None) parser.add_argument('-i', '--client_id', help='Client ID', type=str, dest='client_id', default=None) - parser.add_argument('-m', '--mode', help='SYNC Mode, pull or push', type=str, dest='sync_mode', default='pull') + parser.add_argument('-m', '--mode', help='SYNC Mode, pull, push or api', type=str, dest='sync_mode', default='pull') + parser.add_argument('-a', '--api', help='API, ping or version', type=str, dest='api', default=None) #parser.add_argument('-k', '--key', type=str, default='', help='AIL Key') args = parser.parse_args() ail_uuid = args.ail_uuid sync_mode = args.sync_mode + api = args.api - if ail_uuid is None or sync_mode not in ['pull', 'push']: + if ail_uuid is None or sync_mode not in ['api', 'pull', 'push']: parser.print_help() sys.exit(0) - #ail_uuid = '03c51929-eeab-4d47-9dc0-c667f94c7d2d' - #sync_mode = 'pull' + if api: + if api not in ['ping', 'version']: + parser.print_help() + sys.exit(0) # SELF SIGNED CERTIFICATES ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) @@ -112,4 +171,4 @@ if __name__ == '__main__': ssl_context.verify_mode = ssl.CERT_NONE # SELF SIGNED CERTIFICATES - asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode)) + asyncio.get_event_loop().run_until_complete(ail_to_ail_client(ail_uuid, sync_mode, api)) diff --git a/bin/core/ail_2_ail_server.py b/bin/core/ail_2_ail_server.py index 5daf78ff..d4e98a1e 100755 --- a/bin/core/ail_2_ail_server.py +++ b/bin/core/ail_2_ail_server.py @@ -22,7 +22,7 @@ from core import ail_2_ail #### LOGS #### redis_logger = publisher redis_logger.port = 6380 -redis_logger.channel = 'AIL_SYNC' +redis_logger.channel = 'AIL_SYNC_Server' ############################# @@ -52,8 +52,13 @@ def unpack_path(path): path = path.split('/') if len(path) < 3: raise Exception('Invalid url path') + if not len(path[-1]): + path = path[:-1] + dict_path['sync_mode'] = path[1] - dict_path['ail_uuid'] = path[2] + dict_path['ail_uuid'] = path[-1] + dict_path['api'] = path[2:-1] + return dict_path # # # # # # # @@ -66,8 +71,12 @@ def unpack_path(path): async def register(websocket): + ail_uuid = websocket.ail_uuid + remote_address = websocket.remote_address + redis_logger.info(f'Client Connected: {ail_uuid} {remote_address}') + print(f'Client Connected: {ail_uuid} {remote_address}') CONNECTED_CLIENT.add(websocket) - print(CONNECTED_CLIENT) + #print(CONNECTED_CLIENT) async def unregister(websocket): CONNECTED_CLIENT.remove(websocket) @@ -108,6 +117,23 @@ async def push(websocket, ail_uuid): ail_2_ail.add_ail_stream_to_sync_importer(ail_stream) +# API: server API +# # TODO: ADD TIMEOUT ??? +async def api(websocket, ail_uuid, api): + api = api[0] + if api == 'ping': + message = {'message':'pong'} + message = json.dumps(message) + await websocket.send(message) + elif api == 'version': + sync_version = ail_2_ail.get_sync_server_version() + message = {'version': sync_version} + message = json.dumps(message) + await websocket.send(message) + + # END API + return + async def ail_to_ail_serv(websocket, path): # # TODO: save in class @@ -118,10 +144,9 @@ async def ail_to_ail_serv(websocket, path): # # TODO: check if it works # # DEBUG: - print(websocket.ail_key) - print(websocket.ail_uuid) - print(websocket.remote_address) - print(f'sync mode: {sync_mode}') + # print(websocket.ail_uuid) + # print(websocket.remote_address) + # print(f'sync mode: {sync_mode}') await register(websocket) try: @@ -135,7 +160,10 @@ async def ail_to_ail_serv(websocket, path): await push(websocket, websocket.ail_uuid) elif sync_mode == 'api': + await api(websocket, websocket.ail_uuid, path['api']) await websocket.close() + redis_logger.info(f'Connection closed: {ail_uuid} {remote_address}') + print(f'Connection closed: {ail_uuid} {remote_address}') finally: await unregister(websocket) @@ -151,11 +179,12 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): async def process_request(self, path, request_headers): - print(self.remote_address) - print(request_headers) + # DEBUG: + # print(self.remote_address) + # print(request_headers) + # API TOKEN api_key = request_headers.get('Authorization', '') - print(api_key) if api_key is None: redis_logger.warning(f'Missing token: {self.remote_address}') print(f'Missing token: {self.remote_address}') @@ -215,7 +244,7 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol): return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n" # # TODO: CHECK API - elif dict_path[sync_mode] == 'api': + elif dict_path['sync_mode'] == 'api': pass else: diff --git a/var/www/blueprints/ail_2_ail_sync.py b/var/www/blueprints/ail_2_ail_sync.py index b9d35a42..37643346 100644 --- a/var/www/blueprints/ail_2_ail_sync.py +++ b/var/www/blueprints/ail_2_ail_sync.py @@ -53,9 +53,11 @@ def create_json_response(data, status_code): @login_required @login_admin def ail_2_ail_dashboard(): + ail_uuid = ail_2_ail.get_ail_uuid() l_servers = ail_2_ail.get_all_running_sync_servers() l_servers = ail_2_ail.get_ail_instances_metadata(l_servers) - return render_template("ail_2_ail_dashboard.html", l_servers=l_servers) + return render_template("ail_2_ail_dashboard.html", ail_uuid=ail_uuid, + l_servers=l_servers) ###################### # # @@ -80,6 +82,28 @@ def ail_server_view(): return render_template("view_ail_server.html", server_metadata=server_metadata, bootstrap_label=bootstrap_label) +@ail_2_ail_sync.route('/settings/ail_2_ail/server/api/ping', methods=['GET']) +@login_required +@login_admin +def ail_server_api_ping(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_ping_remote_ail_server(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) + +@ail_2_ail_sync.route('/settings/ail_2_ail/server/api/version', methods=['GET']) +@login_required +@login_admin +def ail_server_api_version(): + ail_uuid = request.args.get('uuid') + input_dict = {"uuid": ail_uuid} + res = ail_2_ail.api_get_remote_ail_server_version(input_dict) + if res[1] != 200: + return create_json_response(res[0], res[1]) + return redirect(url_for('ail_2_ail_sync.ail_server_view', uuid=ail_uuid)) + @ail_2_ail_sync.route('/settings/ail_2_ail/server/add', methods=['GET', 'POST']) @login_required @login_admin diff --git a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html index 0efc7f82..e5a4bfaf 100644 --- a/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html +++ b/var/www/templates/ail_2_ail/ail_2_ail_dashboard.html @@ -30,6 +30,20 @@