#!/usr/bin/env python3 # -*-coding:UTF-8 -* import os import sys from datetime import datetime from flask import url_for # from pymisp import MISPObject sys.path.append(os.environ['AIL_BIN']) ################################## # Import Project packages ################################## from lib import ail_core from lib.ConfigLoader import ConfigLoader from lib.objects.abstract_subtype_object import AbstractSubtypeObject, get_all_id from lib.data_retention_engine import update_obj_date from lib.objects import ail_objects from lib.timeline_engine import Timeline from lib.correlations_engine import get_correlation_by_correl_type config_loader = ConfigLoader() baseurl = config_loader.get_config_str("Notifications", "ail_domain") r_object = config_loader.get_db_conn("Kvrocks_Objects") r_cache = config_loader.get_redis_conn("Redis_Cache") config_loader = None ################################################################################ ################################################################################ ################################################################################ class Chat(AbstractSubtypeObject): # TODO # ID == username ????? """ AIL Chat Object. (strings) """ def __init__(self, id, subtype): super(Chat, self).__init__('chat-thread', id, subtype) # def get_ail_2_ail_payload(self): # payload = {'raw': self.get_gzip_content(b64=True), # 'compress': 'gzip'} # return payload # # WARNING: UNCLEAN DELETE /!\ TEST ONLY /!\ def delete(self): # # TODO: pass def get_link(self, flask_context=False): if flask_context: url = url_for('correlation.show_correlation', type=self.type, subtype=self.subtype, id=self.id) else: url = f'{baseurl}/correlation/show?type={self.type}&subtype={self.subtype}&id={self.id}' return url def get_svg_icon(self): # TODO # if self.subtype == 'telegram': # style = 'fab' # icon = '\uf2c6' # elif self.subtype == 'discord': # style = 'fab' # icon = '\uf099' # else: # style = 'fas' # icon = '\uf007' style = 'fas' icon = '\uf086' return {'style': style, 'icon': icon, 'color': '#4dffff', 'radius': 5} def get_meta(self, options=set()): meta = self._get_meta(options=options) meta['id'] = self.id meta['subtype'] = self.subtype meta['tags'] = self.get_tags(r_list=True) if 'username': meta['username'] = self.get_username() return meta def get_misp_object(self): return ############################################################################ ############################################################################ # others optional metas, ... -> # TODO ALL meta in hset #### Messages #### TODO set parents # def get_last_message_id(self): # # return r_object.hget(f'meta:{self.type}:{self.subtype}:{self.id}', 'last:message:id') if __name__ == '__main__': chat = Chat('test', 'telegram') r = chat.get_messages() print(r)