move Redis_Data_Merging to Paste

This commit is contained in:
Raphaël Vinot 2014-08-21 12:22:07 +02:00
parent 50cfac857e
commit 63b29176c1
8 changed files with 35 additions and 73 deletions

View file

@ -27,7 +27,6 @@ Requirements
*Need the ZMQ_PubSub_Line_Q Module running to be able to work properly.
"""
import redis
import argparse
import time
from packages import Paste
@ -63,13 +62,6 @@ if __name__ == "__main__":
args = parser.parse_args()
# REDIS #
# FIXME move it in the Paste object
r_serv = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
channel_0 = h.config.get("PubSub_Longlines", "channel_0")
channel_1 = h.config.get("PubSub_Longlines", "channel_1")
@ -94,11 +86,10 @@ if __name__ == "__main__":
lines_infos = PST.get_lines_info()
PST.save_attribute_redis(r_serv, "p_nb_lines", lines_infos[0])
PST.save_attribute_redis(r_serv, "p_max_length_line",
lines_infos[1])
PST.save_attribute_redis("p_nb_lines", lines_infos[0])
PST.save_attribute_redis("p_max_length_line", lines_infos[1])
r_serv.sadd("Pastes_Objects", PST.p_path)
PST.store.sadd("Pastes_Objects", PST.p_path)
if lines_infos[1] >= args.max:
h.pub_channel = channel_0
else:

View file

@ -5,10 +5,10 @@
The ZMQ_Sub_Attribute Module
============================
This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module.
This module is consuming the Redis-list created by the ZMQ_PubSub_Line_Q Module
It performs a sorting on the lines' length and publishes/forwards them to different
channels:
It performs a sorting on the lines' length and publishes/forwards them to
different channels:
*Channel 1 if max length(line) < max
*Channel 2 if max length(line) > max
@ -26,7 +26,6 @@ Requirements
*Need the ZMQ_PubSub_Line_Q Module running to be able to work properly.
"""
import redis
import time
from packages import Paste
from pubsublogger import publisher
@ -45,12 +44,6 @@ if __name__ == "__main__":
# Subscriber
h.zmq_sub(config_section)
# REDIS #
r_serv = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
# FUNCTIONS #
publisher.info("""ZMQ Attribute is Running""")
@ -72,12 +65,12 @@ if __name__ == "__main__":
encoding = PST._get_p_encoding()
language = PST._get_p_language()
PST.save_attribute_redis(r_serv, "p_encoding", encoding)
PST.save_attribute_redis(r_serv, "p_language", language)
PST.save_attribute_redis("p_encoding", encoding)
PST.save_attribute_redis("p_language", language)
r_serv.sadd("Pastes_Objects", PST.p_path)
PST.store.sadd("Pastes_Objects", PST.p_path)
PST.save_all_attributes_redis(r_serv)
PST.save_all_attributes_redis()
except IOError:
print "CRC Checksum Failed on :", PST.p_path
publisher.error('{0};{1};{2};{3};{4}'.format(

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python2
# -*-coding:UTF-8 -*
import redis
import pprint
import time
from packages import Paste
@ -21,12 +20,6 @@ if __name__ == "__main__":
# Subscriber
h.zmq_sub(config_section)
# REDIS #
r_serv1 = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
# FUNCTIONS #
publisher.info("Creditcard script subscribed to channel creditcard_categ")
@ -57,7 +50,7 @@ if __name__ == "__main__":
creditcard_set.add(x)
PST.__setattr__(channel, creditcard_set)
PST.save_attribute_redis(r_serv1, channel, creditcard_set)
PST.save_attribute_redis(channel, creditcard_set)
pprint.pprint(creditcard_set)
to_print = 'CreditCard;{};{};{};'.format(

View file

@ -33,12 +33,6 @@ if __name__ == "__main__":
# Subscriber
h.zmq_sub(config_section)
# REDIS #
r_serv_merge = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
# REDIS #
# DB OBJECT & HASHS ( DISK )
# FIXME increase flexibility
@ -168,7 +162,7 @@ if __name__ == "__main__":
PST.p_source, PST.p_date, PST.p_name)
if dupl != []:
PST.__setattr__("p_duplicate", dupl)
PST.save_attribute_redis(r_serv_merge, "p_duplicate", dupl)
PST.save_attribute_redis("p_duplicate", dupl)
publisher.info('{}Detected {}'.format(to_print, len(dupl)))
y = time.time()

View file

@ -24,11 +24,6 @@ if __name__ == "__main__":
h.zmq_sub(config_section)
# REDIS #
r_serv1 = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
r_serv2 = redis.StrictRedis(
host=h.config.get("Redis_Cache", "host"),
port=h.config.getint("Redis_Cache", "port"),
@ -58,8 +53,7 @@ if __name__ == "__main__":
if MX_values[0] >= 1:
PST.__setattr__(channel, MX_values)
PST.save_attribute_redis(r_serv1, channel,
(MX_values[0],
PST.save_attribute_redis(channel, (MX_values[0],
list(MX_values[1])))
pprint.pprint(MX_values)

View file

@ -21,7 +21,6 @@ Requirements
*Need the ZMQ_Sub_Onion_Q Module running to be able to work properly.
"""
import redis
import pprint
import time
from packages import Paste
@ -42,12 +41,6 @@ if __name__ == "__main__":
# Subscriber
h.zmq_sub(config_section)
# REDIS #
r_serv1 = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
# FUNCTIONS #
publisher.info("Script subscribed to channel onion_categ")
@ -78,7 +71,7 @@ if __name__ == "__main__":
# Saving the list of extracted onion domains.
PST.__setattr__(channel, domains_list)
PST.save_attribute_redis(r_serv1, channel, domains_list)
PST.save_attribute_redis(channel, domains_list)
pprint.pprint(domains_list)
print PST.p_path
to_print = 'Onion;{};{};{};'.format(PST.p_source, PST.p_date,

View file

@ -34,11 +34,6 @@ if __name__ == "__main__":
h.zmq_sub(config_section)
# REDIS #
r_serv1 = redis.StrictRedis(
host=h.config.get("Redis_Data_Merging", "host"),
port=h.config.getint("Redis_Data_Merging", "port"),
db=h.config.getint("Redis_Data_Merging", "db"))
r_serv2 = redis.StrictRedis(
host=h.config.get("Redis_Cache", "host"),
port=h.config.getint("Redis_Cache", "port"),
@ -108,8 +103,7 @@ if __name__ == "__main__":
domains_list)
if A_values[0] >= 1:
PST.__setattr__(channel, A_values)
PST.save_attribute_redis(r_serv1, channel,
(A_values[0],
PST.save_attribute_redis(channel, (A_values[0],
list(A_values[1])))
pprint.pprint(A_values)

View file

@ -44,7 +44,8 @@ class Paste(object):
This class represents a Paste as an object.
When created, the object will have by default some "main attributes"
such as the size or the date of the paste already calculated, whereas other
attributes are not set and need to be "asked to be calculated" by their methods.
attributes are not set and need to be "asked to be calculated" by their
methods.
It was designed like this because some attributes take time to be calculated
such as the language or the duplicates...
@ -56,16 +57,25 @@ class Paste(object):
def __init__(self, p_path):
configfile = './packages/config.cfg'
configfile = os.path.join(os.environ['AIL_BIN'], 'packages/config.cfg')
if not os.path.exists(configfile):
raise Exception('Unable to find the configuration file. \
Did you set environment variables? \
Or activate the virtualenv.')
cfg = ConfigParser.ConfigParser()
cfg.read(configfile)
self.cache = redis.StrictRedis(
host=cfg.get("Redis_Queues", "host"),
port=cfg.getint("Redis_Queues", "port"),
db=cfg.getint("Redis_Queues", "db"))
self.store = redis.StrictRedis(
host=cfg.get("Redis_Data_Merging", "host"),
port=cfg.getint("Redis_Data_Merging", "port"),
db=cfg.getint("Redis_Data_Merging", "db"))
self.p_path = p_path
self.p_name = self.p_path.split('/')[-1]
self.p_name = os.path.basename(self.p_path)
self.p_size = round(os.path.getsize(self.p_path)/1024.0, 2)
self.p_mime = magic.from_buffer(self.get_p_content(), mime=True)
@ -260,7 +270,7 @@ class Paste(object):
else:
return False, var
def save_all_attributes_redis(self, r_serv, key=None):
def save_all_attributes_redis(self, key=None):
"""
Saving all the attributes in a "Redis-like" Database (Redis, LevelDB)
@ -277,7 +287,7 @@ class Paste(object):
"""
# LevelDB Compatibility
p = r_serv.pipeline(False)
p = self.store.pipeline(False)
p.hset(self.p_path, "p_name", self.p_name)
p.hset(self.p_path, "p_size", self.p_size)
p.hset(self.p_path, "p_mime", self.p_mime)
@ -296,14 +306,14 @@ class Paste(object):
pass
p.execute()
def save_attribute_redis(self, r_serv, attr_name, value):
def save_attribute_redis(self, attr_name, value):
"""
Save an attribute as a field
"""
if type(value) == set:
r_serv.hset(self.p_path, attr_name, json.dumps(list(value)))
self.store.hset(self.p_path, attr_name, json.dumps(list(value)))
else:
r_serv.hset(self.p_path, attr_name, json.dumps(value))
self.store.hset(self.p_path, attr_name, json.dumps(value))
def _get_from_redis(self, r_serv):
return r_serv.hgetall(self.p_hash)