chg: [v4.0 AIL SYNC / AIL 2 AIL] SYNC Manager + fixs + views

This commit is contained in:
Terrtia 2021-11-22 23:45:41 +01:00
parent 966f61bb94
commit 997a2c602a
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
21 changed files with 2006 additions and 73 deletions

View file

@ -51,9 +51,11 @@ class Sync_module(AbstractModule):
print(message)
### REFRESH DICT
if self.last_refresh < ail_2_ail.get_last_updated_ail_instance():
self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict()
if self.last_refresh < ail_2_ail.get_last_updated_sync_config():
self.last_refresh = time.time()
self.dict_sync_queues = ail_2_ail.get_all_sync_queue_dict()
print('sync queues refreshed')
print(self.dict_sync_queues)
# Extract object from message
# # TODO: USE JSON DICT ????

View file

@ -4,10 +4,13 @@
import os
import json
import secrets
import re
import sys
import time
import uuid
from flask import escape
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
@ -16,6 +19,7 @@ import screen
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'packages/'))
from Item import Item
import Tag
config_loader = ConfigLoader.ConfigLoader()
r_cache = config_loader.get_redis_conn("Redis_Cache")
@ -23,6 +27,20 @@ r_serv_db = config_loader.get_redis_conn("ARDB_DB")
r_serv_sync = config_loader.get_redis_conn("ARDB_DB")
config_loader = None
def is_valid_uuid_v4(UUID):
    """Return True if UUID is a valid UUID v4 string (dashes optional).

    The version/variant bits must already be correct: uuid.UUID(version=4)
    forces them, so comparing the resulting hex with the input rejects
    well-formed hex that is not actually a v4 UUID.
    """
    if not UUID:
        return False
    UUID = UUID.replace('-', '')
    try:
        uuid_test = uuid.UUID(hex=UUID, version=4)
        return uuid_test.hex == UUID
    except (ValueError, TypeError):
        # narrowed from a bare `except:` — only invalid hex input is expected here
        return False
def sanityze_uuid(UUID):
    """Return the canonical dash-separated form of a UUID v4 string."""
    return str(uuid.UUID(hex=UUID, version=4))
def generate_uuid():
    """Generate a random UUID v4 as a 32-char hex string (no dashes)."""
    return uuid.uuid4().hex
@ -32,7 +50,31 @@ def generate_sync_api_key():
def get_ail_uuid():
    # uuid of this local AIL instance, stored under the 'ail:uuid' DB key.
    return r_serv_db.get('ail:uuid')
# # TODO: # TODO: # TODO: # TODO: # TODO: ADD SYNC MODE == PUSH
def is_valid_websocket_url(websocket_url):
    """Validate an AIL sync endpoint: wss://<IPv4 or hostname>:<port>."""
    regex_websocket_url = r'^(wss:\/\/)([0-9]{1,3}(?:\.[0-9]{1,3}){3}|(?=[^\/]{1,254}(?![^\/]))(?:(?=[a-zA-Z0-9-]{1,63}\.?)(?:xn--+)?[a-zA-Z0-9]+(?:-[a-zA-Z0-9]+)*\.?)+[a-zA-Z]{2,63}):([0-9]{1,5})$'
    return bool(re.match(regex_websocket_url, websocket_url))
def is_valid_websocket_key(ail_key):
    """Validate an AIL sync API key: exactly 56 url-safe base64 characters."""
    regex_key = r'^[A-Za-z0-9-_]{56}$'
    return bool(re.match(regex_key, ail_key))
#### HANDLE CONFIG UPDATE ####
def get_last_updated_sync_config():
    """Return the epoch (float) of the last sync-config change, 0.0 if never set."""
    # plain string key — the original used an f-string with no placeholders
    epoch = r_serv_sync.get('ail:instance:queue:last_updated_sync_config')
    if not epoch:
        epoch = 0
    return float(epoch)
def set_last_updated_sync_config():
    """Record now as the last sync-config change time; return that epoch (int)."""
    epoch = int(time.time())
    # plain string key — the original used an f-string with no placeholders
    r_serv_sync.set('ail:instance:queue:last_updated_sync_config', epoch)
    return epoch
# # TODO: get connection status
# # TODO: get connection METADATA
#############################
@ -49,12 +91,21 @@ def get_all_sync_clients(r_set=False):
def get_sync_client_ail_uuid(client_id):
    # ail_uuid field of the cached sync-client hash for this client_id.
    return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid')
def get_sync_client_queue_uuid(client_id):
    # queue_uuid field of the cached sync-client hash for this client_id.
    return r_cache.hget(f'ail_2_ail:sync_client:{client_id}', 'queue_uuid')
# current: only one push registred
def get_client_id_by_ail_uuid(ail_uuid):
    # Return one registered client id (int) mapped to this instance uuid,
    # or None (implicit) when no sync client is currently mapped to it.
    res = r_cache.smembers(f'ail_2_ail:ail_uuid:{ail_uuid}')
    if res:
        return int(res.pop())
