chg: [importer] refactor ZMQ + Feeder importer

This commit is contained in:
Terrtia 2023-02-03 16:13:57 +01:00
parent 3365a054a8
commit de0a60ba8b
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
23 changed files with 588 additions and 517 deletions

View file

@ -100,15 +100,19 @@ def core_migration():
versions_to_update = r_serv_db.smembers('ail:to_update')
for version in versions_to_update:
r_kvrocks.sadd('ail:update:to_update', version)
update_error = r_serv_db.get('ail:update_error')
update_in_progress = r_serv_db.get('ail:update_in_progress')
if update_error:
r_kvrocks.set('ail:update:error', update_error)
update_in_progress = r_serv_db.get('ail:update_in_progress')
if update_in_progress:
r_kvrocks.set('ail:update:update_in_progress', update_in_progress)
# d4 passivedns
d4_enabled = r_serv_db.hget('d4:passivedns', 'enabled')
d4_enabled = bool(r_serv_db.hget('d4:passivedns', 'enabled'))
d4_update_time = r_serv_db.hget('d4:passivedns', 'update_time')
r_kvrocks.hset('d4:passivedns', 'enabled', bool(d4_enabled))
r_kvrocks.hset('d4:passivedns', 'enabled', str(d4_enabled))
r_kvrocks.hset('d4:passivedns', 'update_time', d4_update_time)
# Crawler Manager
@ -172,6 +176,7 @@ def user_migration():
Users.create_user(user_id, password=None, chg_passwd=chg_passwd, role=role)
Users.edit_user_password(user_id, password_hash, chg_passwd=chg_passwd)
Users._delete_user_token(user_id)
print(user_id, token)
Users._set_user_token(user_id, token)
for invite_row in r_crawler.smembers('telegram:invite_code'):
@ -871,15 +876,15 @@ def cves_migration():
if __name__ == '__main__':
#core_migration()
#user_migration()
core_migration()
user_migration()
#tags_migration()
# items_migration()
# crawler_migration()
domain_migration() # TO TEST ###########################
# domain_migration() # TO TEST ###########################
# decodeds_migration()
# screenshots_migration()
subtypes_obj_migration()
# subtypes_obj_migration()
# ail_2_ail_migration()
# trackers_migration()
# investigations_migration()
@ -891,6 +896,6 @@ if __name__ == '__main__':
# crawler queues + auto_crawlers
# stats - Cred - Mail - Provider
# TODO FEEDER IMPORT -> return r_serv_db.lpop('importer:json:item')
##########################################################

View file

