fix incoming encoding file issue

This commit is contained in:
Terrtia 2018-04-13 09:17:56 +02:00
parent 347588fdec
commit 3395b16873
2 changed files with 16 additions and 12 deletions

View file

@ -76,5 +76,6 @@ if __name__ == '__main__':
with open(filename, 'wb') as f: with open(filename, 'wb') as f:
f.write(base64.standard_b64decode(gzip64encoded)) f.write(base64.standard_b64decode(gzip64encoded))
print(filename)
p.populate_set_out(filename.encode('utf8')) p.populate_set_out(filename.encode('utf8'))
processed_paste+=1 processed_paste+=1

View file

@ -57,7 +57,8 @@ class PubSub(object):
for address in addresses.split(','): for address in addresses.split(','):
new_sub = context.socket(zmq.SUB) new_sub = context.socket(zmq.SUB)
new_sub.connect(address) new_sub.connect(address)
new_sub.setsockopt_string(zmq.SUBSCRIBE, channel) # bytes64 encode bytes to ascii only bytes
new_sub.setsockopt(zmq.SUBSCRIBE, channel.encode('ascii'))
self.subscribers.append(new_sub) self.subscribers.append(new_sub)
def setup_publish(self, conn_name): def setup_publish(self, conn_name):
@ -77,14 +78,16 @@ class PubSub(object):
self.publishers['ZMQ'].append((p, channel)) self.publishers['ZMQ'].append((p, channel))
def publish(self, message): def publish(self, message):
m = json.loads(message.decode('utf8')) m = json.loads(message.decode('ascii'))
channel_message = m.get('channel') channel_message = m.get('channel')
for p, channel in self.publishers['Redis']: for p, channel in self.publishers['Redis']:
if channel_message is None or channel_message == channel: if channel_message is None or channel_message == channel:
p.publish(channel, m['message']) p.publish(channel, ( m['message']).encode('ascii') )
for p, channel in self.publishers['ZMQ']: for p, channel in self.publishers['ZMQ']:
if channel_message is None or channel_message == channel: if channel_message is None or channel_message == channel:
p.send('{} {}'.format(channel, m['message'])) mess = ( m['message'] ).encode('ascii')
p.send(b' '.join( [channel, mess] ) )
def subscribe(self): def subscribe(self):
if self.redis_sub: if self.redis_sub:
@ -158,7 +161,7 @@ class Process(object):
if b'.gz' in message: if b'.gz' in message:
path = message.split(b".")[-2].split(b"/")[-1] path = message.split(b".")[-2].split(b"/")[-1]
#find start of path with AIL_HOME #find start of path with AIL_HOME
index_s = (message.decode('utf8')).find(os.environ['AIL_HOME']) index_s = (message.decode('ascii')).find(os.environ['AIL_HOME'])
#Stop when .gz #Stop when .gz
index_e = message.find(b".gz")+3 index_e = message.find(b".gz")+3
if(index_s == -1): if(index_s == -1):
@ -167,9 +170,10 @@ class Process(object):
complete_path = message[index_s:index_e] complete_path = message[index_s:index_e]
else: else:
path = "?" path = "-"
complete_path = "?"
value = str(timestamp) + ", " + path.decode('utf8') value = str(timestamp) + ", " + path.decode('ascii')
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value) self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum), value)
self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path) self.r_temp.set("MODULE_"+self.subscriber_name + "_" + str(self.moduleNum) + "_PATH", complete_path)
self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum)) self.r_temp.sadd("MODULE_TYPE_"+self.subscriber_name, str(self.moduleNum))
@ -186,15 +190,14 @@ class Process(object):
def populate_set_out(self, msg, channel=None): def populate_set_out(self, msg, channel=None):
# multiproc # multiproc
msg = msg.decode('utf8') msg = msg.decode('ascii')
msg = {'message': msg} msg = {'message': msg}
if channel is not None: if channel is not None:
msg.update({'channel': channel}) msg.update({'channel': channel})
# TODO use bytes here ? # bytes64 encode bytes to ascii only bytes
#j = (json.dumps(msg)).encode('utf8') j = (json.dumps(msg)).encode('ascii')
j = json.dumps(msg) self.r_temp.sadd(self.subscriber_name + 'out', j)
self.r_temp.sadd(self.subscriber_name + 'out', json.dumps(msg))
def publish(self): def publish(self):
# monoproc # monoproc