def get_all_running_sync_servers():
    """Return the ail_uuid of every currently registered sync client."""
    return [get_sync_client_ail_uuid(client_id)
            for client_id in get_all_sync_clients()]
def delete_sync_client_cache(client_id):
ail_uuid = get_sync_client_ail_uuid(client_id)
queue_uuid = get_sync_client_queue_uuid(client_id)
# map ail_uuid/queue_uuid
r_cache.srem(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id)
r_cache.srem(f'ail_2_ail:queue_uuid:{queue_uuid}', client_id)
@ -71,9 +122,31 @@ def delete_all_sync_clients_cache():
# -kill
# -relaunch
## TODO: check command
def send_command_to_manager(command, client_id=-1):
def send_command_to_manager(command, client_id=-1, ail_uuid=None):
dict_action = {'command': command, 'client_id': client_id}
r_cache.sadd('ail_2_ail:client_manager:command', dict_action)
if ail_uuid:
dict_action['ail_uuid'] = ail_uuid
str_command = json.dumps(dict_action)
r_cache.sadd('ail_2_ail:client_manager:command', str_command)
def refresh_ail_instance_connection(ail_uuid):
    """Reconcile the running sync client with the instance's PUSH config.

    - relaunch the client when one is running and PUSH is still enabled
    - kill it when PUSH was disabled
    - launch a new one when PUSH is enabled but no client is running
    """
    client_id = get_client_id_by_ail_uuid(ail_uuid)
    launch_required = is_ail_instance_push_enabled(ail_uuid)
    # (removed leftover debug prints of client_id / launch_required)
    if client_id and launch_required:
        send_command_to_manager('relaunch', client_id=client_id)
    elif client_id:
        send_command_to_manager('kill', client_id=client_id)
    elif launch_required:
        send_command_to_manager('launch', ail_uuid=ail_uuid)
class AIL2AILClientManager(object):
"""AIL2AILClientManager."""
@ -93,19 +166,23 @@ class AIL2AILClientManager(object):
# return new client id
def get_new_sync_client_id(self):
for new_id in range(100000):
for new_id in range(1, 100000):
new_id = str(new_id)
if new_id not in self.clients:
return str(new_id)
def get_sync_client_ail_uuid(self, client_id):
return self.clients[client_id]['ail_uuid']
def get_sync_client_queue_uuid(self, client_id):
return self.clients[client_id]['queue_uuid']
# def get_sync_client_queue_uuid(self, client_id):
# return self.clients[client_id]['queue_uuid']
# # TODO: check PUSH ACL
def get_all_sync_clients_to_launch(self):
return get_all_ail_instance()
ail_instances_to_launch = []
for ail_uuid in get_all_ail_instance():
if is_ail_instance_push_enabled(ail_uuid):
ail_instances_to_launch.append(ail_uuid)
return ail_instances_to_launch
def relaunch_all_sync_clients(self):
delete_all_sync_clients_cache()
@ -127,6 +204,8 @@ class AIL2AILClientManager(object):
r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'ail_uuid', ail_uuid)
r_cache.hset(f'ail_2_ail:sync_client:{client_id}', 'launch_time', int(time.time()))
r_cache.sadd('ail_2_ail:all_sync_clients', client_id)
# create map ail_uuid/queue_uuid
r_cache.sadd(f'ail_2_ail:ail_uuid:{ail_uuid}', client_id)
@ -135,7 +214,7 @@ class AIL2AILClientManager(object):
# # TODO: FORCE KILL ????????????
# # TODO: check if exists
def kill_sync_client(self, client_id):
if not kill_screen_window('AIL_2_AIL', client_id):
if not screen.kill_screen_window('AIL_2_AIL',client_id):
# # TODO: log kill error
pass
@ -147,17 +226,19 @@ class AIL2AILClientManager(object):
def get_manager_command(self):
res = r_cache.spop('ail_2_ail:client_manager:command')
if res:
return json.dumps(res)
print(res)
print(type(res))
return json.loads(res)
else:
return None
def execute_manager_command(self, command_dict):
print(command_dict)
command = command_dict.get('command')
if command == 'launch':
ail_uuid = int(command_dict.get('ail_uuid'))
queue_uuid = int(command_dict.get('queue_uuid'))
self.launch_sync_client(ail_uuid, queue_uuid)
elif command == 'relaunch':
ail_uuid = command_dict.get('ail_uuid')
self.launch_sync_client(ail_uuid)
elif command == 'relaunch_all':
self.relaunch_all_sync_clients()
else:
# only one sync client
@ -165,13 +246,13 @@ class AIL2AILClientManager(object):
if client_id < 1:
print('Invalid client id')
return None
client_id = str(client_id)
if command == 'kill':
self.kill_sync_client(client_id)
elif command == 'relaunch':
ail_uuid = self.get_sync_client_ail_uuid(client_id)
queue_uuid = self.get_sync_client_queue_uuid(client_id)
self.kill_sync_client(client_id)
self.launch_sync_client(ail_uuid, queue_uuid)
self.launch_sync_client(ail_uuid)
########################################
########################################
@ -181,7 +262,6 @@ class AIL2AILClientManager(object):
def get_sync_client_status(client_id):
dict_client = {'id': client_id}
dict_client['ail_uuid'] = get_sync_client_ail_uuid(client_id)
dict_client['queue_uuid'] = get_sync_client_queue_uuid(client_id)
return dict_client
def get_all_sync_client_status():
@ -232,12 +312,18 @@ def get_ail_instance_all_sync_queue(ail_uuid):
def is_ail_instance_queue(ail_uuid, queue_uuid):
    # True when this queue is registered on the given instance.
    return r_serv_sync.sismember(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid)
