chg: [importers] add Dir/File Importer

This commit is contained in:
Terrtia 2023-05-22 15:31:48 +02:00
parent d55f065a26
commit af719d1d94
No known key found for this signature in database
GPG key ID: 1E1B1F50D84613D0
5 changed files with 323 additions and 107 deletions

View file

@ -11,7 +11,7 @@ For the moment, there are three different ways to feed AIL with data:
2. You can setup [pystemon](https://github.com/cvandeplas/pystemon) and use the custom feeder provided by AIL (see below). 2. You can setup [pystemon](https://github.com/cvandeplas/pystemon) and use the custom feeder provided by AIL (see below).
3. You can feed your own data using the [./bin/import_dir.py](./bin/import_dir.py) script. 3. You can feed your own data using the [./bin/file_dir_importer.py](./bin/import_dir.py) script.
### Feeding AIL with pystemon ### Feeding AIL with pystemon

97
bin/importer/FileImporter.py Executable file
View file

@ -0,0 +1,97 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
Importer Class
================
Import Content
"""
import logging.config
import os
import sys
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from importer.abstract_importer import AbstractImporter
# from modules.abstract_module import AbstractModule
from lib import ail_logger
from lib.ail_queues import AILQueue
from lib import ail_files # TODO RENAME ME
logging.config.dictConfig(ail_logger.get_config(name='modules'))
# TODO Clean queue one object destruct
class FileImporter(AbstractImporter):
    """Importer that sends the content of a single file to the importer queue.

    Files detected as text are gzip-compressed and base64-encoded; files
    detected as ``application/gzip`` are only base64-encoded. Any other
    MIME type is ignored.
    """

    def __init__(self, feeder='file_import'):
        """Create the importer.

        :param feeder: feeder name, passed to ``ail_files.create_item_id``
                       when building item ids.
        """
        super().__init__()
        self.logger = logging.getLogger(self.__class__.__name__)

        self.feeder_name = feeder  # TODO sanitize feeder name

        # Setup the I/O queues
        self.queue = AILQueue('FileImporter', 'manual')

    def importer(self, path):
        """Read *path* and queue its encoded content.

        Nothing happens when *path* is not a regular file, when its MIME type
        is neither text nor gzip, or when the encoded content is empty.

        :param path: path of the file to import.
        """
        if not os.path.isfile(path):
            return

        with open(path, 'rb') as f:
            raw = f.read()

        # Pick the encoder matching the detected MIME type:
        # text still has to be compressed, gzip data is already compressed.
        mimetype = ail_files.get_mimetype(raw)
        if ail_files.is_text(mimetype):
            encode = ail_files.create_gzipped_b64
        elif mimetype == 'application/gzip':
            encode = ail_files.create_b64
        else:
            return

        item_id = ail_files.create_item_id(self.feeder_name, path)
        content = encode(raw)
        if content:
            message = f'dir_import {item_id} {content}'
            self.logger.info(message)
            self.queue.send_message(message)
class DirImporter(AbstractImporter):
    """Importer that walks a directory tree and imports every file found.

    Each file is delegated to a :class:`FileImporter` instance.
    """

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(self.__class__.__name__)
        self.file_importer = FileImporter()

    def importer(self, dir_path):
        """Import every file located under *dir_path* (recursively).

        :param dir_path: root directory to walk.
        :raises NotADirectoryError: if *dir_path* is not an existing directory.
            ``NotADirectoryError`` is a subclass of ``Exception``, so callers
            that catch ``Exception`` keep working.
        """
        if not os.path.isdir(dir_path):
            message = f'Error, {dir_path} is not a directory'
            self.logger.warning(message)
            # Raise a specific built-in error rather than a bare Exception
            raise NotADirectoryError(message)

        for dirname, _, filenames in os.walk(dir_path):
            for filename in filenames:
                self.file_importer.importer(os.path.join(dirname, filename))
# if __name__ == '__main__':
# import argparse
# # TODO multiple files/dirs ???
# parser = argparse.ArgumentParser(description='Directory or file importer')
# parser.add_argument('-d', '--directory', type=str, help='Root directory to import')
# parser.add_argument('-f', '--file', type=str, help='File to import')
# args = parser.parse_args()
#
# if not args.directory and not args.file:
# parser.print_help()
# sys.exit(0)
#
# if args.directory:
# dir_path = args.directory
# dir_importer = DirImporter()
# dir_importer.importer(dir_path)
#
# if args.file:
# file_path = args.file
# file_importer = FileImporter()
# file_importer.importer(file_path)

195
bin/lib/ail_files.py Executable file
View file

@ -0,0 +1,195 @@
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
import base64
import datetime
import gzip
import logging.config
import magic
import os
import sys
from werkzeug.utils import secure_filename
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from lib import ail_logger
from lib.ail_core import generate_uuid
# from lib import ConfigLoader
from packages import Date
logging.config.dictConfig(ail_logger.get_config(name='modules'))
logger = logging.getLogger()
# config_loader = ConfigLoader.ConfigLoader()
# r_serv = config_loader.get_db_conn("Kvrocks_Stats") # TODO CHANGE DB
# r_cache = config_loader.get_redis_conn("Redis_Log_submit")
#
# # Text max size
# TEXT_MAX_SIZE = ConfigLoader.ConfigLoader().get_config_int("SubmitPaste", "TEXT_MAX_SIZE")
# # File max size
# FILE_MAX_SIZE = ConfigLoader.ConfigLoader().get_config_int("SubmitPaste", "FILE_MAX_SIZE")
# # Allowed file type
# ALLOWED_EXTENSIONS = ConfigLoader.ConfigLoader().get_config_str("SubmitPaste", "FILE_ALLOWED_EXTENSIONS").split(',')
# config_loader = None
#
# # TODO generate UUID
#
# # TODO Source ????
#
# # TODO RENAME ME
# class Submit:
# def __init__(self, submit_uuid):
# self.uuid = submit_uuid
#
# def exists(self):
# return r_serv.exists(f'submit:{self.uuid}')
#
# def is_item(self):
# return r_serv.hexists(f'submit:{self.uuid}', 'content')
#
# def is_file(self):
# return r_serv.hexists(f'submit:{self.uuid}', 'filename')
#
# def get_filename(self):
# return r_serv.hget(f'submit:{self.uuid}', 'filename')
#
# def get_content(self):
# return r_serv.hget(f'submit:{self.uuid}', 'content')
#
# def get_password(self):
# r_serv.hget(f'submit:{self.uuid}', 'password')
#
# def get_tags(self):
# return r_serv.smembers(f'submit:tags:{self.uuid}')
#
# def get_error(self):
# return r_cache.hget(f'submit:{self.uuid}:', 'error')
#
# def get_stats(self):
# stats = {'ended': r_cache.hget(f'submit:{self.uuid}', 'ended'), # boolean
# 'objs': r_cache.hget(f'submit:{self.uuid}', 'objs'), # objs IDs
# 'nb_files': r_cache.hget(f'submit:{self.uuid}', 'nb_files'),
# 'nb_done': r_cache.hget(f'submit:{self.uuid}', 'nb_done'),
# 'submitted': r_cache.hget(f'submit:{self.uuid}', 'submitted'),
# 'error': self.get_error()}
# return stats
#
#
# def get_meta(self):
# meta = {'uuid': self.uuid}
# return meta
#
# def is_compressed(self):
# pass
#
#
# def abort(self, message):
# self.set_error(message)
# r_cache.hset(f'submit:{self.uuid}', 'ended', 'True')
# self.delete()
#
# def set_error(self, message):
#
# r_serv.hset(f'submit:{self.uuid}', 'error', )
#
# # source ???
# def create(self, content='', filename='', tags=[], password=None):
#
#
#
#
# r_serv.sadd(f'submits:all')
#
#
# def delete(self):
# r_serv.srem(f'submits:all', self.uuid)
# r_cache.delete(f'submit:{self.uuid}')
# r_serv.delete(f'submit:tags:{self.uuid}')
# r_serv.delete(f'submit:{self.uuid}')
#
#
# def create_submit(tags=[]):
# submit_uuid = generate_uuid()
# submit = Submit(submit_uuid)
#
# def api_create_submit():
# pass
#########################################################################################
#########################################################################################
#########################################################################################
# Set of MIME types that is_archive() reports as archives.
# Entries left as comments are additional or alternative MIME names that are
# not matched. NOTE(review): 'application/x-lzma' is both an active entry and
# listed again in the commented-out group - confirm the duplicate is intended.
ARCHIVE_MIME_TYPE = {
'application/zip',
# application/bzip2
'application/x-bzip2',
'application/java-archive',
'application/x-tar',
'application/gzip',
# application/x-gzip
'application/x-lzma',
'application/x-xz',
# application/x-xz-compressed-tar
'application/x-lz',
'application/x-7z-compressed',
'application/x-rar',
# application/x-rar-compressed
'application/x-iso9660-image',
'application/vnd.ms-cab-compressed',
# application/x-lzma
# application/x-compress
# application/x-lzip
# application/x-lz4
# application/zstd
}
def is_archive(mimetype):
    """Return True when *mimetype* is listed in ``ARCHIVE_MIME_TYPE``."""
    listed = mimetype in ARCHIVE_MIME_TYPE
    return listed
def is_text(mimetype):
    """Return True when the top-level type of *mimetype* is ``text``.

    The top-level type is everything before the first ``/``.
    """
    top_level_type, _, _ = mimetype.partition('/')
    return top_level_type == 'text'
def get_mimetype(b_content):
    """Return the MIME type reported by ``magic.from_buffer`` for *b_content*."""
    mimetype = magic.from_buffer(b_content, mime=True)
    return mimetype
def create_item_id(feeder_name, path):
    """Build an item id of the form ``<feeder_name>/<YYYY>/<MM>/<DD>/<basename>.gz``.

    The date is taken from the three directory components preceding the file
    name when they form a valid ``Year/Month/Day``; otherwise today's date is
    used and the whole *path* becomes the base name. The base name is passed
    through ``secure_filename``, replaced by a generated uuid if empty, and
    shortened if longer than 215 characters. Every dot is replaced by ``_``
    except the one of a final ``.gz`` suffix, which is appended when missing.

    :param feeder_name: first component of the returned id.
    :param path: path of the imported file, split on ``/``.
    :return: the item id.
    """
    names = path.split('/')
    try:
        year, month, day = int(names[-4]), int(names[-3]), int(names[-2])
        date = datetime.datetime(year, month, day).strftime("%Y%m%d")
        basename = names[-1]
    except (IndexError, ValueError):
        # No usable Year/Month/Day hierarchy in the path
        # NOTE(review): assumes get_today_date_str() returns YYYYMMDD - confirm
        date = Date.get_today_date_str()
        basename = path  # TODO check max depth
    date = f'{date[0:4]}/{date[4:6]}/{date[6:8]}'

    basename = secure_filename(basename)
    if not basename:
        basename = generate_uuid()
    if len(basename) > 215:
        # Keep the tail of the name and append a uuid
        basename = basename[-215:] + str(generate_uuid())

    if basename.endswith('.gz'):
        # Replace every dot but the last one (the '.gz' extension)
        extra_dots = basename.count('.') - 1
        if extra_dots > 0:
            basename = basename.replace('.', '_', extra_dots)
    else:
        basename = basename.replace('.', '_')
        basename = f'{basename}.gz'

    # TODO check if already exists
    return os.path.join(feeder_name, date, basename)
def create_b64(b_content):
    """Return the standard base64 encoding of *b_content* as a ``str``."""
    encoded = base64.standard_b64encode(b_content)
    return encoded.decode()
def create_gzipped_b64(b_content):
    """Gzip-compress *b_content* and return the result base64-encoded.

    :return: the base64 ``str``, or ``''`` if compression or encoding raised
             (the error is logged as a warning).
    """
    try:
        gzip64encoded = create_b64(gzip.compress(b_content))
    except Exception as e:
        logger.warning(e)
        return ''
    return gzip64encoded

View file

@ -7,6 +7,9 @@ publish = Importers
[Importer_Json] [Importer_Json]
publish = Importers,Tags publish = Importers,Tags
[FileImporter]
publish = Importers
[PystemonModuleImporter] [PystemonModuleImporter]
publish = Importers publish = Importers

View file

@ -1,119 +1,40 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
"""
DIR/File Importer Helper
================
Import Content
"""
import zmq
import base64
from io import StringIO
import datetime
import gzip
import argparse import argparse
import binascii
import os import os
import time, datetime import sys
import re
''' sys.path.append(os.environ['AIL_BIN'])
' ##################################
' Import content/pastes into redis. # Import Project packages
' If content is not compressed yet, compress it (only text). ##################################
' from importer import FileImporter
' /!\ WARNING /!\
Content to be imported can be placed in a directory tree of the form
root/
|
+-- Year/
|
+-- Month/
|
+-- Day/
|
+-- Content
e.g.:
~/to_import/2017/08/22/paste1.gz
or this directory tree will be created with the current date
e.g.:
~/to_import/paste1.gz
'
'''
def is_gzip_file(magic_nuber):
    """Return True when *magic_nuber* is exactly the two bytes ``1f 8b``."""
    hex_signature = binascii.hexlify(magic_nuber)
    return hex_signature == b'1f8b'
def is_hierachy_valid(path):
    """Tell whether *path* ends with a ``Year/Month/Day/<name>`` hierarchy.

    The three components preceding the last one must be integers forming a
    valid calendar date.

    :param path: path split on ``/``.
    :return: True if the date components are valid, False otherwise.
    """
    var = path.split('/')
    try:
        datetime.datetime(int(var[-4]), int(var[-3]), int(var[-2]))
    except (ValueError, IndexError, OverflowError):
        # ValueError: non-numeric or out-of-range component
        # IndexError: fewer than four path components
        # OverflowError: number too large for datetime
        # Listed explicitly so KeyboardInterrupt/SystemExit are not swallowed.
        return False
    return True
def sanitize_str(str_var, invalid_char_regex):
    """Replace matches of *invalid_char_regex* by ``-`` and spaces by ``_``.

    :param str_var: string to sanitize.
    :param invalid_char_regex: regular expression matching the text to replace.
    :return: the sanitized string.
    """
    return re.sub(invalid_char_regex, "-", str_var).replace(' ', '_')
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Take files from a directory and push them into a 0MQ feed.') parser = argparse.ArgumentParser(description='Directory or file importer')
parser.add_argument('-d', '--directory', type=str, required=True, help='Root directory to import') parser.add_argument('-d', '--directory', type=str, help='Root directory to import')
parser.add_argument('-p', '--port', type=int, default=5556, help='Zero MQ port') parser.add_argument('-f', '--file', type=str, help='File to import')
parser.add_argument('-c', '--channel', type=str, default='102', help='Zero MQ channel')
parser.add_argument('-n', '--name', type=str, default='import_dir', help='Name of the feeder')
parser.add_argument('-s', '--seconds', type=float, default=0.2, help='Second between pastes')
parser.add_argument('--hierarchy', type=int, default=1, help='Number of parent directory forming the name')
args = parser.parse_args() args = parser.parse_args()
context = zmq.Context() if not args.directory and not args.file:
socket = context.socket(zmq.PUB) parser.print_help()
socket.bind("tcp://*:{}".format(args.port)) sys.exit(0)
time.sleep(1) #Important, avoid loosing the 1 message
invalid_char = r'[\\/*?&%=:"<>|#\\\']' if args.directory:
invalid_char_dir = r'[\\*?&%=:"<>|#\\\']' dir_path = args.directory
dir_importer = FileImporter.DirImporter()
dir_importer.importer(dir_path)
for dirname, dirnames, filenames in os.walk(args.directory): if args.file:
for filename in filenames: file_path = args.file
complete_path = os.path.join(dirname, filename) file_importer = FileImporter.FileImporter()
file_importer.importer(file_path)
with open(complete_path, 'rb') as f:
messagedata = f.read()
#verify that the data is gzipEncoded. if not compress it
if not is_gzip_file(messagedata[0:2]):
messagedata = gzip.compress(messagedata)
complete_path += '.gz'
if complete_path[-4:] != '.gz':
#if paste do not have a 'date hierarchy', create it
if not is_hierachy_valid(complete_path):
now = datetime.datetime.now()
paste_name = complete_path.split('/')[-1]
paste_name = sanitize_str(paste_name, invalid_char)
directory = complete_path.split('/')[-2]
directory = sanitize_str(directory, invalid_char_dir)
wanted_path = os.path.join(directory, now.strftime("%Y"), now.strftime("%m"), now.strftime("%d"), paste_name)
wanted_path = os.path.relpath(wanted_path)
else:
#take wanted path of the file
wanted_path = os.path.relpath(complete_path)
wanted_path = wanted_path.split('/')
wanted_path = '/'.join(wanted_path[-(4+args.hierarchy):])
wanted_path = sanitize_str(wanted_path, invalid_char_dir)
# sanitize feeder_name
feeder_name = os.path.relpath(sanitize_str(args.name, invalid_char))
path_to_send = 'import_dir/' + feeder_name + '>>' + wanted_path
s = b' '.join( [ args.channel.encode(), path_to_send.encode(), base64.b64encode(messagedata) ] )
socket.send(s)
print('import_dir/' + feeder_name+'>>'+wanted_path)
time.sleep(args.seconds)
else:
print('{} : incorrect type'.format(complete_path))