From 998f8cc8e15f81dff5a4d006509d5c58883da629 Mon Sep 17 00:00:00 2001 From: Terrtia Date: Thu, 27 Feb 2020 13:23:40 +0100 Subject: [PATCH] fix: [ZMQ Feeder] performance: replace zmq recv NOBLOCK by Poller --- bin/Global.py | 2 +- bin/Helper.py | 27 ++++++++++++++++----------- bin/Mixer.py | 1 - 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/bin/Global.py b/bin/Global.py index 3bf36215..ff7e8e52 100755 --- a/bin/Global.py +++ b/bin/Global.py @@ -95,7 +95,7 @@ if __name__ == '__main__': #publisher.info(to_print) time_1 = time.time() processed_paste = 0 - time.sleep(1) + time.sleep(0.5) continue # remove PASTES_FOLDER from item path (crawled item + submited) diff --git a/bin/Helper.py b/bin/Helper.py index cda26ce5..31eb20e9 100755 --- a/bin/Helper.py +++ b/bin/Helper.py @@ -53,13 +53,14 @@ class PubSub(object): ## TODO: remove config, use ConfigLoader by default self.zmq_sub = True context = zmq.Context() + # Get all feeds self.subscribers = [] addresses = self.config.get(conn_name, 'address') for address in addresses.split(','): - new_sub = context.socket(zmq.SUB) - new_sub.connect(address) - new_sub.setsockopt_string(zmq.SUBSCRIBE, channel) - self.subscribers.append(new_sub) + subscriber = context.socket(zmq.SUB) + subscriber.connect(address) + subscriber.setsockopt_string(zmq.SUBSCRIBE, channel) + self.subscribers.append(subscriber) def setup_publish(self, conn_name): if self.config.has_section(conn_name): @@ -96,14 +97,18 @@ class PubSub(object): ## TODO: remove config, use ConfigLoader by default if msg.get('data', None) is not None: yield msg['data'] elif self.zmq_sub: + # Initialize poll set + poller = zmq.Poller() + for subscriber in self.subscribers: + poller.register(subscriber, zmq.POLLIN) + while True: - for sub in self.subscribers: - try: - msg = sub.recv(zmq.NOBLOCK) - yield msg.split(b" ", 1)[1] - except zmq.error.Again as e: - time.sleep(0.2) - pass + socks = dict(poller.poll()) + + for subscriber in self.subscribers: + if subscriber in socks: + message = subscriber.recv() + yield message.split(b' ', 1)[1] else: raise Exception('No subscribe function defined') diff --git a/bin/Mixer.py b/bin/Mixer.py index fd8bedc5..9f170277 100755 --- a/bin/Mixer.py +++ b/bin/Mixer.py @@ -186,7 +186,6 @@ if __name__ == '__main__': print("Empty Paste: not processed") publisher.debug("Empty Paste: {0} not processed".format(message)) else: - print("Empty Queues: Waiting...") if int(time.time() - time_1) > refresh_time: # update internal feeder