def exists_ail_instance(ail_uuid):
return r_serv_sync.exists(f'ail:instance:{ail_uuid}')
def get_ail_instance_url(ail_uuid):
    # websocket url stored in the instance hash.
    return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'url')
def get_ail_instance_description(ail_uuid):
    # free-text description stored in the instance hash.
    return r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'description')
def exists_ail_instance(ail_uuid):
    # Membership test against the registry set of all known AIL instances.
    return r_serv_sync.sismember('ail:instance:all', ail_uuid)
def is_ail_instance_push_enabled(ail_uuid):
    # 'push' is stored as the string repr of a bool; only 'True' enables PUSH.
    res = r_serv_sync.hget(f'ail:instance:{ail_uuid}', 'push')
    return res == 'True'
@ -268,27 +354,44 @@ def change_pull_push_state(ail_uuid, pull=False, push=False):
push = False
r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'push', push)
r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'pull', pull)
set_last_updated_sync_config()
refresh_ail_instance_connection(ail_uuid)
# # TODO: HIDE ADD GLOBAL FILTER (ON BOTH SIDE)
# # TODO: push/pull
def get_ail_instance_metadata(ail_uuid):
def get_ail_instance_metadata(ail_uuid, sync_queues=False):
dict_meta = {}
dict_meta['uuid'] = ail_uuid
dict_meta['url'] = get_ail_instance_url(ail_uuid)
dict_meta['description'] = get_ail_instance_description(ail_uuid)
dict_meta['pull'] = is_ail_instance_pull_enabled(ail_uuid)
dict_meta['push'] = is_ail_instance_pull_enabled(ail_uuid)
# # TODO: HIDE
dict_meta['api_key'] = get_ail_instance_key(ail_uuid)
if sync_queues:
dict_meta['sync_queues'] = get_ail_instance_all_sync_queue(ail_uuid)
# # TODO:
# - set UUID sync_queue
return dict_meta
def get_all_ail_instances_metadata():
    """Return metadata dicts (including sync queues) for every known instance."""
    return [get_ail_instance_metadata(ail_uuid, sync_queues=True)
            for ail_uuid in get_all_ail_instance()]
def get_ail_instances_metadata(l_ail_servers):
    """Return metadata dicts (including sync queues) for the given instance uuids."""
    return [get_ail_instance_metadata(ail_uuid, sync_queues=True)
            for ail_uuid in l_ail_servers]
# # TODO: VALIDATE URL
# API KEY
def create_ail_instance(ail_uuid, url, api_key=None, description=None):
def create_ail_instance(ail_uuid, url, api_key=None, description=None, pull=True, push=True):
r_serv_sync.sadd('ail:instance:all', ail_uuid)
r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'url', url)
## API KEY ##
@ -300,6 +403,9 @@ def create_ail_instance(ail_uuid, url, api_key=None, description=None):
#- API KEY -#
if description:
r_serv_sync.hset(f'ail:instance:{ail_uuid}', 'description', description)
change_pull_push_state(ail_uuid, pull=pull, push=push)
set_last_updated_sync_config()
refresh_ail_instance_connection(ail_uuid)
return ail_uuid
def delete_ail_instance(ail_uuid):
@ -311,12 +417,54 @@ def delete_ail_instance(ail_uuid):
r_serv_sync.srem('ail:instance:key:all', ail_uuid)
r_serv_sync.delete(f'ail:instance:key:{key}', ail_uuid)
r_serv_sync.srem('ail:instance:all', ail_uuid)
set_last_updated_sync_config()
refresh_ail_instance_connection(ail_uuid)
return ail_uuid
def get_last_updated_ail_instance():
    """Return the epoch (float) of the last instance-queue update, 0.0 if unset."""
    # plain string key — the original used an f-string with no placeholders
    epoch = r_serv_sync.get('ail:instance:queue:last_updated')
    if not epoch:
        epoch = 0
    return float(epoch)
