ail-framework/bin/core/Sync_importer.py

88 lines
2.3 KiB
Python
Raw Normal View History

2021-10-29 16:48:12 +00:00
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The SYNC Module
================================
This module .
"""
##################################
# Import External packages
##################################
import json
import os
import sys
import time
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
from core import ail_2_ail
from modules.abstract_module import AbstractModule
# from lib.ConfigLoader import ConfigLoader
2021-10-29 16:48:12 +00:00
#### CONFIG ####
# config_loader = ConfigLoader()
#
# config_loader = None
#### ------ ####
2021-10-29 16:48:12 +00:00
class Sync_importer(AbstractModule):
"""
Sync_importer module for AIL framework
2021-10-29 16:48:12 +00:00
"""
def __init__(self):
super(Sync_importer, self).__init__()
# Waiting time in seconds between to message processed
2021-10-29 16:48:12 +00:00
self.pending_seconds = 10
# self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
# self.last_refresh = time.time()
2021-10-29 16:48:12 +00:00
# Send module state to logs
self.redis_logger.info(f'Module {self.module_name} Launched')
def run(self):
while self.proceed:
### REFRESH DICT
# if self.last_refresh < ail_2_ail.get_last_updated_ail_instance():
# self.dict_ail_sync_filters = ail_2_ail.get_all_sync_queue_dict()
# self.last_refresh = time.time()
ail_stream = ail_2_ail.get_sync_importer_ail_stream()
if ail_stream:
ail_stream = json.loads(ail_stream)
self.compute(ail_stream)
else:
self.computeNone()
# Wait before next process
self.redis_logger.debug(f"{self.module_name}, waiting for new message, Idling {self.pending_seconds}s")
time.sleep(self.pending_seconds)
def compute(self, ail_stream):
# # TODO: SANITYZE AIL STREAM
# # TODO: CHECK FILTER
# import Object
b64_gzip_content = ail_stream['payload']['raw']
# # TODO: create default id
item_id = ail_stream['meta']['ail:id']
2021-10-29 16:48:12 +00:00
message = f'sync {item_id} {b64_gzip_content}'
2021-12-01 14:37:28 +00:00
print(item_id)
self.add_message_to_queue(message, 'Importers')
2021-10-29 16:48:12 +00:00
if __name__ == '__main__':
module = Sync_importer()
module.run()