#!/usr/bin/env python2 # -*-coding:UTF-8 -* """ Queue helper module ============================ This module subscribe to a Publisher stream and put the received messages into a Redis-list waiting to be popped later by others scripts. ..note:: Module ZMQ_Something_Q and ZMQ_Something are closely bound, always put the same Subscriber name in both of them. """ import redis try: # dirty to support python3 import ConfigParser except: import configparser ConfigParser = configparser import os import zmq import time import datetime import json class PubSub(object): def __init__(self): configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') if not os.path.exists(configfile): raise Exception('Unable to find the configuration file. \ Did you set environment variables? \ Or activate the virtualenv.') self.config = ConfigParser.ConfigParser() self.config.read(configfile) self.redis_sub = False self.zmq_sub = False self.subscribers = None self.publishers = {'Redis': [], 'ZMQ': []} def setup_subscribe(self, conn_name): if self.config.has_section(conn_name): channel = self.config.get(conn_name, 'channel') else: channel = conn_name.split('_')[1] if conn_name.startswith('Redis'): self.redis_sub = True r = redis.StrictRedis( host=self.config.get('RedisPubSub', 'host'), port=self.config.get('RedisPubSub', 'port'), db=self.config.get('RedisPubSub', 'db')) self.subscribers = r.pubsub(ignore_subscribe_messages=True) self.subscribers.psubscribe(channel) elif conn_name.startswith('ZMQ'): self.zmq_sub = True context = zmq.Context() self.subscribers = [] addresses = self.config.get(conn_name, 'address') for address in addresses.split(','): new_sub = context.socket(zmq.SUB) new_sub.connect(address) new_sub.setsockopt(zmq.SUBSCRIBE, channel) self.subscribers.append(new_sub) def setup_publish(self, conn_name): if self.config.has_section(conn_name): channel = self.config.get(conn_name, 'channel') else: channel = conn_name.split('_')[1] if conn_name.startswith('Redis'): r = redis.StrictRedis(host=self.config.get('RedisPubSub', 'host'), port=self.config.get('RedisPubSub', 'port'), db=self.config.get('RedisPubSub', 'db')) self.publishers['Redis'].append((r, channel)) elif conn_name.startswith('ZMQ'): context = zmq.Context() p = context.socket(zmq.PUB) p.bind(self.config.get(conn_name, 'address')) self.publishers['ZMQ'].append((p, channel)) def publish(self, message): m = json.loads(message) channel_message = m.get('channel') for p, channel in self.publishers['Redis']: if channel_message is None or channel_message == channel: p.publish(channel, m['message']) for p, channel in self.publishers['ZMQ']: if channel_message is None or channel_message == channel: p.send('{} {}'.format(channel, m['message'])) def subscribe(self): if self.redis_sub: for msg in self.subscribers.listen(): if msg.get('data', None) is not None: yield msg['data'] elif self.zmq_sub: while True: for sub in self.subscribers: try: msg = sub.recv(zmq.NOBLOCK) yield msg.split(' ', 1)[1] except zmq.error.Again as e: time.sleep(0.2) pass else: raise Exception('No subscribe function defined') class Process(object): def __init__(self, conf_section): configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg') if not os.path.exists(configfile): raise Exception('Unable to find the configuration file. \ Did you set environment variables? \ Or activate the virtualenv.') modulesfile = os.path.join(os.environ['AIL_BIN'], 'packages/modules.cfg') self.config = ConfigParser.ConfigParser() self.config.read(configfile) self.modules = ConfigParser.ConfigParser() self.modules.read(modulesfile) self.subscriber_name = conf_section self.pubsub = None if self.modules.has_section(conf_section): self.pubsub = PubSub() else: raise Exception('Your process has to listen to at least one feed.') self.r_temp = redis.StrictRedis( host=self.config.get('RedisPubSub', 'host'), port=self.config.get('RedisPubSub', 'port'), db=self.config.get('RedisPubSub', 'db')) self.moduleNum = os.getpid() def populate_set_in(self): # monoproc src = self.modules.get(self.subscriber_name, 'subscribe') self.pubsub.setup_subscribe(src) for msg in self.pubsub.subscribe(): in_set = self.subscriber_name + 'in' self.r_temp.sadd(in_set, msg) self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set))) def get_from_set(self): # multiproc in_set = self.subscriber_name + 'in' self.r_temp.hset('queues', self.subscriber_name, int(self.r_temp.scard(in_set))) message = self.r_temp.spop(in_set) timestamp = int(time.mktime(datetime.datetime.now().timetuple())) dir_name = os.environ['AIL_HOME']+self.config.get('Directories', 'pastes') if message is None: return None else: try: if ".gz" in message: path = message.split(".")[-2].split("/")[-1] #find start of path with AIL_HOME index_s = message.find(os.environ['AIL_HOME']) #Stop when .gz index_e = message.find(".gz")+3 complete_path = message[index_s:index_e] else: path = "?" value = str(timestamp) + ", " + path self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path) self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) return message except: path = "?" value = str(timestamp) + ", " + path self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", "?") self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) return message def populate_set_out(self, msg, channel=None): # multiproc msg = {'message': msg} if channel is not None: msg.update({'channel': channel}) self.r_temp.sadd(self.subscriber_name + 'out', json.dumps(msg)) def publish(self): # monoproc if not self.modules.has_option(self.subscriber_name, 'publish'): return False dest = self.modules.get(self.subscriber_name, 'publish') # We can have multiple publisher for name in dest.split(','): self.pubsub.setup_publish(name) while True: message = self.r_temp.spop(self.subscriber_name + 'out') if message is None: time.sleep(1) continue self.pubsub.publish(message)