## API ##
def api_create_ail_instance(json_dict):
    """Validate and register a remote AIL instance.

    Expected keys: uuid, url (required); key, description, pull, push (optional).
    Returns an (response, HTTP status) tuple.
    """
    # default to '' so a missing 'uuid'/'url' key fails validation instead of
    # raising AttributeError on None.replace()
    ail_uuid = json_dict.get('uuid', '').replace(' ', '')
    if not is_valid_uuid_v4(ail_uuid):
        return {"status": "error", "reason": "Invalid ail uuid"}, 400
    ail_uuid = sanityze_uuid(ail_uuid)
    if exists_ail_instance(ail_uuid):
        return {"status": "error", "reason": "AIL uuid already exists"}, 400

    pull = bool(json_dict.get('pull'))
    push = bool(json_dict.get('push'))
    description = json_dict.get('description')

    ail_url = json_dict.get('url', '').replace(' ', '')
    if not is_valid_websocket_url(ail_url):
        return {"status": "error", "reason": "Invalid websocket url"}, 400

    ail_key = json_dict.get('key')
    if ail_key:
        ail_key = ail_key.replace(' ', '')
        if not is_valid_websocket_key(ail_key):
            return {"status": "error", "reason": "Invalid websocket key"}, 400

    res = create_ail_instance(ail_uuid, ail_url, api_key=ail_key, description=description,
                              pull=pull, push=push)
    return res, 200
def api_delete_ail_instance(json_dict):
    """Validate and remove a remote AIL instance; return (response, HTTP status)."""
    ail_uuid = json_dict.get('uuid', '').replace(' ', '')
    if not is_valid_uuid_v4(ail_uuid):
        return {"status": "error", "reason": "Invalid AIL uuid"}, 400
    ail_uuid = sanityze_uuid(ail_uuid)
    if not exists_ail_instance(ail_uuid):
        return {"status": "error", "reason": "AIL server not found"}, 404
    return delete_ail_instance(ail_uuid), 200
####################
# #
@ -334,32 +482,57 @@ def get_all_sync_queue():
def get_sync_queue_all_ail_instance(queue_uuid):
return r_serv_sync.smembers(f'ail2ail:sync_queue:ail_instance:{queue_uuid}')
def exists_sync_queue(queue_uuid):
    # True when the queue's metadata hash exists.
    return r_serv_sync.exists(f'ail2ail:sync_queue:{queue_uuid}')
# # TODO: check if push or pull enabled ?
def is_queue_used_by_ail_instace(queue_uuid):
def is_queue_used_by_ail_instance(queue_uuid):
    # True when at least one AIL instance is registered on this sync queue.
    return r_serv_sync.exists(f'ail2ail:sync_queue:ail_instance:{queue_uuid}')
# # TODO: add others filter
def get_sync_queue_filter(queue_uuid):
    # Set of tags used to filter which objects flow through this queue.
    return r_serv_sync.smembers(f'ail2ail:sync_queue:filter:tags:{queue_uuid}')
def get_sync_queue_name(queue_uuid):
    # name field of the queue metadata hash.
    return r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'name')
def get_sync_queue_description(queue_uuid):
    # description field of the queue metadata hash.
    return r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'description')
def get_sync_queue_max_size(queue_uuid):
    # max_size field of the queue metadata hash (stored as a string).
    return r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'max_size')
# # TODO: ADD FILTER
def get_sync_queue_metadata(queue_uuid):
dict_meta = {}
dict_meta['uuid'] = queue_uuid
dict_meta['name'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'name')
dict_meta['description'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'description')
dict_meta['max_size'] = r_serv_sync.hget(f'ail2ail:sync_queue:{queue_uuid}', 'max_size')
dict_meta['name'] = get_sync_queue_name(queue_uuid)
dict_meta['description'] = get_sync_queue_description(queue_uuid)
dict_meta['max_size'] = get_sync_queue_max_size(queue_uuid)
dict_meta['tags'] = get_sync_queue_filter(queue_uuid)
# # TODO: TO ADD:
# - set uuid instance
# - get uuid instance
return dict_meta
def get_all_queues_metadata():
    """Return the metadata dict of every known sync queue."""
    return [get_sync_queue_metadata(queue_uuid)
            for queue_uuid in get_all_sync_queue()]
def get_queues_metadata(l_queues_uuid):
    """Return the metadata dict of each queue uuid in l_queues_uuid."""
    return [get_sync_queue_metadata(queue_uuid)
            for queue_uuid in l_queues_uuid]
#####################################################
def get_all_sync_queue_dict():
dict_sync_queues = {}
for queue_uuid in get_all_sync_queue():
if is_queue_used_by_ail_instace(queue_uuid):
if is_queue_used_by_ail_instance(queue_uuid):
dict_queue = {}
dict_queue['filter'] = get_sync_queue_filter(queue_uuid)
@ -374,14 +547,22 @@ def get_all_sync_queue_dict():
dict_sync_queues[queue_uuid] = dict_queue
return dict_sync_queues
def is_queue_registred_by_ail_instance(queue_uuid, ail_uuid):
    # True when the queue is in the instance's registered-queue set.
    return r_serv_sync.sismember(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid)