@ -162,7 +162,7 @@ function launching_scripts {
# sleep 0.1
echo -e $GREEN"\t* Launching core scripts ..."$DEFAULT
# TODO: MOOVE IMPORTER ???? => multiple scripts
# TODO: IMPORTER SCREEN ????
#### SYNC ####
screen -S "Script_AIL" -X screen -t "Sync_importer" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./Sync_importer.py; read x"
@ -173,7 +173,9 @@ function launching_scripts {
sleep 0.1
##-- SYNC --##
screen -S "Script_AIL" -X screen -t "JSON_importer" bash -c "cd ${AIL_BIN}/import; ${ENV_PY} ./JSON_importer.py; read x"
screen -S "Script_AIL" -X screen -t "ZMQImporter" bash -c "cd ${AIL_BIN}/importer; ${ENV_PY} ./ZMQImporter.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "FeederImporter" bash -c "cd ${AIL_BIN}/importer; ${ENV_PY} ./FeederImporter.py; read x"
sleep 0.1
screen -S "Script_AIL" -X screen -t "D4_client" bash -c "cd ${AIL_BIN}/core; ${ENV_PY} ./D4_client.py; read x"
sleep 0.1
@ -279,7 +281,7 @@ function launching_scripts {
# sleep 0.1
##################################
# #
# TO MIGRATE #
##################################
screen -S "Script_AIL" -X screen -t "ModuleInformation" bash -c "cd ${AIL_BIN}; ${ENV_PY} ./ModulesInformationV2.py -k 0 -c 1; read x"
sleep 0.1

View file

@ -1,60 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import json
import redis
import sys
import time
sys.path.append(os.environ['AIL_BIN'])
from Helper import Process
from pubsublogger import publisher
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
import importer
if __name__ == '__main__':
publisher.port = 6380
publisher.channel = 'Script'
config_section = 'Importer_Json'
process = Process(config_section)
config_loader = ConfigLoader.ConfigLoader()
# REDIS #
server_cache = config_loader.get_redis_conn("Redis_Log_submit")
config_loader = None
# LOGGING #
publisher.info("JSON Feed Script started to receive & publish.")
# OTHER CONFIG #
DEFAULT_FEEDER_NAME = 'Unknow Feeder'
while True:
json_item = importer.get_json_item_to_import()
if json_item:
json_item = json.loads(json_item)
feeder_name = importer.get_json_source(json_item)
print('importing: {} feeder'.format(feeder_name))
json_import_class = importer.get_json_receiver_class(feeder_name)
importer_obj = json_import_class(feeder_name, json_item)
importer.process_json(importer_obj, process)
else:
time.sleep(5)

View file

@ -1,54 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Receiver Jabber Json Items
"""
import os
import sys
import time
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import item_basic
from lib.objects.Usernames import Username
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer'))
from Default_json import Default_json
class Ail_feeder_jabber(Default_json):
"""Jabber Feeder functions"""
def __init__(self, name, json_item):
super().__init__(name, json_item)
def get_feeder_name(self):
return 'jabber'
# define item id
def get_item_id(self):
item_date = time.strptime(self.json_item['meta']['jabber:ts'], "%Y-%m-%dT%H:%M:%S.%f")
item_date_str = time.strftime("%Y/%m/%d", item_date)
item_id = str(self.json_item['meta']['jabber:id'])
return os.path.join('jabber', item_date_str, item_id) + '.gz'
def process_json_meta(self, process, item_id):
'''
Process JSON meta filed.
'''
jabber_id = str(self.json_item['meta']['jabber:id'])
item_basic.add_map_obj_id_item_id(jabber_id, item_id, 'jabber_id')
to = str(self.json_item['meta']['jabber:to'])
fr = str(self.json_item['meta']['jabber:from'])
item_date = item_basic.get_item_date(item_id)
user_to = Username(to, 'jabber')
user_fr = Username(fr, 'jabber')
user_to.add(date, item_id)
user_fr.add(date, item_id)
return None

View file

@ -1,61 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import sys
import datetime
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import item_basic
from lib.objects.Usernames import Username
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer'))
from Default_json import Default_json
class Ail_feeder_telegram(Default_json):
"""Twitter Feeder functions"""
def __init__(self, name, json_item):
super().__init__(name, json_item)
def get_feeder_name(self):
return 'telegram'
# define item id
def get_item_id(self):
# use twitter timestamp ?
item_date = datetime.date.today().strftime("%Y/%m/%d")
channel_id = str(self.json_item['meta']['channel_id'])
message_id = str(self.json_item['meta']['message_id'])
item_id = f'{channel_id}_{message_id}'
return os.path.join('telegram', item_date, item_id) + '.gz'
def process_json_meta(self, process, item_id):
'''
Process JSON meta filed.
'''
channel_id = str(self.json_item['meta']['channel_id'])
message_id = str(self.json_item['meta']['message_id'])
telegram_id = f'{channel_id}_{message_id}'
item_basic.add_map_obj_id_item_id(telegram_id, item_id, 'telegram_id')
#print(self.json_item['meta'])
user = None
if self.json_item['meta'].get('user'):
user = str(self.json_item['meta']['user'])
else:
if self.json_item['meta'].get('channel'):
user = str(self.json_item['meta']['channel']['username'])
if user:
item_date = item_basic.get_item_date(item_id)
username = Username(user, 'telegram')
username.add(date, item_id)
return None

View file

@ -1,52 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import sys
import datetime
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import item_basic
from lib.objects.Usernames import Username
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer'))
from Default_json import Default_json
class Ail_feeder_twitter(Default_json):
"""Twitter Feeder functions"""
def __init__(self, name, json_item):
super().__init__(name, json_item)
def get_feeder_name(self):
return 'twitter'
# define item id
def get_item_id(self):
# use twitter timestamp ?
item_date = datetime.date.today().strftime("%Y/%m/%d")
item_id = str(self.json_item['meta']['twitter:tweet_id'])
return os.path.join('twitter', item_date, item_id) + '.gz'
def process_json_meta(self, process, item_id):
'''
Process JSON meta filed.
'''
tweet_id = str(self.json_item['meta']['twitter:tweet_id'])
item_basic.add_map_obj_id_item_id(tweet_id, item_id, 'twitter_id')
date = item_basic.get_item_date(item_id)
user = str(self.json_item['meta']['twitter:id'])
username = Username(user, 'twitter')
username.add(date, item_id)
return None

View file

@ -1,63 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import sys
import datetime
import uuid
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer'))
from Default_json import Default_json
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib.objects.Items import Item
class Ail_feeder_urlextract(Default_json):
"""urlextract Feeder functions"""
def __init__(self, name, json_item):
super().__init__(name, json_item)
def get_feeder_name(self):
return 'urlextract'
# define item id
def get_item_id(self):
# use twitter timestamp ?
item_date = datetime.date.today().strftime("%Y/%m/%d")
item_id = str(self.json_item['meta']['twitter:url-extracted'])
item_id = item_id.split('//')
if len(item_id) > 1:
item_id = ''.join(item_id[1:])
else:
item_id = item_id[0]
item_id = item_id.replace('/', '_')
if len(item_id) > 215:
item_id = '{}{}.gz'.format(item_id[:215], str(uuid.uuid4()))
else:
item_id = '{}{}.gz'.format(item_id, str(uuid.uuid4()))
return os.path.join('urlextract', item_date, item_id)
# # TODO:
def process_json_meta(self, process, item_id):
"""
Process JSON meta filed.
"""
json_meta = self.get_json_meta()
parent_id = str(json_meta['parent:twitter:tweet_id']) # TODO SEARCH IN CACHE !!!
item = Item(item_id)
item.set_parent(parent_id)
# # TODO: change me
# parent_type = 'twitter_id'
# item_basic.add_item_parent_by_parent_id(parent_type, parent_id, item_id)

View file

@ -1,72 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import datetime
import json
import redis
import time
import sys
import uuid
#sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
#import ConfigLoader
#import item_basic
class Default_json(object):
"""Default Feeder functions"""
def __init__(self, feeder_name, json_item):
self.name = feeder_name
self.json_item = json_item
def get_feeder_source(self):
'''
Return the original feeder name (json source field).
'''
return self.name
def get_feeder_name(self):
'''
Return feeder name. first part of the item_id and display in the UI
'''
return self.name
def get_json_file(self):
'''
Return the JSON dict,
'''
return self.json_item
def get_json_meta(self):
return self.json_item['meta']
def get_feeder_uuid(self):
pass
def get_item_gzip64encoded_content(self):
'''
Return item base64 encoded gzip content,
'''
return self.json_item['data']
## OVERWRITE ME ##
def get_item_id(self):
'''
Return item id. define item id
'''
item_date = datetime.date.today().strftime("%Y/%m/%d")
return os.path.join(self.get_feeder_name(), item_date, str(uuid.uuid4())) + '.gz'
## OVERWRITE ME ##
def process_json_meta(self, process, item_id):
'''
Process JSON meta filed.
'''
return None

View file

@ -1,109 +0,0 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import importlib
import json
import sys
import time
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'lib/'))
import ConfigLoader
# Import all receiver
#from all_json_receiver import *
#### CONFIG ####
config_loader = ConfigLoader.ConfigLoader()
server_cache = config_loader.get_redis_conn("Redis_Log_submit")
r_serv_db = config_loader.get_redis_conn("ARDB_DB")
config_loader = None
DEFAULT_FEEDER_NAME = 'Default_json'
#### ------ ####
def reload_json_importer_list():
global importer_list
importer_json_dir = os.path.join(os.environ['AIL_BIN'], 'import', 'ail_json_importer')
importer_list = [f[:-3] for f in os.listdir(importer_json_dir) if os.path.isfile(os.path.join(importer_json_dir, f))]
# init importer list
importer_list = []
reload_json_importer_list()
#### FUNCTIONS ####
def get_json_importer_list():
return importer_list
def add_json_to_json_queue(json_item):
json_item = json.dumps(json_item)
r_serv_db.rpush('importer:json:item', json_item)
def get_json_item_to_import():
return r_serv_db.lpop('importer:json:item')
def get_json_receiver_class(feeder_name_in):
global importer_list
# sanitize class name
feeder_name = feeder_name_in[:1].upper() + feeder_name_in[1:]
feeder_name = feeder_name.replace('-', '_')
if feeder_name is None or feeder_name not in get_json_importer_list():
reload_json_importer_list() # add refresh timing ?
if feeder_name not in get_json_importer_list():
print('Unknow feeder: {}'.format(feeder_name_in))
feeder_name = 'Default_json'
# avoid subpackages
parts = feeder_name.split('.')
module = 'ail_json_importer.{}'.format(parts[-1])
# import json importer class
try:
mod = importlib.import_module(module)
except:
raise
mod = importlib.import_module(module)
class_name = getattr(mod, feeder_name)
return class_name
def get_json_source(json_item):
return json_item.get('source', DEFAULT_FEEDER_NAME)
def process_json(importer_obj, process):
item_id = importer_obj.get_item_id()
if 'meta' in importer_obj.get_json_file():
importer_obj.process_json_meta(process, item_id)
# send data to queue
send_item_to_ail_queue(item_id, importer_obj.get_item_gzip64encoded_content(), importer_obj.get_feeder_name(), process)
def send_item_to_ail_queue(item_id, gzip64encoded_content, feeder_name, process):
# Send item to queue
# send paste to Global
relay_message = "{0} {1}".format(item_id, gzip64encoded_content)
process.populate_set_out(relay_message, 'Mixer')
# increase nb of paste by feeder name
server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1)
#### ---- ####
#### API ####
def api_import_json_item(data_json):
if not data_json:
return ({'status': 'error', 'reason': 'Malformed JSON'}, 400)
# # TODO: add JSON verification
res = add_json_to_json_queue(data_json)
if res:
return ({'status': 'error'}, 400)
return ({'status': 'success'}, 200)

124
bin/importer/FeederImporter.py Executable file
View file

@ -0,0 +1,124 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
Importer Class
================
Import Content
"""
import os
import sys
import importlib
import json
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.abstract_importer import AbstractImporter
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
#### CONFIG ####
config_loader = ConfigLoader()
r_db = config_loader.get_db_conn('Kvrocks_DB')
config_loader = None
# --- CONFIG --- #
class FeederImporter(AbstractImporter):
def __init__(self):
super().__init__()
self.feeders = {}
self.reload_feeders()
# TODO ADD TIMEOUT RELOAD
def reload_feeders(self):
feeder_dir = os.path.join(os.environ['AIL_BIN'], 'importer', 'feeders')
feeders = [f[:-3] for f in os.listdir(feeder_dir) if os.path.isfile(os.path.join(feeder_dir, f))]
self.feeders = {}
for feeder in feeders:
print(feeder)
part = feeder.split('.')[-1]
# import json importer class
mod = importlib.import_module(f'importer.feeders.{part}')
cls = getattr(mod, f'{feeder}Feeder')
print(cls)
self.feeders[feeder] = cls
print()
print(self.feeders)
print()
def get_feeder(self, json_data):
class_name = None
feeder_name = json_data.get('source')
if feeder_name:
if feeder_name.startswith('ail_feeder_'):
feeder_name = feeder_name.replace('ail_feeder_', '', 1)
class_name = feeder_name.replace('-', '_').title()
if not class_name or class_name not in self.feeders:
class_name = 'Default'
cls = self.feeders[class_name]
return cls(json_data)
def importer(self, json_data):
feeder = self.get_feeder(json_data)
feeder_name = feeder.get_name()
print(f'importing: {feeder_name} feeder')
item_id = feeder.get_item_id()
# process meta
if feeder.get_json_meta():
feeder.process_meta()
gzip64_content = feeder.get_gzip64_content()
return f'{item_id} {gzip64_content}'
class FeederModuleImporter(AbstractModule):
def __init__(self):
super(FeederModuleImporter, self).__init__()
self.pending_seconds = 5
config = ConfigLoader()
self.r_db = config.get_db_conn('Kvrocks_DB')
self.importer = FeederImporter()
def get_message(self):
return self.r_db.lpop('importer:feeder') # TODO CHOOSE DB
# TODO RELOAD LIST
# after delta
def compute(self, message):
# TODO HANDLE Invalid JSON
json_data = json.loads(message)
relay_message = self.importer.importer(json_data)
self.send_message_to_queue(relay_message)
# TODO IN MIXER
# increase nb of paste by feeder name
# server_cache.hincrby("mixer_cache:list_feeder", feeder_name, 1)
def add_json_feeder_to_queue(json_data):
json_data = json.dumps(json_data)
return r_db.rpush('importer:feeder', json_data)
def api_add_json_feeder_to_queue(json_data):
if not json_data:
return {'status': 'error', 'reason': 'Malformed JSON'}, 400
# # TODO: add JSON verification
res = add_json_feeder_to_queue(json_data)
if not res:
return {'status': 'error'}, 400
return {'status': 'success'}, 200
# Launch Importer
if __name__ == '__main__':
module = FeederModuleImporter()
module.run()

80
bin/importer/ZMQImporter.py Executable file
View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
Importer Class
================
Import Content
"""
import os
import sys
import time
import zmq
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.abstract_importer import AbstractImporter
from modules.abstract_module import AbstractModule
from lib.ConfigLoader import ConfigLoader
class ZMQImporters(AbstractImporter):
def __init__(self):
super().__init__()
self.subscribers = []
# Initialize poll set
self.poller = zmq.Poller()
def add(self, address, channel):
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
r = subscriber.connect(address)
print(r)
subscriber.setsockopt_string(zmq.SUBSCRIBE, channel)
self.subscribers.append(subscriber)
self.poller.register(subscriber, zmq.POLLIN)
def importer(self, timeout=None): # -> FOR loop required
"""
:param timeout: The timeout (in milliseconds) to wait for an event.
If unspecified (or specified None), will wait forever for an event.
:returns: messages generator
"""
for event in self.poller.poll(timeout=timeout):
socket, event_mask = event
# DEBUG
print(socket, event_mask)
yield socket.recv()
class ZMQModuleImporter(AbstractModule):
def __init__(self):
super().__init__()
config_loader = ConfigLoader()
address = config_loader.get_config_str('ZMQ_Global', 'address')
channel = config_loader.get_config_str('ZMQ_Global', 'channel')
self.zmq_importer = ZMQImporters()
# TODO register all Importers
self.zmq_importer.add(address, channel)
def get_message(self):
for message in self.zmq_importer.importer():
# remove channel from message
yield message.split(b' ', 1)[1]
def compute(self, messages):
for message in messages:
message = message.decode()
print(message.split(' ', 1)[0])
self.send_message_to_queue(message)
if __name__ == '__main__':
module = ZMQModuleImporter()
module.run()

View file

@ -0,0 +1,42 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
Importer Class
================
Import Content
"""
import os
import sys
from abc import ABC, abstractmethod
# sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
# from ConfigLoader import ConfigLoader
class AbstractImporter(ABC):
def __init__(self):
"""
Init Module
importer_name: str; set the importer name if different from the instance ClassName
"""
# Module name if provided else instance className
self.name = self._name()
@abstractmethod
def importer(self, *args, **kwargs):
"""Importer function"""
pass
def _name(self):
"""
Returns the instance class name (ie. the Exporter Name)
"""
return self.__class__.__name__

70
bin/importer/feeders/Default.py Executable file
View file

@ -0,0 +1,70 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Default JSON Feeder Importer Module
================
Process Feeder Json (example: Twitter feeder)
"""
import os
import datetime
import uuid
class DefaultFeeder:
"""Default Feeder"""
def __init__(self, json_data):
self.json_data = json_data
self.item_id = None
self.name = None
def get_name(self):
"""
Return feeder name. first part of the item_id and display in the UI
"""
if not self.name:
return self.get_source()
return self.name
def get_source(self):
return self.json_data.get('source')
def get_json_data(self):
"""
Return the JSON data,
"""
return self.json_data
def get_json_meta(self):
return self.json_data.get('meta')
def get_uuid(self):
return self.json_data.get('source_uuid')
def get_default_encoding(self):
return self.json_data.get('default_encoding')
def get_gzip64_content(self):
"""
Return the base64 encoded gzip content,
"""
return self.json_data.get('data')
## OVERWRITE ME ##
def get_item_id(self):
"""
Return item id. define item id
"""
date = datetime.date.today().strftime("%Y/%m/%d")
item_id = os.path.join(self.get_name(), date, str(uuid.uuid4()))
self.item_id = f'{item_id}.gz'
return self.item_id
## OVERWRITE ME ##
def process_meta(self):
"""
Process JSON meta filed.
"""
# meta = self.get_json_meta()
pass

53
bin/importer/feeders/Jabber.py Executable file
View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Jabber Feeder Importer Module
================
Process Jabber JSON
"""
import os
import sys
import time
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.feeders.Default import DefaultFeeder
from lib.objects.Usernames import Username
from lib import item_basic
class JabberFeeder(DefaultFeeder):
"""Jabber Feeder functions"""
def __init__(self, json_data):
super().__init__(json_data)
self.name = 'jabber'
# define item id
def get_item_id(self):
date = time.strptime(self.json_data['meta']['jabber:ts'], "%Y-%m-%dT%H:%M:%S.%f")
date_str = time.strftime("%Y/%m/%d", date)
item_id = str(self.json_data['meta']['jabber:id'])
item_id = os.path.join('jabber', date_str, item_id)
self.item_id = f'{item_id}.gz'
return self.item_id
def process_meta(self):
"""
Process JSON meta field.
"""
# jabber_id = str(self.json_data['meta']['jabber:id'])
# item_basic.add_map_obj_id_item_id(jabber_id, item_id, 'jabber_id') ##############################################
to = str(self.json_data['meta']['jabber:to'])
fr = str(self.json_data['meta']['jabber:from'])
date = item_basic.get_item_date(item_id)
user_to = Username(to, 'jabber')
user_fr = Username(fr, 'jabber')
user_to.add(date, self.item_id)
user_fr.add(date, self.item_id)
return None

View file

@ -0,0 +1,56 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Telegram Feeder Importer Module
================
Process Telegram JSON
"""
import os
import sys
import datetime
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.feeders.Default import DefaultFeeder
from lib.objects.Usernames import Username
from lib import item_basic
class TelegramFeeder(DefaultFeeder):
def __init__(self, json_data):
super().__init__(json_data)
self.name = 'telegram'
# define item id
def get_item_id(self):
# TODO use telegram message date
date = datetime.date.today().strftime("%Y/%m/%d")
channel_id = str(self.json_data['meta']['channel_id'])
message_id = str(self.json_data['meta']['message_id'])
item_id = f'{channel_id}_{message_id}'
item_id = os.path.join('telegram', date, item_id)
self.item_id = f'{item_id}.gz'
return self.item_id
def process_meta(self):
"""
Process JSON meta field.
"""
# channel_id = str(self.json_data['meta']['channel_id'])
# message_id = str(self.json_data['meta']['message_id'])
# telegram_id = f'{channel_id}_{message_id}'
# item_basic.add_map_obj_id_item_id(telegram_id, item_id, 'telegram_id') #########################################
user = None
if self.json_data['meta'].get('user'):
user = str(self.json_data['meta']['user'])
elif self.json_data['meta'].get('channel'):
user = str(self.json_data['meta']['channel'].get('username'))
if user:
date = item_basic.get_item_date(self.item_id)
username = Username(user, 'telegram')
username.add(date, self.item_id)
return None

48
bin/importer/feeders/Twitter.py Executable file
View file

@ -0,0 +1,48 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The Twitter Feeder Importer Module
================
Process Twitter JSON
"""
import os
import sys
import datetime
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.feeders.Default import DefaultFeeder
from lib.objects.Usernames import Username
from lib import item_basic
class TwitterFeeder(DefaultFeeder):
def __init__(self, json_data):
super().__init__(json_data)
self.name = 'twitter'
# define item id
def get_item_id(self):
# TODO twitter timestamp message date
date = datetime.date.today().strftime("%Y/%m/%d")
item_id = str(self.json_data['meta']['twitter:tweet_id'])
item_id = os.path.join('twitter', date, item_id)
self.item_id = f'{item_id}.gz'
return self.item_id
def process_meta(self):
'''
Process JSON meta field.
'''
# tweet_id = str(self.json_data['meta']['twitter:tweet_id'])
# item_basic.add_map_obj_id_item_id(tweet_id, item_id, 'twitter_id') ############################################
date = item_basic.get_item_date(self.item_id)
user = str(self.json_data['meta']['twitter:id'])
username = Username(user, 'twitter')
username.add(date, item_id)
return None

View file

@ -0,0 +1,58 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The JSON Receiver Module
================
Recieve Json Items (example: Twitter feeder)
"""
import os
import sys
import datetime
import uuid
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.feeders.Default import DefaultFeeder
from lib.objects.Items import Item
class UrlextractFeeder(DefaultFeeder):
def __init__(self, json_data):
super().__init__(json_data)
self.name = 'urlextract'
# define item id
def get_item_id(self):
date = datetime.date.today().strftime("%Y/%m/%d")
item_id = str(self.json_data['meta']['twitter:url-extracted'])
item_id = item_id.split('//')
if len(item_id) > 1:
item_id = ''.join(item_id[1:])
else:
item_id = item_id[0]
item_id = item_id.replace('/', '_')
# limit ID length
if len(item_id) > 215:
item_id = item_id[:215]
item_id = f'{item_id}{str(uuid.uuid4())}.gz'
self.item_id = os.path.join('urlextract', date, item_id)
return self.item_id
def process_meta(self):
"""
Process JSON meta field.
"""
# ADD Other parents here
parent_id = None
if self.json_data['meta'].get('parent:twitter:tweet_id'):
parent_id = str(self.json_data['meta']['parent:twitter:tweet_id'])
if parent_id:
item = Item(self.item_id)
item.set_parent(parent_id)

View file

@ -48,6 +48,7 @@ def gen_token():
def _delete_user_token(user_id):
current_token = get_user_token(user_id)
if current_token:
r_serv_db.hdel('ail:users:tokens', current_token)
def _set_user_token(user_id, token):
@ -82,6 +83,12 @@ def get_user_passwd_hash(user_id):
def get_user_token(user_id):
return r_serv_db.hget(f'ail:users:metadata:{user_id}', 'token')
def get_token_user(token):
return r_serv_db.hget('ail:users:tokens', token)
def exists_token(token):
return r_serv_db.hexists('ail:users:tokens', token)
def exists_user(user_id):
return r_serv_db.exists(f'ail:user:metadata:{user_id}')
@ -131,7 +138,7 @@ def create_user(user_id, password=None, chg_passwd=True, role=None):
def edit_user_password(user_id, password_hash, chg_passwd=False):
if chg_passwd:
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', True)
r_serv_db.hset(f'ail:user:metadata:{user_id}', 'change_passwd', 'True')
else:
r_serv_db.hdel(f'ail:user:metadata:{user_id}', 'change_passwd')
# remove default user password file
@ -194,6 +201,9 @@ def get_all_user_upper_role(user_role):
def get_default_role():
return 'read_only'
def is_in_role(user_id, role):
return r_serv_db.sismember(f'ail:users:role:{role}', user_id)
def edit_user_role(user_id, role):
current_role = get_user_role(user_id)
if role != current_role:

View file

@ -12,7 +12,7 @@ sys.path.append(os.environ['AIL_BIN'])
from lib.ConfigLoader import ConfigLoader
config_loader = ConfigLoader()
r_db = config_loader.get_redis_conn("Kvrocks_DB")
r_db = config_loader.get_db_conn("Kvrocks_DB")
config_loader = None
BACKGROUND_UPDATES = {

View file

@ -2,11 +2,10 @@
# -*-coding:UTF-8 -*
"""
The ZMQ_Sub_Indexer Module
The Indexer Module
============================
The ZMQ_Sub_Indexer modules is fetching the list of files to be processed
and index each file with a full-text indexer (Whoosh until now).
each file with a full-text indexer (Whoosh until now).
"""
##################################

View file

@ -1,6 +1,16 @@
[ZMQModuleImporter]
publish = Redis_Import
[FeederModuleImporter]
publish = Redis_Import
####################################################
[Mixer]
subscribe = ZMQ_Global
publish = Redis_Mixer,Redis_preProcess1
# subscribe = ZMQ_Global
subscribe = Redis_Import
publish = Redis_Mixer
#publish = Redis_Mixer,Redis_preProcess1
[Sync_importer]
publish = Redis_Mixer,Redis_Tags

View file

@ -169,7 +169,8 @@ def index():
update_message = ''
if ail_updates.get_current_background_update():
background_update = True
update_message = ail_updates.get_update_background_message()
# update_message = ail_updates.get_update_background_message()
update_message = None
return render_template("index.html", default_minute = default_minute,
threshold_stucked_module=threshold_stucked_module,

View file

@ -16,6 +16,7 @@ sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import Users
from lib.objects.Items import Item
from lib import Tag
from lib import Tracker
@ -24,8 +25,7 @@ from packages import Term
from packages import Import_helper
sys.path.append(os.path.join(os.environ['AIL_BIN'], 'import'))
import importer
from importer.FeederImporter import api_add_json_feeder_to_queue
from flask import Flask, render_template, jsonify, request, Blueprint, redirect, url_for, Response, escape
@ -40,7 +40,6 @@ import Flask_config
app = Flask_config.app
baseUrl = Flask_config.baseUrl
r_cache = Flask_config.r_cache
r_serv_db = Flask_config.r_serv_db
restApi = Blueprint('restApi', __name__, template_folder='templates')
@ -57,31 +56,16 @@ def verify_token(token):
if not check_token_format(token):
return False
if r_serv_db.hexists('user:tokens', token):
return True
else:
return False
def get_user_from_token(token):
return r_serv_db.hget('user:tokens', token)
return Users.exists_token(token)
def verify_user_role(role, token):
# User without API
if role == 'user_no_api':
return False
user_id = get_user_from_token(token)
user_id = Users.get_token_user(token)
if user_id:
if is_in_role(user_id, role):
return True
else:
return False
else:
return False
def is_in_role(user_id, role):
if r_serv_db.sismember('user_role:{}'.format(role), user_id):
return True
return Users.is_in_role(user_id, role)
else:
return False
@ -366,7 +350,7 @@ def get_all_tags():
def add_tracker_term():
data = request.get_json()
user_token = get_auth_from_header()
user_id = get_user_from_token(user_token)
user_id = Users.get_token_user(user_token)
res = Tracker.api_add_tracker(data, user_id)
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
@ -375,7 +359,7 @@ def add_tracker_term():
def delete_tracker_term():
data = request.get_json()
user_token = get_auth_from_header()
user_id = get_user_from_token(user_token)
user_id = Users.get_token_user(user_token)
res = Term.parse_tracked_term_to_delete(data, user_id)
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
@ -384,7 +368,7 @@ def delete_tracker_term():
def get_tracker_term_item():
data = request.get_json()
user_token = get_auth_from_header()
user_id = get_user_from_token(user_token)
user_id = Users.get_token_user(user_token)
res = Term.parse_get_tracker_term_item(data, user_id)
return Response(json.dumps(res[0], indent=2, sort_keys=True), mimetype='application/json'), res[1]
@ -687,7 +671,7 @@ def import_item_uuid():
def import_json_item():
data_json = request.get_json()
res = importer.api_import_json_item(data_json)
res = api_add_json_feeder_to_queue(data_json)
return Response(json.dumps(res[0]), mimetype='application/json'), res[1]