mirror of
https://github.com/ail-project/ail-framework.git
synced 2024-11-14 10:38:22 +00:00
185 lines
5.8 KiB
Python
Executable file
185 lines
5.8 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
# -*-coding:UTF-8 -*
|
|
|
|
import os
|
|
import sys
|
|
import uuid
|
|
|
|
sys.path.append(os.environ['AIL_BIN'])
|
|
##################################
|
|
# Import Project packages
|
|
##################################
|
|
from lib.ConfigLoader import ConfigLoader
|
|
|
|
config_loader = ConfigLoader()
|
|
r_serv_db = config_loader.get_db_conn("Kvrocks_DB")
|
|
r_object = config_loader.get_db_conn("Kvrocks_Objects")
|
|
config_loader = None
|
|
|
|
AIL_OBJECTS = sorted({'chat', 'chat-subchannel', 'chat-thread', 'cookie-name', 'cve', 'cryptocurrency', 'decoded',
|
|
'domain', 'etag', 'favicon', 'file-name', 'hhhash',
|
|
'item', 'image', 'message', 'ocr', 'pgp', 'screenshot', 'title', 'user-account', 'username'})
|
|
|
|
AIL_OBJECTS_WITH_SUBTYPES = {'chat', 'chat-subchannel', 'cryptocurrency', 'pgp', 'username', 'user-account'}
|
|
|
|
# TODO by object TYPE ????
|
|
AIL_OBJECTS_CORRELATIONS_DEFAULT = sorted({'chat', 'chat-subchannel', 'chat-thread', 'cve', 'cryptocurrency', 'decoded',
|
|
'domain', 'favicon', 'file-name',
|
|
'item', 'image', 'message', 'ocr', 'pgp', 'screenshot', 'title', 'user-account', 'username'})
|
|
|
|
def get_ail_uuid():
|
|
ail_uuid = r_serv_db.get('ail:uuid')
|
|
if not ail_uuid:
|
|
ail_uuid = _set_ail_uuid()
|
|
return ail_uuid
|
|
|
|
def _set_ail_uuid():
|
|
ail_uuid = generate_uuid()
|
|
r_serv_db.set('ail:uuid', ail_uuid)
|
|
return ail_uuid
|
|
|
|
def is_valid_uuid_v4(header_uuid):
|
|
try:
|
|
header_uuid = header_uuid.replace('-', '')
|
|
uuid_test = uuid.UUID(hex=header_uuid, version=4)
|
|
return uuid_test.hex == header_uuid
|
|
except:
|
|
return False
|
|
|
|
def generate_uuid():
|
|
return str(uuid.uuid4())
|
|
|
|
#### AIL OBJECTS ####
|
|
|
|
def get_all_objects():
|
|
return AIL_OBJECTS
|
|
|
|
def is_object_type(obj_type):
|
|
return obj_type in AIL_OBJECTS
|
|
|
|
def get_objects_with_subtypes():
|
|
return AIL_OBJECTS_WITH_SUBTYPES
|
|
|
|
def get_object_all_subtypes(obj_type): # TODO Dynamic subtype
|
|
if obj_type == 'chat':
|
|
return r_object.smembers(f'all_chat:subtypes')
|
|
if obj_type == 'chat-subchannel':
|
|
return r_object.smembers(f'all_chat-subchannel:subtypes')
|
|
if obj_type == 'chat-thread':
|
|
return r_object.smembers(f'all_chat-thread:subtypes')
|
|
if obj_type == 'cryptocurrency':
|
|
return ['bitcoin', 'bitcoin-cash', 'dash', 'ethereum', 'litecoin', 'monero', 'tron', 'zcash']
|
|
if obj_type == 'pgp':
|
|
return ['key', 'mail', 'name']
|
|
if obj_type == 'username':
|
|
return ['telegram', 'twitter', 'jabber']
|
|
if obj_type == 'user-account':
|
|
return r_object.smembers(f'all_chat:subtypes')
|
|
return []
|
|
|
|
def get_default_correlation_objects():
|
|
return AIL_OBJECTS_CORRELATIONS_DEFAULT
|
|
|
|
def get_obj_queued():
|
|
return ['item', 'image', 'message', 'ocr']
|
|
|
|
def get_objects_tracked():
|
|
return ['decoded', 'item', 'pgp', 'message', 'ocr', 'title']
|
|
|
|
def get_objects_retro_hunted():
|
|
return ['decoded', 'item', 'message']
|
|
|
|
def get_all_objects_with_subtypes_tuple():
|
|
str_objs = []
|
|
for obj_type in get_all_objects():
|
|
subtypes = get_object_all_subtypes(obj_type)
|
|
if subtypes:
|
|
for subtype in subtypes:
|
|
str_objs.append((obj_type, subtype))
|
|
else:
|
|
str_objs.append((obj_type, ''))
|
|
return str_objs
|
|
|
|
def unpack_obj_global_id(global_id, r_type='tuple'):
|
|
if r_type == 'dict':
|
|
obj = global_id.split(':', 2)
|
|
return {'type': obj[0], 'subtype': obj[1], 'id': obj[2]}
|
|
else: # tuple(type, subtype, id)
|
|
return global_id.split(':', 2) # TODO REPLACE get_obj_type_subtype_id_from_global_id(global_id)
|
|
|
|
def unpack_objs_global_id(objs_global_id, r_type='tuple'):
|
|
objs = []
|
|
for global_id in objs_global_id:
|
|
objs.append(unpack_obj_global_id(global_id, r_type=r_type))
|
|
return objs
|
|
|
|
def unpack_correl_obj__id(obj_type, global_id, r_type='tuple'):
|
|
obj = global_id.split(':', 1)
|
|
if r_type == 'dict':
|
|
return {'type': obj_type, 'subtype': obj[0], 'id': obj[1]}
|
|
else: # tuple(type, subtype, id)
|
|
return obj_type, obj[0], obj[1]
|
|
|
|
def unpack_correl_objs_id(obj_type, correl_objs_id, r_type='tuple'):
|
|
objs = []
|
|
for correl_obj_id in correl_objs_id:
|
|
objs.append(unpack_correl_obj__id(obj_type, correl_obj_id, r_type=r_type))
|
|
return objs
|
|
|
|
##-- AIL OBJECTS --##
|
|
|
|
#### Redis ####
|
|
|
|
def _parse_zscan(response):
|
|
cursor, r = response
|
|
it = iter(r)
|
|
return str(cursor), list(it)
|
|
|
|
def zscan_iter(r_redis, name): # count ???
|
|
cursor = 0
|
|
while cursor != "0":
|
|
cursor, data = _parse_zscan(r_redis.zscan(name, cursor=cursor))
|
|
yield from data
|
|
|
|
## -- Redis -- ##
|
|
|
|
def rreplace(s, old, new, occurrence):
|
|
li = s.rsplit(old, occurrence)
|
|
return new.join(li)
|
|
|
|
def paginate_iterator(iter_elems, nb_obj=50, page=1):
|
|
dict_page = {'nb_all_elem': len(iter_elems)}
|
|
nb_pages = dict_page['nb_all_elem'] / nb_obj
|
|
if not nb_pages.is_integer():
|
|
nb_pages = int(nb_pages)+1
|
|
else:
|
|
nb_pages = int(nb_pages)
|
|
if page > nb_pages:
|
|
page = nb_pages
|
|
|
|
# multiple pages
|
|
if nb_pages > 1:
|
|
dict_page['list_elem'] = []
|
|
start = nb_obj*(page - 1)
|
|
stop = (nb_obj*page) - 1
|
|
current_index = 0
|
|
for elem in iter_elems:
|
|
if current_index > stop:
|
|
break
|
|
if start <= current_index <= stop:
|
|
dict_page['list_elem'].append(elem)
|
|
current_index += 1
|
|
stop += 1
|
|
if stop > dict_page['nb_all_elem']:
|
|
stop = dict_page['nb_all_elem']
|
|
|
|
else:
|
|
start = 0
|
|
stop = dict_page['nb_all_elem']
|
|
dict_page['list_elem'] = list(iter_elems)
|
|
dict_page['page'] = page
|
|
dict_page['nb_pages'] = nb_pages
|
|
# UI
|
|
dict_page['nb_first_elem'] = start+1
|
|
dict_page['nb_last_elem'] = stop
|
|
return dict_page
|