def register_ail_to_sync_queue(ail_uuid, queue_uuid):
    # Two-way mapping: queue -> instances and instance -> queues.
    r_serv_sync.sadd(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid)
    r_serv_sync.sadd(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid)
    # Bump the config timestamp so code polling get_last_updated_sync_config()
    # picks up the change.
    set_last_updated_sync_config()
# # # FIXME: TODO: delete sync queue ????????????????????????????????????????????????????
def unregister_ail_to_sync_queue(ail_uuid, queue_uuid):
    # Remove both directions of the queue <-> instance mapping.
    r_serv_sync.srem(f'ail2ail:sync_queue:ail_instance:{queue_uuid}', ail_uuid)
    r_serv_sync.srem(f'ail:instance:sync_queue:{ail_uuid}', queue_uuid)
    # Bump the config timestamp so consumers refresh their queue dict.
    set_last_updated_sync_config()
def get_all_unregistred_queue_by_ail_instance(ail_uuid):
    # All queues minus the ones already registered on this instance (set diff).
    return r_serv_sync.sdiff('ail2ail:sync_queue:all', f'ail:instance:sync_queue:{ail_uuid}')
# # TODO: optionnal name ???
# # TODO: SANITYZE TAGS
@ -397,15 +578,96 @@ def create_sync_queue(name, tags=[], description=None, max_size=100):
for tag in tags:
r_serv_sync.sadd(f'ail2ail:sync_queue:filter:tags:{queue_uuid}', tag)
set_last_updated_sync_config()
return queue_uuid
def delete_sync_queue(queue_uuid):
    # Detach every registered instance first so the per-instance queue sets
    # stay consistent, then drop the queue's own keys and registry entry.
    for ail_uuid in get_sync_queue_all_ail_instance(queue_uuid):
        unregister_ail_to_sync_queue(ail_uuid, queue_uuid)
    r_serv_sync.delete(f'ail2ail:sync_queue:{queue_uuid}')
    r_serv_sync.delete(f'ail2ail:sync_queue:filter:tags:{queue_uuid}')
    r_serv_sync.srem('ail2ail:sync_queue:all', queue_uuid)
    set_last_updated_sync_config()
    return queue_uuid
## API ##
# # TODO: sanityze queue_name
def api_create_sync_queue(json_dict):
    """Validate input and create a sync queue; return (response, HTTP status).

    Expected keys: name, description, tags (required), max_size (default 100).
    """
    description = json_dict.get('description')
    if description:
        # escape only when provided — escape(None) would yield the string 'None'
        description = escape(description)
    queue_name = json_dict.get('name')
    if queue_name:
        queue_name = escape(queue_name)
    tags = json_dict.get('tags')
    if not tags:
        # BUG FIX: these three error responses were built but never returned
        return {"status": "error", "reason": "no tags provided"}, 400
    if not Tag.are_enabled_tags(tags):
        return {"status": "error", "reason": "Invalid/Disabled tags"}, 400
    max_size = json_dict.get('max_size')
    if not max_size:
        max_size = 100
    try:
        max_size = int(max_size)
    except ValueError:
        return {"status": "error", "reason": "Invalid queue size value"}, 400
    if not max_size > 0:
        return {"status": "error", "reason": "Invalid queue size value"}, 400
    queue_uuid = create_sync_queue(queue_name, tags=tags, description=description,
                                   max_size=max_size)
    return queue_uuid, 200
def api_delete_sync_queue(json_dict):
    """Validate and delete a sync queue; return (response, HTTP status)."""
    queue_uuid = json_dict.get('uuid', '').replace(' ', '').replace('-', '')
    if not is_valid_uuid_v4(queue_uuid):
        return {"status": "error", "reason": "Invalid Queue uuid"}, 400
    if not exists_sync_queue(queue_uuid):
        return {"status": "error", "reason": "Queue Sync not found"}, 404
    return delete_sync_queue(queue_uuid), 200
def api_register_ail_to_sync_queue(json_dict):
    """Attach an AIL instance to a sync queue; return (response, HTTP status)."""
    ail_uuid = json_dict.get('ail_uuid', '').replace(' ', '')
    if not is_valid_uuid_v4(ail_uuid):
        return {"status": "error", "reason": "Invalid AIL uuid"}, 400
    ail_uuid = sanityze_uuid(ail_uuid)
    queue_uuid = json_dict.get('queue_uuid', '').replace(' ', '').replace('-', '')
    if not is_valid_uuid_v4(queue_uuid):
        return {"status": "error", "reason": "Invalid Queue uuid"}, 400
    # existence checks after syntactic validation, AIL server first
    if not exists_ail_instance(ail_uuid):
        return {"status": "error", "reason": "AIL server not found"}, 404
    if not exists_sync_queue(queue_uuid):
        return {"status": "error", "reason": "Queue Sync not found"}, 404
    if is_queue_registred_by_ail_instance(queue_uuid, ail_uuid):
        return {"status": "error", "reason": "Queue already registred"}, 400
    return register_ail_to_sync_queue(ail_uuid, queue_uuid), 200
def api_unregister_ail_to_sync_queue(json_dict):
    """Detach an AIL instance from a sync queue; return (response, HTTP status)."""
    ail_uuid = json_dict.get('ail_uuid', '').replace(' ', '')
    if not is_valid_uuid_v4(ail_uuid):
        return {"status": "error", "reason": "Invalid AIL uuid"}, 400
    ail_uuid = sanityze_uuid(ail_uuid)
    queue_uuid = json_dict.get('queue_uuid', '').replace(' ', '').replace('-', '')
    if not is_valid_uuid_v4(queue_uuid):
        # BUG FIX: said "Invalid ail uuid" (copy/paste) — now matches the
        # register API's message for a bad queue uuid
        return {"status": "error", "reason": "Invalid Queue uuid"}, 400
    if not exists_ail_instance(ail_uuid):
        return {"status": "error", "reason": "AIL server not found"}, 404
    if not exists_sync_queue(queue_uuid):
        return {"status": "error", "reason": "Queue Sync not found"}, 404
    if not is_queue_registred_by_ail_instance(queue_uuid, ail_uuid):
        return {"status": "error", "reason": "Queue not registred"}, 400
    res = unregister_ail_to_sync_queue(ail_uuid, queue_uuid)
    return res, 200
#############################
# #
#### SYNC REDIS QUEUE #######
@ -487,6 +749,13 @@ if __name__ == '__main__':
#res = delete_sync_queue(queue_uuid)
#res = register_ail_to_sync_queue(ail_uuid, queue_uuid)
res = change_pull_push_state(ail_uuid, push=True, pull=True)
#res = change_pull_push_state(ail_uuid, push=True, pull=True)
# print(get_ail_instance_all_sync_queue(ail_uuid))
# print(get_all_sync_queue())
# res = get_all_unregistred_queue_by_ail_instance(ail_uuid)
ail_uuid = 'd82d3e61-2438-4ede-93bf-37b6fd9d7510'
res = get_client_id_by_ail_uuid(ail_uuid)
print(res)

View file

@ -15,23 +15,14 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from pubsublogger import publisher
from core import ail_2_ail
obj_test = {
"format": "ail",
"version": 1,
"type": "item",
"meta": {
"ail:uuid": "03c51929-eeab-4d47-9dc0-c667f94c7d2c",
"ail:uuid_org": "28bc3db3-16da-461c-b20b-b944f4058708",
},
"payload": {
"raw" : "MjhiYzNkYjMtMTZkYS00NjFjLWIyMGItYjk0NGY0MDU4NzA4Cg==",
"compress": "gzip",
"encoding": "base64"
}
}
# # TODO: refactor logging
#### LOGS ####
redis_logger = publisher
redis_logger.port = 6380
redis_logger.channel = 'AIL_SYNC'
#############################
@ -92,7 +83,7 @@ async def pull(websocket, ail_uuid):
if Obj:
obj_ail_stream = ail_2_ail.create_ail_stream(Obj)
Obj = json.dumps(obj_ail_stream)
print(Obj)
#print(Obj)
# send objects
await websocket.send(Obj)
@ -107,27 +98,29 @@ async def pull(websocket, ail_uuid):
# PUSH: receive data from client
# # TODO: optional queue_uuid
async def push(websocket, ail_uuid):
print(ail_uuid)
#print(ail_uuid)
while True:
ail_stream = await websocket.recv()
# # TODO: CHECK ail_stream
ail_stream = json.loads(ail_stream)
print(ail_stream)
#print(ail_stream)
ail_2_ail.add_ail_stream_to_sync_importer(ail_stream)
async def ail_to_ail_serv(websocket, path):
# # TODO: save in class
ail_uuid = websocket.ail_uuid
remote_address = websocket.remote_address
path = unpack_path(path)
sync_mode = path['sync_mode']
# # TODO: check if it works
# # DEBUG:
print(websocket.ail_key)
print(websocket.ail_uuid)
print(websocket.remote_address)
path = unpack_path(path)
sync_mode = path['sync_mode']
print(f'sync mode: {sync_mode}')
await register(websocket)
@ -135,7 +128,8 @@ async def ail_to_ail_serv(websocket, path):
if sync_mode == 'pull':
await pull(websocket, websocket.ail_uuid)
await websocket.close()
print('closed')
redis_logger.info(f'Connection closed: {ail_uuid} {remote_address}')
print(f'Connection closed: {ail_uuid} {remote_address}')
elif sync_mode == 'push':
await push(websocket, websocket.ail_uuid)
@ -163,29 +157,34 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol):
api_key = request_headers.get('Authorization', '')
print(api_key)
if api_key is None:
print('Missing token')
redis_logger.warning(f'Missing token: {self.remote_address}')
print(f'Missing token: {self.remote_address}')
return http.HTTPStatus.UNAUTHORIZED, [], b"Missing token\n"
if not ail_2_ail.is_allowed_ail_instance_key(api_key):
print('Invalid token')
redis_logger.warning(f'Invalid token: {self.remote_address}')
print(f'Invalid token: {self.remote_address}')
return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n"
# PATH
try:
dict_path = unpack_path(path)
except Exception as e:
print('Invalid path')
redis_logger.warning(f'Invalid path: {self.remote_address}')
print(f'Invalid path: {self.remote_address}')
return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n"
ail_uuid = ail_2_ail.get_ail_instance_by_key(api_key)
if ail_uuid != dict_path['ail_uuid']:
print('Invalid token')
redis_logger.warning(f'Invalid token: {self.remote_address} {ail_uuid}')
print(f'Invalid token: {self.remote_address} {ail_uuid}')
return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n"
if not api_key != ail_2_ail.get_ail_instance_key(api_key):
print('Invalid token')
redis_logger.warning(f'Invalid token: {self.remote_address} {ail_uuid}')
print(f'Invalid token: {self.remote_address} {ail_uuid}')
return http.HTTPStatus.UNAUTHORIZED, [], b"Invalid token\n"
self.ail_key = api_key
@ -210,7 +209,9 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol):
# SYNC MODE
if not ail_2_ail.is_ail_instance_sync_enabled(self.ail_uuid, sync_mode=dict_path['sync_mode']):
print('SYNC mode disabled')
sync_mode = dict_path['sync_mode']
redis_logger.warning(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}')
print(f'SYNC mode disabled: {self.remote_address} {ail_uuid} {sync_mode}')
return http.HTTPStatus.FORBIDDEN, [], b"SYNC mode disabled\n"
# # TODO: CHECK API
@ -218,17 +219,16 @@ class AIL_2_AIL_Protocol(websockets.WebSocketServerProtocol):
pass
else:
print('Invalid path')
print(f'Invalid path: {self.remote_address}')
redis_logger.info(f'Invalid path: {self.remote_address}')
return http.HTTPStatus.BAD_REQUEST, [], b"Invalid path\n"
###########################################
# # TODO: logging
# # TODO: clean shutdown / kill all connections
# # TODO: API
# # TODO: Filter object
# # TODO: process_request check
# # TODO: IP/uuid to block
if __name__ == '__main__':
@ -237,6 +237,7 @@ if __name__ == '__main__':
port = 4443
print('Launching Server...')
redis_logger.info('Launching Server...')
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
cert_dir = os.environ['AIL_FLASK']
@ -245,6 +246,7 @@ if __name__ == '__main__':
start_server = websockets.serve(ail_to_ail_serv, "localhost", 4443, ssl=ssl_context, create_protocol=AIL_2_AIL_Protocol)
print(f'Server Launched: wss://{host}:{port}')
redis_logger.info(f'Server Launched: wss://{host}:{port}')
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

View file

@ -63,11 +63,17 @@ def get_galaxy_from_tag(tag):
except IndexError:
return None
def get_active_taxonomies():
return r_serv_tags.smembers('active_taxonomies')
def get_active_taxonomies(r_set=False):
    """Return the enabled taxonomies; as an explicit set copy when r_set."""
    res = r_serv_tags.smembers('active_taxonomies')
    return set(res) if r_set else res
def get_active_galaxies():
return r_serv_tags.smembers('active_galaxies')
def get_active_galaxies(r_set=False):
    """Return the enabled galaxies; as an explicit set copy when r_set."""
    res = r_serv_tags.smembers('active_galaxies')
    return set(res) if r_set else res
def get_all_taxonomies_tags(): # # TODO: add + REMOVE + Update
    # Flat set of every active taxonomy tag (set maintained elsewhere).
    return r_serv_tags.smembers('active_taxonomies_tags')
@ -75,6 +81,44 @@ def get_all_taxonomies_tags(): # # TODO: add + REMOVE + Update
def get_all_galaxies_tags(): # # TODO: add + REMOVE + Update
    # Flat set of every active galaxy tag (set maintained elsewhere).
    return r_serv_tags.smembers('active_galaxies_tags')
def get_taxonomies_enabled_tags(r_list=False):
    """Return all tags enabled across the active taxonomies (list if r_list)."""
    l_tag_keys = [f'active_tag_{taxonomie}' for taxonomie in get_active_taxonomies()]
    if len(l_tag_keys) > 1:
        res = r_serv_tags.sunion(l_tag_keys[0], *l_tag_keys[1:])
    elif l_tag_keys:
        res = r_serv_tags.smembers(l_tag_keys[0])
    else:
        # BUG FIX: with no active taxonomy, `res` was never assigned and the
        # original raised UnboundLocalError on return
        res = set()
    return list(res) if r_list else res
def get_galaxies_enabled_tags():
    """Return all tags enabled across the active galaxies ([] when none active)."""
    l_tag_keys = [f'active_tag_galaxies_{galaxy}' for galaxy in get_active_galaxies()]
    if not l_tag_keys:
        return []
    if len(l_tag_keys) == 1:
        return r_serv_tags.smembers(l_tag_keys[0])
    return r_serv_tags.sunion(l_tag_keys[0], *l_tag_keys[1:])
def get_taxonomie_enabled_tags(taxonomie, r_list=False):
    """Return the enabled tags of one taxonomy (list if r_list, else set)."""
    enabled = r_serv_tags.smembers(f'active_tag_{taxonomie}')
    return list(enabled) if r_list else enabled
def get_galaxy_enabled_tags(galaxy, r_list=False):
    """Return the enabled tags of one galaxy (list if r_list, else set)."""
    enabled = r_serv_tags.smembers(f'active_tag_galaxies_{galaxy}')
    return list(enabled) if r_list else enabled
def is_taxonomie_tag_enabled(taxonomie, tag):
if tag in r_serv_tags.smembers('active_tag_' + taxonomie):
return True
@ -136,6 +180,67 @@ def is_valid_tags_taxonomies_galaxy(list_tags, list_tags_galaxy):
return False
return True
def is_taxonomie_tag(tag, namespace=None):
    """A tag is a taxonomy tag unless its namespace is 'misp-galaxy'."""
    if not namespace:
        namespace = tag.split(':')[0]
    return namespace != 'misp-galaxy'
def is_galaxy_tag(tag, namespace=None):
    """True when the tag belongs to the 'misp-galaxy' namespace."""
    if not namespace:
        namespace = tag.split(':')[0]
    return namespace == 'misp-galaxy'
# # TODO:
# def is_valid_tag(tag):
# pass
def is_enabled_tag(tag, enabled_namespace=None):
    """Check a single tag against the enabled taxonomies or galaxies.

    enabled_namespace, when given, is passed through as the pre-fetched
    enabled set to avoid re-querying.
    """
    if is_taxonomie_tag(tag):
        return is_enabled_taxonomie_tag(tag, enabled_taxonomies=enabled_namespace)
    return is_enabled_galaxy_tag(tag, enabled_galaxies=enabled_namespace)
def are_enabled_tags(tags):
    """Return True only when every tag in tags is currently enabled."""
    # fetch the enabled sets once instead of per tag
    enabled_taxonomies = get_active_taxonomies(r_set=True)
    enabled_galaxies = get_active_galaxies(r_set=True)
    for tag in tags:
        if is_taxonomie_tag(tag):
            enabled = is_enabled_taxonomie_tag(tag, enabled_taxonomies=enabled_taxonomies)
        else:
            enabled = is_enabled_galaxy_tag(tag, enabled_galaxies=enabled_galaxies)
        if not enabled:
            return False
    return True
def is_enabled_taxonomie_tag(tag, enabled_taxonomies=None):
    """True when the tag's taxonomy is active and the tag itself is enabled."""
    if not enabled_taxonomies:
        enabled_taxonomies = get_active_taxonomies()
    taxonomie = get_taxonomie_from_tag(tag)
    if taxonomie is None:
        return False
    if taxonomie not in enabled_taxonomies:
        return False
    if not is_taxonomie_tag_enabled(taxonomie, tag):
        return False
    # BUG FIX: the original fell through and returned None (falsy) even for
    # enabled tags — the sibling is_enabled_galaxy_tag() ends with return True
    return True
def is_enabled_galaxy_tag(tag, enabled_galaxies=None):
    """True when the tag's galaxy is active and the tag itself is enabled."""
    if not enabled_galaxies:
        enabled_galaxies = get_active_galaxies()
    galaxy = get_galaxy_from_tag(tag)
    if galaxy is None or galaxy not in enabled_galaxies:
        return False
    if not is_galaxy_tag_enabled(galaxy, tag):
        return False
    return True
#### ####
def is_tag_in_all_tag(tag):
@ -144,6 +249,31 @@ def is_tag_in_all_tag(tag):
else:
return False
def get_tag_synonyms(tag):
    # Set of synonym strings stored for this tag (empty set when none).
    return r_serv_tags.smembers(f'synonym_tag_{tag}')
def get_tag_dislay_name(tag):
    # Build a display string for a tag, appending its synonyms.
    # (name typo 'dislay' kept: callers reference this function by that name)
    tag_synonyms = get_tag_synonyms(tag)
    if not tag_synonyms:
        return tag
    else:
        # NOTE(review): the tag is concatenated directly to the joined synonyms,
        # giving "tag<syn1>, <syn2>" with no separator after the tag itself —
        # a ', ' between them looks intended; confirm before changing.
        return tag + ', '.join(tag_synonyms)
def get_tags_selector_dict(tags):
    """Map each tag to its {'name', 'id'} selector entry."""
    return [get_tag_selector_dict(tag) for tag in tags]
def get_tag_selector_dict(tag):
    # Selector entry: human display name plus the raw tag as id.
    return {'name':get_tag_dislay_name(tag),'id':tag}
def get_tags_selector_data():
    """Return the active taxonomies and galaxies for the tag selector."""
    return {
        'active_taxonomies': get_active_taxonomies(),
        'active_galaxies': get_active_galaxies(),
    }
def get_min_tag(tag):
tag = tag.split('=')
if len(tag) > 1: